以太坊源码情景分析之区块(block)数据同步之主动同步
主动同步
同步入口
主动同步有好几个主要场景
- geth刚启动
- 新peer加入
- 定时sync
后面两个场景入口都在ProtocolManager.syncer
// syncer keeps the fetcher running for as long as the loop is alive and kicks
// off a synchronisation attempt whenever a new peer is announced or the
// periodic force-sync ticker fires. It returns once noMorePeers is signalled.
func (pm *ProtocolManager) syncer() {
	// Bring up the sync machinery. Deferred calls unwind in reverse order, so
	// the downloader is terminated before the fetcher is stopped.
	pm.fetcher.Start()
	defer pm.fetcher.Stop()
	defer pm.downloader.Terminate()

	ticker := time.NewTicker(forceSyncCycle)
	defer ticker.Stop()

	for {
		select {
		case <-pm.noMorePeers:
			return

		case <-ticker.C:
			// Periodic sync: attempted no matter how many peers are connected.
			go pm.synchronise(pm.peers.BestPeer())

		case <-pm.newPeerCh:
			// Peer-triggered sync: only attempted once enough peers are known.
			if pm.peers.Len() >= minDesiredPeerCount {
				go pm.synchronise(pm.peers.BestPeer())
			}
		}
	}
}
可见syncing的入口函数是synchronise
// synchronise tries to sync the local chain with the given peer and, when the
// sync cycle succeeds and the local head is past genesis, announces the new
// head to the other peers.
//
// peer may be nil: syncer passes pm.peers.BestPeer() straight through, and the
// forced-sync path runs even when no peer is connected.
func (pm *ProtocolManager) synchronise(peer *peer) {
	// Short circuit if no peers are available
	if peer == nil {
		return
	}
	// Make sure the peer's TD is higher than our own
	currentBlock := pm.blockchain.CurrentBlock()
	td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())

	pHead, pTd := peer.Head()
	if pTd.Cmp(td) <= 0 {
		return
	}
	// Otherwise try to sync with the downloader
	mode := downloader.FullSync
	if atomic.LoadUint32(&pm.fastSync) == 1 {
		// Fast sync was explicitly requested, and explicitly granted
		mode = downloader.FastSync
	} else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 {
		// The database seems empty as the current block is the genesis. Yet the fast
		// block is ahead, so fast sync was enabled for this node at a certain point.
		// The only scenario where this can happen is if the user manually (or via a
		// bad block) rolled back a fast sync node below the sync point. In this case
		// however it's safe to reenable fast sync.
		atomic.StoreUint32(&pm.fastSync, 1)
		mode = downloader.FastSync
	}
	// Run the sync cycle; on failure there is nothing new to announce.
	// NOTE(review): upstream go-ethereum appears to do further bookkeeping after
	// a successful cycle (e.g. switching fast sync off again) — confirm before
	// treating this excerpt as complete.
	if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
		return
	}
	if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
		// We've completed a sync cycle, notify all peers of new state. This path is
		// essential in star-topology networks where a gateway node needs to notify
		// all its out-of-date peers of the availability of a new block. This failure
		// scenario will most often crop up in private and hackathon networks with
		// degenerate connectivity, but it should be healthy for the mainnet too to
		// more reliably update peers or the local TD state.
		//
		// Announce the new head block to the neighbouring peers.
		go pm.BroadcastBlock(head, false)
	}
}
// Synchronise is the exported wrapper around d.synchronise: it forwards the
// peer id, head hash, total difficulty and sync mode unchanged and returns the
// error produced by the inner call. The "...." line marks code that this
// excerpt leaves out.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
err := d.synchronise(id, head, td, mode)
....
return err
}
// synchronise records the requested sync mode on the downloader, looks up the
// peer registered under id and hands over to syncWithPeer. It returns
// errUnknownPeer when no such peer is registered.
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
	// Remember which mode this sync cycle runs in.
	d.mode = mode

	// Resolve the origin peer; without it there is nobody to download from.
	origin := d.peers.Peer(id)
	if origin != nil {
		return d.syncWithPeer(origin, hash, td)
	}
	return errUnknownPeer
}
查找通信节点主链共同祖先
我们知道同步数据块有一个很重要的事情需要准备,就是找到两个节点链的共同祖先(findAncestor)
// syncWithPeer (abbreviated excerpt): only the first step is shown, namely
// asking findAncestor for the common ancestor shared with peer p. The rest of
// the body, including where height comes from, is not part of this excerpt.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
origin, err := d.findAncestor(p, height)
}
这个模块,比特币和以太坊的实现很不一样
- 比特币是将本地chain顶端N个block的hash及后续以1/2跳跃的方式得到的m个block的hash(blocklocator)发送给外部节点,这样外部节点能轻松地找到两个节点的链的共同祖先
- 以太坊不一样,它分两个步骤来操作:第一步是向外部节点请求N个block的hash并和本地对比找到共同祖先;如果第一步没有找到祖先,则按照类似1/2跳跃(二分查找)的方式循环请求更前面的区块的hash,并和本地对比来找到共同祖先
- 可见两种方式的核心区别是:比特币是主动提供本地链区块头信息,由外部节点负责找出祖先;而以太坊是从外部节点获取数据,由本地负责找出祖先。如果共同祖先大部分都落在前N个区块内,这两种方式差不多;但是如果进行到1/2跳跃请求,则以太坊的请求次数明显增多。
// findAncestor looks for a block that the local chain and peer p have in
// common, given the remote chain height. As shown here it works in two phases:
// first it requests a sparse set of headers near the head and checks whether
// any of them is already known locally; then it bisects the interval between
// start and end, requesting one header per round. The "…" line marks code that
// this excerpt leaves out, so the function is not complete as printed.
func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
// Figure out the valid ancestor range to prevent rewrite attacks
floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
if d.mode == FullSync {
ceil = d.blockchain.CurrentBlock().NumberU64()
} else if d.mode == FastSync {
ceil = d.blockchain.CurrentFastBlock().NumberU64()
}
if ceil >= MaxForkAncestry {
floor = int64(ceil - MaxForkAncestry)
}
p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
// Request the topmost blocks to short circuit binary ancestor lookup
head := ceil
if head > height {
head = height
}
// Start the request MaxHeaderFetch blocks below the lower of the two heads,
// clamped at block 0
from := int64(head) - int64(MaxHeaderFetch)
if from < 0 {
from = 0
}
// Span out with 15 block gaps into the future to catch bad head reports
limit := 2 * MaxHeaderFetch / 16
count := 1 + int((int64(ceil)-from)/16)
if count > limit {
count = limit
}
// Ask the peer for count headers starting at from, skipping 15 blocks between
// consecutive headers
go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
ttl := d.requestTTL()
// NOTE(review): no case below reads timeout or d.cancelCh in this excerpt —
// presumably those cases were elided; confirm against the full source.
timeout := time.After(ttl)
for finished := false; !finished; {
select {
// A header packet arrived
case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
headers := packet.(*headerPack).headers
if len(headers) == 0 {
p.log.Warn("Empty head header set")
return 0, errEmptyHeaderSet
}
// Make sure the peer's reply conforms to the request
for i := 0; i < len(headers); i++ {
// Each returned header must carry the block number requested for its slot
if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
return 0, errInvalidChain
}
}
// Check if a common ancestor was found
finished = true
for i := len(headers) - 1; i >= 0; i-- {
// Skip any headers that underflow/overflow our requested set
if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
continue
}
// Otherwise check if we already know the header or not
if (d.mode == FullSync && d.blockchain.HasBlock(headers[i].Hash(), headers[i].Number.Uint64())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
// If every header is known, even future ones, the peer straight out lied about its head
if number > height && i == limit-1 {
p.log.Warn("Lied about chain head", "reported", height, "found", number)
return 0, errStallingPeer
}
break
}
}
// Out of bounds delivery, ignore
}
}
// Ancestor not found, we need to binary search over our chain
start, end := uint64(0), head
if floor > 0 {
start = uint64(floor)
}
// Repeatedly request the midpoint header of [start, end]
for start+1 < end {
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2
ttl := d.requestTTL()
timeout := time.After(ttl)
go p.peer.RequestHeadersByNumber(check, 1, 0, false)
// Wait until a reply arrives to this request
for arrived := false; !arrived; {
select {
case <-d.cancelCh:
return 0, errCancelHeaderFetch
case packer := <-d.headerCh:
// Discard anything not from the origin peer
…
}
}
return start, nil
}
数据请求流程
找到共同祖先区块origin block后就是请求获取数据了
// syncWithPeer (abbreviated excerpt) locates the common ancestor shared with
// peer p and then starts the concurrent fetch/process routines from the block
// right after it, returning the first error reported by spawnSync.
//
// height, pivot and latest are computed by code that this excerpt omits.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
	origin, err := d.findAncestor(p, height)
	if err != nil {
		// Without a trustworthy ancestor there is no valid starting point;
		// carrying on would sync from origin 0.
		return err
	}
	fetchers := []func() error{
		func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },          // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) },        // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, pivot, td) },
	}
	// The last routine is the content processor matching the sync mode.
	if d.mode == FastSync {
		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
	} else if d.mode == FullSync {
		fetchers = append(fetchers, d.processFullSyncContent)
	}
	return d.spawnSync(fetchers)
}
// spawnSync runs every fetcher in its own goroutine and collects their results.
// It returns the first non-nil error, or nil once all of them have reported
// success. Before returning it closes the queue, cancels the downloader and
// waits for all goroutines to exit.
func (d *Downloader) spawnSync(fetchers []func() error) error {
	total := len(fetchers)
	// Buffered to total so no goroutine blocks on send after an early break.
	results := make(chan error, total)

	var running sync.WaitGroup
	running.Add(total)
	for _, fetcher := range fetchers {
		go func(run func() error) {
			defer running.Done()
			results <- run()
		}(fetcher)
	}
	// Drain results until the first failure.
	var err error
	for remaining := total; remaining > 0; remaining-- {
		if remaining == 1 {
			// Only one routine is still outstanding: close the queue so the
			// block processor ends once it has processed what is queued.
			d.queue.Close()
		}
		if err = <-results; err != nil {
			break
		}
	}
	d.queue.Close()
	d.Cancel()
	running.Wait()
	return err
}
可见syncing的大概过程就是:
- 调用fetchHeaders, fetchBodies, fetchReceipts请求数据
- 调用processHeaders, processFullSyncContent处理数据
我们知道fetchBodies和fetchReceipts是依赖header数据的,所以自然需要等待header请求数据返回后才能执行,所以这些函数的执行应该是有顺序的,执行顺序如下
但是这几个过程都是一个独立的go routine, 这些函数先后顺序又是如何保证的?估计你大概都能猜测到,通过chan, 一个等待,一个通知的方式即可实现。
fetchBodies和fetchReceipts的行为差不多,并且它们都依赖fetchHeaders的结果,因而流程和fetchHeaders不一样
fetchHeaders只需要两步
- 请求数据(fetch)
- 等待并接收数据(wait-result)
而fetchBodies, fetchReceipts多一个步骤
- 等待header数据(wait-header)
- 请求数据(fetch)
- 等待并接收数据(wait-result)
整个流程图大致如下:
这个流程涉及的点很多,最难的点是fetchBodies和fetchReceipts等待header数据的过程,按照这个流程图我们来分析下这个等待header的代码
fetchBodies,fetchReceipts最后都是调用fetchParts,只是参数不一样,然后就分别等待在bodyWakeCh,receiptWakeCh上。
// fetchParts is the shared fetch loop driven by the callbacks passed in. As
// shown in this excerpt it reacts to three events: a wake-up on wakeCh, a
// 100ms ticker, and the internal update signal those two raise. On update it
// hands pending tasks to idle peers via reserve and fetch until throttled or
// out of tasks. It returns nil once nothing is pending, nothing is in flight
// and the wake channel reported completion; it returns the reserve error or
// errPeersUnavailable otherwise.
//
// NOTE(review): errCancel, deliveryCh, deliver, expire, cancel and setIdle are
// not used in the code shown — presumably the cases using them were elided.
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
// Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
// Buffered with capacity 1 so repeated nudges collapse into one pending update
update := make(chan struct{}, 1)
// Prepare the queue and fetch block parts until the block header fetcher's done
finished := false
for {
select {
// wakeCh is bodyWakeCh or receiptWakeCh, depending on the caller
case cont := <-wakeCh:
// The header fetcher sent a continuation flag, check if it's done
if !cont {
finished = true
}
// Headers arrive, try to update the progress
select {
// Non-blocking nudge of the update case below, which issues the fetches
case update <- struct{}{}:
default:
}
case <-ticker.C:
// Sanity check update the progress
select {
case update <- struct{}{}:
default:
}
case <-update:
// If there's nothing more to fetch, wait or terminate
// Both the ticker and wakeCh funnel into this case, so pending work has to be re-checked here.
// After headers were received the path is wakeCh first, then this case, with pending() > 0.
if pending() == 0 {
if !inFlight() && finished {
log.Debug("Data fetching completed", "type", kind)
return nil
}
break
}
// Send a download request to all idle peers, until throttled
progressed, throttled, running := false, false, inFlight()
idles, total := idle()
for _, peer := range idles {
// Short circuit if throttling activated
if throttle() {
throttled = true
break
}
// Short circuit if there is no more available task.
if pending() == 0 {
break
}
// Reserve a chunk of fetches for a peer. A nil can mean either that
// no more headers are available, or that the peer is known not to
// have them.
// reserve is the body or the receipt reservation function;
// it takes tasks (i.e. the request) out of the body or receipt task pool
request, progress, err := reserve(peer, capacity(peer))
if err != nil {
return err
}
if progress {
progressed = true
}
if request == nil {
continue
}
if request.From > 0 {
peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
} else {
peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
fetchHook(request.Headers)
}
// This is where the GetBlockBodyMsg / GetReceiptMsg data request is sent out
if err := fetch(peer, request); err != nil {
// Although we could try and make an attempt to fix this, this error really
// means that we've double allocated a fetch task to a peer. If that is the
// case, the internal state of the downloader and the queue is very wrong so
// better hard crash and note the error instead of silently accumulating into
// a much bigger issue.
panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
}
running = true
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
return errPeersUnavailable
}
}
}
}
所以核心点是:bodyWakeCh,receiptWakeCh及blockTaskQueue, blockTaskPool, receiptTaskPool在哪里赋值的
// processHeaders consumes header batches from d.headerProcCh. An empty batch
// means header download is over: it sends false on both wake channels and
// returns nil. A non-empty batch is cut into chunks of at most
// maxHeadersProcess headers, each chunk is scheduled on the queue for content
// retrieval (full and fast sync only), and finally true is sent on both wake
// channels without blocking. The "…", "…." and "..." lines mark code that this
// excerpt leaves out (including the declarations of rollback and gotHeaders).
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
for {
select {
case <-d.cancelCh:
return errCancelHeaderProcessing
case headers := <-d.headerProcCh:
// Terminate header processing if we synced up
if len(headers) == 0 {
// Notify everyone that headers are fully processed
// There is no header data left, but the content fetchers still have to be told
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
}
}
…
rollback = nil
return nil
}
// Otherwise split the chunk of headers into batches and process them
gotHeaders = true
for len(headers) > 0 {
// Select the next chunk of headers to import
limit := maxHeadersProcess
if limit > len(headers) {
limit = len(headers)
}
chunk := headers[:limit]
….
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
….
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunk, origin)
if len(inserts) != len(chunk) {
log.Debug("Stale headers")
return errBadPeer
}
}
headers = headers[limit:]
origin += uint64(limit)
}
...
// Signal the content downloaders of the availablility of new tasks
// This wakes up the fetchParts loops waiting on these channels
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- true:
default:
}
}
}
}
}
// Schedule registers the given headers as download tasks while holding q.lock.
// Every header is added to the block task pool and queue, and in FastSync mode
// to the receipt task pool and queue as well, with the negated block number as
// priority. It returns the headers that were inserted. The "…." line marks
// code that this excerpt leaves out (including where hash is computed).
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.lock.Lock()
defer q.lock.Unlock()
// Insert all the headers prioritised by the contained block number
inserts := make([]*types.Header, 0, len(headers))
for _, header := range headers {
// Make sure chain order is honoured and preserved throughout
….
// Queue the header for content retrieval
q.blockTaskPool[hash] = header
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
if q.mode == FastSync {
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
}
inserts = append(inserts, header)
// Track the hash of the most recently scheduled header
q.headerHead = hash
from++
}
return inserts
}
数据填充及组装
区块数据请求的目标是获取header, body, receipt数据,这些数据到达后会填充到queue.resultCache[index]中的一个fetchResult对象里面。这个对象的header数据有点特殊:它不是由processHeaders添加进去的,而是在fetchBodies通过reserve(即reserveHeaders)预留请求任务时,随着新建的fetchResult一起初始化赋值的。
所有数据准备好后,processFullSyncContent会被唤醒并读取fetchResult并插入到主链
fetchResult数据结构:
// fetchResult collects the downloaded parts of one block. The entry is created
// by reserveHeaders with Hash and Header set; the body and receipt deliveries
// fill in the remaining fields and decrement Pending.
type fetchResult struct {
Pending int // Number of data fetches still pending
Hash common.Hash // Hash of the header to prevent recalculating
Header *types.Header // Set when the result container is created
Uncles []*types.Header // Filled in by the body delivery
Transactions types.Transactions // Filled in by the body delivery
Receipts types.Receipts // Filled in by the receipt delivery
}
Header数据填充点:
// fetchParts (heavily abbreviated, not valid Go as printed): this excerpt only
// highlights that the update case calls reserve, which is the point where the
// result container and its header are set up.
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool) {
case <-update:
request, progress, err := reserve(peer, capacity(peer))
}
// ReserveBodies reserves up to count block-body fetch tasks for peer p. It
// takes q.lock and delegates to reserveHeaders with the block task pools,
// passing a predicate that marks a header as needing no body download when
// both its transaction root and its uncle hash are the empty values.
func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool,
		func(h *types.Header) bool {
			return h.UncleHash == types.EmptyUncleHash && h.TxHash == types.EmptyRootHash
		})
}
// reserveHeaders pops fetch tasks off taskQueue and assigns up to count of them
// to peer p. The caller must hold q.lock.
//
// For every popped header it makes sure a fetchResult container exists in
// q.resultCache (this is where the result's Header field gets populated).
// Headers for which isNoop reports true need no download: they are marked done
// immediately and count as progress. Headers the peer is known to lack are
// pushed back onto taskQueue so another peer can pick them up.
//
// It returns the request to send (nil if nothing was assigned), whether any
// result made progress without a network round trip, and errInvalidChain when
// a header falls outside the result cache window.
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
	// Short circuit if the pool has been depleted, or if the peer's already
	// downloading something (sanity check not to corrupt state)
	if taskQueue.Empty() {
		return nil, false, nil
	}
	if _, ok := pendPool[p.id]; ok {
		return nil, false, nil
	}
	// Calculate an upper limit on the items we might fetch (i.e. throttling)
	space := q.resultSlots(pendPool, donePool)

	// Retrieve a batch of tasks, skipping previously failed ones
	var skip []*types.Header
	send := make([]*types.Header, 0, count)
	progress := false

	for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
		header := taskQueue.PopItem().(*types.Header)
		hash := header.Hash()

		// If we're the first to request this task, initialise the result container
		index := int(header.Number.Int64() - int64(q.resultOffset))
		if index >= len(q.resultCache) || index < 0 {
			common.Report("index allocation went beyond available resultCache space")
			return nil, false, errInvalidChain
		}
		if q.resultCache[index] == nil {
			// Full sync waits for the body only, fast sync for body and receipts.
			components := 1
			if q.mode == FastSync {
				components = 2
			}
			q.resultCache[index] = &fetchResult{
				Pending: components,
				Hash:    hash,
				Header:  header,
			}
		}
		// If this fetch task is a noop, complete it right away instead of
		// asking the network for empty content.
		if isNoop(header) {
			donePool[hash] = struct{}{}
			delete(taskPool, hash)

			// The slot was consumed, but this iteration must not count
			// against the number of tasks processed.
			space, proc = space-1, proc-1
			q.resultCache[index].Pending--
			progress = true
			continue
		}
		// Otherwise unless the peer is known not to have the data, add to the retrieve list
		if p.Lacks(hash) {
			skip = append(skip, header)
		} else {
			send = append(send, header)
		}
	}
	// Merge all the skipped headers back so they are not lost as tasks
	for _, header := range skip {
		taskQueue.Push(header, -float32(header.Number.Uint64()))
	}
	if progress {
		// Wake WaitResults, resultCache was modified
		q.active.Signal()
	}
	// Assemble and return the block download request
	if len(send) == 0 {
		return nil, progress, nil
	}
	request := &fetchRequest{
		Peer:    p,
		Headers: send,
		Time:    time.Now(),
	}
	pendPool[p.id] = request

	return request, progress, nil
}
Body数据填充点:
// DeliverBodies hands a peer's block-body response to the queue. It takes
// q.lock and delegates to deliver with the block task pools and a callback
// that checks each body against its header before storing it in the result.
// The return values are those of deliver.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// fill accepts the i-th delivered body only if both the transaction root
	// and the uncle hash match what the header commits to.
	fill := func(header *types.Header, i int, res *fetchResult) error {
		if types.DeriveSha(types.Transactions(txLists[i])) != header.TxHash {
			return errInvalidBody
		}
		if types.CalcUncleHash(uncleLists[i]) != header.UncleHash {
			return errInvalidBody
		}
		res.Transactions = txLists[i]
		res.Uncles = uncleLists[i]
		return nil
	}
	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), fill)
}
unc (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
// Short circuit if the data was never requested
request := pendPool[id]
if request == nil {
return 0, errNoFetchesPending
}
for i, header := range request.Headers {
// Short circuit assembly if no more fetch results are found
if i >= results {
break
}
// Reconstruct the next result if contents match up
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil {
failure = errInvalidChain
break
}
if err := reconstruct(header, i, q.resultCache[index]); err != nil {
failure = err
break
}
hash := header.Hash()
donePool[hash] = struct{}{}
q.resultCache[index].Pending--
useful = true
accepted++
// Clean up a successful fetch
request.Headers[i] = nil
delete(taskPool, hash)
}
// Return all failed or missing fetches to the queue
for _, header := range request.Headers {
if header != nil {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
}
// Wake up WaitResults
if accepted > 0 {
q.active.Signal()
}
// If none of the data was good, it's a stale delivery
switch {
case failure == nil || failure == errInvalidChain:
return accepted, failure
case useful:
return accepted, fmt.Errorf("partial failure: %v", failure)
default:
return accepted, errStaleDelivery
}
}
receipt数据填充点:
// DeliverReceipts hands a peer's receipt response to the queue. It takes
// q.lock and delegates to deliver with the receipt task pools and a callback
// that checks each receipt list against the header's receipt hash before
// storing it in the result. The return values are those of deliver.
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// fill accepts the i-th receipt list only if it hashes to header.ReceiptHash.
	fill := func(header *types.Header, i int, res *fetchResult) error {
		if types.DeriveSha(types.Receipts(receiptList[i])) == header.ReceiptHash {
			res.Receipts = receiptList[i]
			return nil
		}
		return errInvalidReceipt
	}
	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), fill)
}
这里的deliver函数和上面一样
数据填充总结:
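header、body、receipt三类数据写入fetchResult的路径上都可能调用q.active.Signal:reserveHeaders在progress为true时调用,body和receipt则各自在deliver中accepted > 0时调用
所以一个区块的数据填充完成前,q.active.Signal最多会被调用三次。这也是processFullSyncContent被唤醒的条件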
数据组装:
// processFullSyncContent keeps pulling completed fetch results from the queue
// (blocking until some are available) and imports them. It returns nil when
// the queue hands back an empty batch, or the first import error.
func (d *Downloader) processFullSyncContent() error {
	for batch := d.queue.Results(true); len(batch) > 0; batch = d.queue.Results(true) {
		// Let the optional hook observe the batch before it is imported.
		if d.chainInsertHook != nil {
			d.chainInsertHook(batch)
		}
		if err := d.importBlockResults(batch); err != nil {
			return err
		}
	}
	return nil
}
// Results returns the leading run of completed fetch results, at most
// maxResultsProcess of them, and removes them from the result cache.
//
// If block is true it waits on q.active until at least one result is
// processable or the queue is closed; if block is false it returns nil
// immediately when nothing is ready. An empty slice is returned when the queue
// was closed with nothing left to process.
func (q *queue) Results(block bool) []*fetchResult {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Count the number of items available for processing
	nproc := q.countProcessableItems()
	for nproc == 0 && !q.closed {
		if !block {
			return nil
		}
		// Sleep until a Signal on q.active reports a change, then re-count.
		q.active.Wait()
		nproc = q.countProcessableItems()
	}
	// Since we have a batch limit, don't pull more into "dangling" memory
	if nproc > maxResultsProcess {
		nproc = maxResultsProcess
	}
	results := make([]*fetchResult, nproc)
	copy(results, q.resultCache[:nproc])
	if len(results) > 0 {
		// Mark results as done before dropping them from the cache.
		for _, result := range results {
			hash := result.Header.Hash()
			delete(q.blockDonePool, hash)
			delete(q.receiptDonePool, hash)
		}
		// Shift the remaining entries to the front and clear the vacated tail,
		// otherwise every call would hand out the same results again.
		copy(q.resultCache, q.resultCache[nproc:])
		for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
			q.resultCache[i] = nil
		}
		// Advance the block number that cache index 0 stands for.
		q.resultOffset += uint64(nproc)
	}
	return results
}
请注意上面的q.active.Wait(),这里就是等待休眠点。前面header, body, receipt数据填充时调用的q.active.Signal会把它唤醒,每次唤醒后都会重新执行countProcessableItems;一旦有可处理的结果(或者queue已关闭),processFullSyncContent就会从这里继续执行,然后从q.resultCache中拷贝数据并执行importBlockResults插入到主链
// importBlockResults assembles a block from each fetch result (header plus
// transactions and uncles) and inserts the batch into the local chain. It
// returns errInvalidChain if the insertion fails, after logging which item
// was rejected, and nil otherwise. An empty batch is a no-op.
func (d *Downloader) importBlockResults(results []*fetchResult) error {
	// Nothing to import
	if len(results) == 0 {
		return nil
	}
	blocks := make([]*types.Block, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	}
	// NOTE(review): assumes InsertChain reports the position of the offending
	// block within blocks on failure — confirm against its documentation.
	if index, err := d.blockchain.InsertChain(blocks); err != nil {
		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
		return errInvalidChain
	}
	return nil
}