queue.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Contains the block download scheduler to collect download tasks and schedule
  17. // them in an ordered, and throttled way.
  18. package downloader
  19. import (
  20. "errors"
  21. "fmt"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/prque"
  27. "github.com/ethereum/go-ethereum/core/types"
  28. "github.com/ethereum/go-ethereum/log"
  29. "github.com/ethereum/go-ethereum/metrics"
  30. "github.com/ethereum/go-ethereum/trie"
  31. )
// Bit positions inside fetchResult.pending. A set bit means the corresponding
// part of the block has not been delivered yet.
const (
	bodyType    = uint(0) // Bit index tracking the block body
	receiptType = uint(1) // Bit index tracking the receipts
)
// Sizing parameters of the block result cache.
var (
	blockCacheMaxItems     = 8192             // Maximum number of blocks to cache before throttling the download
	blockCacheInitialItems = 2048             // Initial number of blocks to start fetching, before we know the sizes of the blocks
	blockCacheMemory       = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
	blockCacheSizeWeight   = 0.1              // Multiplier to approximate the average block size based on past ones
)
// Sentinel errors reported by the delivery methods.
var (
	errNoFetchesPending = errors.New("no fetches pending") // Returned by DeliverHeaders when the peer has no request in flight
	errStaleDelivery    = errors.New("stale delivery")     // NOTE(review): not used in the portion of the file shown here; presumably flags a delivery that is no longer awaited
)
// fetchRequest is a currently running data retrieval operation.
//
// Requests created by ReserveHeaders carry only From, requests created by
// reserveHeaders (bodies and receipts) carry only Headers. The cancel and
// expire helpers treat From == 0 as "no skeleton index to return".
type fetchRequest struct {
	Peer    *peerConnection // Peer to which the request was sent
	From    uint64          // [eth/62] Requested chain element index (used for skeleton fills only)
	Headers []*types.Header // [eth/62] Requested headers, sorted by request order
	Time    time.Time       // Time when the request was made
}
// fetchResult is a struct collecting partial results from data fetchers until
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
	// pending is a bitmask of the deliveries still outstanding, indexed by
	// bodyType and receiptType. It is accessed with sync/atomic operations
	// and is zero once the result is complete.
	pending int32

	Header       *types.Header      // Header the result belongs to
	Uncles       []*types.Header    // Uncle headers of the block
	Transactions types.Transactions // Transactions of the block
	Receipts     types.Receipts     // Receipts of the block
}
  62. func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
  63. item := &fetchResult{
  64. Header: header,
  65. }
  66. if !header.EmptyBody() {
  67. item.pending |= (1 << bodyType)
  68. }
  69. if fastSync && !header.EmptyReceipts() {
  70. item.pending |= (1 << receiptType)
  71. }
  72. return item
  73. }
  74. // SetBodyDone flags the body as finished.
  75. func (f *fetchResult) SetBodyDone() {
  76. if v := atomic.LoadInt32(&f.pending); (v & (1 << bodyType)) != 0 {
  77. atomic.AddInt32(&f.pending, -1)
  78. }
  79. }
  80. // AllDone checks if item is done.
  81. func (f *fetchResult) AllDone() bool {
  82. return atomic.LoadInt32(&f.pending) == 0
  83. }
  84. // SetReceiptsDone flags the receipts as finished.
  85. func (f *fetchResult) SetReceiptsDone() {
  86. if v := atomic.LoadInt32(&f.pending); (v & (1 << receiptType)) != 0 {
  87. atomic.AddInt32(&f.pending, -2)
  88. }
  89. }
  90. // Done checks if the given type is done already
  91. func (f *fetchResult) Done(kind uint) bool {
  92. v := atomic.LoadInt32(&f.pending)
  93. return v&(1<<kind) == 0
  94. }
// queue represents hashes that either need fetching or are being fetched.
type queue struct {
	mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching

	// Headers are "special", they download in batches, supported by a skeleton chain
	headerHead      common.Hash                    // Hash of the last queued header to verify order
	headerTaskPool  map[uint64]*types.Header       // Pending header retrieval tasks, mapping starting indexes to skeleton headers
	headerTaskQueue *prque.Prque                   // Priority queue of the skeleton indexes to fetch the filling headers for
	headerPeerMiss  map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
	headerPendPool  map[string]*fetchRequest       // Currently pending header retrieval operations
	headerResults   []*types.Header                // Result cache accumulating the completed headers
	headerProced    int                            // Number of headers already processed from the results
	headerOffset    uint64                         // Number of the first header in the result cache
	headerContCh    chan bool                      // Channel to notify when header download finishes

	// All data retrievals below are based on an already assembled header chain
	blockTaskPool  map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
	blockTaskQueue *prque.Prque                  // Priority queue of the headers to fetch the blocks (bodies) for
	blockPendPool  map[string]*fetchRequest      // Currently pending block (body) retrieval operations

	receiptTaskPool  map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
	receiptTaskQueue *prque.Prque                  // Priority queue of the headers to fetch the receipts for
	receiptPendPool  map[string]*fetchRequest      // Currently pending receipt retrieval operations

	resultCache *resultStore       // Downloaded but not yet delivered fetch results
	resultSize  common.StorageSize // Approximate size of a block (exponential moving average)

	lock   *sync.RWMutex // Protects the queue state; also backs the active condition variable
	active *sync.Cond    // Signalled by Close and by reservations that completed results; Results waits on it
	closed bool          // Set by Close and cleared by Reset; unblocks a waiting Results call

	lastStatLog time.Time // Time of the last "Downloader queue stats" log line emitted by Results
}
  122. // newQueue creates a new download queue for scheduling block retrieval.
  123. func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
  124. lock := new(sync.RWMutex)
  125. q := &queue{
  126. headerContCh: make(chan bool),
  127. blockTaskQueue: prque.New(nil),
  128. receiptTaskQueue: prque.New(nil),
  129. active: sync.NewCond(lock),
  130. lock: lock,
  131. }
  132. q.Reset(blockCacheLimit, thresholdInitialSize)
  133. return q
  134. }
  135. // Reset clears out the queue contents.
  136. func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) {
  137. q.lock.Lock()
  138. defer q.lock.Unlock()
  139. q.closed = false
  140. q.mode = FullSync
  141. q.headerHead = common.Hash{}
  142. q.headerPendPool = make(map[string]*fetchRequest)
  143. q.blockTaskPool = make(map[common.Hash]*types.Header)
  144. q.blockTaskQueue.Reset()
  145. q.blockPendPool = make(map[string]*fetchRequest)
  146. q.receiptTaskPool = make(map[common.Hash]*types.Header)
  147. q.receiptTaskQueue.Reset()
  148. q.receiptPendPool = make(map[string]*fetchRequest)
  149. q.resultCache = newResultStore(blockCacheLimit)
  150. q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize))
  151. }
  152. // Close marks the end of the sync, unblocking Results.
  153. // It may be called even if the queue is already closed.
  154. func (q *queue) Close() {
  155. q.lock.Lock()
  156. q.closed = true
  157. q.active.Signal()
  158. q.lock.Unlock()
  159. }
  160. // PendingHeaders retrieves the number of header requests pending for retrieval.
  161. func (q *queue) PendingHeaders() int {
  162. q.lock.Lock()
  163. defer q.lock.Unlock()
  164. return q.headerTaskQueue.Size()
  165. }
  166. // PendingBlocks retrieves the number of block (body) requests pending for retrieval.
  167. func (q *queue) PendingBlocks() int {
  168. q.lock.Lock()
  169. defer q.lock.Unlock()
  170. return q.blockTaskQueue.Size()
  171. }
  172. // PendingReceipts retrieves the number of block receipts pending for retrieval.
  173. func (q *queue) PendingReceipts() int {
  174. q.lock.Lock()
  175. defer q.lock.Unlock()
  176. return q.receiptTaskQueue.Size()
  177. }
  178. // InFlightHeaders retrieves whether there are header fetch requests currently
  179. // in flight.
  180. func (q *queue) InFlightHeaders() bool {
  181. q.lock.Lock()
  182. defer q.lock.Unlock()
  183. return len(q.headerPendPool) > 0
  184. }
  185. // InFlightBlocks retrieves whether there are block fetch requests currently in
  186. // flight.
  187. func (q *queue) InFlightBlocks() bool {
  188. q.lock.Lock()
  189. defer q.lock.Unlock()
  190. return len(q.blockPendPool) > 0
  191. }
  192. // InFlightReceipts retrieves whether there are receipt fetch requests currently
  193. // in flight.
  194. func (q *queue) InFlightReceipts() bool {
  195. q.lock.Lock()
  196. defer q.lock.Unlock()
  197. return len(q.receiptPendPool) > 0
  198. }
  199. // Idle returns if the queue is fully idle or has some data still inside.
  200. func (q *queue) Idle() bool {
  201. q.lock.Lock()
  202. defer q.lock.Unlock()
  203. queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
  204. pending := len(q.blockPendPool) + len(q.receiptPendPool)
  205. return (queued + pending) == 0
  206. }
  207. // ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
  208. // up an already retrieved header skeleton.
  209. func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
  210. q.lock.Lock()
  211. defer q.lock.Unlock()
  212. // No skeleton retrieval can be in progress, fail hard if so (huge implementation bug)
  213. if q.headerResults != nil {
  214. panic("skeleton assembly already in progress")
  215. }
  216. // Schedule all the header retrieval tasks for the skeleton assembly
  217. q.headerTaskPool = make(map[uint64]*types.Header)
  218. q.headerTaskQueue = prque.New(nil)
  219. q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
  220. q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
  221. q.headerProced = 0
  222. q.headerOffset = from
  223. q.headerContCh = make(chan bool, 1)
  224. for i, header := range skeleton {
  225. index := from + uint64(i*MaxHeaderFetch)
  226. q.headerTaskPool[index] = header
  227. q.headerTaskQueue.Push(index, -int64(index))
  228. }
  229. }
  230. // RetrieveHeaders retrieves the header chain assemble based on the scheduled
  231. // skeleton.
  232. func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
  233. q.lock.Lock()
  234. defer q.lock.Unlock()
  235. headers, proced := q.headerResults, q.headerProced
  236. q.headerResults, q.headerProced = nil, 0
  237. return headers, proced
  238. }
  239. // Schedule adds a set of headers for the download queue for scheduling, returning
  240. // the new headers encountered.
  241. func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
  242. q.lock.Lock()
  243. defer q.lock.Unlock()
  244. // Insert all the headers prioritised by the contained block number
  245. inserts := make([]*types.Header, 0, len(headers))
  246. for _, header := range headers {
  247. // Make sure chain order is honoured and preserved throughout
  248. hash := header.Hash()
  249. if header.Number == nil || header.Number.Uint64() != from {
  250. log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)
  251. break
  252. }
  253. if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
  254. log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
  255. break
  256. }
  257. // Make sure no duplicate requests are executed
  258. // We cannot skip this, even if the block is empty, since this is
  259. // what triggers the fetchResult creation.
  260. if _, ok := q.blockTaskPool[hash]; ok {
  261. log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)
  262. } else {
  263. q.blockTaskPool[hash] = header
  264. q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
  265. }
  266. // Queue for receipt retrieval
  267. if q.mode == FastSync && !header.EmptyReceipts() {
  268. if _, ok := q.receiptTaskPool[hash]; ok {
  269. log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
  270. } else {
  271. q.receiptTaskPool[hash] = header
  272. q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
  273. }
  274. }
  275. inserts = append(inserts, header)
  276. q.headerHead = hash
  277. from++
  278. }
  279. return inserts
  280. }
  281. // Results retrieves and permanently removes a batch of fetch results from
  282. // the cache. the result slice will be empty if the queue has been closed.
  283. // Results can be called concurrently with Deliver and Schedule,
  284. // but assumes that there are not two simultaneous callers to Results
  285. func (q *queue) Results(block bool) []*fetchResult {
  286. // Abort early if there are no items and non-blocking requested
  287. if !block && !q.resultCache.HasCompletedItems() {
  288. return nil
  289. }
  290. closed := false
  291. for !closed && !q.resultCache.HasCompletedItems() {
  292. // In order to wait on 'active', we need to obtain the lock.
  293. // That may take a while, if someone is delivering at the same
  294. // time, so after obtaining the lock, we check again if there
  295. // are any results to fetch.
  296. // Also, in-between we ask for the lock and the lock is obtained,
  297. // someone can have closed the queue. In that case, we should
  298. // return the available results and stop blocking
  299. q.lock.Lock()
  300. if q.resultCache.HasCompletedItems() || q.closed {
  301. q.lock.Unlock()
  302. break
  303. }
  304. // No items available, and not closed
  305. q.active.Wait()
  306. closed = q.closed
  307. q.lock.Unlock()
  308. }
  309. // Regardless if closed or not, we can still deliver whatever we have
  310. results := q.resultCache.GetCompleted(maxResultsProcess)
  311. for _, result := range results {
  312. // Recalculate the result item weights to prevent memory exhaustion
  313. size := result.Header.Size()
  314. for _, uncle := range result.Uncles {
  315. size += uncle.Size()
  316. }
  317. for _, receipt := range result.Receipts {
  318. size += receipt.Size()
  319. }
  320. for _, tx := range result.Transactions {
  321. size += tx.Size()
  322. }
  323. q.resultSize = common.StorageSize(blockCacheSizeWeight)*size +
  324. (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
  325. }
  326. // Using the newly calibrated resultsize, figure out the new throttle limit
  327. // on the result cache
  328. throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
  329. throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)
  330. // Log some info at certain times
  331. if time.Since(q.lastStatLog) > 60*time.Second {
  332. q.lastStatLog = time.Now()
  333. info := q.Stats()
  334. info = append(info, "throttle", throttleThreshold)
  335. log.Info("Downloader queue stats", info...)
  336. }
  337. return results
  338. }
  339. func (q *queue) Stats() []interface{} {
  340. q.lock.RLock()
  341. defer q.lock.RUnlock()
  342. return q.stats()
  343. }
  344. func (q *queue) stats() []interface{} {
  345. return []interface{}{
  346. "receiptTasks", q.receiptTaskQueue.Size(),
  347. "blockTasks", q.blockTaskQueue.Size(),
  348. "itemSize", q.resultSize,
  349. }
  350. }
  351. // ReserveHeaders reserves a set of headers for the given peer, skipping any
  352. // previously failed batches.
  353. func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
  354. q.lock.Lock()
  355. defer q.lock.Unlock()
  356. // Short circuit if the peer's already downloading something (sanity check to
  357. // not corrupt state)
  358. if _, ok := q.headerPendPool[p.id]; ok {
  359. return nil
  360. }
  361. // Retrieve a batch of hashes, skipping previously failed ones
  362. send, skip := uint64(0), []uint64{}
  363. for send == 0 && !q.headerTaskQueue.Empty() {
  364. from, _ := q.headerTaskQueue.Pop()
  365. if q.headerPeerMiss[p.id] != nil {
  366. if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
  367. skip = append(skip, from.(uint64))
  368. continue
  369. }
  370. }
  371. send = from.(uint64)
  372. }
  373. // Merge all the skipped batches back
  374. for _, from := range skip {
  375. q.headerTaskQueue.Push(from, -int64(from))
  376. }
  377. // Assemble and return the block download request
  378. if send == 0 {
  379. return nil
  380. }
  381. request := &fetchRequest{
  382. Peer: p,
  383. From: send,
  384. Time: time.Now(),
  385. }
  386. q.headerPendPool[p.id] = request
  387. return request
  388. }
  389. // ReserveBodies reserves a set of body fetches for the given peer, skipping any
  390. // previously failed downloads. Beside the next batch of needed fetches, it also
  391. // returns a flag whether empty blocks were queued requiring processing.
  392. func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, bool) {
  393. q.lock.Lock()
  394. defer q.lock.Unlock()
  395. return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, bodyType)
  396. }
  397. // ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
  398. // any previously failed downloads. Beside the next batch of needed fetches, it
  399. // also returns a flag whether empty receipts were queued requiring importing.
  400. func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, bool) {
  401. q.lock.Lock()
  402. defer q.lock.Unlock()
  403. return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, receiptType)
  404. }
  405. // reserveHeaders reserves a set of data download operations for a given peer,
  406. // skipping any previously failed ones. This method is a generic version used
  407. // by the individual special reservation functions.
  408. //
  409. // Note, this method expects the queue lock to be already held for writing. The
  410. // reason the lock is not obtained in here is because the parameters already need
  411. // to access the queue, so they already need a lock anyway.
  412. //
  413. // Returns:
  414. // item - the fetchRequest
  415. // progress - whether any progress was made
  416. // throttle - if the caller should throttle for a while
  417. func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
  418. pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) {
  419. // Short circuit if the pool has been depleted, or if the peer's already
  420. // downloading something (sanity check not to corrupt state)
  421. if taskQueue.Empty() {
  422. return nil, false, true
  423. }
  424. if _, ok := pendPool[p.id]; ok {
  425. return nil, false, false
  426. }
  427. // Retrieve a batch of tasks, skipping previously failed ones
  428. send := make([]*types.Header, 0, count)
  429. skip := make([]*types.Header, 0)
  430. progress := false
  431. throttled := false
  432. for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ {
  433. // the task queue will pop items in order, so the highest prio block
  434. // is also the lowest block number.
  435. h, _ := taskQueue.Peek()
  436. header := h.(*types.Header)
  437. // we can ask the resultcache if this header is within the
  438. // "prioritized" segment of blocks. If it is not, we need to throttle
  439. stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync)
  440. if stale {
  441. // Don't put back in the task queue, this item has already been
  442. // delivered upstream
  443. taskQueue.PopItem()
  444. progress = true
  445. delete(taskPool, header.Hash())
  446. proc = proc - 1
  447. log.Error("Fetch reservation already delivered", "number", header.Number.Uint64())
  448. continue
  449. }
  450. if throttle {
  451. // There are no resultslots available. Leave it in the task queue
  452. // However, if there are any left as 'skipped', we should not tell
  453. // the caller to throttle, since we still want some other
  454. // peer to fetch those for us
  455. throttled = len(skip) == 0
  456. break
  457. }
  458. if err != nil {
  459. // this most definitely should _not_ happen
  460. log.Warn("Failed to reserve headers", "err", err)
  461. // There are no resultslots available. Leave it in the task queue
  462. break
  463. }
  464. if item.Done(kind) {
  465. // If it's a noop, we can skip this task
  466. delete(taskPool, header.Hash())
  467. taskQueue.PopItem()
  468. proc = proc - 1
  469. progress = true
  470. continue
  471. }
  472. // Remove it from the task queue
  473. taskQueue.PopItem()
  474. // Otherwise unless the peer is known not to have the data, add to the retrieve list
  475. if p.Lacks(header.Hash()) {
  476. skip = append(skip, header)
  477. } else {
  478. send = append(send, header)
  479. }
  480. }
  481. // Merge all the skipped headers back
  482. for _, header := range skip {
  483. taskQueue.Push(header, -int64(header.Number.Uint64()))
  484. }
  485. if q.resultCache.HasCompletedItems() {
  486. // Wake Results, resultCache was modified
  487. q.active.Signal()
  488. }
  489. // Assemble and return the block download request
  490. if len(send) == 0 {
  491. return nil, progress, throttled
  492. }
  493. request := &fetchRequest{
  494. Peer: p,
  495. Headers: send,
  496. Time: time.Now(),
  497. }
  498. pendPool[p.id] = request
  499. return request, progress, throttled
  500. }
  501. // CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
  502. func (q *queue) CancelHeaders(request *fetchRequest) {
  503. q.lock.Lock()
  504. defer q.lock.Unlock()
  505. q.cancel(request, q.headerTaskQueue, q.headerPendPool)
  506. }
  507. // CancelBodies aborts a body fetch request, returning all pending headers to the
  508. // task queue.
  509. func (q *queue) CancelBodies(request *fetchRequest) {
  510. q.lock.Lock()
  511. defer q.lock.Unlock()
  512. q.cancel(request, q.blockTaskQueue, q.blockPendPool)
  513. }
  514. // CancelReceipts aborts a body fetch request, returning all pending headers to
  515. // the task queue.
  516. func (q *queue) CancelReceipts(request *fetchRequest) {
  517. q.lock.Lock()
  518. defer q.lock.Unlock()
  519. q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
  520. }
  521. // Cancel aborts a fetch request, returning all pending hashes to the task queue.
  522. func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
  523. if request.From > 0 {
  524. taskQueue.Push(request.From, -int64(request.From))
  525. }
  526. for _, header := range request.Headers {
  527. taskQueue.Push(header, -int64(header.Number.Uint64()))
  528. }
  529. delete(pendPool, request.Peer.id)
  530. }
  531. // Revoke cancels all pending requests belonging to a given peer. This method is
  532. // meant to be called during a peer drop to quickly reassign owned data fetches
  533. // to remaining nodes.
  534. func (q *queue) Revoke(peerID string) {
  535. q.lock.Lock()
  536. defer q.lock.Unlock()
  537. if request, ok := q.blockPendPool[peerID]; ok {
  538. for _, header := range request.Headers {
  539. q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
  540. }
  541. delete(q.blockPendPool, peerID)
  542. }
  543. if request, ok := q.receiptPendPool[peerID]; ok {
  544. for _, header := range request.Headers {
  545. q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
  546. }
  547. delete(q.receiptPendPool, peerID)
  548. }
  549. }
  550. // ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
  551. // canceling them and returning the responsible peers for penalisation.
  552. func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
  553. q.lock.Lock()
  554. defer q.lock.Unlock()
  555. return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
  556. }
  557. // ExpireBodies checks for in flight block body requests that exceeded a timeout
  558. // allowance, canceling them and returning the responsible peers for penalisation.
  559. func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
  560. q.lock.Lock()
  561. defer q.lock.Unlock()
  562. return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
  563. }
  564. // ExpireReceipts checks for in flight receipt requests that exceeded a timeout
  565. // allowance, canceling them and returning the responsible peers for penalisation.
  566. func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
  567. q.lock.Lock()
  568. defer q.lock.Unlock()
  569. return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
  570. }
  571. // expire is the generic check that move expired tasks from a pending pool back
  572. // into a task pool, returning all entities caught with expired tasks.
  573. //
  574. // Note, this method expects the queue lock to be already held. The
  575. // reason the lock is not obtained in here is because the parameters already need
  576. // to access the queue, so they already need a lock anyway.
  577. func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
  578. // Iterate over the expired requests and return each to the queue
  579. expiries := make(map[string]int)
  580. for id, request := range pendPool {
  581. if time.Since(request.Time) > timeout {
  582. // Update the metrics with the timeout
  583. timeoutMeter.Mark(1)
  584. // Return any non satisfied requests to the pool
  585. if request.From > 0 {
  586. taskQueue.Push(request.From, -int64(request.From))
  587. }
  588. for _, header := range request.Headers {
  589. taskQueue.Push(header, -int64(header.Number.Uint64()))
  590. }
  591. // Add the peer to the expiry report along the number of failed requests
  592. expiries[id] = len(request.Headers)
  593. // Remove the expired requests from the pending pool directly
  594. delete(pendPool, id)
  595. }
  596. }
  597. return expiries
  598. }
// DeliverHeaders injects a header retrieval response into the header results
// cache. This method either accepts all headers it received, or none of them
// if they do not map correctly to the skeleton.
//
// If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However it will
// not block to prevent stalling other pending deliveries.
//
// Parameters:
//
//	id           - id of the delivering peer
//	headers      - the delivered batch; must contain exactly MaxHeaderFetch
//	               headers to be accepted
//	headerProcCh - channel the ready headers are pushed onto (non-blocking)
//
// It returns the number of accepted headers together with a nil error, or
// zero and an error: errNoFetchesPending if the peer has no header request in
// flight, or a "delivery not accepted" error if the batch was rejected.
func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	var logger log.Logger
	if len(id) < 16 {
		// Tests use short IDs, don't choke on them
		logger = log.New("peer", id)
	} else {
		logger = log.New("peer", id[:16])
	}
	// Short circuit if the data was never requested
	request := q.headerPendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	headerReqTimer.UpdateSince(request.Time)
	delete(q.headerPendPool, id)

	// Ensure headers can be mapped onto the skeleton chain: the batch must be
	// full, start at the requested number and end in the skeleton header.
	// NOTE(review): assumes the skeleton task for request.From is still present
	// in headerTaskPool - confirm that a missing entry cannot occur here.
	target := q.headerTaskPool[request.From].Hash()

	accepted := len(headers) == MaxHeaderFetch
	if accepted {
		if headers[0].Number.Uint64() != request.From {
			logger.Trace("First header broke chain ordering", "number", headers[0].Number, "hash", headers[0].Hash(), "expected", request.From)
			accepted = false
		} else if headers[len(headers)-1].Hash() != target {
			logger.Trace("Last header broke skeleton structure ", "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target)
			accepted = false
		}
	}
	if accepted {
		// Verify that the batch is contiguous: consecutive numbers, each
		// header referencing the hash of its predecessor
		parentHash := headers[0].Hash()
		for i, header := range headers[1:] {
			hash := header.Hash()
			if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
				logger.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", want)
				accepted = false
				break
			}
			if parentHash != header.ParentHash {
				logger.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
				accepted = false
				break
			}
			// Set-up parent hash for next round
			parentHash = hash
		}
	}
	// If the batch of headers wasn't accepted, mark as unavailable for this
	// peer and return the task to the queue so another peer can fetch it
	if !accepted {
		logger.Trace("Skeleton filling not accepted", "from", request.From)

		miss := q.headerPeerMiss[id]
		if miss == nil {
			q.headerPeerMiss[id] = make(map[uint64]struct{})
			miss = q.headerPeerMiss[id]
		}
		miss[request.From] = struct{}{}

		q.headerTaskQueue.Push(request.From, -int64(request.From))
		return 0, errors.New("delivery not accepted")
	}
	// Clean up a successful fetch and try to deliver any sub-results
	copy(q.headerResults[request.From-q.headerOffset:], headers)
	delete(q.headerTaskPool, request.From)

	// Count the contiguous run of filled batches following the already
	// processed headers; only the first slot of each batch is inspected
	ready := 0
	for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
		ready += MaxHeaderFetch
	}
	if ready > 0 {
		// Headers are ready for delivery, gather them and push forward (non blocking)
		process := make([]*types.Header, ready)
		copy(process, q.headerResults[q.headerProced:q.headerProced+ready])

		select {
		case headerProcCh <- process:
			logger.Trace("Pre-scheduled new headers", "count", len(process), "from", process[0].Number)
			q.headerProced += len(process)
		default:
			// Processor is busy; the headers stay in the cache and
			// headerProced is left untouched
		}
	}
	// Check for termination and return: an empty task pool means every
	// skeleton batch has been filled.
	// NOTE(review): this send happens with the queue lock held; the channel
	// created by ScheduleSkeleton has a buffer of one - confirm it cannot block.
	if len(q.headerTaskPool) == 0 {
		q.headerContCh <- false
	}
	return len(headers), nil
}
  689. // DeliverBodies injects a block body retrieval response into the results queue.
  690. // The method returns the number of blocks bodies accepted from the delivery and
  691. // also wakes any threads waiting for data delivery.
  692. func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
  693. q.lock.Lock()
  694. defer q.lock.Unlock()
  695. validate := func(index int, header *types.Header) error {
  696. if types.DeriveSha(types.Transactions(txLists[index]), trie.NewStackTrie(nil)) != header.TxHash {
  697. return errInvalidBody
  698. }
  699. if types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
  700. return errInvalidBody
  701. }
  702. return nil
  703. }
  704. reconstruct := func(index int, result *fetchResult) {
  705. result.Transactions = txLists[index]
  706. result.Uncles = uncleLists[index]
  707. result.SetBodyDone()
  708. }
  709. return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
  710. bodyReqTimer, len(txLists), validate, reconstruct)
  711. }
  712. // DeliverReceipts injects a receipt retrieval response into the results queue.
  713. // The method returns the number of transaction receipts accepted from the delivery
  714. // and also wakes any threads waiting for data delivery.
  715. func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
  716. q.lock.Lock()
  717. defer q.lock.Unlock()
  718. validate := func(index int, header *types.Header) error {
  719. if types.DeriveSha(types.Receipts(receiptList[index]), trie.NewStackTrie(nil)) != header.ReceiptHash {
  720. return errInvalidReceipt
  721. }
  722. return nil
  723. }
  724. reconstruct := func(index int, result *fetchResult) {
  725. result.Receipts = receiptList[index]
  726. result.SetReceiptsDone()
  727. }
  728. return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool,
  729. receiptReqTimer, len(receiptList), validate, reconstruct)
  730. }
  731. // deliver injects a data retrieval response into the results queue.
  732. //
  733. // Note, this method expects the queue lock to be already held for writing. The
  734. // reason this lock is not obtained in here is because the parameters already need
  735. // to access the queue, so they already need a lock anyway.
  736. func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
  737. taskQueue *prque.Prque, pendPool map[string]*fetchRequest, reqTimer metrics.Timer,
  738. results int, validate func(index int, header *types.Header) error,
  739. reconstruct func(index int, result *fetchResult)) (int, error) {
  740. // Short circuit if the data was never requested
  741. request := pendPool[id]
  742. if request == nil {
  743. return 0, errNoFetchesPending
  744. }
  745. reqTimer.UpdateSince(request.Time)
  746. delete(pendPool, id)
  747. // If no data items were retrieved, mark them as unavailable for the origin peer
  748. if results == 0 {
  749. for _, header := range request.Headers {
  750. request.Peer.MarkLacking(header.Hash())
  751. }
  752. }
  753. // Assemble each of the results with their headers and retrieved data parts
  754. var (
  755. accepted int
  756. failure error
  757. i int
  758. hashes []common.Hash
  759. )
  760. for _, header := range request.Headers {
  761. // Short circuit assembly if no more fetch results are found
  762. if i >= results {
  763. break
  764. }
  765. // Validate the fields
  766. if err := validate(i, header); err != nil {
  767. failure = err
  768. break
  769. }
  770. hashes = append(hashes, header.Hash())
  771. i++
  772. }
  773. for _, header := range request.Headers[:i] {
  774. if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil {
  775. reconstruct(accepted, res)
  776. } else {
  777. // else: betweeen here and above, some other peer filled this result,
  778. // or it was indeed a no-op. This should not happen, but if it does it's
  779. // not something to panic about
  780. log.Error("Delivery stale", "stale", stale, "number", header.Number.Uint64(), "err", err)
  781. failure = errStaleDelivery
  782. }
  783. // Clean up a successful fetch
  784. delete(taskPool, hashes[accepted])
  785. accepted++
  786. }
  787. // Return all failed or missing fetches to the queue
  788. for _, header := range request.Headers[accepted:] {
  789. taskQueue.Push(header, -int64(header.Number.Uint64()))
  790. }
  791. // Wake up Results
  792. if accepted > 0 {
  793. q.active.Signal()
  794. }
  795. if failure == nil {
  796. return accepted, nil
  797. }
  798. // If none of the data was good, it's a stale delivery
  799. if accepted > 0 {
  800. return accepted, fmt.Errorf("partial failure: %v", failure)
  801. }
  802. return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery)
  803. }
  804. // Prepare configures the result cache to allow accepting and caching inbound
  805. // fetch results.
  806. func (q *queue) Prepare(offset uint64, mode SyncMode) {
  807. q.lock.Lock()
  808. defer q.lock.Unlock()
  809. // Prepare the queue for sync results
  810. q.resultCache.Prepare(offset)
  811. q.mode = mode
  812. }