clientpool.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package server
  17. import (
  18. "errors"
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common/mclock"
  22. "github.com/ethereum/go-ethereum/ethdb"
  23. "github.com/ethereum/go-ethereum/les/utils"
  24. "github.com/ethereum/go-ethereum/les/vflux"
  25. "github.com/ethereum/go-ethereum/log"
  26. "github.com/ethereum/go-ethereum/p2p/enode"
  27. "github.com/ethereum/go-ethereum/p2p/nodestate"
  28. "github.com/ethereum/go-ethereum/rlp"
  29. )
  30. var (
  31. ErrNotConnected = errors.New("client not connected")
  32. ErrNoPriority = errors.New("priority too low to raise capacity")
  33. ErrCantFindMaximum = errors.New("Unable to find maximum allowed capacity")
  34. )
// ClientPool implements a client database that assigns a priority to each client
// based on a positive and negative balance. Positive balance is externally assigned
// to prioritized clients and is decreased with connection time and processed
// requests (unless the price factors are zero). If the positive balance is zero
// then negative balance is accumulated.
//
// Balance tracking and priority calculation for connected clients is done by
// balanceTracker. PriorityQueue ensures that clients with the lowest positive or
// highest negative balance get evicted when the total capacity allowance is full
// and new clients with a better balance want to connect.
//
// Already connected nodes receive a small bias in their favor in order to avoid
// accepting and instantly kicking out clients. In theory, we try to ensure that
// each client can have several minutes of connection time.
//
// Balances of disconnected clients are stored in nodeDB including positive balance
// and negative balance. Both positive balance and negative balance will decrease
// exponentially. If the balance is low enough, then the record will be dropped.
type ClientPool struct {
	*priorityPool   // embedded: active/inactive status and capacity allocation
	*balanceTracker // embedded: positive/negative balance bookkeeping

	setup  *serverSetup                // flag/field definitions shared with the embedded components
	clock  mclock.Clock
	closed bool
	ns     *nodestate.NodeStateMachine // drives all state transitions; callbacks run inside its operations
	synced func() bool                 // reports whether the server is synced; gates capacity queries

	lock          sync.RWMutex // protects connectedBias
	connectedBias time.Duration

	minCap     uint64      // the minimal capacity value allowed for any client
	capReqNode *enode.Node // node that is requesting capacity change; only used inside NSM operation
}
// clientPeer represents a peer in the client pool. None of the callbacks should block.
type clientPeer interface {
	Node() *enode.Node
	FreeClientId() string                         // unique id for non-priority clients (typically a prefix of the network address)
	InactiveAllowance() time.Duration             // disconnection timeout for inactive non-priority peers
	UpdateCapacity(newCap uint64, requested bool) // signals a capacity update (requested is true if it is a result of a SetCapacity call on the given peer)
	Disconnect()                                  // initiates disconnection (Unregister should always be called)
}
  74. // NewClientPool creates a new client pool
  75. func NewClientPool(balanceDb ethdb.KeyValueStore, minCap uint64, connectedBias time.Duration, clock mclock.Clock, synced func() bool) *ClientPool {
  76. setup := newServerSetup()
  77. ns := nodestate.NewNodeStateMachine(nil, nil, clock, setup.setup)
  78. cp := &ClientPool{
  79. priorityPool: newPriorityPool(ns, setup, clock, minCap, connectedBias, 4, 100),
  80. balanceTracker: newBalanceTracker(ns, setup, balanceDb, clock, &utils.Expirer{}, &utils.Expirer{}),
  81. setup: setup,
  82. ns: ns,
  83. clock: clock,
  84. minCap: minCap,
  85. connectedBias: connectedBias,
  86. synced: synced,
  87. }
  88. ns.SubscribeState(nodestate.MergeFlags(setup.activeFlag, setup.inactiveFlag, setup.priorityFlag), func(node *enode.Node, oldState, newState nodestate.Flags) {
  89. if newState.Equals(setup.inactiveFlag) {
  90. // set timeout for non-priority inactive client
  91. var timeout time.Duration
  92. if c, ok := ns.GetField(node, setup.clientField).(clientPeer); ok {
  93. timeout = c.InactiveAllowance()
  94. }
  95. ns.AddTimeout(node, setup.inactiveFlag, timeout)
  96. }
  97. if oldState.Equals(setup.inactiveFlag) && newState.Equals(setup.inactiveFlag.Or(setup.priorityFlag)) {
  98. ns.SetStateSub(node, setup.inactiveFlag, nodestate.Flags{}, 0) // priority gained; remove timeout
  99. }
  100. if newState.Equals(setup.activeFlag) {
  101. // active with no priority; limit capacity to minCap
  102. cap, _ := ns.GetField(node, setup.capacityField).(uint64)
  103. if cap > minCap {
  104. cp.requestCapacity(node, minCap, minCap, 0)
  105. }
  106. }
  107. if newState.Equals(nodestate.Flags{}) {
  108. if c, ok := ns.GetField(node, setup.clientField).(clientPeer); ok {
  109. c.Disconnect()
  110. }
  111. }
  112. })
  113. ns.SubscribeField(setup.capacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
  114. if c, ok := ns.GetField(node, setup.clientField).(clientPeer); ok {
  115. newCap, _ := newValue.(uint64)
  116. c.UpdateCapacity(newCap, node == cp.capReqNode)
  117. }
  118. })
  119. // add metrics
  120. cp.ns.SubscribeState(nodestate.MergeFlags(cp.setup.activeFlag, cp.setup.inactiveFlag), func(node *enode.Node, oldState, newState nodestate.Flags) {
  121. if oldState.IsEmpty() && !newState.IsEmpty() {
  122. clientConnectedMeter.Mark(1)
  123. }
  124. if !oldState.IsEmpty() && newState.IsEmpty() {
  125. clientDisconnectedMeter.Mark(1)
  126. }
  127. if oldState.HasNone(cp.setup.activeFlag) && oldState.HasAll(cp.setup.activeFlag) {
  128. clientActivatedMeter.Mark(1)
  129. }
  130. if oldState.HasAll(cp.setup.activeFlag) && oldState.HasNone(cp.setup.activeFlag) {
  131. clientDeactivatedMeter.Mark(1)
  132. }
  133. _, connected := cp.Active()
  134. totalConnectedGauge.Update(int64(connected))
  135. })
  136. return cp
  137. }
// Start starts the client pool. Should be called before Register/Unregister.
// It only starts the underlying node state machine; all subscriptions were
// already installed by NewClientPool.
func (cp *ClientPool) Start() {
	cp.ns.Start()
}
// Stop shuts the client pool down. The clientPeer interface callbacks will not be called
// after Stop. Register calls will return nil.
// Note: the balance tracker is stopped first so that final balances are saved
// before the node state machine (and its callbacks) shut down.
func (cp *ClientPool) Stop() {
	cp.balanceTracker.stop()
	cp.ns.Stop()
}
  148. // Register registers the peer into the client pool. If the peer has insufficient
  149. // priority and remains inactive for longer than the allowed timeout then it will be
  150. // disconnected by calling the Disconnect function of the clientPeer interface.
  151. func (cp *ClientPool) Register(peer clientPeer) ConnectedBalance {
  152. cp.ns.SetField(peer.Node(), cp.setup.clientField, peerWrapper{peer})
  153. balance, _ := cp.ns.GetField(peer.Node(), cp.setup.balanceField).(*nodeBalance)
  154. return balance
  155. }
// Unregister removes the peer from the client pool. Clearing the client field
// triggers the state machine to drop the peer's flags and capacity.
func (cp *ClientPool) Unregister(peer clientPeer) {
	cp.ns.SetField(peer.Node(), cp.setup.clientField, nil)
}
  160. // setConnectedBias sets the connection bias, which is applied to already connected clients
  161. // So that already connected client won't be kicked out very soon and we can ensure all
  162. // connected clients can have enough time to request or sync some data.
  163. func (cp *ClientPool) SetConnectedBias(bias time.Duration) {
  164. cp.lock.Lock()
  165. cp.connectedBias = bias
  166. cp.setActiveBias(bias)
  167. cp.lock.Unlock()
  168. }
// SetCapacity sets the assigned capacity of a connected client.
//
// reqCap is clamped up to minCap; raising capacity above minCap requires the
// priority flag. bias is raised to at least the pool's connectedBias. requested
// should be true when the change originates from the given peer itself, so that
// the UpdateCapacity callback can report it as such. It returns the resulting
// capacity and ErrNotConnected, ErrNoPriority or ErrCantFindMaximum on failure.
func (cp *ClientPool) SetCapacity(node *enode.Node, reqCap uint64, bias time.Duration, requested bool) (capacity uint64, err error) {
	cp.lock.RLock()
	// never use a bias smaller than the pool-wide connected bias
	if cp.connectedBias > bias {
		bias = cp.connectedBias
	}
	cp.lock.RUnlock()

	// everything below runs atomically inside a node state machine operation
	cp.ns.Operation(func() {
		balance, _ := cp.ns.GetField(node, cp.setup.balanceField).(*nodeBalance)
		if balance == nil {
			err = ErrNotConnected
			return
		}
		capacity, _ = cp.ns.GetField(node, cp.setup.capacityField).(uint64)
		if capacity == 0 {
			// if the client is inactive then it has insufficient priority for the minimal capacity
			// (will be activated automatically with minCap when possible)
			return
		}
		if reqCap < cp.minCap {
			// can't request less than minCap; switching between 0 (inactive state) and minCap is
			// performed by the server automatically as soon as necessary/possible
			reqCap = cp.minCap
		}
		if reqCap > cp.minCap && cp.ns.GetState(node).HasNone(cp.setup.priorityFlag) {
			err = ErrNoPriority
			return
		}
		if reqCap == capacity {
			// nothing to do; current capacity already matches the request
			return
		}
		if requested {
			// mark the requested node so that the UpdateCapacity callback can signal
			// whether the update is the direct result of a SetCapacity call on the given node
			cp.capReqNode = node
			defer func() {
				cp.capReqNode = nil
			}()
		}

		var minTarget, maxTarget uint64
		if reqCap > capacity {
			// Estimate maximum available capacity at the current priority level and request
			// the estimated amount.
			// Note: requestCapacity could find the highest available capacity between the
			// current and the requested capacity but it could cost a lot of iterations with
			// fine step adjustment if the requested capacity is very high. By doing a quick
			// estimation of the maximum available capacity based on the capacity curve we
			// can limit the number of required iterations.
			curve := cp.getCapacityCurve().exclude(node.ID())
			maxTarget = curve.maxCapacity(func(capacity uint64) int64 {
				return balance.estimatePriority(capacity, 0, 0, bias, false)
			})
			if maxTarget < reqCap {
				// the estimated achievable maximum is below the request; give up
				// (capacity stays unchanged, no error — caller sees the old value)
				return
			}
			maxTarget = reqCap

			// Specify a narrow target range that allows a limited number of fine step
			// iterations
			minTarget = maxTarget - maxTarget/20
			if minTarget < capacity {
				minTarget = capacity
			}
		} else {
			// lowering capacity always succeeds exactly; use a degenerate range
			minTarget, maxTarget = reqCap, reqCap
		}
		if newCap := cp.requestCapacity(node, minTarget, maxTarget, bias); newCap >= minTarget && newCap <= maxTarget {
			capacity = newCap
			return
		}
		// we should be able to find the maximum allowed capacity in a few iterations
		log.Error("Unable to find maximum allowed capacity")
		err = ErrCantFindMaximum
	})
	return
}
// serveCapQuery serves a vflux capacity query. It receives multiple token amount values
// and a bias time value. For each given token amount it calculates the maximum achievable
// capacity in case the amount is added to the balance.
//
// Returns the RLP-encoded reply, or nil if the request cannot be decoded or has
// an invalid number of token amounts. While the server is not synced, all
// results are reported as zero.
func (cp *ClientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []byte {
	var req vflux.CapacityQueryReq
	if rlp.DecodeBytes(data, &req) != nil {
		return nil
	}
	if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen {
		return nil
	}
	result := make(vflux.CapacityQueryReply, len(req.AddTokens))
	if !cp.synced() {
		// not synced yet: answer with all-zero capacities (result is zero-valued)
		capacityQueryZeroMeter.Mark(1)
		reply, _ := rlp.EncodeToBytes(&result)
		return reply
	}

	bias := time.Second * time.Duration(req.Bias)
	cp.lock.RLock()
	// apply at least the pool-wide connected bias
	if cp.connectedBias > bias {
		bias = cp.connectedBias
	}
	cp.lock.RUnlock()

	// use capacityCurve to answer request for multiple newly bought token amounts
	curve := cp.getCapacityCurve().exclude(id)
	cp.BalanceOperation(id, freeID, func(balance AtomicBalanceOperator) {
		pb, _ := balance.GetBalance()
		for i, addTokens := range req.AddTokens {
			add := addTokens.Int64()
			result[i] = curve.maxCapacity(func(capacity uint64) int64 {
				return balance.estimatePriority(capacity, add, 0, bias, false) / int64(capacity)
			})
			// if the hypothetical balance would be drained to zero (or below),
			// cap the answer at minCap rather than reporting a higher estimate
			if add <= 0 && uint64(-add) >= pb && result[i] > cp.minCap {
				result[i] = cp.minCap
			}
			// anything below minCap is not a valid capacity; report zero
			if result[i] < cp.minCap {
				result[i] = 0
			}
		}
	})
	// add first result to metrics (don't care about priority client multi-queries yet)
	if result[0] == 0 {
		capacityQueryZeroMeter.Mark(1)
	} else {
		capacityQueryNonZeroMeter.Mark(1)
	}
	reply, _ := rlp.EncodeToBytes(&result)
	return reply
}
  293. // Handle implements Service
  294. func (cp *ClientPool) Handle(id enode.ID, address string, name string, data []byte) []byte {
  295. switch name {
  296. case vflux.CapacityQueryName:
  297. return cp.serveCapQuery(id, address, data)
  298. default:
  299. return nil
  300. }
  301. }