RocksDB——Put

RocksDB——Put

涉及的數(shù)據(jù)結(jié)構(gòu)概覽

相關(guān)class以及對應(yīng)的源文件

DB db/db.h
DB include/rocksdb/db.h
DB_impl db/db_impl.cc, db/db_impl_write.cc
WriteBatch include/rocksdb/write_batch.h, db/write_batch.cc
WriteThread db/write_thread.h, db/write_thread.cc
WriteBatchInternal db/write_batch_internal.h, db/write_batch.cc
WriteOptions include/rocksdb/options.h
MemtableInserter db/write_batch.cc

調(diào)用關(guān)系圖

image

默認(rèn)配置下的put流程

外部調(diào)用流程

image

RocksDB從調(diào)用Put接口到真正開始執(zhí)行Put操作之間還有幾層函數(shù)調(diào)用,在這幾層函數(shù)調(diào)用中主要進(jìn)行數(shù)據(jù)的封裝操作,最后進(jìn)入DB_Impl::WriteImpl執(zhí)行寫操作過程。

首先是外部的Put接口,RocksDB提供了兩個Put接口,分別是指定了column_family以及沒有指定column_family的接口,其中沒有指定column_family的Put是調(diào)用指定column_family的Put并指定默認(rèn)的column family(default)封裝實(shí)現(xiàn)的。

DBImpl的Put調(diào)用DB的Put實(shí)現(xiàn),兩者定義一樣,參數(shù)直接傳遞

DB的Put會將傳入的column_family、key以及value封裝到WriteBatch中,然后調(diào)用Write函數(shù),傳入WriteOption以及WriteBatch

Write函數(shù)直接調(diào)用WriteImple函數(shù)進(jìn)入寫流程

WriteImpl

傳入WriteImpl的參數(shù)為:

const WriteOptions& write_options
WriteBatch* my_batch
WriteCallback* callback
uint64_t* log_used
uint64_t log_ref
bool disable_memtable
uint64_t* seq_used
size_t batch_cnt
PreReleaseCallback* pre_release_callback

(由Write調(diào)用的時候只提供了WriteOptions、WriteBatch*以及WriteCallback*(TODO:關(guān)于傳入?yún)?shù)的個數(shù))

初步處理

進(jìn)入WriteImpl之后首先判斷幾個設(shè)置參數(shù)并根據(jù)設(shè)置參數(shù)執(zhí)行不同的操作:

  • tracer_:如果為true,則調(diào)用tracer_->Write,傳入my_batch(TODO:tracer是干啥的)
  • syncdisableWAL設(shè)置沖突,返回NotSurpport
  • two_write_queues_以及enable_pipelined_write設(shè)置沖突,返回NotSurpport
  • seq_per_batchenable_pipelined_write暫時不支持(TODO:seq_per_batch)
  • 如果WriteOptions設(shè)置了low_pri選項(xiàng),則調(diào)用函數(shù)ThrottleLowPriWritesIfNeeded
  • 如果two_write_queue_以及disableMemtable同時設(shè)置,則進(jìn)入WriteImplWALOnly函數(shù)(也就是說禁用memtable,后面的過程就不同了。只需要寫WAL)
  • 如果enable_pipelined_write同時設(shè)置,則進(jìn)入PipelinedWriteImpl

writer以及Write_Group

傳入WriteImpl函數(shù)獲得的所有參數(shù)構(gòu)造WriteThread::Writer結(jié)構(gòu)w,然后調(diào)用write_thread_.JoinBatchGroup,這個函數(shù)會將當(dāng)前這個Writer w加入到WriteThread的Writer鏈表中,當(dāng)w通過JoinBatchGroup之后會自動被設(shè)置一個狀態(tài)state,如果當(dāng)前writer是第一個進(jìn)入WriteThread的writer,則成為當(dāng)前Group的leader,狀態(tài)被設(shè)置為WriteThread::STATE_GROUP_LEADER,否則說明已經(jīng)有了一個leader,則等待leader為當(dāng)前的writer設(shè)置狀態(tài)(AwaitState)

image

PreProcessWrite

當(dāng)程序執(zhí)行到此處說明當(dāng)前writer是Group_Leader,當(dāng)two_write_queues_為false并且disable_memtable為false的時候,進(jìn)入PreProcessWrite函數(shù)進(jìn)行預(yù)處理,該過程需要mutex加鎖,傳入?yún)?shù)write_options、need_log_sync、write_contex。

根據(jù)官方文檔,memtable被flush有三個條件,滿足其中之一則觸發(fā)memtable的flush操作:

  1. 單個memtable的size超過writer_buffer_size
  2. 總memtable size超過db_write_buffer_size或者由write_bufer_namager發(fā)起了一次flush,此時會flush最大的那個memtable
  3. WAL的總size超過max_total_wal_size,此時會將包含最老的數(shù)據(jù)的memtable給flush,這樣讓包含這部分?jǐn)?shù)據(jù)的WAL可以釋放

PreProcessWrite主要過程為一系列判斷,并根據(jù)判斷執(zhí)行對應(yīng)的操作。

  • !single_column_family_mode_ && total_log_size > GetMaxTotalSize():表明總的log size超過了額定的閾值,此時需要更換WAL,調(diào)用函數(shù)SwitchWAL,傳入write_context(滿足flush條件3
  • write_buffer_manager_->ShouldFlush():WriteBufferManager判斷當(dāng)前memtable需要dump(memtable達(dá)到了設(shè)定的閾值大?。?,調(diào)用函數(shù)HandleWriteBufferFull,傳入write_context(滿足條件2中總size超出閾值
  • !flush_scheduler.Empty():調(diào)用ScheduleFlushes,傳入write_context(滿足條件2中的wbm觸發(fā)
  • write_controller_.IsStopped() || write_controller_.NeedsDelay():與write_controler相關(guān),調(diào)用DelayWrite函數(shù),傳入last_batch_group_size以及write_options
  • need_log_sync:等待log同步完成
UNLIKELY以及LIKELY

DB_Impl::PreProcessWrite中大量使用了UNLIKELY以及LIKELY兩個宏,其定義在源文件port/likely.h中。

實(shí)現(xiàn)上主要封裝了函數(shù)__builtin_expect(long exp, long c),這是一個編譯上的優(yōu)化,expect函數(shù)告訴編譯器表達(dá)式exp值為c的幾率比較大,希望可以針對此做優(yōu)化,返回值為exp的值。

宏LIKELY實(shí)現(xiàn)為buildtin_expect((x), 1),也就是說LIKELY中x表達(dá)式為真的概率比較大,對于一個if語句:if(LIKELY(x)),其等價于if(x),只不過這里告訴編譯器x的值為true的可能更大,則可以根據(jù)此來進(jìn)行匯編上的優(yōu)化。

UNLIKELY同理。

Insert過程

insert前的準(zhǔn)備
image

首先調(diào)用write_thread_的EnterAsBatchGroupLeader,傳入?yún)?shù)w以及write_group,這一步的作用主要是盡可能將能夠一塊寫入memtable的writer都加入write group。

然后判斷是否能并行插入memtable,parallel為ture的條件為設(shè)置中allow_concurrent_memtable_write選項(xiàng)為真并且write_group中writer個數(shù)大于1(默認(rèn)情況下只有一個所以這個時候parallel為false)

記錄各種本次write相關(guān)的狀態(tài):NUMBER_KEYS_WRITTEN、BYTES_WRITETENWRITE_DONE_BY_SELF、WRITE_DONE_BY_OTHER

如果開啟了disableWAL選項(xiàng),則將has_unpersisted_data_這個flag設(shè)置為true。

然后將數(shù)據(jù)寫入WAL中,此時有兩種情況,一是當(dāng)two_write_queue_為false的時候,直接調(diào)用WriteToWAL函數(shù)寫入WAL,傳入?yún)?shù)write_group、log_writer、log_used、need_log_sync、need_log_dir_sync、last_sequence+1;否則調(diào)用ConcurrentWriteToWAL。默認(rèn)情況為前者。

insert

如果parallel為false,則執(zhí)行普通的插入,調(diào)用函數(shù)WriteBatchInterl::InsertInto將當(dāng)前write_group中的數(shù)據(jù)寫入memtable,傳入?yún)?shù):

write_group
current_sequence
column_family_memtables.get()
flush_scheduler
write_options.ignore_missing_column_families
recover_log_number
this
parallel
seq_per_batch
batch_per_txn_

否則執(zhí)行并發(fā)的插入操作。

后續(xù)處理

如果need_log_sync為真,即WriteOptions中sync參數(shù)為true,則需要對log進(jìn)行同步操作。(默認(rèn)沒有開啟two_write_queues_的情況下只需要調(diào)用MarkLogsSynced,否則才是調(diào)用FlushWAL或者SyncWAL)

如果當(dāng)前writer在并行的write_group中則需要進(jìn)行并行writer相關(guān)的處理,默認(rèn)情況為false。

最后調(diào)用所有的writer的callback函數(shù)并更新version的LastSequence,最后執(zhí)行write_thread_.ExitAsBatchGroupLeader將writer的狀態(tài)設(shè)置為STATE_COMPLETE并退出(ExitAsBatchGroupLeader在pipeiline write的時候操作比較復(fù)雜,普通情況下只是設(shè)置Writer狀態(tài)為COMPLETE)。

其他寫流程分支的實(shí)現(xiàn)

Concurrent Memtable Insert

在leader writer進(jìn)入WriteThread的EnterAsWriteGroupLeader函數(shù)之后會將符合條件的別的writer加入到write_group中,此時如果能夠進(jìn)行并行的memtable插入,則會由leader發(fā)起一次parallel memtable insert,各個writer共同完成插入memtable的過程。

在完成writer的選擇之后判斷是否能夠進(jìn)行并行的插入,此時的條件為:

immutable_db_options_.allow_concurrent_memtable_write && write_group.size > 1

字面來看就是write_group中writer至少大于1個(這樣才有并行的意義),并且memtable的實(shí)現(xiàn)要支持并行的插入(目前只有skiplist才支持),rocksdb開發(fā)者在代碼注釋中寫了三條規(guī)則,只有滿足這三條規(guī)則的情況下才能夠執(zhí)行并行插入:

  1. memtable支持
  2. 非inplace update
  3. 非merge(需要檢查每個batch)(具體體現(xiàn)為遍歷write_group中的每個writer,當(dāng)batch的HasMerge標(biāo)志位為true的時候設(shè)置parallel為false)

對于WAL,write_gorup中所有writer會由leader統(tǒng)一寫入WAL

進(jìn)入插入流程,此時如果parallel為true,則進(jìn)入并行插入的流程(注意此時除了leader到達(dá)了這里之外,其他的writer還在JoinBatchGroupAwaiteState階段)。首先遍歷所有的writer并設(shè)置其sequence number,完成之后調(diào)用WriteThread::LaunchParallelMemtableWriters(),通過該函數(shù)喚醒等待的其他writer,設(shè)置狀態(tài)成為STATE_PARALLEL_MEMTABLE_WRITER,并開始繼續(xù)執(zhí)行。同時leader也將自己的數(shù)據(jù)寫入memtable中。

image

Parallel Memtable Writer的寫流程

這就回到了代碼前段調(diào)用JoinBatchGroup之后,此時有個判斷:

if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    // we are a non-leader in a parallel group
    ...
}

此處就是并行memtable插入時被leader喚醒的其他writer要執(zhí)行的操作。

寫入過程比較簡單,調(diào)用WriteBatchInternal::InsertInto就完成寫入

image

完成寫入之后需要自行退出,首先調(diào)用WriteThread::CompleteParallelMemtableWriter函數(shù)判斷是否還有別的writer沒有結(jié)束,如果不是最后一個完成的writer則等待別的writer完成寫操作;否則就需要為所有的writer執(zhí)行退出前的后續(xù)工作。這些后續(xù)工作主要就是挨個對write group中的所有writer調(diào)用callback函數(shù),然后設(shè)置version的last_sequence,最后調(diào)用WriteThread::ExistAsBatchGroupFollower將其他等待的writer狀態(tài)設(shè)置成COMPLETED并退出。

image

WriteImplWALOnly

  • 觸發(fā)條件:two_write_queues_ & disable_memtable

PipelineWriteImpl

  • 觸發(fā)條件:enable_pipielined_write = true

重要數(shù)據(jù)結(jié)構(gòu)分析

WriteThread

相關(guān)源代碼文件:db/write_thread.h, db/write_thread.cc

WriteThread主要負(fù)責(zé)管理封裝了Put操作的Writer

數(shù)據(jù)成員
// See AwaitState.
const uint64_t max_yield_usec_;
const uint64_t slow_yield_usec_;

// 并發(fā)memtable插入操作是否允許
const bool allow_concurrent_memtable_write_;

// 針對memtable以及WAL的pipeline write是否允許
const bool enable_pipelined_write_;

// Points to the newest pending writer. Only leader can remove
// elements, adding can be done lock-free by anybody.
std::atomic<Writer*> newest_writer_;

// Points to the newest pending memtable writer. Used only when pipelined
// write is enabled.
std::atomic<Writer*> newest_memtable_writer_;

// The last sequence that have been consumed by a writer. The sequence
// is not necessary visible to reads because the writer can be ongoing.
SequenceNumber last_sequence_;
主要API
  • void JoinBatchGroup(Writer* w);

    JoinBatchGroup實(shí)現(xiàn)的功能是將一個Writer插入到WriteThread的Writer鏈表中,WriteThread中通過一個atomic的指針newest_writer來指向最新的writer,并且這個writer連接了鏈表中其他的writer。

    w進(jìn)入該函數(shù)之后首先調(diào)用LinkOne函數(shù),本質(zhì)上LinkOne函數(shù)做的事就是把傳入的w插入到鏈表中,實(shí)現(xiàn)這一操作的語句:

    Writer* writers = newest_writer->load(std::memory_order_relaxed);
    while (true) {
        w->link_older = writers;
        if (newest_writer->compare_exchange_weak(writers, w)) {
          return (writers == nullptr);
        }
      }
    

    對于這個compacre_exchange_weak,簡單的理解是一個原子的替換操作,原子地將w替換到newest_writer里面,當(dāng)w替換成功的時候返回true,進(jìn)入return語句,這個時候如果writer為nullptr說明當(dāng)前writer是插入的第一個writer,那么當(dāng)前writer就成為leader,否則替換失敗重復(fù)執(zhí)行(這個時候應(yīng)該是別的線程的writer插入了)直到插入成功

    image

    回到JoinBatchGroup,如果LinkOne返回值為true則設(shè)置當(dāng)前writer狀態(tài)為STATE_GROUP_LEADER,其余調(diào) 用AwaitState等待狀態(tài)改變。

  • size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);

    該函數(shù)將leader writer加入到Write Group中,并且選擇符合條件的其他writer加入到同一個group中

  • void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status);

    這個函數(shù)主要執(zhí)行退出的時候的狀態(tài)設(shè)置,先不考慮pipeline write的情況,最簡單的功能就是從last_writer開始遍歷group中的所有writer,然后設(shè)置他們的狀態(tài)為STATE_COMPLETE

  • void LaunchParallelMemTableWriters(WriteGroup* write_group);

    將write group中所有的writer的狀態(tài)設(shè)置成STATE_PARALLEL_MEMTABLE_WRITER以喚醒等待的writer執(zhí)行并發(fā)的插入操作

  • bool CompleteParallelMemTableWriter(Writer* w)

    在每個并發(fā)memtable插入的writer執(zhí)行過程最后調(diào)用,判斷如果當(dāng)前writer不是最后一個writer(write group中還有正在執(zhí)行的writer)則等待其他writer完成寫操作(AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);),否則返回true

  • void ExitAsBatchGroupFollower(Writer* w)

    對Group Leader調(diào)用ExitAsBatchGroupLeader以及將leader的狀態(tài)設(shè)置為STATE_COMPLETED

Writer

在Put的時候?qū)σ粋€寫操作的封裝

數(shù)據(jù)成員
//writebatch以及write_options相關(guān)的數(shù)據(jù)
WriteBatch* batch;
bool sync;
bool no_slowdown;
bool disable_wal;
bool disable_memtable;


size_t batch_cnt;  // if non-zero, number of sub-batches in the write batch
                    //如果不為0表示batch中還有其他的子batch

//從write函數(shù)中傳入的數(shù)據(jù)
PreReleaseCallback* pre_release_callback;
uint64_t log_used;  // log number that this batch was inserted into
uint64_t log_ref;   // log number that memtable insert should reference
WriteCallback* callback;

bool made_waitable;          // records lazy construction of mutex and cv
std::atomic<uint8_t> state;  // write under StateMutex() or pre-link

WriteGroup* write_group;    //所屬的write_group

SequenceNumber sequence;  // the sequence number to use for the first key
Status status;            // status of memtable inserter
Status callback_status;   // status returned by callback->Callback()

std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;

//write_group中的鏈表指針
Writer* link_older;  // read/write only before linking, or as leader
Writer* link_newer;  // lazy, read/write only before linking, or as leader

其中state初始為STATE_INIT,其余參數(shù)通過調(diào)用者傳入

writer結(jié)構(gòu)中包含了writer指針link_older以及l(fā)ink_newer,也就是說多個writer在 write group中是以鏈表的形式組織,并且每個write攜帶者其對應(yīng)batch的數(shù)據(jù)

主要API
  • bool CallbackFailed():當(dāng)callback不為空,并且callback_status為不為OK的時候返回true,表示回調(diào)函數(shù)調(diào)用出問題
  • bool ShouldWriteToMemtable():當(dāng)status沒有問題,Callback函數(shù)調(diào)用正常以及disable_memtable為false的時候返回true
  • bool ShouldWriteToWAL():同上,并且當(dāng)disableWAL為false的時候?yàn)閠rue
  • bool CheckCallback(DB* db):調(diào)用callback函數(shù)并將返回的狀態(tài)存儲到callback_status

WriteGroup

WriteGroup是一個將多個writer統(tǒng)一起來的結(jié)構(gòu),類似鏈表的頂層結(jié)構(gòu),其中包含兩個Writer指針分別為leader以及l(fā)ast_writer,類似鏈表中的頭指針和尾指針

數(shù)據(jù)成員
// Writer指針
Writer* leader = nullptr;
Writer* last_writer = nullptr;
SequenceNumber last_sequence;
// before running goes to zero, status needs leader->StateMutex()
// 狀態(tài)相關(guān)的變量
Status status;
std::atomic<size_t> running;
size_t size = 0;

State

state是write_thread中定義的writer的不同狀態(tài),不同狀態(tài)下的writer有著不同的操作方式

enum State : uint8_t {
  // The initial state of a writer.  This is a Writer that is
  // waiting in JoinBatchGroup.  This state can be left when another
  // thread informs the waiter that it has become a group leader
  // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
  // non-parallel informs a follower that its writes have been committed
  // (-> STATE_COMPLETED), or when a leader that has chosen to perform
  // updates in parallel and needs this Writer to apply its batch (->
  // STATE_PARALLEL_FOLLOWER).
  // writer的初始狀態(tài),等待JoinBatchGroup,后續(xù)可能變成其他狀態(tài)
  STATE_INIT = 1,

  // The state used to inform a waiting Writer that it has become the
  // leader, and it should now build a write batch group.  Tricky:
  // this state is not used if newest_writer_ is empty when a writer
  // enqueues itself, because there is no need to wait (or even to
  // create the mutex and condvar used to wait) in that case.  This is
  // a terminal state unless the leader chooses to make this a parallel
  // batch, in which case the last parallel worker to finish will move
  // the leader to STATE_COMPLETED.
  // 通知一個writer現(xiàn)在變成了leader并且他需要創(chuàng)建一個write batch gorup 
  STATE_GROUP_LEADER = 2,

  // The state used to inform a waiting writer that it has become the
  // leader of memtable writer group. The leader will either write
  // memtable for the whole group, or launch a parallel group write
  // to memtable by calling LaunchParallelMemTableWrite.
  // 通知一個writer變成了一個memtable writer group的leader
  // leader要么將整個group寫到memtable
  // 要么調(diào)用LaunchParallelMemtableWrite發(fā)起一次并行寫memtable 
  STATE_MEMTABLE_WRITER_LEADER = 4,

  // The state used to inform a waiting writer that it has become a
  // parallel memtable writer. It can be the group leader who launch the
  // parallel writer group, or one of the followers. The writer should then
  // apply its batch to the memtable concurrently and call
  // CompleteParallelMemTableWriter.
  // 告知一個writer變成了一個parallel memtable writer
  // writer應(yīng)該將其batch同步地應(yīng)用到memtable并調(diào)用CompleteParallelMemtableWriter
  STATE_PARALLEL_MEMTABLE_WRITER = 8,

  // A follower whose writes have been applied, or a parallel leader
  // whose followers have all finished their work.  This is a terminal
  // state.
  // 已經(jīng)成功寫入的writer
  STATE_COMPLETED = 16,

  // A state indicating that the thread may be waiting using StateMutex()
  // and StateCondVar()
  // 告知thread需要等待StateMutex或者StateCondVar
  STATE_LOCKED_WAITING = 32,
};

WriteBatch&WriteBatchInternal

WriteBatch主要是提供kv以及cf信息的封裝,WriteBatchInternal提供針對WriteBatch的相關(guān)操作接口

WriteBatchInternal::InsertInto

根據(jù)傳入的數(shù)據(jù)構(gòu)造MemtableInserter,然后調(diào)用WriteBatch::Iterate,傳入inserter實(shí)現(xiàn)寫操作

如果是concurrent memtbale write還需要調(diào)用inserter的PostProcess(主要是與狀態(tài)信息的處理有關(guān))

WriteBatch::Iterate

【作用】遍歷batch中的所有數(shù)據(jù),并根據(jù)數(shù)據(jù)類型進(jìn)行對應(yīng)的操作

遍歷所有的input

首先通過ReadRecordFromWriteBatch讀取待插入的所有數(shù)據(jù)(input、tag、column_family、key、value、blob、xid),其中tag區(qū)分不同的寫入數(shù)據(jù)類型

根據(jù)tag確定不同寫入數(shù)據(jù)的不同操作,對于普通的寫tag為kTypeValue,此處調(diào)用MemtableInserter::PutCF

MemtableInserter

PutCF&&PutCFImpl

MemtableInserter的PutCF將傳入的kv數(shù)據(jù)寫入memtable,PutCF函數(shù)是調(diào)用PutCFImpl函數(shù)實(shí)現(xiàn)

其他基本操作在Inserter中也有,主要有DeleteCF、DeleteRangeCF、MergeCF、PutBlobIndexCF、SingleDeleteCF

【PutCFImpl】

首先SeekToColumnFamily,根據(jù)傳入的column_family_id,由ColumnFamilyMemtables::Seek查找對應(yīng)的cf數(shù)據(jù),如果沒找到會根據(jù)WriteOptions中的ignore_missing_column_family判斷是否返回錯誤(此時ColumnFamilySet中的Current已經(jīng)定位到查找的這個cf)

獲取memtable,調(diào)用cf_mems->GetMemTable(已經(jīng)由Seek定位到了對應(yīng)的cf)

對于非inplace_update,調(diào)用memtable::Add將數(shù)據(jù)寫入即可,由Add函數(shù)將memtable寫入之后會調(diào)用Memtable::UpdateFlushState由memtable自己決定是不是要更新memtable的狀態(tài)為FLUSH_REQUESTED(memtable的狀態(tài)的這里變化)

【MaybeAdvanceSeq】

與sequence number有關(guān)

【CheckMemtableFull】

獲取當(dāng)前插入的cfd,如果cfd的memtable的狀態(tài)變?yōu)镕LUSH_REQUESTED,則將該memtable的狀態(tài)變?yōu)镕LUSH_SCHEDULED并將該cfd加入flush_scheduler

相關(guān)函數(shù)過程分析

PreProcessWrite中涉及的一些過程

write_buffer_manager->ShouldFlush

首先write_buffer_manager這個功能必須是啟用的

返回true的條件二者滿足其一即可:

  • memtable占用內(nèi)存超過memtable總size的限制
  • 內(nèi)存使用超過了總的buffer size并且其中memtable占用了超過一半的buffer size

SwitchWAL

更換memtable并將舊的memtable加入flush隊(duì)列觸發(fā)flush

判斷條件之一GetMaxTotalWalSize定義于db/db_impl_write.cc,返回值為:

mutable_db_options.max_total_wal_size如果沒有設(shè)置(==0),則為4 * max_total_in_memory_state_

否則為設(shè)定值

即該函數(shù)作用為WAL size超過了設(shè)定的閾值需要釋放掉一部分log占用的空間,釋放log按照時間順序從最舊的開始然后遍歷所有的cfd,對包含了小于最舊的log number的cfd進(jìn)行flush

DB中維護(hù)了一個存放目前正在使用中的log的vector:alive_log_files_,其中每一個元素記錄了一個log的number、size以及是否被flush的標(biāo)志位,進(jìn)行SwitchWAL的流程時首先獲取alive_log_files_中的第一個元素的log作為最老的一個log——oldest_alive_log。

先忽略2pc的情況

然后遍歷所有的cfd,對于包含了log_number小于等于oldest_alive_log的cfd都加入flush隊(duì)列中等待flush,具體實(shí)現(xiàn)為對該cfd調(diào)用SwitchMemtable并將其immutable_memtable列表imm標(biāo)記為請求flush狀態(tài),最后調(diào)用SchedulePendingFlush安排flush操作

最后調(diào)用MaybeScheduleFlushOrCompaction觸發(fā)Flush或者Compaction

HandleWriteBufferFull

更換memtable并將舊的memtable加入flush隊(duì)列觸發(fā)flush

遍歷所有的cfd,對所有的包含非空memtable的cfd,選擇其中CreationSeq最小的cfd(理解為創(chuàng)建時間最久的memtable?),對該cfd調(diào)用SwitchMemtable,如果成功則將該cfd的imm列表標(biāo)記為請求flush并安排一次Flush(SchedulePendingFlush)同時觸發(fā)嘗試flush或者compaction任務(wù)(MaybeScgheduleFlushOrCompaction)

ScheduleFlush

對所有在FlushScheduler中的cfd調(diào)用SwitchMemtable并Unref(如果Unref之后引用數(shù)為0則delete掉該cfd)

涉及到的相關(guān)函數(shù)分析

SchedulePendingFlush

將傳入的cfd加入到flush_queue里(flush_queue是一個deque)

SwitchMemtable

SwitchMemtable的主要是為一個ColumnFamily更換Memtable同時新建一個WAL的過程

【處理log】

進(jìn)入函數(shù)首先不考慮two_write_queue以及pipeline write的情況下,首先判斷是否能夠循環(huán)使用log,如果是則從log_recycle_files隊(duì)列中pop一個出來作為新的log使用

在需要創(chuàng)建新的log file的情況下調(diào)用VersionSet::NewFileNumber分配一個新的log number并創(chuàng)建的新的WritableFile,而對于recycle的情況,則是新建ReuseWritableFile。然后通過新的WritableFile創(chuàng)建新的log::Writer

獲取當(dāng)前的seq,并以這個seq調(diào)用cfd的ConstructNewMemtable創(chuàng)建新的memtable,同時通過context創(chuàng)建新的SuperVersion

將新的log number添加到alive_log_files

遍歷所有的cfd,對于包含memtable還沒使用以及imm列表中沒有flush的imm為0的cfd,更新他們的log number以及memtable的seq(為什么別的cfd不更新log?)

將新的memtable替換到cfd內(nèi)同時將舊的加入到imm列表

最后調(diào)用InstallSuperVersionAndScheduleWork,該函數(shù)構(gòu)建新的SuperVersion并替換,同時觸發(fā)對當(dāng)前cfd的flush以及compaction操作

WriteToWAL

獲取參數(shù):

  • WriteGroup& write_group
  • log:Writer* log_writer
  • uint64_t* log_used
  • bool need_log_sync
  • bool need_log_dir_sync
  • SequenceNumber sequence

主要流程:

外部調(diào)用的WriteToWAL函數(shù)主要做的是對寫WAL這個過程的封裝,比如預(yù)處理以及后續(xù)清理工作之類的,真正完成寫WAL的函數(shù)在另一個WriteToWAL函數(shù)重載里面,另一個函數(shù)接收一個merged_batch結(jié)構(gòu),這個merged_batch就包含了write_group所有的batch的數(shù)據(jù)。

在外部的WriteToWAL,第一項(xiàng)任務(wù)就是生成merged_batch,這里通過函數(shù)MergeBatch實(shí)現(xiàn),MergeBatch的操作邏輯十分簡單,如果write_group中只有一個writer,則merged_batch就是這個leader writer,否則遍歷所有的writer,將其batch追加到tmp_batch中,最后merged_batch即為tmp_batch。所以當(dāng)這個函數(shù)返回的時候如果merged_batch == write_group的leader,則說明只有一個batch,只需要設(shè)置一個batch的log_number,否則就需要遍歷設(shè)置所有writer的log_number。

接下來就是調(diào)用真正的WriteToWAL,傳入merged_batch以及l(fā)og_writer,實(shí)際寫入數(shù)據(jù)的操作是通過log_writer的AddRecord接口實(shí)現(xiàn)。

寫入完成之后,根據(jù)設(shè)置的need_log_sync以及need_log_dir_sync參數(shù)判斷是否對本次write進(jìn)行sync操作

最后記錄狀態(tài)即完成(退出前tmp_batch手動清理)

其他put選項(xiàng)設(shè)置以及實(shí)現(xiàn)原理

pipeline write

【默認(rèn)情況】

單一的write thread隊(duì)列,隊(duì)首writer成為leader,并負(fù)責(zé)寫WAL以及memtable

【pipeline write】

只有一個writer的情況下,要先寫WAL,再寫memtbale

如果有多個writer,默認(rèn)情況就需要先寫完WAL,在寫memtable

啟用pipeline之后,前一個writer寫完WAL就可以寫memtbale,而后一個writer開始寫他的WAL

開啟方式:Options.enable_pipeline_write=true

提升:20%性能提升

two write queue

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • # rocksdb engine 寫邏輯 ## 執(zhí)行路徑 DB::Put(key, value)是一個寫操作簡單封...
    kenry閱讀 2,677評論 0 1
  • ??本文主要講解了RocksDB中二階段提交的實(shí)現(xiàn)。本文總結(jié)一下共有如下幾個要點(diǎn): Modification of...
    薛少佳閱讀 4,123評論 0 3
  • 一、簡介 Hbase:全名Hadoop DataBase,是一種開源的,可伸縮的,嚴(yán)格一致性(并非最終一致性)的分...
    菜鳥小玄閱讀 2,589評論 0 12
  • 老翁沐陽怡自樂, 學(xué)童貪嘻歸家遲。 旅人步匆無行處, 亂問童叟徑何及。 投路怎懼川林險, 人蹤匿處多絕奇。 莫憐孤...
    閑看歲月流光閱讀 415評論 1 3
  • 對圖片進(jìn)行任意變形都支持。只有非透明區(qū)域是可以點(diǎn)擊的。也可以反過來用,對透明區(qū)域可以點(diǎn)擊,比如用來做點(diǎn)擊遮罩。
    Babybus_Unity閱讀 2,017評論 0 1

友情鏈接更多精彩內(nèi)容