matcher_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package bloombits
  17. import (
  18. "context"
  19. "math/rand"
  20. "sync/atomic"
  21. "testing"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common"
  24. )
  // testSectionSize is the section size handed to NewMatcher by the tests in this
  // file. generateBitset also uses it to size each simulated section bitset (one
  // bit per block) and to translate a section number into absolute block numbers.
  25. const testSectionSize = 4096
  26. // Tests that wildcard filter rules (nil) can be specified and are handled well.
  27. func TestMatcherWildcards(t *testing.T) {
  28. t.Parallel()
  29. matcher := NewMatcher(testSectionSize, [][][]byte{
  30. {common.Address{}.Bytes(), common.Address{0x01}.Bytes()}, // Default address is not a wildcard
  31. {common.Hash{}.Bytes(), common.Hash{0x01}.Bytes()}, // Default hash is not a wildcard
  32. {common.Hash{0x01}.Bytes()}, // Plain rule, sanity check
  33. {common.Hash{0x01}.Bytes(), nil}, // Wildcard suffix, drop rule
  34. {nil, common.Hash{0x01}.Bytes()}, // Wildcard prefix, drop rule
  35. {nil, nil}, // Wildcard combo, drop rule
  36. {}, // Inited wildcard rule, drop rule
  37. nil, // Proper wildcard rule, drop rule
  38. })
  39. if len(matcher.filters) != 3 {
  40. t.Fatalf("filter system size mismatch: have %d, want %d", len(matcher.filters), 3)
  41. }
  42. if len(matcher.filters[0]) != 2 {
  43. t.Fatalf("address clause size mismatch: have %d, want %d", len(matcher.filters[0]), 2)
  44. }
  45. if len(matcher.filters[1]) != 2 {
  46. t.Fatalf("combo topic clause size mismatch: have %d, want %d", len(matcher.filters[1]), 2)
  47. }
  48. if len(matcher.filters[2]) != 1 {
  49. t.Fatalf("singletone topic clause size mismatch: have %d, want %d", len(matcher.filters[2]), 1)
  50. }
  51. }
  52. // Tests the matcher pipeline on a single continuous workflow without interrupts.
  53. func TestMatcherContinuous(t *testing.T) {
  54. t.Parallel()
  55. testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, false, 75)
  56. testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, false, 81)
  57. testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, false, 36)
  58. }
  59. // Tests the matcher pipeline on a constantly interrupted and resumed work pattern
  60. // with the aim of ensuring data items are requested only once.
  61. func TestMatcherIntermittent(t *testing.T) {
  62. t.Parallel()
  63. testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, true, 75)
  64. testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, true, 81)
  65. testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, true, 36)
  66. }
  67. // Tests the matcher pipeline on random input to hopefully catch anomalies.
  68. func TestMatcherRandom(t *testing.T) {
  69. t.Parallel()
  70. for i := 0; i < 10; i++ {
  71. testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 0, 10000, 0)
  72. testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 0, 10000, 0)
  73. testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 0, 10000, 0)
  74. testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 0, 10000, 0)
  75. testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 0, 10000, 0)
  76. }
  77. }
  78. // Tests that the matcher can properly find matches if the starting block is
  79. // shifter from a multiple of 8. This is needed to cover an optimisation with
  80. // bitset matching https://github.com/ethereum/go-ethereum/issues/15309.
  81. func TestMatcherShifted(t *testing.T) {
  82. t.Parallel()
  83. // Block 0 always matches in the tests, skip ahead of first 8 blocks with the
  84. // start to get a potential zero byte in the matcher bitset.
  85. // To keep the second bitset byte zero, the filter must only match for the first
  86. // time in block 16, so doing an all-16 bit filter should suffice.
  87. // To keep the starting block non divisible by 8, block number 9 is the first
  88. // that would introduce a shift and not match block 0.
  89. testMatcherBothModes(t, [][]bloomIndexes{{{16, 16, 16}}}, 9, 64, 0)
  90. }
  91. // Tests that matching on everything doesn't crash (special case internally).
  92. func TestWildcardMatcher(t *testing.T) {
  93. t.Parallel()
  94. testMatcherBothModes(t, nil, 0, 10000, 0)
  95. }
  96. // makeRandomIndexes generates a random filter system, composed on multiple filter
  97. // criteria, each having one bloom list component for the address and arbitrarily
  98. // many topic bloom list components.
  99. func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes {
  100. res := make([][]bloomIndexes, len(lengths))
  101. for i, topics := range lengths {
  102. res[i] = make([]bloomIndexes, topics)
  103. for j := 0; j < topics; j++ {
  104. for k := 0; k < len(res[i][j]); k++ {
  105. res[i][j][k] = uint(rand.Intn(max-1) + 2)
  106. }
  107. }
  108. }
  109. return res
  110. }
  111. // testMatcherDiffBatches runs the given matches test in single-delivery and also
  112. // in batches delivery mode, verifying that all kinds of deliveries are handled
  113. // correctly withn.
  114. func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32) {
  115. singleton := testMatcher(t, filter, start, blocks, intermittent, retrievals, 1)
  116. batched := testMatcher(t, filter, start, blocks, intermittent, retrievals, 16)
  117. if singleton != batched {
  118. t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in signleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched)
  119. }
  120. }
  121. // testMatcherBothModes runs the given matcher test in both continuous as well as
  122. // in intermittent mode, verifying that the request counts match each other.
  123. func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, retrievals uint32) {
  124. continuous := testMatcher(t, filter, start, blocks, false, retrievals, 16)
  125. intermittent := testMatcher(t, filter, start, blocks, true, retrievals, 16)
  126. if continuous != intermittent {
  127. t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent)
  128. }
  129. }
  130. // testMatcher is a generic tester to run the given matcher test and return the
  131. // number of requests made for cross validation between different modes.
  132. func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 {
  133. // Create a new matcher an simulate our explicit random bitsets
  134. matcher := NewMatcher(testSectionSize, nil)
  135. matcher.filters = filter
  136. for _, rule := range filter {
  137. for _, topic := range rule {
  138. for _, bit := range topic {
  139. matcher.addScheduler(bit)
  140. }
  141. }
  142. }
  143. // Track the number of retrieval requests made
  144. var requested uint32
  145. // Start the matching session for the filter and the retriever goroutines
  146. quit := make(chan struct{})
  147. matches := make(chan uint64, 16)
  148. session, err := matcher.Start(context.Background(), start, blocks-1, matches)
  149. if err != nil {
  150. t.Fatalf("failed to stat matcher session: %v", err)
  151. }
  152. startRetrievers(session, quit, &requested, maxReqCount)
  153. // Iterate over all the blocks and verify that the pipeline produces the correct matches
  154. for i := start; i < blocks; i++ {
  155. if expMatch3(filter, i) {
  156. match, ok := <-matches
  157. if !ok {
  158. t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, results channel closed", filter, blocks, intermittent, i)
  159. return 0
  160. }
  161. if match != i {
  162. t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, got #%v", filter, blocks, intermittent, i, match)
  163. }
  164. // If we're testing intermittent mode, abort and restart the pipeline
  165. if intermittent {
  166. session.Close()
  167. close(quit)
  168. quit = make(chan struct{})
  169. matches = make(chan uint64, 16)
  170. session, err = matcher.Start(context.Background(), i+1, blocks-1, matches)
  171. if err != nil {
  172. t.Fatalf("failed to stat matcher session: %v", err)
  173. }
  174. startRetrievers(session, quit, &requested, maxReqCount)
  175. }
  176. }
  177. }
  178. // Ensure the result channel is torn down after the last block
  179. match, ok := <-matches
  180. if ok {
  181. t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match)
  182. }
  183. // Clean up the session and ensure we match the expected retrieval count
  184. session.Close()
  185. close(quit)
  186. if retrievals != 0 && requested != retrievals {
  187. t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested, retrievals)
  188. }
  189. return requested
  190. }
  191. // startRetrievers starts a batch of goroutines listening for section requests
  192. // and serving them.
  193. func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *uint32, batch int) {
  194. requests := make(chan chan *Retrieval)
  195. for i := 0; i < 10; i++ {
  196. // Start a multiplexer to test multiple threaded execution
  197. go session.Multiplex(batch, 100*time.Microsecond, requests)
  198. // Start a services to match the above multiplexer
  199. go func() {
  200. for {
  201. // Wait for a service request or a shutdown
  202. select {
  203. case <-quit:
  204. return
  205. case request := <-requests:
  206. task := <-request
  207. task.Bitsets = make([][]byte, len(task.Sections))
  208. for i, section := range task.Sections {
  209. if rand.Int()%4 != 0 { // Handle occasional missing deliveries
  210. task.Bitsets[i] = generateBitset(task.Bit, section)
  211. atomic.AddUint32(retrievals, 1)
  212. }
  213. }
  214. request <- task
  215. }
  216. }
  217. }()
  218. }
  219. }
  220. // generateBitset generates the rotated bitset for the given bloom bit and section
  221. // numbers.
  222. func generateBitset(bit uint, section uint64) []byte {
  223. bitset := make([]byte, testSectionSize/8)
  224. for i := 0; i < len(bitset); i++ {
  225. for b := 0; b < 8; b++ {
  226. blockIdx := section*testSectionSize + uint64(i*8+b)
  227. bitset[i] += bitset[i]
  228. if (blockIdx % uint64(bit)) == 0 {
  229. bitset[i]++
  230. }
  231. }
  232. }
  233. return bitset
  234. }
  235. func expMatch1(filter bloomIndexes, i uint64) bool {
  236. for _, ii := range filter {
  237. if (i % uint64(ii)) != 0 {
  238. return false
  239. }
  240. }
  241. return true
  242. }
  243. func expMatch2(filter []bloomIndexes, i uint64) bool {
  244. for _, ii := range filter {
  245. if expMatch1(ii, i) {
  246. return true
  247. }
  248. }
  249. return false
  250. }
  251. func expMatch3(filter [][]bloomIndexes, i uint64) bool {
  252. for _, ii := range filter {
  253. if !expMatch2(ii, i) {
  254. return false
  255. }
  256. }
  257. return true
  258. }