tracker.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package tracker
  17. import (
  18. "container/list"
  19. "fmt"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/log"
  23. "github.com/ethereum/go-ethereum/metrics"
  24. )
  // Metric name prefixes. At runtime each one is extended with the protocol
// name, protocol version and message code to form a per-packet metric name.
const (
	trackedGaugeName = "p2p/tracked" // gauge: requests currently being tracked
	lostMeterName    = "p2p/lost"    // meter: requests that expired unanswered
	staleMeterName   = "p2p/stale"   // meter: responses matching no pending request
	waitHistName     = "p2p/wait"    // histogram: request wait times (requests only)
)

// maxTrackedPackets is a deliberately huge failsafe on the number of pending
// requests the node keeps track of. Regular operation should never reach it;
// it is only expected to trip if an attacker finds a way to spin requests.
const maxTrackedPackets = 100000
  39. // request tracks sent network requests which have not yet received a response.
  40. type request struct {
  41. peer string
  42. version uint // Protocol version
  43. reqCode uint64 // Protocol message code of the request
  44. resCode uint64 // Protocol message code of the expected response
  45. time time.Time // Timestamp when the request was made
  46. expire *list.Element // Expiration marker to untrack it
  47. }
  48. // Tracker is a pending network request tracker to measure how much time it takes
  49. // a remote peer to respond.
  50. type Tracker struct {
  51. protocol string // Protocol capability identifier for the metrics
  52. timeout time.Duration // Global timeout after which to drop a tracked packet
  53. pending map[uint64]*request // Currently pending requests
  54. expire *list.List // Linked list tracking the expiration order
  55. wake *time.Timer // Timer tracking the expiration of the next item
  56. lock sync.Mutex // Lock protecting from concurrent updates
  57. }
  // New returns a request tracker for the given protocol. Requests that are not
// answered within timeout are dropped and counted as lost, which allows
// monitoring how quickly individual peers respond.
func New(protocol string, timeout time.Duration) *Tracker {
	t := new(Tracker)
	t.protocol = protocol
	t.timeout = timeout
	t.pending = make(map[uint64]*request)
	t.expire = list.New()
	return t
}
  // Track adds a network request to the tracker and waits until either a
// response arrives (see Fulfil) or the request times out and is counted as lost.
//
// Tracking is skipped entirely when metrics are disabled. A request whose id is
// already pending, or one that would push the tracker past maxTrackedPackets,
// is logged and ignored.
func (t *Tracker) Track(peer string, version uint, reqCode uint64, resCode uint64, id uint64) {
	if !metrics.Enabled {
		return
	}
	t.lock.Lock()
	defer t.lock.Unlock()

	// If there's a duplicate request, we've just random-collided (or more probably,
	// we have a bug), report it. We could also add a metric, but we're not really
	// expecting ourselves to be buggy, so a noisy error log should be enough.
	if _, ok := t.pending[id]; ok {
		log.Error("Network request id collision", "protocol", t.protocol, "version", version, "code", reqCode, "id", id)
		return
	}
	// If we have too many pending requests, bail out instead of leaking memory
	if pending := len(t.pending); pending >= maxTrackedPackets {
		log.Error("Request tracker exceeded allowance", "pending", pending, "peer", peer, "protocol", t.protocol, "version", version, "code", reqCode)
		return
	}
	// Id doesn't exist yet, start tracking it
	t.pending[id] = &request{
		peer:    peer,
		version: version,
		reqCode: reqCode,
		resCode: resCode,
		time:    time.Now(),
		expire:  t.expire.PushBack(id),
	}
	g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, version, reqCode)
	metrics.GetOrRegisterGauge(g, nil).Inc(1)

	// If we've just inserted the first item, start the expiration timer. clean
	// only drops requests older than the timeout plus a 5ms grace period, so arm
	// the timer for that same deadline; firing at exactly the timeout would find
	// the request still alive and force a needless reschedule.
	if t.wake == nil {
		t.wake = time.AfterFunc(t.timeout+5*time.Millisecond, t.clean)
	}
}
  104. // clean is called automatically when a preset time passes without a response
  105. // being dleivered for the first network request.
  106. func (t *Tracker) clean() {
  107. t.lock.Lock()
  108. defer t.lock.Unlock()
  109. // Expire anything within a certain threshold (might be no items at all if
  110. // we raced with the delivery)
  111. for t.expire.Len() > 0 {
  112. // Stop iterating if the next pending request is still alive
  113. var (
  114. head = t.expire.Front()
  115. id = head.Value.(uint64)
  116. req = t.pending[id]
  117. )
  118. if time.Since(req.time) < t.timeout+5*time.Millisecond {
  119. break
  120. }
  121. // Nope, dead, drop it
  122. t.expire.Remove(head)
  123. delete(t.pending, id)
  124. g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, req.version, req.reqCode)
  125. metrics.GetOrRegisterGauge(g, nil).Dec(1)
  126. m := fmt.Sprintf("%s/%s/%d/%#02x", lostMeterName, t.protocol, req.version, req.reqCode)
  127. metrics.GetOrRegisterMeter(m, nil).Mark(1)
  128. }
  129. t.schedule()
  130. }
  // schedule arms the expiration timer for the oldest pending request, or clears
// the timer when nothing is pending. Its callers (clean and Fulfil) hold t.lock.
func (t *Tracker) schedule() {
	if t.expire.Len() == 0 {
		t.wake = nil
		return
	}
	// clean only drops a request once timeout plus a 5ms grace period has
	// elapsed, so wake up at that same deadline. Waking at exactly the timeout
	// would find the head still alive, and clean would reschedule with a
	// non-positive delay, re-firing immediately until the grace period passed.
	head := t.pending[t.expire.Front().Value.(uint64)]
	deadline := head.time.Add(t.timeout + 5*time.Millisecond)
	t.wake = time.AfterFunc(time.Until(deadline), t.clean)
}
  // Fulfil fills a pending request, if any is available, reporting on various
// metrics. It does nothing when metrics are disabled.
//
// A response whose id is not pending is counted as stale. A response whose
// peer, protocol version or message code does not match the tracked request is
// logged and ignored, leaving the request pending. Otherwise the request stops
// being tracked, and its wait time is recorded in microseconds.
func (t *Tracker) Fulfil(peer string, version uint, code uint64, id uint64) {
	if !metrics.Enabled {
		return
	}
	t.lock.Lock()
	defer t.lock.Unlock()

	// If it's a non existing request, track as stale response
	req, ok := t.pending[id]
	if !ok {
		m := fmt.Sprintf("%s/%s/%d/%#02x", staleMeterName, t.protocol, version, code)
		metrics.GetOrRegisterMeter(m, nil).Mark(1)
		return
	}
	// If the response is funky, it might be some active attack
	if req.peer != peer || req.version != version || req.resCode != code {
		log.Warn("Network response id collision",
			"have", fmt.Sprintf("%s:%s/%d:%d", peer, t.protocol, version, code),
			"want", fmt.Sprintf("%s:%s/%d:%d", req.peer, t.protocol, req.version, req.resCode),
		)
		return
	}
	// Everything matches, mark the request serviced and meter it.
	//
	// Whether the request sat at the head of the expiration queue must be
	// checked BEFORE removing it: list.Remove clears the element's links, so
	// Prev() would always report nil afterwards.
	wasHead := req.expire.Prev() == nil
	t.expire.Remove(req.expire)
	delete(t.pending, id)
	if wasHead {
		// The timer was armed for this request, so re-arm it for the new head.
		// If Stop fails, the timer already fired and clean will reschedule.
		if t.wake.Stop() {
			t.schedule()
		}
	}
	g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, req.version, req.reqCode)
	metrics.GetOrRegisterGauge(g, nil).Dec(1)

	h := fmt.Sprintf("%s/%s/%d/%#02x", waitHistName, t.protocol, req.version, req.reqCode)
	sampler := func() metrics.Sample {
		return metrics.ResettingSample(
			metrics.NewExpDecaySample(1028, 0.015),
		)
	}
	metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(req.time).Microseconds())
}