fetcher.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "math/big"
  19. "math/rand"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/consensus"
  24. "github.com/ethereum/go-ethereum/core"
  25. "github.com/ethereum/go-ethereum/core/rawdb"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/eth/fetcher"
  28. "github.com/ethereum/go-ethereum/ethdb"
  29. "github.com/ethereum/go-ethereum/light"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/p2p/enode"
  32. )
  33. const (
  34. blockDelayTimeout = 10 * time.Second // Timeout for retrieving the headers from the peer
  35. gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired requests
  36. cachedAnnosThreshold = 64 // The maximum queued announcements
  37. )
  38. // announce represents an new block announcement from the les server.
  39. type announce struct {
  40. data *announceData
  41. trust bool
  42. peerid enode.ID
  43. }
  44. // request represents a record when the header request is sent.
  45. type request struct {
  46. reqid uint64
  47. peerid enode.ID
  48. sendAt time.Time
  49. hash common.Hash
  50. }
  51. // response represents a response packet from network as well as a channel
  52. // to return all un-requested data.
  53. type response struct {
  54. reqid uint64
  55. headers []*types.Header
  56. peerid enode.ID
  57. remain chan []*types.Header
  58. }
  59. // fetcherPeer holds the fetcher-specific information for each active peer
  60. type fetcherPeer struct {
  61. latest *announceData // The latest announcement sent from the peer
  62. // These following two fields can track the latest announces
  63. // from the peer with limited size for caching. We hold the
  64. // assumption that all enqueued announces are td-monotonic.
  65. announces map[common.Hash]*announce // Announcement map
  66. announcesList []common.Hash // FIFO announces list
  67. }
  68. // addAnno enqueues an new trusted announcement. If the queued announces overflow,
  69. // evict from the oldest.
  70. func (fp *fetcherPeer) addAnno(anno *announce) {
  71. // Short circuit if the anno already exists. In normal case it should
  72. // never happen since only monotonic anno is accepted. But the adversary
  73. // may feed us fake announces with higher td but same hash. In this case,
  74. // ignore the anno anyway.
  75. hash := anno.data.Hash
  76. if _, exist := fp.announces[hash]; exist {
  77. return
  78. }
  79. fp.announces[hash] = anno
  80. fp.announcesList = append(fp.announcesList, hash)
  81. // Evict oldest if the announces are oversized.
  82. if len(fp.announcesList)-cachedAnnosThreshold > 0 {
  83. for i := 0; i < len(fp.announcesList)-cachedAnnosThreshold; i++ {
  84. delete(fp.announces, fp.announcesList[i])
  85. }
  86. copy(fp.announcesList, fp.announcesList[len(fp.announcesList)-cachedAnnosThreshold:])
  87. fp.announcesList = fp.announcesList[:cachedAnnosThreshold]
  88. }
  89. }
  90. // forwardAnno removes all announces from the map with a number lower than
  91. // the provided threshold.
  92. func (fp *fetcherPeer) forwardAnno(td *big.Int) []*announce {
  93. var (
  94. cutset int
  95. evicted []*announce
  96. )
  97. for ; cutset < len(fp.announcesList); cutset++ {
  98. anno := fp.announces[fp.announcesList[cutset]]
  99. if anno == nil {
  100. continue // In theory it should never ever happen
  101. }
  102. if anno.data.Td.Cmp(td) > 0 {
  103. break
  104. }
  105. evicted = append(evicted, anno)
  106. delete(fp.announces, anno.data.Hash)
  107. }
  108. if cutset > 0 {
  109. copy(fp.announcesList, fp.announcesList[cutset:])
  110. fp.announcesList = fp.announcesList[:len(fp.announcesList)-cutset]
  111. }
  112. return evicted
  113. }
  114. // lightFetcher implements retrieval of newly announced headers. It reuses
  115. // the eth.BlockFetcher as the underlying fetcher but adding more additional
  116. // rules: e.g. evict "timeout" peers.
  117. type lightFetcher struct {
  118. // Various handlers
  119. ulc *ulc
  120. chaindb ethdb.Database
  121. reqDist *requestDistributor
  122. peerset *serverPeerSet // The global peerset of light client which shared by all components
  123. chain *light.LightChain // The local light chain which maintains the canonical header chain.
  124. fetcher *fetcher.BlockFetcher // The underlying fetcher which takes care block header retrieval.
  125. // Peerset maintained by fetcher
  126. plock sync.RWMutex
  127. peers map[enode.ID]*fetcherPeer
  128. // Various channels
  129. announceCh chan *announce
  130. requestCh chan *request
  131. deliverCh chan *response
  132. syncDone chan *types.Header
  133. closeCh chan struct{}
  134. wg sync.WaitGroup
  135. // Callback
  136. synchronise func(peer *serverPeer)
  137. // Test fields or hooks
  138. noAnnounce bool
  139. newHeadHook func(*types.Header)
  140. newAnnounce func(*serverPeer, *announceData)
  141. }
  142. // newLightFetcher creates a light fetcher instance.
  143. func newLightFetcher(chain *light.LightChain, engine consensus.Engine, peers *serverPeerSet, ulc *ulc, chaindb ethdb.Database, reqDist *requestDistributor, syncFn func(p *serverPeer)) *lightFetcher {
  144. // Construct the fetcher by offering all necessary APIs
  145. validator := func(header *types.Header) error {
  146. // Disable seal verification explicitly if we are running in ulc mode.
  147. return engine.VerifyHeader(chain, header, ulc == nil)
  148. }
  149. heighter := func() uint64 { return chain.CurrentHeader().Number.Uint64() }
  150. dropper := func(id string) { peers.unregister(id) }
  151. inserter := func(headers []*types.Header) (int, error) {
  152. // Disable PoW checking explicitly if we are running in ulc mode.
  153. checkFreq := 1
  154. if ulc != nil {
  155. checkFreq = 0
  156. }
  157. return chain.InsertHeaderChain(headers, checkFreq)
  158. }
  159. f := &lightFetcher{
  160. ulc: ulc,
  161. peerset: peers,
  162. chaindb: chaindb,
  163. chain: chain,
  164. reqDist: reqDist,
  165. fetcher: fetcher.NewBlockFetcher(true, chain.GetHeaderByHash, nil, validator, nil, heighter, inserter, nil, dropper),
  166. peers: make(map[enode.ID]*fetcherPeer),
  167. synchronise: syncFn,
  168. announceCh: make(chan *announce),
  169. requestCh: make(chan *request),
  170. deliverCh: make(chan *response),
  171. syncDone: make(chan *types.Header),
  172. closeCh: make(chan struct{}),
  173. }
  174. peers.subscribe(f)
  175. return f
  176. }
  177. func (f *lightFetcher) start() {
  178. f.wg.Add(1)
  179. f.fetcher.Start()
  180. go f.mainloop()
  181. }
  182. func (f *lightFetcher) stop() {
  183. close(f.closeCh)
  184. f.fetcher.Stop()
  185. f.wg.Wait()
  186. }
  187. // registerPeer adds an new peer to the fetcher's peer set
  188. func (f *lightFetcher) registerPeer(p *serverPeer) {
  189. f.plock.Lock()
  190. defer f.plock.Unlock()
  191. f.peers[p.ID()] = &fetcherPeer{announces: make(map[common.Hash]*announce)}
  192. }
  193. // unregisterPeer removes the specified peer from the fetcher's peer set
  194. func (f *lightFetcher) unregisterPeer(p *serverPeer) {
  195. f.plock.Lock()
  196. defer f.plock.Unlock()
  197. delete(f.peers, p.ID())
  198. }
  199. // peer returns the peer from the fetcher peerset.
  200. func (f *lightFetcher) peer(id enode.ID) *fetcherPeer {
  201. f.plock.RLock()
  202. defer f.plock.RUnlock()
  203. return f.peers[id]
  204. }
  205. // forEachPeer iterates the fetcher peerset, abort the iteration if the
  206. // callback returns false.
  207. func (f *lightFetcher) forEachPeer(check func(id enode.ID, p *fetcherPeer) bool) {
  208. f.plock.RLock()
  209. defer f.plock.RUnlock()
  210. for id, peer := range f.peers {
  211. if !check(id, peer) {
  212. return
  213. }
  214. }
  215. }
  216. // mainloop is the main event loop of the light fetcher, which is responsible for
  217. // - announcement maintenance(ulc)
  218. // If we are running in ultra light client mode, then all announcements from
  219. // the trusted servers are maintained. If the same announcements from trusted
  220. // servers reach the threshold, then the relevant header is requested for retrieval.
  221. //
  222. // - block header retrieval
  223. // Whenever we receive announce with higher td compared with local chain, the
  224. // request will be made for header retrieval.
  225. //
  226. // - re-sync trigger
  227. // If the local chain lags too much, then the fetcher will enter "synnchronise"
  228. // mode to retrieve missing headers in batch.
  229. func (f *lightFetcher) mainloop() {
  230. defer f.wg.Done()
  231. var (
  232. syncInterval = uint64(1) // Interval used to trigger a light resync.
  233. syncing bool // Indicator whether the client is syncing
  234. ulc = f.ulc != nil
  235. headCh = make(chan core.ChainHeadEvent, 100)
  236. fetching = make(map[uint64]*request)
  237. requestTimer = time.NewTimer(0)
  238. // Local status
  239. localHead = f.chain.CurrentHeader()
  240. localTd = f.chain.GetTd(localHead.Hash(), localHead.Number.Uint64())
  241. )
  242. sub := f.chain.SubscribeChainHeadEvent(headCh)
  243. defer sub.Unsubscribe()
  244. // reset updates the local status with given header.
  245. reset := func(header *types.Header) {
  246. localHead = header
  247. localTd = f.chain.GetTd(header.Hash(), header.Number.Uint64())
  248. }
  249. // trustedHeader returns an indicator whether the header is regarded as
  250. // trusted. If we are running in the ulc mode, only when we receive enough
  251. // same announcement from trusted server, the header will be trusted.
  252. trustedHeader := func(hash common.Hash, number uint64) (bool, []enode.ID) {
  253. var (
  254. agreed []enode.ID
  255. trusted bool
  256. )
  257. f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
  258. if anno := p.announces[hash]; anno != nil && anno.trust && anno.data.Number == number {
  259. agreed = append(agreed, id)
  260. if 100*len(agreed)/len(f.ulc.keys) >= f.ulc.fraction {
  261. trusted = true
  262. return false // abort iteration
  263. }
  264. }
  265. return true
  266. })
  267. return trusted, agreed
  268. }
  269. for {
  270. select {
  271. case anno := <-f.announceCh:
  272. peerid, data := anno.peerid, anno.data
  273. log.Debug("Received new announce", "peer", peerid, "number", data.Number, "hash", data.Hash, "reorg", data.ReorgDepth)
  274. peer := f.peer(peerid)
  275. if peer == nil {
  276. log.Debug("Receive announce from unknown peer", "peer", peerid)
  277. continue
  278. }
  279. // Announced tds should be strictly monotonic, drop the peer if
  280. // the announce is out-of-order.
  281. if peer.latest != nil && data.Td.Cmp(peer.latest.Td) <= 0 {
  282. f.peerset.unregister(peerid.String())
  283. log.Debug("Non-monotonic td", "peer", peerid, "current", data.Td, "previous", peer.latest.Td)
  284. continue
  285. }
  286. peer.latest = data
  287. // Filter out any stale announce, the local chain is ahead of announce
  288. if localTd != nil && data.Td.Cmp(localTd) <= 0 {
  289. continue
  290. }
  291. peer.addAnno(anno)
  292. // If we are not syncing, try to trigger a single retrieval or re-sync
  293. if !ulc && !syncing {
  294. // Two scenarios lead to re-sync:
  295. // - reorg happens
  296. // - local chain lags
  297. // We can't retrieve the parent of the announce by single retrieval
  298. // in both cases, so resync is necessary.
  299. if data.Number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 {
  300. syncing = true
  301. go f.startSync(peerid)
  302. log.Debug("Trigger light sync", "peer", peerid, "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash)
  303. continue
  304. }
  305. f.fetcher.Notify(peerid.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(peerid), nil)
  306. log.Debug("Trigger header retrieval", "peer", peerid, "number", data.Number, "hash", data.Hash)
  307. }
  308. // Keep collecting announces from trusted server even we are syncing.
  309. if ulc && anno.trust {
  310. // Notify underlying fetcher to retrieve header or trigger a resync if
  311. // we have receive enough announcements from trusted server.
  312. trusted, agreed := trustedHeader(data.Hash, data.Number)
  313. if trusted && !syncing {
  314. if data.Number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 {
  315. syncing = true
  316. go f.startSync(peerid)
  317. log.Debug("Trigger trusted light sync", "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash)
  318. continue
  319. }
  320. p := agreed[rand.Intn(len(agreed))]
  321. f.fetcher.Notify(p.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(p), nil)
  322. log.Debug("Trigger trusted header retrieval", "number", data.Number, "hash", data.Hash)
  323. }
  324. }
  325. case req := <-f.requestCh:
  326. fetching[req.reqid] = req // Tracking all in-flight requests for response latency statistic.
  327. if len(fetching) == 1 {
  328. f.rescheduleTimer(fetching, requestTimer)
  329. }
  330. case <-requestTimer.C:
  331. for reqid, request := range fetching {
  332. if time.Since(request.sendAt) > blockDelayTimeout-gatherSlack {
  333. delete(fetching, reqid)
  334. f.peerset.unregister(request.peerid.String())
  335. log.Debug("Request timeout", "peer", request.peerid, "reqid", reqid)
  336. }
  337. }
  338. f.rescheduleTimer(fetching, requestTimer)
  339. case resp := <-f.deliverCh:
  340. if req := fetching[resp.reqid]; req != nil {
  341. delete(fetching, resp.reqid)
  342. f.rescheduleTimer(fetching, requestTimer)
  343. // The underlying fetcher does not check the consistency of request and response.
  344. // The adversary can send the fake announces with invalid hash and number but always
  345. // delivery some mismatched header. So it can't be punished by the underlying fetcher.
  346. // We have to add two more rules here to detect.
  347. if len(resp.headers) != 1 {
  348. f.peerset.unregister(req.peerid.String())
  349. log.Debug("Deliver more than requested", "peer", req.peerid, "reqid", req.reqid)
  350. continue
  351. }
  352. if resp.headers[0].Hash() != req.hash {
  353. f.peerset.unregister(req.peerid.String())
  354. log.Debug("Deliver invalid header", "peer", req.peerid, "reqid", req.reqid)
  355. continue
  356. }
  357. resp.remain <- f.fetcher.FilterHeaders(resp.peerid.String(), resp.headers, time.Now())
  358. } else {
  359. // Discard the entire packet no matter it's a timeout response or unexpected one.
  360. resp.remain <- resp.headers
  361. }
  362. case ev := <-headCh:
  363. // Short circuit if we are still syncing.
  364. if syncing {
  365. continue
  366. }
  367. reset(ev.Block.Header())
  368. // Clean stale announcements from les-servers.
  369. var droplist []enode.ID
  370. f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
  371. removed := p.forwardAnno(localTd)
  372. for _, anno := range removed {
  373. if header := f.chain.GetHeaderByHash(anno.data.Hash); header != nil {
  374. if header.Number.Uint64() != anno.data.Number {
  375. droplist = append(droplist, id)
  376. break
  377. }
  378. // In theory td should exists.
  379. td := f.chain.GetTd(anno.data.Hash, anno.data.Number)
  380. if td != nil && td.Cmp(anno.data.Td) != 0 {
  381. droplist = append(droplist, id)
  382. break
  383. }
  384. }
  385. }
  386. return true
  387. })
  388. for _, id := range droplist {
  389. f.peerset.unregister(id.String())
  390. log.Debug("Kicked out peer for invalid announcement")
  391. }
  392. if f.newHeadHook != nil {
  393. f.newHeadHook(localHead)
  394. }
  395. case origin := <-f.syncDone:
  396. syncing = false // Reset the status
  397. // Rewind all untrusted headers for ulc mode.
  398. if ulc {
  399. head := f.chain.CurrentHeader()
  400. ancestor := rawdb.FindCommonAncestor(f.chaindb, origin, head)
  401. var untrusted []common.Hash
  402. for head.Number.Cmp(ancestor.Number) > 0 {
  403. hash, number := head.Hash(), head.Number.Uint64()
  404. if trusted, _ := trustedHeader(hash, number); trusted {
  405. break
  406. }
  407. untrusted = append(untrusted, hash)
  408. head = f.chain.GetHeader(head.ParentHash, number-1)
  409. }
  410. if len(untrusted) > 0 {
  411. for i, j := 0, len(untrusted)-1; i < j; i, j = i+1, j-1 {
  412. untrusted[i], untrusted[j] = untrusted[j], untrusted[i]
  413. }
  414. f.chain.Rollback(untrusted)
  415. }
  416. }
  417. // Reset local status.
  418. reset(f.chain.CurrentHeader())
  419. if f.newHeadHook != nil {
  420. f.newHeadHook(localHead)
  421. }
  422. log.Debug("light sync finished", "number", localHead.Number, "hash", localHead.Hash())
  423. case <-f.closeCh:
  424. return
  425. }
  426. }
  427. }
  428. // announce processes a new announcement message received from a peer.
  429. func (f *lightFetcher) announce(p *serverPeer, head *announceData) {
  430. if f.newAnnounce != nil {
  431. f.newAnnounce(p, head)
  432. }
  433. if f.noAnnounce {
  434. return
  435. }
  436. select {
  437. case f.announceCh <- &announce{peerid: p.ID(), trust: p.trusted, data: head}:
  438. case <-f.closeCh:
  439. return
  440. }
  441. }
  442. // trackRequest sends a reqID to main loop for in-flight request tracking.
  443. func (f *lightFetcher) trackRequest(peerid enode.ID, reqid uint64, hash common.Hash) {
  444. select {
  445. case f.requestCh <- &request{reqid: reqid, peerid: peerid, sendAt: time.Now(), hash: hash}:
  446. case <-f.closeCh:
  447. }
  448. }
  449. // requestHeaderByHash constructs a header retrieval request and sends it to
  450. // local request distributor.
  451. //
  452. // Note, we rely on the underlying eth/fetcher to retrieve and validate the
  453. // response, so that we have to obey the rule of eth/fetcher which only accepts
  454. // the response from given peer.
  455. func (f *lightFetcher) requestHeaderByHash(peerid enode.ID) func(common.Hash) error {
  456. return func(hash common.Hash) error {
  457. req := &distReq{
  458. getCost: func(dp distPeer) uint64 { return dp.(*serverPeer).getRequestCost(GetBlockHeadersMsg, 1) },
  459. canSend: func(dp distPeer) bool { return dp.(*serverPeer).ID() == peerid },
  460. request: func(dp distPeer) func() {
  461. peer, id := dp.(*serverPeer), genReqID()
  462. cost := peer.getRequestCost(GetBlockHeadersMsg, 1)
  463. peer.fcServer.QueuedRequest(id, cost)
  464. return func() {
  465. f.trackRequest(peer.ID(), id, hash)
  466. peer.requestHeadersByHash(id, hash, 1, 0, false)
  467. }
  468. },
  469. }
  470. f.reqDist.queue(req)
  471. return nil
  472. }
  473. }
  474. // requestResync invokes synchronisation callback to start syncing.
  475. func (f *lightFetcher) startSync(id enode.ID) {
  476. defer func(header *types.Header) {
  477. f.syncDone <- header
  478. }(f.chain.CurrentHeader())
  479. peer := f.peerset.peer(id.String())
  480. if peer == nil || peer.onlyAnnounce {
  481. return
  482. }
  483. f.synchronise(peer)
  484. }
  485. // deliverHeaders delivers header download request responses for processing
  486. func (f *lightFetcher) deliverHeaders(peer *serverPeer, reqid uint64, headers []*types.Header) []*types.Header {
  487. remain := make(chan []*types.Header, 1)
  488. select {
  489. case f.deliverCh <- &response{reqid: reqid, headers: headers, peerid: peer.ID(), remain: remain}:
  490. case <-f.closeCh:
  491. return nil
  492. }
  493. return <-remain
  494. }
  495. // rescheduleTimer resets the specified timeout timer to the next request timeout.
  496. func (f *lightFetcher) rescheduleTimer(requests map[uint64]*request, timer *time.Timer) {
  497. // Short circuit if no inflight requests
  498. if len(requests) == 0 {
  499. timer.Stop()
  500. return
  501. }
  502. // Otherwise find the earliest expiring request
  503. earliest := time.Now()
  504. for _, req := range requests {
  505. if earliest.After(req.sendAt) {
  506. earliest = req.sendAt
  507. }
  508. }
  509. timer.Reset(blockDelayTimeout - time.Since(earliest))
  510. }