service.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package server
  17. import (
  18. "net"
  19. "strings"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/les/utils"
  23. "github.com/ethereum/go-ethereum/les/vflux"
  24. "github.com/ethereum/go-ethereum/log"
  25. "github.com/ethereum/go-ethereum/p2p/enode"
  26. "github.com/ethereum/go-ethereum/rlp"
  27. )
  28. type (
  29. // Server serves vflux requests
  30. Server struct {
  31. limiter *utils.Limiter
  32. lock sync.Mutex
  33. services map[string]*serviceEntry
  34. delayPerRequest time.Duration
  35. }
  36. // Service is a service registered at the Server and identified by a string id
  37. Service interface {
  38. Handle(id enode.ID, address string, name string, data []byte) []byte // never called concurrently
  39. }
  40. serviceEntry struct {
  41. id, desc string
  42. backend Service
  43. }
  44. )
  45. // NewServer creates a new Server
  46. func NewServer(delayPerRequest time.Duration) *Server {
  47. return &Server{
  48. limiter: utils.NewLimiter(1000),
  49. delayPerRequest: delayPerRequest,
  50. services: make(map[string]*serviceEntry),
  51. }
  52. }
  53. // Register registers a Service
  54. func (s *Server) Register(b Service, id, desc string) {
  55. srv := &serviceEntry{backend: b, id: id, desc: desc}
  56. if strings.Contains(srv.id, ":") {
  57. // srv.id + ":" will be used as a service database prefix
  58. log.Error("Service ID contains ':'", "id", srv.id)
  59. return
  60. }
  61. s.lock.Lock()
  62. s.services[srv.id] = srv
  63. s.lock.Unlock()
  64. }
  65. // Serve serves a vflux request batch
  66. // Note: requests are served by the Handle functions of the registered services. Serve
  67. // may be called concurrently but the Handle functions are called sequentially and
  68. // therefore thread safety is guaranteed.
  69. func (s *Server) Serve(id enode.ID, address string, requests vflux.Requests) vflux.Replies {
  70. reqLen := uint(len(requests))
  71. if reqLen == 0 || reqLen > vflux.MaxRequestLength {
  72. return nil
  73. }
  74. // Note: the value parameter will be supplied by the token sale module (total amount paid)
  75. ch := <-s.limiter.Add(id, address, 0, reqLen)
  76. if ch == nil {
  77. return nil
  78. }
  79. // Note: the limiter ensures that the following section is not running concurrently,
  80. // the lock only protects against contention caused by new service registration
  81. s.lock.Lock()
  82. results := make(vflux.Replies, len(requests))
  83. for i, req := range requests {
  84. if service := s.services[req.Service]; service != nil {
  85. results[i] = service.backend.Handle(id, address, req.Name, req.Params)
  86. }
  87. }
  88. s.lock.Unlock()
  89. time.Sleep(s.delayPerRequest * time.Duration(reqLen))
  90. close(ch)
  91. return results
  92. }
  93. // ServeEncoded serves an encoded vflux request batch and returns the encoded replies
  94. func (s *Server) ServeEncoded(id enode.ID, addr *net.UDPAddr, req []byte) []byte {
  95. var requests vflux.Requests
  96. if err := rlp.DecodeBytes(req, &requests); err != nil {
  97. return nil
  98. }
  99. results := s.Serve(id, addr.String(), requests)
  100. if results == nil {
  101. return nil
  102. }
  103. res, _ := rlp.EncodeToBytes(&results)
  104. return res
  105. }
  106. // Stop shuts down the server
  107. func (s *Server) Stop() {
  108. s.limiter.Stop()
  109. }