roundchange.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "errors"
  19. "math/big"
  20. "sort"
  21. "sync"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/common/hexutil"
  24. "github.com/ethereum/go-ethereum/consensus/istanbul"
  25. qbfttypes "github.com/ethereum/go-ethereum/consensus/istanbul/qbft/types"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/rlp"
  29. )
  30. // broadcastNextRoundChange sends the ROUND CHANGE message with current round + 1
  31. func (c *core) broadcastNextRoundChange() {
  32. cv := c.currentView()
  33. c.broadcastRoundChange(new(big.Int).Add(cv.Round, common.Big1))
  34. }
  35. // broadcastRoundChange is called when either
  36. // - ROUND-CHANGE timeout expires (meaning either we have not received PRE-PREPARE message or we have not received a quorum of COMMIT messages)
  37. // -
  38. // It
  39. // - Creates and sign ROUND-CHANGE message
  40. // - broadcast the ROUND-CHANGE message with the given round
  41. func (c *core) broadcastRoundChange(round *big.Int) {
  42. logger := c.currentLogger(true, nil)
  43. // Validates new round corresponds to current view
  44. cv := c.currentView()
  45. if cv.Round.Cmp(round) > 0 {
  46. logger.Error("QBFT: invalid past target round", "target", round)
  47. return
  48. }
  49. roundChange := qbfttypes.NewRoundChange(c.current.Sequence(), round, c.current.preparedRound, c.current.preparedBlock)
  50. // Sign message
  51. encodedPayload, err := roundChange.EncodePayloadForSigning()
  52. if err != nil {
  53. withMsg(logger, roundChange).Error("QBFT: failed to encode ROUND-CHANGE message", "err", err)
  54. return
  55. }
  56. signature, err := c.backend.Sign(encodedPayload)
  57. if err != nil {
  58. withMsg(logger, roundChange).Error("QBFT: failed to sign ROUND-CHANGE message", "err", err)
  59. return
  60. }
  61. roundChange.SetSignature(signature)
  62. // Extend ROUND-CHANGE message with PREPARE justification
  63. if c.QBFTPreparedPrepares != nil {
  64. roundChange.Justification = c.QBFTPreparedPrepares
  65. withMsg(logger, roundChange).Debug("QBFT: extended ROUND-CHANGE message with PREPARE justification", "justification", roundChange.Justification)
  66. }
  67. // RLP-encode message
  68. data, err := rlp.EncodeToBytes(roundChange)
  69. if err != nil {
  70. withMsg(logger, roundChange).Error("QBFT: failed to encode ROUND-CHANGE message", "err", err)
  71. return
  72. }
  73. withMsg(logger, roundChange).Info("QBFT: broadcast ROUND-CHANGE message", "payload", hexutil.Encode(data))
  74. // Broadcast RLP-encoded message
  75. if err = c.backend.Broadcast(c.valSet, roundChange.Code(), data); err != nil {
  76. withMsg(logger, roundChange).Error("QBFT: failed to broadcast ROUND-CHANGE message", "err", err)
  77. return
  78. }
  79. }
  80. // handleRoundChange is called when receiving a ROUND-CHANGE message from another validator
  81. // - accumulates ROUND-CHANGE messages until reaching quorum for a given round
  82. // - when quorum of ROUND-CHANGE messages is reached then
  83. func (c *core) handleRoundChange(roundChange *qbfttypes.RoundChange) error {
  84. logger := c.currentLogger(true, roundChange)
  85. view := roundChange.View()
  86. currentRound := c.currentView().Round
  87. // number of validators we received ROUND-CHANGE from for a round higher than the current one
  88. num := c.roundChangeSet.higherRoundMessages(currentRound)
  89. // number of validators we received ROUND-CHANGE from for the current round
  90. currentRoundMessages := c.roundChangeSet.getRCMessagesForGivenRound(currentRound)
  91. logger.Info("QBFT: handle ROUND-CHANGE message", "higherRoundChanges.count", num, "currentRoundChanges.count", currentRoundMessages)
  92. // Add ROUND-CHANGE message to message set
  93. if view.Round.Cmp(currentRound) >= 0 {
  94. var prepareMessages []*qbfttypes.Prepare = nil
  95. var pr *big.Int = nil
  96. var pb *types.Block = nil
  97. if roundChange.PreparedRound != nil && roundChange.PreparedBlock != nil && roundChange.Justification != nil && len(roundChange.Justification) > 0 {
  98. prepareMessages = roundChange.Justification
  99. pr = roundChange.PreparedRound
  100. pb = roundChange.PreparedBlock
  101. }
  102. err := c.roundChangeSet.Add(view.Round, roundChange, pr, pb, prepareMessages, c.QuorumSize())
  103. if err != nil {
  104. logger.Warn("QBFT: failed to add ROUND-CHANGE message", "err", err)
  105. return err
  106. }
  107. }
  108. // number of validators we received ROUND-CHANGE from for a round higher than the current one
  109. num = c.roundChangeSet.higherRoundMessages(currentRound)
  110. // number of validators we received ROUND-CHANGE from for the current round
  111. currentRoundMessages = c.roundChangeSet.getRCMessagesForGivenRound(currentRound)
  112. logger = logger.New("higherRoundChanges.count", num, "currentRoundChanges.count", currentRoundMessages)
  113. if num == c.valSet.F()+1 {
  114. // We received F+1 ROUND-CHANGE messages (this may happen before our timeout exprired)
  115. // we start new round and broadcast ROUND-CHANGE message
  116. newRound := c.roundChangeSet.getMinRoundChange(currentRound)
  117. logger.Info("QBFT: received F+1 ROUND-CHANGE messages", "F", c.valSet.F())
  118. c.startNewRound(newRound)
  119. c.broadcastRoundChange(newRound)
  120. } else if currentRoundMessages >= c.QuorumSize() && c.IsProposer() && c.current.preprepareSent.Cmp(currentRound) < 0 {
  121. logger.Info("QBFT: received quorum of ROUND-CHANGE messages")
  122. // We received quorum of ROUND-CHANGE for current round and we are proposer
  123. // If we have received a quorum of PREPARE message
  124. // then we propose the same block proposal again if not we
  125. // propose the block proposal that we generated
  126. _, proposal := c.highestPrepared(currentRound)
  127. if proposal == nil {
  128. if c.current != nil && c.current.pendingRequest != nil {
  129. proposal = c.current.pendingRequest.Proposal
  130. } else {
  131. log.Warn("round change returns an error: no proposal as pending request is nil")
  132. return errors.New("no proposal as pending request is nil")
  133. }
  134. }
  135. // Prepare justification for ROUND-CHANGE messages
  136. roundChangeMessages := c.roundChangeSet.roundChanges[currentRound.Uint64()]
  137. rcSignedPayloads := make([]*qbfttypes.SignedRoundChangePayload, 0)
  138. for _, m := range roundChangeMessages.Values() {
  139. rcMsg := m.(*qbfttypes.RoundChange)
  140. rcSignedPayloads = append(rcSignedPayloads, &rcMsg.SignedRoundChangePayload)
  141. }
  142. prepareMessages := c.roundChangeSet.prepareMessages[currentRound.Uint64()]
  143. if err := isJustified(proposal, rcSignedPayloads, prepareMessages, c.QuorumSize()); err != nil {
  144. logger.Error("QBFT: invalid ROUND-CHANGE message justification", "err", err)
  145. return nil
  146. }
  147. r := &Request{
  148. Proposal: proposal,
  149. RCMessages: roundChangeMessages,
  150. PrepareMessages: prepareMessages,
  151. }
  152. c.sendPreprepareMsg(r)
  153. } else {
  154. logger.Debug("QBFT: accepted ROUND-CHANGE messages")
  155. }
  156. return nil
  157. }
  158. // highestPrepared returns the highest Prepared Round and the corresponding Prepared Block
  159. func (c *core) highestPrepared(round *big.Int) (*big.Int, istanbul.Proposal) {
  160. return c.roundChangeSet.highestPreparedRound[round.Uint64()], c.roundChangeSet.highestPreparedBlock[round.Uint64()]
  161. }
  162. // ----------------------------------------------------------------------------
  163. func newRoundChangeSet(valSet istanbul.ValidatorSet) *roundChangeSet {
  164. return &roundChangeSet{
  165. validatorSet: valSet,
  166. roundChanges: make(map[uint64]*qbftMsgSet),
  167. prepareMessages: make(map[uint64][]*qbfttypes.Prepare),
  168. highestPreparedRound: make(map[uint64]*big.Int),
  169. highestPreparedBlock: make(map[uint64]istanbul.Proposal),
  170. mu: new(sync.Mutex),
  171. }
  172. }
  173. type roundChangeSet struct {
  174. validatorSet istanbul.ValidatorSet
  175. roundChanges map[uint64]*qbftMsgSet
  176. prepareMessages map[uint64][]*qbfttypes.Prepare
  177. highestPreparedRound map[uint64]*big.Int
  178. highestPreparedBlock map[uint64]istanbul.Proposal
  179. mu *sync.Mutex
  180. }
  181. func (rcs *roundChangeSet) NewRound(r *big.Int) {
  182. rcs.mu.Lock()
  183. defer rcs.mu.Unlock()
  184. round := r.Uint64()
  185. if rcs.roundChanges[round] == nil {
  186. rcs.roundChanges[round] = newQBFTMsgSet(rcs.validatorSet)
  187. }
  188. if rcs.prepareMessages[round] == nil {
  189. rcs.prepareMessages[round] = make([]*qbfttypes.Prepare, 0)
  190. }
  191. }
  192. // Add adds the round and message into round change set
  193. func (rcs *roundChangeSet) Add(r *big.Int, msg qbfttypes.QBFTMessage, preparedRound *big.Int, preparedBlock istanbul.Proposal, prepareMessages []*qbfttypes.Prepare, quorumSize int) error {
  194. rcs.mu.Lock()
  195. defer rcs.mu.Unlock()
  196. round := r.Uint64()
  197. if rcs.roundChanges[round] == nil {
  198. rcs.roundChanges[round] = newQBFTMsgSet(rcs.validatorSet)
  199. }
  200. if err := rcs.roundChanges[round].Add(msg); err != nil {
  201. return err
  202. }
  203. if preparedRound != nil && (rcs.highestPreparedRound[round] == nil || preparedRound.Cmp(rcs.highestPreparedRound[round]) > 0) {
  204. roundChange := msg.(*qbfttypes.RoundChange)
  205. if hasMatchingRoundChangeAndPrepares(roundChange, prepareMessages, quorumSize) == nil {
  206. rcs.highestPreparedRound[round] = preparedRound
  207. rcs.highestPreparedBlock[round] = preparedBlock
  208. rcs.prepareMessages[round] = prepareMessages
  209. }
  210. }
  211. return nil
  212. }
  213. // higherRoundMessages returns the count of validators we received a ROUND-CHANGE message from
  214. // for any round greater than the given round
  215. func (rcs *roundChangeSet) higherRoundMessages(round *big.Int) int {
  216. rcs.mu.Lock()
  217. defer rcs.mu.Unlock()
  218. addresses := make(map[common.Address]struct{})
  219. for k, rms := range rcs.roundChanges {
  220. if k > round.Uint64() {
  221. for addr := range rms.messages {
  222. addresses[addr] = struct{}{}
  223. }
  224. }
  225. }
  226. return len(addresses)
  227. }
  228. // getRCMessagesForGivenRound return the count ROUND-CHANGE messages
  229. // received for a given round
  230. func (rcs *roundChangeSet) getRCMessagesForGivenRound(round *big.Int) int {
  231. rcs.mu.Lock()
  232. defer rcs.mu.Unlock()
  233. if rms := rcs.roundChanges[round.Uint64()]; rms != nil {
  234. return len(rms.messages)
  235. }
  236. return 0
  237. }
  238. // getMinRoundChange returns the minimum round greater than the given round
  239. func (rcs *roundChangeSet) getMinRoundChange(round *big.Int) *big.Int {
  240. rcs.mu.Lock()
  241. defer rcs.mu.Unlock()
  242. var keys []int
  243. for k := range rcs.roundChanges {
  244. if k > round.Uint64() {
  245. keys = append(keys, int(k))
  246. }
  247. }
  248. sort.Ints(keys)
  249. if len(keys) == 0 {
  250. return round
  251. }
  252. return big.NewInt(int64(keys[0]))
  253. }
  254. // ClearLowerThan deletes the messages for round earlier than the given round
  255. func (rcs *roundChangeSet) ClearLowerThan(round *big.Int) {
  256. rcs.mu.Lock()
  257. defer rcs.mu.Unlock()
  258. for k, rms := range rcs.roundChanges {
  259. if len(rms.Values()) == 0 || k < round.Uint64() {
  260. delete(rcs.roundChanges, k)
  261. delete(rcs.highestPreparedRound, k)
  262. delete(rcs.highestPreparedBlock, k)
  263. delete(rcs.prepareMessages, k)
  264. }
  265. }
  266. }
  267. // MaxRound returns the max round which the number of messages is equal or larger than num
  268. func (rcs *roundChangeSet) MaxRound(num int) *big.Int {
  269. rcs.mu.Lock()
  270. defer rcs.mu.Unlock()
  271. var maxRound *big.Int
  272. for k, rms := range rcs.roundChanges {
  273. if rms.Size() < num {
  274. continue
  275. }
  276. r := big.NewInt(int64(k))
  277. if maxRound == nil || maxRound.Cmp(r) < 0 {
  278. maxRound = r
  279. }
  280. }
  281. return maxRound
  282. }