niko
niko•3mo ago

Event emitter doesn't trigger subscription from webhook

I'm using a subscription in Next.js over SSE, and I'm trying to trigger the subscription when a webhook receives data. The webhook is a separate route, api/webhook/route.ts, and the subscription is in root.ts. I even tested the subscription using a small mock of mutation and subscription procedures, and it worked fine. Here is the test, which works ✅
// Mock sub-router used to check the emitter -> subscription wiring:
// `emit` publishes on the "test-event" channel and `on` streams it back.
test: createTRPCRouter({
  // Mutation: emits a shallow copy of the validated input on "test-event".
  emit: protectedProcedure
    .input(z.object({ message: z.string() }))
    .mutation(async ({ input }) => {
      const payload = { ...input };
      mediaEvents.emit("test-event", payload);
    }),
  // Subscription: yields the first argument of every "test-event" emission.
  on: protectedProcedure.subscription(async function* ({ signal }) {
    const channel = "test-event";

    try {
      // `signal` is handed to `on` together with the emitter and channel.
      const incoming = on(mediaEvents, channel, { signal });
      for await (const [payload] of incoming) {
        yield payload;
      }
    } catch (failure) {
      // Every error is logged and swallowed, which ends the generator.
      console.error(failure);
    }
  }),
}),
// Mock sub-router used to check the emitter -> subscription wiring:
// `emit` publishes on the "test-event" channel and `on` streams it back.
test: createTRPCRouter({
  // Mutation: emits a shallow copy of the validated input on "test-event".
  emit: protectedProcedure
    .input(z.object({ message: z.string() }))
    .mutation(async ({ input }) => {
      const payload = { ...input };
      mediaEvents.emit("test-event", payload);
    }),
  // Subscription: yields the first argument of every "test-event" emission.
  on: protectedProcedure.subscription(async function* ({ signal }) {
    const channel = "test-event";

    try {
      // `signal` is handed to `on` together with the emitter and channel.
      const incoming = on(mediaEvents, channel, { signal });
      for await (const [payload] of incoming) {
        yield payload;
      }
    } catch (failure) {
      // Every error is logged and swallowed, which ends the generator.
      console.error(failure);
    }
  }),
}),
1 Reply
niko
nikoOP•3mo ago
But this one, which I want to make work, doesn't. This is my webhook route. Don't worry about mediaEvents: that's the class I made for the EventEmitter, and it worked fine in my mock.
/**
 * Webhook route handler: parses the request body as JSON, emits it on the
 * `media-test` channel of `mediaEvents`, logs the listener count before and
 * after the emit, and always answers `{ success: true }`.
 *
 * NOTE(review): "Listeners BEFORE: 0" / "Was Emitted: false" would mean no
 * listener is registered on the `mediaEvents` instance this module sees.
 * Presumably the subscription attaches to a different instance (e.g. the
 * module being instantiated separately per route) — confirm that
 * `mediaEvents` is a true process-wide singleton.
 */
export const POST = async (req: Request) => {
  // The cast is unchecked; the payload is forwarded exactly as received.
  const payload = (await req.json()) as FAL_AI_WEBHOOK_RESPONSE;
  const channel = "media-test";

  console.log("šŸ”„šŸ”„šŸ”„ TEST EVENT:");
  console.log(" - Event Key:", channel);
  const listenersBefore = mediaEvents.listenerCount(channel);
  console.log(" - Listeners BEFORE:", listenersBefore);

  const delivered = mediaEvents.emit(channel, payload);

  console.log("šŸ”„šŸ”„šŸ”„ TEST RESULT:");
  console.log(" - Was Emitted:", delivered);
  const listenersAfter = mediaEvents.listenerCount(channel);
  console.log(" - Listeners AFTER:", listenersAfter);

  const responseBody = { success: true };
  return NextResponse.json(responseBody);
};
/**
 * Webhook route handler: parses the request body as JSON, emits it on the
 * `media-test` channel of `mediaEvents`, logs the listener count before and
 * after the emit, and always answers `{ success: true }`.
 *
 * NOTE(review): "Listeners BEFORE: 0" / "Was Emitted: false" would mean no
 * listener is registered on the `mediaEvents` instance this module sees.
 * Presumably the subscription attaches to a different instance (e.g. the
 * module being instantiated separately per route) — confirm that
 * `mediaEvents` is a true process-wide singleton.
 */
export const POST = async (req: Request) => {
  // The cast is unchecked; the payload is forwarded exactly as received.
  const payload = (await req.json()) as FAL_AI_WEBHOOK_RESPONSE;
  const channel = "media-test";

  console.log("šŸ”„šŸ”„šŸ”„ TEST EVENT:");
  console.log(" - Event Key:", channel);
  const listenersBefore = mediaEvents.listenerCount(channel);
  console.log(" - Listeners BEFORE:", listenersBefore);

  const delivered = mediaEvents.emit(channel, payload);

  console.log("šŸ”„šŸ”„šŸ”„ TEST RESULT:");
  console.log(" - Was Emitted:", delivered);
  const listenersAfter = mediaEvents.listenerCount(channel);
  console.log(" - Listeners AFTER:", listenersAfter);

  const responseBody = { success: true };
  return NextResponse.json(responseBody);
};
And that's my subscription procedure:
/**
 * Subscription procedure that listens on the `media-test` channel of
 * `mediaEvents` and yields only the events whose `chatId` equals the
 * `chatId` from the validated input. Every step is logged for debugging.
 *
 * Errors that look like an abort (name "AbortError" or a message containing
 * "aborted") end the stream quietly; anything else is logged and rethrown
 * as a `TRPCError` with code `INTERNAL_SERVER_ERROR`.
 *
 * NOTE(review): the emitted payload is cast to `MediaEvent` without a
 * runtime check — confirm the emitter really sends objects with `chatId`.
 */
export const mediaGeneted = protectedProcedure
  .input(mediaGenetedSchema)
  .subscription(async function* ({ input, signal }) {
    const channel = "media-test";
    const wantedChatId = input.chatId;

    console.log("šŸ”„šŸ”„šŸ”„ SUBSCRIPTION STARTED:");
    console.log(" - Event Key:", channel);
    console.log(" - Chat ID:", wantedChatId);
    const initialListeners = mediaEvents.listenerCount(channel);
    console.log(" - Initial Listener Count:", initialListeners);

    try {
      const incoming = on(mediaEvents, channel, { signal });

      for await (const [raw] of incoming) {
        console.log("šŸ”„šŸ”„šŸ”„ EVENT RECEIVED");
        console.log(" - Raw data:", raw);
        console.log(" - Data type:", typeof raw);
        console.log(" - Data keys:", raw ? Object.keys(raw) : "null");

        const event = raw as MediaEvent;

        // Guard clause: skip empty payloads and events for other chats.
        if (!event || event.chatId !== wantedChatId) {
          console.log("šŸ”„šŸ”„šŸ”„ EVENT FILTERED OUT:");
          console.log(" - Event Chat ID:", event?.chatId);
          console.log(" - Expected Chat ID:", wantedChatId);
          console.log(" - Event exists:", !!event);
          continue;
        }

        console.log("šŸ”„šŸ”„šŸ”„ YIELDING EVENT:");
        console.log(" - Event Chat ID:", event.chatId);
        console.log(" - Expected Chat ID:", wantedChatId);
        console.log(" - Match:", event.chatId === wantedChatId);

        // The chat id is passed to `tracked` as the first argument.
        yield tracked(event.chatId, event);

        console.log("šŸ”„šŸ”„šŸ”„ EVENT YIELDED SUCCESSFULLY");
      }
    } catch (error) {
      const looksAborted =
        error instanceof Error &&
        (error.name === "AbortError" || error.message?.includes("aborted"));

      if (!looksAborted) {
        console.error("šŸ”„ Subscription error:", error);
        throw new TRPCError({
          code: "INTERNAL_SERVER_ERROR",
          message: "Subscription failed",
          cause: error,
        });
      }

      console.log("šŸ”„ Subscription aborted (normal)");
      return;
    }
  });
/**
 * Subscription procedure that listens on the `media-test` channel of
 * `mediaEvents` and yields only the events whose `chatId` equals the
 * `chatId` from the validated input. Every step is logged for debugging.
 *
 * Errors that look like an abort (name "AbortError" or a message containing
 * "aborted") end the stream quietly; anything else is logged and rethrown
 * as a `TRPCError` with code `INTERNAL_SERVER_ERROR`.
 *
 * NOTE(review): the emitted payload is cast to `MediaEvent` without a
 * runtime check — confirm the emitter really sends objects with `chatId`.
 */
export const mediaGeneted = protectedProcedure
  .input(mediaGenetedSchema)
  .subscription(async function* ({ input, signal }) {
    const channel = "media-test";
    const wantedChatId = input.chatId;

    console.log("šŸ”„šŸ”„šŸ”„ SUBSCRIPTION STARTED:");
    console.log(" - Event Key:", channel);
    console.log(" - Chat ID:", wantedChatId);
    const initialListeners = mediaEvents.listenerCount(channel);
    console.log(" - Initial Listener Count:", initialListeners);

    try {
      const incoming = on(mediaEvents, channel, { signal });

      for await (const [raw] of incoming) {
        console.log("šŸ”„šŸ”„šŸ”„ EVENT RECEIVED");
        console.log(" - Raw data:", raw);
        console.log(" - Data type:", typeof raw);
        console.log(" - Data keys:", raw ? Object.keys(raw) : "null");

        const event = raw as MediaEvent;

        // Guard clause: skip empty payloads and events for other chats.
        if (!event || event.chatId !== wantedChatId) {
          console.log("šŸ”„šŸ”„šŸ”„ EVENT FILTERED OUT:");
          console.log(" - Event Chat ID:", event?.chatId);
          console.log(" - Expected Chat ID:", wantedChatId);
          console.log(" - Event exists:", !!event);
          continue;
        }

        console.log("šŸ”„šŸ”„šŸ”„ YIELDING EVENT:");
        console.log(" - Event Chat ID:", event.chatId);
        console.log(" - Expected Chat ID:", wantedChatId);
        console.log(" - Match:", event.chatId === wantedChatId);

        // The chat id is passed to `tracked` as the first argument.
        yield tracked(event.chatId, event);

        console.log("šŸ”„šŸ”„šŸ”„ EVENT YIELDED SUCCESSFULLY");
      }
    } catch (error) {
      const looksAborted =
        error instanceof Error &&
        (error.name === "AbortError" || error.message?.includes("aborted"));

      if (!looksAborted) {
        console.error("šŸ”„ Subscription error:", error);
        throw new TRPCError({
          code: "INTERNAL_SERVER_ERROR",
          message: "Subscription failed",
          cause: error,
        });
      }

      console.log("šŸ”„ Subscription aborted (normal)");
      return;
    }
  });
I tried to remove unnecessary lines to make it shorter and easier to understand. When the webhook receives data and runs emit, it logs this in the terminal:
šŸ”„šŸ”„šŸ”„ TEST EVENT:
- Event Key: media-test
- Listeners BEFORE: 0
šŸ”„šŸ”„šŸ”„ TEST RESULT:
- Was Emitted: false
- Listeners AFTER: 0
šŸ”„šŸ”„šŸ”„ TEST EVENT:
- Event Key: media-test
- Listeners BEFORE: 0
šŸ”„šŸ”„šŸ”„ TEST RESULT:
- Was Emitted: false
- Listeners AFTER: 0
I don't understand why. I was thinking that maybe the tRPC subscription and the webhook emit are not related, and that's why the subscription never triggers? On the client side I know the subscription works, because its status is "pending".

Did you find this page helpful?