sync_bloom.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package trie
  17. import (
  18. "encoding/binary"
  19. "fmt"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common"
  24. "github.com/ethereum/go-ethereum/core/rawdb"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/metrics"
  28. bloomfilter "github.com/holiman/bloomfilter/v2"
  29. )
  30. var (
  31. bloomAddMeter = metrics.NewRegisteredMeter("trie/bloom/add", nil)
  32. bloomLoadMeter = metrics.NewRegisteredMeter("trie/bloom/load", nil)
  33. bloomTestMeter = metrics.NewRegisteredMeter("trie/bloom/test", nil)
  34. bloomMissMeter = metrics.NewRegisteredMeter("trie/bloom/miss", nil)
  35. bloomFaultMeter = metrics.NewRegisteredMeter("trie/bloom/fault", nil)
  36. bloomErrorGauge = metrics.NewRegisteredGauge("trie/bloom/error", nil)
  37. )
  38. // SyncBloom is a bloom filter used during fast sync to quickly decide if a trie
  39. // node or contract code already exists on disk or not. It self populates from the
  40. // provided disk database on creation in a background thread and will only start
  41. // returning live results once that's finished.
  42. type SyncBloom struct {
  43. bloom *bloomfilter.Filter
  44. inited uint32
  45. closer sync.Once
  46. closed uint32
  47. pend sync.WaitGroup
  48. }
  49. // NewSyncBloom creates a new bloom filter of the given size (in megabytes) and
  50. // initializes it from the database. The bloom is hard coded to use 3 filters.
  51. func NewSyncBloom(memory uint64, database ethdb.Iteratee) *SyncBloom {
  52. // Create the bloom filter to track known trie nodes
  53. bloom, err := bloomfilter.New(memory*1024*1024*8, 4)
  54. if err != nil {
  55. panic(fmt.Sprintf("failed to create bloom: %v", err))
  56. }
  57. log.Info("Allocated fast sync bloom", "size", common.StorageSize(memory*1024*1024))
  58. // Assemble the fast sync bloom and init it from previous sessions
  59. b := &SyncBloom{
  60. bloom: bloom,
  61. }
  62. b.pend.Add(2)
  63. go func() {
  64. defer b.pend.Done()
  65. b.init(database)
  66. }()
  67. go func() {
  68. defer b.pend.Done()
  69. b.meter()
  70. }()
  71. return b
  72. }
  73. // init iterates over the database, pushing every trie hash into the bloom filter.
  74. func (b *SyncBloom) init(database ethdb.Iteratee) {
  75. // Iterate over the database, but restart every now and again to avoid holding
  76. // a persistent snapshot since fast sync can push a ton of data concurrently,
  77. // bloating the disk.
  78. //
  79. // Note, this is fine, because everything inserted into leveldb by fast sync is
  80. // also pushed into the bloom directly, so we're not missing anything when the
  81. // iterator is swapped out for a new one.
  82. it := database.NewIterator(nil, nil)
  83. var (
  84. start = time.Now()
  85. swap = time.Now()
  86. )
  87. for it.Next() && atomic.LoadUint32(&b.closed) == 0 {
  88. // If the database entry is a trie node, add it to the bloom
  89. key := it.Key()
  90. if len(key) == common.HashLength {
  91. b.bloom.AddHash(binary.BigEndian.Uint64(key))
  92. bloomLoadMeter.Mark(1)
  93. } else if ok, hash := rawdb.IsCodeKey(key); ok {
  94. // If the database entry is a contract code, add it to the bloom
  95. b.bloom.AddHash(binary.BigEndian.Uint64(hash))
  96. bloomLoadMeter.Mark(1)
  97. }
  98. // If enough time elapsed since the last iterator swap, restart
  99. if time.Since(swap) > 8*time.Second {
  100. key := common.CopyBytes(it.Key())
  101. it.Release()
  102. it = database.NewIterator(nil, key)
  103. log.Info("Initializing state bloom", "items", b.bloom.N(), "errorrate", b.bloom.FalsePosititveProbability(), "elapsed", common.PrettyDuration(time.Since(start)))
  104. swap = time.Now()
  105. }
  106. }
  107. it.Release()
  108. // Mark the bloom filter inited and return
  109. log.Info("Initialized state bloom", "items", b.bloom.N(), "errorrate", b.bloom.FalsePosititveProbability(), "elapsed", common.PrettyDuration(time.Since(start)))
  110. atomic.StoreUint32(&b.inited, 1)
  111. }
  112. // meter periodically recalculates the false positive error rate of the bloom
  113. // filter and reports it in a metric.
  114. func (b *SyncBloom) meter() {
  115. for {
  116. // Report the current error ration. No floats, lame, scale it up.
  117. bloomErrorGauge.Update(int64(b.bloom.FalsePosititveProbability() * 100000))
  118. // Wait one second, but check termination more frequently
  119. for i := 0; i < 10; i++ {
  120. if atomic.LoadUint32(&b.closed) == 1 {
  121. return
  122. }
  123. time.Sleep(100 * time.Millisecond)
  124. }
  125. }
  126. }
  127. // Close terminates any background initializer still running and releases all the
  128. // memory allocated for the bloom.
  129. func (b *SyncBloom) Close() error {
  130. b.closer.Do(func() {
  131. // Ensure the initializer is stopped
  132. atomic.StoreUint32(&b.closed, 1)
  133. b.pend.Wait()
  134. // Wipe the bloom, but mark it "uninited" just in case someone attempts an access
  135. log.Info("Deallocated state bloom", "items", b.bloom.N(), "errorrate", b.bloom.FalsePosititveProbability())
  136. atomic.StoreUint32(&b.inited, 0)
  137. b.bloom = nil
  138. })
  139. return nil
  140. }
  141. // Add inserts a new trie node hash into the bloom filter.
  142. func (b *SyncBloom) Add(hash []byte) {
  143. if atomic.LoadUint32(&b.closed) == 1 {
  144. return
  145. }
  146. b.bloom.AddHash(binary.BigEndian.Uint64(hash))
  147. bloomAddMeter.Mark(1)
  148. }
  149. // Contains tests if the bloom filter contains the given hash:
  150. // - false: the bloom definitely does not contain hash
  151. // - true: the bloom maybe contains hash
  152. //
  153. // While the bloom is being initialized, any query will return true.
  154. func (b *SyncBloom) Contains(hash []byte) bool {
  155. bloomTestMeter.Mark(1)
  156. if atomic.LoadUint32(&b.inited) == 0 {
  157. // We didn't load all the trie nodes from the previous run of Geth yet. As
  158. // such, we can't say for sure if a hash is not present for anything. Until
  159. // the init is done, we're faking "possible presence" for everything.
  160. return true
  161. }
  162. // Bloom initialized, check the real one and report any successful misses
  163. maybe := b.bloom.ContainsHash(binary.BigEndian.Uint64(hash))
  164. if !maybe {
  165. bloomMissMeter.Mark(1)
  166. }
  167. return maybe
  168. }