123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- // Copyright 2019 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package enode
- import (
- "sync"
- "time"
- )
- // Iterator represents a sequence of nodes. The Next method moves to the next node in the
- // sequence. It returns false when the sequence has ended or the iterator is closed. Close
- // may be called concurrently with Next and Node, and interrupts Next if it is blocked.
- type Iterator interface {
- Next() bool // moves to next node
- Node() *Node // returns current node
- Close() // ends the iterator
- }
- // ReadNodes reads at most n nodes from the given iterator. The return value contains no
- // duplicates and no nil values. To prevent looping indefinitely for small repeating node
- // sequences, this function calls Next at most n times.
- func ReadNodes(it Iterator, n int) []*Node {
- seen := make(map[ID]*Node, n)
- for i := 0; i < n && it.Next(); i++ {
- // Remove duplicates, keeping the node with higher seq.
- node := it.Node()
- prevNode, ok := seen[node.ID()]
- if ok && prevNode.Seq() > node.Seq() {
- continue
- }
- seen[node.ID()] = node
- }
- result := make([]*Node, 0, len(seen))
- for _, node := range seen {
- result = append(result, node)
- }
- return result
- }
- // IterNodes makes an iterator which runs through the given nodes once.
- func IterNodes(nodes []*Node) Iterator {
- return &sliceIter{nodes: nodes, index: -1}
- }
- // CycleNodes makes an iterator which cycles through the given nodes indefinitely.
- func CycleNodes(nodes []*Node) Iterator {
- return &sliceIter{nodes: nodes, index: -1, cycle: true}
- }
- type sliceIter struct {
- mu sync.Mutex
- nodes []*Node
- index int
- cycle bool
- }
- func (it *sliceIter) Next() bool {
- it.mu.Lock()
- defer it.mu.Unlock()
- if len(it.nodes) == 0 {
- return false
- }
- it.index++
- if it.index == len(it.nodes) {
- if it.cycle {
- it.index = 0
- } else {
- it.nodes = nil
- return false
- }
- }
- return true
- }
- func (it *sliceIter) Node() *Node {
- it.mu.Lock()
- defer it.mu.Unlock()
- if len(it.nodes) == 0 {
- return nil
- }
- return it.nodes[it.index]
- }
- func (it *sliceIter) Close() {
- it.mu.Lock()
- defer it.mu.Unlock()
- it.nodes = nil
- }
- // Filter wraps an iterator such that Next only returns nodes for which
- // the 'check' function returns true.
- func Filter(it Iterator, check func(*Node) bool) Iterator {
- return &filterIter{it, check}
- }
- type filterIter struct {
- Iterator
- check func(*Node) bool
- }
- func (f *filterIter) Next() bool {
- for f.Iterator.Next() {
- if f.check(f.Node()) {
- return true
- }
- }
- return false
- }
- // FairMix aggregates multiple node iterators. The mixer itself is an iterator which ends
- // only when Close is called. Source iterators added via AddSource are removed from the
- // mix when they end.
- //
- // The distribution of nodes returned by Next is approximately fair, i.e. FairMix
- // attempts to draw from all sources equally often. However, if a certain source is slow
- // and doesn't return a node within the configured timeout, a node from any other source
- // will be returned.
- //
- // It's safe to call AddSource and Close concurrently with Next.
- type FairMix struct {
- wg sync.WaitGroup
- fromAny chan *Node
- timeout time.Duration
- cur *Node
- mu sync.Mutex
- closed chan struct{}
- sources []*mixSource
- last int
- }
- type mixSource struct {
- it Iterator
- next chan *Node
- timeout time.Duration
- }
- // NewFairMix creates a mixer.
- //
- // The timeout specifies how long the mixer will wait for the next fairly-chosen source
- // before giving up and taking a node from any other source. A good way to set the timeout
- // is deciding how long you'd want to wait for a node on average. Passing a negative
- // timeout makes the mixer completely fair.
- func NewFairMix(timeout time.Duration) *FairMix {
- m := &FairMix{
- fromAny: make(chan *Node),
- closed: make(chan struct{}),
- timeout: timeout,
- }
- return m
- }
- // AddSource adds a source of nodes.
- func (m *FairMix) AddSource(it Iterator) {
- m.mu.Lock()
- defer m.mu.Unlock()
- if m.closed == nil {
- return
- }
- m.wg.Add(1)
- source := &mixSource{it, make(chan *Node), m.timeout}
- m.sources = append(m.sources, source)
- go m.runSource(m.closed, source)
- }
- // Close shuts down the mixer and all current sources.
- // Calling this is required to release resources associated with the mixer.
- func (m *FairMix) Close() {
- m.mu.Lock()
- defer m.mu.Unlock()
- if m.closed == nil {
- return
- }
- for _, s := range m.sources {
- s.it.Close()
- }
- close(m.closed)
- m.wg.Wait()
- close(m.fromAny)
- m.sources = nil
- m.closed = nil
- }
- // Next returns a node from a random source.
- func (m *FairMix) Next() bool {
- m.cur = nil
- var timeout <-chan time.Time
- if m.timeout >= 0 {
- timer := time.NewTimer(m.timeout)
- timeout = timer.C
- defer timer.Stop()
- }
- for {
- source := m.pickSource()
- if source == nil {
- return m.nextFromAny()
- }
- select {
- case n, ok := <-source.next:
- if ok {
- m.cur = n
- source.timeout = m.timeout
- return true
- }
- // This source has ended.
- m.deleteSource(source)
- case <-timeout:
- source.timeout /= 2
- return m.nextFromAny()
- }
- }
- }
- // Node returns the current node.
- func (m *FairMix) Node() *Node {
- return m.cur
- }
- // nextFromAny is used when there are no sources or when the 'fair' choice
- // doesn't turn up a node quickly enough.
- func (m *FairMix) nextFromAny() bool {
- n, ok := <-m.fromAny
- if ok {
- m.cur = n
- }
- return ok
- }
- // pickSource chooses the next source to read from, cycling through them in order.
- func (m *FairMix) pickSource() *mixSource {
- m.mu.Lock()
- defer m.mu.Unlock()
- if len(m.sources) == 0 {
- return nil
- }
- m.last = (m.last + 1) % len(m.sources)
- return m.sources[m.last]
- }
- // deleteSource deletes a source.
- func (m *FairMix) deleteSource(s *mixSource) {
- m.mu.Lock()
- defer m.mu.Unlock()
- for i := range m.sources {
- if m.sources[i] == s {
- copy(m.sources[i:], m.sources[i+1:])
- m.sources[len(m.sources)-1] = nil
- m.sources = m.sources[:len(m.sources)-1]
- break
- }
- }
- }
- // runSource reads a single source in a loop.
- func (m *FairMix) runSource(closed chan struct{}, s *mixSource) {
- defer m.wg.Done()
- defer close(s.next)
- for s.it.Next() {
- n := s.it.Node()
- select {
- case s.next <- n:
- case m.fromAny <- n:
- case <-closed:
- return
- }
- }
- }
|