postprocess.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package light
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/binary"
  21. "errors"
  22. "fmt"
  23. "math/big"
  24. "time"
  25. mapset "github.com/deckarep/golang-set"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/common/bitutil"
  28. "github.com/ethereum/go-ethereum/core"
  29. "github.com/ethereum/go-ethereum/core/rawdb"
  30. "github.com/ethereum/go-ethereum/core/types"
  31. "github.com/ethereum/go-ethereum/ethdb"
  32. "github.com/ethereum/go-ethereum/log"
  33. "github.com/ethereum/go-ethereum/params"
  34. "github.com/ethereum/go-ethereum/rlp"
  35. "github.com/ethereum/go-ethereum/trie"
  36. )
  37. // IndexerConfig includes a set of configs for chain indexers.
  38. type IndexerConfig struct {
  39. // The block frequency for creating CHTs.
  40. ChtSize uint64
  41. // The number of confirmations needed to generate/accept a canonical hash help trie.
  42. ChtConfirms uint64
  43. // The block frequency for creating new bloom bits.
  44. BloomSize uint64
  45. // The number of confirmation needed before a bloom section is considered probably final and its rotated bits
  46. // are calculated.
  47. BloomConfirms uint64
  48. // The block frequency for creating BloomTrie.
  49. BloomTrieSize uint64
  50. // The number of confirmations needed to generate/accept a bloom trie.
  51. BloomTrieConfirms uint64
  52. }
  53. var (
  54. // DefaultServerIndexerConfig wraps a set of configs as a default indexer config for server side.
  55. DefaultServerIndexerConfig = &IndexerConfig{
  56. ChtSize: params.CHTFrequency,
  57. ChtConfirms: params.HelperTrieProcessConfirmations,
  58. BloomSize: params.BloomBitsBlocks,
  59. BloomConfirms: params.BloomConfirms,
  60. BloomTrieSize: params.BloomTrieFrequency,
  61. BloomTrieConfirms: params.HelperTrieProcessConfirmations,
  62. }
  63. // DefaultClientIndexerConfig wraps a set of configs as a default indexer config for client side.
  64. DefaultClientIndexerConfig = &IndexerConfig{
  65. ChtSize: params.CHTFrequency,
  66. ChtConfirms: params.HelperTrieConfirmations,
  67. BloomSize: params.BloomBitsBlocksClient,
  68. BloomConfirms: params.HelperTrieConfirmations,
  69. BloomTrieSize: params.BloomTrieFrequency,
  70. BloomTrieConfirms: params.HelperTrieConfirmations,
  71. }
  72. // TestServerIndexerConfig wraps a set of configs as a test indexer config for server side.
  73. TestServerIndexerConfig = &IndexerConfig{
  74. ChtSize: 128,
  75. ChtConfirms: 1,
  76. BloomSize: 16,
  77. BloomConfirms: 1,
  78. BloomTrieSize: 128,
  79. BloomTrieConfirms: 1,
  80. }
  81. // TestClientIndexerConfig wraps a set of configs as a test indexer config for client side.
  82. TestClientIndexerConfig = &IndexerConfig{
  83. ChtSize: 128,
  84. ChtConfirms: 8,
  85. BloomSize: 128,
  86. BloomConfirms: 8,
  87. BloomTrieSize: 128,
  88. BloomTrieConfirms: 8,
  89. }
  90. )
  91. var (
  92. errNoTrustedCht = errors.New("no trusted canonical hash trie")
  93. errNoTrustedBloomTrie = errors.New("no trusted bloom trie")
  94. errNoHeader = errors.New("header not found")
  95. chtPrefix = []byte("chtRootV2-") // chtPrefix + chtNum (uint64 big endian) -> trie root hash
  96. ChtTablePrefix = "cht-"
  97. )
  98. // ChtNode structures are stored in the Canonical Hash Trie in an RLP encoded format
  99. type ChtNode struct {
  100. Hash common.Hash
  101. Td *big.Int
  102. }
  103. // GetChtRoot reads the CHT root associated to the given section from the database
  104. func GetChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead common.Hash) common.Hash {
  105. var encNumber [8]byte
  106. binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
  107. data, _ := db.Get(append(append(chtPrefix, encNumber[:]...), sectionHead.Bytes()...))
  108. return common.BytesToHash(data)
  109. }
  110. // StoreChtRoot writes the CHT root associated to the given section into the database
  111. func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common.Hash) {
  112. var encNumber [8]byte
  113. binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
  114. db.Put(append(append(chtPrefix, encNumber[:]...), sectionHead.Bytes()...), root.Bytes())
  115. }
  116. // ChtIndexerBackend implements core.ChainIndexerBackend.
  117. type ChtIndexerBackend struct {
  118. disablePruning bool
  119. diskdb, trieTable ethdb.Database
  120. odr OdrBackend
  121. triedb *trie.Database
  122. trieset mapset.Set
  123. section, sectionSize uint64
  124. lastHash common.Hash
  125. trie *trie.Trie
  126. }
  127. // NewChtIndexer creates a Cht chain indexer
  128. func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64, disablePruning bool) *core.ChainIndexer {
  129. trieTable := rawdb.NewTable(db, ChtTablePrefix)
  130. backend := &ChtIndexerBackend{
  131. diskdb: db,
  132. odr: odr,
  133. trieTable: trieTable,
  134. triedb: trie.NewDatabaseWithConfig(trieTable, &trie.Config{Cache: 1}), // Use a tiny cache only to keep memory down
  135. trieset: mapset.NewSet(),
  136. sectionSize: size,
  137. disablePruning: disablePruning,
  138. }
  139. return core.NewChainIndexer(db, rawdb.NewTable(db, "chtIndexV2-"), backend, size, confirms, time.Millisecond*100, "cht")
  140. }
  141. // fetchMissingNodes tries to retrieve the last entry of the latest trusted CHT from the
  142. // ODR backend in order to be able to add new entries and calculate subsequent root hashes
  143. func (c *ChtIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error {
  144. batch := c.trieTable.NewBatch()
  145. r := &ChtRequest{ChtRoot: root, ChtNum: section - 1, BlockNum: section*c.sectionSize - 1, Config: c.odr.IndexerConfig()}
  146. for {
  147. err := c.odr.Retrieve(ctx, r)
  148. switch err {
  149. case nil:
  150. r.Proof.Store(batch)
  151. return batch.Write()
  152. case ErrNoPeers:
  153. // if there are no peers to serve, retry later
  154. select {
  155. case <-ctx.Done():
  156. return ctx.Err()
  157. case <-time.After(time.Second * 10):
  158. // stay in the loop and try again
  159. }
  160. default:
  161. return err
  162. }
  163. }
  164. }
  165. // Reset implements core.ChainIndexerBackend
  166. func (c *ChtIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
  167. var root common.Hash
  168. if section > 0 {
  169. root = GetChtRoot(c.diskdb, section-1, lastSectionHead)
  170. }
  171. var err error
  172. c.trie, err = trie.New(root, c.triedb)
  173. if err != nil && c.odr != nil {
  174. err = c.fetchMissingNodes(ctx, section, root)
  175. if err == nil {
  176. c.trie, err = trie.New(root, c.triedb)
  177. }
  178. }
  179. c.section = section
  180. return err
  181. }
  182. // Process implements core.ChainIndexerBackend
  183. func (c *ChtIndexerBackend) Process(ctx context.Context, header *types.Header) error {
  184. hash, num := header.Hash(), header.Number.Uint64()
  185. c.lastHash = hash
  186. td := rawdb.ReadTd(c.diskdb, hash, num)
  187. if td == nil {
  188. panic(nil)
  189. }
  190. var encNumber [8]byte
  191. binary.BigEndian.PutUint64(encNumber[:], num)
  192. data, _ := rlp.EncodeToBytes(ChtNode{hash, td})
  193. c.trie.Update(encNumber[:], data)
  194. return nil
  195. }
  196. // Commit implements core.ChainIndexerBackend
  197. func (c *ChtIndexerBackend) Commit() error {
  198. root, err := c.trie.Commit(nil)
  199. if err != nil {
  200. return err
  201. }
  202. // Pruning historical trie nodes if necessary.
  203. if !c.disablePruning {
  204. // Flush the triedb and track the latest trie nodes.
  205. c.trieset.Clear()
  206. c.triedb.Commit(root, false, func(hash common.Hash) { c.trieset.Add(hash) })
  207. it := c.trieTable.NewIterator(nil, nil)
  208. defer it.Release()
  209. var (
  210. deleted int
  211. remaining int
  212. t = time.Now()
  213. )
  214. for it.Next() {
  215. trimmed := bytes.TrimPrefix(it.Key(), []byte(ChtTablePrefix))
  216. if !c.trieset.Contains(common.BytesToHash(trimmed)) {
  217. c.trieTable.Delete(trimmed)
  218. deleted += 1
  219. } else {
  220. remaining += 1
  221. }
  222. }
  223. log.Debug("Prune historical CHT trie nodes", "deleted", deleted, "remaining", remaining, "elapsed", common.PrettyDuration(time.Since(t)))
  224. } else {
  225. c.triedb.Commit(root, false, nil)
  226. }
  227. log.Info("Storing CHT", "section", c.section, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root))
  228. StoreChtRoot(c.diskdb, c.section, c.lastHash, root)
  229. return nil
  230. }
  231. // PruneSections implements core.ChainIndexerBackend which deletes all
  232. // chain data(except hash<->number mappings) older than the specified
  233. // threshold.
  234. func (c *ChtIndexerBackend) Prune(threshold uint64) error {
  235. // Short circuit if the light pruning is disabled.
  236. if c.disablePruning {
  237. return nil
  238. }
  239. t := time.Now()
  240. // Always keep genesis header in database.
  241. start, end := uint64(1), (threshold+1)*c.sectionSize
  242. var batch = c.diskdb.NewBatch()
  243. for {
  244. numbers, hashes := rawdb.ReadAllCanonicalHashes(c.diskdb, start, end, 10240)
  245. if len(numbers) == 0 {
  246. break
  247. }
  248. for i := 0; i < len(numbers); i++ {
  249. // Keep hash<->number mapping in database otherwise the hash based
  250. // API(e.g. GetReceipt, GetLogs) will be broken.
  251. //
  252. // Storage size wise, the size of a mapping is ~41bytes. For one
  253. // section is about 1.3MB which is acceptable.
  254. //
  255. // In order to totally get rid of this index, we need an additional
  256. // flag to specify how many historical data light client can serve.
  257. rawdb.DeleteCanonicalHash(batch, numbers[i])
  258. rawdb.DeleteBlockWithoutNumber(batch, hashes[i], numbers[i])
  259. }
  260. if batch.ValueSize() > ethdb.IdealBatchSize {
  261. if err := batch.Write(); err != nil {
  262. return err
  263. }
  264. batch.Reset()
  265. }
  266. start = numbers[len(numbers)-1] + 1
  267. }
  268. if err := batch.Write(); err != nil {
  269. return err
  270. }
  271. log.Debug("Prune history headers", "threshold", threshold, "elapsed", common.PrettyDuration(time.Since(t)))
  272. return nil
  273. }
  274. var (
  275. bloomTriePrefix = []byte("bltRoot-") // bloomTriePrefix + bloomTrieNum (uint64 big endian) -> trie root hash
  276. BloomTrieTablePrefix = "blt-"
  277. )
  278. // GetBloomTrieRoot reads the BloomTrie root assoctiated to the given section from the database
  279. func GetBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead common.Hash) common.Hash {
  280. var encNumber [8]byte
  281. binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
  282. data, _ := db.Get(append(append(bloomTriePrefix, encNumber[:]...), sectionHead.Bytes()...))
  283. return common.BytesToHash(data)
  284. }
  285. // StoreBloomTrieRoot writes the BloomTrie root assoctiated to the given section into the database
  286. func StoreBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common.Hash) {
  287. var encNumber [8]byte
  288. binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
  289. db.Put(append(append(bloomTriePrefix, encNumber[:]...), sectionHead.Bytes()...), root.Bytes())
  290. }
  291. // BloomTrieIndexerBackend implements core.ChainIndexerBackend
  292. type BloomTrieIndexerBackend struct {
  293. disablePruning bool
  294. diskdb, trieTable ethdb.Database
  295. triedb *trie.Database
  296. trieset mapset.Set
  297. odr OdrBackend
  298. section uint64
  299. parentSize uint64
  300. size uint64
  301. bloomTrieRatio uint64
  302. trie *trie.Trie
  303. sectionHeads []common.Hash
  304. }
  305. // NewBloomTrieIndexer creates a BloomTrie chain indexer
  306. func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uint64, disablePruning bool) *core.ChainIndexer {
  307. trieTable := rawdb.NewTable(db, BloomTrieTablePrefix)
  308. backend := &BloomTrieIndexerBackend{
  309. diskdb: db,
  310. odr: odr,
  311. trieTable: trieTable,
  312. triedb: trie.NewDatabaseWithConfig(trieTable, &trie.Config{Cache: 1}), // Use a tiny cache only to keep memory down
  313. trieset: mapset.NewSet(),
  314. parentSize: parentSize,
  315. size: size,
  316. disablePruning: disablePruning,
  317. }
  318. backend.bloomTrieRatio = size / parentSize
  319. backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio)
  320. return core.NewChainIndexer(db, rawdb.NewTable(db, "bltIndex-"), backend, size, 0, time.Millisecond*100, "bloomtrie")
  321. }
  322. // fetchMissingNodes tries to retrieve the last entries of the latest trusted bloom trie from the
  323. // ODR backend in order to be able to add new entries and calculate subsequent root hashes
  324. func (b *BloomTrieIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error {
  325. indexCh := make(chan uint, types.BloomBitLength)
  326. type res struct {
  327. nodes *NodeSet
  328. err error
  329. }
  330. resCh := make(chan res, types.BloomBitLength)
  331. for i := 0; i < 20; i++ {
  332. go func() {
  333. for bitIndex := range indexCh {
  334. r := &BloomRequest{BloomTrieRoot: root, BloomTrieNum: section - 1, BitIdx: bitIndex, SectionIndexList: []uint64{section - 1}, Config: b.odr.IndexerConfig()}
  335. for {
  336. if err := b.odr.Retrieve(ctx, r); err == ErrNoPeers {
  337. // if there are no peers to serve, retry later
  338. select {
  339. case <-ctx.Done():
  340. resCh <- res{nil, ctx.Err()}
  341. return
  342. case <-time.After(time.Second * 10):
  343. // stay in the loop and try again
  344. }
  345. } else {
  346. resCh <- res{r.Proofs, err}
  347. break
  348. }
  349. }
  350. }
  351. }()
  352. }
  353. for i := uint(0); i < types.BloomBitLength; i++ {
  354. indexCh <- i
  355. }
  356. close(indexCh)
  357. batch := b.trieTable.NewBatch()
  358. for i := uint(0); i < types.BloomBitLength; i++ {
  359. res := <-resCh
  360. if res.err != nil {
  361. return res.err
  362. }
  363. res.nodes.Store(batch)
  364. }
  365. return batch.Write()
  366. }
  367. // Reset implements core.ChainIndexerBackend
  368. func (b *BloomTrieIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
  369. var root common.Hash
  370. if section > 0 {
  371. root = GetBloomTrieRoot(b.diskdb, section-1, lastSectionHead)
  372. }
  373. var err error
  374. b.trie, err = trie.New(root, b.triedb)
  375. if err != nil && b.odr != nil {
  376. err = b.fetchMissingNodes(ctx, section, root)
  377. if err == nil {
  378. b.trie, err = trie.New(root, b.triedb)
  379. }
  380. }
  381. b.section = section
  382. return err
  383. }
  384. // Process implements core.ChainIndexerBackend
  385. func (b *BloomTrieIndexerBackend) Process(ctx context.Context, header *types.Header) error {
  386. num := header.Number.Uint64() - b.section*b.size
  387. if (num+1)%b.parentSize == 0 {
  388. b.sectionHeads[num/b.parentSize] = header.Hash()
  389. }
  390. return nil
  391. }
  392. // Commit implements core.ChainIndexerBackend
  393. func (b *BloomTrieIndexerBackend) Commit() error {
  394. var compSize, decompSize uint64
  395. for i := uint(0); i < types.BloomBitLength; i++ {
  396. var encKey [10]byte
  397. binary.BigEndian.PutUint16(encKey[0:2], uint16(i))
  398. binary.BigEndian.PutUint64(encKey[2:10], b.section)
  399. var decomp []byte
  400. for j := uint64(0); j < b.bloomTrieRatio; j++ {
  401. data, err := rawdb.ReadBloomBits(b.diskdb, i, b.section*b.bloomTrieRatio+j, b.sectionHeads[j])
  402. if err != nil {
  403. return err
  404. }
  405. decompData, err2 := bitutil.DecompressBytes(data, int(b.parentSize/8))
  406. if err2 != nil {
  407. return err2
  408. }
  409. decomp = append(decomp, decompData...)
  410. }
  411. comp := bitutil.CompressBytes(decomp)
  412. decompSize += uint64(len(decomp))
  413. compSize += uint64(len(comp))
  414. if len(comp) > 0 {
  415. b.trie.Update(encKey[:], comp)
  416. } else {
  417. b.trie.Delete(encKey[:])
  418. }
  419. }
  420. root, err := b.trie.Commit(nil)
  421. if err != nil {
  422. return err
  423. }
  424. // Pruning historical trie nodes if necessary.
  425. if !b.disablePruning {
  426. // Flush the triedb and track the latest trie nodes.
  427. b.trieset.Clear()
  428. b.triedb.Commit(root, false, func(hash common.Hash) { b.trieset.Add(hash) })
  429. it := b.trieTable.NewIterator(nil, nil)
  430. defer it.Release()
  431. var (
  432. deleted int
  433. remaining int
  434. t = time.Now()
  435. )
  436. for it.Next() {
  437. trimmed := bytes.TrimPrefix(it.Key(), []byte(BloomTrieTablePrefix))
  438. if !b.trieset.Contains(common.BytesToHash(trimmed)) {
  439. b.trieTable.Delete(trimmed)
  440. deleted += 1
  441. } else {
  442. remaining += 1
  443. }
  444. }
  445. log.Debug("Prune historical bloom trie nodes", "deleted", deleted, "remaining", remaining, "elapsed", common.PrettyDuration(time.Since(t)))
  446. } else {
  447. b.triedb.Commit(root, false, nil)
  448. }
  449. sectionHead := b.sectionHeads[b.bloomTrieRatio-1]
  450. StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root)
  451. log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize))
  452. return nil
  453. }
  454. // Prune implements core.ChainIndexerBackend which deletes all
  455. // bloombits which older than the specified threshold.
  456. func (b *BloomTrieIndexerBackend) Prune(threshold uint64) error {
  457. // Short circuit if the light pruning is disabled.
  458. if b.disablePruning {
  459. return nil
  460. }
  461. start := time.Now()
  462. for i := uint(0); i < types.BloomBitLength; i++ {
  463. rawdb.DeleteBloombits(b.diskdb, i, 0, threshold*b.bloomTrieRatio+b.bloomTrieRatio)
  464. }
  465. log.Debug("Prune history bloombits", "threshold", threshold, "elapsed", common.PrettyDuration(time.Since(start)))
  466. return nil
  467. }