# Getting started with the Python SDK
After you've installed the Python SDK, you can start indexing blockchain data.
The SDK provides two ways to interact with streams:

- using the high-level `Indexer` and `IndexerRunner` components. The SDK provides an indexer implementation that uses MongoDB for storage.
- using the low-level `StreamService` to interact directly with the stream. This choice is best for teams that want maximum control over their indexer or that are building non-traditional applications.
## `Indexer` and `IndexerRunner`
Developers can implement the `Indexer` interface and then run it with the `IndexerRunner` object.
The Indexer
methods to implement are:
indexer_id(self) -> str
: unique indexer id, used when storing the indexer state to Mongo.initial_configuration(self) -> IndexerConfiguration
: returns the initial stream configuration. Here you can specify the data your application requires.handle_data(self, info: Info, data: Data)
: handle a new message from the stream.handle_pending_data(self, info: Info, data: Data)
: handle a new message from the stream, called with pending data.handle_invalidate(self, info: Info, cursor: Cursor)
: handle a chain reorganization.cursor
refers to the highest (most recent) block in common between the old chain and the new chain.handle_reconnect(self, exc: Exception, retry_count: int) -> Reconnect
: called when the indexer disconnects from the stream because of the given exception. You can use this callback to control reconnection behavior.encode_filter(self, filter: Filter) -> bytes
: encode filter.decode_data(self, raw: bytes) -> Data
: decode raw data.
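For example, you can override `handle_reconnect` to cap the number of retries. The following is a minimal sketch, not the SDK's default behavior; it assumes `Reconnect` can be imported from `apibara.indexer` and accepts a boolean `reconnect` flag, so check the API reference for the exact shape.

```python
from apibara.indexer import Reconnect

# Inside your `Indexer` subclass.
async def handle_reconnect(self, exc: Exception, retry_count: int) -> Reconnect:
    # Log the disconnection, then retry up to 5 times before giving up.
    print(f"disconnected: {exc} (attempt {retry_count})")
    # Assumption: `Reconnect` takes a boolean `reconnect` flag.
    return Reconnect(reconnect=retry_count < 5)
```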
Since all applications that use StarkNet data need to encode the filter and decode data in the same way, we provide an `apibara.starknet.StarkNetIndexer` class that is already configured.
```python
from apibara.indexer import IndexerConfiguration, Info
from apibara.starknet import EventFilter, Filter, StarkNetIndexer, felt
from apibara.protocol.proto.stream_pb2 import Cursor, DataFinality
from apibara.starknet.cursor import starknet_cursor
from apibara.starknet.proto.starknet_pb2 import Block

contract_address = felt.from_hex(
    "0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7"
)

# `Transfer` selector.
# You can get this value either with starknet.py's `ContractFunction.get_selector`
# or from starkscan.
transfer_key = felt.from_hex(
    "0x99cd8bde557814842a3121e8ddfd433a539b8c9f14bf31ebf108d12e6196e9"
)


class ExampleIndexer(StarkNetIndexer):
    def indexer_id(self) -> str:
        return "starknet-example"

    def initial_configuration(self) -> IndexerConfiguration:
        # Return the initial configuration of the indexer.
        return IndexerConfiguration(
            filter=Filter().add_event(
                EventFilter()
                .with_from_address(contract_address)
                .with_keys([transfer_key])
            ),
            starting_cursor=starknet_cursor(10_000),
            finality=DataFinality.DATA_STATUS_FINALIZED,
        )

    async def handle_data(self, info: Info, data: Block):
        # Handle one block of data.
        for event_with_tx in data.events:
            print(event_with_tx.event)

    async def handle_invalidate(self, _info: Info, _cursor: Cursor):
        print("Chain reorganization")
```
## Configuration
The `IndexerRunnerConfiguration` object is used to store the indexer configuration.

The options are:

- `stream_url`: the URL of the stream.
- `stream_ssl`: connect to the stream using HTTP/2 SSL.
- `storage_url`: a MongoDB connection string used to store the indexer state and the application data.
The following configuration connects to the StarkNet Goerli stream and stores data in a MongoDB instance running locally.
```python
from apibara.indexer import IndexerRunnerConfiguration

config = IndexerRunnerConfiguration(
    stream_url="goerli.starknet.a5a.ch:443",
    stream_ssl=True,
    storage_url="mongodb://apibara:apibara@localhost:27017",
)
```
## Starting the indexer
Start the indexer by passing an instance of your indexer class to the runner.
The `IndexerRunner` constructor accepts a `reset_state` flag used to restart the indexer from the beginning.
```python
from apibara.indexer import IndexerRunner

runner = IndexerRunner(config=config, reset_state=True)

# ctx can be accessed by the callbacks in `info`.
await runner.run(ExampleIndexer(), ctx={"network": "starknet-mainnet"})
```
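Note that `await` only works inside a coroutine. When running the indexer as a standalone script, you can wrap it in an `asyncio` entry point; this is a minimal sketch that reuses the `config` object and `ExampleIndexer` class from the snippets above.

```python
import asyncio

from apibara.indexer import IndexerRunner


async def main():
    # `config` and `ExampleIndexer` come from the previous snippets.
    runner = IndexerRunner(config=config, reset_state=True)
    await runner.run(ExampleIndexer(), ctx={"network": "starknet-mainnet"})


asyncio.run(main())
```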
## Filtering data
Streams include only the data needed by your application; this is achieved by specifying filters. Filters are stream-specific: refer to the reference to learn more about the data available for your chain.
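For example, on StarkNet you can include several `EventFilter` entries in one `Filter`. The sketch below is illustrative only: it reuses the `Transfer` selector and contract address from the indexer example above, `other_address` is a hypothetical placeholder, and it assumes `add_event` returns the filter so calls can be chained (as in the earlier snippet).

```python
from apibara.starknet import EventFilter, Filter, felt

# `Transfer` selector, same value as in the indexer example above.
transfer_key = felt.from_hex(
    "0x99cd8bde557814842a3121e8ddfd433a539b8c9f14bf31ebf108d12e6196e9"
)

# Contract address from the example above.
eth_address = felt.from_hex(
    "0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7"
)
# Hypothetical placeholder: replace with a real contract address.
other_address = felt.from_hex("0x1234")

# Match `Transfer` events emitted by either contract.
combined_filter = (
    Filter()
    .add_event(EventFilter().with_from_address(eth_address).with_keys([transfer_key]))
    .add_event(EventFilter().with_from_address(other_address).with_keys([transfer_key]))
)
```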
## Chain-aware database storage
The Python SDK provides an abstraction over storage to store events together with information about the block they were generated at. The SDK automatically deletes data in response to chain reorganizations, so that you don't need to handle the low-level details yourself.
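For example, you can persist decoded events from `handle_data` through `info.storage`. This is a minimal sketch, assuming an ERC-20-style `Transfer` event layout and a Mongo-like `insert_many(collection, documents)` coroutine on the storage object; check the API reference for the exact signatures.

```python
from apibara.indexer import Info
from apibara.starknet import felt
from apibara.starknet.proto.starknet_pb2 import Block

# Inside your `StarkNetIndexer` subclass.
async def handle_data(self, info: Info, data: Block):
    # Assumption: `Transfer` events carry (from, to, amount) in `data`.
    transfers = [
        {
            "from_address": felt.to_hex(event_with_tx.event.data[0]),
            "to_address": felt.to_hex(event_with_tx.event.data[1]),
        }
        for event_with_tx in data.events
    ]
    # The SDK records the originating block, so these documents are
    # removed automatically if the block is invalidated by a reorg.
    await info.storage.insert_many("transfers", transfers)
```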
Refer to the API reference to learn more about the `Storage` interface.