backend.go 13 KB

  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package backend
  17. import (
  18. "crypto/ecdsa"
  19. "math/big"
  20. "sync"
  21. "time"
  22. ""
  23. ""
  24. ""
  25. istanbulcommon ""
  26. ibftcore ""
  27. ibftengine ""
  28. qbftcore ""
  29. qbftengine ""
  30. qbfttypes ""
  31. ""
  32. ""
  33. ""
  34. ""
  35. ""
  36. ""
  37. ""
  38. lru ""
  39. )
  40. const (
  41. // fetcherID is the ID indicates the block is from Istanbul engine
  42. fetcherID = "istanbul"
  43. )
  44. // New creates an Ethereum backend for Istanbul core engine.
  45. func New(config *istanbul.Config, privateKey *ecdsa.PrivateKey, db ethdb.Database) *Backend {
  46. // Allocate the snapshot caches and create the engine
  47. recents, _ := lru.NewARC(inmemorySnapshots)
  48. recentMessages, _ := lru.NewARC(inmemoryPeers)
  49. knownMessages, _ := lru.NewARC(inmemoryMessages)
  50. sb := &Backend{
  51. config: config,
  52. istanbulEventMux: new(event.TypeMux),
  53. privateKey: privateKey,
  54. address: crypto.PubkeyToAddress(privateKey.PublicKey),
  55. logger: log.New(),
  56. db: db,
  57. commitCh: make(chan *types.Block, 1),
  58. recents: recents,
  59. candidates: make(map[common.Address]bool),
  60. coreStarted: false,
  61. recentMessages: recentMessages,
  62. knownMessages: knownMessages,
  63. }
  64. sb.qbftEngine = qbftengine.NewEngine(sb.config, sb.address, sb.Sign)
  65. sb.ibftEngine = ibftengine.NewEngine(sb.config, sb.address, sb.Sign)
  66. return sb
  67. }
  68. // ----------------------------------------------------------------------------
  69. type Backend struct {
  70. config *istanbul.Config
  71. privateKey *ecdsa.PrivateKey
  72. address common.Address
  73. core istanbul.Core
  74. ibftEngine *ibftengine.Engine
  75. qbftEngine *qbftengine.Engine
  76. istanbulEventMux *event.TypeMux
  77. logger log.Logger
  78. db ethdb.Database
  79. chain consensus.ChainHeaderReader
  80. currentBlock func() *types.Block
  81. hasBadBlock func(db ethdb.Reader, hash common.Hash) bool
  82. // the channels for istanbul engine notifications
  83. commitCh chan *types.Block
  84. proposedBlockHash common.Hash
  85. sealMu sync.Mutex
  86. coreStarted bool
  87. coreMu sync.RWMutex
  88. // Current list of candidates we are pushing
  89. candidates map[common.Address]bool
  90. // Protects the signer fields
  91. candidatesLock sync.RWMutex
  92. // Snapshots for recent block to speed up reorgs
  93. recents *lru.ARCCache
  94. // event subscription for ChainHeadEvent event
  95. broadcaster consensus.Broadcaster
  96. recentMessages *lru.ARCCache // the cache of peer's messages
  97. knownMessages *lru.ARCCache // the cache of self messages
  98. qbftConsensusEnabled bool // qbft consensus
  99. }
  100. func (sb *Backend) Engine() istanbul.Engine {
  101. return sb.EngineForBlockNumber(nil)
  102. }
  103. func (sb *Backend) EngineForBlockNumber(blockNumber *big.Int) istanbul.Engine {
  104. switch {
  105. case blockNumber != nil && sb.IsQBFTConsensusAt(blockNumber):
  106. return sb.qbftEngine
  107. case blockNumber == nil && sb.IsQBFTConsensus():
  108. return sb.qbftEngine
  109. default:
  110. return sb.ibftEngine
  111. }
  112. }
  113. // zekun: HACK
  114. func (sb *Backend) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, parent *types.Header) *big.Int {
  115. return sb.EngineForBlockNumber(parent.Number).CalcDifficulty(chain, time, parent)
  116. }
  117. // Address implements istanbul.Backend.Address
  118. func (sb *Backend) Address() common.Address {
  119. return sb.Engine().Address()
  120. }
  121. // Validators implements istanbul.Backend.Validators
  122. func (sb *Backend) Validators(proposal istanbul.Proposal) istanbul.ValidatorSet {
  123. return sb.getValidators(proposal.Number().Uint64(), proposal.Hash())
  124. }
  125. // Broadcast implements istanbul.Backend.Broadcast
  126. func (sb *Backend) Broadcast(valSet istanbul.ValidatorSet, code uint64, payload []byte) error {
  127. // send to others
  128. sb.Gossip(valSet, code, payload)
  129. // send to self
  130. msg := istanbul.MessageEvent{
  131. Code: code,
  132. Payload: payload,
  133. }
  134. go sb.istanbulEventMux.Post(msg)
  135. return nil
  136. }
  137. // Gossip implements istanbul.Backend.Gossip
  138. func (sb *Backend) Gossip(valSet istanbul.ValidatorSet, code uint64, payload []byte) error {
  139. hash := istanbul.RLPHash(payload)
  140. sb.knownMessages.Add(hash, true)
  141. targets := make(map[common.Address]bool)
  142. for _, val := range valSet.List() {
  143. if val.Address() != sb.Address() {
  144. targets[val.Address()] = true
  145. }
  146. }
  147. if sb.broadcaster != nil && len(targets) > 0 {
  148. ps := sb.broadcaster.FindPeers(targets)
  149. for addr, p := range ps {
  150. ms, ok := sb.recentMessages.Get(addr)
  151. var m *lru.ARCCache
  152. if ok {
  153. m, _ = ms.(*lru.ARCCache)
  154. if _, k := m.Get(hash); k {
  155. // This peer had this event, skip it
  156. continue
  157. }
  158. } else {
  159. m, _ = lru.NewARC(inmemoryMessages)
  160. }
  161. m.Add(hash, true)
  162. sb.recentMessages.Add(addr, m)
  163. if sb.IsQBFTConsensus() {
  164. var outboundCode uint64 = istanbulMsg
  165. if _, ok := qbfttypes.MessageCodes()[code]; ok {
  166. outboundCode = code
  167. }
  168. go p.SendQBFTConsensus(outboundCode, payload)
  169. } else {
  170. go p.SendConsensus(istanbulMsg, payload)
  171. }
  172. }
  173. }
  174. return nil
  175. }
  176. // Commit implements istanbul.Backend.Commit
  177. func (sb *Backend) Commit(proposal istanbul.Proposal, seals [][]byte, round *big.Int) (err error) {
  178. // Check if the proposal is a valid block
  179. block, ok := proposal.(*types.Block)
  180. if !ok {
  181. sb.logger.Error("BFT: invalid block proposal", "proposal", proposal)
  182. return istanbulcommon.ErrInvalidProposal
  183. }
  184. // Commit header
  185. h := block.Header()
  186. err = sb.EngineForBlockNumber(h.Number).CommitHeader(h, seals, round)
  187. if err != nil {
  188. return
  189. }
  190. // Remove ValidatorSet added to ProposerPolicy registry, if not done, the registry keeps increasing size with each block height
  191. sb.config.ProposerPolicy.ClearRegistry()
  192. // update block's header
  193. block = block.WithSeal(h)
  194. sb.logger.Info("BFT: block proposal committed", "author", sb.Address(), "hash", proposal.Hash(), "number", proposal.Number().Uint64())
  195. // - if the proposed and committed blocks are the same, send the proposed hash
  196. // to commit channel, which is being watched inside the engine.Seal() function.
  197. // - otherwise, we try to insert the block.
  198. // -- if success, the ChainHeadEvent event will be broadcasted, try to build
  199. // the next block and the previous Seal() will be stopped.
  200. // -- otherwise, a error will be returned and a round change event will be fired.
  201. if sb.proposedBlockHash == block.Hash() {
  202. // feed block hash to Seal() and wait the Seal() result
  203. sb.commitCh <- block
  204. return nil
  205. }
  206. if sb.broadcaster != nil {
  207. sb.broadcaster.Enqueue(fetcherID, block)
  208. }
  209. return nil
  210. }
  211. // EventMux implements istanbul.Backend.EventMux
  212. func (sb *Backend) EventMux() *event.TypeMux {
  213. return sb.istanbulEventMux
  214. }
  215. // Verify implements istanbul.Backend.Verify
  216. func (sb *Backend) Verify(proposal istanbul.Proposal) (time.Duration, error) {
  217. // Check if the proposal is a valid block
  218. block, ok := proposal.(*types.Block)
  219. if !ok {
  220. sb.logger.Error("BFT: invalid block proposal", "proposal", proposal)
  221. return 0, istanbulcommon.ErrInvalidProposal
  222. }
  223. // check bad block
  224. if sb.HasBadProposal(block.Hash()) {
  225. sb.logger.Warn("BFT: bad block proposal", "proposal", proposal)
  226. return 0, core.ErrBlacklistedHash
  227. }
  228. header := block.Header()
  229. snap, err := sb.snapshot(sb.chain, header.Number.Uint64()-1, header.ParentHash, nil)
  230. if err != nil {
  231. return 0, err
  232. }
  233. return sb.EngineForBlockNumber(header.Number).VerifyBlockProposal(sb.chain, block, snap.ValSet)
  234. }
  235. // Sign implements istanbul.Backend.Sign
  236. func (sb *Backend) Sign(data []byte) ([]byte, error) {
  237. hashData := crypto.Keccak256(data)
  238. return crypto.Sign(hashData, sb.privateKey)
  239. }
  240. // SignWithoutHashing implements istanbul.Backend.SignWithoutHashing and signs input data with the backend's private key without hashing the input data
  241. func (sb *Backend) SignWithoutHashing(data []byte) ([]byte, error) {
  242. return crypto.Sign(data, sb.privateKey)
  243. }
  244. // CheckSignature implements istanbul.Backend.CheckSignature
  245. func (sb *Backend) CheckSignature(data []byte, address common.Address, sig []byte) error {
  246. signer, err := istanbul.GetSignatureAddress(data, sig)
  247. if err != nil {
  248. return err
  249. }
  250. // Compare derived addresses
  251. if signer != address {
  252. return istanbulcommon.ErrInvalidSignature
  253. }
  254. return nil
  255. }
  256. // HasPropsal implements istanbul.Backend.HashBlock
  257. func (sb *Backend) HasPropsal(hash common.Hash, number *big.Int) bool {
  258. return sb.chain.GetHeader(hash, number.Uint64()) != nil
  259. }
  260. // GetProposer implements istanbul.Backend.GetProposer
  261. func (sb *Backend) GetProposer(number uint64) common.Address {
  262. if h := sb.chain.GetHeaderByNumber(number); h != nil {
  263. a, _ := sb.Author(h)
  264. return a
  265. }
  266. return common.Address{}
  267. }
  268. // ParentValidators implements istanbul.Backend.GetParentValidators
  269. func (sb *Backend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
  270. if block, ok := proposal.(*types.Block); ok {
  271. return sb.getValidators(block.Number().Uint64()-1, block.ParentHash())
  272. }
  273. return validator.NewSet(nil, sb.config.ProposerPolicy)
  274. }
  275. func (sb *Backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet {
  276. snap, err := sb.snapshot(sb.chain, number, hash, nil)
  277. if err != nil {
  278. return validator.NewSet(nil, sb.config.ProposerPolicy)
  279. }
  280. return snap.ValSet
  281. }
  282. func (sb *Backend) LastProposal() (istanbul.Proposal, common.Address) {
  283. block := sb.currentBlock()
  284. var proposer common.Address
  285. if block.Number().Cmp(common.Big0) > 0 {
  286. var err error
  287. proposer, err = sb.Author(block.Header())
  288. if err != nil {
  289. sb.logger.Error("BFT: last block proposal invalid", "err", err)
  290. return nil, common.Address{}
  291. }
  292. }
  293. // Return header only block here since we don't need block body
  294. return block, proposer
  295. }
  296. func (sb *Backend) HasBadProposal(hash common.Hash) bool {
  297. if sb.hasBadBlock == nil {
  298. return false
  299. }
  300. return sb.hasBadBlock(sb.db, hash)
  301. }
  302. func (sb *Backend) Close() error {
  303. return nil
  304. }
  305. // IsQBFTConsensus returns whether qbft consensus should be used
  306. func (sb *Backend) IsQBFTConsensus() bool {
  307. if sb.qbftConsensusEnabled {
  308. return true
  309. }
  310. if sb.chain != nil {
  311. qbftEnabled := sb.IsQBFTConsensusAt(sb.chain.CurrentHeader().Number)
  312. sb.qbftConsensusEnabled = qbftEnabled
  313. return qbftEnabled
  314. }
  315. return false
  316. }
  317. // IsQBFTConsensusForHeader checks if qbft consensus is enabled for the block height identified by the given header
  318. func (sb *Backend) IsQBFTConsensusAt(blockNumber *big.Int) bool {
  319. return sb.config.IsQBFTConsensusAt(blockNumber)
  320. }
  321. func (sb *Backend) startIBFT() error {
  322. sb.logger.Info("BFT: activate IBFT")
  323. sb.logger.Trace("BFT: set ProposerPolicy sorter to ValidatorSortByStringFun")
  324. sb.config.ProposerPolicy.Use(istanbul.ValidatorSortByString())
  325. sb.qbftConsensusEnabled = false
  326. sb.core = ibftcore.New(sb, sb.config)
  327. if err := sb.core.Start(); err != nil {
  328. sb.logger.Error("BFT: failed to activate IBFT", "err", err)
  329. return err
  330. }
  331. return nil
  332. }
  333. func (sb *Backend) startQBFT() error {
  334. sb.logger.Info("BFT: activate QBFT")
  335. sb.logger.Trace("BFT: set ProposerPolicy sorter to ValidatorSortByByteFunc")
  336. sb.config.ProposerPolicy.Use(istanbul.ValidatorSortByByte())
  337. sb.qbftConsensusEnabled = true
  338. sb.core = qbftcore.New(sb, sb.config)
  339. if err := sb.core.Start(); err != nil {
  340. sb.logger.Error("BFT: failed to activate QBFT", "err", err)
  341. return err
  342. }
  343. return nil
  344. }
  345. func (sb *Backend) stop() error {
  346. core := sb.core
  347. sb.core = nil
  348. if core != nil {
  349. sb.logger.Info("BFT: deactivate")
  350. if err := core.Stop(); err != nil {
  351. sb.logger.Error("BFT: failed to deactivate", "err", err)
  352. return err
  353. }
  354. }
  355. sb.qbftConsensusEnabled = false
  356. return nil
  357. }
  358. // StartQBFTConsensus stops existing legacy ibft consensus and starts the new qbft consensus
  359. func (sb *Backend) StartQBFTConsensus() error {
  360. sb.logger.Info("BFT: switch from IBFT to QBFT")
  361. if err := sb.stop(); err != nil {
  362. return err
  363. }
  364. return sb.startQBFT()
  365. }