kalempster (6mo ago)

Subscription example not behaving like it should

Hey, I recently wanted to make a subscription route and stumbled upon a problem. The documentation says you should return an observable with a callback that is triggered immediately. That, however, is not happening. I don't know if I'm misunderstanding this or if the example is broken. I'm using bun@1.1.10 with @trpc/server@11.0.0-rc.396.
import { EventEmitter } from "events";
import { initTRPC } from "@trpc/server";
import { observable } from "@trpc/server/observable";
import { z } from "zod";

// create a global event emitter (could be replaced by redis, etc)
const ee = new EventEmitter();

const t = initTRPC.create();

export const appRouter = t.router({
  onAdd: t.procedure.subscription(() => {
    // return an `observable` with a callback which is triggered immediately
    return observable((emit) => {
      console.log("running");

      const onAdd = (data: any) => {
        // emit data to client
        emit.next(data);
      };

      // trigger `onAdd()` when `add` is triggered in our event emitter
      ee.on("add", onAdd);

      // unsubscribe function when client disconnects or stops subscribing
      return () => {
        ee.off("add", onAdd);
      };
    });
  }),
  add: t.procedure
    .input(
      z.object({
        id: z.string().uuid().optional(),
        text: z.string().min(1),
      })
    )
    .mutation(async (opts) => {
      const post = { ...opts.input }; /* [..] add to db */

      ee.emit("add", post);
      return post;
    }),
});

console.log(ee.listeners("add"));
The console.log at the end prints [], but I thought the callback was triggered immediately. Any help would be appreciated.
2 Replies
Alex / KATT 🐱
It doesn't start listening until a client is actually subscribed to onAdd.
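To illustrate the point about laziness, here is a minimal sketch using a hand-rolled stand-in for an observable. `lazyObservable` is a hypothetical helper written for this example, not tRPC's implementation; it only assumes that the setup callback runs at subscribe time, which is the behaviour described in the reply above.

```typescript
import { EventEmitter } from "events";

type Observer<T> = { next: (value: T) => void };
type Teardown = () => void;

// Hypothetical stand-in for a lazy observable (NOT tRPC's implementation).
// The `setup` callback is stored and only runs when `subscribe` is called.
function lazyObservable<T>(setup: (emit: Observer<T>) => Teardown) {
  return {
    subscribe(observer: Observer<T>) {
      const teardown = setup(observer);
      return { unsubscribe: teardown };
    },
  };
}

type Post = { text: string };

const ee = new EventEmitter();

const onAdd$ = lazyObservable<Post>((emit) => {
  const onAdd = (data: Post) => emit.next(data);
  ee.on("add", onAdd);
  // Teardown: remove the listener when the subscriber goes away.
  return () => {
    ee.off("add", onAdd);
  };
});

// Nobody has subscribed yet, so the setup callback has not run.
const listenersBefore = ee.listenerCount("add");

const received: Post[] = [];
const subscription = onAdd$.subscribe({
  next: (post) => {
    received.push(post);
  },
});

// Subscribing ran the setup callback, which registered the listener.
const listenersDuring = ee.listenerCount("add");

ee.emit("add", { text: "hello" });

subscription.unsubscribe();

// Unsubscribing ran the teardown, which removed the listener.
const listenersAfter = ee.listenerCount("add");

console.log({ listenersBefore, listenersDuring, listenersAfter, received });
```

In the original snippet, the final `console.log(ee.listeners("add"))` corresponds to the `listenersBefore` check here: it runs before any client has subscribed, so an empty list is expected.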
Solution
kalempster (6mo ago)
Okay, I get it now. In my actual project I was using httpBatchLink and wsLink together instead of splitting them with splitLink; that's why the subscription wasn't going off.
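import { createTRPCClient, httpBatchLink, splitLink, wsLink } from "@trpc/client";
import superjson from "superjson";

// AppRouter, WebSocketRouter and wsClient are defined elsewhere in the project.
export const api = createTRPCClient<AppRouter & WebSocketRouter>({
  links: [
    splitLink({
      // send subscriptions over the WebSocket link, everything else over HTTP
      condition(op) {
        return op.type === "subscription";
      },
      true: wsLink({
        client: wsClient,
        transformer: superjson,
      }),
      false: httpBatchLink({
        url: import.meta.env.PROD ? `/trpc` : `http://localhost:3000/trpc`,
        transformer: superjson,
      }),
    }),
  ],
});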
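As a rough illustration of what the `condition` in the config above decides, here is a toy model of the routing. `pickTransport` and the `Operation` type are hypothetical names made up for this sketch; it does not reproduce tRPC's link internals, only the branching on `op.type`.

```typescript
// Simplified operation shape, assumed for illustration only.
type Operation = {
  type: "query" | "mutation" | "subscription";
  path: string;
};

// Hypothetical helper mirroring the `condition` passed to splitLink:
// subscriptions take the WebSocket branch, everything else takes HTTP.
function pickTransport(op: Operation): "ws" | "http" {
  return op.type === "subscription" ? "ws" : "http";
}

const ops: Operation[] = [
  { type: "subscription", path: "onAdd" },
  { type: "mutation", path: "add" },
];

const routes = ops.map((op) => `${op.path} -> ${pickTransport(op)}`);
console.log(routes);
```

With this split, `onAdd` goes over the WebSocket connection while `add` keeps using the batched HTTP endpoint.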
Thanks for the help :)