Apibara Typescript SDK

API Reference

@apibara/protocol

This package contains the client used to stream data from Apibara.

Installation#

Install the package from npm:

npm add @apibara/protocol@0.3

Example#

This example shows how to connect to the StarkNet Goerli stream and print all messages received. To learn more how you can parse the StarkNet-specific data, look at the @apibara/starknet package documentation.

import { NodeClient, credentials } from '@apibara/protocol'
async function main() {
const node = new NodeClient('goerli.starknet.stream.apibara.com:443', credentials.createSsl())
const messages = node.streamMessages({})
// return a promise that resolves only when the
// stream finishes or it receives an error.
return new Promise((resolve, reject) => {
messages.on('end', resolve)
messages.on('error', reject)
messages.on('data', (data) => {
// handle message from server.
console.log(data)
})
})
}

NodeClient#

The NodeClient connects to the Apibara gRPC server and streams data from it.

constructor#

constructor(
address: string,
credentials: ChannelCredentials,
options?: ClientOptions
)

Creates a new NodeClient.

Parameters

  • address: the stream address, for example goerli.starknet.stream.apibara.com:443.
  • credentials: credentials used to connect to the server. Usually credentials.createSsl().
  • options: gRPC client options. See @grpc/grpc-js for a list of options.

streamMessages#

public streamMessages(
{ startingSequence }: StreamMessagesRequest,
options?: StreamMessagesOptions
): ClientReadableStream<StreamMessagesResponse__Output>

Start streaming messages from Apibara.

Parameters

  • startingSequence: the sequence number of the first block to stream. Use this to resume streaming after disconnect.
  • options: stream options:
    • reconnect: when set to true, the stream will automatically try to reconnect on error.
    • onRetry: callback to control the stream reconnection behavior.

Reconnecting on error#

The streamMessages method can, optionally, reconnect to the stream on error. This behavior is useful if you're working with a less reliable connection where disconnection can happen frequently.

To opt-in into reconnecting, you need to specify the reconnect option.

const messages = node.streamMessages(
{ startingSequence: 123_000 },
{ reconnect: true, onRetry: myOnRetry }
)

The onRetry argument is used to control how the stream reconnects on error. For example, the following function stops reconnecting after 3 retries, with a 5 seconds delay in between retries and starting from the specified sequence number.

function myOnRetry(retryCount: number) {
const retry = retryCount < 3
return { retry, delay: 5, startingSequence: 123_000 }
}

The default retry behavior is to retry up to 3 times, with a delay between retries of 5 seconds and starting from the sequence number after the last received message.

The following example shows how to handle reconnection in a real-world indexer.

import { NodeClient, credentials, proto } from '@apibara/protocol'
class AppIndexer {
// protobuf encodes possibly-large numbers as strings
private currentSequence?: string
handleData(data: proto.Data__Output) {
console.log(`[data] sequence=${data.sequence}`)
// track sequence number for reconnecting later
this.currentSequence = data.sequence
}
handleInvalidate(invalidate: proto.Invalidate__Output) {
console.log(`[invalidate] sequence=${invalidate.sequence}`)
this.currentSequence = invalidate.sequence
}
onRetry(retryCount: number) {
// retry connecting up to 3 times, with a delay of 5 seconds in between
// retries.
// Start from the sequence number _following_ the last received message.
const retry = retryCount < 3
const startingSequence = this.currentSequence ? +this.currentSequence + 1 : undefined
console.log(`[retry] retry=${retry ? 'yes' : 'no'}, startingSequence=${startingSequence}`)
return { retry, delay: 5, startingSequence }
}
}
async function main() {
const indexer = new AppIndexer()
const node = new NodeClient('goerli.starknet.stream.apibara.com:443', credentials.createSsl())
const messages = node.streamMessages(
{ startingSequence: 100_000 },
{
reconnect: true,
onRetry: indexer.onRetry,
}
)
return new Promise((resolve, reject) => {
messages.on('end', resolve)
messages.on('error', reject)
messages.on('data', (msg: StreamMessagesResponse__Output) => {
if (msg.data) {
indexer.handleData(msg.data)
} else if (msg.invalidate) {
indexer.handleInvalidate(msg.invalidate)
}
})
})
}

status#

async status(): Promise<StatusResponse__Output | undefined>

Returns the indexer status. This is used to check the latest block indexed by the stream.