matcher.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package bloombits
  17. import (
  18. "bytes"
  19. "context"
  20. "errors"
  21. "math"
  22. "sort"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common/bitutil"
  27. "github.com/ethereum/go-ethereum/crypto"
  28. )
  // bloomIndexes holds the three bit positions inside the bloom filter that are
  // associated with a single key.
  type bloomIndexes [3]uint
  32. // calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
  33. func calcBloomIndexes(b []byte) bloomIndexes {
  34. b = crypto.Keccak256(b)
  35. var idxs bloomIndexes
  36. for i := 0; i < len(idxs); i++ {
  37. idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
  38. }
  39. return idxs
  40. }
  // partialMatches carries the intermediate matching state of one section as it
  // travels down the chain of sub-matchers. A non-nil bitset holds the potential
  // matches found by the preceding sub-matchers; every following sub-matcher
  // binary ANDs its own matches into it. A nil bitset marks a section that has
  // not been constrained by any sub-matcher yet.
  type partialMatches struct {
  	// section is the index of the section the bitset belongs to.
  	section uint64
  	// bitset is the match vector of the section, one bit per block.
  	bitset []byte
  }
  // Retrieval is the message exchanged between the matcher and the processes
  // that fetch bloom bit vectors. Depending on the direction it is a request for
  // retrieval task assignments for a given bit with the given number of fetch
  // elements, the response to such a request, or a delivery carrying the actual
  // retrieved results.
  //
  // The context and error fields are used by the light client to terminate
  // matching early if an error is encountered on some path of the pipeline.
  type Retrieval struct {
  	// Bit is the bloom bit index the retrieval is about.
  	Bit uint
  	// Sections lists the section indexes requested or delivered.
  	Sections []uint64
  	// Bitsets holds the retrieved bit vectors, parallel to Sections.
  	Bitsets [][]byte

  	// Context is handed to the retriever together with the task.
  	Context context.Context
  	// Error reports a retrieval failure back to the session.
  	Error error
  }
  62. // Matcher is a pipelined system of schedulers and logic matchers which perform
  63. // binary AND/OR operations on the bit-streams, creating a stream of potential
  64. // blocks to inspect for data content.
  65. type Matcher struct {
  66. sectionSize uint64 // Size of the data batches to filter on
  67. filters [][]bloomIndexes // Filter the system is matching for
  68. schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
  69. retrievers chan chan uint // Retriever processes waiting for bit allocations
  70. counters chan chan uint // Retriever processes waiting for task count reports
  71. retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
  72. deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
  73. running uint32 // Atomic flag whether a session is live or not
  74. }
  75. // NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
  76. // address and topic filtering on them. Setting a filter component to `nil` is
  77. // allowed and will result in that filter rule being skipped (OR 0x11...1).
  78. func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
  79. // Create the matcher instance
  80. m := &Matcher{
  81. sectionSize: sectionSize,
  82. schedulers: make(map[uint]*scheduler),
  83. retrievers: make(chan chan uint),
  84. counters: make(chan chan uint),
  85. retrievals: make(chan chan *Retrieval),
  86. deliveries: make(chan *Retrieval),
  87. }
  88. // Calculate the bloom bit indexes for the groups we're interested in
  89. m.filters = nil
  90. for _, filter := range filters {
  91. // Gather the bit indexes of the filter rule, special casing the nil filter
  92. if len(filter) == 0 {
  93. continue
  94. }
  95. bloomBits := make([]bloomIndexes, len(filter))
  96. for i, clause := range filter {
  97. if clause == nil {
  98. bloomBits = nil
  99. break
  100. }
  101. bloomBits[i] = calcBloomIndexes(clause)
  102. }
  103. // Accumulate the filter rules if no nil rule was within
  104. if bloomBits != nil {
  105. m.filters = append(m.filters, bloomBits)
  106. }
  107. }
  108. // For every bit, create a scheduler to load/download the bit vectors
  109. for _, bloomIndexLists := range m.filters {
  110. for _, bloomIndexList := range bloomIndexLists {
  111. for _, bloomIndex := range bloomIndexList {
  112. m.addScheduler(bloomIndex)
  113. }
  114. }
  115. }
  116. return m
  117. }
  118. // addScheduler adds a bit stream retrieval scheduler for the given bit index if
  119. // it has not existed before. If the bit is already selected for filtering, the
  120. // existing scheduler can be used.
  121. func (m *Matcher) addScheduler(idx uint) {
  122. if _, ok := m.schedulers[idx]; ok {
  123. return
  124. }
  125. m.schedulers[idx] = newScheduler(idx)
  126. }
  127. // Start starts the matching process and returns a stream of bloom matches in
  128. // a given range of blocks. If there are no more matches in the range, the result
  129. // channel is closed.
  130. func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
  131. // Make sure we're not creating concurrent sessions
  132. if atomic.SwapUint32(&m.running, 1) == 1 {
  133. return nil, errors.New("matcher already running")
  134. }
  135. defer atomic.StoreUint32(&m.running, 0)
  136. // Initiate a new matching round
  137. session := &MatcherSession{
  138. matcher: m,
  139. quit: make(chan struct{}),
  140. ctx: ctx,
  141. }
  142. for _, scheduler := range m.schedulers {
  143. scheduler.reset()
  144. }
  145. sink := m.run(begin, end, cap(results), session)
  146. // Read the output from the result sink and deliver to the user
  147. session.pend.Add(1)
  148. go func() {
  149. defer session.pend.Done()
  150. defer close(results)
  151. for {
  152. select {
  153. case <-session.quit:
  154. return
  155. case res, ok := <-sink:
  156. // New match result found
  157. if !ok {
  158. return
  159. }
  160. // Calculate the first and last blocks of the section
  161. sectionStart := res.section * m.sectionSize
  162. first := sectionStart
  163. if begin > first {
  164. first = begin
  165. }
  166. last := sectionStart + m.sectionSize - 1
  167. if end < last {
  168. last = end
  169. }
  170. // Iterate over all the blocks in the section and return the matching ones
  171. for i := first; i <= last; i++ {
  172. // Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
  173. next := res.bitset[(i-sectionStart)/8]
  174. if next == 0 {
  175. if i%8 == 0 {
  176. i += 7
  177. }
  178. continue
  179. }
  180. // Some bit it set, do the actual submatching
  181. if bit := 7 - i%8; next&(1<<bit) != 0 {
  182. select {
  183. case <-session.quit:
  184. return
  185. case results <- i:
  186. }
  187. }
  188. }
  189. }
  190. }
  191. }()
  192. return session, nil
  193. }
  194. // run creates a daisy-chain of sub-matchers, one for the address set and one
  195. // for each topic set, each sub-matcher receiving a section only if the previous
  196. // ones have all found a potential match in one of the blocks of the section,
  197. // then binary AND-ing its own matches and forwarding the result to the next one.
  198. //
  199. // The method starts feeding the section indexes into the first sub-matcher on a
  200. // new goroutine and returns a sink channel receiving the results.
  201. func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches {
  202. // Create the source channel and feed section indexes into
  203. source := make(chan *partialMatches, buffer)
  204. session.pend.Add(1)
  205. go func() {
  206. defer session.pend.Done()
  207. defer close(source)
  208. for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ {
  209. select {
  210. case <-session.quit:
  211. return
  212. case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
  213. }
  214. }
  215. }()
  216. // Assemble the daisy-chained filtering pipeline
  217. next := source
  218. dist := make(chan *request, buffer)
  219. for _, bloom := range m.filters {
  220. next = m.subMatch(next, dist, bloom, session)
  221. }
  222. // Start the request distribution
  223. session.pend.Add(1)
  224. go m.distributor(dist, session)
  225. return next
  226. }
  227. // subMatch creates a sub-matcher that filters for a set of addresses or topics, binary OR-s those matches, then
  228. // binary AND-s the result to the daisy-chain input (source) and forwards it to the daisy-chain output.
  229. // The matches of each address/topic are calculated by fetching the given sections of the three bloom bit indexes belonging to
  230. // that address/topic, and binary AND-ing those vectors together.
  231. func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloom []bloomIndexes, session *MatcherSession) chan *partialMatches {
  232. // Start the concurrent schedulers for each bit required by the bloom filter
  233. sectionSources := make([][3]chan uint64, len(bloom))
  234. sectionSinks := make([][3]chan []byte, len(bloom))
  235. for i, bits := range bloom {
  236. for j, bit := range bits {
  237. sectionSources[i][j] = make(chan uint64, cap(source))
  238. sectionSinks[i][j] = make(chan []byte, cap(source))
  239. m.schedulers[bit].run(sectionSources[i][j], dist, sectionSinks[i][j], session.quit, &session.pend)
  240. }
  241. }
  242. process := make(chan *partialMatches, cap(source)) // entries from source are forwarded here after fetches have been initiated
  243. results := make(chan *partialMatches, cap(source))
  244. session.pend.Add(2)
  245. go func() {
  246. // Tear down the goroutine and terminate all source channels
  247. defer session.pend.Done()
  248. defer close(process)
  249. defer func() {
  250. for _, bloomSources := range sectionSources {
  251. for _, bitSource := range bloomSources {
  252. close(bitSource)
  253. }
  254. }
  255. }()
  256. // Read sections from the source channel and multiplex into all bit-schedulers
  257. for {
  258. select {
  259. case <-session.quit:
  260. return
  261. case subres, ok := <-source:
  262. // New subresult from previous link
  263. if !ok {
  264. return
  265. }
  266. // Multiplex the section index to all bit-schedulers
  267. for _, bloomSources := range sectionSources {
  268. for _, bitSource := range bloomSources {
  269. select {
  270. case <-session.quit:
  271. return
  272. case bitSource <- subres.section:
  273. }
  274. }
  275. }
  276. // Notify the processor that this section will become available
  277. select {
  278. case <-session.quit:
  279. return
  280. case process <- subres:
  281. }
  282. }
  283. }
  284. }()
  285. go func() {
  286. // Tear down the goroutine and terminate the final sink channel
  287. defer session.pend.Done()
  288. defer close(results)
  289. // Read the source notifications and collect the delivered results
  290. for {
  291. select {
  292. case <-session.quit:
  293. return
  294. case subres, ok := <-process:
  295. // Notified of a section being retrieved
  296. if !ok {
  297. return
  298. }
  299. // Gather all the sub-results and merge them together
  300. var orVector []byte
  301. for _, bloomSinks := range sectionSinks {
  302. var andVector []byte
  303. for _, bitSink := range bloomSinks {
  304. var data []byte
  305. select {
  306. case <-session.quit:
  307. return
  308. case data = <-bitSink:
  309. }
  310. if andVector == nil {
  311. andVector = make([]byte, int(m.sectionSize/8))
  312. copy(andVector, data)
  313. } else {
  314. bitutil.ANDBytes(andVector, andVector, data)
  315. }
  316. }
  317. if orVector == nil {
  318. orVector = andVector
  319. } else {
  320. bitutil.ORBytes(orVector, orVector, andVector)
  321. }
  322. }
  323. if orVector == nil {
  324. orVector = make([]byte, int(m.sectionSize/8))
  325. }
  326. if subres.bitset != nil {
  327. bitutil.ANDBytes(orVector, orVector, subres.bitset)
  328. }
  329. if bitutil.TestBytes(orVector) {
  330. select {
  331. case <-session.quit:
  332. return
  333. case results <- &partialMatches{subres.section, orVector}:
  334. }
  335. }
  336. }
  337. }
  338. }()
  339. return results
  340. }
  341. // distributor receives requests from the schedulers and queues them into a set
  342. // of pending requests, which are assigned to retrievers wanting to fulfil them.
  343. func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
  344. defer session.pend.Done()
  345. var (
  346. requests = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
  347. unallocs = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
  348. retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty)
  349. allocs int // Number of active allocations to handle graceful shutdown requests
  350. shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
  351. )
  352. // assign is a helper method fo try to assign a pending bit an actively
  353. // listening servicer, or schedule it up for later when one arrives.
  354. assign := func(bit uint) {
  355. select {
  356. case fetcher := <-m.retrievers:
  357. allocs++
  358. fetcher <- bit
  359. default:
  360. // No retrievers active, start listening for new ones
  361. retrievers = m.retrievers
  362. unallocs[bit] = struct{}{}
  363. }
  364. }
  365. for {
  366. select {
  367. case <-shutdown:
  368. // Shutdown requested. No more retrievers can be allocated,
  369. // but we still need to wait until all pending requests have returned.
  370. shutdown = nil
  371. if allocs == 0 {
  372. return
  373. }
  374. case req := <-dist:
  375. // New retrieval request arrived to be distributed to some fetcher process
  376. queue := requests[req.bit]
  377. index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
  378. requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)
  379. // If it's a new bit and we have waiting fetchers, allocate to them
  380. if len(queue) == 0 {
  381. assign(req.bit)
  382. }
  383. case fetcher := <-retrievers:
  384. // New retriever arrived, find the lowest section-ed bit to assign
  385. bit, best := uint(0), uint64(math.MaxUint64)
  386. for idx := range unallocs {
  387. if requests[idx][0] < best {
  388. bit, best = idx, requests[idx][0]
  389. }
  390. }
  391. // Stop tracking this bit (and alloc notifications if no more work is available)
  392. delete(unallocs, bit)
  393. if len(unallocs) == 0 {
  394. retrievers = nil
  395. }
  396. allocs++
  397. fetcher <- bit
  398. case fetcher := <-m.counters:
  399. // New task count request arrives, return number of items
  400. fetcher <- uint(len(requests[<-fetcher]))
  401. case fetcher := <-m.retrievals:
  402. // New fetcher waiting for tasks to retrieve, assign
  403. task := <-fetcher
  404. if want := len(task.Sections); want >= len(requests[task.Bit]) {
  405. task.Sections = requests[task.Bit]
  406. delete(requests, task.Bit)
  407. } else {
  408. task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...)
  409. requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...)
  410. }
  411. fetcher <- task
  412. // If anything was left unallocated, try to assign to someone else
  413. if len(requests[task.Bit]) > 0 {
  414. assign(task.Bit)
  415. }
  416. case result := <-m.deliveries:
  417. // New retrieval task response from fetcher, split out missing sections and
  418. // deliver complete ones
  419. var (
  420. sections = make([]uint64, 0, len(result.Sections))
  421. bitsets = make([][]byte, 0, len(result.Bitsets))
  422. missing = make([]uint64, 0, len(result.Sections))
  423. )
  424. for i, bitset := range result.Bitsets {
  425. if len(bitset) == 0 {
  426. missing = append(missing, result.Sections[i])
  427. continue
  428. }
  429. sections = append(sections, result.Sections[i])
  430. bitsets = append(bitsets, bitset)
  431. }
  432. m.schedulers[result.Bit].deliver(sections, bitsets)
  433. allocs--
  434. // Reschedule missing sections and allocate bit if newly available
  435. if len(missing) > 0 {
  436. queue := requests[result.Bit]
  437. for _, section := range missing {
  438. index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section })
  439. queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...)
  440. }
  441. requests[result.Bit] = queue
  442. if len(queue) == len(missing) {
  443. assign(result.Bit)
  444. }
  445. }
  446. // End the session when all pending deliveries have arrived.
  447. if shutdown == nil && allocs == 0 {
  448. return
  449. }
  450. }
  451. }
  452. }
  453. // MatcherSession is returned by a started matcher to be used as a terminator
  454. // for the actively running matching operation.
  455. type MatcherSession struct {
  456. matcher *Matcher
  457. closer sync.Once // Sync object to ensure we only ever close once
  458. quit chan struct{} // Quit channel to request pipeline termination
  459. ctx context.Context // Context used by the light client to abort filtering
  460. err atomic.Value // Global error to track retrieval failures deep in the chain
  461. pend sync.WaitGroup
  462. }
  463. // Close stops the matching process and waits for all subprocesses to terminate
  464. // before returning. The timeout may be used for graceful shutdown, allowing the
  465. // currently running retrievals to complete before this time.
  466. func (s *MatcherSession) Close() {
  467. s.closer.Do(func() {
  468. // Signal termination and wait for all goroutines to tear down
  469. close(s.quit)
  470. s.pend.Wait()
  471. })
  472. }
  473. // Error returns any failure encountered during the matching session.
  474. func (s *MatcherSession) Error() error {
  475. if err := s.err.Load(); err != nil {
  476. return err.(error)
  477. }
  478. return nil
  479. }
  480. // allocateRetrieval assigns a bloom bit index to a client process that can either
  481. // immediately request and fetch the section contents assigned to this bit or wait
  482. // a little while for more sections to be requested.
  483. func (s *MatcherSession) allocateRetrieval() (uint, bool) {
  484. fetcher := make(chan uint)
  485. select {
  486. case <-s.quit:
  487. return 0, false
  488. case s.matcher.retrievers <- fetcher:
  489. bit, ok := <-fetcher
  490. return bit, ok
  491. }
  492. }
  493. // pendingSections returns the number of pending section retrievals belonging to
  494. // the given bloom bit index.
  495. func (s *MatcherSession) pendingSections(bit uint) int {
  496. fetcher := make(chan uint)
  497. select {
  498. case <-s.quit:
  499. return 0
  500. case s.matcher.counters <- fetcher:
  501. fetcher <- bit
  502. return int(<-fetcher)
  503. }
  504. }
  505. // allocateSections assigns all or part of an already allocated bit-task queue
  506. // to the requesting process.
  507. func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 {
  508. fetcher := make(chan *Retrieval)
  509. select {
  510. case <-s.quit:
  511. return nil
  512. case s.matcher.retrievals <- fetcher:
  513. task := &Retrieval{
  514. Bit: bit,
  515. Sections: make([]uint64, count),
  516. }
  517. fetcher <- task
  518. return (<-fetcher).Sections
  519. }
  520. }
  521. // deliverSections delivers a batch of section bit-vectors for a specific bloom
  522. // bit index to be injected into the processing pipeline.
  523. func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) {
  524. s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}
  525. }
  526. // Multiplex polls the matcher session for retrieval tasks and multiplexes it into
  527. // the requested retrieval queue to be serviced together with other sessions.
  528. //
  529. // This method will block for the lifetime of the session. Even after termination
  530. // of the session, any request in-flight need to be responded to! Empty responses
  531. // are fine though in that case.
  532. func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
  533. for {
  534. // Allocate a new bloom bit index to retrieve data for, stopping when done
  535. bit, ok := s.allocateRetrieval()
  536. if !ok {
  537. return
  538. }
  539. // Bit allocated, throttle a bit if we're below our batch limit
  540. if s.pendingSections(bit) < batch {
  541. select {
  542. case <-s.quit:
  543. // Session terminating, we can't meaningfully service, abort
  544. s.allocateSections(bit, 0)
  545. s.deliverSections(bit, []uint64{}, [][]byte{})
  546. return
  547. case <-time.After(wait):
  548. // Throttling up, fetch whatever's available
  549. }
  550. }
  551. // Allocate as much as we can handle and request servicing
  552. sections := s.allocateSections(bit, batch)
  553. request := make(chan *Retrieval)
  554. select {
  555. case <-s.quit:
  556. // Session terminating, we can't meaningfully service, abort
  557. s.deliverSections(bit, sections, make([][]byte, len(sections)))
  558. return
  559. case mux <- request:
  560. // Retrieval accepted, something must arrive before we're aborting
  561. request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
  562. result := <-request
  563. if result.Error != nil {
  564. s.err.Store(result.Error)
  565. s.Close()
  566. }
  567. s.deliverSections(result.Bit, result.Sections, result.Bitsets)
  568. }
  569. }
  570. }