queue_test.go

// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package downloader

import (
	"fmt"
	"math/big"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/consensus/ethash"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/params"
)

var (
	testdb  = rawdb.NewMemoryDatabase()
	genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
)
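
// testKey and testAddress are shared fixtures defined elsewhere in this test
// package; genesis pre-funds testAddress in the in-memory testdb so that
// makeChain can sign transactions from it.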

// makeChain creates a chain of n blocks starting at and including parent.
// The returned blocks are ordered parent->head. Unless empty is set, every
// second block contains a transaction to allow testing correct block
// reassembly.
func makeChain(n int, seed byte, parent *types.Block, empty bool) ([]*types.Block, []types.Receipts) {
	blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testdb, n, func(i int, block *core.BlockGen) {
		block.SetCoinbase(common.Address{seed})
		// Add one tx to every second block
		if !empty && i%2 == 0 {
			signer := types.MakeSigner(params.TestChainConfig, block.Number())
			tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey)
			if err != nil {
				panic(err)
			}
			block.AddTx(tx)
		}
	})
	return blocks, receipts
}

type chainData struct {
	blocks []*types.Block
	offset int
}
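
// chain and emptyChain are generated once in init: both are 128 blocks long
// and share the same genesis parent, but only chain carries transactions
// (in every second block), which is what drives the receipt counts asserted
// in the tests below.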
var chain *chainData
var emptyChain *chainData

func init() {
	// Create a chain of blocks to import
	targetBlocks := 128
	blocks, _ := makeChain(targetBlocks, 0, genesis, false)
	chain = &chainData{blocks, 0}

	blocks, _ = makeChain(targetBlocks, 0, genesis, true)
	emptyChain = &chainData{blocks, 0}
}

func (chain *chainData) headers() []*types.Header {
	hdrs := make([]*types.Header, len(chain.blocks))
	for i, b := range chain.blocks {
		hdrs[i] = b.Header()
	}
	return hdrs
}

func (chain *chainData) Len() int {
	return len(chain.blocks)
}

// dummyPeer creates a bare peerConnection with just an id and an empty
// lacking-set, which is all the queue tests below need for reservations.
func dummyPeer(id string) *peerConnection {
	p := &peerConnection{
		id:      id,
		lacking: make(map[common.Hash]struct{}),
	}
	return p
}
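
// TestBasics schedules a batch of headers on a small queue and checks the
// basic reservation behaviour: the first peer gets a partial body fetch and
// is throttled, a second peer is throttled away empty-handed, and receipt
// reservations proceed independently of body throttling.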
func TestBasics(t *testing.T) {
	numOfBlocks := len(chain.blocks)
	numOfReceipts := len(chain.blocks) / 2

	q := newQueue(10, 10)
	if !q.Idle() {
		t.Errorf("new queue should be idle")
	}
	q.Prepare(1, FastSync)
	if res := q.Results(false); len(res) != 0 {
		t.Fatal("new queue should have 0 results")
	}
	// Schedule a batch of headers
	q.Schedule(chain.headers(), 1)
	if q.Idle() {
		t.Errorf("queue should not be idle")
	}
	if got, exp := q.PendingBlocks(), chain.Len(); got != exp {
		t.Errorf("wrong pending block count, got %d, exp %d", got, exp)
	}
	// Only the receipts of non-empty blocks get added to the task queue
	if got, exp := q.PendingReceipts(), 64; got != exp {
		t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp)
	}
	// Items are now queued for downloading, next step is that we tell the
	// queue that a certain peer will deliver them for us
	{
		peer := dummyPeer("peer-1")
		fetchReq, _, throttle := q.ReserveBodies(peer, 50)
		if !throttle {
			// queue size is only 10, so throttling should occur
			t.Fatal("should throttle")
		}
		// But we should still get the first things to fetch
		if got, exp := len(fetchReq.Headers), 5; got != exp {
			t.Fatalf("expected %d requests, got %d", exp, got)
		}
		if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp {
			t.Fatalf("expected header %d, got %d", exp, got)
		}
	}
	if got, exp := q.blockTaskQueue.Size(), numOfBlocks-10; got != exp {
		t.Errorf("expected block task queue to be %d, got %d", exp, got)
	}
	if got, exp := q.receiptTaskQueue.Size(), numOfReceipts; got != exp {
		t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
	}
	{
		peer := dummyPeer("peer-2")
		fetchReq, _, throttle := q.ReserveBodies(peer, 50)

		// The second peer should hit throttling
		if !throttle {
			t.Fatalf("should throttle")
		}
		// And not get any fetches at all, since it was throttled to begin with
		if fetchReq != nil {
			t.Fatalf("should have no fetches, got %d", len(fetchReq.Headers))
		}
	}
	if got, exp := q.blockTaskQueue.Size(), numOfBlocks-10; got != exp {
		t.Errorf("expected block task queue to be %d, got %d", exp, got)
	}
	if got, exp := q.receiptTaskQueue.Size(), numOfReceipts; got != exp {
		t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
	}
	{
		// The receipt delivering peer should not be affected
		// by the throttling of body deliveries
		peer := dummyPeer("peer-3")
		fetchReq, _, throttle := q.ReserveReceipts(peer, 50)
		if !throttle {
			// queue size is only 10, so throttling should occur
			t.Fatal("should throttle")
		}
		// But we should still get the first things to fetch
		if got, exp := len(fetchReq.Headers), 5; got != exp {
			t.Fatalf("expected %d requests, got %d", exp, got)
		}
		if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp {
			t.Fatalf("expected header %d, got %d", exp, got)
		}
	}
	if got, exp := q.blockTaskQueue.Size(), numOfBlocks-10; got != exp {
		t.Errorf("expected block task queue to be %d, got %d", exp, got)
	}
	if got, exp := q.receiptTaskQueue.Size(), numOfReceipts-5; got != exp {
		t.Errorf("expected receipt task queue to be %d, got %d", exp, got)
	}
	if got, exp := q.resultCache.countCompleted(), 0; got != exp {
		t.Errorf("wrong processable count, got %d, exp %d", got, exp)
	}
}
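
// TestEmptyBlocks schedules a chain of empty blocks and checks that neither
// bodies nor receipts are ever reserved for them: both reservation calls come
// back empty and the results are marked completed without any delivery.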
func TestEmptyBlocks(t *testing.T) {
	numOfBlocks := len(emptyChain.blocks)

	q := newQueue(10, 10)
	q.Prepare(1, FastSync)
	// Schedule a batch of headers
	q.Schedule(emptyChain.headers(), 1)
	if q.Idle() {
		t.Errorf("queue should not be idle")
	}
	if got, exp := q.PendingBlocks(), len(emptyChain.blocks); got != exp {
		t.Errorf("wrong pending block count, got %d, exp %d", got, exp)
	}
	if got, exp := q.PendingReceipts(), 0; got != exp {
		t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp)
	}
	// They won't be processable, because the fetch results haven't been
	// created yet
	if got, exp := q.resultCache.countCompleted(), 0; got != exp {
		t.Errorf("wrong processable count, got %d, exp %d", got, exp)
	}
	// Items are now queued for downloading, next step is that we tell the
	// queue that a certain peer will deliver them for us.
	// That should trigger all of them to suddenly become 'done'
	{
		// Reserve blocks
		peer := dummyPeer("peer-1")
		fetchReq, _, _ := q.ReserveBodies(peer, 50)

		// There should be nothing to fetch, blocks are empty
		if fetchReq != nil {
			t.Fatal("there should be no body fetch tasks remaining")
		}
	}
	if q.blockTaskQueue.Size() != numOfBlocks-10 {
		t.Errorf("expected block task queue to be %d, got %d", numOfBlocks-10, q.blockTaskQueue.Size())
	}
	if q.receiptTaskQueue.Size() != 0 {
		t.Errorf("expected receipt task queue to be %d, got %d", 0, q.receiptTaskQueue.Size())
	}
	{
		peer := dummyPeer("peer-3")
		fetchReq, _, _ := q.ReserveReceipts(peer, 50)

		// There should be nothing to fetch, blocks are empty
		if fetchReq != nil {
			t.Fatal("there should be no receipt fetch tasks remaining")
		}
	}
	if q.blockTaskQueue.Size() != numOfBlocks-10 {
		t.Errorf("expected block task queue to be %d, got %d", numOfBlocks-10, q.blockTaskQueue.Size())
	}
	if q.receiptTaskQueue.Size() != 0 {
		t.Errorf("expected receipt task queue to be %d, got %d", 0, q.receiptTaskQueue.Size())
	}
	if got, exp := q.resultCache.countCompleted(), 10; got != exp {
		t.Errorf("wrong processable count, got %d, exp %d", got, exp)
	}
}
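
// XTestBodyDeliverySketch is a minimal sketch (not part of the original
// suite, and disabled via the same X prefix as XTestDelivery below) showing
// how a reserved body fetch can be answered using the DeliverBodies signature
// exercised later in this file. It only checks that a well-formed delivery is
// accepted; the exact completion semantics are left to the real tests.
func XTestBodyDeliverySketch(t *testing.T) {
	q := newQueue(10, 10)
	q.Prepare(1, FastSync)
	q.Schedule(chain.headers(), 1)

	peer := dummyPeer("sketch-peer")
	fetchReq, _, _ := q.ReserveBodies(peer, 50)
	if fetchReq == nil {
		t.Fatal("expected a body fetch request")
	}
	// Answer the request with the real transactions and uncles from the
	// source chain; header numbers are 1-based, chain.blocks is 0-based.
	var (
		txs    [][]*types.Transaction
		uncles [][]*types.Header
	)
	for _, hdr := range fetchReq.Headers {
		block := chain.blocks[hdr.Number.Uint64()-1]
		txs = append(txs, block.Transactions())
		uncles = append(uncles, block.Uncles())
	}
	if _, err := q.DeliverBodies(peer.id, txs, uncles); err != nil {
		t.Fatalf("body delivery failed: %v", err)
	}
}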

// XTestDelivery does some more extensive testing of events that happen,
// blocks that become known and peers that make reservations and deliveries.
// Disabled since it's not really a unit test, but can be executed to test
// some more advanced scenarios
func XTestDelivery(t *testing.T) {
	// the outside network, holding blocks
	blo, rec := makeChain(128, 0, genesis, false)
	world := newNetwork()
	world.receipts = rec
	world.chain = blo
	world.progress(10)
	if false {
		// Flip to true for verbose logging
		log.Root().SetHandler(log.StdoutHandler)
	}
	q := newQueue(10, 10)
	var wg sync.WaitGroup
	q.Prepare(1, FastSync)

	wg.Add(1)
	go func() {
		// deliver headers
		defer wg.Done()
		c := 1
		for {
			//fmt.Printf("getting headers from %d\n", c)
			hdrs := world.headers(c)
			l := len(hdrs)
			//fmt.Printf("scheduling %d headers, first %d last %d\n",
			//	l, hdrs[0].Number.Uint64(), hdrs[len(hdrs)-1].Number.Uint64())
			q.Schedule(hdrs, uint64(c))
			c += l
		}
	}()
	wg.Add(1)
	go func() {
		// collect results
		defer wg.Done()
		tot := 0
		for {
			res := q.Results(true)
			tot += len(res)
			fmt.Printf("got %d results, %d tot\n", len(res), tot)
			// Now we can forget about these
			world.forget(res[len(res)-1].Header.Number.Uint64())
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		// reserve body fetch
		i := 4
		for {
			peer := dummyPeer(fmt.Sprintf("peer-%d", i))
			f, _, _ := q.ReserveBodies(peer, rand.Intn(30))
			if f != nil {
				var emptyList []*types.Header
				var txs [][]*types.Transaction
				var uncles [][]*types.Header
				numToSkip := rand.Intn(len(f.Headers))
				for _, hdr := range f.Headers[0 : len(f.Headers)-numToSkip] {
					txs = append(txs, world.getTransactions(hdr.Number.Uint64()))
					uncles = append(uncles, emptyList)
				}
				time.Sleep(100 * time.Millisecond)
				_, err := q.DeliverBodies(peer.id, txs, uncles)
				if err != nil {
					fmt.Printf("delivered %d bodies %v\n", len(txs), err)
				}
			} else {
				i++
				time.Sleep(200 * time.Millisecond)
			}
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		// reserve receipt fetch
		peer := dummyPeer("peer-3")
		for {
			f, _, _ := q.ReserveReceipts(peer, rand.Intn(50))
			if f != nil {
				var rcs [][]*types.Receipt
				for _, hdr := range f.Headers {
					rcs = append(rcs, world.getReceipts(hdr.Number.Uint64()))
				}
				_, err := q.DeliverReceipts(peer.id, rcs)
				if err != nil {
					fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
				}
				time.Sleep(100 * time.Millisecond)
			} else {
				time.Sleep(200 * time.Millisecond)
			}
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 50; i++ {
			time.Sleep(300 * time.Millisecond)
			//world.tick()
			//fmt.Printf("trying to progress\n")
			world.progress(rand.Intn(100))
		}
		for i := 0; i < 50; i++ {
			time.Sleep(2990 * time.Millisecond)
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			time.Sleep(990 * time.Millisecond)
			fmt.Printf("world block tip is %d\n",
				world.chain[len(world.chain)-1].Header().Number.Uint64())
			fmt.Println(q.Stats())
		}
	}()
	wg.Wait()
}
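
// newNetwork creates an empty simulated network; callers seed chain and
// receipts before calling progress or headers, as XTestDelivery does above.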
func newNetwork() *network {
	var l sync.RWMutex
	return &network{
		cond:   sync.NewCond(&l),
		offset: 1, // block 1 is at blocks[0]
	}
}

// network represents the outside network, holding the blocks and receipts
// that peers deliver from
type network struct {
	offset   int
	chain    []*types.Block
	receipts []types.Receipts
	lock     sync.RWMutex
	cond     *sync.Cond
}

func (n *network) getTransactions(blocknum uint64) types.Transactions {
	index := blocknum - uint64(n.offset)
	return n.chain[index].Transactions()
}

func (n *network) getReceipts(blocknum uint64) types.Receipts {
	index := blocknum - uint64(n.offset)
	if got := n.chain[index].Header().Number.Uint64(); got != blocknum {
		fmt.Printf("Err, got %d exp %d\n", got, blocknum)
		panic("block number mismatch")
	}
	return n.receipts[index]
}

func (n *network) forget(blocknum uint64) {
	index := blocknum - uint64(n.offset)
	n.chain = n.chain[index:]
	n.receipts = n.receipts[index:]
	n.offset = int(blocknum)
}

func (n *network) progress(numBlocks int) {
	n.lock.Lock()
	defer n.lock.Unlock()
	//fmt.Printf("progressing...\n")
	newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], false)
	n.chain = append(n.chain, newBlocks...)
	n.receipts = append(n.receipts, newR...)
	n.cond.Broadcast()
}

func (n *network) headers(from int) []*types.Header {
	numHeaders := 128
	var hdrs []*types.Header
	index := from - n.offset
	for index >= len(n.chain) {
		// wait for progress
		n.cond.L.Lock()
		//fmt.Printf("header going into wait\n")
		n.cond.Wait()
		index = from - n.offset
		n.cond.L.Unlock()
	}
	n.lock.RLock()
	defer n.lock.RUnlock()
	for i, b := range n.chain[index:] {
		hdrs = append(hdrs, b.Header())
		if i >= numHeaders {
			break
		}
	}
	return hdrs
}