123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- package main
- import (
- "context"
- "fmt"
- "os"
- "path/filepath"
- "runtime"
- "strings"
- "sync/atomic"
- "testing"
- "time"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/rpc"
- )
- type gethrpc struct {
- name string
- rpc *rpc.Client
- geth *testgeth
- nodeInfo *p2p.NodeInfo
- }
- func (g *gethrpc) killAndWait() {
- g.geth.Kill()
- g.geth.WaitExit()
- }
- func (g *gethrpc) callRPC(result interface{}, method string, args ...interface{}) {
- if err := g.rpc.Call(&result, method, args...); err != nil {
- g.geth.Fatalf("callRPC %v: %v", method, err)
- }
- }
- func (g *gethrpc) addPeer(peer *gethrpc) {
- g.geth.Logf("%v.addPeer(%v)", g.name, peer.name)
- enode := peer.getNodeInfo().Enode
- peerCh := make(chan *p2p.PeerEvent)
- sub, err := g.rpc.Subscribe(context.Background(), "admin", peerCh, "peerEvents")
- if err != nil {
- g.geth.Fatalf("subscribe %v: %v", g.name, err)
- }
- defer sub.Unsubscribe()
- g.callRPC(nil, "admin_addPeer", enode)
- dur := 14 * time.Second
- timeout := time.After(dur)
- select {
- case ev := <-peerCh:
- g.geth.Logf("%v received event: type=%v, peer=%v", g.name, ev.Type, ev.Peer)
- case err := <-sub.Err():
- g.geth.Fatalf("%v sub error: %v", g.name, err)
- case <-timeout:
- g.geth.Error("timeout adding peer after", dur)
- }
- }
- // Use this function instead of `g.nodeInfo` directly
- func (g *gethrpc) getNodeInfo() *p2p.NodeInfo {
- if g.nodeInfo != nil {
- return g.nodeInfo
- }
- g.nodeInfo = &p2p.NodeInfo{}
- g.callRPC(&g.nodeInfo, "admin_nodeInfo")
- return g.nodeInfo
- }
- func (g *gethrpc) waitSynced() {
- // Check if it's synced now
- var result interface{}
- g.callRPC(&result, "eth_syncing")
- syncing, ok := result.(bool)
- if ok && !syncing {
- g.geth.Logf("%v already synced", g.name)
- return
- }
- // Actually wait, subscribe to the event
- ch := make(chan interface{})
- sub, err := g.rpc.Subscribe(context.Background(), "eth", ch, "syncing")
- if err != nil {
- g.geth.Fatalf("%v syncing: %v", g.name, err)
- }
- defer sub.Unsubscribe()
- timeout := time.After(4 * time.Second)
- select {
- case ev := <-ch:
- g.geth.Log("'syncing' event", ev)
- syncing, ok := ev.(bool)
- if ok && !syncing {
- break
- }
- g.geth.Log("Other 'syncing' event", ev)
- case err := <-sub.Err():
- g.geth.Fatalf("%v notification: %v", g.name, err)
- break
- case <-timeout:
- g.geth.Fatalf("%v timeout syncing", g.name)
- break
- }
- }
// ipcEndpoint turns a configured IPC path into the endpoint to dial, depending
// on the platform and the node's data directory:
//
//   - on windows the result is always a top-level named pipe: the path is
//     returned unchanged if it already starts with `\\.\pipe\`, otherwise that
//     prefix is prepended;
//   - elsewhere, a bare file name is placed inside datadir, or inside the OS
//     temp directory when datadir is empty;
//   - elsewhere, a path that contains a directory component is returned as is.
func ipcEndpoint(ipcPath, datadir string) string {
	const pipePrefix = `\\.\pipe\`

	switch {
	case runtime.GOOS == "windows":
		if !strings.HasPrefix(ipcPath, pipePrefix) {
			return pipePrefix + ipcPath
		}
		return ipcPath
	case filepath.Base(ipcPath) != ipcPath:
		// Already more than a bare name; leave it to the caller's choice.
		return ipcPath
	case datadir == "":
		return filepath.Join(os.TempDir(), ipcPath)
	default:
		return filepath.Join(datadir, ipcPath)
	}
}
// nextIPC is the counter from which each node's IPC pipe name is derived, so
// that every pipe gets a unique name.
// On linux, ipc pipes can live anywhere on the filesystem (for instance in the
// datadirs), but windows requires pipes to sit in "\\.\pipe\". To run several
// nodes simultaneously they therefore have to be told apart by the pipe file
// name rather than by the folder.
var nextIPC uint32
- func startGethWithIpc(t *testing.T, name string, args ...string) *gethrpc {
- ipcName := fmt.Sprintf("geth-%d.ipc", atomic.AddUint32(&nextIPC, 1))
- args = append([]string{"--networkid=42", "--port=0", "--ipcpath", ipcName}, args...)
- t.Logf("Starting %v with rpc: %v", name, args)
- g := &gethrpc{
- name: name,
- geth: runGeth(t, args...),
- }
- // wait before we can attach to it. TODO: probe for it properly
- time.Sleep(1 * time.Second)
- var err error
- ipcpath := ipcEndpoint(ipcName, g.geth.Datadir)
- if g.rpc, err = rpc.Dial(ipcpath); err != nil {
- t.Fatalf("%v rpc connect to %v: %v", name, ipcpath, err)
- }
- return g
- }
- func initGeth(t *testing.T) string {
- args := []string{"--networkid=42", "init", "./testdata/clique.json"}
- t.Logf("Initializing geth: %v ", args)
- g := runGeth(t, args...)
- datadir := g.Datadir
- g.WaitExit()
- return datadir
- }
- func startLightServer(t *testing.T) *gethrpc {
- datadir := initGeth(t)
- t.Logf("Importing keys to geth")
- runGeth(t, "--datadir", datadir, "--password", "./testdata/password.txt", "account", "import", "./testdata/key.prv", "--lightkdf").WaitExit()
- account := "0x02f0d131f1f97aef08aec6e3291b957d9efe7105"
- server := startGethWithIpc(t, "lightserver", "--allow-insecure-unlock", "--datadir", datadir, "--password", "./testdata/password.txt", "--unlock", account, "--mine", "--light.serve=100", "--light.maxpeers=1", "--nodiscover", "--nat=extip:127.0.0.1", "--verbosity=4")
- return server
- }
- func startClient(t *testing.T, name string) *gethrpc {
- datadir := initGeth(t)
- return startGethWithIpc(t, name, "--datadir", datadir, "--nodiscover", "--syncmode=light", "--nat=extip:127.0.0.1", "--verbosity=4")
- }
- func TestPriorityClient(t *testing.T) {
- // Quorum
- t.Skip("skipping test in Quorum (no support for light sync mode).")
- // End Quorum
- lightServer := startLightServer(t)
- defer lightServer.killAndWait()
- // Start client and add lightServer as peer
- freeCli := startClient(t, "freeCli")
- defer freeCli.killAndWait()
- freeCli.addPeer(lightServer)
- var peers []*p2p.PeerInfo
- freeCli.callRPC(&peers, "admin_peers")
- if len(peers) != 1 {
- t.Errorf("Expected: # of client peers == 1, actual: %v", len(peers))
- return
- }
- // Set up priority client, get its nodeID, increase its balance on the lightServer
- prioCli := startClient(t, "prioCli")
- defer prioCli.killAndWait()
- // 3_000_000_000 once we move to Go 1.13
- tokens := uint64(3000000000)
- lightServer.callRPC(nil, "les_addBalance", prioCli.getNodeInfo().ID, tokens)
- prioCli.addPeer(lightServer)
- // Check if priority client is actually syncing and the regular client got kicked out
- prioCli.callRPC(&peers, "admin_peers")
- if len(peers) != 1 {
- t.Errorf("Expected: # of prio peers == 1, actual: %v", len(peers))
- }
- nodes := map[string]*gethrpc{
- lightServer.getNodeInfo().ID: lightServer,
- freeCli.getNodeInfo().ID: freeCli,
- prioCli.getNodeInfo().ID: prioCli,
- }
- time.Sleep(1 * time.Second)
- lightServer.callRPC(&peers, "admin_peers")
- peersWithNames := make(map[string]string)
- for _, p := range peers {
- peersWithNames[nodes[p.ID].name] = p.ID
- }
- if _, freeClientFound := peersWithNames[freeCli.name]; freeClientFound {
- t.Error("client is still a peer of lightServer", peersWithNames)
- }
- if _, prioClientFound := peersWithNames[prioCli.name]; !prioClientFound {
- t.Error("prio client is not among lightServer peers", peersWithNames)
- }
- }
|