FaceBook 開(kāi)源項(xiàng)目rocksdb分析

rocksdb 介紹

Rocksdb同樣是一種基于operation log的文件系統(tǒng)。
由于采用了op log,將對(duì)磁盤(pán)的隨機(jī)寫(xiě)轉(zhuǎn)換成了對(duì)op log的順序?qū)?,最新的?shù)據(jù)是存儲(chǔ)在內(nèi)存的memrory中,可以提高IO效率。每一個(gè)的column family分別有一個(gè)memtable與sstablle.當(dāng)某一coloumn family內(nèi)存中的memory table超過(guò)閾值時(shí),轉(zhuǎn)換成immute memtable并創(chuàng)建新的op log,immute memtable由一系列的memtable組成,它們是只讀的,可供查詢,不能更新數(shù)據(jù)。當(dāng)immute memtable的數(shù)目超過(guò)設(shè)置的數(shù)值時(shí),會(huì)觸發(fā)flush,DB會(huì)調(diào)度后臺(tái)線程將多個(gè)memtable合并后再dump到磁盤(pán)生成Level0中一個(gè)新的sstable文件,Level0中的sstable文件不斷累積,會(huì)觸發(fā)compaction,DB會(huì)調(diào)度后臺(tái)compaction線程將Level0中的sstable文件根據(jù)key與Level1中的sstable合并并生成新的sstable,依次類(lèi)推,根據(jù)key的空間從低層往上compact,最終形成了一層層的結(jié)構(gòu),層級(jí)數(shù)目是由用戶設(shè)置的。

leveldb中,memtable在內(nèi)存中核心s的數(shù)據(jù)結(jié)構(gòu)為skiplist,而在rocksdb中,memtable在內(nèi)存中的形式有三種:skiplist、hash-skiplist、hash-linklist,從字面中就可以看出數(shù)據(jù)結(jié)構(gòu)的大體形式,hash-skiplist就是每個(gè)hash bucket中是一個(gè)skiplist,hash-linklist中,每個(gè)hash bucket中是一個(gè)link-list,啟用何用數(shù)據(jù)結(jié)構(gòu)可在配置中選擇。

日志結(jié)構(gòu)樹(shù)

從概念上說(shuō),最基本的LSM是很簡(jiǎn)單的 。將之前使用一個(gè)大的查找結(jié)構(gòu)(造成隨機(jī)讀寫(xiě),影響寫(xiě)性能),變換為將寫(xiě)操作順序的保存到一些相似的有序文件(也就是sstable)中。所以每個(gè)文件包 含短時(shí)間內(nèi)的一些改動(dòng)。因?yàn)槲募怯行虻模灾蟛檎乙矔?huì)很快。文件是不可修改的,他們永遠(yuǎn)不會(huì)被更新,新的更新操作只會(huì)寫(xiě)到新的文件中。讀操作檢查很 有的文件。通過(guò)周期性的合并這些文件來(lái)減少文件個(gè)數(shù)。

讀操作

LevelDb 讀操作順序
Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value) {
  return GetImpl(read_options, column_family, key, value);
}

Status DBImpl::GetImpl(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       PinnableSlice* pinnable_val, bool* value_found,
                       ReadCallback* callback, bool* is_blob_index) {
  assert(pinnable_val != nullptr);
  StopWatch sw(env_, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  // Acquire SuperVersion
// holds references to memtable, all immutable memtables and version

  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  TEST_SYNC_POINT("DBImpl::GetImpl:1");
  TEST_SYNC_POINT("DBImpl::GetImpl:2");

  SequenceNumber snapshot;
  if (read_options.snapshot != nullptr) {
    // Note: In WritePrepared txns this is not necessary but not harmful either.
    // Because prep_seq > snapshot => commit_seq > snapshot so if a snapshot is
    // specified we should be fine with skipping seq numbers that are greater
    // than that.

// Abstract handle to particular state of a DB.
// A Snapshot is an immutable object and can therefore be safely
// accessed from multiple threads without any external synchronization.

    snapshot = reinterpret_cast<const SnapshotImpl*>(
        read_options.snapshot)->number_;
  } else {
    // Since we get and reference the super version before getting
    // the snapshot number, without a mutex protection, it is possible
    // that a memtable switch happened in the middle and not all the
    // data for this snapshot is available. But it will contain all
    // the data available in the super version we have, which is also
    // a valid snapshot to read from.
    // We shouldn't get snapshot before finding and referencing the
    // super versipon because a flush happening in between may compact
    // away data for the snapshot, but the snapshot is earlier than the
    // data overwriting it, so users may see wrong results.
    snapshot = last_seq_same_as_publish_seq_
                   ? versions_->LastSequence()
                   : versions_->LastPublishedSequence();
  }
  TEST_SYNC_POINT("DBImpl::GetImpl:3");
  TEST_SYNC_POINT("DBImpl::GetImpl:4");

  // Prepare to store a list of merge operations if merge occurs.
  MergeContext merge_context;
  RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);

  Status s;
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  LookupKey lkey(key, snapshot);
  PERF_TIMER_STOP(get_snapshot_time);

  bool skip_memtable = (read_options.read_tier == kPersistedTier &&
                        has_unpersisted_data_.load(std::memory_order_relaxed));
  bool done = false;
  if (!skip_memtable) {
    if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
                     &range_del_agg, read_options, callback, is_blob_index)) {
      done = true;
      pinnable_val->PinSelf();
      RecordTick(stats_, MEMTABLE_HIT);
    } else if ((s.ok() || s.IsMergeInProgress()) &&
               sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
                            &range_del_agg, read_options, callback,
                            is_blob_index)) {
      done = true;
      pinnable_val->PinSelf();
      RecordTick(stats_, MEMTABLE_HIT);
    }
    if (!done && !s.ok() && !s.IsMergeInProgress()) {
      ReturnAndCleanupSuperVersion(cfd, sv);
      return s;
    }
  }
  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
                     &range_del_agg, value_found, nullptr, nullptr, callback,
                     is_blob_index);
    RecordTick(stats_, MEMTABLE_MISS);
  }

  {
    PERF_TIMER_GUARD(get_post_process_time);

    ReturnAndCleanupSuperVersion(cfd, sv);

    RecordTick(stats_, NUMBER_KEYS_READ);
    size_t size = pinnable_val->size();
    RecordTick(stats_, BYTES_READ, size);
    MeasureTime(stats_, BYTES_PER_READ, size);
    PERF_COUNTER_ADD(get_read_bytes, size);
  }
  return s;
}

其中,LookupKey 實(shí)現(xiàn)如下:

LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s) {
  size_t usize = _user_key.size();
  size_t needed = usize + 13;  // A conservative estimate
  char* dst;
  if (needed <= sizeof(space_)) {
    dst = space_;
  } else {
    dst = new char[needed];
  }
  start_ = dst;
  // NOTE: We don't support users keys of more than 2GB :)
  dst = EncodeVarint32(dst, static_cast<uint32_t>(usize + 8));
  kstart_ = dst;
  memcpy(dst, _user_key.data(), usize);
  dst += usize;
  EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
  dst += 8;
  end_ = dst;
}

寫(xiě)操作

rocks db 的寫(xiě)操作是通過(guò) WriteImpl實(shí)現(xiàn)的,其結(jié)構(gòu)如下所示:

Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
  return WriteImpl(write_options, my_batch, nullptr, nullptr);
}
// The main write queue. This is the only write queue that updates LastSequence.
// When using one write queue, the same sequence also indicates the last
// published sequence.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback,
                         uint64_t* log_used, uint64_t log_ref,
                         bool disable_memtable, uint64_t* seq_used,
                         PreReleaseCallback* pre_release_callback) {
  if (my_batch == nullptr) {
    return Status::Corruption("Batch is nullptr!");
  }
  if (write_options.sync && write_options.disableWAL) {
    return Status::InvalidArgument("Sync writes has to enable WAL.");
  }
  if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_writes is not compatible with concurrent prepares");
  }
  if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_writes is not compatible with seq_per_batch");
  }
  // Otherwise IsLatestPersistentState optimization does not make sense
  assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
         disable_memtable);

  Status status;
  if (write_options.low_pri) {
    status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
    if (!status.ok()) {
      return status;
    }
  }

  if (two_write_queues_ && disable_memtable) {
    return WriteImplWALOnly(write_options, my_batch, callback, log_used,
                            log_ref, seq_used, pre_release_callback);
  }

  if (immutable_db_options_.enable_pipelined_write) {
    return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
                              log_ref, disable_memtable, seq_used);
  }

  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, pre_release_callback);

  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
  }

  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  write_thread_.JoinBatchGroup(&w);
  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    // we are a non-leader in a parallel group
    PERF_TIMER_GUARD(write_memtable_time);

    if (w.ShouldWriteToMemtable()) {
      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      w.status = WriteBatchInternal::InsertInto(
          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/, seq_per_batch_);
    }

    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      // we're responsible for exit batch group
      for (auto* writer : *(w.write_group)) {
        if (!writer->CallbackFailed() && writer->pre_release_callback) {
          assert(writer->sequence != kMaxSequenceNumber);
          Status ws = writer->pre_release_callback->Callback(writer->sequence);
          if (!ws.ok()) {
            status = ws;
            break;
          }
        }
      }
      // TODO(myabandeh): propagate status to write_group
      auto last_sequence = w.write_group->last_sequence;
      versions_->SetLastSequence(last_sequence);
      MemTableInsertStatusCheck(w.status);
      write_thread_.ExitAsBatchGroupFollower(&w);
    }
    assert(w.state == WriteThread::STATE_COMPLETED);
    // STATE_COMPLETED conditional below handles exit

    status = w.FinalStatus();
  }
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    // write is complete and leader has updated sequence
    return w.FinalStatus();
  }
  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);

  // Once reaches this point, the current writer "w" will try to do its write
  // job.  It may also pick up some of the remaining writers in the "writers_"
  // when it finds suitable, and finish them in the same write batch.
  // This is how a write job could be done by the other writer.
  WriteContext write_context;
  WriteThread::WriteGroup write_group;
  bool in_parallel_group = false;
  uint64_t last_sequence = kMaxSequenceNumber;
  if (!two_write_queues_) {
    last_sequence = versions_->LastSequence();
  }

  mutex_.Lock();

  bool need_log_sync = write_options.sync;
  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
  if (!two_write_queues_ || !disable_memtable) {
    // With concurrent writes we do preprocess only in the write thread that
    // also does write to memtable to avoid sync issue on shared data structure
    // with the other thread
    status = PreprocessWrite(write_options, &need_log_sync, &write_context);
  }
  log::Writer* log_writer = logs_.back().writer;

  mutex_.Unlock();

  // Add to log and apply to memtable.  We can release the lock
  // during this phase since &w is currently responsible for logging
  // and protects against concurrent loggers and concurrent writes
  // into memtables

  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

  if (status.ok()) {
    // Rules for when we can update the memtable concurrently
    // 1. supported by memtable
    // 2. Puts are not okay if inplace_update_support
    // 3. Merges are not okay
    //
    // Rules 1..2 are enforced by checking the options
    // during startup (CheckConcurrentWritesSupported), so if
    // options.allow_concurrent_memtable_write is true then they can be
    // assumed to be true.  Rule 3 is checked for each batch.  We could
    // relax rules 2 if we could prevent write batches from referring
    // more than once to a particular key.
    bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
                    write_group.size > 1;
    size_t total_count = 0;
    size_t valid_batches = 0;
    uint64_t total_byte_size = 0;
    for (auto* writer : write_group) {
      if (writer->CheckCallback(this)) {
        valid_batches++;
        if (writer->ShouldWriteToMemtable()) {
          total_count += WriteBatchInternal::Count(writer->batch);
          parallel = parallel && !writer->batch->HasMerge();
        }

        total_byte_size = WriteBatchInternal::AppendedByteSize(
            total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
      }
    }
    // Note about seq_per_batch_: either disableWAL is set for the entire write
    // group or not. In either case we inc seq for each write batch with no
    // failed callback. This means that there could be a batch with
    // disalbe_memtable in between; although we do not write this batch to
    // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
    // the seq per valid written key to mem.
    size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;

    const bool concurrent_update = two_write_queues_;
    // Update stats while we are an exclusive group leader, so we know
    // that nobody else can be writing to these particular stats.
    // We're optimistic, updating the stats before we successfully
    // commit.  That lets us release our leader status early.
    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count,
                      concurrent_update);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size,
                      concurrent_update);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update);
    RecordTick(stats_, WRITE_DONE_BY_SELF);
    auto write_done_by_other = write_group.size - 1;
    if (write_done_by_other > 0) {
      stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other,
                        concurrent_update);
      RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
    }
    MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);

    if (write_options.disableWAL) {
      has_unpersisted_data_.store(true, std::memory_order_relaxed);
    }

    PERF_TIMER_STOP(write_pre_and_post_process_time);

    if (!two_write_queues_) {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
                            need_log_dir_sync, last_sequence + 1);
      }
    } else {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        // LastAllocatedSequence is increased inside WriteToWAL under
        // wal_write_mutex_ to ensure ordered events in WAL
        status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
                                      seq_inc);
      } else {
        // Otherwise we inc seq number for memtable writes
        last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
      }
    }
    assert(last_sequence != kMaxSequenceNumber);
    const SequenceNumber current_sequence = last_sequence + 1;
    last_sequence += seq_inc;

    if (status.ok()) {
      PERF_TIMER_GUARD(write_memtable_time);

      if (!parallel) {
        // w.sequence will be set inside InsertInto
        w.status = WriteBatchInternal::InsertInto(
            write_group, current_sequence, column_family_memtables_.get(),
            &flush_scheduler_, write_options.ignore_missing_column_families,
            0 /*recovery_log_number*/, this, parallel, seq_per_batch_);
      } else {
        SequenceNumber next_sequence = current_sequence;
        // Note: the logic for advancing seq here must be consistent with the
        // logic in WriteBatchInternal::InsertInto(write_group...) as well as
        // with WriteBatchInternal::InsertInto(write_batch...) that is called on
        // the merged batch during recovery from the WAL.
        for (auto* writer : write_group) {
          if (writer->CallbackFailed()) {
            continue;
          }
          writer->sequence = next_sequence;
          if (seq_per_batch_) {
            next_sequence++;
          } else if (writer->ShouldWriteToMemtable()) {
            next_sequence += WriteBatchInternal::Count(writer->batch);
          }
        }
        write_group.last_sequence = last_sequence;
        write_group.running.store(static_cast<uint32_t>(write_group.size),
                                  std::memory_order_relaxed);
        write_thread_.LaunchParallelMemTableWriters(&write_group);
        in_parallel_group = true;

        // Each parallel follower is doing each own writes. The leader should
        // also do its own.
        if (w.ShouldWriteToMemtable()) {
          ColumnFamilyMemTablesImpl column_family_memtables(
              versions_->GetColumnFamilySet());
          assert(w.sequence == current_sequence);
          w.status = WriteBatchInternal::InsertInto(
              &w, w.sequence, &column_family_memtables, &flush_scheduler_,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*concurrent_memtable_writes*/, seq_per_batch_);
        }
      }
      if (seq_used != nullptr) {
        *seq_used = w.sequence;
      }
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);

  if (!w.CallbackFailed()) {
    WriteCallbackStatusCheck(status);
  }

  if (need_log_sync) {
    mutex_.Lock();
    MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
    mutex_.Unlock();
    // Requesting sync with two_write_queues_ is expected to be very rare. We
    // hance provide a simple implementation that is not necessarily efficient.
    if (two_write_queues_) {
      if (manual_wal_flush_) {
        status = FlushWAL(true);
      } else {
        status = SyncWAL();
      }
    }
  }

  bool should_exit_batch_group = true;
  if (in_parallel_group) {
    // CompleteParallelWorker returns true if this thread should
    // handle exit, false means somebody else did
    should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
  }
  if (should_exit_batch_group) {
    if (status.ok()) {
      for (auto* writer : write_group) {
        if (!writer->CallbackFailed() && writer->pre_release_callback) {
          assert(writer->sequence != kMaxSequenceNumber);
          Status ws = writer->pre_release_callback->Callback(writer->sequence);
          if (!ws.ok()) {
            status = ws;
            break;
          }
        }
      }
      versions_->SetLastSequence(last_sequence);
    }
    MemTableInsertStatusCheck(w.status);
    write_thread_.ExitAsBatchGroupLeader(write_group, status);
  }

  if (status.ok()) {
    status = w.FinalStatus();
  }
  return status;
}

Compaction操作

minor Compaction操作

跳表直接存到本地磁盤(pán)


major compaction 操作

多路歸并排序算法

Column Family

自從RocksDB 3.0引入支持column family,每個(gè)KV數(shù)據(jù)對(duì)能夠指定關(guān)聯(lián)唯一的column family(默認(rèn)為“default”),做到相對(duì)獨(dú)立的隔離存儲(chǔ);column family提供了邏輯上劃分?jǐn)?shù)據(jù)庫(kù)的能力,支持跨column family進(jìn)行原子寫(xiě)操作(借助WriteBatch實(shí)現(xiàn))等。

所有的column family共享WAL日志文件(write-head log),但是每個(gè)column family有獨(dú)立的MemTable和SSTable。
共享WAL有利于原子操作實(shí)現(xiàn),更高效的成組提交。
獨(dú)立的MemTable和SSTable則更方便數(shù)據(jù)壓縮,配置選項(xiàng),快速的刪除某個(gè)column family。

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前面寫(xiě)了兩篇文章介紹 LevelDB 的整體架構(gòu)和接口使用。這篇文章,我們從代碼的角度看看 LevelDB 的設(shè)計(jì)...
    linjinhe閱讀 5,766評(píng)論 0 1
  • 譯文 自從谷歌發(fā)表了他的”big table“論文以來(lái)已經(jīng)過(guò)去了十年。論文中很酷的一點(diǎn)是它使用的文件組織方式。在1...
    Dr_zhang閱讀 969評(píng)論 1 4
  • 簡(jiǎn)介 Log Structured Merge Tree,下面簡(jiǎn)稱 LSM。 2006年,Google 發(fā)表了 B...
    linjinhe閱讀 20,442評(píng)論 0 9
  • 很多時(shí)候,自己寫(xiě)東西就是會(huì)卡住很久,本來(lái)想著要更新的很勤快,但到頭來(lái),卻發(fā)現(xiàn)經(jīng)常會(huì)因?yàn)橐恍┢渌氖虑?,開(kāi)始想著明天...
    良人不叫二狗子閱讀 832評(píng)論 0 0
  • 1、過(guò)橋米線 過(guò)橋米線是云南著名的風(fēng)味小吃。過(guò)橋米線主要以湯、肉片、米線再加作料做成。湯用肥雞、豬筒子骨等熬制以清...
    美食達(dá)人閱讀 990評(píng)論 0 0

友情鏈接更多精彩內(nèi)容