123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package les
- import (
- "math/big"
- "math/rand"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/consensus"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/rawdb"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/eth/fetcher"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/light"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/p2p/enode"
- )
// Tunables of the light fetcher.
const (
	// blockDelayTimeout is how long a peer may take to deliver a requested
	// header before the request is considered timed out.
	blockDelayTimeout = 10 * time.Second

	// gatherSlack is the tolerance subtracted from blockDelayTimeout so that
	// requests which are about to expire are collated into one timeout pass.
	gatherSlack = 100 * time.Millisecond

	// cachedAnnosThreshold is the maximum number of announcements queued
	// per peer.
	cachedAnnosThreshold = 64
)
- // announce represents an new block announcement from the les server.
- type announce struct {
- data *announceData
- trust bool
- peerid enode.ID
- }
- // request represents a record when the header request is sent.
- type request struct {
- reqid uint64
- peerid enode.ID
- sendAt time.Time
- hash common.Hash
- }
- // response represents a response packet from network as well as a channel
- // to return all un-requested data.
- type response struct {
- reqid uint64
- headers []*types.Header
- peerid enode.ID
- remain chan []*types.Header
- }
- // fetcherPeer holds the fetcher-specific information for each active peer
- type fetcherPeer struct {
- latest *announceData // The latest announcement sent from the peer
- // These following two fields can track the latest announces
- // from the peer with limited size for caching. We hold the
- // assumption that all enqueued announces are td-monotonic.
- announces map[common.Hash]*announce // Announcement map
- announcesList []common.Hash // FIFO announces list
- }
- // addAnno enqueues an new trusted announcement. If the queued announces overflow,
- // evict from the oldest.
- func (fp *fetcherPeer) addAnno(anno *announce) {
- // Short circuit if the anno already exists. In normal case it should
- // never happen since only monotonic anno is accepted. But the adversary
- // may feed us fake announces with higher td but same hash. In this case,
- // ignore the anno anyway.
- hash := anno.data.Hash
- if _, exist := fp.announces[hash]; exist {
- return
- }
- fp.announces[hash] = anno
- fp.announcesList = append(fp.announcesList, hash)
- // Evict oldest if the announces are oversized.
- if len(fp.announcesList)-cachedAnnosThreshold > 0 {
- for i := 0; i < len(fp.announcesList)-cachedAnnosThreshold; i++ {
- delete(fp.announces, fp.announcesList[i])
- }
- copy(fp.announcesList, fp.announcesList[len(fp.announcesList)-cachedAnnosThreshold:])
- fp.announcesList = fp.announcesList[:cachedAnnosThreshold]
- }
- }
- // forwardAnno removes all announces from the map with a number lower than
- // the provided threshold.
- func (fp *fetcherPeer) forwardAnno(td *big.Int) []*announce {
- var (
- cutset int
- evicted []*announce
- )
- for ; cutset < len(fp.announcesList); cutset++ {
- anno := fp.announces[fp.announcesList[cutset]]
- if anno == nil {
- continue // In theory it should never ever happen
- }
- if anno.data.Td.Cmp(td) > 0 {
- break
- }
- evicted = append(evicted, anno)
- delete(fp.announces, anno.data.Hash)
- }
- if cutset > 0 {
- copy(fp.announcesList, fp.announcesList[cutset:])
- fp.announcesList = fp.announcesList[:len(fp.announcesList)-cutset]
- }
- return evicted
- }
- // lightFetcher implements retrieval of newly announced headers. It reuses
- // the eth.BlockFetcher as the underlying fetcher but adding more additional
- // rules: e.g. evict "timeout" peers.
- type lightFetcher struct {
- // Various handlers
- ulc *ulc
- chaindb ethdb.Database
- reqDist *requestDistributor
- peerset *serverPeerSet // The global peerset of light client which shared by all components
- chain *light.LightChain // The local light chain which maintains the canonical header chain.
- fetcher *fetcher.BlockFetcher // The underlying fetcher which takes care block header retrieval.
- // Peerset maintained by fetcher
- plock sync.RWMutex
- peers map[enode.ID]*fetcherPeer
- // Various channels
- announceCh chan *announce
- requestCh chan *request
- deliverCh chan *response
- syncDone chan *types.Header
- closeCh chan struct{}
- wg sync.WaitGroup
- // Callback
- synchronise func(peer *serverPeer)
- // Test fields or hooks
- noAnnounce bool
- newHeadHook func(*types.Header)
- newAnnounce func(*serverPeer, *announceData)
- }
- // newLightFetcher creates a light fetcher instance.
- func newLightFetcher(chain *light.LightChain, engine consensus.Engine, peers *serverPeerSet, ulc *ulc, chaindb ethdb.Database, reqDist *requestDistributor, syncFn func(p *serverPeer)) *lightFetcher {
- // Construct the fetcher by offering all necessary APIs
- validator := func(header *types.Header) error {
- // Disable seal verification explicitly if we are running in ulc mode.
- return engine.VerifyHeader(chain, header, ulc == nil)
- }
- heighter := func() uint64 { return chain.CurrentHeader().Number.Uint64() }
- dropper := func(id string) { peers.unregister(id) }
- inserter := func(headers []*types.Header) (int, error) {
- // Disable PoW checking explicitly if we are running in ulc mode.
- checkFreq := 1
- if ulc != nil {
- checkFreq = 0
- }
- return chain.InsertHeaderChain(headers, checkFreq)
- }
- f := &lightFetcher{
- ulc: ulc,
- peerset: peers,
- chaindb: chaindb,
- chain: chain,
- reqDist: reqDist,
- fetcher: fetcher.NewBlockFetcher(true, chain.GetHeaderByHash, nil, validator, nil, heighter, inserter, nil, dropper),
- peers: make(map[enode.ID]*fetcherPeer),
- synchronise: syncFn,
- announceCh: make(chan *announce),
- requestCh: make(chan *request),
- deliverCh: make(chan *response),
- syncDone: make(chan *types.Header),
- closeCh: make(chan struct{}),
- }
- peers.subscribe(f)
- return f
- }
- func (f *lightFetcher) start() {
- f.wg.Add(1)
- f.fetcher.Start()
- go f.mainloop()
- }
- func (f *lightFetcher) stop() {
- close(f.closeCh)
- f.fetcher.Stop()
- f.wg.Wait()
- }
- // registerPeer adds an new peer to the fetcher's peer set
- func (f *lightFetcher) registerPeer(p *serverPeer) {
- f.plock.Lock()
- defer f.plock.Unlock()
- f.peers[p.ID()] = &fetcherPeer{announces: make(map[common.Hash]*announce)}
- }
- // unregisterPeer removes the specified peer from the fetcher's peer set
- func (f *lightFetcher) unregisterPeer(p *serverPeer) {
- f.plock.Lock()
- defer f.plock.Unlock()
- delete(f.peers, p.ID())
- }
- // peer returns the peer from the fetcher peerset.
- func (f *lightFetcher) peer(id enode.ID) *fetcherPeer {
- f.plock.RLock()
- defer f.plock.RUnlock()
- return f.peers[id]
- }
- // forEachPeer iterates the fetcher peerset, abort the iteration if the
- // callback returns false.
- func (f *lightFetcher) forEachPeer(check func(id enode.ID, p *fetcherPeer) bool) {
- f.plock.RLock()
- defer f.plock.RUnlock()
- for id, peer := range f.peers {
- if !check(id, peer) {
- return
- }
- }
- }
- // mainloop is the main event loop of the light fetcher, which is responsible for
- // - announcement maintenance(ulc)
- // If we are running in ultra light client mode, then all announcements from
- // the trusted servers are maintained. If the same announcements from trusted
- // servers reach the threshold, then the relevant header is requested for retrieval.
- //
- // - block header retrieval
- // Whenever we receive announce with higher td compared with local chain, the
- // request will be made for header retrieval.
- //
- // - re-sync trigger
- // If the local chain lags too much, then the fetcher will enter "synnchronise"
- // mode to retrieve missing headers in batch.
- func (f *lightFetcher) mainloop() {
- defer f.wg.Done()
- var (
- syncInterval = uint64(1) // Interval used to trigger a light resync.
- syncing bool // Indicator whether the client is syncing
- ulc = f.ulc != nil
- headCh = make(chan core.ChainHeadEvent, 100)
- fetching = make(map[uint64]*request)
- requestTimer = time.NewTimer(0)
- // Local status
- localHead = f.chain.CurrentHeader()
- localTd = f.chain.GetTd(localHead.Hash(), localHead.Number.Uint64())
- )
- sub := f.chain.SubscribeChainHeadEvent(headCh)
- defer sub.Unsubscribe()
- // reset updates the local status with given header.
- reset := func(header *types.Header) {
- localHead = header
- localTd = f.chain.GetTd(header.Hash(), header.Number.Uint64())
- }
- // trustedHeader returns an indicator whether the header is regarded as
- // trusted. If we are running in the ulc mode, only when we receive enough
- // same announcement from trusted server, the header will be trusted.
- trustedHeader := func(hash common.Hash, number uint64) (bool, []enode.ID) {
- var (
- agreed []enode.ID
- trusted bool
- )
- f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
- if anno := p.announces[hash]; anno != nil && anno.trust && anno.data.Number == number {
- agreed = append(agreed, id)
- if 100*len(agreed)/len(f.ulc.keys) >= f.ulc.fraction {
- trusted = true
- return false // abort iteration
- }
- }
- return true
- })
- return trusted, agreed
- }
- for {
- select {
- case anno := <-f.announceCh:
- peerid, data := anno.peerid, anno.data
- log.Debug("Received new announce", "peer", peerid, "number", data.Number, "hash", data.Hash, "reorg", data.ReorgDepth)
- peer := f.peer(peerid)
- if peer == nil {
- log.Debug("Receive announce from unknown peer", "peer", peerid)
- continue
- }
- // Announced tds should be strictly monotonic, drop the peer if
- // the announce is out-of-order.
- if peer.latest != nil && data.Td.Cmp(peer.latest.Td) <= 0 {
- f.peerset.unregister(peerid.String())
- log.Debug("Non-monotonic td", "peer", peerid, "current", data.Td, "previous", peer.latest.Td)
- continue
- }
- peer.latest = data
- // Filter out any stale announce, the local chain is ahead of announce
- if localTd != nil && data.Td.Cmp(localTd) <= 0 {
- continue
- }
- peer.addAnno(anno)
- // If we are not syncing, try to trigger a single retrieval or re-sync
- if !ulc && !syncing {
- // Two scenarios lead to re-sync:
- // - reorg happens
- // - local chain lags
- // We can't retrieve the parent of the announce by single retrieval
- // in both cases, so resync is necessary.
- if data.Number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 {
- syncing = true
- go f.startSync(peerid)
- log.Debug("Trigger light sync", "peer", peerid, "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash)
- continue
- }
- f.fetcher.Notify(peerid.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(peerid), nil)
- log.Debug("Trigger header retrieval", "peer", peerid, "number", data.Number, "hash", data.Hash)
- }
- // Keep collecting announces from trusted server even we are syncing.
- if ulc && anno.trust {
- // Notify underlying fetcher to retrieve header or trigger a resync if
- // we have receive enough announcements from trusted server.
- trusted, agreed := trustedHeader(data.Hash, data.Number)
- if trusted && !syncing {
- if data.Number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 {
- syncing = true
- go f.startSync(peerid)
- log.Debug("Trigger trusted light sync", "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash)
- continue
- }
- p := agreed[rand.Intn(len(agreed))]
- f.fetcher.Notify(p.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(p), nil)
- log.Debug("Trigger trusted header retrieval", "number", data.Number, "hash", data.Hash)
- }
- }
- case req := <-f.requestCh:
- fetching[req.reqid] = req // Tracking all in-flight requests for response latency statistic.
- if len(fetching) == 1 {
- f.rescheduleTimer(fetching, requestTimer)
- }
- case <-requestTimer.C:
- for reqid, request := range fetching {
- if time.Since(request.sendAt) > blockDelayTimeout-gatherSlack {
- delete(fetching, reqid)
- f.peerset.unregister(request.peerid.String())
- log.Debug("Request timeout", "peer", request.peerid, "reqid", reqid)
- }
- }
- f.rescheduleTimer(fetching, requestTimer)
- case resp := <-f.deliverCh:
- if req := fetching[resp.reqid]; req != nil {
- delete(fetching, resp.reqid)
- f.rescheduleTimer(fetching, requestTimer)
- // The underlying fetcher does not check the consistency of request and response.
- // The adversary can send the fake announces with invalid hash and number but always
- // delivery some mismatched header. So it can't be punished by the underlying fetcher.
- // We have to add two more rules here to detect.
- if len(resp.headers) != 1 {
- f.peerset.unregister(req.peerid.String())
- log.Debug("Deliver more than requested", "peer", req.peerid, "reqid", req.reqid)
- continue
- }
- if resp.headers[0].Hash() != req.hash {
- f.peerset.unregister(req.peerid.String())
- log.Debug("Deliver invalid header", "peer", req.peerid, "reqid", req.reqid)
- continue
- }
- resp.remain <- f.fetcher.FilterHeaders(resp.peerid.String(), resp.headers, time.Now())
- } else {
- // Discard the entire packet no matter it's a timeout response or unexpected one.
- resp.remain <- resp.headers
- }
- case ev := <-headCh:
- // Short circuit if we are still syncing.
- if syncing {
- continue
- }
- reset(ev.Block.Header())
- // Clean stale announcements from les-servers.
- var droplist []enode.ID
- f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
- removed := p.forwardAnno(localTd)
- for _, anno := range removed {
- if header := f.chain.GetHeaderByHash(anno.data.Hash); header != nil {
- if header.Number.Uint64() != anno.data.Number {
- droplist = append(droplist, id)
- break
- }
- // In theory td should exists.
- td := f.chain.GetTd(anno.data.Hash, anno.data.Number)
- if td != nil && td.Cmp(anno.data.Td) != 0 {
- droplist = append(droplist, id)
- break
- }
- }
- }
- return true
- })
- for _, id := range droplist {
- f.peerset.unregister(id.String())
- log.Debug("Kicked out peer for invalid announcement")
- }
- if f.newHeadHook != nil {
- f.newHeadHook(localHead)
- }
- case origin := <-f.syncDone:
- syncing = false // Reset the status
- // Rewind all untrusted headers for ulc mode.
- if ulc {
- head := f.chain.CurrentHeader()
- ancestor := rawdb.FindCommonAncestor(f.chaindb, origin, head)
- var untrusted []common.Hash
- for head.Number.Cmp(ancestor.Number) > 0 {
- hash, number := head.Hash(), head.Number.Uint64()
- if trusted, _ := trustedHeader(hash, number); trusted {
- break
- }
- untrusted = append(untrusted, hash)
- head = f.chain.GetHeader(head.ParentHash, number-1)
- }
- if len(untrusted) > 0 {
- for i, j := 0, len(untrusted)-1; i < j; i, j = i+1, j-1 {
- untrusted[i], untrusted[j] = untrusted[j], untrusted[i]
- }
- f.chain.Rollback(untrusted)
- }
- }
- // Reset local status.
- reset(f.chain.CurrentHeader())
- if f.newHeadHook != nil {
- f.newHeadHook(localHead)
- }
- log.Debug("light sync finished", "number", localHead.Number, "hash", localHead.Hash())
- case <-f.closeCh:
- return
- }
- }
- }
- // announce processes a new announcement message received from a peer.
- func (f *lightFetcher) announce(p *serverPeer, head *announceData) {
- if f.newAnnounce != nil {
- f.newAnnounce(p, head)
- }
- if f.noAnnounce {
- return
- }
- select {
- case f.announceCh <- &announce{peerid: p.ID(), trust: p.trusted, data: head}:
- case <-f.closeCh:
- return
- }
- }
- // trackRequest sends a reqID to main loop for in-flight request tracking.
- func (f *lightFetcher) trackRequest(peerid enode.ID, reqid uint64, hash common.Hash) {
- select {
- case f.requestCh <- &request{reqid: reqid, peerid: peerid, sendAt: time.Now(), hash: hash}:
- case <-f.closeCh:
- }
- }
- // requestHeaderByHash constructs a header retrieval request and sends it to
- // local request distributor.
- //
- // Note, we rely on the underlying eth/fetcher to retrieve and validate the
- // response, so that we have to obey the rule of eth/fetcher which only accepts
- // the response from given peer.
- func (f *lightFetcher) requestHeaderByHash(peerid enode.ID) func(common.Hash) error {
- return func(hash common.Hash) error {
- req := &distReq{
- getCost: func(dp distPeer) uint64 { return dp.(*serverPeer).getRequestCost(GetBlockHeadersMsg, 1) },
- canSend: func(dp distPeer) bool { return dp.(*serverPeer).ID() == peerid },
- request: func(dp distPeer) func() {
- peer, id := dp.(*serverPeer), genReqID()
- cost := peer.getRequestCost(GetBlockHeadersMsg, 1)
- peer.fcServer.QueuedRequest(id, cost)
- return func() {
- f.trackRequest(peer.ID(), id, hash)
- peer.requestHeadersByHash(id, hash, 1, 0, false)
- }
- },
- }
- f.reqDist.queue(req)
- return nil
- }
- }
- // requestResync invokes synchronisation callback to start syncing.
- func (f *lightFetcher) startSync(id enode.ID) {
- defer func(header *types.Header) {
- f.syncDone <- header
- }(f.chain.CurrentHeader())
- peer := f.peerset.peer(id.String())
- if peer == nil || peer.onlyAnnounce {
- return
- }
- f.synchronise(peer)
- }
- // deliverHeaders delivers header download request responses for processing
- func (f *lightFetcher) deliverHeaders(peer *serverPeer, reqid uint64, headers []*types.Header) []*types.Header {
- remain := make(chan []*types.Header, 1)
- select {
- case f.deliverCh <- &response{reqid: reqid, headers: headers, peerid: peer.ID(), remain: remain}:
- case <-f.closeCh:
- return nil
- }
- return <-remain
- }
- // rescheduleTimer resets the specified timeout timer to the next request timeout.
- func (f *lightFetcher) rescheduleTimer(requests map[uint64]*request, timer *time.Timer) {
- // Short circuit if no inflight requests
- if len(requests) == 0 {
- timer.Stop()
- return
- }
- // Otherwise find the earliest expiring request
- earliest := time.Now()
- for _, req := range requests {
- if earliest.After(req.sendAt) {
- earliest = req.sendAt
- }
- }
- timer.Reset(blockDelayTimeout - time.Since(earliest))
- }
|