backend.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package backend
  17. import (
  18. "crypto/ecdsa"
  19. "math/big"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/consensus"
  24. "github.com/ethereum/go-ethereum/consensus/istanbul"
  25. istanbulcommon "github.com/ethereum/go-ethereum/consensus/istanbul/common"
  26. ibftcore "github.com/ethereum/go-ethereum/consensus/istanbul/ibft/core"
  27. ibftengine "github.com/ethereum/go-ethereum/consensus/istanbul/ibft/engine"
  28. qbftcore "github.com/ethereum/go-ethereum/consensus/istanbul/qbft/core"
  29. qbftengine "github.com/ethereum/go-ethereum/consensus/istanbul/qbft/engine"
  30. qbfttypes "github.com/ethereum/go-ethereum/consensus/istanbul/qbft/types"
  31. "github.com/ethereum/go-ethereum/consensus/istanbul/validator"
  32. "github.com/ethereum/go-ethereum/core"
  33. "github.com/ethereum/go-ethereum/core/types"
  34. "github.com/ethereum/go-ethereum/crypto"
  35. "github.com/ethereum/go-ethereum/ethdb"
  36. "github.com/ethereum/go-ethereum/event"
  37. "github.com/ethereum/go-ethereum/log"
  38. lru "github.com/hashicorp/golang-lru"
  39. )
  40. const (
  41. // fetcherID is the ID indicates the block is from Istanbul engine
  42. fetcherID = "istanbul"
  43. )
  44. // New creates an Ethereum backend for Istanbul core engine.
  45. func New(config *istanbul.Config, privateKey *ecdsa.PrivateKey, db ethdb.Database) *Backend {
  46. // Allocate the snapshot caches and create the engine
  47. recents, _ := lru.NewARC(inmemorySnapshots)
  48. recentMessages, _ := lru.NewARC(inmemoryPeers)
  49. knownMessages, _ := lru.NewARC(inmemoryMessages)
  50. sb := &Backend{
  51. config: config,
  52. istanbulEventMux: new(event.TypeMux),
  53. privateKey: privateKey,
  54. address: crypto.PubkeyToAddress(privateKey.PublicKey),
  55. logger: log.New(),
  56. db: db,
  57. commitCh: make(chan *types.Block, 1),
  58. recents: recents,
  59. candidates: make(map[common.Address]bool),
  60. coreStarted: false,
  61. recentMessages: recentMessages,
  62. knownMessages: knownMessages,
  63. }
  64. sb.qbftEngine = qbftengine.NewEngine(sb.config, sb.address, sb.Sign)
  65. sb.ibftEngine = ibftengine.NewEngine(sb.config, sb.address, sb.Sign)
  66. return sb
  67. }
  68. // ----------------------------------------------------------------------------
  69. type Backend struct {
  70. config *istanbul.Config
  71. privateKey *ecdsa.PrivateKey
  72. address common.Address
  73. core istanbul.Core
  74. ibftEngine *ibftengine.Engine
  75. qbftEngine *qbftengine.Engine
  76. istanbulEventMux *event.TypeMux
  77. logger log.Logger
  78. db ethdb.Database
  79. chain consensus.ChainHeaderReader
  80. currentBlock func() *types.Block
  81. hasBadBlock func(db ethdb.Reader, hash common.Hash) bool
  82. // the channels for istanbul engine notifications
  83. commitCh chan *types.Block
  84. proposedBlockHash common.Hash
  85. sealMu sync.Mutex
  86. coreStarted bool
  87. coreMu sync.RWMutex
  88. // Current list of candidates we are pushing
  89. candidates map[common.Address]bool
  90. // Protects the signer fields
  91. candidatesLock sync.RWMutex
  92. // Snapshots for recent block to speed up reorgs
  93. recents *lru.ARCCache
  94. // event subscription for ChainHeadEvent event
  95. broadcaster consensus.Broadcaster
  96. recentMessages *lru.ARCCache // the cache of peer's messages
  97. knownMessages *lru.ARCCache // the cache of self messages
  98. qbftConsensusEnabled bool // qbft consensus
  99. }
  100. func (sb *Backend) Engine() istanbul.Engine {
  101. return sb.EngineForBlockNumber(nil)
  102. }
  103. func (sb *Backend) EngineForBlockNumber(blockNumber *big.Int) istanbul.Engine {
  104. switch {
  105. case blockNumber != nil && sb.IsQBFTConsensusAt(blockNumber):
  106. return sb.qbftEngine
  107. case blockNumber == nil && sb.IsQBFTConsensus():
  108. return sb.qbftEngine
  109. default:
  110. return sb.ibftEngine
  111. }
  112. }
  113. // zekun: HACK
  114. func (sb *Backend) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, parent *types.Header) *big.Int {
  115. return sb.EngineForBlockNumber(parent.Number).CalcDifficulty(chain, time, parent)
  116. }
  117. // Address implements istanbul.Backend.Address
  118. func (sb *Backend) Address() common.Address {
  119. return sb.Engine().Address()
  120. }
  121. // Validators implements istanbul.Backend.Validators
  122. func (sb *Backend) Validators(proposal istanbul.Proposal) istanbul.ValidatorSet {
  123. return sb.getValidators(proposal.Number().Uint64(), proposal.Hash())
  124. }
  125. // Broadcast implements istanbul.Backend.Broadcast
  126. func (sb *Backend) Broadcast(valSet istanbul.ValidatorSet, code uint64, payload []byte) error {
  127. // send to others
  128. sb.Gossip(valSet, code, payload)
  129. // send to self
  130. msg := istanbul.MessageEvent{
  131. Code: code,
  132. Payload: payload,
  133. }
  134. go sb.istanbulEventMux.Post(msg)
  135. return nil
  136. }
  137. // Gossip implements istanbul.Backend.Gossip
  138. func (sb *Backend) Gossip(valSet istanbul.ValidatorSet, code uint64, payload []byte) error {
  139. hash := istanbul.RLPHash(payload)
  140. sb.knownMessages.Add(hash, true)
  141. targets := make(map[common.Address]bool)
  142. for _, val := range valSet.List() {
  143. if val.Address() != sb.Address() {
  144. targets[val.Address()] = true
  145. }
  146. }
  147. if sb.broadcaster != nil && len(targets) > 0 {
  148. ps := sb.broadcaster.FindPeers(targets)
  149. for addr, p := range ps {
  150. ms, ok := sb.recentMessages.Get(addr)
  151. var m *lru.ARCCache
  152. if ok {
  153. m, _ = ms.(*lru.ARCCache)
  154. if _, k := m.Get(hash); k {
  155. // This peer had this event, skip it
  156. continue
  157. }
  158. } else {
  159. m, _ = lru.NewARC(inmemoryMessages)
  160. }
  161. m.Add(hash, true)
  162. sb.recentMessages.Add(addr, m)
  163. if sb.IsQBFTConsensus() {
  164. var outboundCode uint64 = istanbulMsg
  165. if _, ok := qbfttypes.MessageCodes()[code]; ok {
  166. outboundCode = code
  167. }
  168. go p.SendQBFTConsensus(outboundCode, payload)
  169. } else {
  170. go p.SendConsensus(istanbulMsg, payload)
  171. }
  172. }
  173. }
  174. return nil
  175. }
  176. // Commit implements istanbul.Backend.Commit
  177. func (sb *Backend) Commit(proposal istanbul.Proposal, seals [][]byte, round *big.Int) (err error) {
  178. // Check if the proposal is a valid block
  179. block, ok := proposal.(*types.Block)
  180. if !ok {
  181. sb.logger.Error("BFT: invalid block proposal", "proposal", proposal)
  182. return istanbulcommon.ErrInvalidProposal
  183. }
  184. // Commit header
  185. h := block.Header()
  186. err = sb.EngineForBlockNumber(h.Number).CommitHeader(h, seals, round)
  187. if err != nil {
  188. return
  189. }
  190. // Remove ValidatorSet added to ProposerPolicy registry, if not done, the registry keeps increasing size with each block height
  191. sb.config.ProposerPolicy.ClearRegistry()
  192. // update block's header
  193. block = block.WithSeal(h)
  194. sb.logger.Info("BFT: block proposal committed", "author", sb.Address(), "hash", proposal.Hash(), "number", proposal.Number().Uint64())
  195. // - if the proposed and committed blocks are the same, send the proposed hash
  196. // to commit channel, which is being watched inside the engine.Seal() function.
  197. // - otherwise, we try to insert the block.
  198. // -- if success, the ChainHeadEvent event will be broadcasted, try to build
  199. // the next block and the previous Seal() will be stopped.
  200. // -- otherwise, a error will be returned and a round change event will be fired.
  201. if sb.proposedBlockHash == block.Hash() {
  202. // feed block hash to Seal() and wait the Seal() result
  203. sb.commitCh <- block
  204. return nil
  205. }
  206. if sb.broadcaster != nil {
  207. sb.broadcaster.Enqueue(fetcherID, block)
  208. }
  209. return nil
  210. }
  211. // EventMux implements istanbul.Backend.EventMux
  212. func (sb *Backend) EventMux() *event.TypeMux {
  213. return sb.istanbulEventMux
  214. }
  215. // Verify implements istanbul.Backend.Verify
  216. func (sb *Backend) Verify(proposal istanbul.Proposal) (time.Duration, error) {
  217. // Check if the proposal is a valid block
  218. block, ok := proposal.(*types.Block)
  219. if !ok {
  220. sb.logger.Error("BFT: invalid block proposal", "proposal", proposal)
  221. return 0, istanbulcommon.ErrInvalidProposal
  222. }
  223. // check bad block
  224. if sb.HasBadProposal(block.Hash()) {
  225. sb.logger.Warn("BFT: bad block proposal", "proposal", proposal)
  226. return 0, core.ErrBlacklistedHash
  227. }
  228. header := block.Header()
  229. snap, err := sb.snapshot(sb.chain, header.Number.Uint64()-1, header.ParentHash, nil)
  230. if err != nil {
  231. return 0, err
  232. }
  233. return sb.EngineForBlockNumber(header.Number).VerifyBlockProposal(sb.chain, block, snap.ValSet)
  234. }
  235. // Sign implements istanbul.Backend.Sign
  236. func (sb *Backend) Sign(data []byte) ([]byte, error) {
  237. hashData := crypto.Keccak256(data)
  238. return crypto.Sign(hashData, sb.privateKey)
  239. }
  240. // SignWithoutHashing implements istanbul.Backend.SignWithoutHashing and signs input data with the backend's private key without hashing the input data
  241. func (sb *Backend) SignWithoutHashing(data []byte) ([]byte, error) {
  242. return crypto.Sign(data, sb.privateKey)
  243. }
  244. // CheckSignature implements istanbul.Backend.CheckSignature
  245. func (sb *Backend) CheckSignature(data []byte, address common.Address, sig []byte) error {
  246. signer, err := istanbul.GetSignatureAddress(data, sig)
  247. if err != nil {
  248. return err
  249. }
  250. // Compare derived addresses
  251. if signer != address {
  252. return istanbulcommon.ErrInvalidSignature
  253. }
  254. return nil
  255. }
  256. // HasPropsal implements istanbul.Backend.HashBlock
  257. func (sb *Backend) HasPropsal(hash common.Hash, number *big.Int) bool {
  258. return sb.chain.GetHeader(hash, number.Uint64()) != nil
  259. }
  260. // GetProposer implements istanbul.Backend.GetProposer
  261. func (sb *Backend) GetProposer(number uint64) common.Address {
  262. if h := sb.chain.GetHeaderByNumber(number); h != nil {
  263. a, _ := sb.Author(h)
  264. return a
  265. }
  266. return common.Address{}
  267. }
  268. // ParentValidators implements istanbul.Backend.GetParentValidators
  269. func (sb *Backend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
  270. if block, ok := proposal.(*types.Block); ok {
  271. return sb.getValidators(block.Number().Uint64()-1, block.ParentHash())
  272. }
  273. return validator.NewSet(nil, sb.config.ProposerPolicy)
  274. }
  275. func (sb *Backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet {
  276. snap, err := sb.snapshot(sb.chain, number, hash, nil)
  277. if err != nil {
  278. return validator.NewSet(nil, sb.config.ProposerPolicy)
  279. }
  280. return snap.ValSet
  281. }
  282. func (sb *Backend) LastProposal() (istanbul.Proposal, common.Address) {
  283. block := sb.currentBlock()
  284. var proposer common.Address
  285. if block.Number().Cmp(common.Big0) > 0 {
  286. var err error
  287. proposer, err = sb.Author(block.Header())
  288. if err != nil {
  289. sb.logger.Error("BFT: last block proposal invalid", "err", err)
  290. return nil, common.Address{}
  291. }
  292. }
  293. // Return header only block here since we don't need block body
  294. return block, proposer
  295. }
  296. func (sb *Backend) HasBadProposal(hash common.Hash) bool {
  297. if sb.hasBadBlock == nil {
  298. return false
  299. }
  300. return sb.hasBadBlock(sb.db, hash)
  301. }
  302. func (sb *Backend) Close() error {
  303. return nil
  304. }
  305. // IsQBFTConsensus returns whether qbft consensus should be used
  306. func (sb *Backend) IsQBFTConsensus() bool {
  307. if sb.qbftConsensusEnabled {
  308. return true
  309. }
  310. if sb.chain != nil {
  311. qbftEnabled := sb.IsQBFTConsensusAt(sb.chain.CurrentHeader().Number)
  312. sb.qbftConsensusEnabled = qbftEnabled
  313. return qbftEnabled
  314. }
  315. return false
  316. }
  317. // IsQBFTConsensusForHeader checks if qbft consensus is enabled for the block height identified by the given header
  318. func (sb *Backend) IsQBFTConsensusAt(blockNumber *big.Int) bool {
  319. return sb.config.IsQBFTConsensusAt(blockNumber)
  320. }
  321. func (sb *Backend) startIBFT() error {
  322. sb.logger.Info("BFT: activate IBFT")
  323. sb.logger.Trace("BFT: set ProposerPolicy sorter to ValidatorSortByStringFun")
  324. sb.config.ProposerPolicy.Use(istanbul.ValidatorSortByString())
  325. sb.qbftConsensusEnabled = false
  326. sb.core = ibftcore.New(sb, sb.config)
  327. if err := sb.core.Start(); err != nil {
  328. sb.logger.Error("BFT: failed to activate IBFT", "err", err)
  329. return err
  330. }
  331. return nil
  332. }
  333. func (sb *Backend) startQBFT() error {
  334. sb.logger.Info("BFT: activate QBFT")
  335. sb.logger.Trace("BFT: set ProposerPolicy sorter to ValidatorSortByByteFunc")
  336. sb.config.ProposerPolicy.Use(istanbul.ValidatorSortByByte())
  337. sb.qbftConsensusEnabled = true
  338. sb.core = qbftcore.New(sb, sb.config)
  339. if err := sb.core.Start(); err != nil {
  340. sb.logger.Error("BFT: failed to activate QBFT", "err", err)
  341. return err
  342. }
  343. return nil
  344. }
  345. func (sb *Backend) stop() error {
  346. core := sb.core
  347. sb.core = nil
  348. if core != nil {
  349. sb.logger.Info("BFT: deactivate")
  350. if err := core.Stop(); err != nil {
  351. sb.logger.Error("BFT: failed to deactivate", "err", err)
  352. return err
  353. }
  354. }
  355. sb.qbftConsensusEnabled = false
  356. return nil
  357. }
  358. // StartQBFTConsensus stops existing legacy ibft consensus and starts the new qbft consensus
  359. func (sb *Backend) StartQBFTConsensus() error {
  360. sb.logger.Info("BFT: switch from IBFT to QBFT")
  361. if err := sb.stop(); err != nil {
  362. return err
  363. }
  364. return sb.startQBFT()
  365. }