crawl.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of go-ethereum.
  3. //
  4. // go-ethereum is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // go-ethereum is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU General Public License
  15. // along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
  16. package main
  17. import (
  18. "time"
  19. "github.com/ethereum/go-ethereum/log"
  20. "github.com/ethereum/go-ethereum/p2p/enode"
  21. )
  22. type crawler struct {
  23. input nodeSet
  24. output nodeSet
  25. disc resolver
  26. iters []enode.Iterator
  27. inputIter enode.Iterator
  28. ch chan *enode.Node
  29. closed chan struct{}
  30. // settings
  31. revalidateInterval time.Duration
  32. }
  33. type resolver interface {
  34. RequestENR(*enode.Node) (*enode.Node, error)
  35. }
  36. func newCrawler(input nodeSet, disc resolver, iters ...enode.Iterator) *crawler {
  37. c := &crawler{
  38. input: input,
  39. output: make(nodeSet, len(input)),
  40. disc: disc,
  41. iters: iters,
  42. inputIter: enode.IterNodes(input.nodes()),
  43. ch: make(chan *enode.Node),
  44. closed: make(chan struct{}),
  45. }
  46. c.iters = append(c.iters, c.inputIter)
  47. // Copy input to output initially. Any nodes that fail validation
  48. // will be dropped from output during the run.
  49. for id, n := range input {
  50. c.output[id] = n
  51. }
  52. return c
  53. }
  54. func (c *crawler) run(timeout time.Duration) nodeSet {
  55. var (
  56. timeoutTimer = time.NewTimer(timeout)
  57. timeoutCh <-chan time.Time
  58. doneCh = make(chan enode.Iterator, len(c.iters))
  59. liveIters = len(c.iters)
  60. )
  61. defer timeoutTimer.Stop()
  62. for _, it := range c.iters {
  63. go c.runIterator(doneCh, it)
  64. }
  65. loop:
  66. for {
  67. select {
  68. case n := <-c.ch:
  69. c.updateNode(n)
  70. case it := <-doneCh:
  71. if it == c.inputIter {
  72. // Enable timeout when we're done revalidating the input nodes.
  73. log.Info("Revalidation of input set is done", "len", len(c.input))
  74. if timeout > 0 {
  75. timeoutCh = timeoutTimer.C
  76. }
  77. }
  78. if liveIters--; liveIters == 0 {
  79. break loop
  80. }
  81. case <-timeoutCh:
  82. break loop
  83. }
  84. }
  85. close(c.closed)
  86. for _, it := range c.iters {
  87. it.Close()
  88. }
  89. for ; liveIters > 0; liveIters-- {
  90. <-doneCh
  91. }
  92. return c.output
  93. }
  94. func (c *crawler) runIterator(done chan<- enode.Iterator, it enode.Iterator) {
  95. defer func() { done <- it }()
  96. for it.Next() {
  97. select {
  98. case c.ch <- it.Node():
  99. case <-c.closed:
  100. return
  101. }
  102. }
  103. }
  104. func (c *crawler) updateNode(n *enode.Node) {
  105. node, ok := c.output[n.ID()]
  106. // Skip validation of recently-seen nodes.
  107. if ok && time.Since(node.LastCheck) < c.revalidateInterval {
  108. return
  109. }
  110. // Request the node record.
  111. nn, err := c.disc.RequestENR(n)
  112. node.LastCheck = truncNow()
  113. if err != nil {
  114. if node.Score == 0 {
  115. // Node doesn't implement EIP-868.
  116. log.Debug("Skipping node", "id", n.ID())
  117. return
  118. }
  119. node.Score /= 2
  120. } else {
  121. node.N = nn
  122. node.Seq = nn.Seq()
  123. node.Score++
  124. if node.FirstResponse.IsZero() {
  125. node.FirstResponse = node.LastCheck
  126. }
  127. node.LastResponse = node.LastCheck
  128. }
  129. // Store/update node in output set.
  130. if node.Score <= 0 {
  131. log.Info("Removing node", "id", n.ID())
  132. delete(c.output, n.ID())
  133. } else {
  134. log.Info("Updating node", "id", n.ID(), "seq", n.Seq(), "score", node.Score)
  135. c.output[n.ID()] = node
  136. }
  137. }
  138. func truncNow() time.Time {
  139. return time.Now().UTC().Truncate(1 * time.Second)
  140. }