
How to index smart contract events on StarkNet

This tutorial will teach you how to use Apibara to index smart contract events on StarkNet. We will first program an NFT indexer and then an NFT GraphQL API on top of it. The concepts explained in the tutorial can be used to build the backend for any web3 project, including games, DeFi, analytics and more.

As smart contract developers, we need to expose our data in a format easily accessible by frontend libraries so that our users are delighted by a responsive application that loads fast, even on slower networks.

The main challenge is that StarkNet, like other networks, doesn't provide an API to access a contract's storage like a database. Smart contracts include functions to read specific values from storage, like the owner of an NFT or the ERC-20 balance of an address, but that's not enough if we need more complex queries like a list of NFTs owned by an address or their historical ERC-20 balances. This issue becomes even more critical when we implement smart contracts for complex systems like on-chain games that need to minimize the amount of storage used to reduce transaction costs.

Indexers are tools that listen for the events emitted by one or more (or all!) smart contracts and use them to update their view over the application's state. Indexers start by replaying historical events and using them to compute the current state of the smart contract, and then they only need to update the state when the smart contract emits a new event.

Indexers are combined with an API server to present the data to consumers, like web and mobile applications.

In this tutorial, we're going to show how to write a simple indexer to keep track of the ownership of Briq NFT tokens. The data collected by the indexer is then used to build an API for Briq using Python and Strawberry GraphQL.

Getting started

Apibara is our tool to index smart contract events on StarkNet and, in the future, other EVM-compatible chains like Ethereum. Apibara runs locally on your machine; its job is to:

  • fetch events from the blockchain and send them to the indexer to index,
  • track the current chain's head and detect chain reorganizations,
  • track and store the indexers' progress.

Let's start by creating a new project using the python indexer template. Follow the instructions in the Python SDK installation guide to create a new, empty indexer project.

By default, Python installs packages globally, which can create conflicts between dependencies. The solution to this problem is to create a virtual environment: a copy of Python's interpreter and libraries local to your project.

Let's start by creating a virtual environment for our project and installing the required tools to bootstrap the project. Run the following commands from inside the project's folder.

# Create virtual environment
python3 -m venv venv
# Enable virtual environment.
# You will have to repeat this step every time
# you open a new shell in this project.
source venv/bin/activate
# Install Poetry
python3 -m pip install poetry

Let's now install the project's dependencies.

poetry install

First, let's check the directory structure. We can see the standard Python project files and a docker-compose.yml file. The docker-compose file is used to run the Mongo database for development. The project also includes a Dockerfile used to build a production image of the indexer.
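
If you want the development database running before you start the indexer, you can bring up the Mongo container defined in the Compose file. This assumes the service defined in the template's docker-compose.yml is left unchanged; use whichever command matches your Docker installation.

# Start the development MongoDB in the background
docker compose up -d
# On older installations with the standalone binary:
# docker-compose up -d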

The src/indexer directory contains two files:

  • main.py: this is the entry point of the indexer. It creates a CLI command using Click.
  • indexer.py: this contains the indexer's definition. We will look at this file in more detail in this section.

The indexer.py file is the core of the indexer. It defines at which block to start indexing (to speed up the overall process by ignoring blocks that we know contain no events), which events to index, and how to index them.

The first function we see is handle_events(info, block_events). The SDK calls this function every time it receives a new or historical block to index. Apibara indexes events batched by block for several reasons:

  • it increases performance by working on multiple events at the same time, if possible,
  • it creates clear boundaries: a block is either indexed or not,
  • it makes it possible to compute block-wide statistics.

The handle_events function, like all other event handlers in Apibara, accepts two parameters:

  • info: Info: a context object used to share data between handlers. It contains an RPC client configured for the current chain, the chain-aware storage client, and a user-defined context object.
  • block_events: NewEvents (or block: NewBlock, for block handlers): the handler's payload. Its content changes at every invocation and represents the data the handler needs to process.

Finally, we can see the run_indexer function that registers this indexer with Apibara and starts the indexer itself. As we mentioned at the beginning of the tutorial, Apibara tracks the state of the indexer and restarts indexing where the indexer left off. In this case, the indexer accepts a restart flag used to force the indexer to restart indexing everything from the beginning. The restart flag is helpful during development to quickly test our indexer.

The code then creates a new IndexerRunner object responsible for communicating with Apibara and calling the event handlers for each block received by the server. The unique indexer_id is used to support multiple indexers, all connected to the same Apibara instance.

runner = IndexerRunner(
    config=IndexerRunnerConfiguration(
        apibara_url=server_url,
        storage_url=mongo_url,
    ),
    reset_state=restart,
    indexer_id=indexer_id,
    new_events_handler=handle_events,
)

The template project indexes all Transfer events emitted by the Briq smart contract. We use the runner.add_event_filters function to register the type of events to index. The function accepts a list of EventFilter. The filters specify which events to index and optionally filter them by the address that emitted them.

runner.add_event_filters(
    filters=[
        EventFilter.from_event_name(
            name="Transfer",
            address="0x0266b1276d23ffb53d99da3f01be7e29fa024dd33cd7f7b1eb7a46c67891c9d0",
        )
    ],
    index_from_block=201_000,
)

We can now take a closer look at the code that indexes the Transfer events. The code included in the template converts the events to dictionary objects that can be stored in the chain-aware document storage.

await info.storage.insert_many("events", events)

Now that we understand how the indexer works, we can run it and see it print Briq's events.

indexer start

We can also view a list of flags accepted by the command.

indexer start --help

Chain-aware document storage

This section introduces the concept of chain-aware storage and why it's needed. While understanding how storage works is valuable, you can skip this section for now and jump back later.

Now that the indexer is running, you can connect to the Mongo database using the Mongo shell or a GUI like MongoDB Compass. The database is accessible using the mongodb://apibara:apibara@localhost:27017 connection URL.

Looking at the databases, you can find a database with the same name as your indexer (in this case, my_indexer). In this database, we can see a list of collections and a special collection used to keep track of the indexer state. Each collection includes one or more documents.
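
If you prefer Python to the Mongo shell, a short pymongo snippet is enough to inspect the database; the sketch below assumes the default connection URL and the my_indexer database name mentioned above.

from pymongo import MongoClient

# Connect with the same credentials as the development database.
client = MongoClient("mongodb://apibara:apibara@localhost:27017")
db = client["my_indexer"]

# Lists the data collections together with the special collection
# that tracks the indexer's state.
print(db.list_collection_names())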

If we inspect the events collection, we see that each document contains a _chain attribute. That attribute tracks the range of blocks for which the document is valid. It has two fields:

  • valid_from: contains the block at which the document was inserted,
  • valid_to: contains the block at which the document stopped being valid. A document that has not been updated has this field set to null.

When the indexer updates a value using storage.find_one_and_update or storage.find_one_and_replace, Apibara does the following behind the scenes:

  • Sets the previous document's _chain.valid_to to the current block number,
  • Inserts a new document with the updated data. This is now the current version of the document.

When writing Mongo queries to extract data, we should always add the { "_chain.valid_to": null } filter to get the current values only.
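
For example, a pymongo query for the current documents in the events collection looks like the sketch below (null is spelled None in Python).

from pymongo import MongoClient

client = MongoClient("mongodb://apibara:apibara@localhost:27017")
db = client["my_indexer"]

# Superseded versions have _chain.valid_to set to a block number,
# so filtering on null/None returns only the current documents.
for event in db["events"].find({"_chain.valid_to": None}).limit(5):
    print(event)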

When a chain reorganization happens, Apibara can invalidate data by looking at when it was produced. Documents produced at a block higher than the new head are deleted, and the older data is restored as the current version.

Indexing StarkNet events

Now we are ready to start indexing StarkNet events. While all indexers are application specific, the code structure is usually the same:

  • Decode events from the raw representation to your application business model,
  • Decide whether to insert new documents into storage, or update existing entries.

We are going to use starknet.py to decode StarkNet events. Start by initializing the DataTransformer used to decode events; this can be done once at the top level of your indexer.py file. Briq is an ERC-721 compliant smart contract, so the Transfer event has the following fields:

  • from: felt: the address sending the NFT,
  • to: felt: the address receiving the NFT,
  • token_id: Uint256: the id of the NFT being transferred.

# (Imports Section)
from starknet_py.contract import DataTransformer, identifier_manager_from_abi

# ...

uint256_abi = {
    "name": "Uint256",
    "type": "struct",
    "size": 2,
    "members": [
        {"name": "low", "offset": 0, "type": "felt"},
        {"name": "high", "offset": 1, "type": "felt"},
    ],
}

transfer_abi = {
    "name": "Transfer",
    "type": "event",
    "keys": [],
    "outputs": [
        {"name": "from_address", "type": "felt"},
        {"name": "to_address", "type": "felt"},
        {"name": "token_id", "type": "Uint256"},
    ],
}

transfer_decoder = DataTransformer(
    abi=transfer_abi,
    identifier_manager=identifier_manager_from_abi([
        transfer_abi, uint256_abi
    ]),
)

We create a function decode_transfer_event to decode raw event data into Python objects using the transfer_decoder defined above.

def decode_transfer_event(data):
    data = [int.from_bytes(b, "big") for b in data]
    return transfer_decoder.to_python(data)

We need one last helper function, encode_int_as_bytes. Cairo felt and Uint256 values can be larger than the 64-bit integers MongoDB stores natively, so we use this function to convert integers to their raw bytes before storing them in the database.

def encode_int_as_bytes(n):
    return n.to_bytes(32, "big")
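
As a quick sanity check, any felt or Uint256 value round-trips through this encoding; the value below is arbitrary and used only for illustration.

# Values larger than 64 bits still fit in 32 big-endian bytes.
value = 2**200 + 1
encoded = encode_int_as_bytes(value)
assert len(encoded) == 32
assert int.from_bytes(encoded, "big") == value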

Now we can start working on the indexing function. Start by fetching information about the block so that we can include a timestamp together with the transfer events. This timestamp will be used later when building the GraphQL API.

# (Import Section)
from datetime import datetime


# (Handle Events Section)
async def handle_events(info, block_events):
    # (Get Block Section)
    block = await info.rpc_client.get_block_by_hash(
        block_events.block_hash
    )
    block_time = datetime.fromtimestamp(block["accepted_time"])

After that, we decode transfer events and store them in the database.

async def handle_events(info, block_events):
    # After Get Block Section

    # (Store Transfers Section)
    transfers = [
        decode_transfer_event(event.data)
        for event in block_events.events
    ]
    transfers_docs = [
        {
            "from_address": encode_int_as_bytes(tr.from_address),
            "to_address": encode_int_as_bytes(tr.to_address),
            "token_id": encode_int_as_bytes(tr.token_id),
            "timestamp": block_time,
        }
        for tr in transfers
    ]
    await info.storage.insert_many("transfers", transfers_docs)

We want to store every token with its current owner so that our API can return all tokens owned by a specific address. Notice that if there are multiple transfers of the same token in a block, we only want to store the final token owner. We use the storage.find_one_and_replace function to replace the data associated with a token, if any. The arguments are as follows:

  • "tokens": the collection name. A collection contains one or more documents,
  • {"token_id": token_id}: the filter, used to select which document to replace.
  • {"token_id": ... }: the new document value.
  • upsert=True: insert a new document if no document matches the filter.

async def handle_events(info, block_events):
    # After Store Transfers Section

    # (Update Tokens Section)
    new_token_owner = dict()
    for transfer in transfers:
        new_token_owner[transfer.token_id] = transfer.to_address

    for token_id, new_owner in new_token_owner.items():
        token_id = encode_int_as_bytes(token_id)
        await info.storage.find_one_and_replace(
            "tokens",
            {"token_id": token_id},
            {
                "token_id": token_id,
                "owner": encode_int_as_bytes(new_owner),
                "updated_at": block_time,
            },
            upsert=True,
        )

Now we restart the indexer and we can see the tokens and transfers collections being populated with new data.

indexer start --restart

Now that the indexer is indexing, we can implement a simple GraphQL API to expose this data to our frontend application.

Exposing data with GraphQL

At the moment, Apibara does not include a GraphQL server but the feature is planned for a future release. For this tutorial, we're going to write some code that exposes the data stored in the database through a simple GraphQL API. We use Strawberry GraphQL and aiohttp for this.

First add the new dependencies to the project. We use poetry add for this.

poetry add "strawberry-graphql[debug-server]" aiohttp

Then create a new graphql.py file in src/indexer. All code in this section will be added to this file, unless specified differently.

Strawberry GraphQL uses Python's type hints to build entities and queries. We start by importing all the required types and functions for the server.

import asyncio
from datetime import datetime
from typing import List, NewType, Optional
import strawberry
from aiohttp import web
from pymongo import MongoClient
from strawberry.aiohttp.views import GraphQLView
from indexer.indexer import indexer_id

Then we define a new GraphQL type to represent hex-encoded values like token ids and addresses. In a real-world application you may want to have separate types for TokenId and Address, but for this tutorial one type is enough.

def parse_hex(value):
    if not value.startswith("0x"):
        raise ValueError("invalid hex value")
    return bytes.fromhex(value.replace("0x", ""))


def serialize_hex(token_id):
    return "0x" + token_id.hex()


HexValue = strawberry.scalar(
    NewType("HexValue", bytes),
    parse_value=parse_hex,
    serialize=serialize_hex
)

We then add the Token and Transfer entities that represent Briq tokens and their transfers, respectively. The classes include a from_mongo class method to convert the values returned by MongoDB into the GraphQL type.

@strawberry.type
class Transfer:
    from_address: HexValue
    to_address: HexValue
    timestamp: datetime

    @classmethod
    def from_mongo(cls, data):
        return cls(
            from_address=data["from_address"],
            to_address=data["to_address"],
            timestamp=data["timestamp"],
        )


@strawberry.type
class Token:
    token_id: HexValue
    owner: HexValue
    updated_at: datetime

    @classmethod
    def from_mongo(cls, data):
        return cls(
            token_id=data["token_id"],
            owner=data["owner"],
            updated_at=data["updated_at"],
        )

We then define two resolvers. Resolvers are functions that are responsible for fetching data (in this case, from Mongo) and returning it to GraphQL. Strawberry uses the signature of the resolvers to generate the corresponding GraphQL queries. The resolver get_token_by_id returns the Token with the given id, if it exists. The resolver get_tokens returns a list of tokens, optionally filtered by their owner. The parameters skip and limit are used for pagination.

def get_token_by_id(info, id: HexValue) -> Optional[Token]:
    db = info.context["db"]
    token = db["tokens"].find_one({"token_id": id, "_chain.valid_to": None})
    if token is not None:
        return Token.from_mongo(token)
    return None


def get_tokens(
    info, owner: Optional[HexValue] = None, limit: int = 10, skip: int = 0
) -> List[Token]:
    db = info.context["db"]
    filter = {"_chain.valid_to": None}
    if owner is not None:
        filter["owner"] = owner
    query = db["tokens"].find(filter).skip(skip).limit(limit).sort("updated_at", -1)
    return [Token.from_mongo(t) for t in query]

We add the resolvers to the top-level Query type.

@strawberry.type
class Query:
    tokens: List[Token] = strawberry.field(resolver=get_tokens)
    token: Optional[Token] = strawberry.field(resolver=get_token_by_id)

Now we can set up the Strawberry aiohttp server. We create a new view, IndexerGraphQLView, so that we can include the Mongo database object in the context passed to the resolvers. We start the GraphQL server on port 8080.

class IndexerGraphQLView(GraphQLView):
    def __init__(self, db, **kwargs):
        super().__init__(**kwargs)
        self._db = db

    async def get_context(self, _request, _response):
        return {"db": self._db}


async def run_graphql_api(mongo_url=None):
    if mongo_url is None:
        mongo_url = "mongodb://apibara:apibara@localhost:27017"

    mongo = MongoClient(mongo_url)
    db_name = indexer_id.replace("-", "_")
    db = mongo[db_name]

    schema = strawberry.Schema(query=Query)
    view = IndexerGraphQLView(db, schema=schema)

    app = web.Application()
    app.router.add_route("*", "/graphql", view)

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "localhost", "8080")
    await site.start()

    print("GraphQL server started on port 8080")

    while True:
        await asyncio.sleep(5_000)

The final step is to edit src/indexer/main.py to include a new command that starts the GraphQL server.

from indexer.graphql import run_graphql_api


@cli.command()
@click.option("--mongo-url", default=None, help="MongoDB url.")
@async_command
async def graphql(mongo_url):
    """Start the GraphQL server."""
    await run_graphql_api(
        mongo_url=mongo_url,
    )

Now we can start the server from a new terminal (remember to source venv/bin/activate!).

indexer graphql

Then head over to http://localhost:8080/graphql and you will have a GraphiQL editor that you can use to play around with your StarkNet NFT API.

You can run the following query to fetch all NFTs owned by a particular address. You can use this query in your application to show a wallet-like view where the user can see all their NFTs and transfer them to their friends.

query {
  tokens(owner: "0x00") {
    tokenId
    owner
  }
}

We can add a new transfers field to the Token entity to fetch all transfers associated with a given token. Add the following method to the Token class.

@strawberry.type
class Token:
    # ... Declarations
    # ... from_mongo method

    @strawberry.field
    def transfers(self, info, limit: int = 10, skip: int = 0) -> List[Transfer]:
        db = info.context["db"]
        query = (
            db["transfers"]
            .find({"token_id": self.token_id})
            .limit(limit)
            .skip(skip)
            .sort("timestamp", -1)
        )
        return [Transfer.from_mongo(t) for t in query]

Restart the GraphQL server, and you will see that tokens now have a transfers field that we can query to verify we fetch the requested data.
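
For example, a query along the lines of the sketch below (field names follow Strawberry's automatic camelCase conversion of the schema above) returns the most recently updated tokens together with their latest transfers.

query {
  tokens(limit: 5) {
    tokenId
    owner
    transfers(limit: 3) {
      fromAddress
      toAddress
      timestamp
    }
  }
}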

Now the indexer and API are fully functional, but there is one last thing we can improve. Notice that transfers are fetched individually for each token. This is known as the N + 1 problem, and it creates performance issues with larger requests. Fortunately, Strawberry ships with a DataLoader that solves exactly this issue. This is an advanced topic and only relevant when serving large volumes of data.
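
If you decide to go down that route, the sketch below shows one possible shape for it; the create_transfers_loader helper and the transfers_loader context key are our own names, not part of the template, and the loader is meant to be created per request inside get_context.

from collections import defaultdict
from typing import List

from strawberry.dataloader import DataLoader


def create_transfers_loader(db) -> DataLoader:
    async def load_transfers(token_ids: List[bytes]) -> List[List[Transfer]]:
        # A single Mongo query for all requested tokens instead of one per token.
        transfers_by_token = defaultdict(list)
        for doc in db["transfers"].find({"token_id": {"$in": token_ids}}):
            transfers_by_token[doc["token_id"]].append(Transfer.from_mongo(doc))
        # DataLoader expects one result per key, in the same order as the keys.
        return [transfers_by_token[token_id] for token_id in token_ids]

    return DataLoader(load_fn=load_transfers)


# In IndexerGraphQLView.get_context you would return:
#     {"db": self._db, "transfers_loader": create_transfers_loader(self._db)}
# and the Token.transfers field would become:
#     @strawberry.field
#     async def transfers(self, info) -> List[Transfer]:
#         return await info.context["transfers_loader"].load(self.token_id)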

You can now deploy the indexer and the GraphQL server to production. This usually involves building a Docker image for your application and configuring Docker Compose or Kubernetes on your server. If you don't want to manage servers yourself, get in touch with us on Twitter or Discord, and we can host the service for you.