filter.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package filters
  17. import (
  18. "context"
  19. "errors"
  20. "math/big"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/core"
  23. "github.com/ethereum/go-ethereum/core/bloombits"
  24. "github.com/ethereum/go-ethereum/core/mps"
  25. "github.com/ethereum/go-ethereum/core/rawdb"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/core/vm"
  28. "github.com/ethereum/go-ethereum/ethdb"
  29. "github.com/ethereum/go-ethereum/event"
  30. "github.com/ethereum/go-ethereum/rpc"
  31. )
  32. type Backend interface {
  33. ChainDb() ethdb.Database
  34. HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
  35. HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
  36. GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
  37. GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
  38. SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
  39. SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
  40. SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
  41. SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
  42. SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
  43. BloomStatus() (uint64, uint64)
  44. ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
  45. // AccountExtraDataStateGetterByNumber returns state getter at a given block height
  46. AccountExtraDataStateGetterByNumber(ctx context.Context, number rpc.BlockNumber) (vm.AccountExtraDataStateGetter, error)
  47. PSMR() mps.PrivateStateMetadataResolver
  48. }
  49. // Filter can be used to retrieve and filter logs.
  50. type Filter struct {
  51. backend Backend
  52. db ethdb.Database
  53. addresses []common.Address
  54. topics [][]common.Hash
  55. psi types.PrivateStateIdentifier
  56. block common.Hash // Block hash if filtering a single block
  57. begin, end int64 // Range interval if filtering multiple blocks
  58. matcher *bloombits.Matcher
  59. }
  60. // NewRangeFilter creates a new filter which uses a bloom filter on blocks to
  61. // figure out whether a particular block is interesting or not.
  62. func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash, psi types.PrivateStateIdentifier) *Filter {
  63. // Flatten the address and topic filter clauses into a single bloombits filter
  64. // system. Since the bloombits are not positional, nil topics are permitted,
  65. // which get flattened into a nil byte slice.
  66. var filters [][][]byte
  67. if len(addresses) > 0 {
  68. filter := make([][]byte, len(addresses))
  69. for i, address := range addresses {
  70. filter[i] = address.Bytes()
  71. }
  72. filters = append(filters, filter)
  73. }
  74. for _, topicList := range topics {
  75. filter := make([][]byte, len(topicList))
  76. for i, topic := range topicList {
  77. filter[i] = topic.Bytes()
  78. }
  79. filters = append(filters, filter)
  80. }
  81. size, _ := backend.BloomStatus()
  82. // Create a generic filter and convert it into a range filter
  83. filter := newFilter(backend, addresses, topics, psi)
  84. filter.matcher = bloombits.NewMatcher(size, filters)
  85. filter.begin = begin
  86. filter.end = end
  87. return filter
  88. }
  89. // NewBlockFilter creates a new filter which directly inspects the contents of
  90. // a block to figure out whether it is interesting or not.
  91. func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash, psi types.PrivateStateIdentifier) *Filter {
  92. // Create a generic filter and convert it into a block filter
  93. filter := newFilter(backend, addresses, topics, psi)
  94. filter.block = block
  95. return filter
  96. }
  97. // newFilter creates a generic filter that can either filter based on a block hash,
  98. // or based on range queries. The search criteria needs to be explicitly set.
  99. func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash, psi types.PrivateStateIdentifier) *Filter {
  100. return &Filter{
  101. backend: backend,
  102. addresses: addresses,
  103. topics: topics,
  104. psi: psi,
  105. db: backend.ChainDb(),
  106. }
  107. }
  108. // Logs searches the blockchain for matching log entries, returning all from the
  109. // first block that contains matches, updating the start of the filter accordingly.
  110. func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
  111. // If we're doing singleton block filtering, execute and return
  112. if f.block != (common.Hash{}) {
  113. header, err := f.backend.HeaderByHash(ctx, f.block)
  114. if err != nil {
  115. return nil, err
  116. }
  117. if header == nil {
  118. return nil, errors.New("unknown block")
  119. }
  120. return f.blockLogs(ctx, header)
  121. }
  122. // Figure out the limits of the filter range
  123. header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
  124. if header == nil {
  125. return nil, nil
  126. }
  127. head := header.Number.Uint64()
  128. if f.begin == -1 {
  129. f.begin = int64(head)
  130. }
  131. end := uint64(f.end)
  132. if f.end == -1 {
  133. end = head
  134. }
  135. // Gather all indexed logs, and finish with non indexed ones
  136. var (
  137. logs []*types.Log
  138. err error
  139. )
  140. size, sections := f.backend.BloomStatus()
  141. if indexed := sections * size; indexed > uint64(f.begin) {
  142. if indexed > end {
  143. logs, err = f.indexedLogs(ctx, end)
  144. } else {
  145. logs, err = f.indexedLogs(ctx, indexed-1)
  146. }
  147. if err != nil {
  148. return logs, err
  149. }
  150. }
  151. rest, err := f.unindexedLogs(ctx, end)
  152. logs = append(logs, rest...)
  153. return logs, err
  154. }
  155. // indexedLogs returns the logs matching the filter criteria based on the bloom
  156. // bits indexed available locally or via the network.
  157. func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
  158. // Create a matcher session and request servicing from the backend
  159. matches := make(chan uint64, 64)
  160. session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
  161. if err != nil {
  162. return nil, err
  163. }
  164. defer session.Close()
  165. f.backend.ServiceFilter(ctx, session)
  166. // Iterate over the matches until exhausted or context closed
  167. var logs []*types.Log
  168. for {
  169. select {
  170. case number, ok := <-matches:
  171. // Abort if all matches have been fulfilled
  172. if !ok {
  173. err := session.Error()
  174. if err == nil {
  175. f.begin = int64(end) + 1
  176. }
  177. return logs, err
  178. }
  179. f.begin = int64(number) + 1
  180. // Retrieve the suggested block and pull any truly matching logs
  181. header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
  182. if header == nil || err != nil {
  183. return logs, err
  184. }
  185. found, err := f.checkMatches(ctx, header)
  186. if err != nil {
  187. return logs, err
  188. }
  189. logs = append(logs, found...)
  190. case <-ctx.Done():
  191. return logs, ctx.Err()
  192. }
  193. }
  194. }
  195. // unindexedLogs returns the logs matching the filter criteria based on raw block
  196. // iteration and bloom matching.
  197. func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
  198. var logs []*types.Log
  199. for ; f.begin <= int64(end); f.begin++ {
  200. header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
  201. if header == nil || err != nil {
  202. return logs, err
  203. }
  204. found, err := f.blockLogs(ctx, header)
  205. if err != nil {
  206. return logs, err
  207. }
  208. logs = append(logs, found...)
  209. }
  210. return logs, nil
  211. }
  212. // blockLogs returns the logs matching the filter criteria within a single block.
  213. func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
  214. // Quorum
  215. // Apply bloom filter for both public bloom and private bloom
  216. bloomMatches := bloomFilter(header.Bloom, f.addresses, f.topics) ||
  217. bloomFilter(rawdb.GetPrivateBlockBloom(f.db, header.Number.Uint64()), f.addresses, f.topics)
  218. if bloomMatches {
  219. found, err := f.checkMatches(ctx, header)
  220. if err != nil {
  221. return logs, err
  222. }
  223. logs = append(logs, found...)
  224. }
  225. return logs, nil
  226. }
  227. // checkMatches checks if the receipts belonging to the given header contain any log events that
  228. // match the filter criteria. This function is called when the bloom filter signals a potential match.
  229. func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
  230. // Get the logs of the block
  231. logsList, err := f.backend.GetLogs(ctx, header.Hash())
  232. if err != nil {
  233. return nil, err
  234. }
  235. var unfiltered []*types.Log
  236. for _, logs := range logsList {
  237. unfiltered = append(unfiltered, logs...)
  238. }
  239. logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics, f.psi)
  240. if len(logs) > 0 {
  241. // We have matching logs, check if we need to resolve full logs via the light client
  242. if logs[0].TxHash == (common.Hash{}) {
  243. receipts, err := f.backend.GetReceipts(ctx, header.Hash())
  244. if err != nil {
  245. return nil, err
  246. }
  247. unfiltered = unfiltered[:0]
  248. for _, receipt := range receipts {
  249. unfiltered = append(unfiltered, receipt.Logs...)
  250. }
  251. logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics, f.psi)
  252. }
  253. return logs, nil
  254. }
  255. return nil, nil
  256. }
  257. func includes(addresses []common.Address, a common.Address) bool {
  258. for _, addr := range addresses {
  259. if addr == a {
  260. return true
  261. }
  262. }
  263. return false
  264. }
  265. // filterLogs creates a slice of logs matching the given criteria.
  266. func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash, psi types.PrivateStateIdentifier) []*types.Log {
  267. var ret []*types.Log
  268. Logs:
  269. for _, log := range logs {
  270. if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
  271. continue
  272. }
  273. if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
  274. continue
  275. }
  276. if len(addresses) > 0 && !includes(addresses, log.Address) {
  277. continue
  278. }
  279. if len(log.PSI) > 0 && log.PSI != psi {
  280. continue
  281. }
  282. // If the to filtered topics is greater than the amount of topics in logs, skip.
  283. if len(topics) > len(log.Topics) {
  284. continue Logs
  285. }
  286. for i, sub := range topics {
  287. match := len(sub) == 0 // empty rule set == wildcard
  288. for _, topic := range sub {
  289. if log.Topics[i] == topic {
  290. match = true
  291. break
  292. }
  293. }
  294. if !match {
  295. continue Logs
  296. }
  297. }
  298. ret = append(ret, log)
  299. }
  300. return ret
  301. }
  302. func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
  303. if len(addresses) > 0 {
  304. var included bool
  305. for _, addr := range addresses {
  306. if types.BloomLookup(bloom, addr) {
  307. included = true
  308. break
  309. }
  310. }
  311. if !included {
  312. return false
  313. }
  314. }
  315. for _, sub := range topics {
  316. included := len(sub) == 0 // empty rule set == wildcard
  317. for _, topic := range sub {
  318. if types.BloomLookup(bloom, topic) {
  319. included = true
  320. break
  321. }
  322. }
  323. if !included {
  324. return false
  325. }
  326. }
  327. return true
  328. }