以太坊源码情景分析之挖矿
数据结构分析
- Miner类
它是对外类,暴露给外部模块使用的, 具体挖矿的事务由它的成员变量worker管理
它的update函数负责监听downloader的各种事件(收到新块事件等),恰当的通知worker启动或停止mine,以防止多个新块出现。
- Work类
这个是一个数据类,里面包含了交易数据(transactions)及收据(receipts)
-
worker类
它负责准备区管理组装事务,比如什么时候开始挖新块,停止挖块,使用engine组装Work, 交给agent做seal,即使用pow/clique打包块
- Engine类:
共识算法的实现,比如区块头的格式,难度,pow/clique算法
- CpuAgent类:
继承Agent, 只是一种agent,调用具体的engine执行seal,即pow/clique算法,找出合适的nonce,填充block的剩余字段数据
该类处理完后的最终结果通过returnCh(chan *Result)通道唤醒worker.wait函数,worker.wait拿到完整的block数据然后插入chain并通知其他节点
其实,还有其他agent的实现类,比如remoteagent实现矿池各个节点共同计算
总结:
一个新区块被挖掘出的过程其实就是Block对象数据被填充的过程,这个填充过程可以分为两个步骤
- 一是组装出一个新区块,这个区块的数据基本完整,包括成员Header的大部分属性以及交易列表txs,叔区块uncles[],并且所有交易已经执行完毕,所有收据(Receipt)也已收集完毕,这部分主要由worker完成
- 二是填补该区块剩余的成员属性,比如Header.Difficulty, Header.nonce, Header.mixdigest等,这些工作是由Agent调用<Engine>接口实现体,利用共识算法来完成的。
下面以块生成事件驱动过程来分析挖矿的整个过程
生成新区块
worker.update函数会监听各种事件,当发现有ChainHeadEvent事件时即开始miner新区块, ChainHeadEvent说明新块已经添加到区块链,因而可以挖新区块了
区块数据准备
func (self *worker) update() {
defer self.txSub.Unsubscribe()
defer self.chainHeadSub.Unsubscribe()
defer self.chainSideSub.Unsubscribe()
for {
// A real event arrived, process interesting content
select {
// Handle ChainHeadEvent
case <-self.chainHeadCh:
self.commitNewWork()
}
func (self *worker) commitNewWork() {
num := parent.Number()
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
GasLimit: core.CalcGasLimit(parent),
Extra: self.extra,
Time: big.NewInt(tstamp),
}
// Only set the coinbase if we are mining (avoid spurious block rewards)
if atomic.LoadInt32(&self.mining) == 1 {
header.Coinbase = self.coinbase
}
//header数据的准备,比如计算难度
if err := self.engine.Prepare(self.chain, header); err != nil {
log.Error("Failed to prepare header for mining", "err", err)
return
}
// Could potentially happen if starting to mine in an odd state.
//新创建一个Work,并设置为worker.current
err := self.makeCurrent(parent, header)
if err != nil {
log.Error("Failed to create mining context", "err", err)
return
}
// Create the current work task and check any fork transitions needed
work := self.current
//取出pending交易用来打包到block中
pending, err := self.eth.TxPool().Pending()
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
//执行transaction(比如智能合约),这个函数会生成work.reciepts
work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
// compute uncles for the new block.
var (
uncles []*types.Header
badUncles []common.Hash
)
//挑选至多2个uncle block数据打包到block中
for hash, uncle := range self.possibleUncles {
if len(uncles) == 2 {
break
}
if err := self.commitUncle(work, uncle.Header()); err != nil {
log.Trace("Bad uncle found and will be removed", "hash", hash)
log.Trace(fmt.Sprint(uncle))
badUncles = append(badUncles, hash)
} else {
log.Debug("Committing new uncle to block", "hash", hash)
uncles = append(uncles, uncle.Header())
}
}
// Create the new block to seal with the consensus engine
if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil {
log.Error("Failed to finalize block for sealing", "err", err)
return
}
self.push(work)
}
func (self *worker) push(work *Work) {
if atomic.LoadInt32(&self.mining) != 1 {
return
}
for agent := range self.agents {
atomic.AddInt32(&self.atWork, 1)
if ch := agent.Work(); ch != nil {
ch <- work
}
}
}
- 准备新区块的时间Time(系统时间),Num(父区块Num +1),ParentHash;其余字段比如Difficulty,GasLimit, nonce, root等,均留待之后共识算法中确定。
- 调用Engine.Prepare()函数,完成Header对象的字段的初始值。
- 根据已有的Header对象,创建一个新的Work对象,并用其更新worker.current成员变量。
- 准备新区块的交易列表,来源是TxPool中那些最近加入的tx,并执行这些交易。
- 准备新区块的叔区块uncles[],来源是worker.possibleUncles[],而possibleUncles[]中的每个区块都从事件ChainSideEvent中搜集得到, 叔区块最多有两个。
- 调用Engine.Finalize()函数,对新区块“定型”,填充上Header.Root, TxHash, ReceiptHash, UncleHash等几个属性。
- 把创建的Work对象,通过channel发送给每一个登记过的Agent,进行后续的挖掘。(self.push(work))
后续挖掘是执行POW/Clique算法,计算出nonce,并填充mixdigest等字段
区块数据打包
数据打包其实就是根据共识算法进一步填充block里的信息,这个由Agent(CpuAgent)来执行
func (self *CpuAgent) update() {
out:
for {
select {
//接收到新的work
case work := <-self.workCh:
if self.quitCurrentOp != nil {
close(self.quitCurrentOp)
}
self.quitCurrentOp = make(chan struct{})
go self.mine(work, self.quitCurrentOp)
case <-self.stop:
if self.quitCurrentOp != nil {
close(self.quitCurrentOp)
self.quitCurrentOp = nil
}
break out
}
}
}
func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
self.returnCh <- &Result{work, result}
} else {
if err != nil {
log.Warn("Block sealing failed", "err", err)
}
self.returnCh <- nil
}
}
func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) {
// If we're running a fake PoW, simply return a 0 nonce immediately
// Create a runner and the multiple search threads it directs
abort := make(chan struct{})
found := make(chan *types.Block)
ethash.lock.Lock()
threads := ethash.threads
ethash.lock.Unlock()
if threads == 0 {
threads = runtime.NumCPU()
}
if threads < 0 {
threads = 0 // Allows disabling local mining without extra logic around local/remote
}
var pend sync.WaitGroup
//多线程挖矿
for i := 0; i < threads; i++ {
pend.Add(1)
go func(id int, nonce uint64) {
defer pend.Done()
// 执行Pow挖矿逻辑
ethash.mine(block, id, nonce, abort, found)
}(i, uint64(ethash.rand.Int63()))
}
// Wait until sealing is terminated or a nonce is found
var result *types.Block
select {
case <-stop:
// Outside abort, stop all miner threads
close(abort)
case result = <-found:
// 等待直至挖出一个新块,返回
// One of the threads found a block, abort all others
close(abort)
case <-ethash.update:
// 用新配置重新挖矿
// Thread count was changed on user request, restart
close(abort)
pend.Wait()
return ethash.Seal(chain, block, stop)
}
// Wait for all miners to terminate and return the block
pend.Wait()
return result, nil
}
func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan struct{}, found chan *types.Block) {
// Extract some data from the header
var (
header = block.Header()
hash = header.HashNoNonce().Bytes()
//根据难度系数计算出pow目标值target
target = new(big.Int).Div(maxUint256, header.Difficulty)
number = header.Number.Uint64()
dataset = ethash.dataset(number)
)
// Start generating random nonces until we abort or find a good one
var (
attempts = int64(0)
//nonce初始值,后面就不停循环递增碰撞
nonce = seed
)
//无穷循环执行POW碰撞
for {
select {
default:
// We don't have to update hash rate on every nonce, so update after after 2^X nonces
attempts++
if (attempts % (1 << 15)) == 0 {
ethash.hashrate.Mark(attempts)
attempts = 0
}
// Compute the PoW value of this nonce
// 用POW算法计算nonce
digest, result := hashimotoFull(dataset.dataset, hash, nonce)
// Correct nonce found, create a new header with it
header = types.CopyHeader(header)
header.Nonce = types.EncodeNonce(nonce)
header.MixDigest = common.BytesToHash(digest)
// Seal and return a block (if still needed)
select {
//将最终数据返回给worker
case found <- block.WithSeal(header):
logger.Trace("Ethash nonce found and reported", "attempts", nonce-seed, "nonce", nonce)
case <-abort:
logger.Trace("Ethash nonce found but discarded", "attempts", nonce-seed, "nonce", nonce)
}
break search
}
nonce++
}
}
// Datasets are unmapped in a finalizer. Ensure that the dataset stays live
// during sealing so it's not unmapped while being read.
runtime.KeepAlive(dataset)
}
区块数据插入主链
刚刚前面区块已经完整的打包好了,就等加入链了。从前面的代码可知,打包好的blocky已经传送到CpuAgent.returnCh这个通道了
func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
self.returnCh <- &Result{work, result}
}
那这个通道的接收方是谁呢?
func (self *worker) register(agent Agent) {
self.agents[agent] = struct{}{}
agent.SetReturnCh(self.recv)
}
func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch }
从上面的代码可以看出CpuAgent.returnCh就是woker.recv,所以接收打包后的block的对象是worker.recv,处理函数是worker.wait,具体如下
func (self *worker) wait() {
for {
mustCommitNewWork := true
//等待获取打包好的block
for result := range self.recv {
atomic.AddInt32(&self.atWork, -1)
if result == nil {
continue
}
block := result.Block
work := result.Work
// Update the block hash in all logs since it is now available and not when the
// receipt/log of individual transactions were created.
for _, r := range work.receipts {
for _, l := range r.Logs {
l.BlockHash = block.Hash()
}
}
for _, log := range work.state.Logs() {
log.BlockHash = block.Hash()
}
//将交易的state写入数据库及更新trie对象,这个后面会单独写一个章节介绍
stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
if err != nil {
log.Error("Failed writing block to chain", "err", err)
continue
}
// check if canon block and write transactions
if stat == core.CanonStatTy {
// implicit by posting ChainHeadEvent
mustCommitNewWork = false
}
// Broadcast the block and announce chain insertion event
//通知新块产生了
self.mux.Post(core.NewMinedBlockEvent{Block: block})
var (
events []interface{}
logs = work.state.Logs()
)
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if stat == core.CanonStatTy {
events = append(events, core.ChainHeadEvent{Block: block})
}
self.chain.PostChainEvents(events, logs)
// Insert the block into the set of pending ones to wait for confirmations
self.unconfirmed.Insert(block.NumberU64(), block.Hash())
if mustCommitNewWork {
//开始新块的挖掘
self.commitNewWork()
}
}
}
}
取消生产新区快
我们知道POW模式下生产一个区块需要很长的时间,那生产的过程中就会发生很多事,比如本地开始从其他节点同步区块,这时就需要停止本地区块的生成
func (self *Miner) update() {
events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
out:
for ev := range events.Chan() {
switch ev.Data.(type) {
case downloader.StartEvent:
atomic.StoreInt32(&self.canStart, 0)
if self.Mining() {
self.Stop()
atomic.StoreInt32(&self.shouldStart, 1)
log.Info("Mining aborted due to sync")
}
case downloader.DoneEvent, downloader.FailedEvent:
shouldStart := atomic.LoadInt32(&self.shouldStart) == 1
atomic.StoreInt32(&self.canStart, 1)
atomic.StoreInt32(&self.shouldStart, 0)
if shouldStart {
self.Start(self.coinbase)
}
// unsubscribe. we're only interested in this event once
events.Unsubscribe()
// stop immediately and ignore all further pending events
break out
}
}
}
func (self *Miner) Stop() {
self.worker.stop()
atomic.StoreInt32(&self.mining, 0)
atomic.StoreInt32(&self.shouldStart, 0)
}
func (self *worker) stop() {
if atomic.LoadInt32(&self.mining) == 1 {
for agent := range self.agents {
agent.Stop()
}
}
}
func (self *CpuAgent) Stop() {
self.stop <- struct{}{}
done:
// Empty work channel
for {
select {
//将所有的work清空
case <-self.workCh:
default:
break done
}
}
}
到这里为止,stop传递到了CpuAgent.quitCurrentOp,那这个通道又是如何影响最后pow逻辑的呢?
接着看
func (self *CpuAgent) update() {
out:
for {
select {
case work := <-self.workCh:
if self.quitCurrentOp != nil {
close(self.quitCurrentOp)
}
self.quitCurrentOp = make(chan struct{})
go self.mine(work, self.quitCurrentOp)
}
func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
self.returnCh <- &Result{work, result}
….
}
}
func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan struct{}, found chan *types.Block) {
// Extract some data from the header
for {
select {
case <-abort:
// Mining terminated, update stats and abort
logger.Trace("Ethash nonce search aborted", "attempts", nonce-seed)
ethash.hashrate.Mark(attempts)
break search
default:
// We don't have to update hash rate on every nonce, so update after after 2^X nonces
attempts++
if (attempts % (1 << 15)) == 0 {
ethash.hashrate.Mark(attempts)
attempts = 0
}
// Compute the PoW value of this nonce
// 用POW算法计算nonce
digest, result := hashimotoFull(dataset.dataset, hash, nonce)
// Correct nonce found, create a new header with it
header = types.CopyHeader(header)
header.Nonce = types.EncodeNonce(nonce)
header.MixDigest = common.BytesToHash(digest)
// Seal and return a block (if still needed)
select {
//将最终数据返回给worker
case found <- block.WithSeal(header):
logger.Trace("Ethash nonce found and reported", "attempts", nonce-seed, "nonce", nonce)
case <-abort:
logger.Trace("Ethash nonce found but discarded", "attempts", nonce-seed, "nonce", nonce)
}
break search
}
nonce++
}
从上可知,stop的动作最后传递到了hash碰撞循环逻辑,然后就会取消hash碰撞
/********************************
* 本文来自****博主"爱踢门"
******************************************/