costtracker.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "encoding/binary"
  19. "math"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/eth/ethconfig"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/les/flowcontrol"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/metrics"
  29. )
// makeCostStats enables the collection of request cost statistics during
// operation. When it is set, newCostTracker allocates the per-message-code
// buckets, updateStats fills them and stop prints them.
const makeCostStats = false
// Hardcoded request cost estimate tables, keyed by message code. Every
// requestCostTable entry is a {baseCost, reqCost} pair; the estimate for a
// request is baseCost + reqCost * amount.
var (
	// reqAvgTimeCost holds the average request cost estimates based on serving
	// time.
	reqAvgTimeCost = requestCostTable{
		GetBlockHeadersMsg:     {150000, 30000},
		GetBlockBodiesMsg:      {0, 700000},
		GetReceiptsMsg:         {0, 1000000},
		GetCodeMsg:             {0, 450000},
		GetProofsV2Msg:         {0, 600000},
		GetHelperTrieProofsMsg: {0, 1000000},
		SendTxV2Msg:            {0, 450000},
		GetTxStatusMsg:         {0, 250000},
	}
	// reqMaxInSize holds the maximum incoming message size estimates.
	reqMaxInSize = requestCostTable{
		GetBlockHeadersMsg:     {40, 0},
		GetBlockBodiesMsg:      {0, 40},
		GetReceiptsMsg:         {0, 40},
		GetCodeMsg:             {0, 80},
		GetProofsV2Msg:         {0, 80},
		GetHelperTrieProofsMsg: {0, 20},
		SendTxV2Msg:            {0, 16500},
		GetTxStatusMsg:         {0, 50},
	}
	// reqMaxOutSize holds the maximum outgoing message size estimates.
	reqMaxOutSize = requestCostTable{
		GetBlockHeadersMsg:     {0, 556},
		GetBlockBodiesMsg:      {0, 100000},
		GetReceiptsMsg:         {0, 200000},
		GetCodeMsg:             {0, 50000},
		GetProofsV2Msg:         {0, 4000},
		GetHelperTrieProofsMsg: {0, 4000},
		SendTxV2Msg:            {0, 100},
		GetTxStatusMsg:         {0, 100},
	}
	// minBufferReqAmount lists the request amounts that have to fit into the
	// minimum buffer size minBufferMultiplier times.
	minBufferReqAmount = map[uint64]uint64{
		GetBlockHeadersMsg:     192,
		GetBlockBodiesMsg:      1,
		GetReceiptsMsg:         1,
		GetCodeMsg:             1,
		GetProofsV2Msg:         1,
		GetHelperTrieProofsMsg: 16,
		SendTxV2Msg:            8,
		GetTxStatusMsg:         64,
	}
	// minBufferMultiplier is the factor newCostTracker applies to the most
	// expensive minBufferReqAmount request in order to obtain minBufLimit.
	minBufferMultiplier = 3
)
// Tuning parameters of the cost tracker. The gf* values drive the global
// cost factor update loop (gfLoop).
const (
	maxCostFactor = 2    // ratio of maximum and average cost estimates
	bufLimitRatio = 6000 // fixed bufLimit/MRR ratio

	// gfUsageThreshold scales the load threshold above which gfLoop corrects
	// the factor.
	gfUsageThreshold = 0.5
	// gfUsageTC is the time constant of the exponential decay applied to the
	// recent serving time and recent estimate accumulators.
	gfUsageTC = time.Second
	// gfRaiseTC is the time constant used when the factor is being raised.
	gfRaiseTC = time.Second * 200
	// gfDropTC is the time constant used when the factor is being dropped.
	gfDropTC = time.Second * 50
	// gfDbKey is the database key under which the logarithm of the global
	// cost factor is persisted.
	gfDbKey = "_globalCostFactorV6"
)
// costTracker is responsible for calculating costs and cost estimates on the
// server side. It continuously updates the global cost factor, which is defined
// as the number of cost units per nanosecond of serving time in a single thread.
// The factor is based on statistics collected while serving requests in
// high-load periods and in practice acts as a one-dimensional request price
// scaling factor over the pre-defined cost estimate table.
//
// The global factor is maintained dynamically on the server side because the
// estimated time cost of a request is fixed (hardcoded) while the hardware the
// server runs on differs from machine to machine. Request serving times
// therefore vary greatly between machines, and even on the same machine they
// may vary greatly with the request pressure.
//
// In order to limit resources more effectively, the global factor is applied
// to the serving time so that the result is as close as possible to the
// estimated time cost, no matter whether the server is slow or fast. The
// totalRecharge is scaled with the global factor as well, so that a fast server
// can serve more requests than estimated and a slow server can reduce the
// request pressure.
//
// Instead of scaling the cost values, the real value of cost units is changed by
// applying the factor to the serving times. This is more convenient because the
// changes in the cost factor can be applied immediately without always notifying
// the clients about the changed cost tables.
type costTracker struct {
	// db is the database the global cost factor is loaded from and saved to.
	db ethdb.Database
	// stopCh carries stop requests to the update loop; the channel sent
	// through it is closed once the loop has saved the factor and exits.
	stopCh chan chan struct{}

	// inSizeFactor and outSizeFactor convert message sizes into costs. They
	// are set by newCostTracker only if the corresponding bandwidth limit is
	// configured and stay zero otherwise.
	inSizeFactor  float64
	outSizeFactor float64
	// factor is the current global cost factor, accessed under gfLock once
	// the update loop is running.
	factor float64
	// utilTarget is derived from config.LightServ in newCostTracker.
	utilTarget float64
	// minBufLimit is the minimum buffer limit calculated in newCostTracker;
	// while it is zero makeCostList does not clamp the cost estimates.
	minBufLimit uint64

	// gfLock protects factor and totalRechargeCh.
	gfLock sync.RWMutex
	// reqInfoCh feeds the serving statistics reported through updateStats
	// into the update loop.
	reqInfoCh chan reqInfo
	// totalRechargeCh is the subscriber channel installed by
	// subscribeTotalRecharge; it is nil until somebody subscribes.
	totalRechargeCh chan uint64

	// stats holds the per-message-code cost distribution buckets. It is only
	// allocated when makeCostStats is set.
	stats map[uint64][]uint64

	// TestHooks
	testing      bool            // Disable real cost evaluation for testing purpose.
	testCostList RequestCostList // Customized cost table for testing purpose.
}
  126. // newCostTracker creates a cost tracker and loads the cost factor statistics from the database.
  127. // It also returns the minimum capacity that can be assigned to any peer.
  128. func newCostTracker(db ethdb.Database, config *ethconfig.Config) (*costTracker, uint64) {
  129. utilTarget := float64(config.LightServ) * flowcontrol.FixedPointMultiplier / 100
  130. ct := &costTracker{
  131. db: db,
  132. stopCh: make(chan chan struct{}),
  133. reqInfoCh: make(chan reqInfo, 100),
  134. utilTarget: utilTarget,
  135. }
  136. if config.LightIngress > 0 {
  137. ct.inSizeFactor = utilTarget / float64(config.LightIngress)
  138. }
  139. if config.LightEgress > 0 {
  140. ct.outSizeFactor = utilTarget / float64(config.LightEgress)
  141. }
  142. if makeCostStats {
  143. ct.stats = make(map[uint64][]uint64)
  144. for code := range reqAvgTimeCost {
  145. ct.stats[code] = make([]uint64, 10)
  146. }
  147. }
  148. ct.gfLoop()
  149. costList := ct.makeCostList(ct.globalFactor() * 1.25)
  150. for _, c := range costList {
  151. amount := minBufferReqAmount[c.MsgCode]
  152. cost := c.BaseCost + amount*c.ReqCost
  153. if cost > ct.minBufLimit {
  154. ct.minBufLimit = cost
  155. }
  156. }
  157. ct.minBufLimit *= uint64(minBufferMultiplier)
  158. return ct, (ct.minBufLimit-1)/bufLimitRatio + 1
  159. }
  160. // stop stops the cost tracker and saves the cost factor statistics to the database
  161. func (ct *costTracker) stop() {
  162. stopCh := make(chan struct{})
  163. ct.stopCh <- stopCh
  164. <-stopCh
  165. if makeCostStats {
  166. ct.printStats()
  167. }
  168. }
  169. // makeCostList returns upper cost estimates based on the hardcoded cost estimate
  170. // tables and the optionally specified incoming/outgoing bandwidth limits
  171. func (ct *costTracker) makeCostList(globalFactor float64) RequestCostList {
  172. maxCost := func(avgTimeCost, inSize, outSize uint64) uint64 {
  173. cost := avgTimeCost * maxCostFactor
  174. inSizeCost := uint64(float64(inSize) * ct.inSizeFactor * globalFactor)
  175. if inSizeCost > cost {
  176. cost = inSizeCost
  177. }
  178. outSizeCost := uint64(float64(outSize) * ct.outSizeFactor * globalFactor)
  179. if outSizeCost > cost {
  180. cost = outSizeCost
  181. }
  182. return cost
  183. }
  184. var list RequestCostList
  185. for code, data := range reqAvgTimeCost {
  186. baseCost := maxCost(data.baseCost, reqMaxInSize[code].baseCost, reqMaxOutSize[code].baseCost)
  187. reqCost := maxCost(data.reqCost, reqMaxInSize[code].reqCost, reqMaxOutSize[code].reqCost)
  188. if ct.minBufLimit != 0 {
  189. // if minBufLimit is set then always enforce maximum request cost <= minBufLimit
  190. maxCost := baseCost + reqCost*minBufferReqAmount[code]
  191. if maxCost > ct.minBufLimit {
  192. mul := 0.999 * float64(ct.minBufLimit) / float64(maxCost)
  193. baseCost = uint64(float64(baseCost) * mul)
  194. reqCost = uint64(float64(reqCost) * mul)
  195. }
  196. }
  197. list = append(list, requestCostListItem{
  198. MsgCode: code,
  199. BaseCost: baseCost,
  200. ReqCost: reqCost,
  201. })
  202. }
  203. return list
  204. }
// reqInfo contains the estimated time cost and the actual serving time of a
// single request. It is the feed source used to update the factor maintained
// by costTracker.
//
// Note: updateStats builds this struct with a positional literal, so the field
// order must not be changed.
type reqInfo struct {
	// avgTimeCost is the estimated time cost of the request, calculated from
	// the reqAvgTimeCost table.
	avgTimeCost float64

	// servingTime is the CPU time corresponding to the actual processing of
	// the request.
	servingTime float64

	// msgCode indicates the type of request.
	msgCode uint64
}
  216. // gfLoop starts an event loop which updates the global cost factor which is
  217. // calculated as a weighted average of the average estimate / serving time ratio.
  218. // The applied weight equals the serving time if gfUsage is over a threshold,
  219. // zero otherwise. gfUsage is the recent average serving time per time unit in
  220. // an exponential moving window. This ensures that statistics are collected only
  221. // under high-load circumstances where the measured serving times are relevant.
  222. // The total recharge parameter of the flow control system which controls the
  223. // total allowed serving time per second but nominated in cost units, should
  224. // also be scaled with the cost factor and is also updated by this loop.
  225. func (ct *costTracker) gfLoop() {
  226. var (
  227. factor, totalRecharge float64
  228. gfLog, recentTime, recentAvg float64
  229. lastUpdate, expUpdate = mclock.Now(), mclock.Now()
  230. )
  231. // Load historical cost factor statistics from the database.
  232. data, _ := ct.db.Get([]byte(gfDbKey))
  233. if len(data) == 8 {
  234. gfLog = math.Float64frombits(binary.BigEndian.Uint64(data[:]))
  235. }
  236. ct.factor = math.Exp(gfLog)
  237. factor, totalRecharge = ct.factor, ct.utilTarget*ct.factor
  238. // In order to perform factor data statistics under the high request pressure,
  239. // we only adjust factor when recent factor usage beyond the threshold.
  240. threshold := gfUsageThreshold * float64(gfUsageTC) * ct.utilTarget / flowcontrol.FixedPointMultiplier
  241. go func() {
  242. saveCostFactor := func() {
  243. var data [8]byte
  244. binary.BigEndian.PutUint64(data[:], math.Float64bits(gfLog))
  245. ct.db.Put([]byte(gfDbKey), data[:])
  246. log.Debug("global cost factor saved", "value", factor)
  247. }
  248. saveTicker := time.NewTicker(time.Minute * 10)
  249. defer saveTicker.Stop()
  250. for {
  251. select {
  252. case r := <-ct.reqInfoCh:
  253. relCost := int64(factor * r.servingTime * 100 / r.avgTimeCost) // Convert the value to a percentage form
  254. // Record more metrics if we are debugging
  255. if metrics.EnabledExpensive {
  256. switch r.msgCode {
  257. case GetBlockHeadersMsg:
  258. relativeCostHeaderHistogram.Update(relCost)
  259. case GetBlockBodiesMsg:
  260. relativeCostBodyHistogram.Update(relCost)
  261. case GetReceiptsMsg:
  262. relativeCostReceiptHistogram.Update(relCost)
  263. case GetCodeMsg:
  264. relativeCostCodeHistogram.Update(relCost)
  265. case GetProofsV2Msg:
  266. relativeCostProofHistogram.Update(relCost)
  267. case GetHelperTrieProofsMsg:
  268. relativeCostHelperProofHistogram.Update(relCost)
  269. case SendTxV2Msg:
  270. relativeCostSendTxHistogram.Update(relCost)
  271. case GetTxStatusMsg:
  272. relativeCostTxStatusHistogram.Update(relCost)
  273. }
  274. }
  275. // SendTxV2 and GetTxStatus requests are two special cases.
  276. // All other requests will only put pressure on the database, and
  277. // the corresponding delay is relatively stable. While these two
  278. // requests involve txpool query, which is usually unstable.
  279. //
  280. // TODO(rjl493456442) fixes this.
  281. if r.msgCode == SendTxV2Msg || r.msgCode == GetTxStatusMsg {
  282. continue
  283. }
  284. requestServedMeter.Mark(int64(r.servingTime))
  285. requestServedTimer.Update(time.Duration(r.servingTime))
  286. requestEstimatedMeter.Mark(int64(r.avgTimeCost / factor))
  287. requestEstimatedTimer.Update(time.Duration(r.avgTimeCost / factor))
  288. relativeCostHistogram.Update(relCost)
  289. now := mclock.Now()
  290. dt := float64(now - expUpdate)
  291. expUpdate = now
  292. exp := math.Exp(-dt / float64(gfUsageTC))
  293. // calculate factor correction until now, based on previous values
  294. var gfCorr float64
  295. max := recentTime
  296. if recentAvg > max {
  297. max = recentAvg
  298. }
  299. // we apply continuous correction when MAX(recentTime, recentAvg) > threshold
  300. if max > threshold {
  301. // calculate correction time between last expUpdate and now
  302. if max*exp >= threshold {
  303. gfCorr = dt
  304. } else {
  305. gfCorr = math.Log(max/threshold) * float64(gfUsageTC)
  306. }
  307. // calculate log(factor) correction with the right direction and time constant
  308. if recentTime > recentAvg {
  309. // drop factor if actual serving times are larger than average estimates
  310. gfCorr /= -float64(gfDropTC)
  311. } else {
  312. // raise factor if actual serving times are smaller than average estimates
  313. gfCorr /= float64(gfRaiseTC)
  314. }
  315. }
  316. // update recent cost values with current request
  317. recentTime = recentTime*exp + r.servingTime
  318. recentAvg = recentAvg*exp + r.avgTimeCost/factor
  319. if gfCorr != 0 {
  320. // Apply the correction to factor
  321. gfLog += gfCorr
  322. factor = math.Exp(gfLog)
  323. // Notify outside modules the new factor and totalRecharge.
  324. if time.Duration(now-lastUpdate) > time.Second {
  325. totalRecharge, lastUpdate = ct.utilTarget*factor, now
  326. ct.gfLock.Lock()
  327. ct.factor = factor
  328. ch := ct.totalRechargeCh
  329. ct.gfLock.Unlock()
  330. if ch != nil {
  331. select {
  332. case ct.totalRechargeCh <- uint64(totalRecharge):
  333. default:
  334. }
  335. }
  336. globalFactorGauge.Update(int64(1000 * factor))
  337. log.Debug("global cost factor updated", "factor", factor)
  338. }
  339. }
  340. recentServedGauge.Update(int64(recentTime))
  341. recentEstimatedGauge.Update(int64(recentAvg))
  342. case <-saveTicker.C:
  343. saveCostFactor()
  344. case stopCh := <-ct.stopCh:
  345. saveCostFactor()
  346. close(stopCh)
  347. return
  348. }
  349. }
  350. }()
  351. }
  352. // globalFactor returns the current value of the global cost factor
  353. func (ct *costTracker) globalFactor() float64 {
  354. ct.gfLock.RLock()
  355. defer ct.gfLock.RUnlock()
  356. return ct.factor
  357. }
  358. // totalRecharge returns the current total recharge parameter which is used by
  359. // flowcontrol.ClientManager and is scaled by the global cost factor
  360. func (ct *costTracker) totalRecharge() uint64 {
  361. ct.gfLock.RLock()
  362. defer ct.gfLock.RUnlock()
  363. return uint64(ct.factor * ct.utilTarget)
  364. }
  365. // subscribeTotalRecharge returns all future updates to the total recharge value
  366. // through a channel and also returns the current value
  367. func (ct *costTracker) subscribeTotalRecharge(ch chan uint64) uint64 {
  368. ct.gfLock.Lock()
  369. defer ct.gfLock.Unlock()
  370. ct.totalRechargeCh = ch
  371. return uint64(ct.factor * ct.utilTarget)
  372. }
  373. // updateStats updates the global cost factor and (if enabled) the real cost vs.
  374. // average estimate statistics
  375. func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) {
  376. avg := reqAvgTimeCost[code]
  377. avgTimeCost := avg.baseCost + amount*avg.reqCost
  378. select {
  379. case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime), code}:
  380. default:
  381. }
  382. if makeCostStats {
  383. realCost <<= 4
  384. l := 0
  385. for l < 9 && realCost > avgTimeCost {
  386. l++
  387. realCost >>= 1
  388. }
  389. atomic.AddUint64(&ct.stats[code][l], 1)
  390. }
  391. }
  392. // realCost calculates the final cost of a request based on actual serving time,
  393. // incoming and outgoing message size
  394. //
  395. // Note: message size is only taken into account if bandwidth limitation is applied
  396. // and the cost based on either message size is greater than the cost based on
  397. // serving time. A maximum of the three costs is applied instead of their sum
  398. // because the three limited resources (serving thread time and i/o bandwidth) can
  399. // also be maxed out simultaneously.
  400. func (ct *costTracker) realCost(servingTime uint64, inSize, outSize uint32) uint64 {
  401. cost := float64(servingTime)
  402. inSizeCost := float64(inSize) * ct.inSizeFactor
  403. if inSizeCost > cost {
  404. cost = inSizeCost
  405. }
  406. outSizeCost := float64(outSize) * ct.outSizeFactor
  407. if outSizeCost > cost {
  408. cost = outSizeCost
  409. }
  410. return uint64(cost * ct.globalFactor())
  411. }
  412. // printStats prints the distribution of real request cost relative to the average estimates
  413. func (ct *costTracker) printStats() {
  414. if ct.stats == nil {
  415. return
  416. }
  417. for code, arr := range ct.stats {
  418. log.Info("Request cost statistics", "code", code, "1/16", arr[0], "1/8", arr[1], "1/4", arr[2], "1/2", arr[3], "1", arr[4], "2", arr[5], "4", arr[6], "8", arr[7], "16", arr[8], ">16", arr[9])
  419. }
  420. }
  421. type (
  422. // requestCostTable assigns a cost estimate function to each request type
  423. // which is a linear function of the requested amount
  424. // (cost = baseCost + reqCost * amount)
  425. requestCostTable map[uint64]*requestCosts
  426. requestCosts struct {
  427. baseCost, reqCost uint64
  428. }
  429. // RequestCostList is a list representation of request costs which is used for
  430. // database storage and communication through the network
  431. RequestCostList []requestCostListItem
  432. requestCostListItem struct {
  433. MsgCode, BaseCost, ReqCost uint64
  434. }
  435. )
  436. // getMaxCost calculates the estimated cost for a given request type and amount
  437. func (table requestCostTable) getMaxCost(code, amount uint64) uint64 {
  438. costs := table[code]
  439. return costs.baseCost + amount*costs.reqCost
  440. }
  441. // decode converts a cost list to a cost table
  442. func (list RequestCostList) decode(protocolLength uint64) requestCostTable {
  443. table := make(requestCostTable)
  444. for _, e := range list {
  445. if e.MsgCode < protocolLength {
  446. table[e.MsgCode] = &requestCosts{
  447. baseCost: e.BaseCost,
  448. reqCost: e.ReqCost,
  449. }
  450. }
  451. }
  452. return table
  453. }
  454. // testCostList returns a dummy request cost list used by tests
  455. func testCostList(testCost uint64) RequestCostList {
  456. cl := make(RequestCostList, len(reqAvgTimeCost))
  457. var max uint64
  458. for code := range reqAvgTimeCost {
  459. if code > max {
  460. max = code
  461. }
  462. }
  463. i := 0
  464. for code := uint64(0); code <= max; code++ {
  465. if _, ok := reqAvgTimeCost[code]; ok {
  466. cl[i].MsgCode = code
  467. cl[i].BaseCost = testCost
  468. cl[i].ReqCost = 0
  469. i++
  470. }
  471. }
  472. return cl
  473. }