handler.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "fmt"
  19. "math/big"
  20. "github.com/ethereum/go-ethereum/common"
  21. "github.com/ethereum/go-ethereum/consensus/istanbul"
  22. qbfttypes "github.com/ethereum/go-ethereum/consensus/istanbul/qbft/types"
  23. "github.com/ethereum/go-ethereum/log"
  24. "github.com/ethereum/go-ethereum/rlp"
  25. )
  26. // Start implements core.Engine.Start
  27. func (c *core) Start() error {
  28. c.logger.Info("QBFT: start")
  29. // Tests will handle events itself, so we have to make subscribeEvents()
  30. // be able to call in test.
  31. c.subscribeEvents()
  32. c.handlerWg.Add(1)
  33. go c.handleEvents()
  34. // Start a new round from last sequence + 1
  35. c.startNewRound(common.Big0)
  36. return nil
  37. }
  38. // Stop implements core.Engine.Stop
  39. func (c *core) Stop() error {
  40. c.logger.Info("QBFT: stopping...")
  41. c.stopTimer()
  42. c.unsubscribeEvents()
  43. // Make sure the handler goroutine exits
  44. c.handlerWg.Wait()
  45. c.logger.Info("QBFT: stopped")
  46. return nil
  47. }
  48. // ----------------------------------------------------------------------------
  49. // Subscribe both internal and external events
  50. func (c *core) subscribeEvents() {
  51. c.events = c.backend.EventMux().Subscribe(
  52. // external events
  53. istanbul.RequestEvent{},
  54. istanbul.MessageEvent{},
  55. // internal events
  56. backlogEvent{},
  57. )
  58. c.timeoutSub = c.backend.EventMux().Subscribe(
  59. timeoutEvent{},
  60. )
  61. c.finalCommittedSub = c.backend.EventMux().Subscribe(
  62. istanbul.FinalCommittedEvent{},
  63. )
  64. }
  65. // Unsubscribe all events
  66. func (c *core) unsubscribeEvents() {
  67. c.events.Unsubscribe()
  68. c.timeoutSub.Unsubscribe()
  69. c.finalCommittedSub.Unsubscribe()
  70. }
  71. // handleEvents starts main qbft handler loop that processes all incoming messages
  72. // sequentially. Each time a message is processed, internal QBFT state is mutated
  73. // when processing a message it makes sure that the message matches the current state
  74. // - in case the message is past, either for an older round or a state that already got acknowledge (e.g. a PREPARE message but we
  75. // are already in Prepared state), then message is discarded
  76. // - in case the message is future, either for a future round or a state yet to be reached (e.g. a COMMIT message but we are
  77. // in PrePrepared state), then message is added to backlog for future processing
  78. // - if correct time, message is handled
  79. // Each time a message is successfully handled it is gossiped to other validators
  80. func (c *core) handleEvents() {
  81. // Clear state
  82. defer func() {
  83. c.current = nil
  84. c.handlerWg.Done()
  85. }()
  86. for {
  87. select {
  88. case event, ok := <-c.events.Chan():
  89. if !ok {
  90. return
  91. }
  92. // A real event arrived, process interesting content
  93. switch ev := event.Data.(type) {
  94. case istanbul.RequestEvent:
  95. // we are block proposer and look to get our block proposal validated by other validators
  96. r := &Request{
  97. Proposal: ev.Proposal,
  98. }
  99. err := c.handleRequest(r)
  100. if err == errFutureMessage {
  101. // store request for later treatment
  102. c.storeRequestMsg(r)
  103. }
  104. case istanbul.MessageEvent:
  105. // we received a message from another validator
  106. if err := c.handleEncodedMsg(ev.Code, ev.Payload); err != nil {
  107. continue
  108. }
  109. // if successfully processed, we gossip message to other validators
  110. c.backend.Gossip(c.valSet, ev.Code, ev.Payload)
  111. case backlogEvent:
  112. // we process again a future message that was backlogged
  113. // no need to check signature as it was already node when we first received message
  114. if err := c.handleDecodedMessage(ev.msg); err != nil {
  115. continue
  116. }
  117. data, err := rlp.EncodeToBytes(ev.msg)
  118. if err != nil {
  119. c.logger.Error("QBFT: can not encode backlog message", "err", err)
  120. continue
  121. }
  122. // if successfully processed, we gossip message to other validators
  123. c.backend.Gossip(c.valSet, ev.msg.Code(), data)
  124. }
  125. case _, ok := <-c.timeoutSub.Chan():
  126. // we received a round change timeout
  127. if !ok {
  128. return
  129. }
  130. c.handleTimeoutMsg()
  131. case event, ok := <-c.finalCommittedSub.Chan():
  132. // our block proposal got committed
  133. if !ok {
  134. return
  135. }
  136. switch event.Data.(type) {
  137. case istanbul.FinalCommittedEvent:
  138. c.handleFinalCommitted()
  139. }
  140. }
  141. }
  142. }
  143. // sendEvent sends events to mux
  144. func (c *core) sendEvent(ev interface{}) {
  145. c.backend.EventMux().Post(ev)
  146. }
  147. func (c *core) handleEncodedMsg(code uint64, data []byte) error {
  148. logger := c.logger.New("code", code, "data", data)
  149. if _, ok := qbfttypes.MessageCodes()[code]; !ok {
  150. logger.Error("QBFT: invalid message event code")
  151. return fmt.Errorf("invalid message event code %v", code)
  152. }
  153. // Decode data into a QBFTMessage
  154. m, err := qbfttypes.Decode(code, data)
  155. if err != nil {
  156. logger.Error("QBFT: invalid message", "err", err)
  157. return err
  158. }
  159. // Verify signatures and set source address
  160. if err = c.verifySignatures(m); err != nil {
  161. return err
  162. }
  163. return c.handleDecodedMessage(m)
  164. }
  165. func (c *core) handleDecodedMessage(m qbfttypes.QBFTMessage) error {
  166. view := m.View()
  167. if err := c.checkMessage(m.Code(), &view); err != nil {
  168. // Store in the backlog it it's a future message
  169. if err == errFutureMessage {
  170. c.addToBacklog(m)
  171. }
  172. return err
  173. }
  174. return c.deliverMessage(m)
  175. }
  176. // Deliver to specific message handler
  177. func (c *core) deliverMessage(m qbfttypes.QBFTMessage) error {
  178. var err error
  179. switch m.Code() {
  180. case qbfttypes.PreprepareCode:
  181. err = c.handlePreprepareMsg(m.(*qbfttypes.Preprepare))
  182. case qbfttypes.PrepareCode:
  183. err = c.handlePrepare(m.(*qbfttypes.Prepare))
  184. case qbfttypes.CommitCode:
  185. err = c.handleCommitMsg(m.(*qbfttypes.Commit))
  186. case qbfttypes.RoundChangeCode:
  187. err = c.handleRoundChange(m.(*qbfttypes.RoundChange))
  188. default:
  189. c.logger.Error("QBFT: invalid message code", "code", m.Code())
  190. return errInvalidMessage
  191. }
  192. return err
  193. }
  194. func (c *core) handleTimeoutMsg() {
  195. logger := c.currentLogger(true, nil)
  196. // Start the new round
  197. round := c.current.Round()
  198. nextRound := new(big.Int).Add(round, common.Big1)
  199. logger.Warn("QBFT: TIMER CHANGING ROUND", "pr", c.current.preparedRound)
  200. c.startNewRound(nextRound)
  201. logger.Warn("QBFT: TIMER CHANGED ROUND", "pr", c.current.preparedRound)
  202. // Send Round Change
  203. c.broadcastRoundChange(nextRound)
  204. }
  205. // Verifies the signature of the message m and of any justification payloads
  206. // piggybacked in m, if any. It also sets the source address on the messages
  207. // and justification payloads.
  208. func (c *core) verifySignatures(m qbfttypes.QBFTMessage) error {
  209. logger := c.currentLogger(true, m)
  210. // Anonymous function to verify the signature of a single message or payload
  211. verify := func(m qbfttypes.QBFTMessage) error {
  212. payload, err := m.EncodePayloadForSigning()
  213. if err != nil {
  214. logger.Error("QBFT: invalid message payload", "err", err)
  215. return err
  216. }
  217. source, err := c.validateFn(payload, m.Signature())
  218. if err != nil {
  219. logger.Error("QBFT: invalid message signature", "err", err)
  220. return errInvalidSigner
  221. }
  222. m.SetSource(source)
  223. return nil
  224. }
  225. // Verifies the signature of the message
  226. if err := verify(m); err != nil {
  227. return err
  228. }
  229. // Verifies the signature of piggybacked justification payloads.
  230. switch msgType := m.(type) {
  231. case *qbfttypes.RoundChange:
  232. signedPreparePayloads := msgType.Justification
  233. for _, p := range signedPreparePayloads {
  234. if err := verify(p); err != nil {
  235. return err
  236. }
  237. }
  238. case *qbfttypes.Preprepare:
  239. signedRoundChangePayloads := msgType.JustificationRoundChanges
  240. for _, p := range signedRoundChangePayloads {
  241. if err := verify(p); err != nil {
  242. return err
  243. }
  244. }
  245. }
  246. return nil
  247. }
  248. func (c *core) currentLogger(state bool, msg qbfttypes.QBFTMessage) log.Logger {
  249. logCtx := []interface{}{}
  250. if c.current != nil {
  251. logCtx = append(logCtx,
  252. "current.round", c.current.Round().Uint64(),
  253. "current.sequence", c.current.Sequence().Uint64(),
  254. )
  255. }
  256. if state {
  257. logCtx = append(logCtx, "state", c.state)
  258. }
  259. if msg != nil {
  260. logCtx = append(
  261. logCtx,
  262. "msg.code", msg.Code(),
  263. "msg.source", msg.Source().String(),
  264. "msg.round", msg.View().Round.Uint64(),
  265. "msg.sequence", msg.View().Sequence.Uint64(),
  266. )
  267. }
  268. return c.logger.New(logCtx...)
  269. }
  270. func (c *core) withState(logger log.Logger) log.Logger {
  271. return logger.New("state", c.state)
  272. }
  273. func withMsg(logger log.Logger, msg qbfttypes.QBFTMessage) log.Logger {
  274. return logger.New(
  275. "msg.code", msg.Code(),
  276. "msg.source", msg.Source().String(),
  277. "msg.round", msg.View().Round.Uint64(),
  278. "msg.sequence", msg.View().Sequence.Uint64(),
  279. )
  280. }