filecoin技術(shù)架構(gòu)分析之十五:filecoin源碼分析之節(jié)點(diǎn)運(yùn)行邏輯

本文作者:楊尉;原創(chuàng)作品,轉(zhuǎn)載請(qǐng)注明出處

[上一篇鏈接] filecoin技術(shù)架構(gòu)分析之十四:filecoin源碼分析之服務(wù)層鏈同步、共識(shí)協(xié)議及挖礦

[下一篇鏈接] filecoin技術(shù)架構(gòu)分析之十六:go-filecoin總結(jié)篇--待續(xù)

目錄

  • 15.filecoin源碼分析之節(jié)點(diǎn)運(yùn)行邏輯
    • 15.1 前提
    • 15.2 filecoin節(jié)點(diǎn)運(yùn)行邏輯簡(jiǎn)析
      • 15.2.1 基本數(shù)據(jù)結(jié)構(gòu)
      • 15.2.2 創(chuàng)建filecoin節(jié)點(diǎn)實(shí)例
      • 15.2.3 啟動(dòng)及停止filecoin節(jié)點(diǎn)
      • 15.2.4 啟動(dòng)及停止挖礦
    • 15.3 階段性分析結(jié)束說(shuō)明

分析基于的源碼版本:go-filecoin master a0598a54(2019年3月9日)

15.1 前提

  • 我們?cè)谇懊娴恼鹿?jié)已經(jīng)經(jīng)過(guò)了三個(gè)階段的梳理分析

    • 概念階段,包括概念、通用語(yǔ)言理解、開(kāi)發(fā)網(wǎng)絡(luò)使用
    • 頂層架構(gòu)與概念的結(jié)合理解
    • 具體源碼的簡(jiǎn)析,包括協(xié)議層、支撐包、內(nèi)部api層、服務(wù)層
  • 源碼部分的command部分比較容易理解,就不單獨(dú)文章贅述了,基本與內(nèi)部api層都可以對(duì)應(yīng)起來(lái)

  • 現(xiàn)在再來(lái)看節(jié)點(diǎn)的運(yùn)行邏輯應(yīng)該會(huì)更加清晰了

15.2 filecoin節(jié)點(diǎn)運(yùn)行邏輯簡(jiǎn)析

15.2.1 基本數(shù)據(jù)結(jié)構(gòu)

▼ package
    node

? imports

▼ variables
   +ErrNoMinerAddress
   -filecoinDHTProtocol : dhtprotocol.ID
   -log

   // 創(chuàng)建具體的filecoin節(jié)點(diǎn)實(shí)例
▼+Config : struct
    [fields]
    // 設(shè)置區(qū)塊時(shí)間
   +BlockTime : time.Duration
    // 配置節(jié)點(diǎn)是否轉(zhuǎn)發(fā)
   +IsRelay : bool
    // libp2p選項(xiàng)
   +Libp2pOpts : []libp2p.Option
    // 在離線模式下,會(huì)關(guān)閉libp2p
   +OfflineMode : bool
    // 配置資源
   +Repo : repo.Repo
    // 配置區(qū)塊獎(jiǎng)勵(lì)方法
   +Rewarder : consensus.BlockRewarder
    // 配置節(jié)點(diǎn)時(shí)空證明校驗(yàn)函數(shù)
   +Verifier : proofs.Verifier
    [methods]
    // 創(chuàng)建node實(shí)例
   +Build(ctx context.Context) : *Node, error
   -buildHost(ctx context.Context, makeDHT func(host host.Host) routing.IpfsRouting, error) : host.Host, error

 +ConfigOpt : func(*Config) error

▼+Node : struct
    [fields]
    // 確認(rèn)最新區(qū)塊,本地持久化并廣播
   +AddNewlyMinedBlock : newBlockFunc
    // 訂閱主題"/fil/blocks"
   +BlockSub : pubsub.Subscription
    // 塊服務(wù)接口
   +Blockstore : bstore.Blockstore
    // 維持相關(guān)節(jié)點(diǎn)連接
   +Bootstrapper : *net.Bootstrapper
    // 讀取區(qū)塊信息
   +ChainReader : chain.ReadStore
    // 同時(shí)協(xié)議
   +Consensus : consensus.Protocol
    // 塊交換,節(jié)點(diǎn)間的數(shù)據(jù)交換
   +Exchange : exchange.Interface
    // new-head 主題
   +HeaviestTipSetCh : chan interface{}
    // 新區(qū)塊處理請(qǐng)求
   +HeaviestTipSetHandled : func()
    // hello服務(wù)
   +HelloSvc : *hello.Handler
    // 消息訂閱
   +MessageSub : pubsub.Subscription
    // 挖礦調(diào)度
   +MiningScheduler : mining.Scheduler
    // 消息池操作
   +MsgPool : *core.MessagePool
    // 離線模式
   +OfflineMode : bool
   +OnlineStore : *hamt.CborIpldStore
    // 對(duì)應(yīng)libp2p中的host
   +PeerHost : host.Host
    // libp2p中的ping service
   +Ping : *ping.PingService
    // 高層api
   +PorcelainAPI : *porcelain.API
    // 功率表
   +PowerTable : consensus.PowerTableView
    // 配置資源
   +Repo : repo.Repo
    // 檢索客戶端
   +RetrievalClient : *retrieval.Client
    // 檢索礦工
   +RetrievalMiner : *retrieval.Miner
    // 路由,libp2p
   +Router : routing.IpfsRouting
    // 存儲(chǔ)礦工
   +StorageMiner : *storage.Miner
    // 存儲(chǔ)客戶
   +StorageMinerClient : *storage.Client
    // 鏈同步
   +Syncer : chain.Syncer
    // 錢包管理
   +Wallet : *wallet.Wallet
   -blockTime : time.Duration
   -blockservice : bserv.BlockService
   -cancelMining : context.CancelFunc
   -cancelSubscriptionsCtx : context.CancelFunc
   -cborStore : *hamt.CborIpldStore
   -host : host.Host
   -lookup : lookup.PeerLookupService
   -mining
   -miningCtx : context.Context
   -miningDoneWg : *sync.WaitGroup
   -sectorBuilder : sectorbuilder.SectorBuilder

    [methods]
   +BlockHeight() : *types.BlockHeight, error
   +BlockService() : bserv.BlockService
   +CborStore() : *hamt.CborIpldStore
   +ChainReadStore() : chain.ReadStore
    // 創(chuàng)建礦工方法
   +CreateMiner(ctx context.Context, accountAddr address.Address, gasPrice types.AttoFIL, gasLimit types.GasUnits, pledge uint64, pid libp2ppeer.ID, collateral *types.AttoFIL) : *address.Address, error
   +GetBlockTime() : time.Duration
   +Host() : host.Host
    // 節(jié)點(diǎn)查找方法
   +Lookup() : lookup.PeerLookupService
   +MiningSignerAddress() : address.Address
   +MiningTimes() : time.Duration, time.Duration
    // 創(chuàng)建新的account地址,錢包地址
   +NewAddress() : address.Address, error
   +SectorBuilder() : sectorbuilder.SectorBuilder
   +SetBlockTime(blockTime time.Duration)
    // 啟動(dòng)節(jié)點(diǎn)
   +Start(ctx context.Context) : error
    // 啟動(dòng)挖礦
   +StartMining(ctx context.Context) : error
    // 停止節(jié)點(diǎn)
   +Stop(ctx context.Context)
    // 停止挖礦
   +StopMining(ctx context.Context)
   -addNewlyMinedBlock(ctx context.Context, b *types.Block)
   -cancelSubscriptions()
   -getLastUsedSectorID(ctx context.Context, minerAddr address.Address) : uint64, error
   -getMinerActorPubKey() : []byte, error
   -handleNewHeaviestTipSet(ctx context.Context, head types.TipSet)
   -handleNewMiningOutput(miningOutCh chan mining.Output)
   -handleSubscription(ctx context.Context, f pubSubProcessorFunc, fname string, s pubsub.Subscription, sname string)
   -isMining() : bool
   -miningAddress() : address.Address, error
   -miningOwnerAddress(ctx context.Context, miningAddr address.Address) : address.Address, error
   -saveMinerConfig(minerAddr address.Address, signerAddr address.Address) : error
   -setIsMining(isMining bool)
   -setupHeartbeatServices(ctx context.Context) : error
   -setupMining(ctx context.Context) : error
    [functions]
    // 調(diào)用Build創(chuàng)建node實(shí)例
   +New(ctx context.Context, opts ...ConfigOpt) : *Node, error

▼-blankValidator : struct
    [methods]
   +Select(_ string, _ [][]byte) : int, error
   +Validate(_ string, _ []byte) : error

 -newBlockFunc : func(context.Context, *types.Block)

 -pubSubProcessorFunc : func(ctx context.Context, msg pubsub.Message) error

▼ functions
   +BlockTime(blockTime time.Duration) : ConfigOpt
   +IsRelay() : ConfigOpt
   +Libp2pOptions(opts ...libp2p.Option) : ConfigOpt
   +OfflineMode(offlineMode bool) : ConfigOpt
   +RewarderConfigOption(rewarder consensus.BlockRewarder) : ConfigOpt
   +StartMining(ctx context.Context, node *Node) : error
   +VerifierConfigOption(verifier proofs.Verifier) : ConfigOpt
   -initSectorBuilderForNode(ctx context.Context, node *Node, sectorStoreType proofs.SectorStoreType) : sectorbuilder.SectorBuilder, error
   -initStorageMinerForNode(ctx context.Context, node *Node) : *storage.Miner, error
   -readGenesisCid(ds datastore.Datastore) : cid.Cid, error

15.2.2 創(chuàng)建filecoin節(jié)點(diǎn)實(shí)例

  • 實(shí)例化filecoin節(jié)點(diǎn),簡(jiǎn)析見(jiàn)如下添加的注釋
// Build instantiates a filecoin Node from the settings specified in the config.
func (nc *Config) Build(ctx context.Context) (*Node, error) {
    // 創(chuàng)建內(nèi)存資源實(shí)例
    if nc.Repo == nil {
        nc.Repo = repo.NewInMemoryRepo()
    }

    // 創(chuàng)建塊服務(wù)實(shí)例
    bs := bstore.NewBlockstore(nc.Repo.Datastore())

    validator := blankValidator{}

    var peerHost host.Host
    var router routing.IpfsRouting

    // 帶寬統(tǒng)計(jì)實(shí)例,加入libp2popts
    bandwidthTracker := p2pmetrics.NewBandwidthCounter()
    nc.Libp2pOpts = append(nc.Libp2pOpts, libp2p.BandwidthReporter(bandwidthTracker))

    // 非離線模式才啟用libp2p
    if !nc.OfflineMode {
        makeDHT := func(h host.Host) (routing.IpfsRouting, error) {
            r, err := dht.New(
                ctx,
                h,
                dhtopts.Datastore(nc.Repo.Datastore()),
                dhtopts.NamespacedValidator("v", validator),
                dhtopts.Protocols(filecoinDHTProtocol),
            )
            if err != nil {
                return nil, errors.Wrap(err, "failed to setup routing")
            }
            router = r
            return r, err
        }

        var err error
        // 實(shí)例化非離線模式libp2p host
        peerHost, err = nc.buildHost(ctx, makeDHT)
        if err != nil {
            return nil, err
        }
    } else {
        // 離線模式處理
        router = offroute.NewOfflineRouter(nc.Repo.Datastore(), validator)
        peerHost = rhost.Wrap(noopLibP2PHost{}, router)
    }

    // ping服務(wù)實(shí)例
    // set up pinger
    pinger := ping.NewPingService(peerHost)

    // bitswap實(shí)例
    // set up bitswap
    nwork := bsnet.NewFromIpfsHost(peerHost, router)
    //nwork := bsnet.NewFromIpfsHost(innerHost, router)
    bswap := bitswap.New(ctx, nwork, bs)
    bservice := bserv.New(bs, bswap)

    cstOnline := hamt.CborIpldStore{Blocks: bservice}
    cstOffline := hamt.CborIpldStore{Blocks: bserv.New(bs, offline.Exchange(bs))}
    // 獲取創(chuàng)世塊cid
    genCid, err := readGenesisCid(nc.Repo.Datastore())
    if err != nil {
        return nil, err
    }

    // chain.Store實(shí)例以及功率表
    var chainStore chain.Store = chain.NewDefaultStore(nc.Repo.ChainDatastore(), &cstOffline, genCid)
    powerTable := &consensus.MarketView{}

    // 共識(shí)協(xié)議processor實(shí)例
    var processor consensus.Processor
    if nc.Rewarder == nil {
        processor = consensus.NewDefaultProcessor()
    } else {
        processor = consensus.NewConfiguredProcessor(consensus.NewDefaultMessageValidator(), nc.Rewarder)
    }

    // 共識(shí)協(xié)議實(shí)例
    var nodeConsensus consensus.Protocol
    if nc.Verifier == nil {
        nodeConsensus = consensus.NewExpected(&cstOffline, bs, processor, powerTable, genCid, &proofs.RustVerifier{})
    } else {
        nodeConsensus = consensus.NewExpected(&cstOffline, bs, processor, powerTable, genCid, nc.Verifier)
    }

    // 鏈同步,鏈讀取,消息池實(shí)例
    // only the syncer gets the storage which is online connected
    chainSyncer := chain.NewDefaultSyncer(&cstOnline, &cstOffline, nodeConsensus, chainStore)
    chainReader, ok := chainStore.(chain.ReadStore)
    if !ok {
        return nil, errors.New("failed to cast chain.Store to chain.ReadStore")
    }
    msgPool := core.NewMessagePool()

    // Set up libp2p pubsub
    fsub, err := libp2pps.NewFloodSub(ctx, peerHost)
    if err != nil {
        return nil, errors.Wrap(err, "failed to set up pubsub")
    }

    // 錢包服務(wù)實(shí)例
    backend, err := wallet.NewDSBackend(nc.Repo.WalletDatastore())
    if err != nil {
        return nil, errors.Wrap(err, "failed to set up wallet backend")
    }
    fcWallet := wallet.New(backend)

    // 實(shí)例化高層api
    PorcelainAPI := porcelain.New(plumbing.New(&plumbing.APIDeps{
        Chain:        chainReader,
        Config:       cfg.NewConfig(nc.Repo),
        Deals:        strgdls.New(nc.Repo.DealsDatastore()),
        MsgPool:      msgPool,
        MsgPreviewer: msg.NewPreviewer(fcWallet, chainReader, &cstOffline, bs),
        MsgQueryer:   msg.NewQueryer(nc.Repo, fcWallet, chainReader, &cstOffline, bs),
        MsgSender:    msg.NewSender(fcWallet, chainReader, msgPool, consensus.NewOutboundMessageValidator(), fsub.Publish),
        MsgWaiter:    msg.NewWaiter(chainReader, bs, &cstOffline),
        Network:      net.New(peerHost, pubsub.NewPublisher(fsub), pubsub.NewSubscriber(fsub), net.NewRouter(router), bandwidthTracker),
        SigGetter:    mthdsig.NewGetter(chainReader),
        Wallet:       fcWallet,
    }))

    // 實(shí)例化node
    nd := &Node{
        blockservice: bservice,
        Blockstore:   bs,
        cborStore:    &cstOffline,
        OnlineStore:  &cstOnline,
        Consensus:    nodeConsensus,
        ChainReader:  chainReader,
        Syncer:       chainSyncer,
        PowerTable:   powerTable,
        PorcelainAPI: PorcelainAPI,
        Exchange:     bswap,
        host:         peerHost,
        MsgPool:      msgPool,
        OfflineMode:  nc.OfflineMode,
        PeerHost:     peerHost,
        Ping:         pinger,
        Repo:         nc.Repo,
        Wallet:       fcWallet,
        blockTime:    nc.BlockTime,
        Router:       router,
    }

    // Bootstrapping network peers.
    periodStr := nd.Repo.Config().Bootstrap.Period
    period, err := time.ParseDuration(periodStr)
    if err != nil {
        return nil, errors.Wrapf(err, "couldn't parse bootstrap period %s", periodStr)
    }

    // 實(shí)例化Bootstrapper,指定node的該方法
    // Bootstrapper maintains connections to some subset of addresses
    ba := nd.Repo.Config().Bootstrap.Addresses
    bpi, err := net.PeerAddrsToPeerInfos(ba)
    if err != nil {
        return nil, errors.Wrapf(err, "couldn't parse bootstrap addresses [%s]", ba)
    }
    minPeerThreshold := nd.Repo.Config().Bootstrap.MinPeerThreshold
    nd.Bootstrapper = net.NewBootstrapper(bpi, nd.Host(), nd.Host().Network(), nd.Router, minPeerThreshold, period)

    // 實(shí)例化鏈查找服務(wù),指定node的該方法
    // On-chain lookup service
    defaultAddressGetter := func() (address.Address, error) {
        return nd.PorcelainAPI.GetAndMaybeSetDefaultSenderAddress()
    }
    nd.lookup = lookup.NewChainLookupService(nd.ChainReader, defaultAddressGetter, bs)

    return nd, nil
}

15.2.3 啟動(dòng)及停止filecoin節(jié)點(diǎn)

  • 啟動(dòng)filecoin節(jié)點(diǎn)的流程概覽
// Start boots up the node.
func (node *Node) Start(ctx context.Context) error {
    // 加載本地chain信息
    if err := node.ChainReader.Load(ctx); err != nil {
        return err
    }

    // 如果存在存儲(chǔ)礦工,配置挖礦功能
    // Only set these up if there is a miner configured.
    if _, err := node.miningAddress(); err == nil {
        if err := node.setupMining(ctx); err != nil {
            log.Errorf("setup mining failed: %v", err)
            return err
        }
    }

    // 設(shè)置鏈同步回調(diào)函數(shù)
    // Start up 'hello' handshake service
    syncCallBack := func(pid libp2ppeer.ID, cids []cid.Cid, height uint64) {
        // TODO it is possible the syncer interface should be modified to
        // make use of the additional context not used here (from addr + height).
        // To keep things simple for now this info is not used.
        err := node.Syncer.HandleNewBlocks(context.Background(), cids)
        if err != nil {
            log.Infof("error handling blocks: %s", types.NewSortedCidSet(cids...).String())
        }
    }
    // 實(shí)例化hello握手協(xié)議
    node.HelloSvc = hello.New(node.Host(), node.ChainReader.GenesisCid(), syncCallBack, node.ChainReader.Head)

    // 實(shí)例化存儲(chǔ)礦工協(xié)議
    cni := storage.NewClientNodeImpl(dag.NewDAGService(node.BlockService()), node.Host(), node.GetBlockTime())
    var err error
    node.StorageMinerClient, err = storage.NewClient(cni, node.PorcelainAPI)
    if err != nil {
        return errors.Wrap(err, "Could not make new storage client")
    }

    // 實(shí)例化檢索客戶及檢索礦工協(xié)議
    node.RetrievalClient = retrieval.NewClient(node)
    node.RetrievalMiner = retrieval.NewMiner(node)

    // 訂閱區(qū)塊通知
    // subscribe to block notifications
    blkSub, err := node.PorcelainAPI.PubSubSubscribe(BlockTopic)
    if err != nil {
        return errors.Wrap(err, "failed to subscribe to blocks topic")
    }
    node.BlockSub = blkSub

    // 訂閱消息通知
    // subscribe to message notifications
    msgSub, err := node.PorcelainAPI.PubSubSubscribe(msg.Topic)
    if err != nil {
        return errors.Wrap(err, "failed to subscribe to message topic")
    }
    node.MessageSub = msgSub

    cctx, cancel := context.WithCancel(context.Background())
    node.cancelSubscriptionsCtx = cancel

    // 啟用新線程訂閱區(qū)塊及消息主題,設(shè)置handle回調(diào)
    go node.handleSubscription(cctx, node.processBlock, "processBlock", node.BlockSub, "BlockSub")
    go node.handleSubscription(cctx, node.processMessage, "processMessage", node.MessageSub, "MessageSub")

    // 啟用新線程處理新的tipset事件
    node.HeaviestTipSetHandled = func() {}
    node.HeaviestTipSetCh = node.ChainReader.HeadEvents().Sub(chain.NewHeadTopic)
    go node.handleNewHeaviestTipSet(cctx, node.ChainReader.Head())

    // 非離線模式啟動(dòng)bootstapper服務(wù)
    if !node.OfflineMode {
        node.Bootstrapper.Start(context.Background())
    }

    // 啟動(dòng)心跳服務(wù)
    if err := node.setupHeartbeatServices(ctx); err != nil {
        return errors.Wrap(err, "failed to start heartbeat services")
    }

    return nil
}
  • 停止filecoin節(jié)點(diǎn)的流程概覽

釋放資源,停止相關(guān)服務(wù)

// Stop initiates the shutdown of the node.
func (node *Node) Stop(ctx context.Context) {
    node.ChainReader.HeadEvents().Unsub(node.HeaviestTipSetCh)
    // 停止挖礦
    node.StopMining(ctx)

    // 取消訂閱
    node.cancelSubscriptions()
    // 停止鏈讀取服務(wù)
    node.ChainReader.Stop()

    // 停止密封服務(wù)
    if node.SectorBuilder() != nil {
        if err := node.SectorBuilder().Close(); err != nil {
            fmt.Printf("error closing sector builder: %s\n", err)
        }
        node.sectorBuilder = nil
    }

    // 關(guān)閉host實(shí)例
    if err := node.Host().Close(); err != nil {
        fmt.Printf("error closing host: %s\n", err)
    }

    // 關(guān)閉資源實(shí)例
    if err := node.Repo.Close(); err != nil {
        fmt.Printf("error closing repo: %s\n", err)
    }

    // 關(guān)閉bootstqpper實(shí)例
    node.Bootstrapper.Stop()

    fmt.Println("stopping filecoin :(")
}

15.2.4 啟動(dòng)及停止挖礦

  • 啟動(dòng)挖礦
// StartMining causes the node to start feeding blocks to the mining worker and initializes
// the SectorBuilder for the mining address.
func (node *Node) StartMining(ctx context.Context) error {
    // 如果在挖礦中,退出
    if node.isMining() {
        return errors.New("Node is already mining")
    }
    // 獲取礦工地址
    minerAddr, err := node.miningAddress()
    if err != nil {
        return errors.Wrap(err, "failed to get mining address")
    }

    // 確保密封服務(wù)實(shí)例存在
    // ensure we have a sector builder
    if node.SectorBuilder() == nil {
        if err := node.setupMining(ctx); err != nil {
            return err
        }
    }

    // 獲取地址
    minerOwnerAddr, err := node.miningOwnerAddress(ctx, minerAddr)
    minerSigningAddress := node.MiningSignerAddress()
    if err != nil {
        return errors.Wrapf(err, "failed to get mining owner address for miner %s", minerAddr)
    }

    blockTime, mineDelay := node.MiningTimes()

    // 實(shí)例化挖礦調(diào)度服務(wù)
    if node.MiningScheduler == nil {
        getStateFromKey := func(ctx context.Context, tsKey string) (state.Tree, error) {
            tsas, err := node.ChainReader.GetTipSetAndState(ctx, tsKey)
            if err != nil {
                return nil, err
            }
            return state.LoadStateTree(ctx, node.CborStore(), tsas.TipSetStateRoot, builtin.Actors)
        }
        getState := func(ctx context.Context, ts types.TipSet) (state.Tree, error) {
            return getStateFromKey(ctx, ts.String())
        }
        getWeight := func(ctx context.Context, ts types.TipSet) (uint64, error) {
            parent, err := ts.Parents()
            if err != nil {
                return uint64(0), err
            }
            // TODO handle genesis cid more gracefully
            if parent.Len() == 0 {
                return node.Consensus.Weight(ctx, ts, nil)
            }
            pSt, err := getStateFromKey(ctx, parent.String())
            if err != nil {
                return uint64(0), err
            }
            return node.Consensus.Weight(ctx, ts, pSt)
        }
        getAncestors := func(ctx context.Context, ts types.TipSet, newBlockHeight *types.BlockHeight) ([]types.TipSet, error) {
            return chain.GetRecentAncestors(ctx, ts, node.ChainReader, newBlockHeight, consensus.AncestorRoundsNeeded, consensus.LookBackParameter)
        }
        processor := consensus.NewDefaultProcessor()
        worker := mining.NewDefaultWorker(node.MsgPool, getState, getWeight, getAncestors, processor, node.PowerTable,
            node.Blockstore, node.CborStore(), minerAddr, minerOwnerAddr, minerSigningAddress, node.Wallet, blockTime)
        node.MiningScheduler = mining.NewScheduler(worker, mineDelay, node.ChainReader.Head)
    }

    // paranoid check
    // 啟動(dòng)挖礦服務(wù)
    if !node.MiningScheduler.IsStarted() {
        node.miningCtx, node.cancelMining = context.WithCancel(context.Background())
        outCh, doneWg := node.MiningScheduler.Start(node.miningCtx)

        node.miningDoneWg = doneWg
        node.AddNewlyMinedBlock = node.addNewlyMinedBlock
        node.miningDoneWg.Add(1)
        go node.handleNewMiningOutput(outCh)
    }

    // initialize a storage miner
    // 初始化存儲(chǔ)礦工
    storageMiner, err := initStorageMinerForNode(ctx, node)
    if err != nil {
        return errors.Wrap(err, "failed to initialize storage miner")
    }
    node.StorageMiner = storageMiner

    // loop, turning sealing-results into commitSector messages to be included
    // in the chain
    // 新開(kāi)線程處理,1 密封完成處理;2 接受停止挖礦消息
    go func() {
        for {
            select {
                // 密封完成處理
            case result := <-node.SectorBuilder().SectorSealResults():
                if result.SealingErr != nil {
                    log.Errorf("failed to seal sector with id %d: %s", result.SectorID, result.SealingErr.Error())
                } else if result.SealingResult != nil {

                    // TODO: determine these algorithmically by simulating call and querying historical prices
                    gasPrice := types.NewGasPrice(0)
                    gasUnits := types.NewGasUnits(300)

                    val := result.SealingResult
                    // This call can fail due to, e.g. nonce collisions. Our miners existence depends on this.
                    // We should deal with this, but MessageSendWithRetry is problematic.
                    _, err := node.PorcelainAPI.MessageSend(
                        node.miningCtx,
                        minerOwnerAddr,
                        minerAddr,
                        nil,
                        gasPrice,
                        gasUnits,
                        "commitSector",
                        val.SectorID,
                        val.CommD[:],
                        val.CommR[:],
                        val.CommRStar[:],
                        val.Proof[:],
                    )
                    if err != nil {
                        log.Errorf("failed to send commitSector message from %s to %s for sector with id %d: %s", minerOwnerAddr, minerAddr, val.SectorID, err)
                        continue
                    }

                    node.StorageMiner.OnCommitmentAddedToChain(val, nil)
                }
                // 挖礦取消
            case <-node.miningCtx.Done():
                return
            }
        }
    }()

    // schedules sealing of staged piece-data
    // 定時(shí)密封階段性的碎片數(shù)據(jù)
    if node.Repo.Config().Mining.AutoSealIntervalSeconds > 0 {
        go func() {
            for {
                select {
                    // 取消
                case <-node.miningCtx.Done():
                    return
                    // 定時(shí)密封
                case <-time.After(time.Duration(node.Repo.Config().Mining.AutoSealIntervalSeconds) * time.Second):
                    log.Info("auto-seal has been triggered")
                    if err := node.SectorBuilder().SealAllStagedSectors(node.miningCtx); err != nil {
                        log.Errorf("scheduler received error from node.SectorBuilder.SealAllStagedSectors (%s) - exiting", err.Error())
                        return
                    }
                }
            }
        }()
    } else {
        log.Debug("auto-seal is disabled")
    }
    // 設(shè)置微挖礦狀態(tài)
    node.setIsMining(true)

    return nil
}
  • 停止挖礦
// StopMining stops mining on new blocks.
func (node *Node) StopMining(ctx context.Context) {
    node.setIsMining(false)

    // 取消挖礦
    if node.cancelMining != nil {
        node.cancelMining()
    }

    // 等待執(zhí)行中的挖礦任務(wù)完成后結(jié)束
    if node.miningDoneWg != nil {
        node.miningDoneWg.Wait()
    }

    // TODO: stop node.StorageMiner
}

15.3 階段性分析結(jié)束說(shuō)明

至此筆者針對(duì)go-filecoin部分的分析快告一個(gè)小的段落了

文章因?yàn)闀r(shí)間的關(guān)系,書(shū)面出來(lái)只是將關(guān)鍵部分書(shū)面表達(dá)出來(lái),更多的像是筆者的一個(gè)分析筆記,但是我相信對(duì)于想分析源碼的朋友有一定幫助

后面會(huì)抽空補(bǔ)充一章總結(jié),筆者在第4章中有提到過(guò),薄讀->厚讀->再薄讀,我們還需要一次薄讀,來(lái)加深我們對(duì)go-filecoin的認(rèn)識(shí)。

[上一篇鏈接] filecoin技術(shù)架構(gòu)分析之十四:filecoin源碼分析之服務(wù)層鏈同步、共識(shí)協(xié)議及挖礦

[下一篇鏈接] filecoin技術(shù)架構(gòu)分析之十六:go-filecoin總結(jié)篇--待續(xù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容