
Streaming protocol

Apibara defines a streaming protocol that nodes implement to communicate with each other.

A node is a process that streams data from one or more input streams to produce one output stream. It communicates with the application to transform and combine input messages into new output messages. The node handles storing messages and streaming them to other nodes that request them.

An application is a process that, for each input message received, produces zero or more output messages. Applications can be implemented in any programming language and communicate with the node through the gRPC application protocol. Alternatively, applications implemented in Rust can run as a task inside the node.
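As a rough illustration of the application contract, the sketch below models an application as a Python callable that receives one input message and returns a list of zero or more output messages. The `Block` and `LargeTransfer` message types, the `large_transfers` application, and its threshold are hypothetical and are not part of any Apibara SDK.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Block:
    """Hypothetical decoded input message: a block and its transfer amounts."""
    number: int
    transfers: List[int]


@dataclass
class LargeTransfer:
    """Hypothetical output message emitted by the application."""
    block_number: int
    amount: int


# An application maps one input message to zero or more output messages.
Application = Callable[[Block], List[LargeTransfer]]


def large_transfers(block: Block, threshold: int = 1_000) -> List[LargeTransfer]:
    """Emit one output per transfer at or above `threshold` (possibly none)."""
    return [LargeTransfer(block.number, amount)
            for amount in block.transfers if amount >= threshold]


def run(app: Application, inputs: List[Block]) -> List[LargeTransfer]:
    """Feed every input message through `app` and collect all outputs in order."""
    outputs: List[LargeTransfer] = []
    for message in inputs:
        outputs.extend(app(message))
    return outputs
```

A block with no qualifying transfers yields no output, while a block with several yields several, which is the "zero or more" behavior described above.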

The diagram below shows that an Apibara node consists of a core component, which handles streaming connections and storage, and the application (implemented with one of the provided SDKs), which transforms messages.

              ^   +-------------------+   ^
              |   |                   |   |
              |   |    Application    |   |  Apibara SDK
              |   |                   |   |
              |   +-------------------+   v
              |   +-------------------+   ^
              |   |                   |   |
Apibara Node  |   |      Storage      |   |
              |   |                   |   |
              |   +-------------------+   |  Apibara Core
              |   |                   |   |
              |   |     Streaming     |   |
              |   |                   |   |
              v   +-------------------+   v

Streaming data

In this section we provide a high-level overview of the data streaming through the protocol. The next section contains the lower-level details that are useful for developers who want to contribute a new SDK.

There are two types of messages in the protocol. Data messages carry new information, for example a new chain block. The streaming protocol is agnostic to the data carried by Data messages, so clients should use the reflection service to get the Protobuf definition of the data from each input node.

import "google/protobuf/any.proto";

message Data {
  uint64 sequence = 1;
  google.protobuf.Any data = 2;
}

Invalidate messages invalidate all messages from the specified sequence number onward.

message Invalidate {
  uint64 sequence = 1;
}
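To make the effect of the two message types concrete, here is a minimal in-memory sketch of a node-side message log. It assumes, based on the rule that the Data message following an Invalidate reuses its sequence number, that an Invalidate with sequence `n` discards stored message `n` and everything after it. The `MessageLog` class and the use of raw bytes in place of the `Any` payload are illustrative assumptions, not Apibara's storage implementation.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class Data:
    sequence: int
    data: bytes  # stands in for the google.protobuf.Any payload


@dataclass
class Invalidate:
    sequence: int


Message = Union[Data, Invalidate]


class MessageLog:
    """Illustrative in-memory log where list index == sequence number."""

    def __init__(self) -> None:
        self._messages: List[Data] = []

    def apply(self, msg: Message) -> None:
        if isinstance(msg, Invalidate):
            # Assumption: discard message `sequence` and every later message.
            del self._messages[msg.sequence:]
        elif msg.sequence != len(self._messages):
            # Data must extend the log by exactly one sequence number.
            raise ValueError(
                f"expected sequence {len(self._messages)}, got {msg.sequence}"
            )
        else:
            self._messages.append(msg)

    def sequences(self) -> List[int]:
        return [m.sequence for m in self._messages]
```

Because sequence numbers start at 0 and have no gaps, the list index doubles as the sequence number, so invalidation is a single slice deletion.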

At the core of the protocol is the concept of sequence numbers:

  • The first message in the stream must be a Data message with sequence 0.
  • Subsequent Data messages must have the sequence number immediately following that of the preceding Data message (message 2 must be followed by message 3, and so on).
  • Invalidate messages reset the stream state. The next Data message must have the same sequence number as the Invalidate message.

If any input stream breaks any of these rules, the node must stop indexing and report the error to the node operator.
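The three rules can be checked incrementally with a small state machine. The sketch below is a minimal Python version; `SequenceValidator` and `ProtocolError` are hypothetical names, and the message classes mirror the Protobuf definitions only loosely (the `Any` payload is omitted).

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class Data:
    sequence: int


@dataclass
class Invalidate:
    sequence: int


class ProtocolError(Exception):
    """Raised when an input stream breaks a sequence-number rule."""


class SequenceValidator:
    def __init__(self) -> None:
        # Sequence number the next Data message must carry;
        # None until the first message has been seen.
        self._expected: Optional[int] = None

    def check(self, msg: Union[Data, Invalidate]) -> None:
        if self._expected is None:
            # Rule 1: the stream must start with a Data message at sequence 0.
            if not isinstance(msg, Data) or msg.sequence != 0:
                raise ProtocolError("stream must start with Data(sequence=0)")
            self._expected = 1
        elif isinstance(msg, Invalidate):
            # Rule 3: the next Data message reuses the Invalidate sequence number.
            self._expected = msg.sequence
        elif msg.sequence != self._expected:
            # Rule 2: Data sequence numbers must be consecutive.
            raise ProtocolError(
                f"expected sequence {self._expected}, got {msg.sequence}"
            )
        else:
            self._expected = msg.sequence + 1
```

A node would keep one validator per input stream, call `check` on every message, and on `ProtocolError` stop indexing and surface the error to the operator.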

The protocol in detail

A node implementing the Apibara streaming protocol must implement the following services:

  • apibara.node.v1alpha1.Node: the node protocol, described in more detail below.
  • grpc.reflection.v1alpha.ServerReflection: gRPC reflection improves the developer experience by providing a standard way to access the stream type. Nodes are required to implement it.

The apibara.node.v1alpha1.Node service is defined as:

service Node {
  rpc Status(StatusRequest) returns (StatusResponse);
  rpc Connect(ConnectRequest) returns (stream ConnectResponse);
}

Status method

The Status method retrieves information about the node's syncing status. The request and response types are:

message StatusRequest {
}

message StatusResponse {
  oneof message {
    SyncingStatus syncing = 1;
    SyncedStatus synced = 2;
    NotStartedStatus not_started = 3;
  }
}

The StatusRequest message is intentionally empty; future versions of the protocol may add fields to it.

The StatusResponse message must contain one of three distinct messages, depending on the node's sync status.

SyncingStatus

The node is not fully synced yet, so its data should not be considered "live". The message also contains the syncing status of each of the node's input streams.

// Node is syncing with its sources.
message SyncingStatus {
  // The latest sequence number.
  uint64 sequence = 1;
  // The syncing status of each input.
  repeated InputSyncingStatus inputs = 2;
}

// Syncing status of the node's input.
message InputSyncingStatus {
  // Latest sequence number.
  uint64 head = 1;
  // Latest indexed sequence number.
  uint64 indexed = 2;
}
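A client can use the `inputs` field to estimate how far behind each input is. The sketch below computes `head - indexed` per input; the dataclasses are hand-written stand-ins for the generated Protobuf classes, and treating this difference as "lag" is an illustrative interpretation of the two fields rather than something the protocol defines.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class InputSyncingStatus:
    head: int     # latest sequence number of the input stream
    indexed: int  # latest sequence number the node has indexed


@dataclass
class SyncingStatus:
    sequence: int
    inputs: List[InputSyncingStatus] = field(default_factory=list)


def input_lag(status: SyncingStatus) -> List[int]:
    """Messages each input still has to index, clamped at 0 for safety."""
    return [max(i.head - i.indexed, 0) for i in status.inputs]
```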

SyncedStatus

The node is fully synced, data streamed from the node is "live".

// Node is fully synced.
message SyncedStatus {
  // The latest sequence number.
  uint64 sequence = 1;
}

NotStartedStatus

The node has not started syncing yet. This message is usually returned during the first few seconds after the node is started for the first time.

// Node didn't start syncing.
message NotStartedStatus {
}
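Because `StatusResponse` is a `oneof`, clients branch on which variant is set. With generated Python Protobuf classes this is typically done with `WhichOneof("message")`; to stay self-contained, the sketch below instead models the three variants as plain dataclasses, so `StatusResponse`, `is_live`, and `describe` here are hypothetical helpers.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class SyncingStatus:
    sequence: int  # the `inputs` field is omitted for brevity


@dataclass
class SyncedStatus:
    sequence: int


@dataclass
class NotStartedStatus:
    pass


# Exactly one variant is present, mirroring the `oneof message` field.
StatusResponse = Union[SyncingStatus, SyncedStatus, NotStartedStatus]


def is_live(status: StatusResponse) -> bool:
    """Only a fully synced node streams "live" data."""
    return isinstance(status, SyncedStatus)


def describe(status: StatusResponse) -> str:
    if isinstance(status, SyncedStatus):
        return f"synced at sequence {status.sequence}"
    if isinstance(status, SyncingStatus):
        return f"syncing, at sequence {status.sequence}"
    return "not started"
```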

Connect method

The Connect method starts streaming messages out of the node.

The ConnectRequest message specifies from which sequence number to start streaming data.

message ConnectRequest {
  uint64 starting_sequence = 1;
}

Each ConnectResponse message contains one of the two message types (Data or Invalidate) described at the beginning of this page.

message ConnectResponse {
  oneof message {
    Invalidate invalidate = 1;
    Data data = 2;
  }
}
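On the node side, a Connect call could be served by replaying stored messages from `starting_sequence` and then following new ones as they arrive. The sketch below covers only the replay part, over an in-memory list. The tagged `ConnectResponse` dataclass, the `replay` helper, and the assumption that storage holds only the current (non-invalidated) Data messages are illustrative choices, not Apibara's implementation.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional


@dataclass
class Data:
    sequence: int
    data: bytes


@dataclass
class Invalidate:
    sequence: int


@dataclass
class ConnectResponse:
    # Mirrors `oneof message`: exactly one of the two fields is set.
    invalidate: Optional[Invalidate] = None
    data: Optional[Data] = None


def replay(stored: List[Data], starting_sequence: int) -> Iterator[ConnectResponse]:
    """Yield stored Data messages from `starting_sequence` onward.

    Assumes stored[i].sequence == i, which holds when the log starts at 0
    and has no gaps, as the sequence-number rules require.
    """
    for msg in stored[starting_sequence:]:
        yield ConnectResponse(data=msg)
```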

The stream will run indefinitely, until the client disconnects from it.
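Since the stream only ends when the client disconnects, a client that wants to survive restarts has to remember where it stopped. One plausible approach, sketched below, is to track the sequence number of the next Data message the client expects (taking Invalidate messages into account) and send it as `starting_sequence` when reconnecting. `ResumeCursor` is a hypothetical helper, the dictionary stands in for a real `ConnectRequest`, and the protocol text does not prescribe this strategy.

```python
from dataclasses import dataclass
from typing import Dict, Union


@dataclass
class Data:
    sequence: int


@dataclass
class Invalidate:
    sequence: int


class ResumeCursor:
    """Tracks which sequence number to request on the next Connect call."""

    def __init__(self, starting_sequence: int = 0) -> None:
        self.next_sequence = starting_sequence

    def observe(self, msg: Union[Data, Invalidate]) -> None:
        if isinstance(msg, Invalidate):
            # The next Data message reuses the invalidated sequence number.
            self.next_sequence = msg.sequence
        else:
            self.next_sequence = msg.sequence + 1

    def connect_request(self) -> Dict[str, int]:
        # Stand-in for building ConnectRequest(starting_sequence=...).
        return {"starting_sequence": self.next_sequence}
```

In practice the cursor would be persisted together with whatever state the client derives from the stream, so both roll back consistently on an Invalidate.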

Source nodes

Source nodes are nodes that produce messages by ingesting external sources, for example blockchain data.

The job of a source node is to efficiently ingest blockchain data and pre-process it so that it can be streamed by other nodes.

Source nodes are not limited to web3 data; they can ingest web2 (or off-chain) data as well. For example, developers can implement a source node that transforms HTTP POST requests into stream messages. This pattern is especially useful when building applications that combine web3 and web2 data, because nodes can safely replay the HTTP requests on new deployments.
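To illustrate the off-chain pattern, the sketch below shows the core of a hypothetical source node that turns incoming request bodies into Data messages. It deliberately leaves out the HTTP server and gRPC layers; `HttpSource`, `ingest`, and `replay` are illustrative names. Because every body is stored under its sequence number, a downstream node on a new deployment can rebuild its state by reading the stream again from sequence 0 instead of re-receiving the original requests.

```python
from dataclasses import dataclass
from typing import Iterator, List


@dataclass
class Data:
    sequence: int
    data: bytes


class HttpSource:
    """Assigns consecutive sequence numbers to ingested request bodies."""

    def __init__(self) -> None:
        self._log: List[Data] = []

    def ingest(self, body: bytes) -> Data:
        # Called once per accepted POST request (HTTP handling not shown).
        msg = Data(sequence=len(self._log), data=body)
        self._log.append(msg)
        return msg

    def replay(self, starting_sequence: int = 0) -> Iterator[Data]:
        # Downstream nodes can re-read the same messages at any time.
        yield from self._log[starting_sequence:]
```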