go-ethereum core之交易索引txIndexer
简介
交易索引为交易提供快速查询能力，例如通过交易哈希查找交易详情及其所在的区块。txIndexer 是负责管理和维护交易索引的核心组件。
结构
type txIndexer struct {// limit is the maximum number of blocks from head whose tx indexes// are reserved:// * 0: means the entire chain should be indexed// * N: means the latest N blocks [HEAD-N+1, HEAD] should be indexed// and all others shouldn't.limit uint64// The current head of blockchain for transaction indexing. This field// is accessed by both the indexer and the indexing progress queries.head atomic.Uint64// The current tail of the indexed transactions, null indicates// that no transactions have been indexed yet.//// This field is accessed by both the indexer and the indexing// progress queries.tail atomic.Pointer[uint64]// cutoff denotes the block number before which the chain segment should// be pruned and not available locally.cutoff uint64db ethdb.Databaseterm chan chan struct{}closed chan struct{}
}
limit：索引范围限制，控制需要保留交易索引的区块范围（从链的最新头部往回数）。0 表示索引整条链；N 表示只索引最新的 N 个区块 [HEAD-N+1, HEAD]，其余区块不保留索引。
head：索引器跟踪的当前链头区块号，即本轮索引的目标终点。每收到一次 ChainHeadEvent 就会更新，并不代表该区块及其之前的交易都已完成索引。
tail：已建立交易索引的最早区块号（即索引范围的“起点”）；为 nil 表示尚未索引任何交易。
cutoff：历史修剪（history pruning）的截止区块号，由 chain.HistoryPruningCutoff() 获得。区块号小于 cutoff 的链数据已被修剪、本地不可用，因此索引器不会为这些区块建立交易索引：计算索引起点时总会取 max(from, cutoff)。
db：链数据库（chain.db），交易索引条目和索引尾部（tail）标记都持久化在其中。
term：终止请求通道（类型为 chan chan struct{}）。外部向其发送一个应答通道，请求 loop 协程退出；loop 停止后台任务后关闭该应答通道作为确认。
closed：退出完成通知通道。loop 协程退出时（defer）关闭它，用于通知外部 txIndexer 已完全关闭。
创建
在新建区块链（NewBlockChain）时，若配置的 TxLookupLimit >= 0，就调用 newTxIndexer 创建 txIndexer；newTxIndexer 会启动一个协程执行 loop。
// Excerpt from NewBlockChain:

// Start tx indexer if it's enabled.
if bc.cfg.TxLookupLimit >= 0 {
	bc.txIndexer = newTxIndexer(uint64(bc.cfg.TxLookupLimit), bc)
}

// newTxIndexer creates a transaction indexer that keeps the tx indexes of the
// latest `limit` blocks (0 = the whole chain, bounded below by the history
// pruning cutoff) and starts its event loop in a new goroutine.
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
	// Only the cutoff block number is needed here; the second result
	// (presumably the cutoff block hash — confirm) is deliberately ignored.
	cutoff, _ := chain.HistoryPruningCutoff()

	indexer := &txIndexer{
		limit:  limit,
		cutoff: cutoff,
		db:     chain.db,
		term:   make(chan chan struct{}),
		closed: make(chan struct{}),
	}
	indexer.head.Store(indexer.resolveHead())
	indexer.tail.Store(rawdb.ReadTxIndexTail(chain.db))
	go indexer.loop(chain)

	var msg string
	if limit == 0 {
		if indexer.cutoff == 0 {
			msg = "entire chain"
		} else {
			msg = fmt.Sprintf("blocks since #%d", indexer.cutoff)
		}
	} else {
		msg = fmt.Sprintf("last %d blocks", limit)
	}
	log.Info("Initialized transaction indexer", "range", msg)

	return indexer
}

// loop is the indexer's event loop. It validates/repairs the indexes once,
// then starts a background run whenever a new chain head arrives and no run
// is in flight, refreshes the cached tail when a run finishes, and handles
// shutdown requests received on indexer.term. It closes indexer.closed on exit.
func (indexer *txIndexer) loop(chain *BlockChain) {
	defer close(indexer.closed)

	// Listening to chain events and manipulate the transaction indexes.
	var (
		stop chan struct{} // Non-nil if background routine is active
		done chan struct{} // Non-nil if background routine is active

		headCh = make(chan ChainHeadEvent)
		sub    = chain.SubscribeChainHeadEvent(headCh)
	)
	defer sub.Unsubscribe()

	// Validate the transaction indexes and repair if necessary
	head := indexer.head.Load()
	indexer.repair(head)

	// Launch the initial processing if chain is not empty (head != genesis).
	// This step is useful in these scenarios that chain has no progress.
	if head != 0 {
		stop = make(chan struct{})
		done = make(chan struct{})
		go indexer.run(head, stop, done)
	}
	for {
		select {
		case h := <-headCh:
			indexer.head.Store(h.Header.Number.Uint64())

			// At most one run at a time. A head that arrives while a run is
			// active only updates indexer.head; the next run is started by
			// the following head event.
			if done == nil {
				stop = make(chan struct{})
				done = make(chan struct{})
				go indexer.run(h.Header.Number.Uint64(), stop, done)
			}

		// While no run is active, done is nil and this case never fires
		// (receiving from a nil channel blocks forever).
		case <-done:
			stop = nil
			done = nil
			indexer.tail.Store(rawdb.ReadTxIndexTail(indexer.db))

		case ch := <-indexer.term:
			// Interrupt the active run, if any, wait for it to finish, then
			// acknowledge the shutdown request.
			if stop != nil {
				close(stop)
			}
			if done != nil {
				log.Info("Waiting background transaction indexer to exit")
				<-done
			}
			close(ch)
			return
		}
	}
}
索引执行loop
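其步骤为：
- 初始化：创建 ChainHeadEvent 通道，并通过 SubscribeChainHeadEvent 订阅链头事件；调用 repair 校验现有索引，必要时进行修复；若链非空（head != 0），立即启动一次后台索引
- 从 headCh 读取到 ChainHeadEvent 时，更新 head；若当前没有正在运行的后台任务（done 为 nil），则开启协程执行 txIndexer.run，执行完毕后关闭 done 通道
  - run 根据 limit、cutoff 以及现有的 tail 计算需要索引（或反索引）的区块区间，调用 IndexTransactions（或 UnindexTransactions）
    - indexTransactions 调用 iterateTransactions 迭代区间内的交易，得到交易哈希通道
    - 从通道读取各区块的交易哈希，通过 WriteTxLookupEntries 写入数据库
    - 通过 WriteTxIndexTail 更新已索引的最早区块号，即 tail
- 从 done 通道读取到信号时，清空 stop 和 done，并从数据库重新读取 tail，更新 txIndexer.tail
- 从 term 通道读取到应答通道 ch 时，关闭 stop 通道以中断后台任务，等待 done 关闭，然后关闭 ch 并退出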
func (indexer *txIndexer) run(head uint64, stop chan struct{}, done chan struct{}) {defer func() { close(done) }()// Short circuit if the chain is either empty, or entirely below the// cutoff point.if head == 0 || head < indexer.cutoff {return}// The tail flag is not existent, it means the node is just initialized// and all blocks in the chain (part of them may from ancient store) are// not indexed yet, index the chain according to the configured limit.tail := rawdb.ReadTxIndexTail(indexer.db)if tail == nil {// Determine the first block for transaction indexing, taking the// configured cutoff point into account.from := uint64(0)if indexer.limit != 0 && head >= indexer.limit {from = head - indexer.limit + 1}from = max(from, indexer.cutoff)rawdb.IndexTransactions(indexer.db, from, head+1, stop, true)return}// The tail flag is existent (which means indexes in [tail, head] should be// present), while the whole chain are requested for indexing.if indexer.limit == 0 || head < indexer.limit {if *tail > 0 {from := max(uint64(0), indexer.cutoff)rawdb.IndexTransactions(indexer.db, from, *tail, stop, true)}return}// The tail flag is existent, adjust the index range according to configured// limit and the latest chain head.from := head - indexer.limit + 1from = max(from, indexer.cutoff)if from < *tail {// Reindex a part of missing indices and rewind index tail to HEAD-limitrawdb.IndexTransactions(indexer.db, from, *tail, stop, true)} else {// Unindex a part of stale indices and forward index tail to HEAD-limitrawdb.UnindexTransactions(indexer.db, *tail, from, stop, false)}
}func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, report bool) {indexTransactions(db, from, to, interrupt, nil, report)
}func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) {// short circuit for invalid rangeif from >= to {return}var (hashesCh = iterateTransactions(db, from, to, true, interrupt)batch = db.NewBatch()start = time.Now()logged = start.Add(-7 * time.Second)// Since we iterate in reverse, we expect the first number to come// in to be [to-1]. Therefore, setting lastNum to means that the// queue gap-evaluation will work correctlylastNum = toqueue = prque.New[int64, *blockTxHashes](nil)blocks, txs = 0, 0 // for stats reporting)for chanDelivery := range hashesCh {// Push the delivery into the queue and process contiguous ranges.// Since we iterate in reverse, so lower numbers have lower prio, and// we can use the number directly as prio markerqueue.Push(chanDelivery, int64(chanDelivery.number))for !queue.Empty() {// If the next available item is gapped, returnif _, priority := queue.Peek(); priority != int64(lastNum-1) {break}// For testingif hook != nil && !hook(lastNum-1) {break}// Next block available, pop it off and index itdelivery := queue.PopItem()lastNum = delivery.numberWriteTxLookupEntries(batch, delivery.number, delivery.hashes)blocks++txs += len(delivery.hashes)// If enough data was accumulated in memory or we're at the last block, dump to diskif batch.ValueSize() > ethdb.IdealBatchSize {WriteTxIndexTail(batch, lastNum) // Also write the tail hereif err := batch.Write(); err != nil {log.Crit("Failed writing batch to db", "error", err)return}batch.Reset()}// If we've spent too much time already, notify the user of what we're doingif time.Since(logged) > 8*time.Second {log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))logged = time.Now()}}}// Flush the new indexing tail and the last committed data. 
It can also happen// that the last batch is empty because nothing to index, but the tail has to// be flushed anyway.WriteTxIndexTail(batch, lastNum)if err := batch.Write(); err != nil {log.Crit("Failed writing batch to db", "error", err)return}logger := log.Debugif report {logger = log.Info}select {case <-interrupt:logger("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))default:logger("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))}
}
迭代区块区间内的交易iterateTransactions
lookup：单个协程按区块号顺序（reverse 为 true 时倒序）读取区间内每个规范区块体的 RLP 数据（ReadCanonicalBodyRLP），构造 numberRlp 发送到 rlpCh 通道；收到中断信号时提前返回，结束时关闭 rlpCh。
type numberRlp struct {number uint64 //区块号rlp rlp.RawValue //rlp数据
}
process：多个协程（数量为 min(to-from, CPU 核数)）并行从 rlpCh 读取数据，解码区块体并计算其中每笔交易的哈希，构造 blockTxHashes 发送到 hashesCh 通道；最后一个退出的 process 协程负责关闭 hashesCh。
type blockTxHashes struct {number uint64 //区块号hashes []common.Hash //区块内所有交易的hash
}
// iterateTransactions iterates over all transactions in the canonical block
// range [from, to) and delivers the hashes of each block's transactions on the
// returned channel.
//
// One goroutine reads block bodies sequentially (in descending order when
// reverse is set) and up to runtime.NumCPU() goroutines decode them in
// parallel, so deliveries may arrive out of order; callers such as
// indexTransactions reorder them by block number. The channel is closed once
// every decoding goroutine has exited: after all blocks are processed, or early
// when interrupt is closed.
//
// An empty or inverted range (from >= to) yields a nil channel. Ranging over a
// nil channel blocks forever, so callers must guard against that case first.
func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool, interrupt chan struct{}) chan *blockTxHashes {
	// One thread sequentially reads data from db
	type numberRlp struct {
		number uint64
		rlp    rlp.RawValue
	}
	// Reject inverted ranges as well as empty ones: otherwise to-from below
	// underflows and the lookup loop walks (nearly) the whole uint64 space.
	if from >= to {
		return nil
	}
	threads := to - from
	if cpus := runtime.NumCPU(); threads > uint64(cpus) {
		threads = uint64(cpus)
	}
	var (
		rlpCh    = make(chan *numberRlp, threads*2)     // we send raw rlp over this channel
		hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh
	)
	// lookup runs in one instance
	lookup := func() {
		n, end := from, to
		if reverse {
			// end is exclusive; from-1 wraps to MaxUint64 when from == 0,
			// which n still reaches after decrementing past 0.
			n, end = to-1, from-1
		}
		defer close(rlpCh)
		for n != end {
			data := ReadCanonicalBodyRLP(db, n, nil)
			// Feed the block to the aggregator, or abort on interrupt
			select {
			case rlpCh <- &numberRlp{n, data}:
			case <-interrupt:
				return
			}
			if reverse {
				n--
			} else {
				n++
			}
		}
	}
	// process runs in parallel
	var nThreadsAlive atomic.Int32
	nThreadsAlive.Store(int32(threads))
	process := func() {
		defer func() {
			// Last processor closes the result channel
			if nThreadsAlive.Add(-1) == 0 {
				close(hashesCh)
			}
		}()
		for data := range rlpCh {
			var body types.Body
			if err := rlp.DecodeBytes(data.rlp, &body); err != nil {
				// This worker stops and the block is not delivered.
				log.Warn("Failed to decode block body", "block", data.number, "error", err)
				return
			}
			hashes := make([]common.Hash, 0, len(body.Transactions))
			for _, tx := range body.Transactions {
				hashes = append(hashes, tx.Hash())
			}
			result := &blockTxHashes{
				hashes: hashes,
				number: data.number,
			}
			// Feed the block to the aggregator, or abort on interrupt
			select {
			case hashesCh <- result:
			case <-interrupt:
				return
			}
		}
	}
	go lookup() // start the sequential db accessor
	for i := 0; i < int(threads); i++ {
		go process()
	}
	return hashesCh
}