eth66_suiteHelpers.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. // Copyright 2021 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package ethtest
  17. import (
  18. "fmt"
  19. "reflect"
  20. "time"
  21. "github.com/ethereum/go-ethereum/core/types"
  22. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  23. "github.com/ethereum/go-ethereum/internal/utesting"
  24. "github.com/ethereum/go-ethereum/p2p"
  25. "github.com/ethereum/go-ethereum/rlp"
  26. "github.com/stretchr/testify/assert"
  27. )
  28. func (c *Conn) statusExchange66(t *utesting.T, chain *Chain) Message {
  29. status := &Status{
  30. ProtocolVersion: uint32(66),
  31. NetworkID: chain.chainConfig.ChainID.Uint64(),
  32. TD: chain.TD(chain.Len()),
  33. Head: chain.blocks[chain.Len()-1].Hash(),
  34. Genesis: chain.blocks[0].Hash(),
  35. ForkID: chain.ForkID(),
  36. }
  37. return c.statusExchange(t, chain, status)
  38. }
  39. func (s *Suite) dial66(t *utesting.T) *Conn {
  40. conn, err := s.dial()
  41. if err != nil {
  42. t.Fatalf("could not dial: %v", err)
  43. }
  44. conn.caps = append(conn.caps, p2p.Cap{Name: "eth", Version: 66})
  45. conn.ourHighestProtoVersion = 66
  46. return conn
  47. }
  48. func (c *Conn) write66(req eth.Packet, code int) error {
  49. payload, err := rlp.EncodeToBytes(req)
  50. if err != nil {
  51. return err
  52. }
  53. _, err = c.Conn.Write(uint64(code), payload)
  54. return err
  55. }
  56. func (c *Conn) read66() (uint64, Message) {
  57. code, rawData, _, err := c.Conn.Read()
  58. if err != nil {
  59. return 0, errorf("could not read from connection: %v", err)
  60. }
  61. var msg Message
  62. switch int(code) {
  63. case (Hello{}).Code():
  64. msg = new(Hello)
  65. case (Ping{}).Code():
  66. msg = new(Ping)
  67. case (Pong{}).Code():
  68. msg = new(Pong)
  69. case (Disconnect{}).Code():
  70. msg = new(Disconnect)
  71. case (Status{}).Code():
  72. msg = new(Status)
  73. case (GetBlockHeaders{}).Code():
  74. ethMsg := new(eth.GetBlockHeadersPacket66)
  75. if err := rlp.DecodeBytes(rawData, ethMsg); err != nil {
  76. return 0, errorf("could not rlp decode message: %v", err)
  77. }
  78. return ethMsg.RequestId, GetBlockHeaders(*ethMsg.GetBlockHeadersPacket)
  79. case (BlockHeaders{}).Code():
  80. ethMsg := new(eth.BlockHeadersPacket66)
  81. if err := rlp.DecodeBytes(rawData, ethMsg); err != nil {
  82. return 0, errorf("could not rlp decode message: %v", err)
  83. }
  84. return ethMsg.RequestId, BlockHeaders(ethMsg.BlockHeadersPacket)
  85. case (GetBlockBodies{}).Code():
  86. ethMsg := new(eth.GetBlockBodiesPacket66)
  87. if err := rlp.DecodeBytes(rawData, ethMsg); err != nil {
  88. return 0, errorf("could not rlp decode message: %v", err)
  89. }
  90. return ethMsg.RequestId, GetBlockBodies(ethMsg.GetBlockBodiesPacket)
  91. case (BlockBodies{}).Code():
  92. ethMsg := new(eth.BlockBodiesPacket66)
  93. if err := rlp.DecodeBytes(rawData, ethMsg); err != nil {
  94. return 0, errorf("could not rlp decode message: %v", err)
  95. }
  96. return ethMsg.RequestId, BlockBodies(ethMsg.BlockBodiesPacket)
  97. case (NewBlock{}).Code():
  98. msg = new(NewBlock)
  99. case (NewBlockHashes{}).Code():
  100. msg = new(NewBlockHashes)
  101. case (Transactions{}).Code():
  102. msg = new(Transactions)
  103. case (NewPooledTransactionHashes{}).Code():
  104. msg = new(NewPooledTransactionHashes)
  105. case (GetPooledTransactions{}.Code()):
  106. ethMsg := new(eth.GetPooledTransactionsPacket66)
  107. if err := rlp.DecodeBytes(rawData, ethMsg); err != nil {
  108. return 0, errorf("could not rlp decode message: %v", err)
  109. }
  110. return ethMsg.RequestId, GetPooledTransactions(ethMsg.GetPooledTransactionsPacket)
  111. case (PooledTransactions{}.Code()):
  112. ethMsg := new(eth.PooledTransactionsPacket66)
  113. if err := rlp.DecodeBytes(rawData, ethMsg); err != nil {
  114. return 0, errorf("could not rlp decode message: %v", err)
  115. }
  116. return ethMsg.RequestId, PooledTransactions(ethMsg.PooledTransactionsPacket)
  117. default:
  118. msg = errorf("invalid message code: %d", code)
  119. }
  120. if msg != nil {
  121. if err := rlp.DecodeBytes(rawData, msg); err != nil {
  122. return 0, errorf("could not rlp decode message: %v", err)
  123. }
  124. return 0, msg
  125. }
  126. return 0, errorf("invalid message: %s", string(rawData))
  127. }
  128. func (c *Conn) waitForResponse(chain *Chain, timeout time.Duration, requestID uint64) Message {
  129. for {
  130. id, msg := c.readAndServe66(chain, timeout)
  131. if id == requestID {
  132. return msg
  133. }
  134. }
  135. }
  136. // ReadAndServe serves GetBlockHeaders requests while waiting
  137. // on another message from the node.
  138. func (c *Conn) readAndServe66(chain *Chain, timeout time.Duration) (uint64, Message) {
  139. start := time.Now()
  140. for time.Since(start) < timeout {
  141. c.SetReadDeadline(time.Now().Add(10 * time.Second))
  142. reqID, msg := c.read66()
  143. switch msg := msg.(type) {
  144. case *Ping:
  145. c.Write(&Pong{})
  146. case *GetBlockHeaders:
  147. headers, err := chain.GetHeaders(*msg)
  148. if err != nil {
  149. return 0, errorf("could not get headers for inbound header request: %v", err)
  150. }
  151. resp := &eth.BlockHeadersPacket66{
  152. RequestId: reqID,
  153. BlockHeadersPacket: eth.BlockHeadersPacket(headers),
  154. }
  155. if err := c.write66(resp, BlockHeaders{}.Code()); err != nil {
  156. return 0, errorf("could not write to connection: %v", err)
  157. }
  158. default:
  159. return reqID, msg
  160. }
  161. }
  162. return 0, errorf("no message received within %v", timeout)
  163. }
  164. func (s *Suite) setupConnection66(t *utesting.T) *Conn {
  165. // create conn
  166. sendConn := s.dial66(t)
  167. sendConn.handshake(t)
  168. sendConn.statusExchange66(t, s.chain)
  169. return sendConn
  170. }
  171. func (s *Suite) testAnnounce66(t *utesting.T, sendConn, receiveConn *Conn, blockAnnouncement *NewBlock) {
  172. // Announce the block.
  173. if err := sendConn.Write(blockAnnouncement); err != nil {
  174. t.Fatalf("could not write to connection: %v", err)
  175. }
  176. s.waitAnnounce66(t, receiveConn, blockAnnouncement)
  177. }
  178. func (s *Suite) waitAnnounce66(t *utesting.T, conn *Conn, blockAnnouncement *NewBlock) {
  179. for {
  180. _, msg := conn.readAndServe66(s.chain, timeout)
  181. switch msg := msg.(type) {
  182. case *NewBlock:
  183. t.Logf("received NewBlock message: %s", pretty.Sdump(msg.Block))
  184. assert.Equal(t,
  185. blockAnnouncement.Block.Header(), msg.Block.Header(),
  186. "wrong block header in announcement",
  187. )
  188. assert.Equal(t,
  189. blockAnnouncement.TD, msg.TD,
  190. "wrong TD in announcement",
  191. )
  192. return
  193. case *NewBlockHashes:
  194. blockHashes := *msg
  195. t.Logf("received NewBlockHashes message: %s", pretty.Sdump(blockHashes))
  196. assert.Equal(t, blockAnnouncement.Block.Hash(), blockHashes[0].Hash,
  197. "wrong block hash in announcement",
  198. )
  199. return
  200. case *NewPooledTransactionHashes:
  201. // ignore old txs being propagated
  202. continue
  203. default:
  204. t.Fatalf("unexpected: %s", pretty.Sdump(msg))
  205. }
  206. }
  207. }
  208. // waitForBlock66 waits for confirmation from the client that it has
  209. // imported the given block.
  210. func (c *Conn) waitForBlock66(block *types.Block) error {
  211. defer c.SetReadDeadline(time.Time{})
  212. c.SetReadDeadline(time.Now().Add(20 * time.Second))
  213. // note: if the node has not yet imported the block, it will respond
  214. // to the GetBlockHeaders request with an empty BlockHeaders response,
  215. // so the GetBlockHeaders request must be sent again until the BlockHeaders
  216. // response contains the desired header.
  217. for {
  218. req := eth.GetBlockHeadersPacket66{
  219. RequestId: 54,
  220. GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
  221. Origin: eth.HashOrNumber{
  222. Hash: block.Hash(),
  223. },
  224. Amount: 1,
  225. },
  226. }
  227. if err := c.write66(req, GetBlockHeaders{}.Code()); err != nil {
  228. return err
  229. }
  230. reqID, msg := c.read66()
  231. // check message
  232. switch msg := msg.(type) {
  233. case BlockHeaders:
  234. // check request ID
  235. if reqID != req.RequestId {
  236. return fmt.Errorf("request ID mismatch: wanted %d, got %d", req.RequestId, reqID)
  237. }
  238. for _, header := range msg {
  239. if header.Number.Uint64() == block.NumberU64() {
  240. return nil
  241. }
  242. }
  243. time.Sleep(100 * time.Millisecond)
  244. case *NewPooledTransactionHashes:
  245. // ignore old announcements
  246. continue
  247. default:
  248. return fmt.Errorf("invalid message: %s", pretty.Sdump(msg))
  249. }
  250. }
  251. }
  252. func sendSuccessfulTx66(t *utesting.T, s *Suite, tx *types.Transaction) {
  253. sendConn := s.setupConnection66(t)
  254. defer sendConn.Close()
  255. sendSuccessfulTxWithConn(t, s, tx, sendConn)
  256. }
  257. // waitForBlockHeadersResponse66 waits for a BlockHeaders message with the given expected request ID
  258. func (s *Suite) waitForBlockHeadersResponse66(conn *Conn, expectedID uint64) (BlockHeaders, error) {
  259. reqID, msg := conn.readAndServe66(s.chain, timeout)
  260. switch msg := msg.(type) {
  261. case BlockHeaders:
  262. if reqID != expectedID {
  263. return nil, fmt.Errorf("request ID mismatch: wanted %d, got %d", expectedID, reqID)
  264. }
  265. return msg, nil
  266. default:
  267. return nil, fmt.Errorf("unexpected: %s", pretty.Sdump(msg))
  268. }
  269. }
  270. func (s *Suite) getBlockHeaders66(conn *Conn, req eth.Packet, expectedID uint64) (BlockHeaders, error) {
  271. if err := conn.write66(req, GetBlockHeaders{}.Code()); err != nil {
  272. return nil, fmt.Errorf("could not write to connection: %v", err)
  273. }
  274. return s.waitForBlockHeadersResponse66(conn, expectedID)
  275. }
  276. func headersMatch(t *utesting.T, chain *Chain, headers BlockHeaders) bool {
  277. mismatched := 0
  278. for _, header := range headers {
  279. num := header.Number.Uint64()
  280. t.Logf("received header (%d): %s", num, pretty.Sdump(header.Hash()))
  281. if !reflect.DeepEqual(chain.blocks[int(num)].Header(), header) {
  282. mismatched += 1
  283. t.Logf("received wrong header: %v", pretty.Sdump(header))
  284. }
  285. }
  286. return mismatched == 0
  287. }
  288. func (s *Suite) sendNextBlock66(t *utesting.T) {
  289. sendConn, receiveConn := s.setupConnection66(t), s.setupConnection66(t)
  290. defer sendConn.Close()
  291. defer receiveConn.Close()
  292. // create new block announcement
  293. nextBlock := len(s.chain.blocks)
  294. blockAnnouncement := &NewBlock{
  295. Block: s.fullChain.blocks[nextBlock],
  296. TD: s.fullChain.TD(nextBlock + 1),
  297. }
  298. // send announcement and wait for node to request the header
  299. s.testAnnounce66(t, sendConn, receiveConn, blockAnnouncement)
  300. // wait for client to update its chain
  301. if err := receiveConn.waitForBlock66(s.fullChain.blocks[nextBlock]); err != nil {
  302. t.Fatal(err)
  303. }
  304. // update test suite chain
  305. s.chain.blocks = append(s.chain.blocks, s.fullChain.blocks[nextBlock])
  306. }