core.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "bytes"
  19. "math"
  20. "math/big"
  21. "sync"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common"
  24. "github.com/ethereum/go-ethereum/consensus/istanbul"
  25. ibfttypes "github.com/ethereum/go-ethereum/consensus/istanbul/ibft/types"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/event"
  28. "github.com/ethereum/go-ethereum/log"
  29. metrics "github.com/ethereum/go-ethereum/metrics"
  30. "gopkg.in/karalabe/cookiejar.v2/collections/prque"
  31. )
  32. var (
  33. roundMeter = metrics.NewRegisteredMeter("consensus/istanbul/core/round", nil)
  34. sequenceMeter = metrics.NewRegisteredMeter("consensus/istanbul/core/sequence", nil)
  35. consensusTimer = metrics.NewRegisteredTimer("consensus/istanbul/core/consensus", nil)
  36. )
  37. // New creates an Istanbul consensus core
  38. func New(backend istanbul.Backend, config *istanbul.Config) *core {
  39. c := &core{
  40. config: config,
  41. address: backend.Address(),
  42. state: ibfttypes.StateAcceptRequest,
  43. handlerWg: new(sync.WaitGroup),
  44. logger: log.New("address", backend.Address()),
  45. backend: backend,
  46. backlogs: make(map[common.Address]*prque.Prque),
  47. backlogsMu: new(sync.Mutex),
  48. pendingRequests: prque.New(),
  49. pendingRequestsMu: new(sync.Mutex),
  50. consensusTimestamp: time.Time{},
  51. }
  52. c.validateFn = c.checkValidatorSignature
  53. return c
  54. }
  55. // ----------------------------------------------------------------------------
  56. type core struct {
  57. config *istanbul.Config
  58. address common.Address
  59. state ibfttypes.State
  60. logger log.Logger
  61. backend istanbul.Backend
  62. events *event.TypeMuxSubscription
  63. finalCommittedSub *event.TypeMuxSubscription
  64. timeoutSub *event.TypeMuxSubscription
  65. futurePreprepareTimer *time.Timer
  66. valSet istanbul.ValidatorSet
  67. waitingForRoundChange bool
  68. validateFn func([]byte, []byte) (common.Address, error)
  69. backlogs map[common.Address]*prque.Prque
  70. backlogsMu *sync.Mutex
  71. current *roundState
  72. handlerWg *sync.WaitGroup
  73. roundChangeSet *roundChangeSet
  74. roundChangeTimer *time.Timer
  75. pendingRequests *prque.Prque
  76. pendingRequestsMu *sync.Mutex
  77. consensusTimestamp time.Time
  78. }
  79. func (c *core) finalizeMessage(msg *ibfttypes.Message) ([]byte, error) {
  80. var err error
  81. // Add sender address
  82. msg.Address = c.Address()
  83. // Assign the CommittedSeal if it's a COMMIT message and proposal is not nil
  84. if msg.Code == ibfttypes.MsgCommit && c.current.Proposal() != nil {
  85. msg.CommittedSeal = []byte{}
  86. seal := PrepareCommittedSeal(c.current.Proposal().Hash())
  87. // Add proof of consensus
  88. msg.CommittedSeal, err = c.backend.Sign(seal)
  89. if err != nil {
  90. return nil, err
  91. }
  92. }
  93. // Sign message
  94. data, err := msg.PayloadNoSig()
  95. if err != nil {
  96. return nil, err
  97. }
  98. msg.Signature, err = c.backend.Sign(data)
  99. if err != nil {
  100. return nil, err
  101. }
  102. // Convert to payload
  103. payload, err := msg.Payload()
  104. if err != nil {
  105. return nil, err
  106. }
  107. return payload, nil
  108. }
  109. func (c *core) broadcast(msg *ibfttypes.Message) {
  110. logger := c.logger.New("state", c.state)
  111. payload, err := c.finalizeMessage(msg)
  112. if err != nil {
  113. logger.Error("Failed to finalize message", "msg", msg, "err", err)
  114. return
  115. }
  116. // Broadcast payload
  117. if err = c.backend.Broadcast(c.valSet, msg.Code, payload); err != nil {
  118. logger.Error("Failed to broadcast message", "msg", msg, "err", err)
  119. return
  120. }
  121. }
  122. func (c *core) currentView() *istanbul.View {
  123. return &istanbul.View{
  124. Sequence: new(big.Int).Set(c.current.Sequence()),
  125. Round: new(big.Int).Set(c.current.Round()),
  126. }
  127. }
  128. func (c *core) IsProposer() bool {
  129. v := c.valSet
  130. if v == nil {
  131. return false
  132. }
  133. return v.IsProposer(c.backend.Address())
  134. }
  135. func (c *core) IsCurrentProposal(blockHash common.Hash) bool {
  136. return c.current != nil && c.current.pendingRequest != nil && c.current.pendingRequest.Proposal.Hash() == blockHash
  137. }
  138. func (c *core) commit() {
  139. c.setState(ibfttypes.StateCommitted)
  140. proposal := c.current.Proposal()
  141. if proposal != nil {
  142. committedSeals := make([][]byte, c.current.Commits.Size())
  143. for i, v := range c.current.Commits.Values() {
  144. committedSeals[i] = make([]byte, types.IstanbulExtraSeal)
  145. copy(committedSeals[i][:], v.CommittedSeal[:])
  146. }
  147. if err := c.backend.Commit(proposal, committedSeals, big.NewInt(-1)); err != nil {
  148. c.current.UnlockHash() //Unlock block when insertion fails
  149. c.sendNextRoundChange()
  150. return
  151. }
  152. }
  153. }
  154. // startNewRound starts a new round. if round equals to 0, it means to starts a new sequence
  155. func (c *core) startNewRound(round *big.Int) {
  156. var logger log.Logger
  157. if c.current == nil {
  158. logger = c.logger.New("old_round", -1, "old_seq", 0)
  159. } else {
  160. logger = c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence())
  161. }
  162. logger.Trace("Start new ibft round")
  163. roundChange := false
  164. // Try to get last proposal
  165. lastProposal, lastProposer := c.backend.LastProposal()
  166. if c.current == nil {
  167. logger.Trace("Start to the initial round")
  168. } else if lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {
  169. diff := new(big.Int).Sub(lastProposal.Number(), c.current.Sequence())
  170. sequenceMeter.Mark(new(big.Int).Add(diff, common.Big1).Int64())
  171. if !c.consensusTimestamp.IsZero() {
  172. consensusTimer.UpdateSince(c.consensusTimestamp)
  173. c.consensusTimestamp = time.Time{}
  174. }
  175. logger.Trace("Catch up latest proposal", "number", lastProposal.Number().Uint64(), "hash", lastProposal.Hash())
  176. } else if lastProposal.Number().Cmp(big.NewInt(c.current.Sequence().Int64()-1)) == 0 {
  177. if round.Cmp(common.Big0) == 0 {
  178. // same seq and round, don't need to start new round
  179. return
  180. } else if round.Cmp(c.current.Round()) < 0 {
  181. logger.Warn("New round should not be smaller than current round", "seq", lastProposal.Number().Int64(), "new_round", round, "old_round", c.current.Round())
  182. return
  183. }
  184. roundChange = true
  185. } else {
  186. logger.Warn("New sequence should be larger than current sequence", "new_seq", lastProposal.Number().Int64())
  187. return
  188. }
  189. var newView *istanbul.View
  190. if roundChange {
  191. newView = &istanbul.View{
  192. Sequence: new(big.Int).Set(c.current.Sequence()),
  193. Round: new(big.Int).Set(round),
  194. }
  195. } else {
  196. newView = &istanbul.View{
  197. Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1),
  198. Round: new(big.Int),
  199. }
  200. c.valSet = c.backend.Validators(lastProposal)
  201. }
  202. // If new round is 0, then check if qbftConsensus needs to be enabled
  203. if round.Uint64() == 0 && c.backend.IsQBFTConsensusAt(newView.Sequence) {
  204. logger.Trace("Starting qbft consensus as qbftBlock has passed")
  205. if err := c.backend.StartQBFTConsensus(); err != nil {
  206. // If err is returned, then QBFT consensus is started for the next block
  207. logger.Error("Unable to start QBFT Consensus, retrying for the next block", "error", err)
  208. }
  209. return
  210. }
  211. // Update logger
  212. logger = logger.New("old_proposer", c.valSet.GetProposer())
  213. // Clear invalid ROUND CHANGE messages
  214. c.roundChangeSet = newRoundChangeSet(c.valSet)
  215. // New snapshot for new round
  216. c.updateRoundState(newView, c.valSet, roundChange)
  217. // Calculate new proposer
  218. c.valSet.CalcProposer(lastProposer, newView.Round.Uint64())
  219. c.waitingForRoundChange = false
  220. c.setState(ibfttypes.StateAcceptRequest)
  221. if roundChange && c.IsProposer() && c.current != nil {
  222. // If it is locked, propose the old proposal
  223. // If we have pending request, propose pending request
  224. if c.current.IsHashLocked() {
  225. r := &istanbul.Request{
  226. Proposal: c.current.Proposal(), //c.current.Proposal would be the locked proposal by previous proposer, see updateRoundState
  227. }
  228. c.sendPreprepare(r)
  229. } else if c.current.pendingRequest != nil {
  230. c.sendPreprepare(c.current.pendingRequest)
  231. }
  232. }
  233. c.newRoundChangeTimer()
  234. logger.Debug("New round", "new_round", newView.Round, "new_seq", newView.Sequence, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "IsProposer", c.IsProposer())
  235. }
  236. func (c *core) catchUpRound(view *istanbul.View) {
  237. logger := c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence(), "old_proposer", c.valSet.GetProposer())
  238. if view.Round.Cmp(c.current.Round()) > 0 {
  239. roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64())
  240. }
  241. c.waitingForRoundChange = true
  242. // Need to keep block locked for round catching up
  243. c.updateRoundState(view, c.valSet, true)
  244. c.roundChangeSet.Clear(view.Round)
  245. c.newRoundChangeTimer()
  246. logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet)
  247. }
  248. // updateRoundState updates round state by checking if locking block is necessary
  249. func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, roundChange bool) {
  250. // Lock only if both roundChange is true and it is locked
  251. if roundChange && c.current != nil {
  252. if c.current.IsHashLocked() {
  253. c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest, c.backend.HasBadProposal)
  254. } else {
  255. c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest, c.backend.HasBadProposal)
  256. }
  257. } else {
  258. c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil, c.backend.HasBadProposal)
  259. }
  260. }
  261. func (c *core) setState(state ibfttypes.State) {
  262. if c.state != state {
  263. c.state = state
  264. }
  265. if state == ibfttypes.StateAcceptRequest {
  266. c.processPendingRequests()
  267. }
  268. c.processBacklog()
  269. }
  270. func (c *core) Address() common.Address {
  271. return c.address
  272. }
  273. func (c *core) stopFuturePreprepareTimer() {
  274. if c.futurePreprepareTimer != nil {
  275. c.futurePreprepareTimer.Stop()
  276. }
  277. }
  278. func (c *core) stopTimer() {
  279. c.stopFuturePreprepareTimer()
  280. if c.roundChangeTimer != nil {
  281. c.roundChangeTimer.Stop()
  282. }
  283. }
  284. func (c *core) newRoundChangeTimer() {
  285. c.stopTimer()
  286. // set timeout based on the round number
  287. timeout := time.Duration(c.config.GetConfig(c.current.Sequence()).RequestTimeout) * time.Millisecond
  288. round := c.current.Round().Uint64()
  289. if round > 0 {
  290. timeout += time.Duration(math.Pow(2, float64(round))) * time.Second
  291. }
  292. c.roundChangeTimer = time.AfterFunc(timeout, func() {
  293. c.sendEvent(timeoutEvent{})
  294. })
  295. }
  296. func (c *core) checkValidatorSignature(data []byte, sig []byte) (common.Address, error) {
  297. return istanbul.CheckValidatorSignature(c.valSet, data, sig)
  298. }
  299. func (c *core) QuorumSize() int {
  300. if c.config.Get2FPlus1Enabled(c.current.sequence) || c.config.Ceil2Nby3Block == nil || (c.current != nil && c.current.sequence.Cmp(c.config.Ceil2Nby3Block) < 0) {
  301. c.logger.Trace("Confirmation Formula used 2F+ 1")
  302. return (2 * c.valSet.F()) + 1
  303. }
  304. c.logger.Trace("Confirmation Formula used ceil(2N/3)")
  305. return int(math.Ceil(float64(2*c.valSet.Size()) / 3))
  306. }
  307. // PrepareCommittedSeal returns a committed seal for the given hash
  308. func PrepareCommittedSeal(hash common.Hash) []byte {
  309. var buf bytes.Buffer
  310. buf.Write(hash.Bytes())
  311. buf.Write([]byte{byte(ibfttypes.MsgCommit)})
  312. return buf.Bytes()
  313. }