dial_test.go 17 KB


  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package p2p
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "math/rand"
  22. "net"
  23. "reflect"
  24. "sync"
  25. "testing"
  26. "time"
  27. "github.com/ethereum/go-ethereum/common/mclock"
  28. "github.com/ethereum/go-ethereum/internal/testlog"
  29. "github.com/ethereum/go-ethereum/log"
  30. "github.com/ethereum/go-ethereum/p2p/enode"
  31. "github.com/ethereum/go-ethereum/p2p/netutil"
  32. )
  33. // This test checks that dynamic dials are launched from discovery results.
  34. func TestDialSchedDynDial(t *testing.T) {
  35. t.Parallel()
  36. config := dialConfig{
  37. maxActiveDials: 5,
  38. maxDialPeers: 4,
  39. }
  40. runDialTest(t, config, []dialTestRound{
  41. // 3 out of 4 peers are connected, leaving 2 dial slots.
  42. // 9 nodes are discovered, but only 2 are dialed.
  43. {
  44. peersAdded: []*conn{
  45. {flags: staticDialedConn, node: newNode(uintID(0x00), "")},
  46. {flags: dynDialedConn, node: newNode(uintID(0x01), "")},
  47. {flags: dynDialedConn, node: newNode(uintID(0x02), "")},
  48. },
  49. discovered: []*enode.Node{
  50. newNode(uintID(0x00), "127.0.0.1:30303"), // not dialed because already connected as static peer
  51. newNode(uintID(0x02), "127.0.0.1:30303"), // ...
  52. newNode(uintID(0x03), "127.0.0.1:30303"),
  53. newNode(uintID(0x04), "127.0.0.1:30303"),
  54. newNode(uintID(0x05), "127.0.0.1:30303"), // not dialed because there are only two slots
  55. newNode(uintID(0x06), "127.0.0.1:30303"), // ...
  56. newNode(uintID(0x07), "127.0.0.1:30303"), // ...
  57. newNode(uintID(0x08), "127.0.0.1:30303"), // ...
  58. },
  59. wantNewDials: []*enode.Node{
  60. newNode(uintID(0x03), "127.0.0.1:30303"),
  61. newNode(uintID(0x04), "127.0.0.1:30303"),
  62. },
  63. },
  64. // One dial completes, freeing one dial slot.
  65. {
  66. failed: []enode.ID{
  67. uintID(0x04),
  68. },
  69. wantNewDials: []*enode.Node{
  70. newNode(uintID(0x05), "127.0.0.1:30303"),
  71. },
  72. },
  73. // Dial to 0x03 completes, filling the last remaining peer slot.
  74. {
  75. succeeded: []enode.ID{
  76. uintID(0x03),
  77. },
  78. failed: []enode.ID{
  79. uintID(0x05),
  80. },
  81. discovered: []*enode.Node{
  82. newNode(uintID(0x09), "127.0.0.1:30303"), // not dialed because there are no free slots
  83. },
  84. },
  85. // 3 peers drop off, creating 6 dial slots. Check that 5 of those slots
  86. // (i.e. up to maxActiveDialTasks) are used.
  87. {
  88. peersRemoved: []enode.ID{
  89. uintID(0x00),
  90. uintID(0x01),
  91. uintID(0x02),
  92. },
  93. discovered: []*enode.Node{
  94. newNode(uintID(0x0a), "127.0.0.1:30303"),
  95. newNode(uintID(0x0b), "127.0.0.1:30303"),
  96. newNode(uintID(0x0c), "127.0.0.1:30303"),
  97. newNode(uintID(0x0d), "127.0.0.1:30303"),
  98. newNode(uintID(0x0f), "127.0.0.1:30303"),
  99. },
  100. wantNewDials: []*enode.Node{
  101. newNode(uintID(0x06), "127.0.0.1:30303"),
  102. newNode(uintID(0x07), "127.0.0.1:30303"),
  103. newNode(uintID(0x08), "127.0.0.1:30303"),
  104. newNode(uintID(0x09), "127.0.0.1:30303"),
  105. newNode(uintID(0x0a), "127.0.0.1:30303"),
  106. },
  107. },
  108. })
  109. }
  110. // This test checks that candidates that do not match the netrestrict list are not dialed.
  111. func TestDialSchedNetRestrict(t *testing.T) {
  112. t.Parallel()
  113. nodes := []*enode.Node{
  114. newNode(uintID(0x01), "127.0.0.1:30303"),
  115. newNode(uintID(0x02), "127.0.0.2:30303"),
  116. newNode(uintID(0x03), "127.0.0.3:30303"),
  117. newNode(uintID(0x04), "127.0.0.4:30303"),
  118. newNode(uintID(0x05), "127.0.2.5:30303"),
  119. newNode(uintID(0x06), "127.0.2.6:30303"),
  120. newNode(uintID(0x07), "127.0.2.7:30303"),
  121. newNode(uintID(0x08), "127.0.2.8:30303"),
  122. }
  123. config := dialConfig{
  124. netRestrict: new(netutil.Netlist),
  125. maxActiveDials: 10,
  126. maxDialPeers: 10,
  127. }
  128. config.netRestrict.Add("127.0.2.0/24")
  129. runDialTest(t, config, []dialTestRound{
  130. {
  131. discovered: nodes,
  132. wantNewDials: nodes[4:8],
  133. },
  134. {
  135. succeeded: []enode.ID{
  136. nodes[4].ID(),
  137. nodes[5].ID(),
  138. nodes[6].ID(),
  139. nodes[7].ID(),
  140. },
  141. },
  142. })
  143. }
  144. // This test checks that static dials work and obey the limits.
  145. func TestDialSchedStaticDial(t *testing.T) {
  146. t.Parallel()
  147. config := dialConfig{
  148. maxActiveDials: 5,
  149. maxDialPeers: 4,
  150. }
  151. runDialTest(t, config, []dialTestRound{
  152. // Static dials are launched for the nodes that
  153. // aren't yet connected.
  154. {
  155. peersAdded: []*conn{
  156. {flags: dynDialedConn, node: newNode(uintID(0x01), "127.0.0.1:30303")},
  157. {flags: dynDialedConn, node: newNode(uintID(0x02), "127.0.0.2:30303")},
  158. },
  159. update: func(d *dialScheduler) {
  160. // These two are not dialed because they're already connected
  161. // as dynamic peers.
  162. d.addStatic(newNode(uintID(0x01), "127.0.0.1:30303"))
  163. d.addStatic(newNode(uintID(0x02), "127.0.0.2:30303"))
  164. // These nodes will be dialed:
  165. d.addStatic(newNode(uintID(0x03), "127.0.0.3:30303"))
  166. d.addStatic(newNode(uintID(0x04), "127.0.0.4:30303"))
  167. d.addStatic(newNode(uintID(0x05), "127.0.0.5:30303"))
  168. d.addStatic(newNode(uintID(0x06), "127.0.0.6:30303"))
  169. d.addStatic(newNode(uintID(0x07), "127.0.0.7:30303"))
  170. d.addStatic(newNode(uintID(0x08), "127.0.0.8:30303"))
  171. d.addStatic(newNode(uintID(0x09), "127.0.0.9:30303"))
  172. },
  173. wantNewDials: []*enode.Node{
  174. newNode(uintID(0x03), "127.0.0.3:30303"),
  175. newNode(uintID(0x04), "127.0.0.4:30303"),
  176. newNode(uintID(0x05), "127.0.0.5:30303"),
  177. newNode(uintID(0x06), "127.0.0.6:30303"),
  178. },
  179. },
  180. // Dial to 0x03 completes, filling a peer slot. One slot remains,
  181. // two dials are launched to attempt to fill it.
  182. {
  183. succeeded: []enode.ID{
  184. uintID(0x03),
  185. },
  186. failed: []enode.ID{
  187. uintID(0x04),
  188. uintID(0x05),
  189. uintID(0x06),
  190. },
  191. wantResolves: map[enode.ID]*enode.Node{
  192. uintID(0x04): nil,
  193. uintID(0x05): nil,
  194. uintID(0x06): nil,
  195. },
  196. wantNewDials: []*enode.Node{
  197. newNode(uintID(0x08), "127.0.0.8:30303"),
  198. newNode(uintID(0x09), "127.0.0.9:30303"),
  199. },
  200. },
  201. // Peer 0x01 drops and 0x07 connects as inbound peer.
  202. // Only 0x01 is dialed.
  203. {
  204. peersAdded: []*conn{
  205. {flags: inboundConn, node: newNode(uintID(0x07), "127.0.0.7:30303")},
  206. },
  207. peersRemoved: []enode.ID{
  208. uintID(0x01),
  209. },
  210. wantNewDials: []*enode.Node{
  211. newNode(uintID(0x01), "127.0.0.1:30303"),
  212. },
  213. },
  214. })
  215. }
  216. // This test checks that removing static nodes stops connecting to them.
  217. func TestDialSchedRemoveStatic(t *testing.T) {
  218. t.Parallel()
  219. config := dialConfig{
  220. maxActiveDials: 1,
  221. maxDialPeers: 1,
  222. }
  223. runDialTest(t, config, []dialTestRound{
  224. // Add static nodes.
  225. {
  226. update: func(d *dialScheduler) {
  227. d.addStatic(newNode(uintID(0x01), "127.0.0.1:30303"))
  228. d.addStatic(newNode(uintID(0x02), "127.0.0.2:30303"))
  229. d.addStatic(newNode(uintID(0x03), "127.0.0.3:30303"))
  230. },
  231. wantNewDials: []*enode.Node{
  232. newNode(uintID(0x01), "127.0.0.1:30303"),
  233. },
  234. },
  235. // Dial to 0x01 fails.
  236. {
  237. failed: []enode.ID{
  238. uintID(0x01),
  239. },
  240. wantResolves: map[enode.ID]*enode.Node{
  241. uintID(0x01): nil,
  242. },
  243. wantNewDials: []*enode.Node{
  244. newNode(uintID(0x02), "127.0.0.2:30303"),
  245. },
  246. },
  247. // All static nodes are removed. 0x01 is in history, 0x02 is being
  248. // dialed, 0x03 is in staticPool.
  249. {
  250. update: func(d *dialScheduler) {
  251. d.removeStatic(newNode(uintID(0x01), "127.0.0.1:30303"))
  252. d.removeStatic(newNode(uintID(0x02), "127.0.0.2:30303"))
  253. d.removeStatic(newNode(uintID(0x03), "127.0.0.3:30303"))
  254. },
  255. failed: []enode.ID{
  256. uintID(0x02),
  257. },
  258. wantResolves: map[enode.ID]*enode.Node{
  259. uintID(0x02): nil,
  260. },
  261. },
  262. // Since all static nodes are removed, they should not be dialed again.
  263. {}, {}, {},
  264. })
  265. }
  266. // This test checks that static dials are selected at random.
  267. func TestDialSchedManyStaticNodes(t *testing.T) {
  268. t.Parallel()
  269. config := dialConfig{maxDialPeers: 2}
  270. runDialTest(t, config, []dialTestRound{
  271. {
  272. peersAdded: []*conn{
  273. {flags: dynDialedConn, node: newNode(uintID(0xFFFE), "")},
  274. {flags: dynDialedConn, node: newNode(uintID(0xFFFF), "")},
  275. },
  276. update: func(d *dialScheduler) {
  277. for id := uint16(0); id < 2000; id++ {
  278. n := newNode(uintID(id), "127.0.0.1:30303")
  279. d.addStatic(n)
  280. }
  281. },
  282. },
  283. {
  284. peersRemoved: []enode.ID{
  285. uintID(0xFFFE),
  286. uintID(0xFFFF),
  287. },
  288. wantNewDials: []*enode.Node{
  289. newNode(uintID(0x0085), "127.0.0.1:30303"),
  290. newNode(uintID(0x02dc), "127.0.0.1:30303"),
  291. newNode(uintID(0x0285), "127.0.0.1:30303"),
  292. newNode(uintID(0x00cb), "127.0.0.1:30303"),
  293. },
  294. },
  295. })
  296. }
  297. // This test checks that past dials are not retried for some time.
  298. func TestDialSchedHistory(t *testing.T) {
  299. t.Parallel()
  300. config := dialConfig{
  301. maxActiveDials: 3,
  302. maxDialPeers: 3,
  303. }
  304. runDialTest(t, config, []dialTestRound{
  305. {
  306. update: func(d *dialScheduler) {
  307. d.addStatic(newNode(uintID(0x01), "127.0.0.1:30303"))
  308. d.addStatic(newNode(uintID(0x02), "127.0.0.2:30303"))
  309. d.addStatic(newNode(uintID(0x03), "127.0.0.3:30303"))
  310. },
  311. wantNewDials: []*enode.Node{
  312. newNode(uintID(0x01), "127.0.0.1:30303"),
  313. newNode(uintID(0x02), "127.0.0.2:30303"),
  314. newNode(uintID(0x03), "127.0.0.3:30303"),
  315. },
  316. },
  317. // No new tasks are launched in this round because all static
  318. // nodes are either connected or still being dialed.
  319. {
  320. succeeded: []enode.ID{
  321. uintID(0x01),
  322. uintID(0x02),
  323. },
  324. failed: []enode.ID{
  325. uintID(0x03),
  326. },
  327. wantResolves: map[enode.ID]*enode.Node{
  328. uintID(0x03): nil,
  329. },
  330. },
  331. // Nothing happens in this round because we're waiting for
  332. // node 0x3's history entry to expire.
  333. {},
  334. // The cache entry for node 0x03 has expired and is retried.
  335. {
  336. wantNewDials: []*enode.Node{
  337. newNode(uintID(0x03), "127.0.0.3:30303"),
  338. },
  339. },
  340. })
  341. }
  342. func TestDialSchedResolve(t *testing.T) {
  343. t.Parallel()
  344. config := dialConfig{
  345. maxActiveDials: 1,
  346. maxDialPeers: 1,
  347. }
  348. node := newNode(uintID(0x01), "")
  349. resolved := newNode(uintID(0x01), "127.0.0.1:30303")
  350. resolved2 := newNode(uintID(0x01), "127.0.0.55:30303")
  351. runDialTest(t, config, []dialTestRound{
  352. {
  353. update: func(d *dialScheduler) {
  354. d.addStatic(node)
  355. },
  356. wantResolves: map[enode.ID]*enode.Node{
  357. uintID(0x01): resolved,
  358. },
  359. wantNewDials: []*enode.Node{
  360. resolved,
  361. },
  362. },
  363. {
  364. failed: []enode.ID{
  365. uintID(0x01),
  366. },
  367. wantResolves: map[enode.ID]*enode.Node{
  368. uintID(0x01): resolved2,
  369. },
  370. wantNewDials: []*enode.Node{
  371. resolved2,
  372. },
  373. },
  374. })
  375. }
  376. // -------
  377. // Code below here is the framework for the tests above.
  378. type dialTestRound struct {
  379. peersAdded []*conn
  380. peersRemoved []enode.ID
  381. update func(*dialScheduler) // called at beginning of round
  382. discovered []*enode.Node // newly discovered nodes
  383. succeeded []enode.ID // dials which succeed this round
  384. failed []enode.ID // dials which fail this round
  385. wantResolves map[enode.ID]*enode.Node
  386. wantNewDials []*enode.Node // dials that should be launched in this round
  387. }
  388. func runDialTest(t *testing.T, config dialConfig, rounds []dialTestRound) {
  389. var (
  390. clock = new(mclock.Simulated)
  391. iterator = newDialTestIterator()
  392. dialer = newDialTestDialer()
  393. resolver = new(dialTestResolver)
  394. peers = make(map[enode.ID]*conn)
  395. setupCh = make(chan *conn)
  396. )
  397. // Override config.
  398. config.clock = clock
  399. config.dialer = dialer
  400. config.resolver = resolver
  401. config.log = testlog.Logger(t, log.LvlTrace)
  402. config.rand = rand.New(rand.NewSource(0x1111))
  403. // Set up the dialer. The setup function below runs on the dialTask
  404. // goroutine and adds the peer.
  405. var dialsched *dialScheduler
  406. setup := func(fd net.Conn, f connFlag, node *enode.Node) error {
  407. conn := &conn{flags: f, node: node}
  408. dialsched.peerAdded(conn)
  409. setupCh <- conn
  410. return nil
  411. }
  412. dialsched = newDialScheduler(config, iterator, setup)
  413. defer dialsched.stop()
  414. for i, round := range rounds {
  415. // Apply peer set updates.
  416. for _, c := range round.peersAdded {
  417. if peers[c.node.ID()] != nil {
  418. t.Fatalf("round %d: peer %v already connected", i, c.node.ID())
  419. }
  420. dialsched.peerAdded(c)
  421. peers[c.node.ID()] = c
  422. }
  423. for _, id := range round.peersRemoved {
  424. c := peers[id]
  425. if c == nil {
  426. t.Fatalf("round %d: can't remove non-existent peer %v", i, id)
  427. }
  428. dialsched.peerRemoved(c)
  429. }
  430. // Init round.
  431. t.Logf("round %d (%d peers)", i, len(peers))
  432. resolver.setAnswers(round.wantResolves)
  433. if round.update != nil {
  434. round.update(dialsched)
  435. }
  436. iterator.addNodes(round.discovered)
  437. // Unblock dialTask goroutines.
  438. if err := dialer.completeDials(round.succeeded, nil); err != nil {
  439. t.Fatalf("round %d: %v", i, err)
  440. }
  441. for range round.succeeded {
  442. conn := <-setupCh
  443. peers[conn.node.ID()] = conn
  444. }
  445. if err := dialer.completeDials(round.failed, errors.New("oops")); err != nil {
  446. t.Fatalf("round %d: %v", i, err)
  447. }
  448. // Wait for new tasks.
  449. if err := dialer.waitForDials(round.wantNewDials); err != nil {
  450. t.Fatalf("round %d: %v", i, err)
  451. }
  452. if !resolver.checkCalls() {
  453. t.Fatalf("unexpected calls to Resolve: %v", resolver.calls)
  454. }
  455. clock.Run(16 * time.Second)
  456. }
  457. }
  458. // dialTestIterator is the input iterator for dialer tests. This works a bit like a channel
  459. // with infinite buffer: nodes are added to the buffer with addNodes, which unblocks Next
  460. // and returns them from the iterator.
  461. type dialTestIterator struct {
  462. cur *enode.Node
  463. mu sync.Mutex
  464. buf []*enode.Node
  465. cond *sync.Cond
  466. closed bool
  467. }
  468. func newDialTestIterator() *dialTestIterator {
  469. it := &dialTestIterator{}
  470. it.cond = sync.NewCond(&it.mu)
  471. return it
  472. }
  473. // addNodes adds nodes to the iterator buffer and unblocks Next.
  474. func (it *dialTestIterator) addNodes(nodes []*enode.Node) {
  475. it.mu.Lock()
  476. defer it.mu.Unlock()
  477. it.buf = append(it.buf, nodes...)
  478. it.cond.Signal()
  479. }
  480. // Node returns the current node.
  481. func (it *dialTestIterator) Node() *enode.Node {
  482. return it.cur
  483. }
  484. // Next moves to the next node.
  485. func (it *dialTestIterator) Next() bool {
  486. it.mu.Lock()
  487. defer it.mu.Unlock()
  488. it.cur = nil
  489. for len(it.buf) == 0 && !it.closed {
  490. it.cond.Wait()
  491. }
  492. if it.closed {
  493. return false
  494. }
  495. it.cur = it.buf[0]
  496. copy(it.buf[:], it.buf[1:])
  497. it.buf = it.buf[:len(it.buf)-1]
  498. return true
  499. }
  500. // Close ends the iterator, unblocking Next.
  501. func (it *dialTestIterator) Close() {
  502. it.mu.Lock()
  503. defer it.mu.Unlock()
  504. it.closed = true
  505. it.buf = nil
  506. it.cond.Signal()
  507. }
  508. // dialTestDialer is the NodeDialer used by runDialTest.
  509. type dialTestDialer struct {
  510. init chan *dialTestReq
  511. blocked map[enode.ID]*dialTestReq
  512. }
  513. type dialTestReq struct {
  514. n *enode.Node
  515. unblock chan error
  516. }
  517. func newDialTestDialer() *dialTestDialer {
  518. return &dialTestDialer{
  519. init: make(chan *dialTestReq),
  520. blocked: make(map[enode.ID]*dialTestReq),
  521. }
  522. }
  523. // Dial implements NodeDialer.
  524. func (d *dialTestDialer) Dial(ctx context.Context, n *enode.Node) (net.Conn, error) {
  525. req := &dialTestReq{n: n, unblock: make(chan error, 1)}
  526. select {
  527. case d.init <- req:
  528. select {
  529. case err := <-req.unblock:
  530. pipe, _ := net.Pipe()
  531. return pipe, err
  532. case <-ctx.Done():
  533. return nil, ctx.Err()
  534. }
  535. case <-ctx.Done():
  536. return nil, ctx.Err()
  537. }
  538. }
  539. // waitForDials waits for calls to Dial with the given nodes as argument.
  540. // Those calls will be held blocking until completeDials is called with the same nodes.
  541. func (d *dialTestDialer) waitForDials(nodes []*enode.Node) error {
  542. waitset := make(map[enode.ID]*enode.Node)
  543. for _, n := range nodes {
  544. waitset[n.ID()] = n
  545. }
  546. timeout := time.NewTimer(1 * time.Second)
  547. defer timeout.Stop()
  548. for len(waitset) > 0 {
  549. select {
  550. case req := <-d.init:
  551. want, ok := waitset[req.n.ID()]
  552. if !ok {
  553. return fmt.Errorf("attempt to dial unexpected node %v", req.n.ID())
  554. }
  555. if !reflect.DeepEqual(req.n, want) {
  556. return fmt.Errorf("ENR of dialed node %v does not match test", req.n.ID())
  557. }
  558. delete(waitset, req.n.ID())
  559. d.blocked[req.n.ID()] = req
  560. case <-timeout.C:
  561. var waitlist []enode.ID
  562. for id := range waitset {
  563. waitlist = append(waitlist, id)
  564. }
  565. return fmt.Errorf("timed out waiting for dials to %v", waitlist)
  566. }
  567. }
  568. return d.checkUnexpectedDial()
  569. }
  570. func (d *dialTestDialer) checkUnexpectedDial() error {
  571. select {
  572. case req := <-d.init:
  573. return fmt.Errorf("attempt to dial unexpected node %v", req.n.ID())
  574. case <-time.After(150 * time.Millisecond):
  575. return nil
  576. }
  577. }
  578. // completeDials unblocks calls to Dial for the given nodes.
  579. func (d *dialTestDialer) completeDials(ids []enode.ID, err error) error {
  580. for _, id := range ids {
  581. req := d.blocked[id]
  582. if req == nil {
  583. return fmt.Errorf("can't complete dial to %v", id)
  584. }
  585. req.unblock <- err
  586. }
  587. return nil
  588. }
  589. // dialTestResolver tracks calls to resolve.
  590. type dialTestResolver struct {
  591. mu sync.Mutex
  592. calls []enode.ID
  593. answers map[enode.ID]*enode.Node
  594. }
  595. func (t *dialTestResolver) setAnswers(m map[enode.ID]*enode.Node) {
  596. t.mu.Lock()
  597. defer t.mu.Unlock()
  598. t.answers = m
  599. t.calls = nil
  600. }
  601. func (t *dialTestResolver) checkCalls() bool {
  602. t.mu.Lock()
  603. defer t.mu.Unlock()
  604. for _, id := range t.calls {
  605. if _, ok := t.answers[id]; !ok {
  606. return false
  607. }
  608. }
  609. return true
  610. }
  611. func (t *dialTestResolver) Resolve(n *enode.Node) *enode.Node {
  612. t.mu.Lock()
  613. defer t.mu.Unlock()
  614. t.calls = append(t.calls, n.ID())
  615. return t.answers[n.ID()]
  616. }