123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606 |
- // Copyright 2020 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package client
- import (
- "errors"
- "math/rand"
- "reflect"
- "sync"
- "sync/atomic"
- "time"
- "github.com/ethereum/go-ethereum/common/mclock"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/les/utils"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/p2p/enr"
- "github.com/ethereum/go-ethereum/p2p/nodestate"
- "github.com/ethereum/go-ethereum/rlp"
- )
- const (
- minTimeout = time.Millisecond * 500 // minimum request timeout suggested by the server pool
- timeoutRefresh = time.Second * 5 // recalculate timeout if older than this
- dialCost = 10000 // cost of a TCP dial (used for known node selection weight calculation)
- dialWaitStep = 1.5 // exponential multiplier of redial wait time when no value was provided by the server
- queryCost = 500 // cost of a UDP pre-negotiation query
- queryWaitStep = 1.02 // exponential multiplier of redial wait time when no value was provided by the server
- waitThreshold = time.Hour * 2000 // drop node if waiting time is over the threshold
- nodeWeightMul = 1000000 // multiplier constant for node weight calculation
- nodeWeightThreshold = 100 // minimum weight for keeping a node in the the known (valuable) set
- minRedialWait = 10 // minimum redial wait time in seconds
- preNegLimit = 5 // maximum number of simultaneous pre-negotiation queries
- warnQueryFails = 20 // number of consecutive UDP query failures before we print a warning
- maxQueryFails = 100 // number of consecutive UDP query failures when then chance of skipping a query reaches 50%
- )
- // ServerPool provides a node iterator for dial candidates. The output is a mix of newly discovered
- // nodes, a weighted random selection of known (previously valuable) nodes and trusted/paid nodes.
- type ServerPool struct {
- clock mclock.Clock
- unixTime func() int64
- db ethdb.KeyValueStore
- ns *nodestate.NodeStateMachine
- vt *ValueTracker
- mixer *enode.FairMix
- mixSources []enode.Iterator
- dialIterator enode.Iterator
- validSchemes enr.IdentityScheme
- trustedURLs []string
- fillSet *FillSet
- started, queryFails uint32
- timeoutLock sync.RWMutex
- timeout time.Duration
- timeWeights ResponseTimeWeights
- timeoutRefreshed mclock.AbsTime
- suggestedTimeoutGauge, totalValueGauge metrics.Gauge
- sessionValueMeter metrics.Meter
- }
- // nodeHistory keeps track of dial costs which determine node weight together with the
- // service value calculated by ValueTracker.
- type nodeHistory struct {
- dialCost utils.ExpiredValue
- redialWaitStart, redialWaitEnd int64 // unix time (seconds)
- }
- type nodeHistoryEnc struct {
- DialCost utils.ExpiredValue
- RedialWaitStart, RedialWaitEnd uint64
- }
- // queryFunc sends a pre-negotiation query and blocks until a response arrives or timeout occurs.
- // It returns 1 if the remote node has confirmed that connection is possible, 0 if not
- // possible and -1 if no response arrived (timeout).
- type QueryFunc func(*enode.Node) int
- var (
- clientSetup = &nodestate.Setup{Version: 2}
- sfHasValue = clientSetup.NewPersistentFlag("hasValue")
- sfQuery = clientSetup.NewFlag("query")
- sfCanDial = clientSetup.NewFlag("canDial")
- sfDialing = clientSetup.NewFlag("dialed")
- sfWaitDialTimeout = clientSetup.NewFlag("dialTimeout")
- sfConnected = clientSetup.NewFlag("connected")
- sfRedialWait = clientSetup.NewFlag("redialWait")
- sfAlwaysConnect = clientSetup.NewFlag("alwaysConnect")
- sfDialProcess = nodestate.MergeFlags(sfQuery, sfCanDial, sfDialing, sfConnected, sfRedialWait)
- sfiNodeHistory = clientSetup.NewPersistentField("nodeHistory", reflect.TypeOf(nodeHistory{}),
- func(field interface{}) ([]byte, error) {
- if n, ok := field.(nodeHistory); ok {
- ne := nodeHistoryEnc{
- DialCost: n.dialCost,
- RedialWaitStart: uint64(n.redialWaitStart),
- RedialWaitEnd: uint64(n.redialWaitEnd),
- }
- enc, err := rlp.EncodeToBytes(&ne)
- return enc, err
- }
- return nil, errors.New("invalid field type")
- },
- func(enc []byte) (interface{}, error) {
- var ne nodeHistoryEnc
- err := rlp.DecodeBytes(enc, &ne)
- n := nodeHistory{
- dialCost: ne.DialCost,
- redialWaitStart: int64(ne.RedialWaitStart),
- redialWaitEnd: int64(ne.RedialWaitEnd),
- }
- return n, err
- },
- )
- sfiNodeWeight = clientSetup.NewField("nodeWeight", reflect.TypeOf(uint64(0)))
- sfiConnectedStats = clientSetup.NewField("connectedStats", reflect.TypeOf(ResponseTimeStats{}))
- sfiLocalAddress = clientSetup.NewPersistentField("localAddress", reflect.TypeOf(&enr.Record{}),
- func(field interface{}) ([]byte, error) {
- if enr, ok := field.(*enr.Record); ok {
- enc, err := rlp.EncodeToBytes(enr)
- return enc, err
- }
- return nil, errors.New("invalid field type")
- },
- func(enc []byte) (interface{}, error) {
- var enr enr.Record
- if err := rlp.DecodeBytes(enc, &enr); err != nil {
- return nil, err
- }
- return &enr, nil
- },
- )
- )
- // NewServerPool creates a new server pool
- func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duration, query QueryFunc, clock mclock.Clock, trustedURLs []string, requestList []RequestInfo) (*ServerPool, enode.Iterator) {
- s := &ServerPool{
- db: db,
- clock: clock,
- unixTime: func() int64 { return time.Now().Unix() },
- validSchemes: enode.ValidSchemes,
- trustedURLs: trustedURLs,
- vt: NewValueTracker(db, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
- ns: nodestate.NewNodeStateMachine(db, []byte(string(dbKey)+"ns:"), clock, clientSetup),
- }
- s.recalTimeout()
- s.mixer = enode.NewFairMix(mixTimeout)
- knownSelector := NewWrsIterator(s.ns, sfHasValue, sfDialProcess, sfiNodeWeight)
- alwaysConnect := NewQueueIterator(s.ns, sfAlwaysConnect, sfDialProcess, true, nil)
- s.mixSources = append(s.mixSources, knownSelector)
- s.mixSources = append(s.mixSources, alwaysConnect)
- s.dialIterator = s.mixer
- if query != nil {
- s.dialIterator = s.addPreNegFilter(s.dialIterator, query)
- }
- s.ns.SubscribeState(nodestate.MergeFlags(sfWaitDialTimeout, sfConnected), func(n *enode.Node, oldState, newState nodestate.Flags) {
- if oldState.Equals(sfWaitDialTimeout) && newState.IsEmpty() {
- // dial timeout, no connection
- s.setRedialWait(n, dialCost, dialWaitStep)
- s.ns.SetStateSub(n, nodestate.Flags{}, sfDialing, 0)
- }
- })
- return s, &serverPoolIterator{
- dialIterator: s.dialIterator,
- nextFn: func(node *enode.Node) {
- s.ns.Operation(func() {
- s.ns.SetStateSub(node, sfDialing, sfCanDial, 0)
- s.ns.SetStateSub(node, sfWaitDialTimeout, nodestate.Flags{}, time.Second*10)
- })
- },
- nodeFn: s.DialNode,
- }
- }
- type serverPoolIterator struct {
- dialIterator enode.Iterator
- nextFn func(*enode.Node)
- nodeFn func(*enode.Node) *enode.Node
- }
- // Next implements enode.Iterator
- func (s *serverPoolIterator) Next() bool {
- if s.dialIterator.Next() {
- s.nextFn(s.dialIterator.Node())
- return true
- }
- return false
- }
- // Node implements enode.Iterator
- func (s *serverPoolIterator) Node() *enode.Node {
- return s.nodeFn(s.dialIterator.Node())
- }
- // Close implements enode.Iterator
- func (s *serverPoolIterator) Close() {
- s.dialIterator.Close()
- }
- // AddMetrics adds metrics to the server pool. Should be called before Start().
- func (s *ServerPool) AddMetrics(
- suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge metrics.Gauge,
- sessionValueMeter, serverDialedMeter metrics.Meter) {
- s.suggestedTimeoutGauge = suggestedTimeoutGauge
- s.totalValueGauge = totalValueGauge
- s.sessionValueMeter = sessionValueMeter
- if serverSelectableGauge != nil {
- s.ns.AddLogMetrics(sfHasValue, sfDialProcess, "selectable", nil, nil, serverSelectableGauge)
- }
- if serverDialedMeter != nil {
- s.ns.AddLogMetrics(sfDialing, nodestate.Flags{}, "dialed", serverDialedMeter, nil, nil)
- }
- if serverConnectedGauge != nil {
- s.ns.AddLogMetrics(sfConnected, nodestate.Flags{}, "connected", nil, nil, serverConnectedGauge)
- }
- }
- // AddSource adds a node discovery source to the server pool (should be called before start)
- func (s *ServerPool) AddSource(source enode.Iterator) {
- if source != nil {
- s.mixSources = append(s.mixSources, source)
- }
- }
- // addPreNegFilter installs a node filter mechanism that performs a pre-negotiation query.
- // Nodes that are filtered out and does not appear on the output iterator are put back
- // into redialWait state.
- func (s *ServerPool) addPreNegFilter(input enode.Iterator, query QueryFunc) enode.Iterator {
- s.fillSet = NewFillSet(s.ns, input, sfQuery)
- s.ns.SubscribeState(sfDialProcess, func(n *enode.Node, oldState, newState nodestate.Flags) {
- if !newState.Equals(sfQuery) {
- if newState.HasAll(sfQuery) {
- // remove query flag if the node is already somewhere in the dial process
- s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
- }
- return
- }
- fails := atomic.LoadUint32(&s.queryFails)
- failMax := fails
- if failMax > maxQueryFails {
- failMax = maxQueryFails
- }
- if rand.Intn(maxQueryFails*2) < int(failMax) {
- // skip pre-negotiation with increasing chance, max 50%
- // this ensures that the client can operate even if UDP is not working at all
- s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
- // set canDial before resetting queried so that FillSet will not read more
- // candidates unnecessarily
- s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
- return
- }
- go func() {
- q := query(n)
- if q == -1 {
- atomic.AddUint32(&s.queryFails, 1)
- fails++
- if fails%warnQueryFails == 0 {
- // warn if a large number of consecutive queries have failed
- log.Warn("UDP connection queries failed", "count", fails)
- }
- } else {
- atomic.StoreUint32(&s.queryFails, 0)
- }
- s.ns.Operation(func() {
- // we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait
- if q == 1 {
- s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
- } else {
- s.setRedialWait(n, queryCost, queryWaitStep)
- }
- s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
- })
- }()
- })
- return NewQueueIterator(s.ns, sfCanDial, nodestate.Flags{}, false, func(waiting bool) {
- if waiting {
- s.fillSet.SetTarget(preNegLimit)
- } else {
- s.fillSet.SetTarget(0)
- }
- })
- }
- // start starts the server pool. Note that NodeStateMachine should be started first.
- func (s *ServerPool) Start() {
- s.ns.Start()
- for _, iter := range s.mixSources {
- // add sources to mixer at startup because the mixer instantly tries to read them
- // which should only happen after NodeStateMachine has been started
- s.mixer.AddSource(iter)
- }
- for _, url := range s.trustedURLs {
- if node, err := enode.Parse(s.validSchemes, url); err == nil {
- s.ns.SetState(node, sfAlwaysConnect, nodestate.Flags{}, 0)
- } else {
- log.Error("Invalid trusted server URL", "url", url, "error", err)
- }
- }
- unixTime := s.unixTime()
- s.ns.Operation(func() {
- s.ns.ForEach(sfHasValue, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
- s.calculateWeight(node)
- if n, ok := s.ns.GetField(node, sfiNodeHistory).(nodeHistory); ok && n.redialWaitEnd > unixTime {
- wait := n.redialWaitEnd - unixTime
- lastWait := n.redialWaitEnd - n.redialWaitStart
- if wait > lastWait {
- // if the time until expiration is larger than the last suggested
- // waiting time then the system clock was probably adjusted
- wait = lastWait
- }
- s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, time.Duration(wait)*time.Second)
- }
- })
- })
- atomic.StoreUint32(&s.started, 1)
- }
- // stop stops the server pool
- func (s *ServerPool) Stop() {
- if s.fillSet != nil {
- s.fillSet.Close()
- }
- s.ns.Operation(func() {
- s.ns.ForEach(sfConnected, nodestate.Flags{}, func(n *enode.Node, state nodestate.Flags) {
- // recalculate weight of connected nodes in order to update hasValue flag if necessary
- s.calculateWeight(n)
- })
- })
- s.ns.Stop()
- s.vt.Stop()
- }
- // RegisterNode implements serverPeerSubscriber
- func (s *ServerPool) RegisterNode(node *enode.Node) (*NodeValueTracker, error) {
- if atomic.LoadUint32(&s.started) == 0 {
- return nil, errors.New("server pool not started yet")
- }
- nvt := s.vt.Register(node.ID())
- s.ns.Operation(func() {
- s.ns.SetStateSub(node, sfConnected, sfDialing.Or(sfWaitDialTimeout), 0)
- s.ns.SetFieldSub(node, sfiConnectedStats, nvt.RtStats())
- if node.IP().IsLoopback() {
- s.ns.SetFieldSub(node, sfiLocalAddress, node.Record())
- }
- })
- return nvt, nil
- }
- // UnregisterNode implements serverPeerSubscriber
- func (s *ServerPool) UnregisterNode(node *enode.Node) {
- s.ns.Operation(func() {
- s.setRedialWait(node, dialCost, dialWaitStep)
- s.ns.SetStateSub(node, nodestate.Flags{}, sfConnected, 0)
- s.ns.SetFieldSub(node, sfiConnectedStats, nil)
- })
- s.vt.Unregister(node.ID())
- }
- // recalTimeout calculates the current recommended timeout. This value is used by
- // the client as a "soft timeout" value. It also affects the service value calculation
- // of individual nodes.
- func (s *ServerPool) recalTimeout() {
- // Use cached result if possible, avoid recalculating too frequently.
- s.timeoutLock.RLock()
- refreshed := s.timeoutRefreshed
- s.timeoutLock.RUnlock()
- now := s.clock.Now()
- if refreshed != 0 && time.Duration(now-refreshed) < timeoutRefresh {
- return
- }
- // Cached result is stale, recalculate a new one.
- rts := s.vt.RtStats()
- // Add a fake statistic here. It is an easy way to initialize with some
- // conservative values when the database is new. As soon as we have a
- // considerable amount of real stats this small value won't matter.
- rts.Add(time.Second*2, 10, s.vt.StatsExpFactor())
- // Use either 10% failure rate timeout or twice the median response time
- // as the recommended timeout.
- timeout := minTimeout
- if t := rts.Timeout(0.1); t > timeout {
- timeout = t
- }
- if t := rts.Timeout(0.5) * 2; t > timeout {
- timeout = t
- }
- s.timeoutLock.Lock()
- if s.timeout != timeout {
- s.timeout = timeout
- s.timeWeights = TimeoutWeights(s.timeout)
- if s.suggestedTimeoutGauge != nil {
- s.suggestedTimeoutGauge.Update(int64(s.timeout / time.Millisecond))
- }
- if s.totalValueGauge != nil {
- s.totalValueGauge.Update(int64(rts.Value(s.timeWeights, s.vt.StatsExpFactor())))
- }
- }
- s.timeoutRefreshed = now
- s.timeoutLock.Unlock()
- }
- // GetTimeout returns the recommended request timeout.
- func (s *ServerPool) GetTimeout() time.Duration {
- s.recalTimeout()
- s.timeoutLock.RLock()
- defer s.timeoutLock.RUnlock()
- return s.timeout
- }
- // getTimeoutAndWeight returns the recommended request timeout as well as the
- // response time weight which is necessary to calculate service value.
- func (s *ServerPool) getTimeoutAndWeight() (time.Duration, ResponseTimeWeights) {
- s.recalTimeout()
- s.timeoutLock.RLock()
- defer s.timeoutLock.RUnlock()
- return s.timeout, s.timeWeights
- }
- // addDialCost adds the given amount of dial cost to the node history and returns the current
- // amount of total dial cost
- func (s *ServerPool) addDialCost(n *nodeHistory, amount int64) uint64 {
- logOffset := s.vt.StatsExpirer().LogOffset(s.clock.Now())
- if amount > 0 {
- n.dialCost.Add(amount, logOffset)
- }
- totalDialCost := n.dialCost.Value(logOffset)
- if totalDialCost < dialCost {
- totalDialCost = dialCost
- }
- return totalDialCost
- }
- // serviceValue returns the service value accumulated in this session and in total
- func (s *ServerPool) serviceValue(node *enode.Node) (sessionValue, totalValue float64) {
- nvt := s.vt.GetNode(node.ID())
- if nvt == nil {
- return 0, 0
- }
- currentStats := nvt.RtStats()
- _, timeWeights := s.getTimeoutAndWeight()
- expFactor := s.vt.StatsExpFactor()
- totalValue = currentStats.Value(timeWeights, expFactor)
- if connStats, ok := s.ns.GetField(node, sfiConnectedStats).(ResponseTimeStats); ok {
- diff := currentStats
- diff.SubStats(&connStats)
- sessionValue = diff.Value(timeWeights, expFactor)
- if s.sessionValueMeter != nil {
- s.sessionValueMeter.Mark(int64(sessionValue))
- }
- }
- return
- }
- // updateWeight calculates the node weight and updates the nodeWeight field and the
- // hasValue flag. It also saves the node state if necessary.
- // Note: this function should run inside a NodeStateMachine operation
- func (s *ServerPool) updateWeight(node *enode.Node, totalValue float64, totalDialCost uint64) {
- weight := uint64(totalValue * nodeWeightMul / float64(totalDialCost))
- if weight >= nodeWeightThreshold {
- s.ns.SetStateSub(node, sfHasValue, nodestate.Flags{}, 0)
- s.ns.SetFieldSub(node, sfiNodeWeight, weight)
- } else {
- s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
- s.ns.SetFieldSub(node, sfiNodeWeight, nil)
- s.ns.SetFieldSub(node, sfiNodeHistory, nil)
- s.ns.SetFieldSub(node, sfiLocalAddress, nil)
- }
- s.ns.Persist(node) // saved if node history or hasValue changed
- }
- // setRedialWait calculates and sets the redialWait timeout based on the service value
- // and dial cost accumulated during the last session/attempt and in total.
- // The waiting time is raised exponentially if no service value has been received in order
- // to prevent dialing an unresponsive node frequently for a very long time just because it
- // was useful in the past. It can still be occasionally dialed though and once it provides
- // a significant amount of service value again its waiting time is quickly reduced or reset
- // to the minimum.
- // Note: node weight is also recalculated and updated by this function.
- // Note 2: this function should run inside a NodeStateMachine operation
- func (s *ServerPool) setRedialWait(node *enode.Node, addDialCost int64, waitStep float64) {
- n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
- sessionValue, totalValue := s.serviceValue(node)
- totalDialCost := s.addDialCost(&n, addDialCost)
- // if the current dial session has yielded at least the average value/dial cost ratio
- // then the waiting time should be reset to the minimum. If the session value
- // is below average but still positive then timeout is limited to the ratio of
- // average / current service value multiplied by the minimum timeout. If the attempt
- // was unsuccessful then timeout is raised exponentially without limitation.
- // Note: dialCost is used in the formula below even if dial was not attempted at all
- // because the pre-negotiation query did not return a positive result. In this case
- // the ratio has no meaning anyway and waitFactor is always raised, though in smaller
- // steps because queries are cheaper and therefore we can allow more failed attempts.
- unixTime := s.unixTime()
- plannedTimeout := float64(n.redialWaitEnd - n.redialWaitStart) // last planned redialWait timeout
- var actualWait float64 // actual waiting time elapsed
- if unixTime > n.redialWaitEnd {
- // the planned timeout has elapsed
- actualWait = plannedTimeout
- } else {
- // if the node was redialed earlier then we do not raise the planned timeout
- // exponentially because that could lead to the timeout rising very high in
- // a short amount of time
- // Note that in case of an early redial actualWait also includes the dial
- // timeout or connection time of the last attempt but it still serves its
- // purpose of preventing the timeout rising quicker than linearly as a function
- // of total time elapsed without a successful connection.
- actualWait = float64(unixTime - n.redialWaitStart)
- }
- // raise timeout exponentially if the last planned timeout has elapsed
- // (use at least the last planned timeout otherwise)
- nextTimeout := actualWait * waitStep
- if plannedTimeout > nextTimeout {
- nextTimeout = plannedTimeout
- }
- // we reduce the waiting time if the server has provided service value during the
- // connection (but never under the minimum)
- a := totalValue * dialCost * float64(minRedialWait)
- b := float64(totalDialCost) * sessionValue
- if a < b*nextTimeout {
- nextTimeout = a / b
- }
- if nextTimeout < minRedialWait {
- nextTimeout = minRedialWait
- }
- wait := time.Duration(float64(time.Second) * nextTimeout)
- if wait < waitThreshold {
- n.redialWaitStart = unixTime
- n.redialWaitEnd = unixTime + int64(nextTimeout)
- s.ns.SetFieldSub(node, sfiNodeHistory, n)
- s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, wait)
- s.updateWeight(node, totalValue, totalDialCost)
- } else {
- // discard known node statistics if waiting time is very long because the node
- // hasn't been responsive for a very long time
- s.ns.SetFieldSub(node, sfiNodeHistory, nil)
- s.ns.SetFieldSub(node, sfiNodeWeight, nil)
- s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
- }
- }
- // calculateWeight calculates and sets the node weight without altering the node history.
- // This function should be called during startup and shutdown only, otherwise setRedialWait
- // will keep the weights updated as the underlying statistics are adjusted.
- // Note: this function should run inside a NodeStateMachine operation
- func (s *ServerPool) calculateWeight(node *enode.Node) {
- n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
- _, totalValue := s.serviceValue(node)
- totalDialCost := s.addDialCost(&n, 0)
- s.updateWeight(node, totalValue, totalDialCost)
- }
- // API returns the vflux client API
- func (s *ServerPool) API() *PrivateClientAPI {
- return NewPrivateClientAPI(s.vt)
- }
- type dummyIdentity enode.ID
- func (id dummyIdentity) Verify(r *enr.Record, sig []byte) error { return nil }
- func (id dummyIdentity) NodeAddr(r *enr.Record) []byte { return id[:] }
- // DialNode replaces the given enode with a locally generated one containing the ENR
- // stored in the sfiLocalAddress field if present. This workaround ensures that nodes
- // on the local network can be dialed at the local address if a connection has been
- // successfully established previously.
- // Note that NodeStateMachine always remembers the enode with the latest version of
- // the remote signed ENR. ENR filtering should be performed on that version while
- // dialNode should be used for dialing the node over TCP or UDP.
- func (s *ServerPool) DialNode(n *enode.Node) *enode.Node {
- if enr, ok := s.ns.GetField(n, sfiLocalAddress).(*enr.Record); ok {
- n, _ := enode.New(dummyIdentity(n.ID()), enr)
- return n
- }
- return n
- }
- // Persist immediately stores the state of a node in the node database
- func (s *ServerPool) Persist(n *enode.Node) {
- s.ns.Persist(n)
- }
|