retrieve.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "context"
  19. "crypto/rand"
  20. "encoding/binary"
  21. "fmt"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum/light"
  25. )
  26. var (
  27. retryQueue = time.Millisecond * 100
  28. hardRequestTimeout = time.Second * 10
  29. )
  30. // retrieveManager is a layer on top of requestDistributor which takes care of
  31. // matching replies by request ID and handles timeouts and resends if necessary.
  32. type retrieveManager struct {
  33. dist *requestDistributor
  34. peers *serverPeerSet
  35. softRequestTimeout func() time.Duration
  36. lock sync.RWMutex
  37. sentReqs map[uint64]*sentReq
  38. }
  39. // validatorFunc is a function that processes a reply message
  40. type validatorFunc func(distPeer, *Msg) error
  41. // sentReq represents a request sent and tracked by retrieveManager
  42. type sentReq struct {
  43. rm *retrieveManager
  44. req *distReq
  45. id uint64
  46. validate validatorFunc
  47. eventsCh chan reqPeerEvent
  48. stopCh chan struct{}
  49. stopped bool
  50. err error
  51. lock sync.RWMutex // protect access to sentTo map
  52. sentTo map[distPeer]sentReqToPeer
  53. lastReqQueued bool // last request has been queued but not sent
  54. lastReqSentTo distPeer // if not nil then last request has been sent to given peer but not timed out
  55. reqSrtoCount int // number of requests that reached soft (but not hard) timeout
  56. }
  57. // sentReqToPeer notifies the request-from-peer goroutine (tryRequest) about a response
  58. // delivered by the given peer. Only one delivery is allowed per request per peer,
  59. // after which delivered is set to true, the validity of the response is sent on the
  60. // valid channel and no more responses are accepted.
  61. type sentReqToPeer struct {
  62. delivered, frozen bool
  63. event chan int
  64. }
  65. // reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the
  66. // request state machine (retrieveLoop) through the eventsCh channel.
  67. type reqPeerEvent struct {
  68. event int
  69. peer distPeer
  70. }
  71. const (
  72. rpSent = iota // if peer == nil, not sent (no suitable peers)
  73. rpSoftTimeout
  74. rpHardTimeout
  75. rpDeliveredValid
  76. rpDeliveredInvalid
  77. rpNotDelivered
  78. )
  79. // newRetrieveManager creates the retrieve manager
  80. func newRetrieveManager(peers *serverPeerSet, dist *requestDistributor, srto func() time.Duration) *retrieveManager {
  81. return &retrieveManager{
  82. peers: peers,
  83. dist: dist,
  84. sentReqs: make(map[uint64]*sentReq),
  85. softRequestTimeout: srto,
  86. }
  87. }
  88. // retrieve sends a request (to multiple peers if necessary) and waits for an answer
  89. // that is delivered through the deliver function and successfully validated by the
  90. // validator callback. It returns when a valid answer is delivered or the context is
  91. // cancelled.
  92. func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *distReq, val validatorFunc, shutdown chan struct{}) error {
  93. sentReq := rm.sendReq(reqID, req, val)
  94. select {
  95. case <-sentReq.stopCh:
  96. case <-ctx.Done():
  97. sentReq.stop(ctx.Err())
  98. case <-shutdown:
  99. sentReq.stop(fmt.Errorf("client is shutting down"))
  100. }
  101. return sentReq.getError()
  102. }
  103. // sendReq starts a process that keeps trying to retrieve a valid answer for a
  104. // request from any suitable peers until stopped or succeeded.
  105. func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc) *sentReq {
  106. r := &sentReq{
  107. rm: rm,
  108. req: req,
  109. id: reqID,
  110. sentTo: make(map[distPeer]sentReqToPeer),
  111. stopCh: make(chan struct{}),
  112. eventsCh: make(chan reqPeerEvent, 10),
  113. validate: val,
  114. }
  115. canSend := req.canSend
  116. req.canSend = func(p distPeer) bool {
  117. // add an extra check to canSend: the request has not been sent to the same peer before
  118. r.lock.RLock()
  119. _, sent := r.sentTo[p]
  120. r.lock.RUnlock()
  121. return !sent && canSend(p)
  122. }
  123. request := req.request
  124. req.request = func(p distPeer) func() {
  125. // before actually sending the request, put an entry into the sentTo map
  126. r.lock.Lock()
  127. r.sentTo[p] = sentReqToPeer{delivered: false, frozen: false, event: make(chan int, 1)}
  128. r.lock.Unlock()
  129. return request(p)
  130. }
  131. rm.lock.Lock()
  132. rm.sentReqs[reqID] = r
  133. rm.lock.Unlock()
  134. go r.retrieveLoop()
  135. return r
  136. }
  137. // requested reports whether the request with given reqid is sent by the retriever.
  138. func (rm *retrieveManager) requested(reqId uint64) bool {
  139. rm.lock.RLock()
  140. defer rm.lock.RUnlock()
  141. _, ok := rm.sentReqs[reqId]
  142. return ok
  143. }
  144. // deliver is called by the LES protocol manager to deliver reply messages to waiting requests
  145. func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
  146. rm.lock.RLock()
  147. req, ok := rm.sentReqs[msg.ReqID]
  148. rm.lock.RUnlock()
  149. if ok {
  150. return req.deliver(peer, msg)
  151. }
  152. return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
  153. }
  154. // frozen is called by the LES protocol manager when a server has suspended its service and we
  155. // should not expect an answer for the requests already sent there
  156. func (rm *retrieveManager) frozen(peer distPeer) {
  157. rm.lock.RLock()
  158. defer rm.lock.RUnlock()
  159. for _, req := range rm.sentReqs {
  160. req.frozen(peer)
  161. }
  162. }
  163. // reqStateFn represents a state of the retrieve loop state machine
  164. type reqStateFn func() reqStateFn
  165. // retrieveLoop is the retrieval state machine event loop
  166. func (r *sentReq) retrieveLoop() {
  167. go r.tryRequest()
  168. r.lastReqQueued = true
  169. state := r.stateRequesting
  170. for state != nil {
  171. state = state()
  172. }
  173. r.rm.lock.Lock()
  174. delete(r.rm.sentReqs, r.id)
  175. r.rm.lock.Unlock()
  176. }
// stateRequesting: a request has been queued or sent recently; when it reaches soft timeout,
// a new request is sent to a new peer.
//
// It consumes one event from eventsCh (after applying it via update), or the
// closing of stopCh, and returns the next state:
//   - rpSent with a nil peer (nothing could be sent): stateNoMorePeers if
//     earlier requests may still answer, otherwise stop with
//     light.ErrNoPeers and end the loop (nil);
//   - rpSoftTimeout: queue a new attempt and stay in this state;
//   - rpDeliveredInvalid / rpNotDelivered: queue a new attempt only if
//     nothing is queued and no sent request is still within its soft
//     timeout; stay in this state;
//   - rpDeliveredValid: stop with a nil error and go to stateStopped;
//   - stopCh closed: go to stateStopped.
//
// Any other event (rpSent with a peer, rpHardTimeout) keeps this state.
func (r *sentReq) stateRequesting() reqStateFn {
	select {
	case ev := <-r.eventsCh:
		r.update(ev)
		switch ev.event {
		case rpSent:
			if ev.peer == nil {
				// request send failed, no more suitable peers
				if r.waiting() {
					// we are already waiting for sent requests which may succeed so keep waiting
					return r.stateNoMorePeers
				}
				// nothing to wait for, no more peers to ask, return with error
				r.stop(light.ErrNoPeers)
				// no need to go to stopped state because waiting() already returned false
				return nil
			}
		case rpSoftTimeout:
			// last request timed out, try asking a new peer
			go r.tryRequest()
			r.lastReqQueued = true
			return r.stateRequesting
		case rpDeliveredInvalid, rpNotDelivered:
			// if it was the last sent request (set to nil by update) then start a new one
			if !r.lastReqQueued && r.lastReqSentTo == nil {
				go r.tryRequest()
				r.lastReqQueued = true
			}
			return r.stateRequesting
		case rpDeliveredValid:
			r.stop(nil)
			return r.stateStopped
		}
		return r.stateRequesting
	case <-r.stopCh:
		return r.stateStopped
	}
}
  217. // stateNoMorePeers: could not send more requests because no suitable peers are available.
  218. // Peers may become suitable for a certain request later or new peers may appear so we
  219. // keep trying.
  220. func (r *sentReq) stateNoMorePeers() reqStateFn {
  221. select {
  222. case <-time.After(retryQueue):
  223. go r.tryRequest()
  224. r.lastReqQueued = true
  225. return r.stateRequesting
  226. case ev := <-r.eventsCh:
  227. r.update(ev)
  228. if ev.event == rpDeliveredValid {
  229. r.stop(nil)
  230. return r.stateStopped
  231. }
  232. if r.waiting() {
  233. return r.stateNoMorePeers
  234. }
  235. r.stop(light.ErrNoPeers)
  236. return nil
  237. case <-r.stopCh:
  238. return r.stateStopped
  239. }
  240. }
  241. // stateStopped: request succeeded or cancelled, just waiting for some peers
  242. // to either answer or time out hard
  243. func (r *sentReq) stateStopped() reqStateFn {
  244. for r.waiting() {
  245. r.update(<-r.eventsCh)
  246. }
  247. return nil
  248. }
  249. // update updates the queued/sent flags and timed out peers counter according to the event
  250. func (r *sentReq) update(ev reqPeerEvent) {
  251. switch ev.event {
  252. case rpSent:
  253. r.lastReqQueued = false
  254. r.lastReqSentTo = ev.peer
  255. case rpSoftTimeout:
  256. r.lastReqSentTo = nil
  257. r.reqSrtoCount++
  258. case rpHardTimeout:
  259. r.reqSrtoCount--
  260. case rpDeliveredValid, rpDeliveredInvalid, rpNotDelivered:
  261. if ev.peer == r.lastReqSentTo {
  262. r.lastReqSentTo = nil
  263. } else {
  264. r.reqSrtoCount--
  265. }
  266. }
  267. }
  268. // waiting returns true if the retrieval mechanism is waiting for an answer from
  269. // any peer
  270. func (r *sentReq) waiting() bool {
  271. return r.lastReqQueued || r.lastReqSentTo != nil || r.reqSrtoCount > 0
  272. }
  273. // tryRequest tries to send the request to a new peer and waits for it to either
  274. // succeed or time out if it has been sent. It also sends the appropriate reqPeerEvent
  275. // messages to the request's event channel.
  276. func (r *sentReq) tryRequest() {
  277. sent := r.rm.dist.queue(r.req)
  278. var p distPeer
  279. select {
  280. case p = <-sent:
  281. case <-r.stopCh:
  282. if r.rm.dist.cancel(r.req) {
  283. p = nil
  284. } else {
  285. p = <-sent
  286. }
  287. }
  288. r.eventsCh <- reqPeerEvent{rpSent, p}
  289. if p == nil {
  290. return
  291. }
  292. hrto := false
  293. r.lock.RLock()
  294. s, ok := r.sentTo[p]
  295. r.lock.RUnlock()
  296. if !ok {
  297. panic(nil)
  298. }
  299. defer func() {
  300. pp, ok := p.(*serverPeer)
  301. if hrto && ok {
  302. pp.Log().Debug("Request timed out hard")
  303. if r.rm.peers != nil {
  304. r.rm.peers.unregister(pp.id)
  305. }
  306. }
  307. }()
  308. select {
  309. case event := <-s.event:
  310. if event == rpNotDelivered {
  311. r.lock.Lock()
  312. delete(r.sentTo, p)
  313. r.lock.Unlock()
  314. }
  315. r.eventsCh <- reqPeerEvent{event, p}
  316. return
  317. case <-time.After(r.rm.softRequestTimeout()):
  318. r.eventsCh <- reqPeerEvent{rpSoftTimeout, p}
  319. }
  320. select {
  321. case event := <-s.event:
  322. if event == rpNotDelivered {
  323. r.lock.Lock()
  324. delete(r.sentTo, p)
  325. r.lock.Unlock()
  326. }
  327. r.eventsCh <- reqPeerEvent{event, p}
  328. case <-time.After(hardRequestTimeout):
  329. hrto = true
  330. r.eventsCh <- reqPeerEvent{rpHardTimeout, p}
  331. }
  332. }
  333. // deliver a reply belonging to this request
  334. func (r *sentReq) deliver(peer distPeer, msg *Msg) error {
  335. r.lock.Lock()
  336. defer r.lock.Unlock()
  337. s, ok := r.sentTo[peer]
  338. if !ok || s.delivered {
  339. return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
  340. }
  341. if s.frozen {
  342. return nil
  343. }
  344. valid := r.validate(peer, msg) == nil
  345. r.sentTo[peer] = sentReqToPeer{delivered: true, frozen: false, event: s.event}
  346. if valid {
  347. s.event <- rpDeliveredValid
  348. } else {
  349. s.event <- rpDeliveredInvalid
  350. }
  351. if !valid {
  352. return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
  353. }
  354. return nil
  355. }
  356. // frozen sends a "not delivered" event to the peer event channel belonging to the
  357. // given peer if the request has been sent there, causing the state machine to not
  358. // expect an answer and potentially even send the request to the same peer again
  359. // when canSend allows it.
  360. func (r *sentReq) frozen(peer distPeer) {
  361. r.lock.Lock()
  362. defer r.lock.Unlock()
  363. s, ok := r.sentTo[peer]
  364. if ok && !s.delivered && !s.frozen {
  365. r.sentTo[peer] = sentReqToPeer{delivered: false, frozen: true, event: s.event}
  366. s.event <- rpNotDelivered
  367. }
  368. }
  369. // stop stops the retrieval process and sets an error code that will be returned
  370. // by getError
  371. func (r *sentReq) stop(err error) {
  372. r.lock.Lock()
  373. if !r.stopped {
  374. r.stopped = true
  375. r.err = err
  376. close(r.stopCh)
  377. }
  378. r.lock.Unlock()
  379. }
  380. // getError returns any retrieval error (either internally generated or set by the
  381. // stop function) after stopCh has been closed
  382. func (r *sentReq) getError() error {
  383. return r.err
  384. }
  385. // genReqID generates a new random request ID
  386. func genReqID() uint64 {
  387. var rnd [8]byte
  388. rand.Read(rnd[:])
  389. return binary.BigEndian.Uint64(rnd[:])
  390. }