12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894 |
- // Copyright 2020 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package fetcher
- import (
- "bytes"
- "fmt"
- mrand "math/rand"
- "sort"
- "time"
- mapset "github.com/deckarep/golang-set"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/mclock"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
- )
const (
	// maxTxAnnounces is the maximum number of unique transactions a peer
	// can announce in a short time.
	maxTxAnnounces = 4096

	// maxTxRetrievals is the maximum number of transactions that can be fetched
	// in one request. The rationale for picking 256 is:
	//   - In eth protocol, the softResponseLimit is 2MB. Nowadays according to
	//     Etherscan the average transaction size is around 200B, so in theory
	//     we can include lots of transactions in a single protocol packet.
	//   - However the maximum size of a single transaction is raised to 128KB,
	//     so pick a middle value here to ensure we can maximize the efficiency
	//     of the retrieval and response size overflow won't happen in most cases.
	maxTxRetrievals = 256

	// maxTxUnderpricedSetSize is the size of the underpriced transaction set that
	// is used to track recent transactions that have been dropped so we don't
	// re-request them.
	maxTxUnderpricedSetSize = 32768

	// txArriveTimeout is the time allowance before an announced transaction is
	// explicitly requested.
	txArriveTimeout = 500 * time.Millisecond

	// txGatherSlack is the interval used to collate almost-expired announces
	// with network fetches.
	txGatherSlack = 100 * time.Millisecond
)

var (
	// txFetchTimeout is the maximum allotted time to return an explicitly
	// requested transaction. A var (not const) so tests can shorten it.
	txFetchTimeout = 5 * time.Second
)
var (
	// Announcement-side meters: hashes notified by remote peers.
	txAnnounceInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/in", nil)
	txAnnounceKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/known", nil)
	txAnnounceUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/underpriced", nil)
	txAnnounceDOSMeter         = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/dos", nil)

	// Broadcast-side meters: full transactions propagated without a request.
	txBroadcastInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/in", nil)
	txBroadcastKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/known", nil)
	txBroadcastUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/underpriced", nil)
	txBroadcastOtherRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/otherreject", nil)

	// Request lifecycle meters: explicit retrievals fired by the fetcher.
	txRequestOutMeter     = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/out", nil)
	txRequestFailMeter    = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/fail", nil)
	txRequestDoneMeter    = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/done", nil)
	txRequestTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/timeout", nil)

	// Reply-side meters: transactions delivered in direct responses.
	txReplyInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/in", nil)
	txReplyKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/known", nil)
	txReplyUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/underpriced", nil)
	txReplyOtherRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/otherreject", nil)

	// Gauges tracking the current size of each internal fetcher stage.
	txFetcherWaitingPeers   = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/peers", nil)
	txFetcherWaitingHashes  = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/hashes", nil)
	txFetcherQueueingPeers  = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/peers", nil)
	txFetcherQueueingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/hashes", nil)
	txFetcherFetchingPeers  = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/peers", nil)
	txFetcherFetchingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/hashes", nil)
)
// txAnnounce is the notification of the availability of a batch
// of new transactions in the network.
type txAnnounce struct {
	origin string        // Identifier of the peer originating the notification
	hashes []common.Hash // Batch of transaction hashes being announced
}

// txRequest represents an in-flight transaction retrieval request destined to
// a specific peer.
type txRequest struct {
	hashes []common.Hash            // Transactions having been requested
	stolen map[common.Hash]struct{} // Deliveries by someone else (don't re-request)
	time   mclock.AbsTime           // Timestamp of the request
}

// txDelivery is the notification that a batch of transactions have been added
// to the pool and should be untracked.
type txDelivery struct {
	origin string        // Identifier of the peer originating the notification
	hashes []common.Hash // Batch of transaction hashes having been delivered
	direct bool          // Whether this is a direct reply or a broadcast
}

// txDrop is the notification that a peer has disconnected.
type txDrop struct {
	peer string // Identifier of the disconnected peer to purge
}
// TxFetcher is responsible for retrieving new transactions based on announcements.
//
// The fetcher operates in 3 stages:
//   - Transactions that are newly discovered are moved into a wait list.
//   - After ~500ms passes, transactions from the wait list that have not been
//     broadcast to us in whole are moved into a queueing area.
//   - When a connected peer doesn't have in-flight retrieval requests, any
//     transaction queued up (and announced by the peer) are allocated to the
//     peer and moved into a fetching status until it's fulfilled or fails.
//
// The invariants of the fetcher are:
//   - Each tracked transaction (hash) must only be present in one of the
//     three stages. This ensures that the fetcher operates akin to a finite
//     state automata and there's no data leak.
//   - Each peer that announced transactions may be scheduled retrievals, but
//     only ever one concurrently. This ensures we can immediately know what is
//     missing from a reply and reschedule it.
type TxFetcher struct {
	notify  chan *txAnnounce // Announcements pushed in via Notify
	cleanup chan *txDelivery // Deliveries pushed in via Enqueue
	drop    chan *txDrop     // Disconnects pushed in via Drop
	quit    chan struct{}    // Closed by Stop to terminate the loop

	underpriced mapset.Set // Transactions discarded as too cheap (don't re-fetch)

	// Stage 1: Waiting lists for newly discovered transactions that might be
	// broadcast without needing explicit request/reply round trips.
	waitlist  map[common.Hash]map[string]struct{} // Transactions waiting for a potential broadcast
	waittime  map[common.Hash]mclock.AbsTime      // Timestamps when transactions were added to the waitlist
	waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection)

	// Stage 2: Queue of transactions that waiting to be allocated to some peer
	// to be retrieved directly.
	announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer
	announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

	// Stage 3: Set of transactions currently being retrieved, some which may be
	// fulfilled and some rescheduled. Note, this step shares 'announces' from the
	// previous stage to avoid having to duplicate (need it for DoS checks).
	fetching   map[common.Hash]string              // Transaction set currently being retrieved
	requests   map[string]*txRequest               // In-flight transaction retrievals
	alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails

	// Callbacks
	hasTx    func(common.Hash) bool             // Retrieves a tx from the local txpool
	addTxs   func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
	fetchTxs func(string, []common.Hash) error  // Retrieves a set of txs from a remote peer

	step  chan struct{} // Notification channel when the fetcher loop iterates
	clock mclock.Clock  // Time wrapper to simulate in tests
	rand  *mrand.Rand   // Randomizer to use in tests instead of map range loops (soft-random)
}
// NewTxFetcher creates a transaction fetcher to retrieve transactions
// based on hash announcements. It uses the real system clock and Go's
// native (randomized) map iteration order.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
	return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
// A nil rand selects the production (non-deterministic) iteration behavior.
func NewTxFetcherForTests(
	hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
	clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
	return &TxFetcher{
		notify:      make(chan *txAnnounce),
		cleanup:     make(chan *txDelivery),
		drop:        make(chan *txDrop),
		quit:        make(chan struct{}),
		waitlist:    make(map[common.Hash]map[string]struct{}),
		waittime:    make(map[common.Hash]mclock.AbsTime),
		waitslots:   make(map[string]map[common.Hash]struct{}),
		announces:   make(map[string]map[common.Hash]struct{}),
		announced:   make(map[common.Hash]map[string]struct{}),
		fetching:    make(map[common.Hash]string),
		requests:    make(map[string]*txRequest),
		alternates:  make(map[common.Hash]map[string]struct{}),
		underpriced: mapset.NewSet(),
		hasTx:       hasTx,
		addTxs:      addTxs,
		fetchTxs:    fetchTxs,
		clock:       clock,
		rand:        rand,
	}
}
- // Notify announces the fetcher of the potential availability of a new batch of
- // transactions in the network.
- func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
- // Keep track of all the announced transactions
- txAnnounceInMeter.Mark(int64(len(hashes)))
- // Skip any transaction announcements that we already know of, or that we've
- // previously marked as cheap and discarded. This check is of course racey,
- // because multiple concurrent notifies will still manage to pass it, but it's
- // still valuable to check here because it runs concurrent to the internal
- // loop, so anything caught here is time saved internally.
- var (
- unknowns = make([]common.Hash, 0, len(hashes))
- duplicate, underpriced int64
- )
- for _, hash := range hashes {
- switch {
- case f.hasTx(hash):
- duplicate++
- case f.underpriced.Contains(hash):
- underpriced++
- default:
- unknowns = append(unknowns, hash)
- }
- }
- txAnnounceKnownMeter.Mark(duplicate)
- txAnnounceUnderpricedMeter.Mark(underpriced)
- // If anything's left to announce, push it into the internal loop
- if len(unknowns) == 0 {
- return nil
- }
- announce := &txAnnounce{
- origin: peer,
- hashes: unknowns,
- }
- select {
- case f.notify <- announce:
- return nil
- case <-f.quit:
- return errTerminated
- }
- }
- // Enqueue imports a batch of received transaction into the transaction pool
- // and the fetcher. This method may be called by both transaction broadcasts and
- // direct request replies. The differentiation is important so the fetcher can
- // re-shedule missing transactions as soon as possible.
- func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
- // Keep track of all the propagated transactions
- if direct {
- txReplyInMeter.Mark(int64(len(txs)))
- } else {
- txBroadcastInMeter.Mark(int64(len(txs)))
- }
- // Push all the transactions into the pool, tracking underpriced ones to avoid
- // re-requesting them and dropping the peer in case of malicious transfers.
- var (
- added = make([]common.Hash, 0, len(txs))
- duplicate int64
- underpriced int64
- otherreject int64
- )
- errs := f.addTxs(txs)
- for i, err := range errs {
- if err != nil {
- // Track the transaction hash if the price is too low for us.
- // Avoid re-request this transaction when we receive another
- // announcement.
- if err == core.ErrUnderpriced || err == core.ErrReplaceUnderpriced {
- for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
- f.underpriced.Pop()
- }
- f.underpriced.Add(txs[i].Hash())
- }
- // Track a few interesting failure types
- switch err {
- case nil: // Noop, but need to handle to not count these
- case core.ErrAlreadyKnown:
- duplicate++
- case core.ErrUnderpriced, core.ErrReplaceUnderpriced:
- underpriced++
- default:
- otherreject++
- }
- }
- added = append(added, txs[i].Hash())
- }
- if direct {
- txReplyKnownMeter.Mark(duplicate)
- txReplyUnderpricedMeter.Mark(underpriced)
- txReplyOtherRejectMeter.Mark(otherreject)
- } else {
- txBroadcastKnownMeter.Mark(duplicate)
- txBroadcastUnderpricedMeter.Mark(underpriced)
- txBroadcastOtherRejectMeter.Mark(otherreject)
- }
- select {
- case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
- return nil
- case <-f.quit:
- return errTerminated
- }
- }
// Drop should be called when a peer disconnects. It cleans up all the internal
// data structures of the given node. The actual cleanup happens inside the
// event loop; this method only delivers the notification (or aborts on quit).
func (f *TxFetcher) Drop(peer string) error {
	select {
	case f.drop <- &txDrop{peer: peer}:
		return nil
	case <-f.quit:
		return errTerminated
	}
}
// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested.
func (f *TxFetcher) Start() {
	go f.loop()
}

// Stop terminates the announcement based synchroniser, canceling all pending
// operations. Closing quit unblocks Notify/Enqueue/Drop callers and ends loop.
func (f *TxFetcher) Stop() {
	close(f.quit)
}
// loop is the event loop of the fetcher: all three stage maps are owned and
// mutated exclusively by this goroutine, so no locking is needed. External
// callers communicate via the notify/cleanup/drop channels.
func (f *TxFetcher) loop() {
	var (
		waitTimer    = new(mclock.Timer)
		timeoutTimer = new(mclock.Timer)

		// Buffered (size 1) so timer callbacks never block on a busy loop
		waitTrigger    = make(chan struct{}, 1)
		timeoutTrigger = make(chan struct{}, 1)
	)
	for {
		select {
		case ann := <-f.notify:
			// Drop part of the new announcements if there are too many accumulated.
			// Note, we could but do not filter already known transactions here as
			// the probability of something arriving between this call and the pre-
			// filter outside is essentially zero.
			used := len(f.waitslots[ann.origin]) + len(f.announces[ann.origin])
			if used >= maxTxAnnounces {
				// This can happen if a set of transactions are requested but not
				// all fulfilled, so the remainder are rescheduled without the cap
				// check. Should be fine as the limit is in the thousands and the
				// request size in the hundreds.
				txAnnounceDOSMeter.Mark(int64(len(ann.hashes)))
				break
			}
			want := used + len(ann.hashes)
			if want > maxTxAnnounces {
				txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))
				// NOTE(review): this keeps `want-maxTxAnnounces` hashes, but the
				// remaining headroom is `maxTxAnnounces-used` — verify the
				// intended truncation bound against upstream.
				ann.hashes = ann.hashes[:want-maxTxAnnounces]
			}
			// All is well, schedule the remainder of the transactions
			idleWait := len(f.waittime) == 0
			_, oldPeer := f.announces[ann.origin]

			for _, hash := range ann.hashes {
				// If the transaction is already downloading, add it to the list
				// of possible alternates (in case the current retrieval fails) and
				// also account it for the peer.
				if f.alternates[hash] != nil {
					f.alternates[hash][ann.origin] = struct{}{}

					// Stage 2 and 3 share the set of origins per tx
					if announces := f.announces[ann.origin]; announces != nil {
						announces[hash] = struct{}{}
					} else {
						f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// If the transaction is not downloading, but is already queued
				// from a different peer, track it for the new peer too.
				if f.announced[hash] != nil {
					f.announced[hash][ann.origin] = struct{}{}

					// Stage 2 and 3 share the set of origins per tx
					if announces := f.announces[ann.origin]; announces != nil {
						announces[hash] = struct{}{}
					} else {
						f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// If the transaction is already known to the fetcher, but not
				// yet downloading, add the peer as an alternate origin in the
				// waiting list.
				if f.waitlist[hash] != nil {
					f.waitlist[hash][ann.origin] = struct{}{}

					if waitslots := f.waitslots[ann.origin]; waitslots != nil {
						waitslots[hash] = struct{}{}
					} else {
						f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// Transaction unknown to the fetcher, insert it into the waiting list
				f.waitlist[hash] = map[string]struct{}{ann.origin: {}}
				f.waittime[hash] = f.clock.Now()

				if waitslots := f.waitslots[ann.origin]; waitslots != nil {
					waitslots[hash] = struct{}{}
				} else {
					f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
				}
			}
			// If a new item was added to the waitlist, schedule it into the fetcher
			if idleWait && len(f.waittime) > 0 {
				f.rescheduleWait(waitTimer, waitTrigger)
			}
			// If this peer is new and announced something already queued, maybe
			// request transactions from them
			if !oldPeer && len(f.announces[ann.origin]) > 0 {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, map[string]struct{}{ann.origin: {}})
			}

		case <-waitTrigger:
			// At least one transaction's waiting time ran out, push all expired
			// ones into the retrieval queues
			actives := make(map[string]struct{})
			for hash, instance := range f.waittime {
				if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout {
					// Transaction expired without propagation, schedule for retrieval
					if f.announced[hash] != nil {
						panic("announce tracker already contains waitlist item")
					}
					f.announced[hash] = f.waitlist[hash]
					for peer := range f.waitlist[hash] {
						if announces := f.announces[peer]; announces != nil {
							announces[hash] = struct{}{}
						} else {
							f.announces[peer] = map[common.Hash]struct{}{hash: {}}
						}
						delete(f.waitslots[peer], hash)
						if len(f.waitslots[peer]) == 0 {
							delete(f.waitslots, peer)
						}
						actives[peer] = struct{}{}
					}
					delete(f.waittime, hash)
					delete(f.waitlist, hash)
				}
			}
			// If transactions are still waiting for propagation, reschedule the wait timer
			if len(f.waittime) > 0 {
				f.rescheduleWait(waitTimer, waitTrigger)
			}
			// If any peers became active and are idle, request transactions from them
			if len(actives) > 0 {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, actives)
			}

		case <-timeoutTrigger:
			// Clean up any expired retrievals and avoid re-requesting them from the
			// same peer (either overloaded or malicious, useless in both cases). We
			// could also penalize (Drop), but there's nothing to gain, and it could
			// possibly further increase the load on it.
			for peer, req := range f.requests {
				if time.Duration(f.clock.Now()-req.time)+txGatherSlack > txFetchTimeout {
					txRequestTimeoutMeter.Mark(int64(len(req.hashes)))

					// Reschedule all the not-yet-delivered fetches to alternate peers
					for _, hash := range req.hashes {
						// Skip rescheduling hashes already delivered by someone else
						if req.stolen != nil {
							if _, ok := req.stolen[hash]; ok {
								continue
							}
						}
						// Move the delivery back from fetching to queued
						if _, ok := f.announced[hash]; ok {
							panic("announced tracker already contains alternate item")
						}
						if f.alternates[hash] != nil { // nil if tx was broadcast during fetch
							f.announced[hash] = f.alternates[hash]
						}
						delete(f.announced[hash], peer)
						if len(f.announced[hash]) == 0 {
							delete(f.announced, hash)
						}
						delete(f.announces[peer], hash)
						delete(f.alternates, hash)
						delete(f.fetching, hash)
					}
					if len(f.announces[peer]) == 0 {
						delete(f.announces, peer)
					}
					// Keep track of the request as dangling, but never expire
					f.requests[peer].hashes = nil
				}
			}
			// Schedule a new transaction retrieval
			f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)

			// No idea if we scheduled something or not, trigger the timer if needed
			// TODO(karalabe): this is kind of lame, can't we dump it into scheduleFetches somehow?
			f.rescheduleTimeout(timeoutTimer, timeoutTrigger)

		case delivery := <-f.cleanup:
			// Independent if the delivery was direct or broadcast, remove all
			// traces of the hash from internal trackers
			for _, hash := range delivery.hashes {
				if _, ok := f.waitlist[hash]; ok {
					for peer, txset := range f.waitslots {
						delete(txset, hash)
						if len(txset) == 0 {
							delete(f.waitslots, peer)
						}
					}
					delete(f.waitlist, hash)
					delete(f.waittime, hash)
				} else {
					for peer, txset := range f.announces {
						delete(txset, hash)
						if len(txset) == 0 {
							delete(f.announces, peer)
						}
					}
					delete(f.announced, hash)
					delete(f.alternates, hash)

					// If a transaction currently being fetched from a different
					// origin was delivered (delivery stolen), mark it so the
					// actual delivery won't double schedule it.
					if origin, ok := f.fetching[hash]; ok && (origin != delivery.origin || !delivery.direct) {
						stolen := f.requests[origin].stolen
						if stolen == nil {
							f.requests[origin].stolen = make(map[common.Hash]struct{})
							stolen = f.requests[origin].stolen
						}
						stolen[hash] = struct{}{}
					}
					delete(f.fetching, hash)
				}
			}
			// In case of a direct delivery, also reschedule anything missing
			// from the original query
			if delivery.direct {
				// Mark the request successful (independent of individual status)
				txRequestDoneMeter.Mark(int64(len(delivery.hashes)))

				// Make sure something was pending, nuke it
				req := f.requests[delivery.origin]
				if req == nil {
					log.Warn("Unexpected transaction delivery", "peer", delivery.origin)
					break
				}
				delete(f.requests, delivery.origin)

				// Anything not delivered should be re-scheduled (with or without
				// this peer, depending on the response cutoff)
				delivered := make(map[common.Hash]struct{})
				for _, hash := range delivery.hashes {
					delivered[hash] = struct{}{}
				}
				// cutoff ends up at the index of the LAST delivered hash; anything
				// before it was visibly skipped by the remote peer.
				cutoff := len(req.hashes) // If nothing is delivered, assume everything is missing, don't retry!!!
				for i, hash := range req.hashes {
					if _, ok := delivered[hash]; ok {
						cutoff = i
					}
				}
				// Reschedule missing hashes from alternates, not-fulfilled from alt+self
				for i, hash := range req.hashes {
					// Skip rescheduling hashes already delivered by someone else
					if req.stolen != nil {
						if _, ok := req.stolen[hash]; ok {
							continue
						}
					}
					if _, ok := delivered[hash]; !ok {
						if i < cutoff {
							// Peer had the chance to deliver this and didn't: drop it
							// as a viable origin for the hash.
							delete(f.alternates[hash], delivery.origin)
							delete(f.announces[delivery.origin], hash)
							if len(f.announces[delivery.origin]) == 0 {
								delete(f.announces, delivery.origin)
							}
						}
						if len(f.alternates[hash]) > 0 {
							if _, ok := f.announced[hash]; ok {
								panic(fmt.Sprintf("announced tracker already contains alternate item: %v", f.announced[hash]))
							}
							f.announced[hash] = f.alternates[hash]
						}
					}
					delete(f.alternates, hash)
					delete(f.fetching, hash)
				}
				// Something was delivered, try to reschedule requests
				f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) // Partial delivery may enable others to deliver too
			}

		case drop := <-f.drop:
			// A peer was dropped, remove all traces of it
			if _, ok := f.waitslots[drop.peer]; ok {
				for hash := range f.waitslots[drop.peer] {
					delete(f.waitlist[hash], drop.peer)
					if len(f.waitlist[hash]) == 0 {
						delete(f.waitlist, hash)
						delete(f.waittime, hash)
					}
				}
				delete(f.waitslots, drop.peer)
				if len(f.waitlist) > 0 {
					f.rescheduleWait(waitTimer, waitTrigger)
				}
			}
			// Clean up any active requests
			var request *txRequest
			if request = f.requests[drop.peer]; request != nil {
				for _, hash := range request.hashes {
					// Skip rescheduling hashes already delivered by someone else
					if request.stolen != nil {
						if _, ok := request.stolen[hash]; ok {
							continue
						}
					}
					// Undelivered hash, reschedule if there's an alternative origin available
					delete(f.alternates[hash], drop.peer)
					if len(f.alternates[hash]) == 0 {
						delete(f.alternates, hash)
					} else {
						f.announced[hash] = f.alternates[hash]
						delete(f.alternates, hash)
					}
					delete(f.fetching, hash)
				}
				delete(f.requests, drop.peer)
			}
			// Clean up general announcement tracking
			if _, ok := f.announces[drop.peer]; ok {
				for hash := range f.announces[drop.peer] {
					delete(f.announced[hash], drop.peer)
					if len(f.announced[hash]) == 0 {
						delete(f.announced, hash)
					}
				}
				delete(f.announces, drop.peer)
			}
			// If a request was cancelled, check if anything needs to be rescheduled
			if request != nil {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)
				f.rescheduleTimeout(timeoutTimer, timeoutTrigger)
			}

		case <-f.quit:
			return
		}
		// No idea what happened, but bump some sanity metrics
		txFetcherWaitingPeers.Update(int64(len(f.waitslots)))
		txFetcherWaitingHashes.Update(int64(len(f.waitlist)))
		txFetcherQueueingPeers.Update(int64(len(f.announces) - len(f.requests)))
		txFetcherQueueingHashes.Update(int64(len(f.announced)))
		txFetcherFetchingPeers.Update(int64(len(f.requests)))
		txFetcherFetchingHashes.Update(int64(len(f.fetching)))

		// Loop did something, ping the step notifier if needed (tests)
		if f.step != nil {
			f.step <- struct{}{}
		}
	}
}
- // rescheduleWait iterates over all the transactions currently in the waitlist
- // and schedules the movement into the fetcher for the earliest.
- //
- // The method has a granularity of 'gatherSlack', since there's not much point in
- // spinning over all the transactions just to maybe find one that should trigger
- // a few ms earlier.
- func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
- if *timer != nil {
- (*timer).Stop()
- }
- now := f.clock.Now()
- earliest := now
- for _, instance := range f.waittime {
- if earliest > instance {
- earliest = instance
- if txArriveTimeout-time.Duration(now-earliest) < gatherSlack {
- break
- }
- }
- }
- *timer = f.clock.AfterFunc(txArriveTimeout-time.Duration(now-earliest), func() {
- trigger <- struct{}{}
- })
- }
// rescheduleTimeout iterates over all the transactions currently in flight and
// schedules a cleanup run when the first would trigger.
//
// The method has a granularity of 'gatherSlack', since there's not much point in
// spinning over all the transactions just to maybe find one that should trigger
// a few ms earlier.
//
// This method is a bit "flaky" "by design". In theory the timeout timer only ever
// should be rescheduled if some request is pending. In practice, a timeout will
// cause the timer to be rescheduled every 5 secs (until the peer comes through or
// disconnects). This is a limitation of the fetcher code because we don't track
// pending requests and timed out requests separately. Without double tracking, if
// we simply didn't reschedule the timer on all-timeout then the timer would never
// be set again since len(request) > 0 => something's running.
func (f *TxFetcher) rescheduleTimeout(timer *mclock.Timer, trigger chan struct{}) {
	if *timer != nil {
		(*timer).Stop()
	}
	now := f.clock.Now()

	earliest := now
	for _, req := range f.requests {
		// If this request already timed out, skip it altogether
		// (hashes are nilled out on timeout — see the timeoutTrigger handler)
		if req.hashes == nil {
			continue
		}
		if earliest > req.time {
			earliest = req.time
			// Already within the slack granularity, no point scanning further
			if txFetchTimeout-time.Duration(now-earliest) < gatherSlack {
				break
			}
		}
	}
	*timer = f.clock.AfterFunc(txFetchTimeout-time.Duration(now-earliest), func() {
		trigger <- struct{}{}
	})
}
// scheduleFetches starts a batch of retrievals for all available idle peers.
// If authorizationList is nil, all announcing peers are considered; otherwise
// only the listed peers are. At most one in-flight request per peer and at
// most maxTxRetrievals hashes per request are allocated.
func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, authorizationList map[string]struct{}) {
	// Gather the set of peers we want to retrieve from (default to all)
	actives := authorizationList
	if actives == nil {
		actives = make(map[string]struct{})
		for peer := range f.announces {
			actives[peer] = struct{}{}
		}
	}
	if len(actives) == 0 {
		return
	}
	// For each active peer, try to schedule some transaction fetches
	idle := len(f.requests) == 0

	f.forEachPeer(actives, func(peer string) {
		if f.requests[peer] != nil {
			return // continue in the for-each: peer already has a request in flight
		}
		if len(f.announces[peer]) == 0 {
			return // continue in the for-each: nothing announced by this peer
		}
		hashes := make([]common.Hash, 0, maxTxRetrievals)
		f.forEachHash(f.announces[peer], func(hash common.Hash) bool {
			if _, ok := f.fetching[hash]; !ok {
				// Mark the hash as fetching and stash away possible alternates
				f.fetching[hash] = peer

				if _, ok := f.alternates[hash]; ok {
					panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
				}
				f.alternates[hash] = f.announced[hash]
				delete(f.announced, hash)

				// Accumulate the hash and stop if the limit was reached
				hashes = append(hashes, hash)
				if len(hashes) >= maxTxRetrievals {
					return false // break in the for-each
				}
			}
			return true // continue in the for-each
		})
		// If any hashes were allocated, request them from the peer
		if len(hashes) > 0 {
			f.requests[peer] = &txRequest{hashes: hashes, time: f.clock.Now()}
			txRequestOutMeter.Mark(int64(len(hashes)))

			go func(peer string, hashes []common.Hash) {
				// Try to fetch the transactions, but in case of a request
				// failure (e.g. peer disconnected), reschedule the hashes.
				if err := f.fetchTxs(peer, hashes); err != nil {
					txRequestFailMeter.Mark(int64(len(hashes)))
					f.Drop(peer)
				}
			}(peer, hashes)
		}
	})
	// If a new request was fired, schedule a timeout timer
	if idle && len(f.requests) > 0 {
		f.rescheduleTimeout(timer, timeout)
	}
}
- // forEachPeer does a range loop over a map of peers in production, but during
- // testing it does a deterministic sorted random to allow reproducing issues.
- func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string)) {
- // If we're running production, use whatever Go's map gives us
- if f.rand == nil {
- for peer := range peers {
- do(peer)
- }
- return
- }
- // We're running the test suite, make iteration deterministic
- list := make([]string, 0, len(peers))
- for peer := range peers {
- list = append(list, peer)
- }
- sort.Strings(list)
- rotateStrings(list, f.rand.Intn(len(list)))
- for _, peer := range list {
- do(peer)
- }
- }
- // forEachHash does a range loop over a map of hashes in production, but during
- // testing it does a deterministic sorted random to allow reproducing issues.
- func (f *TxFetcher) forEachHash(hashes map[common.Hash]struct{}, do func(hash common.Hash) bool) {
- // If we're running production, use whatever Go's map gives us
- if f.rand == nil {
- for hash := range hashes {
- if !do(hash) {
- return
- }
- }
- return
- }
- // We're running the test suite, make iteration deterministic
- list := make([]common.Hash, 0, len(hashes))
- for hash := range hashes {
- list = append(list, hash)
- }
- sortHashes(list)
- rotateHashes(list, f.rand.Intn(len(list)))
- for _, hash := range list {
- if !do(hash) {
- return
- }
- }
- }
// rotateStrings rotates the contents of a slice left by n steps. This method is
// only used in tests to simulate random map iteration but keep it deterministic.
func rotateStrings(slice []string, n int) {
	if len(slice) == 0 {
		return
	}
	shift := n % len(slice)
	rotated := make([]string, len(slice))
	copy(rotated, slice[shift:])
	copy(rotated[len(slice)-shift:], slice[:shift])
	copy(slice, rotated)
}
- // sortHashes sorts a slice of hashes. This method is only used in tests in order
- // to simulate random map iteration but keep it deterministic.
- func sortHashes(slice []common.Hash) {
- for i := 0; i < len(slice); i++ {
- for j := i + 1; j < len(slice); j++ {
- if bytes.Compare(slice[i][:], slice[j][:]) > 0 {
- slice[i], slice[j] = slice[j], slice[i]
- }
- }
- }
- }
- // rotateHashes rotates the contents of a slice by n steps. This method is only
- // used in tests to simulate random map iteration but keep it deterministic.
- func rotateHashes(slice []common.Hash, n int) {
- orig := make([]common.Hash, len(slice))
- copy(orig, slice)
- for i := 0; i < len(orig); i++ {
- slice[i] = orig[(i+n)%len(orig)]
- }
- }
|