// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package fetcher

import (
	"bytes"
	"fmt"
	mrand "math/rand"
	"sort"
	"time"

	mapset "github.com/deckarep/golang-set"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
)

const (
	// maxTxAnnounces is the maximum number of unique transactions a peer
	// can announce in a short time.
	maxTxAnnounces = 4096

	// maxTxRetrievals is the maximum number of transactions that can be
	// fetched in one request. The rationale for picking 256 is:
	//   - In the eth protocol, the softResponseLimit is 2MB. Nowadays, according
	//     to Etherscan the average transaction size is around 200B, so in theory
	//     we could include lots of transactions in a single protocol packet.
	//   - However, the maximum size of a single transaction is raised to 128KB,
	//     so pick a middle value here to ensure we can maximize the efficiency
	//     of the retrieval and that response size overflow won't happen in most
	//     cases.
	maxTxRetrievals = 256

	// maxTxUnderpricedSetSize is the size of the underpriced transaction set that
	// is used to track recent transactions that have been dropped so we don't
	// re-request them.
	maxTxUnderpricedSetSize = 32768

	// txArriveTimeout is the time allowance before an announced transaction is
	// explicitly requested.
	txArriveTimeout = 500 * time.Millisecond

	// txGatherSlack is the interval used to collate almost-expired announces
	// with network fetches.
	txGatherSlack = 100 * time.Millisecond
)

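// To make the timing constants concrete: a hash announced at time T sits in
// the wait list until T+txArriveTimeout (500ms), giving the network a chance
// to broadcast the full transaction first. Expiry checks are collated with
// txGatherSlack (100ms), so entries within 100ms of each other are flushed in
// one pass rather than one timer wakeup each. A minimal sketch of the check
// (announcedAt is an illustrative name for the stored waittime entry):
//
//	if time.Duration(now-announcedAt)+txGatherSlack > txArriveTimeout {
//		// Expired, or close enough to expiry: queue the hash for retrieval
//	}
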
var (
	// txFetchTimeout is the maximum allotted time to return an explicitly
	// requested transaction.
	txFetchTimeout = 5 * time.Second
)

var (
	txAnnounceInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/in", nil)
	txAnnounceKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/known", nil)
	txAnnounceUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/underpriced", nil)
	txAnnounceDOSMeter         = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/dos", nil)

	txBroadcastInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/in", nil)
	txBroadcastKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/known", nil)
	txBroadcastUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/underpriced", nil)
	txBroadcastOtherRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/otherreject", nil)

	txRequestOutMeter     = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/out", nil)
	txRequestFailMeter    = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/fail", nil)
	txRequestDoneMeter    = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/done", nil)
	txRequestTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/timeout", nil)

	txReplyInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/in", nil)
	txReplyKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/known", nil)
	txReplyUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/underpriced", nil)
	txReplyOtherRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/otherreject", nil)

	txFetcherWaitingPeers   = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/peers", nil)
	txFetcherWaitingHashes  = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/hashes", nil)
	txFetcherQueueingPeers  = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/peers", nil)
	txFetcherQueueingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/hashes", nil)
	txFetcherFetchingPeers  = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/peers", nil)
	txFetcherFetchingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/hashes", nil)
)

// txAnnounce is the notification of the availability of a batch
// of new transactions in the network.
type txAnnounce struct {
	origin string        // Identifier of the peer originating the notification
	hashes []common.Hash // Batch of transaction hashes being announced
}

// txRequest represents an in-flight transaction retrieval request destined to
// a specific peer.
type txRequest struct {
	hashes []common.Hash            // Transactions having been requested
	stolen map[common.Hash]struct{} // Deliveries by someone else (don't re-request)
	time   mclock.AbsTime           // Timestamp of the request
}

// txDelivery is the notification that a batch of transactions have been added
// to the pool and should be untracked.
type txDelivery struct {
	origin string        // Identifier of the peer originating the notification
	hashes []common.Hash // Batch of transaction hashes having been delivered
	direct bool          // Whether this is a direct reply or a broadcast
}

// txDrop is the notification that a peer has disconnected.
type txDrop struct {
	peer string
}

// TxFetcher is responsible for retrieving new transactions based on announcements.
//
// The fetcher operates in 3 stages:
//   - Transactions that are newly discovered are moved into a wait list.
//   - After ~500ms passes, transactions from the wait list that have not been
//     broadcast to us in whole are moved into a queueing area.
//   - When a connected peer doesn't have in-flight retrieval requests, any
//     transactions queued up (and announced by the peer) are allocated to the
//     peer and moved into a fetching status until they're fulfilled or fail.
//
// The invariants of the fetcher are:
//   - Each tracked transaction (hash) must only be present in one of the
//     three stages. This ensures that the fetcher operates akin to a finite
//     state automaton and there's no data leak.
//   - Each peer that announced transactions may be scheduled for retrievals,
//     but only ever one concurrently. This ensures we can immediately know
//     what is missing from a reply and reschedule it.
type TxFetcher struct {
	notify  chan *txAnnounce
	cleanup chan *txDelivery
	drop    chan *txDrop
	quit    chan struct{}

	underpriced mapset.Set // Transactions discarded as too cheap (don't re-fetch)

	// Stage 1: Waiting lists for newly discovered transactions that might be
	// broadcast without needing explicit request/reply round trips.
	waitlist  map[common.Hash]map[string]struct{} // Transactions waiting for a potential broadcast
	waittime  map[common.Hash]mclock.AbsTime      // Timestamps when transactions were added to the waitlist
	waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection)

	// Stage 2: Queue of transactions that are waiting to be allocated to some
	// peer to be retrieved directly.
	announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer
	announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

	// Stage 3: Set of transactions currently being retrieved, some of which may
	// be fulfilled and some rescheduled. Note, this step shares 'announces' from
	// the previous stage to avoid having to duplicate (need it for DoS checks).
	fetching   map[common.Hash]string              // Transaction set currently being retrieved
	requests   map[string]*txRequest               // In-flight transaction retrievals
	alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails

	// Callbacks
	hasTx    func(common.Hash) bool             // Retrieves a tx from the local txpool
	addTxs   func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
	fetchTxs func(string, []common.Hash) error  // Retrieves a set of txs from a remote peer

	step  chan struct{} // Notification channel when the fetcher loop iterates
	clock mclock.Clock  // Time wrapper to simulate in tests
	rand  *mrand.Rand   // Randomizer to use in tests instead of map range loops (soft-random)
}

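// To illustrate the three stages on a single hash H announced first by peer A
// and later by peer B, the state (a sketch, driven by the event loop below)
// evolves roughly as:
//
//	// Stage 1: H waits for a possible broadcast
//	waitlist[H] = {A, B}, waittime[H] = T, waitslots[A] = waitslots[B] = {H}
//
//	// Stage 2: ~500ms later, no broadcast arrived, H becomes requestable
//	announced[H] = {A, B}, announces[A] = announces[B] = {H}
//
//	// Stage 3: H is allocated to A for retrieval, B remains a fallback
//	fetching[H] = "A", alternates[H] = {A, B}, requests["A"].hashes = [H]
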
// NewTxFetcher creates a transaction fetcher to retrieve transactions
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
	return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
	hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
	clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
	return &TxFetcher{
		notify:      make(chan *txAnnounce),
		cleanup:     make(chan *txDelivery),
		drop:        make(chan *txDrop),
		quit:        make(chan struct{}),
		waitlist:    make(map[common.Hash]map[string]struct{}),
		waittime:    make(map[common.Hash]mclock.AbsTime),
		waitslots:   make(map[string]map[common.Hash]struct{}),
		announces:   make(map[string]map[common.Hash]struct{}),
		announced:   make(map[common.Hash]map[string]struct{}),
		fetching:    make(map[common.Hash]string),
		requests:    make(map[string]*txRequest),
		alternates:  make(map[common.Hash]map[string]struct{}),
		underpriced: mapset.NewSet(),
		hasTx:       hasTx,
		addTxs:      addTxs,
		fetchTxs:    fetchTxs,
		clock:       clock,
		rand:        rand,
	}
}

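// A minimal wiring sketch (the txpool and peer accessors named here are
// illustrative assumptions, not part of this file):
//
//	fetcher := NewTxFetcher(
//		pool.Has,        // hasTx: is the hash already known to the txpool?
//		pool.AddRemotes, // addTxs: batch insert, one error per transaction
//		func(peer string, hashes []common.Hash) error {
//			return peers[peer].RequestTxs(hashes) // fetchTxs: fire off a network request
//		},
//	)
//	fetcher.Start()
//	defer fetcher.Stop()
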
// Notify announces the fetcher of the potential availability of a new batch of
// transactions in the network.
func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
	// Keep track of all the announced transactions
	txAnnounceInMeter.Mark(int64(len(hashes)))

	// Skip any transaction announcements that we already know of, or that we've
	// previously marked as cheap and discarded. This check is of course racy,
	// because multiple concurrent notifies will still manage to pass it, but it's
	// still valuable to check here because it runs concurrently with the internal
	// loop, so anything caught here is time saved internally.
	var (
		unknowns               = make([]common.Hash, 0, len(hashes))
		duplicate, underpriced int64
	)
	for _, hash := range hashes {
		switch {
		case f.hasTx(hash):
			duplicate++

		case f.underpriced.Contains(hash):
			underpriced++

		default:
			unknowns = append(unknowns, hash)
		}
	}
	txAnnounceKnownMeter.Mark(duplicate)
	txAnnounceUnderpricedMeter.Mark(underpriced)

	// If anything's left to announce, push it into the internal loop
	if len(unknowns) == 0 {
		return nil
	}
	announce := &txAnnounce{
		origin: peer,
		hashes: unknowns,
	}
	select {
	case f.notify <- announce:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

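// A typical caller is the handler for an eth/65 NewPooledTransactionHashes
// message, which just decodes and forwards the hashes (a hypothetical handler,
// shown for illustration):
//
//	var hashes []common.Hash
//	if err := msg.Decode(&hashes); err != nil {
//		return err
//	}
//	return fetcher.Notify(peer.ID(), hashes)
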
// Enqueue imports a batch of received transactions into the transaction pool
// and the fetcher. This method may be called by both transaction broadcasts and
// direct request replies. The differentiation is important so the fetcher can
// re-schedule missing transactions as soon as possible.
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
	// Keep track of all the propagated transactions
	if direct {
		txReplyInMeter.Mark(int64(len(txs)))
	} else {
		txBroadcastInMeter.Mark(int64(len(txs)))
	}
	// Push all the transactions into the pool, tracking underpriced ones to avoid
	// re-requesting them and dropping the peer in case of malicious transfers.
	var (
		added       = make([]common.Hash, 0, len(txs))
		duplicate   int64
		underpriced int64
		otherreject int64
	)
	errs := f.addTxs(txs)
	for i, err := range errs {
		if err != nil {
			// Track the transaction hash if the price is too low for us.
			// Avoid re-requesting this transaction when we receive another
			// announcement.
			if err == core.ErrUnderpriced || err == core.ErrReplaceUnderpriced {
				for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
					f.underpriced.Pop()
				}
				f.underpriced.Add(txs[i].Hash())
			}
			// Track a few interesting failure types
			switch err {
			case nil: // Noop, but need to handle to not count these

			case core.ErrAlreadyKnown:
				duplicate++

			case core.ErrUnderpriced, core.ErrReplaceUnderpriced:
				underpriced++

			default:
				otherreject++
			}
		}
		added = append(added, txs[i].Hash())
	}
	if direct {
		txReplyKnownMeter.Mark(duplicate)
		txReplyUnderpricedMeter.Mark(underpriced)
		txReplyOtherRejectMeter.Mark(otherreject)
	} else {
		txBroadcastKnownMeter.Mark(duplicate)
		txBroadcastUnderpricedMeter.Mark(underpriced)
		txBroadcastOtherRejectMeter.Mark(otherreject)
	}
	select {
	case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

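// The direct flag is what lets the event loop below distinguish the two
// delivery paths: broadcasts merely untrack the delivered hashes, while
// request replies also close out f.requests[peer] and reschedule anything
// the peer failed to return. A wiring sketch (message handlers assumed):
//
//	// Transactions message: unsolicited broadcast
//	fetcher.Enqueue(peer.ID(), txs, false)
//
//	// PooledTransactions message: reply to our GetPooledTransactions request
//	fetcher.Enqueue(peer.ID(), txs, true)
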
// Drop should be called when a peer disconnects. It cleans up all the internal
// data structures of the given node.
func (f *TxFetcher) Drop(peer string) error {
	select {
	case f.drop <- &txDrop{peer: peer}:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination is requested.
func (f *TxFetcher) Start() {
	go f.loop()
}

// Stop terminates the announcement based synchroniser, canceling all pending
// operations.
func (f *TxFetcher) Stop() {
	close(f.quit)
}

func (f *TxFetcher) loop() {
	var (
		waitTimer    = new(mclock.Timer)
		timeoutTimer = new(mclock.Timer)

		waitTrigger    = make(chan struct{}, 1)
		timeoutTrigger = make(chan struct{}, 1)
	)
	for {
		select {
		case ann := <-f.notify:
			// Drop part of the new announcements if there are too many accumulated.
			// Note, we could but do not filter already known transactions here as
			// the probability of something arriving between this call and the pre-
			// filter outside is essentially zero.
			used := len(f.waitslots[ann.origin]) + len(f.announces[ann.origin])
			if used >= maxTxAnnounces {
				// This can happen if a set of transactions are requested but not
				// all fulfilled, so the remainder are rescheduled without the cap
				// check. Should be fine as the limit is in the thousands and the
				// request size in the hundreds.
				txAnnounceDOSMeter.Mark(int64(len(ann.hashes)))
				break
			}
			want := used + len(ann.hashes)
			if want > maxTxAnnounces {
				txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))
				ann.hashes = ann.hashes[:maxTxAnnounces-used] // keep only as many as fit under the cap
			}
			// All is well, schedule the remainder of the transactions
			idleWait := len(f.waittime) == 0
			_, oldPeer := f.announces[ann.origin]

			for _, hash := range ann.hashes {
				// If the transaction is already downloading, add it to the list
				// of possible alternates (in case the current retrieval fails) and
				// also account it for the peer.
				if f.alternates[hash] != nil {
					f.alternates[hash][ann.origin] = struct{}{}

					// Stage 2 and 3 share the set of origins per tx
					if announces := f.announces[ann.origin]; announces != nil {
						announces[hash] = struct{}{}
					} else {
						f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// If the transaction is not downloading, but is already queued
				// from a different peer, track it for the new peer too.
				if f.announced[hash] != nil {
					f.announced[hash][ann.origin] = struct{}{}

					// Stage 2 and 3 share the set of origins per tx
					if announces := f.announces[ann.origin]; announces != nil {
						announces[hash] = struct{}{}
					} else {
						f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// If the transaction is already known to the fetcher, but not
				// yet downloading, add the peer as an alternate origin in the
				// waiting list.
				if f.waitlist[hash] != nil {
					f.waitlist[hash][ann.origin] = struct{}{}

					if waitslots := f.waitslots[ann.origin]; waitslots != nil {
						waitslots[hash] = struct{}{}
					} else {
						f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// Transaction unknown to the fetcher, insert it into the waiting list
				f.waitlist[hash] = map[string]struct{}{ann.origin: {}}
				f.waittime[hash] = f.clock.Now()

				if waitslots := f.waitslots[ann.origin]; waitslots != nil {
					waitslots[hash] = struct{}{}
				} else {
					f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
				}
			}
			// If a new item was added to the waitlist, schedule it into the fetcher
			if idleWait && len(f.waittime) > 0 {
				f.rescheduleWait(waitTimer, waitTrigger)
			}
			// If this peer is new and announced something already queued, maybe
			// request transactions from them
			if !oldPeer && len(f.announces[ann.origin]) > 0 {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, map[string]struct{}{ann.origin: {}})
			}
		case <-waitTrigger:
			// At least one transaction's waiting time ran out, push all expired
			// ones into the retrieval queues
			actives := make(map[string]struct{})
			for hash, instance := range f.waittime {
				if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout {
					// Transaction expired without propagation, schedule for retrieval
					if f.announced[hash] != nil {
						panic("announce tracker already contains waitlist item")
					}
					f.announced[hash] = f.waitlist[hash]
					for peer := range f.waitlist[hash] {
						if announces := f.announces[peer]; announces != nil {
							announces[hash] = struct{}{}
						} else {
							f.announces[peer] = map[common.Hash]struct{}{hash: {}}
						}
						delete(f.waitslots[peer], hash)
						if len(f.waitslots[peer]) == 0 {
							delete(f.waitslots, peer)
						}
						actives[peer] = struct{}{}
					}
					delete(f.waittime, hash)
					delete(f.waitlist, hash)
				}
			}
			// If transactions are still waiting for propagation, reschedule the wait timer
			if len(f.waittime) > 0 {
				f.rescheduleWait(waitTimer, waitTrigger)
			}
			// If any peers became active and are idle, request transactions from them
			if len(actives) > 0 {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, actives)
			}
		case <-timeoutTrigger:
			// Clean up any expired retrievals and avoid re-requesting them from the
			// same peer (either overloaded or malicious, useless in both cases). We
			// could also penalize (Drop), but there's nothing to gain, and it could
			// possibly further increase the load on it.
			for peer, req := range f.requests {
				if time.Duration(f.clock.Now()-req.time)+txGatherSlack > txFetchTimeout {
					txRequestTimeoutMeter.Mark(int64(len(req.hashes)))

					// Reschedule all the not-yet-delivered fetches to alternate peers
					for _, hash := range req.hashes {
						// Skip rescheduling hashes already delivered by someone else
						if req.stolen != nil {
							if _, ok := req.stolen[hash]; ok {
								continue
							}
						}
						// Move the delivery back from fetching to queued
						if _, ok := f.announced[hash]; ok {
							panic("announced tracker already contains alternate item")
						}
						if f.alternates[hash] != nil { // nil if tx was broadcast during fetch
							f.announced[hash] = f.alternates[hash]
						}
						delete(f.announced[hash], peer)
						if len(f.announced[hash]) == 0 {
							delete(f.announced, hash)
						}
						delete(f.announces[peer], hash)
						delete(f.alternates, hash)
						delete(f.fetching, hash)
					}
					if len(f.announces[peer]) == 0 {
						delete(f.announces, peer)
					}
					// Keep track of the request as dangling, but never expire
					f.requests[peer].hashes = nil
				}
			}
			// Schedule a new transaction retrieval
			f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)

			// No idea if we scheduled something or not, trigger the timer if needed
			// TODO(karalabe): this is kind of lame, can't we dump it into scheduleFetches somehow?
			f.rescheduleTimeout(timeoutTimer, timeoutTrigger)
		case delivery := <-f.cleanup:
			// Independent of whether the delivery was direct or broadcast, remove
			// all traces of the hash from internal trackers
			for _, hash := range delivery.hashes {
				if _, ok := f.waitlist[hash]; ok {
					for peer, txset := range f.waitslots {
						delete(txset, hash)
						if len(txset) == 0 {
							delete(f.waitslots, peer)
						}
					}
					delete(f.waitlist, hash)
					delete(f.waittime, hash)
				} else {
					for peer, txset := range f.announces {
						delete(txset, hash)
						if len(txset) == 0 {
							delete(f.announces, peer)
						}
					}
					delete(f.announced, hash)
					delete(f.alternates, hash)

					// If a transaction currently being fetched from a different
					// origin was delivered (delivery stolen), mark it so the
					// actual delivery won't double schedule it.
					if origin, ok := f.fetching[hash]; ok && (origin != delivery.origin || !delivery.direct) {
						stolen := f.requests[origin].stolen
						if stolen == nil {
							f.requests[origin].stolen = make(map[common.Hash]struct{})
							stolen = f.requests[origin].stolen
						}
						stolen[hash] = struct{}{}
					}
					delete(f.fetching, hash)
				}
			}
			// In case of a direct delivery, also reschedule anything missing
			// from the original query
			if delivery.direct {
				// Mark the request as successful (independent of individual status)
				txRequestDoneMeter.Mark(int64(len(delivery.hashes)))

				// Make sure something was pending, nuke it
				req := f.requests[delivery.origin]
				if req == nil {
					log.Warn("Unexpected transaction delivery", "peer", delivery.origin)
					break
				}
				delete(f.requests, delivery.origin)

				// Anything not delivered should be re-scheduled (with or without
				// this peer, depending on the response cutoff)
				delivered := make(map[common.Hash]struct{})
				for _, hash := range delivery.hashes {
					delivered[hash] = struct{}{}
				}
				cutoff := len(req.hashes) // If nothing is delivered, assume everything is missing, don't retry!!!
				for i, hash := range req.hashes {
					if _, ok := delivered[hash]; ok {
						cutoff = i
					}
				}
				// Reschedule missing hashes from alternates, not-fulfilled from alt+self
				for i, hash := range req.hashes {
					// Skip rescheduling hashes already delivered by someone else
					if req.stolen != nil {
						if _, ok := req.stolen[hash]; ok {
							continue
						}
					}
					if _, ok := delivered[hash]; !ok {
						if i < cutoff {
							delete(f.alternates[hash], delivery.origin)
							delete(f.announces[delivery.origin], hash)
							if len(f.announces[delivery.origin]) == 0 {
								delete(f.announces, delivery.origin)
							}
						}
						if len(f.alternates[hash]) > 0 {
							if _, ok := f.announced[hash]; ok {
								panic(fmt.Sprintf("announced tracker already contains alternate item: %v", f.announced[hash]))
							}
							f.announced[hash] = f.alternates[hash]
						}
					}
					delete(f.alternates, hash)
					delete(f.fetching, hash)
				}
				// Something was delivered, try to reschedule requests
				f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) // Partial delivery may enable others to deliver too
			}
		case drop := <-f.drop:
			// A peer was dropped, remove all traces of it
			if _, ok := f.waitslots[drop.peer]; ok {
				for hash := range f.waitslots[drop.peer] {
					delete(f.waitlist[hash], drop.peer)
					if len(f.waitlist[hash]) == 0 {
						delete(f.waitlist, hash)
						delete(f.waittime, hash)
					}
				}
				delete(f.waitslots, drop.peer)
				if len(f.waitlist) > 0 {
					f.rescheduleWait(waitTimer, waitTrigger)
				}
			}
			// Clean up any active requests
			var request *txRequest
			if request = f.requests[drop.peer]; request != nil {
				for _, hash := range request.hashes {
					// Skip rescheduling hashes already delivered by someone else
					if request.stolen != nil {
						if _, ok := request.stolen[hash]; ok {
							continue
						}
					}
					// Undelivered hash, reschedule if there's an alternative origin available
					delete(f.alternates[hash], drop.peer)
					if len(f.alternates[hash]) == 0 {
						delete(f.alternates, hash)
					} else {
						f.announced[hash] = f.alternates[hash]
						delete(f.alternates, hash)
					}
					delete(f.fetching, hash)
				}
				delete(f.requests, drop.peer)
			}
			// Clean up general announcement tracking
			if _, ok := f.announces[drop.peer]; ok {
				for hash := range f.announces[drop.peer] {
					delete(f.announced[hash], drop.peer)
					if len(f.announced[hash]) == 0 {
						delete(f.announced, hash)
					}
				}
				delete(f.announces, drop.peer)
			}
			// If a request was cancelled, check if anything needs to be rescheduled
			if request != nil {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)
				f.rescheduleTimeout(timeoutTimer, timeoutTrigger)
			}
		case <-f.quit:
			return
		}
		// No idea what happened, but bump some sanity metrics
		txFetcherWaitingPeers.Update(int64(len(f.waitslots)))
		txFetcherWaitingHashes.Update(int64(len(f.waitlist)))
		txFetcherQueueingPeers.Update(int64(len(f.announces) - len(f.requests)))
		txFetcherQueueingHashes.Update(int64(len(f.announced)))
		txFetcherFetchingPeers.Update(int64(len(f.requests)))
		txFetcherFetchingHashes.Update(int64(len(f.fetching)))

		// Loop did something, ping the step notifier if needed (tests)
		if f.step != nil {
			f.step <- struct{}{}
		}
	}
}

// rescheduleWait iterates over all the transactions currently in the waitlist
// and schedules the movement into the fetcher for the earliest.
//
// The method has a granularity of 'gatherSlack', since there's not much point in
// spinning over all the transactions just to maybe find one that should trigger
// a few ms earlier.
func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
	if *timer != nil {
		(*timer).Stop()
	}
	now := f.clock.Now()

	earliest := now
	for _, instance := range f.waittime {
		if earliest > instance {
			earliest = instance
			if txArriveTimeout-time.Duration(now-earliest) < gatherSlack {
				break
			}
		}
	}
	*timer = f.clock.AfterFunc(txArriveTimeout-time.Duration(now-earliest), func() {
		trigger <- struct{}{}
	})
}

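// Worked example: if the oldest waitlist entry was added 420ms ago, the timer
// is armed for txArriveTimeout - 420ms = 80ms from now. Since 80ms is below
// gatherSlack (100ms, a constant shared with the block fetcher in this
// package), the scan also stops early: any older entry would fire within the
// same slack window and gets collated into the same trigger.
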
// rescheduleTimeout iterates over all the transactions currently in flight and
// schedules a cleanup run when the first would trigger.
//
// The method has a granularity of 'gatherSlack', since there's not much point in
// spinning over all the transactions just to maybe find one that should trigger
// a few ms earlier.
//
// This method is a bit "flaky" "by design". In theory the timeout timer only ever
// should be rescheduled if some request is pending. In practice, a timeout will
// cause the timer to be rescheduled every 5 secs (until the peer comes through or
// disconnects). This is a limitation of the fetcher code because we don't track
// pending requests and timed out requests separately. Without double tracking, if
// we simply didn't reschedule the timer on all-timeout then the timer would never
// be set again since len(request) > 0 => something's running.
func (f *TxFetcher) rescheduleTimeout(timer *mclock.Timer, trigger chan struct{}) {
	if *timer != nil {
		(*timer).Stop()
	}
	now := f.clock.Now()

	earliest := now
	for _, req := range f.requests {
		// If this request already timed out, skip it altogether
		if req.hashes == nil {
			continue
		}
		if earliest > req.time {
			earliest = req.time
			if txFetchTimeout-time.Duration(now-earliest) < gatherSlack {
				break
			}
		}
	}
	*timer = f.clock.AfterFunc(txFetchTimeout-time.Duration(now-earliest), func() {
		trigger <- struct{}{}
	})
}

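// Worked example of the "flaky by design" behaviour: suppose peer P's request
// timed out, so its hash list was nilled out but the request kept as dangling.
// The loop above skips P (req.hashes == nil), finds no earlier pending request,
// leaves earliest == now, and so re-arms the timer for the full txFetchTimeout
// (5s), repeating until P finally delivers or disconnects.
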
// scheduleFetches starts a batch of retrievals for all available idle peers.
func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, authorizationList map[string]struct{}) {
	// Gather the set of peers we want to retrieve from (default to all)
	actives := authorizationList
	if actives == nil {
		actives = make(map[string]struct{})
		for peer := range f.announces {
			actives[peer] = struct{}{}
		}
	}
	if len(actives) == 0 {
		return
	}
	// For each active peer, try to schedule some transaction fetches
	idle := len(f.requests) == 0

	f.forEachPeer(actives, func(peer string) {
		if f.requests[peer] != nil {
			return // continue in the for-each
		}
		if len(f.announces[peer]) == 0 {
			return // continue in the for-each
		}
		hashes := make([]common.Hash, 0, maxTxRetrievals)
		f.forEachHash(f.announces[peer], func(hash common.Hash) bool {
			if _, ok := f.fetching[hash]; !ok {
				// Mark the hash as fetching and stash away possible alternates
				f.fetching[hash] = peer

				if _, ok := f.alternates[hash]; ok {
					panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
				}
				f.alternates[hash] = f.announced[hash]
				delete(f.announced, hash)

				// Accumulate the hash and stop if the limit was reached
				hashes = append(hashes, hash)
				if len(hashes) >= maxTxRetrievals {
					return false // break in the for-each
				}
			}
			return true // continue in the for-each
		})
		// If any hashes were allocated, request them from the peer
		if len(hashes) > 0 {
			f.requests[peer] = &txRequest{hashes: hashes, time: f.clock.Now()}
			txRequestOutMeter.Mark(int64(len(hashes)))

			go func(peer string, hashes []common.Hash) {
				// Try to fetch the transactions, but in case of a request
				// failure (e.g. peer disconnected), reschedule the hashes.
				if err := f.fetchTxs(peer, hashes); err != nil {
					txRequestFailMeter.Mark(int64(len(hashes)))
					f.Drop(peer)
				}
			}(peer, hashes)
		}
	})
	// If a new request was fired, schedule a timeout timer
	if idle && len(f.requests) > 0 {
		f.rescheduleTimeout(timer, timeout)
	}
}

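// To illustrate the allocation: suppose peers A and B both announced {H1, H2}
// and neither has a request in flight. Whichever peer the iteration visits
// first, say A, claims both hashes: fetching[H1] = fetching[H2] = "A" and
// requests["A"] is created. When B is visited next, both hashes are already
// in f.fetching, nothing accumulates, and B stays idle, preserving the one
// origin per hash, one request per peer invariant.
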
// forEachPeer does a range loop over a map of peers in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string)) {
	// If we're running production, use whatever Go's map gives us
	if f.rand == nil {
		for peer := range peers {
			do(peer)
		}
		return
	}
	// We're running the test suite, make iteration deterministic
	list := make([]string, 0, len(peers))
	for peer := range peers {
		list = append(list, peer)
	}
	sort.Strings(list)
	rotateStrings(list, f.rand.Intn(len(list)))
	for _, peer := range list {
		do(peer)
	}
}

// forEachHash does a range loop over a map of hashes in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachHash(hashes map[common.Hash]struct{}, do func(hash common.Hash) bool) {
	// If we're running production, use whatever Go's map gives us
	if f.rand == nil {
		for hash := range hashes {
			if !do(hash) {
				return
			}
		}
		return
	}
	// We're running the test suite, make iteration deterministic
	list := make([]common.Hash, 0, len(hashes))
	for hash := range hashes {
		list = append(list, hash)
	}
	sortHashes(list)
	rotateHashes(list, f.rand.Intn(len(list)))
	for _, hash := range list {
		if !do(hash) {
			return
		}
	}
}

// rotateStrings rotates the contents of a slice by n steps. This method is only
// used in tests to simulate random map iteration but keep it deterministic.
func rotateStrings(slice []string, n int) {
	orig := make([]string, len(slice))
	copy(orig, slice)

	for i := 0; i < len(orig); i++ {
		slice[i] = orig[(i+n)%len(orig)]
	}
}

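// For example, rotateStrings([]string{"a", "b", "c"}, 1) rearranges the slice
// in place to ["b", "c", "a"]: index i receives the element that previously
// lived at (i+1)%3.
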
// sortHashes sorts a slice of hashes. This method is only used in tests in order
// to simulate random map iteration but keep it deterministic.
func sortHashes(slice []common.Hash) {
	for i := 0; i < len(slice); i++ {
		for j := i + 1; j < len(slice); j++ {
			if bytes.Compare(slice[i][:], slice[j][:]) > 0 {
				slice[i], slice[j] = slice[j], slice[i]
			}
		}
	}
}

// rotateHashes rotates the contents of a slice by n steps. This method is only
// used in tests to simulate random map iteration but keep it deterministic.
func rotateHashes(slice []common.Hash, n int) {
	orig := make([]common.Hash, len(slice))
	copy(orig, slice)

	for i := 0; i < len(orig); i++ {
		slice[i] = orig[(i+n)%len(orig)]
	}
}