12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101 |
- // Copyright 2017 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package simulations
- import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "math/rand"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- )
// DialBanTimeout is the minimum time that has to elapse after a connection
// between two nodes was initiated before it may be initiated again.
var DialBanTimeout = time.Second / 5
// NetworkConfig holds the options a Network is created with.
type NetworkConfig struct {
	// ID identifies the network.
	ID string `json:"id"`
	// DefaultService is the service assigned to nodes whose configuration
	// does not list any services of its own.
	DefaultService string `json:"default_service,omitempty"`
}
- // Network models a p2p simulation network which consists of a collection of
- // simulated nodes and the connections which exist between them.
- //
- // The Network has a single NodeAdapter which is responsible for actually
- // starting nodes and connecting them together.
- //
- // The Network emits events when nodes are started and stopped, when they are
- // connected and disconnected, and also when messages are sent between nodes.
- type Network struct {
- NetworkConfig
- Nodes []*Node `json:"nodes"`
- nodeMap map[enode.ID]int
- // Maps a node property string to node indexes of all nodes that hold this property
- propertyMap map[string][]int
- Conns []*Conn `json:"conns"`
- connMap map[string]int
- nodeAdapter adapters.NodeAdapter
- events event.Feed
- lock sync.RWMutex
- quitc chan struct{}
- }
- // NewNetwork returns a Network which uses the given NodeAdapter and NetworkConfig
- func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network {
- return &Network{
- NetworkConfig: *conf,
- nodeAdapter: nodeAdapter,
- nodeMap: make(map[enode.ID]int),
- propertyMap: make(map[string][]int),
- connMap: make(map[string]int),
- quitc: make(chan struct{}),
- }
- }
- // Events returns the output event feed of the Network.
- func (net *Network) Events() *event.Feed {
- return &net.events
- }
- // NewNodeWithConfig adds a new node to the network with the given config,
- // returning an error if a node with the same ID or name already exists
- func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) {
- net.lock.Lock()
- defer net.lock.Unlock()
- if conf.Reachable == nil {
- conf.Reachable = func(otherID enode.ID) bool {
- _, err := net.InitConn(conf.ID, otherID)
- if err != nil && bytes.Compare(conf.ID.Bytes(), otherID.Bytes()) < 0 {
- return false
- }
- return true
- }
- }
- // check the node doesn't already exist
- if node := net.getNode(conf.ID); node != nil {
- return nil, fmt.Errorf("node with ID %q already exists", conf.ID)
- }
- if node := net.getNodeByName(conf.Name); node != nil {
- return nil, fmt.Errorf("node with name %q already exists", conf.Name)
- }
- // if no services are configured, use the default service
- if len(conf.Lifecycles) == 0 {
- conf.Lifecycles = []string{net.DefaultService}
- }
- // use the NodeAdapter to create the node
- adapterNode, err := net.nodeAdapter.NewNode(conf)
- if err != nil {
- return nil, err
- }
- node := newNode(adapterNode, conf, false)
- log.Trace("Node created", "id", conf.ID)
- nodeIndex := len(net.Nodes)
- net.nodeMap[conf.ID] = nodeIndex
- net.Nodes = append(net.Nodes, node)
- // Register any node properties with the network-level propertyMap
- for _, property := range conf.Properties {
- net.propertyMap[property] = append(net.propertyMap[property], nodeIndex)
- }
- // emit a "control" event
- net.events.Send(ControlEvent(node))
- return node, nil
- }
- // Config returns the network configuration
- func (net *Network) Config() *NetworkConfig {
- return &net.NetworkConfig
- }
- // StartAll starts all nodes in the network
- func (net *Network) StartAll() error {
- for _, node := range net.Nodes {
- if node.Up() {
- continue
- }
- if err := net.Start(node.ID()); err != nil {
- return err
- }
- }
- return nil
- }
- // StopAll stops all nodes in the network
- func (net *Network) StopAll() error {
- for _, node := range net.Nodes {
- if !node.Up() {
- continue
- }
- if err := net.Stop(node.ID()); err != nil {
- return err
- }
- }
- return nil
- }
- // Start starts the node with the given ID
- func (net *Network) Start(id enode.ID) error {
- return net.startWithSnapshots(id, nil)
- }
- // startWithSnapshots starts the node with the given ID using the give
- // snapshots
- func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error {
- net.lock.Lock()
- defer net.lock.Unlock()
- node := net.getNode(id)
- if node == nil {
- return fmt.Errorf("node %v does not exist", id)
- }
- if node.Up() {
- return fmt.Errorf("node %v already up", id)
- }
- log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name())
- if err := node.Start(snapshots); err != nil {
- log.Warn("Node startup failed", "id", id, "err", err)
- return err
- }
- node.SetUp(true)
- log.Info("Started node", "id", id)
- ev := NewEvent(node)
- net.events.Send(ev)
- // subscribe to peer events
- client, err := node.Client()
- if err != nil {
- return fmt.Errorf("error getting rpc client for node %v: %s", id, err)
- }
- events := make(chan *p2p.PeerEvent)
- sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
- if err != nil {
- return fmt.Errorf("error getting peer events for node %v: %s", id, err)
- }
- go net.watchPeerEvents(id, events, sub)
- return nil
- }
- // watchPeerEvents reads peer events from the given channel and emits
- // corresponding network events
- func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub event.Subscription) {
- defer func() {
- sub.Unsubscribe()
- // assume the node is now down
- net.lock.Lock()
- defer net.lock.Unlock()
- node := net.getNode(id)
- if node == nil {
- return
- }
- node.SetUp(false)
- ev := NewEvent(node)
- net.events.Send(ev)
- }()
- for {
- select {
- case event, ok := <-events:
- if !ok {
- return
- }
- peer := event.Peer
- switch event.Type {
- case p2p.PeerEventTypeAdd:
- net.DidConnect(id, peer)
- case p2p.PeerEventTypeDrop:
- net.DidDisconnect(id, peer)
- case p2p.PeerEventTypeMsgSend:
- net.DidSend(id, peer, event.Protocol, *event.MsgCode)
- case p2p.PeerEventTypeMsgRecv:
- net.DidReceive(peer, id, event.Protocol, *event.MsgCode)
- }
- case err := <-sub.Err():
- if err != nil {
- log.Error("Error in peer event subscription", "id", id, "err", err)
- }
- return
- }
- }
- }
- // Stop stops the node with the given ID
- func (net *Network) Stop(id enode.ID) error {
- // IMPORTANT: node.Stop() must NOT be called under net.lock as
- // node.Reachable() closure has a reference to the network and
- // calls net.InitConn() what also locks the network. => DEADLOCK
- // That holds until the following ticket is not resolved:
- var err error
- node, err := func() (*Node, error) {
- net.lock.Lock()
- defer net.lock.Unlock()
- node := net.getNode(id)
- if node == nil {
- return nil, fmt.Errorf("node %v does not exist", id)
- }
- if !node.Up() {
- return nil, fmt.Errorf("node %v already down", id)
- }
- node.SetUp(false)
- return node, nil
- }()
- if err != nil {
- return err
- }
- err = node.Stop() // must be called without net.lock
- net.lock.Lock()
- defer net.lock.Unlock()
- if err != nil {
- node.SetUp(true)
- return err
- }
- log.Info("Stopped node", "id", id, "err", err)
- ev := ControlEvent(node)
- net.events.Send(ev)
- return nil
- }
- // Connect connects two nodes together by calling the "admin_addPeer" RPC
- // method on the "one" node so that it connects to the "other" node
- func (net *Network) Connect(oneID, otherID enode.ID) error {
- net.lock.Lock()
- defer net.lock.Unlock()
- return net.connect(oneID, otherID)
- }
- func (net *Network) connect(oneID, otherID enode.ID) error {
- log.Debug("Connecting nodes with addPeer", "id", oneID, "other", otherID)
- conn, err := net.initConn(oneID, otherID)
- if err != nil {
- return err
- }
- client, err := conn.one.Client()
- if err != nil {
- return err
- }
- net.events.Send(ControlEvent(conn))
- return client.Call(nil, "admin_addPeer", string(conn.other.Addr()))
- }
- // Disconnect disconnects two nodes by calling the "admin_removePeer" RPC
- // method on the "one" node so that it disconnects from the "other" node
- func (net *Network) Disconnect(oneID, otherID enode.ID) error {
- conn := net.GetConn(oneID, otherID)
- if conn == nil {
- return fmt.Errorf("connection between %v and %v does not exist", oneID, otherID)
- }
- if !conn.Up {
- return fmt.Errorf("%v and %v already disconnected", oneID, otherID)
- }
- client, err := conn.one.Client()
- if err != nil {
- return err
- }
- net.events.Send(ControlEvent(conn))
- return client.Call(nil, "admin_removePeer", string(conn.other.Addr()))
- }
- // DidConnect tracks the fact that the "one" node connected to the "other" node
- func (net *Network) DidConnect(one, other enode.ID) error {
- net.lock.Lock()
- defer net.lock.Unlock()
- conn, err := net.getOrCreateConn(one, other)
- if err != nil {
- return fmt.Errorf("connection between %v and %v does not exist", one, other)
- }
- if conn.Up {
- return fmt.Errorf("%v and %v already connected", one, other)
- }
- conn.Up = true
- net.events.Send(NewEvent(conn))
- return nil
- }
- // DidDisconnect tracks the fact that the "one" node disconnected from the
- // "other" node
- func (net *Network) DidDisconnect(one, other enode.ID) error {
- net.lock.Lock()
- defer net.lock.Unlock()
- conn := net.getConn(one, other)
- if conn == nil {
- return fmt.Errorf("connection between %v and %v does not exist", one, other)
- }
- if !conn.Up {
- return fmt.Errorf("%v and %v already disconnected", one, other)
- }
- conn.Up = false
- conn.initiated = time.Now().Add(-DialBanTimeout)
- net.events.Send(NewEvent(conn))
- return nil
- }
- // DidSend tracks the fact that "sender" sent a message to "receiver"
- func (net *Network) DidSend(sender, receiver enode.ID, proto string, code uint64) error {
- msg := &Msg{
- One: sender,
- Other: receiver,
- Protocol: proto,
- Code: code,
- Received: false,
- }
- net.events.Send(NewEvent(msg))
- return nil
- }
- // DidReceive tracks the fact that "receiver" received a message from "sender"
- func (net *Network) DidReceive(sender, receiver enode.ID, proto string, code uint64) error {
- msg := &Msg{
- One: sender,
- Other: receiver,
- Protocol: proto,
- Code: code,
- Received: true,
- }
- net.events.Send(NewEvent(msg))
- return nil
- }
- // GetNode gets the node with the given ID, returning nil if the node does not
- // exist
- func (net *Network) GetNode(id enode.ID) *Node {
- net.lock.RLock()
- defer net.lock.RUnlock()
- return net.getNode(id)
- }
- func (net *Network) getNode(id enode.ID) *Node {
- i, found := net.nodeMap[id]
- if !found {
- return nil
- }
- return net.Nodes[i]
- }
- // GetNodeByName gets the node with the given name, returning nil if the node does
- // not exist
- func (net *Network) GetNodeByName(name string) *Node {
- net.lock.RLock()
- defer net.lock.RUnlock()
- return net.getNodeByName(name)
- }
- func (net *Network) getNodeByName(name string) *Node {
- for _, node := range net.Nodes {
- if node.Config.Name == name {
- return node
- }
- }
- return nil
- }
- // GetNodeIDs returns the IDs of all existing nodes
- // Nodes can optionally be excluded by specifying their enode.ID.
- func (net *Network) GetNodeIDs(excludeIDs ...enode.ID) []enode.ID {
- net.lock.RLock()
- defer net.lock.RUnlock()
- return net.getNodeIDs(excludeIDs)
- }
- func (net *Network) getNodeIDs(excludeIDs []enode.ID) []enode.ID {
- // Get all current nodeIDs
- nodeIDs := make([]enode.ID, 0, len(net.nodeMap))
- for id := range net.nodeMap {
- nodeIDs = append(nodeIDs, id)
- }
- if len(excludeIDs) > 0 {
- // Return the difference of nodeIDs and excludeIDs
- return filterIDs(nodeIDs, excludeIDs)
- }
- return nodeIDs
- }
- // GetNodes returns the existing nodes.
- // Nodes can optionally be excluded by specifying their enode.ID.
- func (net *Network) GetNodes(excludeIDs ...enode.ID) []*Node {
- net.lock.RLock()
- defer net.lock.RUnlock()
- return net.getNodes(excludeIDs)
- }
- func (net *Network) getNodes(excludeIDs []enode.ID) []*Node {
- if len(excludeIDs) > 0 {
- nodeIDs := net.getNodeIDs(excludeIDs)
- return net.getNodesByID(nodeIDs)
- }
- return net.Nodes
- }
- // GetNodesByID returns existing nodes with the given enode.IDs.
- // If a node doesn't exist with a given enode.ID, it is ignored.
- func (net *Network) GetNodesByID(nodeIDs []enode.ID) []*Node {
- net.lock.RLock()
- defer net.lock.RUnlock()
- return net.getNodesByID(nodeIDs)
- }
- func (net *Network) getNodesByID(nodeIDs []enode.ID) []*Node {
- nodes := make([]*Node, 0, len(nodeIDs))
- for _, id := range nodeIDs {
- node := net.getNode(id)
- if node != nil {
- nodes = append(nodes, node)
- }
- }
- return nodes
- }
- // GetNodesByProperty returns existing nodes that have the given property string registered in their NodeConfig
- func (net *Network) GetNodesByProperty(property string) []*Node {
- net.lock.RLock()
- defer net.lock.RUnlock()
- return net.getNodesByProperty(property)
- }
- func (net *Network) getNodesByProperty(property string) []*Node {
- nodes := make([]*Node, 0, len(net.propertyMap[property]))
- for _, nodeIndex := range net.propertyMap[property] {
- nodes = append(nodes, net.Nodes[nodeIndex])
- }
- return nodes
- }
- // GetNodeIDsByProperty returns existing node's enode IDs that have the given property string registered in the NodeConfig
- func (net *Network) GetNodeIDsByProperty(property string) []enode.ID {
- net.lock.RLock()
- defer net.lock.RUnlock()
- return net.getNodeIDsByProperty(property)
- }
- func (net *Network) getNodeIDsByProperty(property string) []enode.ID {
- nodeIDs := make([]enode.ID, 0, len(net.propertyMap[property]))
- for _, nodeIndex := range net.propertyMap[property] {
- node := net.Nodes[nodeIndex]
- nodeIDs = append(nodeIDs, node.ID())
- }
- return nodeIDs
- }
- // GetRandomUpNode returns a random node on the network, which is running.
- func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node {
- net.lock.RLock()
- defer net.lock.RUnlock()
- return net.getRandomUpNode(excludeIDs...)
- }
- // GetRandomUpNode returns a random node on the network, which is running.
- func (net *Network) getRandomUpNode(excludeIDs ...enode.ID) *Node {
- return net.getRandomNode(net.getUpNodeIDs(), excludeIDs)
- }
- func (net *Network) getUpNodeIDs() (ids []enode.ID) {
- for _, node := range net.Nodes {
- if node.Up() {
- ids = append(ids, node.ID())
- }
- }
- return ids
- }
- // GetRandomDownNode returns a random node on the network, which is stopped.
- func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node {
- net.lock.RLock()
- defer net.lock.RUnlock()
- return net.getRandomNode(net.getDownNodeIDs(), excludeIDs)
- }
- func (net *Network) getDownNodeIDs() (ids []enode.ID) {
- for _, node := range net.Nodes {
- if !node.Up() {
- ids = append(ids, node.ID())
- }
- }
- return ids
- }
- // GetRandomNode returns a random node on the network, regardless of whether it is running or not
- func (net *Network) GetRandomNode(excludeIDs ...enode.ID) *Node {
- net.lock.RLock()
- defer net.lock.RUnlock()
- return net.getRandomNode(net.getNodeIDs(nil), excludeIDs) // no need to exclude twice
- }
- func (net *Network) getRandomNode(ids []enode.ID, excludeIDs []enode.ID) *Node {
- filtered := filterIDs(ids, excludeIDs)
- l := len(filtered)
- if l == 0 {
- return nil
- }
- return net.getNode(filtered[rand.Intn(l)])
- }
- func filterIDs(ids []enode.ID, excludeIDs []enode.ID) []enode.ID {
- exclude := make(map[enode.ID]bool)
- for _, id := range excludeIDs {
- exclude[id] = true
- }
- var filtered []enode.ID
- for _, id := range ids {
- if _, found := exclude[id]; !found {
- filtered = append(filtered, id)
- }
- }
- return filtered
- }
- // GetConn returns the connection which exists between "one" and "other"
- // regardless of which node initiated the connection
- func (net *Network) GetConn(oneID, otherID enode.ID) *Conn {
- net.lock.RLock()
- defer net.lock.RUnlock()
- return net.getConn(oneID, otherID)
- }
- // GetOrCreateConn is like GetConn but creates the connection if it doesn't
- // already exist
- func (net *Network) GetOrCreateConn(oneID, otherID enode.ID) (*Conn, error) {
- net.lock.Lock()
- defer net.lock.Unlock()
- return net.getOrCreateConn(oneID, otherID)
- }
- func (net *Network) getOrCreateConn(oneID, otherID enode.ID) (*Conn, error) {
- if conn := net.getConn(oneID, otherID); conn != nil {
- return conn, nil
- }
- one := net.getNode(oneID)
- if one == nil {
- return nil, fmt.Errorf("node %v does not exist", oneID)
- }
- other := net.getNode(otherID)
- if other == nil {
- return nil, fmt.Errorf("node %v does not exist", otherID)
- }
- conn := &Conn{
- One: oneID,
- Other: otherID,
- one: one,
- other: other,
- }
- label := ConnLabel(oneID, otherID)
- net.connMap[label] = len(net.Conns)
- net.Conns = append(net.Conns, conn)
- return conn, nil
- }
- func (net *Network) getConn(oneID, otherID enode.ID) *Conn {
- label := ConnLabel(oneID, otherID)
- i, found := net.connMap[label]
- if !found {
- return nil
- }
- return net.Conns[i]
- }
- // InitConn(one, other) retrieves the connection model for the connection between
- // peers one and other, or creates a new one if it does not exist
- // the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i)
- // it checks if the connection is already up, and if the nodes are running
- // NOTE:
- // it also checks whether there has been recent attempt to connect the peers
- // this is cheating as the simulation is used as an oracle and know about
- // remote peers attempt to connect to a node which will then not initiate the connection
- func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) {
- net.lock.Lock()
- defer net.lock.Unlock()
- return net.initConn(oneID, otherID)
- }
- func (net *Network) initConn(oneID, otherID enode.ID) (*Conn, error) {
- if oneID == otherID {
- return nil, fmt.Errorf("refusing to connect to self %v", oneID)
- }
- conn, err := net.getOrCreateConn(oneID, otherID)
- if err != nil {
- return nil, err
- }
- if conn.Up {
- return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
- }
- if time.Since(conn.initiated) < DialBanTimeout {
- return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
- }
- err = conn.nodesUp()
- if err != nil {
- log.Trace("Nodes not up", "err", err)
- return nil, fmt.Errorf("nodes not up: %v", err)
- }
- log.Debug("Connection initiated", "id", oneID, "other", otherID)
- conn.initiated = time.Now()
- return conn, nil
- }
- // Shutdown stops all nodes in the network and closes the quit channel
- func (net *Network) Shutdown() {
- for _, node := range net.Nodes {
- log.Debug("Stopping node", "id", node.ID())
- if err := node.Stop(); err != nil {
- log.Warn("Can't stop node", "id", node.ID(), "err", err)
- }
- // If the node has the close method, call it.
- if closer, ok := node.Node.(io.Closer); ok {
- if err := closer.Close(); err != nil {
- log.Warn("Can't close node", "id", node.ID(), "err", err)
- }
- }
- }
- close(net.quitc)
- }
- // Reset resets all network properties:
- // empties the nodes and the connection list
- func (net *Network) Reset() {
- net.lock.Lock()
- defer net.lock.Unlock()
- //re-initialize the maps
- net.connMap = make(map[string]int)
- net.nodeMap = make(map[enode.ID]int)
- net.propertyMap = make(map[string][]int)
- net.Nodes = nil
- net.Conns = nil
- }
- // Node is a wrapper around adapters.Node which is used to track the status
- // of a node in the network
- type Node struct {
- adapters.Node `json:"-"`
- // Config if the config used to created the node
- Config *adapters.NodeConfig `json:"config"`
- // up tracks whether or not the node is running
- up bool
- upMu *sync.RWMutex
- }
- func newNode(an adapters.Node, ac *adapters.NodeConfig, up bool) *Node {
- return &Node{Node: an, Config: ac, up: up, upMu: new(sync.RWMutex)}
- }
- func (n *Node) copy() *Node {
- configCpy := *n.Config
- return newNode(n.Node, &configCpy, n.Up())
- }
- // Up returns whether the node is currently up (online)
- func (n *Node) Up() bool {
- n.upMu.RLock()
- defer n.upMu.RUnlock()
- return n.up
- }
- // SetUp sets the up (online) status of the nodes with the given value
- func (n *Node) SetUp(up bool) {
- n.upMu.Lock()
- defer n.upMu.Unlock()
- n.up = up
- }
- // ID returns the ID of the node
- func (n *Node) ID() enode.ID {
- return n.Config.ID
- }
- // String returns a log-friendly string
- func (n *Node) String() string {
- return fmt.Sprintf("Node %v", n.ID().TerminalString())
- }
- // NodeInfo returns information about the node
- func (n *Node) NodeInfo() *p2p.NodeInfo {
- // avoid a panic if the node is not started yet
- if n.Node == nil {
- return nil
- }
- info := n.Node.NodeInfo()
- info.Name = n.Config.Name
- return info
- }
- // MarshalJSON implements the json.Marshaler interface so that the encoded
- // JSON includes the NodeInfo
- func (n *Node) MarshalJSON() ([]byte, error) {
- return json.Marshal(struct {
- Info *p2p.NodeInfo `json:"info,omitempty"`
- Config *adapters.NodeConfig `json:"config,omitempty"`
- Up bool `json:"up"`
- }{
- Info: n.NodeInfo(),
- Config: n.Config,
- Up: n.Up(),
- })
- }
- // UnmarshalJSON implements json.Unmarshaler interface so that we don't lose Node.up
- // status. IMPORTANT: The implementation is incomplete; we lose p2p.NodeInfo.
- func (n *Node) UnmarshalJSON(raw []byte) error {
- // TODO: How should we turn back NodeInfo into n.Node?
- // Ticket: https://github.com/ethersphere/go-ethereum/issues/1177
- var node struct {
- Config *adapters.NodeConfig `json:"config,omitempty"`
- Up bool `json:"up"`
- }
- if err := json.Unmarshal(raw, &node); err != nil {
- return err
- }
- *n = *newNode(nil, node.Config, node.Up)
- return nil
- }
- // Conn represents a connection between two nodes in the network
- type Conn struct {
- // One is the node which initiated the connection
- One enode.ID `json:"one"`
- // Other is the node which the connection was made to
- Other enode.ID `json:"other"`
- // Up tracks whether or not the connection is active
- Up bool `json:"up"`
- // Registers when the connection was grabbed to dial
- initiated time.Time
- one *Node
- other *Node
- }
- // nodesUp returns whether both nodes are currently up
- func (c *Conn) nodesUp() error {
- if !c.one.Up() {
- return fmt.Errorf("one %v is not up", c.One)
- }
- if !c.other.Up() {
- return fmt.Errorf("other %v is not up", c.Other)
- }
- return nil
- }
- // String returns a log-friendly string
- func (c *Conn) String() string {
- return fmt.Sprintf("Conn %v->%v", c.One.TerminalString(), c.Other.TerminalString())
- }
- // Msg represents a p2p message sent between two nodes in the network
- type Msg struct {
- One enode.ID `json:"one"`
- Other enode.ID `json:"other"`
- Protocol string `json:"protocol"`
- Code uint64 `json:"code"`
- Received bool `json:"received"`
- }
- // String returns a log-friendly string
- func (m *Msg) String() string {
- return fmt.Sprintf("Msg(%d) %v->%v", m.Code, m.One.TerminalString(), m.Other.TerminalString())
- }
- // ConnLabel generates a deterministic string which represents a connection
- // between two nodes, used to compare if two connections are between the same
- // nodes
- func ConnLabel(source, target enode.ID) string {
- var first, second enode.ID
- if bytes.Compare(source.Bytes(), target.Bytes()) > 0 {
- first = target
- second = source
- } else {
- first = source
- second = target
- }
- return fmt.Sprintf("%v-%v", first, second)
- }
- // Snapshot represents the state of a network at a single point in time and can
- // be used to restore the state of a network
- type Snapshot struct {
- Nodes []NodeSnapshot `json:"nodes,omitempty"`
- Conns []Conn `json:"conns,omitempty"`
- }
- // NodeSnapshot represents the state of a node in the network
- type NodeSnapshot struct {
- Node Node `json:"node,omitempty"`
- // Snapshots is arbitrary data gathered from calling node.Snapshots()
- Snapshots map[string][]byte `json:"snapshots,omitempty"`
- }
- // Snapshot creates a network snapshot
- func (net *Network) Snapshot() (*Snapshot, error) {
- return net.snapshot(nil, nil)
- }
- func (net *Network) SnapshotWithServices(addServices []string, removeServices []string) (*Snapshot, error) {
- return net.snapshot(addServices, removeServices)
- }
- func (net *Network) snapshot(addServices []string, removeServices []string) (*Snapshot, error) {
- net.lock.Lock()
- defer net.lock.Unlock()
- snap := &Snapshot{
- Nodes: make([]NodeSnapshot, len(net.Nodes)),
- }
- for i, node := range net.Nodes {
- snap.Nodes[i] = NodeSnapshot{Node: *node.copy()}
- if !node.Up() {
- continue
- }
- snapshots, err := node.Snapshots()
- if err != nil {
- return nil, err
- }
- snap.Nodes[i].Snapshots = snapshots
- for _, addSvc := range addServices {
- haveSvc := false
- for _, svc := range snap.Nodes[i].Node.Config.Lifecycles {
- if svc == addSvc {
- haveSvc = true
- break
- }
- }
- if !haveSvc {
- snap.Nodes[i].Node.Config.Lifecycles = append(snap.Nodes[i].Node.Config.Lifecycles, addSvc)
- }
- }
- if len(removeServices) > 0 {
- var cleanedServices []string
- for _, svc := range snap.Nodes[i].Node.Config.Lifecycles {
- haveSvc := false
- for _, rmSvc := range removeServices {
- if rmSvc == svc {
- haveSvc = true
- break
- }
- }
- if !haveSvc {
- cleanedServices = append(cleanedServices, svc)
- }
- }
- snap.Nodes[i].Node.Config.Lifecycles = cleanedServices
- }
- }
- for _, conn := range net.Conns {
- if conn.Up {
- snap.Conns = append(snap.Conns, *conn)
- }
- }
- return snap, nil
- }
// snapshotLoadTimeout bounds how long Load waits for the snapshot's
// connections to be established; longrunning tests may need a longer timeout
var snapshotLoadTimeout = 15 * time.Minute
- // Load loads a network snapshot
- func (net *Network) Load(snap *Snapshot) error {
- // Start nodes.
- for _, n := range snap.Nodes {
- if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
- return err
- }
- if !n.Node.Up() {
- continue
- }
- if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
- return err
- }
- }
- // Prepare connection events counter.
- allConnected := make(chan struct{}) // closed when all connections are established
- done := make(chan struct{}) // ensures that the event loop goroutine is terminated
- defer close(done)
- // Subscribe to event channel.
- // It needs to be done outside of the event loop goroutine (created below)
- // to ensure that the event channel is blocking before connect calls are made.
- events := make(chan *Event)
- sub := net.Events().Subscribe(events)
- defer sub.Unsubscribe()
- go func() {
- // Expected number of connections.
- total := len(snap.Conns)
- // Set of all established connections from the snapshot, not other connections.
- // Key array element 0 is the connection One field value, and element 1 connection Other field.
- connections := make(map[[2]enode.ID]struct{}, total)
- for {
- select {
- case e := <-events:
- // Ignore control events as they do not represent
- // connect or disconnect (Up) state change.
- if e.Control {
- continue
- }
- // Detect only connection events.
- if e.Type != EventTypeConn {
- continue
- }
- connection := [2]enode.ID{e.Conn.One, e.Conn.Other}
- // Nodes are still not connected or have been disconnected.
- if !e.Conn.Up {
- // Delete the connection from the set of established connections.
- // This will prevent false positive in case disconnections happen.
- delete(connections, connection)
- log.Warn("load snapshot: unexpected disconnection", "one", e.Conn.One, "other", e.Conn.Other)
- continue
- }
- // Check that the connection is from the snapshot.
- for _, conn := range snap.Conns {
- if conn.One == e.Conn.One && conn.Other == e.Conn.Other {
- // Add the connection to the set of established connections.
- connections[connection] = struct{}{}
- if len(connections) == total {
- // Signal that all nodes are connected.
- close(allConnected)
- return
- }
- break
- }
- }
- case <-done:
- // Load function returned, terminate this goroutine.
- return
- }
- }
- }()
- // Start connecting.
- for _, conn := range snap.Conns {
- if !net.GetNode(conn.One).Up() || !net.GetNode(conn.Other).Up() {
- //in this case, at least one of the nodes of a connection is not up,
- //so it would result in the snapshot `Load` to fail
- continue
- }
- if err := net.Connect(conn.One, conn.Other); err != nil {
- return err
- }
- }
- select {
- // Wait until all connections from the snapshot are established.
- case <-allConnected:
- // Make sure that we do not wait forever.
- case <-time.After(snapshotLoadTimeout):
- return errors.New("snapshot connections not established")
- }
- return nil
- }
- // Subscribe reads control events from a channel and executes them
- func (net *Network) Subscribe(events chan *Event) {
- for {
- select {
- case event, ok := <-events:
- if !ok {
- return
- }
- if event.Control {
- net.executeControlEvent(event)
- }
- case <-net.quitc:
- return
- }
- }
- }
- func (net *Network) executeControlEvent(event *Event) {
- log.Trace("Executing control event", "type", event.Type, "event", event)
- switch event.Type {
- case EventTypeNode:
- if err := net.executeNodeEvent(event); err != nil {
- log.Error("Error executing node event", "event", event, "err", err)
- }
- case EventTypeConn:
- if err := net.executeConnEvent(event); err != nil {
- log.Error("Error executing conn event", "event", event, "err", err)
- }
- case EventTypeMsg:
- log.Warn("Ignoring control msg event")
- }
- }
- func (net *Network) executeNodeEvent(e *Event) error {
- if !e.Node.Up() {
- return net.Stop(e.Node.ID())
- }
- if _, err := net.NewNodeWithConfig(e.Node.Config); err != nil {
- return err
- }
- return net.Start(e.Node.ID())
- }
- func (net *Network) executeConnEvent(e *Event) error {
- if e.Conn.Up {
- return net.Connect(e.Conn.One, e.Conn.Other)
- }
- return net.Disconnect(e.Conn.One, e.Conn.Other)
- }
|