// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package flowcontrol

import (
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/common/prque"
)
// cmNodeFields are ClientNode fields used by the client manager.
// Note: these fields are locked by the client manager's mutex.
type cmNodeFields struct {
	corrBufValue   int64 // buffer value adjusted with the extra recharge amount
	rcLastIntValue int64 // past recharge integrator value when corrBufValue was last updated
	rcFullIntValue int64 // future recharge integrator value when corrBufValue will reach maximum
	queueIndex     int   // position in the recharge queue (-1 if not queued)
}
// FixedPointMultiplier is applied to the recharge integrator and the recharge curve.
//
// Note: fixed point arithmetic is required for the integrator because it is a
// constantly increasing value that can wrap around int64 limits (which behavior is
// also supported by the priority queue). A floating point value would gradually lose
// precision in this application.
// The recharge curve and all recharge values are encoded as fixed point because
// sumRecharge is frequently updated by adding or subtracting individual recharge
// values and perfect precision is required.
const FixedPointMultiplier = 1000000

var (
	capacityDropFactor          = 0.1                          // amount subtracted from logTotalCap on a full-capacity freeze event
	capacityRaiseTC             = 1 / (3 * float64(time.Hour)) // time constant for raising the capacity factor
	capacityRaiseThresholdRatio = 1.125                        // total/connected capacity ratio threshold for raising the capacity factor
)
// ClientManager controls the capacity assigned to the clients of a server.
// Since ServerParams guarantee a safe lower estimate for processable requests
// even in case of all clients being active, ClientManager calculates a
// corrected buffer value and usually allows a higher remaining buffer value
// to be returned with each reply.
type ClientManager struct {
	clock     mclock.Clock
	lock      sync.Mutex // protects all mutable fields below
	enabledCh chan struct{}
	stop      chan chan struct{} // Stop sends an ack channel; the background goroutine closes it on exit

	curve                                      PieceWiseLinear // recharge curve mapping sumRecharge to the adjusted total recharge
	sumRecharge, totalRecharge, totalConnected uint64
	logTotalCap, totalCapacity                 float64 // totalCapacity == exp(logTotalCap), cached by refreshCapacity
	logTotalCapRaiseLimit                      float64 // upper bound for slowly raising logTotalCap (see updateRaiseLimit)
	minLogTotalCap, maxLogTotalCap             float64 // hard clamps for logTotalCap, set by SetCapacityLimits
	capacityRaiseThreshold                     uint64
	capLastUpdate                              mclock.AbsTime
	totalCapacityCh                            chan uint64 // optional subscriber channel, set by SubscribeTotalCapacity

	// recharge integrator is increasing in each moment with a rate of
	// (totalRecharge / sumRecharge)*FixedPointMultiplier or 0 if sumRecharge==0
	rcLastUpdate   mclock.AbsTime // last time the recharge integrator was updated
	rcLastIntValue int64          // last updated value of the recharge integrator
	// recharge queue is a priority queue with currently recharging client nodes
	// as elements. The priority value is rcFullIntValue which allows to quickly
	// determine which client will first finish recharge.
	rcQueue *prque.Prque
}
  75. // NewClientManager returns a new client manager.
  76. // Client manager enhances flow control performance by allowing client buffers
  77. // to recharge quicker than the minimum guaranteed recharge rate if possible.
  78. // The sum of all minimum recharge rates (sumRecharge) is updated each time
  79. // a clients starts or finishes buffer recharging. Then an adjusted total
  80. // recharge rate is calculated using a piecewise linear recharge curve:
  81. //
  82. // totalRecharge = curve(sumRecharge)
  83. // (totalRecharge >= sumRecharge is enforced)
  84. //
  85. // Then the "bonus" buffer recharge is distributed between currently recharging
  86. // clients proportionally to their minimum recharge rates.
  87. //
  88. // Note: total recharge is proportional to the average number of parallel running
  89. // serving threads. A recharge value of 1000000 corresponds to one thread in average.
  90. // The maximum number of allowed serving threads should always be considerably
  91. // higher than the targeted average number.
  92. //
  93. // Note 2: although it is possible to specify a curve allowing the total target
  94. // recharge starting from zero sumRecharge, it makes sense to add a linear ramp
  95. // starting from zero in order to not let a single low-priority client use up
  96. // the entire server capacity and thus ensure quick availability for others at
  97. // any moment.
  98. func NewClientManager(curve PieceWiseLinear, clock mclock.Clock) *ClientManager {
  99. cm := &ClientManager{
  100. clock: clock,
  101. rcQueue: prque.NewWrapAround(func(a interface{}, i int) { a.(*ClientNode).queueIndex = i }),
  102. capLastUpdate: clock.Now(),
  103. stop: make(chan chan struct{}),
  104. }
  105. if curve != nil {
  106. cm.SetRechargeCurve(curve)
  107. }
  108. go func() {
  109. // regularly recalculate and update total capacity
  110. for {
  111. select {
  112. case <-time.After(time.Minute):
  113. cm.lock.Lock()
  114. cm.updateTotalCapacity(cm.clock.Now(), true)
  115. cm.lock.Unlock()
  116. case stop := <-cm.stop:
  117. close(stop)
  118. return
  119. }
  120. }
  121. }()
  122. return cm
  123. }
  124. // Stop stops the client manager
  125. func (cm *ClientManager) Stop() {
  126. stop := make(chan struct{})
  127. cm.stop <- stop
  128. <-stop
  129. }
  130. // SetRechargeCurve updates the recharge curve
  131. func (cm *ClientManager) SetRechargeCurve(curve PieceWiseLinear) {
  132. cm.lock.Lock()
  133. defer cm.lock.Unlock()
  134. now := cm.clock.Now()
  135. cm.updateRecharge(now)
  136. cm.curve = curve
  137. if len(curve) > 0 {
  138. cm.totalRecharge = curve[len(curve)-1].Y
  139. } else {
  140. cm.totalRecharge = 0
  141. }
  142. }
// SetCapacityLimits sets the lower and upper bounds of the capacity factor and
// a threshold value used for raising capFactor.
// Either if the difference between total allowed and connected capacity is less
// than this threshold or if their ratio is less than capacityRaiseThresholdRatio
// then capFactor is allowed to slowly raise.
// NOTE(review): this method mutates ClientManager fields without taking cm.lock;
// presumably it is only called during setup before clients connect — confirm.
func (cm *ClientManager) SetCapacityLimits(min, max, raiseThreshold uint64) {
	// clamp both bounds to at least 1 so math.Log stays finite
	if min < 1 {
		min = 1
	}
	cm.minLogTotalCap = math.Log(float64(min))
	if max < 1 {
		max = 1
	}
	cm.maxLogTotalCap = math.Log(float64(max))
	// start at the maximum; freeze events can only lower it from here
	cm.logTotalCap = cm.maxLogTotalCap
	cm.capacityRaiseThreshold = raiseThreshold
	cm.refreshCapacity()
}
// connect should be called when a client is connected, before passing it to any
// other ClientManager function.
func (cm *ClientManager) connect(node *ClientNode) {
	cm.lock.Lock()
	defer cm.lock.Unlock()

	now := cm.clock.Now()
	// settle the recharge integrator first so the node's snapshot of
	// rcLastIntValue is current
	cm.updateRecharge(now)
	// a newly connected node starts with a full buffer and is not queued
	// for recharge
	node.corrBufValue = int64(node.params.BufLimit)
	node.rcLastIntValue = cm.rcLastIntValue
	node.queueIndex = -1
	// update capacity with the old totalConnected before adding the new node
	cm.updateTotalCapacity(now, true)
	cm.totalConnected += node.params.MinRecharge
	cm.updateRaiseLimit()
}
  174. // disconnect should be called when a client is disconnected
  175. func (cm *ClientManager) disconnect(node *ClientNode) {
  176. cm.lock.Lock()
  177. defer cm.lock.Unlock()
  178. now := cm.clock.Now()
  179. cm.updateRecharge(cm.clock.Now())
  180. cm.updateTotalCapacity(now, true)
  181. cm.totalConnected -= node.params.MinRecharge
  182. cm.updateRaiseLimit()
  183. }
// accepted is called when a request with given maximum cost is accepted.
// It returns a priority indicator for the request which is used to determine placement
// in the serving queue. Older requests have higher priority by default. If the client
// is almost out of buffer, request priority is reduced.
func (cm *ClientManager) accepted(node *ClientNode, maxCost uint64, now mclock.AbsTime) (priority int64) {
	cm.lock.Lock()
	defer cm.lock.Unlock()

	// deduct the full maximum cost up front; processed refunds the unused
	// part once the real cost is known
	cm.updateNodeRc(node, -int64(maxCost), &node.params, now)
	// rcTime is the time needed to recharge back to a full buffer at the
	// minimum guaranteed rate (fixed point scaled)
	rcTime := (node.params.BufLimit - uint64(node.corrBufValue)) * FixedPointMultiplier / node.params.MinRecharge
	// older requests (smaller now) and fuller buffers (smaller rcTime)
	// yield a higher priority value
	return -int64(now) - int64(rcTime)
}
  195. // processed updates the client buffer according to actual request cost after
  196. // serving has been finished.
  197. //
  198. // Note: processed should always be called for all accepted requests
  199. func (cm *ClientManager) processed(node *ClientNode, maxCost, realCost uint64, now mclock.AbsTime) {
  200. if realCost > maxCost {
  201. realCost = maxCost
  202. }
  203. cm.updateBuffer(node, int64(maxCost-realCost), now)
  204. }
// updateBuffer recalculates the corrected buffer value, adds the given value to it
// and updates the node's actual buffer value if possible.
func (cm *ClientManager) updateBuffer(node *ClientNode, add int64, now mclock.AbsTime) {
	cm.lock.Lock()
	defer cm.lock.Unlock()

	cm.updateNodeRc(node, add, &node.params, now)
	// the actual buffer value is only ever corrected upwards here; the
	// corrected value can exceed it because of the bonus recharge
	if node.corrBufValue > node.bufValue {
		if node.log != nil {
			node.log.add(now, fmt.Sprintf("corrected bv=%d oldBv=%d", node.corrBufValue, node.bufValue))
		}
		node.bufValue = node.corrBufValue
	}
}
// updateParams updates the flow control parameters of a client node.
func (cm *ClientManager) updateParams(node *ClientNode, params ServerParams, now mclock.AbsTime) {
	cm.lock.Lock()
	defer cm.lock.Unlock()

	cm.updateRecharge(now)
	cm.updateTotalCapacity(now, true)
	// adjust the connected capacity sum by the change in the minimum
	// recharge rate before updateNodeRc installs the new params
	cm.totalConnected += params.MinRecharge - node.params.MinRecharge
	cm.updateRaiseLimit()
	cm.updateNodeRc(node, 0, &params, now)
}
  228. // updateRaiseLimit recalculates the limiting value until which logTotalCap
  229. // can be raised when no client freeze events occur
  230. func (cm *ClientManager) updateRaiseLimit() {
  231. if cm.capacityRaiseThreshold == 0 {
  232. cm.logTotalCapRaiseLimit = 0
  233. return
  234. }
  235. limit := float64(cm.totalConnected + cm.capacityRaiseThreshold)
  236. limit2 := float64(cm.totalConnected) * capacityRaiseThresholdRatio
  237. if limit2 > limit {
  238. limit = limit2
  239. }
  240. if limit < 1 {
  241. limit = 1
  242. }
  243. cm.logTotalCapRaiseLimit = math.Log(limit)
  244. }
// updateRecharge updates the recharge integrator and checks the recharge queue
// for nodes with recently filled buffers.
func (cm *ClientManager) updateRecharge(now mclock.AbsTime) {
	lastUpdate := cm.rcLastUpdate
	cm.rcLastUpdate = now
	// updating is done in multiple steps if node buffers are filled and sumRecharge
	// is decreased before the given target time
	for cm.sumRecharge > 0 {
		sumRecharge := cm.sumRecharge
		if sumRecharge > cm.totalRecharge {
			sumRecharge = cm.totalRecharge
		}
		// bonusRatio is the ratio of the curve-adjusted total recharge to
		// the sum of individual minimum rates; > 1 means clients recharge
		// faster than their guaranteed minimum
		bonusRatio := float64(1)
		v := cm.curve.ValueAt(sumRecharge)
		s := float64(sumRecharge)
		if v > s && s > 0 {
			bonusRatio = v / s
		}
		dt := now - lastUpdate
		// fetch the client that finishes first
		rcqNode := cm.rcQueue.PopItem().(*ClientNode) // if sumRecharge > 0 then the queue cannot be empty
		// check whether it has already finished
		dtNext := mclock.AbsTime(float64(rcqNode.rcFullIntValue-cm.rcLastIntValue) / bonusRatio)
		if dt < dtNext {
			// not finished yet, put it back, update integrator according
			// to current bonusRatio and return
			cm.rcQueue.Push(rcqNode, -rcqNode.rcFullIntValue)
			cm.rcLastIntValue += int64(bonusRatio * float64(dt))
			return
		}
		lastUpdate += dtNext
		// finished recharging, update corrBufValue and sumRecharge if necessary and do next step
		if rcqNode.corrBufValue < int64(rcqNode.params.BufLimit) {
			rcqNode.corrBufValue = int64(rcqNode.params.BufLimit)
			cm.sumRecharge -= rcqNode.params.MinRecharge
		}
		// advance the integrator to the moment this node finished, then
		// repeat with the (possibly smaller) sumRecharge
		cm.rcLastIntValue = rcqNode.rcFullIntValue
	}
}
// updateNodeRc updates a node's corrBufValue and adds an external correction value.
// It also adds or removes the rcQueue entry and updates ServerParams and sumRecharge if necessary.
func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *ServerParams, now mclock.AbsTime) {
	cm.updateRecharge(now)
	wasFull := true
	if node.corrBufValue != int64(node.params.BufLimit) {
		wasFull = false
		// apply the recharge accumulated since the node's last snapshot of
		// the integrator, capped at the buffer limit
		node.corrBufValue += (cm.rcLastIntValue - node.rcLastIntValue) * int64(node.params.MinRecharge) / FixedPointMultiplier
		if node.corrBufValue > int64(node.params.BufLimit) {
			node.corrBufValue = int64(node.params.BufLimit)
		}
		node.rcLastIntValue = cm.rcLastIntValue
	}
	// apply the external correction (request cost deduction or refund)
	node.corrBufValue += bvc
	// if the buffer limit grew, grant the extra room immediately
	diff := int64(params.BufLimit - node.params.BufLimit)
	if diff > 0 {
		node.corrBufValue += diff
	}
	isFull := false
	if node.corrBufValue >= int64(params.BufLimit) {
		node.corrBufValue = int64(params.BufLimit)
		isFull = true
	}
	// maintain the invariant: sumRecharge counts exactly the nodes whose
	// buffers are not full (i.e. the currently recharging ones)
	if !wasFull {
		cm.sumRecharge -= node.params.MinRecharge
	}
	if params != &node.params {
		node.params = *params
	}
	if !isFull {
		cm.sumRecharge += node.params.MinRecharge
		// re-queue with a freshly computed "recharge finished" integrator value
		if node.queueIndex != -1 {
			cm.rcQueue.Remove(node.queueIndex)
		}
		node.rcLastIntValue = cm.rcLastIntValue
		node.rcFullIntValue = cm.rcLastIntValue + (int64(node.params.BufLimit)-node.corrBufValue)*FixedPointMultiplier/int64(node.params.MinRecharge)
		cm.rcQueue.Push(node, -node.rcFullIntValue)
	}
}
  323. // reduceTotalCapacity reduces the total capacity allowance in case of a client freeze event
  324. func (cm *ClientManager) reduceTotalCapacity(frozenCap uint64) {
  325. cm.lock.Lock()
  326. defer cm.lock.Unlock()
  327. ratio := float64(1)
  328. if frozenCap < cm.totalConnected {
  329. ratio = float64(frozenCap) / float64(cm.totalConnected)
  330. }
  331. now := cm.clock.Now()
  332. cm.updateTotalCapacity(now, false)
  333. cm.logTotalCap -= capacityDropFactor * ratio
  334. if cm.logTotalCap < cm.minLogTotalCap {
  335. cm.logTotalCap = cm.minLogTotalCap
  336. }
  337. cm.updateTotalCapacity(now, true)
  338. }
  339. // updateTotalCapacity updates the total capacity factor. The capacity factor allows
  340. // the total capacity of the system to go over the allowed total recharge value
  341. // if clients go to frozen state sufficiently rarely.
  342. // The capacity factor is dropped instantly by a small amount if a clients is frozen.
  343. // It is raised slowly (with a large time constant) if the total connected capacity
  344. // is close to the total allowed amount and no clients are frozen.
  345. func (cm *ClientManager) updateTotalCapacity(now mclock.AbsTime, refresh bool) {
  346. dt := now - cm.capLastUpdate
  347. cm.capLastUpdate = now
  348. if cm.logTotalCap < cm.logTotalCapRaiseLimit {
  349. cm.logTotalCap += capacityRaiseTC * float64(dt)
  350. if cm.logTotalCap > cm.logTotalCapRaiseLimit {
  351. cm.logTotalCap = cm.logTotalCapRaiseLimit
  352. }
  353. }
  354. if cm.logTotalCap > cm.maxLogTotalCap {
  355. cm.logTotalCap = cm.maxLogTotalCap
  356. }
  357. if refresh {
  358. cm.refreshCapacity()
  359. }
  360. }
  361. // refreshCapacity recalculates the total capacity value and sends an update to the subscription
  362. // channel if the relative change of the value since the last update is more than 0.1 percent
  363. func (cm *ClientManager) refreshCapacity() {
  364. totalCapacity := math.Exp(cm.logTotalCap)
  365. if totalCapacity >= cm.totalCapacity*0.999 && totalCapacity <= cm.totalCapacity*1.001 {
  366. return
  367. }
  368. cm.totalCapacity = totalCapacity
  369. if cm.totalCapacityCh != nil {
  370. select {
  371. case cm.totalCapacityCh <- uint64(cm.totalCapacity):
  372. default:
  373. }
  374. }
  375. }
  376. // SubscribeTotalCapacity returns all future updates to the total capacity value
  377. // through a channel and also returns the current value
  378. func (cm *ClientManager) SubscribeTotalCapacity(ch chan uint64) uint64 {
  379. cm.lock.Lock()
  380. defer cm.lock.Unlock()
  381. cm.totalCapacityCh = ch
  382. return uint64(cm.totalCapacity)
  383. }
  384. // PieceWiseLinear is used to describe recharge curves
  385. type PieceWiseLinear []struct{ X, Y uint64 }
  386. // ValueAt returns the curve's value at a given point
  387. func (pwl PieceWiseLinear) ValueAt(x uint64) float64 {
  388. l := 0
  389. h := len(pwl)
  390. if h == 0 {
  391. return 0
  392. }
  393. for h != l {
  394. m := (l + h) / 2
  395. if x > pwl[m].X {
  396. l = m + 1
  397. } else {
  398. h = m
  399. }
  400. }
  401. if l == 0 {
  402. return float64(pwl[0].Y)
  403. }
  404. l--
  405. if h == len(pwl) {
  406. return float64(pwl[l].Y)
  407. }
  408. dx := pwl[h].X - pwl[l].X
  409. if dx < 1 {
  410. return float64(pwl[l].Y)
  411. }
  412. return float64(pwl[l].Y) + float64(pwl[h].Y-pwl[l].Y)*float64(x-pwl[l].X)/float64(dx)
  413. }
  414. // Valid returns true if the X coordinates of the curve points are non-strictly monotonic
  415. func (pwl PieceWiseLinear) Valid() bool {
  416. var lastX uint64
  417. for _, i := range pwl {
  418. if i.X < lastX {
  419. return false
  420. }
  421. lastX = i.X
  422. }
  423. return true
  424. }