123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- // This file contains some shares testing functionality, common to multiple
- // different files and modules being tested. Client based network and Server
- // based network can be created easily with available APIs.
- package les
- import (
- "context"
- "crypto/rand"
- "fmt"
- "math/big"
- "sync/atomic"
- "testing"
- "time"
- "github.com/ethereum/go-ethereum/accounts/abi/bind"
- "github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/mclock"
- "github.com/ethereum/go-ethereum/consensus/ethash"
- "github.com/ethereum/go-ethereum/contracts/checkpointoracle/contract"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/forkid"
- "github.com/ethereum/go-ethereum/core/rawdb"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/eth/ethconfig"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/les/checkpointoracle"
- "github.com/ethereum/go-ethereum/les/flowcontrol"
- vfs "github.com/ethereum/go-ethereum/les/vflux/server"
- "github.com/ethereum/go-ethereum/light"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/enode"
- "github.com/ethereum/go-ethereum/params"
- )
- var (
- bankKey, _ = crypto.GenerateKey()
- bankAddr = crypto.PubkeyToAddress(bankKey.PublicKey)
- bankFunds = big.NewInt(1000000000000000000)
- userKey1, _ = crypto.GenerateKey()
- userKey2, _ = crypto.GenerateKey()
- userAddr1 = crypto.PubkeyToAddress(userKey1.PublicKey)
- userAddr2 = crypto.PubkeyToAddress(userKey2.PublicKey)
- testContractAddr common.Address
- testContractCode = common.Hex2Bytes("606060405260cc8060106000396000f360606040526000357c01000000000000000000000000000000000000000000000000000000009004806360cd2685146041578063c16431b914606b57603f565b005b6055600480803590602001909190505060a9565b6040518082815260200191505060405180910390f35b60886004808035906020019091908035906020019091905050608a565b005b80600060005083606481101560025790900160005b50819055505b5050565b6000600060005082606481101560025790900160005b5054905060c7565b91905056")
- testContractCodeDeployed = testContractCode[16:]
- testContractDeployed = uint64(2)
- testEventEmitterCode = common.Hex2Bytes("60606040523415600e57600080fd5b7f57050ab73f6b9ebdd9f76b8d4997793f48cf956e965ee070551b9ca0bb71584e60405160405180910390a160358060476000396000f3006060604052600080fd00a165627a7a723058203f727efcad8b5811f8cb1fc2620ce5e8c63570d697aef968172de296ea3994140029")
- // Checkpoint oracle relative fields
- oracleAddr common.Address
- signerKey, _ = crypto.GenerateKey()
- signerAddr = crypto.PubkeyToAddress(signerKey.PublicKey)
- )
- var (
- // The block frequency for creating checkpoint(only used in test)
- sectionSize = big.NewInt(128)
- // The number of confirmations needed to generate a checkpoint(only used in test).
- processConfirms = big.NewInt(1)
- // The token bucket buffer limit for testing purpose.
- testBufLimit = uint64(1000000)
- // The buffer recharging speed for testing purpose.
- testBufRecharge = uint64(1000)
- )
- /*
- contract test {
- uint256[100] data;
- function Put(uint256 addr, uint256 value) {
- data[addr] = value;
- }
- function Get(uint256 addr) constant returns (uint256 value) {
- return data[addr];
- }
- }
- */
- // prepare pre-commits specified number customized blocks into chain.
- func prepare(n int, backend *backends.SimulatedBackend) {
- var (
- ctx = context.Background()
- signer = types.HomesteadSigner{}
- )
- for i := 0; i < n; i++ {
- switch i {
- case 0:
- // Builtin-block
- // number: 1
- // txs: 2
- // deploy checkpoint contract
- auth, _ := bind.NewKeyedTransactorWithChainID(bankKey, big.NewInt(1337))
- oracleAddr, _, _, _ = contract.DeployCheckpointOracle(auth, backend, []common.Address{signerAddr}, sectionSize, processConfirms, big.NewInt(1))
- // bankUser transfers some ether to user1
- nonce, _ := backend.PendingNonceAt(ctx, bankAddr)
- tx, _ := types.SignTx(types.NewTransaction(nonce, userAddr1, big.NewInt(10000), params.TxGas, nil, nil), signer, bankKey)
- backend.SendTransaction(ctx, tx, bind.PrivateTxArgs{})
- case 1:
- // Builtin-block
- // number: 2
- // txs: 4
- bankNonce, _ := backend.PendingNonceAt(ctx, bankAddr)
- userNonce1, _ := backend.PendingNonceAt(ctx, userAddr1)
- // bankUser transfers more ether to user1
- tx1, _ := types.SignTx(types.NewTransaction(bankNonce, userAddr1, big.NewInt(1000), params.TxGas, nil, nil), signer, bankKey)
- backend.SendTransaction(ctx, tx1, bind.PrivateTxArgs{})
- // user1 relays ether to user2
- tx2, _ := types.SignTx(types.NewTransaction(userNonce1, userAddr2, big.NewInt(1000), params.TxGas, nil, nil), signer, userKey1)
- backend.SendTransaction(ctx, tx2, bind.PrivateTxArgs{})
- // user1 deploys a test contract
- tx3, _ := types.SignTx(types.NewContractCreation(userNonce1+1, big.NewInt(0), 200000, big.NewInt(0), testContractCode), signer, userKey1)
- backend.SendTransaction(ctx, tx3, bind.PrivateTxArgs{})
- testContractAddr = crypto.CreateAddress(userAddr1, userNonce1+1)
- // user1 deploys a event contract
- tx4, _ := types.SignTx(types.NewContractCreation(userNonce1+2, big.NewInt(0), 200000, big.NewInt(0), testEventEmitterCode), signer, userKey1)
- backend.SendTransaction(ctx, tx4, bind.PrivateTxArgs{})
- case 2:
- // Builtin-block
- // number: 3
- // txs: 2
- // bankUser transfer some ether to signer
- bankNonce, _ := backend.PendingNonceAt(ctx, bankAddr)
- tx1, _ := types.SignTx(types.NewTransaction(bankNonce, signerAddr, big.NewInt(1000000000), params.TxGas, nil, nil), signer, bankKey)
- backend.SendTransaction(ctx, tx1, bind.PrivateTxArgs{})
- // invoke test contract
- data := common.Hex2Bytes("C16431B900000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001")
- tx2, _ := types.SignTx(types.NewTransaction(bankNonce+1, testContractAddr, big.NewInt(0), 100000, nil, data), signer, bankKey)
- backend.SendTransaction(ctx, tx2, bind.PrivateTxArgs{})
- case 3:
- // Builtin-block
- // number: 4
- // txs: 1
- // invoke test contract
- bankNonce, _ := backend.PendingNonceAt(ctx, bankAddr)
- data := common.Hex2Bytes("C16431B900000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000002")
- tx, _ := types.SignTx(types.NewTransaction(bankNonce, testContractAddr, big.NewInt(0), 100000, nil, data), signer, bankKey)
- backend.SendTransaction(ctx, tx, bind.PrivateTxArgs{})
- }
- backend.Commit()
- }
- }
- // testIndexers creates a set of indexers with specified params for testing purpose.
- func testIndexers(db ethdb.Database, odr light.OdrBackend, config *light.IndexerConfig, disablePruning bool) []*core.ChainIndexer {
- var indexers [3]*core.ChainIndexer
- indexers[0] = light.NewChtIndexer(db, odr, config.ChtSize, config.ChtConfirms, disablePruning)
- indexers[1] = core.NewBloomIndexer(db, config.BloomSize, config.BloomConfirms)
- indexers[2] = light.NewBloomTrieIndexer(db, odr, config.BloomSize, config.BloomTrieSize, disablePruning)
- // make bloomTrieIndexer as a child indexer of bloom indexer.
- indexers[1].AddChildIndexer(indexers[2])
- return indexers[:]
- }
- func newTestClientHandler(backend *backends.SimulatedBackend, odr *LesOdr, indexers []*core.ChainIndexer, db ethdb.Database, peers *serverPeerSet, ulcServers []string, ulcFraction int) (*clientHandler, func()) {
- var (
- evmux = new(event.TypeMux)
- engine = ethash.NewFaker()
- gspec = core.Genesis{
- Config: params.AllEthashProtocolChanges,
- Alloc: core.GenesisAlloc{bankAddr: {Balance: bankFunds}},
- GasLimit: 100000000,
- }
- oracle *checkpointoracle.CheckpointOracle
- )
- genesis := gspec.MustCommit(db)
- chain, _ := light.NewLightChain(odr, gspec.Config, engine, nil)
- if indexers != nil {
- checkpointConfig := ¶ms.CheckpointOracleConfig{
- Address: crypto.CreateAddress(bankAddr, 0),
- Signers: []common.Address{signerAddr},
- Threshold: 1,
- }
- getLocal := func(index uint64) params.TrustedCheckpoint {
- chtIndexer := indexers[0]
- sectionHead := chtIndexer.SectionHead(index)
- return params.TrustedCheckpoint{
- SectionIndex: index,
- SectionHead: sectionHead,
- CHTRoot: light.GetChtRoot(db, index, sectionHead),
- BloomRoot: light.GetBloomTrieRoot(db, index, sectionHead),
- }
- }
- oracle = checkpointoracle.New(checkpointConfig, getLocal)
- }
- client := &LightEthereum{
- lesCommons: lesCommons{
- genesis: genesis.Hash(),
- config: ðconfig.Config{LightPeers: 100, NetworkId: NetworkId},
- chainConfig: params.AllEthashProtocolChanges,
- iConfig: light.TestClientIndexerConfig,
- chainDb: db,
- oracle: oracle,
- chainReader: chain,
- closeCh: make(chan struct{}),
- },
- peers: peers,
- reqDist: odr.retriever.dist,
- retriever: odr.retriever,
- odr: odr,
- engine: engine,
- blockchain: chain,
- eventMux: evmux,
- }
- client.handler = newClientHandler(ulcServers, ulcFraction, nil, client)
- if client.oracle != nil {
- client.oracle.Start(backend)
- }
- client.handler.start()
- return client.handler, func() {
- client.handler.stop()
- }
- }
- func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Database, clock mclock.Clock) (*serverHandler, *backends.SimulatedBackend, func()) {
- var (
- gspec = core.Genesis{
- Config: params.AllEthashProtocolChanges,
- Alloc: core.GenesisAlloc{bankAddr: {Balance: bankFunds}},
- GasLimit: 100000000,
- }
- oracle *checkpointoracle.CheckpointOracle
- )
- genesis := gspec.MustCommit(db)
- // create a simulation backend and pre-commit several customized block to the database.
- simulation := backends.NewSimulatedBackendWithDatabase(db, gspec.Alloc, 100000000)
- prepare(blocks, simulation)
- txpoolConfig := core.DefaultTxPoolConfig
- txpoolConfig.Journal = ""
- txpool := core.NewTxPool(txpoolConfig, gspec.Config, simulation.Blockchain())
- if indexers != nil {
- checkpointConfig := ¶ms.CheckpointOracleConfig{
- Address: crypto.CreateAddress(bankAddr, 0),
- Signers: []common.Address{signerAddr},
- Threshold: 1,
- }
- getLocal := func(index uint64) params.TrustedCheckpoint {
- chtIndexer := indexers[0]
- sectionHead := chtIndexer.SectionHead(index)
- return params.TrustedCheckpoint{
- SectionIndex: index,
- SectionHead: sectionHead,
- CHTRoot: light.GetChtRoot(db, index, sectionHead),
- BloomRoot: light.GetBloomTrieRoot(db, index, sectionHead),
- }
- }
- oracle = checkpointoracle.New(checkpointConfig, getLocal)
- }
- server := &LesServer{
- lesCommons: lesCommons{
- genesis: genesis.Hash(),
- config: ðconfig.Config{LightPeers: 100, NetworkId: NetworkId},
- chainConfig: params.AllEthashProtocolChanges,
- iConfig: light.TestServerIndexerConfig,
- chainDb: db,
- chainReader: simulation.Blockchain(),
- oracle: oracle,
- closeCh: make(chan struct{}),
- },
- peers: newClientPeerSet(),
- servingQueue: newServingQueue(int64(time.Millisecond*10), 1),
- defParams: flowcontrol.ServerParams{
- BufLimit: testBufLimit,
- MinRecharge: testBufRecharge,
- },
- fcManager: flowcontrol.NewClientManager(nil, clock),
- }
- server.costTracker, server.minCapacity = newCostTracker(db, server.config)
- server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism.
- server.clientPool = vfs.NewClientPool(db, testBufRecharge, defaultConnectedBias, clock, alwaysTrueFn)
- server.clientPool.Start()
- server.clientPool.SetLimits(10000, 10000) // Assign enough capacity for clientpool
- server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true })
- if server.oracle != nil {
- server.oracle.Start(simulation)
- }
- server.servingQueue.setThreads(4)
- server.handler.start()
- closer := func() { server.Stop() }
- return server.handler, simulation, closer
- }
- func alwaysTrueFn() bool {
- return true
- }
- // testPeer is a simulated peer to allow testing direct network calls.
- type testPeer struct {
- cpeer *clientPeer
- speer *serverPeer
- net p2p.MsgReadWriter // Network layer reader/writer to simulate remote messaging
- app *p2p.MsgPipeRW // Application layer reader/writer to simulate the local side
- }
- // handshakeWithServer executes the handshake with the remote server peer.
- func (p *testPeer) handshakeWithServer(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, forkID forkid.ID) {
- // It only works for the simulated client peer
- if p.cpeer == nil {
- t.Fatal("handshake for client peer only")
- }
- var sendList keyValueList
- sendList = sendList.add("protocolVersion", uint64(p.cpeer.version))
- sendList = sendList.add("networkId", uint64(NetworkId))
- sendList = sendList.add("headTd", td)
- sendList = sendList.add("headHash", head)
- sendList = sendList.add("headNum", headNum)
- sendList = sendList.add("genesisHash", genesis)
- if p.cpeer.version >= lpv4 {
- sendList = sendList.add("forkID", &forkID)
- }
- if err := p2p.ExpectMsg(p.app, StatusMsg, nil); err != nil {
- t.Fatalf("status recv: %v", err)
- }
- if err := p2p.Send(p.app, StatusMsg, sendList); err != nil {
- t.Fatalf("status send: %v", err)
- }
- }
- // handshakeWithClient executes the handshake with the remote client peer.
- func (p *testPeer) handshakeWithClient(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, forkID forkid.ID, costList RequestCostList, recentTxLookup uint64) {
- // It only works for the simulated client peer
- if p.speer == nil {
- t.Fatal("handshake for server peer only")
- }
- var sendList keyValueList
- sendList = sendList.add("protocolVersion", uint64(p.speer.version))
- sendList = sendList.add("networkId", uint64(NetworkId))
- sendList = sendList.add("headTd", td)
- sendList = sendList.add("headHash", head)
- sendList = sendList.add("headNum", headNum)
- sendList = sendList.add("genesisHash", genesis)
- sendList = sendList.add("serveHeaders", nil)
- sendList = sendList.add("serveChainSince", uint64(0))
- sendList = sendList.add("serveStateSince", uint64(0))
- sendList = sendList.add("serveRecentState", uint64(core.TriesInMemory-4))
- sendList = sendList.add("txRelay", nil)
- sendList = sendList.add("flowControl/BL", testBufLimit)
- sendList = sendList.add("flowControl/MRR", testBufRecharge)
- sendList = sendList.add("flowControl/MRC", costList)
- if p.speer.version >= lpv4 {
- sendList = sendList.add("forkID", &forkID)
- sendList = sendList.add("recentTxLookup", recentTxLookup)
- }
- if err := p2p.ExpectMsg(p.app, StatusMsg, nil); err != nil {
- t.Fatalf("status recv: %v", err)
- }
- if err := p2p.Send(p.app, StatusMsg, sendList); err != nil {
- t.Fatalf("status send: %v", err)
- }
- }
- // close terminates the local side of the peer, notifying the remote protocol
- // manager of termination.
- func (p *testPeer) close() {
- p.app.Close()
- }
- func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler) (*testPeer, *testPeer, error) {
- // Create a message pipe to communicate through
- app, net := p2p.MsgPipe()
- // Generate a random id and create the peer
- var id enode.ID
- rand.Read(id[:])
- peer1 := newClientPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net)
- peer2 := newServerPeer(version, NetworkId, false, p2p.NewPeer(id, name, nil), app)
- // Start the peer on a new thread
- errc1 := make(chan error, 1)
- errc2 := make(chan error, 1)
- go func() {
- select {
- case <-server.closeCh:
- errc1 <- p2p.DiscQuitting
- case errc1 <- server.handle(peer1):
- }
- }()
- go func() {
- select {
- case <-client.closeCh:
- errc2 <- p2p.DiscQuitting
- case errc2 <- client.handle(peer2):
- }
- }()
- // Ensure the connection is established or exits when any error occurs
- for {
- select {
- case err := <-errc1:
- return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err)
- case err := <-errc2:
- return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err)
- default:
- }
- if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 {
- break
- }
- time.Sleep(50 * time.Millisecond)
- }
- return &testPeer{cpeer: peer1, net: net, app: app}, &testPeer{speer: peer2, net: app, app: net}, nil
- }
- type indexerCallback func(*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer)
- // testClient represents a client object for testing with necessary auxiliary fields.
- type testClient struct {
- clock mclock.Clock
- db ethdb.Database
- peer *testPeer
- handler *clientHandler
- chtIndexer *core.ChainIndexer
- bloomIndexer *core.ChainIndexer
- bloomTrieIndexer *core.ChainIndexer
- }
- // newRawPeer creates a new server peer connects to the server and do the handshake.
- func (client *testClient) newRawPeer(t *testing.T, name string, version int, recentTxLookup uint64) (*testPeer, func(), <-chan error) {
- // Create a message pipe to communicate through
- app, net := p2p.MsgPipe()
- // Generate a random id and create the peer
- var id enode.ID
- rand.Read(id[:])
- peer := newServerPeer(version, NetworkId, false, p2p.NewPeer(id, name, nil), net)
- // Start the peer on a new thread
- errCh := make(chan error, 1)
- go func() {
- select {
- case <-client.handler.closeCh:
- errCh <- p2p.DiscQuitting
- case errCh <- client.handler.handle(peer):
- }
- }()
- tp := &testPeer{
- app: app,
- net: net,
- speer: peer,
- }
- var (
- genesis = client.handler.backend.blockchain.Genesis()
- head = client.handler.backend.blockchain.CurrentHeader()
- td = client.handler.backend.blockchain.GetTd(head.Hash(), head.Number.Uint64())
- )
- forkID := forkid.NewID(client.handler.backend.blockchain.Config(), genesis.Hash(), head.Number.Uint64())
- tp.handshakeWithClient(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), forkID, testCostList(0), recentTxLookup) // disable flow control by default
- // Ensure the connection is established or exits when any error occurs
- for {
- select {
- case <-errCh:
- return nil, nil, nil
- default:
- }
- if atomic.LoadUint32(&peer.serving) == 1 {
- break
- }
- time.Sleep(50 * time.Millisecond)
- }
- closePeer := func() {
- tp.speer.close()
- tp.close()
- }
- return tp, closePeer, errCh
- }
- // testServer represents a server object for testing with necessary auxiliary fields.
- type testServer struct {
- clock mclock.Clock
- backend *backends.SimulatedBackend
- db ethdb.Database
- peer *testPeer
- handler *serverHandler
- chtIndexer *core.ChainIndexer
- bloomIndexer *core.ChainIndexer
- bloomTrieIndexer *core.ChainIndexer
- }
- // newRawPeer creates a new client peer connects to the server and do the handshake.
- func (server *testServer) newRawPeer(t *testing.T, name string, version int) (*testPeer, func(), <-chan error) {
- // Create a message pipe to communicate through
- app, net := p2p.MsgPipe()
- // Generate a random id and create the peer
- var id enode.ID
- rand.Read(id[:])
- peer := newClientPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net)
- // Start the peer on a new thread
- errCh := make(chan error, 1)
- go func() {
- select {
- case <-server.handler.closeCh:
- errCh <- p2p.DiscQuitting
- case errCh <- server.handler.handle(peer):
- }
- }()
- tp := &testPeer{
- app: app,
- net: net,
- cpeer: peer,
- }
- var (
- genesis = server.handler.blockchain.Genesis()
- head = server.handler.blockchain.CurrentHeader()
- td = server.handler.blockchain.GetTd(head.Hash(), head.Number.Uint64())
- )
- forkID := forkid.NewID(server.handler.blockchain.Config(), genesis.Hash(), head.Number.Uint64())
- tp.handshakeWithServer(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), forkID)
- // Ensure the connection is established or exits when any error occurs
- for {
- select {
- case <-errCh:
- return nil, nil, nil
- default:
- }
- if atomic.LoadUint32(&peer.serving) == 1 {
- break
- }
- time.Sleep(50 * time.Millisecond)
- }
- closePeer := func() {
- tp.cpeer.close()
- tp.close()
- }
- return tp, closePeer, errCh
- }
- // testnetConfig wraps all the configurations for testing network.
- type testnetConfig struct {
- blocks int
- protocol int
- indexFn indexerCallback
- ulcServers []string
- ulcFraction int
- simClock bool
- connect bool
- nopruning bool
- }
- func newClientServerEnv(t *testing.T, config testnetConfig) (*testServer, *testClient, func()) {
- var (
- sdb = rawdb.NewMemoryDatabase()
- cdb = rawdb.NewMemoryDatabase()
- speers = newServerPeerSet()
- )
- var clock mclock.Clock = &mclock.System{}
- if config.simClock {
- clock = &mclock.Simulated{}
- }
- dist := newRequestDistributor(speers, clock)
- rm := newRetrieveManager(speers, dist, func() time.Duration { return time.Millisecond * 500 })
- odr := NewLesOdr(cdb, light.TestClientIndexerConfig, speers, rm)
- sindexers := testIndexers(sdb, nil, light.TestServerIndexerConfig, true)
- cIndexers := testIndexers(cdb, odr, light.TestClientIndexerConfig, config.nopruning)
- scIndexer, sbIndexer, sbtIndexer := sindexers[0], sindexers[1], sindexers[2]
- ccIndexer, cbIndexer, cbtIndexer := cIndexers[0], cIndexers[1], cIndexers[2]
- odr.SetIndexers(ccIndexer, cbIndexer, cbtIndexer)
- server, b, serverClose := newTestServerHandler(config.blocks, sindexers, sdb, clock)
- client, clientClose := newTestClientHandler(b, odr, cIndexers, cdb, speers, config.ulcServers, config.ulcFraction)
- scIndexer.Start(server.blockchain)
- sbIndexer.Start(server.blockchain)
- ccIndexer.Start(client.backend.blockchain)
- cbIndexer.Start(client.backend.blockchain)
- if config.indexFn != nil {
- config.indexFn(scIndexer, sbIndexer, sbtIndexer)
- }
- var (
- err error
- speer, cpeer *testPeer
- )
- if config.connect {
- done := make(chan struct{})
- client.syncEnd = func(_ *types.Header) { close(done) }
- cpeer, speer, err = newTestPeerPair("peer", config.protocol, server, client)
- if err != nil {
- t.Fatalf("Failed to connect testing peers %v", err)
- }
- select {
- case <-done:
- case <-time.After(10 * time.Second):
- t.Fatal("test peer did not connect and sync within 3s")
- }
- }
- s := &testServer{
- clock: clock,
- backend: b,
- db: sdb,
- peer: cpeer,
- handler: server,
- chtIndexer: scIndexer,
- bloomIndexer: sbIndexer,
- bloomTrieIndexer: sbtIndexer,
- }
- c := &testClient{
- clock: clock,
- db: cdb,
- peer: speer,
- handler: client,
- chtIndexer: ccIndexer,
- bloomIndexer: cbIndexer,
- bloomTrieIndexer: cbtIndexer,
- }
- teardown := func() {
- if config.connect {
- speer.close()
- cpeer.close()
- cpeer.cpeer.close()
- speer.speer.close()
- }
- ccIndexer.Close()
- cbIndexer.Close()
- scIndexer.Close()
- sbIndexer.Close()
- dist.close()
- serverClose()
- b.Close()
- clientClose()
- }
- return s, c, teardown
- }
- // NewFuzzerPeer creates a client peer for test purposes, and also returns
- // a function to close the peer: this is needed to avoid goroutine leaks in the
- // exec queue.
- func NewFuzzerPeer(version int) (p *clientPeer, closer func()) {
- p = newClientPeer(version, 0, p2p.NewPeer(enode.ID{}, "", nil), nil)
- return p, func() { p.peerCommons.close() }
- }
|