api.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package downloader
  17. import (
  18. "context"
  19. "sync"
  20. "github.com/ethereum/go-ethereum"
  21. "github.com/ethereum/go-ethereum/event"
  22. "github.com/ethereum/go-ethereum/rpc"
  23. )
  24. // PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
  25. // It offers only methods that operates on data that can be available to anyone without security risks.
  26. type PublicDownloaderAPI struct {
  27. d *Downloader
  28. mux *event.TypeMux
  29. installSyncSubscription chan chan interface{}
  30. uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
  31. }
  32. // NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that
  33. // listens for events from the downloader through the global event mux. In case it receives one of
  34. // these events it broadcasts it to all syncing subscriptions that are installed through the
  35. // installSyncSubscription channel.
  36. func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
  37. api := &PublicDownloaderAPI{
  38. d: d,
  39. mux: m,
  40. installSyncSubscription: make(chan chan interface{}),
  41. uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
  42. }
  43. go api.eventLoop()
  44. return api
  45. }
  46. // eventLoop runs a loop until the event mux closes. It will install and uninstall new
  47. // sync subscriptions and broadcasts sync status updates to the installed sync subscriptions.
  48. func (api *PublicDownloaderAPI) eventLoop() {
  49. var (
  50. sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
  51. syncSubscriptions = make(map[chan interface{}]struct{})
  52. )
  53. for {
  54. select {
  55. case i := <-api.installSyncSubscription:
  56. syncSubscriptions[i] = struct{}{}
  57. case u := <-api.uninstallSyncSubscription:
  58. delete(syncSubscriptions, u.c)
  59. close(u.uninstalled)
  60. case event := <-sub.Chan():
  61. if event == nil {
  62. return
  63. }
  64. var notification interface{}
  65. switch event.Data.(type) {
  66. case StartEvent:
  67. notification = &SyncingResult{
  68. Syncing: true,
  69. Status: api.d.Progress(),
  70. }
  71. case DoneEvent, FailedEvent:
  72. notification = false
  73. }
  74. // broadcast
  75. for c := range syncSubscriptions {
  76. c <- notification
  77. }
  78. }
  79. }
  80. }
  81. // Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
  82. func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) {
  83. notifier, supported := rpc.NotifierFromContext(ctx)
  84. if !supported {
  85. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  86. }
  87. rpcSub := notifier.CreateSubscription()
  88. go func() {
  89. statuses := make(chan interface{})
  90. sub := api.SubscribeSyncStatus(statuses)
  91. for {
  92. select {
  93. case status := <-statuses:
  94. notifier.Notify(rpcSub.ID, status)
  95. case <-rpcSub.Err():
  96. sub.Unsubscribe()
  97. return
  98. case <-notifier.Closed():
  99. sub.Unsubscribe()
  100. return
  101. }
  102. }
  103. }()
  104. return rpcSub, nil
  105. }
  106. // SyncingResult provides information about the current synchronisation status for this node.
  107. type SyncingResult struct {
  108. Syncing bool `json:"syncing"`
  109. Status ethereum.SyncProgress `json:"status"`
  110. }
  111. // uninstallSyncSubscriptionRequest uninstalles a syncing subscription in the API event loop.
  112. type uninstallSyncSubscriptionRequest struct {
  113. c chan interface{}
  114. uninstalled chan interface{}
  115. }
  116. // SyncStatusSubscription represents a syncing subscription.
  117. type SyncStatusSubscription struct {
  118. api *PublicDownloaderAPI // register subscription in event loop of this api instance
  119. c chan interface{} // channel where events are broadcasted to
  120. unsubOnce sync.Once // make sure unsubscribe logic is executed once
  121. }
  122. // Unsubscribe uninstalls the subscription from the DownloadAPI event loop.
  123. // The status channel that was passed to subscribeSyncStatus isn't used anymore
  124. // after this method returns.
  125. func (s *SyncStatusSubscription) Unsubscribe() {
  126. s.unsubOnce.Do(func() {
  127. req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})}
  128. s.api.uninstallSyncSubscription <- &req
  129. for {
  130. select {
  131. case <-s.c:
  132. // drop new status events until uninstall confirmation
  133. continue
  134. case <-req.uninstalled:
  135. return
  136. }
  137. }
  138. })
  139. }
  140. // SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates.
  141. // The given channel must receive interface values, the result can either
  142. func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription {
  143. api.installSyncSubscription <- status
  144. return &SyncStatusSubscription{api: api, c: status}
  145. }