valuetracker.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package client
  17. import (
  18. "bytes"
  19. "fmt"
  20. "math"
  21. "sync"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/ethdb"
  25. "github.com/ethereum/go-ethereum/les/utils"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/p2p/enode"
  28. "github.com/ethereum/go-ethereum/rlp"
  29. )
  30. const (
  31. vtVersion = 1 // database encoding format for ValueTracker
  32. nvtVersion = 1 // database encoding format for NodeValueTracker
  33. )
  34. var (
  35. vtKey = []byte("vt:")
  36. vtNodeKey = []byte("vtNode:")
  37. )
  38. // NodeValueTracker collects service value statistics for a specific server node
  39. type NodeValueTracker struct {
  40. lock sync.Mutex
  41. vt *ValueTracker
  42. rtStats, lastRtStats ResponseTimeStats
  43. lastTransfer mclock.AbsTime
  44. basket serverBasket
  45. reqCosts []uint64
  46. reqValues *[]float64
  47. }
  48. // UpdateCosts updates the node value tracker's request cost table
  49. func (nv *NodeValueTracker) UpdateCosts(reqCosts []uint64) {
  50. nv.vt.lock.Lock()
  51. defer nv.vt.lock.Unlock()
  52. nv.updateCosts(reqCosts, &nv.vt.refBasket.reqValues, nv.vt.refBasket.reqValueFactor(reqCosts))
  53. }
  54. // updateCosts updates the request cost table of the server. The request value factor
  55. // is also updated based on the given cost table and the current reference basket.
  56. // Note that the contents of the referenced reqValues slice will not change; a new
  57. // reference is passed if the values are updated by ValueTracker.
  58. func (nv *NodeValueTracker) updateCosts(reqCosts []uint64, reqValues *[]float64, rvFactor float64) {
  59. nv.lock.Lock()
  60. defer nv.lock.Unlock()
  61. nv.reqCosts = reqCosts
  62. nv.reqValues = reqValues
  63. nv.basket.updateRvFactor(rvFactor)
  64. }
  65. // transferStats returns request basket and response time statistics that should be
  66. // added to the global statistics. The contents of the server's own request basket are
  67. // gradually transferred to the main reference basket and removed from the server basket
  68. // with the specified transfer rate.
  69. // The response time statistics are retained at both places and therefore the global
  70. // distribution is always the sum of the individual server distributions.
  71. func (nv *NodeValueTracker) transferStats(now mclock.AbsTime, transferRate float64) (requestBasket, ResponseTimeStats) {
  72. nv.lock.Lock()
  73. defer nv.lock.Unlock()
  74. dt := now - nv.lastTransfer
  75. nv.lastTransfer = now
  76. if dt < 0 {
  77. dt = 0
  78. }
  79. recentRtStats := nv.rtStats
  80. recentRtStats.SubStats(&nv.lastRtStats)
  81. nv.lastRtStats = nv.rtStats
  82. return nv.basket.transfer(-math.Expm1(-transferRate * float64(dt))), recentRtStats
  83. }
  84. type ServedRequest struct {
  85. ReqType, Amount uint32
  86. }
  87. // Served adds a served request to the node's statistics. An actual request may be composed
  88. // of one or more request types (service vector indices).
  89. func (nv *NodeValueTracker) Served(reqs []ServedRequest, respTime time.Duration) {
  90. nv.vt.statsExpLock.RLock()
  91. expFactor := nv.vt.statsExpFactor
  92. nv.vt.statsExpLock.RUnlock()
  93. nv.lock.Lock()
  94. defer nv.lock.Unlock()
  95. var value float64
  96. for _, r := range reqs {
  97. nv.basket.add(r.ReqType, r.Amount, nv.reqCosts[r.ReqType]*uint64(r.Amount), expFactor)
  98. value += (*nv.reqValues)[r.ReqType] * float64(r.Amount)
  99. }
  100. nv.rtStats.Add(respTime, value, expFactor)
  101. }
  102. // RtStats returns the node's own response time distribution statistics
  103. func (nv *NodeValueTracker) RtStats() ResponseTimeStats {
  104. nv.lock.Lock()
  105. defer nv.lock.Unlock()
  106. return nv.rtStats
  107. }
  108. // ValueTracker coordinates service value calculation for individual servers and updates
  109. // global statistics
  110. type ValueTracker struct {
  111. clock mclock.Clock
  112. lock sync.Mutex
  113. quit chan chan struct{}
  114. db ethdb.KeyValueStore
  115. connected map[enode.ID]*NodeValueTracker
  116. reqTypeCount int
  117. refBasket referenceBasket
  118. mappings [][]string
  119. currentMapping int
  120. initRefBasket requestBasket
  121. rtStats ResponseTimeStats
  122. transferRate float64
  123. statsExpLock sync.RWMutex
  124. statsExpRate, offlineExpRate float64
  125. statsExpirer utils.Expirer
  126. statsExpFactor utils.ExpirationFactor
  127. }
  128. type valueTrackerEncV1 struct {
  129. Mappings [][]string
  130. RefBasketMapping uint
  131. RefBasket requestBasket
  132. RtStats ResponseTimeStats
  133. ExpOffset, SavedAt uint64
  134. }
  135. type nodeValueTrackerEncV1 struct {
  136. RtStats ResponseTimeStats
  137. ServerBasketMapping uint
  138. ServerBasket requestBasket
  139. }
  140. // RequestInfo is an initializer structure for the service vector.
  141. type RequestInfo struct {
  142. // Name identifies the request type and is used for re-mapping the service vector if necessary
  143. Name string
  144. // InitAmount and InitValue are used to initialize the reference basket
  145. InitAmount, InitValue float64
  146. }
  147. // NewValueTracker creates a new ValueTracker and loads its previously saved state from
  148. // the database if possible.
  149. func NewValueTracker(db ethdb.KeyValueStore, clock mclock.Clock, reqInfo []RequestInfo, updatePeriod time.Duration, transferRate, statsExpRate, offlineExpRate float64) *ValueTracker {
  150. now := clock.Now()
  151. initRefBasket := requestBasket{items: make([]basketItem, len(reqInfo))}
  152. mapping := make([]string, len(reqInfo))
  153. var sumAmount, sumValue float64
  154. for _, req := range reqInfo {
  155. sumAmount += req.InitAmount
  156. sumValue += req.InitAmount * req.InitValue
  157. }
  158. scaleValues := sumAmount * basketFactor / sumValue
  159. for i, req := range reqInfo {
  160. mapping[i] = req.Name
  161. initRefBasket.items[i].amount = uint64(req.InitAmount * basketFactor)
  162. initRefBasket.items[i].value = uint64(req.InitAmount * req.InitValue * scaleValues)
  163. }
  164. vt := &ValueTracker{
  165. clock: clock,
  166. connected: make(map[enode.ID]*NodeValueTracker),
  167. quit: make(chan chan struct{}),
  168. db: db,
  169. reqTypeCount: len(initRefBasket.items),
  170. initRefBasket: initRefBasket,
  171. transferRate: transferRate,
  172. statsExpRate: statsExpRate,
  173. offlineExpRate: offlineExpRate,
  174. }
  175. if vt.loadFromDb(mapping) != nil {
  176. // previous state not saved or invalid, init with default values
  177. vt.refBasket.basket = initRefBasket
  178. vt.mappings = [][]string{mapping}
  179. vt.currentMapping = 0
  180. }
  181. vt.statsExpirer.SetRate(now, statsExpRate)
  182. vt.refBasket.init(vt.reqTypeCount)
  183. vt.periodicUpdate()
  184. go func() {
  185. for {
  186. select {
  187. case <-clock.After(updatePeriod):
  188. vt.lock.Lock()
  189. vt.periodicUpdate()
  190. vt.lock.Unlock()
  191. case quit := <-vt.quit:
  192. close(quit)
  193. return
  194. }
  195. }
  196. }()
  197. return vt
  198. }
  199. // StatsExpirer returns the statistics expirer so that other values can be expired
  200. // with the same rate as the service value statistics.
  201. func (vt *ValueTracker) StatsExpirer() *utils.Expirer {
  202. return &vt.statsExpirer
  203. }
  204. // StatsExpirer returns the current expiration factor so that other values can be expired
  205. // with the same rate as the service value statistics.
  206. func (vt *ValueTracker) StatsExpFactor() utils.ExpirationFactor {
  207. vt.statsExpLock.RLock()
  208. defer vt.statsExpLock.RUnlock()
  209. return vt.statsExpFactor
  210. }
  211. // loadFromDb loads the value tracker's state from the database and converts saved
  212. // request basket index mapping if it does not match the specified index to name mapping.
  213. func (vt *ValueTracker) loadFromDb(mapping []string) error {
  214. enc, err := vt.db.Get(vtKey)
  215. if err != nil {
  216. return err
  217. }
  218. r := bytes.NewReader(enc)
  219. var version uint
  220. if err := rlp.Decode(r, &version); err != nil {
  221. log.Error("Decoding value tracker state failed", "err", err)
  222. return err
  223. }
  224. if version != vtVersion {
  225. log.Error("Unknown ValueTracker version", "stored", version, "current", nvtVersion)
  226. return fmt.Errorf("Unknown ValueTracker version %d (current version is %d)", version, vtVersion)
  227. }
  228. var vte valueTrackerEncV1
  229. if err := rlp.Decode(r, &vte); err != nil {
  230. log.Error("Decoding value tracker state failed", "err", err)
  231. return err
  232. }
  233. logOffset := utils.Fixed64(vte.ExpOffset)
  234. dt := time.Now().UnixNano() - int64(vte.SavedAt)
  235. if dt > 0 {
  236. logOffset += utils.Float64ToFixed64(float64(dt) * vt.offlineExpRate / math.Log(2))
  237. }
  238. vt.statsExpirer.SetLogOffset(vt.clock.Now(), logOffset)
  239. vt.rtStats = vte.RtStats
  240. vt.mappings = vte.Mappings
  241. vt.currentMapping = -1
  242. loop:
  243. for i, m := range vt.mappings {
  244. if len(m) != len(mapping) {
  245. continue loop
  246. }
  247. for j, s := range mapping {
  248. if m[j] != s {
  249. continue loop
  250. }
  251. }
  252. vt.currentMapping = i
  253. break
  254. }
  255. if vt.currentMapping == -1 {
  256. vt.currentMapping = len(vt.mappings)
  257. vt.mappings = append(vt.mappings, mapping)
  258. }
  259. if int(vte.RefBasketMapping) == vt.currentMapping {
  260. vt.refBasket.basket = vte.RefBasket
  261. } else {
  262. if vte.RefBasketMapping >= uint(len(vt.mappings)) {
  263. log.Error("Unknown request basket mapping", "stored", vte.RefBasketMapping, "current", vt.currentMapping)
  264. return fmt.Errorf("Unknown request basket mapping %d (current version is %d)", vte.RefBasketMapping, vt.currentMapping)
  265. }
  266. vt.refBasket.basket = vte.RefBasket.convertMapping(vt.mappings[vte.RefBasketMapping], mapping, vt.initRefBasket)
  267. }
  268. return nil
  269. }
  270. // saveToDb saves the value tracker's state to the database
  271. func (vt *ValueTracker) saveToDb() {
  272. vte := valueTrackerEncV1{
  273. Mappings: vt.mappings,
  274. RefBasketMapping: uint(vt.currentMapping),
  275. RefBasket: vt.refBasket.basket,
  276. RtStats: vt.rtStats,
  277. ExpOffset: uint64(vt.statsExpirer.LogOffset(vt.clock.Now())),
  278. SavedAt: uint64(time.Now().UnixNano()),
  279. }
  280. enc1, err := rlp.EncodeToBytes(uint(vtVersion))
  281. if err != nil {
  282. log.Error("Encoding value tracker state failed", "err", err)
  283. return
  284. }
  285. enc2, err := rlp.EncodeToBytes(&vte)
  286. if err != nil {
  287. log.Error("Encoding value tracker state failed", "err", err)
  288. return
  289. }
  290. if err := vt.db.Put(vtKey, append(enc1, enc2...)); err != nil {
  291. log.Error("Saving value tracker state failed", "err", err)
  292. }
  293. }
  294. // Stop saves the value tracker's state and each loaded node's individual state and
  295. // returns after shutting the internal goroutines down.
  296. func (vt *ValueTracker) Stop() {
  297. quit := make(chan struct{})
  298. vt.quit <- quit
  299. <-quit
  300. vt.lock.Lock()
  301. vt.periodicUpdate()
  302. for id, nv := range vt.connected {
  303. vt.saveNode(id, nv)
  304. }
  305. vt.connected = nil
  306. vt.saveToDb()
  307. vt.lock.Unlock()
  308. }
  309. // Register adds a server node to the value tracker
  310. func (vt *ValueTracker) Register(id enode.ID) *NodeValueTracker {
  311. vt.lock.Lock()
  312. defer vt.lock.Unlock()
  313. if vt.connected == nil {
  314. // ValueTracker has already been stopped
  315. return nil
  316. }
  317. nv := vt.loadOrNewNode(id)
  318. reqTypeCount := len(vt.refBasket.reqValues)
  319. nv.reqCosts = make([]uint64, reqTypeCount)
  320. nv.lastTransfer = vt.clock.Now()
  321. nv.reqValues = &vt.refBasket.reqValues
  322. nv.basket.init(reqTypeCount)
  323. vt.connected[id] = nv
  324. return nv
  325. }
  326. // Unregister removes a server node from the value tracker
  327. func (vt *ValueTracker) Unregister(id enode.ID) {
  328. vt.lock.Lock()
  329. defer vt.lock.Unlock()
  330. if nv := vt.connected[id]; nv != nil {
  331. vt.saveNode(id, nv)
  332. delete(vt.connected, id)
  333. }
  334. }
  335. // GetNode returns an individual server node's value tracker. If it did not exist before
  336. // then a new node is created.
  337. func (vt *ValueTracker) GetNode(id enode.ID) *NodeValueTracker {
  338. vt.lock.Lock()
  339. defer vt.lock.Unlock()
  340. return vt.loadOrNewNode(id)
  341. }
  342. // loadOrNewNode returns an individual server node's value tracker. If it did not exist before
  343. // then a new node is created.
  344. func (vt *ValueTracker) loadOrNewNode(id enode.ID) *NodeValueTracker {
  345. if nv, ok := vt.connected[id]; ok {
  346. return nv
  347. }
  348. nv := &NodeValueTracker{vt: vt, lastTransfer: vt.clock.Now()}
  349. enc, err := vt.db.Get(append(vtNodeKey, id[:]...))
  350. if err != nil {
  351. return nv
  352. }
  353. r := bytes.NewReader(enc)
  354. var version uint
  355. if err := rlp.Decode(r, &version); err != nil {
  356. log.Error("Failed to decode node value tracker", "id", id, "err", err)
  357. return nv
  358. }
  359. if version != nvtVersion {
  360. log.Error("Unknown NodeValueTracker version", "stored", version, "current", nvtVersion)
  361. return nv
  362. }
  363. var nve nodeValueTrackerEncV1
  364. if err := rlp.Decode(r, &nve); err != nil {
  365. log.Error("Failed to decode node value tracker", "id", id, "err", err)
  366. return nv
  367. }
  368. nv.rtStats = nve.RtStats
  369. nv.lastRtStats = nve.RtStats
  370. if int(nve.ServerBasketMapping) == vt.currentMapping {
  371. nv.basket.basket = nve.ServerBasket
  372. } else {
  373. if nve.ServerBasketMapping >= uint(len(vt.mappings)) {
  374. log.Error("Unknown request basket mapping", "stored", nve.ServerBasketMapping, "current", vt.currentMapping)
  375. return nv
  376. }
  377. nv.basket.basket = nve.ServerBasket.convertMapping(vt.mappings[nve.ServerBasketMapping], vt.mappings[vt.currentMapping], vt.initRefBasket)
  378. }
  379. return nv
  380. }
  381. // saveNode saves a server node's value tracker to the database
  382. func (vt *ValueTracker) saveNode(id enode.ID, nv *NodeValueTracker) {
  383. recentRtStats := nv.rtStats
  384. recentRtStats.SubStats(&nv.lastRtStats)
  385. vt.rtStats.AddStats(&recentRtStats)
  386. nv.lastRtStats = nv.rtStats
  387. nve := nodeValueTrackerEncV1{
  388. RtStats: nv.rtStats,
  389. ServerBasketMapping: uint(vt.currentMapping),
  390. ServerBasket: nv.basket.basket,
  391. }
  392. enc1, err := rlp.EncodeToBytes(uint(nvtVersion))
  393. if err != nil {
  394. log.Error("Failed to encode service value information", "id", id, "err", err)
  395. return
  396. }
  397. enc2, err := rlp.EncodeToBytes(&nve)
  398. if err != nil {
  399. log.Error("Failed to encode service value information", "id", id, "err", err)
  400. return
  401. }
  402. if err := vt.db.Put(append(vtNodeKey, id[:]...), append(enc1, enc2...)); err != nil {
  403. log.Error("Failed to save service value information", "id", id, "err", err)
  404. }
  405. }
  406. // RtStats returns the global response time distribution statistics
  407. func (vt *ValueTracker) RtStats() ResponseTimeStats {
  408. vt.lock.Lock()
  409. defer vt.lock.Unlock()
  410. vt.periodicUpdate()
  411. return vt.rtStats
  412. }
  413. // periodicUpdate transfers individual node data to the global statistics, normalizes
  414. // the reference basket and updates request values. The global state is also saved to
  415. // the database with each update.
  416. func (vt *ValueTracker) periodicUpdate() {
  417. now := vt.clock.Now()
  418. vt.statsExpLock.Lock()
  419. vt.statsExpFactor = utils.ExpFactor(vt.statsExpirer.LogOffset(now))
  420. vt.statsExpLock.Unlock()
  421. for _, nv := range vt.connected {
  422. basket, rtStats := nv.transferStats(now, vt.transferRate)
  423. vt.refBasket.add(basket)
  424. vt.rtStats.AddStats(&rtStats)
  425. }
  426. vt.refBasket.normalize()
  427. vt.refBasket.updateReqValues()
  428. for _, nv := range vt.connected {
  429. nv.updateCosts(nv.reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(nv.reqCosts))
  430. }
  431. vt.saveToDb()
  432. }
  433. type RequestStatsItem struct {
  434. Name string
  435. ReqAmount, ReqValue float64
  436. }
  437. // RequestStats returns the current contents of the reference request basket, with
  438. // request values meaning average per request rather than total.
  439. func (vt *ValueTracker) RequestStats() []RequestStatsItem {
  440. vt.statsExpLock.RLock()
  441. expFactor := vt.statsExpFactor
  442. vt.statsExpLock.RUnlock()
  443. vt.lock.Lock()
  444. defer vt.lock.Unlock()
  445. vt.periodicUpdate()
  446. res := make([]RequestStatsItem, len(vt.refBasket.basket.items))
  447. for i, item := range vt.refBasket.basket.items {
  448. res[i].Name = vt.mappings[vt.currentMapping][i]
  449. res[i].ReqAmount = expFactor.Value(float64(item.amount)/basketFactor, vt.refBasket.basket.exp)
  450. res[i].ReqValue = vt.refBasket.reqValues[i]
  451. }
  452. return res
  453. }