@apibara/protocol
This package contains the client used to stream data from Apibara.
Installation
Install the package from npm:
npm add @apibara/protocol@0.3
Example
This example shows how to connect to the StarkNet Goerli stream and print all messages received. To learn how to parse the StarkNet-specific data, see the @apibara/starknet package documentation.
    import { NodeClient, credentials } from '@apibara/protocol'

    async function main() {
      const node = new NodeClient('goerli.starknet.stream.apibara.com:443', credentials.createSsl())
      const messages = node.streamMessages({})

      // return a promise that resolves only when the
      // stream finishes or it receives an error.
      return new Promise((resolve, reject) => {
        messages.on('end', resolve)
        messages.on('error', reject)
        messages.on('data', (data) => {
          // handle message from server.
          console.log(data)
        })
      })
    }
NodeClient
The NodeClient connects to the Apibara gRPC server and streams data from it.
constructor
    constructor(
      address: string,
      credentials: ChannelCredentials,
      options?: ClientOptions
    )

Creates a new NodeClient.
Parameters
- address: the stream address, for example goerli.starknet.stream.apibara.com:443.
- credentials: credentials used to connect to the server. Usually credentials.createSsl().
- options: gRPC client options. See @grpc/grpc-js for a list of options.
streamMessages
    public streamMessages(
      { startingSequence }: StreamMessagesRequest,
      options?: StreamMessagesOptions
    ): ClientReadableStream<StreamMessagesResponse__Output>
Start streaming messages from Apibara.
Parameters
- startingSequence: the sequence number of the first block to stream. Use this to resume streaming after disconnect.
- options: stream options:
  - reconnect: when set to true, the stream will automatically try to reconnect on error.
  - onRetry: callback to control the stream reconnection behavior.
Reconnecting on error
The streamMessages method can, optionally, reconnect to the stream on error.
This behavior is useful if you're working with a less reliable connection where
disconnection can happen frequently.
To opt in to reconnecting, specify the reconnect option.
    const messages = node.streamMessages(
      { startingSequence: 123_000 },
      { reconnect: true, onRetry: myOnRetry }
    )
The onRetry argument controls how the stream reconnects on error.
For example, the following function stops reconnecting after 3 retries, waits 5 seconds between retries, and restarts from the specified sequence number.
    function myOnRetry(retryCount: number) {
      const retry = retryCount < 3
      return { retry, delay: 5, startingSequence: 123_000 }
    }
The default retry behavior is to retry up to 3 times, with a delay between retries of 5 seconds and starting from the sequence number after the last received message.
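As a variation on the fixed 5-second delay above, the callback can also grow the delay after each failed attempt. The following is a minimal sketch, not part of the package: `makeBackoffOnRetry` and `RetryDecision` are hypothetical names, the returned object mirrors the shape used by `myOnRetry`, and it assumes `retryCount` starts at 0 and `delay` is expressed in seconds.

```typescript
// Shape returned from onRetry, mirroring the myOnRetry example above.
// (Hypothetical local type; the package may export its own.)
interface RetryDecision {
  retry: boolean
  delay: number // seconds (assumed unit, as in the text above)
  startingSequence?: number
}

// Hypothetical helper: builds an onRetry callback with exponential backoff.
// The delay doubles on every retry and is capped at maxDelaySeconds.
function makeBackoffOnRetry(
  maxRetries: number,
  baseDelaySeconds: number,
  maxDelaySeconds: number,
  getStartingSequence: () => number | undefined
): (retryCount: number) => RetryDecision {
  return (retryCount: number) => {
    // Assumes retryCount is 0 on the first reconnection attempt.
    const retry = retryCount < maxRetries
    const delay = Math.min(baseDelaySeconds * Math.pow(2, retryCount), maxDelaySeconds)
    return { retry, delay, startingSequence: getStartingSequence() }
  }
}

// Up to 5 retries, starting at 1 second and capped at 10 seconds.
const onRetry = makeBackoffOnRetry(5, 1, 10, () => 123_000)
```

The resulting `onRetry` could then be passed in the stream options in place of `myOnRetry`. Capping the delay keeps a long outage from pushing the wait time arbitrarily high.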
The following example shows how to handle reconnection in a real-world indexer.
    import { NodeClient, credentials, proto } from '@apibara/protocol'

    class AppIndexer {
      // protobuf encodes possibly-large numbers as strings
      private currentSequence?: string

      handleData(data: proto.Data__Output) {
        console.log(`[data] sequence=${data.sequence}`)
        // track sequence number for reconnecting later
        this.currentSequence = data.sequence
      }

      handleInvalidate(invalidate: proto.Invalidate__Output) {
        console.log(`[invalidate] sequence=${invalidate.sequence}`)
        this.currentSequence = invalidate.sequence
      }

      onRetry(retryCount: number) {
        // retry connecting up to 3 times, with a delay of 5 seconds in between
        // retries.
        // Start from the sequence number _following_ the last received message.
        const retry = retryCount < 3
        const startingSequence = this.currentSequence ? +this.currentSequence + 1 : undefined
        console.log(`[retry] retry=${retry ? 'yes' : 'no'}, startingSequence=${startingSequence}`)
        return { retry, delay: 5, startingSequence }
      }
    }

    async function main() {
      const indexer = new AppIndexer()
      const node = new NodeClient('goerli.starknet.stream.apibara.com:443', credentials.createSsl())

      const messages = node.streamMessages(
        { startingSequence: 100_000 },
        {
          reconnect: true,
          // bind the callback so that `this` inside onRetry refers to the indexer
          onRetry: indexer.onRetry.bind(indexer),
        }
      )

      return new Promise((resolve, reject) => {
        messages.on('end', resolve)
        messages.on('error', reject)
        // assumes the response type is exported under `proto`, like Data__Output
        messages.on('data', (msg: proto.StreamMessagesResponse__Output) => {
          if (msg.data) {
            indexer.handleData(msg.data)
          } else if (msg.invalidate) {
            indexer.handleInvalidate(msg.invalidate)
          }
        })
      })
    }
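The indexer above converts the string sequence to a JavaScript number before adding 1, which silently loses precision once the value exceeds `Number.MAX_SAFE_INTEGER`. A minimal sketch of a precision-safe alternative follows. `nextSequence` is a hypothetical helper, not part of the package, and it assumes the request accepts the sequence as a decimal string, which is not confirmed here.

```typescript
// Hypothetical helper: returns the sequence number following `current`
// as a decimal string, or undefined if no message has been received yet.
function nextSequence(current?: string): string | undefined {
  if (current === undefined || current === '') {
    return undefined
  }
  // BigInt arithmetic stays exact for integers beyond 2^53 - 1.
  return (BigInt(current) + BigInt(1)).toString()
}

// Example inputs: a small value and one above Number.MAX_SAFE_INTEGER.
const small = nextSequence('99')
const large = nextSequence('9007199254740993')
```

If the request only accepts numbers, the original `+this.currentSequence + 1` remains adequate for sequence values well below 2^53.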
status
    async status(): Promise<StatusResponse__Output | undefined>
Returns the indexer status. This is used to check the latest block indexed by the stream.