// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package les

import (
	"sort"
	"sync"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/common/prque"
)

// servingQueue allows running tasks in a limited number of threads and puts the
// waiting tasks in a priority queue
type servingQueue struct {
	recentTime, queuedTime, servingTimeDiff uint64
	burstLimit, burstDropLimit              uint64
	burstDecRate                            float64
	lastUpdate                              mclock.AbsTime

	queueAddCh, queueBestCh chan *servingTask
	stopThreadCh, quit      chan struct{}
	setThreadsCh            chan int

	wg          sync.WaitGroup
	threadCount int          // number of currently running threads
	queue       *prque.Prque // priority queue for waiting or suspended tasks
	best        *servingTask // the highest priority task (not included in the queue)
	suspendBias int64        // priority bias against suspending an already running task
}
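
// Illustrative setup sketch (not part of the original file): a servingQueue is
// created with a suspend bias and a utilization target and then sized with
// setThreads. The concrete values below are assumptions chosen for the example
// only; real callers derive them from the server configuration.
//
//	sq := newServingQueue(int64(10*time.Millisecond), 1.5) // assumed bias and target
//	sq.setThreads(2)                                        // allow two concurrent tasks
//	defer sq.stop()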

// servingTask represents a request serving task. Tasks can be implemented to
// run in multiple steps, allowing the serving queue to suspend execution between
// steps if higher priority tasks are entered. The creator of the task should
// set the following fields:
//
// - priority: greater value means higher priority; values can wrap around the int64 range
// - run: execute a single step; return true if finished
// - after: executed after run finishes or returns an error, receives the total serving time
type servingTask struct {
	sq                                       *servingQueue
	servingTime, timeAdded, maxTime, expTime uint64
	peer                                     *clientPeer
	priority                                 int64
	biasAdded                                bool
	token                                    runToken
	tokenCh                                  chan runToken
}

// runToken received by servingTask.start allows the task to run. Closing the
// channel in servingTask.done signals the thread controller to allow a new task
// to start running.
type runToken chan struct{}

// start blocks until the task can start and returns true if it is allowed to run.
// Returning false means that the task should be cancelled.
func (t *servingTask) start() bool {
	if t.peer.isFrozen() {
		return false
	}
	t.tokenCh = make(chan runToken, 1)
	select {
	case t.sq.queueAddCh <- t:
	case <-t.sq.quit:
		return false
	}
	select {
	case t.token = <-t.tokenCh:
	case <-t.sq.quit:
		return false
	}
	if t.token == nil {
		return false
	}
	t.servingTime -= uint64(mclock.Now())
	return true
}

// done signals the thread controller about the task being finished and returns
// the total serving time of the task in nanoseconds.
func (t *servingTask) done() uint64 {
	t.servingTime += uint64(mclock.Now())
	close(t.token)
	diff := t.servingTime - t.timeAdded
	t.timeAdded = t.servingTime
	if t.expTime > diff {
		t.expTime -= diff
		atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime)
	} else {
		t.expTime = 0
	}
	return t.servingTime
}

// waitOrStop can be called during the execution of the task. It blocks if there
// is a higher priority task waiting (a bias is applied in favor of the currently
// running task). Returning true means that the execution can be resumed. False
// means the task should be cancelled.
func (t *servingTask) waitOrStop() bool {
	t.done()
	if !t.biasAdded {
		t.priority += t.sq.suspendBias
		t.biasAdded = true
	}
	return t.start()
}

// newServingQueue returns a new servingQueue
func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue {
	sq := &servingQueue{
		queue:          prque.NewWrapAround(nil),
		suspendBias:    suspendBias,
		queueAddCh:     make(chan *servingTask, 100),
		queueBestCh:    make(chan *servingTask),
		stopThreadCh:   make(chan struct{}),
		quit:           make(chan struct{}),
		setThreadsCh:   make(chan int, 10),
		burstLimit:     uint64(utilTarget * bufLimitRatio * 1200000),
		burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000),
		burstDecRate:   utilTarget,
		lastUpdate:     mclock.Now(),
	}
	sq.wg.Add(2)
	go sq.queueLoop()
	go sq.threadCountLoop()
	return sq
}

// newTask creates a new task with the given priority
func (sq *servingQueue) newTask(peer *clientPeer, maxTime uint64, priority int64) *servingTask {
	return &servingTask{
		sq:       sq,
		peer:     peer,
		maxTime:  maxTime,
		expTime:  maxTime,
		priority: priority,
	}
}
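
// Illustrative per-request lifecycle (a sketch, not part of the original code;
// the real callers are the les request handlers, and maxTime and priority are
// assumed here to come from the server's flow control accounting):
//
//	task := sq.newTask(peer, maxTime, priority)
//	if !task.start() {
//		return // rejected: peer frozen or queue shutting down
//	}
//	for !finished {
//		// ... serve one step of the request ...
//		if !task.waitOrStop() {
//			return // suspended task was cancelled
//		}
//	}
//	task.done() // returns the total serving time in nanoseconds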

// threadController is started in multiple goroutines and controls the execution
// of tasks. The number of active thread controllers equals the allowed number of
// concurrently running threads. It tries to fetch the highest priority queued
// task first. If there are no queued tasks waiting then it can directly catch
// run tokens from the token channel and allow the corresponding tasks to run
// without entering the priority queue.
func (sq *servingQueue) threadController() {
	for {
		token := make(runToken)
		select {
		case best := <-sq.queueBestCh:
			best.tokenCh <- token
		case <-sq.stopThreadCh:
			sq.wg.Done()
			return
		case <-sq.quit:
			sq.wg.Done()
			return
		}
		<-token
		select {
		case <-sq.stopThreadCh:
			sq.wg.Done()
			return
		case <-sq.quit:
			sq.wg.Done()
			return
		default:
		}
	}
}

type (
	// peerTasks lists the tasks received from a given peer when selecting peers to freeze
	peerTasks struct {
		peer     *clientPeer
		list     []*servingTask
		sumTime  uint64
		priority float64
	}
	// peerList is a sortable list of peerTasks
	peerList []*peerTasks
)

func (l peerList) Len() int {
	return len(l)
}

func (l peerList) Less(i, j int) bool {
	return l[i].priority < l[j].priority
}

func (l peerList) Swap(i, j int) {
	l[i], l[j] = l[j], l[i]
}

// freezePeers selects the peers with the worst priority queued tasks and freezes
// them until the burst time (recentTime+queuedTime) goes under burstDropLimit or
// all peers are frozen
func (sq *servingQueue) freezePeers() {
	peerMap := make(map[*clientPeer]*peerTasks)
	var peerList peerList
	if sq.best != nil {
		sq.queue.Push(sq.best, sq.best.priority)
	}
	sq.best = nil
	for sq.queue.Size() > 0 {
		task := sq.queue.PopItem().(*servingTask)
		tasks := peerMap[task.peer]
		if tasks == nil {
			bufValue, bufLimit := task.peer.fcClient.BufferStatus()
			if bufLimit < 1 {
				bufLimit = 1
			}
			tasks = &peerTasks{
				peer:     task.peer,
				priority: float64(bufValue) / float64(bufLimit), // lower value comes first
			}
			peerMap[task.peer] = tasks
			peerList = append(peerList, tasks)
		}
		tasks.list = append(tasks.list, task)
		tasks.sumTime += task.expTime
	}
	sort.Sort(peerList)
	drop := true
	for _, tasks := range peerList {
		if drop {
			tasks.peer.freeze()
			tasks.peer.fcClient.Freeze()
			sq.queuedTime -= tasks.sumTime
			sqQueuedGauge.Update(int64(sq.queuedTime))
			clientFreezeMeter.Mark(1)
			drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit
			for _, task := range tasks.list {
				task.tokenCh <- nil
			}
		} else {
			for _, task := range tasks.list {
				sq.queue.Push(task, task.priority)
			}
		}
	}
	if sq.queue.Size() > 0 {
		sq.best = sq.queue.PopItem().(*servingTask)
	}
}

// updateRecentTime recalculates the recent serving time value
func (sq *servingQueue) updateRecentTime() {
	subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0)
	now := mclock.Now()
	dt := now - sq.lastUpdate
	sq.lastUpdate = now
	if dt > 0 {
		subTime += uint64(float64(dt) * sq.burstDecRate)
	}
	if sq.recentTime > subTime {
		sq.recentTime -= subTime
	} else {
		sq.recentTime = 0
	}
}
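
// Informal restatement of updateRecentTime (added summary, not part of the
// original comments):
//
//	recentTime = max(0, recentTime - servingTimeDiff - dt*burstDecRate)
//
// i.e. the recent serving time decays at burstDecRate nanoseconds of serving
// time per nanosecond of wall clock time, in addition to the corrections that
// finished tasks report through servingTimeDiff.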

// addTask inserts a task into the priority queue
func (sq *servingQueue) addTask(task *servingTask) {
	if sq.best == nil {
		sq.best = task
	} else if task.priority-sq.best.priority > 0 {
		sq.queue.Push(sq.best, sq.best.priority)
		sq.best = task
	} else {
		sq.queue.Push(task, task.priority)
	}
	sq.updateRecentTime()
	sq.queuedTime += task.expTime
	sqServedGauge.Update(int64(sq.recentTime))
	sqQueuedGauge.Update(int64(sq.queuedTime))
	if sq.recentTime+sq.queuedTime > sq.burstLimit {
		sq.freezePeers()
	}
}

// queueLoop is an event loop running in a goroutine. It receives tasks from queueAddCh
// and always tries to send the highest priority task to queueBestCh. Successfully sent
// tasks are removed from the queue.
func (sq *servingQueue) queueLoop() {
	for {
		if sq.best != nil {
			expTime := sq.best.expTime
			select {
			case task := <-sq.queueAddCh:
				sq.addTask(task)
			case sq.queueBestCh <- sq.best:
				sq.updateRecentTime()
				sq.queuedTime -= expTime
				sq.recentTime += expTime
				sqServedGauge.Update(int64(sq.recentTime))
				sqQueuedGauge.Update(int64(sq.queuedTime))
				if sq.queue.Size() == 0 {
					sq.best = nil
				} else {
					sq.best, _ = sq.queue.PopItem().(*servingTask)
				}
			case <-sq.quit:
				sq.wg.Done()
				return
			}
		} else {
			select {
			case task := <-sq.queueAddCh:
				sq.addTask(task)
			case <-sq.quit:
				sq.wg.Done()
				return
			}
		}
	}
}
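
// Note (added summary, not part of the original comments): when queueLoop hands
// the best task to a thread controller, the task's expected time moves from
// queuedTime to recentTime, so recentTime+queuedTime tracks the total expected
// burst of serving time that addTask compares against burstLimit.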

// threadCountLoop is an event loop running in a goroutine. It adjusts the number
// of active thread controller goroutines.
func (sq *servingQueue) threadCountLoop() {
	var threadCountTarget int
	for {
		for threadCountTarget > sq.threadCount {
			sq.wg.Add(1)
			go sq.threadController()
			sq.threadCount++
		}
		if threadCountTarget < sq.threadCount {
			select {
			case threadCountTarget = <-sq.setThreadsCh:
			case sq.stopThreadCh <- struct{}{}:
				sq.threadCount--
			case <-sq.quit:
				sq.wg.Done()
				return
			}
		} else {
			select {
			case threadCountTarget = <-sq.setThreadsCh:
			case <-sq.quit:
				sq.wg.Done()
				return
			}
		}
	}
}

// setThreads sets the allowed processing thread count, suspending tasks as soon as
// possible if necessary.
func (sq *servingQueue) setThreads(threadCount int) {
	select {
	case sq.setThreadsCh <- threadCount:
	case <-sq.quit:
		return
	}
}

// stop stops task processing as soon as possible and shuts down the serving queue.
func (sq *servingQueue) stop() {
	close(sq.quit)
	sq.wg.Wait()
}