// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// +build !js

// Package leveldb implements the key-value database layer based on LevelDB.
package leveldb

import (
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
)

const (
	// degradationWarnInterval specifies how often a warning should be printed if the
	// leveldb database cannot keep up with requested writes.
	degradationWarnInterval = time.Minute

	// minCache is the minimum amount of memory in megabytes to allocate to leveldb
	// read and write caching, split half and half.
	minCache = 16

	// minHandles is the minimum number of file handles to allocate to the open
	// database files.
	minHandles = 16

	// metricsGatheringInterval specifies the interval to retrieve leveldb database
	// compaction, io and pause stats to report to the user.
	metricsGatheringInterval = 3 * time.Second
)

// Database is a persistent key-value store. Apart from basic data storage
// functionality it also supports batch writes and iterating over the keyspace in
// binary-alphabetical order.
type Database struct {
	fn string      // filename for reporting
	db *leveldb.DB // LevelDB instance

	compTimeMeter      metrics.Meter // Meter for measuring the total time spent in database compaction
	compReadMeter      metrics.Meter // Meter for measuring the data read during compaction
	compWriteMeter     metrics.Meter // Meter for measuring the data written during compaction
	writeDelayNMeter   metrics.Meter // Meter for measuring the write delay number due to database compaction
	writeDelayMeter    metrics.Meter // Meter for measuring the write delay duration due to database compaction
	diskSizeGauge      metrics.Gauge // Gauge for tracking the size of all the levels in the database
	diskReadMeter      metrics.Meter // Meter for measuring the effective amount of data read
	diskWriteMeter     metrics.Meter // Meter for measuring the effective amount of data written
	memCompGauge       metrics.Gauge // Gauge for tracking the number of memory compactions
	level0CompGauge    metrics.Gauge // Gauge for tracking the number of table compactions in level0
	nonlevel0CompGauge metrics.Gauge // Gauge for tracking the number of table compactions in non-level0 levels
	seekCompGauge      metrics.Gauge // Gauge for tracking the number of table compactions caused by read opt

	quitLock sync.Mutex      // Mutex protecting the quit channel access
	quitChan chan chan error // Quit channel to stop the metrics collection before closing the database

	log log.Logger // Contextual logger tracking the database path
}

// New returns a wrapped LevelDB object. The namespace is the prefix that the
// metrics reporting should use for surfacing internal stats.
func New(file string, cache int, handles int, namespace string, readonly bool) (*Database, error) {
	return NewCustom(file, namespace, func(options *opt.Options) {
		// Ensure we have some minimal caching and file guarantees
		if cache < minCache {
			cache = minCache
		}
		if handles < minHandles {
			handles = minHandles
		}
		// Set default options
		options.OpenFilesCacheCapacity = handles
		options.BlockCacheCapacity = cache / 2 * opt.MiB
		options.WriteBuffer = cache / 4 * opt.MiB // Two of these are used internally
		if readonly {
			options.ReadOnly = true
		}
	})
}
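
// A minimal usage sketch of New as seen from a caller; the path, cache size,
// handle count and namespace below are illustrative values, not defaults
// defined in this file:
//
//	db, err := leveldb.New("/tmp/testdb", 128, 128, "test/db/", false)
//	if err != nil {
//		// handle the error
//	}
//	defer db.Close()
//	err = db.Put([]byte("key"), []byte("value"))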

// NewCustom returns a wrapped LevelDB object. The namespace is the prefix that the
// metrics reporting should use for surfacing internal stats.
// The customize function allows the caller to modify the leveldb options.
func NewCustom(file string, namespace string, customize func(options *opt.Options)) (*Database, error) {
	options := configureOptions(customize)
	logger := log.New("database", file)
	usedCache := options.GetBlockCacheCapacity() + options.GetWriteBuffer()*2
	logCtx := []interface{}{"cache", common.StorageSize(usedCache), "handles", options.GetOpenFilesCacheCapacity()}
	if options.ReadOnly {
		logCtx = append(logCtx, "readonly", "true")
	}
	logger.Info("Allocated cache and file handles", logCtx...)

	// Open the db and recover any potential corruptions
	db, err := leveldb.OpenFile(file, options)
	if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
		db, err = leveldb.RecoverFile(file, nil)
	}
	if err != nil {
		return nil, err
	}
	// Assemble the wrapper with all the registered metrics
	ldb := &Database{
		fn:       file,
		db:       db,
		log:      logger,
		quitChan: make(chan chan error),
	}
	ldb.compTimeMeter = metrics.NewRegisteredMeter(namespace+"compact/time", nil)
	ldb.compReadMeter = metrics.NewRegisteredMeter(namespace+"compact/input", nil)
	ldb.compWriteMeter = metrics.NewRegisteredMeter(namespace+"compact/output", nil)
	ldb.diskSizeGauge = metrics.NewRegisteredGauge(namespace+"disk/size", nil)
	ldb.diskReadMeter = metrics.NewRegisteredMeter(namespace+"disk/read", nil)
	ldb.diskWriteMeter = metrics.NewRegisteredMeter(namespace+"disk/write", nil)
	ldb.writeDelayMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/duration", nil)
	ldb.writeDelayNMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/counter", nil)
	ldb.memCompGauge = metrics.NewRegisteredGauge(namespace+"compact/memory", nil)
	ldb.level0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/level0", nil)
	ldb.nonlevel0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/nonlevel0", nil)
	ldb.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil)

	// Start up the metrics gathering and return
	go ldb.meter(metricsGatheringInterval)
	return ldb, nil
}
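
// A sketch of how a caller might use NewCustom; the option values are
// illustrative, not recommendations from this file:
//
//	db, err := leveldb.NewCustom("/tmp/testdb", "test/db/", func(options *opt.Options) {
//		options.OpenFilesCacheCapacity = 64
//		options.BlockCacheCapacity = 32 * opt.MiB
//	})
//
// The customize callback runs after configureOptions has applied the package
// defaults, so it can override any of them (see configureOptions below).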

// configureOptions sets some default options, then runs the provided setter.
func configureOptions(customizeFn func(*opt.Options)) *opt.Options {
	// Set default options
	options := &opt.Options{
		Filter:                 filter.NewBloomFilter(10),
		DisableSeeksCompaction: true,
	}
	// Allow caller to make custom modifications to the options
	if customizeFn != nil {
		customizeFn(options)
	}
	return options
}

// Close stops the metrics collection, flushes any pending data to disk and closes
// all io accesses to the underlying key-value store.
func (db *Database) Close() error {
	db.quitLock.Lock()
	defer db.quitLock.Unlock()

	if db.quitChan != nil {
		errc := make(chan error)
		db.quitChan <- errc
		if err := <-errc; err != nil {
			db.log.Error("Metrics collection failed", "err", err)
		}
		db.quitChan = nil
	}
	return db.db.Close()
}

// Has retrieves if a key is present in the key-value store.
func (db *Database) Has(key []byte) (bool, error) {
	return db.db.Has(key, nil)
}

// Get retrieves the given key if it's present in the key-value store.
func (db *Database) Get(key []byte) ([]byte, error) {
	dat, err := db.db.Get(key, nil)
	if err != nil {
		return nil, err
	}
	return dat, nil
}

// Put inserts the given value into the key-value store.
func (db *Database) Put(key []byte, value []byte) error {
	return db.db.Put(key, value, nil)
}

// Delete removes the key from the key-value store.
func (db *Database) Delete(key []byte) error {
	return db.db.Delete(key, nil)
}

// NewBatch creates a write-only key-value store that buffers changes to its host
// database until a final write is called.
func (db *Database) NewBatch() ethdb.Batch {
	return &batch{
		db: db.db,
		b:  new(leveldb.Batch),
	}
}
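
// A minimal batch usage sketch (keys and values are illustrative): buffer a
// few mutations, then commit them atomically with a single Write.
//
//	b := db.NewBatch()
//	_ = b.Put([]byte("k1"), []byte("v1"))
//	_ = b.Delete([]byte("k2"))
//	if err := b.Write(); err != nil {
//		// handle the error
//	}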

// NewIterator creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
func (db *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
	return db.db.NewIterator(bytesPrefixRange(prefix, start), nil)
}
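
// A sketch of prefix iteration (the "acc-" prefix is illustrative); the
// iterator must be released once done with it:
//
//	it := db.NewIterator([]byte("acc-"), nil)
//	for it.Next() {
//		// it.Key() and it.Value() are only valid until the next call to Next
//	}
//	it.Release()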

// Stat returns a particular internal stat of the database.
func (db *Database) Stat(property string) (string, error) {
	return db.db.GetProperty(property)
}

// Compact flattens the underlying data store for the given key range. In essence,
// deleted and overwritten versions are discarded, and the data is rearranged to
// reduce the cost of operations needed to access them.
//
// A nil start is treated as a key before all keys in the data store; a nil limit
// is treated as a key after all keys in the data store. If both are nil, the
// entire data store will be compacted.
func (db *Database) Compact(start []byte, limit []byte) error {
	return db.db.CompactRange(util.Range{Start: start, Limit: limit})
}
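
// For example, db.Compact(nil, nil) compacts the entire data store, per the
// nil-bound semantics described above.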

// Path returns the path to the database directory.
func (db *Database) Path() string {
	return db.fn
}

// meter periodically retrieves internal leveldb counters and reports them to
// the metrics subsystem.
//
// This is what a LevelDB stats table looks like (currently):
//
//	Compactions
//	 Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)
//	-------+------------+---------------+---------------+---------------+---------------
//	   0   |      0     |       0.00000 |       1.27969 |       0.00000 |      12.31098
//	   1   |     85     |     109.27913 |      28.09293 |     213.92493 |     214.26294
//	   2   |    523     |    1000.37159 |       7.26059 |      66.86342 |      66.77884
//	   3   |    570     |    1113.18458 |       0.00000 |       0.00000 |       0.00000
//
// This is what the write delay looks like (currently):
//
//	DelayN:5 Delay:406.604657ms Paused: false
//
// This is what the iostats look like (currently):
//
//	Read(MB):3895.04860 Write(MB):3654.64712
func (db *Database) meter(refresh time.Duration) {
	// Create the counters to store current and previous compaction values
	compactions := make([][]float64, 2)
	for i := 0; i < 2; i++ {
		compactions[i] = make([]float64, 4)
	}
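	// The two rows alternate roles across loop iterations: compactions[i%2]
	// receives the freshly parsed cumulative stats, while compactions[(i-1)%2]
	// still holds the previous sample, letting the meters below be fed deltas.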
	// Create storage for iostats.
	var iostats [2]float64

	// Create storage and warning log tracer for write delay.
	var (
		delaystats      [2]int64
		lastWritePaused time.Time
	)
	var (
		errc chan error
		merr error
	)

	timer := time.NewTimer(refresh)
	defer timer.Stop()

	// Iterate ad infinitum and collect the stats
	for i := 1; errc == nil && merr == nil; i++ {
		// Retrieve the database stats
		stats, err := db.db.GetProperty("leveldb.stats")
		if err != nil {
			db.log.Error("Failed to read database stats", "err", err)
			merr = err
			continue
		}
		// Find the compaction table, skip the header
		lines := strings.Split(stats, "\n")
		for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
			lines = lines[1:]
		}
		if len(lines) <= 3 {
			db.log.Error("Compaction table not found")
			merr = errors.New("compaction table not found")
			continue
		}
		lines = lines[3:]

		// Iterate over all the table rows, and accumulate the entries
		for j := 0; j < len(compactions[i%2]); j++ {
			compactions[i%2][j] = 0
		}
		for _, line := range lines {
			parts := strings.Split(line, "|")
			if len(parts) != 6 {
				break
			}
			for idx, counter := range parts[2:] {
				value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
				if err != nil {
					db.log.Error("Compaction entry parsing failed", "err", err)
					merr = err
					continue
				}
				compactions[i%2][idx] += value
			}
		}
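		// The accumulated columns are cumulative totals since startup: size (MB),
		// time (sec), read (MB) and write (MB). The size column feeds the gauge
		// directly, while the others are reported as deltas against the previous
		// sample, converted to nanoseconds and bytes respectively.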
		// Update all the requested meters
		if db.diskSizeGauge != nil {
			db.diskSizeGauge.Update(int64(compactions[i%2][0] * 1024 * 1024))
		}
		if db.compTimeMeter != nil {
			db.compTimeMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1000 * 1000 * 1000))
		}
		if db.compReadMeter != nil {
			db.compReadMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
		}
		if db.compWriteMeter != nil {
			db.compWriteMeter.Mark(int64((compactions[i%2][3] - compactions[(i-1)%2][3]) * 1024 * 1024))
		}
		// Retrieve the write delay statistic
		writedelay, err := db.db.GetProperty("leveldb.writedelay")
		if err != nil {
			db.log.Error("Failed to read database write delay statistic", "err", err)
			merr = err
			continue
		}
		var (
			delayN        int64
			delayDuration string
			duration      time.Duration
			paused        bool
		)
		if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil {
			db.log.Error("Write delay statistic not found")
			merr = err
			continue
		}
		duration, err = time.ParseDuration(delayDuration)
		if err != nil {
			db.log.Error("Failed to parse delay duration", "err", err)
			merr = err
			continue
		}
		if db.writeDelayNMeter != nil {
			db.writeDelayNMeter.Mark(delayN - delaystats[0])
		}
		if db.writeDelayMeter != nil {
			db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
		}
		// If a warning that db is performing compaction has been displayed, any
		// subsequent warnings will be withheld for one minute not to overwhelm the user.
		if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 &&
			time.Now().After(lastWritePaused.Add(degradationWarnInterval)) {
			db.log.Warn("Database compacting, degraded performance")
			lastWritePaused = time.Now()
		}
		delaystats[0], delaystats[1] = delayN, duration.Nanoseconds()
		// Retrieve the database iostats.
		ioStats, err := db.db.GetProperty("leveldb.iostats")
		if err != nil {
			db.log.Error("Failed to read database iostats", "err", err)
			merr = err
			continue
		}
		var nRead, nWrite float64
		parts := strings.Split(ioStats, " ")
		if len(parts) < 2 {
			db.log.Error("Bad syntax of ioStats", "ioStats", ioStats)
			merr = fmt.Errorf("bad syntax of ioStats %s", ioStats)
			continue
		}
		if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil {
			db.log.Error("Bad syntax of read entry", "entry", parts[0])
			merr = err
			continue
		}
		if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil {
			db.log.Error("Bad syntax of write entry", "entry", parts[1])
			merr = err
			continue
		}
		if db.diskReadMeter != nil {
			db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
		}
		if db.diskWriteMeter != nil {
			db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
		}
		iostats[0], iostats[1] = nRead, nWrite

		compCount, err := db.db.GetProperty("leveldb.compcount")
		if err != nil {
			db.log.Error("Failed to read database compaction count", "err", err)
			merr = err
			continue
		}
		var (
			memComp       uint32
			level0Comp    uint32
			nonLevel0Comp uint32
			seekComp      uint32
		)
		if n, err := fmt.Sscanf(compCount, "MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", &memComp, &level0Comp, &nonLevel0Comp, &seekComp); n != 4 || err != nil {
			db.log.Error("Compaction count statistic not found")
			merr = err
			continue
		}
		db.memCompGauge.Update(int64(memComp))
		db.level0CompGauge.Update(int64(level0Comp))
		db.nonlevel0CompGauge.Update(int64(nonLevel0Comp))
		db.seekCompGauge.Update(int64(seekComp))

		// Sleep a bit, then repeat the stats collection
		select {
		case errc = <-db.quitChan:
			// Quit requesting, stop hammering the database
		case <-timer.C:
			timer.Reset(refresh)
			// Timeout, gather a new set of stats
		}
	}

	if errc == nil {
		errc = <-db.quitChan
	}
	errc <- merr
}

// batch is a write-only leveldb batch that commits changes to its host database
// when Write is called. A batch cannot be used concurrently.
type batch struct {
	db   *leveldb.DB
	b    *leveldb.Batch
	size int
}

// Put inserts the given value into the batch for later committing.
func (b *batch) Put(key, value []byte) error {
	b.b.Put(key, value)
	b.size += len(value)
	return nil
}

// Delete inserts a key removal into the batch for later committing.
func (b *batch) Delete(key []byte) error {
	b.b.Delete(key)
	b.size += len(key)
	return nil
}

// ValueSize retrieves the amount of data queued up for writing.
func (b *batch) ValueSize() int {
	return b.size
}

// Write flushes any accumulated data to disk.
func (b *batch) Write() error {
	return b.db.Write(b.b, nil)
}

// Reset resets the batch for reuse.
func (b *batch) Reset() {
	b.b.Reset()
	b.size = 0
}

// Replay replays the batch contents.
func (b *batch) Replay(w ethdb.KeyValueWriter) error {
	return b.b.Replay(&replayer{writer: w})
}

// replayer is a small wrapper to implement the correct replay methods.
type replayer struct {
	writer  ethdb.KeyValueWriter
	failure error
}

// Put inserts the given value into the key-value data store.
func (r *replayer) Put(key, value []byte) {
	// If the replay already failed, stop executing ops
	if r.failure != nil {
		return
	}
	r.failure = r.writer.Put(key, value)
}

// Delete removes the key from the key-value data store.
func (r *replayer) Delete(key []byte) {
	// If the replay already failed, stop executing ops
	if r.failure != nil {
		return
	}
	r.failure = r.writer.Delete(key)
}

// bytesPrefixRange returns a key range that satisfies
//   - the given prefix, and
//   - the given seek position
func bytesPrefixRange(prefix, start []byte) *util.Range {
	r := util.BytesPrefix(prefix)
	r.Start = append(r.Start, start...)
	return r
}
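
// For example, bytesPrefixRange([]byte("b"), []byte("ar")) produces a range
// with Start "bar" and Limit "c": iteration begins at the seek position "bar"
// and stops before "c", the first key past everything prefixed with "b".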