sync.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "math/big"
  19. "math/rand"
  20. "sync/atomic"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/core/rawdb"
  24. "github.com/ethereum/go-ethereum/core/types"
  25. "github.com/ethereum/go-ethereum/eth/downloader"
  26. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/p2p/enode"
  29. "github.com/ethereum/go-ethereum/permission/core"
  30. )
  31. const (
  32. forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
  33. defaultMinSyncPeers = 5 // Amount of peers desired to start syncing
  34. // This is the target size for the packs of transactions sent by txsyncLoop64.
  35. // A pack can get larger than this if a single transactions exceeds this size.
  36. txsyncPackSize = 100 * 1024
  37. )
  38. type txsync struct {
  39. p *eth.Peer
  40. txs []*types.Transaction
  41. }
  42. // syncTransactions starts sending all currently pending transactions to the given peer.
  43. func (h *handler) syncTransactions(p *eth.Peer) {
  44. // Assemble the set of transaction to broadcast or announce to the remote
  45. // peer. Fun fact, this is quite an expensive operation as it needs to sort
  46. // the transactions if the sorting is not cached yet. However, with a random
  47. // order, insertions could overflow the non-executable queues and get dropped.
  48. //
  49. // TODO(karalabe): Figure out if we could get away with random order somehow
  50. var txs types.Transactions
  51. pending, _ := h.txpool.Pending()
  52. for _, batch := range pending {
  53. txs = append(txs, batch...)
  54. }
  55. if len(txs) == 0 {
  56. return
  57. }
  58. // The eth/65 protocol introduces proper transaction announcements, so instead
  59. // of dripping transactions across multiple peers, just send the entire list as
  60. // an announcement and let the remote side decide what they need (likely nothing).
  61. if p.Version() >= eth.ETH65 {
  62. hashes := make([]common.Hash, len(txs))
  63. for i, tx := range txs {
  64. hashes[i] = tx.Hash()
  65. }
  66. p.AsyncSendPooledTransactionHashes(hashes)
  67. return
  68. }
  69. // Out of luck, peer is running legacy protocols, drop the txs over
  70. select {
  71. case h.txsyncCh <- &txsync{p: p, txs: txs}:
  72. case <-h.quitSync:
  73. }
  74. }
  75. // txsyncLoop64 takes care of the initial transaction sync for each new
  76. // connection. When a new peer appears, we relay all currently pending
  77. // transactions. In order to minimise egress bandwidth usage, we send
  78. // the transactions in small packs to one peer at a time.
  79. func (h *handler) txsyncLoop64() {
  80. defer h.wg.Done()
  81. var (
  82. pending = make(map[enode.ID]*txsync)
  83. sending = false // whether a send is active
  84. pack = new(txsync) // the pack that is being sent
  85. done = make(chan error, 1) // result of the send
  86. )
  87. // send starts a sending a pack of transactions from the sync.
  88. send := func(s *txsync) {
  89. if s.p.Version() >= eth.ETH65 {
  90. panic("initial transaction syncer running on eth/65+")
  91. }
  92. // Fill pack with transactions up to the target size.
  93. size := common.StorageSize(0)
  94. pack.p = s.p
  95. pack.txs = pack.txs[:0]
  96. for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
  97. pack.txs = append(pack.txs, s.txs[i])
  98. size += s.txs[i].Size()
  99. }
  100. // Remove the transactions that will be sent.
  101. s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
  102. if len(s.txs) == 0 {
  103. delete(pending, s.p.Peer.ID())
  104. }
  105. // Send the pack in the background.
  106. s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
  107. sending = true
  108. go func() { done <- pack.p.SendTransactions(pack.txs) }()
  109. }
  110. // pick chooses the next pending sync.
  111. pick := func() *txsync {
  112. if len(pending) == 0 {
  113. return nil
  114. }
  115. n := rand.Intn(len(pending)) + 1
  116. for _, s := range pending {
  117. if n--; n == 0 {
  118. return s
  119. }
  120. }
  121. return nil
  122. }
  123. for {
  124. select {
  125. case s := <-h.txsyncCh:
  126. pending[s.p.Peer.ID()] = s
  127. if !sending {
  128. send(s)
  129. }
  130. case err := <-done:
  131. sending = false
  132. // Stop tracking peers that cause send failures.
  133. if err != nil {
  134. pack.p.Log().Debug("Transaction send failed", "err", err)
  135. delete(pending, pack.p.Peer.ID())
  136. }
  137. // Schedule the next send.
  138. if s := pick(); s != nil {
  139. send(s)
  140. }
  141. case <-h.quitSync:
  142. return
  143. }
  144. }
  145. }
  146. // chainSyncer coordinates blockchain sync components.
  147. type chainSyncer struct {
  148. handler *handler
  149. force *time.Timer
  150. forced bool // true when force timer fired
  151. peerEventCh chan struct{}
  152. doneCh chan error // non-nil when sync is running
  153. }
  154. // chainSyncOp is a scheduled sync operation.
  155. type chainSyncOp struct {
  156. mode downloader.SyncMode
  157. peer *eth.Peer
  158. td *big.Int
  159. head common.Hash
  160. }
  161. // newChainSyncer creates a chainSyncer.
  162. func newChainSyncer(handler *handler) *chainSyncer {
  163. return &chainSyncer{
  164. handler: handler,
  165. peerEventCh: make(chan struct{}),
  166. }
  167. }
  168. // handlePeerEvent notifies the syncer about a change in the peer set.
  169. // This is called for new peers and every time a peer announces a new
  170. // chain head.
  171. func (cs *chainSyncer) handlePeerEvent(peer *eth.Peer) bool {
  172. select {
  173. case cs.peerEventCh <- struct{}{}:
  174. return true
  175. case <-cs.handler.quitSync:
  176. return false
  177. }
  178. }
// loop runs in its own goroutine and launches the sync when necessary.
//
// It starts the block and transaction fetchers (stopping them on return).
// It then repeatedly asks nextSyncOp whether a sync should begin and waits for
// one of four events: a peer event, completion of the running sync, the force
// timer, or handler shutdown. In raft mode (cs.handler.raftMode), nextSyncOp
// is still evaluated but startSync is never called.
func (cs *chainSyncer) loop() {
	defer cs.handler.wg.Done()
	cs.handler.blockFetcher.Start()
	cs.handler.txFetcher.Start()
	defer cs.handler.blockFetcher.Stop()
	defer cs.handler.txFetcher.Stop()
	defer cs.handler.downloader.Terminate()
	// The force timer lowers the peer count threshold down to one when it fires.
	// This ensures we'll always start sync even if there aren't enough peers.
	cs.force = time.NewTimer(forceSyncCycle)
	defer cs.force.Stop()
	for {
		// Quorum: nextSyncOp runs even in raft mode (it may call
		// core.SetSyncStatus as a side effect); only the sync launch is skipped.
		if op := cs.nextSyncOp(); op != nil {
			if !cs.handler.raftMode {
				cs.startSync(op)
			}
		}
		select {
		case <-cs.peerEventCh:
			// Peer information changed, recheck.
		case <-cs.doneCh:
			// cs.doneCh is nil while no sync is running, so this case only
			// fires for a sync started by startSync. Clear the in-progress
			// marker, restart the force timer and restore the normal
			// peer-count threshold.
			// NOTE(review): if force.C already holds an undrained tick here,
			// Reset does not discard it (pre-Go 1.23 semantics), so forced may
			// flip back to true on the next iteration. Confirm this is acceptable.
			cs.doneCh = nil
			cs.force.Reset(forceSyncCycle)
			cs.forced = false
		case <-cs.force.C:
			// Let nextSyncOp accept a single peer from now on.
			cs.forced = true
		case <-cs.handler.quitSync:
			// Disable all insertion on the blockchain. This needs to happen before
			// terminating the downloader because the downloader waits for blockchain
			// inserts, and these can take a long time to finish.
			cs.handler.chain.StopInsert()
			cs.handler.downloader.Terminate()
			// Wait for an in-flight doSync to report before returning.
			if cs.doneCh != nil {
				<-cs.doneCh
			}
			return
		}
	}
}
  219. // nextSyncOp determines whether sync is required at this time.
  220. func (cs *chainSyncer) nextSyncOp() *chainSyncOp {
  221. if cs.doneCh != nil {
  222. return nil // Sync already running.
  223. }
  224. // Ensure we're at minimum peer count.
  225. minPeers := defaultMinSyncPeers
  226. if cs.forced {
  227. minPeers = 1
  228. } else if minPeers > cs.handler.maxPeers {
  229. minPeers = cs.handler.maxPeers
  230. }
  231. if cs.handler.peers.len() < minPeers {
  232. return nil
  233. }
  234. // We have enough peers, check TD
  235. peer := cs.handler.peers.peerWithHighestTD()
  236. if peer == nil {
  237. return nil
  238. }
  239. mode, ourTD := cs.modeAndLocalHead()
  240. if mode == downloader.FastSync && atomic.LoadUint32(&cs.handler.snapSync) == 1 {
  241. // Fast sync via the snap protocol
  242. mode = downloader.SnapSync
  243. }
  244. op := peerToSyncOp(mode, peer)
  245. if op.td.Cmp(ourTD) <= 0 {
  246. // Quorum
  247. // added for permissions changes to indicate node sync up has started
  248. // if peer's TD is smaller than ours, no sync will happen
  249. core.SetSyncStatus()
  250. return nil // We're in sync.
  251. }
  252. return op
  253. }
  254. func peerToSyncOp(mode downloader.SyncMode, p *eth.Peer) *chainSyncOp {
  255. peerHead, peerTD := p.Head()
  256. return &chainSyncOp{mode: mode, peer: p, td: peerTD, head: peerHead}
  257. }
  258. func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
  259. // If we're in fast sync mode, return that directly
  260. if atomic.LoadUint32(&cs.handler.fastSync) == 1 {
  261. block := cs.handler.chain.CurrentFastBlock()
  262. td := cs.handler.chain.GetTdByHash(block.Hash())
  263. return downloader.FastSync, td
  264. }
  265. // We are probably in full sync, but we might have rewound to before the
  266. // fast sync pivot, check if we should reenable
  267. if pivot := rawdb.ReadLastPivotNumber(cs.handler.database); pivot != nil {
  268. if head := cs.handler.chain.CurrentBlock(); head.NumberU64() < *pivot {
  269. block := cs.handler.chain.CurrentFastBlock()
  270. td := cs.handler.chain.GetTdByHash(block.Hash())
  271. return downloader.FastSync, td
  272. }
  273. }
  274. // Nope, we're really full syncing
  275. head := cs.handler.chain.CurrentBlock()
  276. td := cs.handler.chain.GetTd(head.Hash(), head.NumberU64())
  277. return downloader.FullSync, td
  278. }
  279. // startSync launches doSync in a new goroutine.
  280. func (cs *chainSyncer) startSync(op *chainSyncOp) {
  281. cs.doneCh = make(chan error, 1)
  282. go func() { cs.doneCh <- cs.handler.doSync(op) }()
  283. }
  284. // doSync synchronizes the local blockchain with a remote peer.
  285. func (h *handler) doSync(op *chainSyncOp) error {
  286. if op.mode == downloader.FastSync || op.mode == downloader.SnapSync {
  287. // Before launch the fast sync, we have to ensure user uses the same
  288. // txlookup limit.
  289. // The main concern here is: during the fast sync Geth won't index the
  290. // block(generate tx indices) before the HEAD-limit. But if user changes
  291. // the limit in the next fast sync(e.g. user kill Geth manually and
  292. // restart) then it will be hard for Geth to figure out the oldest block
  293. // has been indexed. So here for the user-experience wise, it's non-optimal
  294. // that user can't change limit during the fast sync. If changed, Geth
  295. // will just blindly use the original one.
  296. limit := h.chain.TxLookupLimit()
  297. if stored := rawdb.ReadFastTxLookupLimit(h.database); stored == nil {
  298. rawdb.WriteFastTxLookupLimit(h.database, limit)
  299. } else if *stored != limit {
  300. h.chain.SetTxLookupLimit(*stored)
  301. log.Warn("Update txLookup limit", "provided", limit, "updated", *stored)
  302. }
  303. }
  304. // Run the sync cycle, and disable fast sync if we're past the pivot block
  305. err := h.downloader.Synchronise(op.peer.ID(), op.head, op.td, op.mode)
  306. if err != nil {
  307. return err
  308. }
  309. if atomic.LoadUint32(&h.fastSync) == 1 {
  310. log.Info("Fast sync complete, auto disabling")
  311. atomic.StoreUint32(&h.fastSync, 0)
  312. }
  313. /* Disabled by Quorum
  314. if atomic.LoadUint32(&h.snapSync) == 1 {
  315. log.Info("Snap sync complete, auto disabling")
  316. atomic.StoreUint32(&h.snapSync, 0)
  317. }
  318. */
  319. // If we've successfully finished a sync cycle and passed any required checkpoint,
  320. // enable accepting transactions from the network.
  321. head := h.chain.CurrentBlock()
  322. if head.NumberU64() >= h.checkpointNumber {
  323. // Checkpoint passed, sanity check the timestamp to have a fallback mechanism
  324. // for non-checkpointed (number = 0) private networks.
  325. if head.Time() >= uint64(time.Now().AddDate(0, -1, 0).Unix()) {
  326. atomic.StoreUint32(&h.acceptTxs, 1)
  327. }
  328. }
  329. if head.NumberU64() > 0 {
  330. // We've completed a sync cycle, notify all peers of new state. This path is
  331. // essential in star-topology networks where a gateway node needs to notify
  332. // all its out-of-date peers of the availability of a new block. This failure
  333. // scenario will most often crop up in private and hackathon networks with
  334. // degenerate connectivity, but it should be healthy for the mainnet too to
  335. // more reliably update peers or the local TD state.
  336. h.BroadcastBlock(head, false)
  337. }
  338. return nil
  339. }