dial.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package p2p
  17. import (
  18. "context"
  19. crand "crypto/rand"
  20. "encoding/binary"
  21. "errors"
  22. "fmt"
  23. mrand "math/rand"
  24. "net"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/common/mclock"
  28. "github.com/ethereum/go-ethereum/log"
  29. "github.com/ethereum/go-ethereum/p2p/enode"
  30. "github.com/ethereum/go-ethereum/p2p/netutil"
  31. )
const (
	// This is the amount of time spent waiting in between redialing a certain node. The
	// limit is a bit higher than inboundThrottleTime to prevent failing dials in small
	// private networks.
	dialHistoryExpiration = inboundThrottleTime + 5*time.Second

	// Config for the "Looking for peers" message.
	dialStatsLogInterval = 10 * time.Second // printed at most this often
	dialStatsPeerLimit   = 3                // suppressed once this many dialed peers are connected

	// Endpoint resolution is throttled with bounded backoff: the delay between
	// resolve attempts starts at initialResolveDelay, doubles after every failed
	// attempt and never exceeds maxResolveDelay.
	initialResolveDelay = 60 * time.Second
	maxResolveDelay     = time.Hour
)
// NodeDialer is used to connect to nodes in the network, typically by using
// an underlying net.Dialer but also using net.Pipe in tests.
type NodeDialer interface {
	// Dial opens a connection to the given node. The dial tasks in this file
	// pass in the scheduler's cancelable context.
	Dial(context.Context, *enode.Node) (net.Conn, error)
}
// nodeResolver looks up the current version of a node. It is used by
// dialTask.resolve, which treats a nil result as "node not found".
type nodeResolver interface {
	Resolve(*enode.Node) *enode.Node
}
// tcpDialer implements NodeDialer using real TCP connections.
type tcpDialer struct {
	d *net.Dialer // underlying dialer used for every connection attempt
}
  56. func (t tcpDialer) Dial(ctx context.Context, dest *enode.Node) (net.Conn, error) {
  57. return t.d.DialContext(ctx, "tcp", nodeAddr(dest).String())
  58. }
  59. func nodeAddr(n *enode.Node) net.Addr {
  60. return &net.TCPAddr{IP: n.IP(), Port: n.TCP()}
  61. }
// checkDial errors:
var (
	errSelf             = errors.New("is self")                                // the node ID is our own
	errAlreadyDialing   = errors.New("already dialing")                        // a dial task for the node is active
	errAlreadyConnected = errors.New("already connected")                      // the node is in the peer set
	errRecentlyDialed   = errors.New("recently dialed")                        // the node is still in the dial history
	errNotWhitelisted   = errors.New("not contained in netrestrict whitelist") // the IP is rejected by netRestrict
	errNoPort           = errors.New("node does not provide TCP port")         // the node has an IP but TCP port 0
)
// dialScheduler creates outbound connections and submits them into Server.
// Two types of peer connections can be created:
//
//   - static dials are pre-configured connections. The dialer attempts
//     to keep these nodes connected at all times.
//
//   - dynamic dials are created from node discovery results. The dialer
//     continuously reads candidate nodes from its input iterator and attempts
//     to create peer connections to nodes arriving through the iterator.
type dialScheduler struct {
	dialConfig
	setupFunc   dialSetupFunc      // called by dial tasks for every established connection
	wg          sync.WaitGroup     // tracks the loop and readNodes goroutines
	cancel      context.CancelFunc // cancels ctx; invoked by stop
	ctx         context.Context    // passed to every dial; its Done channel unblocks the methods below
	nodesIn     chan *enode.Node   // dynamic dial candidates delivered by readNodes
	doneCh      chan *dialTask     // dial tasks report here when they have finished
	addStaticCh chan *enode.Node   // requests from addStatic
	remStaticCh chan *enode.Node   // requests from removeStatic
	addPeerCh   chan *conn         // notifications from peerAdded
	remPeerCh   chan *conn         // notifications from peerRemoved

	// Everything below here belongs to loop and
	// should only be accessed by code on the loop goroutine.
	dialing   map[enode.ID]*dialTask // active tasks
	peers     map[enode.ID]connFlag  // all connected peers
	dialPeers int                    // current number of dialed peers

	// The static map tracks all static dial tasks. The subset of usable static dial tasks
	// (i.e. those passing checkDial) is kept in staticPool. The scheduler prefers
	// launching random static tasks from the pool over launching dynamic dials from the
	// iterator.
	static     map[enode.ID]*dialTask
	staticPool []*dialTask

	// The dial history keeps recently dialed nodes. Members of history are not dialed.
	history          expHeap
	historyTimer     mclock.Timer   // fires when the next history item expires; nil when not armed
	historyTimerTime mclock.AbsTime // expiry time historyTimer is armed for; zero when not armed

	// for logStats
	lastStatsLog     mclock.AbsTime // time logStats last reset its counters
	doneSinceLastLog int            // dial tasks finished since lastStatsLog
}
// dialSetupFunc is called by dialTask.dial with the freshly dialed connection, the
// task's connection flags and the dialed node. Its error is returned by dial.
type dialSetupFunc func(net.Conn, connFlag, *enode.Node) error
// dialConfig holds the settings of a dialScheduler. Unset maxActiveDials, log,
// clock and rand are filled in by withDefaults.
type dialConfig struct {
	self           enode.ID         // our own ID
	maxDialPeers   int              // maximum number of dialed peers
	maxActiveDials int              // maximum number of active dials
	netRestrict    *netutil.Netlist // IP whitelist, disabled if nil
	resolver       nodeResolver     // used to resolve node endpoints; resolving is disabled if nil
	dialer         NodeDialer       // creates the actual connections
	log            log.Logger
	clock          mclock.Clock
	rand           *mrand.Rand // picks the next static dial task from the pool
}
  124. func (cfg dialConfig) withDefaults() dialConfig {
  125. if cfg.maxActiveDials == 0 {
  126. cfg.maxActiveDials = defaultMaxPendingPeers
  127. }
  128. if cfg.log == nil {
  129. cfg.log = log.Root()
  130. }
  131. if cfg.clock == nil {
  132. cfg.clock = mclock.System{}
  133. }
  134. if cfg.rand == nil {
  135. seedb := make([]byte, 8)
  136. crand.Read(seedb)
  137. seed := int64(binary.BigEndian.Uint64(seedb))
  138. cfg.rand = mrand.New(mrand.NewSource(seed))
  139. }
  140. return cfg
  141. }
  142. func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
  143. d := &dialScheduler{
  144. dialConfig: config.withDefaults(),
  145. setupFunc: setupFunc,
  146. dialing: make(map[enode.ID]*dialTask),
  147. static: make(map[enode.ID]*dialTask),
  148. peers: make(map[enode.ID]connFlag),
  149. doneCh: make(chan *dialTask),
  150. nodesIn: make(chan *enode.Node),
  151. addStaticCh: make(chan *enode.Node),
  152. remStaticCh: make(chan *enode.Node),
  153. addPeerCh: make(chan *conn),
  154. remPeerCh: make(chan *conn),
  155. }
  156. d.lastStatsLog = d.clock.Now()
  157. d.ctx, d.cancel = context.WithCancel(context.Background())
  158. d.wg.Add(2)
  159. go d.readNodes(it)
  160. go d.loop(it)
  161. return d
  162. }
  163. // stop shuts down the dialer, canceling all current dial tasks.
  164. func (d *dialScheduler) stop() {
  165. d.cancel()
  166. d.wg.Wait()
  167. }
  168. // addStatic adds a static dial candidate.
  169. func (d *dialScheduler) addStatic(n *enode.Node) {
  170. select {
  171. case d.addStaticCh <- n:
  172. case <-d.ctx.Done():
  173. }
  174. }
  175. // removeStatic removes a static dial candidate.
  176. func (d *dialScheduler) removeStatic(n *enode.Node) {
  177. select {
  178. case d.remStaticCh <- n:
  179. case <-d.ctx.Done():
  180. }
  181. }
  182. // peerAdded updates the peer set.
  183. func (d *dialScheduler) peerAdded(c *conn) {
  184. select {
  185. case d.addPeerCh <- c:
  186. case <-d.ctx.Done():
  187. }
  188. }
  189. // peerRemoved updates the peer set.
  190. func (d *dialScheduler) peerRemoved(c *conn) {
  191. select {
  192. case d.remPeerCh <- c:
  193. case <-d.ctx.Done():
  194. }
  195. }
// loop is the main loop of the dialer. It runs on its own goroutine and is the
// only code that touches the scheduler's bookkeeping (dialing, peers, static,
// staticPool, history and the stats counters).
//
// Every iteration first launches as many static dials as free slots allow,
// enables or disables the intake of dynamic candidates, re-arms the history
// timer and possibly logs statistics. It then waits for exactly one event.
// When the scheduler context is canceled, the loop closes the iterator, waits
// for all running dial tasks to finish and signals d.wg.
func (d *dialScheduler) loop(it enode.Iterator) {
	var (
		// nodesCh is d.nodesIn while dynamic dials are allowed and nil otherwise.
		// Receiving from a nil channel blocks forever, which disables that case.
		nodesCh chan *enode.Node
		// historyExp is signaled by the history timer callback. The buffer of
		// one lets the callback complete without a waiting receiver.
		historyExp = make(chan struct{}, 1)
	)

loop:
	for {
		// Launch new dials if slots are available.
		slots := d.freeDialSlots()
		slots -= d.startStaticDials(slots)
		if slots > 0 {
			nodesCh = d.nodesIn
		} else {
			nodesCh = nil
		}
		d.rearmHistoryTimer(historyExp)
		d.logStats()

		select {
		case node := <-nodesCh:
			// A dynamic candidate arrived from readNodes.
			if err := d.checkDial(node); err != nil {
				d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
			} else {
				d.startDial(newDialTask(node, dynDialedConn))
			}

		case task := <-d.doneCh:
			// A dial task finished. Re-check whether its node is a static
			// node that can go back into the pool.
			id := task.dest.ID()
			delete(d.dialing, id)
			d.updateStaticPool(id)
			d.doneSinceLastLog++

		case c := <-d.addPeerCh:
			// Only connections we dialed ourselves count towards dialPeers.
			if c.is(dynDialedConn) || c.is(staticDialedConn) {
				d.dialPeers++
			}
			id := c.node.ID()
			d.peers[id] = c.flags
			// Remove from static pool because the node is now connected.
			task := d.static[id]
			if task != nil && task.staticPoolIndex >= 0 {
				d.removeFromStaticPool(task.staticPoolIndex)
			}
			// TODO: cancel dials to connected peers

		case c := <-d.remPeerCh:
			if c.is(dynDialedConn) || c.is(staticDialedConn) {
				d.dialPeers--
			}
			delete(d.peers, c.node.ID())
			// The node is no longer connected, so a static task for it may
			// become dialable again.
			d.updateStaticPool(c.node.ID())

		case node := <-d.addStaticCh:
			id := node.ID()
			_, exists := d.static[id]
			d.log.Trace("Adding static node", "id", id, "ip", node.IP(), "added", !exists)
			if exists {
				// Already tracked; keep the existing task.
				continue loop
			}
			task := newDialTask(node, staticDialedConn)
			d.static[id] = task
			// The task enters the pool only if it can be dialed right now.
			if d.checkDial(node) == nil {
				d.addToStaticPool(task)
			}

		case node := <-d.remStaticCh:
			id := node.ID()
			task := d.static[id]
			d.log.Trace("Removing static node", "id", id, "ok", task != nil)
			if task != nil {
				delete(d.static, id)
				if task.staticPoolIndex >= 0 {
					d.removeFromStaticPool(task.staticPoolIndex)
				}
			}

		case <-historyExp:
			// The history timer fired.
			d.expireHistory()

		case <-d.ctx.Done():
			it.Close()
			break loop
		}
	}

	d.stopHistoryTimer(historyExp)
	// Every task still in d.dialing sends on doneCh exactly once when it ends;
	// receive one value per task so none of them stays blocked on the send.
	for range d.dialing {
		<-d.doneCh
	}
	d.wg.Done()
}
  279. // readNodes runs in its own goroutine and delivers nodes from
  280. // the input iterator to the nodesIn channel.
  281. func (d *dialScheduler) readNodes(it enode.Iterator) {
  282. defer d.wg.Done()
  283. for it.Next() {
  284. select {
  285. case d.nodesIn <- it.Node():
  286. case <-d.ctx.Done():
  287. }
  288. }
  289. }
  290. // logStats prints dialer statistics to the log. The message is suppressed when enough
  291. // peers are connected because users should only see it while their client is starting up
  292. // or comes back online.
  293. func (d *dialScheduler) logStats() {
  294. now := d.clock.Now()
  295. if d.lastStatsLog.Add(dialStatsLogInterval) > now {
  296. return
  297. }
  298. if d.dialPeers < dialStatsPeerLimit && d.dialPeers < d.maxDialPeers {
  299. d.log.Info("Looking for peers", "peercount", len(d.peers), "tried", d.doneSinceLastLog, "static", len(d.static))
  300. }
  301. d.doneSinceLastLog = 0
  302. d.lastStatsLog = now
  303. }
  304. // rearmHistoryTimer configures d.historyTimer to fire when the
  305. // next item in d.history expires.
  306. func (d *dialScheduler) rearmHistoryTimer(ch chan struct{}) {
  307. if len(d.history) == 0 || d.historyTimerTime == d.history.nextExpiry() {
  308. return
  309. }
  310. d.stopHistoryTimer(ch)
  311. d.historyTimerTime = d.history.nextExpiry()
  312. timeout := time.Duration(d.historyTimerTime - d.clock.Now())
  313. d.historyTimer = d.clock.AfterFunc(timeout, func() { ch <- struct{}{} })
  314. }
  315. // stopHistoryTimer stops the timer and drains the channel it sends on.
  316. func (d *dialScheduler) stopHistoryTimer(ch chan struct{}) {
  317. if d.historyTimer != nil && !d.historyTimer.Stop() {
  318. <-ch
  319. }
  320. }
  321. // expireHistory removes expired items from d.history.
  322. func (d *dialScheduler) expireHistory() {
  323. d.historyTimer.Stop()
  324. d.historyTimer = nil
  325. d.historyTimerTime = 0
  326. d.history.expire(d.clock.Now(), func(hkey string) {
  327. var id enode.ID
  328. copy(id[:], hkey)
  329. d.updateStaticPool(id)
  330. })
  331. }
  332. // freeDialSlots returns the number of free dial slots. The result can be negative
  333. // when peers are connected while their task is still running.
  334. func (d *dialScheduler) freeDialSlots() int {
  335. slots := (d.maxDialPeers - d.dialPeers) * 2
  336. if slots > d.maxActiveDials {
  337. slots = d.maxActiveDials
  338. }
  339. free := slots - len(d.dialing)
  340. return free
  341. }
  342. // checkDial returns an error if node n should not be dialed.
  343. func (d *dialScheduler) checkDial(n *enode.Node) error {
  344. if n.ID() == d.self {
  345. return errSelf
  346. }
  347. if n.IP() != nil && n.TCP() == 0 {
  348. // This check can trigger if a non-TCP node is found
  349. // by discovery. If there is no IP, the node is a static
  350. // node and the actual endpoint will be resolved later in dialTask.
  351. return errNoPort
  352. }
  353. if _, ok := d.dialing[n.ID()]; ok {
  354. return errAlreadyDialing
  355. }
  356. if _, ok := d.peers[n.ID()]; ok {
  357. return errAlreadyConnected
  358. }
  359. if d.netRestrict != nil && !d.netRestrict.Contains(n.IP()) {
  360. return errNotWhitelisted
  361. }
  362. if d.history.contains(string(n.ID().Bytes())) {
  363. return errRecentlyDialed
  364. }
  365. return nil
  366. }
  367. // startStaticDials starts n static dial tasks.
  368. func (d *dialScheduler) startStaticDials(n int) (started int) {
  369. for started = 0; started < n && len(d.staticPool) > 0; started++ {
  370. idx := d.rand.Intn(len(d.staticPool))
  371. task := d.staticPool[idx]
  372. d.startDial(task)
  373. d.removeFromStaticPool(idx)
  374. }
  375. return started
  376. }
  377. // updateStaticPool attempts to move the given static dial back into staticPool.
  378. func (d *dialScheduler) updateStaticPool(id enode.ID) {
  379. task, ok := d.static[id]
  380. if ok && task.staticPoolIndex < 0 && d.checkDial(task.dest) == nil {
  381. d.addToStaticPool(task)
  382. }
  383. }
  384. func (d *dialScheduler) addToStaticPool(task *dialTask) {
  385. if task.staticPoolIndex >= 0 {
  386. panic("attempt to add task to staticPool twice")
  387. }
  388. d.staticPool = append(d.staticPool, task)
  389. task.staticPoolIndex = len(d.staticPool) - 1
  390. }
  391. // removeFromStaticPool removes the task at idx from staticPool. It does that by moving the
  392. // current last element of the pool to idx and then shortening the pool by one.
  393. func (d *dialScheduler) removeFromStaticPool(idx int) {
  394. task := d.staticPool[idx]
  395. end := len(d.staticPool) - 1
  396. d.staticPool[idx] = d.staticPool[end]
  397. d.staticPool[idx].staticPoolIndex = idx
  398. d.staticPool[end] = nil
  399. d.staticPool = d.staticPool[:end]
  400. task.staticPoolIndex = -1
  401. }
  402. // startDial runs the given dial task in a separate goroutine.
  403. func (d *dialScheduler) startDial(task *dialTask) {
  404. d.log.Trace("Starting p2p dial", "id", task.dest.ID(), "ip", task.dest.IP(), "flag", task.flags)
  405. hkey := string(task.dest.ID().Bytes())
  406. d.history.add(hkey, d.clock.Now().Add(dialHistoryExpiration))
  407. d.dialing[task.dest.ID()] = task
  408. go func() {
  409. task.run(d)
  410. d.doneCh <- task
  411. }()
  412. }
// A dialTask is generated for each node that is dialed.
type dialTask struct {
	staticPoolIndex int      // position in dialScheduler.staticPool, or -1 when not in the pool
	flags           connFlag // connection flags passed to the setup function

	// These fields are private to the task and should not be
	// accessed by dialScheduler while the task is running.
	dest         *enode.Node    // node to dial; replaced when resolve finds a newer version
	lastResolved mclock.AbsTime // time of the most recent resolve attempt; zero if none yet
	resolveDelay time.Duration  // minimum time between two resolve attempts
}
  423. func newDialTask(dest *enode.Node, flags connFlag) *dialTask {
  424. return &dialTask{dest: dest, flags: flags, staticPoolIndex: -1}
  425. }
// dialError wraps an error returned by NodeDialer.Dial. dialTask.run checks for
// this type to tell a failed connection attempt apart from an error returned by
// the setup function.
type dialError struct {
	error
}
  429. func (t *dialTask) run(d *dialScheduler) {
  430. if t.needResolve() && !t.resolve(d) {
  431. return
  432. }
  433. err := t.dial(d, t.dest)
  434. if err != nil {
  435. // For static nodes, resolve one more time if dialing fails.
  436. if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
  437. if t.resolve(d) {
  438. t.dial(d, t.dest)
  439. }
  440. }
  441. }
  442. }
  443. func (t *dialTask) needResolve() bool {
  444. return t.flags&staticDialedConn != 0 && t.dest.IP() == nil
  445. }
  446. // resolve attempts to find the current endpoint for the destination
  447. // using discovery.
  448. //
  449. // Resolve operations are throttled with backoff to avoid flooding the
  450. // discovery network with useless queries for nodes that don't exist.
  451. // The backoff delay resets when the node is found.
  452. func (t *dialTask) resolve(d *dialScheduler) bool {
  453. if d.resolver == nil {
  454. return false
  455. }
  456. if t.resolveDelay == 0 {
  457. t.resolveDelay = initialResolveDelay
  458. }
  459. if t.lastResolved > 0 && time.Duration(d.clock.Now()-t.lastResolved) < t.resolveDelay {
  460. return false
  461. }
  462. resolved := d.resolver.Resolve(t.dest)
  463. t.lastResolved = d.clock.Now()
  464. if resolved == nil {
  465. t.resolveDelay *= 2
  466. if t.resolveDelay > maxResolveDelay {
  467. t.resolveDelay = maxResolveDelay
  468. }
  469. d.log.Debug("Resolving node failed", "id", t.dest.ID(), "newdelay", t.resolveDelay)
  470. return false
  471. }
  472. // The node was found.
  473. t.resolveDelay = initialResolveDelay
  474. t.dest = resolved
  475. d.log.Debug("Resolved node", "id", t.dest.ID(), "addr", &net.TCPAddr{IP: t.dest.IP(), Port: t.dest.TCP()})
  476. return true
  477. }
  478. // dial performs the actual connection attempt.
  479. func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
  480. fd, err := d.dialer.Dial(d.ctx, t.dest)
  481. if err != nil {
  482. d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err))
  483. return &dialError{err}
  484. }
  485. mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()})
  486. return d.setupFunc(mfd, t.flags, dest)
  487. }
  488. func (t *dialTask) String() string {
  489. id := t.dest.ID()
  490. return fmt.Sprintf("%v %x %v:%d", t.flags, id[:8], t.dest.IP(), t.dest.TCP())
  491. }
  492. func cleanupDialErr(err error) error {
  493. if netErr, ok := err.(*net.OpError); ok && netErr.Op == "dial" {
  494. return netErr.Err
  495. }
  496. return err
  497. }