handler.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package backend
  17. import (
  18. "bytes"
  19. "errors"
  20. "io/ioutil"
  21. "math/big"
  22. "reflect"
  23. "github.com/ethereum/go-ethereum/common"
  24. "github.com/ethereum/go-ethereum/consensus"
  25. "github.com/ethereum/go-ethereum/consensus/istanbul"
  26. qbfttypes "github.com/ethereum/go-ethereum/consensus/istanbul/qbft/types"
  27. "github.com/ethereum/go-ethereum/core/types"
  28. "github.com/ethereum/go-ethereum/p2p"
  29. lru "github.com/hashicorp/golang-lru"
  30. )
  31. const (
  32. NewBlockMsg = 0x07
  33. istanbulMsg = 0x11
  34. )
  35. var (
  36. // errDecodeFailed is returned when decode message fails
  37. errDecodeFailed = errors.New("fail to decode istanbul message")
  38. // errPayloadReadFailed is returned when qbft message read fails
  39. errPayloadReadFailed = errors.New("unable to read payload from message")
  40. )
  41. // Protocol implements consensus.Engine.Protocol
  42. func (sb *Backend) Protocol() consensus.Protocol {
  43. return consensus.IstanbulProtocol
  44. }
  45. func (sb *Backend) decode(msg p2p.Msg) ([]byte, common.Hash, error) {
  46. var data []byte
  47. if sb.IsQBFTConsensus() {
  48. data = make([]byte, msg.Size)
  49. if _, err := msg.Payload.Read(data); err != nil {
  50. return nil, common.Hash{}, errPayloadReadFailed
  51. }
  52. } else {
  53. if err := msg.Decode(&data); err != nil {
  54. return nil, common.Hash{}, errDecodeFailed
  55. }
  56. }
  57. return data, istanbul.RLPHash(data), nil
  58. }
  59. // HandleMsg implements consensus.Handler.HandleMsg
  60. func (sb *Backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) {
  61. sb.coreMu.Lock()
  62. defer sb.coreMu.Unlock()
  63. if _, ok := qbfttypes.MessageCodes()[msg.Code]; ok || msg.Code == istanbulMsg {
  64. if !sb.coreStarted {
  65. return true, istanbul.ErrStoppedEngine
  66. }
  67. data, hash, err := sb.decode(msg)
  68. if err != nil {
  69. return true, errDecodeFailed
  70. }
  71. // Mark peer's message
  72. ms, ok := sb.recentMessages.Get(addr)
  73. var m *lru.ARCCache
  74. if ok {
  75. m, _ = ms.(*lru.ARCCache)
  76. } else {
  77. m, _ = lru.NewARC(inmemoryMessages)
  78. sb.recentMessages.Add(addr, m)
  79. }
  80. m.Add(hash, true)
  81. // Mark self known message
  82. if _, ok := sb.knownMessages.Get(hash); ok {
  83. return true, nil
  84. }
  85. sb.knownMessages.Add(hash, true)
  86. go sb.istanbulEventMux.Post(istanbul.MessageEvent{
  87. Code: msg.Code,
  88. Payload: data,
  89. })
  90. return true, nil
  91. }
  92. //https://github.com/ConsenSys/quorum/pull/539
  93. //https://github.com/ConsenSys/quorum/issues/389
  94. if msg.Code == NewBlockMsg && sb.core != nil && sb.core.IsProposer() { // eth.NewBlockMsg: import cycle
  95. // this case is to safeguard the race of similar block which gets propagated from other node while this node is proposing
  96. // as p2p.Msg can only be decoded once (get EOF for any subsequence read), we need to make sure the payload is restored after we decode it
  97. sb.logger.Debug("BFT: received NewBlockMsg", "size", msg.Size, "payload.type", reflect.TypeOf(msg.Payload), "sender", addr)
  98. if reader, ok := msg.Payload.(*bytes.Reader); ok {
  99. payload, err := ioutil.ReadAll(reader)
  100. if err != nil {
  101. return true, err
  102. }
  103. reader.Reset(payload) // ready to be decoded
  104. defer reader.Reset(payload) // restore so main eth/handler can decode
  105. var request struct { // this has to be same as eth/protocol.go#newBlockData as we are reading NewBlockMsg
  106. Block *types.Block
  107. TD *big.Int
  108. }
  109. if err := msg.Decode(&request); err != nil {
  110. sb.logger.Error("BFT: unable to decode the NewBlockMsg", "error", err)
  111. return false, nil
  112. }
  113. newRequestedBlock := request.Block
  114. if newRequestedBlock.Header().MixDigest == types.IstanbulDigest && sb.core.IsCurrentProposal(newRequestedBlock.Hash()) {
  115. sb.logger.Debug("BFT: block already proposed", "hash", newRequestedBlock.Hash(), "sender", addr)
  116. return true, nil
  117. }
  118. }
  119. }
  120. return false, nil
  121. }
  122. // SetBroadcaster implements consensus.Handler.SetBroadcaster
  123. func (sb *Backend) SetBroadcaster(broadcaster consensus.Broadcaster) {
  124. sb.broadcaster = broadcaster
  125. }
  126. func (sb *Backend) NewChainHead() error {
  127. sb.coreMu.RLock()
  128. defer sb.coreMu.RUnlock()
  129. if !sb.coreStarted {
  130. return istanbul.ErrStoppedEngine
  131. }
  132. go sb.istanbulEventMux.Post(istanbul.FinalCommittedEvent{})
  133. return nil
  134. }