apibara.indexer
This module contains classes to index events contained in blocks streamed from an Apibara stream.
Indexer
The Indexer interface is used to implement the behavior of a general-purpose indexer.
Refer to the getting started page to learn how to use Indexer and IndexerRunner.
Since filter encoding and data decoding are the same for all indexers on a given network, we provide the following specialized interfaces:
- StarkNet: apibara.starknet.StarkNetIndexer
class Indexer(Generic[Filter, Data], metaclass=ABCMeta):
    @abstractmethod
    def indexer_id(self) -> str:
        ...

    @abstractmethod
    def initial_configuration(self) -> IndexerConfiguration[Filter]:
        ...

    @abstractmethod
    def encode_filter(self, filter: Filter) -> bytes:
        ...

    @abstractmethod
    def decode_data(self, raw: bytes) -> Data:
        ...

    @abstractmethod
    async def handle_data(self, info: Info[UserContext, Filter], data: Data):
        ...

    async def handle_pending_data(self, info: Info[UserContext, Filter], data: Data):
        ...

    async def handle_invalidate(self, info: Info[UserContext, Filter], cursor: Cursor):
        ...

    async def handle_reconnect(self, exc: Exception, retry_count: int) -> Reconnect:
        ...
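The split between encoding, decoding, and handling can be illustrated with a minimal sketch. It does not import the SDK: `IndexerSketch` is a local stand-in that mirrors only a subset of the abstract methods listed above (it omits `initial_configuration` and the optional handlers), and `JsonIndexer`, its JSON wire format, and the `feed` helper are all hypothetical names chosen for illustration.

```python
import asyncio
import json
from abc import ABCMeta, abstractmethod
from typing import Generic, List, TypeVar

Filter = TypeVar("Filter")
Data = TypeVar("Data")


class IndexerSketch(Generic[Filter, Data], metaclass=ABCMeta):
    """Stand-in for a subset of the Indexer interface (not the real class)."""

    @abstractmethod
    def indexer_id(self) -> str: ...

    @abstractmethod
    def encode_filter(self, filter: Filter) -> bytes: ...

    @abstractmethod
    def decode_data(self, raw: bytes) -> Data: ...

    @abstractmethod
    async def handle_data(self, info, data: Data): ...


class JsonIndexer(IndexerSketch[dict, dict]):
    """Hypothetical indexer whose filters and data are JSON objects."""

    def __init__(self) -> None:
        self.seen: List[dict] = []

    def indexer_id(self) -> str:
        return "json-indexer"

    def encode_filter(self, filter: dict) -> bytes:
        # Sorting keys makes the encoded filter deterministic.
        return json.dumps(filter, sort_keys=True).encode("utf-8")

    def decode_data(self, raw: bytes) -> dict:
        return json.loads(raw.decode("utf-8"))

    async def handle_data(self, info, data: dict):
        # A real handler would typically write to storage; here we just record.
        self.seen.append(data)


async def feed(indexer: JsonIndexer, raw_messages: List[bytes]) -> List[dict]:
    """Hypothetical driver: decode each raw message, then hand it to the handler."""
    for raw in raw_messages:
        await indexer.handle_data(None, indexer.decode_data(raw))
    return indexer.seen
```

Calling `asyncio.run(feed(JsonIndexer(), [b'{"block": 1}']))` drives the handler once. With the real SDK the runner, not user code, would be responsible for calling these methods.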
IndexerRunner
The IndexerRunner class is used to run an indexer over a stream of data.
The ctx object passed to the run method is added to the Info parameter.
class IndexerRunner(Generic[UserContext, Filter]):
    """Run an indexer, listening for new data and calling the provided callbacks.

    Parameters
    ----------
    reset_state:
        flag to restart the indexer from the beginning.
    config:
        options to set the input stream and connection string.
    client_options:
        list of options passed to the gRPC channel.
    """

    def __init__(
        self,
        *,
        reset_state: bool = False,
        config: Optional[IndexerRunnerConfiguration] = None,
        client_options: Optional[List[Tuple[str, Any]]] = None,
    ) -> None:
        ...

    async def run(self, indexer: Indexer, *, ctx: Optional[UserContext] = None):
        """Run the indexer until stopped."""
IndexerRunnerConfiguration
@dataclass
class IndexerRunnerConfiguration:
    """IndexerRunner configuration.

    Parameters
    ----------
    stream_url:
        url of the Apibara stream.
    stream_ssl:
        flag to connect using SSL.
    storage_url:
        MongoDB connection string, used to store the indexer data and state.
    """

    stream_url: Optional[str] = None
    stream_ssl: bool = True
    storage_url: Optional[str] = None
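As a rough sketch of how these options might be assembled, the snippet below builds a configuration from environment variables and a `client_options` list. To stay runnable without the SDK it redefines the dataclass locally with the same three fields. The `config_from_env` helper and the variable names `STREAM_URL`, `STREAM_SSL`, and `STORAGE_URL` are hypothetical. The option shown is a standard gRPC channel argument.

```python
import os
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional, Tuple


@dataclass
class IndexerRunnerConfiguration:
    """Local copy of the documented dataclass so the sketch runs without the SDK."""

    stream_url: Optional[str] = None
    stream_ssl: bool = True
    storage_url: Optional[str] = None


def config_from_env(env: Mapping[str, str] = os.environ) -> IndexerRunnerConfiguration:
    """Hypothetical helper; the environment variable names are assumptions."""
    return IndexerRunnerConfiguration(
        stream_url=env.get("STREAM_URL"),
        # SSL stays enabled unless the variable is explicitly set to "false".
        stream_ssl=env.get("STREAM_SSL", "true").lower() != "false",
        storage_url=env.get("STORAGE_URL"),
    )


# (name, value) pairs in the shape that client_options expects.
# "grpc.max_receive_message_length" is a standard gRPC channel argument.
client_options: List[Tuple[str, Any]] = [
    ("grpc.max_receive_message_length", 100 * 1024 * 1024),
]
```

Given the constructor signature documented above, these values would then be passed as `IndexerRunner(config=config_from_env(), client_options=client_options)`.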
Info
The Info object is used to share data between multiple invocations of the event handler.
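The following sketch illustrates the idea of a context object that travels with Info and accumulates state across handler calls. It is not the SDK's implementation: `InfoSketch`, its `context` attribute name, the `Counters` class, and the `run_sketch` loop are all assumptions made for illustration, since this page does not list the attributes of Info.

```python
import asyncio
from dataclasses import dataclass
from typing import Generic, Iterable, Optional, TypeVar

UserContext = TypeVar("UserContext")


@dataclass
class InfoSketch(Generic[UserContext]):
    """Stand-in for Info; the `context` attribute name is an assumption."""

    context: Optional[UserContext]


@dataclass
class Counters:
    """Hypothetical user context that accumulates state across calls."""

    blocks_seen: int = 0


async def handle_data(info: InfoSketch[Counters], data: bytes) -> None:
    # Every invocation sees the same context object, so updates persist.
    info.context.blocks_seen += 1


async def run_sketch(
    messages: Iterable[bytes], *, ctx: Optional[Counters] = None
) -> Optional[Counters]:
    """Toy loop: attach ctx to the info object once, then reuse it per message."""
    info = InfoSketch(context=ctx)
    for message in messages:
        await handle_data(info, message)
    return info.context
```

For example, `asyncio.run(run_sketch([b"a", b"b"], ctx=Counters()))` returns a `Counters` whose `blocks_seen` reflects both handler calls.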
Storage
The Storage class implements chain-aware database storage.
class Storage:
    async def insert_one(self, collection: str, doc: Document):
        """Insert `doc` into `collection`."""

    async def insert_many(self, collection: str, docs: Iterable[Document]):
        """Insert multiple `docs` into `collection`."""

    async def delete_one(self, collection: str, filter: Filter):
        """Delete the first document in `collection` matching `filter`."""

    async def delete_many(self, collection: str, filter: Filter):
        """Delete all documents in `collection` matching `filter`."""

    async def find_one(self, collection: str, filter: Filter) -> Optional[Document]:
        """Find the first document in `collection` matching `filter`."""

    async def find(
        self,
        collection: str,
        filter: Filter,
        sort: Optional[dict[str, int]] = None,
        projection: Optional[Projection] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterable[dict]:
        """Find all documents in `collection` matching `filter`.

        Arguments
        ---------
        - `collection`: the collection,
        - `filter`: the filter,
        - `sort`: keys used for sorting, e.g. `{"a": -1}` sorts documents by key `a` in descending order,
        - `projection`: filter document keys to reduce the document size,
        - `skip`: number of documents to skip,
        - `limit`: maximum number of documents returned.
        """

    async def find_one_and_replace(
        self,
        collection: str,
        filter: Filter,
        replacement: Document,
        upsert: bool = False,
    ):
        """Replace the first document in `collection` matching `filter` with `replacement`.

        If `upsert = True`, insert `replacement` even if no document matched the `filter`.
        """

    async def find_one_and_update(self, collection: str, filter: Filter, update: Update):
        """Update the first document in `collection` matching `filter` with `update`."""
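To make the `sort`, `skip`, `limit`, and `upsert` parameters concrete, here is a toy in-memory stand-in for a few of the methods above. It is only a sketch: `InMemoryStorage` and `demo` are hypothetical, filters are plain equality matches, there is no chain awareness, and treating `limit=0` as "no limit" is an assumption borrowed from MongoDB's convention.

```python
import asyncio
from typing import Dict, Iterable, List, Optional, Tuple

Document = Dict[str, object]


class InMemoryStorage:
    """Toy stand-in for Storage: equality filters only, no chain awareness."""

    def __init__(self) -> None:
        self._collections: Dict[str, List[Document]] = {}

    @staticmethod
    def _matches(doc: Document, filter: Document) -> bool:
        return all(doc.get(name) == value for name, value in filter.items())

    async def insert_many(self, collection: str, docs: Iterable[Document]):
        self._collections.setdefault(collection, []).extend(dict(d) for d in docs)

    async def find(
        self,
        collection: str,
        filter: Document,
        sort: Optional[Dict[str, int]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> List[Document]:
        docs = [d for d in self._collections.get(collection, []) if self._matches(d, filter)]
        # Sort by the last key first; the sort is stable, so the first key wins.
        for field, direction in reversed(list((sort or {}).items())):
            docs.sort(key=lambda d: d[field], reverse=direction < 0)
        docs = docs[skip:]
        # Assumption: limit=0 means "no limit", following MongoDB's convention.
        return docs[:limit] if limit else docs

    async def find_one_and_replace(
        self, collection: str, filter: Document, replacement: Document, upsert: bool = False
    ):
        docs = self._collections.setdefault(collection, [])
        for i, doc in enumerate(docs):
            if self._matches(doc, filter):
                docs[i] = dict(replacement)
                return
        if upsert:
            # No document matched, so insert the replacement instead.
            docs.append(dict(replacement))


async def demo() -> Tuple[List[Document], int]:
    storage = InMemoryStorage()
    await storage.insert_many("transfers", [
        {"token": "A", "amount": 5},
        {"token": "A", "amount": 9},
        {"token": "B", "amount": 7},
    ])
    await storage.find_one_and_replace(
        "transfers", {"token": "C"}, {"token": "C", "amount": 1}, upsert=True
    )
    top = await storage.find("transfers", {"token": "A"}, sort={"amount": -1}, limit=1)
    everything = await storage.find("transfers", {})
    return top, len(everything)
```

Running `asyncio.run(demo())` returns the largest transfer for token `A` together with the total document count, which includes the document added by the upsert.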