block_fetcher_test.go 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package fetcher
  17. import (
  18. "errors"
  19. "math/big"
  20. "sync"
  21. "sync/atomic"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/consensus/ethash"
  26. "github.com/ethereum/go-ethereum/core"
  27. "github.com/ethereum/go-ethereum/core/rawdb"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/crypto"
  30. "github.com/ethereum/go-ethereum/params"
  31. "github.com/ethereum/go-ethereum/trie"
  32. )
  33. var (
  34. testdb = rawdb.NewMemoryDatabase()
  35. testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
  36. testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
  37. genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
  38. unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit}, nil, nil, nil, trie.NewStackTrie(nil))
  39. )
  40. // makeChain creates a chain of n blocks starting at and including parent.
  41. // the returned hash chain is ordered head->parent. In addition, every 3rd block
  42. // contains a transaction and every 5th an uncle to allow testing correct block
  43. // reassembly.
  44. func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
  45. blocks, _ := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testdb, n, func(i int, block *core.BlockGen) {
  46. block.SetCoinbase(common.Address{seed})
  47. // If the block number is multiple of 3, send a bonus transaction to the miner
  48. if parent == genesis && i%3 == 0 {
  49. signer := types.MakeSigner(params.TestChainConfig, block.Number())
  50. tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey)
  51. if err != nil {
  52. panic(err)
  53. }
  54. block.AddTx(tx)
  55. }
  56. // If the block number is a multiple of 5, add a bonus uncle to the block
  57. if i%5 == 0 {
  58. block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
  59. }
  60. })
  61. hashes := make([]common.Hash, n+1)
  62. hashes[len(hashes)-1] = parent.Hash()
  63. blockm := make(map[common.Hash]*types.Block, n+1)
  64. blockm[parent.Hash()] = parent
  65. for i, b := range blocks {
  66. hashes[len(hashes)-i-2] = b.Hash()
  67. blockm[b.Hash()] = b
  68. }
  69. return hashes, blockm
  70. }
  71. // fetcherTester is a test simulator for mocking out local block chain.
  72. type fetcherTester struct {
  73. fetcher *BlockFetcher
  74. hashes []common.Hash // Hash chain belonging to the tester
  75. headers map[common.Hash]*types.Header // Headers belonging to the tester
  76. blocks map[common.Hash]*types.Block // Blocks belonging to the tester
  77. drops map[string]bool // Map of peers dropped by the fetcher
  78. lock sync.RWMutex
  79. }
  80. // newTester creates a new fetcher test mocker.
  81. func newTester(light bool) *fetcherTester {
  82. tester := &fetcherTester{
  83. hashes: []common.Hash{genesis.Hash()},
  84. headers: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
  85. blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
  86. drops: make(map[string]bool),
  87. }
  88. tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer)
  89. tester.fetcher.Start()
  90. return tester
  91. }
  92. // getHeader retrieves a header from the tester's block chain.
  93. func (f *fetcherTester) getHeader(hash common.Hash) *types.Header {
  94. f.lock.RLock()
  95. defer f.lock.RUnlock()
  96. return f.headers[hash]
  97. }
  98. // getBlock retrieves a block from the tester's block chain.
  99. func (f *fetcherTester) getBlock(hash common.Hash) *types.Block {
  100. f.lock.RLock()
  101. defer f.lock.RUnlock()
  102. return f.blocks[hash]
  103. }
  104. // verifyHeader is a nop placeholder for the block header verification.
  105. func (f *fetcherTester) verifyHeader(header *types.Header) error {
  106. return nil
  107. }
  108. // broadcastBlock is a nop placeholder for the block broadcasting.
  109. func (f *fetcherTester) broadcastBlock(block *types.Block, propagate bool) {
  110. }
  111. // chainHeight retrieves the current height (block number) of the chain.
  112. func (f *fetcherTester) chainHeight() uint64 {
  113. f.lock.RLock()
  114. defer f.lock.RUnlock()
  115. if f.fetcher.light {
  116. return f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64()
  117. }
  118. return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
  119. }
  120. // insertChain injects a new headers into the simulated chain.
  121. func (f *fetcherTester) insertHeaders(headers []*types.Header) (int, error) {
  122. f.lock.Lock()
  123. defer f.lock.Unlock()
  124. for i, header := range headers {
  125. // Make sure the parent in known
  126. if _, ok := f.headers[header.ParentHash]; !ok {
  127. return i, errors.New("unknown parent")
  128. }
  129. // Discard any new blocks if the same height already exists
  130. if header.Number.Uint64() <= f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64() {
  131. return i, nil
  132. }
  133. // Otherwise build our current chain
  134. f.hashes = append(f.hashes, header.Hash())
  135. f.headers[header.Hash()] = header
  136. }
  137. return 0, nil
  138. }
  139. // insertChain injects a new blocks into the simulated chain.
  140. func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
  141. f.lock.Lock()
  142. defer f.lock.Unlock()
  143. for i, block := range blocks {
  144. // Make sure the parent in known
  145. if _, ok := f.blocks[block.ParentHash()]; !ok {
  146. return i, errors.New("unknown parent")
  147. }
  148. // Discard any new blocks if the same height already exists
  149. if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() {
  150. return i, nil
  151. }
  152. // Otherwise build our current chain
  153. f.hashes = append(f.hashes, block.Hash())
  154. f.blocks[block.Hash()] = block
  155. }
  156. return 0, nil
  157. }
  158. // dropPeer is an emulator for the peer removal, simply accumulating the various
  159. // peers dropped by the fetcher.
  160. func (f *fetcherTester) dropPeer(peer string) {
  161. f.lock.Lock()
  162. defer f.lock.Unlock()
  163. f.drops[peer] = true
  164. }
  165. // makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
  166. func (f *fetcherTester) makeHeaderFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
  167. closure := make(map[common.Hash]*types.Block)
  168. for hash, block := range blocks {
  169. closure[hash] = block
  170. }
  171. // Create a function that return a header from the closure
  172. return func(hash common.Hash) error {
  173. // Gather the blocks to return
  174. headers := make([]*types.Header, 0, 1)
  175. if block, ok := closure[hash]; ok {
  176. headers = append(headers, block.Header())
  177. }
  178. // Return on a new thread
  179. go f.fetcher.FilterHeaders(peer, headers, time.Now().Add(drift))
  180. return nil
  181. }
  182. }
  183. // makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
  184. func (f *fetcherTester) makeBodyFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
  185. closure := make(map[common.Hash]*types.Block)
  186. for hash, block := range blocks {
  187. closure[hash] = block
  188. }
  189. // Create a function that returns blocks from the closure
  190. return func(hashes []common.Hash) error {
  191. // Gather the block bodies to return
  192. transactions := make([][]*types.Transaction, 0, len(hashes))
  193. uncles := make([][]*types.Header, 0, len(hashes))
  194. for _, hash := range hashes {
  195. if block, ok := closure[hash]; ok {
  196. transactions = append(transactions, block.Transactions())
  197. uncles = append(uncles, block.Uncles())
  198. }
  199. }
  200. // Return on a new thread
  201. go f.fetcher.FilterBodies(peer, transactions, uncles, time.Now().Add(drift))
  202. return nil
  203. }
  204. }
  205. // verifyFetchingEvent verifies that one single event arrive on a fetching channel.
  206. func verifyFetchingEvent(t *testing.T, fetching chan []common.Hash, arrive bool) {
  207. if arrive {
  208. select {
  209. case <-fetching:
  210. case <-time.After(time.Second):
  211. t.Fatalf("fetching timeout")
  212. }
  213. } else {
  214. select {
  215. case <-fetching:
  216. t.Fatalf("fetching invoked")
  217. case <-time.After(10 * time.Millisecond):
  218. }
  219. }
  220. }
  221. // verifyCompletingEvent verifies that one single event arrive on an completing channel.
  222. func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive bool) {
  223. if arrive {
  224. select {
  225. case <-completing:
  226. case <-time.After(time.Second):
  227. t.Fatalf("completing timeout")
  228. }
  229. } else {
  230. select {
  231. case <-completing:
  232. t.Fatalf("completing invoked")
  233. case <-time.After(10 * time.Millisecond):
  234. }
  235. }
  236. }
  237. // verifyImportEvent verifies that one single event arrive on an import channel.
  238. func verifyImportEvent(t *testing.T, imported chan interface{}, arrive bool) {
  239. if arrive {
  240. select {
  241. case <-imported:
  242. case <-time.After(time.Second):
  243. t.Fatalf("import timeout")
  244. }
  245. } else {
  246. select {
  247. case <-imported:
  248. t.Fatalf("import invoked")
  249. case <-time.After(20 * time.Millisecond):
  250. }
  251. }
  252. }
  253. // verifyImportCount verifies that exactly count number of events arrive on an
  254. // import hook channel.
  255. func verifyImportCount(t *testing.T, imported chan interface{}, count int) {
  256. for i := 0; i < count; i++ {
  257. select {
  258. case <-imported:
  259. case <-time.After(time.Second):
  260. t.Fatalf("block %d: import timeout", i+1)
  261. }
  262. }
  263. verifyImportDone(t, imported)
  264. }
  265. // verifyImportDone verifies that no more events are arriving on an import channel.
  266. func verifyImportDone(t *testing.T, imported chan interface{}) {
  267. select {
  268. case <-imported:
  269. t.Fatalf("extra block imported")
  270. case <-time.After(50 * time.Millisecond):
  271. }
  272. }
  273. // verifyChainHeight verifies the chain height is as expected.
  274. func verifyChainHeight(t *testing.T, fetcher *fetcherTester, height uint64) {
  275. if fetcher.chainHeight() != height {
  276. t.Fatalf("chain height mismatch, got %d, want %d", fetcher.chainHeight(), height)
  277. }
  278. }
  279. // Tests that a fetcher accepts block/header announcements and initiates retrievals
  280. // for them, successfully importing into the local chain.
  281. func TestFullSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, false) }
  282. func TestLightSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, true) }
  283. func testSequentialAnnouncements(t *testing.T, light bool) {
  284. // Create a chain of blocks to import
  285. targetBlocks := 4 * hashLimit
  286. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  287. tester := newTester(light)
  288. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  289. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  290. // Iteratively announce blocks until all are imported
  291. imported := make(chan interface{})
  292. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  293. if light {
  294. if header == nil {
  295. t.Fatalf("Fetcher try to import empty header")
  296. }
  297. imported <- header
  298. } else {
  299. if block == nil {
  300. t.Fatalf("Fetcher try to import empty block")
  301. }
  302. imported <- block
  303. }
  304. }
  305. for i := len(hashes) - 2; i >= 0; i-- {
  306. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  307. verifyImportEvent(t, imported, true)
  308. }
  309. verifyImportDone(t, imported)
  310. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  311. }
  312. // Tests that if blocks are announced by multiple peers (or even the same buggy
  313. // peer), they will only get downloaded at most once.
  314. func TestFullConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, false) }
  315. func TestLightConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, true) }
  316. func testConcurrentAnnouncements(t *testing.T, light bool) {
  317. // Create a chain of blocks to import
  318. targetBlocks := 4 * hashLimit
  319. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  320. // Assemble a tester with a built in counter for the requests
  321. tester := newTester(light)
  322. firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack)
  323. firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0)
  324. secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
  325. secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0)
  326. counter := uint32(0)
  327. firstHeaderWrapper := func(hash common.Hash) error {
  328. atomic.AddUint32(&counter, 1)
  329. return firstHeaderFetcher(hash)
  330. }
  331. secondHeaderWrapper := func(hash common.Hash) error {
  332. atomic.AddUint32(&counter, 1)
  333. return secondHeaderFetcher(hash)
  334. }
  335. // Iteratively announce blocks until all are imported
  336. imported := make(chan interface{})
  337. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  338. if light {
  339. if header == nil {
  340. t.Fatalf("Fetcher try to import empty header")
  341. }
  342. imported <- header
  343. } else {
  344. if block == nil {
  345. t.Fatalf("Fetcher try to import empty block")
  346. }
  347. imported <- block
  348. }
  349. }
  350. for i := len(hashes) - 2; i >= 0; i-- {
  351. tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher)
  352. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
  353. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
  354. verifyImportEvent(t, imported, true)
  355. }
  356. verifyImportDone(t, imported)
  357. // Make sure no blocks were retrieved twice
  358. if int(counter) != targetBlocks {
  359. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks)
  360. }
  361. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  362. }
  363. // Tests that announcements arriving while a previous is being fetched still
  364. // results in a valid import.
  365. func TestFullOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, false) }
  366. func TestLightOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, true) }
  367. func testOverlappingAnnouncements(t *testing.T, light bool) {
  368. // Create a chain of blocks to import
  369. targetBlocks := 4 * hashLimit
  370. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  371. tester := newTester(light)
  372. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  373. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  374. // Iteratively announce blocks, but overlap them continuously
  375. overlap := 16
  376. imported := make(chan interface{}, len(hashes)-1)
  377. for i := 0; i < overlap; i++ {
  378. imported <- nil
  379. }
  380. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  381. if light {
  382. if header == nil {
  383. t.Fatalf("Fetcher try to import empty header")
  384. }
  385. imported <- header
  386. } else {
  387. if block == nil {
  388. t.Fatalf("Fetcher try to import empty block")
  389. }
  390. imported <- block
  391. }
  392. }
  393. for i := len(hashes) - 2; i >= 0; i-- {
  394. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  395. select {
  396. case <-imported:
  397. case <-time.After(time.Second):
  398. t.Fatalf("block %d: import timeout", len(hashes)-i)
  399. }
  400. }
  401. // Wait for all the imports to complete and check count
  402. verifyImportCount(t, imported, overlap)
  403. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  404. }
  405. // Tests that announces already being retrieved will not be duplicated.
  406. func TestFullPendingDeduplication(t *testing.T) { testPendingDeduplication(t, false) }
  407. func TestLightPendingDeduplication(t *testing.T) { testPendingDeduplication(t, true) }
  408. func testPendingDeduplication(t *testing.T, light bool) {
  409. // Create a hash and corresponding block
  410. hashes, blocks := makeChain(1, 0, genesis)
  411. // Assemble a tester with a built in counter and delayed fetcher
  412. tester := newTester(light)
  413. headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack)
  414. bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
  415. delay := 50 * time.Millisecond
  416. counter := uint32(0)
  417. headerWrapper := func(hash common.Hash) error {
  418. atomic.AddUint32(&counter, 1)
  419. // Simulate a long running fetch
  420. go func() {
  421. time.Sleep(delay)
  422. headerFetcher(hash)
  423. }()
  424. return nil
  425. }
  426. checkNonExist := func() bool {
  427. return tester.getBlock(hashes[0]) == nil
  428. }
  429. if light {
  430. checkNonExist = func() bool {
  431. return tester.getHeader(hashes[0]) == nil
  432. }
  433. }
  434. // Announce the same block many times until it's fetched (wait for any pending ops)
  435. for checkNonExist() {
  436. tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
  437. time.Sleep(time.Millisecond)
  438. }
  439. time.Sleep(delay)
  440. // Check that all blocks were imported and none fetched twice
  441. if int(counter) != 1 {
  442. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
  443. }
  444. verifyChainHeight(t, tester, 1)
  445. }
  446. // Tests that announcements retrieved in a random order are cached and eventually
  447. // imported when all the gaps are filled in.
  448. func TestFullRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, false) }
  449. func TestLightRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, true) }
  450. func testRandomArrivalImport(t *testing.T, light bool) {
  451. // Create a chain of blocks to import, and choose one to delay
  452. targetBlocks := maxQueueDist
  453. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  454. skip := targetBlocks / 2
  455. tester := newTester(light)
  456. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  457. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  458. // Iteratively announce blocks, skipping one entry
  459. imported := make(chan interface{}, len(hashes)-1)
  460. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  461. if light {
  462. if header == nil {
  463. t.Fatalf("Fetcher try to import empty header")
  464. }
  465. imported <- header
  466. } else {
  467. if block == nil {
  468. t.Fatalf("Fetcher try to import empty block")
  469. }
  470. imported <- block
  471. }
  472. }
  473. for i := len(hashes) - 1; i >= 0; i-- {
  474. if i != skip {
  475. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  476. time.Sleep(time.Millisecond)
  477. }
  478. }
  479. // Finally announce the skipped entry and check full import
  480. tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  481. verifyImportCount(t, imported, len(hashes)-1)
  482. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  483. }
  484. // Tests that direct block enqueues (due to block propagation vs. hash announce)
  485. // are correctly schedule, filling and import queue gaps.
  486. func TestQueueGapFill(t *testing.T) {
  487. // Create a chain of blocks to import, and choose one to not announce at all
  488. targetBlocks := maxQueueDist
  489. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  490. skip := targetBlocks / 2
  491. tester := newTester(false)
  492. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  493. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  494. // Iteratively announce blocks, skipping one entry
  495. imported := make(chan interface{}, len(hashes)-1)
  496. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  497. for i := len(hashes) - 1; i >= 0; i-- {
  498. if i != skip {
  499. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  500. time.Sleep(time.Millisecond)
  501. }
  502. }
  503. // Fill the missing block directly as if propagated
  504. tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
  505. verifyImportCount(t, imported, len(hashes)-1)
  506. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  507. }
  508. // Tests that blocks arriving from various sources (multiple propagations, hash
  509. // announces, etc) do not get scheduled for import multiple times.
  510. func TestImportDeduplication(t *testing.T) {
  511. // Create two blocks to import (one for duplication, the other for stalling)
  512. hashes, blocks := makeChain(2, 0, genesis)
  513. // Create the tester and wrap the importer with a counter
  514. tester := newTester(false)
  515. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  516. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  517. counter := uint32(0)
  518. tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
  519. atomic.AddUint32(&counter, uint32(len(blocks)))
  520. return tester.insertChain(blocks)
  521. }
  522. // Instrument the fetching and imported events
  523. fetching := make(chan []common.Hash)
  524. imported := make(chan interface{}, len(hashes)-1)
  525. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  526. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  527. // Announce the duplicating block, wait for retrieval, and also propagate directly
  528. tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  529. <-fetching
  530. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  531. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  532. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  533. // Fill the missing block directly as if propagated, and check import uniqueness
  534. tester.fetcher.Enqueue("valid", blocks[hashes[1]])
  535. verifyImportCount(t, imported, 2)
  536. if counter != 2 {
  537. t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2)
  538. }
  539. }
  540. // Tests that blocks with numbers much lower or higher than out current head get
  541. // discarded to prevent wasting resources on useless blocks from faulty peers.
  542. func TestDistantPropagationDiscarding(t *testing.T) {
  543. // Create a long chain to import and define the discard boundaries
  544. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  545. head := hashes[len(hashes)/2]
  546. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  547. // Create a tester and simulate a head block being the middle of the above chain
  548. tester := newTester(false)
  549. tester.lock.Lock()
  550. tester.hashes = []common.Hash{head}
  551. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  552. tester.lock.Unlock()
  553. // Ensure that a block with a lower number than the threshold is discarded
  554. tester.fetcher.Enqueue("lower", blocks[hashes[low]])
  555. time.Sleep(10 * time.Millisecond)
  556. if !tester.fetcher.queue.Empty() {
  557. t.Fatalf("fetcher queued stale block")
  558. }
  559. // Ensure that a block with a higher number than the threshold is discarded
  560. tester.fetcher.Enqueue("higher", blocks[hashes[high]])
  561. time.Sleep(10 * time.Millisecond)
  562. if !tester.fetcher.queue.Empty() {
  563. t.Fatalf("fetcher queued future block")
  564. }
  565. }
  566. // Tests that announcements with numbers much lower or higher than out current
  567. // head get discarded to prevent wasting resources on useless blocks from faulty
  568. // peers.
  569. func TestFullDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, false) }
  570. func TestLightDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, true) }
  571. func testDistantAnnouncementDiscarding(t *testing.T, light bool) {
  572. // Create a long chain to import and define the discard boundaries
  573. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  574. head := hashes[len(hashes)/2]
  575. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  576. // Create a tester and simulate a head block being the middle of the above chain
  577. tester := newTester(light)
  578. tester.lock.Lock()
  579. tester.hashes = []common.Hash{head}
  580. tester.headers = map[common.Hash]*types.Header{head: blocks[head].Header()}
  581. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  582. tester.lock.Unlock()
  583. headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack)
  584. bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0)
  585. fetching := make(chan struct{}, 2)
  586. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
  587. // Ensure that a block with a lower number than the threshold is discarded
  588. tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  589. select {
  590. case <-time.After(50 * time.Millisecond):
  591. case <-fetching:
  592. t.Fatalf("fetcher requested stale header")
  593. }
  594. // Ensure that a block with a higher number than the threshold is discarded
  595. tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  596. select {
  597. case <-time.After(50 * time.Millisecond):
  598. case <-fetching:
  599. t.Fatalf("fetcher requested future header")
  600. }
  601. }
  602. // Tests that peers announcing blocks with invalid numbers (i.e. not matching
  603. // the headers provided afterwards) get dropped as malicious.
  604. func TestFullInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, false) }
  605. func TestLightInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, true) }
  606. func testInvalidNumberAnnouncement(t *testing.T, light bool) {
  607. // Create a single block to import and check numbers against
  608. hashes, blocks := makeChain(1, 0, genesis)
  609. tester := newTester(light)
  610. badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack)
  611. badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
  612. imported := make(chan interface{})
  613. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  614. if light {
  615. if header == nil {
  616. t.Fatalf("Fetcher try to import empty header")
  617. }
  618. imported <- header
  619. } else {
  620. if block == nil {
  621. t.Fatalf("Fetcher try to import empty block")
  622. }
  623. imported <- block
  624. }
  625. }
  626. // Announce a block with a bad number, check for immediate drop
  627. tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
  628. verifyImportEvent(t, imported, false)
  629. tester.lock.RLock()
  630. dropped := tester.drops["bad"]
  631. tester.lock.RUnlock()
  632. if !dropped {
  633. t.Fatalf("peer with invalid numbered announcement not dropped")
  634. }
  635. goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack)
  636. goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0)
  637. // Make sure a good announcement passes without a drop
  638. tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher)
  639. verifyImportEvent(t, imported, true)
  640. tester.lock.RLock()
  641. dropped = tester.drops["good"]
  642. tester.lock.RUnlock()
  643. if dropped {
  644. t.Fatalf("peer with valid numbered announcement dropped")
  645. }
  646. verifyImportDone(t, imported)
  647. }
  648. // Tests that if a block is empty (i.e. header only), no body request should be
  649. // made, and instead the header should be assembled into a whole block in itself.
  650. func TestEmptyBlockShortCircuit(t *testing.T) {
  651. // Create a chain of blocks to import
  652. hashes, blocks := makeChain(32, 0, genesis)
  653. tester := newTester(false)
  654. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  655. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  656. // Add a monitoring hook for all internal events
  657. fetching := make(chan []common.Hash)
  658. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  659. completing := make(chan []common.Hash)
  660. tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
  661. imported := make(chan interface{})
  662. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  663. if block == nil {
  664. t.Fatalf("Fetcher try to import empty block")
  665. }
  666. imported <- block
  667. }
  668. // Iteratively announce blocks until all are imported
  669. for i := len(hashes) - 2; i >= 0; i-- {
  670. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  671. // All announces should fetch the header
  672. verifyFetchingEvent(t, fetching, true)
  673. // Only blocks with data contents should request bodies
  674. verifyCompletingEvent(t, completing, len(blocks[hashes[i]].Transactions()) > 0 || len(blocks[hashes[i]].Uncles()) > 0)
  675. // Irrelevant of the construct, import should succeed
  676. verifyImportEvent(t, imported, true)
  677. }
  678. verifyImportDone(t, imported)
  679. }
  680. // Tests that a peer is unable to use unbounded memory with sending infinite
  681. // block announcements to a node, but that even in the face of such an attack,
  682. // the fetcher remains operational.
  683. func TestHashMemoryExhaustionAttack(t *testing.T) {
  684. // Create a tester with instrumented import hooks
  685. tester := newTester(false)
  686. imported, announces := make(chan interface{}), int32(0)
  687. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  688. tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
  689. if added {
  690. atomic.AddInt32(&announces, 1)
  691. } else {
  692. atomic.AddInt32(&announces, -1)
  693. }
  694. }
  695. // Create a valid chain and an infinite junk chain
  696. targetBlocks := hashLimit + 2*maxQueueDist
  697. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  698. validHeaderFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  699. validBodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  700. attack, _ := makeChain(targetBlocks, 0, unknownBlock)
  701. attackerHeaderFetcher := tester.makeHeaderFetcher("attacker", nil, -gatherSlack)
  702. attackerBodyFetcher := tester.makeBodyFetcher("attacker", nil, 0)
  703. // Feed the tester a huge hashset from the attacker, and a limited from the valid peer
  704. for i := 0; i < len(attack); i++ {
  705. if i < maxQueueDist {
  706. tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), validHeaderFetcher, validBodyFetcher)
  707. }
  708. tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher)
  709. }
  710. if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
  711. t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
  712. }
  713. // Wait for fetches to complete
  714. verifyImportCount(t, imported, maxQueueDist)
  715. // Feed the remaining valid hashes to ensure DOS protection state remains clean
  716. for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- {
  717. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), validHeaderFetcher, validBodyFetcher)
  718. verifyImportEvent(t, imported, true)
  719. }
  720. verifyImportDone(t, imported)
  721. }
  722. // Tests that blocks sent to the fetcher (either through propagation or via hash
  723. // announces and retrievals) don't pile up indefinitely, exhausting available
  724. // system memory.
  725. func TestBlockMemoryExhaustionAttack(t *testing.T) {
  726. // Create a tester with instrumented import hooks
  727. tester := newTester(false)
  728. imported, enqueued := make(chan interface{}), int32(0)
  729. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  730. tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
  731. if added {
  732. atomic.AddInt32(&enqueued, 1)
  733. } else {
  734. atomic.AddInt32(&enqueued, -1)
  735. }
  736. }
  737. // Create a valid chain and a batch of dangling (but in range) blocks
  738. targetBlocks := hashLimit + 2*maxQueueDist
  739. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  740. attack := make(map[common.Hash]*types.Block)
  741. for i := byte(0); len(attack) < blockLimit+2*maxQueueDist; i++ {
  742. hashes, blocks := makeChain(maxQueueDist-1, i, unknownBlock)
  743. for _, hash := range hashes[:maxQueueDist-2] {
  744. attack[hash] = blocks[hash]
  745. }
  746. }
  747. // Try to feed all the attacker blocks make sure only a limited batch is accepted
  748. for _, block := range attack {
  749. tester.fetcher.Enqueue("attacker", block)
  750. }
  751. time.Sleep(200 * time.Millisecond)
  752. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
  753. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
  754. }
  755. // Queue up a batch of valid blocks, and check that a new peer is allowed to do so
  756. for i := 0; i < maxQueueDist-1; i++ {
  757. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
  758. }
  759. time.Sleep(100 * time.Millisecond)
  760. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
  761. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
  762. }
  763. // Insert the missing piece (and sanity check the import)
  764. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2]])
  765. verifyImportCount(t, imported, maxQueueDist)
  766. // Insert the remaining blocks in chunks to ensure clean DOS protection
  767. for i := maxQueueDist; i < len(hashes)-1; i++ {
  768. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]])
  769. verifyImportEvent(t, imported, true)
  770. }
  771. verifyImportDone(t, imported)
  772. }