123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- // Copyright 2017 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package backend
import (
	"bytes"
	"errors"
	"io"
	"io/ioutil"
	"math/big"
	"reflect"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/consensus/istanbul"
	qbfttypes "github.com/ethereum/go-ethereum/consensus/istanbul/qbft/types"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/p2p"
	lru "github.com/hashicorp/golang-lru"
)
// Message codes inspected by HandleMsg.
const (
	// NewBlockMsg is the code of the block-announcement message. It is
	// redeclared here rather than imported from the eth package because
	// importing it would create an import cycle.
	NewBlockMsg = 0x07
	// istanbulMsg is the code HandleMsg accepts, alongside the QBFT message
	// codes, as a consensus message.
	istanbulMsg = 0x11
)
// Sentinel errors produced while extracting a consensus payload from a
// p2p message.
var (
	// errDecodeFailed reports that a message could not be decoded.
	errDecodeFailed = errors.New("fail to decode istanbul message")
	// errPayloadReadFailed reports that the raw payload of a QBFT message
	// could not be read.
	errPayloadReadFailed = errors.New("unable to read payload from message")
)
- // Protocol implements consensus.Engine.Protocol
- func (sb *Backend) Protocol() consensus.Protocol {
- return consensus.IstanbulProtocol
- }
- func (sb *Backend) decode(msg p2p.Msg) ([]byte, common.Hash, error) {
- var data []byte
- if sb.IsQBFTConsensus() {
- data = make([]byte, msg.Size)
- if _, err := msg.Payload.Read(data); err != nil {
- return nil, common.Hash{}, errPayloadReadFailed
- }
- } else {
- if err := msg.Decode(&data); err != nil {
- return nil, common.Hash{}, errDecodeFailed
- }
- }
- return data, istanbul.RLPHash(data), nil
- }
- // HandleMsg implements consensus.Handler.HandleMsg
- func (sb *Backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) {
- sb.coreMu.Lock()
- defer sb.coreMu.Unlock()
- if _, ok := qbfttypes.MessageCodes()[msg.Code]; ok || msg.Code == istanbulMsg {
- if !sb.coreStarted {
- return true, istanbul.ErrStoppedEngine
- }
- data, hash, err := sb.decode(msg)
- if err != nil {
- return true, errDecodeFailed
- }
- // Mark peer's message
- ms, ok := sb.recentMessages.Get(addr)
- var m *lru.ARCCache
- if ok {
- m, _ = ms.(*lru.ARCCache)
- } else {
- m, _ = lru.NewARC(inmemoryMessages)
- sb.recentMessages.Add(addr, m)
- }
- m.Add(hash, true)
- // Mark self known message
- if _, ok := sb.knownMessages.Get(hash); ok {
- return true, nil
- }
- sb.knownMessages.Add(hash, true)
- go sb.istanbulEventMux.Post(istanbul.MessageEvent{
- Code: msg.Code,
- Payload: data,
- })
- return true, nil
- }
- //https://github.com/ConsenSys/quorum/pull/539
- //https://github.com/ConsenSys/quorum/issues/389
- if msg.Code == NewBlockMsg && sb.core != nil && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle
- // this case is to safeguard the race of similar block which gets propagated from other node while this node is proposing
- // as p2p.Msg can only be decoded once (get EOF for any subsequence read), we need to make sure the payload is restored after we decode it
- sb.logger.Debug("BFT: received NewBlockMsg", "size", msg.Size, "payload.type", reflect.TypeOf(msg.Payload), "sender", addr)
- if reader, ok := msg.Payload.(*bytes.Reader); ok {
- payload, err := ioutil.ReadAll(reader)
- if err != nil {
- return true, err
- }
- reader.Reset(payload) // ready to be decoded
- defer reader.Reset(payload) // restore so main eth/handler can decode
- var request struct { // this has to be same as eth/protocol.go#newBlockData as we are reading NewBlockMsg
- Block *types.Block
- TD *big.Int
- }
- if err := msg.Decode(&request); err != nil {
- sb.logger.Error("BFT: unable to decode the NewBlockMsg", "error", err)
- return false, nil
- }
- newRequestedBlock := request.Block
- if newRequestedBlock.Header().MixDigest == types.IstanbulDigest && sb.core.IsCurrentProposal(newRequestedBlock.Hash()) {
- sb.logger.Debug("BFT: block already proposed", "hash", newRequestedBlock.Hash(), "sender", addr)
- return true, nil
- }
- }
- }
- return false, nil
- }
- // SetBroadcaster implements consensus.Handler.SetBroadcaster
- func (sb *Backend) SetBroadcaster(broadcaster consensus.Broadcaster) {
- sb.broadcaster = broadcaster
- }
- func (sb *Backend) NewChainHead() error {
- sb.coreMu.RLock()
- defer sb.coreMu.RUnlock()
- if !sb.coreStarted {
- return istanbul.ErrStoppedEngine
- }
- go sb.istanbulEventMux.Post(istanbul.FinalCommittedEvent{})
- return nil
- }
|