http_test.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package simulations
  17. import (
  18. "context"
  19. "flag"
  20. "fmt"
  21. "math/rand"
  22. "net/http/httptest"
  23. "os"
  24. "reflect"
  25. "sync"
  26. "sync/atomic"
  27. "testing"
  28. "time"
  29. "github.com/ethereum/go-ethereum/event"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/node"
  32. "github.com/ethereum/go-ethereum/p2p"
  33. "github.com/ethereum/go-ethereum/p2p/enode"
  34. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  35. "github.com/ethereum/go-ethereum/rpc"
  36. "github.com/mattn/go-colorable"
  37. )
  38. func TestMain(m *testing.M) {
  39. loglevel := flag.Int("loglevel", 2, "verbosity of logs")
  40. flag.Parse()
  41. log.PrintOrigins(true)
  42. log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
  43. os.Exit(m.Run())
  44. }
  45. // testService implements the node.Service interface and provides protocols
  46. // and APIs which are useful for testing nodes in a simulation network
  47. type testService struct {
  48. id enode.ID
  49. // peerCount is incremented once a peer handshake has been performed
  50. peerCount int64
  51. peers map[enode.ID]*testPeer
  52. peersMtx sync.Mutex
  53. // state stores []byte which is used to test creating and loading
  54. // snapshots
  55. state atomic.Value
  56. }
  57. func newTestService(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) {
  58. svc := &testService{
  59. id: ctx.Config.ID,
  60. peers: make(map[enode.ID]*testPeer),
  61. }
  62. svc.state.Store(ctx.Snapshot)
  63. stack.RegisterProtocols(svc.Protocols())
  64. stack.RegisterAPIs(svc.APIs())
  65. return svc, nil
  66. }
  67. type testPeer struct {
  68. testReady chan struct{}
  69. dumReady chan struct{}
  70. }
  71. func (t *testService) peer(id enode.ID) *testPeer {
  72. t.peersMtx.Lock()
  73. defer t.peersMtx.Unlock()
  74. if peer, ok := t.peers[id]; ok {
  75. return peer
  76. }
  77. peer := &testPeer{
  78. testReady: make(chan struct{}),
  79. dumReady: make(chan struct{}),
  80. }
  81. t.peers[id] = peer
  82. return peer
  83. }
  84. func (t *testService) Protocols() []p2p.Protocol {
  85. return []p2p.Protocol{
  86. {
  87. Name: "test",
  88. Version: 1,
  89. Length: 3,
  90. Run: t.RunTest,
  91. },
  92. {
  93. Name: "dum",
  94. Version: 1,
  95. Length: 1,
  96. Run: t.RunDum,
  97. },
  98. {
  99. Name: "prb",
  100. Version: 1,
  101. Length: 1,
  102. Run: t.RunPrb,
  103. },
  104. }
  105. }
  106. func (t *testService) APIs() []rpc.API {
  107. return []rpc.API{{
  108. Namespace: "test",
  109. Version: "1.0",
  110. Service: &TestAPI{
  111. state: &t.state,
  112. peerCount: &t.peerCount,
  113. },
  114. }}
  115. }
  116. func (t *testService) Start() error {
  117. return nil
  118. }
  119. func (t *testService) Stop() error {
  120. return nil
  121. }
  122. // handshake performs a peer handshake by sending and expecting an empty
  123. // message with the given code
  124. func (t *testService) handshake(rw p2p.MsgReadWriter, code uint64) error {
  125. errc := make(chan error, 2)
  126. go func() { errc <- p2p.Send(rw, code, struct{}{}) }()
  127. go func() { errc <- p2p.ExpectMsg(rw, code, struct{}{}) }()
  128. for i := 0; i < 2; i++ {
  129. if err := <-errc; err != nil {
  130. return err
  131. }
  132. }
  133. return nil
  134. }
  135. func (t *testService) RunTest(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  136. peer := t.peer(p.ID())
  137. // perform three handshakes with three different message codes,
  138. // used to test message sending and filtering
  139. if err := t.handshake(rw, 2); err != nil {
  140. return err
  141. }
  142. if err := t.handshake(rw, 1); err != nil {
  143. return err
  144. }
  145. if err := t.handshake(rw, 0); err != nil {
  146. return err
  147. }
  148. // close the testReady channel so that other protocols can run
  149. close(peer.testReady)
  150. // track the peer
  151. atomic.AddInt64(&t.peerCount, 1)
  152. defer atomic.AddInt64(&t.peerCount, -1)
  153. // block until the peer is dropped
  154. for {
  155. _, err := rw.ReadMsg()
  156. if err != nil {
  157. return err
  158. }
  159. }
  160. }
  161. func (t *testService) RunDum(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  162. peer := t.peer(p.ID())
  163. // wait for the test protocol to perform its handshake
  164. <-peer.testReady
  165. // perform a handshake
  166. if err := t.handshake(rw, 0); err != nil {
  167. return err
  168. }
  169. // close the dumReady channel so that other protocols can run
  170. close(peer.dumReady)
  171. // block until the peer is dropped
  172. for {
  173. _, err := rw.ReadMsg()
  174. if err != nil {
  175. return err
  176. }
  177. }
  178. }
  179. func (t *testService) RunPrb(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  180. peer := t.peer(p.ID())
  181. // wait for the dum protocol to perform its handshake
  182. <-peer.dumReady
  183. // perform a handshake
  184. if err := t.handshake(rw, 0); err != nil {
  185. return err
  186. }
  187. // block until the peer is dropped
  188. for {
  189. _, err := rw.ReadMsg()
  190. if err != nil {
  191. return err
  192. }
  193. }
  194. }
  195. func (t *testService) Snapshot() ([]byte, error) {
  196. return t.state.Load().([]byte), nil
  197. }
  198. // TestAPI provides a test API to:
  199. // * get the peer count
  200. // * get and set an arbitrary state byte slice
  201. // * get and increment a counter
  202. // * subscribe to counter increment events
  203. type TestAPI struct {
  204. state *atomic.Value
  205. peerCount *int64
  206. counter int64
  207. feed event.Feed
  208. }
  209. func (t *TestAPI) PeerCount() int64 {
  210. return atomic.LoadInt64(t.peerCount)
  211. }
  212. func (t *TestAPI) Get() int64 {
  213. return atomic.LoadInt64(&t.counter)
  214. }
  215. func (t *TestAPI) Add(delta int64) {
  216. atomic.AddInt64(&t.counter, delta)
  217. t.feed.Send(delta)
  218. }
  219. func (t *TestAPI) GetState() []byte {
  220. return t.state.Load().([]byte)
  221. }
  222. func (t *TestAPI) SetState(state []byte) {
  223. t.state.Store(state)
  224. }
  225. func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) {
  226. notifier, supported := rpc.NotifierFromContext(ctx)
  227. if !supported {
  228. return nil, rpc.ErrNotificationsUnsupported
  229. }
  230. rpcSub := notifier.CreateSubscription()
  231. go func() {
  232. events := make(chan int64)
  233. sub := t.feed.Subscribe(events)
  234. defer sub.Unsubscribe()
  235. for {
  236. select {
  237. case event := <-events:
  238. notifier.Notify(rpcSub.ID, event)
  239. case <-sub.Err():
  240. return
  241. case <-rpcSub.Err():
  242. return
  243. case <-notifier.Closed():
  244. return
  245. }
  246. }
  247. }()
  248. return rpcSub, nil
  249. }
  250. var testServices = adapters.LifecycleConstructors{
  251. "test": newTestService,
  252. }
  253. func testHTTPServer(t *testing.T) (*Network, *httptest.Server) {
  254. t.Helper()
  255. adapter := adapters.NewSimAdapter(testServices)
  256. network := NewNetwork(adapter, &NetworkConfig{
  257. DefaultService: "test",
  258. })
  259. return network, httptest.NewServer(NewServer(network))
  260. }
  261. // TestHTTPNetwork tests interacting with a simulation network using the HTTP
  262. // API
  263. func TestHTTPNetwork(t *testing.T) {
  264. // start the server
  265. network, s := testHTTPServer(t)
  266. defer s.Close()
  267. // subscribe to events so we can check them later
  268. client := NewClient(s.URL)
  269. events := make(chan *Event, 100)
  270. var opts SubscribeOpts
  271. sub, err := client.SubscribeNetwork(events, opts)
  272. if err != nil {
  273. t.Fatalf("error subscribing to network events: %s", err)
  274. }
  275. defer sub.Unsubscribe()
  276. // check we can retrieve details about the network
  277. gotNetwork, err := client.GetNetwork()
  278. if err != nil {
  279. t.Fatalf("error getting network: %s", err)
  280. }
  281. if gotNetwork.ID != network.ID {
  282. t.Fatalf("expected network to have ID %q, got %q", network.ID, gotNetwork.ID)
  283. }
  284. // start a simulation network
  285. nodeIDs := startTestNetwork(t, client)
  286. // check we got all the events
  287. x := &expectEvents{t, events, sub}
  288. x.expect(
  289. x.nodeEvent(nodeIDs[0], false),
  290. x.nodeEvent(nodeIDs[1], false),
  291. x.nodeEvent(nodeIDs[0], true),
  292. x.nodeEvent(nodeIDs[1], true),
  293. x.connEvent(nodeIDs[0], nodeIDs[1], false),
  294. x.connEvent(nodeIDs[0], nodeIDs[1], true),
  295. )
  296. // reconnect the stream and check we get the current nodes and conns
  297. events = make(chan *Event, 100)
  298. opts.Current = true
  299. sub, err = client.SubscribeNetwork(events, opts)
  300. if err != nil {
  301. t.Fatalf("error subscribing to network events: %s", err)
  302. }
  303. defer sub.Unsubscribe()
  304. x = &expectEvents{t, events, sub}
  305. x.expect(
  306. x.nodeEvent(nodeIDs[0], true),
  307. x.nodeEvent(nodeIDs[1], true),
  308. x.connEvent(nodeIDs[0], nodeIDs[1], true),
  309. )
  310. }
  311. func startTestNetwork(t *testing.T, client *Client) []string {
  312. // create two nodes
  313. nodeCount := 2
  314. nodeIDs := make([]string, nodeCount)
  315. for i := 0; i < nodeCount; i++ {
  316. config := adapters.RandomNodeConfig()
  317. node, err := client.CreateNode(config)
  318. if err != nil {
  319. t.Fatalf("error creating node: %s", err)
  320. }
  321. nodeIDs[i] = node.ID
  322. }
  323. // check both nodes exist
  324. nodes, err := client.GetNodes()
  325. if err != nil {
  326. t.Fatalf("error getting nodes: %s", err)
  327. }
  328. if len(nodes) != nodeCount {
  329. t.Fatalf("expected %d nodes, got %d", nodeCount, len(nodes))
  330. }
  331. for i, nodeID := range nodeIDs {
  332. if nodes[i].ID != nodeID {
  333. t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, nodes[i].ID)
  334. }
  335. node, err := client.GetNode(nodeID)
  336. if err != nil {
  337. t.Fatalf("error getting node %d: %s", i, err)
  338. }
  339. if node.ID != nodeID {
  340. t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, node.ID)
  341. }
  342. }
  343. // start both nodes
  344. for _, nodeID := range nodeIDs {
  345. if err := client.StartNode(nodeID); err != nil {
  346. t.Fatalf("error starting node %q: %s", nodeID, err)
  347. }
  348. }
  349. // connect the nodes
  350. for i := 0; i < nodeCount-1; i++ {
  351. peerId := i + 1
  352. if i == nodeCount-1 {
  353. peerId = 0
  354. }
  355. if err := client.ConnectNode(nodeIDs[i], nodeIDs[peerId]); err != nil {
  356. t.Fatalf("error connecting nodes: %s", err)
  357. }
  358. }
  359. return nodeIDs
  360. }
  361. type expectEvents struct {
  362. *testing.T
  363. events chan *Event
  364. sub event.Subscription
  365. }
  366. func (t *expectEvents) nodeEvent(id string, up bool) *Event {
  367. config := &adapters.NodeConfig{ID: enode.HexID(id)}
  368. return &Event{Type: EventTypeNode, Node: newNode(nil, config, up)}
  369. }
  370. func (t *expectEvents) connEvent(one, other string, up bool) *Event {
  371. return &Event{
  372. Type: EventTypeConn,
  373. Conn: &Conn{
  374. One: enode.HexID(one),
  375. Other: enode.HexID(other),
  376. Up: up,
  377. },
  378. }
  379. }
  380. func (t *expectEvents) expectMsgs(expected map[MsgFilter]int) {
  381. actual := make(map[MsgFilter]int)
  382. timeout := time.After(10 * time.Second)
  383. loop:
  384. for {
  385. select {
  386. case event := <-t.events:
  387. t.Logf("received %s event: %v", event.Type, event)
  388. if event.Type != EventTypeMsg || event.Msg.Received {
  389. continue loop
  390. }
  391. if event.Msg == nil {
  392. t.Fatal("expected event.Msg to be set")
  393. }
  394. filter := MsgFilter{
  395. Proto: event.Msg.Protocol,
  396. Code: int64(event.Msg.Code),
  397. }
  398. actual[filter]++
  399. if actual[filter] > expected[filter] {
  400. t.Fatalf("received too many msgs for filter: %v", filter)
  401. }
  402. if reflect.DeepEqual(actual, expected) {
  403. return
  404. }
  405. case err := <-t.sub.Err():
  406. t.Fatalf("network stream closed unexpectedly: %s", err)
  407. case <-timeout:
  408. t.Fatal("timed out waiting for expected events")
  409. }
  410. }
  411. }
  412. func (t *expectEvents) expect(events ...*Event) {
  413. t.Helper()
  414. timeout := time.After(10 * time.Second)
  415. i := 0
  416. for {
  417. select {
  418. case event := <-t.events:
  419. t.Logf("received %s event: %v", event.Type, event)
  420. expected := events[i]
  421. if event.Type != expected.Type {
  422. t.Fatalf("expected event %d to have type %q, got %q", i, expected.Type, event.Type)
  423. }
  424. switch expected.Type {
  425. case EventTypeNode:
  426. if event.Node == nil {
  427. t.Fatal("expected event.Node to be set")
  428. }
  429. if event.Node.ID() != expected.Node.ID() {
  430. t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString())
  431. }
  432. if event.Node.Up() != expected.Node.Up() {
  433. t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up(), event.Node.Up())
  434. }
  435. case EventTypeConn:
  436. if event.Conn == nil {
  437. t.Fatal("expected event.Conn to be set")
  438. }
  439. if event.Conn.One != expected.Conn.One {
  440. t.Fatalf("expected conn event %d to have one=%q, got one=%q", i, expected.Conn.One.TerminalString(), event.Conn.One.TerminalString())
  441. }
  442. if event.Conn.Other != expected.Conn.Other {
  443. t.Fatalf("expected conn event %d to have other=%q, got other=%q", i, expected.Conn.Other.TerminalString(), event.Conn.Other.TerminalString())
  444. }
  445. if event.Conn.Up != expected.Conn.Up {
  446. t.Fatalf("expected conn event %d to have up=%t, got up=%t", i, expected.Conn.Up, event.Conn.Up)
  447. }
  448. }
  449. i++
  450. if i == len(events) {
  451. return
  452. }
  453. case err := <-t.sub.Err():
  454. t.Fatalf("network stream closed unexpectedly: %s", err)
  455. case <-timeout:
  456. t.Fatal("timed out waiting for expected events")
  457. }
  458. }
  459. }
  460. // TestHTTPNodeRPC tests calling RPC methods on nodes via the HTTP API
  461. func TestHTTPNodeRPC(t *testing.T) {
  462. // start the server
  463. _, s := testHTTPServer(t)
  464. defer s.Close()
  465. // start a node in the network
  466. client := NewClient(s.URL)
  467. config := adapters.RandomNodeConfig()
  468. node, err := client.CreateNode(config)
  469. if err != nil {
  470. t.Fatalf("error creating node: %s", err)
  471. }
  472. if err := client.StartNode(node.ID); err != nil {
  473. t.Fatalf("error starting node: %s", err)
  474. }
  475. // create two RPC clients
  476. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  477. defer cancel()
  478. rpcClient1, err := client.RPCClient(ctx, node.ID)
  479. if err != nil {
  480. t.Fatalf("error getting node RPC client: %s", err)
  481. }
  482. rpcClient2, err := client.RPCClient(ctx, node.ID)
  483. if err != nil {
  484. t.Fatalf("error getting node RPC client: %s", err)
  485. }
  486. // subscribe to events using client 1
  487. events := make(chan int64, 1)
  488. sub, err := rpcClient1.Subscribe(ctx, "test", events, "events")
  489. if err != nil {
  490. t.Fatalf("error subscribing to events: %s", err)
  491. }
  492. defer sub.Unsubscribe()
  493. // call some RPC methods using client 2
  494. if err := rpcClient2.CallContext(ctx, nil, "test_add", 10); err != nil {
  495. t.Fatalf("error calling RPC method: %s", err)
  496. }
  497. var result int64
  498. if err := rpcClient2.CallContext(ctx, &result, "test_get"); err != nil {
  499. t.Fatalf("error calling RPC method: %s", err)
  500. }
  501. if result != 10 {
  502. t.Fatalf("expected result to be 10, got %d", result)
  503. }
  504. // check we got an event from client 1
  505. select {
  506. case event := <-events:
  507. if event != 10 {
  508. t.Fatalf("expected event to be 10, got %d", event)
  509. }
  510. case <-ctx.Done():
  511. t.Fatal(ctx.Err())
  512. }
  513. }
  514. // TestHTTPSnapshot tests creating and loading network snapshots
  515. func TestHTTPSnapshot(t *testing.T) {
  516. // start the server
  517. network, s := testHTTPServer(t)
  518. defer s.Close()
  519. var eventsDone = make(chan struct{})
  520. count := 1
  521. eventsDoneChan := make(chan *Event)
  522. eventSub := network.Events().Subscribe(eventsDoneChan)
  523. go func() {
  524. defer eventSub.Unsubscribe()
  525. for event := range eventsDoneChan {
  526. if event.Type == EventTypeConn && !event.Control {
  527. count--
  528. if count == 0 {
  529. eventsDone <- struct{}{}
  530. return
  531. }
  532. }
  533. }
  534. }()
  535. // create a two-node network
  536. client := NewClient(s.URL)
  537. nodeCount := 2
  538. nodes := make([]*p2p.NodeInfo, nodeCount)
  539. for i := 0; i < nodeCount; i++ {
  540. config := adapters.RandomNodeConfig()
  541. node, err := client.CreateNode(config)
  542. if err != nil {
  543. t.Fatalf("error creating node: %s", err)
  544. }
  545. if err := client.StartNode(node.ID); err != nil {
  546. t.Fatalf("error starting node: %s", err)
  547. }
  548. nodes[i] = node
  549. }
  550. if err := client.ConnectNode(nodes[0].ID, nodes[1].ID); err != nil {
  551. t.Fatalf("error connecting nodes: %s", err)
  552. }
  553. // store some state in the test services
  554. states := make([]string, nodeCount)
  555. for i, node := range nodes {
  556. rpc, err := client.RPCClient(context.Background(), node.ID)
  557. if err != nil {
  558. t.Fatalf("error getting RPC client: %s", err)
  559. }
  560. defer rpc.Close()
  561. state := fmt.Sprintf("%x", rand.Int())
  562. if err := rpc.Call(nil, "test_setState", []byte(state)); err != nil {
  563. t.Fatalf("error setting service state: %s", err)
  564. }
  565. states[i] = state
  566. }
  567. <-eventsDone
  568. // create a snapshot
  569. snap, err := client.CreateSnapshot()
  570. if err != nil {
  571. t.Fatalf("error creating snapshot: %s", err)
  572. }
  573. for i, state := range states {
  574. gotState := snap.Nodes[i].Snapshots["test"]
  575. if string(gotState) != state {
  576. t.Fatalf("expected snapshot state %q, got %q", state, gotState)
  577. }
  578. }
  579. // create another network
  580. network2, s := testHTTPServer(t)
  581. defer s.Close()
  582. client = NewClient(s.URL)
  583. count = 1
  584. eventSub = network2.Events().Subscribe(eventsDoneChan)
  585. go func() {
  586. defer eventSub.Unsubscribe()
  587. for event := range eventsDoneChan {
  588. if event.Type == EventTypeConn && !event.Control {
  589. count--
  590. if count == 0 {
  591. eventsDone <- struct{}{}
  592. return
  593. }
  594. }
  595. }
  596. }()
  597. // subscribe to events so we can check them later
  598. events := make(chan *Event, 100)
  599. var opts SubscribeOpts
  600. sub, err := client.SubscribeNetwork(events, opts)
  601. if err != nil {
  602. t.Fatalf("error subscribing to network events: %s", err)
  603. }
  604. defer sub.Unsubscribe()
  605. // load the snapshot
  606. if err := client.LoadSnapshot(snap); err != nil {
  607. t.Fatalf("error loading snapshot: %s", err)
  608. }
  609. <-eventsDone
  610. // check the nodes and connection exists
  611. net, err := client.GetNetwork()
  612. if err != nil {
  613. t.Fatalf("error getting network: %s", err)
  614. }
  615. if len(net.Nodes) != nodeCount {
  616. t.Fatalf("expected network to have %d nodes, got %d", nodeCount, len(net.Nodes))
  617. }
  618. for i, node := range nodes {
  619. id := net.Nodes[i].ID().String()
  620. if id != node.ID {
  621. t.Fatalf("expected node %d to have ID %s, got %s", i, node.ID, id)
  622. }
  623. }
  624. if len(net.Conns) != 1 {
  625. t.Fatalf("expected network to have 1 connection, got %d", len(net.Conns))
  626. }
  627. conn := net.Conns[0]
  628. if conn.One.String() != nodes[0].ID {
  629. t.Fatalf("expected connection to have one=%q, got one=%q", nodes[0].ID, conn.One)
  630. }
  631. if conn.Other.String() != nodes[1].ID {
  632. t.Fatalf("expected connection to have other=%q, got other=%q", nodes[1].ID, conn.Other)
  633. }
  634. if !conn.Up {
  635. t.Fatal("should be up")
  636. }
  637. // check the node states were restored
  638. for i, node := range nodes {
  639. rpc, err := client.RPCClient(context.Background(), node.ID)
  640. if err != nil {
  641. t.Fatalf("error getting RPC client: %s", err)
  642. }
  643. defer rpc.Close()
  644. var state []byte
  645. if err := rpc.Call(&state, "test_getState"); err != nil {
  646. t.Fatalf("error getting service state: %s", err)
  647. }
  648. if string(state) != states[i] {
  649. t.Fatalf("expected snapshot state %q, got %q", states[i], state)
  650. }
  651. }
  652. // check we got all the events
  653. x := &expectEvents{t, events, sub}
  654. x.expect(
  655. x.nodeEvent(nodes[0].ID, false),
  656. x.nodeEvent(nodes[0].ID, true),
  657. x.nodeEvent(nodes[1].ID, false),
  658. x.nodeEvent(nodes[1].ID, true),
  659. x.connEvent(nodes[0].ID, nodes[1].ID, false),
  660. x.connEvent(nodes[0].ID, nodes[1].ID, true),
  661. )
  662. }
  663. // TestMsgFilterPassMultiple tests streaming message events using a filter
  664. // with multiple protocols
  665. func TestMsgFilterPassMultiple(t *testing.T) {
  666. // start the server
  667. _, s := testHTTPServer(t)
  668. defer s.Close()
  669. // subscribe to events with a message filter
  670. client := NewClient(s.URL)
  671. events := make(chan *Event, 10)
  672. opts := SubscribeOpts{
  673. Filter: "prb:0-test:0",
  674. }
  675. sub, err := client.SubscribeNetwork(events, opts)
  676. if err != nil {
  677. t.Fatalf("error subscribing to network events: %s", err)
  678. }
  679. defer sub.Unsubscribe()
  680. // start a simulation network
  681. startTestNetwork(t, client)
  682. // check we got the expected events
  683. x := &expectEvents{t, events, sub}
  684. x.expectMsgs(map[MsgFilter]int{
  685. {"test", 0}: 2,
  686. {"prb", 0}: 2,
  687. })
  688. }
  689. // TestMsgFilterPassWildcard tests streaming message events using a filter
  690. // with a code wildcard
  691. func TestMsgFilterPassWildcard(t *testing.T) {
  692. // start the server
  693. _, s := testHTTPServer(t)
  694. defer s.Close()
  695. // subscribe to events with a message filter
  696. client := NewClient(s.URL)
  697. events := make(chan *Event, 10)
  698. opts := SubscribeOpts{
  699. Filter: "prb:0,2-test:*",
  700. }
  701. sub, err := client.SubscribeNetwork(events, opts)
  702. if err != nil {
  703. t.Fatalf("error subscribing to network events: %s", err)
  704. }
  705. defer sub.Unsubscribe()
  706. // start a simulation network
  707. startTestNetwork(t, client)
  708. // check we got the expected events
  709. x := &expectEvents{t, events, sub}
  710. x.expectMsgs(map[MsgFilter]int{
  711. {"test", 2}: 2,
  712. {"test", 1}: 2,
  713. {"test", 0}: 2,
  714. {"prb", 0}: 2,
  715. })
  716. }
  717. // TestMsgFilterPassSingle tests streaming message events using a filter
  718. // with a single protocol and code
  719. func TestMsgFilterPassSingle(t *testing.T) {
  720. // start the server
  721. _, s := testHTTPServer(t)
  722. defer s.Close()
  723. // subscribe to events with a message filter
  724. client := NewClient(s.URL)
  725. events := make(chan *Event, 10)
  726. opts := SubscribeOpts{
  727. Filter: "dum:0",
  728. }
  729. sub, err := client.SubscribeNetwork(events, opts)
  730. if err != nil {
  731. t.Fatalf("error subscribing to network events: %s", err)
  732. }
  733. defer sub.Unsubscribe()
  734. // start a simulation network
  735. startTestNetwork(t, client)
  736. // check we got the expected events
  737. x := &expectEvents{t, events, sub}
  738. x.expectMsgs(map[MsgFilter]int{
  739. {"dum", 0}: 2,
  740. })
  741. }
  742. // TestMsgFilterPassSingle tests streaming message events using an invalid
  743. // filter
  744. func TestMsgFilterFailBadParams(t *testing.T) {
  745. // start the server
  746. _, s := testHTTPServer(t)
  747. defer s.Close()
  748. client := NewClient(s.URL)
  749. events := make(chan *Event, 10)
  750. opts := SubscribeOpts{
  751. Filter: "foo:",
  752. }
  753. _, err := client.SubscribeNetwork(events, opts)
  754. if err == nil {
  755. t.Fatalf("expected event subscription to fail but succeeded!")
  756. }
  757. opts.Filter = "bzz:aa"
  758. _, err = client.SubscribeNetwork(events, opts)
  759. if err == nil {
  760. t.Fatalf("expected event subscription to fail but succeeded!")
  761. }
  762. opts.Filter = "invalid"
  763. _, err = client.SubscribeNetwork(events, opts)
  764. if err == nil {
  765. t.Fatalf("expected event subscription to fail but succeeded!")
  766. }
  767. }