tx_journal.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "errors"
  19. "io"
  20. "os"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/core/types"
  23. "github.com/ethereum/go-ethereum/log"
  24. "github.com/ethereum/go-ethereum/rlp"
  25. )
  26. // errNoActiveJournal is returned if a transaction is attempted to be inserted
  27. // into the journal, but no such file is currently open.
  28. var errNoActiveJournal = errors.New("no active journal")
  29. // devNull is a WriteCloser that just discards anything written into it. Its
  30. // goal is to allow the transaction journal to write into a fake journal when
  31. // loading transactions on startup without printing warnings due to no file
  32. // being read for write.
  33. type devNull struct{}
  34. func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil }
  35. func (*devNull) Close() error { return nil }
  36. // txJournal is a rotating log of transactions with the aim of storing locally
  37. // created transactions to allow non-executed ones to survive node restarts.
  38. type txJournal struct {
  39. path string // Filesystem path to store the transactions at
  40. writer io.WriteCloser // Output stream to write new transactions into
  41. }
  42. // newTxJournal creates a new transaction journal to
  43. func newTxJournal(path string) *txJournal {
  44. return &txJournal{
  45. path: path,
  46. }
  47. }
  48. // load parses a transaction journal dump from disk, loading its contents into
  49. // the specified pool.
  50. func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
  51. // Skip the parsing if the journal file doesn't exist at all
  52. if _, err := os.Stat(journal.path); os.IsNotExist(err) {
  53. return nil
  54. }
  55. // Open the journal for loading any past transactions
  56. input, err := os.Open(journal.path)
  57. if err != nil {
  58. return err
  59. }
  60. defer input.Close()
  61. // Temporarily discard any journal additions (don't double add on load)
  62. journal.writer = new(devNull)
  63. defer func() { journal.writer = nil }()
  64. // Inject all transactions from the journal into the pool
  65. stream := rlp.NewStream(input, 0)
  66. total, dropped := 0, 0
  67. // Create a method to load a limited batch of transactions and bump the
  68. // appropriate progress counters. Then use this method to load all the
  69. // journaled transactions in small-ish batches.
  70. loadBatch := func(txs types.Transactions) {
  71. for _, err := range add(txs) {
  72. if err != nil {
  73. log.Debug("Failed to add journaled transaction", "err", err)
  74. dropped++
  75. }
  76. }
  77. }
  78. var (
  79. failure error
  80. batch types.Transactions
  81. )
  82. for {
  83. // Parse the next transaction and terminate on error
  84. tx := new(types.Transaction)
  85. if err = stream.Decode(tx); err != nil {
  86. if err != io.EOF {
  87. failure = err
  88. }
  89. if batch.Len() > 0 {
  90. loadBatch(batch)
  91. }
  92. break
  93. }
  94. // New transaction parsed, queue up for later, import if threshold is reached
  95. total++
  96. if batch = append(batch, tx); batch.Len() > 1024 {
  97. loadBatch(batch)
  98. batch = batch[:0]
  99. }
  100. }
  101. log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)
  102. return failure
  103. }
  104. // insert adds the specified transaction to the local disk journal.
  105. func (journal *txJournal) insert(tx *types.Transaction) error {
  106. if journal.writer == nil {
  107. return errNoActiveJournal
  108. }
  109. if err := rlp.Encode(journal.writer, tx); err != nil {
  110. return err
  111. }
  112. return nil
  113. }
  114. // rotate regenerates the transaction journal based on the current contents of
  115. // the transaction pool.
  116. func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
  117. // Close the current journal (if any is open)
  118. if journal.writer != nil {
  119. if err := journal.writer.Close(); err != nil {
  120. return err
  121. }
  122. journal.writer = nil
  123. }
  124. // Generate a new journal with the contents of the current pool
  125. replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
  126. if err != nil {
  127. return err
  128. }
  129. journaled := 0
  130. for _, txs := range all {
  131. for _, tx := range txs {
  132. if err = rlp.Encode(replacement, tx); err != nil {
  133. replacement.Close()
  134. return err
  135. }
  136. }
  137. journaled += len(txs)
  138. }
  139. replacement.Close()
  140. // Replace the live journal with the newly generated one
  141. if err = os.Rename(journal.path+".new", journal.path); err != nil {
  142. return err
  143. }
  144. sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
  145. if err != nil {
  146. return err
  147. }
  148. journal.writer = sink
  149. log.Info("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all))
  150. return nil
  151. }
  152. // close flushes the transaction journal contents to disk and closes the file.
  153. func (journal *txJournal) close() error {
  154. var err error
  155. if journal.writer != nil {
  156. err = journal.writer.Close()
  157. journal.writer = nil
  158. }
  159. return err
  160. }