前一篇講完通道創(chuàng)建,接下來馬上趁熱打鐵來看下加入通道這部分的實現(xiàn),先看下命令。
peer channel join -b mychannel.block
可以看到,這里用上了上一步所生成的通道的genesisblock。
peer
executeJoin
func executeJoin(cf *ChannelCmdFactory) (err error) {
spec, err := getJoinCCSpec()
if err != nil {
return err
}
// Build the ChaincodeInvocationSpec message
invocation := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec}
creator, err := cf.Signer.Serialize()
if err != nil {
return fmt.Errorf("Error serializing identity for %s: %s", cf.Signer.GetIdentifier(), err)
}
var prop *pb.Proposal
prop, _, err = putils.CreateProposalFromCIS(pcommon.HeaderType_CONFIG, "", invocation, creator)
if err != nil {
return fmt.Errorf("Error creating proposal for join %s", err)
}
var signedProp *pb.SignedProposal
signedProp, err = putils.GetSignedProposal(prop, cf.Signer)
if err != nil {
return fmt.Errorf("Error creating signed proposal %s", err)
}
var proposalResp *pb.ProposalResponse
proposalResp, err = cf.EndorserClient.ProcessProposal(context.Background(), signedProp)
if err != nil {
return ProposalFailedErr(err.Error())
}
if proposalResp == nil {
return ProposalFailedErr("nil proposal response")
}
if proposalResp.Response.Status != 0 && proposalResp.Response.Status != 200 {
return ProposalFailedErr(fmt.Sprintf("bad proposal response %d: %s", proposalResp.Response.Status, proposalResp.Response.Message))
}
logger.Info("Successfully submitted proposal to join channel")
return nil
}
這里主要做幾件事情
生成cis,也就是ChaincodeInvocationSpec,這里跟chaincode invoke篇高度相似,這里就不再贅述。而這里cis的重點是里面最終調(diào)用的是什么地方。
- 可以看到最終是調(diào)用cscc的JoinChain
func getJoinCCSpec() (*pb.ChaincodeSpec, error) { if genesisBlockPath == common.UndefinedParamValue { return nil, errors.New("Must supply genesis block file") } gb, err := ioutil.ReadFile(genesisBlockPath) if err != nil { return nil, GBFileNotFoundErr(err.Error()) } // Build the spec input := &pb.ChaincodeInput{Args: [][]byte{[]byte(cscc.JoinChain), gb}} spec := &pb.ChaincodeSpec{ Type: pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value["GOLANG"]), ChaincodeId: &pb.ChaincodeID{Name: "cscc"}, Input: input, } return spec, nil }
- 最后包裝成HeaderType_CONFIG的Proposal
- 注意,這里會去生成數(shù)字簽名,當然是用的signidentity,而簽名身份是跟admin是一致的。
- 接著就是開始處理proposal了
cscc
case JoinChain:
if args[1] == nil {
return shim.Error("Cannot join the channel <nil> configuration block provided")
}
block, err := utils.GetBlockFromBlockBytes(args[1])
if err != nil {
return shim.Error(fmt.Sprintf("Failed to reconstruct the genesis block, %s", err))
}
cid, err := utils.GetChainIDFromBlock(block)
if err != nil {
return shim.Error(fmt.Sprintf("\"JoinChain\" request failed to extract "+
"channel id from the block due to [%s]", err))
}
if err := validateConfigBlock(block); err != nil {
return shim.Error(fmt.Sprintf("\"JoinChain\" for chainID = %s failed because of validation "+
"of configuration block, because of %s", cid, err))
}
// 2. check local MSP Admins policy
// TODO: move to ACLProvider once it will support chainless ACLs
if err = e.policyChecker.CheckPolicyNoChannel(mgmt.Admins, sp); err != nil {
return shim.Error(fmt.Sprintf("access denied for [%s][%s]: [%s]", fname, cid, err))
}
// Initialize txsFilter if it does not yet exist. We can do this safely since
// it's the genesis block anyway
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
if len(txsFilter) == 0 {
// add array of validation code hardcoded to valid
txsFilter = util.NewTxValidationFlagsSetValue(len(block.Data.Data), pb.TxValidationCode_VALID)
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
return joinChain(cid, block, e.ccp, e.sccp)
前面就是block的各種校驗,跳過
CheckPolicyNoChannel(mgmt.Admins, sp),為什么前面要強調(diào)簽名身份跟admin一致,說明這個proposal包是admin簽署的。而這里就是校驗規(guī)則是否是admin。截取片段證明我所言非虛。
case m.MSPRole_ADMIN: mspLogger.Debugf("Checking if identity satisfies ADMIN role for %s", msp.name) // in the case of admin, we check that the // id is exactly one of our admins for _, admincert := range msp.admins { if bytes.Equal(id.(*identity).cert.Raw, admincert.(*identity).cert.Raw) { // we do not need to check whether the admin is a valid identity // according to this MSP, since we already check this at Setup time // if there is a match, we can just return return nil } }
- 可以看到這里就是比對證書是否跟本地保存的該組織的admin是否一致。
joinChain
func joinChain(chainID string, block *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider) pb.Response {
if err := peer.CreateChainFromBlock(block, ccp, sccp); err != nil {
return shim.Error(err.Error())
}
peer.InitChain(chainID)
return shim.Success(nil)
}
這里分兩步,CreateChainFromBlock和InitChain下面會講到。處理完后給peer返回success
CreateChainFromBlock
func CreateChainFromBlock(cb *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider) error {
cid, err := utils.GetChainIDFromBlock(cb)
if err != nil {
return err
}
var l ledger.PeerLedger
if l, err = ledgermgmt.CreateLedger(cb); err != nil {
return errors.WithMessage(err, "cannot create ledger from genesis block")
}
return createChain(cid, l, cb, ccp, sccp, pluginMapper)
}
這里做兩件事情
- 創(chuàng)建本地帳本
- createChain,回憶下上篇創(chuàng)建通道,那里會初始化orderer端的chain,并啟動。而這里是peer端
createChain
func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider, pm txvalidator.PluginMapper) error {
chanConf, err := retrievePersistedChannelConfig(ledger)
if err != nil {
return err
}
var bundle *channelconfig.Bundle
if chanConf != nil {
bundle, err = channelconfig.NewBundle(cid, chanConf)
if err != nil {
return err
}
} else {
// Config was only stored in the statedb starting with v1.1 binaries
// so if the config is not found there, extract it manually from the config block
envelopeConfig, err := utils.ExtractEnvelope(cb, 0)
if err != nil {
return err
}
bundle, err = channelconfig.NewBundleFromEnvelope(envelopeConfig)
if err != nil {
return err
}
}
capabilitiesSupportedOrPanic(bundle)
channelconfig.LogSanityChecks(bundle)
gossipEventer := service.GetGossipService().NewConfigEventer()
gossipCallbackWrapper := func(bundle *channelconfig.Bundle) {
ac, ok := bundle.ApplicationConfig()
if !ok {
// TODO, handle a missing ApplicationConfig more gracefully
ac = nil
}
gossipEventer.ProcessConfigUpdate(&gossipSupport{
Validator: bundle.ConfigtxValidator(),
Application: ac,
Channel: bundle.ChannelConfig(),
})
service.GetGossipService().SuspectPeers(func(identity api.PeerIdentityType) bool {
// TODO: this is a place-holder that would somehow make the MSP layer suspect
// that a given certificate is revoked, or its intermediate CA is revoked.
// In the meantime, before we have such an ability, we return true in order
// to suspect ALL identities in order to validate all of them.
return true
})
}
trustedRootsCallbackWrapper := func(bundle *channelconfig.Bundle) {
updateTrustedRoots(bundle)
}
mspCallback := func(bundle *channelconfig.Bundle) {
// TODO remove once all references to mspmgmt are gone from peer code
mspmgmt.XXXSetMSPManager(cid, bundle.MSPManager())
}
ac, ok := bundle.ApplicationConfig()
if !ok {
ac = nil
}
cs := &chainSupport{
Application: ac, // TODO, refactor as this is accessible through Manager
ledger: ledger,
}
peerSingletonCallback := func(bundle *channelconfig.Bundle) {
ac, ok := bundle.ApplicationConfig()
if !ok {
ac = nil
}
cs.Application = ac
cs.Resources = bundle
}
cs.bundleSource = channelconfig.NewBundleSource(
bundle,
gossipCallbackWrapper,
trustedRootsCallbackWrapper,
mspCallback,
peerSingletonCallback,
)
vcs := struct {
*chainSupport
*semaphore.Weighted
}{cs, validationWorkersSemaphore}
validator := txvalidator.NewTxValidator(cid, vcs, sccp, pm)
c := committer.NewLedgerCommitterReactive(ledger, func(block *common.Block) error {
chainID, err := utils.GetChainIDFromBlock(block)
if err != nil {
return err
}
return SetCurrConfigBlock(block, chainID)
})
ordererAddresses := bundle.ChannelConfig().OrdererAddresses()
if len(ordererAddresses) == 0 {
return errors.New("no ordering service endpoint provided in configuration block")
}
// TODO: does someone need to call Close() on the transientStoreFactory at shutdown of the peer?
store, err := TransientStoreFactory.OpenStore(bundle.ConfigtxValidator().ChainID())
if err != nil {
return errors.Wrapf(err, "[channel %s] failed opening transient store", bundle.ConfigtxValidator().ChainID())
}
csStoreSupport := &CollectionSupport{
PeerLedger: ledger,
}
simpleCollectionStore := privdata.NewSimpleCollectionStore(csStoreSupport)
service.GetGossipService().InitializeChannel(bundle.ConfigtxValidator().ChainID(), ordererAddresses, service.Support{
Validator: validator,
Committer: c,
Store: store,
Cs: simpleCollectionStore,
IdDeserializeFactory: csStoreSupport,
})
chains.Lock()
defer chains.Unlock()
chains.list[cid] = &chain{
cs: cs,
cb: cb,
committer: c,
}
return nil
}
首先根據(jù)傳入的genesisblock中拿到envelope,進而轉(zhuǎn)換成bundle,又來了,看過上篇的,應(yīng)該很熟悉了。
拿到bundle后,第一時間就是組裝chainsupport,可以看到后面的wrapper和callback都是用來處理當bundle更新的場景,也就是當配置變更的時候會回調(diào)這些邏輯,這里就不展開了。自行分析吧。
cs := &chainSupport{ Application: ac, // TODO, refactor as this is accessible through Manager ledger: ledger, } cs.bundleSource = channelconfig.NewBundleSource( bundle, gossipCallbackWrapper, trustedRootsCallbackWrapper, mspCallback, peerSingletonCallback, )
然后就是根據(jù)賬本去初始化LedgerCommitter,這是賬本提交相關(guān)的。
最后最最重要的是初始化gossip服務(wù),回憶下gossip會做什么?
- 去orderer拉取block
- peer節(jié)點間選舉
- peer成員間主動被動同步block
- peer成員狀態(tài)同步
而這些都會在這里面去初始化,service.GetGossipService().InitializeChannel感興趣的可以進去看看,可以找到前面所講的deliveryService,leaderelection,GossipStateProvider
InitChain
peer.Initialize(func(cid string) {
logger.Debugf("Deploying system CC, for channel <%s>", cid)
sccp.DeploySysCCs(cid, ccp)
sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {
return peer.GetLedger(cid).NewQueryExecutor()
}))
if err != nil {
logger.Panicf("Failed subscribing to chaincode lifecycle updates")
}
cceventmgmt.GetMgr().Register(cid, sub)
}
當peer node start的時候也會調(diào)用sccp.DeploySysCCs(cid, ccp),只不過cid為空,說明是系統(tǒng)級的scc,而這里部署的是通道相關(guān)的scc。也就是說以后在通道的基礎(chǔ)上調(diào)用的scc就是該通道獨有的,跟其他通道以及系統(tǒng)區(qū)分開。
好奇,到底有什么區(qū)別,都用系統(tǒng)級的不好么?進去看看
deploySysCC
func deploySysCC(chainID string, ccprov ccprovider.ChaincodeProvider, syscc SelfDescribingSysCC) error {
...
if chainID != "" {
lgr := peer.GetLedger(chainID)
if lgr == nil {
panic(fmt.Sprintf("syschain %s start up failure - unexpected nil ledger for channel %s", syscc.Name(), chainID))
}
txsim, err := lgr.NewTxSimulator(txid)
if err != nil {
return err
}
txParams.TXSimulator = txsim
defer txsim.Done()
}
...
}
進來可以看到,如果綁定通道id的結(jié)果是生成獨有的事件模擬器。
小結(jié)
至此,整個join處理完畢,很多部分前面已經(jīng)講過了,顯得這里很粗糙,建議搞清楚了再來,抱歉。