block_fetcher.go 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package fetcher contains the announcement based header, blocks or transaction synchronisation.
  17. package fetcher
  18. import (
  19. "errors"
  20. "math/rand"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/common/prque"
  24. "github.com/ethereum/go-ethereum/consensus"
  25. "github.com/ethereum/go-ethereum/core/types"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/metrics"
  28. "github.com/ethereum/go-ethereum/trie"
  29. )
  30. const (
  31. lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
  32. arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
  33. gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
  34. fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
  35. )
  36. const (
  37. maxUncleDist = 7 // Maximum allowed backward distance from the chain head
  38. maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
  39. hashLimit = 256 // Maximum number of unique blocks or headers a peer may have announced
  40. blockLimit = 64 // Maximum number of unique blocks a peer may have delivered
  41. )
  42. var (
  43. blockAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/in", nil)
  44. blockAnnounceOutTimer = metrics.NewRegisteredTimer("eth/fetcher/block/announces/out", nil)
  45. blockAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/drop", nil)
  46. blockAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/dos", nil)
  47. blockBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/in", nil)
  48. blockBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/block/broadcasts/out", nil)
  49. blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/drop", nil)
  50. blockBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/dos", nil)
  51. headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/headers", nil)
  52. bodyFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/bodies", nil)
  53. headerFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/in", nil)
  54. headerFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/out", nil)
  55. bodyFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/in", nil)
  56. bodyFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/out", nil)
  57. )
  58. var errTerminated = errors.New("terminated")
  59. // HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
  60. type HeaderRetrievalFn func(common.Hash) *types.Header
  61. // blockRetrievalFn is a callback type for retrieving a block from the local chain.
  62. type blockRetrievalFn func(common.Hash) *types.Block
  63. // headerRequesterFn is a callback type for sending a header retrieval request.
  64. type headerRequesterFn func(common.Hash) error
  65. // bodyRequesterFn is a callback type for sending a body retrieval request.
  66. type bodyRequesterFn func([]common.Hash) error
  67. // headerVerifierFn is a callback type to verify a block's header for fast propagation.
  68. type headerVerifierFn func(header *types.Header) error
  69. // blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
  70. type blockBroadcasterFn func(block *types.Block, propagate bool)
  71. // chainHeightFn is a callback type to retrieve the current chain height.
  72. type chainHeightFn func() uint64
  73. // headersInsertFn is a callback type to insert a batch of headers into the local chain.
  74. type headersInsertFn func(headers []*types.Header) (int, error)
  75. // chainInsertFn is a callback type to insert a batch of blocks into the local chain.
  76. type chainInsertFn func(types.Blocks) (int, error)
  77. // peerDropFn is a callback type for dropping a peer detected as malicious.
  78. type peerDropFn func(id string)
  79. // blockAnnounce is the hash notification of the availability of a new block in the
  80. // network.
  81. type blockAnnounce struct {
  82. hash common.Hash // Hash of the block being announced
  83. number uint64 // Number of the block being announced (0 = unknown | old protocol)
  84. header *types.Header // Header of the block partially reassembled (new protocol)
  85. time time.Time // Timestamp of the announcement
  86. origin string // Identifier of the peer originating the notification
  87. fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
  88. fetchBodies bodyRequesterFn // Fetcher function to retrieve the body of an announced block
  89. }
  90. // headerFilterTask represents a batch of headers needing fetcher filtering.
  91. type headerFilterTask struct {
  92. peer string // The source peer of block headers
  93. headers []*types.Header // Collection of headers to filter
  94. time time.Time // Arrival time of the headers
  95. }
  96. // bodyFilterTask represents a batch of block bodies (transactions and uncles)
  97. // needing fetcher filtering.
  98. type bodyFilterTask struct {
  99. peer string // The source peer of block bodies
  100. transactions [][]*types.Transaction // Collection of transactions per block bodies
  101. uncles [][]*types.Header // Collection of uncles per block bodies
  102. time time.Time // Arrival time of the blocks' contents
  103. }
  104. // blockOrHeaderInject represents a schedules import operation.
  105. type blockOrHeaderInject struct {
  106. origin string
  107. header *types.Header // Used for light mode fetcher which only cares about header.
  108. block *types.Block // Used for normal mode fetcher which imports full block.
  109. }
  110. // number returns the block number of the injected object.
  111. func (inject *blockOrHeaderInject) number() uint64 {
  112. if inject.header != nil {
  113. return inject.header.Number.Uint64()
  114. }
  115. return inject.block.NumberU64()
  116. }
  117. // number returns the block hash of the injected object.
  118. func (inject *blockOrHeaderInject) hash() common.Hash {
  119. if inject.header != nil {
  120. return inject.header.Hash()
  121. }
  122. return inject.block.Hash()
  123. }
  124. // BlockFetcher is responsible for accumulating block announcements from various peers
  125. // and scheduling them for retrieval.
  126. type BlockFetcher struct {
  127. light bool // The indicator whether it's a light fetcher or normal one.
  128. // Various event channels
  129. notify chan *blockAnnounce
  130. inject chan *blockOrHeaderInject
  131. headerFilter chan chan *headerFilterTask
  132. bodyFilter chan chan *bodyFilterTask
  133. done chan common.Hash
  134. quit chan struct{}
  135. // Announce states
  136. announces map[string]int // Per peer blockAnnounce counts to prevent memory exhaustion
  137. announced map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching
  138. fetching map[common.Hash]*blockAnnounce // Announced blocks, currently fetching
  139. fetched map[common.Hash][]*blockAnnounce // Blocks with headers fetched, scheduled for body retrieval
  140. completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing
  141. // Block cache
  142. queue *prque.Prque // Queue containing the import operations (block number sorted)
  143. queues map[string]int // Per peer block counts to prevent memory exhaustion
  144. queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)
  145. // Callbacks
  146. getHeader HeaderRetrievalFn // Retrieves a header from the local chain
  147. getBlock blockRetrievalFn // Retrieves a block from the local chain
  148. verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
  149. broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
  150. chainHeight chainHeightFn // Retrieves the current chain's height
  151. insertHeaders headersInsertFn // Injects a batch of headers into the chain
  152. insertChain chainInsertFn // Injects a batch of blocks into the chain
  153. dropPeer peerDropFn // Drops a peer for misbehaving
  154. // Testing hooks
  155. announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
  156. queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
  157. fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
  158. completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
  159. importedHook func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62)
  160. }
  161. // NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
  162. func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
  163. return &BlockFetcher{
  164. light: light,
  165. notify: make(chan *blockAnnounce),
  166. inject: make(chan *blockOrHeaderInject),
  167. headerFilter: make(chan chan *headerFilterTask),
  168. bodyFilter: make(chan chan *bodyFilterTask),
  169. done: make(chan common.Hash),
  170. quit: make(chan struct{}),
  171. announces: make(map[string]int),
  172. announced: make(map[common.Hash][]*blockAnnounce),
  173. fetching: make(map[common.Hash]*blockAnnounce),
  174. fetched: make(map[common.Hash][]*blockAnnounce),
  175. completing: make(map[common.Hash]*blockAnnounce),
  176. queue: prque.New(nil),
  177. queues: make(map[string]int),
  178. queued: make(map[common.Hash]*blockOrHeaderInject),
  179. getHeader: getHeader,
  180. getBlock: getBlock,
  181. verifyHeader: verifyHeader,
  182. broadcastBlock: broadcastBlock,
  183. chainHeight: chainHeight,
  184. insertHeaders: insertHeaders,
  185. insertChain: insertChain,
  186. dropPeer: dropPeer,
  187. }
  188. }
  189. // Start boots up the announcement based synchroniser, accepting and processing
  190. // hash notifications and block fetches until termination requested.
  191. func (f *BlockFetcher) Start() {
  192. go f.loop()
  193. }
  194. // Stop terminates the announcement based synchroniser, canceling all pending
  195. // operations.
  196. func (f *BlockFetcher) Stop() {
  197. close(f.quit)
  198. }
  199. // Notify announces the fetcher of the potential availability of a new block in
  200. // the network.
  201. func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
  202. headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
  203. block := &blockAnnounce{
  204. hash: hash,
  205. number: number,
  206. time: time,
  207. origin: peer,
  208. fetchHeader: headerFetcher,
  209. fetchBodies: bodyFetcher,
  210. }
  211. select {
  212. case f.notify <- block:
  213. return nil
  214. case <-f.quit:
  215. return errTerminated
  216. }
  217. }
  218. // Enqueue tries to fill gaps the fetcher's future import queue.
  219. func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {
  220. op := &blockOrHeaderInject{
  221. origin: peer,
  222. block: block,
  223. }
  224. select {
  225. case f.inject <- op:
  226. return nil
  227. case <-f.quit:
  228. return errTerminated
  229. }
  230. }
  231. // FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
  232. // returning those that should be handled differently.
  233. func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
  234. log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
  235. // Send the filter channel to the fetcher
  236. filter := make(chan *headerFilterTask)
  237. select {
  238. case f.headerFilter <- filter:
  239. case <-f.quit:
  240. return nil
  241. }
  242. // Request the filtering of the header list
  243. select {
  244. case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
  245. case <-f.quit:
  246. return nil
  247. }
  248. // Retrieve the headers remaining after filtering
  249. select {
  250. case task := <-filter:
  251. return task.headers
  252. case <-f.quit:
  253. return nil
  254. }
  255. }
  256. // FilterBodies extracts all the block bodies that were explicitly requested by
  257. // the fetcher, returning those that should be handled differently.
  258. func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
  259. log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
  260. // Send the filter channel to the fetcher
  261. filter := make(chan *bodyFilterTask)
  262. select {
  263. case f.bodyFilter <- filter:
  264. case <-f.quit:
  265. return nil, nil
  266. }
  267. // Request the filtering of the body list
  268. select {
  269. case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
  270. case <-f.quit:
  271. return nil, nil
  272. }
  273. // Retrieve the bodies remaining after filtering
  274. select {
  275. case task := <-filter:
  276. return task.transactions, task.uncles
  277. case <-f.quit:
  278. return nil, nil
  279. }
  280. }
  281. // Loop is the main fetcher loop, checking and processing various notification
  282. // events.
  283. func (f *BlockFetcher) loop() {
  284. // Iterate the block fetching until a quit is requested
  285. var (
  286. fetchTimer = time.NewTimer(0)
  287. completeTimer = time.NewTimer(0)
  288. )
  289. <-fetchTimer.C // clear out the channel
  290. <-completeTimer.C
  291. defer fetchTimer.Stop()
  292. defer completeTimer.Stop()
  293. for {
  294. // Clean up any expired block fetches
  295. for hash, announce := range f.fetching {
  296. if time.Since(announce.time) > fetchTimeout {
  297. f.forgetHash(hash)
  298. }
  299. }
  300. // Import any queued blocks that could potentially fit
  301. height := f.chainHeight()
  302. for !f.queue.Empty() {
  303. op := f.queue.PopItem().(*blockOrHeaderInject)
  304. hash := op.hash()
  305. if f.queueChangeHook != nil {
  306. f.queueChangeHook(hash, false)
  307. }
  308. // If too high up the chain or phase, continue later
  309. number := op.number()
  310. if number > height+1 {
  311. f.queue.Push(op, -int64(number))
  312. if f.queueChangeHook != nil {
  313. f.queueChangeHook(hash, true)
  314. }
  315. break
  316. }
  317. // Otherwise if fresh and still unknown, try and import
  318. if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
  319. f.forgetBlock(hash)
  320. continue
  321. }
  322. if f.light {
  323. f.importHeaders(op.origin, op.header)
  324. } else {
  325. f.importBlocks(op.origin, op.block)
  326. }
  327. }
  328. // Wait for an outside event to occur
  329. select {
  330. case <-f.quit:
  331. // BlockFetcher terminating, abort all operations
  332. return
  333. case notification := <-f.notify:
  334. // A block was announced, make sure the peer isn't DOSing us
  335. blockAnnounceInMeter.Mark(1)
  336. count := f.announces[notification.origin] + 1
  337. if count > hashLimit {
  338. log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
  339. blockAnnounceDOSMeter.Mark(1)
  340. break
  341. }
  342. // If we have a valid block number, check that it's potentially useful
  343. if notification.number > 0 {
  344. if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
  345. log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
  346. blockAnnounceDropMeter.Mark(1)
  347. break
  348. }
  349. }
  350. // All is well, schedule the announce if block's not yet downloading
  351. if _, ok := f.fetching[notification.hash]; ok {
  352. break
  353. }
  354. if _, ok := f.completing[notification.hash]; ok {
  355. break
  356. }
  357. f.announces[notification.origin] = count
  358. f.announced[notification.hash] = append(f.announced[notification.hash], notification)
  359. if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
  360. f.announceChangeHook(notification.hash, true)
  361. }
  362. if len(f.announced) == 1 {
  363. f.rescheduleFetch(fetchTimer)
  364. }
  365. case op := <-f.inject:
  366. // A direct block insertion was requested, try and fill any pending gaps
  367. blockBroadcastInMeter.Mark(1)
  368. // Now only direct block injection is allowed, drop the header injection
  369. // here silently if we receive.
  370. if f.light {
  371. continue
  372. }
  373. f.enqueue(op.origin, nil, op.block)
  374. case hash := <-f.done:
  375. // A pending import finished, remove all traces of the notification
  376. f.forgetHash(hash)
  377. f.forgetBlock(hash)
  378. case <-fetchTimer.C:
  379. // At least one block's timer ran out, check for needing retrieval
  380. request := make(map[string][]common.Hash)
  381. for hash, announces := range f.announced {
  382. // In current LES protocol(les2/les3), only header announce is
  383. // available, no need to wait too much time for header broadcast.
  384. timeout := arriveTimeout - gatherSlack
  385. if f.light {
  386. timeout = 0
  387. }
  388. if time.Since(announces[0].time) > timeout {
  389. // Pick a random peer to retrieve from, reset all others
  390. announce := announces[rand.Intn(len(announces))]
  391. f.forgetHash(hash)
  392. // If the block still didn't arrive, queue for fetching
  393. if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) {
  394. request[announce.origin] = append(request[announce.origin], hash)
  395. f.fetching[hash] = announce
  396. }
  397. }
  398. }
  399. // Send out all block header requests
  400. for peer, hashes := range request {
  401. log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
  402. // Create a closure of the fetch and schedule in on a new thread
  403. fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
  404. go func() {
  405. if f.fetchingHook != nil {
  406. f.fetchingHook(hashes)
  407. }
  408. for _, hash := range hashes {
  409. headerFetchMeter.Mark(1)
  410. fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
  411. }
  412. }()
  413. }
  414. // Schedule the next fetch if blocks are still pending
  415. f.rescheduleFetch(fetchTimer)
  416. case <-completeTimer.C:
  417. // At least one header's timer ran out, retrieve everything
  418. request := make(map[string][]common.Hash)
  419. for hash, announces := range f.fetched {
  420. // Pick a random peer to retrieve from, reset all others
  421. announce := announces[rand.Intn(len(announces))]
  422. f.forgetHash(hash)
  423. // If the block still didn't arrive, queue for completion
  424. if f.getBlock(hash) == nil {
  425. request[announce.origin] = append(request[announce.origin], hash)
  426. f.completing[hash] = announce
  427. }
  428. }
  429. // Send out all block body requests
  430. for peer, hashes := range request {
  431. log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
  432. // Create a closure of the fetch and schedule in on a new thread
  433. if f.completingHook != nil {
  434. f.completingHook(hashes)
  435. }
  436. bodyFetchMeter.Mark(int64(len(hashes)))
  437. go f.completing[hashes[0]].fetchBodies(hashes)
  438. }
  439. // Schedule the next fetch if blocks are still pending
  440. f.rescheduleComplete(completeTimer)
  441. case filter := <-f.headerFilter:
  442. // Headers arrived from a remote peer. Extract those that were explicitly
  443. // requested by the fetcher, and return everything else so it's delivered
  444. // to other parts of the system.
  445. var task *headerFilterTask
  446. select {
  447. case task = <-filter:
  448. case <-f.quit:
  449. return
  450. }
  451. headerFilterInMeter.Mark(int64(len(task.headers)))
  452. // Split the batch of headers into unknown ones (to return to the caller),
  453. // known incomplete ones (requiring body retrievals) and completed blocks.
  454. unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{}
  455. for _, header := range task.headers {
  456. hash := header.Hash()
  457. // Filter fetcher-requested headers from other synchronisation algorithms
  458. if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
  459. // If the delivered header does not match the promised number, drop the announcer
  460. if header.Number.Uint64() != announce.number {
  461. log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
  462. f.dropPeer(announce.origin)
  463. f.forgetHash(hash)
  464. continue
  465. }
  466. // Collect all headers only if we are running in light
  467. // mode and the headers are not imported by other means.
  468. if f.light {
  469. if f.getHeader(hash) == nil {
  470. announce.header = header
  471. lightHeaders = append(lightHeaders, announce)
  472. }
  473. f.forgetHash(hash)
  474. continue
  475. }
  476. // Only keep if not imported by other means
  477. if f.getBlock(hash) == nil {
  478. announce.header = header
  479. announce.time = task.time
  480. // If the block is empty (header only), short circuit into the final import queue
  481. if header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash {
  482. log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
  483. block := types.NewBlockWithHeader(header)
  484. block.ReceivedAt = task.time
  485. complete = append(complete, block)
  486. f.completing[hash] = announce
  487. continue
  488. }
  489. // Otherwise add to the list of blocks needing completion
  490. incomplete = append(incomplete, announce)
  491. } else {
  492. log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
  493. f.forgetHash(hash)
  494. }
  495. } else {
  496. // BlockFetcher doesn't know about it, add to the return list
  497. unknown = append(unknown, header)
  498. }
  499. }
  500. headerFilterOutMeter.Mark(int64(len(unknown)))
  501. select {
  502. case filter <- &headerFilterTask{headers: unknown, time: task.time}:
  503. case <-f.quit:
  504. return
  505. }
  506. // Schedule the retrieved headers for body completion
  507. for _, announce := range incomplete {
  508. hash := announce.header.Hash()
  509. if _, ok := f.completing[hash]; ok {
  510. continue
  511. }
  512. f.fetched[hash] = append(f.fetched[hash], announce)
  513. if len(f.fetched) == 1 {
  514. f.rescheduleComplete(completeTimer)
  515. }
  516. }
  517. // Schedule the header for light fetcher import
  518. for _, announce := range lightHeaders {
  519. f.enqueue(announce.origin, announce.header, nil)
  520. }
  521. // Schedule the header-only blocks for import
  522. for _, block := range complete {
  523. if announce := f.completing[block.Hash()]; announce != nil {
  524. f.enqueue(announce.origin, nil, block)
  525. }
  526. }
  527. case filter := <-f.bodyFilter:
  528. // Block bodies arrived, extract any explicitly requested blocks, return the rest
  529. var task *bodyFilterTask
  530. select {
  531. case task = <-filter:
  532. case <-f.quit:
  533. return
  534. }
  535. bodyFilterInMeter.Mark(int64(len(task.transactions)))
  536. blocks := []*types.Block{}
  537. // abort early if there's nothing explicitly requested
  538. if len(f.completing) > 0 {
  539. for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
  540. // Match up a body to any possible completion request
  541. var (
  542. matched = false
  543. uncleHash common.Hash // calculated lazily and reused
  544. txnHash common.Hash // calculated lazily and reused
  545. )
  546. for hash, announce := range f.completing {
  547. if f.queued[hash] != nil || announce.origin != task.peer {
  548. continue
  549. }
  550. if uncleHash == (common.Hash{}) {
  551. uncleHash = types.CalcUncleHash(task.uncles[i])
  552. }
  553. if uncleHash != announce.header.UncleHash {
  554. continue
  555. }
  556. if txnHash == (common.Hash{}) {
  557. txnHash = types.DeriveSha(types.Transactions(task.transactions[i]), trie.NewStackTrie(nil))
  558. }
  559. if txnHash != announce.header.TxHash {
  560. continue
  561. }
  562. // Mark the body matched, reassemble if still unknown
  563. matched = true
  564. if f.getBlock(hash) == nil {
  565. block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
  566. block.ReceivedAt = task.time
  567. blocks = append(blocks, block)
  568. } else {
  569. f.forgetHash(hash)
  570. }
  571. }
  572. if matched {
  573. task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
  574. task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
  575. i--
  576. continue
  577. }
  578. }
  579. }
  580. bodyFilterOutMeter.Mark(int64(len(task.transactions)))
  581. select {
  582. case filter <- task:
  583. case <-f.quit:
  584. return
  585. }
  586. // Schedule the retrieved blocks for ordered import
  587. for _, block := range blocks {
  588. if announce := f.completing[block.Hash()]; announce != nil {
  589. f.enqueue(announce.origin, nil, block)
  590. }
  591. }
  592. }
  593. }
  594. }
  595. // rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout.
  596. func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
  597. // Short circuit if no blocks are announced
  598. if len(f.announced) == 0 {
  599. return
  600. }
  601. // Schedule announcement retrieval quickly for light mode
  602. // since server won't send any headers to client.
  603. if f.light {
  604. fetch.Reset(lightTimeout)
  605. return
  606. }
  607. // Otherwise find the earliest expiring announcement
  608. earliest := time.Now()
  609. for _, announces := range f.announced {
  610. if earliest.After(announces[0].time) {
  611. earliest = announces[0].time
  612. }
  613. }
  614. fetch.Reset(arriveTimeout - time.Since(earliest))
  615. }
  616. // rescheduleComplete resets the specified completion timer to the next fetch timeout.
  617. func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) {
  618. // Short circuit if no headers are fetched
  619. if len(f.fetched) == 0 {
  620. return
  621. }
  622. // Otherwise find the earliest expiring announcement
  623. earliest := time.Now()
  624. for _, announces := range f.fetched {
  625. if earliest.After(announces[0].time) {
  626. earliest = announces[0].time
  627. }
  628. }
  629. complete.Reset(gatherSlack - time.Since(earliest))
  630. }
  631. // enqueue schedules a new header or block import operation, if the component
  632. // to be imported has not yet been seen.
  633. func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.Block) {
  634. var (
  635. hash common.Hash
  636. number uint64
  637. )
  638. if header != nil {
  639. hash, number = header.Hash(), header.Number.Uint64()
  640. } else {
  641. hash, number = block.Hash(), block.NumberU64()
  642. }
  643. // Ensure the peer isn't DOSing us
  644. count := f.queues[peer] + 1
  645. if count > blockLimit {
  646. log.Debug("Discarded delivered header or block, exceeded allowance", "peer", peer, "number", number, "hash", hash, "limit", blockLimit)
  647. blockBroadcastDOSMeter.Mark(1)
  648. f.forgetHash(hash)
  649. return
  650. }
  651. // Discard any past or too distant blocks
  652. if dist := int64(number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
  653. log.Debug("Discarded delivered header or block, too far away", "peer", peer, "number", number, "hash", hash, "distance", dist)
  654. blockBroadcastDropMeter.Mark(1)
  655. f.forgetHash(hash)
  656. return
  657. }
  658. // Schedule the block for future importing
  659. if _, ok := f.queued[hash]; !ok {
  660. op := &blockOrHeaderInject{origin: peer}
  661. if header != nil {
  662. op.header = header
  663. } else {
  664. op.block = block
  665. }
  666. f.queues[peer] = count
  667. f.queued[hash] = op
  668. f.queue.Push(op, -int64(number))
  669. if f.queueChangeHook != nil {
  670. f.queueChangeHook(hash, true)
  671. }
  672. log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size())
  673. }
  674. }
  675. // importHeaders spawns a new goroutine to run a header insertion into the chain.
  676. // If the header's number is at the same height as the current import phase, it
  677. // updates the phase states accordingly.
  678. func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
  679. hash := header.Hash()
  680. log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)
  681. go func() {
  682. defer func() { f.done <- hash }()
  683. // If the parent's unknown, abort insertion
  684. parent := f.getHeader(header.ParentHash)
  685. if parent == nil {
  686. log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
  687. return
  688. }
  689. // Validate the header and if something went wrong, drop the peer
  690. if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
  691. log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
  692. f.dropPeer(peer)
  693. return
  694. }
  695. // Run the actual import and log any issues
  696. if _, err := f.insertHeaders([]*types.Header{header}); err != nil {
  697. log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
  698. return
  699. }
  700. // Invoke the testing hook if needed
  701. if f.importedHook != nil {
  702. f.importedHook(header, nil)
  703. }
  704. }()
  705. }
  706. // importBlocks spawns a new goroutine to run a block insertion into the chain. If the
  707. // block's number is at the same height as the current import phase, it updates
  708. // the phase states accordingly.
  709. func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
  710. hash := block.Hash()
  711. // Run the import on a new thread
  712. log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
  713. go func() {
  714. defer func() { f.done <- hash }()
  715. // If the parent's unknown, abort insertion
  716. parent := f.getBlock(block.ParentHash())
  717. if parent == nil {
  718. log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
  719. return
  720. }
  721. // Quickly validate the header and propagate the block if it passes
  722. switch err := f.verifyHeader(block.Header()); err {
  723. case nil:
  724. // All ok, quickly propagate to our peers
  725. blockBroadcastOutTimer.UpdateSince(block.ReceivedAt)
  726. go f.broadcastBlock(block, true)
  727. case consensus.ErrFutureBlock:
  728. // Weird future block, don't fail, but neither propagate
  729. default:
  730. // Something went very wrong, drop the peer
  731. log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
  732. f.dropPeer(peer)
  733. return
  734. }
  735. // Run the actual import and log any issues
  736. if _, err := f.insertChain(types.Blocks{block}); err != nil {
  737. log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
  738. return
  739. }
  740. // If import succeeded, broadcast the block
  741. blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
  742. go f.broadcastBlock(block, false)
  743. // Invoke the testing hook if needed
  744. if f.importedHook != nil {
  745. f.importedHook(nil, block)
  746. }
  747. }()
  748. }
  749. // forgetHash removes all traces of a block announcement from the fetcher's
  750. // internal state.
  751. func (f *BlockFetcher) forgetHash(hash common.Hash) {
  752. // Remove all pending announces and decrement DOS counters
  753. for _, announce := range f.announced[hash] {
  754. f.announces[announce.origin]--
  755. if f.announces[announce.origin] <= 0 {
  756. delete(f.announces, announce.origin)
  757. }
  758. }
  759. delete(f.announced, hash)
  760. if f.announceChangeHook != nil {
  761. f.announceChangeHook(hash, false)
  762. }
  763. // Remove any pending fetches and decrement the DOS counters
  764. if announce := f.fetching[hash]; announce != nil {
  765. f.announces[announce.origin]--
  766. if f.announces[announce.origin] <= 0 {
  767. delete(f.announces, announce.origin)
  768. }
  769. delete(f.fetching, hash)
  770. }
  771. // Remove any pending completion requests and decrement the DOS counters
  772. for _, announce := range f.fetched[hash] {
  773. f.announces[announce.origin]--
  774. if f.announces[announce.origin] <= 0 {
  775. delete(f.announces, announce.origin)
  776. }
  777. }
  778. delete(f.fetched, hash)
  779. // Remove any pending completions and decrement the DOS counters
  780. if announce := f.completing[hash]; announce != nil {
  781. f.announces[announce.origin]--
  782. if f.announces[announce.origin] <= 0 {
  783. delete(f.announces, announce.origin)
  784. }
  785. delete(f.completing, hash)
  786. }
  787. }
  788. // forgetBlock removes all traces of a queued block from the fetcher's internal
  789. // state.
  790. func (f *BlockFetcher) forgetBlock(hash common.Hash) {
  791. if insert := f.queued[hash]; insert != nil {
  792. f.queues[insert.origin]--
  793. if f.queues[insert.origin] == 0 {
  794. delete(f.queues, insert.origin)
  795. }
  796. delete(f.queued, hash)
  797. }
  798. }