tRPCttRPC
Powered by
kalempsterK
tRPC•2y ago•
3 replies
kalempster

Subscription example not behaving like it should

Hey, I've recently wanted to make a subscription route and I've stumbled upon a problem. In the documentation it's said that you should return an
observable
observable
with a callback that is triggered immediately. That however is not happening. I don't know if I don't understand this correctly of if the example is broken.

I'm using bun@1.1.10 with @trpc/server@11.0.0-rc.396.

import { EventEmitter } from "events";
import { initTRPC } from "@trpc/server";
import { observable } from "@trpc/server/observable";
import { z } from "zod";

// create a global event emitter (could be replaced by redis, etc)
const ee = new EventEmitter();

const t = initTRPC.create();

export const appRouter = t.router({
    onAdd: t.procedure.subscription(() => {
        // return an `observable` with a callback which is triggered immediately
        return observable((emit) => {
            console.log("running");

            const onAdd = (data: any) => {
                // emit data to client
                emit.next(data);
            };

            // trigger `onAdd()` when `add` is triggered in our event emitter
            ee.on("add", onAdd);

            // unsubscribe function when client disconnects or stops subscribing
            return () => {
                ee.off("add", onAdd);
            };
        });
    }),
    add: t.procedure
        .input(
            z.object({
                id: z.string().uuid().optional(),
                text: z.string().min(1),
            })
        )
        .mutation(async (opts) => {
            const post = { ...opts.input }; /* [..] add to db */

            ee.emit("add", post);
            return post;
        }),
});

console.log(ee.listeners("add"));
import { EventEmitter } from "events";
import { initTRPC } from "@trpc/server";
import { observable } from "@trpc/server/observable";
import { z } from "zod";

// create a global event emitter (could be replaced by redis, etc)
const ee = new EventEmitter();

const t = initTRPC.create();

export const appRouter = t.router({
    onAdd: t.procedure.subscription(() => {
        // return an `observable` with a callback which is triggered immediately
        return observable((emit) => {
            console.log("running");

            const onAdd = (data: any) => {
                // emit data to client
                emit.next(data);
            };

            // trigger `onAdd()` when `add` is triggered in our event emitter
            ee.on("add", onAdd);

            // unsubscribe function when client disconnects or stops subscribing
            return () => {
                ee.off("add", onAdd);
            };
        });
    }),
    add: t.procedure
        .input(
            z.object({
                id: z.string().uuid().optional(),
                text: z.string().min(1),
            })
        )
        .mutation(async (opts) => {
            const post = { ...opts.input }; /* [..] add to db */

            ee.emit("add", post);
            return post;
        }),
});

console.log(ee.listeners("add"));

The
console.log
console.log
at the end prints
[]
[]
and I thought the callback is triggered immediately. Any help would be appreciated.
Solution
Okay I get it now, in my actual project I was using
httpBatchLink
httpBatchLink
and
wsLink
wsLink
together insead of splitting them using
splitLink
splitLink
that's why the subscription wasn't going off.

export const api = createTRPCClient<AppRouter & WebSocketRouter>({
    links: [
        splitLink({
            condition(op) {
                return op.type === "subscription";
            },
            true: wsLink({
                client: wsClient,
                transformer: superjson,
            }),
            false: httpBatchLink({
                url: import.meta.env.PROD ? `/trpc` : `http://localhost:3000/trpc`,
                transformer: superjson,
            }),
        }),
    ],
});
export const api = createTRPCClient<AppRouter & WebSocketRouter>({
    links: [
        splitLink({
            condition(op) {
                return op.type === "subscription";
            },
            true: wsLink({
                client: wsClient,
                transformer: superjson,
            }),
            false: httpBatchLink({
                url: import.meta.env.PROD ? `/trpc` : `http://localhost:3000/trpc`,
                transformer: superjson,
            }),
        }),
    ],
});

Thanks for help :)
Jump to solution
tRPCJoin
Move Fast & Break Nothing. End-to-end typesafe APIs made easy.
5,015Members
Resources
Was this page helpful?

Similar Threads

Recent Announcements

Similar Threads

Subscription error: TRPCClientError: Subscriptions should use wsLink
jaacsenJjaacsen / ❓-help
2y ago
subscription
Ahmed EidAAhmed Eid / ❓-help
4y ago
tRPC subscription : Access to socket ID from subscription
ChronicStoneCChronicStone / ❓-help
3y ago
prefetchInfiniteQuery example?
onethreadOonethread / ❓-help
3mo ago