leveldb 通過 Open 函數打開或新建數據庫,其聲明如下:
// Opens the database named `name` using `options`, creating it first if
// options.create_if_missing is set.
// On success *dbptr receives a heap-allocated DBImpl (created with `new`
// and not retained by Open) and OK is returned; on failure *dbptr is set
// to NULL and a non-OK status is returned.
static Status Open(const Options& options,
                   const std::string& name,
                   DB** dbptr);
其中 options 用於指定打開數據庫時的各項選項,其定義如下:
// Options controlling how a database is opened and how it behaves.
struct Options {
  // -------------------
  // Parameters that affect behavior

  // Comparator that defines the order of keys.
  // Default: keys are ordered bytewise.
  const Comparator* comparator;

  // If true, the database is created if it does not exist.
  // Default: false
  bool create_if_missing;

  // If true, opening a database that already exists is an error.
  // Default: false
  bool error_if_exists;

  // If true, perform strict checking. For example, during log recovery,
  // corruption is turned into an error instead of being logged and skipped.
  // Default: false
  bool paranoid_checks;

  // Env: operating-system abstraction used for file and environment access.
  // Default: Env::Default()
  Env* env;

  // Logger for informational messages. By default the log file is placed
  // in the same directory as the database.
  // Default: NULL
  Logger* info_log;

  // -------------------
  // Parameters that affect performance

  // Size of the in-memory write buffer. Larger values improve write
  // performance but increase startup time, because more data has to be
  // recovered from the log when the database is reopened.
  //
  // Default: 4MB
  size_t write_buffer_size;

  // Maximum number of open files, used by the TableCache.
  //
  // Default: 1000
  int max_open_files;

  // Control over blocks (user data is stored in a set of blocks, and
  // a block is the unit of reading from disk).

  // Block cache to use. If NULL, leveldb automatically creates and uses an
  // 8MB internal cache.
  // Default: NULL
  Cache* block_cache;

  // Size of a data block inside an SST file, measured before compression.
  //
  // Default: 4K
  size_t block_size;

  // Interval between restart points inside an SST file block
  // (see the SST file format).
  //
  // Default: 16
  int block_restart_interval;

  // Compression type for blocks; by default Google's Snappy compression.
  CompressionType compression;

  // Create an Options object with default values for all fields.
  Options();
};
具體看看Open的實現:
<db/db_impl.cc>
// Opens (or creates) the database `dbname`.
// Sequence of operations:
//   1. Recover(): rebuild version state from the MANIFEST and replay any
//      leftover *.log files into new level-0 SST files (collected in `edit`).
//   2. Create a fresh log file for subsequent writes.
//   3. LogAndApply(): persist `edit` (including the new log number) to the
//      MANIFEST.
//   4. Delete obsolete files and possibly schedule a compaction.
// On success *dbptr receives the DBImpl; on failure *dbptr stays NULL and
// the partially constructed DBImpl is deleted.
Status DB::Open(const Options& options, const std::string& dbname,
                DB** dbptr) {
  *dbptr = NULL;

  // Instantiate the implementation object.
  DBImpl* impl = new DBImpl(options, dbname);
  // Recover() asserts that mutex_ is held, and LogAndApply() temporarily
  // releases and re-acquires it, so take the lock first.
  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover from the MANIFEST and replay *.log files, producing new SST files.
  Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
  if (s.ok()) {
    // Create a new log file for writes made after this open.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      // Write the edit to the MANIFEST (creating a new MANIFEST file).
      s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
    }
    if (s.ok()) {
      // Delete obsolete files.
      impl->DeleteObsoleteFiles();
      // Possibly schedule a compaction.
      impl->MaybeScheduleCompaction();
    }
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}
因為上次關閉數據庫時,內存中的數據可能並未寫入 SST 文件,所以打開時需要從 *.log 文件中讀取記錄,並將其寫入新的 SST 文件。
<db/db_impl.cc>
// Recovers the database state. Must be called with mutex_ held.
//   1. Creates the database directory and acquires the LOCK file.
//   2. Creates a new database, or rejects an existing one, according to
//      options_.create_if_missing / options_.error_if_exists.
//   3. Rebuilds the VersionSet from the MANIFEST (VersionSet::Recover).
//   4. Replays the relevant *.log files, oldest first, into level-0 SST
//      files recorded in *edit, and advances the last sequence number.
// Returns the first error encountered. Log replay stops at the first log
// file that fails, so a later successful replay cannot mask the error.
Status DBImpl::Recover(VersionEdit* edit) {
  mutex_.AssertHeld();

  // Create the database directory. The result is intentionally ignored:
  // the directory may already exist, and real failures surface in the
  // LockFile call below.
  env_->CreateDir(dbname_);
  assert(db_lock_ == NULL);
  // Create the LOCK file and lock it.
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      // No CURRENT file: create a brand-new database.
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }
  }

  // Rebuild the version state from the MANIFEST.
  s = versions_->Recover();
  if (s.ok()) {
    SequenceNumber max_sequence(0);

    // Only log files numbered >= LogNumber(), plus the one numbered
    // PrevLogNumber(), are replayed; older log files are skipped.
    const uint64_t min_log = versions_->LogNumber();
    const uint64_t prev_log = versions_->PrevLogNumber();
    std::vector<std::string> filenames;
    s = env_->GetChildren(dbname_, &filenames);
    if (!s.ok()) {
      return s;
    }
    uint64_t number;
    FileType type;
    std::vector<uint64_t> logs;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type)
          && type == kLogFile
          && ((number >= min_log) || (number == prev_log))) {
        logs.push_back(number);
      }
    }

    // Recover in the order in which the logs were generated
    std::sort(logs.begin(), logs.end());
    for (size_t i = 0; i < logs.size(); i++) {
      // Replay one *.log file into level-0 SST files.
      s = RecoverLogFile(logs[i], edit, &max_sequence);
      if (!s.ok()) {
        // Stop at the first failure. Continuing would let a later log's
        // successful replay overwrite `s`, hiding the error and letting
        // DB::Open succeed with missing data.
        break;
      }

      // The previous incarnation may not have written any MANIFEST
      // records after allocating this log number. So we manually
      // update the file number allocation counter in VersionSet.
      versions_->MarkFileNumberUsed(logs[i]);
    }

    if (s.ok()) {
      // Make sure future writes use sequence numbers above anything replayed.
      if (versions_->LastSequence() < max_sequence) {
        versions_->SetLastSequence(max_sequence);
      }
    }
  }

  return s;
}
繼續看RecoverLogFile的實現:
<db/db_impl.cc>
// Replays the records of log file #log_number into MemTables. Must be
// called with mutex_ held.
// - Each memtable is flushed to a level-0 SST file (recorded in *edit)
//   whenever its memory usage exceeds options_.write_buffer_size, and once
//   more at the end for any remaining data.
// - *max_sequence is raised to the highest sequence number seen.
// - Corrupted records are reported through LogReporter. They only fail
//   recovery when options_.paranoid_checks is true.
Status DBImpl::RecoverLogFile(uint64_t log_number,
                              VersionEdit* edit,
                              SequenceNumber* max_sequence) {
  // LogReporter: reports corrupted data encountered while reading the log.
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // NULL if options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == NULL ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      // Keep only the first error. In paranoid mode `status` points at the
      // replay loop's status below, so this ends the loop.
      if (this->status != NULL && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  // Open the log file for sequential reading.
  std::string fname = LogFileName(dbname_, log_number);
  SequentialFile* file;
  Status status = env_->NewSequentialFile(fname, &file);
  if (!status.ok()) {
    // NOTE(review): MaybeIgnoreError is not shown here; presumably it clears
    // the error unless paranoid_checks is set. Confirm.
    MaybeIgnoreError(&status);
    return status;
  }

  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log;
  reporter.fname = fname.c_str();
  // In paranoid mode, corruption is written directly into `status`.
  reporter.status = (options_.paranoid_checks ? &status : NULL);
  // Read records with log::Reader, verifying checksums, starting at offset 0.
  log::Reader reader(file, &reporter, true/*checksum*/,
                     0/*initial_offset*/);
  Log(options_.info_log, "Recovering log #%llu",
      (unsigned long long) log_number);

  std::string scratch;
  Slice record;
  WriteBatch batch;
  MemTable* mem = NULL;
  // Iterate over the log file; each record is the contents of a WriteBatch.
  while (reader.ReadRecord(&record, &scratch) &&
         status.ok()) {
    // NOTE(review): 12 is presumably the WriteBatch header size (the
    // sequence number and count read below). Confirm.
    if (record.size() < 12) {
      reporter.Corruption(
          record.size(), Status::Corruption("log record too small"));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    if (mem == NULL) {
      // Lazily create a MemTable to hold the replayed data.
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    // Insert the batch into the memtable.
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      break;
    }
    // Sequence number of the last entry in this batch.
    const SequenceNumber last_seq =
        WriteBatchInternal::Sequence(&batch) +
        WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }

    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      // Memtable is full: write it out as a level-0 SST file.
      status = WriteLevel0Table(mem, edit, NULL);
      if (!status.ok()) {
        break;
      }
      // Release our reference to the memtable; the next record starts a new one.
      mem->Unref();
      mem = NULL;
    }
  }

  // Flush whatever remains in the last memtable.
  if (status.ok() && mem != NULL) {
    status = WriteLevel0Table(mem, edit, NULL);
    // Reflect errors immediately so that conditions like full
    // file-systems cause the DB::Open() to fail.
  }

  if (mem != NULL) mem->Unref();
  delete file;
  return status;
}
至此完成SST file的寫入。
接下來看看 manifest 文件的重建。
manifest 的重建分兩步:第一步調用 VersionSet::Recover 函數,恢復到上次的 manifest 狀態;第二步調用 VersionSet::LogAndApply,把新生成的 SST 文件記錄也寫入 manifest 文件中。
<db/version_set.cc>
// Rebuilds the in-memory version state from the MANIFEST named by CURRENT.
// - Every VersionEdit record is replayed through a Builder.
// - The result is installed as the current Version.
// - The next-file, log-number, prev-log-number and last-sequence counters
//   are restored.
// Returns:
// - Corruption if CURRENT is malformed or required fields are missing.
// - InvalidArgument if the recorded comparator differs from the configured one.
// - Any I/O or decode error encountered.
Status VersionSet::Recover() {
  // Records the first corruption reported while reading the MANIFEST.
  struct LogReporter : public log::Reader::Reporter {
    Status* status;
    virtual void Corruption(size_t bytes, const Status& s) {
      if (this->status->ok()) *this->status = s;
    }
  };

  // Read the CURRENT file, which names the latest MANIFEST file.
  std::string current;
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok()) {
    return s;
  }
  // CURRENT holds the MANIFEST file name followed by a newline.
  if (current.empty() || current[current.size()-1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  current.resize(current.size() - 1);

  // Open the current MANIFEST file.
  std::string dscname = dbname_ + "/" + current;
  SequentialFile* file;
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok()) {
    return s;
  }

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  Builder builder(this, current_);

  {
    LogReporter reporter;
    reporter.status = &s;
    // Read the MANIFEST as a log whose records are encoded VersionEdits.
    log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (s.ok()) {
        // Refuse to open a database written with a different comparator.
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name()) {
          s = Status::InvalidArgument(
              edit.comparator_ + " does not match existing comparator ",
              icmp_.user_comparator()->Name());
        }
      }

      if (s.ok()) {
        // Apply the edit to the version being rebuilt.
        builder.Apply(&edit);
      }

      // Later records override earlier ones; keep the most recent values.
      if (edit.has_log_number_) {
        log_number = edit.log_number_;
        have_log_number = true;
      }

      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
  }
  delete file;
  file = NULL;

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    // A missing prev-log number is allowed and treated as 0.
    if (!have_prev_log_number) {
      prev_log_number = 0;
    }

    MarkFileNumberUsed(prev_log_number);
    MarkFileNumberUsed(log_number);
  }

  if (s.ok()) {
    // Build the recovered version and install it as the current version.
    Version* v = new Version(this);
    builder.SaveTo(v);
    // Install recovered version
    Finalize(v);
    AppendVersion(v);
    // next_file is reserved as the number of the MANIFEST that the next
    // LogAndApply() will create; new files are allocated after it.
    manifest_file_number_ = next_file;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;
  }

  return s;
}
// Applies *edit on top of the current Version, appends it to the MANIFEST,
// and on success installs the resulting Version as current.
// - Fields the edit does not set are filled from the VersionSet's state:
//   log number and prev log number. The next file number and last sequence
//   are always overwritten with the current values.
// - *mu must be held on entry. It is released while the MANIFEST record is
//   written and synced, and re-acquired before returning.
// - If no MANIFEST is open yet (descriptor_log_ == NULL), a new one is
//   created, seeded with a snapshot of the current state, and published by
//   rewriting CURRENT.
// - On failure the new Version is discarded, and a newly created MANIFEST
//   is closed and deleted.
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
  if (edit->has_log_number_) {
    assert(edit->log_number_ >= log_number_);
    assert(edit->log_number_ < next_file_number_);
  } else {
    edit->SetLogNumber(log_number_);
  }

  if (!edit->has_prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_);
  }

  edit->SetNextFile(next_file_number_);
  edit->SetLastSequence(last_sequence_);

  // Build the new Version: current version + edit.
  Version* v = new Version(this);
  {
    Builder builder(this, current_);
    builder.Apply(edit);
    builder.SaveTo(v);
  }
  Finalize(v);

  // Initialize new descriptor log file if necessary by creating
  // a temporary file that contains a snapshot of the current version.
  std::string new_manifest_file;
  Status s;
  // Create a new MANIFEST file if none is open yet.
  if (descriptor_log_ == NULL) {
    // No reason to unlock *mu here since we only hit this path in the
    // first call to LogAndApply (when opening the database).
    assert(descriptor_file_ == NULL);
    new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
    // NOTE(review): appears redundant with the SetNextFile call above unless
    // next_file_number_ changed in between. Kept as-is.
    edit->SetNextFile(next_file_number_);
    s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
    if (s.ok()) {
      descriptor_log_ = new log::Writer(descriptor_file_);
      s = WriteSnapshot(descriptor_log_);
    }
  }

  // Unlock during expensive MANIFEST log write
  {
    mu->Unlock();

    // Append the edit as a record to the MANIFEST log and sync it.
    if (s.ok()) {
      std::string record;
      edit->EncodeTo(&record);
      s = descriptor_log_->AddRecord(record);
      if (s.ok()) {
        s = descriptor_file_->Sync();
      }
    }

    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok() && !new_manifest_file.empty()) {
      s = SetCurrentFile(env_, dbname_, manifest_file_number_);
    }

    mu->Lock();
  }

  // Install the new version.
  if (s.ok()) {
    AppendVersion(v);
    log_number_ = edit->log_number_;
    prev_log_number_ = edit->prev_log_number_;
  } else {
    // Discard the new version and any MANIFEST created by this call.
    delete v;
    if (!new_manifest_file.empty()) {
      delete descriptor_log_;
      delete descriptor_file_;
      descriptor_log_ = NULL;
      descriptor_file_ = NULL;
      env_->DeleteFile(new_manifest_file);
    }
  }

  return s;
}