123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- // Copyright 2019 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package les
- import (
- "sort"
- "sync"
- "sync/atomic"
- "github.com/ethereum/go-ethereum/common/mclock"
- "github.com/ethereum/go-ethereum/common/prque"
- )
- // servingQueue allows running tasks in a limited number of threads and puts the
- // waiting tasks in a priority queue
- type servingQueue struct {
- recentTime, queuedTime, servingTimeDiff uint64
- burstLimit, burstDropLimit uint64
- burstDecRate float64
- lastUpdate mclock.AbsTime
- queueAddCh, queueBestCh chan *servingTask
- stopThreadCh, quit chan struct{}
- setThreadsCh chan int
- wg sync.WaitGroup
- threadCount int // number of currently running threads
- queue *prque.Prque // priority queue for waiting or suspended tasks
- best *servingTask // the highest priority task (not included in the queue)
- suspendBias int64 // priority bias against suspending an already running task
- }
- // servingTask represents a request serving task. Tasks can be implemented to
- // run in multiple steps, allowing the serving queue to suspend execution between
- // steps if higher priority tasks are entered. The creator of the task should
- // set the following fields:
- //
- // - priority: greater value means higher priority; values can wrap around the int64 range
- // - run: execute a single step; return true if finished
- // - after: executed after run finishes or returns an error, receives the total serving time
- type servingTask struct {
- sq *servingQueue
- servingTime, timeAdded, maxTime, expTime uint64
- peer *clientPeer
- priority int64
- biasAdded bool
- token runToken
- tokenCh chan runToken
- }
- // runToken received by servingTask.start allows the task to run. Closing the
- // channel by servingTask.stop signals the thread controller to allow a new task
- // to start running.
- type runToken chan struct{}
- // start blocks until the task can start and returns true if it is allowed to run.
- // Returning false means that the task should be cancelled.
- func (t *servingTask) start() bool {
- if t.peer.isFrozen() {
- return false
- }
- t.tokenCh = make(chan runToken, 1)
- select {
- case t.sq.queueAddCh <- t:
- case <-t.sq.quit:
- return false
- }
- select {
- case t.token = <-t.tokenCh:
- case <-t.sq.quit:
- return false
- }
- if t.token == nil {
- return false
- }
- t.servingTime -= uint64(mclock.Now())
- return true
- }
- // done signals the thread controller about the task being finished and returns
- // the total serving time of the task in nanoseconds.
- func (t *servingTask) done() uint64 {
- t.servingTime += uint64(mclock.Now())
- close(t.token)
- diff := t.servingTime - t.timeAdded
- t.timeAdded = t.servingTime
- if t.expTime > diff {
- t.expTime -= diff
- atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime)
- } else {
- t.expTime = 0
- }
- return t.servingTime
- }
- // waitOrStop can be called during the execution of the task. It blocks if there
- // is a higher priority task waiting (a bias is applied in favor of the currently
- // running task). Returning true means that the execution can be resumed. False
- // means the task should be cancelled.
- func (t *servingTask) waitOrStop() bool {
- t.done()
- if !t.biasAdded {
- t.priority += t.sq.suspendBias
- t.biasAdded = true
- }
- return t.start()
- }
- // newServingQueue returns a new servingQueue
- func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue {
- sq := &servingQueue{
- queue: prque.NewWrapAround(nil),
- suspendBias: suspendBias,
- queueAddCh: make(chan *servingTask, 100),
- queueBestCh: make(chan *servingTask),
- stopThreadCh: make(chan struct{}),
- quit: make(chan struct{}),
- setThreadsCh: make(chan int, 10),
- burstLimit: uint64(utilTarget * bufLimitRatio * 1200000),
- burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000),
- burstDecRate: utilTarget,
- lastUpdate: mclock.Now(),
- }
- sq.wg.Add(2)
- go sq.queueLoop()
- go sq.threadCountLoop()
- return sq
- }
- // newTask creates a new task with the given priority
- func (sq *servingQueue) newTask(peer *clientPeer, maxTime uint64, priority int64) *servingTask {
- return &servingTask{
- sq: sq,
- peer: peer,
- maxTime: maxTime,
- expTime: maxTime,
- priority: priority,
- }
- }
- // threadController is started in multiple goroutines and controls the execution
- // of tasks. The number of active thread controllers equals the allowed number of
- // concurrently running threads. It tries to fetch the highest priority queued
- // task first. If there are no queued tasks waiting then it can directly catch
- // run tokens from the token channel and allow the corresponding tasks to run
- // without entering the priority queue.
- func (sq *servingQueue) threadController() {
- for {
- token := make(runToken)
- select {
- case best := <-sq.queueBestCh:
- best.tokenCh <- token
- case <-sq.stopThreadCh:
- sq.wg.Done()
- return
- case <-sq.quit:
- sq.wg.Done()
- return
- }
- <-token
- select {
- case <-sq.stopThreadCh:
- sq.wg.Done()
- return
- case <-sq.quit:
- sq.wg.Done()
- return
- default:
- }
- }
- }
- type (
- // peerTasks lists the tasks received from a given peer when selecting peers to freeze
- peerTasks struct {
- peer *clientPeer
- list []*servingTask
- sumTime uint64
- priority float64
- }
- // peerList is a sortable list of peerTasks
- peerList []*peerTasks
- )
- func (l peerList) Len() int {
- return len(l)
- }
- func (l peerList) Less(i, j int) bool {
- return l[i].priority < l[j].priority
- }
- func (l peerList) Swap(i, j int) {
- l[i], l[j] = l[j], l[i]
- }
- // freezePeers selects the peers with the worst priority queued tasks and freezes
- // them until burstTime goes under burstDropLimit or all peers are frozen
- func (sq *servingQueue) freezePeers() {
- peerMap := make(map[*clientPeer]*peerTasks)
- var peerList peerList
- if sq.best != nil {
- sq.queue.Push(sq.best, sq.best.priority)
- }
- sq.best = nil
- for sq.queue.Size() > 0 {
- task := sq.queue.PopItem().(*servingTask)
- tasks := peerMap[task.peer]
- if tasks == nil {
- bufValue, bufLimit := task.peer.fcClient.BufferStatus()
- if bufLimit < 1 {
- bufLimit = 1
- }
- tasks = &peerTasks{
- peer: task.peer,
- priority: float64(bufValue) / float64(bufLimit), // lower value comes first
- }
- peerMap[task.peer] = tasks
- peerList = append(peerList, tasks)
- }
- tasks.list = append(tasks.list, task)
- tasks.sumTime += task.expTime
- }
- sort.Sort(peerList)
- drop := true
- for _, tasks := range peerList {
- if drop {
- tasks.peer.freeze()
- tasks.peer.fcClient.Freeze()
- sq.queuedTime -= tasks.sumTime
- sqQueuedGauge.Update(int64(sq.queuedTime))
- clientFreezeMeter.Mark(1)
- drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit
- for _, task := range tasks.list {
- task.tokenCh <- nil
- }
- } else {
- for _, task := range tasks.list {
- sq.queue.Push(task, task.priority)
- }
- }
- }
- if sq.queue.Size() > 0 {
- sq.best = sq.queue.PopItem().(*servingTask)
- }
- }
- // updateRecentTime recalculates the recent serving time value
- func (sq *servingQueue) updateRecentTime() {
- subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0)
- now := mclock.Now()
- dt := now - sq.lastUpdate
- sq.lastUpdate = now
- if dt > 0 {
- subTime += uint64(float64(dt) * sq.burstDecRate)
- }
- if sq.recentTime > subTime {
- sq.recentTime -= subTime
- } else {
- sq.recentTime = 0
- }
- }
- // addTask inserts a task into the priority queue
- func (sq *servingQueue) addTask(task *servingTask) {
- if sq.best == nil {
- sq.best = task
- } else if task.priority-sq.best.priority > 0 {
- sq.queue.Push(sq.best, sq.best.priority)
- sq.best = task
- } else {
- sq.queue.Push(task, task.priority)
- }
- sq.updateRecentTime()
- sq.queuedTime += task.expTime
- sqServedGauge.Update(int64(sq.recentTime))
- sqQueuedGauge.Update(int64(sq.queuedTime))
- if sq.recentTime+sq.queuedTime > sq.burstLimit {
- sq.freezePeers()
- }
- }
- // queueLoop is an event loop running in a goroutine. It receives tasks from queueAddCh
- // and always tries to send the highest priority task to queueBestCh. Successfully sent
- // tasks are removed from the queue.
- func (sq *servingQueue) queueLoop() {
- for {
- if sq.best != nil {
- expTime := sq.best.expTime
- select {
- case task := <-sq.queueAddCh:
- sq.addTask(task)
- case sq.queueBestCh <- sq.best:
- sq.updateRecentTime()
- sq.queuedTime -= expTime
- sq.recentTime += expTime
- sqServedGauge.Update(int64(sq.recentTime))
- sqQueuedGauge.Update(int64(sq.queuedTime))
- if sq.queue.Size() == 0 {
- sq.best = nil
- } else {
- sq.best, _ = sq.queue.PopItem().(*servingTask)
- }
- case <-sq.quit:
- sq.wg.Done()
- return
- }
- } else {
- select {
- case task := <-sq.queueAddCh:
- sq.addTask(task)
- case <-sq.quit:
- sq.wg.Done()
- return
- }
- }
- }
- }
- // threadCountLoop is an event loop running in a goroutine. It adjusts the number
- // of active thread controller goroutines.
- func (sq *servingQueue) threadCountLoop() {
- var threadCountTarget int
- for {
- for threadCountTarget > sq.threadCount {
- sq.wg.Add(1)
- go sq.threadController()
- sq.threadCount++
- }
- if threadCountTarget < sq.threadCount {
- select {
- case threadCountTarget = <-sq.setThreadsCh:
- case sq.stopThreadCh <- struct{}{}:
- sq.threadCount--
- case <-sq.quit:
- sq.wg.Done()
- return
- }
- } else {
- select {
- case threadCountTarget = <-sq.setThreadsCh:
- case <-sq.quit:
- sq.wg.Done()
- return
- }
- }
- }
- }
- // setThreads sets the allowed processing thread count, suspending tasks as soon as
- // possible if necessary.
- func (sq *servingQueue) setThreads(threadCount int) {
- select {
- case sq.setThreadsCh <- threadCount:
- case <-sq.quit:
- return
- }
- }
- // stop stops task processing as soon as possible and shuts down the serving queue.
- func (sq *servingQueue) stop() {
- close(sq.quit)
- sq.wg.Wait()
- }
|