首页 > 编程语言 >以太坊源码阅读---一笔交易从生到死(一) txpool

以太坊源码阅读---一笔交易从生到死(一) txpool

时间:2022-12-28 16:58:11浏览次数:61  
标签:transaction return tx err nil txpool 从生到死 源码 pool

   markdown太难写了记不住。跑到博客园来写写 今年还好 不是原地踏步的一年,最近在家办公工作转成半个区块链安全工程师(依旧不热爱这个行业,但是区块链技术还是挺有意思的)

   先小总结下

   1.学了solidity(入门容易 语法简单)

    2.简单学了学golang语法(写个小脚本啥的不难 鸭子类型看得有点绕)

    3.阅读、梳理了以太坊源码(30%进度吧  rlp解码没看 矿工看了一点  共识引擎没看  p2p简单看了下 生命周期 接口管理 evm没细看 tx基本看了下   差不多就这些? ) 很伟大的工程 坑也挺多的。

简单总结下一笔交易流程 a->b转账

在节点控制台的话直接调用eth.sendTransaction( 就好了 然而我们一般都用小狐狸  

 

小狐狸发的就是 "method":"eth_SendTransaction"  

以太坊的接口怎么找对应的方法呢 在注册的时候会把api注册进去  有两个地方会注册  一个是启动的时候node.New里面会调用node.rpcAPIs = append(node.rpcAPIs, node.apis()...)方法还有就是 eth.New(stack, cfg)的时候也会调用stack.RegisterAPIs(eth.APIs())  方法

转账接口就注册在这里面  以太坊根据method 以"_"切割前面是namespace  后面是对应的方法

func (s *Ethereum) APIs() []rpc.API {
    apis := ethapi.GetAPIs(s.APIBackend) //  !!

    // Append any APIs exposed explicitly by the consensus engine
    apis = append(apis, s.engine.APIs(s.BlockChain())...)

    // Append all the local APIs and return
    return append(apis, []rpc.API{
        {
            Namespace: "eth",
            Service:   NewEthereumAPI(s),
        }, {
            Namespace: "miner",
            Service:   NewMinerAPI(s),
        }, {
            Namespace: "eth",
            Service:   downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux),
        }, {
            Namespace: "admin",
            Service:   NewAdminAPI(s),
        }, {
            Namespace: "debug",
            Service:   NewDebugAPI(s),
        }, {
            Namespace: "net",
            Service:   s.netRPCService,
        },
    }...)
}

转账还不在这 apis := ethapi.GetAPIs(s.APIBackend)在这里 

            Namespace: "eth", //  eth_xxxx
            Service:   NewTransactionAPI(apiBackend, nonceLock),

 

 NewTransactionAPI  返回的TransactionAPI  结构体

然后看下哪个方法绑定了TransactionAPI结构体并且方法名叫SendTransaction就好了

在internal/ethapi/api.go这里了

func (s *TransactionAPI) SendTransaction(ctx context.Context, args TransactionArgs) (common.Hash, error) {
    // Look up the wallet containing the requested signer
    account := accounts.Account{Address: args.from()}

    wallet, err := s.b.AccountManager().Find(account)
    if err != nil {
        return common.Hash{}, err
    }

    if args.Nonce == nil {
        // Hold the addresse's mutex around signing to prevent concurrent assignment of
        // the same nonce to multiple accounts.
        s.nonceLock.LockAddr(args.from())
        defer s.nonceLock.UnlockAddr(args.from())
    }

    // Set some sanity defaults and terminate on failure
    if err := args.setDefaults(ctx, s.b); err != nil {
        return common.Hash{}, err
    }
    // Assemble the transaction and sign with the wallet
    tx := args.toTransaction()

    signed, err := wallet.SignTx(account, tx, s.b.ChainConfig().ChainID)
    if err != nil {
        return common.Hash{}, err
    }
    return SubmitTransaction(ctx, s.b, signed)
}

代码先是本地找下参数中的from  然后如果没传nonce的话加个nonce锁这里说是防止同一个随机数分发 

接下来设置些默认属性 setDefaults 里面有些判断啥的比如to为空data也是空那么就报错(to为空就是创建合约 )

然后把参数设置到Transaction结构体里面 SubmitTransaction 看着是提交其实哪有那么容易

func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
    // If the transaction fee cap is already specified, ensure the
    // fee of the given transaction is _reasonable_.
    if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil {
        return common.Hash{}, err
    }
    if !b.UnprotectedAllowed() && !tx.Protected() {
        // Ensure only eip155 signed transactions are submitted if EIP155Required is set.
        return common.Hash{}, errors.New("only replay-protected (EIP-155) transactions allowed over RPC")
    }
    if err := b.SendTx(ctx, tx); err != nil { // EthAPIBackend   提交到   txpool
        return common.Hash{}, err
    }
    // Print a log with full tx details for manual investigations and interventions
    signer := types.MakeSigner(b.ChainConfig(), b.CurrentBlock().Number())
    from, err := types.Sender(signer, tx)
    if err != nil {
        return common.Hash{}, err
    }

    if tx.To() == nil {
        addr := crypto.CreateAddress(from, tx.Nonce())
        log.Info("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
    } else {
        log.Info("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value())
    }
    return tx.Hash(), nil
}

先校验手续费 然后是eip155 的校验 防止重放攻击

接下来就是b.SendTx 提交到pool池子里 SendTx有两种实现 一种是les轻节点一种是全节点  我们看全节点就好 一路跟

func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
    // Filter out known ones without obtaining the pool lock or recovering signatures
    var (
        errs = make([]error, len(txs))
        news = make([]*types.Transaction, 0, len(txs))
    )
    for i, tx := range txs {
        // If the transaction is known, pre-set the error slot
        if pool.all.Get(tx.Hash()) != nil { //  限制下提交了在提交没用了
            errs[i] = ErrAlreadyKnown
            knownTxMeter.Mark(1)
            continue
        }
        // Exclude transactions with invalid signatures as soon as
        // possible and cache senders in transactions before
        // obtaining lock
        _, err := types.Sender(pool.signer, tx)
        if err != nil {
            errs[i] = ErrInvalidSender
            invalidTxMeter.Mark(1)
            continue
        }
        // Accumulate all unknown transactions for deeper processing
        news = append(news, tx)
    }
    if len(news) == 0 {
        return errs
    }

    // Process all the new transaction and merge any errors into the original slice
    pool.mu.Lock()
    newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
    pool.mu.Unlock()

    var nilSlot = 0
    for _, err := range newErrs {
        for errs[nilSlot] != nil {
            nilSlot++
        }
        errs[nilSlot] = err
        nilSlot++
    }
    // Reorg the pool internals if needed and return
    done := pool.requestPromoteExecutables(dirtyAddrs)
    if sync {
        <-done
    }
    return errs
}

先是for循环 校验重复提交、签名

接下来加锁 pool.addTxsLocked(news, local) 提交

 

func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
    dirty := newAccountSet(pool.signer)
    errs := make([]error, len(txs))
    for i, tx := range txs {
        replaced, err := pool.add(tx, local) // 提交
        errs[i] = err
        if err == nil && !replaced {
            dirty.addTx(tx)
        }
    }
    validTxMeter.Mark(int64(len(dirty.accounts)))
    return errs, dirty
}

pool.add(tx, local) 提交

func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
    // If the transaction is already known, discard it
    hash := tx.Hash() // 检查当前的交易是否已经知晓
    if pool.all.Get(hash) != nil {
        log.Trace("Discarding already known transaction", "hash", hash)
        knownTxMeter.Mark(1)
        return false, ErrAlreadyKnown
    }
    // Make the local flag. If it's from local source or it's from the network but
    // the sender is marked as local previously, treat it as the local transaction.
    isLocal := local || pool.locals.containsTx(tx)

    // If the transaction fails basic validation, discard it
    if err := pool.validateTx(tx, isLocal); err != nil { // 验证 重点
        log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
        invalidTxMeter.Mark(1)
        return false, err
    }
    // If the transaction pool is full, discard underpriced transactions   检查交易池 满了放弃低gas交易   没细看
    if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
        // If the new transaction is underpriced, don't accept it
        if !isLocal && pool.priced.Underpriced(tx) {
            log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
            underpricedTxMeter.Mark(1)
            return false, ErrUnderpriced
        }
        // We're about to replace a transaction. The reorg does a more thorough
        // analysis of what to remove and how, but it runs async. We don't want to
        // do too many replacements between reorg-runs, so we cap the number of
        // replacements to 25% of the slots
        if pool.changesSinceReorg > int(pool.config.GlobalSlots/4) {
            throttleTxMeter.Mark(1)
            return false, ErrTxPoolOverflow
        }

        // New transaction is better than our worse ones, make room for it.
        // If it's a local transaction, forcibly discard all available transactions.
        // Otherwise if we can't make enough room for new one, abort the operation.
        drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)

        // Special case, we still can't make the room for the new remote one.
        if !isLocal && !success {
            log.Trace("Discarding overflown transaction", "hash", hash)
            overflowedTxMeter.Mark(1)
            return false, ErrTxPoolOverflow
        }
        // Bump the counter of rejections-since-reorg
        pool.changesSinceReorg += len(drop)
        // Kick out the underpriced remote transactions.
        for _, tx := range drop {
            log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
            underpricedTxMeter.Mark(1)
            pool.removeTx(tx.Hash(), false)
        }
    }
    // Try to replace an existing transaction in the pending pool
    from, _ := types.Sender(pool.signer, tx) // already validated
    if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
        // Nonce already pending, check if required price bump is met
        //         这里做  nonce 相同的 tx 替换  (替换指的是  替换 pending 中的tx)
        //         Nonce已经挂起,检查是否满足所需的价格暴涨
        //         PriceBump (默认为: 10)
        inserted, old := list.Add(tx, pool.config.PriceBump) // 提交
        if !inserted {
            pendingDiscardMeter.Mark(1)
            return false, ErrReplaceUnderpriced
        }
        // New transaction is better, replace old one
        if old != nil {
            pool.all.Remove(old.Hash())
            pool.priced.Removed(1)
            pendingReplaceMeter.Mark(1)
        }

        pool.all.Add(tx, isLocal)
        pool.priced.Put(tx, isLocal)
        // 将属于本地账户的tx写入磁盘
        pool.journalTx(from, tx)
        ///  新的重新排队?
        pool.queueTxEvent(tx)
        log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

        // Successful promotion, bump the heartbeat
        pool.beats[from] = time.Now()
        return old != nil, nil
    }
    // New transaction isn't replacing a pending one, push into queue
    replaced, err = pool.enqueueTx(hash, tx, isLocal, true)
    if err != nil {
        return false, err
    }
    // Mark local addresses and journal local transactions
    if local && !pool.locals.contains(from) {
        log.Info("Setting new local account", "address", from)
        pool.locals.add(from)
        pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
    }
    if isLocal {
        localGauge.Inc(1)
    }
    pool.journalTx(from, tx)

    log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
    return replaced, nil
}

提交代码巨特么长

先是校验hash  又来一遍

pool.validateTx 验证tx  这个是重点 等会再说

然后是交易池子满了的话放弃低gas交易

接下来如果nonce相同 且在pending池子里面 把gas高得替换了 这里替换的是pending的tx  有点绕其实这就是抢交易的问题  以太坊pool池子里面 有两种字典 一种是pending 一种是queue  pending顾名思义就是等待执行的tx 另一种queue就是在排队的tx  pending高于queue

 queueTxEvent 排队也就是把tx扔到queueTxEventCh通道里面

如果交易是新的没走上面流程那么久调用enqueuetx 把交易扔到queue池子里面  queue池子同样也可能存在同nonce的交易  如果有并且替换会返回一个bool  ture然后将这个tx扔到reqPromoteCh 来做交易升级处理 即queue->pending

scheduleReorgLoop 

func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) {
    defer func(t0 time.Time) {
        reorgDurationTimer.Update(time.Since(t0))
    }(time.Now())
    defer close(done)

    var promoteAddrs []common.Address
    if dirtyAccounts != nil && reset == nil {
        // Only dirty accounts need to be promoted, unless we're resetting.
        // For resets, all addresses in the tx queue will be promoted and
        // the flatten operation can be avoided.
        promoteAddrs = dirtyAccounts.flatten()
    }
    pool.mu.Lock()
    if reset != nil {
        // Reset from the old head to the new, rescheduling any reorged transactions
        pool.reset(reset.oldHead, reset.newHead) //重置交易池子

        // Nonces were reset, discard any events that became stale
        for addr := range events {
            events[addr].Forward(pool.pendingNonces.get(addr))
            if events[addr].Len() == 0 {
                delete(events, addr)
            }
        }
        // Reset needs promote for all addresses
        promoteAddrs = make([]common.Address, 0, len(pool.queue))
        for addr := range pool.queue {
            promoteAddrs = append(promoteAddrs, addr)
        }
    }
    // Check for pending transactions for every account that sent new ones  //检查  交易升级
    promoted := pool.promoteExecutables(promoteAddrs)

    // If a new block appeared, validate the pool of pending transactions. This will
    // remove any transaction that has been included in the block or was invalidated
    // because of another transaction (e.g. higher gas price).
    if reset != nil {
        pool.demoteUnexecutables() //交易降级
        if reset.newHead != nil && pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
            pendingBaseFee := misc.CalcBaseFee(pool.chainconfig, reset.newHead)
            pool.priced.SetBaseFee(pendingBaseFee)
        }
        // Update all accounts to the latest known pending nonce
        nonces := make(map[common.Address]uint64, len(pool.pending))
        for addr, list := range pool.pending {
            highestPending := list.LastElement()
            nonces[addr] = highestPending.Nonce() + 1
        }
        pool.pendingNonces.setAll(nonces)
    }
    // Ensure pool.queue and pool.pending sizes stay within the configured limits.
    pool.truncatePending()
    pool.truncateQueue()

    dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg))
    pool.changesSinceReorg = 0 // Reset change counter
    pool.mu.Unlock()

    // Notify subsystems for newly added transactions
    for _, tx := range promoted {
        addr, _ := types.Sender(pool.signer, tx)
        if _, ok := events[addr]; !ok {
            events[addr] = newTxSortedMap()
        }
        events[addr].Put(tx)
    }
    if len(events) > 0 {
        var txs []*types.Transaction
        for _, set := range events {
            txs = append(txs, set.Flatten()...)
        }
        pool.txFeed.Send(NewTxsEvent{txs})
    }
}

 

循环里会开个协程来执行runReorg操作  主要是交易升降级的处理  以及关键的txfeed订阅事件发送  升降级处理没啥好说的 就是pending->queue转换  最后的tx都会通过订阅机制发送出去

txFeed即为SubscribeNewTxsEvent的订阅   订阅SubscribeNewTxsEvent事件的一共有两个程序 一个是广播程序 另一个就是矿工

worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)

 矿工接下来就是执行evm操作了 这个篇幅也很长 下一篇在写

  

 

        TRANSLATE with x English
Arabic Hebrew Polish
Bulgarian Hindi Portuguese
Catalan Hmong Daw Romanian
Chinese Simplified Hungarian Russian
Chinese Traditional Indonesian Slovak
Czech Italian Slovenian
Danish Japanese Spanish
Dutch Klingon Swedish
English Korean Thai
Estonian Latvian Turkish
Finnish Lithuanian Ukrainian
French Malay Urdu
German Maltese Vietnamese
Greek Norwegian Welsh
Haitian Creole Persian  
  TRANSLATE with COPY THE URL BELOW Back EMBED THE SNIPPET BELOW IN YOUR SITE Enable collaborative features and customize widget: Bing Webmaster Portal Back     此页面的语言为英语   翻译为中文(简体)        
  • 中文(简体)
  • 中文(繁体)
  • 丹麦语
  • 乌克兰语
  • 乌尔都语
  • 亚美尼亚语
  • 俄语
  • 保加利亚语
  • 克罗地亚语
  • 冰岛语
  • 加泰罗尼亚语
  • 匈牙利语
  • 卡纳达语
  • 印地语
  • 印尼语
  • 古吉拉特语
  • 哈萨克语
  • 土耳其语
  • 威尔士语
  • 孟加拉语
  • 尼泊尔语
  • 布尔语(南非荷兰语)
  • 希伯来语
  • 希腊语
  • 库尔德语
  • 德语
  • 意大利语
  • 拉脱维亚语
  • 挪威语
  • 捷克语
  • 斯洛伐克语
  • 斯洛文尼亚语
  • 旁遮普语
  • 日语
  • 普什图语
  • 毛利语
  • 法语
  • 波兰语
  • 波斯语
  • 泰卢固语
  • 泰米尔语
  • 泰语
  • 海地克里奥尔语
  • 爱沙尼亚语
  • 瑞典语
  • 立陶宛语
  • 缅甸语
  • 罗马尼亚语
  • 老挝语
  • 芬兰语
  • 英语
  • 荷兰语
  • 萨摩亚语
  • 葡萄牙语
  • 西班牙语
  • 越南语
  • 阿塞拜疆语
  • 阿姆哈拉语
  • 阿尔巴尼亚语
  • 阿拉伯语
  • 韩语
  • 马尔加什语
  • 马拉地语
  • 马拉雅拉姆语
  • 马来语
  • 马耳他语
  • 高棉语
 

标签:transaction,return,tx,err,nil,txpool,从生到死,源码,pool
From: https://www.cnblogs.com/bfpiaoran/p/17009195.html

相关文章