Plugins & Hooks
Indexers are extensible through hooks and plugins. Hooks are functions that are called at specific points in the indexer's lifecycle. Plugins are components that contain reusable hooks callbacks.
Hooks
The following hooks are available in all indexers.
run:before
() => void
Called before the indexer starts running.run:after
() => void
Called after the indexer has finished running.connect:before
({ request: StreamDataRequest<TFilter>, options: StreamDataOptions }) => void
Called before the indexer connects to the DNA stream. Can be used to change the request or stream options.connect:after
({ request: StreamDataRequest<TFilter> }) => void
Called after the indexer has connected to the DNA stream.connect:factory
({ request: StreamDataRequest<TFilter>, endCursor: { orderKey: bigint, uniqueKey?: string } }) => void
Called before the indexer reconnects to the DNA stream with a new filter (in factory mode).message
({ message: StreamDataResponse<TBlock> }) => void
Called for each message received from the DNA stream. Additionally, message-specific hooks are available:message:invalidate
,message:finalize
,message:heartbeat
,message:systemMessage
.handler:middleware
({ use: MiddlewareFunction) => void }) => void
Called to register indexer's middlewares.
Using plugins
You can register plugins in the indexer's configuration, under the plugins
key.
import { BeaconChainStream } from "@apibara/beaconchain";
import { defineIndexer } from "@apibara/indexer";
import { myAwesomePlugin } from "@/lib/my-plugin.ts";
export default defineIndexer(BeaconChainStream)({
streamUrl: "https://beaconchain.preview.apibara.org",
filter: { /* ... */ },
plugins: [myAwesomePlugin()],
async transform({ block: { header, validators } }) {
/* ... */
},
});
Building plugins
Developers can create new plugins to be shared across multiple indexers or projects. Plugins use the available hooks to extend the functionality of indexers.
The main way to define a plugin is by using the defineIndexerPlugin
function. This function takes a callback with the indexer as parameter, the plugin should register itself with the indexer's hooks.
When the runner runs the indexer, all the relevant hooks are called.
import type { Cursor } from "@apibara/protocol";
import { defineIndexerPlugin } from "@apibara/indexer/plugins";
export function myAwesomePlugin<TFilter, TBlock, TTxnParams>() {
return defineIndexerPlugin<TFilter, TBlock, TTxnParams>((indexer) => {
indexer.hooks.hook("connect:before", ({ request, options }) => {
// Do something before the indexer connects to the DNA stream.
});
indexer.hooks.hook("run:after", () => {
// Do something after the indexer has finished running.
});
});
}
Middleware
Apibara indexers support wrapping the transform
function in middleware. This is used, for example, to wrap all database operations in a transaction.
The middleware is registered using the handler:middleware
hook. This hook takes a use
argument to register the middleware with the indexer.
The argument to use
is a function that takes the indexer's context and a next
function to call the next middleware or the transform function.
import type { Cursor } from "@apibara/protocol";
import { defineIndexerPlugin } from "@apibara/indexer/plugins";
export function myAwesomePlugin<TFilter, TBlock, TTxnParams>() {
return defineIndexerPlugin<TFilter, TBlock, TTxnParams>((indexer) => {
const db = openDatabase();
indexer.hooks.hook("handler:middleware", ({ use }) => {
use(async (context, next) => {
// Start a transaction.
await db.transaction(async (txn) => {
// Add the transaction to the context.
context.db = txn;
try {
// Call the next middleware or the transform function.
await next();
} finally {
// Remove the transaction from the context.
context.db = undefined;
}
});
});
});
});
}
Inline hooks
For all cases where you want to use a hook without creating a plugin, you can use the hooks
property of the indexer.
IMPORTANT: inline hooks are the recommended way to add hooks to an indexer. If the same hook is needed in multiple indexers, it is better to create a plugin. Usually, plugins lives in the lib
folder, for example lib/my-plugin.ts
.
import { BeaconChainStream } from "@apibara/beaconchain";
import { defineIndexer } from "@apibara/indexer";
export default defineIndexer(BeaconChainStream)({
streamUrl: "https://beaconchain.preview.apibara.org",
filter: { /* ... */ },
async transform({ block: { header, validators } }) {
/* ... */
},
hooks: {
async "connect:before"({ request, options }) {
// Do something before the indexer connects to the DNA stream.
},
},
});
Indexer lifecycle
The following Javascript pseudocode shows the indexer's lifecycle. This should give you a good understanding of when hooks are called.
function run(indexer) {
indexer.callHook("run:before");
const { use, middleware } = registerMiddleware(indexer);
indexer.callHook("handler:middleware", { use });
// Create the request based on the indexer's configuration.
const request = Request.create({
filter: indexer.filter,
startingCursor: indexer.startingCursor,
finality: indexer.finality,
});
// Stream options.
const options = {};
indexer.callHook("connect:before", { request, options });
let stream = indexer.streamData(request, options);
indexer.callHook("connect:after");
while (true) {
const { message, done } = stream.next();
if (done) {
break;
}
indexer.callHook("message", { message });
switch (message._tag) {
case "data": {
const { block, endCursor, finality } = message.data
middleware(() => {
if (indexer.isFactoryMode()) {
// Handle the factory portion of the indexer data.
// Implementation detail is not important here.
const newFilter = indexer.factory();
const request = Request.create(/* ... */);
indexer.callHook("connect:factory", { request, endCursor });
stream = indexer.streamData(request, options);
}
indexer.transform({ block, endCursor, finality });
});
break;
}
case "invalidate": {
indexer.callHook("message:invalidate", { message });
break;
}
case "finalize": {
indexer.callHook("message:finalize", { message });
break;
}
case "heartbeat": {
indexer.callHook("message:heartbeat", { message });
break;
}
case "systemMessage": {
indexer.callHook("message:systemMessage", { message });
break;
}
}
}
indexer.callHook("run:after");
}