Apibara Python SDK

API Reference

Upgrading from v0.5 to v0.6

This guide assumes you are using the Python indexer template.

Add the new version as a dependency

Update the apibara version in pyproject.toml and enable the indexer extra, then run poetry install to install the new version.

diff --git a/pyproject.toml b/pyproject.toml
index f3a5f2d..19f536d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,7 +9,7 @@ indexer = "indexer.main:cli"
[tool.poetry.dependencies]
python = ">3.7.2,<3.10"
-apibara = "^0.5.0"
+apibara = { version = "^0.6.0", extras = ["indexer"] }
click = "^8.1.3"
"starknet.py" = "^0.13.0"
pymongo = {extras = ["srv"], version = "^4.1.1"}

Update the stream URLs

The new version of the protocol is hosted at a different location. Update the default URL in main.py (the example below points to the StarkNet mainnet stream):

diff --git a/src/indexer/main.py b/src/indexer/main.py
index 421498a..99c50b5 100644
--- a/src/indexer/main.py
+++ b/src/indexer/main.py
@@ -29,9 +29,10 @@ def cli():
async def start(server_url, mongo_url, restart):
"""Start the Apibara indexer."""
if server_url is None:
- server_url = "goerli.starknet.stream.apibara.com"
+ server_url = "mainnet.starknet.a5a.ch"
if mongo_url is None:
mongo_url = "mongodb://apibara:apibara@localhost:27017"
await run_indexer(
restart=restart,
server_url=server_url,

Update the event handlers

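Event handlers are gone. Instead, you now implement the Indexer interface (or StarkNetIndexer if you are indexing StarkNet). The indexer class is also responsible for providing the indexer id (indexer_id) and its initial configuration (initial_configuration), and it receives each block of data in handle_data.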

diff --git a/src/indexer/indexer.py b/src/indexer/indexer.py
index 3b7e37e..85f2b6d 100644
--- a/src/indexer/indexer.py
+++ b/src/indexer/indexer.py
@@ -1,53 +1,72 @@
-from apibara import EventFilter, IndexerRunner, Info, NewEvents
-from apibara.indexer import IndexerRunnerConfiguration
+import logging
-indexer_id = "my-indexer"
+from apibara.indexer import IndexerRunner, IndexerRunnerConfiguration, Info
+from apibara.indexer.indexer import IndexerConfiguration
+from apibara.protocol.proto.stream_pb2 import Cursor, DataFinality
+from apibara.starknet import EventFilter, Filter, StarkNetIndexer, felt
+from apibara.starknet.cursor import starknet_cursor
+from apibara.starknet.proto.starknet_pb2 import Block
+# Print apibara logs
+root_logger = logging.getLogger("apibara")
+# change to `logging.DEBUG` to print more information
+root_logger.setLevel(logging.INFO)
+root_logger.addHandler(logging.StreamHandler())
-async def handle_events(info: Info, block_events: NewEvents):
- """Handle a group of events grouped by block."""
- print(f"Received events for block {block_events.block.number}")
- for event in block_events.events:
- print(event)
+briqs_address = felt.from_hex(
+ "0x01435498bf393da86b4733b9264a86b58a42b31f8d8b8ba309593e5c17847672"
+)
- events = [
- {"address": event.address, "data": event.data, "name": event.name}
- for event in block_events.events
- ]
+# `Transfer` selector.
+# You can get this value either with starknet.py's `ContractFunction.get_selector`
+# or from starkscan.
+transfer_key = felt.from_hex(
+ "0x99cd8bde557814842a3121e8ddfd433a539b8c9f14bf31ebf108d12e6196e9"
+)
- # Insert multiple documents in one call.
- await info.storage.insert_many("events", events)
+class BriqIndexer(StarkNetIndexer):
+ def indexer_id(self) -> str:
+ return "starknet-example"
+
+ def initial_configuration(self) -> Filter:
+ # Return initial configuration of the indexer.
+ return IndexerConfiguration(
+ filter=Filter().add_event(
+ EventFilter().with_from_address(briqs_address).with_keys([transfer_key])
+ ),
+ starting_cursor=starknet_cursor(10_000),
+ finality=DataFinality.DATA_STATUS_FINALIZED,
+ )
+
+ async def handle_data(self, info: Info, data: Block):
+ # Handle one block of data
+ for event_with_tx in data.events:
+ tx_hash = felt.to_hex(event_with_tx.transaction.meta.hash)
+ event = event_with_tx.event
+
+ from_addr = felt.to_hex(event.data[0])
+ to_addr = felt.to_hex(event.data[1])
+ token_id = felt.to_int(event.data[2]) + (felt.to_int(event.data[3]) << 128)
+ print("Transfer")
+ print(f" Tx Hash: {tx_hash}")
+ print(f" From: {from_addr}")
+ print(f" To: {to_addr}")
+ print(f" Token ID: {token_id}")
+ print()
+
+ async def handle_invalidate(self, _info: Info, _cursor: Cursor):
+ raise ValueError("data must be finalized")

Update field element encoding and decoding

The previous version of the indexer encoded field elements (felt) as bytes. The new version encodes them in a more efficient format. The SDK also provides helper functions to convert field elements to and from hex strings and integers, such as felt.from_hex, felt.to_hex, and felt.to_int used in the indexer above.

Update the indexer runner

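The IndexerRunner is now simpler: it connects to a stream and runs the provided indexer over it. The indexer id, event filters, and starting block are no longer passed to the runner, because the indexer class provides them.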

diff --git a/src/indexer/indexer.py b/src/indexer/indexer.py
index 3b7e37e..85f2b6d 100644
--- a/src/indexer/indexer.py
+++ b/src/indexer/indexer.py
@@ -1,53 +1,72 @@
-async def run_indexer(server_url=None, mongo_url=None, restart=None):
- print("Starting Apibara indexer")
+async def run_indexer(server_url=None, mongo_url=None, restart=None):
runner = IndexerRunner(
config=IndexerRunnerConfiguration(
- apibara_url=server_url,
- apibara_ssl=True,
+ stream_url=server_url,
storage_url=mongo_url,
),
reset_state=restart,
- indexer_id=indexer_id,
- new_events_handler=handle_events,
- )
-
- # Create the indexer if it doesn't exist on the server,
- # otherwise it will resume indexing from where it left off.
- #
- # For now, this also helps the SDK map between human-readable
- # event names and StarkNet events.
- runner.add_event_filters(
- filters=[
- EventFilter.from_event_name(
- name="Transfer",
- address="0x0266b1276d23ffb53d99da3f01be7e29fa024dd33cd7f7b1eb7a46c67891c9d0",
- )
- ],
- index_from_block=201_000,
)
- print("Initialization completed. Entering main loop.")
-
- await runner.run()
+ # ctx can be accessed by the callbacks in `info`.
+ await runner.run(BriqIndexer(), ctx={"network": "starknet-mainnet"})

Edit on GitHub