prioritypool.go

// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package server

import (
	"math"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethereum/go-ethereum/p2p/nodestate"
)
const (
	lazyQueueRefresh = time.Second * 10 // refresh period of the active queue
)

// priorityPool handles a set of nodes where each node has a capacity (a scalar value)
// and a priority (which can change over time and can also depend on the capacity).
// A node is active if it has at least the necessary minimal amount of capacity while
// inactive nodes have 0 capacity (values between 0 and the minimum are not allowed).
// The pool ensures that the number and total capacity of all active nodes are limited
// and the highest priority nodes are active at all times (limits can be changed
// during operation with immediate effect).
//
// When activating clients a priority bias is applied in favor of the already active
// nodes in order to avoid nodes quickly alternating between active and inactive states
// when their priorities are close to each other. The bias is specified in terms of
// duration (time) because priorities are expected to usually get lower over time and
// therefore a future minimum prediction (see estimatePriority) should monotonically
// decrease with the specified time parameter.
// This time bias can be interpreted as the minimum expected active time at the given
// capacity (if the threshold priority stays the same).
//
// Nodes in the pool always have either inactiveFlag or activeFlag set. A new node is
// added to the pool by externally setting inactiveFlag. priorityPool can switch a node
// between inactiveFlag and activeFlag at any time. Nodes can be removed from the pool
// by externally resetting both flags. activeFlag should not be set externally.
//
// The highest priority nodes in "inactive" state are moved to "active" state as soon as
// the minimum capacity can be granted for them. The capacity of lower priority active
// nodes is reduced or they are demoted to "inactive" state if their priority is
// insufficient even at minimal capacity.
type priorityPool struct {
	setup *serverSetup
	ns    *nodestate.NodeStateMachine
	clock mclock.Clock
	lock  sync.Mutex

	maxCount, maxCap uint64
	minCap           uint64
	activeBias       time.Duration

	capacityStepDiv, fineStepDiv uint64

	// Cached snapshot of the priority pool, served to capacity queries.
	cachedCurve    *capacityCurve
	ccUpdatedAt    mclock.AbsTime
	ccUpdateForced bool

	// Runtime status of the priority pool; a temporary state is in effect
	// while tempState is not empty.
	tempState              []*ppNodeInfo
	activeCount, activeCap uint64
	activeQueue            *prque.LazyQueue
	inactiveQueue          *prque.Prque
}

// ppNodeInfo is the internal node descriptor of priorityPool
type ppNodeInfo struct {
	nodePriority               nodePriority
	node                       *enode.Node
	connected                  bool
	capacity                   uint64 // only changed when temporary state is committed
	activeIndex, inactiveIndex int

	tempState    bool   // should only be true while the priorityPool lock is held
	tempCapacity uint64 // equals capacity when tempState is false

	// the following fields only affect the temporary state and they are set to their
	// default value when leaving the temp state
	minTarget, stepDiv uint64
	bias               time.Duration
}

// newPriorityPool creates a new priorityPool
func newPriorityPool(ns *nodestate.NodeStateMachine, setup *serverSetup, clock mclock.Clock, minCap uint64, activeBias time.Duration, capacityStepDiv, fineStepDiv uint64) *priorityPool {
	pp := &priorityPool{
		setup:           setup,
		ns:              ns,
		clock:           clock,
		inactiveQueue:   prque.New(inactiveSetIndex),
		minCap:          minCap,
		activeBias:      activeBias,
		capacityStepDiv: capacityStepDiv,
		fineStepDiv:     fineStepDiv,
	}
	if pp.activeBias < time.Duration(1) {
		pp.activeBias = time.Duration(1)
	}
	pp.activeQueue = prque.NewLazyQueue(activeSetIndex, activePriority, pp.activeMaxPriority, clock, lazyQueueRefresh)

	ns.SubscribeField(pp.setup.balanceField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
		if newValue != nil {
			c := &ppNodeInfo{
				node:          node,
				nodePriority:  newValue.(nodePriority),
				activeIndex:   -1,
				inactiveIndex: -1,
			}
			ns.SetFieldSub(node, pp.setup.queueField, c)
			ns.SetStateSub(node, setup.inactiveFlag, nodestate.Flags{}, 0)
		} else {
			ns.SetStateSub(node, nodestate.Flags{}, pp.setup.activeFlag.Or(pp.setup.inactiveFlag), 0)
			if n, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo); n != nil {
				pp.disconnectedNode(n)
			}
			ns.SetFieldSub(node, pp.setup.capacityField, nil)
			ns.SetFieldSub(node, pp.setup.queueField, nil)
		}
	})
	ns.SubscribeState(pp.setup.activeFlag.Or(pp.setup.inactiveFlag), func(node *enode.Node, oldState, newState nodestate.Flags) {
		if c, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo); c != nil {
			if oldState.IsEmpty() {
				pp.connectedNode(c)
			}
			if newState.IsEmpty() {
				pp.disconnectedNode(c)
			}
		}
	})
	ns.SubscribeState(pp.setup.updateFlag, func(node *enode.Node, oldState, newState nodestate.Flags) {
		if !newState.IsEmpty() {
			pp.updatePriority(node)
		}
	})
	return pp
}

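// Illustrative sketch (an editorial note, not part of the original file):
// typical wiring of a priorityPool, assuming a serverSetup, a NodeStateMachine
// and a balance-tracking nodePriority source already exist; the numeric limits
// below are hypothetical examples.
//
//	pp := newPriorityPool(ns, setup, mclock.System{}, 100, time.Minute, 2, 16)
//	pp.SetLimits(1000, 1000000) // at most 1000 active nodes, 1M total capacity
//
// Nodes then enter the pool whenever their balanceField is set (see the
// SubscribeField callback above) and are promoted or demoted automatically.
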
// requestCapacity tries to set the capacity of a connected node to the highest possible
// value inside the given target range. If maxTarget is not reachable then the capacity is
// iteratively reduced in fine steps based on the fineStepDiv parameter until minTarget is reached.
// The function returns the new capacity if successful and the original capacity otherwise.
// Note: this function should run inside a NodeStateMachine operation
func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget uint64, bias time.Duration) uint64 {
	pp.lock.Lock()
	pp.activeQueue.Refresh()

	if minTarget < pp.minCap {
		minTarget = pp.minCap
	}
	if maxTarget < minTarget {
		maxTarget = minTarget
	}
	if bias < pp.activeBias {
		bias = pp.activeBias
	}
	c, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo)
	if c == nil {
		log.Error("requestCapacity called for unknown node", "id", node.ID())
		pp.lock.Unlock()
		return 0
	}
	pp.setTempState(c)
	if maxTarget > c.capacity {
		pp.setTempStepDiv(c, pp.fineStepDiv)
		pp.setTempBias(c, bias)
	}
	pp.setTempCapacity(c, maxTarget)
	c.minTarget = minTarget
	pp.activeQueue.Remove(c.activeIndex)
	pp.inactiveQueue.Remove(c.inactiveIndex)
	pp.activeQueue.Push(c)
	pp.enforceLimits()
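	// Commit only if the limit-enforced temporary capacity landed inside the
	// requested [minTarget, maxTarget] range and actually differs from the
	// committed capacity; otherwise finalizeChanges reverts the attempt.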
	updates := pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity)
	pp.lock.Unlock()
	pp.updateFlags(updates)
	return c.capacity
}

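// Illustrative sketch (an editorial note, not part of the original file):
// requestCapacity must run inside a NodeStateMachine operation; the node and
// the doubled target below are hypothetical examples.
//
//	pp.ns.Operation(func() {
//		// try to double the minimal capacity, settling for anything above minCap
//		granted := pp.requestCapacity(node, pp.minCap, pp.minCap*2, 0)
//		_ = granted // new committed capacity, or the previous one on failure
//	})
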
// SetLimits sets the maximum number and total capacity of simultaneously active nodes
func (pp *priorityPool) SetLimits(maxCount, maxCap uint64) {
	pp.lock.Lock()
	pp.activeQueue.Refresh()
	inc := (maxCount > pp.maxCount) || (maxCap > pp.maxCap)
	dec := (maxCount < pp.maxCount) || (maxCap < pp.maxCap)
	pp.maxCount, pp.maxCap = maxCount, maxCap

	var updates []capUpdate
	if dec {
		pp.enforceLimits()
		updates = pp.finalizeChanges(true)
	}
	if inc {
		updates = append(updates, pp.tryActivate(false)...)
	}
	pp.lock.Unlock()
	pp.ns.Operation(func() { pp.updateFlags(updates) })
}

// setActiveBias sets the bias applied when trying to activate inactive nodes
func (pp *priorityPool) setActiveBias(bias time.Duration) {
	pp.lock.Lock()
	pp.activeBias = bias
	if pp.activeBias < time.Duration(1) {
		pp.activeBias = time.Duration(1)
	}
	updates := pp.tryActivate(false)
	pp.lock.Unlock()
	pp.ns.Operation(func() { pp.updateFlags(updates) })
}

// Active returns the number and total capacity of currently active nodes
func (pp *priorityPool) Active() (uint64, uint64) {
	pp.lock.Lock()
	defer pp.lock.Unlock()

	return pp.activeCount, pp.activeCap
}

// Limits returns the maximum allowed number and total capacity of active nodes
func (pp *priorityPool) Limits() (uint64, uint64) {
	pp.lock.Lock()
	defer pp.lock.Unlock()

	return pp.maxCount, pp.maxCap
}

// inactiveSetIndex callback updates ppNodeInfo item index in inactiveQueue
func inactiveSetIndex(a interface{}, index int) {
	a.(*ppNodeInfo).inactiveIndex = index
}

// activeSetIndex callback updates ppNodeInfo item index in activeQueue
func activeSetIndex(a interface{}, index int) {
	a.(*ppNodeInfo).activeIndex = index
}

// invertPriority inverts a priority value. The active queue uses inverted priorities
// because the node on the top is the first to be deactivated.
func invertPriority(p int64) int64 {
	if p == math.MinInt64 {
		return math.MaxInt64
	}
	return -p
}

// activePriority callback returns actual priority of ppNodeInfo item in activeQueue
func activePriority(a interface{}) int64 {
	c := a.(*ppNodeInfo)
	if c.bias == 0 {
		return invertPriority(c.nodePriority.priority(c.tempCapacity))
	}
	return invertPriority(c.nodePriority.estimatePriority(c.tempCapacity, 0, 0, c.bias, true))
}

// activeMaxPriority callback returns estimated maximum priority of ppNodeInfo item in activeQueue
func (pp *priorityPool) activeMaxPriority(a interface{}, until mclock.AbsTime) int64 {
	c := a.(*ppNodeInfo)
	future := time.Duration(until - pp.clock.Now())
	if future < 0 {
		future = 0
	}
	return invertPriority(c.nodePriority.estimatePriority(c.tempCapacity, 0, future, c.bias, false))
}

// inactivePriority callback returns actual priority of ppNodeInfo item in inactiveQueue
func (pp *priorityPool) inactivePriority(p *ppNodeInfo) int64 {
	return p.nodePriority.priority(pp.minCap)
}

// connectedNode is called when a new node has been added to the pool (inactiveFlag set)
// Note: this function should run inside a NodeStateMachine operation
func (pp *priorityPool) connectedNode(c *ppNodeInfo) {
	pp.lock.Lock()
	pp.activeQueue.Refresh()
	if c.connected {
		pp.lock.Unlock()
		return
	}
	c.connected = true
	pp.inactiveQueue.Push(c, pp.inactivePriority(c))
	updates := pp.tryActivate(false)
	pp.lock.Unlock()
	pp.updateFlags(updates)
}

// disconnectedNode is called when a node has been removed from the pool (both inactiveFlag
// and activeFlag reset)
// Note: this function should run inside a NodeStateMachine operation
func (pp *priorityPool) disconnectedNode(c *ppNodeInfo) {
	pp.lock.Lock()
	pp.activeQueue.Refresh()
	if !c.connected {
		pp.lock.Unlock()
		return
	}
	c.connected = false
	pp.activeQueue.Remove(c.activeIndex)
	pp.inactiveQueue.Remove(c.inactiveIndex)

	var updates []capUpdate
	if c.capacity != 0 {
		pp.setTempState(c)
		pp.setTempCapacity(c, 0)
		updates = pp.tryActivate(true)
	}
	pp.lock.Unlock()
	pp.updateFlags(updates)
}

// setTempState internally puts a node in a temporary state that can either be reverted
// or confirmed later. This temporary state allows changing the capacity of a node and
// moving it between the active and inactive queue. activeFlag/inactiveFlag and
// capacityField are not changed while the changes are still temporary.
func (pp *priorityPool) setTempState(c *ppNodeInfo) {
	if c.tempState {
		return
	}
	c.tempState = true
	if c.tempCapacity != c.capacity { // should never happen
		log.Error("tempCapacity != capacity when entering tempState")
	}
	// Assign all the defaults to the temp state.
	c.minTarget = pp.minCap
	c.stepDiv = pp.capacityStepDiv
	c.bias = 0
	pp.tempState = append(pp.tempState, c)
}

// unsetTempState revokes the temp status of the node and resets all internal
// fields to their default values.
func (pp *priorityPool) unsetTempState(c *ppNodeInfo) {
	if !c.tempState {
		return
	}
	c.tempState = false
	if c.tempCapacity != c.capacity { // should never happen
		log.Error("tempCapacity != capacity when leaving tempState")
	}
	c.minTarget = pp.minCap
	c.stepDiv = pp.capacityStepDiv
	c.bias = 0
}

// setTempCapacity changes the capacity of a node in the temporary state and adjusts
// activeCap and activeCount accordingly. Since this change is performed in the temporary
// state it should be called after setTempState and before finalizeChanges.
func (pp *priorityPool) setTempCapacity(c *ppNodeInfo, cap uint64) {
	if !c.tempState { // should never happen
		log.Error("Node is not in temporary state")
		return
	}
	pp.activeCap += cap - c.tempCapacity
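	// A node counts as active exactly while its (temporary) capacity is nonzero,
	// so adjust activeCount on transitions between zero and nonzero; if both the
	// old and the new capacity are zero, the increment and decrement cancel out.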
	if c.tempCapacity == 0 {
		pp.activeCount++
	}
	if cap == 0 {
		pp.activeCount--
	}
	c.tempCapacity = cap
}

// setTempBias changes the connection bias of a node in the temporary state.
func (pp *priorityPool) setTempBias(c *ppNodeInfo, bias time.Duration) {
	if !c.tempState { // should never happen
		log.Error("Node is not in temporary state")
		return
	}
	c.bias = bias
}

// setTempStepDiv changes the capacity divisor of a node in the temporary state.
func (pp *priorityPool) setTempStepDiv(c *ppNodeInfo, stepDiv uint64) {
	if !c.tempState { // should never happen
		log.Error("Node is not in temporary state")
		return
	}
	c.stepDiv = stepDiv
}

// enforceLimits enforces active node count and total capacity limits. It returns the
// lowest active node priority. Note that this function is performed on the temporary
// internal state.
func (pp *priorityPool) enforceLimits() (*ppNodeInfo, int64) {
	if pp.activeCap <= pp.maxCap && pp.activeCount <= pp.maxCount {
		return nil, math.MinInt64
	}
	var (
		c                 *ppNodeInfo
		maxActivePriority int64
	)
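	// Pop nodes from the lowest-priority end of the active queue: a popped node
	// has its capacity reduced by tempCapacity/stepDiv (but not below minTarget)
	// and is pushed back, or is dropped to zero capacity once it is at minTarget
	// or while the node count limit is exceeded; stop as soon as both limits hold.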
	pp.activeQueue.MultiPop(func(data interface{}, priority int64) bool {
		c = data.(*ppNodeInfo)
		pp.setTempState(c)
		maxActivePriority = priority
		if c.tempCapacity == c.minTarget || pp.activeCount > pp.maxCount {
			pp.setTempCapacity(c, 0)
		} else {
			sub := c.tempCapacity / c.stepDiv
			if sub == 0 {
				sub = 1
			}
			if c.tempCapacity-sub < c.minTarget {
				sub = c.tempCapacity - c.minTarget
			}
			pp.setTempCapacity(c, c.tempCapacity-sub)
			pp.activeQueue.Push(c)
		}
		return pp.activeCap > pp.maxCap || pp.activeCount > pp.maxCount
	})
	return c, invertPriority(maxActivePriority)
}

// finalizeChanges either commits or reverts temporary changes. The necessary
// capacityField and flag updates are not performed here but returned in a list,
// because they should be performed while the mutex is not held.
func (pp *priorityPool) finalizeChanges(commit bool) (updates []capUpdate) {
	for _, c := range pp.tempState {
		// always remove and push back in order to update biased priority
		pp.activeQueue.Remove(c.activeIndex)
		pp.inactiveQueue.Remove(c.inactiveIndex)
		oldCapacity := c.capacity
		if commit {
			c.capacity = c.tempCapacity
		} else {
			pp.setTempCapacity(c, c.capacity) // revert activeCount/activeCap
		}
		pp.unsetTempState(c)

		if c.connected {
			if c.capacity != 0 {
				pp.activeQueue.Push(c)
			} else {
				pp.inactiveQueue.Push(c, pp.inactivePriority(c))
			}
			if c.capacity != oldCapacity {
				updates = append(updates, capUpdate{c.node, oldCapacity, c.capacity})
			}
		}
	}
	pp.tempState = nil
	if commit {
		pp.ccUpdateForced = true
	}
	return
}

// capUpdate describes a capacityField and activeFlag/inactiveFlag update
type capUpdate struct {
	node           *enode.Node
	oldCap, newCap uint64
}

// updateFlags performs capacityField and activeFlag/inactiveFlag updates while the
// pool mutex is not held
// Note: this function should run inside a NodeStateMachine operation
func (pp *priorityPool) updateFlags(updates []capUpdate) {
	for _, f := range updates {
		if f.oldCap == 0 {
			pp.ns.SetStateSub(f.node, pp.setup.activeFlag, pp.setup.inactiveFlag, 0)
		}
		if f.newCap == 0 {
			pp.ns.SetStateSub(f.node, pp.setup.inactiveFlag, pp.setup.activeFlag, 0)
			pp.ns.SetFieldSub(f.node, pp.setup.capacityField, nil)
		} else {
			pp.ns.SetFieldSub(f.node, pp.setup.capacityField, f.newCap)
		}
	}
}

// tryActivate tries to activate inactive nodes if possible
func (pp *priorityPool) tryActivate(commit bool) []capUpdate {
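	// Promote the highest-priority inactive nodes one by one at minimal capacity,
	// with the activation bias applied, until enforceLimits squeezes a candidate
	// back out (its temporary capacity drops back to zero). finalizeChanges then
	// commits the successful promotions while the failed candidate stays inactive.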
	for pp.inactiveQueue.Size() > 0 {
		c := pp.inactiveQueue.PopItem().(*ppNodeInfo)
		pp.setTempState(c)
		pp.setTempBias(c, pp.activeBias)
		pp.setTempCapacity(c, pp.minCap)
		pp.activeQueue.Push(c)
		pp.enforceLimits()
		if c.tempCapacity > 0 {
			commit = true
			pp.setTempBias(c, 0)
		} else {
			break
		}
	}
	pp.ccUpdateForced = true
	return pp.finalizeChanges(commit)
}

// updatePriority gets the current priority value of the given node from the nodePriority
// interface and performs the necessary changes. It is triggered by updateFlag.
// Note: this function should run inside a NodeStateMachine operation
func (pp *priorityPool) updatePriority(node *enode.Node) {
	pp.lock.Lock()
	pp.activeQueue.Refresh()
	c, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo)
	if c == nil || !c.connected {
		pp.lock.Unlock()
		return
	}
	pp.activeQueue.Remove(c.activeIndex)
	pp.inactiveQueue.Remove(c.inactiveIndex)
	if c.capacity != 0 {
		pp.activeQueue.Push(c)
	} else {
		pp.inactiveQueue.Push(c, pp.inactivePriority(c))
	}
	updates := pp.tryActivate(false)
	pp.lock.Unlock()
	pp.updateFlags(updates)
}

// capacityCurve is a snapshot of the priority pool contents in a format that can efficiently
// estimate how much capacity could be granted to a given node at a given priority level.
type capacityCurve struct {
	points       []curvePoint       // curve points sorted in descending order of priority
	index        map[enode.ID][]int // curve point indexes belonging to each node
	excludeList  []int              // curve point indexes of the excluded node
	excludeFirst bool               // true if activeCount == maxCount
}

type curvePoint struct {
	freeCap uint64 // available capacity at the current priority level
	nextPri int64  // next priority level where more capacity will be available
}

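// Illustrative reading of the curve (an editorial note, not part of the
// original file): points[i].freeCap is the capacity that is free after the i
// lowest-priority reduction steps have been squeezed out of the pool, and
// points[i].nextPri is the priority a requester must exceed in order to claim
// that capacity. The appended final point {maxCap, MaxInt64} acts as a
// sentinel that terminates the binary search in maxCapacity.
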
// getCapacityCurve returns a new or recently cached capacityCurve based on the contents of the pool
func (pp *priorityPool) getCapacityCurve() *capacityCurve {
	pp.lock.Lock()
	defer pp.lock.Unlock()

	now := pp.clock.Now()
	dt := time.Duration(now - pp.ccUpdatedAt)
	if !pp.ccUpdateForced && pp.cachedCurve != nil && dt < time.Second*10 {
		return pp.cachedCurve
	}
	pp.ccUpdateForced = false
	pp.ccUpdatedAt = now
	curve := &capacityCurve{
		index: make(map[enode.ID][]int),
	}
	pp.cachedCurve = curve

	var excludeID enode.ID
	excludeFirst := pp.maxCount == pp.activeCount
	// reduce node capacities or remove nodes until nothing is left in the queue;
	// record the available capacity and the necessary priority after each step
	lastPri := int64(math.MinInt64)
	for pp.activeCap > 0 {
		cp := curvePoint{}
		if pp.activeCap > pp.maxCap {
			log.Error("Active capacity is greater than allowed maximum", "active", pp.activeCap, "maximum", pp.maxCap)
		} else {
			cp.freeCap = pp.maxCap - pp.activeCap
		}
		// temporarily increase activeCap to enforce reducing or removing a node capacity
		tempCap := cp.freeCap + 1
		pp.activeCap += tempCap
		var next *ppNodeInfo
		// enforceLimits removes the lowest priority node if it has minimal capacity,
		// otherwise reduces its capacity
		next, cp.nextPri = pp.enforceLimits()
		if cp.nextPri < lastPri {
			// enforce monotonicity which may be broken by continuously changing priorities
			cp.nextPri = lastPri
		} else {
			lastPri = cp.nextPri
		}
		pp.activeCap -= tempCap
		if next == nil {
			log.Error("getCapacityCurve: cannot remove next element from the priority queue")
			break
		}
		id := next.node.ID()
		if excludeFirst {
			// if the node count limit is already reached then mark the node with the
			// lowest priority for exclusion
			curve.excludeFirst = true
			excludeID = id
			excludeFirst = false
		}
		// multiple curve points and therefore multiple indexes may belong to a node
		// if it was removed in multiple steps (if its capacity was more than the minimum)
		curve.index[id] = append(curve.index[id], len(curve.points))
		curve.points = append(curve.points, cp)
	}
	// restore original state of the queue
	pp.finalizeChanges(false)
	curve.points = append(curve.points, curvePoint{
		freeCap: pp.maxCap,
		nextPri: math.MaxInt64,
	})
	if curve.excludeFirst {
		curve.excludeList = curve.index[excludeID]
	}
	return curve
}

// exclude returns a capacityCurve with the given node excluded from the original curve
func (cc *capacityCurve) exclude(id enode.ID) *capacityCurve {
	if excludeList, ok := cc.index[id]; ok {
		// return a new version of the curve (only one excluded node can be selected)
		// Note: if the first node was excluded by default (excludeFirst == true) then
		// we can forget about that and exclude the node with the given id instead.
		return &capacityCurve{
			points:      cc.points,
			index:       cc.index,
			excludeList: excludeList,
		}
	}
	return cc
}

func (cc *capacityCurve) getPoint(i int) curvePoint {
	cp := cc.points[i]
	if i == 0 && cc.excludeFirst {
		cp.freeCap = 0
		return cp
	}
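	// Add back the capacity shed by the excluded node in reduction steps at or
	// after this point: the freeCap difference between neighboring points at an
	// exclusion index is capacity held by the excluded node, so from that node's
	// perspective it is already free.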
	for ii := len(cc.excludeList) - 1; ii >= 0; ii-- {
		ei := cc.excludeList[ii]
		if ei < i {
			break
		}
		e1, e2 := cc.points[ei], cc.points[ei+1]
		cp.freeCap += e2.freeCap - e1.freeCap
	}
	return cp
}

// maxCapacity calculates the maximum capacity available for a node with a given
// (monotonically decreasing) priority vs. capacity function. Note that if the requesting
// node is already in the pool then it should be excluded from the curve in order to get
// the correct result.
func (cc *capacityCurve) maxCapacity(priority func(cap uint64) int64) uint64 {
	min, max := 0, len(cc.points)-1 // the curve always has at least one point
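	// Binary search for the first curve point whose free capacity this node can
	// no longer claim, i.e. where its priority at cp.freeCap no longer exceeds
	// cp.nextPri; the grantable maximum then lies between that point and the
	// previous one.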
	for min < max {
		mid := (min + max) / 2
		cp := cc.getPoint(mid)
		if cp.freeCap == 0 || priority(cp.freeCap) > cp.nextPri {
			min = mid + 1
		} else {
			max = mid
		}
	}
	cp2 := cc.getPoint(min)
	if cp2.freeCap == 0 || min == 0 {
		return cp2.freeCap
	}
	cp1 := cc.getPoint(min - 1)
	if priority(cp2.freeCap) > cp1.nextPri {
		return cp2.freeCap
	}
	minc, maxc := cp1.freeCap, cp2.freeCap-1
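	// Refine between the two neighboring curve points: find the largest capacity
	// at which the node's priority still exceeds the threshold at the previous
	// point (cp1.nextPri).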
	for minc < maxc {
		midc := (minc + maxc + 1) / 2
		if midc == 0 || priority(midc) > cp1.nextPri {
			minc = midc
		} else {
			maxc = midc - 1
		}
	}
	return maxc
}

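// Illustrative sketch (an editorial note, not part of the original file):
// estimating how much capacity a node could be granted, assuming a
// hypothetical priorityOf helper that evaluates the node's priority at a given
// capacity; a node already in the pool should be excluded first.
//
//	curve := pp.getCapacityCurve().exclude(node.ID())
//	capacity := curve.maxCapacity(func(cap uint64) int64 {
//		return priorityOf(node, cap) // hypothetical priority-vs-capacity estimate
//	})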