Influxdb中的Series file解析

SeriesFile 解析

SeriesFile是什么
  • SeriesFile其實(shí)叫SeriesKeyFile比較合適,里面存儲(chǔ)了當(dāng)前DB下的所有series key;
  • 其中的series key = (measurement + tag set)
SeriesFile的持久化
  • 它對(duì)應(yīng)于磁盤上的若干文件, 每個(gè)database都有自己的一組SeriesFile, 其目錄為: [influxdb data path]/[database]/_series
  • 我們來看下_series目錄下的結(jié)構(gòu):
./_series/     
├── 00         
│   └── 0000   
├── 01         
│   └── 0000   
├── 02         
│   └── 0000   
├── 03         
│   └── 0000   
├── 04         
│   └── 0000   
├── 05         
│   └── 0000   
├── 06         
│   └── 0000   
└── 07         
    └── 0000   
  • 每個(gè)DB下面的series文件分成至多8個(gè)partition, 每個(gè)partition下又分成多個(gè)Segment, 每個(gè)partition又對(duì)應(yīng)一個(gè)內(nèi)存索引


    influxdb_series_file.png
SeriesSegment
  • 定義: 由seriese entries的log會(huì)組成磁盤文件, 這個(gè)類就負(fù)責(zé)讀寫這個(gè)磁盤文件
type SeriesSegment struct {
    id   uint16
    path string

    data []byte        // mmap file
    file *os.File      // write file handle
    w    *bufio.Writer // bufferred file handle
    size uint32        // current file size
}
  • SeriesSegment磁盤文件格式:


    influxdb_series_file_format.png

其中的flag有兩個(gè)可能的值:
SeriesEntryInsertFlag:表示當(dāng)前寫入的SeriesKey是有效的;
SeriesEntryTombstoneFlag:墓碑標(biāo)識(shí)。

  • 創(chuàng)建SeriesSegment:CreateSeriesSegment
func CreateSeriesSegment(id uint16, path string) (*SeriesSegment, error) {
    // 先創(chuàng)建 .initializing結(jié)尾的臨時(shí)文件 
    f, err := os.Create(path + ".initializing")
    if err != nil {
        return nil, err
    }
    defer f.Close()

    // 先頭部,包括Magic, Version
    hdr := NewSeriesSegmentHeader()
    if _, err := hdr.WriteTo(f); err != nil {
        return nil, err
        // 一個(gè)Segment文件需要預(yù)分配文件大小 ,最小4M, 最大256M
    } else if err := f.Truncate(int64(SeriesSegmentSize(id))); err != nil {
        return nil, err
    } else if err := f.Close(); err != nil {
        return nil, err
    }

    // Swap with target path.
    if err := os.Rename(f.Name(), path); err != nil {
        return nil, err
    }

    // Open segment at new location.
    segment := NewSeriesSegment(id, path)
    // 打開當(dāng)前的Segment, 我們?cè)谙旅鎲为?dú)介紹
    if err := segment.Open(); err != nil {
        return nil, err
    }
    return segment, nil
}
  • 打開一個(gè)SeriesSegment, 使用內(nèi)存映射讀到內(nèi)存:Open
func (s *SeriesSegment) Open() error {
    if err := func() (err error) {
        // 內(nèi)存映射讀到內(nèi)存中
        if s.data, err = mmap.Map(s.path, int64(SeriesSegmentSize(s.id))); err != nil {
            return err
        }

        // 讀頭部并且校驗(yàn)Version
        hdr, err := ReadSeriesSegmentHeader(s.data)
        if err != nil {
            return err
        } else if hdr.Version != SeriesSegmentVersion {
            return ErrInvalidSeriesSegmentVersion
        }

        return nil
    }(); err != nil {
        s.Close()
        return err
    }

    return nil
}
  • 初始化寫操作 InitForWrite:從頭開始讀取當(dāng)前segment, 計(jì)算讀到結(jié)尾時(shí)的size,在讀的過程中作簡(jiǎn)單有效性校驗(yàn),然后打開文件,文件寫入的游標(biāo)定位在文件結(jié)尾
func (s *SeriesSegment) InitForWrite() (err error) {
    // Only calculcate segment data size if writing.
    for s.size = uint32(SeriesSegmentHeaderSize); s.size < uint32(len(s.data)); {
        flag, _, _, sz := ReadSeriesEntry(s.data[s.size:])
        if !IsValidSeriesEntryFlag(flag) {
            break
        }
        s.size += uint32(sz)
    }

    // Open file handler for writing & seek to end of data.
    if s.file, err = os.OpenFile(s.path, os.O_WRONLY|os.O_CREATE, 0666); err != nil {
        return err
    } else if _, err := s.file.Seek(int64(s.size), io.SeekStart); err != nil {
        return err
    }
    s.w = bufio.NewWriterSize(s.file, 32*1024)

    return nil
}
  • 寫入log entry到segment文件 WriteLogEntry:
func (s *SeriesSegment) WriteLogEntry(data []byte) (offset int64, err error) {
    if !s.CanWrite(data) {
        return 0, ErrSeriesSegmentNotWritable
    }

    offset = JoinSeriesOffset(s.id, s.size)
    if _, err := s.w.Write(data); err != nil {
        return 0, err
    }
    s.size += uint32(len(data))

    return offset, nil
}

這個(gè)方法返回的offset非常有用,它由segment id和 segment size組成:

func JoinSeriesOffset(segmentID uint16, pos uint32) int64 {
    return (int64(segmentID) << 32) | int64(pos)
}

通過segment id可以知道寫入了哪個(gè)segment文件,通過segment size可知道寫了segment文件的什么位置

  • Segment文件遍歷操作 ForEachEntry:遍歷讀取每一條SeriesEntry, 然后回調(diào)傳入的函數(shù)
func (s *SeriesSegment) ForEachEntry(fn func(flag uint8, id uint64, offset int64, key []byte) error) error {
    for pos := uint32(SeriesSegmentHeaderSize); pos < uint32(len(s.data)); {
        flag, id, key, sz := ReadSeriesEntry(s.data[pos:])
        if !IsValidSeriesEntryFlag(flag) {
            break
        }

        offset := JoinSeriesOffset(s.id, pos)
        if err := fn(flag, id, offset, key); err != nil {
            return err
        }
        pos += uint32(sz)
    }
    return nil
}
  • 根據(jù)offset讀取SeriesKey ReadSeriesKeyFromSegments
func ReadSeriesKeyFromSegments(a []*SeriesSegment, offset int64) []byte {
    // 從offset中分離出sigment id 和 pos
    segmentID, pos := SplitSeriesOffset(offset)
    segment := FindSegment(a, segmentID)
    if segment == nil {
        return nil
    }
    buf := segment.Slice(pos)
    key, _ := ReadSeriesKey(buf)
    return key
}
  • 讀取SeriesKey ReadSeriesEntry, 按照Segment的format按字節(jié)讀取flag, id, key
func ReadSeriesEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) {
    // If flag byte is zero then no more entries exist.
    flag, data = uint8(data[0]), data[1:]
    if !IsValidSeriesEntryFlag(flag) {
        return 0, 0, nil, 1
    }

    id, data = binary.BigEndian.Uint64(data), data[8:]
    switch flag {
    case SeriesEntryInsertFlag:
        key, _ = ReadSeriesKey(data)
    }
    return flag, id, key, int64(SeriesEntryHeaderSize + len(key))
}
SeriesIndex
  • 定義: SeriesIndex是對(duì)Partition下所有Segment file的內(nèi)存索引,最主要的就是series key到 series id的map和series id到offset的map;
    在內(nèi)存中的Index數(shù)量超過閾值時(shí),會(huì)在調(diào)用CreateSeriesListIfNoExists時(shí)被compact到磁盤文件;SeriesIndex對(duì)象在被初始化時(shí)會(huì)從磁盤文件中讀取index, 在磁盤文件中的存儲(chǔ)是按hash方式來定位寫入的,使用的是mmap的方式;查找索引時(shí)先從內(nèi)存查找才從磁盤文件查找
type SeriesIndex struct {
    path string

    count    uint64
    capacity int64
    mask     int64

    maxSeriesID uint64
    maxOffset   int64

    //以下這三項(xiàng)用來mmap磁盤index到內(nèi)存
    data         []byte // mmap data
    keyIDData    []byte // key/id mmap data
    idOffsetData []byte // id/offset mmap data

    // In-memory data since rebuild.
    keyIDMap    *rhh.HashMap //series key到 series id的map
    idOffsetMap map[uint64]int64 //series id到offset的map
    tombstones  map[uint64]struct{}
}
  • 我們來看一下磁盤index的結(jié)構(gòu)
  1. 先是Header:
    SeriesIndexHeaderSize = 0 +
        4 + 1 + // magic + version
        8 + 8 + // max series + max offset
        8 + 8 + // count + capacity
        8 + 8 + // key/id map offset & size
        8 + 8 + // id/offset map offset & size
  1. 具體的內(nèi)容部分就是兩個(gè)map(兩個(gè)hash map), serieskey -> seriesid和 seriesid -> seriesoffset, 它們?cè)谖募械钠鹗嘉恢煤痛笮≡趆eader里都可以讀到;
  2. 針對(duì)serieskey -> seriesid這個(gè)hash map, 存入時(shí)的key是series key, value是offset和id
  3. 針對(duì)seriesid -> seriesoffset這個(gè)hash map, 存入時(shí)的key是series id, value是id和offset
  • Open操作
func (idx *SeriesIndex) Open() (err error) {
    // Map data file, if it exists.
    if err := func() error {
        if _, err := os.Stat(idx.path); err != nil && !os.IsNotExist(err) {
            return err
        } else if err == nil {
            //將index磁盤文件內(nèi)存映射到idx.data
            if idx.data, err = mmap.Map(idx.path, 0); err != nil {
                return err
            }

            // 讀文件構(gòu)造header
            hdr, err := ReadSeriesIndexHeader(idx.data)
            if err != nil {
                return err
            }
            idx.count, idx.capacity, idx.mask = hdr.Count, hdr.Capacity, hdr.Capacity-1
            idx.maxSeriesID, idx.maxOffset = hdr.MaxSeriesID, hdr.MaxOffset

            // 通過header信息構(gòu)造兩個(gè)map的byte slice
            idx.keyIDData = idx.data[hdr.KeyIDMap.Offset : hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size]
            idx.idOffsetData = idx.data[hdr.IDOffsetMap.Offset : hdr.IDOffsetMap.Offset+hdr.IDOffsetMap.Size]
        }
        return nil
    }(); err != nil {
        idx.Close()
        return err
    }

    idx.keyIDMap = rhh.NewHashMap(rhh.DefaultOptions)
    idx.idOffsetMap = make(map[uint64]int64)
    idx.tombstones = make(map[uint64]struct{})
    return nil
}
  • 在內(nèi)存中構(gòu)建索引 Recover
func (idx *SeriesIndex) Recover(segments []*SeriesSegment) error {
    // Allocate new in-memory maps.
    idx.keyIDMap = rhh.NewHashMap(rhh.DefaultOptions)
    idx.idOffsetMap = make(map[uint64]int64)
    idx.tombstones = make(map[uint64]struct{})

    // Process all entries since the maximum offset in the on-disk index.
    minSegmentID, _ := SplitSeriesOffset(idx.maxOffset)
    
    //遍歷每一個(gè)Segment
    for _, segment := range segments {
        if segment.ID() < minSegmentID {
            continue
        }

        //遍歷Segment中的每一個(gè)SeriesEntry
        if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
            if offset <= idx.maxOffset {
                return nil
            }
            // 每個(gè)SeriesEntry都用idx.execEntry處理
            idx.execEntry(flag, id, offset, key)
            return nil
        }); err != nil {
            return err
        }
    }
    return nil
}
  • 操作一個(gè)Entry
func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byte) {
    switch flag {
    // 更新兩個(gè)map
    case SeriesEntryInsertFlag:
        idx.keyIDMap.Put(key, id)
        idx.idOffsetMap[id] = offset

        if id > idx.maxSeriesID {
            idx.maxSeriesID = id
        }
        if offset > idx.maxOffset {
            idx.maxOffset = offset
        }

    case SeriesEntryTombstoneFlag:
        idx.tombstones[id] = struct{}{}

    default:
        panic("unreachable")
    }
}
  • 各種查找方法, 先查內(nèi)存中map, 再查從磁盤文件mmap后構(gòu)建的map
func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64
func (idx *SeriesIndex) FindIDByNameTags(segments []*SeriesSegment, name []byte, tags models.Tags, buf []byte) uint64
func (idx *SeriesIndex) FindIDListByNameTags(segments []*SeriesSegment, names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, ok bool)
func (idx *SeriesIndex) FindOffsetByID(id uint64) int64
SeriesPartition
  • 管理旗下所有的SeriesPartition
  • Open方法:遍歷目錄下所有的segment file, 針對(duì)最新的一個(gè)segment作寫入的初始化,構(gòu)建內(nèi)存index
func (p *SeriesPartition) Open() error {
    // Create path if it doesn't exist.
    if err := os.MkdirAll(filepath.Join(p.path), 0777); err != nil {
        return err
    }

    // Open components.
    if err := func() (err error) {
       // 遍歷所有的segment
        if err := p.openSegments(); err != nil {
            return err
        }

        // Init last segment for writes.
        if err := p.activeSegment().InitForWrite(); err != nil {
            return err
        }

        // 構(gòu)建內(nèi)存索引
        p.index = NewSeriesIndex(p.IndexPath())
        if err := p.index.Open(); err != nil {
            return err
        } else if p.index.Recover(p.segments); err != nil {
            return err
        }

        return nil
    }(); err != nil {
        p.Close()
        return err
    }

    return nil
}
  • 給定一系列Series key, 返回對(duì)應(yīng)的Series id,如果沒有對(duì)應(yīng)的id,則將series key插入到Partition(其實(shí)就是寫入到對(duì)應(yīng)的segment中)
func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitionIDs []int, ids []uint64) error {
    var writeRequired bool
    p.mu.RLock()
    
    //利用index作快速查找,同時(shí)確認(rèn)是否沒有包含的key
    for i := range keys {
        if keyPartitionIDs[i] != p.id {
            continue
        }
        id := p.index.FindIDBySeriesKey(p.segments, keys[i])
        if id == 0 {
            writeRequired = true
            continue
        }
        ids[i] = id
    }
    p.mu.RUnlock()

    // Exit if all series for this partition already exist.
    if !writeRequired {
        return nil
    }

    type keyRange struct {
        id     uint64
        offset int64
    }
    newKeyRanges := make([]keyRange, 0, len(keys))

    // Obtain write lock to create new series.
    p.mu.Lock()
    defer p.mu.Unlock()

    // Track offsets of duplicate series.
    newIDs := make(map[string]uint64, len(ids))

    for i := range keys {
        // Skip series that don't belong to the partition or have already been created.
        if keyPartitionIDs[i] != p.id || ids[i] != 0 {
            continue
        }

        // Re-attempt lookup under write lock.
        key := keys[i]
        if ids[i] = newIDs[string(key)]; ids[i] != 0 {
            continue
        } else if ids[i] = p.index.FindIDBySeriesKey(p.segments, key); ids[i] != 0 {
            continue
        }

        // Write to series log and save offset.
        // 寫入log entry
        id, offset, err := p.insert(key)
        if err != nil {
            return err
        }
        // Append new key to be added to hash map after flush.
        ids[i] = id
        newIDs[string(key)] = id
        newKeyRanges = append(newKeyRanges, keyRange{id, offset})
    }

    // Flush active segment writes so we can access data in mmap.
    // 立即寫入文件 
    if segment := p.activeSegment(); segment != nil {
        if err := segment.Flush(); err != nil {
            return err
        }
    }

    // Add keys to hash map(s).
    // 更新內(nèi)存索引
    for _, keyRange := range newKeyRanges {
        p.index.Insert(p.seriesKeyByOffset(keyRange.offset), keyRange.id, keyRange.offset)
    }

    // compact 到磁盤文件,下面會(huì)詳細(xì)講解
    return nil
}
  • 根據(jù)id獲取series key
func (p *SeriesPartition) SeriesKey(id uint64) []byte {
    if id == 0 {
        return nil
    }
    p.mu.RLock()
    // 先用索引根據(jù)id獲取到offset, 再用offset獲取到key
    key := p.seriesKeyByOffset(p.index.FindOffsetByID(id))
    p.mu.RUnlock()
    return key
}
  • 根據(jù)series key獲取id
func (p *SeriesPartition) FindIDBySeriesKey(key []byte) uint64 {
    p.mu.RLock()
    if p.closed {
        p.mu.RUnlock()
        return 0
    }
    id := p.index.FindIDBySeriesKey(p.segments, key)
    p.mu.RUnlock()
    return id
}
  • 插入series key
func (p *SeriesPartition) insert(key []byte) (id uint64, offset int64, err error) {
    // id為p.seq, 每插入一條,p.seq會(huì)遞增SeriesFilePartitionN
    id = p.seq
    offset, err = p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryInsertFlag, id, key))
    if err != nil {
        return 0, 0, err
    }

    p.seq += SeriesFilePartitionN
    return id, offset, nil
}
  • compact index到磁盤
func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {
    hdr := NewSeriesIndexHeader()
    hdr.Count = seriesN
    hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor)

    //分配兩個(gè)hash map的內(nèi)存空間,后面就是填充這兩個(gè)map,然后寫到磁盤
    keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
    idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))

    // Reindex all partitions.
    var entryN int
    // 遍歷所有的segment
    for _, segment := range segments {
        errDone := errors.New("done")

        // 遍歷segment內(nèi)部的每一個(gè)Series Entry
        if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {

            // Make sure we don't go past the offset where the compaction began.
            if offset > index.maxOffset {
                return errDone
            }

            // Check for cancellation periodically.
            // 每處理1000條entry,檢查這個(gè)compact過程是否需要中斷
            if entryN++; entryN%1000 == 0 {
                select {
                case <-c.cancel:
                    return ErrSeriesPartitionCompactionCancelled
                default:
                }
            }

            // Only process insert entries.
            switch flag {
            case SeriesEntryInsertFlag: // fallthrough
            case SeriesEntryTombstoneFlag: //遇到墓碑flag就跳過
                return nil
            default:
                return fmt.Errorf("unexpected series partition log entry flag: %d", flag)
            }

            // Save max series identifier processed.
            hdr.MaxSeriesID, hdr.MaxOffset = id, offset

            // Ignore entry if tombstoned.
            // 如果id已經(jīng)被標(biāo)識(shí)為刪除,就跳過
            if index.IsDeleted(id) {
                return nil
            }

            // Insert into maps.
            // 根據(jù)key定位到寫入的內(nèi)存map的位置后寫入
            c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
            return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
        }); err == errDone {
            break
        } else if err != nil {
            return err
        }
    }

    // Open file handler.
    f, err := os.Create(path)
    if err != nil {
        return err
    }
    defer f.Close()

    // Calculate map positions.
    hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = SeriesIndexHeaderSize, int64(len(keyIDMap))
    hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size, int64(len(idOffsetMap))

    // Write header.
    if _, err := hdr.WriteTo(f); err != nil {
        return err
    }

    // Write maps.
    if _, err := f.Write(keyIDMap); err != nil {
        return err
    } else if _, err := f.Write(idOffsetMap); err != nil {
        return err
    }

    // Sync & close.
    if err := f.Sync(); err != nil {
        return err
    } else if err := f.Close(); err != nil {
        return err
    }

    return nil
}
SeriesFile
  • 定義: 管理當(dāng)前db下所有的SeriesePartition, 提供了操作Series的公共接口,對(duì)外屏蔽了SeriesPartition和SeriesSegment的存在;
  • 我們?cè)谶@里講一下series id的產(chǎn)生規(guī)則
  1. Influxdb將paritition數(shù)量定死了為 8, 就是說所有的serieskey放在這8個(gè)桶里
  2. 如何確定放在哪個(gè)桶里呢?就是上面提到的計(jì)算SeriesKey的hash值然后取模parition個(gè)數(shù) int(xxhash.Sum64(key) % SeriesFilePartitionN)
  3. 所有這些partition的id是0 到 7, 每個(gè)partiton都有一個(gè)順列號(hào)seq, 初始值為partition id + 1, 這個(gè)順列號(hào)就是放入這個(gè)parition中的seriese key對(duì)應(yīng)的id,每次增加 8, 比如對(duì)于1號(hào)partition, 第一個(gè)放入的series id就是2, 第二個(gè)就是10
  4. 有了上面的規(guī)則,從seriese id上就很容易得到它屬于哪個(gè) partition:int((id - 1) % SeriesFilePartitionN)
  • 將一系列的SeriesKey寫入相應(yīng)的Partiton, 寫入哪個(gè)partition是計(jì)算SeriesKey的hash值然后取模parition個(gè)數(shù) int(xxhash.Sum64(key) % SeriesFilePartitionN)
func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
    //生成SeriesKey的slice
    keys := GenerateSeriesKeys(names, tagsSlice)
    //生成parition id的slice
    keyPartitionIDs := f.SeriesKeysPartitionIDs(keys)
    ids := make([]uint64, len(keys))

    var g errgroup.Group
    //使用goroutine并行寫入
    for i := range f.partitions {
        p := f.partitions[i]
        g.Go(func() error {
            return p.CreateSeriesListIfNotExists(keys, keyPartitionIDs, ids)
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return ids, nil
}
  • 所有查詢操作,基本上都是首先定位到Partition, 然后再由partition代勞,partition使用index和segment也來搞定, 這里不詳述了
SeriesIDSet
  • 用bitmap存儲(chǔ)的Series ID, 這其實(shí)是個(gè)布隆過濾器的實(shí)現(xiàn),如果布隆過濾器說一個(gè)id不存在于這個(gè)bitmap中那一定是不存在,但如果它說存在卻不一定是真存在;
  • 定義:
type SeriesIDSet struct {
    sync.RWMutex
    bitmap *roaring.Bitmap
}
  • 這個(gè)bitmap使用roaring.Bitmap實(shí)現(xiàn)
  • 我們來簡(jiǎn)單的看一下New方法的實(shí)現(xiàn)
func NewSeriesIDSet(a ...uint64) *SeriesIDSet {
    ss := &SeriesIDSet{bitmap: roaring.NewBitmap()}
    if len(a) > 0 {
        a32 := make([]uint32, len(a))
        for i := range a {
            a32[i] = uint32(a[i])
        }
        ss.bitmap.AddMany(a32)
    }
    return ss
}

這里最關(guān)鍵的是參數(shù)里SeriesID是uinit64, 但存入bitmap時(shí)強(qiáng)轉(zhuǎn)成了uinit32, 只取了SeriesID的低32位,也因此在查詢id是否存在時(shí),也只用低32位去查詢,如果查到了有兩種可能,存入的id就是這個(gè)uinit32值,存入的id的低32位是這個(gè)uint32值

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容