nodedb.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package enode
  17. import (
  18. "bytes"
  19. "crypto/rand"
  20. "encoding/binary"
  21. "fmt"
  22. "net"
  23. "os"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/rlp"
  27. "github.com/syndtr/goleveldb/leveldb"
  28. "github.com/syndtr/goleveldb/leveldb/errors"
  29. "github.com/syndtr/goleveldb/leveldb/iterator"
  30. "github.com/syndtr/goleveldb/leveldb/opt"
  31. "github.com/syndtr/goleveldb/leveldb/storage"
  32. "github.com/syndtr/goleveldb/leveldb/util"
  33. )
  34. // Keys in the node database.
  35. const (
  36. dbVersionKey = "version" // Version of the database to flush if changes
  37. dbNodePrefix = "n:" // Identifier to prefix node entries with
  38. dbLocalPrefix = "local:"
  39. dbDiscoverRoot = "v4"
  40. dbDiscv5Root = "v5"
  41. // These fields are stored per ID and IP, the full key is "n:<ID>:v4:<IP>:findfail".
  42. // Use nodeItemKey to create those keys.
  43. dbNodeFindFails = "findfail"
  44. dbNodePing = "lastping"
  45. dbNodePong = "lastpong"
  46. dbNodeSeq = "seq"
  47. // Local information is keyed by ID only, the full key is "local:<ID>:seq".
  48. // Use localItemKey to create those keys.
  49. dbLocalSeq = "seq"
  50. )
  51. const (
  52. dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
  53. dbCleanupCycle = time.Hour // Time period for running the expiration task.
  54. dbVersion = 9
  55. )
  56. var (
  57. errInvalidIP = errors.New("invalid IP")
  58. )
  59. var zeroIP = make(net.IP, 16)
  60. // DB is the node database, storing previously seen nodes and any collected metadata about
  61. // them for QoS purposes.
  62. type DB struct {
  63. lvl *leveldb.DB // Interface to the database itself
  64. runner sync.Once // Ensures we can start at most one expirer
  65. quit chan struct{} // Channel to signal the expiring thread to stop
  66. }
  67. // OpenDB opens a node database for storing and retrieving infos about known peers in the
  68. // network. If no path is given an in-memory, temporary database is constructed.
  69. func OpenDB(path string) (*DB, error) {
  70. if path == "" {
  71. return newMemoryDB()
  72. }
  73. return newPersistentDB(path)
  74. }
  75. // newMemoryNodeDB creates a new in-memory node database without a persistent backend.
  76. func newMemoryDB() (*DB, error) {
  77. db, err := leveldb.Open(storage.NewMemStorage(), nil)
  78. if err != nil {
  79. return nil, err
  80. }
  81. return &DB{lvl: db, quit: make(chan struct{})}, nil
  82. }
  83. // newPersistentNodeDB creates/opens a leveldb backed persistent node database,
  84. // also flushing its contents in case of a version mismatch.
  85. func newPersistentDB(path string) (*DB, error) {
  86. opts := &opt.Options{OpenFilesCacheCapacity: 5}
  87. db, err := leveldb.OpenFile(path, opts)
  88. if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
  89. db, err = leveldb.RecoverFile(path, nil)
  90. }
  91. if err != nil {
  92. return nil, err
  93. }
  94. // The nodes contained in the cache correspond to a certain protocol version.
  95. // Flush all nodes if the version doesn't match.
  96. currentVer := make([]byte, binary.MaxVarintLen64)
  97. currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]
  98. blob, err := db.Get([]byte(dbVersionKey), nil)
  99. switch err {
  100. case leveldb.ErrNotFound:
  101. // Version not found (i.e. empty cache), insert it
  102. if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
  103. db.Close()
  104. return nil, err
  105. }
  106. case nil:
  107. // Version present, flush if different
  108. if !bytes.Equal(blob, currentVer) {
  109. db.Close()
  110. if err = os.RemoveAll(path); err != nil {
  111. return nil, err
  112. }
  113. return newPersistentDB(path)
  114. }
  115. }
  116. return &DB{lvl: db, quit: make(chan struct{})}, nil
  117. }
  118. // nodeKey returns the database key for a node record.
  119. func nodeKey(id ID) []byte {
  120. key := append([]byte(dbNodePrefix), id[:]...)
  121. key = append(key, ':')
  122. key = append(key, dbDiscoverRoot...)
  123. return key
  124. }
  125. // splitNodeKey returns the node ID of a key created by nodeKey.
  126. func splitNodeKey(key []byte) (id ID, rest []byte) {
  127. if !bytes.HasPrefix(key, []byte(dbNodePrefix)) {
  128. return ID{}, nil
  129. }
  130. item := key[len(dbNodePrefix):]
  131. copy(id[:], item[:len(id)])
  132. return id, item[len(id)+1:]
  133. }
  134. // nodeItemKey returns the database key for a node metadata field.
  135. func nodeItemKey(id ID, ip net.IP, field string) []byte {
  136. ip16 := ip.To16()
  137. if ip16 == nil {
  138. panic(fmt.Errorf("invalid IP (length %d)", len(ip)))
  139. }
  140. return bytes.Join([][]byte{nodeKey(id), ip16, []byte(field)}, []byte{':'})
  141. }
  142. // splitNodeItemKey returns the components of a key created by nodeItemKey.
  143. func splitNodeItemKey(key []byte) (id ID, ip net.IP, field string) {
  144. id, key = splitNodeKey(key)
  145. // Skip discover root.
  146. if string(key) == dbDiscoverRoot {
  147. return id, nil, ""
  148. }
  149. key = key[len(dbDiscoverRoot)+1:]
  150. // Split out the IP.
  151. ip = key[:16]
  152. if ip4 := ip.To4(); ip4 != nil {
  153. ip = ip4
  154. }
  155. key = key[16+1:]
  156. // Field is the remainder of key.
  157. field = string(key)
  158. return id, ip, field
  159. }
  160. func v5Key(id ID, ip net.IP, field string) []byte {
  161. return bytes.Join([][]byte{
  162. []byte(dbNodePrefix),
  163. id[:],
  164. []byte(dbDiscv5Root),
  165. ip.To16(),
  166. []byte(field),
  167. }, []byte{':'})
  168. }
  169. // localItemKey returns the key of a local node item.
  170. func localItemKey(id ID, field string) []byte {
  171. key := append([]byte(dbLocalPrefix), id[:]...)
  172. key = append(key, ':')
  173. key = append(key, field...)
  174. return key
  175. }
  176. // fetchInt64 retrieves an integer associated with a particular key.
  177. func (db *DB) fetchInt64(key []byte) int64 {
  178. blob, err := db.lvl.Get(key, nil)
  179. if err != nil {
  180. return 0
  181. }
  182. val, read := binary.Varint(blob)
  183. if read <= 0 {
  184. return 0
  185. }
  186. return val
  187. }
  188. // storeInt64 stores an integer in the given key.
  189. func (db *DB) storeInt64(key []byte, n int64) error {
  190. blob := make([]byte, binary.MaxVarintLen64)
  191. blob = blob[:binary.PutVarint(blob, n)]
  192. return db.lvl.Put(key, blob, nil)
  193. }
  194. // fetchUint64 retrieves an integer associated with a particular key.
  195. func (db *DB) fetchUint64(key []byte) uint64 {
  196. blob, err := db.lvl.Get(key, nil)
  197. if err != nil {
  198. return 0
  199. }
  200. val, _ := binary.Uvarint(blob)
  201. return val
  202. }
  203. // storeUint64 stores an integer in the given key.
  204. func (db *DB) storeUint64(key []byte, n uint64) error {
  205. blob := make([]byte, binary.MaxVarintLen64)
  206. blob = blob[:binary.PutUvarint(blob, n)]
  207. return db.lvl.Put(key, blob, nil)
  208. }
  209. // Node retrieves a node with a given id from the database.
  210. func (db *DB) Node(id ID) *Node {
  211. blob, err := db.lvl.Get(nodeKey(id), nil)
  212. if err != nil {
  213. return nil
  214. }
  215. return mustDecodeNode(id[:], blob)
  216. }
  217. func mustDecodeNode(id, data []byte) *Node {
  218. node := new(Node)
  219. if err := rlp.DecodeBytes(data, &node.r); err != nil {
  220. panic(fmt.Errorf("p2p/enode: can't decode node %x in DB: %v", id, err))
  221. }
  222. // Restore node id cache.
  223. copy(node.id[:], id)
  224. return node
  225. }
  226. // UpdateNode inserts - potentially overwriting - a node into the peer database.
  227. func (db *DB) UpdateNode(node *Node) error {
  228. if node.Seq() < db.NodeSeq(node.ID()) {
  229. return nil
  230. }
  231. blob, err := rlp.EncodeToBytes(&node.r)
  232. if err != nil {
  233. return err
  234. }
  235. if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil {
  236. return err
  237. }
  238. return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
  239. }
  240. // NodeSeq returns the stored record sequence number of the given node.
  241. func (db *DB) NodeSeq(id ID) uint64 {
  242. return db.fetchUint64(nodeItemKey(id, zeroIP, dbNodeSeq))
  243. }
  244. // Resolve returns the stored record of the node if it has a larger sequence
  245. // number than n.
  246. func (db *DB) Resolve(n *Node) *Node {
  247. if n.Seq() > db.NodeSeq(n.ID()) {
  248. return n
  249. }
  250. return db.Node(n.ID())
  251. }
  252. // DeleteNode deletes all information associated with a node.
  253. func (db *DB) DeleteNode(id ID) {
  254. deleteRange(db.lvl, nodeKey(id))
  255. }
  256. func deleteRange(db *leveldb.DB, prefix []byte) {
  257. it := db.NewIterator(util.BytesPrefix(prefix), nil)
  258. defer it.Release()
  259. for it.Next() {
  260. db.Delete(it.Key(), nil)
  261. }
  262. }
  263. // ensureExpirer is a small helper method ensuring that the data expiration
  264. // mechanism is running. If the expiration goroutine is already running, this
  265. // method simply returns.
  266. //
  267. // The goal is to start the data evacuation only after the network successfully
  268. // bootstrapped itself (to prevent dumping potentially useful seed nodes). Since
  269. // it would require significant overhead to exactly trace the first successful
  270. // convergence, it's simpler to "ensure" the correct state when an appropriate
  271. // condition occurs (i.e. a successful bonding), and discard further events.
  272. func (db *DB) ensureExpirer() {
  273. db.runner.Do(func() { go db.expirer() })
  274. }
  275. // expirer should be started in a go routine, and is responsible for looping ad
  276. // infinitum and dropping stale data from the database.
  277. func (db *DB) expirer() {
  278. tick := time.NewTicker(dbCleanupCycle)
  279. defer tick.Stop()
  280. for {
  281. select {
  282. case <-tick.C:
  283. db.expireNodes()
  284. case <-db.quit:
  285. return
  286. }
  287. }
  288. }
  289. // expireNodes iterates over the database and deletes all nodes that have not
  290. // been seen (i.e. received a pong from) for some time.
  291. func (db *DB) expireNodes() {
  292. it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
  293. defer it.Release()
  294. if !it.Next() {
  295. return
  296. }
  297. var (
  298. threshold = time.Now().Add(-dbNodeExpiration).Unix()
  299. youngestPong int64
  300. atEnd = false
  301. )
  302. for !atEnd {
  303. id, ip, field := splitNodeItemKey(it.Key())
  304. if field == dbNodePong {
  305. time, _ := binary.Varint(it.Value())
  306. if time > youngestPong {
  307. youngestPong = time
  308. }
  309. if time < threshold {
  310. // Last pong from this IP older than threshold, remove fields belonging to it.
  311. deleteRange(db.lvl, nodeItemKey(id, ip, ""))
  312. }
  313. }
  314. atEnd = !it.Next()
  315. nextID, _ := splitNodeKey(it.Key())
  316. if atEnd || nextID != id {
  317. // We've moved beyond the last entry of the current ID.
  318. // Remove everything if there was no recent enough pong.
  319. if youngestPong > 0 && youngestPong < threshold {
  320. deleteRange(db.lvl, nodeKey(id))
  321. }
  322. youngestPong = 0
  323. }
  324. }
  325. }
  326. // LastPingReceived retrieves the time of the last ping packet received from
  327. // a remote node.
  328. func (db *DB) LastPingReceived(id ID, ip net.IP) time.Time {
  329. if ip = ip.To16(); ip == nil {
  330. return time.Time{}
  331. }
  332. return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePing)), 0)
  333. }
  334. // UpdateLastPingReceived updates the last time we tried contacting a remote node.
  335. func (db *DB) UpdateLastPingReceived(id ID, ip net.IP, instance time.Time) error {
  336. if ip = ip.To16(); ip == nil {
  337. return errInvalidIP
  338. }
  339. return db.storeInt64(nodeItemKey(id, ip, dbNodePing), instance.Unix())
  340. }
  341. // LastPongReceived retrieves the time of the last successful pong from remote node.
  342. func (db *DB) LastPongReceived(id ID, ip net.IP) time.Time {
  343. if ip = ip.To16(); ip == nil {
  344. return time.Time{}
  345. }
  346. // Launch expirer
  347. db.ensureExpirer()
  348. return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePong)), 0)
  349. }
  350. // UpdateLastPongReceived updates the last pong time of a node.
  351. func (db *DB) UpdateLastPongReceived(id ID, ip net.IP, instance time.Time) error {
  352. if ip = ip.To16(); ip == nil {
  353. return errInvalidIP
  354. }
  355. return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix())
  356. }
  357. // FindFails retrieves the number of findnode failures since bonding.
  358. func (db *DB) FindFails(id ID, ip net.IP) int {
  359. if ip = ip.To16(); ip == nil {
  360. return 0
  361. }
  362. return int(db.fetchInt64(nodeItemKey(id, ip, dbNodeFindFails)))
  363. }
  364. // UpdateFindFails updates the number of findnode failures since bonding.
  365. func (db *DB) UpdateFindFails(id ID, ip net.IP, fails int) error {
  366. if ip = ip.To16(); ip == nil {
  367. return errInvalidIP
  368. }
  369. return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails))
  370. }
  371. // FindFailsV5 retrieves the discv5 findnode failure counter.
  372. func (db *DB) FindFailsV5(id ID, ip net.IP) int {
  373. if ip = ip.To16(); ip == nil {
  374. return 0
  375. }
  376. return int(db.fetchInt64(v5Key(id, ip, dbNodeFindFails)))
  377. }
  378. // UpdateFindFailsV5 stores the discv5 findnode failure counter.
  379. func (db *DB) UpdateFindFailsV5(id ID, ip net.IP, fails int) error {
  380. if ip = ip.To16(); ip == nil {
  381. return errInvalidIP
  382. }
  383. return db.storeInt64(v5Key(id, ip, dbNodeFindFails), int64(fails))
  384. }
  385. // LocalSeq retrieves the local record sequence counter.
  386. func (db *DB) localSeq(id ID) uint64 {
  387. return db.fetchUint64(localItemKey(id, dbLocalSeq))
  388. }
  389. // storeLocalSeq stores the local record sequence counter.
  390. func (db *DB) storeLocalSeq(id ID, n uint64) {
  391. db.storeUint64(localItemKey(id, dbLocalSeq), n)
  392. }
  393. // QuerySeeds retrieves random nodes to be used as potential seed nodes
  394. // for bootstrapping.
  395. func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
  396. var (
  397. now = time.Now()
  398. nodes = make([]*Node, 0, n)
  399. it = db.lvl.NewIterator(nil, nil)
  400. id ID
  401. )
  402. defer it.Release()
  403. seek:
  404. for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
  405. // Seek to a random entry. The first byte is incremented by a
  406. // random amount each time in order to increase the likelihood
  407. // of hitting all existing nodes in very small databases.
  408. ctr := id[0]
  409. rand.Read(id[:])
  410. id[0] = ctr + id[0]%16
  411. it.Seek(nodeKey(id))
  412. n := nextNode(it)
  413. if n == nil {
  414. id[0] = 0
  415. continue seek // iterator exhausted
  416. }
  417. if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {
  418. continue seek
  419. }
  420. for i := range nodes {
  421. if nodes[i].ID() == n.ID() {
  422. continue seek // duplicate
  423. }
  424. }
  425. nodes = append(nodes, n)
  426. }
  427. return nodes
  428. }
  429. // reads the next node record from the iterator, skipping over other
  430. // database entries.
  431. func nextNode(it iterator.Iterator) *Node {
  432. for end := false; !end; end = !it.Next() {
  433. id, rest := splitNodeKey(it.Key())
  434. if string(rest) != dbDiscoverRoot {
  435. continue
  436. }
  437. return mustDecodeNode(id[:], it.Value())
  438. }
  439. return nil
  440. }
  441. // close flushes and closes the database files.
  442. func (db *DB) Close() {
  443. close(db.quit)
  444. db.lvl.Close()
  445. }