// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
	"runtime"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rlp"
)

// InitDatabaseFromFreezer reinitializes an empty database from a previous batch
// of frozen ancient blocks. The method iterates over all the frozen blocks and
// injects into the database the block hash->number mappings.
func InitDatabaseFromFreezer(db ethdb.Database) {
	// If we can't access the freezer or it's empty, abort
	frozen, err := db.Ancients()
	if err != nil || frozen == 0 {
		return
	}
	var (
		batch  = db.NewBatch()
		start  = time.Now()
		logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log
		hash   common.Hash
	)
	for i := uint64(0); i < frozen; i++ {
		// Since the freezer has all data in sequential order on a file,
		// it would be 'neat' to read more data in one go, and let the
		// freezerdb return N items (e.g. up to 1000 items per go).
		// That would require an API change in Ancients though.
		if h, err := db.Ancient(freezerHashTable, i); err != nil {
			log.Crit("Failed to init database from freezer", "err", err)
		} else {
			hash = common.BytesToHash(h)
		}
		WriteHeaderNumber(batch, hash, i)
		// If enough data was accumulated in memory or we're at the last block, dump to disk
		if batch.ValueSize() > ethdb.IdealBatchSize {
			if err := batch.Write(); err != nil {
				log.Crit("Failed to write data to db", "err", err)
			}
			batch.Reset()
		}
		// If we've spent too much time already, notify the user of what we're doing
		if time.Since(logged) > 8*time.Second {
			log.Info("Initializing database from freezer", "total", frozen, "number", i, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
			logged = time.Now()
		}
	}
	if err := batch.Write(); err != nil {
		log.Crit("Failed to write data to db", "err", err)
	}
	batch.Reset()

	WriteHeadHeaderHash(db, hash)
	WriteHeadFastBlockHash(db, hash)
	log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start)))
}

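// initFromFreezerExample is an illustrative sketch and not part of the upstream
// file: it shows how InitDatabaseFromFreezer is typically used. After opening a
// database backed by a non-empty freezer, the hash->number mappings are rebuilt
// and the restored head header hash can be resolved again via ReadHeaderNumber.
func initFromFreezerExample(db ethdb.Database) {
	// Rebuild the hash->number mappings from the frozen ancient blocks
	InitDatabaseFromFreezer(db)

	// InitDatabaseFromFreezer also reset the head markers to the last frozen
	// block, so its number mapping should now resolve
	head := ReadHeadHeaderHash(db)
	if number := ReadHeaderNumber(db, head); number != nil {
		log.Info("Database reinitialized from freezer", "head", head, "number", *number)
	}
}
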
type blockTxHashes struct {
	number uint64
	hashes []common.Hash
}

// iterateTransactions iterates over all transactions in the canonical block
// range given, and yields the transaction hashes on a channel. If a signal is
// received on the interrupt channel, the iteration is aborted and the result
// channel is closed.
func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool, interrupt chan struct{}) chan *blockTxHashes {
	// One thread sequentially reads data from db
	type numberRlp struct {
		number uint64
		rlp    rlp.RawValue
	}
	if to == from {
		return nil
	}
	threads := to - from
	if cpus := runtime.NumCPU(); threads > uint64(cpus) {
		threads = uint64(cpus)
	}
	var (
		rlpCh    = make(chan *numberRlp, threads*2)     // we send raw rlp over this channel
		hashesCh = make(chan *blockTxHashes, threads*2) // send hashes over hashesCh
	)
	// lookup runs in one instance
	lookup := func() {
		n, end := from, to
		if reverse {
			n, end = to-1, from-1
		}
		defer close(rlpCh)
		for n != end {
			data := ReadCanonicalBodyRLP(db, n)
			// Feed the block to the aggregator, or abort on interrupt
			select {
			case rlpCh <- &numberRlp{n, data}:
			case <-interrupt:
				return
			}
			if reverse {
				n--
			} else {
				n++
			}
		}
	}
	// process runs in parallel
	nThreadsAlive := int32(threads)
	process := func() {
		defer func() {
			// Last processor closes the result channel
			if atomic.AddInt32(&nThreadsAlive, -1) == 0 {
				close(hashesCh)
			}
		}()
		for data := range rlpCh {
			var body types.Body
			if err := rlp.DecodeBytes(data.rlp, &body); err != nil {
				log.Warn("Failed to decode block body", "block", data.number, "error", err)
				return
			}
			var hashes []common.Hash
			for _, tx := range body.Transactions {
				hashes = append(hashes, tx.Hash())
			}
			result := &blockTxHashes{
				hashes: hashes,
				number: data.number,
			}
			// Feed the block to the aggregator, or abort on interrupt
			select {
			case hashesCh <- result:
			case <-interrupt:
				return
			}
		}
	}
	go lookup() // start the sequential db accessor
	for i := 0; i < int(threads); i++ {
		go process()
	}
	return hashesCh
}

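// countTransactionsExample is an illustrative sketch and not part of the
// upstream file: it shows how the channel returned by iterateTransactions is
// consumed. Deliveries arrive out of order because the body decoding runs on
// multiple goroutines; that is fine for a simple count, whereas the indexers
// below use a priority queue to restore ordering. Closing the interrupt
// channel would abort the iteration early.
func countTransactionsExample(db ethdb.Database, from uint64, to uint64) int {
	if from >= to {
		return 0 // iterateTransactions returns a nil channel for empty ranges
	}
	interrupt := make(chan struct{}) // left open: we want the full range
	txs := 0
	for delivery := range iterateTransactions(db, from, to, false, interrupt) {
		txs += len(delivery.hashes)
	}
	return txs
}
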
// indexTransactions creates txlookup indices of the specified block range.
//
// This function iterates over the canonical chain in reverse order, which has
// one main advantage: the tx index tail flag can be written periodically even
// before the whole indexing procedure is finished, so the next run can resume
// the indexing quickly.
//
// The whole procedure will be interrupted if any signal is received on the
// passed interrupt channel.
func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
	// short circuit for invalid range
	if from >= to {
		return
	}
	var (
		hashesCh = iterateTransactions(db, from, to, true, interrupt)
		batch    = db.NewBatch()
		start    = time.Now()
		logged   = start.Add(-7 * time.Second)
		// Since we iterate in reverse, we expect the first number to come
		// in to be [to-1]. Therefore, setting lastNum to 'to' means that the
		// prque gap-evaluation will work correctly
		lastNum = to
		queue   = prque.New(nil)
		// for stats reporting
		blocks, txs = 0, 0
	)
	for chanDelivery := range hashesCh {
		// Push the delivery into the queue and process contiguous ranges.
		// Since we iterate in reverse, lower numbers have lower priority, and
		// we can use the block number directly as the priority marker.
		queue.Push(chanDelivery, int64(chanDelivery.number))
		for !queue.Empty() {
			// If the next available item is gapped, stop and wait for more deliveries
			if _, priority := queue.Peek(); priority != int64(lastNum-1) {
				break
			}
			// For testing
			if hook != nil && !hook(lastNum-1) {
				break
			}
			// Next block available, pop it off and index it
			delivery := queue.PopItem().(*blockTxHashes)
			lastNum = delivery.number
			WriteTxLookupEntries(batch, delivery.number, delivery.hashes)
			blocks++
			txs += len(delivery.hashes)
			// If enough data was accumulated in memory or we're at the last block, dump to disk
			if batch.ValueSize() > ethdb.IdealBatchSize {
				WriteTxIndexTail(batch, lastNum) // Also write the tail here
				if err := batch.Write(); err != nil {
					log.Crit("Failed writing batch to db", "error", err)
					return
				}
				batch.Reset()
			}
			// If we've spent too much time already, notify the user of what we're doing
			if time.Since(logged) > 8*time.Second {
				log.Info("Indexing transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
				logged = time.Now()
			}
		}
	}
	// Flush the new indexing tail and the last committed data. It can also happen
	// that the last batch is empty because there was nothing to index, but the
	// tail has to be flushed anyway.
	WriteTxIndexTail(batch, lastNum)
	if err := batch.Write(); err != nil {
		log.Crit("Failed writing batch to db", "error", err)
		return
	}
	select {
	case <-interrupt:
		log.Debug("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
	default:
		log.Info("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start)))
	}
}

// IndexTransactions creates txlookup indices of the specified block range.
//
// This function iterates over the canonical chain in reverse order, which has
// one main advantage: the tx index tail flag can be written periodically even
// before the whole indexing procedure is finished, so the next run can resume
// the indexing quickly.
//
// The whole procedure will be interrupted if any signal is received on the
// passed interrupt channel.
func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) {
	indexTransactions(db, from, to, interrupt, nil)
}

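// indexWithTimeoutExample is an illustrative sketch and not part of the
// upstream file: it shows how the interrupt channel is meant to be used. The
// indexer runs in the background and is aborted by closing the channel after a
// deadline; since the tail flag is flushed periodically, a later call to
// IndexTransactions can resume from wherever this run stopped.
func indexWithTimeoutExample(db ethdb.Database, from uint64, to uint64, timeout time.Duration) {
	interrupt := make(chan struct{})
	done := make(chan struct{})

	go func() {
		IndexTransactions(db, from, to, interrupt)
		close(done)
	}()
	select {
	case <-done:
		// Finished within the deadline, nothing to do
	case <-time.After(timeout):
		close(interrupt) // Ask the indexer to stop at the next opportunity
		<-done           // Wait for it to flush the tail and exit
	}
}
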
// indexTransactionsForTesting is the internal debug version with an additional hook.
func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
	indexTransactions(db, from, to, interrupt, hook)
}

// unindexTransactions removes txlookup indices of the specified block range.
//
// The whole procedure will be interrupted if any signal is received on the
// passed interrupt channel.
func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
	// short circuit for invalid range
	if from >= to {
		return
	}
	var (
		hashesCh = iterateTransactions(db, from, to, false, interrupt)
		batch    = db.NewBatch()
		start    = time.Now()
		logged   = start.Add(-7 * time.Second)
		// we expect the first number to come in to be [from]. Therefore, setting
		// nextNum to from means that the prque gap-evaluation will work correctly
		nextNum = from
		queue   = prque.New(nil)
		// for stats reporting
		blocks, txs = 0, 0
	)
	// Otherwise spin up the concurrent iterator and unindexer
	for delivery := range hashesCh {
		// Push the delivery into the queue and process contiguous ranges.
		queue.Push(delivery, -int64(delivery.number))
		for !queue.Empty() {
			// If the next available item is gapped, stop and wait for more deliveries
			if _, priority := queue.Peek(); -priority != int64(nextNum) {
				break
			}
			// For testing
			if hook != nil && !hook(nextNum) {
				break
			}
			delivery := queue.PopItem().(*blockTxHashes)
			nextNum = delivery.number + 1
			DeleteTxLookupEntries(batch, delivery.hashes)
			txs += len(delivery.hashes)
			blocks++
			// If enough data was accumulated in memory or we're at the last block, dump to disk.
			// A batch counts the size of a deletion as '1', so we flush based on the
			// number of processed blocks rather than the batch value size.
			if blocks%1000 == 0 {
				WriteTxIndexTail(batch, nextNum)
				if err := batch.Write(); err != nil {
					log.Crit("Failed writing batch to db", "error", err)
					return
				}
				batch.Reset()
			}
			// If we've spent too much time already, notify the user of what we're doing
			if time.Since(logged) > 8*time.Second {
				log.Info("Unindexing transactions", "blocks", blocks, "txs", txs, "total", to-from, "elapsed", common.PrettyDuration(time.Since(start)))
				logged = time.Now()
			}
		}
	}
	// Flush the new indexing tail and the last committed data. It can also happen
	// that the last batch is empty because there was nothing to unindex, but the
	// tail has to be flushed anyway.
	WriteTxIndexTail(batch, nextNum)
	if err := batch.Write(); err != nil {
		log.Crit("Failed writing batch to db", "error", err)
		return
	}
	select {
	case <-interrupt:
		log.Debug("Transaction unindexing interrupted", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
	default:
		log.Info("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start)))
	}
}

// UnindexTransactions removes txlookup indices of the specified block range.
//
// The whole procedure will be interrupted if any signal is received on the
// passed interrupt channel.
func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) {
	unindexTransactions(db, from, to, interrupt, nil)
}

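// indexUnindexRoundTripExample is an illustrative sketch and not part of the
// upstream file: it shows the index/unindex pair working on the same range.
// The txHash parameter is assumed to be the hash of a transaction contained in
// a canonical block within [from, to). A nil interrupt channel is safe here:
// receiving from a nil channel never fires inside the select statements above.
func indexUnindexRoundTripExample(db ethdb.Database, from uint64, to uint64, txHash common.Hash) {
	// Index the range; afterwards the transaction hash resolves to its block number
	IndexTransactions(db, from, to, nil)
	if number := ReadTxLookupEntry(db, txHash); number != nil {
		log.Info("Transaction indexed", "hash", txHash, "block", *number)
	}
	// Unindex the same range again; the lookup entry disappears
	UnindexTransactions(db, from, to, nil)
	if ReadTxLookupEntry(db, txHash) == nil {
		log.Info("Transaction lookup removed", "hash", txHash)
	}
}
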
// unindexTransactionsForTesting is the internal debug version with an additional hook.
func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
	unindexTransactions(db, from, to, interrupt, hook)
}