# flake8: noqa
# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: service/local.proto
# plugin: python-betterproto
from dataclasses import dataclass
from typing import Dict, List, Optional

import betterproto
import grpclib
from betterproto.grpc.grpclib_server import ServiceBase
@dataclass(eq=False, repr=False)
class PubkeyRes(betterproto.Message):
    """Response message of the ``/helium.local.api/pubkey`` RPC."""

    # Proto field 1, raw bytes.
    address: bytes = betterproto.bytes_field(1)
@dataclass(eq=False, repr=False)
class PubkeyReq(betterproto.Message):
    """Request message of the ``/helium.local.api/pubkey`` RPC; it has no fields."""
@dataclass(eq=False, repr=False)
class SignReq(betterproto.Message):
    """Request message of the ``/helium.local.api/sign`` RPC."""

    # Proto field 1, raw bytes.
    data: bytes = betterproto.bytes_field(1)
@dataclass(eq=False, repr=False)
class SignRes(betterproto.Message):
    """Response message of the ``/helium.local.api/sign`` RPC."""

    # Proto field 1, raw bytes.
    signature: bytes = betterproto.bytes_field(1)
@dataclass(eq=False, repr=False)
class EcdhReq(betterproto.Message):
    """Request message of the ``/helium.local.api/ecdh`` RPC."""

    # Proto field 1, raw bytes.
    address: bytes = betterproto.bytes_field(1)
@dataclass(eq=False, repr=False)
class EcdhRes(betterproto.Message):
    """Response message of the ``/helium.local.api/ecdh`` RPC."""

    # Proto field 1, raw bytes.
    secret: bytes = betterproto.bytes_field(1)
@dataclass(eq=False, repr=False)
class ConfigReq(betterproto.Message):
    """Request message of the ``/helium.local.api/config`` RPC."""

    # Proto field 1, a list of strings.
    keys: List[str] = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class ConfigRes(betterproto.Message):
    """Response message of the ``/helium.local.api/config`` RPC."""

    # Proto field 1, a list of ``ConfigValue`` messages.
    values: List["ConfigValue"] = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class ConfigValue(betterproto.Message):
    """One entry of ``ConfigRes.values``: a name, a type string and a raw value."""

    # Proto field 1, string.
    name: str = betterproto.string_field(1)
    # Proto field 2, string.  The attribute name shadows the ``type`` builtin
    # inside the class body; it is kept because it is the message's field name.
    type: str = betterproto.string_field(2)
    # Proto field 3, raw bytes.
    value: bytes = betterproto.bytes_field(3)
@dataclass(eq=False, repr=False)
class KeyedUri(betterproto.Message):
    """Message pairing an ``address`` (bytes) with a ``uri`` (string).

    Used as the type of ``HeightRes.gateway``.
    """

    # Proto field 1, raw bytes.
    address: bytes = betterproto.bytes_field(1)
    # Proto field 2, string.
    uri: str = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class HeightReq(betterproto.Message):
    """Request message of the ``/helium.local.api/height`` RPC; it has no fields."""
@dataclass(eq=False, repr=False)
class HeightRes(betterproto.Message):
    """Response message of the ``/helium.local.api/height`` RPC."""

    # Proto field 1, uint64.
    height: int = betterproto.uint64_field(1)
    # Proto field 2, uint64.  The unit is not stated in this module.
    block_age: int = betterproto.uint64_field(2)
    # Proto field 3, nested ``KeyedUri`` message.
    gateway: "KeyedUri" = betterproto.message_field(3)
class ApiStub(betterproto.ServiceStub):
    """Async client stub for the ``helium.local.api`` gRPC service.

    Each method builds the request message for one unary-unary RPC, sends it
    through ``self._unary_unary`` and returns the awaited response message.

    Request fields are accepted as keyword-only arguments.  Every one of them
    defaults to the field's empty value, so calling a method with no
    arguments still sends an empty request message.
    """

    async def pubkey(self) -> "PubkeyRes":
        """Call ``/helium.local.api/pubkey`` with an empty ``PubkeyReq``."""
        request = PubkeyReq()
        return await self._unary_unary(
            "/helium.local.api/pubkey", request, PubkeyRes
        )

    async def sign(self, *, data: bytes = b"") -> "SignRes":
        """Call ``/helium.local.api/sign``.

        Args:
            data: Value for ``SignReq.data``; empty bytes by default.
        """
        request = SignReq()
        request.data = data
        return await self._unary_unary("/helium.local.api/sign", request, SignRes)

    async def ecdh(self, *, address: bytes = b"") -> "EcdhRes":
        """Call ``/helium.local.api/ecdh``.

        Args:
            address: Value for ``EcdhReq.address``; empty bytes by default.
        """
        request = EcdhReq()
        request.address = address
        return await self._unary_unary("/helium.local.api/ecdh", request, EcdhRes)

    async def config(self, *, keys: Optional[List[str]] = None) -> "ConfigRes":
        """Call ``/helium.local.api/config``.

        Args:
            keys: Values for ``ConfigReq.keys``.  ``None`` (the default)
                leaves the field untouched, i.e. the request stays empty.
        """
        request = ConfigReq()
        if keys is not None:
            # Copy so later mutation of the caller's list cannot alter the request.
            request.keys = list(keys)
        return await self._unary_unary(
            "/helium.local.api/config", request, ConfigRes
        )

    async def height(self) -> "HeightRes":
        """Call ``/helium.local.api/height`` with an empty ``HeightReq``."""
        request = HeightReq()
        return await self._unary_unary(
            "/helium.local.api/height", request, HeightRes
        )
class ApiBase(betterproto.ServiceStub):
    """Server-side base for the ``helium.local.api`` gRPC service.

    The public coroutine methods (``pubkey``, ``sign``, ``ecdh``, ``config``,
    ``height``) are the extension points: each default implementation raises
    ``grpclib.GRPCError`` with status ``UNIMPLEMENTED``.  ``__mapping__``
    returns one unary-unary ``grpclib.const.Handler`` per RPC path, each
    bound to a private ``__rpc_*`` coroutine that reads the request from the
    stream, awaits the matching public method and sends its result back.

    NOTE(review): the base class is ``betterproto.ServiceStub`` although the
    module imports ``ServiceBase`` from ``betterproto.grpc.grpclib_server``
    and never uses it.  It is left unchanged here because switching the base
    would change the constructor -- confirm which one is intended.

    NOTE(review): the ``__rpc_*`` handlers receive the request message but
    forward none of its fields; the public methods are always called without
    arguments.  Forwarding fields would change the signature subclasses
    override, so it is not done here.
    """

    async def pubkey(self) -> "PubkeyRes":
        """Handle the ``pubkey`` RPC; raises ``UNIMPLEMENTED`` unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def sign(self) -> "SignRes":
        """Handle the ``sign`` RPC; raises ``UNIMPLEMENTED`` unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def ecdh(self) -> "EcdhRes":
        """Handle the ``ecdh`` RPC; raises ``UNIMPLEMENTED`` unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def config(self) -> "ConfigRes":
        """Handle the ``config`` RPC; raises ``UNIMPLEMENTED`` unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def height(self) -> "HeightRes":
        """Handle the ``height`` RPC; raises ``UNIMPLEMENTED`` unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_pubkey(self, stream: grpclib.server.Stream) -> None:
        """Read one request from ``stream`` and reply with ``self.pubkey()``."""
        # The request is read from the stream but none of its fields are used.
        await stream.recv_message()
        response = await self.pubkey()
        await stream.send_message(response)

    async def __rpc_sign(self, stream: grpclib.server.Stream) -> None:
        """Read one request from ``stream`` and reply with ``self.sign()``."""
        # The request is read from the stream but none of its fields are used.
        await stream.recv_message()
        response = await self.sign()
        await stream.send_message(response)

    async def __rpc_ecdh(self, stream: grpclib.server.Stream) -> None:
        """Read one request from ``stream`` and reply with ``self.ecdh()``."""
        # The request is read from the stream but none of its fields are used.
        await stream.recv_message()
        response = await self.ecdh()
        await stream.send_message(response)

    async def __rpc_config(self, stream: grpclib.server.Stream) -> None:
        """Read one request from ``stream`` and reply with ``self.config()``."""
        # The request is read from the stream but none of its fields are used.
        await stream.recv_message()
        response = await self.config()
        await stream.send_message(response)

    async def __rpc_height(self, stream: grpclib.server.Stream) -> None:
        """Read one request from ``stream`` and reply with ``self.height()``."""
        # The request is read from the stream but none of its fields are used.
        await stream.recv_message()
        response = await self.height()
        await stream.send_message(response)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the RPC path -> ``grpclib.const.Handler`` table for this service.

        Every entry is unary-unary and names the request and response message
        classes for its path.
        """
        return {
            "/helium.local.api/pubkey": grpclib.const.Handler(
                self.__rpc_pubkey,
                grpclib.const.Cardinality.UNARY_UNARY,
                PubkeyReq,
                PubkeyRes,
            ),
            "/helium.local.api/sign": grpclib.const.Handler(
                self.__rpc_sign,
                grpclib.const.Cardinality.UNARY_UNARY,
                SignReq,
                SignRes,
            ),
            "/helium.local.api/ecdh": grpclib.const.Handler(
                self.__rpc_ecdh,
                grpclib.const.Cardinality.UNARY_UNARY,
                EcdhReq,
                EcdhRes,
            ),
            "/helium.local.api/config": grpclib.const.Handler(
                self.__rpc_config,
                grpclib.const.Cardinality.UNARY_UNARY,
                ConfigReq,
                ConfigRes,
            ),
            "/helium.local.api/height": grpclib.const.Handler(
                self.__rpc_height,
                grpclib.const.Cardinality.UNARY_UNARY,
                HeightReq,
                HeightRes,
            ),
        }