queueiterator.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package client
  17. import (
  18. "sync"
  19. "github.com/ethereum/go-ethereum/p2p/enode"
  20. "github.com/ethereum/go-ethereum/p2p/nodestate"
  21. )
  22. // QueueIterator returns nodes from the specified selectable set in the same order as
  23. // they entered the set.
  24. type QueueIterator struct {
  25. lock sync.Mutex
  26. cond *sync.Cond
  27. ns *nodestate.NodeStateMachine
  28. queue []*enode.Node
  29. nextNode *enode.Node
  30. waitCallback func(bool)
  31. fifo, closed bool
  32. }
  33. // NewQueueIterator creates a new QueueIterator. Nodes are selectable if they have all the required
  34. // and none of the disabled flags set. When a node is selected the selectedFlag is set which also
  35. // disables further selectability until it is removed or times out.
  36. func NewQueueIterator(ns *nodestate.NodeStateMachine, requireFlags, disableFlags nodestate.Flags, fifo bool, waitCallback func(bool)) *QueueIterator {
  37. qi := &QueueIterator{
  38. ns: ns,
  39. fifo: fifo,
  40. waitCallback: waitCallback,
  41. }
  42. qi.cond = sync.NewCond(&qi.lock)
  43. ns.SubscribeState(requireFlags.Or(disableFlags), func(n *enode.Node, oldState, newState nodestate.Flags) {
  44. oldMatch := oldState.HasAll(requireFlags) && oldState.HasNone(disableFlags)
  45. newMatch := newState.HasAll(requireFlags) && newState.HasNone(disableFlags)
  46. if newMatch == oldMatch {
  47. return
  48. }
  49. qi.lock.Lock()
  50. defer qi.lock.Unlock()
  51. if newMatch {
  52. qi.queue = append(qi.queue, n)
  53. } else {
  54. id := n.ID()
  55. for i, qn := range qi.queue {
  56. if qn.ID() == id {
  57. copy(qi.queue[i:len(qi.queue)-1], qi.queue[i+1:])
  58. qi.queue = qi.queue[:len(qi.queue)-1]
  59. break
  60. }
  61. }
  62. }
  63. qi.cond.Signal()
  64. })
  65. return qi
  66. }
  67. // Next moves to the next selectable node.
  68. func (qi *QueueIterator) Next() bool {
  69. qi.lock.Lock()
  70. if !qi.closed && len(qi.queue) == 0 {
  71. if qi.waitCallback != nil {
  72. qi.waitCallback(true)
  73. }
  74. for !qi.closed && len(qi.queue) == 0 {
  75. qi.cond.Wait()
  76. }
  77. if qi.waitCallback != nil {
  78. qi.waitCallback(false)
  79. }
  80. }
  81. if qi.closed {
  82. qi.nextNode = nil
  83. qi.lock.Unlock()
  84. return false
  85. }
  86. // Move to the next node in queue.
  87. if qi.fifo {
  88. qi.nextNode = qi.queue[0]
  89. copy(qi.queue[:len(qi.queue)-1], qi.queue[1:])
  90. qi.queue = qi.queue[:len(qi.queue)-1]
  91. } else {
  92. qi.nextNode = qi.queue[len(qi.queue)-1]
  93. qi.queue = qi.queue[:len(qi.queue)-1]
  94. }
  95. qi.lock.Unlock()
  96. return true
  97. }
  98. // Close ends the iterator.
  99. func (qi *QueueIterator) Close() {
  100. qi.lock.Lock()
  101. qi.closed = true
  102. qi.lock.Unlock()
  103. qi.cond.Signal()
  104. }
  105. // Node returns the current node.
  106. func (qi *QueueIterator) Node() *enode.Node {
  107. qi.lock.Lock()
  108. defer qi.lock.Unlock()
  109. return qi.nextNode
  110. }