当前位置: 首页 > news >正文

go-ethereum core之交易索引txIndexer

简介

为交易提供快速查询的能力,比如通过交易哈希找到交易详情,或者关联到所在的区块,txIndexer是负责管理和维护交易索引的核心组件​​

结构

txIndexer
ethdb.Database
type txIndexer struct {// limit is the maximum number of blocks from head whose tx indexes// are reserved://  * 0: means the entire chain should be indexed//  * N: means the latest N blocks [HEAD-N+1, HEAD] should be indexed//       and all others shouldn't.limit uint64// The current head of blockchain for transaction indexing. This field// is accessed by both the indexer and the indexing progress queries.head atomic.Uint64// The current tail of the indexed transactions, null indicates// that no transactions have been indexed yet.//// This field is accessed by both the indexer and the indexing// progress queries.tail atomic.Pointer[uint64]// cutoff denotes the block number before which the chain segment should// be pruned and not available locally.cutoff uint64db     ethdb.Databaseterm   chan chan struct{}closed chan struct{}
}

limit :索引范围限制​,控制​​需要保留交易索引的区块范围​​(从链的​​最新头部​​往回数)
head :当前索引的区块链头部​,记录​​当前已索引到的最新区块号​​(即索引进度跟进到哪条链)
tail:当前索引的交易尾部​,记录​​已索引的最老区块号​​(即索引范围的“起点”)
cutoff:索引修剪阈值​,指定​​链修剪的临界点​​——​​所有区块号小于 cutoff的交易索引将被永久删除​​(本地不再保留),清理过期的索引数据,避免存储无限增长的旧索引
db:底层索引数据库​,存储交易索引的​​持久化数据​​
term:终止信号通道​,用于​​通知 txIndexer协程终止
closed:关闭完成通知通道​,通知外部​​txIndexer已完全关闭

创建

在新建区块链时创建NewBlockChain,新建txIndexer时开启协程执行loop

//NewBlockChain
// Start tx indexer if it's enabled.
if bc.cfg.TxLookupLimit >= 0 {bc.txIndexer = newTxIndexer(uint64(bc.cfg.TxLookupLimit), bc)
}func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {cutoff, _ := chain.HistoryPruningCutoff()indexer := &txIndexer{limit:  limit,cutoff: cutoff,db:     chain.db,term:   make(chan chan struct{}),closed: make(chan struct{}),}indexer.head.Store(indexer.resolveHead())indexer.tail.Store(rawdb.ReadTxIndexTail(chain.db))go indexer.loop(chain)var msg stringif limit == 0 {if indexer.cutoff == 0 {msg = "entire chain"} else {msg = fmt.Sprintf("blocks since #%d", indexer.cutoff)}} else {msg = fmt.Sprintf("last %d blocks", limit)}log.Info("Initialized transaction indexer", "range", msg)return indexer
}func (indexer *txIndexer) loop(chain *BlockChain) {defer close(indexer.closed)// Listening to chain events and manipulate the transaction indexes.var (stop   chan struct{} // Non-nil if background routine is activedone   chan struct{} // Non-nil if background routine is activeheadCh = make(chan ChainHeadEvent)sub    = chain.SubscribeChainHeadEvent(headCh))defer sub.Unsubscribe()// Validate the transaction indexes and repair if necessaryhead := indexer.head.Load()indexer.repair(head)// Launch the initial processing if chain is not empty (head != genesis).// This step is useful in these scenarios that chain has no progress.if head != 0 {stop = make(chan struct{})done = make(chan struct{})go indexer.run(head, stop, done)}for {select {case h := <-headCh:indexer.head.Store(h.Header.Number.Uint64())if done == nil {stop = make(chan struct{})done = make(chan struct{})go indexer.run(h.Header.Number.Uint64(), stop, done)}case <-done:stop = nildone = nilindexer.tail.Store(rawdb.ReadTxIndexTail(indexer.db))case ch := <-indexer.term:if stop != nil {close(stop)}if done != nil {log.Info("Waiting background transaction indexer to exit")<-done}close(ch)return}}
}

索引执行loop

其步骤为

  • 初始化创建ChainHeadEvent通道,添加到区块链scope的订阅列表中
  • 从通道中读取到ChainHeadEvent时,开启协程执行txIndexer.run,执行完后关闭done通道
    • 索引区间块内的交易indexTransactions
      • 迭代区间内的交易iterateTransactions,返回交易索引通道
      • 从通道读取交易索引,写入数据库WriteTxLookupEntries
      • 更新交易索引中的最早区块号即tailWriteTxIndexTail
  • 从done通道读取,更新txIndexer.tail
  • 从term通道读取通道ch,关闭stop通道,读取done通道的数据,关闭通道ch
func (indexer *txIndexer) run(head uint64, stop chan struct{}, done chan struct{}) {defer func() { close(done) }()// Short circuit if the chain is either empty, or entirely below the// cutoff point.if head == 0 || head < indexer.cutoff {return}// The tail flag is not existent, it means the node is just initialized// and all blocks in the chain (part of them may from ancient store) are// not indexed yet, index the chain according to the configured limit.tail := rawdb.ReadTxIndexTail(indexer.db)if tail == nil {// Determine the first block for transaction indexing, taking the// configured cutoff point into account.from := uint64(0)if indexer.limit != 0 && head >= indexer.limit {from = head - indexer.limit + 1}from = max(from, indexer.cutoff)rawdb.IndexTransactions(indexer.db, from, head+1, stop, true)return}// The tail flag is existent (which means indexes in [tail, head] should be// present), while the whole chain are requested for indexing.if indexer.limit == 0 || head < indexer.limit {if *tail > 0 {from := max(uint64(0), indexer.cutoff)rawdb.IndexTransactions(indexer.db, from, *tail, stop, true)}return}// The tail flag is existent, adjust the index range according to configured// limit and the latest chain head.from := head - indexer.limit + 1from = max(from, indexer.cutoff)if from < *tail {// Reindex a part of missing indices and rewind index tail to HEAD-limitrawdb.IndexTransactions(indexer.db, from, *tail, stop, true)} else {// Unindex a part of stale indices and forward index tail to HEAD-limitrawdb.UnindexTransactions(indexer.db, *tail, from, stop, false)}
}func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, report bool) {indexTransactions(db, from, to, interrupt, nil, report)
}func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) {// short circuit for invalid rangeif from >= to {return}var (hashesCh = iterateTransactions(db, from, to, true, interrupt)batch    = db.NewBatch()start    = time.Now()logged   = start.Add(-7 * time.Second)// Since we iterate in reverse, we expect the first number to come// in to be [to-1]. Therefore, setting lastNum to means that the// queue gap-evaluation will work correctlylastNum     = toqueue       = prque.New[int64, *blockTxHashes](nil)blocks, txs = 0, 0 // for stats reporting)for chanDelivery := range hashesCh {// Push the delivery into the queue and process contiguous ranges.// Since we iterate in reverse, so lower numbers have lower prio, and// we can use the number directly as prio markerqueue.Push(chanDelivery, int64(chanDelivery.number))for !queue.Empty() {// If the next available item is gapped, returnif _, priority := queue.Peek(); priority != int64(lastNum-1) {break}// For testingif hook != nil && !hook(lastNum-1) {break}// Next block available, pop it off and index itdelivery := queue.PopItem()lastNum = delivery.numberWriteTxLookupEntries(batch, delivery.number, delivery.hashes)blocks++txs += len(delivery.hashes)// If enough data was accumulated in memory or we're at the last block, dump to diskif batch.ValueSize() > ethdb.IdealBatchSize {WriteTxIndexTail(batch, lastNum) // Also write the tail hereif err := batch.Write(); err != nil {log.Crit("Failed writing batch to db", "error", err)return}batch.Reset()}// If we've spent too much time already, notify the user of what we're doingif time.Since(logged) > 8*time.Second {log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))logged = time.Now()}}}// Flush the new indexing tail and the last committed data. It can also happen// that the last batch is empty because nothing to index, but the tail has to// be flushed anyway.WriteTxIndexTail(batch, lastNum)if err := batch.Write(); err != nil {log.Crit("Failed writing batch to db", "error", err)return}logger := log.Debugif report {logger = log.Info}select {case <-interrupt:logger("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))default:logger("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))}
}

迭代区块区间内的交易iterateTransactions

loopup:读取区块区间内的交易,构造numberRlp数据,添加到rlpCh通道中

type numberRlp struct {number uint64 //区块号rlp    rlp.RawValue //rlp数据
}

process:从rlpCh通道中读取数据,构造blockTxHashes数据,添加到hashesCh通道中


type blockTxHashes struct {number uint64 //区块号hashes []common.Hash //区块内所有交易的hash
}
func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool, interrupt chan struct{}) chan *blockTxHashes {// One thread sequentially reads data from dbtype numberRlp struct {number uint64rlp    rlp.RawValue}if to == from {return nil}threads := to - fromif cpus := runtime.NumCPU(); threads > uint64(cpus) {threads = uint64(cpus)}var (rlpCh    = make(chan *numberRlp, threads*2)     // we send raw rlp over this channelhashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh)// lookup runs in one instancelookup := func() {n, end := from, toif reverse {n, end = to-1, from-1}defer close(rlpCh)for n != end {data := ReadCanonicalBodyRLP(db, n, nil)// Feed the block to the aggregator, or abort on interruptselect {case rlpCh <- &numberRlp{n, data}:case <-interrupt:return}if reverse {n--} else {n++}}}// process runs in parallelvar nThreadsAlive atomic.Int32nThreadsAlive.Store(int32(threads))process := func() {defer func() {// Last processor closes the result channelif nThreadsAlive.Add(-1) == 0 {close(hashesCh)}}()for data := range rlpCh {var body types.Bodyif err := rlp.DecodeBytes(data.rlp, &body); err != nil {log.Warn("Failed to decode block body", "block", data.number, "error", err)return}var hashes []common.Hashfor _, tx := range body.Transactions {hashes = append(hashes, tx.Hash())}result := &blockTxHashes{hashes: hashes,number: data.number,}// Feed the block to the aggregator, or abort on interruptselect {case hashesCh <- result:case <-interrupt:return}}}go lookup() // start the sequential db accessorfor i := 0; i < int(threads); i++ {go process()}return hashesCh
}
http://www.dtcms.com/a/532029.html

相关文章:

  • 描述对于营销型网站建设很重要飘红效果更佳信阳做网站 汉狮网络
  • 油猴脚本学习1——元数据头部
  • mysql 如何让事件执行
  • PantherX2 debain/armbian Jellyfin10.10.7升级10.11启动后无法监听端口8096的解决办法
  • 网站建设利弊中山币做网站公司
  • Kaleidoscope for mac 文件对比工具
  • LeetCode 1901.寻找峰值2
  • 沈阳建设网站费用北京网站优化软件
  • 【Android】【底层原理】深入解析SELinux模块
  • 阮一峰《TypeScript 教程》学习笔记——注释指令
  • 最好的开发网站建设价格app免费制作平台下载
  • 供应链进销存源码uniapp全开源ERP多仓库管理系统pc+app手机端
  • 生物化学Learning Track(15)酶活性的调节
  • 站酷设计网站官网入口下载国外手机设计网站
  • Rust 泛型与特性
  • GPT-1 技术报告
  • 中英双语 网站 模板wordpress返佣
  • 机器学习库的决策树绘制
  • k8s的calico出现ipset报错解决方法
  • SimpleDateFormat
  • 网站后期的维护和更新池州网站建设推广
  • 低空经济的实时神经系统:空地一体化音视频架构的技术演进
  • 更换MacbookAir固态硬盘,并用U盘安装MacOS操作系统
  • 创建一个简单的SpringBoot
  • 硅基计划2.0 学习总结 玖 图书管理系统 2.0复盘版(文字末尾源码可复制)
  • 河北省建设信息网站十种营销方法
  • qt调用摄像头进行yolo的实时检测
  • 网站备案通过什么可以备案wordpress熊掌号插件
  • IntelliJ IDEA 使用 Lombok 报错:“Lombok requires enabled annotation processing” 解决方案
  • qtmqtt: 一个开源且好用的mqtt开源客户端