mongo recordid src

mongo recordid src

抽象類 RecordStore 封裝了一系列與記錄操作相關的接口,其定義原文如下:

/**
 * An abstraction used for storing documents in a collection or entries in an index.
 *
 * In storage engines implementing the KVEngine, record stores are also used for implementing
 * catalogs.
 *
 * Many methods take an OperationContext parameter. This contains the RecoveryUnit, with
 * all RecordStore specific transaction information, as well as the LockState. Methods that take
 * an OperationContext may throw a WriteConflictException.
 *
 * This class must be thread-safe for document-level locking storage engines. In addition, for
 * storage engines implementing the KVEngine some methods must be thread safe, see DurableCatalog.
 * Only for MMAPv1 is this class not thread-safe.
 */
class RecordStore

與插入(insert)一條記錄相關的源碼記錄。

插入記錄的接口定義

 /**
     * Inserts the specified records into this RecordStore by copying the passed-in record data and
     * updates 'inOutRecords' to contain the ids of the inserted records.
     */
    virtual Status insertRecords(OperationContext* opCtx, // operation context
                                 std::vector<Record>* inOutRecords, // in: records to insert; out: same records with their ids filled in
                                 const std::vector<Timestamp>& timestamps) = 0; // timestamps for the inserted records

    /**
     * A thin wrapper around insertRecords() to simplify handling of single document inserts.
     *
     * Packs 'data'/'len' into a one-element Record vector (with a null RecordId), forwards it
     * together with 'timestamp' to insertRecords(), and returns either the failing Status or the
     * id found in that record afterwards.
     */
    StatusWith<RecordId> insertRecord(OperationContext* opCtx,
                                      const char* data,
                                      int len,
                                      Timestamp timestamp) {
        // One-element batch: the Record is built from a default RecordId plus a view of the data.
        std::vector<Record> batch;
        batch.push_back(Record{RecordId(), RecordData(data, len)});

        // insertRecords() takes a vector of timestamps, so wrap the single one.
        std::vector<Timestamp> batchTimestamps(1, timestamp);

        Status insertStatus = insertRecords(opCtx, &batch, batchTimestamps);
        if (insertStatus.isOK()) {
            return batch.front().id;
        }
        return insertStatus;
    }

插入相關的數據結構

Record 對象

/**
 * The data items stored in a RecordStore.
 *
 * Pairs the key of a record with the data held for it.
 */
struct Record {
    RecordId id;      // key that identifies this record
    RecordData data;  // the record's data (pointer + size, optionally owning its buffer)
};

RecordId 對象

RecordId 對象是一個 collection 或 RecordStore 中記錄的唯一標識,內部是一個有范圍的 int64 值。
它有幾種特殊取值:null 值(0)、最小值、最大值,以及保留區間的起始值(最大值減去 1024 * 1024);同時還有一個 Hasher 哈希對象,可以生成 RecordId 的哈希值。

/**
 * The key that uniquely identifies a Record in a Collection or RecordStore.
 *
 * The id is a thin wrapper over a single int64_t representation. Zero is the null id, values
 * strictly between zero and kMinReservedRepr are "normal", and values from kMinReservedRepr up to
 * (but excluding) kMaxRepr are "reserved".
 */
class RecordId {
public:
    // This set of constants define the boundaries of the 'normal' and 'reserved' id ranges.
    static constexpr int64_t kNullRepr = 0;
    static constexpr int64_t kMinRepr = LLONG_MIN;
    static constexpr int64_t kMaxRepr = LLONG_MAX;
    static constexpr int64_t kMinReservedRepr = kMaxRepr - (1024 * 1024);

    /**
     * Enumerates all ids in the reserved range that have been allocated for a specific purpose.
     */
    enum class ReservedId : int64_t { kWildcardMultikeyMetadataId = kMinReservedRepr };

    /**
     * Constructs a Null RecordId.
     */
    RecordId() : _repr(kNullRepr) {}

    /**
     * Constructs a RecordId holding exactly 'repr'.
     */
    explicit RecordId(int64_t repr) : _repr(repr) {}

    /**
     * Constructs a RecordId from one of the named reserved ids.
     */
    explicit RecordId(ReservedId repr) : _repr(static_cast<int64_t>(repr)) {}

    /**
     * Construct a RecordId from two halves: 'high' fills the upper 32 bits, 'low' the lower 32.
     * TODO consider removing.
     */
    RecordId(int high, int low)
        : _repr((static_cast<uint64_t>(high) << 32) | static_cast<uint32_t>(low)) {}

    /**
     * A RecordId that compares less than all ids that represent documents in a collection.
     */
    static RecordId min() {
        return RecordId(kMinRepr);
    }

    /**
     * A RecordId that compares greater than all ids that represent documents in a collection.
     */
    static RecordId max() {
        return RecordId(kMaxRepr);
    }

    /**
     * Returns the first record in the reserved id range at the top of the RecordId space.
     */
    static RecordId minReserved() {
        return RecordId(kMinReservedRepr);
    }

    /**
     * Returns true if this is the null id (representation equal to kNullRepr).
     */
    bool isNull() const {
        return _repr == kNullRepr;
    }

    /**
     * Returns the raw 64-bit representation.
     */
    int64_t repr() const {
        return _repr;
    }

    /**
     * Valid RecordIds are the only ones which may be used to represent Records. The range of valid
     * RecordIds includes both "normal" ids that refer to user data, and "reserved" ids that are
     * used internally. All RecordIds outside of the valid range are sentinel values.
     */
    bool isValid() const {
        if (isNormal()) {
            return true;
        }
        return isReserved();
    }

    /**
     * Normal RecordIds are those which fall within the range used to represent normal user data,
     * excluding the reserved range at the top of the RecordId space.
     */
    bool isNormal() const {
        return kNullRepr < _repr && _repr < kMinReservedRepr;
    }

    /**
     * Returns true if this RecordId falls within the reserved range at the top of the record space.
     */
    bool isReserved() const {
        return kMinReservedRepr <= _repr && _repr < kMaxRepr;
    }

    /**
     * Three-way comparison on the raw representation: -1 if this is smaller than 'rhs', 1 if it
     * is larger, 0 if both are equal.
     */
    int compare(RecordId rhs) const {
        if (_repr < rhs._repr) {
            return -1;
        }
        if (_repr > rhs._repr) {
            return 1;
        }
        return 0;
    }

    /**
     * Hash value for this RecordId. The hash implementation may be modified, and its behavior
     * may differ across platforms. Hash values should not be persisted.
     */
    struct Hasher {
        size_t operator()(RecordId rid) const {
            // TODO consider better hashes
            size_t seed = 0;
            boost::hash_combine(seed, rid.repr());
            return seed;
        }
    };

    /// members for Sorter
    struct SorterDeserializeSettings {};  // unused

    // Appends the representation to 'buf' as a long long.
    void serializeForSorter(BufBuilder& buf) const {
        const long long value = static_cast<long long>(_repr);
        buf.appendNum(value);
    }

    // Reads a little-endian int64_t back from 'buf' and wraps it in a RecordId.
    static RecordId deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) {
        return RecordId(buf.read<LittleEndian<int64_t>>());
    }

    // The in-memory footprint is just the object itself.
    int memUsageForSorter() const {
        return static_cast<int>(sizeof(RecordId));
    }

    // Returns a plain copy of this id.
    RecordId getOwned() const {
        return *this;
    }

private:
    int64_t _repr;
};

RecordData 對象

/**
 * A replacement for the Record class. This class represents data in a record store.
 * The _ownedData attribute is used to manage memory ownership.
 *
 * An instance is either unowned (it only stores the pointer and size it was given) or owned
 * (it additionally holds a SharedBuffer in _ownedData, and _data points into that buffer).
 */
class RecordData {
public:
    /** Constructs an empty, unowned RecordData (null pointer, size 0). */
    RecordData() : _data(nullptr), _size(0) {}

    /** Constructs an unowned view over 'size' bytes at 'data'; nothing is copied. */
    RecordData(const char* data, int size) : _data(data), _size(size) {}

    /**
     * Constructs an owned RecordData that holds 'ownedData' and reports 'size' bytes.
     * _data is taken from the buffer before the buffer is moved into _ownedData; this relies on
     * the member declaration order (_data, _size, _ownedData).
     */
    RecordData(SharedBuffer ownedData, int size)
        : _data(ownedData.get()), _size(size), _ownedData(std::move(ownedData)) {}

    /** Returns the stored data pointer. */
    const char* data() const {
        return _data;
    }

    /** Returns the stored size. */
    int size() const {
        return _size;
    }

    /**
     * Returns true if this owns its own memory, and false otherwise
     */
    bool isOwned() const {
        return _ownedData.get() != nullptr;
    }

    /**
     * Moves the owned buffer out of this object and returns it. _data and _size are left
     * untouched by this call.
     */
    SharedBuffer releaseBuffer() {
        return std::move(_ownedData);
    }

    /**
     * Builds a BSONObj from this record: from the owned buffer when there is one, otherwise from
     * the raw data pointer.
     */
    BSONObj toBson() const& {
        return isOwned() ? BSONObj(_ownedData) : BSONObj(_data);
    }

    /**
     * Like toBson(), but hands the owned buffer (if any) over to the returned BSONObj via
     * releaseBuffer().
     */
    BSONObj releaseToBson() {
        return isOwned() ? BSONObj(releaseBuffer()) : BSONObj(_data);
    }

    /** Rvalue overload: a temporary can give up its buffer, so forward to releaseToBson(). */
    BSONObj toBson() && {
        return releaseToBson();
    }

    /**
     * Returns an owned RecordData with the same contents. If this is already owned, a copy of
     * this object is returned; otherwise a new buffer of _size bytes is allocated and filled
     * from _data.
     */
    RecordData getOwned() const {
        if (isOwned())
            return *this;
        auto buffer = SharedBuffer::allocate(_size);
        // memcpy requires valid pointers even when the length is zero, and an empty RecordData
        // has a null _data, so only copy when there is something to copy.
        if (_size > 0)
            memcpy(buffer.get(), _data, _size);
        // The local buffer is not used again; move it instead of copying the handle.
        return RecordData(std::move(buffer), _size);
    }

    /** Converts this object into an owned one in place; does nothing if it is already owned. */
    void makeOwned() {
        if (isOwned())
            return;
        *this = getOwned();
    }

private:
    const char* _data;
    int _size;
    SharedBuffer _ownedData;
};

WiredTiger 中對 insert 接口的實現

virtual Status insertRecords

/**
 * Vector-based entry point for inserts: unpacks 'records' and 'timestamps' into raw pointers and
 * delegates to _insertRecords(), using records->size() as the element count.
 */
Status WiredTigerRecordStore::insertRecords(OperationContext* opCtx,
                                            std::vector<Record>* records,
                                            const std::vector<Timestamp>& timestamps) {
    Record* const firstRecord = records->data();
    const Timestamp* const firstTimestamp = timestamps.data();
    const size_t numRecords = records->size();
    return _insertRecords(opCtx, firstRecord, firstTimestamp, numRecords);
}


/**
 * Inserts 'nRecords' records through a WiredTiger cursor.
 *
 * Steps, in order:
 *  1. Sum the data sizes of all records.
 *  2. For a capped store, fail with BadValue if that total exceeds _cappedMaxSize.
 *  3. Assign each record its RecordId (extracted from the record data for the oplog, otherwise
 *     obtained from _nextId()).
 *  4. For each record, set the transaction timestamp if there is one, then insert key/value
 *     through the cursor.
 *  5. Update the record count and data size, then either notify the oplog stones or run
 *     _cappedDeleteAsNeeded().
 *
 * 'records' and 'timestamps' are both indexed from 0 to nRecords - 1. The ids written into
 * 'records' are visible to the caller.
 *
 * Returns Status::OK() on success, the oploghack::extractKey() failure status, or the status
 * produced by wtRCToStatus() when a cursor insert returns non-zero.
 */
Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
                                             Record* records,
                                             const Timestamp* timestamps,
                                             size_t nRecords) {
    dassert(opCtx->lockState()->isWriteLocked()); // asserts (dassert) that a write lock is held

    // We are kind of cheating on capped collections since we write all of them at once ....
    // Simplest way out would be to just block vector writes for everything except oplog ?
    int64_t totalLength = 0;
    for (size_t i = 0; i < nRecords; i++)
        totalLength += records[i].data.size();

    // caller will retry one element at a time
    // If this store is capped and the combined size of the records to insert is larger than
    // _cappedMaxSize, bail out with BadValue before touching the cursor.
    // (_isCapped is declared elsewhere as a const bool with the note "The capped settings should
    // not be updated once operations have started".)
    if (_isCapped && totalLength > _cappedMaxSize)
        return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");

    WiredTigerCursor curwrap(_uri, _tableId, true, opCtx); // cursor wrapper built from this store's uri and table id; NOTE(review): meaning of the 'true' argument is not visible here
    curwrap.assertInActiveTxn();
    WT_CURSOR* c = curwrap.get(); 
    invariant(c);

    RecordId highestId = RecordId(); // starts out as the null RecordId
    dassert(nRecords != 0);
    for (size_t i = 0; i < nRecords; i++) {
        auto& record = records[i];
        if (_isOplog) {// oplog: the id comes from the record's own data via oploghack::extractKey
            StatusWith<RecordId> status =
                oploghack::extractKey(record.data.data(), record.data.size());
            if (!status.isOK())
                return status.getStatus();
            record.id = status.getValue();
        } else {// any other store: ask _nextId() for the id
            record.id = _nextId(opCtx);
            // NOTE(review): _nextId() is not shown here; presumably it hands out the previously
            // used id + 1 -- confirm against its definition.

        }
        // Ids are expected to be strictly increasing within the batch (checked via dassert).
        dassert(record.id > highestId);
        highestId = record.id;
    }

    for (size_t i = 0; i < nRecords; i++) {
        auto& record = records[i];
        Timestamp ts;
        if (timestamps[i].isNull() && _isOplog) {
            // If the timestamp is 0, that probably means someone inserted a document directly
            // into the oplog.  In this case, use the RecordId as the timestamp, since they are
            // one and the same. Setting this transaction to be unordered will trigger a journal
            // flush. Because these are direct writes into the oplog, the machinery to trigger a
            // journal flush is bypassed. A followup oplog read will require a fresh visibility
            // value to make progress.
            ts = Timestamp(record.id.repr());
            opCtx->recoveryUnit()->setOrderedCommit(false);
        } else {
            ts = timestamps[i];
        }
        // Only a non-null timestamp is applied to the recovery unit; fassert on the result.
        if (!ts.isNull()) {
            LOG(4) << "inserting record with timestamp " << ts;
            fassert(39001, opCtx->recoveryUnit()->setTimestamp(ts));
        }
        // Key is the RecordId, value is the record's data.
        setKey(c, record.id);
        WiredTigerItem value(record.data.data(), record.data.size());
        c->set_value(c, value.Get());
        int ret = WT_OP_CHECK(c->insert(c));
        // A non-zero return aborts the batch; records inserted so far are not undone here.
        if (ret)
            return wtRCToStatus(ret, "WiredTigerRecordStore::insertRecord");
    }

    // Account for the new records in the store's record count and data size.
    _changeNumRecords(opCtx, nRecords);
    _increaseDataSize(opCtx, totalLength);

    // With oplog stones present, report the insert to them; otherwise let the capped-delete
    // logic run with the highest id just inserted.
    if (_oplogStones) {
        _oplogStones->updateCurrentStoneAfterInsertOnCommit(
            opCtx, totalLength, highestId, nRecords);
    } else {
        _cappedDeleteAsNeeded(opCtx, highestId);
    }

    return Status::OK();
}



©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書係信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • Java集合框架 Java平臺(tái)提供了一個(gè)全新的集合框架?!凹峡蚣堋敝饕梢唤M用來操作對(duì)象的接口組成。不同接口描述...
    小石38閱讀 445評(píng)論 0 0
  • 集合類框架的介紹: ![Java 集合類框架](https://upload-images.jianshu.io/...
    LynnGuo閱讀 799評(píng)論 0 1
  • 在編程中,常常需要集中存放多個(gè)數(shù)據(jù)。集合類主要負(fù)責(zé)保存、盛裝其他數(shù)據(jù),因此集合類也被稱為容器類。所有的集合類都位于...
    一一一二二三閱讀 483評(píng)論 0 1
  • 親愛的寧寧: 今天早上爸爸去送你上學(xué),意外接到爺爺?shù)碾娫挘阋恢弊穯栍惺裁词?,因?yàn)槲抑滥愫荜P(guān)心家里的每一...
    春華秋實(shí)吧閱讀 254評(píng)論 0 1
  • 《惠州一絕》宋·蘇軾 羅浮山下四時(shí)春,盧橘楊梅次第新。 日啖荔枝三百顆,不辭長作嶺南人。 緣份,命也。感情講...
    梅十九閱讀 5,554評(píng)論 1 4

友情鏈接更多精彩內(nèi)容