leveldb在每次數據庫操作之前都會把操作記錄下來。
主要實現在db\log_format.h,db\log_reader.h,db\log_reader.cc,db\log_write.h,db\log_write.cc中。我們來具體看看實現。
日志格式
db\log_format.h
log是分塊的,每塊為32K,每條記錄的記錄頭為7個字節,前四個為CRC,然后是長度(2個字節),最后是記錄類型(1個字節)
---------------------------------------
BLOCK1|BLOCK2|BLOCK3|...|BLOCKN
---------------------------------------
enum RecordType {
// Zero is reserved for preallocated files
kZeroType = 0,
kFullType = 1,
// For fragments
kFirstType = 2,
kMiddleType = 3,
kLastType = 4
};
static const int kMaxRecordType = kLastType;
static const int kBlockSize = 32768;
// Header is checksum (4 bytes), type (1 byte), length (2 bytes).
static const int kHeaderSize = 4 + 1 + 2;
} // namespace log
} // namespace leveldb
寫日志操作
db\log_writer.cc
請注意這里的處理,由于1條記錄可能超過一個BLOCK的大小,所以需要分成多個片段寫入。
//增加一條記錄
Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data();
size_t left = slice.size();
// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_; //當前剩余多少字節
assert(leftover >= 0);
if (leftover < kHeaderSize) { //不夠文件頭大小7bytes
// 轉入新的block
if (leftover > 0) {
//用0來填充空白
assert(kHeaderSize == 7);
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}
// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
//avail:除掉頭還算多少字節
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
//實際寫入大小
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
const bool end = (left == fragment_length); //記錄是否結束
if (begin && end) {
type = kFullType; //完整記錄
} else if (begin) {
type = kFirstType; //開頭
} else if (end) {
type = kLastType; //結尾
} else {
type = kMiddleType; //中間
}
//寫入
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
return s;
}
//實際寫入日志文件
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
assert(n <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + n <= kBlockSize);
// 記錄頭
char buf[kHeaderSize];
buf[4] = static_cast<char>(n & 0xff);
buf[5] = static_cast<char>(n >> 8);
buf[6] = static_cast<char>(t);
// 計算CRC
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc);
// 寫入頭部
Status s = dest_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
//寫入記錄片段
s = dest_->Append(Slice(ptr, n));
if (s.ok()) {
s = dest_->Flush();
}
}
block_offset_ += kHeaderSize + n;
return s;
}
讀日志操作
這里可以看出使用BLOCK的好處,能夠減少文件IO次數,讀日志基本上就是寫日志反向過程。
//讀取記錄,scratch為緩沖,record是結果
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
if (last_record_offset_ < initial_offset_) { //需要跳過文件頭部信息,目前未實現
if (!SkipToInitialBlock()) {
return false;
}
}
scratch->clear();
record->clear();
bool in_fragmented_record = false//是否是碎片記錄
; // Record offset of the logical record that we're reading
// 0 is a dummy value to make compilers happy
uint64_t prospective_record_offset = 0;
Slice fragment;
while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
//從文件中讀取一個BLOCK
const unsigned int record_type = ReadPhysicalRecord(&fragment);
switch (record_type) {
case kFullType: //完整Record
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (scratch->empty()) {
in_fragmented_record = false;
} else {
ReportCorruption(scratch->size(), "partial record without end(1)");
}
}
prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
return true;
case kFirstType: //Record開始
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (scratch->empty()) {
in_fragmented_record = false;
} else {
ReportCorruption(scratch->size(), "partial record without end(2)");
}
}
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
break;
case kMiddleType://Record中間
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
} else {
scratch->append(fragment.data(), fragment.size());
}
break;
case kLastType://Record結尾
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
} else {
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
return true;
}
break;
case kEof://文件結束
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "partial record without end(3)");
scratch->clear();
}
return false;
case kBadRecord://壞記錄
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record");
in_fragmented_record = false;
scratch->clear();
}
break;
default: {//無法識別
char buf[40];
snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
ReportCorruption(
(fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
buf);
in_fragmented_record = false;
scratch->clear();
break;
}
}
}
return false;
}
//從文件中讀取
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
if (buffer_.size() < kHeaderSize) {
if (!eof_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
//讀入一個BLOCK
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
return kEof;
} else if (buffer_.size() < kBlockSize) {
eof_ = true;
}
continue;
} else if (buffer_.size() == 0) {
// End of file
return kEof;
} else {
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "truncated record at end of file");
return kEof;
}
}
// 解析record頭
const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
const unsigned int type = header[6];
const uint32_t length = a | (b << 8);
if (kHeaderSize + length > buffer_.size()) {
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "bad record length");
return kBadRecord;
}
if (type == kZeroType && length == 0) {
// Skip zero length record without reporting any drops since
// such records are produced by the mmap based writing code in
// env_posix.cc that preallocates file regions.
buffer_.clear();
return kBadRecord;
}
// 檢查CRC
if (checksum_) {
uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
if (actual_crc != expected_crc) {
// Drop the rest of the buffer since "length" itself may have
// been corrupted and if we trust it, we could find some
// fragment of a real log record that just happens to look
// like a valid log record.
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "checksum mismatch");
return kBadRecord;
}
}
buffer_.remove_prefix(kHeaderSize + length);
// Skip physical record that started before initial_offset_
if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
initial_offset_) {
result->clear();
return kBadRecord;
}
*result = Slice(header + kHeaderSize, length);
return type;
}
}