apibara.protocol
This module contains the StreamService class, which is used to connect to data streams directly.
StreamService
Creates a StreamService using the provided gRPC async channel. The SDK supports both secure and insecure channels:
from grpc import ssl_channel_credentials
from grpc.aio import insecure_channel, secure_channel

channel = secure_channel("mainnet.starknet.a5a.ch", ssl_channel_credentials())
# or
channel = insecure_channel("localhost:7171")

(client, stream) = StreamService(channel).stream_data()
class StreamService:
    """An Apibara Stream service.

    Arguments
    ---------
    channel: grpc.aio.Channel
        the grpc channel
    """

    def __init__(self, channel: Channel) -> None:
        ...

    def stream_data(self) -> Tuple[StreamClient, StreamIter]:
        ...
StreamClient
The StreamClient is used to configure the data stream. Call its configure method to update the current stream configuration.
(client, stream) = StreamService(channel).stream_data()
await client.configure(filter=my_filter)
class StreamClient:
    async def configure(
        self,
        *,
        filter: Optional[bytes] = None,
        batch_size: Optional[int] = None,
        finality: Optional[DataFinality.ValueType] = None,
        cursor: Optional[Cursor] = None
    ):
        ...
StreamIter
StreamIter is used to iterate over the messages in the stream.
(client, stream) = StreamService(channel).stream_data()
async for message in stream:
    if message.data is not None:
        # data message
        pass
    elif message.invalidate is not None:
        # invalidate message
        pass
class StreamIter:
    async def __anext__(self):
        ...