subscriptions.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package extension
  2. import (
  3. "github.com/ethereum/go-ethereum"
  4. "github.com/ethereum/go-ethereum/core/types"
  5. "github.com/ethereum/go-ethereum/ethclient"
  6. "github.com/ethereum/go-ethereum/log"
  7. "github.com/ethereum/go-ethereum/node"
  8. "github.com/ethereum/go-ethereum/private"
  9. )
  10. type subscriptionHandler struct {
  11. facade ManagementContractFacade
  12. client Client
  13. service *PrivacyService
  14. }
  15. func NewSubscriptionHandler(node *node.Node, psi types.PrivateStateIdentifier, ptm private.PrivateTransactionManager, service *PrivacyService) *subscriptionHandler {
  16. rpcClient, err := node.AttachWithPSI(psi)
  17. if err != nil {
  18. panic("extension: could not connect to ethereum client rpc")
  19. }
  20. client := ethclient.NewClientWithPTM(rpcClient, ptm)
  21. return &subscriptionHandler{
  22. facade: NewManagementContractFacade(client),
  23. client: NewInProcessClient(client),
  24. service: service,
  25. }
  26. }
  27. func (handler *subscriptionHandler) createSub(query ethereum.FilterQuery, logHandlerCb func(types.Log)) error {
  28. incomingLogs, subscription, err := handler.client.SubscribeToLogs(query)
  29. if err != nil {
  30. return err
  31. }
  32. go func() {
  33. stopChan, stopSubscription := handler.service.subscribeStopEvent()
  34. defer stopSubscription.Unsubscribe()
  35. for {
  36. select {
  37. case err := <-subscription.Err():
  38. log.Error("Contract extension watcher subscription error", "error", err)
  39. break
  40. case foundLog := <-incomingLogs:
  41. logHandlerCb(foundLog)
  42. case <-stopChan:
  43. return
  44. }
  45. }
  46. }()
  47. return nil
  48. }