benchmark.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "encoding/binary"
  19. "fmt"
  20. "math/big"
  21. "math/rand"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/common/mclock"
  26. "github.com/ethereum/go-ethereum/core/rawdb"
  27. "github.com/ethereum/go-ethereum/core/types"
  28. "github.com/ethereum/go-ethereum/crypto"
  29. "github.com/ethereum/go-ethereum/les/flowcontrol"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/p2p"
  32. "github.com/ethereum/go-ethereum/p2p/enode"
  33. "github.com/ethereum/go-ethereum/params"
  34. "github.com/ethereum/go-ethereum/rlp"
  35. )
  36. // requestBenchmark is an interface for different randomized request generators
  37. type requestBenchmark interface {
  38. // init initializes the generator for generating the given number of randomized requests
  39. init(h *serverHandler, count int) error
  40. // request initiates sending a single request to the given peer
  41. request(peer *serverPeer, index int) error
  42. }
  43. // benchmarkBlockHeaders implements requestBenchmark
  44. type benchmarkBlockHeaders struct {
  45. amount, skip int
  46. reverse, byHash bool
  47. offset, randMax int64
  48. hashes []common.Hash
  49. }
  50. func (b *benchmarkBlockHeaders) init(h *serverHandler, count int) error {
  51. d := int64(b.amount-1) * int64(b.skip+1)
  52. b.offset = 0
  53. b.randMax = h.blockchain.CurrentHeader().Number.Int64() + 1 - d
  54. if b.randMax < 0 {
  55. return fmt.Errorf("chain is too short")
  56. }
  57. if b.reverse {
  58. b.offset = d
  59. }
  60. if b.byHash {
  61. b.hashes = make([]common.Hash, count)
  62. for i := range b.hashes {
  63. b.hashes[i] = rawdb.ReadCanonicalHash(h.chainDb, uint64(b.offset+rand.Int63n(b.randMax)))
  64. }
  65. }
  66. return nil
  67. }
  68. func (b *benchmarkBlockHeaders) request(peer *serverPeer, index int) error {
  69. if b.byHash {
  70. return peer.requestHeadersByHash(0, b.hashes[index], b.amount, b.skip, b.reverse)
  71. }
  72. return peer.requestHeadersByNumber(0, uint64(b.offset+rand.Int63n(b.randMax)), b.amount, b.skip, b.reverse)
  73. }
  74. // benchmarkBodiesOrReceipts implements requestBenchmark
  75. type benchmarkBodiesOrReceipts struct {
  76. receipts bool
  77. hashes []common.Hash
  78. }
  79. func (b *benchmarkBodiesOrReceipts) init(h *serverHandler, count int) error {
  80. randMax := h.blockchain.CurrentHeader().Number.Int64() + 1
  81. b.hashes = make([]common.Hash, count)
  82. for i := range b.hashes {
  83. b.hashes[i] = rawdb.ReadCanonicalHash(h.chainDb, uint64(rand.Int63n(randMax)))
  84. }
  85. return nil
  86. }
  87. func (b *benchmarkBodiesOrReceipts) request(peer *serverPeer, index int) error {
  88. if b.receipts {
  89. return peer.requestReceipts(0, []common.Hash{b.hashes[index]})
  90. }
  91. return peer.requestBodies(0, []common.Hash{b.hashes[index]})
  92. }
  93. // benchmarkProofsOrCode implements requestBenchmark
  94. type benchmarkProofsOrCode struct {
  95. code bool
  96. headHash common.Hash
  97. }
  98. func (b *benchmarkProofsOrCode) init(h *serverHandler, count int) error {
  99. b.headHash = h.blockchain.CurrentHeader().Hash()
  100. return nil
  101. }
  102. func (b *benchmarkProofsOrCode) request(peer *serverPeer, index int) error {
  103. key := make([]byte, 32)
  104. rand.Read(key)
  105. if b.code {
  106. return peer.requestCode(0, []CodeReq{{BHash: b.headHash, AccKey: key}})
  107. }
  108. return peer.requestProofs(0, []ProofReq{{BHash: b.headHash, Key: key}})
  109. }
  110. // benchmarkHelperTrie implements requestBenchmark
  111. type benchmarkHelperTrie struct {
  112. bloom bool
  113. reqCount int
  114. sectionCount, headNum uint64
  115. }
  116. func (b *benchmarkHelperTrie) init(h *serverHandler, count int) error {
  117. if b.bloom {
  118. b.sectionCount, b.headNum, _ = h.server.bloomTrieIndexer.Sections()
  119. } else {
  120. b.sectionCount, _, _ = h.server.chtIndexer.Sections()
  121. b.headNum = b.sectionCount*params.CHTFrequency - 1
  122. }
  123. if b.sectionCount == 0 {
  124. return fmt.Errorf("no processed sections available")
  125. }
  126. return nil
  127. }
  128. func (b *benchmarkHelperTrie) request(peer *serverPeer, index int) error {
  129. reqs := make([]HelperTrieReq, b.reqCount)
  130. if b.bloom {
  131. bitIdx := uint16(rand.Intn(2048))
  132. for i := range reqs {
  133. key := make([]byte, 10)
  134. binary.BigEndian.PutUint16(key[:2], bitIdx)
  135. binary.BigEndian.PutUint64(key[2:], uint64(rand.Int63n(int64(b.sectionCount))))
  136. reqs[i] = HelperTrieReq{Type: htBloomBits, TrieIdx: b.sectionCount - 1, Key: key}
  137. }
  138. } else {
  139. for i := range reqs {
  140. key := make([]byte, 8)
  141. binary.BigEndian.PutUint64(key[:], uint64(rand.Int63n(int64(b.headNum))))
  142. reqs[i] = HelperTrieReq{Type: htCanonical, TrieIdx: b.sectionCount - 1, Key: key, AuxReq: htAuxHeader}
  143. }
  144. }
  145. return peer.requestHelperTrieProofs(0, reqs)
  146. }
  147. // benchmarkTxSend implements requestBenchmark
  148. type benchmarkTxSend struct {
  149. txs types.Transactions
  150. }
  151. func (b *benchmarkTxSend) init(h *serverHandler, count int) error {
  152. key, _ := crypto.GenerateKey()
  153. addr := crypto.PubkeyToAddress(key.PublicKey)
  154. signer := types.LatestSigner(h.server.chainConfig)
  155. b.txs = make(types.Transactions, count)
  156. for i := range b.txs {
  157. data := make([]byte, txSizeCostLimit)
  158. rand.Read(data)
  159. tx, err := types.SignTx(types.NewTransaction(0, addr, new(big.Int), 0, new(big.Int), data), signer, key)
  160. if err != nil {
  161. panic(err)
  162. }
  163. b.txs[i] = tx
  164. }
  165. return nil
  166. }
  167. func (b *benchmarkTxSend) request(peer *serverPeer, index int) error {
  168. enc, _ := rlp.EncodeToBytes(types.Transactions{b.txs[index]})
  169. return peer.sendTxs(0, 1, enc)
  170. }
  171. // benchmarkTxStatus implements requestBenchmark
  172. type benchmarkTxStatus struct{}
  173. func (b *benchmarkTxStatus) init(h *serverHandler, count int) error {
  174. return nil
  175. }
  176. func (b *benchmarkTxStatus) request(peer *serverPeer, index int) error {
  177. var hash common.Hash
  178. rand.Read(hash[:])
  179. return peer.requestTxStatus(0, []common.Hash{hash})
  180. }
  181. // benchmarkSetup stores measurement data for a single benchmark type
  182. type benchmarkSetup struct {
  183. req requestBenchmark
  184. totalCount int
  185. totalTime, avgTime time.Duration
  186. maxInSize, maxOutSize uint32
  187. err error
  188. }
  189. // runBenchmark runs a benchmark cycle for all benchmark types in the specified
  190. // number of passes
  191. func (h *serverHandler) runBenchmark(benchmarks []requestBenchmark, passCount int, targetTime time.Duration) []*benchmarkSetup {
  192. setup := make([]*benchmarkSetup, len(benchmarks))
  193. for i, b := range benchmarks {
  194. setup[i] = &benchmarkSetup{req: b}
  195. }
  196. for i := 0; i < passCount; i++ {
  197. log.Info("Running benchmark", "pass", i+1, "total", passCount)
  198. todo := make([]*benchmarkSetup, len(benchmarks))
  199. copy(todo, setup)
  200. for len(todo) > 0 {
  201. // select a random element
  202. index := rand.Intn(len(todo))
  203. next := todo[index]
  204. todo[index] = todo[len(todo)-1]
  205. todo = todo[:len(todo)-1]
  206. if next.err == nil {
  207. // calculate request count
  208. count := 50
  209. if next.totalTime > 0 {
  210. count = int(uint64(next.totalCount) * uint64(targetTime) / uint64(next.totalTime))
  211. }
  212. if err := h.measure(next, count); err != nil {
  213. next.err = err
  214. }
  215. }
  216. }
  217. }
  218. log.Info("Benchmark completed")
  219. for _, s := range setup {
  220. if s.err == nil {
  221. s.avgTime = s.totalTime / time.Duration(s.totalCount)
  222. }
  223. }
  224. return setup
  225. }
  226. // meteredPipe implements p2p.MsgReadWriter and remembers the largest single
  227. // message size sent through the pipe
  228. type meteredPipe struct {
  229. rw p2p.MsgReadWriter
  230. maxSize uint32
  231. }
  232. func (m *meteredPipe) ReadMsg() (p2p.Msg, error) {
  233. return m.rw.ReadMsg()
  234. }
  235. func (m *meteredPipe) WriteMsg(msg p2p.Msg) error {
  236. if msg.Size > m.maxSize {
  237. m.maxSize = msg.Size
  238. }
  239. return m.rw.WriteMsg(msg)
  240. }
  241. // measure runs a benchmark for a single type in a single pass, with the given
  242. // number of requests
  243. func (h *serverHandler) measure(setup *benchmarkSetup, count int) error {
  244. clientPipe, serverPipe := p2p.MsgPipe()
  245. clientMeteredPipe := &meteredPipe{rw: clientPipe}
  246. serverMeteredPipe := &meteredPipe{rw: serverPipe}
  247. var id enode.ID
  248. rand.Read(id[:])
  249. peer1 := newServerPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "client", nil), clientMeteredPipe)
  250. peer2 := newClientPeer(lpv2, NetworkId, p2p.NewPeer(id, "server", nil), serverMeteredPipe)
  251. peer2.announceType = announceTypeNone
  252. peer2.fcCosts = make(requestCostTable)
  253. c := &requestCosts{}
  254. for code := range requests {
  255. peer2.fcCosts[code] = c
  256. }
  257. peer2.fcParams = flowcontrol.ServerParams{BufLimit: 1, MinRecharge: 1}
  258. peer2.fcClient = flowcontrol.NewClientNode(h.server.fcManager, peer2.fcParams)
  259. defer peer2.fcClient.Disconnect()
  260. if err := setup.req.init(h, count); err != nil {
  261. return err
  262. }
  263. errCh := make(chan error, 10)
  264. start := mclock.Now()
  265. go func() {
  266. for i := 0; i < count; i++ {
  267. if err := setup.req.request(peer1, i); err != nil {
  268. errCh <- err
  269. return
  270. }
  271. }
  272. }()
  273. go func() {
  274. for i := 0; i < count; i++ {
  275. if err := h.handleMsg(peer2, &sync.WaitGroup{}); err != nil {
  276. errCh <- err
  277. return
  278. }
  279. }
  280. }()
  281. go func() {
  282. for i := 0; i < count; i++ {
  283. msg, err := clientPipe.ReadMsg()
  284. if err != nil {
  285. errCh <- err
  286. return
  287. }
  288. var i interface{}
  289. msg.Decode(&i)
  290. }
  291. // at this point we can be sure that the other two
  292. // goroutines finished successfully too
  293. close(errCh)
  294. }()
  295. select {
  296. case err := <-errCh:
  297. if err != nil {
  298. return err
  299. }
  300. case <-h.closeCh:
  301. clientPipe.Close()
  302. serverPipe.Close()
  303. return fmt.Errorf("Benchmark cancelled")
  304. }
  305. setup.totalTime += time.Duration(mclock.Now() - start)
  306. setup.totalCount += count
  307. setup.maxInSize = clientMeteredPipe.maxSize
  308. setup.maxOutSize = serverMeteredPipe.maxSize
  309. clientPipe.Close()
  310. serverPipe.Close()
  311. return nil
  312. }