trie_prefetcher.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package state
  17. import (
  18. "sync"
  19. "github.com/ethereum/go-ethereum/common"
  20. "github.com/ethereum/go-ethereum/log"
  21. "github.com/ethereum/go-ethereum/metrics"
  22. )
  var (
  	// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
  	// The per-prefetcher namespace is appended to it when the meters are registered.
  	triePrefetchMetricsPrefix = "trie/prefetch/"
  )
  // triePrefetcher is an active prefetcher, which receives accounts or storage
  // items and does trie-loading of them. The goal is to get as much useful content
  // into the caches as possible.
  //
  // A prefetcher operates in one of two modes. An active prefetcher owns running
  // subfetchers in the fetchers map and leaves fetches nil. An inactive one (the
  // result of copy) only holds trie snapshots in the fetches map and never starts
  // any goroutine.
  //
  // Note, the prefetcher's API is not thread safe.
  type triePrefetcher struct {
  	db       Database                    // Database to fetch trie nodes through
  	root     common.Hash                 // Root hash of the account trie for metrics
  	fetches  map[common.Hash]Trie        // Partially or fully fetched tries (non-nil only on inactive copies)
  	fetchers map[common.Hash]*subfetcher // Subfetchers for each trie (used by active prefetchers)

  	deliveryMissMeter metrics.Meter // Marked whenever a requested trie cannot be delivered

  	accountLoadMeter  metrics.Meter // Number of distinct account trie items loaded
  	accountDupMeter   metrics.Meter // Number of account trie items scheduled more than once
  	accountSkipMeter  metrics.Meter // Number of account trie items still queued at close
  	accountWasteMeter metrics.Meter // Number of account trie items loaded but never reported as used

  	storageLoadMeter  metrics.Meter // Number of distinct storage trie items loaded
  	storageDupMeter   metrics.Meter // Number of storage trie items scheduled more than once
  	storageSkipMeter  metrics.Meter // Number of storage trie items still queued at close
  	storageWasteMeter metrics.Meter // Number of storage trie items loaded but never reported as used
  }
  47. // newTriePrefetcher
  48. func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
  49. prefix := triePrefetchMetricsPrefix + namespace
  50. p := &triePrefetcher{
  51. db: db,
  52. root: root,
  53. fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
  54. deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
  55. accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
  56. accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
  57. accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
  58. accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
  59. storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
  60. storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
  61. storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
  62. storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
  63. }
  64. return p
  65. }
  66. // close iterates over all the subfetchers, aborts any that were left spinning
  67. // and reports the stats to the metrics subsystem.
  68. func (p *triePrefetcher) close() {
  69. for _, fetcher := range p.fetchers {
  70. fetcher.abort() // safe to do multiple times
  71. if metrics.Enabled {
  72. if fetcher.root == p.root {
  73. p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
  74. p.accountDupMeter.Mark(int64(fetcher.dups))
  75. p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
  76. for _, key := range fetcher.used {
  77. delete(fetcher.seen, string(key))
  78. }
  79. p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
  80. } else {
  81. p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
  82. p.storageDupMeter.Mark(int64(fetcher.dups))
  83. p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
  84. for _, key := range fetcher.used {
  85. delete(fetcher.seen, string(key))
  86. }
  87. p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
  88. }
  89. }
  90. }
  91. // Clear out all fetchers (will crash on a second call, deliberate)
  92. p.fetchers = nil
  93. }
  94. // copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
  95. // already loaded will be copied over, but no goroutines will be started. This
  96. // is mostly used in the miner which creates a copy of it's actively mutated
  97. // state to be sealed while it may further mutate the state.
  98. func (p *triePrefetcher) copy() *triePrefetcher {
  99. copy := &triePrefetcher{
  100. db: p.db,
  101. root: p.root,
  102. fetches: make(map[common.Hash]Trie), // Active prefetchers use the fetches map
  103. deliveryMissMeter: p.deliveryMissMeter,
  104. accountLoadMeter: p.accountLoadMeter,
  105. accountDupMeter: p.accountDupMeter,
  106. accountSkipMeter: p.accountSkipMeter,
  107. accountWasteMeter: p.accountWasteMeter,
  108. storageLoadMeter: p.storageLoadMeter,
  109. storageDupMeter: p.storageDupMeter,
  110. storageSkipMeter: p.storageSkipMeter,
  111. storageWasteMeter: p.storageWasteMeter,
  112. }
  113. // If the prefetcher is already a copy, duplicate the data
  114. if p.fetches != nil {
  115. for root, fetch := range p.fetches {
  116. if fetch != nil {
  117. copy.fetches[root] = p.db.CopyTrie(fetch)
  118. }
  119. }
  120. return copy
  121. }
  122. // Otherwise we're copying an active fetcher, retrieve the current states
  123. for root, fetcher := range p.fetchers {
  124. copy.fetches[root] = fetcher.peek()
  125. }
  126. return copy
  127. }
  128. // prefetch schedules a batch of trie items to prefetch.
  129. func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte) {
  130. // If the prefetcher is an inactive one, bail out
  131. if p.fetches != nil {
  132. return
  133. }
  134. // Active fetcher, schedule the retrievals
  135. fetcher := p.fetchers[root]
  136. if fetcher == nil {
  137. fetcher = newSubfetcher(p.db, root)
  138. p.fetchers[root] = fetcher
  139. }
  140. fetcher.schedule(keys)
  141. }
  142. // trie returns the trie matching the root hash, or nil if the prefetcher doesn't
  143. // have it.
  144. func (p *triePrefetcher) trie(root common.Hash) Trie {
  145. // If the prefetcher is inactive, return from existing deep copies
  146. if p.fetches != nil {
  147. trie := p.fetches[root]
  148. if trie == nil {
  149. p.deliveryMissMeter.Mark(1)
  150. return nil
  151. }
  152. return p.db.CopyTrie(trie)
  153. }
  154. // Otherwise the prefetcher is active, bail if no trie was prefetched for this root
  155. fetcher := p.fetchers[root]
  156. if fetcher == nil {
  157. p.deliveryMissMeter.Mark(1)
  158. return nil
  159. }
  160. // Interrupt the prefetcher if it's by any chance still running and return
  161. // a copy of any pre-loaded trie.
  162. fetcher.abort() // safe to do multiple times
  163. trie := fetcher.peek()
  164. if trie == nil {
  165. p.deliveryMissMeter.Mark(1)
  166. return nil
  167. }
  168. return trie
  169. }
  170. // used marks a batch of state items used to allow creating statistics as to
  171. // how useful or wasteful the prefetcher is.
  172. func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
  173. if fetcher := p.fetchers[root]; fetcher != nil {
  174. fetcher.used = used
  175. }
  176. }
  // subfetcher is a trie fetcher goroutine responsible for pulling entries for a
  // single trie. It is spawned when a new root is encountered and lives until the
  // main prefetcher is paused and either all requested items are processed or if
  // the trie being worked on is retrieved from the prefetcher.
  type subfetcher struct {
  	db   Database    // Database to load trie nodes through
  	root common.Hash // Root hash of the trie to prefetch
  	trie Trie        // Trie being populated with nodes

  	tasks [][]byte   // Items queued up for retrieval
  	lock  sync.Mutex // Lock protecting the task queue

  	wake chan struct{}  // Wake channel if a new task is scheduled (buffered, capacity 1)
  	stop chan struct{}  // Channel to interrupt processing
  	term chan struct{}  // Channel to signal interruption, closed when the loop goroutine returns
  	copy chan chan Trie // Channel to request a copy of the current trie

  	seen map[string]struct{} // Tracks the entries already loaded
  	dups int                 // Number of duplicate preload tasks
  	used [][]byte            // Tracks the entries used in the end
  }
  195. // newSubfetcher creates a goroutine to prefetch state items belonging to a
  196. // particular root hash.
  197. func newSubfetcher(db Database, root common.Hash) *subfetcher {
  198. sf := &subfetcher{
  199. db: db,
  200. root: root,
  201. wake: make(chan struct{}, 1),
  202. stop: make(chan struct{}),
  203. term: make(chan struct{}),
  204. copy: make(chan chan Trie),
  205. seen: make(map[string]struct{}),
  206. }
  207. go sf.loop()
  208. return sf
  209. }
  // schedule adds a batch of trie keys to the queue to prefetch and nudges the
  // loop goroutine. It never blocks on the notification.
  func (sf *subfetcher) schedule(keys [][]byte) {
  	// Append the tasks to the current queue
  	sf.lock.Lock()
  	sf.tasks = append(sf.tasks, keys...)
  	sf.lock.Unlock()
  	// Notify the prefetcher, it's fine if it's already terminated. The wake
  	// channel has a buffer of one, so if a notification is already pending the
  	// send is skipped and the pending one covers this batch too.
  	select {
  	case sf.wake <- struct{}{}:
  	default:
  	}
  }
  222. // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
  223. // is currently.
  224. func (sf *subfetcher) peek() Trie {
  225. ch := make(chan Trie)
  226. select {
  227. case sf.copy <- ch:
  228. // Subfetcher still alive, return copy from it
  229. return <-ch
  230. case <-sf.term:
  231. // Subfetcher already terminated, return a copy directly
  232. if sf.trie == nil {
  233. return nil
  234. }
  235. return sf.db.CopyTrie(sf.trie)
  236. }
  237. }
  // abort interrupts the subfetcher immediately and waits for its loop goroutine
  // to exit. It is safe to call abort multiple times but it is not thread safe.
  func (sf *subfetcher) abort() {
  	// Close the stop channel only if it is still open: a receive on an already
  	// closed channel succeeds right away, which skips the second close.
  	select {
  	case <-sf.stop:
  	default:
  		close(sf.stop)
  	}
  	// Block until the loop has closed the term channel on its way out
  	<-sf.term
  }
  248. // loop waits for new tasks to be scheduled and keeps loading them until it runs
  249. // out of tasks or its underlying trie is retrieved for committing.
  250. func (sf *subfetcher) loop() {
  251. // No matter how the loop stops, signal anyone waiting that it's terminated
  252. defer close(sf.term)
  253. // Start by opening the trie and stop processing if it fails
  254. trie, err := sf.db.OpenTrie(sf.root)
  255. if err != nil {
  256. log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
  257. return
  258. }
  259. sf.trie = trie
  260. // Trie opened successfully, keep prefetching items
  261. for {
  262. select {
  263. case <-sf.wake:
  264. // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
  265. sf.lock.Lock()
  266. tasks := sf.tasks
  267. sf.tasks = nil
  268. sf.lock.Unlock()
  269. // Prefetch any tasks until the loop is interrupted
  270. for i, task := range tasks {
  271. select {
  272. case <-sf.stop:
  273. // If termination is requested, add any leftover back and return
  274. sf.lock.Lock()
  275. sf.tasks = append(sf.tasks, tasks[i:]...)
  276. sf.lock.Unlock()
  277. return
  278. case ch := <-sf.copy:
  279. // Somebody wants a copy of the current trie, grant them
  280. ch <- sf.db.CopyTrie(sf.trie)
  281. default:
  282. // No termination request yet, prefetch the next entry
  283. taskid := string(task)
  284. if _, ok := sf.seen[taskid]; ok {
  285. sf.dups++
  286. } else {
  287. sf.trie.TryGet(task)
  288. sf.seen[taskid] = struct{}{}
  289. }
  290. }
  291. }
  292. case ch := <-sf.copy:
  293. // Somebody wants a copy of the current trie, grant them
  294. ch <- sf.db.CopyTrie(sf.trie)
  295. case <-sf.stop:
  296. // Termination is requested, abort and leave remaining tasks
  297. return
  298. }
  299. }
  300. }