
apibara.indexer

This module contains the classes used to index events from blocks streamed by an Apibara stream.

IndexerRunner

The IndexerRunner class is the main entrypoint when implementing an indexer using the Python SDK.

The first step is to create and configure an indexer, specifying a unique indexer_id. This value is used when persisting the indexer's state to storage.

The indexer also requires an event handler callback, invoked with the event data for each block that contains at least one indexed event. The event handler is an async function with the following signature; refer to the indexer.model documentation page for more information about the types.

async def handle_events(info: Info, block_events: NewEvents):
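A minimal handler might look like the following sketch. The `transfers` collection name and the event attributes accessed here are illustrative assumptions, not part of the SDK; refer to the indexer.model documentation for the actual NewEvents and event fields.

```python
# Sketch of an event handler. The "transfers" collection name and the
# event attributes below are illustrative assumptions; see indexer.model
# for the actual types.
async def handle_events(info, block_events):
    docs = []
    for event in block_events.events:
        # Store a plain dict per event; Storage associates each write
        # with the current block.
        docs.append({"event_name": getattr(event, "name", None)})
    await info.storage.insert_many("transfers", docs)
```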

The Info object holds data shared between multiple invocations of the handler. You can add an application-specific context object with the set_context method.

The indexer needs to know the address of the input stream and how to connect to the MongoDB database used for storage. Both options are set by passing an IndexerRunnerConfiguration object.

Developers can register callbacks invoked for every indexed block (even blocks that contain no indexed events) and for chain reorganizations, using the add_block_handler and add_reorg_handler methods.
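The two optional callbacks follow the same async shape as the event handler; this sketch assumes nothing about the attributes of NewBlock and Reorg, which are documented in indexer.model.

```python
# Sketch: optional block and reorg callbacks.
async def handle_block(info, block):
    # Invoked for every new block, including blocks with no indexed events.
    pass

async def handle_reorg(info, reorg):
    # Invoked when the chain reorganizes.
    pass

# Registered on the runner before calling run():
#   runner.add_block_handler(handle_block)
#   runner.add_reorg_handler(handle_reorg)
```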

Finally, start indexing events by calling the run method. This method connects to the stream server and starts streaming events indefinitely.
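Putting the pieces together, the wiring might look like the sketch below. All values (URLs, indexer id, connection string, starting block) are placeholders, and the event filter construction is elided because it depends on your network; see indexer.model.

```python
# Sketch: wiring an indexer together. All values are placeholders.
import asyncio

from apibara.indexer import IndexerRunner, IndexerRunnerConfiguration


async def main():
    runner = IndexerRunner(
        indexer_id="my-indexer",
        new_events_handler=handle_events,  # async handler defined elsewhere
        config=IndexerRunnerConfiguration(
            apibara_url="localhost:7171",
            apibara_ssl=False,
            storage_url="mongodb://user:pass@localhost:27017",
        ),
    )
    # Initial filters; EventFilter construction is network-specific.
    runner.add_event_filters(filters=[...], index_from_block=100_000)
    # Connects to the stream server and indexes indefinitely.
    await runner.run()


asyncio.run(main())
```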

NewEventsHandler = Callable[[Info, NewEvents], Awaitable[None]]
BlockHandler = Callable[[Info, NewBlock], Awaitable[None]]
ReorgHandler = Callable[[Info, Reorg], Awaitable[None]]


class IndexerRunner(Generic[UserContext]):
    """Run an indexer, listening for new events and calling the provided callbacks.

    Parameters
    ----------
    indexer_id:
        unique id of this indexer. Used when persisting state.
    new_events_handler:
        async function called for each new block containing indexed events.
    reset_state:
        flag to restart the indexer from the beginning.
    config:
        options to set the input stream and connection string.
    """

    def __init__(
        self,
        *,
        indexer_id: str,
        new_events_handler: NewEventsHandler,
        reset_state: bool = False,
        config: Optional[IndexerRunnerConfiguration] = None,
    ) -> None:

    def add_event_filters(
        self,
        filters: Optional[List[EventFilter]] = None,
        index_from_block: Optional[int] = None,
    ):
        """Add the initial event filters.

        Parameters
        ----------
        filters:
            a list of EventFilter
        index_from_block:
            start indexing from the given block.
        """

    def set_context(self, context: UserContext):
        """Set the context object used to share information between handlers."""

    def add_block_handler(self, block_handler: BlockHandler) -> None:
        """Add a callback called every time there is a new block."""
        self._block_handler = block_handler

    def add_reorg_handler(self, reorg_handler: ReorgHandler) -> None:
        """Add a callback called every time there is a chain reorganization."""

    async def run(self):
        """Run the indexer until stopped."""

IndexerRunnerConfiguration

class IndexerRunnerConfiguration:
    """IndexerRunner configuration.

    Parameters
    ----------
    apibara_url:
        url of the Apibara stream.
    apibara_ssl:
        flag to connect using SSL.
    storage_url:
        MongoDB connection string, used to store the indexer data and state.
    """

Info

The Info object is used to share data between multiple invocations of the event handler.

This object is also used to add dynamic filters to the indexer.

class Info(Generic[UserContext]):
    """State shared between handlers.

    Parameters
    ----------
    context:
        application-specific context.
    storage:
        access the chain-aware storage.
    """

    context: UserContext
    storage: Storage

    def add_event_filters(self, filters: List[EventFilter]):
        """Add the provided event filters to the indexer.

        The indexer will re-scan the current block for any event
        matching the new filters.
        """
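Dynamic filters are useful when the set of contracts to index grows at runtime, e.g. when a factory contract deploys new child contracts. The sketch below is hypothetical: the event attribute and the dict stand-in for an EventFilter are assumptions, and real code would build an EventFilter as described in indexer.model.

```python
# Sketch: widening the filter set from inside an event handler. The
# "new_contract" attribute and the dict used in place of an EventFilter
# are illustrative assumptions.
async def handle_factory_events(info, block_events):
    for event in block_events.events:
        new_address = getattr(event, "new_contract", None)
        if new_address is not None:
            # The current block is re-scanned for events matching
            # the filters added here.
            new_filter = {"address": new_address}  # hypothetical stand-in
            info.add_event_filters([new_filter])
```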

Storage

The Storage class implements chain-aware database storage.

class Storage:
    async def insert_one(self, collection: str, doc: Document):
        """Insert `doc` into `collection`."""

    async def insert_many(self, collection: str, docs: Iterable[Document]):
        """Insert multiple `docs` into `collection`."""

    async def delete_one(self, collection: str, filter: Filter):
        """Delete the first document in `collection` matching `filter`."""

    async def delete_many(self, collection: str, filter: Filter):
        """Delete all documents in `collection` matching `filter`."""

    async def find_one(self, collection: str, filter: Filter) -> Optional[Document]:
        """Find the first document in `collection` matching `filter`."""
    async def find(
        self,
        collection: str,
        filter: Filter,
        sort: Optional[dict[str, int]] = None,
        projection: Optional[Projection] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterable[dict]:
        """Find all documents in `collection` matching `filter`.

        Arguments
        ---------
        - `collection`: the collection,
        - `filter`: the filter,
        - `sort`: keys used for sorting, e.g. `{"a": -1}` sorts documents by key `a` in descending order,
        - `projection`: filter document keys to reduce the document size,
        - `skip`: number of documents to skip,
        - `limit`: maximum number of documents returned.
        """
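A query combining these arguments might look like the sketch below. The collection and field names are illustrative; the sort and projection documents follow the MongoDB-style semantics described above.

```python
# Sketch: querying indexed documents. "transfers" and the field names
# are illustrative. sort={"amount": -1} returns highest amounts first;
# projection limits the keys included in each returned document.
async def top_transfers(storage):
    return await storage.find(
        "transfers",
        {"token": "ETH"},
        sort={"amount": -1},
        projection={"amount": 1, "recipient": 1},
        skip=0,
        limit=10,
    )
```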
    async def find_one_and_replace(
        self,
        collection: str,
        filter: Filter,
        replacement: Document,
        upsert: bool = False,
    ):
        """Replace the first document in `collection` matching `filter` with `replacement`.

        If `upsert = True`, insert `replacement` even if no document matched the `filter`.
        """
    async def find_one_and_update(
        self, collection: str, filter: Filter, update: Update
    ):
        """Update the first document in `collection` matching `filter` with `update`."""
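The write helpers can be combined to keep one document per entity, sketched below with illustrative collection and field names. The `{"$set": ...}` update document assumes MongoDB-style update operators, consistent with the MongoDB connection string used for storage.

```python
# Sketch: maintaining one balance document per owner. Names are
# illustrative; {"$set": ...} assumes MongoDB-style update operators.
async def record_balance(storage, owner, amount):
    await storage.find_one_and_replace(
        "balances",
        {"owner": owner},
        {"owner": owner, "amount": amount},
        upsert=True,  # insert if no document matches the filter
    )

async def mark_frozen(storage, owner):
    await storage.find_one_and_update(
        "balances",
        {"owner": owner},
        {"$set": {"frozen": True}},
    )
```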