Spectrum: Notes on Troubleshooting and Fixing Blocks That Occasionally Stop Syncing

Tracking Down the Sync Failure

Code: https://github.com/SmartMeshFoundation/Spectrum
The fix will be committed to the dev branch and released together with version 0.5.2.
  • Problem description:
    When a node is a genesis node that has not yet entered the rotation set, it sits in a waiting-for-nomination state and can never be nominated successfully, because the consensus contract explicitly rejects nominations for genesis nodes.
    In that state, after syncing a certain number of new blocks, the chain stops growing at some height; in this observation it stalled at block 1222972 and never advanced.
  • Clues in the logs:
DEBUG[11-21|19:21:46|core/blockchain.go:1009] Inserted new block  number=1222972 hash=7af7f5…985c9e uncles=0 txs=1 gas=0 elapsed=11.820ms
DEBUG[11-21|19:21:52|eth/downloader/downloader.go:1562] Recalculated downloader QoS values  rtt=11.836220508s confidence=1.000 ttl=35.508697032s
DEBUG[11-21|19:22:00|eth/handler.go:642] <>  trueTD=3077093 p.td=3077078 p.id=f6cb0c9800fb01fb11cb313848091596ba4fd167e1c7873e0520ebdd59ceb454cf5a16d7f78ff7aaa91f117ad6694bca4de63d3150cb1b48813d75d4b98e2deb
DEBUG[11-21|19:22:00|eth/fetcher/fetcher.go:607] Discarded propagated block, exceeded allowance  peer=f6cb0c9800fb01fb number=1222973 hash=33b369…26e683 limit=64
DEBUG[11-21|19:22:00|eth/peer.go:193] Fetching single header  id=4b36359d6b54ab46 conn=dyndial hash=33b369…26e683
DEBUG[11-21|19:22:00|eth/peer.go:214] Fetching batch of block bodies  id=4b36359d6b54ab46 conn=dyndial count=1
DEBUG[11-21|19:22:00|eth/fetcher/fetcher.go:607] Discarded propagated block, exceeded allowance  peer=4b36359d6b54ab46 number=1222973 hash=33b369…26e683 limit=64
DEBUG[11-21|19:22:03|eth/downloader/downloader.go:1562] Recalculated downloader QoS values  rtt=11.836220508s confidence=1.000 ttl=35.508697032s
DEBUG[11-21|19:22:14|eth/handler.go:642] <>  trueTD=3077096 p.td=3077081 p.id=6549749a9e83b4bd89e1469d51986cc1689094b6621daa651d3e76dc9720659008cad99e949d274b6c26e87241964775e22a01e167b79b85dd454fd160b46fac
DEBUG[11-21|19:22:14|eth/fetcher/fetcher.go:631] Queued propagated block  peer=6549749a9e83b4bd number=1222974 hash=670fb2…f0af05 queued=1

Block sync logic: the logs keep showing output from the "NewBlockMsg" path, which means the problem is not in the network layer.
It is also unrelated to block downloading; all of that logic appears to be working normally.
  • Entry point:
    func (pm *ProtocolManager) synchronise(peer *peer)
It is triggered from three places (a short sketch follows this list):
1. a NewBlockMsg message
2. a new peer connection
3. a timer
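Triggers 2 and 3 are typically wired together in a single syncer goroutine. The following is only a minimal, self-contained sketch of that pattern with hypothetical names (it is not the actual eth/sync.go code): a select loop reacts to a new-peer channel and to a force-sync ticker, and kicks off synchronise from both.

package main

import (
    "fmt"
    "time"
)

// synchronise stands in for pm.synchronise in this sketch.
func synchronise(peer string) {
    fmt.Println("synchronising with", peer)
}

// syncer shows how triggers 2 (new peer) and 3 (timer) can both drive a sync.
func syncer(newPeerCh <-chan string, quit <-chan struct{}) {
    forceSync := time.NewTicker(10 * time.Second) // trigger 3: periodic forced sync
    defer forceSync.Stop()

    for {
        select {
        case peer := <-newPeerCh: // trigger 2: a new peer connected
            go synchronise(peer)
        case <-forceSync.C: // trigger 3: the timer fired
            go synchronise("best-peer")
        case <-quit:
            return
        }
    }
}

func main() {
    newPeerCh := make(chan string, 1)
    quit := make(chan struct{})
    go syncer(newPeerCh, quit)

    newPeerCh <- "peer-1" // simulate a new peer connection
    time.Sleep(100 * time.Millisecond)
    close(quit)
}

Trigger 1 is the NewBlockMsg path described next.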
In eth/handler.go, func (pm *ProtocolManager) handleMsg(p *peer) error handles all incoming messages, including NewBlockMsg:
case msg.Code == NewBlockMsg:
    // Retrieve and decode the propagated block
    var request newBlockData
    if err := msg.Decode(&request); err != nil {
        return errResp(ErrDecode, "%v: %v", msg, err)
    }
    request.Block.ReceivedAt = msg.ReceivedAt
    request.Block.ReceivedFrom = p

    // Mark the peer as owning the block and schedule it for import
    p.MarkBlock(request.Block.Hash())
    pm.fetcher.Enqueue(p.id, request.Block)

    // Assuming the block is importable by the peer, but possibly not yet done so,
    // calculate the head hash and TD that the peer truly must have.
    var (
        trueHead = request.Block.ParentHash()
        trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
    )
    _, tttt := p.Head()
    currentBlock := pm.blockchain.CurrentBlock()
    peerpub, _ := p.ID().Pubkey()
    peeraddr := crypto.PubkeyToAddress(*peerpub)
    log.Debug("<>",
        "currentBlock", currentBlock,
        "recvBlock", request.Block.Number(),
        "currentTD", pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()),
        "trueTD", trueTD,
        "p.td", tttt,
        "p.id", peeraddr.Hex(),
    )
    // Update the peers total difficulty if better than the previous
    if _, td := p.Head(); trueTD.Cmp(td) > 0 {
        p.SetHead(trueHead, trueTD)

        // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
        // a single block (as the true TD is below the propagated block), however this
        // scenario should easily be covered by the fetcher.
        //currentBlock := pm.blockchain.CurrentBlock()
        if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
            go pm.synchronise(p)
        }
    }

What exactly does pm.fetcher.Enqueue(p.id, request.Block) do?
// Enqueue tries to fill gaps in the fetcher's future import queue.
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
    op := &inject{
        origin: peer,
        block:  block,
    }
    select {
    case f.inject <- op:
        return nil
    case <-f.quit:
        return errTerminated
    }
}

It puts its arguments into fetcher.inject, which is then handled by loop(), as in the following fragment:
case op := <-f.inject:
    // A direct block insertion was requested, try and fill any pending gaps
    propBroadcastInMeter.Mark(1)
    f.enqueue(op.origin, op.block)

f.enqueue contains some important rules:
func (f *Fetcher) enqueue(peer string, block *types.Block) {
    hash := block.Hash()

    // Ensure the peer isn't DOSing us
    count := f.queues[peer] + 1
    // If this peer already has more than 64 blocks queued and waiting to be
    // processed, ignore the current block (blockLimit = 64)
    if count > blockLimit {
        log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
        propBroadcastDOSMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Discard any past or too distant blocks
    // A block more than 7 behind the current head (a stale uncle) or more than
    // 32 ahead of it is ignored (maxUncleDist = 7, maxQueueDist = 32)
    if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
        log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
        propBroadcastDropMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Schedule the block for future importing
    if _, ok := f.queued[hash]; !ok {
        op := &inject{
            origin: peer,
            block:  block,
        }
        f.queues[peer] = count
        f.queued[hash] = op
        f.queue.Push(op, -float32(block.NumberU64()))
        if f.queueChangeHook != nil {
            f.queueChangeHook(op.block.Hash(), true)
        }
        log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
    }
}

Mainly it places the block into f.queue, which loop() then works through in its cycle. My feeling was that the problem lives in func (f *Fetcher) loop(): the queue-handling logic at the start of it looked suspicious (a sketch of it follows).
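For reference, the start of loop()'s queue handling looks roughly like this (a simplified sketch assumed from the upstream eth/fetcher code, not a verbatim quote of Spectrum's copy). The point to note is that a queued block only leaves the bookkeeping through f.insert (handed off to the import path) or f.forgetBlock (dropped as stale):

// Simplified sketch of the queue-draining section at the top of (f *Fetcher) loop(),
// assumed from upstream go-ethereum.
height := f.chainHeight()
for !f.queue.Empty() {
    op := f.queue.PopItem().(*inject)
    hash := op.block.Hash()

    // Blocks more than one ahead of the current head stay queued for later.
    number := op.block.NumberU64()
    if number > height+1 {
        f.queue.Push(op, -float32(number))
        break
    }
    // Stale or already-known blocks are dropped from the bookkeeping.
    if number+maxUncleDist < height || f.getBlock(hash) != nil {
        f.forgetBlock(hash)
        continue
    }
    // Otherwise hand the block over to the import path.
    f.insert(op.origin, op.block)
}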
The problem shows up when the condition if count > blockLimit holds. Why would that condition ever hold?
// enqueue schedules a new future import operation, if the block to be imported
// has not yet been seen.
func (f *Fetcher) enqueue(peer string, block *types.Block) {
    hash := block.Hash()

    // Ensure the peer isn't DOSing us
    count := f.queues[peer] + 1
    if count > blockLimit {
        log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
        propBroadcastDOSMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Discard any past or too distant blocks
    if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
        log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
        propBroadcastDropMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    ......

After adding more log output for analysis, we found that insertChain was not returning; something inside it was blocking.
// insert spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
func (f *Fetcher) insert(peer string, block *types.Block) {
    ......
    log.Debug("insert_begin", "number", block.Number())
    // This call did not return
    if _, err := f.insertChain(types.Blocks{block}); err != nil {
        log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
        return
    }
    log.Debug("insert_end", "number", block.Number())
    ......
}
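The "done" mentioned below comes from the completion path: insert() runs the import on its own goroutine and always reports back on f.done, and only the f.done handler in loop() releases the per-peer bookkeeping via forgetBlock. Roughly (again a sketch assumed from the upstream fetcher, not quoted from Spectrum):

// Sketch of the completion path, assumed from upstream go-ethereum.

// insert runs the actual import on a separate goroutine and always signals
// completion on f.done, whether or not the import succeeded.
func (f *Fetcher) insert(peer string, block *types.Block) {
    hash := block.Hash()
    go func() {
        defer func() { f.done <- hash }()
        // ... header verification, f.insertChain, block propagation ...
    }()
}

// In loop(), the f.done signal is what cleans up the queue bookkeeping:
//
//     case hash := <-f.done:
//         f.forgetHash(hash)
//         f.forgetBlock(hash)
//
// and forgetBlock is the only place the per-peer allowance shrinks:
func (f *Fetcher) forgetBlock(hash common.Hash) {
    if insert := f.queued[hash]; insert != nil {
        f.queues[insert.origin]--
        if f.queues[insert.origin] == 0 {
            delete(f.queues, insert.origin)
        }
        delete(f.queued, hash)
    }
}

So if one insertChain call never returns, f.done never fires for that block, the chain height never advances, f.queues[peer] only grows as new blocks arrive, and once it passes 64 the "Discarded propagated block, exceeded allowance" line from the log above appears and the node stalls.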

When only the begin log appeared, with no matching end or done, we grepped the logs for the Inserted keyword:
DEBUG[11-26|17:35:06|eth/fetcher/fetcher.go:672] insert_begin number=1253372
DEBUG[11-26|17:35:06|eth/fetcher/fetcher.go:674] insert_end   number=1253372 err=nil
DEBUG[11-26|17:35:06|eth/fetcher/fetcher.go:645] insert_done  number=1253372

It was finally confirmed that the blockage is in the event broadcast:
//core/blockchain.go
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
    n, events, logs, err := bc.insertChain(chain)
    // This call blocked
    bc.PostChainEvents(events, logs)
    log.Debug("Inserted block end", "number", chain[0].Number())
    return n, err
}

// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
    // post event logs for further processing
    if logs != nil {
        bc.logsFeed.Send(logs)
    }
    for _, event := range events {
        switch ev := event.(type) {
        case ChainEvent:
            bc.chainFeed.Send(ev)
        case ChainHeadEvent:
            // The blockage above is caused by this Send blocking
            bc.chainHeadFeed.Send(ev)
        case ChainSideEvent:
            bc.chainSideFeed.Send(ev)
        }
    }
}
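A feed's Send is synchronous: it returns only after every subscribed channel has accepted the value, so one subscriber that stops draining its channel stalls every later Send. Below is a minimal, self-contained sketch of that behaviour using the upstream go-ethereum event package that chainHeadFeed is built on; the channel size and values are purely illustrative.

package main

import (
    "fmt"
    "time"

    "github.com/ethereum/go-ethereum/event"
)

func main() {
    var feed event.Feed
    ch := make(chan int, 1) // a lazy subscriber with a single-slot buffer
    sub := feed.Subscribe(ch)
    defer sub.Unsubscribe()

    feed.Send(1) // fills the buffer and returns immediately

    done := make(chan struct{})
    go func() {
        feed.Send(2) // blocks: the subscriber has not drained its channel
        close(done)
    }()

    select {
    case <-done:
        fmt.Println("second Send returned (unexpected)")
    case <-time.After(time.Second):
        fmt.Println("second Send is still blocked after 1s")
    }

    // Once the subscriber finally reads, the pending Send completes.
    <-ch
    <-done
    fmt.Println("subscriber drained one value; Send unblocked")
}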

As you can see, the blockage happens while publishing the ChainHead event. The first step is to find every place that subscribes to this feed: there may be lock contention, or perhaps a call that never completes. What is odd is that it only starts blocking after the node has been running for a while.
By auditing the subscribers of chainHeadFeed we finally pinned down the problem: in the worker's update method, receiving a chainHeadEvent triggers commitNewWork(). The first call succeeds, the second one fails and blocks. The subscription channel has 10 slots, so after 10 events the event broadcast module is blocked for good; once 64 blocks have piled up behind the blocked event send for some peer, that peer is judged to be DoSing us, and the behaviour described above appears: a block can be lost, and the node falls out of sync.
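In other words, the worker's 10-slot head-event channel absorbs a handful of events after commitNewWork gets stuck, and only then does chainHeadFeed.Send, and with it InsertChain, start to block. A minimal, self-contained sketch of that failure mode (hypothetical names, not the actual worker code):

package main

import (
    "fmt"
    "time"
)

// The consumer's handler blocks from the second event on, the 10-slot channel
// fills up, and then the publisher blocks too.
func main() {
    events := make(chan int, 10) // like the worker's 10-slot chain-head channel

    go func() { // like worker.update
        for n := range events {
            if n >= 2 {
                select {} // like commitNewWork blocking from the 2nd event on
            }
            fmt.Println("handled event", n)
        }
    }()

    for i := 1; ; i++ {
        select {
        case events <- i:
            // the publisher (think chainHeadFeed.Send) still succeeds for a while
        case <-time.After(time.Second):
            fmt.Printf("publisher blocked after %d events\n", i-1)
            return
        }
    }
}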
  • The problem can be fixed with the following logic change:
    Debugging inside worker.commitNewWork() showed that it blocks on self.push(work).
// push sends a new work task to currently live miner agents.
func (self *worker) push(work *Work) {
    // Before the miner has actually started, this branch should be taken. But because
    // miner.start had previously been made asynchronous, the mining flag was wrongly
    // set to "started" while the agents were not running yet, which is what makes the
    // for loop below block.
    if atomic.LoadInt32(&self.mining) != 1 {
        return
    }
    for agent := range self.agents {
        atomic.AddInt32(&self.atWork, 1)
        if ch := agent.Work(); ch != nil {
            ch <- work
        }
    }
}

// The fix is simple, even though tracking it down was not: move the
// self.mining assignment further down.
func (self *worker) start(s chan int) {
    self.mu.Lock()
    defer self.mu.Unlock()

    // moved further down
    //atomic.StoreInt32(&self.mining, 1)

    //add by liangc : sync mining status
    wg := new(sync.WaitGroup)
    if tribe, ok := self.engine.(*tribe.Tribe); ok {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if self.chain.CurrentHeader().Number.Int64() > 1 {
                // free for genesis signer
                log.Info("?? Everything is ready, Waiting for nomination, pending until miner level upgrade")
                // pending until miner level upgrade
                tribe.WaitingNomination()
            }
            tribe.SetMining(1, self.chain.CurrentBlock().Number(), self.chain.CurrentHeader().Hash())
        }()
    }

    go func() {
        defer func() { s <- 1 }()
        wg.Wait()

        // moved down from above >>>>
        atomic.StoreInt32(&self.mining, 1)
        // moved down from above <<<<

        // spin up agents
        for agent := range self.agents {
            agent.Start()
        }
    }()
}
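Why moving the flag works, as a standalone illustration: push only touches the agent channels when the mining flag is already set, so the flag must not be set until the agents are actually running. The sketch below (hypothetical types and names, not the Spectrum worker) reproduces both orderings.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// A "mining" flag set before the consumers (agents) are started makes the
// producer (push) block on a channel nobody is reading yet.
type agent struct{ work chan int }

type worker struct {
    mining int32
    agents []*agent
}

func (w *worker) push(task int) {
    if atomic.LoadInt32(&w.mining) != 1 {
        return // not mining yet: skip instead of blocking
    }
    for _, a := range w.agents {
        a.work <- task // blocks forever if the agent was never started
    }
}

func (w *worker) startAgents() {
    for _, a := range w.agents {
        go func(a *agent) {
            for t := range a.work {
                fmt.Println("agent got task", t)
            }
        }(a)
    }
}

func main() {
    w := &worker{agents: []*agent{{work: make(chan int)}}}

    // Buggy order: set the flag first, start the agents later -> push blocks.
    atomic.StoreInt32(&w.mining, 1)
    done := make(chan struct{})
    go func() { w.push(1); close(done) }()

    select {
    case <-done:
        fmt.Println("push returned immediately")
    case <-time.After(500 * time.Millisecond):
        fmt.Println("push is blocked: the flag says mining, but no agent is running")
    }

    // Correct order: once the agents are actually started, the pending push completes.
    w.startAgents()
    <-done
    fmt.Println("push returned after the agents were started")
}

With the assignment moved into the goroutine that waits on nomination, the flag and agent.Start() are flipped together, so push either returns early (flag unset) or finds an agent that is draining its Work() channel.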
