
Realtime TypeScript SDK v4

Realtime is built into the v4 SDK. You define channels with realtime.channel(), publish with step.realtime.publish() or inngest.realtime.publish(), and subscribe with the useRealtime hook or subscribe().

Looking for the older @inngest/realtime package docs? See the archived v3 docs.

Realtime lets you stream function progress, push live updates into the browser, and build interactive workflows like human-in-the-loop approvals without managing your own websocket infrastructure.

Concepts

The v4 model revolves around five primitives:

  • channel defines the scope for updates, such as a specific runId, user, or thread.
  • topic defines the category of data inside a channel, such as status, tokens, or artifact.
  • publish() emits non-durable messages from inside your function handler (also available as inngest.realtime.publish() from server-side code outside a function).
  • step.realtime.publish() emits durable, memoized messages that should not re-fire on retry.
  • token authorizes a client subscription to a specific channel and a specific set of topics.

In practice, prefer step.realtime.publish() whenever possible. Use non-durable publish() or inngest.realtime.publish() only when you intentionally want low-overhead updates that may be re-sent on retry, such as token streams, or when you are publishing from outside a function.
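
The difference between the two publish modes is easiest to see under a retry. The sketch below is a toy model, not the Inngest SDK: ToyRun, publish, stepPublish, and runWithRetries are hypothetical names, and the model assumes only what is described above, namely that a handler re-runs from the top on retry and that a durable publish is memoized by its step ID.

```typescript
// Toy model only: this is NOT the Inngest SDK. It imitates a handler that is
// re-executed from the top on retry, with durable publishes memoized by step ID.

type ToyMessage = { topic: string; data: string };

class ToyRun {
  // Messages actually delivered to subscribers.
  readonly delivered: ToyMessage[] = [];
  // IDs of publish steps that already completed in an earlier attempt.
  private readonly completedSteps = new Set<string>();

  // Non-durable: sends every time the handler reaches this call.
  publish(topic: string, data: string): void {
    this.delivered.push({ topic, data });
  }

  // Durable: sends once per step ID; later attempts skip the completed step.
  stepPublish(stepId: string, topic: string, data: string): void {
    if (this.completedSteps.has(stepId)) return;
    this.delivered.push({ topic, data });
    this.completedSteps.add(stepId);
  }
}

// A handler that fails on its first attempt, after both publishes have run.
function toyHandler(run: ToyRun, attempt: number): void {
  run.publish("tokens", "Hello");
  run.stepPublish("status-started", "status", "Started");
  if (attempt === 0) throw new Error("transient failure");
}

// Re-runs the handler from the top until it succeeds or attempts run out.
function runWithRetries(run: ToyRun, maxAttempts: number): void {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      toyHandler(run, attempt);
      return;
    } catch {
      // Swallow the error and retry.
    }
  }
}

const toyRun = new ToyRun();
runWithRetries(toyRun, 3);

const tokenCount = toyRun.delivered.filter((m) => m.topic === "tokens").length;
const statusCount = toyRun.delivered.filter((m) => m.topic === "status").length;
console.log({ tokenCount, statusCount }); // { tokenCount: 2, statusCount: 1 }
```

In this model the non-durable token message reaches subscribers twice, once per attempt, while the memoized status message arrives once. That is the trade-off to weigh when choosing a mode.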

Quick start

1. Define a shared channel

import { realtime } from "inngest";
import { z } from "zod";

export const contentPipeline = realtime.channel({
  name: ({ runId }: { runId: string }) => `pipeline:${runId}`,
  topics: {
    status: {
      schema: z.object({
        message: z.string(),
        step: z.string().optional(),
      }),
    },
    tokens: {
      schema: z.object({ token: z.string(), step: z.string() }),
    },
  },
});
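
To make the two halves of this definition concrete, here is a plain-TypeScript mirror of it that uses neither the SDK nor zod. It is an illustration only: pipelineChannelName, isStatusPayload, and isTokenPayload are hypothetical helpers, written by hand to show which channel name a runId produces and which payloads the two schemas above accept.

```typescript
// Illustration only: a plain-TypeScript mirror of the channel definition above.
// These helpers are hypothetical and are not part of the inngest package.

type StatusPayload = { message: string; step?: string };
type TokenPayload = { token: string; step: string };

// Same naming rule as the `name` function in the channel definition.
function pipelineChannelName(params: { runId: string }): string {
  return `pipeline:${params.runId}`;
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

// Mirrors the `status` schema: a string message and an optional string step.
function isStatusPayload(value: unknown): value is StatusPayload {
  if (!isRecord(value)) return false;
  const stepOk = value.step === undefined || typeof value.step === "string";
  return typeof value.message === "string" && stepOk;
}

// Mirrors the `tokens` schema: both fields are required strings.
function isTokenPayload(value: unknown): value is TokenPayload {
  return (
    isRecord(value) &&
    typeof value.token === "string" &&
    typeof value.step === "string"
  );
}

console.log(pipelineChannelName({ runId: "run_123" })); // pipeline:run_123
console.log(isStatusPayload({ message: "Researching topic..." })); // true
console.log(isTokenPayload({ token: "Hello" })); // false, `step` is missing
```

Because the name is derived from runId, each run gets its own channel, and a subscriber to one run's channel does not see another run's updates.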

2. Publish from a function

import { inngest } from "@/inngest/client";
import { contentPipeline } from "@/inngest/channels";

export const generatePost = inngest.createFunction(
  {
    id: "generate-post",
    triggers: [{ event: "app/generate-post" }],
  },
  async ({ event, publish, step }) => {
    const ch = contentPipeline({ runId: event.data.runId });

    // Non-durable on purpose. Fine for transient progress updates.
    await publish(ch.status, {
      message: "Researching topic...",
      step: "research",
    });

    // Non-durable on purpose. Token streams may replay on retry.
    await publish(ch.tokens, {
      token: "Hello",
      step: "research",
    });

    // Prefer the durable publish for important state that should not replay.
    await step.realtime.publish("status-complete", ch.status, {
      message: "Done",
      step: "complete",
    });
  }
);

3. Mint a token and subscribe from the client

app/actions.ts
"use server";

import { getClientSubscriptionToken } from "inngest/react";
import { inngest } from "@/inngest/client";
import { contentPipeline } from "@/inngest/channels";

export async function getRealtimeToken(runId: string) {
  return getClientSubscriptionToken(inngest, {
    channel: contentPipeline({ runId }),
    topics: ["status", "tokens"],
  });
}

app/page.tsx
"use client";

import { useRealtime } from "inngest/react";
import { contentPipeline } from "@/inngest/channels";
import { getRealtimeToken } from "./actions";

export default function Home({ runId }: { runId: string }) {
  const topics = ["status", "tokens"] as const;
  const channel = contentPipeline({ runId });

  const { connectionStatus, runStatus, messages } = useRealtime({
    channel,
    topics,
    token: () => getRealtimeToken(runId),
  });

  return (
    <div>
      <p>Connection: {connectionStatus}</p>
      <p>Run: {runStatus}</p>
      <p>Status: {messages.byTopic.status?.data.message}</p>
    </div>
  );
}
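
The page above renders only the latest status message. Token streams usually need to be accumulated instead. A minimal sketch of that step as a pure function follows, assuming you have already collected the tokens payloads in arrival order. How to read that list from useRealtime is not covered on this page, so the input array and the assembleTokens helper are hypothetical; only the payload shape comes from the tokens schema.

```typescript
// Payload shape taken from the `tokens` topic schema in the channel definition.
type TokenChunk = { token: string; step: string };

// Hypothetical helper: concatenates tokens per step, preserving arrival order.
function assembleTokens(chunks: readonly TokenChunk[]): Map<string, string> {
  const textByStep = new Map<string, string>();
  for (const { token, step } of chunks) {
    textByStep.set(step, (textByStep.get(step) ?? "") + token);
  }
  return textByStep;
}

// Example input: chunks from two steps, interleaved as they might arrive.
const assembled = assembleTokens([
  { token: "Hello", step: "research" },
  { token: "Hi", step: "draft" },
  { token: ", world", step: "research" },
]);

console.log(assembled.get("research")); // Hello, world
console.log(assembled.get("draft")); // Hi
```

Because non-durable token messages may replay on retry, a reducer like this can see the same token twice. If that matters for your UI, one option is to include a sequence number in the payload and drop duplicates before concatenating.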

When to use each publish mode

  • Prefer step.realtime.publish() for important state transitions and final results.
  • Use publish() for high-frequency updates like tokens, logs, or progress ticks where replay on retry is acceptable.
  • Use inngest.realtime.publish() from routes, webhooks, or other server-side code outside a function run.
