network.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package simulations
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math/rand"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/event"
  28. "github.com/ethereum/go-ethereum/log"
  29. "github.com/ethereum/go-ethereum/p2p"
  30. "github.com/ethereum/go-ethereum/p2p/enode"
  31. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  32. )
  // DialBanTimeout is the minimum time that has to pass after a connection
  // between two nodes was initiated before initConn accepts another attempt
  // for the same pair.
  var DialBanTimeout = time.Millisecond * 200
  // NetworkConfig holds the options a Network is created with.
  type NetworkConfig struct {
  	// ID is encoded under the JSON key "id".
  	ID string `json:"id"`
  	// DefaultService is the service assigned to a new node whose config
  	// lists no lifecycles (see NewNodeWithConfig).
  	DefaultService string `json:"default_service,omitempty"`
  }
  39. // Network models a p2p simulation network which consists of a collection of
  40. // simulated nodes and the connections which exist between them.
  41. //
  42. // The Network has a single NodeAdapter which is responsible for actually
  43. // starting nodes and connecting them together.
  44. //
  45. // The Network emits events when nodes are started and stopped, when they are
  46. // connected and disconnected, and also when messages are sent between nodes.
  47. type Network struct {
  48. NetworkConfig
  49. Nodes []*Node `json:"nodes"`
  50. nodeMap map[enode.ID]int
  51. // Maps a node property string to node indexes of all nodes that hold this property
  52. propertyMap map[string][]int
  53. Conns []*Conn `json:"conns"`
  54. connMap map[string]int
  55. nodeAdapter adapters.NodeAdapter
  56. events event.Feed
  57. lock sync.RWMutex
  58. quitc chan struct{}
  59. }
  60. // NewNetwork returns a Network which uses the given NodeAdapter and NetworkConfig
  61. func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network {
  62. return &Network{
  63. NetworkConfig: *conf,
  64. nodeAdapter: nodeAdapter,
  65. nodeMap: make(map[enode.ID]int),
  66. propertyMap: make(map[string][]int),
  67. connMap: make(map[string]int),
  68. quitc: make(chan struct{}),
  69. }
  70. }
  71. // Events returns the output event feed of the Network.
  72. func (net *Network) Events() *event.Feed {
  73. return &net.events
  74. }
  75. // NewNodeWithConfig adds a new node to the network with the given config,
  76. // returning an error if a node with the same ID or name already exists
  77. func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) {
  78. net.lock.Lock()
  79. defer net.lock.Unlock()
  80. if conf.Reachable == nil {
  81. conf.Reachable = func(otherID enode.ID) bool {
  82. _, err := net.InitConn(conf.ID, otherID)
  83. if err != nil && bytes.Compare(conf.ID.Bytes(), otherID.Bytes()) < 0 {
  84. return false
  85. }
  86. return true
  87. }
  88. }
  89. // check the node doesn't already exist
  90. if node := net.getNode(conf.ID); node != nil {
  91. return nil, fmt.Errorf("node with ID %q already exists", conf.ID)
  92. }
  93. if node := net.getNodeByName(conf.Name); node != nil {
  94. return nil, fmt.Errorf("node with name %q already exists", conf.Name)
  95. }
  96. // if no services are configured, use the default service
  97. if len(conf.Lifecycles) == 0 {
  98. conf.Lifecycles = []string{net.DefaultService}
  99. }
  100. // use the NodeAdapter to create the node
  101. adapterNode, err := net.nodeAdapter.NewNode(conf)
  102. if err != nil {
  103. return nil, err
  104. }
  105. node := newNode(adapterNode, conf, false)
  106. log.Trace("Node created", "id", conf.ID)
  107. nodeIndex := len(net.Nodes)
  108. net.nodeMap[conf.ID] = nodeIndex
  109. net.Nodes = append(net.Nodes, node)
  110. // Register any node properties with the network-level propertyMap
  111. for _, property := range conf.Properties {
  112. net.propertyMap[property] = append(net.propertyMap[property], nodeIndex)
  113. }
  114. // emit a "control" event
  115. net.events.Send(ControlEvent(node))
  116. return node, nil
  117. }
  118. // Config returns the network configuration
  119. func (net *Network) Config() *NetworkConfig {
  120. return &net.NetworkConfig
  121. }
  122. // StartAll starts all nodes in the network
  123. func (net *Network) StartAll() error {
  124. for _, node := range net.Nodes {
  125. if node.Up() {
  126. continue
  127. }
  128. if err := net.Start(node.ID()); err != nil {
  129. return err
  130. }
  131. }
  132. return nil
  133. }
  134. // StopAll stops all nodes in the network
  135. func (net *Network) StopAll() error {
  136. for _, node := range net.Nodes {
  137. if !node.Up() {
  138. continue
  139. }
  140. if err := net.Stop(node.ID()); err != nil {
  141. return err
  142. }
  143. }
  144. return nil
  145. }
  146. // Start starts the node with the given ID
  147. func (net *Network) Start(id enode.ID) error {
  148. return net.startWithSnapshots(id, nil)
  149. }
  150. // startWithSnapshots starts the node with the given ID using the give
  151. // snapshots
  152. func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error {
  153. net.lock.Lock()
  154. defer net.lock.Unlock()
  155. node := net.getNode(id)
  156. if node == nil {
  157. return fmt.Errorf("node %v does not exist", id)
  158. }
  159. if node.Up() {
  160. return fmt.Errorf("node %v already up", id)
  161. }
  162. log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name())
  163. if err := node.Start(snapshots); err != nil {
  164. log.Warn("Node startup failed", "id", id, "err", err)
  165. return err
  166. }
  167. node.SetUp(true)
  168. log.Info("Started node", "id", id)
  169. ev := NewEvent(node)
  170. net.events.Send(ev)
  171. // subscribe to peer events
  172. client, err := node.Client()
  173. if err != nil {
  174. return fmt.Errorf("error getting rpc client for node %v: %s", id, err)
  175. }
  176. events := make(chan *p2p.PeerEvent)
  177. sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
  178. if err != nil {
  179. return fmt.Errorf("error getting peer events for node %v: %s", id, err)
  180. }
  181. go net.watchPeerEvents(id, events, sub)
  182. return nil
  183. }
  184. // watchPeerEvents reads peer events from the given channel and emits
  185. // corresponding network events
  186. func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub event.Subscription) {
  187. defer func() {
  188. sub.Unsubscribe()
  189. // assume the node is now down
  190. net.lock.Lock()
  191. defer net.lock.Unlock()
  192. node := net.getNode(id)
  193. if node == nil {
  194. return
  195. }
  196. node.SetUp(false)
  197. ev := NewEvent(node)
  198. net.events.Send(ev)
  199. }()
  200. for {
  201. select {
  202. case event, ok := <-events:
  203. if !ok {
  204. return
  205. }
  206. peer := event.Peer
  207. switch event.Type {
  208. case p2p.PeerEventTypeAdd:
  209. net.DidConnect(id, peer)
  210. case p2p.PeerEventTypeDrop:
  211. net.DidDisconnect(id, peer)
  212. case p2p.PeerEventTypeMsgSend:
  213. net.DidSend(id, peer, event.Protocol, *event.MsgCode)
  214. case p2p.PeerEventTypeMsgRecv:
  215. net.DidReceive(peer, id, event.Protocol, *event.MsgCode)
  216. }
  217. case err := <-sub.Err():
  218. if err != nil {
  219. log.Error("Error in peer event subscription", "id", id, "err", err)
  220. }
  221. return
  222. }
  223. }
  224. }
  225. // Stop stops the node with the given ID
  226. func (net *Network) Stop(id enode.ID) error {
  227. // IMPORTANT: node.Stop() must NOT be called under net.lock as
  228. // node.Reachable() closure has a reference to the network and
  229. // calls net.InitConn() what also locks the network. => DEADLOCK
  230. // That holds until the following ticket is not resolved:
  231. var err error
  232. node, err := func() (*Node, error) {
  233. net.lock.Lock()
  234. defer net.lock.Unlock()
  235. node := net.getNode(id)
  236. if node == nil {
  237. return nil, fmt.Errorf("node %v does not exist", id)
  238. }
  239. if !node.Up() {
  240. return nil, fmt.Errorf("node %v already down", id)
  241. }
  242. node.SetUp(false)
  243. return node, nil
  244. }()
  245. if err != nil {
  246. return err
  247. }
  248. err = node.Stop() // must be called without net.lock
  249. net.lock.Lock()
  250. defer net.lock.Unlock()
  251. if err != nil {
  252. node.SetUp(true)
  253. return err
  254. }
  255. log.Info("Stopped node", "id", id, "err", err)
  256. ev := ControlEvent(node)
  257. net.events.Send(ev)
  258. return nil
  259. }
  260. // Connect connects two nodes together by calling the "admin_addPeer" RPC
  261. // method on the "one" node so that it connects to the "other" node
  262. func (net *Network) Connect(oneID, otherID enode.ID) error {
  263. net.lock.Lock()
  264. defer net.lock.Unlock()
  265. return net.connect(oneID, otherID)
  266. }
  267. func (net *Network) connect(oneID, otherID enode.ID) error {
  268. log.Debug("Connecting nodes with addPeer", "id", oneID, "other", otherID)
  269. conn, err := net.initConn(oneID, otherID)
  270. if err != nil {
  271. return err
  272. }
  273. client, err := conn.one.Client()
  274. if err != nil {
  275. return err
  276. }
  277. net.events.Send(ControlEvent(conn))
  278. return client.Call(nil, "admin_addPeer", string(conn.other.Addr()))
  279. }
  280. // Disconnect disconnects two nodes by calling the "admin_removePeer" RPC
  281. // method on the "one" node so that it disconnects from the "other" node
  282. func (net *Network) Disconnect(oneID, otherID enode.ID) error {
  283. conn := net.GetConn(oneID, otherID)
  284. if conn == nil {
  285. return fmt.Errorf("connection between %v and %v does not exist", oneID, otherID)
  286. }
  287. if !conn.Up {
  288. return fmt.Errorf("%v and %v already disconnected", oneID, otherID)
  289. }
  290. client, err := conn.one.Client()
  291. if err != nil {
  292. return err
  293. }
  294. net.events.Send(ControlEvent(conn))
  295. return client.Call(nil, "admin_removePeer", string(conn.other.Addr()))
  296. }
  297. // DidConnect tracks the fact that the "one" node connected to the "other" node
  298. func (net *Network) DidConnect(one, other enode.ID) error {
  299. net.lock.Lock()
  300. defer net.lock.Unlock()
  301. conn, err := net.getOrCreateConn(one, other)
  302. if err != nil {
  303. return fmt.Errorf("connection between %v and %v does not exist", one, other)
  304. }
  305. if conn.Up {
  306. return fmt.Errorf("%v and %v already connected", one, other)
  307. }
  308. conn.Up = true
  309. net.events.Send(NewEvent(conn))
  310. return nil
  311. }
  312. // DidDisconnect tracks the fact that the "one" node disconnected from the
  313. // "other" node
  314. func (net *Network) DidDisconnect(one, other enode.ID) error {
  315. net.lock.Lock()
  316. defer net.lock.Unlock()
  317. conn := net.getConn(one, other)
  318. if conn == nil {
  319. return fmt.Errorf("connection between %v and %v does not exist", one, other)
  320. }
  321. if !conn.Up {
  322. return fmt.Errorf("%v and %v already disconnected", one, other)
  323. }
  324. conn.Up = false
  325. conn.initiated = time.Now().Add(-DialBanTimeout)
  326. net.events.Send(NewEvent(conn))
  327. return nil
  328. }
  329. // DidSend tracks the fact that "sender" sent a message to "receiver"
  330. func (net *Network) DidSend(sender, receiver enode.ID, proto string, code uint64) error {
  331. msg := &Msg{
  332. One: sender,
  333. Other: receiver,
  334. Protocol: proto,
  335. Code: code,
  336. Received: false,
  337. }
  338. net.events.Send(NewEvent(msg))
  339. return nil
  340. }
  341. // DidReceive tracks the fact that "receiver" received a message from "sender"
  342. func (net *Network) DidReceive(sender, receiver enode.ID, proto string, code uint64) error {
  343. msg := &Msg{
  344. One: sender,
  345. Other: receiver,
  346. Protocol: proto,
  347. Code: code,
  348. Received: true,
  349. }
  350. net.events.Send(NewEvent(msg))
  351. return nil
  352. }
  353. // GetNode gets the node with the given ID, returning nil if the node does not
  354. // exist
  355. func (net *Network) GetNode(id enode.ID) *Node {
  356. net.lock.RLock()
  357. defer net.lock.RUnlock()
  358. return net.getNode(id)
  359. }
  360. func (net *Network) getNode(id enode.ID) *Node {
  361. i, found := net.nodeMap[id]
  362. if !found {
  363. return nil
  364. }
  365. return net.Nodes[i]
  366. }
  367. // GetNodeByName gets the node with the given name, returning nil if the node does
  368. // not exist
  369. func (net *Network) GetNodeByName(name string) *Node {
  370. net.lock.RLock()
  371. defer net.lock.RUnlock()
  372. return net.getNodeByName(name)
  373. }
  374. func (net *Network) getNodeByName(name string) *Node {
  375. for _, node := range net.Nodes {
  376. if node.Config.Name == name {
  377. return node
  378. }
  379. }
  380. return nil
  381. }
  382. // GetNodeIDs returns the IDs of all existing nodes
  383. // Nodes can optionally be excluded by specifying their enode.ID.
  384. func (net *Network) GetNodeIDs(excludeIDs ...enode.ID) []enode.ID {
  385. net.lock.RLock()
  386. defer net.lock.RUnlock()
  387. return net.getNodeIDs(excludeIDs)
  388. }
  389. func (net *Network) getNodeIDs(excludeIDs []enode.ID) []enode.ID {
  390. // Get all current nodeIDs
  391. nodeIDs := make([]enode.ID, 0, len(net.nodeMap))
  392. for id := range net.nodeMap {
  393. nodeIDs = append(nodeIDs, id)
  394. }
  395. if len(excludeIDs) > 0 {
  396. // Return the difference of nodeIDs and excludeIDs
  397. return filterIDs(nodeIDs, excludeIDs)
  398. }
  399. return nodeIDs
  400. }
  401. // GetNodes returns the existing nodes.
  402. // Nodes can optionally be excluded by specifying their enode.ID.
  403. func (net *Network) GetNodes(excludeIDs ...enode.ID) []*Node {
  404. net.lock.RLock()
  405. defer net.lock.RUnlock()
  406. return net.getNodes(excludeIDs)
  407. }
  408. func (net *Network) getNodes(excludeIDs []enode.ID) []*Node {
  409. if len(excludeIDs) > 0 {
  410. nodeIDs := net.getNodeIDs(excludeIDs)
  411. return net.getNodesByID(nodeIDs)
  412. }
  413. return net.Nodes
  414. }
  415. // GetNodesByID returns existing nodes with the given enode.IDs.
  416. // If a node doesn't exist with a given enode.ID, it is ignored.
  417. func (net *Network) GetNodesByID(nodeIDs []enode.ID) []*Node {
  418. net.lock.RLock()
  419. defer net.lock.RUnlock()
  420. return net.getNodesByID(nodeIDs)
  421. }
  422. func (net *Network) getNodesByID(nodeIDs []enode.ID) []*Node {
  423. nodes := make([]*Node, 0, len(nodeIDs))
  424. for _, id := range nodeIDs {
  425. node := net.getNode(id)
  426. if node != nil {
  427. nodes = append(nodes, node)
  428. }
  429. }
  430. return nodes
  431. }
  432. // GetNodesByProperty returns existing nodes that have the given property string registered in their NodeConfig
  433. func (net *Network) GetNodesByProperty(property string) []*Node {
  434. net.lock.RLock()
  435. defer net.lock.RUnlock()
  436. return net.getNodesByProperty(property)
  437. }
  438. func (net *Network) getNodesByProperty(property string) []*Node {
  439. nodes := make([]*Node, 0, len(net.propertyMap[property]))
  440. for _, nodeIndex := range net.propertyMap[property] {
  441. nodes = append(nodes, net.Nodes[nodeIndex])
  442. }
  443. return nodes
  444. }
  445. // GetNodeIDsByProperty returns existing node's enode IDs that have the given property string registered in the NodeConfig
  446. func (net *Network) GetNodeIDsByProperty(property string) []enode.ID {
  447. net.lock.RLock()
  448. defer net.lock.RUnlock()
  449. return net.getNodeIDsByProperty(property)
  450. }
  451. func (net *Network) getNodeIDsByProperty(property string) []enode.ID {
  452. nodeIDs := make([]enode.ID, 0, len(net.propertyMap[property]))
  453. for _, nodeIndex := range net.propertyMap[property] {
  454. node := net.Nodes[nodeIndex]
  455. nodeIDs = append(nodeIDs, node.ID())
  456. }
  457. return nodeIDs
  458. }
  459. // GetRandomUpNode returns a random node on the network, which is running.
  460. func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node {
  461. net.lock.RLock()
  462. defer net.lock.RUnlock()
  463. return net.getRandomUpNode(excludeIDs...)
  464. }
  465. // GetRandomUpNode returns a random node on the network, which is running.
  466. func (net *Network) getRandomUpNode(excludeIDs ...enode.ID) *Node {
  467. return net.getRandomNode(net.getUpNodeIDs(), excludeIDs)
  468. }
  469. func (net *Network) getUpNodeIDs() (ids []enode.ID) {
  470. for _, node := range net.Nodes {
  471. if node.Up() {
  472. ids = append(ids, node.ID())
  473. }
  474. }
  475. return ids
  476. }
  477. // GetRandomDownNode returns a random node on the network, which is stopped.
  478. func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node {
  479. net.lock.RLock()
  480. defer net.lock.RUnlock()
  481. return net.getRandomNode(net.getDownNodeIDs(), excludeIDs)
  482. }
  483. func (net *Network) getDownNodeIDs() (ids []enode.ID) {
  484. for _, node := range net.Nodes {
  485. if !node.Up() {
  486. ids = append(ids, node.ID())
  487. }
  488. }
  489. return ids
  490. }
  491. // GetRandomNode returns a random node on the network, regardless of whether it is running or not
  492. func (net *Network) GetRandomNode(excludeIDs ...enode.ID) *Node {
  493. net.lock.RLock()
  494. defer net.lock.RUnlock()
  495. return net.getRandomNode(net.getNodeIDs(nil), excludeIDs) // no need to exclude twice
  496. }
  497. func (net *Network) getRandomNode(ids []enode.ID, excludeIDs []enode.ID) *Node {
  498. filtered := filterIDs(ids, excludeIDs)
  499. l := len(filtered)
  500. if l == 0 {
  501. return nil
  502. }
  503. return net.getNode(filtered[rand.Intn(l)])
  504. }
  505. func filterIDs(ids []enode.ID, excludeIDs []enode.ID) []enode.ID {
  506. exclude := make(map[enode.ID]bool)
  507. for _, id := range excludeIDs {
  508. exclude[id] = true
  509. }
  510. var filtered []enode.ID
  511. for _, id := range ids {
  512. if _, found := exclude[id]; !found {
  513. filtered = append(filtered, id)
  514. }
  515. }
  516. return filtered
  517. }
  518. // GetConn returns the connection which exists between "one" and "other"
  519. // regardless of which node initiated the connection
  520. func (net *Network) GetConn(oneID, otherID enode.ID) *Conn {
  521. net.lock.RLock()
  522. defer net.lock.RUnlock()
  523. return net.getConn(oneID, otherID)
  524. }
  525. // GetOrCreateConn is like GetConn but creates the connection if it doesn't
  526. // already exist
  527. func (net *Network) GetOrCreateConn(oneID, otherID enode.ID) (*Conn, error) {
  528. net.lock.Lock()
  529. defer net.lock.Unlock()
  530. return net.getOrCreateConn(oneID, otherID)
  531. }
  532. func (net *Network) getOrCreateConn(oneID, otherID enode.ID) (*Conn, error) {
  533. if conn := net.getConn(oneID, otherID); conn != nil {
  534. return conn, nil
  535. }
  536. one := net.getNode(oneID)
  537. if one == nil {
  538. return nil, fmt.Errorf("node %v does not exist", oneID)
  539. }
  540. other := net.getNode(otherID)
  541. if other == nil {
  542. return nil, fmt.Errorf("node %v does not exist", otherID)
  543. }
  544. conn := &Conn{
  545. One: oneID,
  546. Other: otherID,
  547. one: one,
  548. other: other,
  549. }
  550. label := ConnLabel(oneID, otherID)
  551. net.connMap[label] = len(net.Conns)
  552. net.Conns = append(net.Conns, conn)
  553. return conn, nil
  554. }
  555. func (net *Network) getConn(oneID, otherID enode.ID) *Conn {
  556. label := ConnLabel(oneID, otherID)
  557. i, found := net.connMap[label]
  558. if !found {
  559. return nil
  560. }
  561. return net.Conns[i]
  562. }
  563. // InitConn(one, other) retrieves the connection model for the connection between
  564. // peers one and other, or creates a new one if it does not exist
  565. // the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i)
  566. // it checks if the connection is already up, and if the nodes are running
  567. // NOTE:
  568. // it also checks whether there has been recent attempt to connect the peers
  569. // this is cheating as the simulation is used as an oracle and know about
  570. // remote peers attempt to connect to a node which will then not initiate the connection
  571. func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) {
  572. net.lock.Lock()
  573. defer net.lock.Unlock()
  574. return net.initConn(oneID, otherID)
  575. }
  576. func (net *Network) initConn(oneID, otherID enode.ID) (*Conn, error) {
  577. if oneID == otherID {
  578. return nil, fmt.Errorf("refusing to connect to self %v", oneID)
  579. }
  580. conn, err := net.getOrCreateConn(oneID, otherID)
  581. if err != nil {
  582. return nil, err
  583. }
  584. if conn.Up {
  585. return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
  586. }
  587. if time.Since(conn.initiated) < DialBanTimeout {
  588. return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
  589. }
  590. err = conn.nodesUp()
  591. if err != nil {
  592. log.Trace("Nodes not up", "err", err)
  593. return nil, fmt.Errorf("nodes not up: %v", err)
  594. }
  595. log.Debug("Connection initiated", "id", oneID, "other", otherID)
  596. conn.initiated = time.Now()
  597. return conn, nil
  598. }
  599. // Shutdown stops all nodes in the network and closes the quit channel
  600. func (net *Network) Shutdown() {
  601. for _, node := range net.Nodes {
  602. log.Debug("Stopping node", "id", node.ID())
  603. if err := node.Stop(); err != nil {
  604. log.Warn("Can't stop node", "id", node.ID(), "err", err)
  605. }
  606. // If the node has the close method, call it.
  607. if closer, ok := node.Node.(io.Closer); ok {
  608. if err := closer.Close(); err != nil {
  609. log.Warn("Can't close node", "id", node.ID(), "err", err)
  610. }
  611. }
  612. }
  613. close(net.quitc)
  614. }
  615. // Reset resets all network properties:
  616. // empties the nodes and the connection list
  617. func (net *Network) Reset() {
  618. net.lock.Lock()
  619. defer net.lock.Unlock()
  620. //re-initialize the maps
  621. net.connMap = make(map[string]int)
  622. net.nodeMap = make(map[enode.ID]int)
  623. net.propertyMap = make(map[string][]int)
  624. net.Nodes = nil
  625. net.Conns = nil
  626. }
  627. // Node is a wrapper around adapters.Node which is used to track the status
  628. // of a node in the network
  629. type Node struct {
  630. adapters.Node `json:"-"`
  631. // Config if the config used to created the node
  632. Config *adapters.NodeConfig `json:"config"`
  633. // up tracks whether or not the node is running
  634. up bool
  635. upMu *sync.RWMutex
  636. }
  637. func newNode(an adapters.Node, ac *adapters.NodeConfig, up bool) *Node {
  638. return &Node{Node: an, Config: ac, up: up, upMu: new(sync.RWMutex)}
  639. }
  640. func (n *Node) copy() *Node {
  641. configCpy := *n.Config
  642. return newNode(n.Node, &configCpy, n.Up())
  643. }
  644. // Up returns whether the node is currently up (online)
  645. func (n *Node) Up() bool {
  646. n.upMu.RLock()
  647. defer n.upMu.RUnlock()
  648. return n.up
  649. }
  650. // SetUp sets the up (online) status of the nodes with the given value
  651. func (n *Node) SetUp(up bool) {
  652. n.upMu.Lock()
  653. defer n.upMu.Unlock()
  654. n.up = up
  655. }
  656. // ID returns the ID of the node
  657. func (n *Node) ID() enode.ID {
  658. return n.Config.ID
  659. }
  660. // String returns a log-friendly string
  661. func (n *Node) String() string {
  662. return fmt.Sprintf("Node %v", n.ID().TerminalString())
  663. }
  664. // NodeInfo returns information about the node
  665. func (n *Node) NodeInfo() *p2p.NodeInfo {
  666. // avoid a panic if the node is not started yet
  667. if n.Node == nil {
  668. return nil
  669. }
  670. info := n.Node.NodeInfo()
  671. info.Name = n.Config.Name
  672. return info
  673. }
  674. // MarshalJSON implements the json.Marshaler interface so that the encoded
  675. // JSON includes the NodeInfo
  676. func (n *Node) MarshalJSON() ([]byte, error) {
  677. return json.Marshal(struct {
  678. Info *p2p.NodeInfo `json:"info,omitempty"`
  679. Config *adapters.NodeConfig `json:"config,omitempty"`
  680. Up bool `json:"up"`
  681. }{
  682. Info: n.NodeInfo(),
  683. Config: n.Config,
  684. Up: n.Up(),
  685. })
  686. }
  687. // UnmarshalJSON implements json.Unmarshaler interface so that we don't lose Node.up
  688. // status. IMPORTANT: The implementation is incomplete; we lose p2p.NodeInfo.
  689. func (n *Node) UnmarshalJSON(raw []byte) error {
  690. // TODO: How should we turn back NodeInfo into n.Node?
  691. // Ticket: https://github.com/ethersphere/go-ethereum/issues/1177
  692. var node struct {
  693. Config *adapters.NodeConfig `json:"config,omitempty"`
  694. Up bool `json:"up"`
  695. }
  696. if err := json.Unmarshal(raw, &node); err != nil {
  697. return err
  698. }
  699. *n = *newNode(nil, node.Config, node.Up)
  700. return nil
  701. }
  702. // Conn represents a connection between two nodes in the network
  703. type Conn struct {
  704. // One is the node which initiated the connection
  705. One enode.ID `json:"one"`
  706. // Other is the node which the connection was made to
  707. Other enode.ID `json:"other"`
  708. // Up tracks whether or not the connection is active
  709. Up bool `json:"up"`
  710. // Registers when the connection was grabbed to dial
  711. initiated time.Time
  712. one *Node
  713. other *Node
  714. }
  715. // nodesUp returns whether both nodes are currently up
  716. func (c *Conn) nodesUp() error {
  717. if !c.one.Up() {
  718. return fmt.Errorf("one %v is not up", c.One)
  719. }
  720. if !c.other.Up() {
  721. return fmt.Errorf("other %v is not up", c.Other)
  722. }
  723. return nil
  724. }
  725. // String returns a log-friendly string
  726. func (c *Conn) String() string {
  727. return fmt.Sprintf("Conn %v->%v", c.One.TerminalString(), c.Other.TerminalString())
  728. }
  729. // Msg represents a p2p message sent between two nodes in the network
  730. type Msg struct {
  731. One enode.ID `json:"one"`
  732. Other enode.ID `json:"other"`
  733. Protocol string `json:"protocol"`
  734. Code uint64 `json:"code"`
  735. Received bool `json:"received"`
  736. }
  737. // String returns a log-friendly string
  738. func (m *Msg) String() string {
  739. return fmt.Sprintf("Msg(%d) %v->%v", m.Code, m.One.TerminalString(), m.Other.TerminalString())
  740. }
  741. // ConnLabel generates a deterministic string which represents a connection
  742. // between two nodes, used to compare if two connections are between the same
  743. // nodes
  744. func ConnLabel(source, target enode.ID) string {
  745. var first, second enode.ID
  746. if bytes.Compare(source.Bytes(), target.Bytes()) > 0 {
  747. first = target
  748. second = source
  749. } else {
  750. first = source
  751. second = target
  752. }
  753. return fmt.Sprintf("%v-%v", first, second)
  754. }
  755. // Snapshot represents the state of a network at a single point in time and can
  756. // be used to restore the state of a network
  757. type Snapshot struct {
  758. Nodes []NodeSnapshot `json:"nodes,omitempty"`
  759. Conns []Conn `json:"conns,omitempty"`
  760. }
  761. // NodeSnapshot represents the state of a node in the network
  762. type NodeSnapshot struct {
  763. Node Node `json:"node,omitempty"`
  764. // Snapshots is arbitrary data gathered from calling node.Snapshots()
  765. Snapshots map[string][]byte `json:"snapshots,omitempty"`
  766. }
  767. // Snapshot creates a network snapshot
  768. func (net *Network) Snapshot() (*Snapshot, error) {
  769. return net.snapshot(nil, nil)
  770. }
  771. func (net *Network) SnapshotWithServices(addServices []string, removeServices []string) (*Snapshot, error) {
  772. return net.snapshot(addServices, removeServices)
  773. }
  774. func (net *Network) snapshot(addServices []string, removeServices []string) (*Snapshot, error) {
  775. net.lock.Lock()
  776. defer net.lock.Unlock()
  777. snap := &Snapshot{
  778. Nodes: make([]NodeSnapshot, len(net.Nodes)),
  779. }
  780. for i, node := range net.Nodes {
  781. snap.Nodes[i] = NodeSnapshot{Node: *node.copy()}
  782. if !node.Up() {
  783. continue
  784. }
  785. snapshots, err := node.Snapshots()
  786. if err != nil {
  787. return nil, err
  788. }
  789. snap.Nodes[i].Snapshots = snapshots
  790. for _, addSvc := range addServices {
  791. haveSvc := false
  792. for _, svc := range snap.Nodes[i].Node.Config.Lifecycles {
  793. if svc == addSvc {
  794. haveSvc = true
  795. break
  796. }
  797. }
  798. if !haveSvc {
  799. snap.Nodes[i].Node.Config.Lifecycles = append(snap.Nodes[i].Node.Config.Lifecycles, addSvc)
  800. }
  801. }
  802. if len(removeServices) > 0 {
  803. var cleanedServices []string
  804. for _, svc := range snap.Nodes[i].Node.Config.Lifecycles {
  805. haveSvc := false
  806. for _, rmSvc := range removeServices {
  807. if rmSvc == svc {
  808. haveSvc = true
  809. break
  810. }
  811. }
  812. if !haveSvc {
  813. cleanedServices = append(cleanedServices, svc)
  814. }
  815. }
  816. snap.Nodes[i].Node.Config.Lifecycles = cleanedServices
  817. }
  818. }
  819. for _, conn := range net.Conns {
  820. if conn.Up {
  821. snap.Conns = append(snap.Conns, *conn)
  822. }
  823. }
  824. return snap, nil
  825. }
  // snapshotLoadTimeout bounds how long Load waits for the snapshot's
  // connections to be established. Long-running tests may need a longer
  // timeout.
  var snapshotLoadTimeout = 15 * time.Minute
  828. // Load loads a network snapshot
  829. func (net *Network) Load(snap *Snapshot) error {
  830. // Start nodes.
  831. for _, n := range snap.Nodes {
  832. if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
  833. return err
  834. }
  835. if !n.Node.Up() {
  836. continue
  837. }
  838. if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
  839. return err
  840. }
  841. }
  842. // Prepare connection events counter.
  843. allConnected := make(chan struct{}) // closed when all connections are established
  844. done := make(chan struct{}) // ensures that the event loop goroutine is terminated
  845. defer close(done)
  846. // Subscribe to event channel.
  847. // It needs to be done outside of the event loop goroutine (created below)
  848. // to ensure that the event channel is blocking before connect calls are made.
  849. events := make(chan *Event)
  850. sub := net.Events().Subscribe(events)
  851. defer sub.Unsubscribe()
  852. go func() {
  853. // Expected number of connections.
  854. total := len(snap.Conns)
  855. // Set of all established connections from the snapshot, not other connections.
  856. // Key array element 0 is the connection One field value, and element 1 connection Other field.
  857. connections := make(map[[2]enode.ID]struct{}, total)
  858. for {
  859. select {
  860. case e := <-events:
  861. // Ignore control events as they do not represent
  862. // connect or disconnect (Up) state change.
  863. if e.Control {
  864. continue
  865. }
  866. // Detect only connection events.
  867. if e.Type != EventTypeConn {
  868. continue
  869. }
  870. connection := [2]enode.ID{e.Conn.One, e.Conn.Other}
  871. // Nodes are still not connected or have been disconnected.
  872. if !e.Conn.Up {
  873. // Delete the connection from the set of established connections.
  874. // This will prevent false positive in case disconnections happen.
  875. delete(connections, connection)
  876. log.Warn("load snapshot: unexpected disconnection", "one", e.Conn.One, "other", e.Conn.Other)
  877. continue
  878. }
  879. // Check that the connection is from the snapshot.
  880. for _, conn := range snap.Conns {
  881. if conn.One == e.Conn.One && conn.Other == e.Conn.Other {
  882. // Add the connection to the set of established connections.
  883. connections[connection] = struct{}{}
  884. if len(connections) == total {
  885. // Signal that all nodes are connected.
  886. close(allConnected)
  887. return
  888. }
  889. break
  890. }
  891. }
  892. case <-done:
  893. // Load function returned, terminate this goroutine.
  894. return
  895. }
  896. }
  897. }()
  898. // Start connecting.
  899. for _, conn := range snap.Conns {
  900. if !net.GetNode(conn.One).Up() || !net.GetNode(conn.Other).Up() {
  901. //in this case, at least one of the nodes of a connection is not up,
  902. //so it would result in the snapshot `Load` to fail
  903. continue
  904. }
  905. if err := net.Connect(conn.One, conn.Other); err != nil {
  906. return err
  907. }
  908. }
  909. select {
  910. // Wait until all connections from the snapshot are established.
  911. case <-allConnected:
  912. // Make sure that we do not wait forever.
  913. case <-time.After(snapshotLoadTimeout):
  914. return errors.New("snapshot connections not established")
  915. }
  916. return nil
  917. }
  918. // Subscribe reads control events from a channel and executes them
  919. func (net *Network) Subscribe(events chan *Event) {
  920. for {
  921. select {
  922. case event, ok := <-events:
  923. if !ok {
  924. return
  925. }
  926. if event.Control {
  927. net.executeControlEvent(event)
  928. }
  929. case <-net.quitc:
  930. return
  931. }
  932. }
  933. }
  934. func (net *Network) executeControlEvent(event *Event) {
  935. log.Trace("Executing control event", "type", event.Type, "event", event)
  936. switch event.Type {
  937. case EventTypeNode:
  938. if err := net.executeNodeEvent(event); err != nil {
  939. log.Error("Error executing node event", "event", event, "err", err)
  940. }
  941. case EventTypeConn:
  942. if err := net.executeConnEvent(event); err != nil {
  943. log.Error("Error executing conn event", "event", event, "err", err)
  944. }
  945. case EventTypeMsg:
  946. log.Warn("Ignoring control msg event")
  947. }
  948. }
  949. func (net *Network) executeNodeEvent(e *Event) error {
  950. if !e.Node.Up() {
  951. return net.Stop(e.Node.ID())
  952. }
  953. if _, err := net.NewNodeWithConfig(e.Node.Config); err != nil {
  954. return err
  955. }
  956. return net.Start(e.Node.ID())
  957. }
  958. func (net *Network) executeConnEvent(e *Event) error {
  959. if e.Conn.Up {
  960. return net.Connect(e.Conn.One, e.Conn.Other)
  961. }
  962. return net.Disconnect(e.Conn.One, e.Conn.Other)
  963. }