Andrés
Andrés2w ago

Problem using EventEmitters [Weird behavior]

I'm trying to implement a simple notification in a webapp, I can't make it work with EventEmmiters
streamNotifications: publicProcedure.subscription(async function* (opts) {
try {
console.log(
"Listeners before setup:",
notificationEventEmitter.listenerCount("add"),
);
const iterable = on(notificationEventEmitter, "add");
console.log(
"Listeners after setup:",
notificationEventEmitter.listenerCount("add"),
);

opts.signal?.addEventListener("abort", () => {
console.log("Subscription aborted");
});
for await (const [notification] of iterable) {
yield notification as TNotification;
}
} catch (error) {
console.error("Error in notification stream:", error);
throw error; // Rethrow or handle as appropriate
} finally {
console.log("Stream closed");
}
}),
streamNotifications: publicProcedure.subscription(async function* (opts) {
try {
console.log(
"Listeners before setup:",
notificationEventEmitter.listenerCount("add"),
);
const iterable = on(notificationEventEmitter, "add");
console.log(
"Listeners after setup:",
notificationEventEmitter.listenerCount("add"),
);

opts.signal?.addEventListener("abort", () => {
console.log("Subscription aborted");
});
for await (const [notification] of iterable) {
yield notification as TNotification;
}
} catch (error) {
console.error("Error in notification stream:", error);
throw error; // Rethrow or handle as appropriate
} finally {
console.log("Stream closed");
}
}),
Looks like its connected but when I do something like emit an event
emitFirstNotification: publicProcedure.mutation(async ({ ctx }) => {
const notifications = await ctx.db.notification.findFirstOrThrow({
select: selectnotificacion.select,
orderBy: { createdAt: "desc" },
});
console.log("Emmiting");
const emmiting = notificationEventEmitter.emit("add", notifications);
console.log(emmiting, "listeners?");
}),
emitFirstNotification: publicProcedure.mutation(async ({ ctx }) => {
const notifications = await ctx.db.notification.findFirstOrThrow({
select: selectnotificacion.select,
orderBy: { createdAt: "desc" },
});
console.log("Emmiting");
const emmiting = notificationEventEmitter.emit("add", notifications);
console.log(emmiting, "listeners?");
}),
It simply doesnt work. It does nothing, and it's like my emmiter
import type { Prisma } from "@prisma/client";
import type { selectnotificacion } from "../api/routers/webcheckinRouter";
import { EventEmitter } from "stream";

export type TNotification = Prisma.NotificationGetPayload<
typeof selectnotificacion
>;
type NotificationEvents = {
add: TNotification[];
};
const notificationEventEmitter = new EventEmitter<NotificationEvents>();
export default notificationEventEmitter;
import type { Prisma } from "@prisma/client";
import type { selectnotificacion } from "../api/routers/webcheckinRouter";
import { EventEmitter } from "stream";

export type TNotification = Prisma.NotificationGetPayload<
typeof selectnotificacion
>;
type NotificationEvents = {
add: TNotification[];
};
const notificationEventEmitter = new EventEmitter<NotificationEvents>();
export default notificationEventEmitter;
has 0 clients. (But the subscription is still connected according to the logger)
Solution:
I use redis for my project https://github.com/bebore/ei-noah-bot but that's quite complex, I recommend just following the redis package docs. It's not much different from the event emitter other than that you need to (de)serialize the input and output.
GitHub
GitHub - BeBoRE/ei-noah-bot: De officiële Discord Bot voor de Sweat...
De officiële Discord Bot voor de Sweaty GG Chat. Contribute to BeBoRE/ei-noah-bot development by creating an account on GitHub.
Jump to solution
5 Replies
Andrés
AndrésOP2w ago
Its kind of funny It works. but I must declare the terminating link for mutations as the wsLink
condition: (op) => op.type === "subscription" || op.type === "mutation"
condition: (op) => op.type === "subscription" || op.type === "mutation"
Looks like the eventEmmiter that's created in the subscrition lives in the ws server and the emit event
emitFirstNotification: publicProcedure.mutation(async ({ ctx }) => {
const notifications = await ctx.db.notification.findFirstOrThrow({
select: selectnotificacion.select,
orderBy: { createdAt: "desc" },
});
console.log("Emmiting");
const emmiting = notificationEventEmitter.emit("add", notifications);
console.log(emmiting, "listeners?");
}),
emitFirstNotification: publicProcedure.mutation(async ({ ctx }) => {
const notifications = await ctx.db.notification.findFirstOrThrow({
select: selectnotificacion.select,
orderBy: { createdAt: "desc" },
});
console.log("Emmiting");
const emmiting = notificationEventEmitter.emit("add", notifications);
console.log(emmiting, "listeners?");
}),
Lives in the nextjs proccess So the event emmiter has this problem I probably could fix this by implementing a redis/valkey server but I would really like to keep the app small Also I dont understand why subscriptions are being run on the ws server, maybe is something of my setup? I would really like to know
BeBoRE
BeBoRE2w ago
The ws and next server run on different node proces, event emitters only exist for one node proces, therefore an event emitted on one will not be received by the other You already found the solution, redis is the solution here Or use the new httpSubscriptionLink
Andrés
AndrésOP2w ago
Can you give me an example of implementation of this with Redis? @BeBoRE thanks in advance
Solution
BeBoRE
BeBoRE2w ago
I use redis for my project https://github.com/bebore/ei-noah-bot but that's quite complex, I recommend just following the redis package docs. It's not much different from the event emitter other than that you need to (de)serialize the input and output.
GitHub
GitHub - BeBoRE/ei-noah-bot: De officiële Discord Bot voor de Sweat...
De officiële Discord Bot voor de Sweaty GG Chat. Contribute to BeBoRE/ei-noah-bot development by creating an account on GitHub.
Andrés
AndrésOP2w ago
Thanks, I was wondering if you have an example that uses an async generator Ended up doing my notification stream like this
streamNotifications: publicProcedure.subscription(async function*() {
const subRedis = new Redis(RedisParams);
let resolveQueue:
| ((value: TNotification | PromiseLike<TNotification>) => void)
| null = null;
try {
await subRedis.subscribe("notifications");
subRedis.on("message", (_, message) => {
const notification: TNotification = SuperJSON.parse(message);
if (resolveQueue) {
resolveQueue(notification);
resolveQueue = null;
} else {
queue.push(notification);
}
});
const queue: TNotification[] = [];
while (true) {
if (queue.length > 0) {
yield queue.shift()!;
} else {
yield await new Promise<TNotification>((resolve) => {
resolveQueue = resolve;
});
}
}
} catch (error) {
console.error("Error in notification stream:", error);
throw error; // Rethrow or handle as appropriate
} finally {
console.log("Unsubscribed from notifications");
subRedis.disconnect();
}
}),
streamNotifications: publicProcedure.subscription(async function*() {
const subRedis = new Redis(RedisParams);
let resolveQueue:
| ((value: TNotification | PromiseLike<TNotification>) => void)
| null = null;
try {
await subRedis.subscribe("notifications");
subRedis.on("message", (_, message) => {
const notification: TNotification = SuperJSON.parse(message);
if (resolveQueue) {
resolveQueue(notification);
resolveQueue = null;
} else {
queue.push(notification);
}
});
const queue: TNotification[] = [];
while (true) {
if (queue.length > 0) {
yield queue.shift()!;
} else {
yield await new Promise<TNotification>((resolve) => {
resolveQueue = resolve;
});
}
}
} catch (error) {
console.error("Error in notification stream:", error);
throw error; // Rethrow or handle as appropriate
} finally {
console.log("Unsubscribed from notifications");
subRedis.disconnect();
}
}),
It works good, but it feels weird using a while(true) And I was wondering if there's a cleaner way also IDK if it's ok to do a new redis connection on every subscription

Did you find this page helpful?