// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package trie

import (
	"bytes"
	"testing"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethdb/memorydb"
)

// makeTestTrie creates a sample test trie to test node-wise reconstruction.
func makeTestTrie() (*Database, *SecureTrie, map[string][]byte) {
	// Create an empty trie
	triedb := NewDatabase(memorydb.New())
	trie, _ := NewSecure(common.Hash{}, triedb)

	// Fill it with some arbitrary data
	content := make(map[string][]byte)
	for i := byte(0); i < 255; i++ {
		// Map the same data under multiple keys
		key, val := common.LeftPadBytes([]byte{1, i}, 32), []byte{i}
		content[string(key)] = val
		trie.Update(key, val)

		key, val = common.LeftPadBytes([]byte{2, i}, 32), []byte{i}
		content[string(key)] = val
		trie.Update(key, val)

		// Add some other data to inflate the trie
		for j := byte(3); j < 13; j++ {
			key, val = common.LeftPadBytes([]byte{j, i}, 32), []byte{j, i}
			content[string(key)] = val
			trie.Update(key, val)
		}
	}
	trie.Commit(nil)

	// Return the generated trie
	return triedb, trie, content
}

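// Note: the keys above are left-padded to 32 bytes so they resemble the
// hashed keys the syncer sees in practice; e.g. common.LeftPadBytes([]byte{1, 7}, 32)
// yields thirty zero bytes followed by 0x01 0x07. Mapping the same value
// under multiple keys also produces shared nodes, which the duplicate
// avoidance test below relies on.
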
// checkTrieContents cross-references a reconstructed trie with an expected data
// content map.
func checkTrieContents(t *testing.T, db *Database, root []byte, content map[string][]byte) {
	// Check root availability and trie contents
	trie, err := NewSecure(common.BytesToHash(root), db)
	if err != nil {
		t.Fatalf("failed to create trie at %x: %v", root, err)
	}
	if err := checkTrieConsistency(db, common.BytesToHash(root)); err != nil {
		t.Fatalf("inconsistent trie at %x: %v", root, err)
	}
	for key, val := range content {
		if have := trie.Get([]byte(key)); !bytes.Equal(have, val) {
			t.Errorf("entry %x: content mismatch: have %x, want %x", key, have, val)
		}
	}
}

// checkTrieConsistency checks that all nodes in a trie are indeed present.
func checkTrieConsistency(db *Database, root common.Hash) error {
	// Create and iterate a trie rooted in a subnode
	trie, err := NewSecure(root, db)
	if err != nil {
		return nil // Consider a non-existent state consistent
	}
	it := trie.NodeIterator(nil)
	for it.Next(true) {
	}
	return it.Error()
}

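// All the tests below drive the scheduler through the same basic loop:
// request missing items, resolve them against the source database, feed the
// results back, and persist whatever became committable. The helper below is
// a minimal illustrative sketch of that loop, distilled from the tests that
// follow (each of which adds its own twist such as batching, delays or
// random ordering); it is not used by them.
func syncLoopSketch(srcDb *Database, diskdb *memorydb.Database, sched *Sync, count int) error {
	nodes, _, codes := sched.Missing(count)
	queue := append(append([]common.Hash{}, nodes...), codes...)
	for len(queue) > 0 {
		// Resolve every scheduled hash against the source trie database
		results := make([]SyncResult, 0, len(queue))
		for _, hash := range queue {
			data, err := srcDb.Node(hash)
			if err != nil {
				return err
			}
			results = append(results, SyncResult{hash, data})
		}
		// Feed the results back and flush committed nodes to disk
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				return err
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			return err
		}
		if err := batch.Write(); err != nil {
			return err
		}
		// Schedule the next round of retrievals
		nodes, _, codes = sched.Missing(count)
		queue = append(append(queue[:0], nodes...), codes...)
	}
	return nil
}
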
// Tests that an empty trie is not scheduled for syncing.
func TestEmptySync(t *testing.T) {
	dbA := NewDatabase(memorydb.New())
	dbB := NewDatabase(memorydb.New())
	emptyA, _ := New(common.Hash{}, dbA)
	emptyB, _ := New(emptyRoot, dbB)

	for i, trie := range []*Trie{emptyA, emptyB} {
		sync := NewSync(trie.Hash(), memorydb.New(), nil, NewSyncBloom(1, memorydb.New()))
		if nodes, paths, codes := sync.Missing(1); len(nodes) != 0 || len(paths) != 0 || len(codes) != 0 {
			t.Errorf("test %d: content requested for empty trie: %v, %v, %v", i, nodes, paths, codes)
		}
	}
}

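// Note: common.Hash{} and emptyRoot are the two spellings of an empty trie,
// the zero hash and the hash of an empty RLP string respectively; neither
// must cause any sync tasks to be scheduled.
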
// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go.
func TestIterativeSyncIndividual(t *testing.T)       { testIterativeSync(t, 1, false) }
func TestIterativeSyncBatched(t *testing.T)          { testIterativeSync(t, 100, false) }
func TestIterativeSyncIndividualByPath(t *testing.T) { testIterativeSync(t, 1, true) }
func TestIterativeSyncBatchedByPath(t *testing.T)    { testIterativeSync(t, 100, true) }

func testIterativeSync(t *testing.T, count int, bypath bool) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	nodes, paths, codes := sched.Missing(count)
	var (
		hashQueue []common.Hash
		pathQueue []SyncPath
	)
	if !bypath {
		hashQueue = append(append(hashQueue[:0], nodes...), codes...)
	} else {
		hashQueue = append(hashQueue[:0], codes...)
		pathQueue = append(pathQueue[:0], paths...)
	}
	for len(hashQueue)+len(pathQueue) > 0 {
		results := make([]SyncResult, len(hashQueue)+len(pathQueue))
		for i, hash := range hashQueue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for hash %x: %v", hash, err)
			}
			results[i] = SyncResult{hash, data}
		}
		for i, path := range pathQueue {
			data, _, err := srcTrie.TryGetNode(path[0])
			if err != nil {
				t.Fatalf("failed to retrieve node data for path %x: %v", path, err)
			}
			results[len(hashQueue)+i] = SyncResult{crypto.Keccak256Hash(data), data}
		}
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result: %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		nodes, paths, codes = sched.Missing(count)
		if !bypath {
			hashQueue = append(append(hashQueue[:0], nodes...), codes...)
		} else {
			hashQueue = append(hashQueue[:0], codes...)
			pathQueue = append(pathQueue[:0], paths...)
		}
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
}

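// Note: even in the bypath variant above, contract code stays hash-addressed
// (bytecode has no path in the trie), which is why code hashes always travel
// through the hash queue while only trie nodes are fetched by path.
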
// Tests that the trie scheduler can correctly reconstruct the state even if
// only partial results are returned, with the remainder supplied later.
func TestIterativeDelayedSync(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	nodes, _, codes := sched.Missing(10000)
	queue := append(append([]common.Hash{}, nodes...), codes...)
	for len(queue) > 0 {
		// Sync only half of the scheduled nodes
		results := make([]SyncResult, len(queue)/2+1)
		for i, hash := range queue[:len(results)] {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results[i] = SyncResult{hash, data}
		}
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result: %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		nodes, _, codes = sched.Missing(10000)
		queue = append(append(queue[len(results):], nodes...), codes...)
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
}

// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go, though in a
// random order.
func TestIterativeRandomSyncIndividual(t *testing.T) { testIterativeRandomSync(t, 1) }
func TestIterativeRandomSyncBatched(t *testing.T)    { testIterativeRandomSync(t, 100) }

func testIterativeRandomSync(t *testing.T, count int) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	queue := make(map[common.Hash]struct{})
	nodes, _, codes := sched.Missing(count)
	for _, hash := range append(nodes, codes...) {
		queue[hash] = struct{}{}
	}
	for len(queue) > 0 {
		// Fetch all the queued nodes in a random order
		results := make([]SyncResult, 0, len(queue))
		for hash := range queue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results = append(results, SyncResult{hash, data})
		}
		// Feed the retrieved results back and queue new tasks
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result: %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		queue = make(map[common.Hash]struct{})
		nodes, _, codes = sched.Missing(count)
		for _, hash := range append(nodes, codes...) {
			queue[hash] = struct{}{}
		}
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
}

// Tests that the trie scheduler can correctly reconstruct the state even if
// only partial results are returned (and in random order), with the remainder
// supplied later.
func TestIterativeRandomDelayedSync(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	queue := make(map[common.Hash]struct{})
	nodes, _, codes := sched.Missing(10000)
	for _, hash := range append(nodes, codes...) {
		queue[hash] = struct{}{}
	}
	for len(queue) > 0 {
		// Sync only half of the scheduled nodes, and in a random order
		results := make([]SyncResult, 0, len(queue)/2+1)
		for hash := range queue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results = append(results, SyncResult{hash, data})

			if len(results) >= cap(results) {
				break
			}
		}
		// Feed the retrieved results back and queue new tasks
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result: %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		for _, result := range results {
			delete(queue, result.Hash)
		}
		nodes, _, codes = sched.Missing(10000)
		for _, hash := range append(nodes, codes...) {
			queue[hash] = struct{}{}
		}
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
}

// Tests that a trie sync will not request nodes multiple times, even if they
// are referenced from multiple locations.
func TestDuplicateAvoidanceSync(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	nodes, _, codes := sched.Missing(0)
	queue := append(append([]common.Hash{}, nodes...), codes...)
	requested := make(map[common.Hash]struct{})

	for len(queue) > 0 {
		results := make([]SyncResult, len(queue))
		for i, hash := range queue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			if _, ok := requested[hash]; ok {
				t.Errorf("hash %x already requested once", hash)
			}
			requested[hash] = struct{}{}

			results[i] = SyncResult{hash, data}
		}
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result: %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		nodes, _, codes = sched.Missing(0)
		queue = append(append(queue[:0], nodes...), codes...)
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
}

// Tests that at any point in time during a sync, only complete sub-tries are in
// the database.
func TestIncompleteSync(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, _ := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	var added []common.Hash

	nodes, _, codes := sched.Missing(1)
	queue := append(append([]common.Hash{}, nodes...), codes...)
	for len(queue) > 0 {
		// Fetch a batch of trie nodes
		results := make([]SyncResult, len(queue))
		for i, hash := range queue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results[i] = SyncResult{hash, data}
		}
		// Process each of the trie nodes
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result: %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		for _, result := range results {
			added = append(added, result.Hash)

			// Check that all known sub-tries in the synced trie are complete
			if err := checkTrieConsistency(triedb, result.Hash); err != nil {
				t.Fatalf("trie inconsistent: %v", err)
			}
		}
		// Fetch the next batch to retrieve
		nodes, _, codes = sched.Missing(1)
		queue = append(append(queue[:0], nodes...), codes...)
	}
	// Sanity check that removing any node from the database is detected
	for _, node := range added[1:] {
		key := node.Bytes()
		value, _ := diskdb.Get(key)

		diskdb.Delete(key)
		if err := checkTrieConsistency(triedb, added[0]); err == nil {
			t.Fatalf("trie inconsistency not caught, missing: %x", key)
		}
		diskdb.Put(key, value)
	}
}

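// Note: the removal probe above starts at added[1], deliberately skipping the
// root (added[0]): checkTrieConsistency treats a missing root as a
// non-existent trie and reports it consistent, so deleting the root would not
// register as corruption.
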
// Tests that trie nodes get scheduled lexicographically when having the same
// depth.
func TestSyncOrdering(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler, tracking the requests
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	nodes, paths, _ := sched.Missing(1)
	queue := append([]common.Hash{}, nodes...)
	reqs := append([]SyncPath{}, paths...)

	for len(queue) > 0 {
		results := make([]SyncResult, len(queue))
		for i, hash := range queue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results[i] = SyncResult{hash, data}
		}
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result: %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		nodes, paths, _ = sched.Missing(1)
		queue = append(queue[:0], nodes...)
		reqs = append(reqs, paths...)
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)

	// Check that the trie nodes have been requested path-ordered
	for i := 0; i < len(reqs)-1; i++ {
		if len(reqs[i]) > 1 || len(reqs[i+1]) > 1 {
			// In the case of the trie tests, there's no storage so the tuples
			// must always be single items. 2-tuples should be tested in state.
			t.Errorf("invalid request tuples: len(%v) or len(%v) > 1", reqs[i], reqs[i+1])
		}
		if bytes.Compare(compactToHex(reqs[i][0]), compactToHex(reqs[i+1][0])) > 0 {
			t.Errorf("invalid request order: %v before %v", compactToHex(reqs[i][0]), compactToHex(reqs[i+1][0]))
		}
	}
}
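
// Note: the paths are compared in hex (nibble) form because SyncPath elements
// use compact encoding, whose prefix flag nibble would otherwise distort a
// plain lexicographic comparison of the raw bytes.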