core.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "math"
  19. "math/big"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/consensus/istanbul"
  24. qbfttypes "github.com/ethereum/go-ethereum/consensus/istanbul/qbft/types"
  25. "github.com/ethereum/go-ethereum/core/types"
  26. "github.com/ethereum/go-ethereum/event"
  27. "github.com/ethereum/go-ethereum/log"
  28. metrics "github.com/ethereum/go-ethereum/metrics"
  29. "gopkg.in/karalabe/cookiejar.v2/collections/prque"
  30. )
  31. var (
  32. roundMeter = metrics.NewRegisteredMeter("consensus/istanbul/qbft/core/round", nil)
  33. sequenceMeter = metrics.NewRegisteredMeter("consensus/istanbul/qbft/core/sequence", nil)
  34. consensusTimer = metrics.NewRegisteredTimer("consensus/istanbul/qbft/core/consensus", nil)
  35. )
  36. // New creates an Istanbul consensus core
  37. func New(backend istanbul.Backend, config *istanbul.Config) istanbul.Core {
  38. c := &core{
  39. config: config,
  40. address: backend.Address(),
  41. state: StateAcceptRequest,
  42. handlerWg: new(sync.WaitGroup),
  43. logger: log.New("address", backend.Address()),
  44. backend: backend,
  45. backlogs: make(map[common.Address]*prque.Prque),
  46. backlogsMu: new(sync.Mutex),
  47. pendingRequests: prque.New(),
  48. pendingRequestsMu: new(sync.Mutex),
  49. consensusTimestamp: time.Time{},
  50. }
  51. c.validateFn = c.checkValidatorSignature
  52. return c
  53. }
  54. // ----------------------------------------------------------------------------
  55. type core struct {
  56. config *istanbul.Config
  57. address common.Address
  58. state State
  59. logger log.Logger
  60. backend istanbul.Backend
  61. events *event.TypeMuxSubscription
  62. finalCommittedSub *event.TypeMuxSubscription
  63. timeoutSub *event.TypeMuxSubscription
  64. futurePreprepareTimer *time.Timer
  65. valSet istanbul.ValidatorSet
  66. validateFn func([]byte, []byte) (common.Address, error)
  67. backlogs map[common.Address]*prque.Prque
  68. backlogsMu *sync.Mutex
  69. current *roundState
  70. handlerWg *sync.WaitGroup
  71. roundChangeSet *roundChangeSet
  72. roundChangeTimer *time.Timer
  73. QBFTPreparedPrepares []*qbfttypes.Prepare
  74. pendingRequests *prque.Prque
  75. pendingRequestsMu *sync.Mutex
  76. consensusTimestamp time.Time
  77. newRoundMutex sync.Mutex
  78. newRoundTimer *time.Timer
  79. }
  80. func (c *core) currentView() *istanbul.View {
  81. return &istanbul.View{
  82. Sequence: new(big.Int).Set(c.current.Sequence()),
  83. Round: new(big.Int).Set(c.current.Round()),
  84. }
  85. }
  86. func (c *core) IsProposer() bool {
  87. v := c.valSet
  88. if v == nil {
  89. return false
  90. }
  91. return v.IsProposer(c.backend.Address())
  92. }
  93. func (c *core) IsCurrentProposal(blockHash common.Hash) bool {
  94. return c.current != nil && c.current.pendingRequest != nil && c.current.pendingRequest.Proposal.Hash() == blockHash
  95. }
  96. // startNewRound starts a new round. if round equals to 0, it means to starts a new sequence
  97. func (c *core) startNewRound(round *big.Int) {
  98. var logger log.Logger
  99. if c.current == nil {
  100. logger = c.logger.New("old.round", -1, "old.seq", 0)
  101. } else {
  102. logger = c.currentLogger(false, nil)
  103. }
  104. logger = logger.New("target.round", round)
  105. roundChange := false
  106. // Try to get last proposal
  107. lastProposal, lastProposer := c.backend.LastProposal()
  108. if lastProposal != nil {
  109. logger = logger.New("lastProposal.number", lastProposal.Number().Uint64(), "lastProposal.hash", lastProposal.Hash())
  110. }
  111. logger.Info("QBFT: initialize new round")
  112. if c.current == nil {
  113. logger.Debug("QBFT: start at the initial round")
  114. } else if lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {
  115. diff := new(big.Int).Sub(lastProposal.Number(), c.current.Sequence())
  116. sequenceMeter.Mark(new(big.Int).Add(diff, common.Big1).Int64())
  117. if !c.consensusTimestamp.IsZero() {
  118. consensusTimer.UpdateSince(c.consensusTimestamp)
  119. c.consensusTimestamp = time.Time{}
  120. }
  121. logger.Debug("QBFT: catch up last block proposal")
  122. } else if lastProposal.Number().Cmp(big.NewInt(c.current.Sequence().Int64()-1)) == 0 {
  123. if round.Cmp(common.Big0) == 0 {
  124. // same seq and round, don't need to start new round
  125. logger.Debug("QBFT: same round, no need to start new round")
  126. return
  127. } else if round.Cmp(c.current.Round()) < 0 {
  128. logger.Warn("QBFT: next round is inferior to current round")
  129. return
  130. }
  131. roundChange = true
  132. } else {
  133. logger.Warn("QBFT: next sequence is before last block proposal")
  134. return
  135. }
  136. var oldLogger log.Logger
  137. if c.current == nil {
  138. oldLogger = c.logger.New("old.round", -1, "old.seq", 0)
  139. } else {
  140. oldLogger = c.logger.New("old.round", c.current.Round().Uint64(), "old.sequence", c.current.Sequence().Uint64(), "old.state", c.state.String(), "old.proposer", c.valSet.GetProposer())
  141. }
  142. // Create next view
  143. var newView *istanbul.View
  144. if roundChange {
  145. newView = &istanbul.View{
  146. Sequence: new(big.Int).Set(c.current.Sequence()),
  147. Round: new(big.Int).Set(round),
  148. }
  149. } else {
  150. newView = &istanbul.View{
  151. Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1),
  152. Round: new(big.Int),
  153. }
  154. c.valSet = c.backend.Validators(lastProposal)
  155. }
  156. // New snapshot for new round
  157. c.updateRoundState(newView, c.valSet, roundChange)
  158. // Calculate new proposer
  159. c.valSet.CalcProposer(lastProposer, newView.Round.Uint64())
  160. c.setState(StateAcceptRequest)
  161. if c.current != nil && round.Cmp(c.current.Round()) > 0 {
  162. roundMeter.Mark(new(big.Int).Sub(round, c.current.Round()).Int64())
  163. }
  164. // Update RoundChangeSet by deleting older round messages
  165. if round.Uint64() == 0 {
  166. c.QBFTPreparedPrepares = nil
  167. c.roundChangeSet = newRoundChangeSet(c.valSet)
  168. } else {
  169. // Clear earlier round messages
  170. c.roundChangeSet.ClearLowerThan(round)
  171. }
  172. c.roundChangeSet.NewRound(round)
  173. if round.Uint64() > 0 {
  174. c.newRoundChangeTimer()
  175. }
  176. oldLogger.Info("QBFT: start new round", "next.round", newView.Round, "next.seq", newView.Sequence, "next.proposer", c.valSet.GetProposer(), "next.valSet", c.valSet.List(), "next.size", c.valSet.Size(), "next.IsProposer", c.IsProposer())
  177. }
  178. // updateRoundState updates round state by checking if locking block is necessary
  179. func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, roundChange bool) {
  180. if roundChange && c.current != nil {
  181. c.current = newRoundState(view, validatorSet, c.current.Preprepare, c.current.preparedRound, c.current.preparedBlock, c.current.pendingRequest, c.backend.HasBadProposal)
  182. } else {
  183. c.current = newRoundState(view, validatorSet, nil, nil, nil, nil, c.backend.HasBadProposal)
  184. }
  185. }
  186. func (c *core) setState(state State) {
  187. if c.state != state {
  188. oldState := c.state
  189. c.state = state
  190. c.currentLogger(false, nil).Info("QBFT: changed state", "old.state", oldState.String(), "new.state", state.String())
  191. }
  192. if state == StateAcceptRequest {
  193. c.processPendingRequests()
  194. }
  195. // each time we change state, we process backlog for possible message that are
  196. // now ready
  197. c.processBacklog()
  198. }
  199. func (c *core) Address() common.Address {
  200. return c.address
  201. }
  202. func (c *core) stopFuturePreprepareTimer() {
  203. if c.futurePreprepareTimer != nil {
  204. c.futurePreprepareTimer.Stop()
  205. }
  206. }
  207. func (c *core) stopTimer() {
  208. c.stopFuturePreprepareTimer()
  209. if c.roundChangeTimer != nil {
  210. c.roundChangeTimer.Stop()
  211. }
  212. }
  213. func (c *core) newRoundChangeTimer() {
  214. c.stopTimer()
  215. for c.current == nil { // wait because it is asynchronous in handleRequest
  216. time.Sleep(10 * time.Millisecond)
  217. }
  218. // set timeout based on the round number
  219. baseTimeout := time.Duration(c.config.GetConfig(c.current.Sequence()).RequestTimeout) * time.Millisecond
  220. round := c.current.Round().Uint64()
  221. timeout := baseTimeout * time.Duration(math.Pow(2, float64(round)))
  222. c.currentLogger(true, nil).Trace("QBFT: start new ROUND-CHANGE timer", "timeout", timeout.Seconds())
  223. c.roundChangeTimer = time.AfterFunc(timeout, func() {
  224. c.sendEvent(timeoutEvent{})
  225. })
  226. }
  227. func (c *core) checkValidatorSignature(data []byte, sig []byte) (common.Address, error) {
  228. return istanbul.CheckValidatorSignature(c.valSet, data, sig)
  229. }
  230. func (c *core) QuorumSize() int {
  231. if c.config.Get2FPlus1Enabled(c.current.sequence) || c.config.Ceil2Nby3Block == nil || (c.current != nil && c.current.sequence.Cmp(c.config.Ceil2Nby3Block) < 0) {
  232. c.currentLogger(true, nil).Trace("QBFT: confirmation Formula used 2F+ 1")
  233. return (2 * c.valSet.F()) + 1
  234. }
  235. c.currentLogger(true, nil).Trace("QBFT: confirmation Formula used ceil(2N/3)")
  236. return int(math.Ceil(float64(2*c.valSet.Size()) / 3))
  237. }
  238. // PrepareCommittedSeal returns a committed seal for the given header and takes current round under consideration
  239. func PrepareCommittedSeal(header *types.Header, round uint32) []byte {
  240. h := types.CopyHeader(header)
  241. return h.QBFTHashWithRoundNumber(round).Bytes()
  242. }