RocksDB源碼分析 Read(一)內存讀取

Get

// Excerpt of the Get path: obtain a SuperVersion, then try sv->mem and, if
// that does not produce a result, sv->imm. Lines consisting of `...` mark
// code elided by the article.
// NOTE(review): the argument list of the call below appears to have been
// dropped from this excerpt — confirm against the upstream source.
SuperVersion* sv = GetAndRefSuperVersion;
SequenceNumber snapshot;
// Obtain the snapshot (per the original note: currently the largest sequence number)
...
bool done = false;
  if (!skip_memtable) {
    // Get value associated with key
    if (get_impl_options.get_value) {
      // Look up the key in sv->mem first
      if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
                       &merge_context, &max_covering_tombstone_seq,
                       read_options, get_impl_options.callback,
                       get_impl_options.is_blob_index)) {
        done = true;
        get_impl_options.value->PinSelf();
        RecordTick(stats_, MEMTABLE_HIT);
      } else if ((s.ok() || s.IsMergeInProgress()) &&
                 sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
                              &merge_context, &max_covering_tombstone_seq,
                              read_options, get_impl_options.callback,
                              get_impl_options.is_blob_index)) {
        // sv->mem did not return true and the status still allows continuing,
        // so the lookup fell through to sv->imm (presumably the immutable
        // memtables — confirm) and succeeded there.
        done = true;
        get_impl_options.value->PinSelf();
        RecordTick(stats_, MEMTABLE_HIT);
      }
    } 
...

memtable get

存在memtable里的key是key+(type and sequence),其中type和seq混合占8字節

// Check the bloom filter first
if (bloom_filter_) {
  // when both memtable_whole_key_filtering and prefix_extractor_ are set,
  // only do whole key filtering for Get() to save CPU
  if (moptions_.memtable_whole_key_filtering) {
    may_contain =
        bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz));
  } else {
    assert(prefix_extractor_);
    // Keys outside the prefix extractor's domain are not filtered: they are
    // treated as "may contain".
    may_contain =
        !prefix_extractor_->InDomain(user_key) ||
        bloom_filter_->MayContain(prefix_extractor_->Transform(user_key));
  }
}

// Fetch the value from the memtable
GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
                 is_blob_index, value, s, merge_context, seq,
                 &found_final_value, &merge_in_progress);
 
// Packs the lookup state and memtable options into a Saver and runs
// table_->Get with SaveValue as the per-entry callback.
// The parameter list is elided (`...`) in this excerpt.
void MemTable::GetFromTable(...){
  // Build the Saver that is handed to the callback
  Saver saver;
  saver.status = s;
  saver.found_final_value = found_final_value;
  saver.merge_in_progress = merge_in_progress;
  saver.key = &key;
  saver.value = value;
  saver.seq = kMaxSequenceNumber;
  saver.mem = this;
  saver.merge_context = merge_context;
  saver.max_covering_tombstone_seq = max_covering_tombstone_seq;
  saver.merge_operator = moptions_.merge_operator;
  saver.logger = moptions_.info_log;
  saver.inplace_update_support = moptions_.inplace_update_support;
  saver.statistics = moptions_.statistics;
  saver.env_ = env_;
  saver.callback_ = callback;
  saver.is_blob_index = is_blob_index;
  saver.do_merge = do_merge;
  // Perform the lookup
  table_->Get(key, &saver, SaveValue);
} 


// Positions an iterator at the lookup key and feeds successive entries to
// callback_func until the iterator is exhausted or the callback returns
// false.
//   k              lookup key; its internal_key() and memtable_key() are
//                  passed to Seek
//   callback_args  opaque pointer forwarded to every callback invocation
//   callback_func  invoked as callback_func(callback_args, entry); a false
//                  return stops the scan
void MemTableRep::Get(const LookupKey& k, void* callback_args,
                      bool (*callback_func)(void* arg, const char* entry)) {
  auto cursor = GetDynamicPrefixIterator();
  // Locate the starting entry (the original note says this searches the
  // skiplist).
  cursor->Seek(k.internal_key(), k.memtable_key().data());
  while (cursor->Valid() && callback_func(callback_args, cursor->key())) {
    cursor->Next();
  }
}

// Moves the iterator to the node returned by list_->FindGreaterOrEqual(target).
inline void InlineSkipList<Comparator>::Iterator::Seek(const char* target) {
  // Find the node whose key and sequence number satisfy the lookup.
  // Returns the earliest node with a key >= key.
  // Return nullptr if there is no such node.
  // NOTE(review): the original notes state that keys in the skip list are
  // ordered from largest to smallest and that the search returns values with
  // a sequence number below the requested one; the ordering is defined by
  // the comparator, which is not visible in this excerpt — confirm.
  // Per the original notes: the search lands on a key >= the requested key,
  // and entries with an equal key are ordered by descending sequence number,
  // so iterating forward from here yields sequence numbers <= the requested
  // one.
  node_ = list_->FindGreaterOrEqual(target);
}



// Per-entry callback passed to table_->Get. `arg` is the Saver built by the
// caller and `entry` is the encoded memtable entry. Returns true to continue
// with the next entry and false to stop.
// The body of the switch is elided (`...`) in this excerpt.
static bool SaveValue(void* arg, const char* entry) {
  Saver* s = reinterpret_cast<Saver*>(arg);
  assert(s != nullptr);
  MergeContext* merge_context = s->merge_context;
  SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
  const MergeOperator* merge_operator = s->merge_operator;

  assert(merge_context != nullptr);

  // entry format is:
  //    klength  varint32
  //    userkey  char[klength-8]
  //    tag      uint64
  //    vlength  varint32
  //    value    char[vlength]
  // Check that it belongs to same user key.  We do not check the
  // sequence number since the Seek() call above should have skipped
  // all entries with overly large sequence numbers.
  uint32_t key_length;
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  Slice user_key_slice = Slice(key_ptr, key_length - 8);
  // Seek may have landed on a key greater than the one requested, so compare
  // the user keys here; if they differ, the block below is skipped.
  if (s->mem->GetInternalKeyComparator()
          .user_comparator()
          ->CompareWithoutTimestamp(user_key_slice, s->key->user_key()) == 0) {
    // Correct user key
    // The tag is the last 8 bytes of the key; it packs the sequence number
    // and the value type.
    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
    ValueType type;
    SequenceNumber seq;
    UnPackSequenceAndType(tag, &seq, &type);
    // If the value is not in the snapshot, skip it
    if (!s->CheckCallback(seq)) {
      return true;  // to continue to the next seq
    }

    s->seq = seq;

    // A value, merge or blob-index entry with a sequence number below
    // max_covering_tombstone_seq is handled as a range deletion.
    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
        max_covering_tombstone_seq > seq) {
      type = kTypeRangeDeletion;
    }
    switch (type) {
      ...
      // Handle the key/value according to its type
  }

  // s->state could be Corrupt, merge or notfound
  return false;
}

ThreadLocalSuperVersion

RocksDB利用線程局部緩存和atomic來替換掉原先LevelDB的version加鎖的邏輯

// Before reading, an up-to-date SuperVersion must be obtained (the original
// note describes it as the latest versionset).
// Returns a non-null SuperVersion: the one taken from local_sv_ when its
// version_number matches super_version_number_, otherwise super_version_
// referenced under db->mutex().
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
  // Take the cached SuperVersion via Swap, leaving the kSVInUse marker in the
  // thread-local slot. Per the original note: if no write happens, the slot
  // keeps holding the in-use marker until ReturnThreadLocalSuperVersion runs.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  // If the SuperVersion just obtained is already obsolete, drop our reference
  // to it and take the current one under the DB mutex.
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    // Unref() returning true leads to Cleanup() below, performed with the DB
    // mutex held.
    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db->mutex()->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
        db->AddSuperVersionsToFreeQueue(sv);
        db->SchedulePurge();
      } else {
        // Deleted after the mutex is released (see below).
        sv_to_delete = sv;
      }
    } else {
      db->mutex()->Lock();
    }
    // The lock must be held here, per the original note, so that background
    // threads cannot change super_version_ while the current global
    // SuperVersion is being referenced.
    sv = super_version_->Ref();
    db->mutex()->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

ReturnAndCleanupSuperVersion

// Tries to hand `sv` back to the column family's thread-local slot and, when
// that is refused, cleans up the caller's reference instead.
//   cfd  column family whose thread-local slot receives the SuperVersion
//   sv   SuperVersion previously obtained for the read
void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
                                          SuperVersion* sv) {
  const bool returned_to_tls = cfd->ReturnThreadLocalSuperVersion(sv);
  if (returned_to_tls) {
    return;
  }
  // The compare-and-swap back into thread-local storage failed. Per the
  // original note, this means a writer has meanwhile reset every thread's
  // slot, so the old SuperVersion still held here has to be cleaned up.
  CleanupSuperVersion(sv);
}

// Attempts to store `sv` back into local_sv_ with a compare-and-swap that
// expects the slot to still hold SuperVersion::kSVInUse.
// Returns true when the swap succeeded and false otherwise.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Value the slot is expected to hold; the assert on the failure path below
  // implies that CompareAndSwap updates it to the value actually found.
  void* observed = SuperVersion::kSVInUse;
  const bool put_back =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), observed);
  if (!put_back) {
    // A thread-local scrape happened during this GetImpl call, between the
    // initial Swap() and this CompareAndSwap(), so the SuperVersion held by
    // the caller is obsolete.
    assert(observed == SuperVersion::kSVObsolete);
  }
  // On success the slot still held kSVInUse: thread-local storage was not
  // altered, no scrape happened, and the SuperVersion is still current.
  return put_back;
}

InstallSuperVersion

InstallSuperVersionAndScheduleWork->
// Makes sv_context->new_superversion the current super_version_, bumps
// super_version_number_, and releases this object's reference to the
// previous SuperVersion (queuing it in sv_context->superversions_to_free
// when that was the last reference).
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  // Per the original note, the caller already holds the lock; no locking is
  // done inside this function.
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  // Install the new SuperVersion
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    // Per the original note, this sets the other threads' thread-local slots
    // to SuperVersion::kSVObsolete (described there as nullptr).
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    // If this was the last reference to old_superversion, clean it up
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
}

RocksDB對LevelDB的讀優化

Mutex用時也是Atomic的3倍。

RocksDB就是將LevelDB里Get()實現中一上來就mutex加鎖的操作換成atomic+線程私有存儲的方式來進行優化,優化后讀操作基本很少再會有互斥,性能提高不少

©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • MemTable MemTable是一個(gè)內(nèi)存中數(shù)據(jù)結(jié)構(gòu),用來(lái)保存新寫(xiě)入的還沒(méi)有flush到SST文件中的數(shù)據(jù)。 讀...
    周肅閱讀 5,088評(píng)論 1 5
  • 久違的晴天,家長(zhǎng)會(huì)。 家長(zhǎng)大會(huì)開(kāi)好到教室時(shí),離放學(xué)已經(jīng)沒(méi)多少時(shí)間了。班主任說(shuō)已經(jīng)安排了三個(gè)家長(zhǎng)分享經(jīng)驗(yàn)。 放學(xué)鈴聲...
    飄雪兒5閱讀 7,818評(píng)論 16 22
  • 今天感恩節(jié)哎,感謝一直在我身邊的親朋好友。感恩相遇!感恩不離不棄。 中午開(kāi)了第一次的黨會(huì),身份的轉(zhuǎn)變要...
    余生動(dòng)聽(tīng)閱讀 10,836評(píng)論 0 11
  • 可愛(ài)進(jìn)取,孤獨(dú)成精。努力飛翔,天堂翱翔。戰(zhàn)爭(zhēng)美好,孤獨(dú)進(jìn)取。膽大飛翔,成就輝煌。努力進(jìn)取,遙望,和諧家園??蓯?ài)游走...
    趙原野閱讀 3,493評(píng)論 1 1
  • 在妖界我有個(gè)名頭叫胡百曉,無(wú)論是何事,只要找到胡百曉即可有解決的辦法。因?yàn)槭侵缓偞蠹乙杂瀭饔灲形摇皟A城百曉”,...
    貓九0110閱讀 3,712評(píng)論 7 3

友情鏈接更多精彩內(nèi)容