Apibara Python SDK

API Reference

Getting started with the Python SDK

After you installed the Python SDK, you can start indexing blockchain data.

The SDK provides two ways to interact with streams:

  • using the high-level Indexer and IndexerRunner components. The SDK provides an indexer implementation that uses MongoDB for storage.
  • using the low-level StreamService to interact directly with the stream. This choice is the best for teams that want maximum control over their indexer or that are building non-traditional applications.

Indexer and IndexerRunner#

Developers can implement the Indexer interface and then run it with the IndexerRunner object.

The Indexer methods to implement are:

  • indexer_id(self) -> str: unique indexer id, used when storing the indexer state to Mongo.
  • initial_configuration(self) -> IndexerConfiguration: returns the initial stream configuration. Here you can specify the data your application requires.
  • handle_data(self, info: Info, data: Data): handle a new message from the stream.
  • handle_pending_data(self, info: Info, data: Data): handle a new message from the stream, called with pending data.
  • handle_invalidate(self, info: Info, cursor: Cursor): handle a chain reorganization. cursor refers to the highest (most recent) block in common between the old chain and the new chain.
  • handle_reconnect(self, exc: Exception, retry_count: int) -> Reconnect: called when the indexer disconnects from the stream because of the given exception. You can use this callback to control reconnection behavior.
  • encode_filter(self, filter: Filter) -> bytes: encode filter.
  • decode_data(self, raw: bytes) -> Data: decode raw data.

Since all applications that use StarkNet data need to encode the filter and decode data in the same way, we provide a apibara.starknet.StarkNetIndexer class that is already configured.

from apibara.indexer import Info
from apibara.starknet import EventFilter, Filter, StarkNetIndexer, felt
from apibara.protocol.proto.stream_pb2 import Cursor, DataFinality
from apibara.starknet.cursor import starknet_cursor
from apibara.starknet.proto.starknet_pb2 import Block
contract_address = felt.from_hex(
"0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7"
)
# `Transfer` selector.
# You can get this value either with starknet.py's `ContractFunction.get_selector`
# or from starkscan.
transfer_key = felt.from_hex(
"0x99cd8bde557814842a3121e8ddfd433a539b8c9f14bf31ebf108d12e6196e9"
)
class ExampleIndexer(StarkNetIndexer):
def indexer_id(self) -> str:
return "starknet-example"
def initial_configuration(self) -> Filter:
# Return initial configuration of the indexer.
return IndexerConfiguration(
filter=Filter().add_event(
EventFilter().with_from_address(contract_address).with_keys([transfer_key])
),
starting_cursor=starknet_cursor(10_000),
finality=DataFinality.DATA_STATUS_FINALIZED,
)
async def handle_data(self, info: Info, data: Block):
# Handle one block of data
for event_with_tx in data.events:
print(event_with_tx.event)
async def handle_invalidate(self, _info: Info, _cursor: Cursor):
print("Chain reorganization")

Configuration#

The IndexerRunnerConfiguration object is used to store the indexer configuration. The options are:

  • stream_url: the URL of the stream,
  • stream_ssl: connect to the stream using HTTP/2 SSL,
  • storage_url: a MongoDB connection string to store the indexer state and the application data.

The following configuration is used to connect to the StarkNet Goerli stream and store data to a MongoDB instance running locally.

from apibara.indexer import IndexerRunnerConfiguration
config = IndexerRunnerConfiguration(
stream_url="goerli.starknet.a5a.ch:443",
apibara_ssl=True,
storage_url="mongodb://apibara:apibara@localhost:27017",
),

Starting the indexer#

Start the indexer by passing an instance of your indexer class to the runner. The IndexerRunner constructor accepts a reset_state flag used to restart the indexer from the beginning.

from apibara.indexer import IndexerRunner
runner = IndexerRunner(config=config, reset_state=True)
# ctx can be accessed by the callbacks in `info`.
await runner.run(
ExampleIndexer(),
ctx={"network": "starknet-mainnet"}
)

Filtering data#

Streams include only the data needed by your application, this is achieved by specifying filters. Filters are stream-specific, refer to the reference to learn more about the data available for your chain.

Chain-aware database storage#

The Python SDK provides an abstraction over storage to store events together with information about the block they were generated at. The SDK automatically deletes data in response to chain reorganizations, so that you don't need to handle the low-level details yourself.

Refer to the API reference to learn more about the Storage interface.