distributor.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "container/list"
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common/mclock"
  22. "github.com/ethereum/go-ethereum/les/utils"
  23. )
  24. // requestDistributor implements a mechanism that distributes requests to
  25. // suitable peers, obeying flow control rules and prioritizing them in creation
  26. // order (even when a resend is necessary).
  27. type requestDistributor struct {
  28. clock mclock.Clock
  29. reqQueue *list.List
  30. lastReqOrder uint64
  31. peers map[distPeer]struct{}
  32. peerLock sync.RWMutex
  33. loopChn chan struct{}
  34. loopNextSent bool
  35. lock sync.Mutex
  36. closeCh chan struct{}
  37. wg sync.WaitGroup
  38. }
  // distPeer is the view the request distributor has of an LES server peer.
  type distPeer interface {
  	// waitBefore takes the upper estimated cost of a request and returns either
  	// the time that has to pass before such a request may be sent, or the
  	// estimated relative buffer value remaining after sending it (in which case
  	// it can be sent immediately). At least one of the two results is always zero.
  	waitBefore(cost uint64) (wait time.Duration, bufRemain float64)
  	// canQueue is consulted by the distributor before the peer is considered
  	// for any request.
  	canQueue() bool
  	// queueSend is handed the send function of a request assigned to the peer.
  	// NOTE(review): the distributor ignores the returned bool; its meaning is
  	// defined by the implementation.
  	queueSend(f func()) bool
  }
  49. // distReq is the request abstraction used by the distributor. It is based on
  50. // three callback functions:
  51. // - getCost returns the upper estimate of the cost of sending the request to a given peer
  52. // - canSend tells if the server peer is suitable to serve the request
  53. // - request prepares sending the request to the given peer and returns a function that
  54. // does the actual sending. Request order should be preserved but the callback itself should not
  55. // block until it is sent because other peers might still be able to receive requests while
  56. // one of them is blocking. Instead, the returned function is put in the peer's send queue.
  57. type distReq struct {
  58. getCost func(distPeer) uint64
  59. canSend func(distPeer) bool
  60. request func(distPeer) func()
  61. reqOrder uint64
  62. sentChn chan distPeer
  63. element *list.Element
  64. waitForPeers mclock.AbsTime
  65. enterQueue mclock.AbsTime
  66. }
  67. // newRequestDistributor creates a new request distributor
  68. func newRequestDistributor(peers *serverPeerSet, clock mclock.Clock) *requestDistributor {
  69. d := &requestDistributor{
  70. clock: clock,
  71. reqQueue: list.New(),
  72. loopChn: make(chan struct{}, 2),
  73. closeCh: make(chan struct{}),
  74. peers: make(map[distPeer]struct{}),
  75. }
  76. if peers != nil {
  77. peers.subscribe(d)
  78. }
  79. d.wg.Add(1)
  80. go d.loop()
  81. return d
  82. }
  83. // registerPeer implements peerSetNotify
  84. func (d *requestDistributor) registerPeer(p *serverPeer) {
  85. d.peerLock.Lock()
  86. d.peers[p] = struct{}{}
  87. d.peerLock.Unlock()
  88. }
  89. // unregisterPeer implements peerSetNotify
  90. func (d *requestDistributor) unregisterPeer(p *serverPeer) {
  91. d.peerLock.Lock()
  92. delete(d.peers, p)
  93. d.peerLock.Unlock()
  94. }
  95. // registerTestPeer adds a new test peer
  96. func (d *requestDistributor) registerTestPeer(p distPeer) {
  97. d.peerLock.Lock()
  98. d.peers[p] = struct{}{}
  99. d.peerLock.Unlock()
  100. }
  var (
  	// distMaxWait caps how long the event loop sleeps in one go. Longer waits
  	// are cut down to this value so that the necessary waiting times get
  	// recalculated from fresh feedback of the servers.
  	distMaxWait = 50 * time.Millisecond

  	// waitForPeers is the grace period during which a request is kept alive
  	// even though no peer is currently suitable to receive it.
  	waitForPeers = 3 * time.Second
  )
  109. // main event loop
  110. func (d *requestDistributor) loop() {
  111. defer d.wg.Done()
  112. for {
  113. select {
  114. case <-d.closeCh:
  115. d.lock.Lock()
  116. elem := d.reqQueue.Front()
  117. for elem != nil {
  118. req := elem.Value.(*distReq)
  119. close(req.sentChn)
  120. req.sentChn = nil
  121. elem = elem.Next()
  122. }
  123. d.lock.Unlock()
  124. return
  125. case <-d.loopChn:
  126. d.lock.Lock()
  127. d.loopNextSent = false
  128. loop:
  129. for {
  130. peer, req, wait := d.nextRequest()
  131. if req != nil && wait == 0 {
  132. chn := req.sentChn // save sentChn because remove sets it to nil
  133. d.remove(req)
  134. send := req.request(peer)
  135. if send != nil {
  136. peer.queueSend(send)
  137. requestSendDelay.Update(time.Duration(d.clock.Now() - req.enterQueue))
  138. }
  139. chn <- peer
  140. close(chn)
  141. } else {
  142. if wait == 0 {
  143. // no request to send and nothing to wait for; the next
  144. // queued request will wake up the loop
  145. break loop
  146. }
  147. d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received
  148. if wait > distMaxWait {
  149. // waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically
  150. wait = distMaxWait
  151. }
  152. go func() {
  153. d.clock.Sleep(wait)
  154. d.loopChn <- struct{}{}
  155. }()
  156. break loop
  157. }
  158. }
  159. d.lock.Unlock()
  160. }
  161. }
  162. }
  163. // selectPeerItem represents a peer to be selected for a request by weightedRandomSelect
  164. type selectPeerItem struct {
  165. peer distPeer
  166. req *distReq
  167. weight uint64
  168. }
  169. func selectPeerWeight(i interface{}) uint64 {
  170. return i.(selectPeerItem).weight
  171. }
  172. // nextRequest returns the next possible request from any peer, along with the
  173. // associated peer and necessary waiting time
  174. func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
  175. checkedPeers := make(map[distPeer]struct{})
  176. elem := d.reqQueue.Front()
  177. var (
  178. bestWait time.Duration
  179. sel *utils.WeightedRandomSelect
  180. )
  181. d.peerLock.RLock()
  182. defer d.peerLock.RUnlock()
  183. peerCount := len(d.peers)
  184. for (len(checkedPeers) < peerCount || elem == d.reqQueue.Front()) && elem != nil {
  185. req := elem.Value.(*distReq)
  186. canSend := false
  187. now := d.clock.Now()
  188. if req.waitForPeers > now {
  189. canSend = true
  190. wait := time.Duration(req.waitForPeers - now)
  191. if bestWait == 0 || wait < bestWait {
  192. bestWait = wait
  193. }
  194. }
  195. for peer := range d.peers {
  196. if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) {
  197. canSend = true
  198. cost := req.getCost(peer)
  199. wait, bufRemain := peer.waitBefore(cost)
  200. if wait == 0 {
  201. if sel == nil {
  202. sel = utils.NewWeightedRandomSelect(selectPeerWeight)
  203. }
  204. sel.Update(selectPeerItem{peer: peer, req: req, weight: uint64(bufRemain*1000000) + 1})
  205. } else {
  206. if bestWait == 0 || wait < bestWait {
  207. bestWait = wait
  208. }
  209. }
  210. checkedPeers[peer] = struct{}{}
  211. }
  212. }
  213. next := elem.Next()
  214. if !canSend && elem == d.reqQueue.Front() {
  215. close(req.sentChn)
  216. d.remove(req)
  217. }
  218. elem = next
  219. }
  220. if sel != nil {
  221. c := sel.Choose().(selectPeerItem)
  222. return c.peer, c.req, 0
  223. }
  224. return nil, nil, bestWait
  225. }
  226. // queue adds a request to the distribution queue, returns a channel where the
  227. // receiving peer is sent once the request has been sent (request callback returned).
  228. // If the request is cancelled or timed out without suitable peers, the channel is
  229. // closed without sending any peer references to it.
  230. func (d *requestDistributor) queue(r *distReq) chan distPeer {
  231. d.lock.Lock()
  232. defer d.lock.Unlock()
  233. if r.reqOrder == 0 {
  234. d.lastReqOrder++
  235. r.reqOrder = d.lastReqOrder
  236. r.waitForPeers = d.clock.Now() + mclock.AbsTime(waitForPeers)
  237. }
  238. // Assign the timestamp when the request is queued no matter it's
  239. // a new one or re-queued one.
  240. r.enterQueue = d.clock.Now()
  241. back := d.reqQueue.Back()
  242. if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder {
  243. r.element = d.reqQueue.PushBack(r)
  244. } else {
  245. before := d.reqQueue.Front()
  246. for before.Value.(*distReq).reqOrder < r.reqOrder {
  247. before = before.Next()
  248. }
  249. r.element = d.reqQueue.InsertBefore(r, before)
  250. }
  251. if !d.loopNextSent {
  252. d.loopNextSent = true
  253. d.loopChn <- struct{}{}
  254. }
  255. r.sentChn = make(chan distPeer, 1)
  256. return r.sentChn
  257. }
  258. // cancel removes a request from the queue if it has not been sent yet (returns
  259. // false if it has been sent already). It is guaranteed that the callback functions
  260. // will not be called after cancel returns.
  261. func (d *requestDistributor) cancel(r *distReq) bool {
  262. d.lock.Lock()
  263. defer d.lock.Unlock()
  264. if r.sentChn == nil {
  265. return false
  266. }
  267. close(r.sentChn)
  268. d.remove(r)
  269. return true
  270. }
  271. // remove removes a request from the queue
  272. func (d *requestDistributor) remove(r *distReq) {
  273. r.sentChn = nil
  274. if r.element != nil {
  275. d.reqQueue.Remove(r.element)
  276. r.element = nil
  277. }
  278. }
  279. func (d *requestDistributor) close() {
  280. close(d.closeCh)
  281. d.wg.Wait()
  282. }