apibara.indexer

This module contains the classes used to index events from blocks delivered by an Apibara stream.

Indexer

The Indexer interface is used to implement the behavior of a general-purpose indexer. Refer to the getting started page to learn how to use Indexer and IndexerRunner.

Because filter encoding and data decoding are the same for all indexers on a network, we provide the following specialized interfaces:

class Indexer(Generic[Filter, Data], metaclass=ABCMeta):
    @abstractmethod
    def indexer_id(self) -> str:
        ...

    @abstractmethod
    def initial_configuration(self) -> IndexerConfiguration[Filter]:
        ...

    @abstractmethod
    def encode_filter(self, filter: Filter) -> bytes:
        ...

    @abstractmethod
    def decode_data(self, raw: bytes) -> Data:
        ...

    @abstractmethod
    async def handle_data(self, info: Info[UserContext, Filter], data: Data):
        ...

    async def handle_pending_data(self, info: Info[UserContext, Filter], data: Data):
        ...

    async def handle_invalidate(self, info: Info[UserContext, Filter], cursor: Cursor):
        ...

    async def handle_reconnect(self, exc: Exception, retry_count: int) -> Reconnect:
        ...
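
The encode/decode/handle cycle of the interface can be illustrated with a small sketch. To stay runnable without the SDK, it does not import apibara and does not subclass Indexer; `ToyIndexer`, `TransferFilter`, `TransferData`, and the JSON wire format are all hypothetical names and choices made for illustration, not the encoding any real network uses.

```python
import asyncio
import json
from dataclasses import asdict, dataclass
from typing import List


@dataclass
class TransferFilter:
    """Hypothetical filter: the contract address to watch."""
    address: str


@dataclass
class TransferData:
    """Hypothetical decoded payload: transfer amounts in one block."""
    block_number: int
    amounts: List[int]


class ToyIndexer:
    """Mirrors some Indexer method names; real code would subclass Indexer."""

    def __init__(self) -> None:
        self.total = 0

    def indexer_id(self) -> str:
        return "toy-indexer"

    def encode_filter(self, filter: TransferFilter) -> bytes:
        # Assumption: JSON stands in for the network's real wire format.
        return json.dumps(asdict(filter)).encode("utf-8")

    def decode_data(self, raw: bytes) -> TransferData:
        return TransferData(**json.loads(raw.decode("utf-8")))

    async def handle_data(self, info, data: TransferData):
        # `info` is unused in this sketch.
        self.total += sum(data.amounts)


async def demo() -> int:
    indexer = ToyIndexer()
    raw = json.dumps({"block_number": 1, "amounts": [5, 7]}).encode("utf-8")
    # Decode the raw bytes first, then hand the typed value to the handler.
    await indexer.handle_data(None, indexer.decode_data(raw))
    return indexer.total


result = asyncio.run(demo())
```

Keeping the byte-level work in `encode_filter` and `decode_data` means the handler only ever sees typed values.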

IndexerRunner

The IndexerRunner class is used to run an indexer over a stream of data. The ctx object passed to the run method is added to the Info parameter.

class IndexerRunner(Generic[UserContext, Filter]):
    """Run an indexer, listening for new data and calling the provided callbacks.

    Parameters
    ----------
    reset_state:
        flag to restart the indexer from the beginning.
    config:
        options to set the input stream and connection string.
    client_options:
        list of options passed to the gRPC channel.
    """

    def __init__(
        self,
        *,
        reset_state: bool = False,
        config: Optional[IndexerRunnerConfiguration] = None,
        client_options: Optional[List[Tuple[str, Any]]] = None,
    ) -> None:
        ...

    async def run(
        self, indexer: Indexer, *, ctx: Optional[UserContext] = None
    ):
        """Run the indexer until stopped."""

IndexerRunnerConfiguration

@dataclass
class IndexerRunnerConfiguration:
    """IndexerRunner configuration.

    Parameters
    ----------
    stream_url:
        url of the Apibara stream.
    stream_ssl:
        flag to connect using SSL.
    storage_url:
        MongoDB connection string, used to store the indexer data and state.
    """

    stream_url: Optional[str] = None
    stream_ssl: bool = True
    storage_url: Optional[str] = None

Info

The Info object is used to share data between multiple invocations of the event handler.
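
How one object can carry state across handler invocations can be sketched as follows. This is not the library's implementation: `ToyInfo`, its `context` attribute, and `toy_run` are hypothetical stand-ins, written under the assumption that the runner wraps the user-supplied ctx once and passes the same object to every call.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Generic, List, TypeVar

UserContext = TypeVar("UserContext")


@dataclass
class ToyInfo(Generic[UserContext]):
    """Hypothetical stand-in for Info: carries the user-supplied context."""
    context: UserContext


async def handle_data(info: ToyInfo[Dict[str, Any]], data: List[int]) -> None:
    # State written here is visible on the next invocation,
    # because the same info object is passed every time.
    info.context["seen"] += len(data)


async def toy_run(batches: List[List[int]], ctx: Dict[str, Any]) -> ToyInfo:
    # Stand-in for a runner loop: wrap ctx once, then reuse it per batch.
    info = ToyInfo(context=ctx)
    for batch in batches:
        await handle_data(info, batch)
    return info


info = asyncio.run(toy_run([[1, 2], [3], [4, 5, 6]], {"seen": 0}))
```

After the three batches, `info.context["seen"]` holds the running count accumulated across all handler calls.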

Storage

The Storage class implements chain-aware database storage.

class Storage:
    async def insert_one(self, collection: str, doc: Document):
        """Insert `doc` into `collection`."""

    async def insert_many(self, collection: str, docs: Iterable[Document]):
        """Insert multiple `docs` into `collection`."""

    async def delete_one(self, collection: str, filter: Filter):
        """Delete the first document in `collection` matching `filter`."""

    async def delete_many(self, collection: str, filter: Filter):
        """Delete all documents in `collection` matching `filter`."""

    async def find_one(self, collection: str, filter: Filter) -> Optional[Document]:
        """Find the first document in `collection` matching `filter`."""

    async def find(
        self,
        collection: str,
        filter: Filter,
        sort: Optional[dict[str, int]] = None,
        projection: Optional[Projection] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterable[dict]:
        """Find all documents in `collection` matching `filter`.

        Arguments
        ---------
        - `collection`: the collection,
        - `filter`: the filter,
        - `sort`: keys used for sorting, e.g. `{"a": -1}` sorts documents by key `a` in descending order,
        - `projection`: filter document keys to reduce the document size,
        - `skip`: number of documents to skip,
        - `limit`: maximum number of documents returned.
        """

    async def find_one_and_replace(
        self,
        collection: str,
        filter: Filter,
        replacement: Document,
        upsert: bool = False,
    ):
        """Replace the first document in `collection` matching `filter` with `replacement`.

        If `upsert = True`, insert `replacement` even if no document matched the `filter`.
        """

    async def find_one_and_update(
        self, collection: str, filter: Filter, update: Update
    ):
        """Update the first document in `collection` matching `filter` with `update`."""

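The `sort`, `skip`, `limit`, and `upsert` semantics described in the docstrings above can be illustrated with an in-memory sketch. `InMemoryStorage` is a hypothetical stand-in, not the real Storage class: it matches filters by key equality only, ignores `projection`, and does no chain-aware bookkeeping. It also assumes that `limit=0` means "no limit", as in MongoDB.

```python
import asyncio
from typing import Any, Dict, Iterable, List, Optional

Document = Dict[str, Any]


class InMemoryStorage:
    """Hypothetical stand-in; matches filters by key equality only."""

    def __init__(self) -> None:
        self._collections: Dict[str, List[Document]] = {}

    @staticmethod
    def _matches(doc: Document, filter: Document) -> bool:
        return all(doc.get(k) == v for k, v in filter.items())

    async def insert_many(self, collection: str, docs: Iterable[Document]) -> None:
        self._collections.setdefault(collection, []).extend(dict(d) for d in docs)

    async def find(
        self,
        collection: str,
        filter: Document,
        sort: Optional[Dict[str, int]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> List[Document]:
        docs = [d for d in self._collections.get(collection, []) if self._matches(d, filter)]
        # Sort by the last key first; list.sort is stable, so the first key dominates.
        for key, direction in reversed(list((sort or {}).items())):
            docs.sort(key=lambda d: d[key], reverse=direction < 0)
        docs = docs[skip:]
        # Assumption: limit=0 means "no limit", as in MongoDB.
        return docs[:limit] if limit > 0 else docs

    async def find_one_and_replace(
        self, collection: str, filter: Document, replacement: Document, upsert: bool = False
    ) -> None:
        docs = self._collections.setdefault(collection, [])
        for i, doc in enumerate(docs):
            if self._matches(doc, filter):
                docs[i] = dict(replacement)
                return
        if upsert:
            # Nothing matched: insert the replacement instead.
            docs.append(dict(replacement))


async def demo() -> List[Document]:
    storage = InMemoryStorage()
    await storage.insert_many("transfers", [{"a": 1}, {"a": 3}, {"a": 2}])
    # No document has a == 9, so upsert=True inserts {"a": 4}.
    await storage.find_one_and_replace("transfers", {"a": 9}, {"a": 4}, upsert=True)
    # Descending by "a" gives 4, 3, 2, 1; skip one, then take two.
    return await storage.find("transfers", {}, sort={"a": -1}, skip=1, limit=2)


page = asyncio.run(demo())
```

Here `page` ends up as the documents with `a` equal to 3 and 2, in that order.
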