Source code for helium_py.proto.local

# flake8: noqa
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: service/local.proto
# plugin: python-betterproto
from dataclasses import dataclass
from typing import Dict, List

import betterproto
import grpclib
from betterproto.grpc.grpclib_server import ServiceBase


[docs]@dataclass(eq=False, repr=False) class PubkeyRes(betterproto.Message): address: bytes = betterproto.bytes_field(1)
[docs]@dataclass(eq=False, repr=False) class PubkeyReq(betterproto.Message): pass
[docs]@dataclass(eq=False, repr=False) class SignReq(betterproto.Message): data: bytes = betterproto.bytes_field(1)
[docs]@dataclass(eq=False, repr=False) class SignRes(betterproto.Message): signature: bytes = betterproto.bytes_field(1)
[docs]@dataclass(eq=False, repr=False) class EcdhReq(betterproto.Message): address: bytes = betterproto.bytes_field(1)
[docs]@dataclass(eq=False, repr=False) class EcdhRes(betterproto.Message): secret: bytes = betterproto.bytes_field(1)
[docs]@dataclass(eq=False, repr=False) class ConfigReq(betterproto.Message): keys: List[str] = betterproto.string_field(1)
[docs]@dataclass(eq=False, repr=False) class ConfigRes(betterproto.Message): values: List["ConfigValue"] = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False) class ConfigValue(betterproto.Message): name: str = betterproto.string_field(1) type: str = betterproto.string_field(2) value: bytes = betterproto.bytes_field(3)
[docs]@dataclass(eq=False, repr=False) class KeyedUri(betterproto.Message): address: bytes = betterproto.bytes_field(1) uri: str = betterproto.string_field(2)
[docs]@dataclass(eq=False, repr=False) class HeightReq(betterproto.Message): pass
[docs]@dataclass(eq=False, repr=False) class HeightRes(betterproto.Message): height: int = betterproto.uint64_field(1) block_age: int = betterproto.uint64_field(2) gateway: "KeyedUri" = betterproto.message_field(3)
[docs]class ApiStub(betterproto.ServiceStub): async def pubkey(self) -> "PubkeyRes": request = PubkeyReq() return await self._unary_unary("/helium.local.api/pubkey", request, PubkeyRes) async def sign(self) -> "SignRes": request = SignReq() return await self._unary_unary("/helium.local.api/sign", request, SignRes) async def ecdh(self) -> "EcdhRes": request = EcdhReq() return await self._unary_unary("/helium.local.api/ecdh", request, EcdhRes) async def config(self) -> "ConfigRes": request = ConfigReq() return await self._unary_unary("/helium.local.api/config", request, ConfigRes) async def height(self) -> "HeightRes": request = HeightReq() return await self._unary_unary("/helium.local.api/height", request, HeightRes)
[docs]class ApiBase(betterproto.ServiceStub): async def pubkey(self) -> "PubkeyRes": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) async def sign(self) -> "SignRes": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) async def ecdh(self) -> "EcdhRes": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) async def config(self) -> "ConfigRes": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) async def height(self) -> "HeightRes": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) async def __rpc_pubkey(self, stream: grpclib.server.Stream) -> None: request = await stream.recv_message() request_kwargs = {} response = await self.pubkey(**request_kwargs) await stream.send_message(response) async def __rpc_sign(self, stream: grpclib.server.Stream) -> None: request = await stream.recv_message() request_kwargs = {} response = await self.sign(**request_kwargs) await stream.send_message(response) async def __rpc_ecdh(self, stream: grpclib.server.Stream) -> None: request = await stream.recv_message() request_kwargs = {} response = await self.ecdh(**request_kwargs) await stream.send_message(response) async def __rpc_config(self, stream: grpclib.server.Stream) -> None: request = await stream.recv_message() request_kwargs = {} response = await self.config(**request_kwargs) await stream.send_message(response) async def __rpc_height(self, stream: grpclib.server.Stream) -> None: request = await stream.recv_message() request_kwargs = {} response = await self.height(**request_kwargs) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { "/helium.local.api/pubkey": grpclib.const.Handler( self.__rpc_pubkey, grpclib.const.Cardinality.UNARY_UNARY, PubkeyReq, PubkeyRes, ), "/helium.local.api/sign": grpclib.const.Handler( self.__rpc_sign, grpclib.const.Cardinality.UNARY_UNARY, SignReq, SignRes, ), "/helium.local.api/ecdh": grpclib.const.Handler( self.__rpc_ecdh, grpclib.const.Cardinality.UNARY_UNARY, EcdhReq, EcdhRes, ), "/helium.local.api/config": grpclib.const.Handler( self.__rpc_config, grpclib.const.Cardinality.UNARY_UNARY, ConfigReq, ConfigRes, ), "/helium.local.api/height": grpclib.const.Handler( self.__rpc_height, grpclib.const.Cardinality.UNARY_UNARY, HeightReq, HeightRes, ), }