moloch
moloch2y ago

Cannot get subscription event to fire

Ripping my hair out here trying to get Websockets working with tRPC and Next. Everything seems to be working and the frontend client is connecting to my websocket server, however triggering the addFact mutation does not not successfully emit the new catFact despite there not being any errors. I've looked at various repos that achieve similar functionality and haven't been able to spot the problem, does anyone have any guidance?
export const catFactsRouter = createTRPCRouter({
fact: publicProcedure.query(({ ctx }) => {

const fact = ctx.prisma.catFact.findFirst({
orderBy: {
createdAt: "desc",
},
});

return fact;
}),
addFact: protectedProcedure
.input(
z.object({
token: z.string().min(1),
fact: z.string().min(1)
})
)
.mutation(async ({ input, ctx }) => {
console.log(`protectedProcedure.mutation()`);
const fact = await ctx.prisma.catFact.create({
data: {
fact: input.fact,
}
});
console.log(`ctx.ee.emit(Events.NEW_CAT_FACT, fact)`);
ctx.ee.emit(Events.NEW_CAT_FACT, fact);
console.log(`ee.emit(Events.NEW_CAT_FACT, fact);`);
return fact;
}),
onFact: publicProcedure.subscription(({ ctx }) => {
console.log("publicProcedure.subscription")

// `resolve()` is triggered for each client when they start subscribing `onFact`
// return an `observable` with a callback which is triggered immediately
return observable<CatFact>((emit) => {
const onCatFact = (data: CatFact) => {
console.log(`Emitting fact to client:`, data);
// emit data to client
emit.next(data);
};
console.log("ctx.ee.on(Events.NEW_CAT_FACT, onCatFact);")

ctx.ee.on(Events.NEW_CAT_FACT, onCatFact);

return () => {
ctx.ee.off(Events.NEW_CAT_FACT, onCatFact);
};
});
}),
});
export const catFactsRouter = createTRPCRouter({
fact: publicProcedure.query(({ ctx }) => {

const fact = ctx.prisma.catFact.findFirst({
orderBy: {
createdAt: "desc",
},
});

return fact;
}),
addFact: protectedProcedure
.input(
z.object({
token: z.string().min(1),
fact: z.string().min(1)
})
)
.mutation(async ({ input, ctx }) => {
console.log(`protectedProcedure.mutation()`);
const fact = await ctx.prisma.catFact.create({
data: {
fact: input.fact,
}
});
console.log(`ctx.ee.emit(Events.NEW_CAT_FACT, fact)`);
ctx.ee.emit(Events.NEW_CAT_FACT, fact);
console.log(`ee.emit(Events.NEW_CAT_FACT, fact);`);
return fact;
}),
onFact: publicProcedure.subscription(({ ctx }) => {
console.log("publicProcedure.subscription")

// `resolve()` is triggered for each client when they start subscribing `onFact`
// return an `observable` with a callback which is triggered immediately
return observable<CatFact>((emit) => {
const onCatFact = (data: CatFact) => {
console.log(`Emitting fact to client:`, data);
// emit data to client
emit.next(data);
};
console.log("ctx.ee.on(Events.NEW_CAT_FACT, onCatFact);")

ctx.ee.on(Events.NEW_CAT_FACT, onCatFact);

return () => {
ctx.ee.off(Events.NEW_CAT_FACT, onCatFact);
};
});
}),
});
2 Replies
Unknown User
Unknown User2y ago
Message Not Public
Sign In & Join Server To View
moloch
molochOP2y ago
Thank you very much for the detailed response! The issue turned out to be caused by Next.js using different processes and therefor contexts between the two endpoints, I was using an EventEmitter to trigger the webhooks but each context had it's own isolated EventEmitter. Ended up switching to Redis to solve the issue 🤦‍♀️ I also struggled with setting up redis. My current working example is much the result of brute force trial / error:
onFact: publicProcedure.subscription(async ({ ctx }) => {
if (!redisClient.isReady) {
console.log("Redis client is not connected!");
await redisClient.connect();
}


// return an `observable` with a callback which is triggered immediately
return observable<CatFact>((emit) => {
// define a listener function to handle incoming messages
const listener = (message: string, channel: string) => {
console.log(`Received message on channel ${channel}: ${message}`);

if (channel === "cat-facts") {
const fact = JSON.parse(message);
// emit data to client
emit.next(fact);
}
};

// subscribe to the `cat-facts` channel in Redis
const subscriber = redisClient.duplicate();
subscriber.subscribe("cat-facts", listener);

subscriber.connect();

const onCatFact = (channel: string, message: string) => {
if (channel === "cat-facts") {
const fact = JSON.parse(message);
// emit data to client
emit.next(fact);
}
};
// trigger `onCatFact()` when a message is published to the `cat-facts` channel
subscriber.on("message", onCatFact);

// unsubscribe function when client disconnects or stops subscribing
return () => {
// unsubscribe from the `cat-facts` channel in Redis
subscriber.unsubscribe("cat-facts");
subscriber.off("message", onCatFact);
};
});
}),
onFact: publicProcedure.subscription(async ({ ctx }) => {
if (!redisClient.isReady) {
console.log("Redis client is not connected!");
await redisClient.connect();
}


// return an `observable` with a callback which is triggered immediately
return observable<CatFact>((emit) => {
// define a listener function to handle incoming messages
const listener = (message: string, channel: string) => {
console.log(`Received message on channel ${channel}: ${message}`);

if (channel === "cat-facts") {
const fact = JSON.parse(message);
// emit data to client
emit.next(fact);
}
};

// subscribe to the `cat-facts` channel in Redis
const subscriber = redisClient.duplicate();
subscriber.subscribe("cat-facts", listener);

subscriber.connect();

const onCatFact = (channel: string, message: string) => {
if (channel === "cat-facts") {
const fact = JSON.parse(message);
// emit data to client
emit.next(fact);
}
};
// trigger `onCatFact()` when a message is published to the `cat-facts` channel
subscriber.on("message", onCatFact);

// unsubscribe function when client disconnects or stops subscribing
return () => {
// unsubscribe from the `cat-facts` channel in Redis
subscriber.unsubscribe("cat-facts");
subscriber.off("message", onCatFact);
};
});
}),
hope this helps! I just used the redis npm package that's great though Ya I'm not sure how performant that is, but it seemed to be the only way I could get it to work when I was fiddling around np, and good luck!