// handler_qlight_server.go
  1. package eth
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/big"
  6. "sync/atomic"
  7. "time"
  8. "github.com/ethereum/go-ethereum/common"
  9. "github.com/ethereum/go-ethereum/core"
  10. "github.com/ethereum/go-ethereum/core/forkid"
  11. "github.com/ethereum/go-ethereum/core/types"
  12. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  13. qlightproto "github.com/ethereum/go-ethereum/eth/protocols/qlight"
  14. "github.com/ethereum/go-ethereum/log"
  15. "github.com/ethereum/go-ethereum/p2p"
  16. "github.com/ethereum/go-ethereum/p2p/enode"
  17. "github.com/ethereum/go-ethereum/qlight"
  18. "github.com/ethereum/go-ethereum/rlp"
  19. "github.com/ethereum/go-ethereum/trie"
  20. )
  21. type qlightServerHandler ethHandler
  22. func (h *qlightServerHandler) Chain() *core.BlockChain { return h.chain }
  23. func (h *qlightServerHandler) StateBloom() *trie.SyncBloom { return h.stateBloom }
  24. func (h *qlightServerHandler) TxPool() eth.TxPool { return h.txpool }
  25. func (h *qlightServerHandler) RunPeer(peer *eth.Peer, handler eth.Handler) error {
  26. return nil
  27. }
  28. func (h *qlightServerHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
  29. return (*ethHandler)(h).Handle(peer, packet)
  30. }
  31. func (h *qlightServerHandler) RunQPeer(peer *qlightproto.Peer, hand qlightproto.Handler) error {
  32. return (*handler)(h).runQLightServerPeer(peer, hand)
  33. }
  34. // PeerInfo retrieves all known `eth` information about a peer.
  35. func (h *qlightServerHandler) PeerInfo(id enode.ID) interface{} {
  36. if p := h.peers.peer(id.String()); p != nil {
  37. return p.info()
  38. }
  39. return nil
  40. }
  41. // AcceptTxs retrieves whether transaction processing is enabled on the node
  42. // or if inbound transactions should simply be dropped.
  43. func (h *qlightServerHandler) AcceptTxs() bool {
  44. return atomic.LoadUint32(&h.acceptTxs) == 1
  45. }
  46. // newHandler returns a handler for all Ethereum chain management protocol.
  47. func newQLightServerHandler(config *handlerConfig) (*handler, error) {
  48. // Create the protocol manager with the base fields
  49. h := &handler{
  50. networkID: config.Network,
  51. forkFilter: forkid.NewFilter(config.Chain),
  52. eventMux: config.EventMux,
  53. database: config.Database,
  54. txpool: config.TxPool,
  55. chain: config.Chain,
  56. peers: newPeerSet(),
  57. authorizationList: config.AuthorizationList,
  58. txsyncCh: make(chan *txsync),
  59. quitSync: make(chan struct{}),
  60. raftMode: config.RaftMode,
  61. engine: config.Engine,
  62. authProvider: config.authProvider,
  63. privateBlockDataResolver: config.privateBlockDataResolver,
  64. }
  65. return h, nil
  66. }
  67. // runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to
  68. // various subsistems and starts handling messages.
  69. func (h *handler) runQLightServerPeer(peer *qlightproto.Peer, handler qlightproto.Handler) error {
  70. h.peerWG.Add(1)
  71. defer h.peerWG.Done()
  72. // Execute the Ethereum handshake
  73. var (
  74. genesis = h.chain.Genesis()
  75. head = h.chain.CurrentHeader()
  76. hash = head.Hash()
  77. number = head.Number.Uint64()
  78. td = h.chain.GetTd(hash, number)
  79. )
  80. forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64())
  81. if err := peer.EthPeer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
  82. peer.Log().Debug("Ethereum handshake failed", "err", err)
  83. // Quorum
  84. // When the Handshake() returns an error, the Run method corresponding to `eth` protocol returns with the error, causing the peer to drop, signal subprotocol as well to exit the `Run` method
  85. peer.EthPeerDisconnected <- struct{}{}
  86. // End Quorum
  87. return err
  88. }
  89. log.Info("QLight attempting handshake")
  90. if err := peer.QLightHandshake(true, "", ""); err != nil {
  91. peer.Log().Debug("QLight handshake failed", "err", err)
  92. log.Info("QLight handshake failed", "err", err)
  93. // Quorum
  94. // When the Handshake() returns an error, the Run method corresponding to `eth` protocol returns with the error, causing the peer to drop, signal subprotocol as well to exit the `Run` method
  95. peer.EthPeerDisconnected <- struct{}{}
  96. // End Quorum
  97. return err
  98. }
  99. peer.Log().Debug("QLight handshake result for peer", "peer", peer.ID(), "server", peer.QLightServer(), "psi", peer.QLightPSI(), "token", peer.QLightToken())
  100. log.Info("QLight handshake result for peer", "peer", peer.ID(), "server", peer.QLightServer(), "psi", peer.QLightPSI(), "token", peer.QLightToken())
  101. // if we're not connected to a qlight server - disconnect the peer
  102. if peer.QLightServer() {
  103. peer.Log().Debug("QLight server connected to a server peer. Disconnecting.")
  104. // Quorum
  105. // When the Handshake() returns an error, the Run method corresponding to `eth` protocol returns with the error, causing the peer to drop, signal subprotocol as well to exit the `Run` method
  106. peer.EthPeerDisconnected <- struct{}{}
  107. // End Quorum
  108. return fmt.Errorf("connected to a server peer")
  109. }
  110. // Ignore maxPeers if this is a trusted peer
  111. if !peer.Peer.Info().Network.Trusted {
  112. if h.peers.len() >= h.maxPeers {
  113. return p2p.DiscTooManyPeers
  114. }
  115. }
  116. peer.Log().Debug("Ethereum peer connected", "name", peer.Name())
  117. err := h.authProvider.Authorize(peer.QLightToken(), peer.QLightPSI())
  118. if err != nil {
  119. peer.Log().Error("Auth error", "err", err)
  120. return p2p.DiscAuthError
  121. }
  122. // Register the peer locally
  123. if err := h.peers.registerQPeer(peer); err != nil {
  124. peer.Log().Error("Ethereum peer registration failed", "err", err)
  125. // Quorum
  126. // When the Register() returns an error, the Run method corresponding to `eth` protocol returns with the error, causing the peer to drop, signal subprotocol as well to exit the `Run` method
  127. peer.EthPeerDisconnected <- struct{}{}
  128. // End Quorum
  129. return err
  130. }
  131. defer h.removeQLightServerPeer(peer.ID())
  132. // start periodic auth checks
  133. peer.QLightPeriodicAuthFunc = func() error { return h.authProvider.Authorize(peer.QLightToken(), peer.QLightPSI()) }
  134. go peer.PeriodicAuthCheck()
  135. p := h.peers.peer(peer.ID())
  136. if p == nil {
  137. return errors.New("peer dropped during handling")
  138. }
  139. // Propagate existing transactions. new transactions appearing
  140. // after this will be sent via broadcasts.
  141. h.syncTransactions(peer.EthPeer)
  142. // Quorum notify other subprotocols that the eth peer is ready, and has been added to the peerset.
  143. p.EthPeerRegistered <- struct{}{}
  144. // Quorum
  145. // Handle incoming messages until the connection is torn down
  146. return handler(peer)
  147. }
  148. func (h *handler) StartQLightServer(maxPeers int) {
  149. h.maxPeers = maxPeers
  150. h.wg.Add(1)
  151. h.txsCh = make(chan core.NewTxsEvent, txChanSize)
  152. h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
  153. go h.txBroadcastLoop()
  154. // broadcast mined blocks
  155. h.wg.Add(1)
  156. go h.newBlockBroadcastLoop()
  157. h.authProvider.Initialize()
  158. }
  159. func (h *handler) StopQLightServer() {
  160. h.txsSub.Unsubscribe()
  161. close(h.quitSync)
  162. h.wg.Wait()
  163. // Disconnect existing sessions.
  164. // This also closes the gate for any new registrations on the peer set.
  165. // sessions which are already established but not added to h.peers yet
  166. // will exit when they try to register.
  167. h.peers.close()
  168. h.peerWG.Wait()
  169. log.Info("QLight server protocol stopped")
  170. }
  171. func (h *handler) newBlockBroadcastLoop() {
  172. defer h.wg.Done()
  173. headCh := make(chan core.ChainHeadEvent, 10)
  174. headSub := h.chain.SubscribeChainHeadEvent(headCh)
  175. defer headSub.Unsubscribe()
  176. for {
  177. select {
  178. case ev := <-headCh:
  179. log.Debug("Announcing block to peers", "number", ev.Block.Number(), "hash", ev.Block.Hash(), "td", ev.Block.Difficulty())
  180. h.BroadcastBlockQLServer(ev.Block)
  181. case <-h.quitSync:
  182. return
  183. }
  184. }
  185. }
  186. func (h *handler) BroadcastBlockQLServer(block *types.Block) {
  187. hash := block.Hash()
  188. peers := h.peers.qlightPeersWithoutBlock(hash)
  189. // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
  190. var td *big.Int
  191. if parent := h.chain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
  192. td = new(big.Int).Add(block.Difficulty(), h.chain.GetTd(block.ParentHash(), block.NumberU64()-1))
  193. } else {
  194. log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
  195. return
  196. }
  197. // Send the block to a subset of our peers
  198. for _, peer := range peers {
  199. log.Info("Preparing new block private data")
  200. blockPrivateData, err := h.privateBlockDataResolver.PrepareBlockPrivateData(block, peer.qlight.QLightPSI())
  201. if err != nil {
  202. log.Error("Unable to prepare private data for block", "number", block.Number(), "hash", hash, "err", err, "psi", peer.qlight.QLightPSI())
  203. return
  204. }
  205. log.Info("Private transactions data", "is nil", blockPrivateData == nil)
  206. peer.qlight.AsyncSendNewBlock(block, td, blockPrivateData)
  207. }
  208. log.Trace("Propagated block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
  209. }
  210. // removePeer unregisters a peer from the downloader and fetchers, removes it from
  211. // the set of tracked peers and closes the network connection to it.
  212. func (h *handler) removeQLightServerPeer(id string) {
  213. // Create a custom logger to avoid printing the entire id
  214. var logger log.Logger
  215. if len(id) < 16 {
  216. // Tests use short IDs, don't choke on them
  217. logger = log.New("peer", id)
  218. } else {
  219. logger = log.New("peer", id[:8])
  220. }
  221. // Abort if the peer does not exist
  222. peer := h.peers.peer(id)
  223. if peer == nil {
  224. logger.Error("Ethereum peer removal failed", "err", errPeerNotRegistered)
  225. return
  226. }
  227. // Remove the `eth` peer if it exists
  228. logger.Debug("Removing QLight server peer", "snap", peer.snapExt != nil)
  229. if err := h.peers.unregisterPeer(id); err != nil {
  230. logger.Error("Ethereum peer removal failed", "err", err)
  231. }
  232. // Hard disconnect at the networking layer
  233. peer.Peer.Disconnect(p2p.DiscUselessPeer)
  234. }
  235. func (ps *peerSet) qlightPeersWithoutBlock(hash common.Hash) []*ethPeer {
  236. ps.lock.RLock()
  237. defer ps.lock.RUnlock()
  238. list := make([]*ethPeer, 0, len(ps.peers))
  239. for _, p := range ps.peers {
  240. if !p.qlight.KnownBlock(hash) {
  241. list = append(list, p)
  242. }
  243. }
  244. return list
  245. }
  246. // Handle is invoked from a peer's message handler when it receives a new remote
  247. // message that the handler couldn't consume and serve itself.
  248. func (h *qlightServerHandler) QHandle(peer *qlightproto.Peer, packet eth.Packet) error {
  249. // Consume any broadcasts and announces, forwarding the rest to the downloader
  250. switch packet := packet.(type) {
  251. case *eth.NewPooledTransactionHashesPacket:
  252. return (*ethHandler)(h).Handle(peer.EthPeer, packet)
  253. case *eth.TransactionsPacket:
  254. return (*ethHandler)(h).Handle(peer.EthPeer, packet)
  255. case *eth.PooledTransactionsPacket:
  256. return (*ethHandler)(h).Handle(peer.EthPeer, packet)
  257. case *eth.GetBlockBodiesPacket:
  258. return h.handleGetBlockBodies(packet, peer)
  259. default:
  260. return fmt.Errorf("unexpected eth packet type: %T", packet)
  261. }
  262. }
  263. func (h *qlightServerHandler) handleGetBlockBodies(query *eth.GetBlockBodiesPacket, peer *qlightproto.Peer) error {
  264. blockPublicData, blockPrivateData, err := h.answerGetBlockBodiesQuery(query, peer)
  265. if err != nil {
  266. return err
  267. }
  268. if len(blockPrivateData) > 0 {
  269. err := peer.SendBlockPrivateData(blockPrivateData)
  270. if err != nil {
  271. log.Info("Error occurred while sending private data msg", "err", err)
  272. return err
  273. }
  274. }
  275. return peer.EthPeer.SendBlockBodiesRLP(blockPublicData)
  276. }
  277. const (
  278. // softResponseLimit is the target maximum size of replies to data retrievals.
  279. softResponseLimit = 2 * 1024 * 1024
  280. maxBodiesServe = 1024
  281. )
  282. func (h *qlightServerHandler) answerGetBlockBodiesQuery(query *eth.GetBlockBodiesPacket, peer *qlightproto.Peer) ([]rlp.RawValue, []qlight.BlockPrivateData, error) {
  283. // Gather blocks until the fetch or network limits is reached
  284. var (
  285. bytes int
  286. bodies []rlp.RawValue
  287. blockPrivateDatas []qlight.BlockPrivateData
  288. )
  289. for lookups, hash := range *query {
  290. if bytes >= softResponseLimit || len(bodies) >= maxBodiesServe ||
  291. lookups >= 2*maxBodiesServe {
  292. break
  293. }
  294. block := h.chain.GetBlockByHash(hash)
  295. if block != nil {
  296. if bpd, err := h.privateBlockDataResolver.PrepareBlockPrivateData(block, peer.QLightPSI()); err != nil {
  297. return nil, nil, fmt.Errorf("Unable to produce block private transaction data %v: %v", hash, err)
  298. } else if bpd != nil {
  299. blockPrivateDatas = append(blockPrivateDatas, *bpd)
  300. }
  301. // TODO qlight - add soft limits for block private data as well
  302. }
  303. if data := h.chain.GetBodyRLP(hash); len(data) != 0 {
  304. bodies = append(bodies, data)
  305. bytes += len(data)
  306. }
  307. }
  308. return bodies, blockPrivateDatas, nil
  309. }