123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506 |
- // Copyright 2020 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package client
- import (
- "bytes"
- "fmt"
- "math"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common/mclock"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/les/utils"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/rlp"
- )
- const (
- vtVersion = 1 // database encoding format for ValueTracker
- nvtVersion = 1 // database encoding format for NodeValueTracker
- )
- var (
- vtKey = []byte("vt:")
- vtNodeKey = []byte("vtNode:")
- )
- // NodeValueTracker collects service value statistics for a specific server node
- type NodeValueTracker struct {
- lock sync.Mutex
- vt *ValueTracker
- rtStats, lastRtStats ResponseTimeStats
- lastTransfer mclock.AbsTime
- basket serverBasket
- reqCosts []uint64
- reqValues *[]float64
- }
- // UpdateCosts updates the node value tracker's request cost table
- func (nv *NodeValueTracker) UpdateCosts(reqCosts []uint64) {
- nv.vt.lock.Lock()
- defer nv.vt.lock.Unlock()
- nv.updateCosts(reqCosts, &nv.vt.refBasket.reqValues, nv.vt.refBasket.reqValueFactor(reqCosts))
- }
- // updateCosts updates the request cost table of the server. The request value factor
- // is also updated based on the given cost table and the current reference basket.
- // Note that the contents of the referenced reqValues slice will not change; a new
- // reference is passed if the values are updated by ValueTracker.
- func (nv *NodeValueTracker) updateCosts(reqCosts []uint64, reqValues *[]float64, rvFactor float64) {
- nv.lock.Lock()
- defer nv.lock.Unlock()
- nv.reqCosts = reqCosts
- nv.reqValues = reqValues
- nv.basket.updateRvFactor(rvFactor)
- }
- // transferStats returns request basket and response time statistics that should be
- // added to the global statistics. The contents of the server's own request basket are
- // gradually transferred to the main reference basket and removed from the server basket
- // with the specified transfer rate.
- // The response time statistics are retained at both places and therefore the global
- // distribution is always the sum of the individual server distributions.
- func (nv *NodeValueTracker) transferStats(now mclock.AbsTime, transferRate float64) (requestBasket, ResponseTimeStats) {
- nv.lock.Lock()
- defer nv.lock.Unlock()
- dt := now - nv.lastTransfer
- nv.lastTransfer = now
- if dt < 0 {
- dt = 0
- }
- recentRtStats := nv.rtStats
- recentRtStats.SubStats(&nv.lastRtStats)
- nv.lastRtStats = nv.rtStats
- return nv.basket.transfer(-math.Expm1(-transferRate * float64(dt))), recentRtStats
- }
- type ServedRequest struct {
- ReqType, Amount uint32
- }
- // Served adds a served request to the node's statistics. An actual request may be composed
- // of one or more request types (service vector indices).
- func (nv *NodeValueTracker) Served(reqs []ServedRequest, respTime time.Duration) {
- nv.vt.statsExpLock.RLock()
- expFactor := nv.vt.statsExpFactor
- nv.vt.statsExpLock.RUnlock()
- nv.lock.Lock()
- defer nv.lock.Unlock()
- var value float64
- for _, r := range reqs {
- nv.basket.add(r.ReqType, r.Amount, nv.reqCosts[r.ReqType]*uint64(r.Amount), expFactor)
- value += (*nv.reqValues)[r.ReqType] * float64(r.Amount)
- }
- nv.rtStats.Add(respTime, value, expFactor)
- }
- // RtStats returns the node's own response time distribution statistics
- func (nv *NodeValueTracker) RtStats() ResponseTimeStats {
- nv.lock.Lock()
- defer nv.lock.Unlock()
- return nv.rtStats
- }
- // ValueTracker coordinates service value calculation for individual servers and updates
- // global statistics
- type ValueTracker struct {
- clock mclock.Clock
- lock sync.Mutex
- quit chan chan struct{}
- db ethdb.KeyValueStore
- connected map[enode.ID]*NodeValueTracker
- reqTypeCount int
- refBasket referenceBasket
- mappings [][]string
- currentMapping int
- initRefBasket requestBasket
- rtStats ResponseTimeStats
- transferRate float64
- statsExpLock sync.RWMutex
- statsExpRate, offlineExpRate float64
- statsExpirer utils.Expirer
- statsExpFactor utils.ExpirationFactor
- }
- type valueTrackerEncV1 struct {
- Mappings [][]string
- RefBasketMapping uint
- RefBasket requestBasket
- RtStats ResponseTimeStats
- ExpOffset, SavedAt uint64
- }
- type nodeValueTrackerEncV1 struct {
- RtStats ResponseTimeStats
- ServerBasketMapping uint
- ServerBasket requestBasket
- }
- // RequestInfo is an initializer structure for the service vector.
- type RequestInfo struct {
- // Name identifies the request type and is used for re-mapping the service vector if necessary
- Name string
- // InitAmount and InitValue are used to initialize the reference basket
- InitAmount, InitValue float64
- }
- // NewValueTracker creates a new ValueTracker and loads its previously saved state from
- // the database if possible.
- func NewValueTracker(db ethdb.KeyValueStore, clock mclock.Clock, reqInfo []RequestInfo, updatePeriod time.Duration, transferRate, statsExpRate, offlineExpRate float64) *ValueTracker {
- now := clock.Now()
- initRefBasket := requestBasket{items: make([]basketItem, len(reqInfo))}
- mapping := make([]string, len(reqInfo))
- var sumAmount, sumValue float64
- for _, req := range reqInfo {
- sumAmount += req.InitAmount
- sumValue += req.InitAmount * req.InitValue
- }
- scaleValues := sumAmount * basketFactor / sumValue
- for i, req := range reqInfo {
- mapping[i] = req.Name
- initRefBasket.items[i].amount = uint64(req.InitAmount * basketFactor)
- initRefBasket.items[i].value = uint64(req.InitAmount * req.InitValue * scaleValues)
- }
- vt := &ValueTracker{
- clock: clock,
- connected: make(map[enode.ID]*NodeValueTracker),
- quit: make(chan chan struct{}),
- db: db,
- reqTypeCount: len(initRefBasket.items),
- initRefBasket: initRefBasket,
- transferRate: transferRate,
- statsExpRate: statsExpRate,
- offlineExpRate: offlineExpRate,
- }
- if vt.loadFromDb(mapping) != nil {
- // previous state not saved or invalid, init with default values
- vt.refBasket.basket = initRefBasket
- vt.mappings = [][]string{mapping}
- vt.currentMapping = 0
- }
- vt.statsExpirer.SetRate(now, statsExpRate)
- vt.refBasket.init(vt.reqTypeCount)
- vt.periodicUpdate()
- go func() {
- for {
- select {
- case <-clock.After(updatePeriod):
- vt.lock.Lock()
- vt.periodicUpdate()
- vt.lock.Unlock()
- case quit := <-vt.quit:
- close(quit)
- return
- }
- }
- }()
- return vt
- }
- // StatsExpirer returns the statistics expirer so that other values can be expired
- // with the same rate as the service value statistics.
- func (vt *ValueTracker) StatsExpirer() *utils.Expirer {
- return &vt.statsExpirer
- }
- // StatsExpirer returns the current expiration factor so that other values can be expired
- // with the same rate as the service value statistics.
- func (vt *ValueTracker) StatsExpFactor() utils.ExpirationFactor {
- vt.statsExpLock.RLock()
- defer vt.statsExpLock.RUnlock()
- return vt.statsExpFactor
- }
- // loadFromDb loads the value tracker's state from the database and converts saved
- // request basket index mapping if it does not match the specified index to name mapping.
- func (vt *ValueTracker) loadFromDb(mapping []string) error {
- enc, err := vt.db.Get(vtKey)
- if err != nil {
- return err
- }
- r := bytes.NewReader(enc)
- var version uint
- if err := rlp.Decode(r, &version); err != nil {
- log.Error("Decoding value tracker state failed", "err", err)
- return err
- }
- if version != vtVersion {
- log.Error("Unknown ValueTracker version", "stored", version, "current", nvtVersion)
- return fmt.Errorf("Unknown ValueTracker version %d (current version is %d)", version, vtVersion)
- }
- var vte valueTrackerEncV1
- if err := rlp.Decode(r, &vte); err != nil {
- log.Error("Decoding value tracker state failed", "err", err)
- return err
- }
- logOffset := utils.Fixed64(vte.ExpOffset)
- dt := time.Now().UnixNano() - int64(vte.SavedAt)
- if dt > 0 {
- logOffset += utils.Float64ToFixed64(float64(dt) * vt.offlineExpRate / math.Log(2))
- }
- vt.statsExpirer.SetLogOffset(vt.clock.Now(), logOffset)
- vt.rtStats = vte.RtStats
- vt.mappings = vte.Mappings
- vt.currentMapping = -1
- loop:
- for i, m := range vt.mappings {
- if len(m) != len(mapping) {
- continue loop
- }
- for j, s := range mapping {
- if m[j] != s {
- continue loop
- }
- }
- vt.currentMapping = i
- break
- }
- if vt.currentMapping == -1 {
- vt.currentMapping = len(vt.mappings)
- vt.mappings = append(vt.mappings, mapping)
- }
- if int(vte.RefBasketMapping) == vt.currentMapping {
- vt.refBasket.basket = vte.RefBasket
- } else {
- if vte.RefBasketMapping >= uint(len(vt.mappings)) {
- log.Error("Unknown request basket mapping", "stored", vte.RefBasketMapping, "current", vt.currentMapping)
- return fmt.Errorf("Unknown request basket mapping %d (current version is %d)", vte.RefBasketMapping, vt.currentMapping)
- }
- vt.refBasket.basket = vte.RefBasket.convertMapping(vt.mappings[vte.RefBasketMapping], mapping, vt.initRefBasket)
- }
- return nil
- }
- // saveToDb saves the value tracker's state to the database
- func (vt *ValueTracker) saveToDb() {
- vte := valueTrackerEncV1{
- Mappings: vt.mappings,
- RefBasketMapping: uint(vt.currentMapping),
- RefBasket: vt.refBasket.basket,
- RtStats: vt.rtStats,
- ExpOffset: uint64(vt.statsExpirer.LogOffset(vt.clock.Now())),
- SavedAt: uint64(time.Now().UnixNano()),
- }
- enc1, err := rlp.EncodeToBytes(uint(vtVersion))
- if err != nil {
- log.Error("Encoding value tracker state failed", "err", err)
- return
- }
- enc2, err := rlp.EncodeToBytes(&vte)
- if err != nil {
- log.Error("Encoding value tracker state failed", "err", err)
- return
- }
- if err := vt.db.Put(vtKey, append(enc1, enc2...)); err != nil {
- log.Error("Saving value tracker state failed", "err", err)
- }
- }
- // Stop saves the value tracker's state and each loaded node's individual state and
- // returns after shutting the internal goroutines down.
- func (vt *ValueTracker) Stop() {
- quit := make(chan struct{})
- vt.quit <- quit
- <-quit
- vt.lock.Lock()
- vt.periodicUpdate()
- for id, nv := range vt.connected {
- vt.saveNode(id, nv)
- }
- vt.connected = nil
- vt.saveToDb()
- vt.lock.Unlock()
- }
- // Register adds a server node to the value tracker
- func (vt *ValueTracker) Register(id enode.ID) *NodeValueTracker {
- vt.lock.Lock()
- defer vt.lock.Unlock()
- if vt.connected == nil {
- // ValueTracker has already been stopped
- return nil
- }
- nv := vt.loadOrNewNode(id)
- reqTypeCount := len(vt.refBasket.reqValues)
- nv.reqCosts = make([]uint64, reqTypeCount)
- nv.lastTransfer = vt.clock.Now()
- nv.reqValues = &vt.refBasket.reqValues
- nv.basket.init(reqTypeCount)
- vt.connected[id] = nv
- return nv
- }
- // Unregister removes a server node from the value tracker
- func (vt *ValueTracker) Unregister(id enode.ID) {
- vt.lock.Lock()
- defer vt.lock.Unlock()
- if nv := vt.connected[id]; nv != nil {
- vt.saveNode(id, nv)
- delete(vt.connected, id)
- }
- }
- // GetNode returns an individual server node's value tracker. If it did not exist before
- // then a new node is created.
- func (vt *ValueTracker) GetNode(id enode.ID) *NodeValueTracker {
- vt.lock.Lock()
- defer vt.lock.Unlock()
- return vt.loadOrNewNode(id)
- }
- // loadOrNewNode returns an individual server node's value tracker. If it did not exist before
- // then a new node is created.
- func (vt *ValueTracker) loadOrNewNode(id enode.ID) *NodeValueTracker {
- if nv, ok := vt.connected[id]; ok {
- return nv
- }
- nv := &NodeValueTracker{vt: vt, lastTransfer: vt.clock.Now()}
- enc, err := vt.db.Get(append(vtNodeKey, id[:]...))
- if err != nil {
- return nv
- }
- r := bytes.NewReader(enc)
- var version uint
- if err := rlp.Decode(r, &version); err != nil {
- log.Error("Failed to decode node value tracker", "id", id, "err", err)
- return nv
- }
- if version != nvtVersion {
- log.Error("Unknown NodeValueTracker version", "stored", version, "current", nvtVersion)
- return nv
- }
- var nve nodeValueTrackerEncV1
- if err := rlp.Decode(r, &nve); err != nil {
- log.Error("Failed to decode node value tracker", "id", id, "err", err)
- return nv
- }
- nv.rtStats = nve.RtStats
- nv.lastRtStats = nve.RtStats
- if int(nve.ServerBasketMapping) == vt.currentMapping {
- nv.basket.basket = nve.ServerBasket
- } else {
- if nve.ServerBasketMapping >= uint(len(vt.mappings)) {
- log.Error("Unknown request basket mapping", "stored", nve.ServerBasketMapping, "current", vt.currentMapping)
- return nv
- }
- nv.basket.basket = nve.ServerBasket.convertMapping(vt.mappings[nve.ServerBasketMapping], vt.mappings[vt.currentMapping], vt.initRefBasket)
- }
- return nv
- }
- // saveNode saves a server node's value tracker to the database
- func (vt *ValueTracker) saveNode(id enode.ID, nv *NodeValueTracker) {
- recentRtStats := nv.rtStats
- recentRtStats.SubStats(&nv.lastRtStats)
- vt.rtStats.AddStats(&recentRtStats)
- nv.lastRtStats = nv.rtStats
- nve := nodeValueTrackerEncV1{
- RtStats: nv.rtStats,
- ServerBasketMapping: uint(vt.currentMapping),
- ServerBasket: nv.basket.basket,
- }
- enc1, err := rlp.EncodeToBytes(uint(nvtVersion))
- if err != nil {
- log.Error("Failed to encode service value information", "id", id, "err", err)
- return
- }
- enc2, err := rlp.EncodeToBytes(&nve)
- if err != nil {
- log.Error("Failed to encode service value information", "id", id, "err", err)
- return
- }
- if err := vt.db.Put(append(vtNodeKey, id[:]...), append(enc1, enc2...)); err != nil {
- log.Error("Failed to save service value information", "id", id, "err", err)
- }
- }
- // RtStats returns the global response time distribution statistics
- func (vt *ValueTracker) RtStats() ResponseTimeStats {
- vt.lock.Lock()
- defer vt.lock.Unlock()
- vt.periodicUpdate()
- return vt.rtStats
- }
- // periodicUpdate transfers individual node data to the global statistics, normalizes
- // the reference basket and updates request values. The global state is also saved to
- // the database with each update.
- func (vt *ValueTracker) periodicUpdate() {
- now := vt.clock.Now()
- vt.statsExpLock.Lock()
- vt.statsExpFactor = utils.ExpFactor(vt.statsExpirer.LogOffset(now))
- vt.statsExpLock.Unlock()
- for _, nv := range vt.connected {
- basket, rtStats := nv.transferStats(now, vt.transferRate)
- vt.refBasket.add(basket)
- vt.rtStats.AddStats(&rtStats)
- }
- vt.refBasket.normalize()
- vt.refBasket.updateReqValues()
- for _, nv := range vt.connected {
- nv.updateCosts(nv.reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(nv.reqCosts))
- }
- vt.saveToDb()
- }
- type RequestStatsItem struct {
- Name string
- ReqAmount, ReqValue float64
- }
- // RequestStats returns the current contents of the reference request basket, with
- // request values meaning average per request rather than total.
- func (vt *ValueTracker) RequestStats() []RequestStatsItem {
- vt.statsExpLock.RLock()
- expFactor := vt.statsExpFactor
- vt.statsExpLock.RUnlock()
- vt.lock.Lock()
- defer vt.lock.Unlock()
- vt.periodicUpdate()
- res := make([]RequestStatsItem, len(vt.refBasket.basket.items))
- for i, item := range vt.refBasket.basket.items {
- res[i].Name = vt.mappings[vt.currentMapping][i]
- res[i].ReqAmount = expFactor.Value(float64(item.amount)/basketFactor, vt.refBasket.basket.exp)
- res[i].ReqValue = vt.refBasket.reqValues[i]
- }
- return res
- }
|