balance_tracker.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package server
  17. import (
  18. "sync"
  19. "time"
  20. "github.com/ethereum/go-ethereum/common/mclock"
  21. "github.com/ethereum/go-ethereum/ethdb"
  22. "github.com/ethereum/go-ethereum/les/utils"
  23. "github.com/ethereum/go-ethereum/p2p/enode"
  24. "github.com/ethereum/go-ethereum/p2p/enr"
  25. "github.com/ethereum/go-ethereum/p2p/nodestate"
  26. )
  27. const (
  28. posThreshold = 1000000 // minimum positive balance that is persisted in the database
  29. negThreshold = 1000000 // minimum negative balance that is persisted in the database
  30. persistExpirationRefresh = time.Minute * 5 // refresh period of the token expiration persistence
  31. )
  32. // balanceTracker tracks positive and negative balances for connected nodes.
  33. // After clientField is set externally, a nodeBalance is created and previous
  34. // balance values are loaded from the database. Both balances are exponentially expired
  35. // values. Costs are deducted from the positive balance if present, otherwise added to
  36. // the negative balance. If the capacity is non-zero then a time cost is applied
  37. // continuously while individual request costs are applied immediately.
  38. // The two balances are translated into a single priority value that also depends
  39. // on the actual capacity.
  40. type balanceTracker struct {
  41. setup *serverSetup
  42. clock mclock.Clock
  43. lock sync.Mutex
  44. ns *nodestate.NodeStateMachine
  45. ndb *nodeDB
  46. posExp, negExp utils.ValueExpirer
  47. posExpTC, negExpTC uint64
  48. defaultPosFactors, defaultNegFactors PriceFactors
  49. active, inactive utils.ExpiredValue
  50. balanceTimer *utils.UpdateTimer
  51. quit chan struct{}
  52. }
  53. // newBalanceTracker creates a new balanceTracker
  54. func newBalanceTracker(ns *nodestate.NodeStateMachine, setup *serverSetup, db ethdb.KeyValueStore, clock mclock.Clock, posExp, negExp utils.ValueExpirer) *balanceTracker {
  55. ndb := newNodeDB(db, clock)
  56. bt := &balanceTracker{
  57. ns: ns,
  58. setup: setup,
  59. ndb: ndb,
  60. clock: clock,
  61. posExp: posExp,
  62. negExp: negExp,
  63. balanceTimer: utils.NewUpdateTimer(clock, time.Second*10),
  64. quit: make(chan struct{}),
  65. }
  66. posOffset, negOffset := bt.ndb.getExpiration()
  67. posExp.SetLogOffset(clock.Now(), posOffset)
  68. negExp.SetLogOffset(clock.Now(), negOffset)
  69. // Load all persisted balance entries of priority nodes,
  70. // calculate the total number of issued service tokens.
  71. bt.ndb.forEachBalance(false, func(id enode.ID, balance utils.ExpiredValue) bool {
  72. bt.inactive.AddExp(balance)
  73. return true
  74. })
  75. ns.SubscribeField(bt.setup.capacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
  76. n, _ := ns.GetField(node, bt.setup.balanceField).(*nodeBalance)
  77. if n == nil {
  78. return
  79. }
  80. ov, _ := oldValue.(uint64)
  81. nv, _ := newValue.(uint64)
  82. if ov == 0 && nv != 0 {
  83. n.activate()
  84. }
  85. if nv != 0 {
  86. n.setCapacity(nv)
  87. }
  88. if ov != 0 && nv == 0 {
  89. n.deactivate()
  90. }
  91. })
  92. ns.SubscribeField(bt.setup.clientField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
  93. type peer interface {
  94. FreeClientId() string
  95. }
  96. if newValue != nil {
  97. n := bt.newNodeBalance(node, newValue.(peer).FreeClientId(), true)
  98. bt.lock.Lock()
  99. n.SetPriceFactors(bt.defaultPosFactors, bt.defaultNegFactors)
  100. bt.lock.Unlock()
  101. ns.SetFieldSub(node, bt.setup.balanceField, n)
  102. } else {
  103. ns.SetStateSub(node, nodestate.Flags{}, bt.setup.priorityFlag, 0)
  104. if b, _ := ns.GetField(node, bt.setup.balanceField).(*nodeBalance); b != nil {
  105. b.deactivate()
  106. }
  107. ns.SetFieldSub(node, bt.setup.balanceField, nil)
  108. }
  109. })
  110. // The positive and negative balances of clients are stored in database
  111. // and both of these decay exponentially over time. Delete them if the
  112. // value is small enough.
  113. bt.ndb.evictCallBack = bt.canDropBalance
  114. go func() {
  115. for {
  116. select {
  117. case <-clock.After(persistExpirationRefresh):
  118. now := clock.Now()
  119. bt.ndb.setExpiration(posExp.LogOffset(now), negExp.LogOffset(now))
  120. case <-bt.quit:
  121. return
  122. }
  123. }
  124. }()
  125. return bt
  126. }
  127. // Stop saves expiration offset and unsaved node balances and shuts balanceTracker down
  128. func (bt *balanceTracker) stop() {
  129. now := bt.clock.Now()
  130. bt.ndb.setExpiration(bt.posExp.LogOffset(now), bt.negExp.LogOffset(now))
  131. close(bt.quit)
  132. bt.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
  133. if n, ok := bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance); ok {
  134. n.lock.Lock()
  135. n.storeBalance(true, true)
  136. n.lock.Unlock()
  137. bt.ns.SetField(node, bt.setup.balanceField, nil)
  138. }
  139. })
  140. bt.ndb.close()
  141. }
  142. // TotalTokenAmount returns the current total amount of service tokens in existence
  143. func (bt *balanceTracker) TotalTokenAmount() uint64 {
  144. bt.lock.Lock()
  145. defer bt.lock.Unlock()
  146. bt.balanceTimer.Update(func(_ time.Duration) bool {
  147. bt.active = utils.ExpiredValue{}
  148. bt.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
  149. if n, ok := bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance); ok && n.active {
  150. pos, _ := n.GetRawBalance()
  151. bt.active.AddExp(pos)
  152. }
  153. })
  154. return true
  155. })
  156. total := bt.active
  157. total.AddExp(bt.inactive)
  158. return total.Value(bt.posExp.LogOffset(bt.clock.Now()))
  159. }
  160. // GetPosBalanceIDs lists node IDs with an associated positive balance
  161. func (bt *balanceTracker) GetPosBalanceIDs(start, stop enode.ID, maxCount int) (result []enode.ID) {
  162. return bt.ndb.getPosBalanceIDs(start, stop, maxCount)
  163. }
  164. // SetDefaultFactors sets the default price factors applied to subsequently connected clients
  165. func (bt *balanceTracker) SetDefaultFactors(posFactors, negFactors PriceFactors) {
  166. bt.lock.Lock()
  167. bt.defaultPosFactors = posFactors
  168. bt.defaultNegFactors = negFactors
  169. bt.lock.Unlock()
  170. }
  171. // SetExpirationTCs sets positive and negative token expiration time constants.
  172. // Specified in seconds, 0 means infinite (no expiration).
  173. func (bt *balanceTracker) SetExpirationTCs(pos, neg uint64) {
  174. bt.lock.Lock()
  175. defer bt.lock.Unlock()
  176. bt.posExpTC, bt.negExpTC = pos, neg
  177. now := bt.clock.Now()
  178. if pos > 0 {
  179. bt.posExp.SetRate(now, 1/float64(pos*uint64(time.Second)))
  180. } else {
  181. bt.posExp.SetRate(now, 0)
  182. }
  183. if neg > 0 {
  184. bt.negExp.SetRate(now, 1/float64(neg*uint64(time.Second)))
  185. } else {
  186. bt.negExp.SetRate(now, 0)
  187. }
  188. }
  189. // GetExpirationTCs returns the current positive and negative token expiration
  190. // time constants
  191. func (bt *balanceTracker) GetExpirationTCs() (pos, neg uint64) {
  192. bt.lock.Lock()
  193. defer bt.lock.Unlock()
  194. return bt.posExpTC, bt.negExpTC
  195. }
  196. // BalanceOperation allows atomic operations on the balance of a node regardless of whether
  197. // it is currently connected or not
  198. func (bt *balanceTracker) BalanceOperation(id enode.ID, connAddress string, cb func(AtomicBalanceOperator)) {
  199. bt.ns.Operation(func() {
  200. var nb *nodeBalance
  201. if node := bt.ns.GetNode(id); node != nil {
  202. nb, _ = bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance)
  203. } else {
  204. node = enode.SignNull(&enr.Record{}, id)
  205. nb = bt.newNodeBalance(node, connAddress, false)
  206. }
  207. cb(nb)
  208. })
  209. }
  210. // newNodeBalance loads balances from the database and creates a nodeBalance instance
  211. // for the given node. It also sets the priorityFlag and adds balanceCallbackZero if
  212. // the node has a positive balance.
  213. // Note: this function should run inside a NodeStateMachine operation
  214. func (bt *balanceTracker) newNodeBalance(node *enode.Node, connAddress string, setFlags bool) *nodeBalance {
  215. pb := bt.ndb.getOrNewBalance(node.ID().Bytes(), false)
  216. nb := bt.ndb.getOrNewBalance([]byte(connAddress), true)
  217. n := &nodeBalance{
  218. bt: bt,
  219. node: node,
  220. setFlags: setFlags,
  221. connAddress: connAddress,
  222. balance: balance{pos: pb, neg: nb, posExp: bt.posExp, negExp: bt.negExp},
  223. initTime: bt.clock.Now(),
  224. lastUpdate: bt.clock.Now(),
  225. }
  226. for i := range n.callbackIndex {
  227. n.callbackIndex[i] = -1
  228. }
  229. if setFlags && n.checkPriorityStatus() {
  230. n.bt.ns.SetStateSub(n.node, n.bt.setup.priorityFlag, nodestate.Flags{}, 0)
  231. }
  232. return n
  233. }
  234. // storeBalance stores either a positive or a negative balance in the database
  235. func (bt *balanceTracker) storeBalance(id []byte, neg bool, value utils.ExpiredValue) {
  236. if bt.canDropBalance(bt.clock.Now(), neg, value) {
  237. bt.ndb.delBalance(id, neg) // balance is small enough, drop it directly.
  238. } else {
  239. bt.ndb.setBalance(id, neg, value)
  240. }
  241. }
  242. // canDropBalance tells whether a positive or negative balance is below the threshold
  243. // and therefore can be dropped from the database
  244. func (bt *balanceTracker) canDropBalance(now mclock.AbsTime, neg bool, b utils.ExpiredValue) bool {
  245. if neg {
  246. return b.Value(bt.negExp.LogOffset(now)) <= negThreshold
  247. }
  248. return b.Value(bt.posExp.LogOffset(now)) <= posThreshold
  249. }
  250. // updateTotalBalance adjusts the total balance after executing given callback.
  251. func (bt *balanceTracker) updateTotalBalance(n *nodeBalance, callback func() bool) {
  252. bt.lock.Lock()
  253. defer bt.lock.Unlock()
  254. n.lock.Lock()
  255. defer n.lock.Unlock()
  256. original, active := n.balance.pos, n.active
  257. if !callback() {
  258. return
  259. }
  260. if active {
  261. bt.active.SubExp(original)
  262. } else {
  263. bt.inactive.SubExp(original)
  264. }
  265. if n.active {
  266. bt.active.AddExp(n.balance.pos)
  267. } else {
  268. bt.inactive.AddExp(n.balance.pos)
  269. }
  270. }