Realtime
Realtime is currently in developer preview. Some details including APIs are still subject to change during this period. Read more about the developer preview here.
Realtime allows you to stream data from workflows to your users, without configuring infrastructure or maintaining state. This allows you to build interactive applications, show status updates, or stream AI responses to your users directly in your existing code.
Usage
There are two core parts of realtime: publishing and subscribing. You must publish data from your functions in order for it to be visible. Publishing data accepts three parameters:
data
, the data to be published to the realtime streamtopic
, the (optionally typed and validated) topic name, allowing you to differentiate between types of datachannel
, the name for a group of topics, eg.user:123
Subscriptions receive data by subscribing to topics within a channel. We manage the WebSocket connections, publishing, subscribing, and state for you.
Getting started
To use realtime, start by installing the @inngest/realtime
package:
npm install @inngest/realtime
Publishing
Using our APIs, you publish specific data that users subscribe to. Here's a basic example:
// NOTE: This is an untyped, minimal example. To view typed channels, use the code
// tabs above.
import { Inngest } from "inngest";
import { realtimeMiddleware } from "@inngest/realtime";
const inngest = new Inngest({
id: "my-app",
// Whenever you create your app, include the `realtimeMiddleware()`
middleware: [realtimeMiddleware()],
});
inngest.createFunction(
{ id: "some-task" },
{ event: "ai/ai.requested" },
async ({ event, step, publish }) => {
// Publish data to a user's channel, on the given topic. Channel names are custom
// and act as a container for a group of topics. Each topic is a stream of data.
await publish({
channel: `user:${event.data.userId}`,
topic: "ai",
data: {
response: "an llm response here",
success: true,
},
});
}
);
Subscribing
Subscribing can be done using an Inngest client that either has a valid signing key or a subscription token. You can can subscribe from the client or the backend.
Subscribe from the client
Create a subscription token
Subscription tokens should be created on the server and passed to the client. You can create a new endpoint to generate a token, checking things like user permissions or channel subscriptions.
Here's an example of a server endpoint that creates a token, scoped to a user's channel and specific topics.
import { getSubscriptionToken } from "@inngest/realtime";
import { auth } from "src/auth"; // this could be any auth provider
import { inngest } from "src/inngest";
// ex. /api/get-subscribe-token
export async function POST() {
const { userId } = await auth()
const token = await getSubscriptionToken(inngest, {
channel: `user:${userId}`,
topics: ["ai"],
})
return NextResponse.json({ token }, { status: 200 })
}
Fetch the token
From the client, you can send a request to your server endpoint to fetch the token:
Client
import { subscribe } from "@inngest/realtime";
const token = await fetch("/api/get-subscribe-token", {
method: "POST",
credentials: "include",
}).then(res => res.json());
Subscribe to a channel
Once you have a token, you can subscribe to a channel by calling the subscribe
function with the token. You can also subscribe using the useInngestSubscription
React hook. Read more about the React hook here.
"use server";
import { subscribe } from "@inngest/realtime";
import { inngest } from "src/inngest";
const stream = await subscribe(inngest, token)
for await (const message of stream) {
console.log(message)
}
That's all you need to do to subscribe to a channel from the client!
Subscribe from the backend
Subscribing on the backend is simple:
import { subscribe } from "@inngest/realtime";
import { inngest } from "src/inngest";
const stream = await subscribe(inngest, {
channel: "user:123",
topics: ["ai"], // subscribe to one or more topics in the user channel
});
// The returned `stream` from `subscribe()` is a `ReadableStream` that can be
// used with `getReader()` or as an async iterator
//
// In both cases, message is typed based on the subscription
// Example 1: AsyncIterator
for await (const message of stream) {
console.log(message);
}
// Example 2: ReadableStream
const reader = stream.getReader();
const { done, value } = await reader.read();
if (!done) {
console.log(value);
}
Type-only channels
When passing channels to subscribe()
or getSubscriptionToken()
, you may not
be able to import a channel directly, for example if the code is contained
within a Node package and we're on the browser.
For these instances we can use typeOnlyChannel()
to use the types of the
channel without requiring the runtime object:
import { type userChannel } from "@/server/channels";
import {
subscribe,
getSubscriptionToken,
typeOnlyChannel,
} from "@inngest/realtime";
const token = await fetchTokenFromBackend();
const stream = await subscribe(inngest, {
channel: typeOnlyChannel<typeof userChannel>("user:123"),
topics: ["ai"],
});
// or generating a token...
const token = await getSubscriptionToken(inngest, {
channel: typeOnlyChannel<typeof userChannel>("user:123"),
topics: ["ai"],
});
For convenience, a Realtime.Token
type helper is provided to help type
backend outputs when generating tokens for your frontend:
import { type Realtime } from "@inngest/realtime";
import { type userChannel } from "./channels";
type UserAiToken = Realtime.Token<typeof userChannel, ["ai"]>;
Concepts
Channels
Channels are environment-level containers which group one or more topics of data. You can create as many channels as you need. Some tips:
- You can subscribe to a channel before any data is published
- You can create a channel for a specific run ID, eg. for a run's status:
run:${ctx.runId}
- You can create channels for each user, or for a given conversation
Topics
Topics allow you to specify individual streams within a channel. For example, within a given run you may publish status updates, AI responses, and tool outputs to a user.
Benefits of separating data by topics include:
- Typing and data handling: you can switch on the topic name to properly type and handle different streams of data within a channel.
- Security: you must specify topics when creating subscription tokens, allowing you to protect or hide specific published data.
Subscription Tokens
Subscription tokens allow you to subscribe to the specified channel's topics. Tokens expire 1 minute after creation for security purposes. Once connected, you do not need to manage authentication or re-issue tokens to keep the connection active.
SDK Support
Realtime is supported in the following SDKs:
SDK | Publish | Subscribe | Version |
---|---|---|---|
TypeScript | ✅ | ✅ | >=v3.32.0 |
Golang | ✅ | ✅ | >=v0.9.0 |
Python | In progress | In progress | - |
Limitations
- The number of currently active topics depends on your Inngest plan
- Data sent is currently at-most-once and ephemeral
- The max message size is currently 512KB
Developer preview
Realtime is available as a developer preview. During this period:
- This feature is widely available for all Inngest accounts.
- Some details including APIs and SDKs are subject to change based on user feedback.
- There is no additional cost to using realtime. Realtime will be available to all Inngest billing plans at general availability, but final pricing is not yet determined.
Security
Realtime is secure by default. You can only subscribe to a channel's topics using time-sensitive tokens. The subscription token mechanism must be placed within your own protected API endpoints.
You must always specify the channel and topics when publishing data. This lets you ensure that users can only access specific subsets of data within runs.
Delivery guarantees
Message delivery is currently at-most-once. We recommend that your users subscribe to a channel's topics as you invoke runs or send events to ensure delivery of data within a topic.