币圈信息网 币圈新闻 精通 Filecoin:Lotus真实数据处理之Provider初始化

精通 Filecoin:Lotus真实数据处理之Provider初始化

因为 StorageProvider 对象被存储矿工 API 对象所依赖,所以在启动存储矿工的过程中

因为 StorageProvider 对象被存储矿工 API 对象所依赖,所以在启动存储矿工的过程中,DI 容器会调用 StorageProvider 函数(node/modules/storageminer.go)来创建它。StorageProvider 函数流程如下:调用 NewFromLibp2pHost 函数,生成 StorageMarketNetwork 对象。

net?:=?smnet.NewFromLibp2pHost(h)

调用 NewLocalFileStore 函数,生成 FileStore 存储对象。

store,?err?:=?piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))

NewLocalFileStore 函数(go-fil-markets 类库 filestore/filestore.go)流程如下:

base?:=?filepath.Clean(string(basedirectory))info,?err?:=?os.Stat(string(base))

if?!info.IsDir()?{return?nil,?fmt.Errorf(\"%s?is?not?a?directory\",?base)}

return?&fileStore{string(base)},?nil

NewLocalFileStore 函数使用的路径为仓库目录。即碎片的临时目录就是仓库目录。调用 CustomDealDecisionLogic 函数,返回一个函数对象。在函数对象中调用我们提供的回调函数,进行自定义交易逻辑判断。

opt?:=?storageimpl.CustomDealDecisionLogic(func(ctx?context.Context,?deal?storagemarket.MinerDeal)?(bool,?string,?error)?{

})

生成并返回 StorageProvider 对象。

p,?err?:=?storageimpl.NewProvider(net,?namespace.Wrap(ds,?datastore.NewKey(\"/deals/provider\")),?ibs,?store,?pieceStore,?dataTransfer,?spn,?address.Address(minerAddress),?ffiConfig.SealProofType,?storedAsk,?opt)

return?p,?nil

NewProvider 函数处理如下:生成 PieceIOWithStore 对象。

carIO?:=?cario.NewCarIO()pio?:=?pieceio.NewPieceIOWithStore(carIO,?fs,?bs)

生成 Provider 对象。

h?:=?&Provider{net:net,proofType:rt,spn:spn,fs:fs,pio:pio,pieceStore:pieceStore,conns:connmanager.NewConnManager(),storedAsk:storedAsk,actor:minerAddress,dataTransfer:dataTransfer,dealAcceptanceBuffer:?DefaultDealAcceptanceBuffer,pubSub:pubsub.New(providerDispatcher),}

生成 fsm 状态组对象。

deals,?err?:=?NewProviderStateMachine(ds,&providerDealEnvironment{h},h.dispatch,)

h.deals?=?deals

fsm 状态组对象使用的配置参数如下:

return?fsm.New(ds,?fsm.Parameters{Environment:env,StateType:storagemarket.MinerDeal{},StateKeyField:\"State\",Events:providerstates.ProviderEvents,StateEntryFuncs:?providerstates.ProviderStateEntryFuncs,FinalityStates:providerstates.ProviderFinalityStates,Notifier:notifier,})

环境对象为 providerDealEnvironment。状态对象为 MinerDeal。状态字段为 State。事件集合为 ProviderEvents,参考 storagemarket/impl/providerstates/provider_fsm.go 文件。状态处理函数集合 为 ProviderStateEntryFuncs,状态机的状态处理器根据对应的状态获取到指定的函数进行处理。终止状态集合为 ProviderFinalityStates。通知对象为 Provider 对象的 dispatch 方法。使用配置选项,配置 Provider 对象。

h.Configure(options...)

设置数据传输监听对象。

dataTransfer.SubscribeToEvents(dtutils.ProviderDataTransferSubscriber(deals))

当开始数据传输、传输结束、传输错误时会发送 ProviderEventDataTransferInitiated、ProviderEventDataTransferCompleted、ProviderEventDataTransferFailed 等事件到 fsm 状态组。返回 Provider 对象。

在存储矿工启动过程自动调用 HandleDeals 函数(node/modules/storageminer.go)。在这个函数中,调用 StorageProvider 对象的 Start 方法,从而启动这个对象。

Start 方法执行过程如下:调用 StorageMarketNetwork 网络对象的 SetDelegate 设置代理/委托为自身。

err?:=?p.net.SetDelegate(p)

网络对象的实现为 libp2pStorageMarketNetwork 结构体(storagemarket/network/libp2p_impl.go)。它的 SetDelegate 方法内容如下:

impl.receiver?=?rimpl.host.SetStreamHandler(storagemarket.DealProtocolID,?impl.handleNewDealStream)impl.host.SetStreamHandler(storagemarket.AskProtocolID,?impl.handleNewAskStream)return?nil

上面分别设置网络对象的 handleNewDealStream 方法处理 DealProtocolID 协议,表示存储;handleNewAskStream 方法 处理 AskProtocolID 协议,表示 ask。

handleNewDealStream 方法内容如下:

//?客户端?peer?idremotePID?:=?s.Conn().RemotePeer()

buffered?:=?bufio.NewReaderSize(s,?16)

//?对流进行包装ds?:=?&dealStream{remotePID,?impl.host,?s,?buffered}

//?调用?StorageProvider?对象的?HandleDealStream?方法,处理客户端存储请求impl.receiver.HandleDealStream(ds)

在协程中调用 StorageProvider 对象的 restartDeals 方法,重新进行交易处理。restartDeals 方法流程如下:从 fsm 状态组对象中获取所有的交易对象。

var?deals?[]storagemarket.MinerDealerr?:=?c.deals.List(&deals)

遍历所有的交易对象,进行下面的处理:如果当前交易对象已经终止,则进行下一个处理。如果当前交易对象的连接已经关闭,则进行下一个处理。发送初始交易事件给 fsm 状态组。

err?=?c.deals.Send(deal.ProposalCid,?storagemarket.ProviderEventRestart)

交易提案的 Cid 表示了状态机的名称/编号。返回空值。

本文来自网络,不代表币圈信息网立场,转载请注明出处:https://www.lpbwg.com/33730.html

作者: bqxxw

返回顶部