<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    小明思考

    Just a software engineer
    posts - 124, comments - 36, trackbacks - 0, articles - 0
      BlogJava :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理

    leveldb研究3-數(shù)據(jù)庫日志文件格式

    Posted on 2012-03-09 16:00 小明 閱讀(3643) 評論(1)  編輯  收藏 所屬分類: 分布式計算
    leveldb在每次數(shù)據(jù)庫操作之前都會把操作記錄下來。
    主要實現(xiàn)在db\log_format.h,db\log_reader.h,db\log_reader.cc,db\log_write.h,db\log_write.cc中。我們來具體看看實現(xiàn)。

    日志格式
    db\log_format.h
    log是分塊的,每塊為32K,每條記錄的記錄頭為7個字節(jié),前四個為CRC,然后是長度(2個字節(jié)),最后是記錄類型(1個字節(jié))
    ---------------------------------------
    BLOCK1|BLOCK2|BLOCK3|...|BLOCKN
    ---------------------------------------

    enum RecordType {
     
    // Zero is reserved for preallocated files
      kZeroType = 0,

      kFullType 
    = 1,

      
    // For fragments
      kFirstType = 2,
      kMiddleType 
    = 3,
      kLastType 
    = 4
    };
    static const int kMaxRecordType = kLastType;

    static const int kBlockSize = 32768;

    // Header is checksum (4 bytes), type (1 byte), length (2 bytes).
    static const int kHeaderSize = 4 + 1 + 2;

    }  
    // namespace log
    }  // namespace leveldb

    寫日志操作
    db\log_writer.cc
    請注意這里的處理,由于1條記錄可能超過一個BLOCK的大小,所以需要分成多個片段寫入。
    //增加一條記錄
    Status Writer::AddRecord(const Slice& slice) {
      
    const char* ptr = slice.data();
      size_t left 
    = slice.size();

      
    // Fragment the record if necessary and emit it.  Note that if slice
      
    // is empty, we still want to iterate once to emit a single
      
    // zero-length record
      Status s;
      
    bool begin = true;
      
    do {
        
    const int leftover = kBlockSize - block_offset_; //當(dāng)前剩余多少字節(jié)
        assert(leftover >= 0);
        
    if (leftover < kHeaderSize) { //不夠文件頭大小7bytes
          
    // 轉(zhuǎn)入新的block
          if (leftover > 0) {
            
    //用0來填充空白
            assert(kHeaderSize == 7);
            dest_
    ->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
          }
          block_offset_ 
    = 0;
        }

        
    // Invariant: we never leave < kHeaderSize bytes in a block.
        assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

        
    //avail:除掉頭還算多少字節(jié)
        const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
        
    //實際寫入大小
        const size_t fragment_length = (left < avail) ? left : avail;

        RecordType type;
        
    const bool end = (left == fragment_length); //記錄是否結(jié)束
        if (begin && end) {
          type 
    = kFullType; //完整記錄
        } else if (begin) {
          type 
    = kFirstType; //開頭
        } else if (end) {
          type 
    = kLastType; //結(jié)尾
        } else {
          type 
    = kMiddleType; //中間
        }
        
    //寫入
        s = EmitPhysicalRecord(type, ptr, fragment_length);
        ptr 
    += fragment_length;
        left 
    -= fragment_length;
        begin 
    = false;
      } 
    while (s.ok() && left > 0);
      
    return s;
    }

    //實際寫入日志文件
    Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
      assert(n 
    <= 0xffff);  // Must fit in two bytes
      assert(block_offset_ + kHeaderSize + n <= kBlockSize);

      
    // 記錄頭
      char buf[kHeaderSize];
      buf[
    4= static_cast<char>(n & 0xff);
      buf[
    5= static_cast<char>(n >> 8);
      buf[
    6= static_cast<char>(t);

      
    // 計算CRC
      uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
      crc 
    = crc32c::Mask(crc);                 // Adjust for storage
      EncodeFixed32(buf, crc);

      
    // 寫入頭部
      Status s = dest_->Append(Slice(buf, kHeaderSize));
      
    if (s.ok()) {
        
    //寫入記錄片段
        s = dest_->Append(Slice(ptr, n));
        
    if (s.ok()) {
          s 
    = dest_->Flush();
        }
      }
      block_offset_ 
    += kHeaderSize + n;
      
    return s;
    }

    讀日志操作
    這里可以看出使用BLOCK的好處,能夠減少文件IO次數(shù),讀日志基本上就是寫日志反向過程。

    //讀取記錄,scratch為緩沖,record是結(jié)果
    bool Reader::ReadRecord(Slice* record, std::string* scratch) {
      
    if (last_record_offset_ < initial_offset_) { //需要跳過文件頭部信息,目前未實現(xiàn)
        if (!SkipToInitialBlock()) {
          
    return false;
        }
      }

      scratch
    ->clear();
      record
    ->clear();
      
    bool in_fragmented_record = false//是否是碎片記錄
    // Record offset of the logical record that we're reading
      
    // 0 is a dummy value to make compilers happy
      uint64_t prospective_record_offset = 0;
      Slice fragment;
      
    while (true) {
        uint64_t physical_record_offset 
    = end_of_buffer_offset_ - buffer_.size();
        
    //從文件中讀取一個BLOCK
        const unsigned int record_type = ReadPhysicalRecord(&fragment);
        
    switch (record_type) {
          
    case kFullType: //完整Record
            if (in_fragmented_record) {
              
    // Handle bug in earlier versions of log::Writer where
              
    // it could emit an empty kFirstType record at the tail end
              
    // of a block followed by a kFullType or kFirstType record
              
    // at the beginning of the next block.
              if (scratch->empty()) {
                in_fragmented_record 
    = false;
              } 
    else {
                ReportCorruption(scratch
    ->size(), "partial record without end(1)");
              }
            }
            prospective_record_offset 
    = physical_record_offset;
            scratch
    ->clear();
            
    *record = fragment;
            last_record_offset_ 
    = prospective_record_offset;
            
    return true;

          
    case kFirstType: //Record開始
            if (in_fragmented_record) {
              
    // Handle bug in earlier versions of log::Writer where
              
    // it could emit an empty kFirstType record at the tail end
              
    // of a block followed by a kFullType or kFirstType record
              
    // at the beginning of the next block.
              if (scratch->empty()) {
                in_fragmented_record 
    = false;
              } 
    else {
                ReportCorruption(scratch
    ->size(), "partial record without end(2)");
              }
            }
            prospective_record_offset 
    = physical_record_offset;
            scratch
    ->assign(fragment.data(), fragment.size());
            in_fragmented_record 
    = true;
            
    break;

          
    case kMiddleType://Record中間
            if (!in_fragmented_record) {
              ReportCorruption(fragment.size(),
                               
    "missing start of fragmented record(1)");
            } 
    else {
              scratch
    ->append(fragment.data(), fragment.size());
            }
            
    break;

          
    case kLastType://Record結(jié)尾
            if (!in_fragmented_record) {
              ReportCorruption(fragment.size(),
                               
    "missing start of fragmented record(2)");
            } 
    else {
              scratch
    ->append(fragment.data(), fragment.size());
              
    *record = Slice(*scratch);
              last_record_offset_ 
    = prospective_record_offset;
              
    return true;
            }
            
    break;

          
    case kEof://文件結(jié)束
            if (in_fragmented_record) {
              ReportCorruption(scratch
    ->size(), "partial record without end(3)");
              scratch
    ->clear();
            }
            
    return false;

          
    case kBadRecord://壞記錄
            if (in_fragmented_record) {
              ReportCorruption(scratch
    ->size(), "error in middle of record");
              in_fragmented_record 
    = false;
              scratch
    ->clear();
            }
            
    break;

          
    default: {//無法識別
            char buf[40];
            snprintf(buf, 
    sizeof(buf), "unknown record type %u", record_type);
            ReportCorruption(
                (fragment.size() 
    + (in_fragmented_record ? scratch->size() : 0)),
                buf);
            in_fragmented_record 
    = false;
            scratch
    ->clear();
            
    break;
          }
        }
      }
      
    return false;
    }

    //從文件中讀取
    unsigned int Reader::ReadPhysicalRecord(Slice* result) {
      
    while (true) {
        
    if (buffer_.size() < kHeaderSize) {
          
    if (!eof_) {
            
    // Last read was a full read, so this is a trailer to skip
            buffer_.clear();
            
    //讀入一個BLOCK
            Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
            end_of_buffer_offset_ 
    += buffer_.size();
            
    if (!status.ok()) {
              buffer_.clear();
              ReportDrop(kBlockSize, status);
              eof_ 
    = true;
              
    return kEof;
            } 
    else if (buffer_.size() < kBlockSize) {
              eof_ 
    = true;
            }
            
    continue;
          } 
    else if (buffer_.size() == 0) {
            
    // End of file
            return kEof;
          } 
    else {
            size_t drop_size 
    = buffer_.size();
            buffer_.clear();
            ReportCorruption(drop_size, 
    "truncated record at end of file");
            
    return kEof;
          }
        }

        
    // 解析record頭
        const char* header = buffer_.data();
        
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
        
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
        
    const unsigned int type = header[6];
        
    const uint32_t length = a | (b << 8);
        
    if (kHeaderSize + length > buffer_.size()) {
          size_t drop_size 
    = buffer_.size();
          buffer_.clear();
          ReportCorruption(drop_size, 
    "bad record length");
          
    return kBadRecord;
        }

        
    if (type == kZeroType && length == 0) {
          
    // Skip zero length record without reporting any drops since
          
    // such records are produced by the mmap based writing code in
          
    // env_posix.cc that preallocates file regions.
          buffer_.clear();
          
    return kBadRecord;
        }

        
    // 檢查CRC
        if (checksum_) {
          uint32_t expected_crc 
    = crc32c::Unmask(DecodeFixed32(header));
          uint32_t actual_crc 
    = crc32c::Value(header + 61 + length);
          
    if (actual_crc != expected_crc) {
            
    // Drop the rest of the buffer since "length" itself may have
            
    // been corrupted and if we trust it, we could find some
            
    // fragment of a real log record that just happens to look
            
    // like a valid log record.
            size_t drop_size = buffer_.size();
            buffer_.clear();
            ReportCorruption(drop_size, 
    "checksum mismatch");
            
    return kBadRecord;
          }
        }

        buffer_.remove_prefix(kHeaderSize 
    + length);

        
    // Skip physical record that started before initial_offset_
        if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
            initial_offset_) {
          result
    ->clear();
          
    return kBadRecord;
        }

        
    *result = Slice(header + kHeaderSize, length);
        
    return type;
      }
    }



    評論

    # re: leveldb研究3-數(shù)據(jù)庫日志文件格式  回復(fù)  更多評論   

    2012-03-12 09:23 by tb
    呵呵 不錯啊
    主站蜘蛛池模板: 一个人看的www免费在线视频| 国产成人精品日本亚洲专区61 | 精品国产综合成人亚洲区| 美女黄色免费网站| 亚洲日韩国产成网在线观看| 成av免费大片黄在线观看| 2048亚洲精品国产| 好男人看视频免费2019中文| 亚洲av永久无码精品网址| 免费又黄又爽又猛的毛片| 成人妇女免费播放久久久| 国产成人亚洲精品播放器下载| 三上悠亚亚洲一区高清| 久久久久免费看成人影片| 亚洲精品岛国片在线观看| 成人久久免费网站| 亚洲国产美女视频| 性做久久久久免费看| 久久er国产精品免费观看8| 亚洲精品高清视频| 全免费一级午夜毛片| 97在线免费观看视频| 亚洲人成网站在线在线观看| 亚洲免费一区二区| 亚洲日本在线观看视频| 久久久亚洲精品蜜桃臀| 亚洲日韩涩涩成人午夜私人影院| 亚洲国产成人五月综合网| free哆啪啪免费永久| 亚洲欧美中文日韩视频| 亚洲人片在线观看天堂无码| 亚洲一本到无码av中文字幕 | 一区二区免费国产在线观看| 91久久亚洲国产成人精品性色 | 久久亚洲精品国产亚洲老地址| 亚洲国产精品国产自在在线 | 黄色一级毛片免费| 黄色一级视频免费观看| 亚洲免费在线观看| 亚洲综合一区二区三区四区五区| 亚洲熟妇自偷自拍另欧美|