Getting Started



This module contains classes to index events contained in blocks streamed from an Apibara stream.


The Indexer interface is used to implement the behavior of a general-purpose indexer. Refer to the getting started page to learn how to use Indexer and IndexerRunner.

Since encoding filters and decoding data are common to all indexers on a network, we provide the following specialized interfaces:

class Indexer(Generic[Filter, Data], metaclass=ABCMeta):
    def indexer_id(self) -> str: ...
    def initial_configuration(self) -> IndexerConfiguration[Filter]: ...
    def encode_filter(self, filter: Filter) -> bytes: ...
    def decode_data(self, raw: bytes) -> Data: ...
    async def handle_data(self, info: Info[UserContext, Filter], data: Data): ...
    async def handle_pending_data(self, info: Info[UserContext, Filter], data: Data): ...
    async def handle_invalidate(self, info: Info[UserContext, Filter], cursor: Cursor): ...
    async def handle_reconnect(self, exc: Exception, retry_count: int) -> Reconnect: ...
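
To illustrate how the encoding, decoding, and handler methods fit together, here is a minimal sketch. It does not use the library: `SketchIndexer`, `JsonIndexer`, and `feed` are hypothetical stand-ins that mirror only part of the interface above, and JSON is an assumed wire format chosen for illustration.

```python
import asyncio
import json
from abc import ABCMeta, abstractmethod
from typing import Generic, List, TypeVar

Filter = TypeVar("Filter")
Data = TypeVar("Data")


class SketchIndexer(Generic[Filter, Data], metaclass=ABCMeta):
    """Hypothetical stand-in mirroring a subset of the Indexer interface."""

    @abstractmethod
    def indexer_id(self) -> str: ...

    @abstractmethod
    def encode_filter(self, filter: Filter) -> bytes: ...

    @abstractmethod
    def decode_data(self, raw: bytes) -> Data: ...

    @abstractmethod
    async def handle_data(self, info, data: Data): ...


class JsonIndexer(SketchIndexer[dict, dict]):
    """Toy indexer whose filters and data are JSON documents (an assumption)."""

    def __init__(self) -> None:
        self.seen: List[dict] = []

    def indexer_id(self) -> str:
        return "json-sketch"

    def encode_filter(self, filter: dict) -> bytes:
        # Serialize the filter before it is sent to the stream.
        return json.dumps(filter, sort_keys=True).encode("utf-8")

    def decode_data(self, raw: bytes) -> dict:
        # Turn the raw bytes received from the stream into typed data.
        return json.loads(raw.decode("utf-8"))

    async def handle_data(self, info, data: dict):
        # A real handler would write to storage; this one only records the data.
        self.seen.append(data)


async def feed(indexer: SketchIndexer, raw_messages: List[bytes]) -> None:
    # Hypothetical driver: decode each raw message, then call the handler.
    for raw in raw_messages:
        await indexer.handle_data(None, indexer.decode_data(raw))
```

Running `asyncio.run(feed(JsonIndexer(), [...]))` decodes each message and hands it to `handle_data`, which is roughly the division of labor the interface suggests: the indexer owns serialization and handling, while the caller owns the stream.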


The IndexerRunner class is used to run an indexer over a stream of data. The ctx object passed to the run method is added to the Info parameter.

class IndexerRunner(Generic[UserContext, Filter]):
    """Run an indexer, listening for new data and calling the provided callbacks.

    reset_state: flag to restart the indexer from the beginning.
    config: options to set the input stream and connection string.
    client_options: list of options passed to the gRPC channel.
    """

    def __init__(
        self,
        reset_state: bool = False,
        config: Optional[IndexerRunnerConfiguration] = None,
        client_options: Optional[List[Tuple[str, Any]]] = None,
    ) -> None: ...

    async def run(
        self, indexer: Indexer, *, ctx: Optional[UserContext] = None
    ):
        """Run the indexer until stopped."""


class IndexerRunnerConfiguration:
    """IndexerRunner configuration.

    stream_url: URL of the Apibara stream.
    stream_ssl: flag to connect using SSL.
    storage_url: MongoDB connection string, used to store the indexer data and state.
    """

    stream_url: Optional[str] = None
    stream_ssl: bool = True
    storage_url: Optional[str] = None
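
As a rough illustration of the three configuration fields and their defaults, the sketch below uses a hypothetical stand-in dataclass, `SketchRunnerConfiguration`, so that it runs without the library. The addresses in `local_config` are placeholder values for a local setup, not endpoints named by this documentation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchRunnerConfiguration:
    """Hypothetical stand-in with the same fields and defaults as listed above."""

    stream_url: Optional[str] = None
    stream_ssl: bool = True
    storage_url: Optional[str] = None


def local_config() -> SketchRunnerConfiguration:
    # Placeholder addresses (assumptions): a local stream without SSL
    # and a local MongoDB instance for indexer data and state.
    return SketchRunnerConfiguration(
        stream_url="localhost:7171",
        stream_ssl=False,
        storage_url="mongodb://localhost:27017",
    )
```

Note that SSL is on by default, so a plain-text local stream needs `stream_ssl=False` set explicitly.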


The Info object is used to share data between multiple invocations of the event handler.
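
The sketch below shows one way a shared context can carry state across handler invocations. `SketchInfo` and `run_sketch` are hypothetical stand-ins, and the `context` field name is an assumption made for illustration rather than a documented attribute.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class SketchInfo:
    """Hypothetical stand-in for Info; the `context` field name is an assumption."""

    context: Dict[str, Any]


async def handle_data(info: SketchInfo, data: List[str]) -> None:
    # Values written to the shared context are visible on the next invocation.
    info.context["events_seen"] = info.context.get("events_seen", 0) + len(data)


async def run_sketch(ctx: Dict[str, Any], batches: List[List[str]]) -> Dict[str, Any]:
    # The same Info object, wrapping the user-supplied ctx, is passed to every call.
    info = SketchInfo(context=ctx)
    for batch in batches:
        await handle_data(info, batch)
    return info.context
```

Calling `asyncio.run(run_sketch({}, [["a", "b"], ["c"]]))` accumulates a running count in the context across the two handler calls.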


The Storage class implements chain-aware database storage.

class Storage:
    async def insert_one(self, collection: str, doc: Document):
        """Insert `doc` into `collection`."""

    async def insert_many(self, collection: str, docs: Iterable[Document]):
        """Insert multiple `docs` into `collection`."""

    async def delete_one(self, collection: str, filter: Filter):
        """Delete the first document in `collection` matching `filter`."""

    async def delete_many(self, collection: str, filter: Filter):
        """Delete all documents in `collection` matching `filter`."""

    async def find_one(self, collection: str, filter: Filter) -> Optional[Document]:
        """Find the first document in `collection` matching `filter`."""

    async def find(
        self,
        collection: str,
        filter: Filter,
        sort: Optional[dict[str, int]] = None,
        projection: Optional[Projection] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterable[dict]:
        """Find all documents in `collection` matching `filter`.

        - `collection`: the collection,
        - `filter`: the filter,
        - `sort`: keys used for sorting, e.g. `{"a": -1}` sorts documents by key `a` in descending order,
        - `projection`: filter document keys to reduce the document size,
        - `skip`: number of documents to skip,
        - `limit`: maximum number of documents returned.
        """

    async def find_one_and_replace(
        self,
        collection: str,
        filter: Filter,
        replacement: Document,
        upsert: bool = False,
    ):
        """Replace the first document in `collection` matching `filter` with `replacement`.

        If `upsert = True`, insert `replacement` even if no document matched the `filter`.
        """

    async def find_one_and_update(
        self, collection: str, filter: Filter, update: Update
    ):
        """Update the first document in `collection` matching `filter` with `update`."""

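To make the `sort`, `skip`, `limit`, and `upsert` parameters concrete, here is a small in-memory sketch. `InMemoryStorage` is a hypothetical stand-in, not the library's Storage class: it matches filters by key equality only, ignores projections and chain awareness, and assumes (as in MongoDB) that a `limit` of 0 means no limit.

```python
import asyncio
from typing import Dict, Iterable, List, Optional


class InMemoryStorage:
    """Hypothetical stand-in; filters match by key equality only."""

    def __init__(self) -> None:
        self._collections: Dict[str, List[dict]] = {}

    @staticmethod
    def _matches(doc: dict, filter: dict) -> bool:
        return all(doc.get(key) == value for key, value in filter.items())

    async def insert_many(self, collection: str, docs: Iterable[dict]) -> None:
        self._collections.setdefault(collection, []).extend(dict(d) for d in docs)

    async def find(self, collection, filter, sort=None, skip=0, limit=0) -> List[dict]:
        docs = [d for d in self._collections.get(collection, []) if self._matches(d, filter)]
        # Sort by the last key first; the sort is stable, so the first key wins.
        for key, direction in reversed(list((sort or {}).items())):
            docs.sort(key=lambda d: d[key], reverse=direction < 0)
        docs = docs[skip:]
        # Assumption: limit=0 means "no limit", following MongoDB's convention.
        return docs[:limit] if limit > 0 else docs

    async def find_one_and_replace(
        self, collection, filter, replacement, upsert=False
    ) -> Optional[dict]:
        docs = self._collections.setdefault(collection, [])
        for i, doc in enumerate(docs):
            if self._matches(doc, filter):
                docs[i] = dict(replacement)
                return doc
        if upsert:
            # No document matched: insert the replacement instead.
            docs.append(dict(replacement))
        return None


async def demo():
    storage = InMemoryStorage()
    await storage.insert_many("transfers", [
        {"token": "A", "amount": 5},
        {"token": "A", "amount": 9},
        {"token": "B", "amount": 7},
        {"token": "A", "amount": 1},
    ])
    # Two largest transfers of token A, in descending order of amount.
    top = await storage.find("transfers", {"token": "A"}, sort={"amount": -1}, limit=2)
    # No token C exists yet, so upsert=True inserts the replacement.
    await storage.find_one_and_replace(
        "transfers", {"token": "C"}, {"token": "C", "amount": 3}, upsert=True
    )
    inserted = await storage.find("transfers", {"token": "C"})
    return top, inserted
```

In this sketch `asyncio.run(demo())` returns the two largest token A transfers followed by the upserted token C document.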