server.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/ecdsa"
  19. "time"
  20. "github.com/ethereum/go-ethereum/common/mclock"
  21. "github.com/ethereum/go-ethereum/core"
  22. "github.com/ethereum/go-ethereum/eth/ethconfig"
  23. "github.com/ethereum/go-ethereum/ethdb"
  24. "github.com/ethereum/go-ethereum/les/flowcontrol"
  25. vfs "github.com/ethereum/go-ethereum/les/vflux/server"
  26. "github.com/ethereum/go-ethereum/light"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/node"
  29. "github.com/ethereum/go-ethereum/p2p"
  30. "github.com/ethereum/go-ethereum/p2p/enode"
  31. "github.com/ethereum/go-ethereum/p2p/enr"
  32. "github.com/ethereum/go-ethereum/params"
  33. "github.com/ethereum/go-ethereum/rpc"
  34. )
  35. var (
  36. defaultPosFactors = vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}
  37. defaultNegFactors = vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}
  38. )
  39. const defaultConnectedBias = time.Minute * 3
  40. type ethBackend interface {
  41. ArchiveMode() bool
  42. BlockChain() *core.BlockChain
  43. BloomIndexer() *core.ChainIndexer
  44. ChainDb() ethdb.Database
  45. Synced() bool
  46. TxPool() *core.TxPool
  47. }
  48. type LesServer struct {
  49. lesCommons
  50. archiveMode bool // Flag whether the ethereum node runs in archive mode.
  51. handler *serverHandler
  52. peers *clientPeerSet
  53. serverset *serverSet
  54. vfluxServer *vfs.Server
  55. privateKey *ecdsa.PrivateKey
  56. // Flow control and capacity management
  57. fcManager *flowcontrol.ClientManager
  58. costTracker *costTracker
  59. defParams flowcontrol.ServerParams
  60. servingQueue *servingQueue
  61. clientPool *vfs.ClientPool
  62. minCapacity, maxCapacity uint64
  63. threadsIdle int // Request serving threads count when system is idle.
  64. threadsBusy int // Request serving threads count when system is busy(block insertion).
  65. p2pSrv *p2p.Server
  66. }
  67. func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*LesServer, error) {
  68. lesDb, err := node.OpenDatabase("les.server", 0, 0, "eth/db/lesserver/", false)
  69. if err != nil {
  70. return nil, err
  71. }
  72. // Calculate the number of threads used to service the light client
  73. // requests based on the user-specified value.
  74. threads := config.LightServ * 4 / 100
  75. if threads < 4 {
  76. threads = 4
  77. }
  78. srv := &LesServer{
  79. lesCommons: lesCommons{
  80. genesis: e.BlockChain().Genesis().Hash(),
  81. config: config,
  82. chainConfig: e.BlockChain().Config(),
  83. iConfig: light.DefaultServerIndexerConfig,
  84. chainDb: e.ChainDb(),
  85. lesDb: lesDb,
  86. chainReader: e.BlockChain(),
  87. chtIndexer: light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations, true),
  88. bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency, true),
  89. closeCh: make(chan struct{}),
  90. },
  91. archiveMode: e.ArchiveMode(),
  92. peers: newClientPeerSet(),
  93. serverset: newServerSet(),
  94. vfluxServer: vfs.NewServer(time.Millisecond * 10),
  95. fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}),
  96. servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
  97. threadsBusy: config.LightServ/100 + 1,
  98. threadsIdle: threads,
  99. p2pSrv: node.Server(),
  100. }
  101. issync := e.Synced
  102. if config.LightNoSyncServe {
  103. issync = func() bool { return true }
  104. }
  105. srv.handler = newServerHandler(srv, e.BlockChain(), e.ChainDb(), e.TxPool(), issync)
  106. srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config)
  107. srv.oracle = srv.setupOracle(node, e.BlockChain().Genesis().Hash(), config)
  108. // Initialize the bloom trie indexer.
  109. e.BloomIndexer().AddChildIndexer(srv.bloomTrieIndexer)
  110. // Initialize server capacity management fields.
  111. srv.defParams = flowcontrol.ServerParams{
  112. BufLimit: srv.minCapacity * bufLimitRatio,
  113. MinRecharge: srv.minCapacity,
  114. }
  115. // LES flow control tries to more or less guarantee the possibility for the
  116. // clients to send a certain amount of requests at any time and get a quick
  117. // response. Most of the clients want this guarantee but don't actually need
  118. // to send requests most of the time. Our goal is to serve as many clients as
  119. // possible while the actually used server capacity does not exceed the limits
  120. totalRecharge := srv.costTracker.totalRecharge()
  121. srv.maxCapacity = srv.minCapacity * uint64(srv.config.LightPeers)
  122. if totalRecharge > srv.maxCapacity {
  123. srv.maxCapacity = totalRecharge
  124. }
  125. srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2)
  126. srv.clientPool = vfs.NewClientPool(lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, issync)
  127. srv.clientPool.Start()
  128. srv.clientPool.SetDefaultFactors(defaultPosFactors, defaultNegFactors)
  129. srv.vfluxServer.Register(srv.clientPool, "les", "Ethereum light client service")
  130. checkpoint := srv.latestLocalCheckpoint()
  131. if !checkpoint.Empty() {
  132. log.Info("Loaded latest checkpoint", "section", checkpoint.SectionIndex, "head", checkpoint.SectionHead,
  133. "chtroot", checkpoint.CHTRoot, "bloomroot", checkpoint.BloomRoot)
  134. }
  135. srv.chtIndexer.Start(e.BlockChain())
  136. node.RegisterProtocols(srv.Protocols())
  137. node.RegisterAPIs(srv.APIs())
  138. node.RegisterLifecycle(srv)
  139. return srv, nil
  140. }
  141. func (s *LesServer) APIs() []rpc.API {
  142. return []rpc.API{
  143. {
  144. Namespace: "les",
  145. Version: "1.0",
  146. Service: NewPrivateLightAPI(&s.lesCommons),
  147. Public: false,
  148. },
  149. {
  150. Namespace: "les",
  151. Version: "1.0",
  152. Service: NewPrivateLightServerAPI(s),
  153. Public: false,
  154. },
  155. {
  156. Namespace: "debug",
  157. Version: "1.0",
  158. Service: NewPrivateDebugAPI(s),
  159. Public: false,
  160. },
  161. }
  162. }
  163. func (s *LesServer) Protocols() []p2p.Protocol {
  164. ps := s.makeProtocols(ServerProtocolVersions, s.handler.runPeer, func(id enode.ID) interface{} {
  165. if p := s.peers.peer(id); p != nil {
  166. return p.Info()
  167. }
  168. return nil
  169. }, nil)
  170. // Add "les" ENR entries.
  171. for i := range ps {
  172. ps[i].Attributes = []enr.Entry{&lesEntry{
  173. VfxVersion: 1,
  174. }}
  175. }
  176. return ps
  177. }
  178. // Start starts the LES server
  179. func (s *LesServer) Start() error {
  180. s.privateKey = s.p2pSrv.PrivateKey
  181. s.peers.setSignerKey(s.privateKey)
  182. s.handler.start()
  183. s.wg.Add(1)
  184. go s.capacityManagement()
  185. if s.p2pSrv.DiscV5 != nil {
  186. s.p2pSrv.DiscV5.RegisterTalkHandler("vfx", s.vfluxServer.ServeEncoded)
  187. }
  188. return nil
  189. }
  190. // Stop stops the LES service
  191. func (s *LesServer) Stop() error {
  192. close(s.closeCh)
  193. s.clientPool.Stop()
  194. if s.serverset != nil {
  195. s.serverset.close()
  196. }
  197. s.peers.close()
  198. s.fcManager.Stop()
  199. s.costTracker.stop()
  200. s.handler.stop()
  201. s.servingQueue.stop()
  202. if s.vfluxServer != nil {
  203. s.vfluxServer.Stop()
  204. }
  205. // Note, bloom trie indexer is closed by parent bloombits indexer.
  206. if s.chtIndexer != nil {
  207. s.chtIndexer.Close()
  208. }
  209. if s.lesDb != nil {
  210. s.lesDb.Close()
  211. }
  212. s.wg.Wait()
  213. log.Info("Les server stopped")
  214. return nil
  215. }
  216. // capacityManagement starts an event handler loop that updates the recharge curve of
  217. // the client manager and adjusts the client pool's size according to the total
  218. // capacity updates coming from the client manager
  219. func (s *LesServer) capacityManagement() {
  220. defer s.wg.Done()
  221. processCh := make(chan bool, 100)
  222. sub := s.handler.blockchain.SubscribeBlockProcessingEvent(processCh)
  223. defer sub.Unsubscribe()
  224. totalRechargeCh := make(chan uint64, 100)
  225. totalRecharge := s.costTracker.subscribeTotalRecharge(totalRechargeCh)
  226. totalCapacityCh := make(chan uint64, 100)
  227. totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh)
  228. s.clientPool.SetLimits(uint64(s.config.LightPeers), totalCapacity)
  229. var (
  230. busy bool
  231. freePeers uint64
  232. blockProcess mclock.AbsTime
  233. )
  234. updateRecharge := func() {
  235. if busy {
  236. s.servingQueue.setThreads(s.threadsBusy)
  237. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge, totalRecharge}})
  238. } else {
  239. s.servingQueue.setThreads(s.threadsIdle)
  240. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 10, totalRecharge}, {totalRecharge, totalRecharge}})
  241. }
  242. }
  243. updateRecharge()
  244. for {
  245. select {
  246. case busy = <-processCh:
  247. if busy {
  248. blockProcess = mclock.Now()
  249. } else {
  250. blockProcessingTimer.Update(time.Duration(mclock.Now() - blockProcess))
  251. }
  252. updateRecharge()
  253. case totalRecharge = <-totalRechargeCh:
  254. totalRechargeGauge.Update(int64(totalRecharge))
  255. updateRecharge()
  256. case totalCapacity = <-totalCapacityCh:
  257. totalCapacityGauge.Update(int64(totalCapacity))
  258. newFreePeers := totalCapacity / s.minCapacity
  259. if newFreePeers < freePeers && newFreePeers < uint64(s.config.LightPeers) {
  260. log.Warn("Reduced free peer connections", "from", freePeers, "to", newFreePeers)
  261. }
  262. freePeers = newFreePeers
  263. s.clientPool.SetLimits(uint64(s.config.LightPeers), totalCapacity)
  264. case <-s.closeCh:
  265. return
  266. }
  267. }
  268. }