balance.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package server
  17. import (
  18. "errors"
  19. "math"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common/mclock"
  23. "github.com/ethereum/go-ethereum/les/utils"
  24. "github.com/ethereum/go-ethereum/p2p/enode"
  25. "github.com/ethereum/go-ethereum/p2p/nodestate"
  26. )
  // errBalanceOverflow is reported when a balance change would push the value
  // past maxBalance.
  var errBalanceOverflow = errors.New("balance overflow")

  // maxBalance is the maximum allowed balance value.
  const maxBalance = math.MaxInt64

  // Identifiers of the balance callbacks. They are also used as indexes into the
  // per-node callbackIndex array.
  const (
  	// balanceCallbackUpdate is called when priority drops below the last minimum estimate.
  	balanceCallbackUpdate = iota
  	// balanceCallbackZero is called when priority drops to zero (positive balance exhausted).
  	balanceCallbackZero
  	// balanceCallbackCount is the total number of balance callbacks.
  	balanceCallbackCount
  )
  // PriceFactors describes a pricing policy. Separate instances may be used for
  // positive and negative balances.
  //   - TimeFactor is the cost unit per nanosecond of connection time
  //   - CapacityFactor is the cost unit per nanosecond of connection time per 1000000 capacity
  //   - RequestFactor is the cost unit per request "realCost" unit
  type PriceFactors struct {
  	TimeFactor     float64
  	CapacityFactor float64
  	RequestFactor  float64
  }

  // connectionPrice returns the per-nanosecond price of a connection that has the
  // given capacity and the given estimated average request cost.
  func (p PriceFactors) connectionPrice(cap uint64, avgReqCost float64) float64 {
  	capacityPart := float64(cap) * p.CapacityFactor / 1000000
  	requestPart := p.RequestFactor * avgReqCost
  	return p.TimeFactor + capacityPart + requestPart
  }
  47. type (
  48. // nodePriority interface provides current and estimated future priorities on demand
  49. nodePriority interface {
  50. // priority should return the current priority of the node (higher is better)
  51. priority(cap uint64) int64
  52. // estimatePriority should return a lower estimate for the minimum of the node priority
  53. // value starting from the current moment until the given time. If the priority goes
  54. // under the returned estimate before the specified moment then it is the caller's
  55. // responsibility to signal with updateFlag.
  56. estimatePriority(cap uint64, addBalance int64, future, bias time.Duration, update bool) int64
  57. }
  58. // ReadOnlyBalance provides read-only operations on the node balance
  59. ReadOnlyBalance interface {
  60. nodePriority
  61. GetBalance() (uint64, uint64)
  62. GetRawBalance() (utils.ExpiredValue, utils.ExpiredValue)
  63. GetPriceFactors() (posFactor, negFactor PriceFactors)
  64. }
  65. // ConnectedBalance provides operations permitted on connected nodes (non-read-only
  66. // operations are not permitted inside a BalanceOperation)
  67. ConnectedBalance interface {
  68. ReadOnlyBalance
  69. SetPriceFactors(posFactor, negFactor PriceFactors)
  70. RequestServed(cost uint64) uint64
  71. }
  72. // AtomicBalanceOperator provides operations permitted in an atomic BalanceOperation
  73. AtomicBalanceOperator interface {
  74. ReadOnlyBalance
  75. AddBalance(amount int64) (uint64, uint64, error)
  76. SetBalance(pos, neg uint64) error
  77. }
  78. )
  79. // nodeBalance keeps track of the positive and negative balances of a connected
  80. // client and calculates actual and projected future priority values.
  81. // Implements nodePriority interface.
  82. type nodeBalance struct {
  83. bt *balanceTracker
  84. lock sync.RWMutex
  85. node *enode.Node
  86. connAddress string
  87. active, hasPriority, setFlags bool
  88. capacity uint64
  89. balance balance
  90. posFactor, negFactor PriceFactors
  91. sumReqCost uint64
  92. lastUpdate, nextUpdate, initTime mclock.AbsTime
  93. updateEvent mclock.Timer
  94. // since only a limited and fixed number of callbacks are needed, they are
  95. // stored in a fixed size array ordered by priority threshold.
  96. callbacks [balanceCallbackCount]balanceCallback
  97. // callbackIndex maps balanceCallback constants to callbacks array indexes (-1 if not active)
  98. callbackIndex [balanceCallbackCount]int
  99. callbackCount int // number of active callbacks
  100. }
  101. // balance represents a pair of positive and negative balances
  102. type balance struct {
  103. pos, neg utils.ExpiredValue
  104. posExp, negExp utils.ValueExpirer
  105. }
  106. // posValue returns the value of positive balance at a given timestamp.
  107. func (b balance) posValue(now mclock.AbsTime) uint64 {
  108. return b.pos.Value(b.posExp.LogOffset(now))
  109. }
  110. // negValue returns the value of negative balance at a given timestamp.
  111. func (b balance) negValue(now mclock.AbsTime) uint64 {
  112. return b.neg.Value(b.negExp.LogOffset(now))
  113. }
  114. // addValue adds the value of a given amount to the balance. The original value and
  115. // updated value will also be returned if the addition is successful.
  116. // Returns the error if the given value is too large and the value overflows.
  117. func (b *balance) addValue(now mclock.AbsTime, amount int64, pos bool, force bool) (uint64, uint64, int64, error) {
  118. var (
  119. val utils.ExpiredValue
  120. offset utils.Fixed64
  121. )
  122. if pos {
  123. offset, val = b.posExp.LogOffset(now), b.pos
  124. } else {
  125. offset, val = b.negExp.LogOffset(now), b.neg
  126. }
  127. old := val.Value(offset)
  128. if amount > 0 && (amount > maxBalance || old > maxBalance-uint64(amount)) {
  129. if !force {
  130. return old, 0, 0, errBalanceOverflow
  131. }
  132. val = utils.ExpiredValue{}
  133. amount = maxBalance
  134. }
  135. net := val.Add(amount, offset)
  136. if pos {
  137. b.pos = val
  138. } else {
  139. b.neg = val
  140. }
  141. return old, val.Value(offset), net, nil
  142. }
  143. // setValue sets the internal balance amount to the given values. Returns the
  144. // error if the given value is too large.
  145. func (b *balance) setValue(now mclock.AbsTime, pos uint64, neg uint64) error {
  146. if pos > maxBalance || neg > maxBalance {
  147. return errBalanceOverflow
  148. }
  149. var pb, nb utils.ExpiredValue
  150. pb.Add(int64(pos), b.posExp.LogOffset(now))
  151. nb.Add(int64(neg), b.negExp.LogOffset(now))
  152. b.pos = pb
  153. b.neg = nb
  154. return nil
  155. }
  // balanceCallback is a single callback that is activated once the client
  // priority reaches the given threshold.
  type balanceCallback struct {
  	id        int    // one of the balanceCallback* constants
  	threshold int64  // priority level at or below which the callback is triggered
  	callback  func() // function to invoke when triggered
  }
  163. // GetBalance returns the current positive and negative balance.
  164. func (n *nodeBalance) GetBalance() (uint64, uint64) {
  165. n.lock.Lock()
  166. defer n.lock.Unlock()
  167. now := n.bt.clock.Now()
  168. n.updateBalance(now)
  169. return n.balance.posValue(now), n.balance.negValue(now)
  170. }
  171. // GetRawBalance returns the current positive and negative balance
  172. // but in the raw(expired value) format.
  173. func (n *nodeBalance) GetRawBalance() (utils.ExpiredValue, utils.ExpiredValue) {
  174. n.lock.Lock()
  175. defer n.lock.Unlock()
  176. now := n.bt.clock.Now()
  177. n.updateBalance(now)
  178. return n.balance.pos, n.balance.neg
  179. }
  180. // AddBalance adds the given amount to the positive balance and returns the balance
  181. // before and after the operation. Exceeding maxBalance results in an error (balance is
  182. // unchanged) while adding a negative amount higher than the current balance results in
  183. // zero balance.
  184. // Note: this function should run inside a NodeStateMachine operation
  185. func (n *nodeBalance) AddBalance(amount int64) (uint64, uint64, error) {
  186. var (
  187. err error
  188. old, new uint64
  189. now = n.bt.clock.Now()
  190. callbacks []func()
  191. setPriority bool
  192. )
  193. // Operation with holding the lock
  194. n.bt.updateTotalBalance(n, func() bool {
  195. n.updateBalance(now)
  196. if old, new, _, err = n.balance.addValue(now, amount, true, false); err != nil {
  197. return false
  198. }
  199. callbacks, setPriority = n.checkCallbacks(now), n.checkPriorityStatus()
  200. n.storeBalance(true, false)
  201. return true
  202. })
  203. if err != nil {
  204. return old, old, err
  205. }
  206. // Operation without holding the lock
  207. for _, cb := range callbacks {
  208. cb()
  209. }
  210. if n.setFlags {
  211. if setPriority {
  212. n.bt.ns.SetStateSub(n.node, n.bt.setup.priorityFlag, nodestate.Flags{}, 0)
  213. }
  214. // Note: priority flag is automatically removed by the zero priority callback if necessary
  215. n.signalPriorityUpdate()
  216. }
  217. return old, new, nil
  218. }
  219. // SetBalance sets the positive and negative balance to the given values
  220. // Note: this function should run inside a NodeStateMachine operation
  221. func (n *nodeBalance) SetBalance(pos, neg uint64) error {
  222. var (
  223. now = n.bt.clock.Now()
  224. callbacks []func()
  225. setPriority bool
  226. )
  227. // Operation with holding the lock
  228. n.bt.updateTotalBalance(n, func() bool {
  229. n.updateBalance(now)
  230. if err := n.balance.setValue(now, pos, neg); err != nil {
  231. return false
  232. }
  233. callbacks, setPriority = n.checkCallbacks(now), n.checkPriorityStatus()
  234. n.storeBalance(true, true)
  235. return true
  236. })
  237. // Operation without holding the lock
  238. for _, cb := range callbacks {
  239. cb()
  240. }
  241. if n.setFlags {
  242. if setPriority {
  243. n.bt.ns.SetStateSub(n.node, n.bt.setup.priorityFlag, nodestate.Flags{}, 0)
  244. }
  245. // Note: priority flag is automatically removed by the zero priority callback if necessary
  246. n.signalPriorityUpdate()
  247. }
  248. return nil
  249. }
  250. // RequestServed should be called after serving a request for the given peer
  251. func (n *nodeBalance) RequestServed(cost uint64) (newBalance uint64) {
  252. n.lock.Lock()
  253. var (
  254. check bool
  255. fcost = float64(cost)
  256. now = n.bt.clock.Now()
  257. )
  258. n.updateBalance(now)
  259. if !n.balance.pos.IsZero() {
  260. posCost := -int64(fcost * n.posFactor.RequestFactor)
  261. if posCost == 0 {
  262. fcost = 0
  263. newBalance = n.balance.posValue(now)
  264. } else {
  265. var net int64
  266. _, newBalance, net, _ = n.balance.addValue(now, posCost, true, false)
  267. if posCost == net {
  268. fcost = 0
  269. } else {
  270. fcost *= 1 - float64(net)/float64(posCost)
  271. }
  272. check = true
  273. }
  274. }
  275. if fcost > 0 && n.negFactor.RequestFactor != 0 {
  276. n.balance.addValue(now, int64(fcost*n.negFactor.RequestFactor), false, false)
  277. check = true
  278. }
  279. n.sumReqCost += cost
  280. var callbacks []func()
  281. if check {
  282. callbacks = n.checkCallbacks(now)
  283. }
  284. n.lock.Unlock()
  285. if callbacks != nil {
  286. n.bt.ns.Operation(func() {
  287. for _, cb := range callbacks {
  288. cb()
  289. }
  290. })
  291. }
  292. return
  293. }
  294. // priority returns the actual priority based on the current balance
  295. func (n *nodeBalance) priority(capacity uint64) int64 {
  296. n.lock.Lock()
  297. defer n.lock.Unlock()
  298. now := n.bt.clock.Now()
  299. n.updateBalance(now)
  300. return n.balanceToPriority(now, n.balance, capacity)
  301. }
  302. // EstMinPriority gives a lower estimate for the priority at a given time in the future.
  303. // An average request cost per time is assumed that is twice the average cost per time
  304. // in the current session.
  305. // If update is true then a priority callback is added that turns updateFlag on and off
  306. // in case the priority goes below the estimated minimum.
  307. func (n *nodeBalance) estimatePriority(capacity uint64, addBalance int64, future, bias time.Duration, update bool) int64 {
  308. n.lock.Lock()
  309. defer n.lock.Unlock()
  310. now := n.bt.clock.Now()
  311. n.updateBalance(now)
  312. b := n.balance // copy the balance
  313. if addBalance != 0 {
  314. b.addValue(now, addBalance, true, true)
  315. }
  316. if future > 0 {
  317. var avgReqCost float64
  318. dt := time.Duration(n.lastUpdate - n.initTime)
  319. if dt > time.Second {
  320. avgReqCost = float64(n.sumReqCost) * 2 / float64(dt)
  321. }
  322. b = n.reducedBalance(b, now, future, capacity, avgReqCost)
  323. }
  324. if bias > 0 {
  325. b = n.reducedBalance(b, now+mclock.AbsTime(future), bias, capacity, 0)
  326. }
  327. pri := n.balanceToPriority(now, b, capacity)
  328. // Ensure that biased estimates are always lower than actual priorities, even if
  329. // the bias is very small.
  330. // This ensures that two nodes will not ping-pong update signals forever if both of
  331. // them have zero estimated priority drop in the projected future.
  332. current := n.balanceToPriority(now, n.balance, capacity)
  333. if pri >= current {
  334. pri = current - 1
  335. }
  336. if update {
  337. n.addCallback(balanceCallbackUpdate, pri, n.signalPriorityUpdate)
  338. }
  339. return pri
  340. }
  341. // SetPriceFactors sets the price factors. TimeFactor is the price of a nanosecond of
  342. // connection while RequestFactor is the price of a request cost unit.
  343. func (n *nodeBalance) SetPriceFactors(posFactor, negFactor PriceFactors) {
  344. n.lock.Lock()
  345. now := n.bt.clock.Now()
  346. n.updateBalance(now)
  347. n.posFactor, n.negFactor = posFactor, negFactor
  348. callbacks := n.checkCallbacks(now)
  349. n.lock.Unlock()
  350. if callbacks != nil {
  351. n.bt.ns.Operation(func() {
  352. for _, cb := range callbacks {
  353. cb()
  354. }
  355. })
  356. }
  357. }
  358. // GetPriceFactors returns the price factors
  359. func (n *nodeBalance) GetPriceFactors() (posFactor, negFactor PriceFactors) {
  360. n.lock.Lock()
  361. defer n.lock.Unlock()
  362. return n.posFactor, n.negFactor
  363. }
  364. // activate starts time/capacity cost deduction.
  365. func (n *nodeBalance) activate() {
  366. n.bt.updateTotalBalance(n, func() bool {
  367. if n.active {
  368. return false
  369. }
  370. n.active = true
  371. n.lastUpdate = n.bt.clock.Now()
  372. return true
  373. })
  374. }
  375. // deactivate stops time/capacity cost deduction and saves the balances in the database
  376. func (n *nodeBalance) deactivate() {
  377. n.bt.updateTotalBalance(n, func() bool {
  378. if !n.active {
  379. return false
  380. }
  381. n.updateBalance(n.bt.clock.Now())
  382. if n.updateEvent != nil {
  383. n.updateEvent.Stop()
  384. n.updateEvent = nil
  385. }
  386. n.storeBalance(true, true)
  387. n.active = false
  388. return true
  389. })
  390. }
  391. // updateBalance updates balance based on the time factor
  392. func (n *nodeBalance) updateBalance(now mclock.AbsTime) {
  393. if n.active && now > n.lastUpdate {
  394. n.balance = n.reducedBalance(n.balance, n.lastUpdate, time.Duration(now-n.lastUpdate), n.capacity, 0)
  395. n.lastUpdate = now
  396. }
  397. }
  398. // storeBalance stores the positive and/or negative balance of the node in the database
  399. func (n *nodeBalance) storeBalance(pos, neg bool) {
  400. if pos {
  401. n.bt.storeBalance(n.node.ID().Bytes(), false, n.balance.pos)
  402. }
  403. if neg {
  404. n.bt.storeBalance([]byte(n.connAddress), true, n.balance.neg)
  405. }
  406. }
  407. // addCallback sets up a one-time callback to be called when priority reaches
  408. // the threshold. If it has already reached the threshold the callback is called
  409. // immediately.
  410. // Note: should be called while n.lock is held
  411. // Note 2: the callback function runs inside a NodeStateMachine operation
  412. func (n *nodeBalance) addCallback(id int, threshold int64, callback func()) {
  413. n.removeCallback(id)
  414. idx := 0
  415. for idx < n.callbackCount && threshold > n.callbacks[idx].threshold {
  416. idx++
  417. }
  418. for i := n.callbackCount - 1; i >= idx; i-- {
  419. n.callbackIndex[n.callbacks[i].id]++
  420. n.callbacks[i+1] = n.callbacks[i]
  421. }
  422. n.callbackCount++
  423. n.callbackIndex[id] = idx
  424. n.callbacks[idx] = balanceCallback{id, threshold, callback}
  425. now := n.bt.clock.Now()
  426. n.updateBalance(now)
  427. n.scheduleCheck(now)
  428. }
  429. // removeCallback removes the given callback and returns true if it was active
  430. // Note: should be called while n.lock is held
  431. func (n *nodeBalance) removeCallback(id int) bool {
  432. idx := n.callbackIndex[id]
  433. if idx == -1 {
  434. return false
  435. }
  436. n.callbackIndex[id] = -1
  437. for i := idx; i < n.callbackCount-1; i++ {
  438. n.callbackIndex[n.callbacks[i+1].id]--
  439. n.callbacks[i] = n.callbacks[i+1]
  440. }
  441. n.callbackCount--
  442. return true
  443. }
  444. // checkCallbacks checks whether the threshold of any of the active callbacks
  445. // have been reached and returns triggered callbacks.
  446. // Note: checkCallbacks assumes that the balance has been recently updated.
  447. func (n *nodeBalance) checkCallbacks(now mclock.AbsTime) (callbacks []func()) {
  448. if n.callbackCount == 0 || n.capacity == 0 {
  449. return
  450. }
  451. pri := n.balanceToPriority(now, n.balance, n.capacity)
  452. for n.callbackCount != 0 && n.callbacks[n.callbackCount-1].threshold >= pri {
  453. n.callbackCount--
  454. n.callbackIndex[n.callbacks[n.callbackCount].id] = -1
  455. callbacks = append(callbacks, n.callbacks[n.callbackCount].callback)
  456. }
  457. n.scheduleCheck(now)
  458. return
  459. }
  460. // scheduleCheck sets up or updates a scheduled event to ensure that it will be called
  461. // again just after the next threshold has been reached.
  462. func (n *nodeBalance) scheduleCheck(now mclock.AbsTime) {
  463. if n.callbackCount != 0 {
  464. d, ok := n.timeUntil(n.callbacks[n.callbackCount-1].threshold)
  465. if !ok {
  466. n.nextUpdate = 0
  467. n.updateAfter(0)
  468. return
  469. }
  470. if n.nextUpdate == 0 || n.nextUpdate > now+mclock.AbsTime(d) {
  471. if d > time.Second {
  472. // Note: if the scheduled update is not in the very near future then we
  473. // schedule the update a bit earlier. This way we do need to update a few
  474. // extra times but don't need to reschedule every time a processed request
  475. // brings the expected firing time a little bit closer.
  476. d = ((d - time.Second) * 7 / 8) + time.Second
  477. }
  478. n.nextUpdate = now + mclock.AbsTime(d)
  479. n.updateAfter(d)
  480. }
  481. } else {
  482. n.nextUpdate = 0
  483. n.updateAfter(0)
  484. }
  485. }
  486. // updateAfter schedules a balance update and callback check in the future
  487. func (n *nodeBalance) updateAfter(dt time.Duration) {
  488. if n.updateEvent == nil || n.updateEvent.Stop() {
  489. if dt == 0 {
  490. n.updateEvent = nil
  491. } else {
  492. n.updateEvent = n.bt.clock.AfterFunc(dt, func() {
  493. var callbacks []func()
  494. n.lock.Lock()
  495. if n.callbackCount != 0 {
  496. now := n.bt.clock.Now()
  497. n.updateBalance(now)
  498. callbacks = n.checkCallbacks(now)
  499. }
  500. n.lock.Unlock()
  501. if callbacks != nil {
  502. n.bt.ns.Operation(func() {
  503. for _, cb := range callbacks {
  504. cb()
  505. }
  506. })
  507. }
  508. })
  509. }
  510. }
  511. }
  512. // balanceExhausted should be called when the positive balance is exhausted (priority goes to zero/negative)
  513. // Note: this function should run inside a NodeStateMachine operation
  514. func (n *nodeBalance) balanceExhausted() {
  515. n.lock.Lock()
  516. n.storeBalance(true, false)
  517. n.hasPriority = false
  518. n.lock.Unlock()
  519. if n.setFlags {
  520. n.bt.ns.SetStateSub(n.node, nodestate.Flags{}, n.bt.setup.priorityFlag, 0)
  521. }
  522. }
  523. // checkPriorityStatus checks whether the node has gained priority status and sets the priority
  524. // callback and flag if necessary. It assumes that the balance has been recently updated.
  525. // Note that the priority flag has to be set by the caller after the mutex has been released.
  526. func (n *nodeBalance) checkPriorityStatus() bool {
  527. if !n.hasPriority && !n.balance.pos.IsZero() {
  528. n.hasPriority = true
  529. n.addCallback(balanceCallbackZero, 0, func() { n.balanceExhausted() })
  530. return true
  531. }
  532. return false
  533. }
  534. // signalPriorityUpdate signals that the priority fell below the previous minimum estimate
  535. // Note: this function should run inside a NodeStateMachine operation
  536. func (n *nodeBalance) signalPriorityUpdate() {
  537. n.bt.ns.SetStateSub(n.node, n.bt.setup.updateFlag, nodestate.Flags{}, 0)
  538. n.bt.ns.SetStateSub(n.node, nodestate.Flags{}, n.bt.setup.updateFlag, 0)
  539. }
  540. // setCapacity updates the capacity value used for priority calculation
  541. // Note: capacity should never be zero
  542. // Note 2: this function should run inside a NodeStateMachine operation
  543. func (n *nodeBalance) setCapacity(capacity uint64) {
  544. n.lock.Lock()
  545. now := n.bt.clock.Now()
  546. n.updateBalance(now)
  547. n.capacity = capacity
  548. callbacks := n.checkCallbacks(now)
  549. n.lock.Unlock()
  550. for _, cb := range callbacks {
  551. cb()
  552. }
  553. }
  554. // balanceToPriority converts a balance to a priority value. Lower priority means
  555. // first to disconnect. Positive balance translates to positive priority. If positive
  556. // balance is zero then negative balance translates to a negative priority.
  557. func (n *nodeBalance) balanceToPriority(now mclock.AbsTime, b balance, capacity uint64) int64 {
  558. pos := b.posValue(now)
  559. if pos > 0 {
  560. return int64(pos / capacity)
  561. }
  562. return -int64(b.negValue(now))
  563. }
  564. // priorityToBalance converts a target priority to a requested balance value.
  565. // If the priority is negative, then minimal negative balance is returned;
  566. // otherwise the minimal positive balance is returned.
  567. func (n *nodeBalance) priorityToBalance(priority int64, capacity uint64) (uint64, uint64) {
  568. if priority > 0 {
  569. return uint64(priority) * n.capacity, 0
  570. }
  571. return 0, uint64(-priority)
  572. }
  573. // reducedBalance estimates the reduced balance at a given time in the fututre based
  574. // on the given balance, the time factor and an estimated average request cost per time ratio
  575. func (n *nodeBalance) reducedBalance(b balance, start mclock.AbsTime, dt time.Duration, capacity uint64, avgReqCost float64) balance {
  576. // since the costs are applied continuously during the dt time period we calculate
  577. // the expiration offset at the middle of the period
  578. var (
  579. at = start + mclock.AbsTime(dt/2)
  580. dtf = float64(dt)
  581. )
  582. if !b.pos.IsZero() {
  583. factor := n.posFactor.connectionPrice(capacity, avgReqCost)
  584. diff := -int64(dtf * factor)
  585. _, _, net, _ := b.addValue(at, diff, true, false)
  586. if net == diff {
  587. dtf = 0
  588. } else {
  589. dtf += float64(net) / factor
  590. }
  591. }
  592. if dtf > 0 {
  593. factor := n.negFactor.connectionPrice(capacity, avgReqCost)
  594. b.addValue(at, int64(dtf*factor), false, false)
  595. }
  596. return b
  597. }
  598. // timeUntil calculates the remaining time needed to reach a given priority level
  599. // assuming that no requests are processed until then. If the given level is never
  600. // reached then (0, false) is returned. If it has already been reached then (0, true)
  601. // is returned.
  602. // Note: the function assumes that the balance has been recently updated and
  603. // calculates the time starting from the last update.
  604. func (n *nodeBalance) timeUntil(priority int64) (time.Duration, bool) {
  605. var (
  606. now = n.bt.clock.Now()
  607. pos = n.balance.posValue(now)
  608. targetPos, targetNeg = n.priorityToBalance(priority, n.capacity)
  609. diffTime float64
  610. )
  611. if pos > 0 {
  612. timePrice := n.posFactor.connectionPrice(n.capacity, 0)
  613. if timePrice < 1e-100 {
  614. return 0, false
  615. }
  616. if targetPos > 0 {
  617. if targetPos > pos {
  618. return 0, true
  619. }
  620. diffTime = float64(pos-targetPos) / timePrice
  621. return time.Duration(diffTime), true
  622. } else {
  623. diffTime = float64(pos) / timePrice
  624. }
  625. } else {
  626. if targetPos > 0 {
  627. return 0, true
  628. }
  629. }
  630. neg := n.balance.negValue(now)
  631. if targetNeg > neg {
  632. timePrice := n.negFactor.connectionPrice(n.capacity, 0)
  633. if timePrice < 1e-100 {
  634. return 0, false
  635. }
  636. diffTime += float64(targetNeg-neg) / timePrice
  637. }
  638. return time.Duration(diffTime), true
  639. }