peerset.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "errors"
  19. "math/big"
  20. "sync"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  23. "github.com/ethereum/go-ethereum/eth/protocols/qlight"
  24. "github.com/ethereum/go-ethereum/eth/protocols/snap"
  25. "github.com/ethereum/go-ethereum/p2p"
  26. )
  27. var (
  28. // errPeerSetClosed is returned if a peer is attempted to be added or removed
  29. // from the peer set after it has been terminated.
  30. errPeerSetClosed = errors.New("peerset closed")
  31. // errPeerAlreadyRegistered is returned if a peer is attempted to be added
  32. // to the peer set, but one with the same id already exists.
  33. errPeerAlreadyRegistered = errors.New("peer already registered")
  34. // errPeerNotRegistered is returned if a peer is attempted to be removed from
  35. // a peer set, but no peer with the given id exists.
  36. errPeerNotRegistered = errors.New("peer not registered")
  37. // errSnapWithoutEth is returned if a peer attempts to connect only on the
  38. // snap protocol without advertizing the eth main protocol.
  39. errSnapWithoutEth = errors.New("peer connected on snap without compatible eth support")
  40. )
  41. // peerSet represents the collection of active peers currently participating in
  42. // the `eth` protocol, with or without the `snap` extension.
  43. type peerSet struct {
  44. peers map[string]*ethPeer // Peers connected on the `eth` protocol
  45. snapPeers int // Number of `snap` compatible peers for connection prioritization
  46. snapWait map[string]chan *snap.Peer // Peers connected on `eth` waiting for their snap extension
  47. snapPend map[string]*snap.Peer // Peers connected on the `snap` protocol, but not yet on `eth`
  48. lock sync.RWMutex
  49. closed bool
  50. }
  51. // newPeerSet creates a new peer set to track the active participants.
  52. func newPeerSet() *peerSet {
  53. return &peerSet{
  54. peers: make(map[string]*ethPeer),
  55. snapWait: make(map[string]chan *snap.Peer),
  56. snapPend: make(map[string]*snap.Peer),
  57. }
  58. }
  59. // registerSnapExtension unblocks an already connected `eth` peer waiting for its
  60. // `snap` extension, or if no such peer exists, tracks the extension for the time
  61. // being until the `eth` main protocol starts looking for it.
  62. func (ps *peerSet) registerSnapExtension(peer *snap.Peer) error {
  63. // Reject the peer if it advertises `snap` without `eth` as `snap` is only a
  64. // satellite protocol meaningful with the chain selection of `eth`
  65. if !peer.RunningCap(eth.ProtocolName, eth.ProtocolVersions) {
  66. return errSnapWithoutEth
  67. }
  68. // Ensure nobody can double connect
  69. ps.lock.Lock()
  70. defer ps.lock.Unlock()
  71. id := peer.ID()
  72. if _, ok := ps.peers[id]; ok {
  73. return errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  74. }
  75. if _, ok := ps.snapPend[id]; ok {
  76. return errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  77. }
  78. // Inject the peer into an `eth` counterpart is available, otherwise save for later
  79. if wait, ok := ps.snapWait[id]; ok {
  80. delete(ps.snapWait, id)
  81. wait <- peer
  82. return nil
  83. }
  84. ps.snapPend[id] = peer
  85. return nil
  86. }
  87. // waitExtensions blocks until all satellite protocols are connected and tracked
  88. // by the peerset.
  89. func (ps *peerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) {
  90. // If the peer does not support a compatible `snap`, don't wait
  91. if !peer.RunningCap(snap.ProtocolName, snap.ProtocolVersions) {
  92. return nil, nil
  93. }
  94. // Ensure nobody can double connect
  95. ps.lock.Lock()
  96. id := peer.ID()
  97. if _, ok := ps.peers[id]; ok {
  98. ps.lock.Unlock()
  99. return nil, errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  100. }
  101. if _, ok := ps.snapWait[id]; ok {
  102. ps.lock.Unlock()
  103. return nil, errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  104. }
  105. // If `snap` already connected, retrieve the peer from the pending set
  106. if snap, ok := ps.snapPend[id]; ok {
  107. delete(ps.snapPend, id)
  108. ps.lock.Unlock()
  109. return snap, nil
  110. }
  111. // Otherwise wait for `snap` to connect concurrently
  112. wait := make(chan *snap.Peer)
  113. ps.snapWait[id] = wait
  114. ps.lock.Unlock()
  115. return <-wait, nil
  116. }
  117. // registerPeer injects a new `eth` peer into the working set, or returns an error
  118. // if the peer is already known.
  119. func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer) error {
  120. // Start tracking the new peer
  121. ps.lock.Lock()
  122. defer ps.lock.Unlock()
  123. if ps.closed {
  124. return errPeerSetClosed
  125. }
  126. id := peer.ID()
  127. if _, ok := ps.peers[id]; ok {
  128. return errPeerAlreadyRegistered
  129. }
  130. eth := &ethPeer{
  131. Peer: peer,
  132. }
  133. if ext != nil {
  134. eth.snapExt = &snapPeer{ext}
  135. ps.snapPeers++
  136. }
  137. ps.peers[id] = eth
  138. return nil
  139. }
  140. // unregisterPeer removes a remote peer from the active set, disabling any further
  141. // actions to/from that particular entity.
  142. func (ps *peerSet) unregisterPeer(id string) error {
  143. ps.lock.Lock()
  144. defer ps.lock.Unlock()
  145. peer, ok := ps.peers[id]
  146. if !ok {
  147. return errPeerNotRegistered
  148. }
  149. delete(ps.peers, id)
  150. if peer.snapExt != nil {
  151. ps.snapPeers--
  152. }
  153. return nil
  154. }
  155. // peer retrieves the registered peer with the given id.
  156. func (ps *peerSet) peer(id string) *ethPeer {
  157. ps.lock.RLock()
  158. defer ps.lock.RUnlock()
  159. return ps.peers[id]
  160. }
  161. // peersWithoutBlock retrieves a list of peers that do not have a given block in
  162. // their set of known hashes so it might be propagated to them.
  163. func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer {
  164. ps.lock.RLock()
  165. defer ps.lock.RUnlock()
  166. list := make([]*ethPeer, 0, len(ps.peers))
  167. for _, p := range ps.peers {
  168. if !p.KnownBlock(hash) {
  169. list = append(list, p)
  170. }
  171. }
  172. return list
  173. }
  174. // peersWithoutTransaction retrieves a list of peers that do not have a given
  175. // transaction in their set of known hashes.
  176. func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {
  177. ps.lock.RLock()
  178. defer ps.lock.RUnlock()
  179. list := make([]*ethPeer, 0, len(ps.peers))
  180. for _, p := range ps.peers {
  181. if !p.KnownTransaction(hash) {
  182. list = append(list, p)
  183. }
  184. }
  185. return list
  186. }
  187. // len returns if the current number of `eth` peers in the set. Since the `snap`
  188. // peers are tied to the existence of an `eth` connection, that will always be a
  189. // subset of `eth`.
  190. func (ps *peerSet) len() int {
  191. ps.lock.RLock()
  192. defer ps.lock.RUnlock()
  193. return len(ps.peers)
  194. }
  195. // snapLen returns if the current number of `snap` peers in the set.
  196. func (ps *peerSet) snapLen() int {
  197. ps.lock.RLock()
  198. defer ps.lock.RUnlock()
  199. return ps.snapPeers
  200. }
  201. // peerWithHighestTD retrieves the known peer with the currently highest total
  202. // difficulty.
  203. func (ps *peerSet) peerWithHighestTD() *eth.Peer {
  204. ps.lock.RLock()
  205. defer ps.lock.RUnlock()
  206. var (
  207. bestPeer *eth.Peer
  208. bestTd *big.Int
  209. )
  210. for _, p := range ps.peers {
  211. if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
  212. bestPeer, bestTd = p.Peer, td
  213. }
  214. }
  215. return bestPeer
  216. }
  217. // close disconnects all peers.
  218. func (ps *peerSet) close() {
  219. ps.lock.Lock()
  220. defer ps.lock.Unlock()
  221. for _, p := range ps.peers {
  222. p.Disconnect(p2p.DiscQuitting)
  223. }
  224. ps.closed = true
  225. }
  226. // Quorum
  227. func (ps *peerSet) UpdateTokenForRunningQPeers(token string) error {
  228. ps.lock.Lock()
  229. defer ps.lock.Unlock()
  230. for _, p := range ps.peers {
  231. if p.qlight != nil {
  232. err := p.qlight.SendNewAuthToken(token)
  233. if err != nil {
  234. return err
  235. }
  236. }
  237. }
  238. return nil
  239. }
  240. // registerPeer injects a new `eth` peer into the working set, or returns an error
  241. // if the peer is already known.
  242. func (ps *peerSet) registerQPeer(peer *qlight.Peer) error {
  243. // Start tracking the new peer
  244. ps.lock.Lock()
  245. defer ps.lock.Unlock()
  246. if ps.closed {
  247. return errPeerSetClosed
  248. }
  249. id := peer.ID()
  250. if _, ok := ps.peers[id]; ok {
  251. return errPeerAlreadyRegistered
  252. }
  253. eth := &ethPeer{
  254. Peer: peer.EthPeer,
  255. qlight: peer,
  256. }
  257. ps.peers[id] = eth
  258. return nil
  259. }