123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- // Copyright 2017 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package core
- import (
- "bytes"
- "math"
- "math/big"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/consensus/istanbul"
- ibfttypes "github.com/ethereum/go-ethereum/consensus/istanbul/ibft/types"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/log"
- metrics "github.com/ethereum/go-ethereum/metrics"
- "gopkg.in/karalabe/cookiejar.v2/collections/prque"
- )
- var (
- roundMeter = metrics.NewRegisteredMeter("consensus/istanbul/core/round", nil)
- sequenceMeter = metrics.NewRegisteredMeter("consensus/istanbul/core/sequence", nil)
- consensusTimer = metrics.NewRegisteredTimer("consensus/istanbul/core/consensus", nil)
- )
- // New creates an Istanbul consensus core
- func New(backend istanbul.Backend, config *istanbul.Config) *core {
- c := &core{
- config: config,
- address: backend.Address(),
- state: ibfttypes.StateAcceptRequest,
- handlerWg: new(sync.WaitGroup),
- logger: log.New("address", backend.Address()),
- backend: backend,
- backlogs: make(map[common.Address]*prque.Prque),
- backlogsMu: new(sync.Mutex),
- pendingRequests: prque.New(),
- pendingRequestsMu: new(sync.Mutex),
- consensusTimestamp: time.Time{},
- }
- c.validateFn = c.checkValidatorSignature
- return c
- }
- // ----------------------------------------------------------------------------
- type core struct {
- config *istanbul.Config
- address common.Address
- state ibfttypes.State
- logger log.Logger
- backend istanbul.Backend
- events *event.TypeMuxSubscription
- finalCommittedSub *event.TypeMuxSubscription
- timeoutSub *event.TypeMuxSubscription
- futurePreprepareTimer *time.Timer
- valSet istanbul.ValidatorSet
- waitingForRoundChange bool
- validateFn func([]byte, []byte) (common.Address, error)
- backlogs map[common.Address]*prque.Prque
- backlogsMu *sync.Mutex
- current *roundState
- handlerWg *sync.WaitGroup
- roundChangeSet *roundChangeSet
- roundChangeTimer *time.Timer
- pendingRequests *prque.Prque
- pendingRequestsMu *sync.Mutex
- consensusTimestamp time.Time
- }
- func (c *core) finalizeMessage(msg *ibfttypes.Message) ([]byte, error) {
- var err error
- // Add sender address
- msg.Address = c.Address()
- // Assign the CommittedSeal if it's a COMMIT message and proposal is not nil
- if msg.Code == ibfttypes.MsgCommit && c.current.Proposal() != nil {
- msg.CommittedSeal = []byte{}
- seal := PrepareCommittedSeal(c.current.Proposal().Hash())
- // Add proof of consensus
- msg.CommittedSeal, err = c.backend.Sign(seal)
- if err != nil {
- return nil, err
- }
- }
- // Sign message
- data, err := msg.PayloadNoSig()
- if err != nil {
- return nil, err
- }
- msg.Signature, err = c.backend.Sign(data)
- if err != nil {
- return nil, err
- }
- // Convert to payload
- payload, err := msg.Payload()
- if err != nil {
- return nil, err
- }
- return payload, nil
- }
- func (c *core) broadcast(msg *ibfttypes.Message) {
- logger := c.logger.New("state", c.state)
- payload, err := c.finalizeMessage(msg)
- if err != nil {
- logger.Error("Failed to finalize message", "msg", msg, "err", err)
- return
- }
- // Broadcast payload
- if err = c.backend.Broadcast(c.valSet, msg.Code, payload); err != nil {
- logger.Error("Failed to broadcast message", "msg", msg, "err", err)
- return
- }
- }
- func (c *core) currentView() *istanbul.View {
- return &istanbul.View{
- Sequence: new(big.Int).Set(c.current.Sequence()),
- Round: new(big.Int).Set(c.current.Round()),
- }
- }
- func (c *core) IsProposer() bool {
- v := c.valSet
- if v == nil {
- return false
- }
- return v.IsProposer(c.backend.Address())
- }
- func (c *core) IsCurrentProposal(blockHash common.Hash) bool {
- return c.current != nil && c.current.pendingRequest != nil && c.current.pendingRequest.Proposal.Hash() == blockHash
- }
- func (c *core) commit() {
- c.setState(ibfttypes.StateCommitted)
- proposal := c.current.Proposal()
- if proposal != nil {
- committedSeals := make([][]byte, c.current.Commits.Size())
- for i, v := range c.current.Commits.Values() {
- committedSeals[i] = make([]byte, types.IstanbulExtraSeal)
- copy(committedSeals[i][:], v.CommittedSeal[:])
- }
- if err := c.backend.Commit(proposal, committedSeals, big.NewInt(-1)); err != nil {
- c.current.UnlockHash() //Unlock block when insertion fails
- c.sendNextRoundChange()
- return
- }
- }
- }
- // startNewRound starts a new round. if round equals to 0, it means to starts a new sequence
- func (c *core) startNewRound(round *big.Int) {
- var logger log.Logger
- if c.current == nil {
- logger = c.logger.New("old_round", -1, "old_seq", 0)
- } else {
- logger = c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence())
- }
- logger.Trace("Start new ibft round")
- roundChange := false
- // Try to get last proposal
- lastProposal, lastProposer := c.backend.LastProposal()
- if c.current == nil {
- logger.Trace("Start to the initial round")
- } else if lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {
- diff := new(big.Int).Sub(lastProposal.Number(), c.current.Sequence())
- sequenceMeter.Mark(new(big.Int).Add(diff, common.Big1).Int64())
- if !c.consensusTimestamp.IsZero() {
- consensusTimer.UpdateSince(c.consensusTimestamp)
- c.consensusTimestamp = time.Time{}
- }
- logger.Trace("Catch up latest proposal", "number", lastProposal.Number().Uint64(), "hash", lastProposal.Hash())
- } else if lastProposal.Number().Cmp(big.NewInt(c.current.Sequence().Int64()-1)) == 0 {
- if round.Cmp(common.Big0) == 0 {
- // same seq and round, don't need to start new round
- return
- } else if round.Cmp(c.current.Round()) < 0 {
- logger.Warn("New round should not be smaller than current round", "seq", lastProposal.Number().Int64(), "new_round", round, "old_round", c.current.Round())
- return
- }
- roundChange = true
- } else {
- logger.Warn("New sequence should be larger than current sequence", "new_seq", lastProposal.Number().Int64())
- return
- }
- var newView *istanbul.View
- if roundChange {
- newView = &istanbul.View{
- Sequence: new(big.Int).Set(c.current.Sequence()),
- Round: new(big.Int).Set(round),
- }
- } else {
- newView = &istanbul.View{
- Sequence: new(big.Int).Add(lastProposal.Number(), common.Big1),
- Round: new(big.Int),
- }
- c.valSet = c.backend.Validators(lastProposal)
- }
- // If new round is 0, then check if qbftConsensus needs to be enabled
- if round.Uint64() == 0 && c.backend.IsQBFTConsensusAt(newView.Sequence) {
- logger.Trace("Starting qbft consensus as qbftBlock has passed")
- if err := c.backend.StartQBFTConsensus(); err != nil {
- // If err is returned, then QBFT consensus is started for the next block
- logger.Error("Unable to start QBFT Consensus, retrying for the next block", "error", err)
- }
- return
- }
- // Update logger
- logger = logger.New("old_proposer", c.valSet.GetProposer())
- // Clear invalid ROUND CHANGE messages
- c.roundChangeSet = newRoundChangeSet(c.valSet)
- // New snapshot for new round
- c.updateRoundState(newView, c.valSet, roundChange)
- // Calculate new proposer
- c.valSet.CalcProposer(lastProposer, newView.Round.Uint64())
- c.waitingForRoundChange = false
- c.setState(ibfttypes.StateAcceptRequest)
- if roundChange && c.IsProposer() && c.current != nil {
- // If it is locked, propose the old proposal
- // If we have pending request, propose pending request
- if c.current.IsHashLocked() {
- r := &istanbul.Request{
- Proposal: c.current.Proposal(), //c.current.Proposal would be the locked proposal by previous proposer, see updateRoundState
- }
- c.sendPreprepare(r)
- } else if c.current.pendingRequest != nil {
- c.sendPreprepare(c.current.pendingRequest)
- }
- }
- c.newRoundChangeTimer()
- logger.Debug("New round", "new_round", newView.Round, "new_seq", newView.Sequence, "new_proposer", c.valSet.GetProposer(), "valSet", c.valSet.List(), "size", c.valSet.Size(), "IsProposer", c.IsProposer())
- }
- func (c *core) catchUpRound(view *istanbul.View) {
- logger := c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence(), "old_proposer", c.valSet.GetProposer())
- if view.Round.Cmp(c.current.Round()) > 0 {
- roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64())
- }
- c.waitingForRoundChange = true
- // Need to keep block locked for round catching up
- c.updateRoundState(view, c.valSet, true)
- c.roundChangeSet.Clear(view.Round)
- c.newRoundChangeTimer()
- logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet)
- }
- // updateRoundState updates round state by checking if locking block is necessary
- func (c *core) updateRoundState(view *istanbul.View, validatorSet istanbul.ValidatorSet, roundChange bool) {
- // Lock only if both roundChange is true and it is locked
- if roundChange && c.current != nil {
- if c.current.IsHashLocked() {
- c.current = newRoundState(view, validatorSet, c.current.GetLockedHash(), c.current.Preprepare, c.current.pendingRequest, c.backend.HasBadProposal)
- } else {
- c.current = newRoundState(view, validatorSet, common.Hash{}, nil, c.current.pendingRequest, c.backend.HasBadProposal)
- }
- } else {
- c.current = newRoundState(view, validatorSet, common.Hash{}, nil, nil, c.backend.HasBadProposal)
- }
- }
- func (c *core) setState(state ibfttypes.State) {
- if c.state != state {
- c.state = state
- }
- if state == ibfttypes.StateAcceptRequest {
- c.processPendingRequests()
- }
- c.processBacklog()
- }
- func (c *core) Address() common.Address {
- return c.address
- }
- func (c *core) stopFuturePreprepareTimer() {
- if c.futurePreprepareTimer != nil {
- c.futurePreprepareTimer.Stop()
- }
- }
- func (c *core) stopTimer() {
- c.stopFuturePreprepareTimer()
- if c.roundChangeTimer != nil {
- c.roundChangeTimer.Stop()
- }
- }
- func (c *core) newRoundChangeTimer() {
- c.stopTimer()
- // set timeout based on the round number
- timeout := time.Duration(c.config.GetConfig(c.current.Sequence()).RequestTimeout) * time.Millisecond
- round := c.current.Round().Uint64()
- if round > 0 {
- timeout += time.Duration(math.Pow(2, float64(round))) * time.Second
- }
- c.roundChangeTimer = time.AfterFunc(timeout, func() {
- c.sendEvent(timeoutEvent{})
- })
- }
- func (c *core) checkValidatorSignature(data []byte, sig []byte) (common.Address, error) {
- return istanbul.CheckValidatorSignature(c.valSet, data, sig)
- }
- func (c *core) QuorumSize() int {
- if c.config.Get2FPlus1Enabled(c.current.sequence) || c.config.Ceil2Nby3Block == nil || (c.current != nil && c.current.sequence.Cmp(c.config.Ceil2Nby3Block) < 0) {
- c.logger.Trace("Confirmation Formula used 2F+ 1")
- return (2 * c.valSet.F()) + 1
- }
- c.logger.Trace("Confirmation Formula used ceil(2N/3)")
- return int(math.Ceil(float64(2*c.valSet.Size()) / 3))
- }
- // PrepareCommittedSeal returns a committed seal for the given hash
- func PrepareCommittedSeal(hash common.Hash) []byte {
- var buf bytes.Buffer
- buf.Write(hash.Bytes())
- buf.Write([]byte{byte(ibfttypes.MsgCommit)})
- return buf.Bytes()
- }
|