Event emitter doesn't trigger subscription from webhook
I'm using a tRPC subscription in Next.js over SSE, and I'm trying to trigger the subscription when a webhook receives data. The webhook is a separate route, `api/webhook/route.ts`, and the subscription is in `root.ts`. I even tested the subscription using a small mock of `mutation` and `subscription` procedures, and it was working fine.
Here is the test, which works:
// Mock router: `emit` publishes on "test-event" and `on` streams those events back.
test: createTRPCRouter({
  // Emits a shallow copy of the validated input on `mediaEvents`.
  emit: protectedProcedure
    .input(z.object({ message: z.string() }))
    .mutation(async ({ input }) => {
      const payload = { ...input };
      mediaEvents.emit("test-event", payload);
    }),
  // Yields the first argument of every "test-event" emission.
  // The request `signal` is handed to `on(...)`; presumably (if this is
  // `events.on` from node:events) aborting it ends the loop with a thrown
  // error, which lands in the catch below — TODO confirm.
  on: protectedProcedure.subscription(async function* ({ signal }) {
    const channel = "test-event";
    try {
      const stream = on(mediaEvents, channel, { signal });
      for await (const [payload] of stream) {
        yield payload;
      }
    } catch (err) {
      // Errors from the event iterator are only logged, never rethrown.
      console.error(err);
    }
  }),
}),
// Mock router: `emit` publishes on "test-event" and `on` streams those events back.
test: createTRPCRouter({
  // Emits a shallow copy of the validated input on `mediaEvents`.
  emit: protectedProcedure
    .input(z.object({ message: z.string() }))
    .mutation(async ({ input }) => {
      const payload = { ...input };
      mediaEvents.emit("test-event", payload);
    }),
  // Yields the first argument of every "test-event" emission.
  // The request `signal` is handed to `on(...)`; presumably (if this is
  // `events.on` from node:events) aborting it ends the loop with a thrown
  // error, which lands in the catch below — TODO confirm.
  on: protectedProcedure.subscription(async function* ({ signal }) {
    const channel = "test-event";
    try {
      const stream = on(mediaEvents, channel, { signal });
      for await (const [payload] of stream) {
        yield payload;
      }
    } catch (err) {
      // Errors from the event iterator are only logged, never rethrown.
      console.error(err);
    }
  }),
}),
1 Reply
But this one, which is the one I actually want to make work, doesn't.
That's my webhook route. Forget about `mediaEvents`; that's the class I made for `EventEmitter`, which worked fine in my mock.
And that's my subscription procedure.
I tried to remove unnecessary lines to make it shorter and easier to understand. When the webhook receives data and runs `emit`, it logs the output shown further below in the terminal.
I don't understand why. I was thinking that maybe the tRPC subscription and the webhook emit are not related, and that's why the subscription never triggers?
On my client side I know the subscription works, because its status is `"pending"`.
/**
 * Webhook handler: parses the JSON request body and re-emits it on
 * `mediaEvents` under the "media-test" key, logging the listener count
 * before and after the emit together with the emit result.
 *
 * Responds with `{ success: true }` regardless of the emit result.
 *
 * NOTE(review): assuming `mediaEvents` is a Node `EventEmitter`, `emit`
 * returns false when no listener is registered for the event on that
 * instance. If this logs 0 listeners while a subscription is open, this
 * module is probably not seeing the same `mediaEvents` instance as the
 * subscription — confirm where and how `mediaEvents` is created.
 */
export const POST = async (req: Request) => {
  // The parsed body is only cast to the webhook type, not validated.
  const payload = (await req.json()) as FAL_AI_WEBHOOK_RESPONSE;
  const channel = "media-test";

  console.log("š„š„š„ TEST EVENT:");
  console.log(" - Event Key:", channel);
  console.log(" - Listeners BEFORE:", mediaEvents.listenerCount(channel));

  const delivered = mediaEvents.emit(channel, payload);

  console.log("š„š„š„ TEST RESULT:");
  console.log(" - Was Emitted:", delivered);
  console.log(" - Listeners AFTER:", mediaEvents.listenerCount(channel));

  return NextResponse.json({ success: true });
};
/**
 * Webhook handler: parses the JSON request body and re-emits it on
 * `mediaEvents` under the "media-test" key, logging the listener count
 * before and after the emit together with the emit result.
 *
 * Responds with `{ success: true }` regardless of the emit result.
 *
 * NOTE(review): assuming `mediaEvents` is a Node `EventEmitter`, `emit`
 * returns false when no listener is registered for the event on that
 * instance. If this logs 0 listeners while a subscription is open, this
 * module is probably not seeing the same `mediaEvents` instance as the
 * subscription — confirm where and how `mediaEvents` is created.
 */
export const POST = async (req: Request) => {
  // The parsed body is only cast to the webhook type, not validated.
  const payload = (await req.json()) as FAL_AI_WEBHOOK_RESPONSE;
  const channel = "media-test";

  console.log("š„š„š„ TEST EVENT:");
  console.log(" - Event Key:", channel);
  console.log(" - Listeners BEFORE:", mediaEvents.listenerCount(channel));

  const delivered = mediaEvents.emit(channel, payload);

  console.log("š„š„š„ TEST RESULT:");
  console.log(" - Was Emitted:", delivered);
  console.log(" - Listeners AFTER:", mediaEvents.listenerCount(channel));

  return NextResponse.json({ success: true });
};
/**
 * Subscription procedure that listens for "media-test" events on
 * `mediaEvents` and forwards only those whose `chatId` equals the
 * `chatId` from the validated input. Matching events are yielded wrapped
 * in `tracked(...)` with the chat id as the first argument.
 *
 * Errors that look like an abort (name "AbortError" or a message containing
 * "aborted") end the generator quietly; anything else is logged and rethrown
 * as a TRPCError with code INTERNAL_SERVER_ERROR.
 *
 * NOTE(review): the listener is attached to whatever `mediaEvents` instance
 * this module resolves. If the emitting side logs a listener count of 0
 * while this subscription is running, the two sides are probably holding
 * different instances — confirm where `mediaEvents` is created.
 */
export const mediaGeneted = protectedProcedure
  .input(mediaGenetedSchema)
  .subscription(async function* ({ input, signal }) {
    const { chatId } = input;
    const channel = "media-test";

    console.log("š„š„š„ SUBSCRIPTION STARTED:");
    console.log(" - Event Key:", channel);
    console.log(" - Chat ID:", chatId);
    console.log(
      " - Initial Listener Count:",
      mediaEvents.listenerCount(channel),
    );

    try {
      // Each iteration receives the emit arguments as an array; only the
      // first argument is used.
      const stream = on(mediaEvents, channel, { signal });
      for await (const [raw] of stream) {
        console.log("š„š„š„ EVENT RECEIVED");
        console.log(" - Raw data:", raw);
        console.log(" - Data type:", typeof raw);
        console.log(" - Data keys:", raw ? Object.keys(raw) : "null");

        const event = raw as MediaEvent;

        // Skip empty payloads and events that belong to another chat.
        if (!event || event.chatId !== chatId) {
          console.log("š„š„š„ EVENT FILTERED OUT:");
          console.log(" - Event Chat ID:", event?.chatId);
          console.log(" - Expected Chat ID:", chatId);
          console.log(" - Event exists:", !!event);
          continue;
        }

        console.log("š„š„š„ YIELDING EVENT:");
        console.log(" - Event Chat ID:", event.chatId);
        console.log(" - Expected Chat ID:", chatId);
        console.log(" - Match:", event.chatId === chatId);
        yield tracked(event.chatId, event);
        console.log("š„š„š„ EVENT YIELDED SUCCESSFULLY");
      }
    } catch (error) {
      const looksAborted =
        error instanceof Error &&
        (error.name === "AbortError" || error.message?.includes("aborted"));

      if (!looksAborted) {
        console.error("š„ Subscription error:", error);
        throw new TRPCError({
          code: "INTERNAL_SERVER_ERROR",
          message: "Subscription failed",
          cause: error,
        });
      }

      // Treated as the normal end of the subscription.
      console.log("š„ Subscription aborted (normal)");
      return;
    }
  });
/**
 * Subscription procedure that listens for "media-test" events on
 * `mediaEvents` and forwards only those whose `chatId` equals the
 * `chatId` from the validated input. Matching events are yielded wrapped
 * in `tracked(...)` with the chat id as the first argument.
 *
 * Errors that look like an abort (name "AbortError" or a message containing
 * "aborted") end the generator quietly; anything else is logged and rethrown
 * as a TRPCError with code INTERNAL_SERVER_ERROR.
 *
 * NOTE(review): the listener is attached to whatever `mediaEvents` instance
 * this module resolves. If the emitting side logs a listener count of 0
 * while this subscription is running, the two sides are probably holding
 * different instances — confirm where `mediaEvents` is created.
 */
export const mediaGeneted = protectedProcedure
  .input(mediaGenetedSchema)
  .subscription(async function* ({ input, signal }) {
    const { chatId } = input;
    const channel = "media-test";

    console.log("š„š„š„ SUBSCRIPTION STARTED:");
    console.log(" - Event Key:", channel);
    console.log(" - Chat ID:", chatId);
    console.log(
      " - Initial Listener Count:",
      mediaEvents.listenerCount(channel),
    );

    try {
      // Each iteration receives the emit arguments as an array; only the
      // first argument is used.
      const stream = on(mediaEvents, channel, { signal });
      for await (const [raw] of stream) {
        console.log("š„š„š„ EVENT RECEIVED");
        console.log(" - Raw data:", raw);
        console.log(" - Data type:", typeof raw);
        console.log(" - Data keys:", raw ? Object.keys(raw) : "null");

        const event = raw as MediaEvent;

        // Skip empty payloads and events that belong to another chat.
        if (!event || event.chatId !== chatId) {
          console.log("š„š„š„ EVENT FILTERED OUT:");
          console.log(" - Event Chat ID:", event?.chatId);
          console.log(" - Expected Chat ID:", chatId);
          console.log(" - Event exists:", !!event);
          continue;
        }

        console.log("š„š„š„ YIELDING EVENT:");
        console.log(" - Event Chat ID:", event.chatId);
        console.log(" - Expected Chat ID:", chatId);
        console.log(" - Match:", event.chatId === chatId);
        yield tracked(event.chatId, event);
        console.log("š„š„š„ EVENT YIELDED SUCCESSFULLY");
      }
    } catch (error) {
      const looksAborted =
        error instanceof Error &&
        (error.name === "AbortError" || error.message?.includes("aborted"));

      if (!looksAborted) {
        console.error("š„ Subscription error:", error);
        throw new TRPCError({
          code: "INTERNAL_SERVER_ERROR",
          message: "Subscription failed",
          cause: error,
        });
      }

      // Treated as the normal end of the subscription.
      console.log("š„ Subscription aborted (normal)");
      return;
    }
  });
When the webhook runs `emit`, it logs this in the terminal:
š„š„š„ TEST EVENT:
- Event Key: media-test
- Listeners BEFORE: 0
š„š„š„ TEST RESULT:
- Was Emitted: false
- Listeners AFTER: 0
š„š„š„ TEST EVENT:
- Event Key: media-test
- Listeners BEFORE: 0
š„š„š„ TEST RESULT:
- Was Emitted: false
- Listeners AFTER: 0
"pending"