tx_fetcher_test.go 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package fetcher
  17. import (
  18. "errors"
  19. "math/big"
  20. "math/rand"
  21. "testing"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common"
  24. "github.com/ethereum/go-ethereum/common/mclock"
  25. "github.com/ethereum/go-ethereum/core"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. )
  28. var (
  29. // testTxs is a set of transactions to use during testing that have meaningful hashes.
  30. testTxs = []*types.Transaction{
  31. types.NewTransaction(5577006791947779410, common.Address{0x0f}, new(big.Int), 0, new(big.Int), nil),
  32. types.NewTransaction(15352856648520921629, common.Address{0xbb}, new(big.Int), 0, new(big.Int), nil),
  33. types.NewTransaction(3916589616287113937, common.Address{0x86}, new(big.Int), 0, new(big.Int), nil),
  34. types.NewTransaction(9828766684487745566, common.Address{0xac}, new(big.Int), 0, new(big.Int), nil),
  35. }
  36. // testTxsHashes is the hashes of the test transactions above
  37. testTxsHashes = []common.Hash{testTxs[0].Hash(), testTxs[1].Hash(), testTxs[2].Hash(), testTxs[3].Hash()}
  38. )
  39. type doTxNotify struct {
  40. peer string
  41. hashes []common.Hash
  42. }
  43. type doTxEnqueue struct {
  44. peer string
  45. txs []*types.Transaction
  46. direct bool
  47. }
  48. type doWait struct {
  49. time time.Duration
  50. step bool
  51. }
  52. type doDrop string
  53. type doFunc func()
  54. type isWaiting map[string][]common.Hash
  55. type isScheduled struct {
  56. tracking map[string][]common.Hash
  57. fetching map[string][]common.Hash
  58. dangling map[string][]common.Hash
  59. }
  60. type isUnderpriced int
  61. // txFetcherTest represents a test scenario that can be executed by the test
  62. // runner.
  63. type txFetcherTest struct {
  64. init func() *TxFetcher
  65. steps []interface{}
  66. }
  67. // Tests that transaction announcements are added to a waitlist, and none
  68. // of them are scheduled for retrieval until the wait expires.
  69. func TestTransactionFetcherWaiting(t *testing.T) {
  70. testTransactionFetcherParallel(t, txFetcherTest{
  71. init: func() *TxFetcher {
  72. return NewTxFetcher(
  73. func(common.Hash) bool { return false },
  74. nil,
  75. func(string, []common.Hash) error { return nil },
  76. )
  77. },
  78. steps: []interface{}{
  79. // Initial announcement to get something into the waitlist
  80. doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}},
  81. isWaiting(map[string][]common.Hash{
  82. "A": {{0x01}, {0x02}},
  83. }),
  84. // Announce from a new peer to check that no overwrite happens
  85. doTxNotify{peer: "B", hashes: []common.Hash{{0x03}, {0x04}}},
  86. isWaiting(map[string][]common.Hash{
  87. "A": {{0x01}, {0x02}},
  88. "B": {{0x03}, {0x04}},
  89. }),
  90. // Announce clashing hashes but unique new peer
  91. doTxNotify{peer: "C", hashes: []common.Hash{{0x01}, {0x04}}},
  92. isWaiting(map[string][]common.Hash{
  93. "A": {{0x01}, {0x02}},
  94. "B": {{0x03}, {0x04}},
  95. "C": {{0x01}, {0x04}},
  96. }),
  97. // Announce existing and clashing hashes from existing peer
  98. doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x03}, {0x05}}},
  99. isWaiting(map[string][]common.Hash{
  100. "A": {{0x01}, {0x02}, {0x03}, {0x05}},
  101. "B": {{0x03}, {0x04}},
  102. "C": {{0x01}, {0x04}},
  103. }),
  104. isScheduled{tracking: nil, fetching: nil},
  105. // Wait for the arrival timeout which should move all expired items
  106. // from the wait list to the scheduler
  107. doWait{time: txArriveTimeout, step: true},
  108. isWaiting(nil),
  109. isScheduled{
  110. tracking: map[string][]common.Hash{
  111. "A": {{0x01}, {0x02}, {0x03}, {0x05}},
  112. "B": {{0x03}, {0x04}},
  113. "C": {{0x01}, {0x04}},
  114. },
  115. fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer
  116. "A": {{0x02}, {0x03}, {0x05}},
  117. "C": {{0x01}, {0x04}},
  118. },
  119. },
  120. // Queue up a non-fetchable transaction and then trigger it with a new
  121. // peer (weird case to test 1 line in the fetcher)
  122. doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}},
  123. isWaiting(map[string][]common.Hash{
  124. "C": {{0x06}, {0x07}},
  125. }),
  126. doWait{time: txArriveTimeout, step: true},
  127. isScheduled{
  128. tracking: map[string][]common.Hash{
  129. "A": {{0x01}, {0x02}, {0x03}, {0x05}},
  130. "B": {{0x03}, {0x04}},
  131. "C": {{0x01}, {0x04}, {0x06}, {0x07}},
  132. },
  133. fetching: map[string][]common.Hash{
  134. "A": {{0x02}, {0x03}, {0x05}},
  135. "C": {{0x01}, {0x04}},
  136. },
  137. },
  138. doTxNotify{peer: "D", hashes: []common.Hash{{0x06}, {0x07}}},
  139. isScheduled{
  140. tracking: map[string][]common.Hash{
  141. "A": {{0x01}, {0x02}, {0x03}, {0x05}},
  142. "B": {{0x03}, {0x04}},
  143. "C": {{0x01}, {0x04}, {0x06}, {0x07}},
  144. "D": {{0x06}, {0x07}},
  145. },
  146. fetching: map[string][]common.Hash{
  147. "A": {{0x02}, {0x03}, {0x05}},
  148. "C": {{0x01}, {0x04}},
  149. "D": {{0x06}, {0x07}},
  150. },
  151. },
  152. },
  153. })
  154. }
  155. // Tests that transaction announcements skip the waiting list if they are
  156. // already scheduled.
  157. func TestTransactionFetcherSkipWaiting(t *testing.T) {
  158. testTransactionFetcherParallel(t, txFetcherTest{
  159. init: func() *TxFetcher {
  160. return NewTxFetcher(
  161. func(common.Hash) bool { return false },
  162. nil,
  163. func(string, []common.Hash) error { return nil },
  164. )
  165. },
  166. steps: []interface{}{
  167. // Push an initial announcement through to the scheduled stage
  168. doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}},
  169. isWaiting(map[string][]common.Hash{
  170. "A": {{0x01}, {0x02}},
  171. }),
  172. isScheduled{tracking: nil, fetching: nil},
  173. doWait{time: txArriveTimeout, step: true},
  174. isWaiting(nil),
  175. isScheduled{
  176. tracking: map[string][]common.Hash{
  177. "A": {{0x01}, {0x02}},
  178. },
  179. fetching: map[string][]common.Hash{
  180. "A": {{0x01}, {0x02}},
  181. },
  182. },
  183. // Announce overlaps from the same peer, ensure the new ones end up
  184. // in stage one, and clashing ones don't get double tracked
  185. doTxNotify{peer: "A", hashes: []common.Hash{{0x02}, {0x03}}},
  186. isWaiting(map[string][]common.Hash{
  187. "A": {{0x03}},
  188. }),
  189. isScheduled{
  190. tracking: map[string][]common.Hash{
  191. "A": {{0x01}, {0x02}},
  192. },
  193. fetching: map[string][]common.Hash{
  194. "A": {{0x01}, {0x02}},
  195. },
  196. },
  197. // Announce overlaps from a new peer, ensure new transactions end up
  198. // in stage one and clashing ones get tracked for the new peer
  199. doTxNotify{peer: "B", hashes: []common.Hash{{0x02}, {0x03}, {0x04}}},
  200. isWaiting(map[string][]common.Hash{
  201. "A": {{0x03}},
  202. "B": {{0x03}, {0x04}},
  203. }),
  204. isScheduled{
  205. tracking: map[string][]common.Hash{
  206. "A": {{0x01}, {0x02}},
  207. "B": {{0x02}},
  208. },
  209. fetching: map[string][]common.Hash{
  210. "A": {{0x01}, {0x02}},
  211. },
  212. },
  213. },
  214. })
  215. }
  216. // Tests that only a single transaction request gets scheduled to a peer
  217. // and subsequent announces block or get allotted to someone else.
  218. func TestTransactionFetcherSingletonRequesting(t *testing.T) {
  219. testTransactionFetcherParallel(t, txFetcherTest{
  220. init: func() *TxFetcher {
  221. return NewTxFetcher(
  222. func(common.Hash) bool { return false },
  223. nil,
  224. func(string, []common.Hash) error { return nil },
  225. )
  226. },
  227. steps: []interface{}{
  228. // Push an initial announcement through to the scheduled stage
  229. doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}},
  230. isWaiting(map[string][]common.Hash{
  231. "A": {{0x01}, {0x02}},
  232. }),
  233. isScheduled{tracking: nil, fetching: nil},
  234. doWait{time: txArriveTimeout, step: true},
  235. isWaiting(nil),
  236. isScheduled{
  237. tracking: map[string][]common.Hash{
  238. "A": {{0x01}, {0x02}},
  239. },
  240. fetching: map[string][]common.Hash{
  241. "A": {{0x01}, {0x02}},
  242. },
  243. },
  244. // Announce a new set of transactions from the same peer and ensure
  245. // they do not start fetching since the peer is already busy
  246. doTxNotify{peer: "A", hashes: []common.Hash{{0x03}, {0x04}}},
  247. isWaiting(map[string][]common.Hash{
  248. "A": {{0x03}, {0x04}},
  249. }),
  250. isScheduled{
  251. tracking: map[string][]common.Hash{
  252. "A": {{0x01}, {0x02}},
  253. },
  254. fetching: map[string][]common.Hash{
  255. "A": {{0x01}, {0x02}},
  256. },
  257. },
  258. doWait{time: txArriveTimeout, step: true},
  259. isWaiting(nil),
  260. isScheduled{
  261. tracking: map[string][]common.Hash{
  262. "A": {{0x01}, {0x02}, {0x03}, {0x04}},
  263. },
  264. fetching: map[string][]common.Hash{
  265. "A": {{0x01}, {0x02}},
  266. },
  267. },
  268. // Announce a duplicate set of transactions from a new peer and ensure
  269. // uniquely new ones start downloading, even if clashing.
  270. doTxNotify{peer: "B", hashes: []common.Hash{{0x02}, {0x03}, {0x05}, {0x06}}},
  271. isWaiting(map[string][]common.Hash{
  272. "B": {{0x05}, {0x06}},
  273. }),
  274. isScheduled{
  275. tracking: map[string][]common.Hash{
  276. "A": {{0x01}, {0x02}, {0x03}, {0x04}},
  277. "B": {{0x02}, {0x03}},
  278. },
  279. fetching: map[string][]common.Hash{
  280. "A": {{0x01}, {0x02}},
  281. "B": {{0x03}},
  282. },
  283. },
  284. },
  285. })
  286. }
  287. // Tests that if a transaction retrieval fails, all the transactions get
  288. // instantly schedule back to someone else or the announcements dropped
  289. // if no alternate source is available.
  290. func TestTransactionFetcherFailedRescheduling(t *testing.T) {
  291. // Create a channel to control when tx requests can fail
  292. proceed := make(chan struct{})
  293. testTransactionFetcherParallel(t, txFetcherTest{
  294. init: func() *TxFetcher {
  295. return NewTxFetcher(
  296. func(common.Hash) bool { return false },
  297. nil,
  298. func(origin string, hashes []common.Hash) error {
  299. <-proceed
  300. return errors.New("peer disconnected")
  301. },
  302. )
  303. },
  304. steps: []interface{}{
  305. // Push an initial announcement through to the scheduled stage
  306. doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}},
  307. isWaiting(map[string][]common.Hash{
  308. "A": {{0x01}, {0x02}},
  309. }),
  310. isScheduled{tracking: nil, fetching: nil},
  311. doWait{time: txArriveTimeout, step: true},
  312. isWaiting(nil),
  313. isScheduled{
  314. tracking: map[string][]common.Hash{
  315. "A": {{0x01}, {0x02}},
  316. },
  317. fetching: map[string][]common.Hash{
  318. "A": {{0x01}, {0x02}},
  319. },
  320. },
  321. // While the original peer is stuck in the request, push in an second
  322. // data source.
  323. doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}},
  324. isWaiting(nil),
  325. isScheduled{
  326. tracking: map[string][]common.Hash{
  327. "A": {{0x01}, {0x02}},
  328. "B": {{0x02}},
  329. },
  330. fetching: map[string][]common.Hash{
  331. "A": {{0x01}, {0x02}},
  332. },
  333. },
  334. // Wait until the original request fails and check that transactions
  335. // are either rescheduled or dropped
  336. doFunc(func() {
  337. proceed <- struct{}{} // Allow peer A to return the failure
  338. }),
  339. doWait{time: 0, step: true},
  340. isWaiting(nil),
  341. isScheduled{
  342. tracking: map[string][]common.Hash{
  343. "B": {{0x02}},
  344. },
  345. fetching: map[string][]common.Hash{
  346. "B": {{0x02}},
  347. },
  348. },
  349. doFunc(func() {
  350. proceed <- struct{}{} // Allow peer B to return the failure
  351. }),
  352. doWait{time: 0, step: true},
  353. isWaiting(nil),
  354. isScheduled{nil, nil, nil},
  355. },
  356. })
  357. }
  358. // Tests that if a transaction retrieval succeeds, all alternate origins
  359. // are cleaned up.
  360. func TestTransactionFetcherCleanup(t *testing.T) {
  361. testTransactionFetcherParallel(t, txFetcherTest{
  362. init: func() *TxFetcher {
  363. return NewTxFetcher(
  364. func(common.Hash) bool { return false },
  365. func(txs []*types.Transaction) []error {
  366. return make([]error, len(txs))
  367. },
  368. func(string, []common.Hash) error { return nil },
  369. )
  370. },
  371. steps: []interface{}{
  372. // Push an initial announcement through to the scheduled stage
  373. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}},
  374. isWaiting(map[string][]common.Hash{
  375. "A": {testTxsHashes[0]},
  376. }),
  377. isScheduled{tracking: nil, fetching: nil},
  378. doWait{time: txArriveTimeout, step: true},
  379. isWaiting(nil),
  380. isScheduled{
  381. tracking: map[string][]common.Hash{
  382. "A": {testTxsHashes[0]},
  383. },
  384. fetching: map[string][]common.Hash{
  385. "A": {testTxsHashes[0]},
  386. },
  387. },
  388. // Request should be delivered
  389. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true},
  390. isScheduled{nil, nil, nil},
  391. },
  392. })
  393. }
  394. // Tests that if a transaction retrieval succeeds, but the response is empty (no
  395. // transactions available, then all are nuked instead of being rescheduled (yes,
  396. // this was a bug)).
  397. func TestTransactionFetcherCleanupEmpty(t *testing.T) {
  398. testTransactionFetcherParallel(t, txFetcherTest{
  399. init: func() *TxFetcher {
  400. return NewTxFetcher(
  401. func(common.Hash) bool { return false },
  402. func(txs []*types.Transaction) []error {
  403. return make([]error, len(txs))
  404. },
  405. func(string, []common.Hash) error { return nil },
  406. )
  407. },
  408. steps: []interface{}{
  409. // Push an initial announcement through to the scheduled stage
  410. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}},
  411. isWaiting(map[string][]common.Hash{
  412. "A": {testTxsHashes[0]},
  413. }),
  414. isScheduled{tracking: nil, fetching: nil},
  415. doWait{time: txArriveTimeout, step: true},
  416. isWaiting(nil),
  417. isScheduled{
  418. tracking: map[string][]common.Hash{
  419. "A": {testTxsHashes[0]},
  420. },
  421. fetching: map[string][]common.Hash{
  422. "A": {testTxsHashes[0]},
  423. },
  424. },
  425. // Deliver an empty response and ensure the transaction is cleared, not rescheduled
  426. doTxEnqueue{peer: "A", txs: []*types.Transaction{}, direct: true},
  427. isScheduled{nil, nil, nil},
  428. },
  429. })
  430. }
  431. // Tests that non-returned transactions are either re-scheduled from a
  432. // different peer, or self if they are after the cutoff point.
  433. func TestTransactionFetcherMissingRescheduling(t *testing.T) {
  434. testTransactionFetcherParallel(t, txFetcherTest{
  435. init: func() *TxFetcher {
  436. return NewTxFetcher(
  437. func(common.Hash) bool { return false },
  438. func(txs []*types.Transaction) []error {
  439. return make([]error, len(txs))
  440. },
  441. func(string, []common.Hash) error { return nil },
  442. )
  443. },
  444. steps: []interface{}{
  445. // Push an initial announcement through to the scheduled stage
  446. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}},
  447. isWaiting(map[string][]common.Hash{
  448. "A": {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]},
  449. }),
  450. isScheduled{tracking: nil, fetching: nil},
  451. doWait{time: txArriveTimeout, step: true},
  452. isWaiting(nil),
  453. isScheduled{
  454. tracking: map[string][]common.Hash{
  455. "A": {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]},
  456. },
  457. fetching: map[string][]common.Hash{
  458. "A": {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]},
  459. },
  460. },
  461. // Deliver the middle transaction requested, the one before which
  462. // should be dropped and the one after re-requested.
  463. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, // This depends on the deterministic random
  464. isScheduled{
  465. tracking: map[string][]common.Hash{
  466. "A": {testTxsHashes[2]},
  467. },
  468. fetching: map[string][]common.Hash{
  469. "A": {testTxsHashes[2]},
  470. },
  471. },
  472. },
  473. })
  474. }
  475. // Tests that out of two transactions, if one is missing and the last is
  476. // delivered, the peer gets properly cleaned out from the internal state.
  477. func TestTransactionFetcherMissingCleanup(t *testing.T) {
  478. testTransactionFetcherParallel(t, txFetcherTest{
  479. init: func() *TxFetcher {
  480. return NewTxFetcher(
  481. func(common.Hash) bool { return false },
  482. func(txs []*types.Transaction) []error {
  483. return make([]error, len(txs))
  484. },
  485. func(string, []common.Hash) error { return nil },
  486. )
  487. },
  488. steps: []interface{}{
  489. // Push an initial announcement through to the scheduled stage
  490. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}},
  491. isWaiting(map[string][]common.Hash{
  492. "A": {testTxsHashes[0], testTxsHashes[1]},
  493. }),
  494. isScheduled{tracking: nil, fetching: nil},
  495. doWait{time: txArriveTimeout, step: true},
  496. isWaiting(nil),
  497. isScheduled{
  498. tracking: map[string][]common.Hash{
  499. "A": {testTxsHashes[0], testTxsHashes[1]},
  500. },
  501. fetching: map[string][]common.Hash{
  502. "A": {testTxsHashes[0], testTxsHashes[1]},
  503. },
  504. },
  505. // Deliver the middle transaction requested, the one before which
  506. // should be dropped and the one after re-requested.
  507. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[1]}, direct: true}, // This depends on the deterministic random
  508. isScheduled{nil, nil, nil},
  509. },
  510. })
  511. }
  512. // Tests that transaction broadcasts properly clean up announcements.
  513. func TestTransactionFetcherBroadcasts(t *testing.T) {
  514. testTransactionFetcherParallel(t, txFetcherTest{
  515. init: func() *TxFetcher {
  516. return NewTxFetcher(
  517. func(common.Hash) bool { return false },
  518. func(txs []*types.Transaction) []error {
  519. return make([]error, len(txs))
  520. },
  521. func(string, []common.Hash) error { return nil },
  522. )
  523. },
  524. steps: []interface{}{
  525. // Set up three transactions to be in different stats, waiting, queued and fetching
  526. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}},
  527. doWait{time: txArriveTimeout, step: true},
  528. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}},
  529. doWait{time: txArriveTimeout, step: true},
  530. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}},
  531. isWaiting(map[string][]common.Hash{
  532. "A": {testTxsHashes[2]},
  533. }),
  534. isScheduled{
  535. tracking: map[string][]common.Hash{
  536. "A": {testTxsHashes[0], testTxsHashes[1]},
  537. },
  538. fetching: map[string][]common.Hash{
  539. "A": {testTxsHashes[0]},
  540. },
  541. },
  542. // Broadcast all the transactions and ensure everything gets cleaned
  543. // up, but the dangling request is left alone to avoid doing multiple
  544. // concurrent requests.
  545. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1], testTxs[2]}, direct: false},
  546. isWaiting(nil),
  547. isScheduled{
  548. tracking: nil,
  549. fetching: nil,
  550. dangling: map[string][]common.Hash{
  551. "A": {testTxsHashes[0]},
  552. },
  553. },
  554. // Deliver the requested hashes
  555. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1], testTxs[2]}, direct: true},
  556. isScheduled{nil, nil, nil},
  557. },
  558. })
  559. }
  560. // Tests that the waiting list timers properly reset and reschedule.
  561. func TestTransactionFetcherWaitTimerResets(t *testing.T) {
  562. testTransactionFetcherParallel(t, txFetcherTest{
  563. init: func() *TxFetcher {
  564. return NewTxFetcher(
  565. func(common.Hash) bool { return false },
  566. nil,
  567. func(string, []common.Hash) error { return nil },
  568. )
  569. },
  570. steps: []interface{}{
  571. doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}},
  572. isWaiting(map[string][]common.Hash{
  573. "A": {{0x01}},
  574. }),
  575. isScheduled{nil, nil, nil},
  576. doWait{time: txArriveTimeout / 2, step: false},
  577. isWaiting(map[string][]common.Hash{
  578. "A": {{0x01}},
  579. }),
  580. isScheduled{nil, nil, nil},
  581. doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}},
  582. isWaiting(map[string][]common.Hash{
  583. "A": {{0x01}, {0x02}},
  584. }),
  585. isScheduled{nil, nil, nil},
  586. doWait{time: txArriveTimeout / 2, step: true},
  587. isWaiting(map[string][]common.Hash{
  588. "A": {{0x02}},
  589. }),
  590. isScheduled{
  591. tracking: map[string][]common.Hash{
  592. "A": {{0x01}},
  593. },
  594. fetching: map[string][]common.Hash{
  595. "A": {{0x01}},
  596. },
  597. },
  598. doWait{time: txArriveTimeout / 2, step: true},
  599. isWaiting(nil),
  600. isScheduled{
  601. tracking: map[string][]common.Hash{
  602. "A": {{0x01}, {0x02}},
  603. },
  604. fetching: map[string][]common.Hash{
  605. "A": {{0x01}},
  606. },
  607. },
  608. },
  609. })
  610. }
  611. // Tests that if a transaction request is not replied to, it will time
  612. // out and be re-scheduled for someone else.
  613. func TestTransactionFetcherTimeoutRescheduling(t *testing.T) {
  614. testTransactionFetcherParallel(t, txFetcherTest{
  615. init: func() *TxFetcher {
  616. return NewTxFetcher(
  617. func(common.Hash) bool { return false },
  618. func(txs []*types.Transaction) []error {
  619. return make([]error, len(txs))
  620. },
  621. func(string, []common.Hash) error { return nil },
  622. )
  623. },
  624. steps: []interface{}{
  625. // Push an initial announcement through to the scheduled stage
  626. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}},
  627. isWaiting(map[string][]common.Hash{
  628. "A": {testTxsHashes[0]},
  629. }),
  630. isScheduled{tracking: nil, fetching: nil},
  631. doWait{time: txArriveTimeout, step: true},
  632. isWaiting(nil),
  633. isScheduled{
  634. tracking: map[string][]common.Hash{
  635. "A": {testTxsHashes[0]},
  636. },
  637. fetching: map[string][]common.Hash{
  638. "A": {testTxsHashes[0]},
  639. },
  640. },
  641. // Wait until the delivery times out, everything should be cleaned up
  642. doWait{time: txFetchTimeout, step: true},
  643. isWaiting(nil),
  644. isScheduled{
  645. tracking: nil,
  646. fetching: nil,
  647. dangling: map[string][]common.Hash{
  648. "A": {},
  649. },
  650. },
  651. // Ensure that followup announcements don't get scheduled
  652. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}},
  653. doWait{time: txArriveTimeout, step: true},
  654. isScheduled{
  655. tracking: map[string][]common.Hash{
  656. "A": {testTxsHashes[1]},
  657. },
  658. fetching: nil,
  659. dangling: map[string][]common.Hash{
  660. "A": {},
  661. },
  662. },
  663. // If the dangling request arrives a bit later, do not choke
  664. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true},
  665. isWaiting(nil),
  666. isScheduled{
  667. tracking: map[string][]common.Hash{
  668. "A": {testTxsHashes[1]},
  669. },
  670. fetching: map[string][]common.Hash{
  671. "A": {testTxsHashes[1]},
  672. },
  673. },
  674. },
  675. })
  676. }
  677. // Tests that the fetching timeout timers properly reset and reschedule.
  678. func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
  679. testTransactionFetcherParallel(t, txFetcherTest{
  680. init: func() *TxFetcher {
  681. return NewTxFetcher(
  682. func(common.Hash) bool { return false },
  683. nil,
  684. func(string, []common.Hash) error { return nil },
  685. )
  686. },
  687. steps: []interface{}{
  688. doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}},
  689. doWait{time: txArriveTimeout, step: true},
  690. doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}},
  691. doWait{time: txArriveTimeout, step: true},
  692. isWaiting(nil),
  693. isScheduled{
  694. tracking: map[string][]common.Hash{
  695. "A": {{0x01}},
  696. "B": {{0x02}},
  697. },
  698. fetching: map[string][]common.Hash{
  699. "A": {{0x01}},
  700. "B": {{0x02}},
  701. },
  702. },
  703. doWait{time: txFetchTimeout - txArriveTimeout, step: true},
  704. isScheduled{
  705. tracking: map[string][]common.Hash{
  706. "B": {{0x02}},
  707. },
  708. fetching: map[string][]common.Hash{
  709. "B": {{0x02}},
  710. },
  711. dangling: map[string][]common.Hash{
  712. "A": {},
  713. },
  714. },
  715. doWait{time: txArriveTimeout, step: true},
  716. isScheduled{
  717. tracking: nil,
  718. fetching: nil,
  719. dangling: map[string][]common.Hash{
  720. "A": {},
  721. "B": {},
  722. },
  723. },
  724. },
  725. })
  726. }
  727. // Tests that if thousands of transactions are announces, only a small
  728. // number of them will be requested at a time.
  729. func TestTransactionFetcherRateLimiting(t *testing.T) {
  730. // Create a slew of transactions and to announce them
  731. var hashes []common.Hash
  732. for i := 0; i < maxTxAnnounces; i++ {
  733. hashes = append(hashes, common.Hash{byte(i / 256), byte(i % 256)})
  734. }
  735. testTransactionFetcherParallel(t, txFetcherTest{
  736. init: func() *TxFetcher {
  737. return NewTxFetcher(
  738. func(common.Hash) bool { return false },
  739. nil,
  740. func(string, []common.Hash) error { return nil },
  741. )
  742. },
  743. steps: []interface{}{
  744. // Announce all the transactions, wait a bit and ensure only a small
  745. // percentage gets requested
  746. doTxNotify{peer: "A", hashes: hashes},
  747. doWait{time: txArriveTimeout, step: true},
  748. isWaiting(nil),
  749. isScheduled{
  750. tracking: map[string][]common.Hash{
  751. "A": hashes,
  752. },
  753. fetching: map[string][]common.Hash{
  754. "A": hashes[1643 : 1643+maxTxRetrievals],
  755. },
  756. },
  757. },
  758. })
  759. }
  760. // Tests that then number of transactions a peer is allowed to announce and/or
  761. // request at the same time is hard capped.
  762. func TestTransactionFetcherDoSProtection(t *testing.T) {
  763. // Create a slew of transactions and to announce them
  764. var hashesA []common.Hash
  765. for i := 0; i < maxTxAnnounces+1; i++ {
  766. hashesA = append(hashesA, common.Hash{0x01, byte(i / 256), byte(i % 256)})
  767. }
  768. var hashesB []common.Hash
  769. for i := 0; i < maxTxAnnounces+1; i++ {
  770. hashesB = append(hashesB, common.Hash{0x02, byte(i / 256), byte(i % 256)})
  771. }
  772. testTransactionFetcherParallel(t, txFetcherTest{
  773. init: func() *TxFetcher {
  774. return NewTxFetcher(
  775. func(common.Hash) bool { return false },
  776. nil,
  777. func(string, []common.Hash) error { return nil },
  778. )
  779. },
  780. steps: []interface{}{
  781. // Announce half of the transaction and wait for them to be scheduled
  782. doTxNotify{peer: "A", hashes: hashesA[:maxTxAnnounces/2]},
  783. doTxNotify{peer: "B", hashes: hashesB[:maxTxAnnounces/2-1]},
  784. doWait{time: txArriveTimeout, step: true},
  785. // Announce the second half and keep them in the wait list
  786. doTxNotify{peer: "A", hashes: hashesA[maxTxAnnounces/2 : maxTxAnnounces]},
  787. doTxNotify{peer: "B", hashes: hashesB[maxTxAnnounces/2-1 : maxTxAnnounces-1]},
  788. // Ensure the hashes are split half and half
  789. isWaiting(map[string][]common.Hash{
  790. "A": hashesA[maxTxAnnounces/2 : maxTxAnnounces],
  791. "B": hashesB[maxTxAnnounces/2-1 : maxTxAnnounces-1],
  792. }),
  793. isScheduled{
  794. tracking: map[string][]common.Hash{
  795. "A": hashesA[:maxTxAnnounces/2],
  796. "B": hashesB[:maxTxAnnounces/2-1],
  797. },
  798. fetching: map[string][]common.Hash{
  799. "A": hashesA[1643 : 1643+maxTxRetrievals],
  800. "B": append(append([]common.Hash{}, hashesB[maxTxAnnounces/2-3:maxTxAnnounces/2-1]...), hashesB[:maxTxRetrievals-2]...),
  801. },
  802. },
  803. // Ensure that adding even one more hash results in dropping the hash
  804. doTxNotify{peer: "A", hashes: []common.Hash{hashesA[maxTxAnnounces]}},
  805. doTxNotify{peer: "B", hashes: hashesB[maxTxAnnounces-1 : maxTxAnnounces+1]},
  806. isWaiting(map[string][]common.Hash{
  807. "A": hashesA[maxTxAnnounces/2 : maxTxAnnounces],
  808. "B": hashesB[maxTxAnnounces/2-1 : maxTxAnnounces],
  809. }),
  810. isScheduled{
  811. tracking: map[string][]common.Hash{
  812. "A": hashesA[:maxTxAnnounces/2],
  813. "B": hashesB[:maxTxAnnounces/2-1],
  814. },
  815. fetching: map[string][]common.Hash{
  816. "A": hashesA[1643 : 1643+maxTxRetrievals],
  817. "B": append(append([]common.Hash{}, hashesB[maxTxAnnounces/2-3:maxTxAnnounces/2-1]...), hashesB[:maxTxRetrievals-2]...),
  818. },
  819. },
  820. },
  821. })
  822. }
  823. // Tests that underpriced transactions don't get rescheduled after being rejected.
  824. func TestTransactionFetcherUnderpricedDedup(t *testing.T) {
  825. testTransactionFetcherParallel(t, txFetcherTest{
  826. init: func() *TxFetcher {
  827. return NewTxFetcher(
  828. func(common.Hash) bool { return false },
  829. func(txs []*types.Transaction) []error {
  830. errs := make([]error, len(txs))
  831. for i := 0; i < len(errs); i++ {
  832. if i%2 == 0 {
  833. errs[i] = core.ErrUnderpriced
  834. } else {
  835. errs[i] = core.ErrReplaceUnderpriced
  836. }
  837. }
  838. return errs
  839. },
  840. func(string, []common.Hash) error { return nil },
  841. )
  842. },
  843. steps: []interface{}{
  844. // Deliver a transaction through the fetcher, but reject as underpriced
  845. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}},
  846. doWait{time: txArriveTimeout, step: true},
  847. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1]}, direct: true},
  848. isScheduled{nil, nil, nil},
  849. // Try to announce the transaction again, ensure it's not scheduled back
  850. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}}, // [2] is needed to force a step in the fetcher
  851. isWaiting(map[string][]common.Hash{
  852. "A": {testTxsHashes[2]},
  853. }),
  854. isScheduled{nil, nil, nil},
  855. },
  856. })
  857. }
  858. // Tests that underpriced transactions don't get rescheduled after being rejected,
  859. // but at the same time there's a hard cap on the number of transactions that are
  860. // tracked.
  861. func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
  862. // Temporarily disable fetch timeouts as they massively mess up the simulated clock
  863. defer func(timeout time.Duration) { txFetchTimeout = timeout }(txFetchTimeout)
  864. txFetchTimeout = 24 * time.Hour
  865. // Create a slew of transactions to max out the underpriced set
  866. var txs []*types.Transaction
  867. for i := 0; i < maxTxUnderpricedSetSize+1; i++ {
  868. txs = append(txs, types.NewTransaction(rand.Uint64(), common.Address{byte(rand.Intn(256))}, new(big.Int), 0, new(big.Int), nil))
  869. }
  870. hashes := make([]common.Hash, len(txs))
  871. for i, tx := range txs {
  872. hashes[i] = tx.Hash()
  873. }
  874. // Generate a set of steps to announce and deliver the entire set of transactions
  875. var steps []interface{}
  876. for i := 0; i < maxTxUnderpricedSetSize/maxTxRetrievals; i++ {
  877. steps = append(steps, doTxNotify{peer: "A", hashes: hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals]})
  878. steps = append(steps, isWaiting(map[string][]common.Hash{
  879. "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals],
  880. }))
  881. steps = append(steps, doWait{time: txArriveTimeout, step: true})
  882. steps = append(steps, isScheduled{
  883. tracking: map[string][]common.Hash{
  884. "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals],
  885. },
  886. fetching: map[string][]common.Hash{
  887. "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals],
  888. },
  889. })
  890. steps = append(steps, doTxEnqueue{peer: "A", txs: txs[i*maxTxRetrievals : (i+1)*maxTxRetrievals], direct: true})
  891. steps = append(steps, isWaiting(nil))
  892. steps = append(steps, isScheduled{nil, nil, nil})
  893. steps = append(steps, isUnderpriced((i+1)*maxTxRetrievals))
  894. }
  895. testTransactionFetcher(t, txFetcherTest{
  896. init: func() *TxFetcher {
  897. return NewTxFetcher(
  898. func(common.Hash) bool { return false },
  899. func(txs []*types.Transaction) []error {
  900. errs := make([]error, len(txs))
  901. for i := 0; i < len(errs); i++ {
  902. errs[i] = core.ErrUnderpriced
  903. }
  904. return errs
  905. },
  906. func(string, []common.Hash) error { return nil },
  907. )
  908. },
  909. steps: append(steps, []interface{}{
  910. // The preparation of the test has already been done in `steps`, add the last check
  911. doTxNotify{peer: "A", hashes: []common.Hash{hashes[maxTxUnderpricedSetSize]}},
  912. doWait{time: txArriveTimeout, step: true},
  913. doTxEnqueue{peer: "A", txs: []*types.Transaction{txs[maxTxUnderpricedSetSize]}, direct: true},
  914. isUnderpriced(maxTxUnderpricedSetSize),
  915. }...),
  916. })
  917. }
  918. // Tests that unexpected deliveries don't corrupt the internal state.
  919. func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) {
  920. testTransactionFetcherParallel(t, txFetcherTest{
  921. init: func() *TxFetcher {
  922. return NewTxFetcher(
  923. func(common.Hash) bool { return false },
  924. func(txs []*types.Transaction) []error {
  925. return make([]error, len(txs))
  926. },
  927. func(string, []common.Hash) error { return nil },
  928. )
  929. },
  930. steps: []interface{}{
  931. // Deliver something out of the blue
  932. isWaiting(nil),
  933. isScheduled{nil, nil, nil},
  934. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: false},
  935. isWaiting(nil),
  936. isScheduled{nil, nil, nil},
  937. // Set up a few hashes into various stages
  938. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}},
  939. doWait{time: txArriveTimeout, step: true},
  940. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}},
  941. doWait{time: txArriveTimeout, step: true},
  942. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}},
  943. isWaiting(map[string][]common.Hash{
  944. "A": {testTxsHashes[2]},
  945. }),
  946. isScheduled{
  947. tracking: map[string][]common.Hash{
  948. "A": {testTxsHashes[0], testTxsHashes[1]},
  949. },
  950. fetching: map[string][]common.Hash{
  951. "A": {testTxsHashes[0]},
  952. },
  953. },
  954. // Deliver everything and more out of the blue
  955. doTxEnqueue{peer: "B", txs: []*types.Transaction{testTxs[0], testTxs[1], testTxs[2], testTxs[3]}, direct: true},
  956. isWaiting(nil),
  957. isScheduled{
  958. tracking: nil,
  959. fetching: nil,
  960. dangling: map[string][]common.Hash{
  961. "A": {testTxsHashes[0]},
  962. },
  963. },
  964. },
  965. })
  966. }
  967. // Tests that dropping a peer cleans out all internal data structures in all the
  968. // live or danglng stages.
  969. func TestTransactionFetcherDrop(t *testing.T) {
  970. testTransactionFetcherParallel(t, txFetcherTest{
  971. init: func() *TxFetcher {
  972. return NewTxFetcher(
  973. func(common.Hash) bool { return false },
  974. func(txs []*types.Transaction) []error {
  975. return make([]error, len(txs))
  976. },
  977. func(string, []common.Hash) error { return nil },
  978. )
  979. },
  980. steps: []interface{}{
  981. // Set up a few hashes into various stages
  982. doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}},
  983. doWait{time: txArriveTimeout, step: true},
  984. doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}},
  985. doWait{time: txArriveTimeout, step: true},
  986. doTxNotify{peer: "A", hashes: []common.Hash{{0x03}}},
  987. isWaiting(map[string][]common.Hash{
  988. "A": {{0x03}},
  989. }),
  990. isScheduled{
  991. tracking: map[string][]common.Hash{
  992. "A": {{0x01}, {0x02}},
  993. },
  994. fetching: map[string][]common.Hash{
  995. "A": {{0x01}},
  996. },
  997. },
  998. // Drop the peer and ensure everything's cleaned out
  999. doDrop("A"),
  1000. isWaiting(nil),
  1001. isScheduled{nil, nil, nil},
  1002. // Push the node into a dangling (timeout) state
  1003. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}},
  1004. doWait{time: txArriveTimeout, step: true},
  1005. isWaiting(nil),
  1006. isScheduled{
  1007. tracking: map[string][]common.Hash{
  1008. "A": {testTxsHashes[0]},
  1009. },
  1010. fetching: map[string][]common.Hash{
  1011. "A": {testTxsHashes[0]},
  1012. },
  1013. },
  1014. doWait{time: txFetchTimeout, step: true},
  1015. isWaiting(nil),
  1016. isScheduled{
  1017. tracking: nil,
  1018. fetching: nil,
  1019. dangling: map[string][]common.Hash{
  1020. "A": {},
  1021. },
  1022. },
  1023. // Drop the peer and ensure everything's cleaned out
  1024. doDrop("A"),
  1025. isWaiting(nil),
  1026. isScheduled{nil, nil, nil},
  1027. },
  1028. })
  1029. }
  1030. // Tests that dropping a peer instantly reschedules failed announcements to any
  1031. // available peer.
  1032. func TestTransactionFetcherDropRescheduling(t *testing.T) {
  1033. testTransactionFetcherParallel(t, txFetcherTest{
  1034. init: func() *TxFetcher {
  1035. return NewTxFetcher(
  1036. func(common.Hash) bool { return false },
  1037. func(txs []*types.Transaction) []error {
  1038. return make([]error, len(txs))
  1039. },
  1040. func(string, []common.Hash) error { return nil },
  1041. )
  1042. },
  1043. steps: []interface{}{
  1044. // Set up a few hashes into various stages
  1045. doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}},
  1046. doWait{time: txArriveTimeout, step: true},
  1047. doTxNotify{peer: "B", hashes: []common.Hash{{0x01}}},
  1048. isWaiting(nil),
  1049. isScheduled{
  1050. tracking: map[string][]common.Hash{
  1051. "A": {{0x01}},
  1052. "B": {{0x01}},
  1053. },
  1054. fetching: map[string][]common.Hash{
  1055. "A": {{0x01}},
  1056. },
  1057. },
  1058. // Drop the peer and ensure everything's cleaned out
  1059. doDrop("A"),
  1060. isWaiting(nil),
  1061. isScheduled{
  1062. tracking: map[string][]common.Hash{
  1063. "B": {{0x01}},
  1064. },
  1065. fetching: map[string][]common.Hash{
  1066. "B": {{0x01}},
  1067. },
  1068. },
  1069. },
  1070. })
  1071. }
  1072. // This test reproduces a crash caught by the fuzzer. The root cause was a
  1073. // dangling transaction timing out and clashing on readd with a concurrently
  1074. // announced one.
  1075. func TestTransactionFetcherFuzzCrash01(t *testing.T) {
  1076. testTransactionFetcherParallel(t, txFetcherTest{
  1077. init: func() *TxFetcher {
  1078. return NewTxFetcher(
  1079. func(common.Hash) bool { return false },
  1080. func(txs []*types.Transaction) []error {
  1081. return make([]error, len(txs))
  1082. },
  1083. func(string, []common.Hash) error { return nil },
  1084. )
  1085. },
  1086. steps: []interface{}{
  1087. // Get a transaction into fetching mode and make it dangling with a broadcast
  1088. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}},
  1089. doWait{time: txArriveTimeout, step: true},
  1090. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}},
  1091. // Notify the dangling transaction once more and crash via a timeout
  1092. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}},
  1093. doWait{time: txFetchTimeout, step: true},
  1094. },
  1095. })
  1096. }
  1097. // This test reproduces a crash caught by the fuzzer. The root cause was a
  1098. // dangling transaction getting peer-dropped and clashing on readd with a
  1099. // concurrently announced one.
  1100. func TestTransactionFetcherFuzzCrash02(t *testing.T) {
  1101. testTransactionFetcherParallel(t, txFetcherTest{
  1102. init: func() *TxFetcher {
  1103. return NewTxFetcher(
  1104. func(common.Hash) bool { return false },
  1105. func(txs []*types.Transaction) []error {
  1106. return make([]error, len(txs))
  1107. },
  1108. func(string, []common.Hash) error { return nil },
  1109. )
  1110. },
  1111. steps: []interface{}{
  1112. // Get a transaction into fetching mode and make it dangling with a broadcast
  1113. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}},
  1114. doWait{time: txArriveTimeout, step: true},
  1115. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}},
  1116. // Notify the dangling transaction once more, re-fetch, and crash via a drop and timeout
  1117. doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}},
  1118. doWait{time: txArriveTimeout, step: true},
  1119. doDrop("A"),
  1120. doWait{time: txFetchTimeout, step: true},
  1121. },
  1122. })
  1123. }
  1124. // This test reproduces a crash caught by the fuzzer. The root cause was a
  1125. // dangling transaction getting rescheduled via a partial delivery, clashing
  1126. // with a concurrent notify.
  1127. func TestTransactionFetcherFuzzCrash03(t *testing.T) {
  1128. testTransactionFetcherParallel(t, txFetcherTest{
  1129. init: func() *TxFetcher {
  1130. return NewTxFetcher(
  1131. func(common.Hash) bool { return false },
  1132. func(txs []*types.Transaction) []error {
  1133. return make([]error, len(txs))
  1134. },
  1135. func(string, []common.Hash) error { return nil },
  1136. )
  1137. },
  1138. steps: []interface{}{
  1139. // Get a transaction into fetching mode and make it dangling with a broadcast
  1140. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}},
  1141. doWait{time: txFetchTimeout, step: true},
  1142. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1]}},
  1143. // Notify the dangling transaction once more, partially deliver, clash&crash with a timeout
  1144. doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}},
  1145. doWait{time: txArriveTimeout, step: true},
  1146. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[1]}, direct: true},
  1147. doWait{time: txFetchTimeout, step: true},
  1148. },
  1149. })
  1150. }
  1151. // This test reproduces a crash caught by the fuzzer. The root cause was a
  1152. // dangling transaction getting rescheduled via a disconnect, clashing with
  1153. // a concurrent notify.
  1154. func TestTransactionFetcherFuzzCrash04(t *testing.T) {
  1155. // Create a channel to control when tx requests can fail
  1156. proceed := make(chan struct{})
  1157. testTransactionFetcherParallel(t, txFetcherTest{
  1158. init: func() *TxFetcher {
  1159. return NewTxFetcher(
  1160. func(common.Hash) bool { return false },
  1161. func(txs []*types.Transaction) []error {
  1162. return make([]error, len(txs))
  1163. },
  1164. func(string, []common.Hash) error {
  1165. <-proceed
  1166. return errors.New("peer disconnected")
  1167. },
  1168. )
  1169. },
  1170. steps: []interface{}{
  1171. // Get a transaction into fetching mode and make it dangling with a broadcast
  1172. doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}},
  1173. doWait{time: txArriveTimeout, step: true},
  1174. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}},
  1175. // Notify the dangling transaction once more, re-fetch, and crash via an in-flight disconnect
  1176. doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}},
  1177. doWait{time: txArriveTimeout, step: true},
  1178. doFunc(func() {
  1179. proceed <- struct{}{} // Allow peer A to return the failure
  1180. }),
  1181. doWait{time: 0, step: true},
  1182. doWait{time: txFetchTimeout, step: true},
  1183. },
  1184. })
  1185. }
// testTransactionFetcherParallel marks the calling test as parallel and then
// runs the given scenario through testTransactionFetcher.
func testTransactionFetcherParallel(t *testing.T, tt txFetcherTest) {
	t.Parallel()
	testTransactionFetcher(t, tt)
}
  1190. func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
  1191. // Create a fetcher and hook into it's simulated fields
  1192. clock := new(mclock.Simulated)
  1193. wait := make(chan struct{})
  1194. fetcher := tt.init()
  1195. fetcher.clock = clock
  1196. fetcher.step = wait
  1197. fetcher.rand = rand.New(rand.NewSource(0x3a29))
  1198. fetcher.Start()
  1199. defer fetcher.Stop()
  1200. // Crunch through all the test steps and execute them
  1201. for i, step := range tt.steps {
  1202. switch step := step.(type) {
  1203. case doTxNotify:
  1204. if err := fetcher.Notify(step.peer, step.hashes); err != nil {
  1205. t.Errorf("step %d: %v", i, err)
  1206. }
  1207. <-wait // Fetcher needs to process this, wait until it's done
  1208. select {
  1209. case <-wait:
  1210. panic("wtf")
  1211. case <-time.After(time.Millisecond):
  1212. }
  1213. case doTxEnqueue:
  1214. if err := fetcher.Enqueue(step.peer, step.txs, step.direct); err != nil {
  1215. t.Errorf("step %d: %v", i, err)
  1216. }
  1217. <-wait // Fetcher needs to process this, wait until it's done
  1218. case doWait:
  1219. clock.Run(step.time)
  1220. if step.step {
  1221. <-wait // Fetcher supposed to do something, wait until it's done
  1222. }
  1223. case doDrop:
  1224. if err := fetcher.Drop(string(step)); err != nil {
  1225. t.Errorf("step %d: %v", i, err)
  1226. }
  1227. <-wait // Fetcher needs to process this, wait until it's done
  1228. case doFunc:
  1229. step()
  1230. case isWaiting:
  1231. // We need to check that the waiting list (stage 1) internals
  1232. // match with the expected set. Check the peer->hash mappings
  1233. // first.
  1234. for peer, hashes := range step {
  1235. waiting := fetcher.waitslots[peer]
  1236. if waiting == nil {
  1237. t.Errorf("step %d: peer %s missing from waitslots", i, peer)
  1238. continue
  1239. }
  1240. for _, hash := range hashes {
  1241. if _, ok := waiting[hash]; !ok {
  1242. t.Errorf("step %d, peer %s: hash %x missing from waitslots", i, peer, hash)
  1243. }
  1244. }
  1245. for hash := range waiting {
  1246. if !containsHash(hashes, hash) {
  1247. t.Errorf("step %d, peer %s: hash %x extra in waitslots", i, peer, hash)
  1248. }
  1249. }
  1250. }
  1251. for peer := range fetcher.waitslots {
  1252. if _, ok := step[peer]; !ok {
  1253. t.Errorf("step %d: peer %s extra in waitslots", i, peer)
  1254. }
  1255. }
  1256. // Peer->hash sets correct, check the hash->peer and timeout sets
  1257. for peer, hashes := range step {
  1258. for _, hash := range hashes {
  1259. if _, ok := fetcher.waitlist[hash][peer]; !ok {
  1260. t.Errorf("step %d, hash %x: peer %s missing from waitlist", i, hash, peer)
  1261. }
  1262. if _, ok := fetcher.waittime[hash]; !ok {
  1263. t.Errorf("step %d: hash %x missing from waittime", i, hash)
  1264. }
  1265. }
  1266. }
  1267. for hash, peers := range fetcher.waitlist {
  1268. if len(peers) == 0 {
  1269. t.Errorf("step %d, hash %x: empty peerset in waitlist", i, hash)
  1270. }
  1271. for peer := range peers {
  1272. if !containsHash(step[peer], hash) {
  1273. t.Errorf("step %d, hash %x: peer %s extra in waitlist", i, hash, peer)
  1274. }
  1275. }
  1276. }
  1277. for hash := range fetcher.waittime {
  1278. var found bool
  1279. for _, hashes := range step {
  1280. if containsHash(hashes, hash) {
  1281. found = true
  1282. break
  1283. }
  1284. }
  1285. if !found {
  1286. t.Errorf("step %d,: hash %x extra in waittime", i, hash)
  1287. }
  1288. }
  1289. case isScheduled:
  1290. // Check that all scheduled announces are accounted for and no
  1291. // extra ones are present.
  1292. for peer, hashes := range step.tracking {
  1293. scheduled := fetcher.announces[peer]
  1294. if scheduled == nil {
  1295. t.Errorf("step %d: peer %s missing from announces", i, peer)
  1296. continue
  1297. }
  1298. for _, hash := range hashes {
  1299. if _, ok := scheduled[hash]; !ok {
  1300. t.Errorf("step %d, peer %s: hash %x missing from announces", i, peer, hash)
  1301. }
  1302. }
  1303. for hash := range scheduled {
  1304. if !containsHash(hashes, hash) {
  1305. t.Errorf("step %d, peer %s: hash %x extra in announces", i, peer, hash)
  1306. }
  1307. }
  1308. }
  1309. for peer := range fetcher.announces {
  1310. if _, ok := step.tracking[peer]; !ok {
  1311. t.Errorf("step %d: peer %s extra in announces", i, peer)
  1312. }
  1313. }
  1314. // Check that all announces required to be fetching are in the
  1315. // appropriate sets
  1316. for peer, hashes := range step.fetching {
  1317. request := fetcher.requests[peer]
  1318. if request == nil {
  1319. t.Errorf("step %d: peer %s missing from requests", i, peer)
  1320. continue
  1321. }
  1322. for _, hash := range hashes {
  1323. if !containsHash(request.hashes, hash) {
  1324. t.Errorf("step %d, peer %s: hash %x missing from requests", i, peer, hash)
  1325. }
  1326. }
  1327. for _, hash := range request.hashes {
  1328. if !containsHash(hashes, hash) {
  1329. t.Errorf("step %d, peer %s: hash %x extra in requests", i, peer, hash)
  1330. }
  1331. }
  1332. }
  1333. for peer := range fetcher.requests {
  1334. if _, ok := step.fetching[peer]; !ok {
  1335. if _, ok := step.dangling[peer]; !ok {
  1336. t.Errorf("step %d: peer %s extra in requests", i, peer)
  1337. }
  1338. }
  1339. }
  1340. for peer, hashes := range step.fetching {
  1341. for _, hash := range hashes {
  1342. if _, ok := fetcher.fetching[hash]; !ok {
  1343. t.Errorf("step %d, peer %s: hash %x missing from fetching", i, peer, hash)
  1344. }
  1345. }
  1346. }
  1347. for hash := range fetcher.fetching {
  1348. var found bool
  1349. for _, req := range fetcher.requests {
  1350. if containsHash(req.hashes, hash) {
  1351. found = true
  1352. break
  1353. }
  1354. }
  1355. if !found {
  1356. t.Errorf("step %d: hash %x extra in fetching", i, hash)
  1357. }
  1358. }
  1359. for _, hashes := range step.fetching {
  1360. for _, hash := range hashes {
  1361. alternates := fetcher.alternates[hash]
  1362. if alternates == nil {
  1363. t.Errorf("step %d: hash %x missing from alternates", i, hash)
  1364. continue
  1365. }
  1366. for peer := range alternates {
  1367. if _, ok := fetcher.announces[peer]; !ok {
  1368. t.Errorf("step %d: peer %s extra in alternates", i, peer)
  1369. continue
  1370. }
  1371. if _, ok := fetcher.announces[peer][hash]; !ok {
  1372. t.Errorf("step %d, peer %s: hash %x extra in alternates", i, hash, peer)
  1373. continue
  1374. }
  1375. }
  1376. for p := range fetcher.announced[hash] {
  1377. if _, ok := alternates[p]; !ok {
  1378. t.Errorf("step %d, hash %x: peer %s missing from alternates", i, hash, p)
  1379. continue
  1380. }
  1381. }
  1382. }
  1383. }
  1384. for peer, hashes := range step.dangling {
  1385. request := fetcher.requests[peer]
  1386. if request == nil {
  1387. t.Errorf("step %d: peer %s missing from requests", i, peer)
  1388. continue
  1389. }
  1390. for _, hash := range hashes {
  1391. if !containsHash(request.hashes, hash) {
  1392. t.Errorf("step %d, peer %s: hash %x missing from requests", i, peer, hash)
  1393. }
  1394. }
  1395. for _, hash := range request.hashes {
  1396. if !containsHash(hashes, hash) {
  1397. t.Errorf("step %d, peer %s: hash %x extra in requests", i, peer, hash)
  1398. }
  1399. }
  1400. }
  1401. // Check that all transaction announces that are scheduled for
  1402. // retrieval but not actively being downloaded are tracked only
  1403. // in the stage 2 `announced` map.
  1404. var queued []common.Hash
  1405. for _, hashes := range step.tracking {
  1406. for _, hash := range hashes {
  1407. var found bool
  1408. for _, hs := range step.fetching {
  1409. if containsHash(hs, hash) {
  1410. found = true
  1411. break
  1412. }
  1413. }
  1414. if !found {
  1415. queued = append(queued, hash)
  1416. }
  1417. }
  1418. }
  1419. for _, hash := range queued {
  1420. if _, ok := fetcher.announced[hash]; !ok {
  1421. t.Errorf("step %d: hash %x missing from announced", i, hash)
  1422. }
  1423. }
  1424. for hash := range fetcher.announced {
  1425. if !containsHash(queued, hash) {
  1426. t.Errorf("step %d: hash %x extra in announced", i, hash)
  1427. }
  1428. }
  1429. case isUnderpriced:
  1430. if fetcher.underpriced.Cardinality() != int(step) {
  1431. t.Errorf("step %d: underpriced set size mismatch: have %d, want %d", i, fetcher.underpriced.Cardinality(), step)
  1432. }
  1433. default:
  1434. t.Fatalf("step %d: unknown step type %T", i, step)
  1435. }
  1436. // After every step, cross validate the internal uniqueness invariants
  1437. // between stage one and stage two.
  1438. for hash := range fetcher.waittime {
  1439. if _, ok := fetcher.announced[hash]; ok {
  1440. t.Errorf("step %d: hash %s present in both stage 1 and 2", i, hash)
  1441. }
  1442. }
  1443. }
  1444. }
  1445. // containsHash returns whether a hash is contained within a hash slice.
  1446. func containsHash(slice []common.Hash, hash common.Hash) bool {
  1447. for _, have := range slice {
  1448. if have == hash {
  1449. return true
  1450. }
  1451. }
  1452. return false
  1453. }