serverpool.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package client
  17. import (
  18. "errors"
  19. "math/rand"
  20. "reflect"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common/mclock"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/les/utils"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/metrics"
  29. "github.com/ethereum/go-ethereum/p2p/enode"
  30. "github.com/ethereum/go-ethereum/p2p/enr"
  31. "github.com/ethereum/go-ethereum/p2p/nodestate"
  32. "github.com/ethereum/go-ethereum/rlp"
  33. )
// Tuning parameters of the server pool: request timeout bounds, dial/query cost
// accounting, redial wait behaviour and pre-negotiation query limits.
const (
	minTimeout          = time.Millisecond * 500 // minimum request timeout suggested by the server pool
	timeoutRefresh      = time.Second * 5        // recalculate timeout if older than this
	dialCost            = 10000                  // cost of a TCP dial (used for known node selection weight calculation)
	dialWaitStep        = 1.5                    // exponential multiplier of redial wait time when no value was provided by the server
	queryCost           = 500                    // cost of a UDP pre-negotiation query
	queryWaitStep       = 1.02                   // exponential multiplier of redial wait time when no value was provided by the server
	waitThreshold       = time.Hour * 2000       // drop node if waiting time is over the threshold
	nodeWeightMul       = 1000000                // multiplier constant for node weight calculation
	nodeWeightThreshold = 100                    // minimum weight for keeping a node in the known (valuable) set
	minRedialWait       = 10                     // minimum redial wait time in seconds
	preNegLimit         = 5                      // maximum number of simultaneous pre-negotiation queries
	warnQueryFails      = 20                     // number of consecutive UDP query failures before we print a warning
	maxQueryFails       = 100                    // number of consecutive UDP query failures when the chance of skipping a query reaches 50%
)
  49. // ServerPool provides a node iterator for dial candidates. The output is a mix of newly discovered
  50. // nodes, a weighted random selection of known (previously valuable) nodes and trusted/paid nodes.
  51. type ServerPool struct {
  52. clock mclock.Clock
  53. unixTime func() int64
  54. db ethdb.KeyValueStore
  55. ns *nodestate.NodeStateMachine
  56. vt *ValueTracker
  57. mixer *enode.FairMix
  58. mixSources []enode.Iterator
  59. dialIterator enode.Iterator
  60. validSchemes enr.IdentityScheme
  61. trustedURLs []string
  62. fillSet *FillSet
  63. started, queryFails uint32
  64. timeoutLock sync.RWMutex
  65. timeout time.Duration
  66. timeWeights ResponseTimeWeights
  67. timeoutRefreshed mclock.AbsTime
  68. suggestedTimeoutGauge, totalValueGauge metrics.Gauge
  69. sessionValueMeter metrics.Meter
  70. }
  71. // nodeHistory keeps track of dial costs which determine node weight together with the
  72. // service value calculated by ValueTracker.
  73. type nodeHistory struct {
  74. dialCost utils.ExpiredValue
  75. redialWaitStart, redialWaitEnd int64 // unix time (seconds)
  76. }
  77. type nodeHistoryEnc struct {
  78. DialCost utils.ExpiredValue
  79. RedialWaitStart, RedialWaitEnd uint64
  80. }
  81. // queryFunc sends a pre-negotiation query and blocks until a response arrives or timeout occurs.
  82. // It returns 1 if the remote node has confirmed that connection is possible, 0 if not
  83. // possible and -1 if no response arrived (timeout).
  84. type QueryFunc func(*enode.Node) int
  85. var (
  86. clientSetup = &nodestate.Setup{Version: 2}
  87. sfHasValue = clientSetup.NewPersistentFlag("hasValue")
  88. sfQuery = clientSetup.NewFlag("query")
  89. sfCanDial = clientSetup.NewFlag("canDial")
  90. sfDialing = clientSetup.NewFlag("dialed")
  91. sfWaitDialTimeout = clientSetup.NewFlag("dialTimeout")
  92. sfConnected = clientSetup.NewFlag("connected")
  93. sfRedialWait = clientSetup.NewFlag("redialWait")
  94. sfAlwaysConnect = clientSetup.NewFlag("alwaysConnect")
  95. sfDialProcess = nodestate.MergeFlags(sfQuery, sfCanDial, sfDialing, sfConnected, sfRedialWait)
  96. sfiNodeHistory = clientSetup.NewPersistentField("nodeHistory", reflect.TypeOf(nodeHistory{}),
  97. func(field interface{}) ([]byte, error) {
  98. if n, ok := field.(nodeHistory); ok {
  99. ne := nodeHistoryEnc{
  100. DialCost: n.dialCost,
  101. RedialWaitStart: uint64(n.redialWaitStart),
  102. RedialWaitEnd: uint64(n.redialWaitEnd),
  103. }
  104. enc, err := rlp.EncodeToBytes(&ne)
  105. return enc, err
  106. }
  107. return nil, errors.New("invalid field type")
  108. },
  109. func(enc []byte) (interface{}, error) {
  110. var ne nodeHistoryEnc
  111. err := rlp.DecodeBytes(enc, &ne)
  112. n := nodeHistory{
  113. dialCost: ne.DialCost,
  114. redialWaitStart: int64(ne.RedialWaitStart),
  115. redialWaitEnd: int64(ne.RedialWaitEnd),
  116. }
  117. return n, err
  118. },
  119. )
  120. sfiNodeWeight = clientSetup.NewField("nodeWeight", reflect.TypeOf(uint64(0)))
  121. sfiConnectedStats = clientSetup.NewField("connectedStats", reflect.TypeOf(ResponseTimeStats{}))
  122. sfiLocalAddress = clientSetup.NewPersistentField("localAddress", reflect.TypeOf(&enr.Record{}),
  123. func(field interface{}) ([]byte, error) {
  124. if enr, ok := field.(*enr.Record); ok {
  125. enc, err := rlp.EncodeToBytes(enr)
  126. return enc, err
  127. }
  128. return nil, errors.New("invalid field type")
  129. },
  130. func(enc []byte) (interface{}, error) {
  131. var enr enr.Record
  132. if err := rlp.DecodeBytes(enc, &enr); err != nil {
  133. return nil, err
  134. }
  135. return &enr, nil
  136. },
  137. )
  138. )
  139. // NewServerPool creates a new server pool
  140. func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duration, query QueryFunc, clock mclock.Clock, trustedURLs []string, requestList []RequestInfo) (*ServerPool, enode.Iterator) {
  141. s := &ServerPool{
  142. db: db,
  143. clock: clock,
  144. unixTime: func() int64 { return time.Now().Unix() },
  145. validSchemes: enode.ValidSchemes,
  146. trustedURLs: trustedURLs,
  147. vt: NewValueTracker(db, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
  148. ns: nodestate.NewNodeStateMachine(db, []byte(string(dbKey)+"ns:"), clock, clientSetup),
  149. }
  150. s.recalTimeout()
  151. s.mixer = enode.NewFairMix(mixTimeout)
  152. knownSelector := NewWrsIterator(s.ns, sfHasValue, sfDialProcess, sfiNodeWeight)
  153. alwaysConnect := NewQueueIterator(s.ns, sfAlwaysConnect, sfDialProcess, true, nil)
  154. s.mixSources = append(s.mixSources, knownSelector)
  155. s.mixSources = append(s.mixSources, alwaysConnect)
  156. s.dialIterator = s.mixer
  157. if query != nil {
  158. s.dialIterator = s.addPreNegFilter(s.dialIterator, query)
  159. }
  160. s.ns.SubscribeState(nodestate.MergeFlags(sfWaitDialTimeout, sfConnected), func(n *enode.Node, oldState, newState nodestate.Flags) {
  161. if oldState.Equals(sfWaitDialTimeout) && newState.IsEmpty() {
  162. // dial timeout, no connection
  163. s.setRedialWait(n, dialCost, dialWaitStep)
  164. s.ns.SetStateSub(n, nodestate.Flags{}, sfDialing, 0)
  165. }
  166. })
  167. return s, &serverPoolIterator{
  168. dialIterator: s.dialIterator,
  169. nextFn: func(node *enode.Node) {
  170. s.ns.Operation(func() {
  171. s.ns.SetStateSub(node, sfDialing, sfCanDial, 0)
  172. s.ns.SetStateSub(node, sfWaitDialTimeout, nodestate.Flags{}, time.Second*10)
  173. })
  174. },
  175. nodeFn: s.DialNode,
  176. }
  177. }
  178. type serverPoolIterator struct {
  179. dialIterator enode.Iterator
  180. nextFn func(*enode.Node)
  181. nodeFn func(*enode.Node) *enode.Node
  182. }
  183. // Next implements enode.Iterator
  184. func (s *serverPoolIterator) Next() bool {
  185. if s.dialIterator.Next() {
  186. s.nextFn(s.dialIterator.Node())
  187. return true
  188. }
  189. return false
  190. }
  191. // Node implements enode.Iterator
  192. func (s *serverPoolIterator) Node() *enode.Node {
  193. return s.nodeFn(s.dialIterator.Node())
  194. }
  195. // Close implements enode.Iterator
  196. func (s *serverPoolIterator) Close() {
  197. s.dialIterator.Close()
  198. }
  199. // AddMetrics adds metrics to the server pool. Should be called before Start().
  200. func (s *ServerPool) AddMetrics(
  201. suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge metrics.Gauge,
  202. sessionValueMeter, serverDialedMeter metrics.Meter) {
  203. s.suggestedTimeoutGauge = suggestedTimeoutGauge
  204. s.totalValueGauge = totalValueGauge
  205. s.sessionValueMeter = sessionValueMeter
  206. if serverSelectableGauge != nil {
  207. s.ns.AddLogMetrics(sfHasValue, sfDialProcess, "selectable", nil, nil, serverSelectableGauge)
  208. }
  209. if serverDialedMeter != nil {
  210. s.ns.AddLogMetrics(sfDialing, nodestate.Flags{}, "dialed", serverDialedMeter, nil, nil)
  211. }
  212. if serverConnectedGauge != nil {
  213. s.ns.AddLogMetrics(sfConnected, nodestate.Flags{}, "connected", nil, nil, serverConnectedGauge)
  214. }
  215. }
  216. // AddSource adds a node discovery source to the server pool (should be called before start)
  217. func (s *ServerPool) AddSource(source enode.Iterator) {
  218. if source != nil {
  219. s.mixSources = append(s.mixSources, source)
  220. }
  221. }
  222. // addPreNegFilter installs a node filter mechanism that performs a pre-negotiation query.
  223. // Nodes that are filtered out and does not appear on the output iterator are put back
  224. // into redialWait state.
  225. func (s *ServerPool) addPreNegFilter(input enode.Iterator, query QueryFunc) enode.Iterator {
  226. s.fillSet = NewFillSet(s.ns, input, sfQuery)
  227. s.ns.SubscribeState(sfDialProcess, func(n *enode.Node, oldState, newState nodestate.Flags) {
  228. if !newState.Equals(sfQuery) {
  229. if newState.HasAll(sfQuery) {
  230. // remove query flag if the node is already somewhere in the dial process
  231. s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
  232. }
  233. return
  234. }
  235. fails := atomic.LoadUint32(&s.queryFails)
  236. failMax := fails
  237. if failMax > maxQueryFails {
  238. failMax = maxQueryFails
  239. }
  240. if rand.Intn(maxQueryFails*2) < int(failMax) {
  241. // skip pre-negotiation with increasing chance, max 50%
  242. // this ensures that the client can operate even if UDP is not working at all
  243. s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
  244. // set canDial before resetting queried so that FillSet will not read more
  245. // candidates unnecessarily
  246. s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
  247. return
  248. }
  249. go func() {
  250. q := query(n)
  251. if q == -1 {
  252. atomic.AddUint32(&s.queryFails, 1)
  253. fails++
  254. if fails%warnQueryFails == 0 {
  255. // warn if a large number of consecutive queries have failed
  256. log.Warn("UDP connection queries failed", "count", fails)
  257. }
  258. } else {
  259. atomic.StoreUint32(&s.queryFails, 0)
  260. }
  261. s.ns.Operation(func() {
  262. // we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait
  263. if q == 1 {
  264. s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
  265. } else {
  266. s.setRedialWait(n, queryCost, queryWaitStep)
  267. }
  268. s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
  269. })
  270. }()
  271. })
  272. return NewQueueIterator(s.ns, sfCanDial, nodestate.Flags{}, false, func(waiting bool) {
  273. if waiting {
  274. s.fillSet.SetTarget(preNegLimit)
  275. } else {
  276. s.fillSet.SetTarget(0)
  277. }
  278. })
  279. }
  280. // start starts the server pool. Note that NodeStateMachine should be started first.
  281. func (s *ServerPool) Start() {
  282. s.ns.Start()
  283. for _, iter := range s.mixSources {
  284. // add sources to mixer at startup because the mixer instantly tries to read them
  285. // which should only happen after NodeStateMachine has been started
  286. s.mixer.AddSource(iter)
  287. }
  288. for _, url := range s.trustedURLs {
  289. if node, err := enode.Parse(s.validSchemes, url); err == nil {
  290. s.ns.SetState(node, sfAlwaysConnect, nodestate.Flags{}, 0)
  291. } else {
  292. log.Error("Invalid trusted server URL", "url", url, "error", err)
  293. }
  294. }
  295. unixTime := s.unixTime()
  296. s.ns.Operation(func() {
  297. s.ns.ForEach(sfHasValue, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
  298. s.calculateWeight(node)
  299. if n, ok := s.ns.GetField(node, sfiNodeHistory).(nodeHistory); ok && n.redialWaitEnd > unixTime {
  300. wait := n.redialWaitEnd - unixTime
  301. lastWait := n.redialWaitEnd - n.redialWaitStart
  302. if wait > lastWait {
  303. // if the time until expiration is larger than the last suggested
  304. // waiting time then the system clock was probably adjusted
  305. wait = lastWait
  306. }
  307. s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, time.Duration(wait)*time.Second)
  308. }
  309. })
  310. })
  311. atomic.StoreUint32(&s.started, 1)
  312. }
  313. // stop stops the server pool
  314. func (s *ServerPool) Stop() {
  315. if s.fillSet != nil {
  316. s.fillSet.Close()
  317. }
  318. s.ns.Operation(func() {
  319. s.ns.ForEach(sfConnected, nodestate.Flags{}, func(n *enode.Node, state nodestate.Flags) {
  320. // recalculate weight of connected nodes in order to update hasValue flag if necessary
  321. s.calculateWeight(n)
  322. })
  323. })
  324. s.ns.Stop()
  325. s.vt.Stop()
  326. }
  327. // RegisterNode implements serverPeerSubscriber
  328. func (s *ServerPool) RegisterNode(node *enode.Node) (*NodeValueTracker, error) {
  329. if atomic.LoadUint32(&s.started) == 0 {
  330. return nil, errors.New("server pool not started yet")
  331. }
  332. nvt := s.vt.Register(node.ID())
  333. s.ns.Operation(func() {
  334. s.ns.SetStateSub(node, sfConnected, sfDialing.Or(sfWaitDialTimeout), 0)
  335. s.ns.SetFieldSub(node, sfiConnectedStats, nvt.RtStats())
  336. if node.IP().IsLoopback() {
  337. s.ns.SetFieldSub(node, sfiLocalAddress, node.Record())
  338. }
  339. })
  340. return nvt, nil
  341. }
  342. // UnregisterNode implements serverPeerSubscriber
  343. func (s *ServerPool) UnregisterNode(node *enode.Node) {
  344. s.ns.Operation(func() {
  345. s.setRedialWait(node, dialCost, dialWaitStep)
  346. s.ns.SetStateSub(node, nodestate.Flags{}, sfConnected, 0)
  347. s.ns.SetFieldSub(node, sfiConnectedStats, nil)
  348. })
  349. s.vt.Unregister(node.ID())
  350. }
  351. // recalTimeout calculates the current recommended timeout. This value is used by
  352. // the client as a "soft timeout" value. It also affects the service value calculation
  353. // of individual nodes.
  354. func (s *ServerPool) recalTimeout() {
  355. // Use cached result if possible, avoid recalculating too frequently.
  356. s.timeoutLock.RLock()
  357. refreshed := s.timeoutRefreshed
  358. s.timeoutLock.RUnlock()
  359. now := s.clock.Now()
  360. if refreshed != 0 && time.Duration(now-refreshed) < timeoutRefresh {
  361. return
  362. }
  363. // Cached result is stale, recalculate a new one.
  364. rts := s.vt.RtStats()
  365. // Add a fake statistic here. It is an easy way to initialize with some
  366. // conservative values when the database is new. As soon as we have a
  367. // considerable amount of real stats this small value won't matter.
  368. rts.Add(time.Second*2, 10, s.vt.StatsExpFactor())
  369. // Use either 10% failure rate timeout or twice the median response time
  370. // as the recommended timeout.
  371. timeout := minTimeout
  372. if t := rts.Timeout(0.1); t > timeout {
  373. timeout = t
  374. }
  375. if t := rts.Timeout(0.5) * 2; t > timeout {
  376. timeout = t
  377. }
  378. s.timeoutLock.Lock()
  379. if s.timeout != timeout {
  380. s.timeout = timeout
  381. s.timeWeights = TimeoutWeights(s.timeout)
  382. if s.suggestedTimeoutGauge != nil {
  383. s.suggestedTimeoutGauge.Update(int64(s.timeout / time.Millisecond))
  384. }
  385. if s.totalValueGauge != nil {
  386. s.totalValueGauge.Update(int64(rts.Value(s.timeWeights, s.vt.StatsExpFactor())))
  387. }
  388. }
  389. s.timeoutRefreshed = now
  390. s.timeoutLock.Unlock()
  391. }
  392. // GetTimeout returns the recommended request timeout.
  393. func (s *ServerPool) GetTimeout() time.Duration {
  394. s.recalTimeout()
  395. s.timeoutLock.RLock()
  396. defer s.timeoutLock.RUnlock()
  397. return s.timeout
  398. }
  399. // getTimeoutAndWeight returns the recommended request timeout as well as the
  400. // response time weight which is necessary to calculate service value.
  401. func (s *ServerPool) getTimeoutAndWeight() (time.Duration, ResponseTimeWeights) {
  402. s.recalTimeout()
  403. s.timeoutLock.RLock()
  404. defer s.timeoutLock.RUnlock()
  405. return s.timeout, s.timeWeights
  406. }
  407. // addDialCost adds the given amount of dial cost to the node history and returns the current
  408. // amount of total dial cost
  409. func (s *ServerPool) addDialCost(n *nodeHistory, amount int64) uint64 {
  410. logOffset := s.vt.StatsExpirer().LogOffset(s.clock.Now())
  411. if amount > 0 {
  412. n.dialCost.Add(amount, logOffset)
  413. }
  414. totalDialCost := n.dialCost.Value(logOffset)
  415. if totalDialCost < dialCost {
  416. totalDialCost = dialCost
  417. }
  418. return totalDialCost
  419. }
  420. // serviceValue returns the service value accumulated in this session and in total
  421. func (s *ServerPool) serviceValue(node *enode.Node) (sessionValue, totalValue float64) {
  422. nvt := s.vt.GetNode(node.ID())
  423. if nvt == nil {
  424. return 0, 0
  425. }
  426. currentStats := nvt.RtStats()
  427. _, timeWeights := s.getTimeoutAndWeight()
  428. expFactor := s.vt.StatsExpFactor()
  429. totalValue = currentStats.Value(timeWeights, expFactor)
  430. if connStats, ok := s.ns.GetField(node, sfiConnectedStats).(ResponseTimeStats); ok {
  431. diff := currentStats
  432. diff.SubStats(&connStats)
  433. sessionValue = diff.Value(timeWeights, expFactor)
  434. if s.sessionValueMeter != nil {
  435. s.sessionValueMeter.Mark(int64(sessionValue))
  436. }
  437. }
  438. return
  439. }
  440. // updateWeight calculates the node weight and updates the nodeWeight field and the
  441. // hasValue flag. It also saves the node state if necessary.
  442. // Note: this function should run inside a NodeStateMachine operation
  443. func (s *ServerPool) updateWeight(node *enode.Node, totalValue float64, totalDialCost uint64) {
  444. weight := uint64(totalValue * nodeWeightMul / float64(totalDialCost))
  445. if weight >= nodeWeightThreshold {
  446. s.ns.SetStateSub(node, sfHasValue, nodestate.Flags{}, 0)
  447. s.ns.SetFieldSub(node, sfiNodeWeight, weight)
  448. } else {
  449. s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
  450. s.ns.SetFieldSub(node, sfiNodeWeight, nil)
  451. s.ns.SetFieldSub(node, sfiNodeHistory, nil)
  452. s.ns.SetFieldSub(node, sfiLocalAddress, nil)
  453. }
  454. s.ns.Persist(node) // saved if node history or hasValue changed
  455. }
  456. // setRedialWait calculates and sets the redialWait timeout based on the service value
  457. // and dial cost accumulated during the last session/attempt and in total.
  458. // The waiting time is raised exponentially if no service value has been received in order
  459. // to prevent dialing an unresponsive node frequently for a very long time just because it
  460. // was useful in the past. It can still be occasionally dialed though and once it provides
  461. // a significant amount of service value again its waiting time is quickly reduced or reset
  462. // to the minimum.
  463. // Note: node weight is also recalculated and updated by this function.
  464. // Note 2: this function should run inside a NodeStateMachine operation
  465. func (s *ServerPool) setRedialWait(node *enode.Node, addDialCost int64, waitStep float64) {
  466. n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
  467. sessionValue, totalValue := s.serviceValue(node)
  468. totalDialCost := s.addDialCost(&n, addDialCost)
  469. // if the current dial session has yielded at least the average value/dial cost ratio
  470. // then the waiting time should be reset to the minimum. If the session value
  471. // is below average but still positive then timeout is limited to the ratio of
  472. // average / current service value multiplied by the minimum timeout. If the attempt
  473. // was unsuccessful then timeout is raised exponentially without limitation.
  474. // Note: dialCost is used in the formula below even if dial was not attempted at all
  475. // because the pre-negotiation query did not return a positive result. In this case
  476. // the ratio has no meaning anyway and waitFactor is always raised, though in smaller
  477. // steps because queries are cheaper and therefore we can allow more failed attempts.
  478. unixTime := s.unixTime()
  479. plannedTimeout := float64(n.redialWaitEnd - n.redialWaitStart) // last planned redialWait timeout
  480. var actualWait float64 // actual waiting time elapsed
  481. if unixTime > n.redialWaitEnd {
  482. // the planned timeout has elapsed
  483. actualWait = plannedTimeout
  484. } else {
  485. // if the node was redialed earlier then we do not raise the planned timeout
  486. // exponentially because that could lead to the timeout rising very high in
  487. // a short amount of time
  488. // Note that in case of an early redial actualWait also includes the dial
  489. // timeout or connection time of the last attempt but it still serves its
  490. // purpose of preventing the timeout rising quicker than linearly as a function
  491. // of total time elapsed without a successful connection.
  492. actualWait = float64(unixTime - n.redialWaitStart)
  493. }
  494. // raise timeout exponentially if the last planned timeout has elapsed
  495. // (use at least the last planned timeout otherwise)
  496. nextTimeout := actualWait * waitStep
  497. if plannedTimeout > nextTimeout {
  498. nextTimeout = plannedTimeout
  499. }
  500. // we reduce the waiting time if the server has provided service value during the
  501. // connection (but never under the minimum)
  502. a := totalValue * dialCost * float64(minRedialWait)
  503. b := float64(totalDialCost) * sessionValue
  504. if a < b*nextTimeout {
  505. nextTimeout = a / b
  506. }
  507. if nextTimeout < minRedialWait {
  508. nextTimeout = minRedialWait
  509. }
  510. wait := time.Duration(float64(time.Second) * nextTimeout)
  511. if wait < waitThreshold {
  512. n.redialWaitStart = unixTime
  513. n.redialWaitEnd = unixTime + int64(nextTimeout)
  514. s.ns.SetFieldSub(node, sfiNodeHistory, n)
  515. s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, wait)
  516. s.updateWeight(node, totalValue, totalDialCost)
  517. } else {
  518. // discard known node statistics if waiting time is very long because the node
  519. // hasn't been responsive for a very long time
  520. s.ns.SetFieldSub(node, sfiNodeHistory, nil)
  521. s.ns.SetFieldSub(node, sfiNodeWeight, nil)
  522. s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
  523. }
  524. }
  525. // calculateWeight calculates and sets the node weight without altering the node history.
  526. // This function should be called during startup and shutdown only, otherwise setRedialWait
  527. // will keep the weights updated as the underlying statistics are adjusted.
  528. // Note: this function should run inside a NodeStateMachine operation
  529. func (s *ServerPool) calculateWeight(node *enode.Node) {
  530. n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
  531. _, totalValue := s.serviceValue(node)
  532. totalDialCost := s.addDialCost(&n, 0)
  533. s.updateWeight(node, totalValue, totalDialCost)
  534. }
  535. // API returns the vflux client API
  536. func (s *ServerPool) API() *PrivateClientAPI {
  537. return NewPrivateClientAPI(s.vt)
  538. }
  539. type dummyIdentity enode.ID
  540. func (id dummyIdentity) Verify(r *enr.Record, sig []byte) error { return nil }
  541. func (id dummyIdentity) NodeAddr(r *enr.Record) []byte { return id[:] }
  542. // DialNode replaces the given enode with a locally generated one containing the ENR
  543. // stored in the sfiLocalAddress field if present. This workaround ensures that nodes
  544. // on the local network can be dialed at the local address if a connection has been
  545. // successfully established previously.
  546. // Note that NodeStateMachine always remembers the enode with the latest version of
  547. // the remote signed ENR. ENR filtering should be performed on that version while
  548. // dialNode should be used for dialing the node over TCP or UDP.
  549. func (s *ServerPool) DialNode(n *enode.Node) *enode.Node {
  550. if enr, ok := s.ns.GetField(n, sfiLocalAddress).(*enr.Record); ok {
  551. n, _ := enode.New(dummyIdentity(n.ID()), enr)
  552. return n
  553. }
  554. return n
  555. }
  556. // Persist immediately stores the state of a node in the node database
  557. func (s *ServerPool) Persist(n *enode.Node) {
  558. s.ns.Persist(n)
  559. }