rocksdb 介紹
Rocksdb同樣是一種基于operation log的文件系統(tǒng)。
由于采用了op log,將對(duì)磁盤(pán)的隨機(jī)寫(xiě)轉(zhuǎn)換成了對(duì)op log的順序?qū)?,最新的?shù)據(jù)是存儲(chǔ)在內(nèi)存的memrory中,可以提高IO效率。每一個(gè)的column family分別有一個(gè)memtable與sstablle.當(dāng)某一coloumn family內(nèi)存中的memory table超過(guò)閾值時(shí),轉(zhuǎn)換成immute memtable并創(chuàng)建新的op log,immute memtable由一系列的memtable組成,它們是只讀的,可供查詢,不能更新數(shù)據(jù)。當(dāng)immute memtable的數(shù)目超過(guò)設(shè)置的數(shù)值時(shí),會(huì)觸發(fā)flush,DB會(huì)調(diào)度后臺(tái)線程將多個(gè)memtable合并后再dump到磁盤(pán)生成Level0中一個(gè)新的sstable文件,Level0中的sstable文件不斷累積,會(huì)觸發(fā)compaction,DB會(huì)調(diào)度后臺(tái)compaction線程將Level0中的sstable文件根據(jù)key與Level1中的sstable合并并生成新的sstable,依次類(lèi)推,根據(jù)key的空間從低層往上compact,最終形成了一層層的結(jié)構(gòu),層級(jí)數(shù)目是由用戶設(shè)置的。
leveldb中,memtable在內(nèi)存中核心s的數(shù)據(jù)結(jié)構(gòu)為skiplist,而在rocksdb中,memtable在內(nèi)存中的形式有三種:skiplist、hash-skiplist、hash-linklist,從字面中就可以看出數(shù)據(jù)結(jié)構(gòu)的大體形式,hash-skiplist就是每個(gè)hash bucket中是一個(gè)skiplist,hash-linklist中,每個(gè)hash bucket中是一個(gè)link-list,啟用何用數(shù)據(jù)結(jié)構(gòu)可在配置中選擇。
日志結(jié)構(gòu)樹(shù)
從概念上說(shuō),最基本的LSM是很簡(jiǎn)單的 。將之前使用一個(gè)大的查找結(jié)構(gòu)(造成隨機(jī)讀寫(xiě),影響寫(xiě)性能),變換為將寫(xiě)操作順序的保存到一些相似的有序文件(也就是sstable)中。所以每個(gè)文件包 含短時(shí)間內(nèi)的一些改動(dòng)。因?yàn)槲募怯行虻模灾蟛檎乙矔?huì)很快。文件是不可修改的,他們永遠(yuǎn)不會(huì)被更新,新的更新操作只會(huì)寫(xiě)到新的文件中。讀操作檢查很 有的文件。通過(guò)周期性的合并這些文件來(lái)減少文件個(gè)數(shù)。
讀操作

Status DBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
return GetImpl(read_options, column_family, key, value);
}
Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val, bool* value_found,
ReadCallback* callback, bool* is_blob_index) {
assert(pinnable_val != nullptr);
StopWatch sw(env_, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
// Acquire SuperVersion
// holds references to memtable, all immutable memtables and version
SuperVersion* sv = GetAndRefSuperVersion(cfd);
TEST_SYNC_POINT("DBImpl::GetImpl:1");
TEST_SYNC_POINT("DBImpl::GetImpl:2");
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
// Note: In WritePrepared txns this is not necessary but not harmful either.
// Because prep_seq > snapshot => commit_seq > snapshot so if a snapshot is
// specified we should be fine with skipping seq numbers that are greater
// than that.
// Abstract handle to particular state of a DB.
// A Snapshot is an immutable object and can therefore be safely
// accessed from multiple threads without any external synchronization.
snapshot = reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_;
} else {
// Since we get and reference the super version before getting
// the snapshot number, without a mutex protection, it is possible
// that a memtable switch happened in the middle and not all the
// data for this snapshot is available. But it will contain all
// the data available in the super version we have, which is also
// a valid snapshot to read from.
// We shouldn't get snapshot before finding and referencing the
// super versipon because a flush happening in between may compact
// away data for the snapshot, but the snapshot is earlier than the
// data overwriting it, so users may see wrong results.
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
}
TEST_SYNC_POINT("DBImpl::GetImpl:3");
TEST_SYNC_POINT("DBImpl::GetImpl:4");
// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context;
RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);
Status s;
// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
PERF_TIMER_STOP(get_snapshot_time);
bool skip_memtable = (read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
if (!skip_memtable) {
if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&range_del_agg, read_options, callback, is_blob_index)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&range_del_agg, read_options, callback,
is_blob_index)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
}
if (!done && !s.ok() && !s.IsMergeInProgress()) {
ReturnAndCleanupSuperVersion(cfd, sv);
return s;
}
}
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
&range_del_agg, value_found, nullptr, nullptr, callback,
is_blob_index);
RecordTick(stats_, MEMTABLE_MISS);
}
{
PERF_TIMER_GUARD(get_post_process_time);
ReturnAndCleanupSuperVersion(cfd, sv);
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = pinnable_val->size();
RecordTick(stats_, BYTES_READ, size);
MeasureTime(stats_, BYTES_PER_READ, size);
PERF_COUNTER_ADD(get_read_bytes, size);
}
return s;
}
其中,LookupKey 實(shí)現(xiàn)如下:
LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s) {
size_t usize = _user_key.size();
size_t needed = usize + 13; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
} else {
dst = new char[needed];
}
start_ = dst;
// NOTE: We don't support users keys of more than 2GB :)
dst = EncodeVarint32(dst, static_cast<uint32_t>(usize + 8));
kstart_ = dst;
memcpy(dst, _user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
end_ = dst;
}
寫(xiě)操作
rocks db 的寫(xiě)操作是通過(guò) WriteImpl實(shí)現(xiàn)的,其結(jié)構(gòu)如下所示:
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
return WriteImpl(write_options, my_batch, nullptr, nullptr);
}
// The main write queue. This is the only write queue that updates LastSequence.
// When using one write queue, the same sequence also indicates the last
// published sequence.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used,
PreReleaseCallback* pre_release_callback) {
if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!");
}
if (write_options.sync && write_options.disableWAL) {
return Status::InvalidArgument("Sync writes has to enable WAL.");
}
if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
return Status::NotSupported(
"pipelined_writes is not compatible with seq_per_batch");
}
// Otherwise IsLatestPersistentState optimization does not make sense
assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
disable_memtable);
Status status;
if (write_options.low_pri) {
status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
if (!status.ok()) {
return status;
}
}
if (two_write_queues_ && disable_memtable) {
return WriteImplWALOnly(write_options, my_batch, callback, log_used,
log_ref, seq_used, pre_release_callback);
}
if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable, seq_used);
}
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, pre_release_callback);
if (!write_options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL);
}
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
write_thread_.JoinBatchGroup(&w);
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// we are a non-leader in a parallel group
PERF_TIMER_GUARD(write_memtable_time);
if (w.ShouldWriteToMemtable()) {
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_);
}
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
// we're responsible for exit batch group
for (auto* writer : *(w.write_group)) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(writer->sequence);
if (!ws.ok()) {
status = ws;
break;
}
}
}
// TODO(myabandeh): propagate status to write_group
auto last_sequence = w.write_group->last_sequence;
versions_->SetLastSequence(last_sequence);
MemTableInsertStatusCheck(w.status);
write_thread_.ExitAsBatchGroupFollower(&w);
}
assert(w.state == WriteThread::STATE_COMPLETED);
// STATE_COMPLETED conditional below handles exit
status = w.FinalStatus();
}
if (w.state == WriteThread::STATE_COMPLETED) {
if (log_used != nullptr) {
*log_used = w.log_used;
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
// write is complete and leader has updated sequence
return w.FinalStatus();
}
// else we are the leader of the write batch group
assert(w.state == WriteThread::STATE_GROUP_LEADER);
// Once reaches this point, the current writer "w" will try to do its write
// job. It may also pick up some of the remaining writers in the "writers_"
// when it finds suitable, and finish them in the same write batch.
// This is how a write job could be done by the other writer.
WriteContext write_context;
WriteThread::WriteGroup write_group;
bool in_parallel_group = false;
uint64_t last_sequence = kMaxSequenceNumber;
if (!two_write_queues_) {
last_sequence = versions_->LastSequence();
}
mutex_.Lock();
bool need_log_sync = write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
if (!two_write_queues_ || !disable_memtable) {
// With concurrent writes we do preprocess only in the write thread that
// also does write to memtable to avoid sync issue on shared data structure
// with the other thread
status = PreprocessWrite(write_options, &need_log_sync, &write_context);
}
log::Writer* log_writer = logs_.back().writer;
mutex_.Unlock();
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into memtables
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
if (status.ok()) {
// Rules for when we can update the memtable concurrently
// 1. supported by memtable
// 2. Puts are not okay if inplace_update_support
// 3. Merges are not okay
//
// Rules 1..2 are enforced by checking the options
// during startup (CheckConcurrentWritesSupported), so if
// options.allow_concurrent_memtable_write is true then they can be
// assumed to be true. Rule 3 is checked for each batch. We could
// relax rules 2 if we could prevent write batches from referring
// more than once to a particular key.
bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
write_group.size > 1;
size_t total_count = 0;
size_t valid_batches = 0;
uint64_t total_byte_size = 0;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
valid_batches++;
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->batch);
parallel = parallel && !writer->batch->HasMerge();
}
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
}
}
// Note about seq_per_batch_: either disableWAL is set for the entire write
// group or not. In either case we inc seq for each write batch with no
// failed callback. This means that there could be a batch with
// disalbe_memtable in between; although we do not write this batch to
// memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
// the seq per valid written key to mem.
size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;
const bool concurrent_update = two_write_queues_;
// Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully
// commit. That lets us release our leader status early.
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count,
concurrent_update);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size,
concurrent_update);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_SELF);
auto write_done_by_other = write_group.size - 1;
if (write_done_by_other > 0) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other,
concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
}
MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
if (write_options.disableWAL) {
has_unpersisted_data_.store(true, std::memory_order_relaxed);
}
PERF_TIMER_STOP(write_pre_and_post_process_time);
if (!two_write_queues_) {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
need_log_dir_sync, last_sequence + 1);
}
} else {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
// LastAllocatedSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
seq_inc);
} else {
// Otherwise we inc seq number for memtable writes
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
}
}
assert(last_sequence != kMaxSequenceNumber);
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += seq_inc;
if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);
if (!parallel) {
// w.sequence will be set inside InsertInto
w.status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this, parallel, seq_per_batch_);
} else {
SequenceNumber next_sequence = current_sequence;
// Note: the logic for advancing seq here must be consistent with the
// logic in WriteBatchInternal::InsertInto(write_group...) as well as
// with WriteBatchInternal::InsertInto(write_batch...) that is called on
// the merged batch during recovery from the WAL.
for (auto* writer : write_group) {
if (writer->CallbackFailed()) {
continue;
}
writer->sequence = next_sequence;
if (seq_per_batch_) {
next_sequence++;
} else if (writer->ShouldWriteToMemtable()) {
next_sequence += WriteBatchInternal::Count(writer->batch);
}
}
write_group.last_sequence = last_sequence;
write_group.running.store(static_cast<uint32_t>(write_group.size),
std::memory_order_relaxed);
write_thread_.LaunchParallelMemTableWriters(&write_group);
in_parallel_group = true;
// Each parallel follower is doing each own writes. The leader should
// also do its own.
if (w.ShouldWriteToMemtable()) {
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
assert(w.sequence == current_sequence);
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/, seq_per_batch_);
}
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
}
}
PERF_TIMER_START(write_pre_and_post_process_time);
if (!w.CallbackFailed()) {
WriteCallbackStatusCheck(status);
}
if (need_log_sync) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
mutex_.Unlock();
// Requesting sync with two_write_queues_ is expected to be very rare. We
// hance provide a simple implementation that is not necessarily efficient.
if (two_write_queues_) {
if (manual_wal_flush_) {
status = FlushWAL(true);
} else {
status = SyncWAL();
}
}
}
bool should_exit_batch_group = true;
if (in_parallel_group) {
// CompleteParallelWorker returns true if this thread should
// handle exit, false means somebody else did
should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
}
if (should_exit_batch_group) {
if (status.ok()) {
for (auto* writer : write_group) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(writer->sequence);
if (!ws.ok()) {
status = ws;
break;
}
}
}
versions_->SetLastSequence(last_sequence);
}
MemTableInsertStatusCheck(w.status);
write_thread_.ExitAsBatchGroupLeader(write_group, status);
}
if (status.ok()) {
status = w.FinalStatus();
}
return status;
}
Compaction操作

跳表直接存到本地磁盤(pán)

多路歸并排序算法
Column Family
自從RocksDB 3.0引入支持column family,每個(gè)KV數(shù)據(jù)對(duì)能夠指定關(guān)聯(lián)唯一的column family(默認(rèn)為“default”),做到相對(duì)獨(dú)立的隔離存儲(chǔ);column family提供了邏輯上劃分?jǐn)?shù)據(jù)庫(kù)的能力,支持跨column family進(jìn)行原子寫(xiě)操作(借助WriteBatch實(shí)現(xiàn))等。
所有的column family共享WAL日志文件(write-head log),但是每個(gè)column family有獨(dú)立的MemTable和SSTable。
共享WAL有利于原子操作實(shí)現(xiàn),更高效的成組提交。
獨(dú)立的MemTable和SSTable則更方便數(shù)據(jù)壓縮,配置選項(xiàng),快速的刪除某個(gè)column family。
參考