Ethereum Source Code Scenario Analysis: Mining

Data Structure Analysis

  • Miner
       The externally facing class exposed to other modules; the actual mining work is delegated to its worker member.
       Its update function listens for downloader events (sync started/finished, etc.) and tells the worker to start or stop mining at the right moment, so the node does not mine while it is still catching up with the chain.
  • Work
       A plain data class describing the block being assembled; it holds, among other things, the transactions and the receipts.
  • worker

    It manages the block-assembly work: deciding when to start mining a new block and when to stop, using the engine to assemble a Work object, and handing it to an agent for sealing, i.e. producing the PoW/clique seal.

  • Engine:
       The consensus-algorithm implementation: header format and difficulty rules, plus the PoW (ethash) / clique sealing logic.
  • CpuAgent:
       One implementation of the Agent interface. It calls the concrete engine to perform the seal, i.e. runs the PoW/clique algorithm to find a suitable nonce and fill in the block's remaining fields.
       Its final result is sent over returnCh (chan *Result), which wakes up worker.wait; worker.wait takes the complete block, inserts it into the chain and announces it to the other nodes.
       There are other Agent implementations as well, e.g. RemoteAgent, which lets the nodes of a mining pool compute jointly.

   
Summary:
    Mining a new block is essentially the process of filling in a Block object, and it happens in two steps (a toy sketch of this split follows the list):
  • First, assemble a new block whose data is mostly complete: most Header fields, the transaction list txs and the uncle list uncles[], with all transactions already executed and all receipts (Receipt) collected. This part is done by the worker.
  • Second, fill in the remaining fields, mainly Header.Nonce and Header.MixDigest (Header.Difficulty is already set during assembly by Engine.Prepare). This is done by an Agent calling the <Engine> interface implementation, i.e. by the consensus algorithm.
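A toy model of this worker/agent split (hypothetical stand-in types, not the real geth source; it only mirrors the channel wiring described above) might look as follows:

package main

import "fmt"

// Toy stand-ins for miner.Work / miner.Result
type Work struct{ Number uint64 } // the assembled but unsealed block
type Result struct {
    Work  Work
    Nonce uint64 // filled in by "sealing"
}

// Agent mirrors the shape of geth's miner.Agent interface (hash-rate reporting omitted)
type Agent interface {
    Work() chan<- Work
    SetReturnCh(chan<- Result)
}

// cpuAgent "seals" work by inventing a nonce; the real CpuAgent calls Engine.Seal
type cpuAgent struct {
    workCh   chan Work
    returnCh chan<- Result
}

var _ Agent = (*cpuAgent)(nil)

func newCPUAgent() *cpuAgent                     { return &cpuAgent{workCh: make(chan Work)} }
func (a *cpuAgent) Work() chan<- Work            { return a.workCh }
func (a *cpuAgent) SetReturnCh(ch chan<- Result) { a.returnCh = ch }

func (a *cpuAgent) loop() {
    for w := range a.workCh {
        a.returnCh <- Result{Work: w, Nonce: 42} // pretend the PoW search found nonce 42
    }
}

func main() {
    recv := make(chan Result) // plays the role of worker.recv
    agent := newCPUAgent()
    agent.SetReturnCh(recv) // worker.register(agent)
    go agent.loop()

    agent.Work() <- Work{Number: 1} // worker.push(work)
    r := <-recv                     // worker.wait receives the sealed block
    fmt.Printf("sealed block #%d with nonce %d\n", r.Work.Number, r.Nonce)
}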

The rest of this article follows the event-driven flow of block production to walk through the whole mining process.

Generating a New Block


worker.update listens for various events. When a ChainHeadEvent arrives it starts mining a new block: a ChainHeadEvent means a new block has just been added to the chain, so the next block can be mined on top of it.

Block Data Preparation


func (self *worker) update() {
    defer self.txSub.Unsubscribe()
    defer self.chainHeadSub.Unsubscribe()
    defer self.chainSideSub.Unsubscribe()

    for {
        // A real event arrived, process interesting content
        select {
        // Handle ChainHeadEvent
        case <-self.chainHeadCh:
            self.commitNewWork()
        // ... other cases elided ...
        }
    }
}

func (self *worker) commitNewWork() {
    // ... locking and timestamp (tstamp) setup elided; parent is the current head block
    parent := self.chain.CurrentBlock()
    num := parent.Number()
    header := &types.Header{
        ParentHash: parent.Hash(),
        Number: num.Add(num, common.Big1),
        GasLimit: core.CalcGasLimit(parent),
        Extra: self.extra,
        Time: big.NewInt(tstamp),
    }
    // Only set the coinbase if we are mining (avoid spurious block rewards)
    if atomic.LoadInt32(&self.mining) == 1 {
        header.Coinbase = self.coinbase
    }
    // Prepare the header via the consensus engine, e.g. compute the difficulty
    if err := self.engine.Prepare(self.chain, header); err != nil {
        log.Error("Failed to prepare header for mining", "err", err)
        return
    }
    // Could potentially happen if starting to mine in an odd state.
    // Create a new Work and install it as worker.current
    err := self.makeCurrent(parent, header)
    if err != nil {
        log.Error("Failed to create mining context", "err", err)
        return
    }
    // Create the current work task and check any fork transitions needed
    work := self.current
    // Fetch the pending transactions that will be packed into the block
    pending, err := self.eth.TxPool().Pending()
    if err != nil {
        log.Error("Failed to fetch pending transactions", "err", err)
        return
    }
    txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
    // Execute the transactions (including contract calls); this fills in work.receipts
    work.commitTransactions(self.mux, txs, self.chain, self.coinbase)

    // compute uncles for the new block.
    var (
        uncles []*types.Header
        badUncles []common.Hash
    )
    // Pick at most two uncle blocks to include in the block
    for hash, uncle := range self.possibleUncles {
        if len(uncles) == 2 {
            break
        }
        if err := self.commitUncle(work, uncle.Header()); err != nil {
            log.Trace("Bad uncle found and will be removed", "hash", hash)
            log.Trace(fmt.Sprint(uncle))

            badUncles = append(badUncles, hash)
        } else {
            log.Debug("Committing new uncle to block", "hash", hash)
            uncles = append(uncles, uncle.Header())
        }
    }
    // Create the new block to seal with the consensus engine
    if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil {
        log.Error("Failed to finalize block for sealing", "err", err)
        return
    }
    self.push(work)
}

func (self *worker) push(work *Work) {
    if atomic.LoadInt32(&self.mining) != 1 {
        return
    }
    for agent := range self.agents {
        atomic.AddInt32(&self.atWork, 1)
        if ch := agent.Work(); ch != nil {
            ch <- work
        }
    }
}

  • Prepare the new block's Time (wall-clock time), Number (parent number + 1), ParentHash and GasLimit; the remaining fields such as Difficulty, Nonce, MixDigest and the root hashes are determined later by the consensus algorithm.
  • Call Engine.Prepare() to complete the initial Header fields (for ethash this is where Difficulty is computed).
  • From this Header, create a new Work object and store it in worker.current.
  • Prepare the block's transaction list from the recently added transactions in the TxPool, and execute them (sketched just after this list).
  • Prepare the block's uncles[] from worker.possibleUncles[], whose entries are collected from ChainSideEvent; at most two uncles are used.
  • Call Engine.Finalize() to "finalize" the block, filling in Header.Root, TxHash, ReceiptHash, UncleHash, and so on.
  • Send the assembled Work object over a channel to every registered Agent for the subsequent sealing step (self.push(work)).
The sealing step then runs the PoW/clique algorithm to compute the Nonce and fill in MixDigest and the other remaining fields.
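For reference, the transaction-execution step mentioned above boils down to applying each transaction to the work's state and collecting its receipt. The sketch below is a simplified rendering of Work.commitTransaction from go-ethereum's miner package; exact signatures differ slightly between versions:

func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) {
    // Snapshot so a failing transaction can be rolled back cleanly
    snap := env.state.Snapshot()

    receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, &env.header.GasUsed, vm.Config{})
    if err != nil {
        env.state.RevertToSnapshot(snap)
        return err, nil
    }
    // Successful execution: keep both the tx and its receipt; the receipts
    // are later handed to Engine.Finalize to build the receipt trie
    env.txs = append(env.txs, tx)
    env.receipts = append(env.receipts, receipt)

    return nil, receipt.Logs
}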

Sealing the Block

    Sealing simply fills in the remaining block fields according to the consensus algorithm, and it is carried out by an Agent (here CpuAgent).

func (self *CpuAgent) update() {
out:
    for {
        select {
        // A new piece of work arrived
        case work := <-self.workCh:
            self.mu.Lock()
            if self.quitCurrentOp != nil {
                close(self.quitCurrentOp)
            }
            self.quitCurrentOp = make(chan struct{})
            go self.mine(work, self.quitCurrentOp)
            self.mu.Unlock()
        case <-self.stop:
            self.mu.Lock()
            if self.quitCurrentOp != nil {
                close(self.quitCurrentOp)
                self.quitCurrentOp = nil
            }
            self.mu.Unlock()
            break out
        }
    }
}

func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
    if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
        log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
        self.returnCh <- &Result{work, result}
    } else {
        if err != nil {
            log.Warn("Block sealing failed", "err", err)
        }
        self.returnCh <- nil
    }
}

func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) {
    // If we're running a fake PoW, simply return a 0 nonce immediately
    // (that fake/shared-mode handling is elided from this excerpt)
    // Create a runner and the multiple search threads it directs
    abort := make(chan struct{})
    found := make(chan *types.Block)

    ethash.lock.Lock()
    threads := ethash.threads
    ethash.lock.Unlock()
    if threads == 0 {
        threads = runtime.NumCPU()
    }
    if threads < 0 {
        threads = 0 // Allows disabling local mining without extra logic around local/remote
    }
    var pend sync.WaitGroup
    // Spawn one mining goroutine per thread
    for i := 0; i < threads; i++ {
        pend.Add(1)
        go func(id int, nonce uint64) {
            defer pend.Done()
            // Run the actual PoW search
            ethash.mine(block, id, nonce, abort, found)
        }(i, uint64(ethash.rand.Int63()))
    }
    // Wait until sealing is terminated or a nonce is found
    var result *types.Block
    select {
    case <-stop:
        // Outside abort, stop all miner threads
        close(abort)
    case result = <-found:
        // One of the threads found a block; abort all others and return it
        close(abort)
    case <-ethash.update:
        // Thread count was changed on user request; restart with the new settings
        close(abort)
        pend.Wait()
        return ethash.Seal(chain, block, stop)
    }
    // Wait for all miners to terminate and return the block
    pend.Wait()
    return result, nil
}

func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan struct{}, found chan *types.Block) {
    // Extract some data from the header
    var (
        header = block.Header()
        hash = header.HashNoNonce().Bytes()
        // The PoW target derived from the difficulty: target = 2^256 / difficulty
        target = new(big.Int).Div(maxUint256, header.Difficulty)
        number = header.Number.Uint64()
        dataset = ethash.dataset(number)
    )
    // Start generating random nonces until we abort or find a good one
    var (
        attempts = int64(0)
        // Initial nonce; it is incremented on every failed attempt
        nonce = seed
    )
    // Endless PoW collision loop; the <-abort case is omitted from this excerpt
    // and shown in the cancellation section below
search:
    for {
        select {
        default:
            // We don't have to update hash rate on every nonce, so update after 2^X nonces
            attempts++
            if (attempts % (1 << 15)) == 0 {
                ethash.hashrate.Mark(attempts)
                attempts = 0
            }
            // Compute the PoW value of this nonce
            digest, result := hashimotoFull(dataset.dataset, hash, nonce)
            if new(big.Int).SetBytes(result).Cmp(target) <= 0 {
                // Correct nonce found, create a new header with it
                header = types.CopyHeader(header)
                header.Nonce = types.EncodeNonce(nonce)
                header.MixDigest = common.BytesToHash(digest)

                // Seal and return a block (if still needed)
                select {
                // Hand the sealed block back to the worker (via Seal's found channel)
                case found <- block.WithSeal(header):
                    logger.Trace("Ethash nonce found and reported", "attempts", nonce-seed, "nonce", nonce)
                case <-abort:
                    logger.Trace("Ethash nonce found but discarded", "attempts", nonce-seed, "nonce", nonce)
                }
                break search
            }
            nonce++
        }
    }
    // Datasets are unmapped in a finalizer. Ensure that the dataset stays live
    // during sealing so it's not unmapped while being read.
    runtime.KeepAlive(dataset)
}

Inserting the Block into the Main Chain


    At this point the block has been fully sealed and only needs to be added to the chain. As the code above shows, the sealed block has already been sent to the CpuAgent.returnCh channel.

func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
    if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
        log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
        self.returnCh <- &Result{work, result}
    }
    // ...
}
So who is on the receiving end of this channel?

func (self *worker) register(agent Agent) {
    self.mu.Lock()
    defer self.mu.Unlock()
    self.agents[agent] = struct{}{}
    agent.SetReturnCh(self.recv)
}

func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch }


From the code above, CpuAgent.returnCh is simply worker.recv, so the sealed block is received on worker.recv and handled by worker.wait:

func (self *worker) wait() {
    for {
        mustCommitNewWork := true
        // Wait for sealed blocks coming back from the agents
        for result := range self.recv {
            atomic.AddInt32(&self.atWork, -1)

            if result == nil {
                continue
            }
            block := result.Block
            work := result.Work

            // Update the block hash in all logs since it is now available and not when the
            // receipt/log of individual transactions were created.
            for _, r := range work.receipts {
                for _, l := range r.Logs {
                    l.BlockHash = block.Hash()
                }
            }
            for _, log := range work.state.Logs() {
                log.BlockHash = block.Hash()
            }
            // Write the block and the post-transaction state to the database and update the trie; covered in a separate chapter
            stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
            if err != nil {
                log.Error("Failed writing block to chain", "err", err)
                continue
            }
            // check if canon block and write transactions
            if stat == core.CanonStatTy {
                // implicit by posting ChainHeadEvent
                mustCommitNewWork = false
            }
            // Broadcast the block and announce chain insertion event
            // Announce that a new block has been mined
            self.mux.Post(core.NewMinedBlockEvent{Block: block})
            var (
                events []interface{}
                logs = work.state.Logs()
            )
            events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
            if stat == core.CanonStatTy {
                events = append(events, core.ChainHeadEvent{Block: block})
            }
            self.chain.PostChainEvents(events, logs)

            // Insert the block into the set of pending ones to wait for confirmations
            self.unconfirmed.Insert(block.NumberU64(), block.Hash())

            if mustCommitNewWork {
                // Start mining the next block
                self.commitNewWork()
            }
        }
    }
}

Cancelling New Block Production

    Under PoW, producing a block takes a long time, and a lot can happen in the meantime. For instance, the node may start syncing blocks from its peers, in which case local block production has to be stopped.

func (self *Miner) update() {
    events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
out:
    for ev := range events.Chan() {
        switch ev.Data.(type) {
        case downloader.StartEvent:
            atomic.StoreInt32(&self.canStart, 0)
            if self.Mining() {
                self.Stop()
                atomic.StoreInt32(&self.shouldStart, 1)
                log.Info("Mining aborted due to sync")
            }
        case downloader.DoneEvent, downloader.FailedEvent:
            shouldStart := atomic.LoadInt32(&self.shouldStart) == 1

            atomic.StoreInt32(&self.canStart, 1)
            atomic.StoreInt32(&self.shouldStart, 0)
            if shouldStart {
                self.Start(self.coinbase)
            }
            // unsubscribe. we're only interested in this event once
            events.Unsubscribe()
            // stop immediately and ignore all further pending events
            break out
        }
    }
}

func (self *Miner) Stop() {
    self.worker.stop()
    atomic.StoreInt32(&self.mining, 0)
    atomic.StoreInt32(&self.shouldStart, 0)
}

func (self *worker) stop() {
    if atomic.LoadInt32(&self.mining) == 1 {
        for agent := range self.agents {
            agent.Stop()
        }
    }
}

func (self *CpuAgent) Stop() {
    self.stop <- struct{}{}
done:
    // Empty work channel
    for {
        select {
        // Drain any queued work
        case <-self.workCh:
        default:
            break done
        }
    }
}

func (self *CpuAgent) update() {
out:
    for {
        select {
        case <-self.stop:
            self.mu.Lock()
            if self.quitCurrentOp != nil {
                close(self.quitCurrentOp)
                self.quitCurrentOp = nil
            }
            self.mu.Unlock()
            break out
        }
    }
}

So far the stop signal has propagated to CpuAgent.quitCurrentOp. How does this channel eventually affect the PoW loop? Read on:


func (self *CpuAgent) update() {
out:
    for {
        select {
        case work := <-self.workCh:
            self.mu.Lock()
            if self.quitCurrentOp != nil {
                close(self.quitCurrentOp)
            }
            self.quitCurrentOp = make(chan struct{})
            go self.mine(work, self.quitCurrentOp)
            self.mu.Unlock()
        // ...
        }
    }
}

func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
    if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
        log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
        self.returnCh <- &Result{work, result}
        ….
    }
}

func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan struct{}, found chan *types.Block) {
    // Extract some data from the header (header, hash, target, dataset, attempts
    // and nonce are set up exactly as in the earlier excerpt of this function)
search:
    for {
        select {
        case <-abort:
            // Mining terminated, update stats and abort
            logger.Trace("Ethash nonce search aborted", "attempts", nonce-seed)
            ethash.hashrate.Mark(attempts)
            break search

        default:
            // We don't have to update hash rate on every nonce, so update after 2^X nonces
            attempts++
            if (attempts % (1 << 15)) == 0 {
                ethash.hashrate.Mark(attempts)
                attempts = 0
            }
            // Compute the PoW value of this nonce
            digest, result := hashimotoFull(dataset.dataset, hash, nonce)
            if new(big.Int).SetBytes(result).Cmp(target) <= 0 {
                // Correct nonce found, create a new header with it
                header = types.CopyHeader(header)
                header.Nonce = types.EncodeNonce(nonce)
                header.MixDigest = common.BytesToHash(digest)

                // Seal and return a block (if still needed)
                select {
                // Hand the sealed block back to the worker
                case found <- block.WithSeal(header):
                    logger.Trace("Ethash nonce found and reported", "attempts", nonce-seed, "nonce", nonce)
                case <-abort:
                    logger.Trace("Ethash nonce found but discarded", "attempts", nonce-seed, "nonce", nonce)
                }
                break search
            }
            nonce++
        }
    }
}
As shown above, the stop action ultimately reaches the hash-collision loop through the abort channel, which cancels the nonce search. This is ordinary Go channel machinery rather than anything Ethereum-specific; a minimal stand-alone sketch of the same pattern follows.
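The pattern is: each worker goroutine polls a quit channel inside its hot loop, and closing that channel makes every pending and future receive return immediately, so a single close() fans the cancellation out to all goroutines at once. A minimal sketch with hypothetical names (not geth code):

package main

import (
    "fmt"
    "sync"
    "time"
)

// search loops until quit is closed, mirroring the <-abort case in Ethash.mine
func search(id int, quit <-chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    for attempt := 0; ; attempt++ {
        select {
        case <-quit: // a closed channel makes every receive return immediately
            fmt.Printf("worker %d aborted after %d attempts\n", id, attempt)
            return
        default:
            time.Sleep(time.Millisecond) // stand-in for one hashimotoFull attempt
        }
    }
}

func main() {
    quit := make(chan struct{})
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go search(i, quit, &wg)
    }
    // Something happened (e.g. chain sync started): one close() cancels all workers
    time.Sleep(50 * time.Millisecond)
    close(quit)
    wg.Wait()
}

In geth, Miner.Stop plays the role of close(quit) here: it cascades through worker.stop and CpuAgent.Stop to close(quitCurrentOp), which Seal turns into close(abort) for the mining goroutines.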
