handler_qlight_client.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. package eth
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync/atomic"
  6. "time"
  7. "github.com/ethereum/go-ethereum/common"
  8. "github.com/ethereum/go-ethereum/core"
  9. "github.com/ethereum/go-ethereum/core/forkid"
  10. "github.com/ethereum/go-ethereum/core/types"
  11. "github.com/ethereum/go-ethereum/eth/downloader"
  12. "github.com/ethereum/go-ethereum/eth/fetcher"
  13. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  14. qlightproto "github.com/ethereum/go-ethereum/eth/protocols/qlight"
  15. "github.com/ethereum/go-ethereum/event"
  16. "github.com/ethereum/go-ethereum/log"
  17. "github.com/ethereum/go-ethereum/p2p"
  18. "github.com/ethereum/go-ethereum/p2p/enode"
  19. "github.com/ethereum/go-ethereum/params"
  20. "github.com/ethereum/go-ethereum/trie"
  21. )
  22. type qlightClientHandler ethHandler
  23. func (h *qlightClientHandler) Chain() *core.BlockChain { return h.chain }
  24. func (h *qlightClientHandler) StateBloom() *trie.SyncBloom { return h.stateBloom }
  25. func (h *qlightClientHandler) TxPool() eth.TxPool { return h.txpool }
  26. func (h *qlightClientHandler) RunPeer(peer *eth.Peer, handler eth.Handler) error {
  27. return nil
  28. }
  29. func (h *qlightClientHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
  30. return (*ethHandler)(h).Handle(peer, packet)
  31. }
  32. func (h *qlightClientHandler) RunQPeer(peer *qlightproto.Peer, hand qlightproto.Handler) error {
  33. return (*handler)(h).runQLightClientPeer(peer, hand)
  34. }
  35. // PeerInfo retrieves all known `eth` information about a peer.
  36. func (h *qlightClientHandler) PeerInfo(id enode.ID) interface{} {
  37. if p := h.peers.peer(id.String()); p != nil {
  38. return p.info()
  39. }
  40. return nil
  41. }
  42. // AcceptTxs retrieves whether transaction processing is enabled on the node
  43. // or if inbound transactions should simply be dropped.
  44. func (h *qlightClientHandler) AcceptTxs() bool {
  45. return atomic.LoadUint32(&h.acceptTxs) == 1
  46. }
  47. // newHandler returns a handler for all Ethereum chain management protocol.
  48. func newQLightClientHandler(config *handlerConfig) (*handler, error) {
  49. // Create the protocol manager with the base fields
  50. if config.EventMux == nil {
  51. config.EventMux = new(event.TypeMux) // Nicety initialization for tests
  52. }
  53. h := &handler{
  54. networkID: config.Network,
  55. forkFilter: forkid.NewFilter(config.Chain),
  56. eventMux: config.EventMux,
  57. database: config.Database,
  58. txpool: config.TxPool,
  59. chain: config.Chain,
  60. peers: newPeerSet(),
  61. authorizationList: config.AuthorizationList,
  62. txsyncCh: make(chan *txsync),
  63. quitSync: make(chan struct{}),
  64. raftMode: config.RaftMode,
  65. engine: config.Engine,
  66. psi: config.psi,
  67. privateClientCache: config.privateClientCache,
  68. tokenHolder: config.tokenHolder,
  69. }
  70. if config.Sync == downloader.FullSync {
  71. // The database seems empty as the current block is the genesis. Yet the fast
  72. // block is ahead, so fast sync was enabled for this node at a certain point.
  73. // The scenarios where this can happen is
  74. // * if the user manually (or via a bad block) rolled back a fast sync node
  75. // below the sync point.
  76. // * the last fast sync is not finished while user specifies a full sync this
  77. // time. But we don't have any recent state for full sync.
  78. // In these cases however it's safe to reenable fast sync.
  79. fullBlock, fastBlock := h.chain.CurrentBlock(), h.chain.CurrentFastBlock()
  80. if fullBlock.NumberU64() == 0 && fastBlock.NumberU64() > 0 {
  81. h.fastSync = uint32(1)
  82. log.Warn("Switch sync mode from full sync to fast sync")
  83. }
  84. } else {
  85. if h.chain.CurrentBlock().NumberU64() > 0 {
  86. // Print warning log if database is not empty to run fast sync.
  87. log.Warn("Switch sync mode from fast sync to full sync")
  88. } else {
  89. // If fast sync was requested and our database is empty, grant it
  90. h.fastSync = uint32(1)
  91. if config.Sync == downloader.SnapSync {
  92. h.snapSync = uint32(1)
  93. }
  94. }
  95. }
  96. // If we have trusted checkpoints, enforce them on the chain
  97. if config.Checkpoint != nil {
  98. h.checkpointNumber = (config.Checkpoint.SectionIndex+1)*params.CHTFrequency - 1
  99. h.checkpointHash = config.Checkpoint.SectionHead
  100. }
  101. // Construct the downloader (long sync) and its backing state bloom if fast
  102. // sync is requested. The downloader is responsible for deallocating the state
  103. // bloom when it's done.
  104. if atomic.LoadUint32(&h.fastSync) == 1 {
  105. h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database)
  106. }
  107. h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer)
  108. // Construct the fetcher (short sync)
  109. validator := func(header *types.Header) error {
  110. return h.chain.Engine().VerifyHeader(h.chain, header, true)
  111. }
  112. heighter := func() uint64 {
  113. return h.chain.CurrentBlock().NumberU64()
  114. }
  115. inserter := func(blocks types.Blocks) (int, error) {
  116. // If sync hasn't reached the checkpoint yet, deny importing weird blocks.
  117. //
  118. // Ideally we would also compare the head block's timestamp and similarly reject
  119. // the propagated block if the head is too old. Unfortunately there is a corner
  120. // case when starting new networks, where the genesis might be ancient (0 unix)
  121. // which would prevent full nodes from accepting it.
  122. if h.chain.CurrentBlock().NumberU64() < h.checkpointNumber {
  123. log.Warn("Unsynced yet, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
  124. return 0, nil
  125. }
  126. // If fast sync is running, deny importing weird blocks. This is a problematic
  127. // clause when starting up a new network, because fast-syncing miners might not
  128. // accept each others' blocks until a restart. Unfortunately we haven't figured
  129. // out a way yet where nodes can decide unilaterally whether the network is new
  130. // or not. This should be fixed if we figure out a solution.
  131. if atomic.LoadUint32(&h.fastSync) == 1 {
  132. log.Warn("Fast syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
  133. return 0, nil
  134. }
  135. n, err := h.chain.InsertChain(blocks)
  136. if err == nil {
  137. atomic.StoreUint32(&h.acceptTxs, 1) // Mark initial sync done on any fetcher import
  138. }
  139. return n, err
  140. }
  141. h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlockQLightClient, heighter, nil, inserter, h.removePeer)
  142. fetchTx := func(peer string, hashes []common.Hash) error {
  143. p := h.peers.peer(peer)
  144. if p == nil {
  145. return errors.New("unknown peer")
  146. }
  147. return p.RequestTxs(hashes)
  148. }
  149. h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, h.txpool.AddRemotes, fetchTx)
  150. h.chainSync = newChainSyncer(h)
  151. return h, nil
  152. }
  153. // runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to
  154. // various subsistems and starts handling messages.
  155. func (h *handler) runQLightClientPeer(peer *qlightproto.Peer, handler qlightproto.Handler) error {
  156. // If the peer has a `snap` extension, wait for it to connect so we can have
  157. // a uniform initialization/teardown mechanism
  158. snap, err := h.peers.waitSnapExtension(peer.EthPeer)
  159. if err != nil {
  160. peer.Log().Error("Snapshot extension barrier failed", "err", err)
  161. return err
  162. }
  163. // TODO(karalabe): Not sure why this is needed
  164. if !h.chainSync.handlePeerEvent(peer.EthPeer) {
  165. return p2p.DiscQuitting
  166. }
  167. h.peerWG.Add(1)
  168. defer h.peerWG.Done()
  169. // Execute the Ethereum handshake
  170. var (
  171. genesis = h.chain.Genesis()
  172. head = h.chain.CurrentHeader()
  173. hash = head.Hash()
  174. number = head.Number.Uint64()
  175. td = h.chain.GetTd(hash, number)
  176. )
  177. forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64())
  178. if err := peer.EthPeer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
  179. peer.Log().Debug("Ethereum handshake failed", "err", err)
  180. // Quorum
  181. // When the Handshake() returns an error, the Run method corresponding to `eth` protocol returns with the error, causing the peer to drop, signal subprotocol as well to exit the `Run` method
  182. peer.EthPeerDisconnected <- struct{}{}
  183. // End Quorum
  184. return err
  185. }
  186. log.Info("QLight attempting handshake")
  187. if err := peer.QLightHandshake(false, h.psi, h.tokenHolder.CurrentToken()); err != nil {
  188. peer.Log().Debug("QLight handshake failed", "err", err)
  189. log.Info("QLight handshake failed", "err", err)
  190. // Quorum
  191. // When the Handshake() returns an error, the Run method corresponding to `eth` protocol returns with the error, causing the peer to drop, signal subprotocol as well to exit the `Run` method
  192. peer.EthPeerDisconnected <- struct{}{}
  193. // End Quorum
  194. return err
  195. }
  196. peer.Log().Debug("QLight handshake result for peer", "peer", peer.ID(), "server", peer.QLightServer(), "psi", peer.QLightPSI(), "token", peer.QLightToken())
  197. log.Info("QLight handshake result for peer", "peer", peer.ID(), "server", peer.QLightServer(), "psi", peer.QLightPSI(), "token", peer.QLightToken())
  198. // if we're not connected to a qlight server - disconnect the peer
  199. if !peer.QLightServer() {
  200. peer.Log().Debug("QLight connected to a non server peer. Disconnecting.")
  201. // Quorum
  202. // When the Handshake() returns an error, the Run method corresponding to `eth` protocol returns with the error, causing the peer to drop, signal subprotocol as well to exit the `Run` method
  203. peer.EthPeerDisconnected <- struct{}{}
  204. // End Quorum
  205. return fmt.Errorf("connected to a non server peer")
  206. }
  207. reject := false // reserved peer slots
  208. if atomic.LoadUint32(&h.snapSync) == 1 {
  209. if snap == nil {
  210. // If we are running snap-sync, we want to reserve roughly half the peer
  211. // slots for peers supporting the snap protocol.
  212. // The logic here is; we only allow up to 5 more non-snap peers than snap-peers.
  213. if all, snp := h.peers.len(), h.peers.snapLen(); all-snp > snp+5 {
  214. reject = true
  215. }
  216. }
  217. }
  218. // Ignore maxPeers if this is a trusted peer
  219. if !peer.Peer.Info().Network.Trusted {
  220. if reject || h.peers.len() >= h.maxPeers {
  221. return p2p.DiscTooManyPeers
  222. }
  223. }
  224. peer.Log().Debug("Ethereum peer connected", "name", peer.Name())
  225. // Register the peer locally
  226. if err := h.peers.registerQPeer(peer); err != nil {
  227. peer.Log().Error("Ethereum peer registration failed", "err", err)
  228. // Quorum
  229. // When the Register() returns an error, the Run method corresponding to `eth` protocol returns with the error, causing the peer to drop, signal subprotocol as well to exit the `Run` method
  230. peer.EthPeerDisconnected <- struct{}{}
  231. // End Quorum
  232. return err
  233. }
  234. defer h.removePeer(peer.ID())
  235. p := h.peers.peer(peer.ID())
  236. if p == nil {
  237. return errors.New("peer dropped during handling")
  238. }
  239. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  240. if err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer.EthPeer); err != nil {
  241. peer.Log().Error("Failed to register peer in eth syncer", "err", err)
  242. return err
  243. }
  244. if snap != nil {
  245. if err := h.downloader.SnapSyncer.Register(snap); err != nil {
  246. peer.Log().Error("Failed to register peer in snap syncer", "err", err)
  247. return err
  248. }
  249. }
  250. h.chainSync.handlePeerEvent(peer.EthPeer)
  251. // Propagate existing transactions. new transactions appearing
  252. // after this will be sent via broadcasts.
  253. h.syncTransactions(peer.EthPeer)
  254. // If we have a trusted CHT, reject all peers below that (avoid fast sync eclipse)
  255. if h.checkpointHash != (common.Hash{}) {
  256. // Request the peer's checkpoint header for chain height/weight validation
  257. if err := peer.EthPeer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false); err != nil {
  258. return err
  259. }
  260. // Start a timer to disconnect if the peer doesn't reply in time
  261. p.syncDrop = time.AfterFunc(syncChallengeTimeout, func() {
  262. peer.Log().Warn("Checkpoint challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name())
  263. h.removePeer(peer.ID())
  264. })
  265. // Make sure it's cleaned up if the peer dies off
  266. defer func() {
  267. if p.syncDrop != nil {
  268. p.syncDrop.Stop()
  269. p.syncDrop = nil
  270. }
  271. }()
  272. }
  273. // If we have any explicit authorized block hashes, request them
  274. for number := range h.authorizationList {
  275. if err := peer.EthPeer.RequestHeadersByNumber(number, 1, 0, false); err != nil {
  276. return err
  277. }
  278. }
  279. // Quorum notify other subprotocols that the eth peer is ready, and has been added to the peerset.
  280. p.EthPeerRegistered <- struct{}{}
  281. // Quorum
  282. // Handle incoming messages until the connection is torn down
  283. return handler(peer)
  284. }
  285. func (h *handler) StartQLightClient() {
  286. h.maxPeers = 1
  287. // Quorum
  288. if h.raftMode {
  289. // We set this immediately in raft mode to make sure the miner never drops
  290. // incoming txes. Raft mode doesn't use the fetcher or downloader, and so
  291. // this would never be set otherwise.
  292. atomic.StoreUint32(&h.acceptTxs, 1)
  293. }
  294. // End Quorum
  295. // start sync handlers
  296. h.wg.Add(1)
  297. go h.chainSync.loop()
  298. }
  299. func (h *handler) StopQLightClient() {
  300. if h == nil {
  301. return
  302. }
  303. // Quit chainSync and txsync64.
  304. // After this is done, no new peers will be accepted.
  305. close(h.quitSync)
  306. h.wg.Wait()
  307. // Disconnect existing sessions.
  308. // This also closes the gate for any new registrations on the peer set.
  309. // sessions which are already established but not added to h.peers yet
  310. // will exit when they try to register.
  311. h.peers.close()
  312. h.peerWG.Wait()
  313. log.Info("QLight client protocol stopped")
  314. }
  315. // BroadcastBlock will either propagate a block to a subset of its peers, or
  316. // will only announce its availability (depending what's requested).
  317. func (h *handler) BroadcastBlockQLightClient(block *types.Block, propagate bool) {
  318. }
  319. // Handle is invoked from a peer's message handler when it receives a new remote
  320. // message that the handler couldn't consume and serve itself.
  321. func (h *qlightClientHandler) QHandle(peer *qlightproto.Peer, packet eth.Packet) error {
  322. // Consume any broadcasts and announces, forwarding the rest to the downloader
  323. switch packet := packet.(type) {
  324. case *eth.BlockHeadersPacket:
  325. return (*ethHandler)(h).Handle(peer.EthPeer, packet)
  326. case *eth.BlockBodiesPacket:
  327. txset, uncleset := packet.Unpack()
  328. h.handleBodiesQLight(txset)
  329. return (*ethHandler)(h).handleBodies(peer.EthPeer, txset, uncleset)
  330. case *eth.NewBlockHashesPacket:
  331. return (*ethHandler)(h).Handle(peer.EthPeer, packet)
  332. case *eth.NewBlockPacket:
  333. h.updateCacheWithNonPartyTxData(packet.Block.Transactions())
  334. return (*ethHandler)(h).handleBlockBroadcast(peer.EthPeer, packet.Block, packet.TD)
  335. case *qlightproto.BlockPrivateDataPacket:
  336. return h.handleBlockPrivateData(packet)
  337. case *eth.NewPooledTransactionHashesPacket:
  338. return (*ethHandler)(h).Handle(peer.EthPeer, packet)
  339. case *eth.TransactionsPacket:
  340. h.updateCacheWithNonPartyTxData(*packet)
  341. return (*ethHandler)(h).Handle(peer.EthPeer, packet)
  342. case *eth.PooledTransactionsPacket:
  343. return (*ethHandler)(h).Handle(peer.EthPeer, packet)
  344. default:
  345. return fmt.Errorf("unexpected eth packet type: %T", packet)
  346. }
  347. }
  348. // handleBodies is invoked from a peer's message handler when it transmits a batch
  349. // of block bodies for the local node to process.
  350. func (h *qlightClientHandler) handleBodiesQLight(txs [][]*types.Transaction) {
  351. for _, txArray := range txs {
  352. h.updateCacheWithNonPartyTxData(txArray)
  353. }
  354. }
  355. func (h *qlightClientHandler) updateCacheWithNonPartyTxData(transactions []*types.Transaction) {
  356. for _, tx := range transactions {
  357. if tx.IsPrivate() || tx.IsPrivacyMarker() {
  358. txHash := common.BytesToEncryptedPayloadHash(tx.Data())
  359. h.privateClientCache.CheckAndAddEmptyEntry(txHash)
  360. }
  361. }
  362. }
  363. func (h *qlightClientHandler) handleBlockPrivateData(blockPrivateData *qlightproto.BlockPrivateDataPacket) error {
  364. for _, b := range *blockPrivateData {
  365. if err := h.privateClientCache.AddPrivateBlock(b); err != nil {
  366. return fmt.Errorf("Unable to handle private block data: %v", err)
  367. }
  368. }
  369. return nil
  370. }