les_test.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "runtime"
  8. "strings"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/ethereum/go-ethereum/p2p"
  13. "github.com/ethereum/go-ethereum/rpc"
  14. )
  15. type gethrpc struct {
  16. name string
  17. rpc *rpc.Client
  18. geth *testgeth
  19. nodeInfo *p2p.NodeInfo
  20. }
  21. func (g *gethrpc) killAndWait() {
  22. g.geth.Kill()
  23. g.geth.WaitExit()
  24. }
  25. func (g *gethrpc) callRPC(result interface{}, method string, args ...interface{}) {
  26. if err := g.rpc.Call(&result, method, args...); err != nil {
  27. g.geth.Fatalf("callRPC %v: %v", method, err)
  28. }
  29. }
  30. func (g *gethrpc) addPeer(peer *gethrpc) {
  31. g.geth.Logf("%v.addPeer(%v)", g.name, peer.name)
  32. enode := peer.getNodeInfo().Enode
  33. peerCh := make(chan *p2p.PeerEvent)
  34. sub, err := g.rpc.Subscribe(context.Background(), "admin", peerCh, "peerEvents")
  35. if err != nil {
  36. g.geth.Fatalf("subscribe %v: %v", g.name, err)
  37. }
  38. defer sub.Unsubscribe()
  39. g.callRPC(nil, "admin_addPeer", enode)
  40. dur := 14 * time.Second
  41. timeout := time.After(dur)
  42. select {
  43. case ev := <-peerCh:
  44. g.geth.Logf("%v received event: type=%v, peer=%v", g.name, ev.Type, ev.Peer)
  45. case err := <-sub.Err():
  46. g.geth.Fatalf("%v sub error: %v", g.name, err)
  47. case <-timeout:
  48. g.geth.Error("timeout adding peer after", dur)
  49. }
  50. }
  51. // Use this function instead of `g.nodeInfo` directly
  52. func (g *gethrpc) getNodeInfo() *p2p.NodeInfo {
  53. if g.nodeInfo != nil {
  54. return g.nodeInfo
  55. }
  56. g.nodeInfo = &p2p.NodeInfo{}
  57. g.callRPC(&g.nodeInfo, "admin_nodeInfo")
  58. return g.nodeInfo
  59. }
  60. func (g *gethrpc) waitSynced() {
  61. // Check if it's synced now
  62. var result interface{}
  63. g.callRPC(&result, "eth_syncing")
  64. syncing, ok := result.(bool)
  65. if ok && !syncing {
  66. g.geth.Logf("%v already synced", g.name)
  67. return
  68. }
  69. // Actually wait, subscribe to the event
  70. ch := make(chan interface{})
  71. sub, err := g.rpc.Subscribe(context.Background(), "eth", ch, "syncing")
  72. if err != nil {
  73. g.geth.Fatalf("%v syncing: %v", g.name, err)
  74. }
  75. defer sub.Unsubscribe()
  76. timeout := time.After(4 * time.Second)
  77. select {
  78. case ev := <-ch:
  79. g.geth.Log("'syncing' event", ev)
  80. syncing, ok := ev.(bool)
  81. if ok && !syncing {
  82. break
  83. }
  84. g.geth.Log("Other 'syncing' event", ev)
  85. case err := <-sub.Err():
  86. g.geth.Fatalf("%v notification: %v", g.name, err)
  87. break
  88. case <-timeout:
  89. g.geth.Fatalf("%v timeout syncing", g.name)
  90. break
  91. }
  92. }
  93. // ipcEndpoint resolves an IPC endpoint based on a configured value, taking into
  94. // account the set data folders as well as the designated platform we're currently
  95. // running on.
  96. func ipcEndpoint(ipcPath, datadir string) string {
  97. // On windows we can only use plain top-level pipes
  98. if runtime.GOOS == "windows" {
  99. if strings.HasPrefix(ipcPath, `\\.\pipe\`) {
  100. return ipcPath
  101. }
  102. return `\\.\pipe\` + ipcPath
  103. }
  104. // Resolve names into the data directory full paths otherwise
  105. if filepath.Base(ipcPath) == ipcPath {
  106. if datadir == "" {
  107. return filepath.Join(os.TempDir(), ipcPath)
  108. }
  109. return filepath.Join(datadir, ipcPath)
  110. }
  111. return ipcPath
  112. }
  113. // nextIPC ensures that each ipc pipe gets a unique name.
  114. // On linux, it works well to use ipc pipes all over the filesystem (in datadirs),
  115. // but windows require pipes to sit in "\\.\pipe\". Therefore, to run several
  116. // nodes simultaneously, we need to distinguish between them, which we do by
  117. // the pipe filename instead of folder.
  118. var nextIPC = uint32(0)
  119. func startGethWithIpc(t *testing.T, name string, args ...string) *gethrpc {
  120. ipcName := fmt.Sprintf("geth-%d.ipc", atomic.AddUint32(&nextIPC, 1))
  121. args = append([]string{"--networkid=42", "--port=0", "--ipcpath", ipcName}, args...)
  122. t.Logf("Starting %v with rpc: %v", name, args)
  123. g := &gethrpc{
  124. name: name,
  125. geth: runGeth(t, args...),
  126. }
  127. // wait before we can attach to it. TODO: probe for it properly
  128. time.Sleep(1 * time.Second)
  129. var err error
  130. ipcpath := ipcEndpoint(ipcName, g.geth.Datadir)
  131. if g.rpc, err = rpc.Dial(ipcpath); err != nil {
  132. t.Fatalf("%v rpc connect to %v: %v", name, ipcpath, err)
  133. }
  134. return g
  135. }
  136. func initGeth(t *testing.T) string {
  137. args := []string{"--networkid=42", "init", "./testdata/clique.json"}
  138. t.Logf("Initializing geth: %v ", args)
  139. g := runGeth(t, args...)
  140. datadir := g.Datadir
  141. g.WaitExit()
  142. return datadir
  143. }
  144. func startLightServer(t *testing.T) *gethrpc {
  145. datadir := initGeth(t)
  146. t.Logf("Importing keys to geth")
  147. runGeth(t, "--datadir", datadir, "--password", "./testdata/password.txt", "account", "import", "./testdata/key.prv", "--lightkdf").WaitExit()
  148. account := "0x02f0d131f1f97aef08aec6e3291b957d9efe7105"
  149. server := startGethWithIpc(t, "lightserver", "--allow-insecure-unlock", "--datadir", datadir, "--password", "./testdata/password.txt", "--unlock", account, "--mine", "--light.serve=100", "--light.maxpeers=1", "--nodiscover", "--nat=extip:127.0.0.1", "--verbosity=4")
  150. return server
  151. }
  152. func startClient(t *testing.T, name string) *gethrpc {
  153. datadir := initGeth(t)
  154. return startGethWithIpc(t, name, "--datadir", datadir, "--nodiscover", "--syncmode=light", "--nat=extip:127.0.0.1", "--verbosity=4")
  155. }
  156. func TestPriorityClient(t *testing.T) {
  157. // Quorum
  158. t.Skip("skipping test in Quorum (no support for light sync mode).")
  159. // End Quorum
  160. lightServer := startLightServer(t)
  161. defer lightServer.killAndWait()
  162. // Start client and add lightServer as peer
  163. freeCli := startClient(t, "freeCli")
  164. defer freeCli.killAndWait()
  165. freeCli.addPeer(lightServer)
  166. var peers []*p2p.PeerInfo
  167. freeCli.callRPC(&peers, "admin_peers")
  168. if len(peers) != 1 {
  169. t.Errorf("Expected: # of client peers == 1, actual: %v", len(peers))
  170. return
  171. }
  172. // Set up priority client, get its nodeID, increase its balance on the lightServer
  173. prioCli := startClient(t, "prioCli")
  174. defer prioCli.killAndWait()
  175. // 3_000_000_000 once we move to Go 1.13
  176. tokens := uint64(3000000000)
  177. lightServer.callRPC(nil, "les_addBalance", prioCli.getNodeInfo().ID, tokens)
  178. prioCli.addPeer(lightServer)
  179. // Check if priority client is actually syncing and the regular client got kicked out
  180. prioCli.callRPC(&peers, "admin_peers")
  181. if len(peers) != 1 {
  182. t.Errorf("Expected: # of prio peers == 1, actual: %v", len(peers))
  183. }
  184. nodes := map[string]*gethrpc{
  185. lightServer.getNodeInfo().ID: lightServer,
  186. freeCli.getNodeInfo().ID: freeCli,
  187. prioCli.getNodeInfo().ID: prioCli,
  188. }
  189. time.Sleep(1 * time.Second)
  190. lightServer.callRPC(&peers, "admin_peers")
  191. peersWithNames := make(map[string]string)
  192. for _, p := range peers {
  193. peersWithNames[nodes[p.ID].name] = p.ID
  194. }
  195. if _, freeClientFound := peersWithNames[freeCli.name]; freeClientFound {
  196. t.Error("client is still a peer of lightServer", peersWithNames)
  197. }
  198. if _, prioClientFound := peersWithNames[prioCli.name]; !prioClientFound {
  199. t.Error("prio client is not among lightServer peers", peersWithNames)
  200. }
  201. }