server.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package p2p implements the Ethereum p2p network protocols.
  17. package p2p
  18. import (
  19. "bytes"
  20. "crypto/ecdsa"
  21. "encoding/hex"
  22. "errors"
  23. "fmt"
  24. "net"
  25. "sort"
  26. "sync"
  27. "sync/atomic"
  28. "time"
  29. "github.com/ethereum/go-ethereum/common"
  30. "github.com/ethereum/go-ethereum/common/mclock"
  31. "github.com/ethereum/go-ethereum/crypto"
  32. "github.com/ethereum/go-ethereum/event"
  33. "github.com/ethereum/go-ethereum/log"
  34. "github.com/ethereum/go-ethereum/p2p/discover"
  35. "github.com/ethereum/go-ethereum/p2p/enode"
  36. "github.com/ethereum/go-ethereum/p2p/enr"
  37. "github.com/ethereum/go-ethereum/p2p/nat"
  38. "github.com/ethereum/go-ethereum/p2p/netutil"
  39. "github.com/ethereum/go-ethereum/permission/core"
  40. )
const (
	// defaultDialTimeout is the timeout of the TCP dialer that is used for
	// outbound connections when Config.Dialer is not set.
	defaultDialTimeout = 15 * time.Second

	// This is the fairness knob for the discovery mixer. When looking for peers, we'll
	// wait this long for a single source of candidates before moving on and trying other
	// sources.
	discmixTimeout = 5 * time.Second

	// Connectivity defaults.
	// defaultMaxPendingPeers is the number of accept slots used by listenLoop
	// when Config.MaxPendingPeers is not positive; defaultDialRatio is the
	// divisor used by maxDialedConns when Config.DialRatio is zero.
	defaultMaxPendingPeers = 50
	defaultDialRatio       = 3

	// This time limits inbound connection attempts per source IP.
	inboundThrottleTime = 30 * time.Second

	// Maximum time allowed for reading a complete message.
	// This is effectively the amount of time a connection can be idle.
	frameReadTimeout = 30 * time.Second

	// Maximum amount of time allowed for writing a complete message.
	frameWriteTimeout = 20 * time.Second
)

// errServerStopped is the sentinel error reporting that the server was stopped.
var errServerStopped = errors.New("server stopped")
// Config holds Server options.
type Config struct {
	// This field must be set to a valid secp256k1 private key.
	// Server.Start fails when it is nil.
	PrivateKey *ecdsa.PrivateKey `toml:"-"`

	// MaxPeers is the maximum number of peers that can be
	// connected. It must be greater than zero.
	MaxPeers int

	// MaxPendingPeers is the maximum number of peers that can be pending in the
	// handshake phase, counted separately for inbound and outbound connections.
	// Zero defaults to preset values.
	MaxPendingPeers int `toml:",omitempty"`

	// DialRatio controls the ratio of inbound to dialed connections.
	// Example: a DialRatio of 2 allows 1/2 of connections to be dialed.
	// Setting DialRatio to zero defaults it to 3.
	DialRatio int `toml:",omitempty"`

	// NoDiscovery can be used to disable the peer discovery mechanism.
	// Disabling is useful for protocol debugging (manual topology).
	NoDiscovery bool

	// DiscoveryV5 specifies whether the new topic-discovery based V5 discovery
	// protocol should be started or not.
	DiscoveryV5 bool `toml:",omitempty"`

	// Name sets the node name of this server.
	// Use common.MakeName to create a name that follows existing conventions.
	Name string `toml:"-"`

	// BootstrapNodes are used to establish connectivity
	// with the rest of the network.
	BootstrapNodes []*enode.Node

	// BootstrapNodesV5 are used to establish connectivity
	// with the rest of the network using the V5 discovery
	// protocol.
	BootstrapNodesV5 []*enode.Node `toml:",omitempty"`

	// Static nodes are used as pre-configured connections which are always
	// maintained and re-connected on disconnects.
	StaticNodes []*enode.Node

	// Trusted nodes are used as pre-configured connections which are always
	// allowed to connect, even above the peer limit.
	TrustedNodes []*enode.Node

	// Connectivity can be restricted to certain IP networks.
	// If this option is set to a non-nil value, only hosts which match one of the
	// IP networks contained in the list are considered.
	NetRestrict *netutil.Netlist `toml:",omitempty"`

	// NodeDatabase is the path to the database containing the previously seen
	// live nodes in the network.
	NodeDatabase string `toml:",omitempty"`

	// Protocols should contain the protocols supported
	// by the server. Matching protocols are launched for
	// each peer.
	Protocols []Protocol `toml:"-"`

	// If ListenAddr is set to a non-empty address, the server
	// will listen for incoming connections.
	//
	// If the port is zero, the operating system will pick a port. The
	// ListenAddr field will be updated with the actual address when
	// the server is started.
	ListenAddr string

	// If set to a non-nil value, the given NAT port mapper
	// is used to make the listening port available to the
	// Internet.
	NAT nat.Interface `toml:",omitempty"`

	// If Dialer is set to a non-nil value, the given Dialer
	// is used to dial outbound peer connections.
	Dialer NodeDialer `toml:"-"`

	// If NoDial is true, the server will not dial any peers.
	NoDial bool `toml:",omitempty"`

	// If EnableMsgEvents is set then the server will emit PeerEvents
	// whenever a message is sent to or received from a peer
	EnableMsgEvents bool

	// NOTE(review): presumably switches on the node permission check that goes
	// through Server.isNodePermissionedFunc; its use is outside this section — confirm.
	EnableNodePermission bool `toml:",omitempty"`

	// NOTE(review): presumably the node's data directory handed to the
	// permission check; its use is outside this section — confirm.
	DataDir string `toml:",omitempty"`

	// Logger is a custom logger to use with the p2p.Server.
	// Server.Start falls back to log.Root() when it is nil.
	Logger log.Logger `toml:",omitempty"`

	// clock is the time source of the server. Server.Start sets it to
	// mclock.System{} when it is nil.
	clock mclock.Clock
}
// Server manages all peer connections.
type Server struct {
	// Config fields may not be modified while the server is running.
	Config

	// Hooks for testing. These are useful because we can inhibit
	// the whole protocol stack.
	// Start fills in newTransport (newRLPX) and listenFunc (net.Listen) when
	// they are nil.
	newTransport func(net.Conn, *ecdsa.PublicKey) transport
	newPeerHook  func(*Peer)
	listenFunc   func(network, addr string) (net.Listener, error)

	lock    sync.Mutex // protects running
	running bool

	listener     net.Listener    // TCP listener, set by setupListening
	ourHandshake *protoHandshake // our protocol handshake, built by setupLocalNode
	loopWG       sync.WaitGroup  // loop, listenLoop
	peerFeed     event.Feed
	log          log.Logger // Config.Logger, or log.Root() if that is nil

	nodedb    *enode.DB
	localnode *enode.LocalNode
	ntab      *discover.UDPv4 // discovery v4, nil unless started by setupDiscovery
	DiscV5    *discover.UDPv5 // discovery v5, nil unless started by setupDiscovery
	discmix   *enode.FairMix
	dialsched *dialScheduler

	// Channels into the run loop.
	quit                    chan struct{}
	addtrusted              chan *enode.Node
	removetrusted           chan *enode.Node
	peerOp                  chan peerOpFunc
	peerOpDone              chan struct{}
	delpeer                 chan peerDrop
	checkpointPostHandshake chan *conn
	checkpointAddPeer       chan *conn

	// State of run loop and listenLoop.
	inboundHistory expHeap

	// raft peers info
	checkPeerInRaft func(*enode.Node) bool

	// permissions - check if node is permissioned
	isNodePermissionedFunc func(node *enode.Node, nodename string, currentNode string, datadir string, direction string) bool
}
// peerOpFunc is a callback that the run loop invokes with the current peer set.
// It is submitted through Server.doPeerOp.
type peerOpFunc func(map[enode.ID]*Peer)
// peerDrop is the message received by the run loop on Server.delpeer when a
// peer has disconnected.
type peerDrop struct {
	*Peer
	err       error // logged by the run loop as the reason for the drop
	requested bool  // true if signaled by the peer
}
// connFlag is a bit set describing a connection. It is an int32 so that
// conn.is and conn.set can access it with atomic operations.
type connFlag int32

// Individual connFlag bits; several may be set on one connection.
const (
	dynDialedConn connFlag = 1 << iota
	staticDialedConn
	inboundConn
	trustedConn
)
// conn wraps a network connection with information gathered
// during the two handshakes.
type conn struct {
	fd net.Conn
	transport
	node  *enode.Node // remote node; not set when SetupConn first creates the conn
	flags connFlag    // read and updated atomically by is and set
	cont  chan error  // The run loop uses cont to signal errors to SetupConn.
	caps  []Cap       // valid after the protocol handshake
	name  string      // valid after the protocol handshake
}
// transport is the connection abstraction used by conn: it performs the two
// handshakes and then exchanges messages.
type transport interface {
	// The two handshakes.
	doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error)
	doProtoHandshake(our *protoHandshake) (*protoHandshake, error)
	// The MsgReadWriter can only be used after the encryption
	// handshake has completed. The code uses conn.id to track this
	// by setting it to a non-nil value after the encryption handshake.
	// NOTE(review): conn has no id field in this file; conn.node appears to
	// have taken over that role — confirm and update this comment.
	MsgReadWriter
	// transports must provide Close because we use MsgPipe in some of
	// the tests. Closing the actual network connection doesn't do
	// anything in those tests because MsgPipe doesn't use it.
	close(err error)
}
  207. func (c *conn) String() string {
  208. s := c.flags.String()
  209. if (c.node.ID() != enode.ID{}) {
  210. s += " " + c.node.ID().String()
  211. }
  212. s += " " + c.fd.RemoteAddr().String()
  213. return s
  214. }
  215. func (f connFlag) String() string {
  216. s := ""
  217. if f&trustedConn != 0 {
  218. s += "-trusted"
  219. }
  220. if f&dynDialedConn != 0 {
  221. s += "-dyndial"
  222. }
  223. if f&staticDialedConn != 0 {
  224. s += "-staticdial"
  225. }
  226. if f&inboundConn != 0 {
  227. s += "-inbound"
  228. }
  229. if s != "" {
  230. s = s[1:]
  231. }
  232. return s
  233. }
  234. func (c *conn) is(f connFlag) bool {
  235. flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
  236. return flags&f != 0
  237. }
  238. func (c *conn) set(f connFlag, val bool) {
  239. for {
  240. oldFlags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
  241. flags := oldFlags
  242. if val {
  243. flags |= f
  244. } else {
  245. flags &= ^f
  246. }
  247. if atomic.CompareAndSwapInt32((*int32)(&c.flags), int32(oldFlags), int32(flags)) {
  248. return
  249. }
  250. }
  251. }
  252. // LocalNode returns the local node record.
  253. func (srv *Server) LocalNode() *enode.LocalNode {
  254. return srv.localnode
  255. }
  256. // Peers returns all connected peers.
  257. func (srv *Server) Peers() []*Peer {
  258. var ps []*Peer
  259. srv.doPeerOp(func(peers map[enode.ID]*Peer) {
  260. for _, p := range peers {
  261. ps = append(ps, p)
  262. }
  263. })
  264. return ps
  265. }
  266. // PeerCount returns the number of connected peers.
  267. func (srv *Server) PeerCount() int {
  268. var count int
  269. srv.doPeerOp(func(ps map[enode.ID]*Peer) {
  270. count = len(ps)
  271. })
  272. return count
  273. }
  274. // AddPeer adds the given node to the static node set. When there is room in the peer set,
  275. // the server will connect to the node. If the connection fails for any reason, the server
  276. // will attempt to reconnect the peer.
  277. func (srv *Server) AddPeer(node *enode.Node) {
  278. srv.dialsched.addStatic(node)
  279. }
  280. // RemovePeer removes a node from the static node set. It also disconnects from the given
  281. // node if it is currently connected as a peer.
  282. //
  283. // This method blocks until all protocols have exited and the peer is removed. Do not use
  284. // RemovePeer in protocol implementations, call Disconnect on the Peer instead.
  285. func (srv *Server) RemovePeer(node *enode.Node) {
  286. var (
  287. ch chan *PeerEvent
  288. sub event.Subscription
  289. )
  290. // Disconnect the peer on the main loop.
  291. srv.doPeerOp(func(peers map[enode.ID]*Peer) {
  292. srv.dialsched.removeStatic(node)
  293. if peer := peers[node.ID()]; peer != nil {
  294. ch = make(chan *PeerEvent, 1)
  295. sub = srv.peerFeed.Subscribe(ch)
  296. peer.Disconnect(DiscRequested)
  297. }
  298. })
  299. // Wait for the peer connection to end.
  300. if ch != nil {
  301. defer sub.Unsubscribe()
  302. for ev := range ch {
  303. if ev.Peer == node.ID() && ev.Type == PeerEventTypeDrop {
  304. return
  305. }
  306. }
  307. }
  308. }
  309. // AddTrustedPeer adds the given node to a reserved whitelist which allows the
  310. // node to always connect, even if the slot are full.
  311. func (srv *Server) AddTrustedPeer(node *enode.Node) {
  312. select {
  313. case srv.addtrusted <- node:
  314. case <-srv.quit:
  315. }
  316. }
  317. // RemoveTrustedPeer removes the given node from the trusted peer set.
  318. func (srv *Server) RemoveTrustedPeer(node *enode.Node) {
  319. select {
  320. case srv.removetrusted <- node:
  321. case <-srv.quit:
  322. }
  323. }
  324. // SubscribePeers subscribes the given channel to peer events
  325. func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
  326. return srv.peerFeed.Subscribe(ch)
  327. }
  328. // Self returns the local node's endpoint information.
  329. func (srv *Server) Self() *enode.Node {
  330. srv.lock.Lock()
  331. ln := srv.localnode
  332. srv.lock.Unlock()
  333. if ln == nil {
  334. return enode.NewV4(&srv.PrivateKey.PublicKey, net.ParseIP("0.0.0.0"), 0, 0)
  335. }
  336. return ln.Node()
  337. }
  338. // Stop terminates the server and all active peer connections.
  339. // It blocks until all active connections have been closed.
  340. func (srv *Server) Stop() {
  341. srv.lock.Lock()
  342. if !srv.running {
  343. srv.lock.Unlock()
  344. return
  345. }
  346. srv.running = false
  347. if srv.listener != nil {
  348. // this unblocks listener Accept
  349. srv.listener.Close()
  350. }
  351. close(srv.quit)
  352. srv.lock.Unlock()
  353. srv.loopWG.Wait()
  354. }
// sharedUDPConn implements a shared connection. Write sends messages to the underlying connection while read returns
// messages that were found unprocessable and sent to the unhandled channel by the primary listener.
type sharedUDPConn struct {
	*net.UDPConn
	unhandled chan discover.ReadPacket // source of the packets returned by ReadFromUDP
}
  361. // ReadFromUDP implements discover.UDPConn
  362. func (s *sharedUDPConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) {
  363. packet, ok := <-s.unhandled
  364. if !ok {
  365. return 0, nil, errors.New("connection was closed")
  366. }
  367. l := len(packet.Data)
  368. if l > len(b) {
  369. l = len(b)
  370. }
  371. copy(b[:l], packet.Data[:l])
  372. return l, packet.Addr, nil
  373. }
  374. // Close implements discover.UDPConn
  375. func (s *sharedUDPConn) Close() error {
  376. return nil
  377. }
  378. // Start starts running the server.
  379. // Servers can not be re-used after stopping.
  380. func (srv *Server) Start() (err error) {
  381. srv.lock.Lock()
  382. defer srv.lock.Unlock()
  383. if srv.running {
  384. return errors.New("server already running")
  385. }
  386. srv.running = true
  387. srv.log = srv.Config.Logger
  388. if srv.log == nil {
  389. srv.log = log.Root()
  390. }
  391. if srv.clock == nil {
  392. srv.clock = mclock.System{}
  393. }
  394. if srv.NoDial && srv.ListenAddr == "" {
  395. srv.log.Warn("P2P server will be useless, neither dialing nor listening")
  396. }
  397. // static fields
  398. if srv.PrivateKey == nil {
  399. return errors.New("Server.PrivateKey must be set to a non-nil key")
  400. }
  401. if srv.newTransport == nil {
  402. srv.newTransport = newRLPX
  403. }
  404. if srv.listenFunc == nil {
  405. srv.listenFunc = net.Listen
  406. }
  407. srv.quit = make(chan struct{})
  408. srv.delpeer = make(chan peerDrop)
  409. srv.checkpointPostHandshake = make(chan *conn)
  410. srv.checkpointAddPeer = make(chan *conn)
  411. srv.addtrusted = make(chan *enode.Node)
  412. srv.removetrusted = make(chan *enode.Node)
  413. srv.peerOp = make(chan peerOpFunc)
  414. srv.peerOpDone = make(chan struct{})
  415. if err := srv.setupLocalNode(); err != nil {
  416. return err
  417. }
  418. if srv.ListenAddr != "" {
  419. if err := srv.setupListening(); err != nil {
  420. return err
  421. }
  422. }
  423. if err := srv.setupDiscovery(); err != nil {
  424. return err
  425. }
  426. srv.setupDialScheduler()
  427. srv.loopWG.Add(1)
  428. go srv.run()
  429. return nil
  430. }
  431. func (srv *Server) setupLocalNode() error {
  432. // Create the devp2p handshake.
  433. pubkey := crypto.FromECDSAPub(&srv.PrivateKey.PublicKey)
  434. srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: pubkey[1:]}
  435. for _, p := range srv.Protocols {
  436. srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
  437. }
  438. sort.Sort(capsByNameAndVersion(srv.ourHandshake.Caps))
  439. // Create the local node.
  440. db, err := enode.OpenDB(srv.Config.NodeDatabase)
  441. if err != nil {
  442. return err
  443. }
  444. srv.nodedb = db
  445. srv.localnode = enode.NewLocalNode(db, srv.PrivateKey)
  446. srv.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
  447. // TODO: check conflicts
  448. for _, p := range srv.Protocols {
  449. for _, e := range p.Attributes {
  450. srv.localnode.Set(e)
  451. }
  452. }
  453. switch srv.NAT.(type) {
  454. case nil:
  455. // No NAT interface, do nothing.
  456. case nat.ExtIP:
  457. // ExtIP doesn't block, set the IP right away.
  458. ip, _ := srv.NAT.ExternalIP()
  459. srv.localnode.SetStaticIP(ip)
  460. default:
  461. // Ask the router about the IP. This takes a while and blocks startup,
  462. // do it in the background.
  463. srv.loopWG.Add(1)
  464. go func() {
  465. defer srv.loopWG.Done()
  466. if ip, err := srv.NAT.ExternalIP(); err == nil {
  467. srv.localnode.SetStaticIP(ip)
  468. }
  469. }()
  470. }
  471. return nil
  472. }
  473. func (srv *Server) setupDiscovery() error {
  474. srv.discmix = enode.NewFairMix(discmixTimeout)
  475. // Add protocol-specific discovery sources.
  476. added := make(map[string]bool)
  477. for _, proto := range srv.Protocols {
  478. if proto.DialCandidates != nil && !added[proto.Name] {
  479. srv.discmix.AddSource(proto.DialCandidates)
  480. added[proto.Name] = true
  481. }
  482. }
  483. // Don't listen on UDP endpoint if DHT is disabled.
  484. if srv.NoDiscovery && !srv.DiscoveryV5 {
  485. return nil
  486. }
  487. addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
  488. if err != nil {
  489. return err
  490. }
  491. conn, err := net.ListenUDP("udp", addr)
  492. if err != nil {
  493. return err
  494. }
  495. realaddr := conn.LocalAddr().(*net.UDPAddr)
  496. srv.log.Debug("UDP listener up", "addr", realaddr)
  497. if srv.NAT != nil {
  498. if !realaddr.IP.IsLoopback() {
  499. srv.loopWG.Add(1)
  500. go func() {
  501. nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
  502. srv.loopWG.Done()
  503. }()
  504. }
  505. }
  506. srv.localnode.SetFallbackUDP(realaddr.Port)
  507. // Discovery V4
  508. var unhandled chan discover.ReadPacket
  509. var sconn *sharedUDPConn
  510. if !srv.NoDiscovery {
  511. if srv.DiscoveryV5 {
  512. unhandled = make(chan discover.ReadPacket, 100)
  513. sconn = &sharedUDPConn{conn, unhandled}
  514. }
  515. cfg := discover.Config{
  516. PrivateKey: srv.PrivateKey,
  517. NetRestrict: srv.NetRestrict,
  518. Bootnodes: srv.BootstrapNodes,
  519. Unhandled: unhandled,
  520. Log: srv.log,
  521. }
  522. ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
  523. if err != nil {
  524. return err
  525. }
  526. srv.ntab = ntab
  527. srv.discmix.AddSource(ntab.RandomNodes())
  528. }
  529. // Discovery V5
  530. if srv.DiscoveryV5 {
  531. cfg := discover.Config{
  532. PrivateKey: srv.PrivateKey,
  533. NetRestrict: srv.NetRestrict,
  534. Bootnodes: srv.BootstrapNodesV5,
  535. Log: srv.log,
  536. }
  537. var err error
  538. if sconn != nil {
  539. srv.DiscV5, err = discover.ListenV5(sconn, srv.localnode, cfg)
  540. } else {
  541. srv.DiscV5, err = discover.ListenV5(conn, srv.localnode, cfg)
  542. }
  543. if err != nil {
  544. return err
  545. }
  546. }
  547. return nil
  548. }
  549. func (srv *Server) setupDialScheduler() {
  550. config := dialConfig{
  551. self: srv.localnode.ID(),
  552. maxDialPeers: srv.maxDialedConns(),
  553. maxActiveDials: srv.MaxPendingPeers,
  554. log: srv.Logger,
  555. netRestrict: srv.NetRestrict,
  556. dialer: srv.Dialer,
  557. clock: srv.clock,
  558. }
  559. if srv.ntab != nil {
  560. config.resolver = srv.ntab
  561. }
  562. if config.dialer == nil {
  563. config.dialer = tcpDialer{&net.Dialer{Timeout: defaultDialTimeout}}
  564. }
  565. srv.dialsched = newDialScheduler(config, srv.discmix, srv.SetupConn)
  566. for _, n := range srv.StaticNodes {
  567. srv.dialsched.addStatic(n)
  568. }
  569. }
  570. func (srv *Server) maxInboundConns() int {
  571. return srv.MaxPeers - srv.maxDialedConns()
  572. }
  573. func (srv *Server) maxDialedConns() (limit int) {
  574. if srv.NoDial || srv.MaxPeers == 0 {
  575. return 0
  576. }
  577. if srv.DialRatio == 0 {
  578. limit = srv.MaxPeers / defaultDialRatio
  579. } else {
  580. limit = srv.MaxPeers / srv.DialRatio
  581. }
  582. if limit == 0 {
  583. limit = 1
  584. }
  585. return limit
  586. }
  587. func (srv *Server) setupListening() error {
  588. // Launch the listener.
  589. listener, err := srv.listenFunc("tcp", srv.ListenAddr)
  590. if err != nil {
  591. return err
  592. }
  593. srv.listener = listener
  594. srv.ListenAddr = listener.Addr().String()
  595. // Update the local node record and map the TCP listening port if NAT is configured.
  596. if tcp, ok := listener.Addr().(*net.TCPAddr); ok {
  597. srv.localnode.Set(enr.TCP(tcp.Port))
  598. if !tcp.IP.IsLoopback() && srv.NAT != nil {
  599. srv.loopWG.Add(1)
  600. go func() {
  601. nat.Map(srv.NAT, srv.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
  602. srv.loopWG.Done()
  603. }()
  604. }
  605. }
  606. srv.loopWG.Add(1)
  607. go srv.listenLoop()
  608. return nil
  609. }
  610. // doPeerOp runs fn on the main loop.
  611. func (srv *Server) doPeerOp(fn peerOpFunc) {
  612. select {
  613. case srv.peerOp <- fn:
  614. <-srv.peerOpDone
  615. case <-srv.quit:
  616. }
  617. }
// run is the main loop of the server.
//
// It owns the peer set, the inbound peer count and the trusted node set, and
// serves the channels declared on Server until quit is closed. It then closes
// discovery, disconnects all peers and waits for them to be reported on
// delpeer.
func (srv *Server) run() {
	srv.log.Info("Started P2P networking", "self", srv.localnode.Node().URLv4())
	// Deferred calls run in reverse order: the dial scheduler is stopped
	// first, then the discovery mixer and node database are closed, and
	// loopWG is released last.
	defer srv.loopWG.Done()
	defer srv.nodedb.Close()
	defer srv.discmix.Close()
	defer srv.dialsched.stop()

	var (
		peers        = make(map[enode.ID]*Peer)
		inboundCount = 0
		trusted      = make(map[enode.ID]bool, len(srv.TrustedNodes))
	)
	// Put trusted nodes into a map to speed up checks.
	// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
	for _, n := range srv.TrustedNodes {
		trusted[n.ID()] = true
	}

running:
	for {
		select {
		case <-srv.quit:
			// The server was stopped. Run the cleanup logic.
			break running

		case n := <-srv.addtrusted:
			// This channel is used by AddTrustedPeer to add a node
			// to the trusted node set.
			srv.log.Trace("Adding trusted node", "node", n)
			trusted[n.ID()] = true
			// Also flag the live connection if the node is already a peer.
			if p, ok := peers[n.ID()]; ok {
				p.rw.set(trustedConn, true)
			}

		case n := <-srv.removetrusted:
			// This channel is used by RemoveTrustedPeer to remove a node
			// from the trusted node set.
			srv.log.Trace("Removing trusted node", "node", n)
			delete(trusted, n.ID())
			// Also clear the flag on the live connection if the node is a peer.
			if p, ok := peers[n.ID()]; ok {
				p.rw.set(trustedConn, false)
			}

		case op := <-srv.peerOp:
			// This channel is used by Peers, PeerCount and RemovePeer
			// (through doPeerOp). The sender waits on peerOpDone.
			op(peers)
			srv.peerOpDone <- struct{}{}

		case c := <-srv.checkpointPostHandshake:
			// A connection has passed the encryption handshake so
			// the remote identity is known (but hasn't been verified yet).
			if trusted[c.node.ID()] {
				// Ensure that the trusted flag is set before checking against MaxPeers.
				c.flags |= trustedConn
			}
			// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
			c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)

		case c := <-srv.checkpointAddPeer:
			// At this point the connection is past the protocol handshake.
			// Its capabilities are known and the remote identity is verified.
			err := srv.addPeerChecks(peers, inboundCount, c)
			if err == nil {
				// The handshakes are done and it passed all checks.
				p := srv.launchPeer(c)
				peers[c.node.ID()] = p
				srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
				srv.dialsched.peerAdded(c)
				if p.Inbound() {
					inboundCount++
				}
			}
			// The result (nil on success) is always reported back on cont.
			c.cont <- err

		case pd := <-srv.delpeer:
			// A peer disconnected.
			d := common.PrettyDuration(mclock.Now() - pd.created)
			delete(peers, pd.ID())
			srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)
			srv.dialsched.peerRemoved(pd.rw)
			if pd.Inbound() {
				inboundCount--
			}
		}
	}

	srv.log.Trace("P2P networking is spinning down")

	// Terminate discovery. If there is a running lookup it will terminate soon.
	if srv.ntab != nil {
		srv.ntab.Close()
	}
	if srv.DiscV5 != nil {
		srv.DiscV5.Close()
	}
	// Disconnect all peers.
	for _, p := range peers {
		p.Disconnect(DiscQuitting)
	}
	// Wait for peers to shut down. Pending connections and tasks are
	// not handled here and will terminate soon-ish because srv.quit
	// is closed.
	for len(peers) > 0 {
		p := <-srv.delpeer
		p.log.Trace("<-delpeer (spindown)")
		delete(peers, p.ID())
	}
}
  717. func (srv *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
  718. switch {
  719. case !c.is(trustedConn) && len(peers) >= srv.MaxPeers:
  720. return DiscTooManyPeers
  721. case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
  722. return DiscTooManyPeers
  723. case peers[c.node.ID()] != nil:
  724. return DiscAlreadyConnected
  725. case c.node.ID() == srv.localnode.ID():
  726. return DiscSelf
  727. default:
  728. return nil
  729. }
  730. }
  731. func (srv *Server) addPeerChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
  732. // Drop connections with no matching protocols.
  733. if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, c.caps) == 0 {
  734. return DiscUselessPeer
  735. }
  736. // Repeat the post-handshake checks because the
  737. // peer set might have changed since those checks were performed.
  738. return srv.postHandshakeChecks(peers, inboundCount, c)
  739. }
// listenLoop runs in its own goroutine and accepts
// inbound connections.
//
// The number of inbound connections being set up at the same time is bounded
// by MaxPendingPeers, or by defaultMaxPendingPeers when that is not positive.
// The loop ends on the first non-temporary Accept error.
func (srv *Server) listenLoop() {
	srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())

	// The slots channel limits accepts of new connections.
	tokens := defaultMaxPendingPeers
	if srv.MaxPendingPeers > 0 {
		tokens = srv.MaxPendingPeers
	}
	slots := make(chan struct{}, tokens)
	for i := 0; i < tokens; i++ {
		slots <- struct{}{}
	}

	// Wait for slots to be returned on exit. This ensures all connection goroutines
	// are down before listenLoop returns.
	// (Deferred calls run in reverse order: the slots are drained first and
	// loopWG.Done is called afterwards.)
	defer srv.loopWG.Done()
	defer func() {
		for i := 0; i < cap(slots); i++ {
			<-slots
		}
	}()

	for {
		// Wait for a free slot before accepting.
		<-slots

		var (
			fd      net.Conn
			err     error
			lastLog time.Time
		)
		for {
			fd, err = srv.listener.Accept()
			if netutil.IsTemporaryError(err) {
				// Retry after a short pause, logging at most once per second.
				if time.Since(lastLog) > 1*time.Second {
					srv.log.Debug("Temporary read error", "err", err)
					lastLog = time.Now()
				}
				time.Sleep(time.Millisecond * 200)
				continue
			} else if err != nil {
				// Give the slot back so that the deferred drain can complete.
				srv.log.Debug("Read error", "err", err)
				slots <- struct{}{}
				return
			}
			break
		}

		remoteIP := netutil.AddrIP(fd.RemoteAddr())
		if err := srv.checkInboundConn(remoteIP); err != nil {
			srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
			fd.Close()
			slots <- struct{}{}
			continue
		}
		if remoteIP != nil {
			var addr *net.TCPAddr
			if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
				addr = tcp
			}
			fd = newMeteredConn(fd, true, addr)
			srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
		}
		// Set the connection up in its own goroutine; the slot is returned
		// once SetupConn has finished.
		go func() {
			srv.SetupConn(fd, inboundConn, nil)
			slots <- struct{}{}
		}()
	}
}
  806. func (srv *Server) checkInboundConn(remoteIP net.IP) error {
  807. if remoteIP == nil {
  808. return nil
  809. }
  810. // Reject connections that do not match NetRestrict.
  811. if srv.NetRestrict != nil && !srv.NetRestrict.Contains(remoteIP) {
  812. return fmt.Errorf("not whitelisted in NetRestrict")
  813. }
  814. // Reject Internet peers that try too often.
  815. now := srv.clock.Now()
  816. srv.inboundHistory.expire(now, nil)
  817. if !netutil.IsLAN(remoteIP) && srv.inboundHistory.contains(remoteIP.String()) {
  818. return fmt.Errorf("too many attempts")
  819. }
  820. srv.inboundHistory.add(remoteIP.String(), now.Add(inboundThrottleTime))
  821. return nil
  822. }
  823. // SetupConn runs the handshakes and attempts to add the connection
  824. // as a peer. It returns when the connection has been added as a peer
  825. // or the handshakes have failed.
  826. func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
  827. c := &conn{fd: fd, flags: flags, cont: make(chan error)}
  828. if dialDest == nil {
  829. c.transport = srv.newTransport(fd, nil)
  830. } else {
  831. c.transport = srv.newTransport(fd, dialDest.Pubkey())
  832. }
  833. err := srv.setupConn(c, flags, dialDest)
  834. if err != nil {
  835. c.close(err)
  836. }
  837. return err
  838. }
  839. func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
  840. // Prevent leftover pending conns from entering the handshake.
  841. srv.lock.Lock()
  842. running := srv.running
  843. srv.lock.Unlock()
  844. if !running {
  845. return errServerStopped
  846. }
  847. // If dialing, figure out the remote public key.
  848. var dialPubkey *ecdsa.PublicKey
  849. if dialDest != nil {
  850. dialPubkey = new(ecdsa.PublicKey)
  851. if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
  852. err = errors.New("dial destination doesn't have a secp256k1 public key")
  853. srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
  854. return err
  855. }
  856. }
  857. // Run the RLPx handshake.
  858. remotePubkey, err := c.doEncHandshake(srv.PrivateKey)
  859. if err != nil {
  860. srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
  861. return err
  862. }
  863. if dialDest != nil {
  864. c.node = dialDest
  865. } else {
  866. c.node = nodeFromConn(remotePubkey, c.fd)
  867. }
  868. clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
  869. // If raft is running, check if the dialing node is in the raft cluster
  870. // Node doesn't belong to raft cluster is not allowed to join the p2p network
  871. if srv.checkPeerInRaft != nil && !srv.checkPeerInRaft(c.node) {
  872. node := c.node.ID().String()
  873. log.Trace("incoming connection peer is not in the raft cluster", "enode.id", node)
  874. return newPeerError(errNotInRaftCluster, "id=%s…%s", node[:4], node[len(node)-4:])
  875. }
  876. //START - QUORUM Permissioning
  877. currentNode := srv.NodeInfo().ID
  878. cnodeName := srv.NodeInfo().Name
  879. clog.Trace("Quorum permissioning",
  880. "EnableNodePermission", srv.EnableNodePermission,
  881. "DataDir", srv.DataDir,
  882. "Current Node ID", currentNode,
  883. "Node Name", cnodeName,
  884. "Dialed Dest", dialDest,
  885. "Connection ID", c.node.ID(),
  886. "Connection String", c.node.ID().String())
  887. if srv.EnableNodePermission {
  888. clog.Trace("Node Permissioning is Enabled.")
  889. nodeId := c.node.ID().String()
  890. node := c.node
  891. direction := "INCOMING"
  892. if dialDest != nil {
  893. node = dialDest
  894. nodeId = dialDest.ID().String()
  895. direction = "OUTGOING"
  896. log.Trace("Node Permissioning", "Connection Direction", direction)
  897. }
  898. if srv.isNodePermissionedFunc == nil {
  899. if !core.IsNodePermissioned(nodeId, currentNode, srv.DataDir, direction) {
  900. return newPeerError(errPermissionDenied, "id=%s…%s %s id=%s…%s", currentNode[:4], currentNode[len(currentNode)-4:], direction, nodeId[:4], nodeId[len(nodeId)-4:])
  901. }
  902. } else if !srv.isNodePermissionedFunc(node, nodeId, currentNode, srv.DataDir, direction) {
  903. return newPeerError(errPermissionDenied, "id=%s…%s %s id=%s…%s", currentNode[:4], currentNode[len(currentNode)-4:], direction, nodeId[:4], nodeId[len(nodeId)-4:])
  904. }
  905. } else {
  906. clog.Trace("Node Permissioning is Disabled.")
  907. }
  908. //END - QUORUM Permissioning
  909. err = srv.checkpoint(c, srv.checkpointPostHandshake)
  910. if err != nil {
  911. clog.Trace("Rejected peer", "err", err)
  912. return err
  913. }
  914. // Run the capability negotiation handshake.
  915. phs, err := c.doProtoHandshake(srv.ourHandshake)
  916. if err != nil {
  917. clog.Trace("Failed p2p handshake", "err", err)
  918. return err
  919. }
  920. if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) {
  921. clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID))
  922. return DiscUnexpectedIdentity
  923. }
  924. // To continue support of IBFT1.0 with besu
  925. if len(phs.Caps) == 1 && phs.Caps[0].Name == "istanbul" && phs.Caps[0].Version == 99 {
  926. phs.Caps = []Cap{{
  927. Name: "eth",
  928. Version: 64,
  929. }}
  930. }
  931. c.caps, c.name = phs.Caps, phs.Name
  932. err = srv.checkpoint(c, srv.checkpointAddPeer)
  933. if err != nil {
  934. clog.Trace("Rejected peer", "err", err)
  935. return err
  936. }
  937. return nil
  938. }
  939. func nodeFromConn(pubkey *ecdsa.PublicKey, conn net.Conn) *enode.Node {
  940. var ip net.IP
  941. var port int
  942. if tcp, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
  943. ip = tcp.IP
  944. port = tcp.Port
  945. }
  946. return enode.NewV4(pubkey, ip, port, port)
  947. }
  948. // checkpoint sends the conn to run, which performs the
  949. // post-handshake checks for the stage (posthandshake, addpeer).
  950. func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error {
  951. select {
  952. case stage <- c:
  953. case <-srv.quit:
  954. return errServerStopped
  955. }
  956. return <-c.cont
  957. }
  958. func (srv *Server) launchPeer(c *conn) *Peer {
  959. p := newPeer(srv.log, c, srv.Protocols)
  960. if srv.EnableMsgEvents {
  961. // If message events are enabled, pass the peerFeed
  962. // to the peer.
  963. p.events = &srv.peerFeed
  964. }
  965. go srv.runPeer(p)
  966. return p
  967. }
  968. // runPeer runs in its own goroutine for each peer.
  969. func (srv *Server) runPeer(p *Peer) {
  970. if srv.newPeerHook != nil {
  971. srv.newPeerHook(p)
  972. }
  973. srv.peerFeed.Send(&PeerEvent{
  974. Type: PeerEventTypeAdd,
  975. Peer: p.ID(),
  976. RemoteAddress: p.RemoteAddr().String(),
  977. LocalAddress: p.LocalAddr().String(),
  978. })
  979. // Run the per-peer main loop.
  980. remoteRequested, err := p.run()
  981. // Announce disconnect on the main loop to update the peer set.
  982. // The main loop waits for existing peers to be sent on srv.delpeer
  983. // before returning, so this send should not select on srv.quit.
  984. srv.delpeer <- peerDrop{p, err, remoteRequested}
  985. // Broadcast peer drop to external subscribers. This needs to be
  986. // after the send to delpeer so subscribers have a consistent view of
  987. // the peer set (i.e. Server.Peers() doesn't include the peer when the
  988. // event is received.
  989. srv.peerFeed.Send(&PeerEvent{
  990. Type: PeerEventTypeDrop,
  991. Peer: p.ID(),
  992. Error: err.Error(),
  993. RemoteAddress: p.RemoteAddr().String(),
  994. LocalAddress: p.LocalAddr().String(),
  995. })
  996. }
  // NodeInfo represents a short summary of the information known about the host.
  type NodeInfo struct {
  	// ID is the unique node identifier (also the encryption key).
  	ID string `json:"id"`
  	// Name is the name of the node, including client type, version, OS, custom data.
  	Name string `json:"name"`
  	// Enode is the enode URL for adding this peer from remote peers.
  	Enode string `json:"enode"`
  	// ENR is the Ethereum Node Record.
  	ENR string `json:"enr"`
  	// IP is the IP address of the node.
  	IP string `json:"ip"`
  	// Ports lists the ports the node listens on.
  	Ports struct {
  		// Discovery is the UDP listening port for the discovery protocol.
  		Discovery int `json:"discovery"`
  		// Listener is the TCP listening port for RLPx.
  		Listener int `json:"listener"`
  	} `json:"ports"`
  	// ListenAddr is the listen address the server was configured with.
  	ListenAddr string `json:"listenAddr"`
  	// Protocols maps a protocol name to that protocol's own node info.
  	Protocols map[string]interface{} `json:"protocols"`
  }
  1011. // NodeInfo gathers and returns a collection of metadata known about the host.
  1012. func (srv *Server) NodeInfo() *NodeInfo {
  1013. // Gather and assemble the generic node infos
  1014. node := srv.Self()
  1015. info := &NodeInfo{
  1016. Name: srv.Name,
  1017. Enode: node.URLv4(),
  1018. ID: node.ID().String(),
  1019. IP: node.IP().String(),
  1020. ListenAddr: srv.ListenAddr,
  1021. Protocols: make(map[string]interface{}),
  1022. }
  1023. info.Ports.Discovery = node.UDP()
  1024. info.Ports.Listener = node.TCP()
  1025. info.ENR = node.String()
  1026. // Gather all the running protocol infos (only once per protocol type)
  1027. for _, proto := range srv.Protocols {
  1028. if _, ok := info.Protocols[proto.Name]; !ok {
  1029. nodeInfo := interface{}("unknown")
  1030. if query := proto.NodeInfo; query != nil {
  1031. nodeInfo = proto.NodeInfo()
  1032. }
  1033. info.Protocols[proto.Name] = nodeInfo
  1034. }
  1035. }
  1036. return info
  1037. }
  1038. // PeersInfo returns an array of metadata objects describing connected peers.
  1039. func (srv *Server) PeersInfo() []*PeerInfo {
  1040. // Gather all the generic and sub-protocol specific infos
  1041. infos := make([]*PeerInfo, 0, srv.PeerCount())
  1042. for _, peer := range srv.Peers() {
  1043. if peer != nil {
  1044. infos = append(infos, peer.Info())
  1045. }
  1046. }
  1047. // Sort the result array alphabetically by node identifier
  1048. for i := 0; i < len(infos); i++ {
  1049. for j := i + 1; j < len(infos); j++ {
  1050. if infos[i].ID > infos[j].ID {
  1051. infos[i], infos[j] = infos[j], infos[i]
  1052. }
  1053. }
  1054. }
  1055. return infos
  1056. }
  1057. func (srv *Server) SetCheckPeerInRaft(f func(*enode.Node) bool) {
  1058. srv.checkPeerInRaft = f
  1059. }
  1060. func (srv *Server) SetIsNodePermissioned(f func(*enode.Node, string, string, string, string) bool) {
  1061. if srv.isNodePermissionedFunc == nil {
  1062. srv.isNodePermissionedFunc = f
  1063. }
  1064. }
  1065. func (srv *Server) SetNewTransportFunc(f func(net.Conn, *ecdsa.PublicKey) transport) {
  1066. srv.newTransport = f
  1067. }