
Application Protocol

Apibara stream nodes are built on top of the streaming protocol. They abstract away the boilerplate required to build a node and provide a simpler interface for developers to build their application-specific indexers.

At its core, an indexer is an application that receives a continuous stream of messages and runs user-defined code on each message. Indexers can be used to accumulate state by storing and updating data in a database. In Apibara, indexers can also create a new stream of derived data that other indexers can use.
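The following minimal, dependency-free sketch shows an indexer that accumulates state and also derives a new stream. The `Transfer` input type, the `BalanceChanged` output type and `BalanceIndexer` are all hypothetical. A real indexer would keep its state in a database rather than an in-memory `HashMap`.

```rust
use std::collections::HashMap;

/// Hypothetical input message: a token transfer observed on chain.
pub struct Transfer {
    pub from: String,
    pub to: String,
    pub amount: u64,
}

/// Hypothetical derived message that other indexers could consume.
#[derive(Debug, PartialEq)]
pub struct BalanceChanged {
    pub account: String,
    pub balance: i128,
}

/// Accumulates balances (state) and emits one derived message per touched account.
#[derive(Default)]
pub struct BalanceIndexer {
    balances: HashMap<String, i128>,
}

impl BalanceIndexer {
    /// User-defined code, run once for every message of the input stream.
    pub fn handle(&mut self, t: &Transfer) -> Vec<BalanceChanged> {
        let mut out = Vec::new();
        // Debit the sender, then credit the receiver.
        for (account, delta) in [(&t.from, -(t.amount as i128)), (&t.to, t.amount as i128)] {
            let balance = self.balances.entry(account.clone()).or_insert(0);
            *balance += delta;
            out.push(BalanceChanged { account: account.clone(), balance: *balance });
        }
        out
    }
}
```

The returned `BalanceChanged` values play the role of the "new stream of derived data". The node, not the indexer, would persist them and serve them to clients.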

Apibara nodes take care of:

  • Streaming data from the indexer’s input streams. At the moment, indexers can only stream data from one source at a time, but in the future they will support indexing multiple input streams.
  • Invoking the user’s code for each message received.
  • Persisting the messages generated by the user’s indexer.
  • Handling requests from clients to stream the messages generated by the indexer.
  • Handling chain reorganizations by invalidating data. The node tracks which output messages are generated in response to each input message, so that after a reorganization it can automatically remove the messages that don’t belong to the new canonical chain.
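The reorganization handling described in the last item can be pictured with a small in-memory model. This is not Apibara's actual storage layer. `OutputLog`, `StoredOutput` and `invalidate_from` are hypothetical names. The model assumes input messages are processed in sequence order, so the outputs to discard always form a suffix of the log. It also assumes that new outputs reuse the sequence numbers freed by an invalidation.

```rust
/// One persisted output message plus the input message that produced it.
#[derive(Debug, Clone, PartialEq)]
pub struct StoredOutput {
    pub sequence: u64,       // output sequence number assigned by the node
    pub input_sequence: u64, // sequence number of the originating input message
    pub data: Vec<u8>,       // encoded output message
}

/// Output log that remembers input -> output provenance.
#[derive(Default)]
pub struct OutputLog {
    outputs: Vec<StoredOutput>,
}

impl OutputLog {
    /// Persists the outputs the indexer produced for one input message.
    /// Sequence numbers are the log position (assumption: freed numbers are reused).
    pub fn append(&mut self, input_sequence: u64, messages: Vec<Vec<u8>>) {
        for data in messages {
            let sequence = self.outputs.len() as u64;
            self.outputs.push(StoredOutput { sequence, input_sequence, data });
        }
    }

    /// Removes every output produced by inputs at or after `first_invalid_input`,
    /// the first input message that is not on the new canonical chain.
    /// Returns the removed outputs, e.g. to notify connected clients.
    pub fn invalidate_from(&mut self, first_invalid_input: u64) -> Vec<StoredOutput> {
        // Inputs are processed in order, so the affected outputs form a suffix.
        let keep = self
            .outputs
            .iter()
            .position(|o| o.input_sequence >= first_invalid_input)
            .unwrap_or(self.outputs.len());
        self.outputs.split_off(keep)
    }

    pub fn outputs(&self) -> &[StoredOutput] {
        &self.outputs
    }
}
```

Recording the originating input on each output is what makes invalidation a simple truncation. The user's code never has to know a reorganization happened.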

The following diagram shows the components of an Apibara stream node and how data flows in the system.

[Diagram: Apibara application-specific node]

The application-specific indexer can be an external process implemented in any programming language, or it can run in-process using the Rust SDK. This is similar to implementing a Tendermint application using the ABCI interface.

Applications that need to process high volumes of data should use the Rust SDK for best performance.

External application processes communicate with the node through the Application gRPC protocol defined below. Developers should use external applications if they want to:

  • leverage their team’s existing skills.
  • integrate with their existing infrastructure.
  • use specialized libraries, for example machine learning frameworks like PyTorch and TensorFlow.

The protocol in detail

An application must implement the apibara.application.v1alpha1.Application service.

```protobuf
service Application {
  rpc Init(InitRequest) returns (InitResponse);
  rpc ReceiveData(ReceiveDataRequest) returns (ReceiveDataResponse);
}
```
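For in-process applications, the service can be pictured as a Rust trait. The sketch below is a hypothetical, dependency-free mirror of it. The structs stand in for the prost-generated types and keep only some fields, and the methods are synchronous, whereas tonic would generate async ones. `run` is an illustrative driver showing the order in which a node presumably calls the two methods: `Init` once, then `ReceiveData` for every input message.

```rust
/// Stand-in for `google.protobuf.Any`.
#[derive(Debug, Clone, PartialEq)]
pub struct Any {
    pub type_url: String,
    pub value: Vec<u8>,
}

/// Simplified stand-ins for the protocol messages (`output` omitted for brevity).
pub struct InputStream {
    pub id: u64,
    pub url: String,
    pub starting_sequence: u64,
}
pub struct InitResponse {
    pub inputs: Vec<InputStream>,
}
pub struct ReceiveDataRequest {
    pub input_id: u64,
    pub sequence: u64,
    pub data: Any,
}
pub struct ReceiveDataResponse {
    pub data: Vec<Any>,
}

/// Hypothetical synchronous mirror of the `Application` service.
pub trait Application {
    fn init(&mut self) -> InitResponse;
    fn receive_data(&mut self, request: ReceiveDataRequest) -> ReceiveDataResponse;
}

/// Illustrative driver: calls `init` once, then `receive_data` for each message
/// of the single input stream, collecting all outputs in order.
pub fn run<A: Application>(app: &mut A, messages: Vec<Any>) -> Vec<Any> {
    let init = app.init();
    // Assumes the response holds exactly one input (the current limitation).
    let input = &init.inputs[0];
    let mut outputs = Vec::new();
    for (offset, data) in messages.into_iter().enumerate() {
        let request = ReceiveDataRequest {
            input_id: input.id,
            sequence: input.starting_sequence + offset as u64,
            data,
        };
        outputs.extend(app.receive_data(request).data);
    }
    outputs
}
```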

Init method

Called once on the first run to configure the application. It must return one or more input streams for the node to connect to (at the moment, only one input stream is supported) and the protobuf definition of the output messages.

```protobuf
message InitRequest {}

message InitResponse {
  repeated InputStream inputs = 1;
  OutputStream output = 2;
}
```

An input stream contains a unique numerical id, the gRPC URL of the stream, and the starting sequence number.

```protobuf
message InputStream {
  uint64 id = 1;
  string url = 2;
  uint64 starting_sequence = 3;
}
```
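The constraints above (at least one input, unique ids, currently a single stream) could be checked on the node side before connecting. This is a hypothetical sketch, not documented Apibara behavior: `validate_inputs` and `InputError` are illustrative names, and rejecting empty URLs is an extra assumption.

```rust
use std::collections::HashSet;

/// Plain Rust mirror of the `InputStream` message (hypothetical, no prost).
#[derive(Debug, Clone)]
pub struct InputStream {
    pub id: u64,
    pub url: String,
    pub starting_sequence: u64,
}

/// Reasons a node might reject the inputs returned by `Init` (illustrative).
#[derive(Debug, PartialEq)]
pub enum InputError {
    NoInputs,
    DuplicateId(u64),
    EmptyUrl(u64),
    TooManyInputs(usize),
}

/// Checks the `inputs` field of an `InitResponse`.
pub fn validate_inputs(inputs: &[InputStream]) -> Result<(), InputError> {
    if inputs.is_empty() {
        return Err(InputError::NoInputs);
    }
    let mut seen = HashSet::new();
    for input in inputs {
        // `insert` returns false if the id was already present.
        if !seen.insert(input.id) {
            return Err(InputError::DuplicateId(input.id));
        }
        if input.url.is_empty() {
            return Err(InputError::EmptyUrl(input.id));
        }
    }
    // Current limitation: only one input stream can be indexed at a time.
    if inputs.len() > 1 {
        return Err(InputError::TooManyInputs(inputs.len()));
    }
    Ok(())
}
```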

The output stream definition includes the FileDescriptorProto (already serialized to bytes), together with the output protobuf filename and message type.

```protobuf
message OutputStream {
  string message_type = 1;
  string filename = 2;
  bytes file_descriptor_proto = 3;
}
```

Output stream with tonic

The node uses the OutputStream message to set up gRPC reflection correctly. Messages are sent over the wire as bytes, and clients need some extra information to know how to decode them.

Developers using the Rust SDK can generate the file_descriptor_proto object by instructing tonic to generate a file descriptor set. Your build.rs file should look like the following:

```rust
use std::{env, path::PathBuf};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
    tonic_build::configure()
        .build_client(false)
        .build_server(true)
        // Also write the serialized file descriptor set to $OUT_DIR/transfer_descriptor.bin.
        .file_descriptor_set_path(out_dir.join("transfer_descriptor.bin"))
        .compile(&["proto/transfer.proto"], &["proto"])?;
    Ok(())
}
```

Then you can access the serialized content of the file descriptor set like this:

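```rust
mod pb {
    // Generated types for the `example.org.v1alpha1` protobuf package.
    tonic::include_proto!("example.org.v1alpha1");

    // Bytes written by `file_descriptor_set_path` in build.rs.
    pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
        tonic::include_file_descriptor_set!("transfer_descriptor");

    pub fn transfer_file_descriptor_set() -> &'static [u8] {
        FILE_DESCRIPTOR_SET
    }
}
```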

Finally, the OutputStream will look like this:

```rust
let output = OutputStream {
    message_type: "example.org.v1alpha1.Transfer".to_string(),
    filename: "transfer.proto".to_string(),
    file_descriptor_proto: pb::transfer_file_descriptor_set().to_vec(),
};
```

Here, "example.org.v1alpha1.Transfer" is the fully qualified name of the message type (protobuf package plus message name), and "transfer.proto" is the name of the file containing its protobuf definition.
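A fully qualified name is the package followed by the message name. It is also the last path segment of a `google.protobuf.Any` type URL, whose conventional default prefix is `type.googleapis.com/`. The sketch below assumes clients match output messages against `message_type` this way. The helper names are hypothetical and not part of the Apibara SDK.

```rust
/// Default prefix used by protobuf libraries when packing a message into `Any`.
pub const TYPE_URL_PREFIX: &str = "type.googleapis.com/";

/// Joins a protobuf package and a message name into a fully qualified name.
pub fn qualified_name(package: &str, message: &str) -> String {
    if package.is_empty() {
        message.to_string()
    } else {
        format!("{}.{}", package, message)
    }
}

/// Builds the `Any.type_url` for a fully qualified message type name.
pub fn type_url(message_type: &str) -> String {
    format!("{}{}", TYPE_URL_PREFIX, message_type)
}

/// The type name is whatever follows the last '/' of the type URL.
pub fn message_type_of(type_url: &str) -> &str {
    type_url.rsplit('/').next().unwrap_or(type_url)
}

/// Checks whether a packed message has the declared output `message_type`.
pub fn matches_output(type_url: &str, message_type: &str) -> bool {
    message_type_of(type_url) == message_type
}
```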

ReceiveData method

Called for each message, it transforms one input message into zero or more output messages.

```protobuf
message ReceiveDataRequest {
  uint64 input_id = 1;
  uint64 sequence = 2;
  google.protobuf.Any data = 3;
}

message ReceiveDataResponse {
  repeated google.protobuf.Any data = 1;
}
```
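As a sketch of the one-to-many mapping, the handler below emits one output per transfer amount above a threshold. An input can therefore yield zero, one, or many outputs. To stay dependency-free, it assumes a made-up payload: the input `Any.value` is a sequence of little-endian `u64` amounts rather than a real protobuf encoding. The type URLs and the `min_amount` parameter are likewise hypothetical.

```rust
/// Stand-in for `google.protobuf.Any` (a real SDK would use prost-types).
#[derive(Debug, Clone, PartialEq)]
pub struct Any {
    pub type_url: String,
    pub value: Vec<u8>,
}

pub struct ReceiveDataRequest {
    pub input_id: u64,
    pub sequence: u64,
    pub data: Any,
}

pub struct ReceiveDataResponse {
    pub data: Vec<Any>,
}

// Hypothetical type URLs for this example's input and output messages.
pub const INPUT_TYPE: &str = "type.googleapis.com/example.org.v1alpha1.Block";
pub const OUTPUT_TYPE: &str = "type.googleapis.com/example.org.v1alpha1.Transfer";

/// Emits one output per transfer amount >= `min_amount`.
pub fn receive_data(request: &ReceiveDataRequest, min_amount: u64) -> ReceiveDataResponse {
    // Messages of an unexpected type produce no output instead of an error.
    if request.data.type_url != INPUT_TYPE {
        return ReceiveDataResponse { data: Vec::new() };
    }
    let data = request
        .data
        .value
        .chunks_exact(8) // assumed encoding: 8-byte little-endian amounts
        .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
        .filter(|amount| *amount >= min_amount)
        .map(|amount| Any {
            type_url: OUTPUT_TYPE.to_string(),
            value: amount.to_le_bytes().to_vec(),
        })
        .collect();
    ReceiveDataResponse { data }
}
```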

The request contains the id of the input the message comes from, the message sequence number, and the raw message content. The response contains zero or more output messages, each packed as a google.protobuf.Any.

Notice that the response contains only raw messages without a sequence number: sequence numbers are added to the stream by the node.
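On the node side, stamping could look like the sketch below. Output sequence numbers are independent of input sequence numbers, because an input may produce no outputs at all. `Sequencer` and `SequencedMessage` are hypothetical names. Starting from a caller-chosen number and incrementing by one are assumptions, not documented behavior.

```rust
/// Stand-in for `google.protobuf.Any`.
#[derive(Debug, Clone, PartialEq)]
pub struct Any {
    pub type_url: String,
    pub value: Vec<u8>,
}

/// An output message after the node has stamped it with a sequence number.
#[derive(Debug, PartialEq)]
pub struct SequencedMessage {
    pub sequence: u64,
    pub data: Any,
}

/// Node-side counter assigning consecutive output sequence numbers.
pub struct Sequencer {
    next: u64,
}

impl Sequencer {
    pub fn new(first: u64) -> Self {
        Sequencer { next: first }
    }

    /// Stamps the raw messages of one `ReceiveDataResponse`, preserving their order.
    pub fn stamp(&mut self, messages: Vec<Any>) -> Vec<SequencedMessage> {
        messages
            .into_iter()
            .map(|data| {
                let sequence = self.next;
                self.next += 1;
                SequencedMessage { sequence, data }
            })
            .collect()
    }
}
```

Keeping numbering in the node means applications stay stateless with respect to the output stream. They never need to know how many messages were emitted before.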