api.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package filters
  17. import (
  18. "context"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/common/hexutil"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/ethdb"
  30. "github.com/ethereum/go-ethereum/event"
  31. "github.com/ethereum/go-ethereum/rpc"
  32. )
  33. // filter is a helper struct that holds meta information over the filter type
  34. // and associated subscription in the event system.
  35. type filter struct {
  36. typ Type
  37. deadline *time.Timer // filter is inactiv when deadline triggers
  38. hashes []common.Hash
  39. crit FilterCriteria
  40. logs []*types.Log
  41. s *Subscription // associated subscription in event system
  42. }
  43. // PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
  44. // information related to the Ethereum protocol such als blocks, transactions and logs.
  45. type PublicFilterAPI struct {
  46. backend Backend
  47. mux *event.TypeMux
  48. quit chan struct{}
  49. chainDb ethdb.Database
  50. events *EventSystem
  51. filtersMu sync.Mutex
  52. filters map[rpc.ID]*filter
  53. timeout time.Duration
  54. }
  55. // NewPublicFilterAPI returns a new PublicFilterAPI instance.
  56. func NewPublicFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *PublicFilterAPI {
  57. api := &PublicFilterAPI{
  58. backend: backend,
  59. chainDb: backend.ChainDb(),
  60. events: NewEventSystem(backend, lightMode),
  61. filters: make(map[rpc.ID]*filter),
  62. timeout: timeout,
  63. }
  64. go api.timeoutLoop(timeout)
  65. return api
  66. }
  67. // timeoutLoop runs at the interval set by 'timeout' and deletes filters
  68. // that have not been recently used. It is started when the API is created.
  69. func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
  70. var toUninstall []*Subscription
  71. ticker := time.NewTicker(timeout)
  72. defer ticker.Stop()
  73. for {
  74. <-ticker.C
  75. api.filtersMu.Lock()
  76. for id, f := range api.filters {
  77. select {
  78. case <-f.deadline.C:
  79. toUninstall = append(toUninstall, f.s)
  80. delete(api.filters, id)
  81. default:
  82. continue
  83. }
  84. }
  85. api.filtersMu.Unlock()
  86. // Unsubscribes are processed outside the lock to avoid the following scenario:
  87. // event loop attempts broadcasting events to still active filters while
  88. // Unsubscribe is waiting for it to process the uninstall request.
  89. for _, s := range toUninstall {
  90. s.Unsubscribe()
  91. }
  92. toUninstall = nil
  93. }
  94. }
  95. // NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
  96. // as transactions enter the pending state.
  97. //
  98. // It is part of the filter package because this filter can be used through the
  99. // `eth_getFilterChanges` polling method that is also used for log filters.
  100. //
  101. // https://eth.wiki/json-rpc/API#eth_newpendingtransactionfilter
  102. func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
  103. var (
  104. pendingTxs = make(chan []common.Hash)
  105. pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
  106. )
  107. api.filtersMu.Lock()
  108. api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
  109. api.filtersMu.Unlock()
  110. go func() {
  111. for {
  112. select {
  113. case ph := <-pendingTxs:
  114. api.filtersMu.Lock()
  115. if f, found := api.filters[pendingTxSub.ID]; found {
  116. f.hashes = append(f.hashes, ph...)
  117. }
  118. api.filtersMu.Unlock()
  119. case <-pendingTxSub.Err():
  120. api.filtersMu.Lock()
  121. delete(api.filters, pendingTxSub.ID)
  122. api.filtersMu.Unlock()
  123. return
  124. }
  125. }
  126. }()
  127. return pendingTxSub.ID
  128. }
  129. // NewPendingTransactions creates a subscription that is triggered each time a transaction
  130. // enters the transaction pool and was signed from one of the transactions this nodes manages.
  131. func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
  132. notifier, supported := rpc.NotifierFromContext(ctx)
  133. if !supported {
  134. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  135. }
  136. rpcSub := notifier.CreateSubscription()
  137. go func() {
  138. txHashes := make(chan []common.Hash, 128)
  139. pendingTxSub := api.events.SubscribePendingTxs(txHashes)
  140. for {
  141. select {
  142. case hashes := <-txHashes:
  143. // To keep the original behaviour, send a single tx hash in one notification.
  144. // TODO(rjl493456442) Send a batch of tx hashes in one notification
  145. for _, h := range hashes {
  146. notifier.Notify(rpcSub.ID, h)
  147. }
  148. case <-rpcSub.Err():
  149. pendingTxSub.Unsubscribe()
  150. return
  151. case <-notifier.Closed():
  152. pendingTxSub.Unsubscribe()
  153. return
  154. }
  155. }
  156. }()
  157. return rpcSub, nil
  158. }
  159. // NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
  160. // It is part of the filter package since polling goes with eth_getFilterChanges.
  161. //
  162. // https://eth.wiki/json-rpc/API#eth_newblockfilter
  163. func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
  164. var (
  165. headers = make(chan *types.Header)
  166. headerSub = api.events.SubscribeNewHeads(headers)
  167. )
  168. api.filtersMu.Lock()
  169. api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub}
  170. api.filtersMu.Unlock()
  171. go func() {
  172. for {
  173. select {
  174. case h := <-headers:
  175. api.filtersMu.Lock()
  176. if f, found := api.filters[headerSub.ID]; found {
  177. f.hashes = append(f.hashes, h.Hash())
  178. }
  179. api.filtersMu.Unlock()
  180. case <-headerSub.Err():
  181. api.filtersMu.Lock()
  182. delete(api.filters, headerSub.ID)
  183. api.filtersMu.Unlock()
  184. return
  185. }
  186. }
  187. }()
  188. return headerSub.ID
  189. }
  190. // NewHeads send a notification each time a new (header) block is appended to the chain.
  191. func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
  192. notifier, supported := rpc.NotifierFromContext(ctx)
  193. if !supported {
  194. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  195. }
  196. rpcSub := notifier.CreateSubscription()
  197. go func() {
  198. headers := make(chan *types.Header)
  199. headersSub := api.events.SubscribeNewHeads(headers)
  200. for {
  201. select {
  202. case h := <-headers:
  203. notifier.Notify(rpcSub.ID, h)
  204. case <-rpcSub.Err():
  205. headersSub.Unsubscribe()
  206. return
  207. case <-notifier.Closed():
  208. headersSub.Unsubscribe()
  209. return
  210. }
  211. }
  212. }()
  213. return rpcSub, nil
  214. }
  215. // Logs creates a subscription that fires for all new log that match the given filter criteria.
  216. func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
  217. notifier, supported := rpc.NotifierFromContext(ctx)
  218. if !supported {
  219. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  220. }
  221. var (
  222. rpcSub = notifier.CreateSubscription()
  223. matchedLogs = make(chan []*types.Log)
  224. )
  225. psm, err := api.backend.PSMR().ResolveForUserContext(ctx)
  226. if err != nil {
  227. return nil, err
  228. }
  229. crit.PSI = psm.ID
  230. logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
  231. if err != nil {
  232. return nil, err
  233. }
  234. go func() {
  235. for {
  236. select {
  237. case logs := <-matchedLogs:
  238. for _, log := range logs {
  239. notifier.Notify(rpcSub.ID, &log)
  240. }
  241. case <-rpcSub.Err(): // client send an unsubscribe request
  242. logsSub.Unsubscribe()
  243. return
  244. case <-notifier.Closed(): // connection dropped
  245. logsSub.Unsubscribe()
  246. return
  247. }
  248. }
  249. }()
  250. return rpcSub, nil
  251. }
  252. // FilterCriteria represents a request to create a new filter.
  253. // Same as ethereum.FilterQuery but with UnmarshalJSON() method.
  254. type FilterCriteria ethereum.FilterQuery
  255. // NewFilter creates a new filter and returns the filter id. It can be
  256. // used to retrieve logs when the state changes. This method cannot be
  257. // used to fetch logs that are already stored in the state.
  258. //
  259. // Default criteria for the from and to block are "latest".
  260. // Using "latest" as block number will return logs for mined blocks.
  261. // Using "pending" as block number returns logs for not yet mined (pending) blocks.
  262. // In case logs are removed (chain reorg) previously returned logs are returned
  263. // again but with the removed property set to true.
  264. //
  265. // In case "fromBlock" > "toBlock" an error is returned.
  266. //
  267. // https://eth.wiki/json-rpc/API#eth_newfilter
  268. func (api *PublicFilterAPI) NewFilter(ctx context.Context, crit FilterCriteria) (rpc.ID, error) {
  269. logs := make(chan []*types.Log)
  270. psm, err := api.backend.PSMR().ResolveForUserContext(ctx)
  271. if err != nil {
  272. return "", err
  273. }
  274. crit.PSI = psm.ID
  275. logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs)
  276. if err != nil {
  277. return rpc.ID(""), err // Quorum
  278. }
  279. api.filtersMu.Lock()
  280. api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(api.timeout), logs: make([]*types.Log, 0), s: logsSub}
  281. api.filtersMu.Unlock()
  282. go func() {
  283. for {
  284. select {
  285. case l := <-logs:
  286. api.filtersMu.Lock()
  287. if f, found := api.filters[logsSub.ID]; found {
  288. f.logs = append(f.logs, l...)
  289. }
  290. api.filtersMu.Unlock()
  291. case <-logsSub.Err():
  292. api.filtersMu.Lock()
  293. delete(api.filters, logsSub.ID)
  294. api.filtersMu.Unlock()
  295. return
  296. }
  297. }
  298. }()
  299. return logsSub.ID, nil
  300. }
  301. // GetLogs returns logs matching the given argument that are stored within the state.
  302. //
  303. // https://eth.wiki/json-rpc/API#eth_getlogs
  304. func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
  305. psm, err := api.backend.PSMR().ResolveForUserContext(ctx)
  306. if err != nil {
  307. return nil, err
  308. }
  309. var filter *Filter
  310. if crit.BlockHash != nil {
  311. // Block filter requested, construct a single-shot filter
  312. filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics, psm.ID)
  313. } else {
  314. // Convert the RPC block numbers into internal representations
  315. begin := rpc.LatestBlockNumber.Int64()
  316. if crit.FromBlock != nil {
  317. begin = crit.FromBlock.Int64()
  318. }
  319. end := rpc.LatestBlockNumber.Int64()
  320. if crit.ToBlock != nil {
  321. end = crit.ToBlock.Int64()
  322. }
  323. // Construct the range filter
  324. filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics, psm.ID)
  325. }
  326. // Run the filter and return all the logs
  327. logs, err := filter.Logs(ctx)
  328. if err != nil {
  329. return nil, err
  330. }
  331. return returnLogs(logs), err
  332. }
  333. // UninstallFilter removes the filter with the given filter id.
  334. //
  335. // https://eth.wiki/json-rpc/API#eth_uninstallfilter
  336. func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
  337. api.filtersMu.Lock()
  338. f, found := api.filters[id]
  339. if found {
  340. delete(api.filters, id)
  341. }
  342. api.filtersMu.Unlock()
  343. if found {
  344. f.s.Unsubscribe()
  345. }
  346. return found
  347. }
  348. // GetFilterLogs returns the logs for the filter with the given id.
  349. // If the filter could not be found an empty array of logs is returned.
  350. //
  351. // https://eth.wiki/json-rpc/API#eth_getfilterlogs
  352. func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) {
  353. psm, err := api.backend.PSMR().ResolveForUserContext(ctx)
  354. if err != nil {
  355. return nil, err
  356. }
  357. api.filtersMu.Lock()
  358. f, found := api.filters[id]
  359. api.filtersMu.Unlock()
  360. if !found || f.typ != LogsSubscription {
  361. return nil, fmt.Errorf("filter not found")
  362. }
  363. // Quorum:
  364. // - Make sure the tenant has access to the filter
  365. // - Even when MPS or Multitenancy is not enabled, the DefaultPrivateStateIdentifier values
  366. // will be populated in both context and filter criteria. So this check is safe without
  367. // the need of checking for multitenancy enablement
  368. if psm.ID != f.crit.PSI {
  369. return nil, fmt.Errorf("filter not found for %v", psm.ID)
  370. }
  371. var filter *Filter
  372. if f.crit.BlockHash != nil {
  373. // Block filter requested, construct a single-shot filter
  374. filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics, psm.ID)
  375. } else {
  376. // Convert the RPC block numbers into internal representations
  377. begin := rpc.LatestBlockNumber.Int64()
  378. if f.crit.FromBlock != nil {
  379. begin = f.crit.FromBlock.Int64()
  380. }
  381. end := rpc.LatestBlockNumber.Int64()
  382. if f.crit.ToBlock != nil {
  383. end = f.crit.ToBlock.Int64()
  384. }
  385. // Construct the range filter
  386. filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics, psm.ID)
  387. }
  388. // Run the filter and return all the logs
  389. logs, err := filter.Logs(ctx)
  390. if err != nil {
  391. return nil, err
  392. }
  393. return returnLogs(logs), nil
  394. }
  395. // GetFilterChanges returns the logs for the filter with the given id since
  396. // last time it was called. This can be used for polling.
  397. //
  398. // For pending transaction and block filters the result is []common.Hash.
  399. // (pending)Log filters return []Log.
  400. //
  401. // https://eth.wiki/json-rpc/API#eth_getfilterchanges
  402. func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
  403. api.filtersMu.Lock()
  404. defer api.filtersMu.Unlock()
  405. if f, found := api.filters[id]; found {
  406. if !f.deadline.Stop() {
  407. // timer expired but filter is not yet removed in timeout loop
  408. // receive timer value and reset timer
  409. <-f.deadline.C
  410. }
  411. f.deadline.Reset(api.timeout)
  412. switch f.typ {
  413. case PendingTransactionsSubscription, BlocksSubscription:
  414. hashes := f.hashes
  415. f.hashes = nil
  416. return returnHashes(hashes), nil
  417. case LogsSubscription, MinedAndPendingLogsSubscription:
  418. logs := f.logs
  419. f.logs = nil
  420. return returnLogs(logs), nil
  421. }
  422. }
  423. return []interface{}{}, fmt.Errorf("filter not found")
  424. }
  425. // returnHashes is a helper that will return an empty hash array case the given hash array is nil,
  426. // otherwise the given hashes array is returned.
  427. func returnHashes(hashes []common.Hash) []common.Hash {
  428. if hashes == nil {
  429. return []common.Hash{}
  430. }
  431. return hashes
  432. }
  433. // returnLogs is a helper that will return an empty log array in case the given logs array is nil,
  434. // otherwise the given logs array is returned.
  435. func returnLogs(logs []*types.Log) []*types.Log {
  436. if logs == nil {
  437. return []*types.Log{}
  438. }
  439. return logs
  440. }
  441. // UnmarshalJSON sets *args fields with given data.
  442. func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
  443. type input struct {
  444. BlockHash *common.Hash `json:"blockHash"`
  445. FromBlock *rpc.BlockNumber `json:"fromBlock"`
  446. ToBlock *rpc.BlockNumber `json:"toBlock"`
  447. Addresses interface{} `json:"address"`
  448. Topics []interface{} `json:"topics"`
  449. }
  450. var raw input
  451. if err := json.Unmarshal(data, &raw); err != nil {
  452. return err
  453. }
  454. if raw.BlockHash != nil {
  455. if raw.FromBlock != nil || raw.ToBlock != nil {
  456. // BlockHash is mutually exclusive with FromBlock/ToBlock criteria
  457. return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other")
  458. }
  459. args.BlockHash = raw.BlockHash
  460. } else {
  461. if raw.FromBlock != nil {
  462. args.FromBlock = big.NewInt(raw.FromBlock.Int64())
  463. }
  464. if raw.ToBlock != nil {
  465. args.ToBlock = big.NewInt(raw.ToBlock.Int64())
  466. }
  467. }
  468. args.Addresses = []common.Address{}
  469. if raw.Addresses != nil {
  470. // raw.Address can contain a single address or an array of addresses
  471. switch rawAddr := raw.Addresses.(type) {
  472. case []interface{}:
  473. for i, addr := range rawAddr {
  474. if strAddr, ok := addr.(string); ok {
  475. addr, err := decodeAddress(strAddr)
  476. if err != nil {
  477. return fmt.Errorf("invalid address at index %d: %v", i, err)
  478. }
  479. args.Addresses = append(args.Addresses, addr)
  480. } else {
  481. return fmt.Errorf("non-string address at index %d", i)
  482. }
  483. }
  484. case string:
  485. addr, err := decodeAddress(rawAddr)
  486. if err != nil {
  487. return fmt.Errorf("invalid address: %v", err)
  488. }
  489. args.Addresses = []common.Address{addr}
  490. default:
  491. return errors.New("invalid addresses in query")
  492. }
  493. }
  494. // topics is an array consisting of strings and/or arrays of strings.
  495. // JSON null values are converted to common.Hash{} and ignored by the filter manager.
  496. if len(raw.Topics) > 0 {
  497. args.Topics = make([][]common.Hash, len(raw.Topics))
  498. for i, t := range raw.Topics {
  499. switch topic := t.(type) {
  500. case nil:
  501. // ignore topic when matching logs
  502. case string:
  503. // match specific topic
  504. top, err := decodeTopic(topic)
  505. if err != nil {
  506. return err
  507. }
  508. args.Topics[i] = []common.Hash{top}
  509. case []interface{}:
  510. // or case e.g. [null, "topic0", "topic1"]
  511. for _, rawTopic := range topic {
  512. if rawTopic == nil {
  513. // null component, match all
  514. args.Topics[i] = nil
  515. break
  516. }
  517. if topic, ok := rawTopic.(string); ok {
  518. parsed, err := decodeTopic(topic)
  519. if err != nil {
  520. return err
  521. }
  522. args.Topics[i] = append(args.Topics[i], parsed)
  523. } else {
  524. return fmt.Errorf("invalid topic(s)")
  525. }
  526. }
  527. default:
  528. return fmt.Errorf("invalid topic(s)")
  529. }
  530. }
  531. }
  532. return nil
  533. }
  534. func decodeAddress(s string) (common.Address, error) {
  535. b, err := hexutil.Decode(s)
  536. if err == nil && len(b) != common.AddressLength {
  537. err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength)
  538. }
  539. return common.BytesToAddress(b), err
  540. }
  541. func decodeTopic(s string) (common.Hash, error) {
  542. b, err := hexutil.Decode(s)
  543. if err == nil && len(b) != common.HashLength {
  544. err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength)
  545. }
  546. return common.BytesToHash(b), err
  547. }