T
tRPC

Cannot get subscription event to fire

Cannot get subscription event to fire

Mmoloch3/9/2023
Ripping my hair out here trying to get Websockets working with tRPC and Next. Everything seems to be working and the frontend client is connecting to my websocket server, however triggering the addFact mutation does not not successfully emit the new catFact despite there not being any errors. I've looked at various repos that achieve similar functionality and haven't been able to spot the problem, does anyone have any guidance?
export const catFactsRouter = createTRPCRouter({
fact: publicProcedure.query(({ ctx }) => {

const fact = ctx.prisma.catFact.findFirst({
orderBy: {
createdAt: "desc",
},
});

return fact;
}),
addFact: protectedProcedure
.input(
z.object({
token: z.string().min(1),
fact: z.string().min(1)
})
)
.mutation(async ({ input, ctx }) => {
console.log(`protectedProcedure.mutation()`);
const fact = await ctx.prisma.catFact.create({
data: {
fact: input.fact,
}
});
console.log(`ctx.ee.emit(Events.NEW_CAT_FACT, fact)`);
ctx.ee.emit(Events.NEW_CAT_FACT, fact);
console.log(`ee.emit(Events.NEW_CAT_FACT, fact);`);
return fact;
}),
onFact: publicProcedure.subscription(({ ctx }) => {
console.log("publicProcedure.subscription")

// `resolve()` is triggered for each client when they start subscribing `onFact`
// return an `observable` with a callback which is triggered immediately
return observable<CatFact>((emit) => {
const onCatFact = (data: CatFact) => {
console.log(`Emitting fact to client:`, data);
// emit data to client
emit.next(data);
};
console.log("ctx.ee.on(Events.NEW_CAT_FACT, onCatFact);")

ctx.ee.on(Events.NEW_CAT_FACT, onCatFact);

return () => {
ctx.ee.off(Events.NEW_CAT_FACT, onCatFact);
};
});
}),
});
export const catFactsRouter = createTRPCRouter({
fact: publicProcedure.query(({ ctx }) => {

const fact = ctx.prisma.catFact.findFirst({
orderBy: {
createdAt: "desc",
},
});

return fact;
}),
addFact: protectedProcedure
.input(
z.object({
token: z.string().min(1),
fact: z.string().min(1)
})
)
.mutation(async ({ input, ctx }) => {
console.log(`protectedProcedure.mutation()`);
const fact = await ctx.prisma.catFact.create({
data: {
fact: input.fact,
}
});
console.log(`ctx.ee.emit(Events.NEW_CAT_FACT, fact)`);
ctx.ee.emit(Events.NEW_CAT_FACT, fact);
console.log(`ee.emit(Events.NEW_CAT_FACT, fact);`);
return fact;
}),
onFact: publicProcedure.subscription(({ ctx }) => {
console.log("publicProcedure.subscription")

// `resolve()` is triggered for each client when they start subscribing `onFact`
// return an `observable` with a callback which is triggered immediately
return observable<CatFact>((emit) => {
const onCatFact = (data: CatFact) => {
console.log(`Emitting fact to client:`, data);
// emit data to client
emit.next(data);
};
console.log("ctx.ee.on(Events.NEW_CAT_FACT, onCatFact);")

ctx.ee.on(Events.NEW_CAT_FACT, onCatFact);

return () => {
ctx.ee.off(Events.NEW_CAT_FACT, onCatFact);
};
});
}),
});
UUUnknown User3/10/2023
Message Not Public
Sign In & Join Server To View
Mmoloch3/30/2023
Thank you very much for the detailed response! The issue turned out to be caused by Next.js using different processes and therefor contexts between the two endpoints, I was using an EventEmitter to trigger the webhooks but each context had it's own isolated EventEmitter. Ended up switching to Redis to solve the issue 🤦‍♀️
UUUnknown User4/12/2023
4 Messages Not Public
Sign In & Join Server To View
Mmoloch4/12/2023
I also struggled with setting up redis. My current working example is much the result of brute force trial / error:
onFact: publicProcedure.subscription(async ({ ctx }) => {
if (!redisClient.isReady) {
console.log("Redis client is not connected!");
await redisClient.connect();
}


// return an `observable` with a callback which is triggered immediately
return observable<CatFact>((emit) => {
// define a listener function to handle incoming messages
const listener = (message: string, channel: string) => {
console.log(`Received message on channel ${channel}: ${message}`);

if (channel === "cat-facts") {
const fact = JSON.parse(message);
// emit data to client
emit.next(fact);
}
};

// subscribe to the `cat-facts` channel in Redis
const subscriber = redisClient.duplicate();
subscriber.subscribe("cat-facts", listener);

subscriber.connect();

const onCatFact = (channel: string, message: string) => {
if (channel === "cat-facts") {
const fact = JSON.parse(message);
// emit data to client
emit.next(fact);
}
};
// trigger `onCatFact()` when a message is published to the `cat-facts` channel
subscriber.on("message", onCatFact);

// unsubscribe function when client disconnects or stops subscribing
return () => {
// unsubscribe from the `cat-facts` channel in Redis
subscriber.unsubscribe("cat-facts");
subscriber.off("message", onCatFact);
};
});
}),
onFact: publicProcedure.subscription(async ({ ctx }) => {
if (!redisClient.isReady) {
console.log("Redis client is not connected!");
await redisClient.connect();
}


// return an `observable` with a callback which is triggered immediately
return observable<CatFact>((emit) => {
// define a listener function to handle incoming messages
const listener = (message: string, channel: string) => {
console.log(`Received message on channel ${channel}: ${message}`);

if (channel === "cat-facts") {
const fact = JSON.parse(message);
// emit data to client
emit.next(fact);
}
};

// subscribe to the `cat-facts` channel in Redis
const subscriber = redisClient.duplicate();
subscriber.subscribe("cat-facts", listener);

subscriber.connect();

const onCatFact = (channel: string, message: string) => {
if (channel === "cat-facts") {
const fact = JSON.parse(message);
// emit data to client
emit.next(fact);
}
};
// trigger `onCatFact()` when a message is published to the `cat-facts` channel
subscriber.on("message", onCatFact);

// unsubscribe function when client disconnects or stops subscribing
return () => {
// unsubscribe from the `cat-facts` channel in Redis
subscriber.unsubscribe("cat-facts");
subscriber.off("message", onCatFact);
};
});
}),
hope this helps!
UUUnknown User4/12/2023
2 Messages Not Public
Sign In & Join Server To View
Mmoloch4/12/2023
I just used the redis npm package
UUUnknown User4/12/2023
Message Not Public
Sign In & Join Server To View
Mmoloch4/12/2023
that's great though
UUUnknown User4/12/2023
3 Messages Not Public
Sign In & Join Server To View
Mmoloch4/12/2023
Ya I'm not sure how performant that is, but it seemed to be the only way I could get it to work when I was fiddling around np, and good luck!
UUUnknown User4/12/2023
3 Messages Not Public
Sign In & Join Server To View

Looking for more? Join the community!

T
tRPC

Cannot get subscription event to fire

Join Server
Recommended Posts
How to get response type depending on the input params?Simple example of a normal JS function with generics: ``` const example = <T extends unknown>(paramshow to build tRPC and Prisma with express?I am using TSC as the official docs example does. But when I use paths in tsconfig.json, It does notHow to infer types from input?When I call my procedure from the client I send an array of strings as an input. How can I infer theHow to infer types of a queryCoinsider the example `const hello = trpc.hello.useQuery();` I would like to export the type we get Data Visualisation/Chartsis anyone using tRPC on data visualisation (say bar chart/line chart) in a scenario close to a bankitrpc + AWS Lambda (through cdk)Hi all, has anyone successfully integrated tRPC with AWS Lambda? My current stack is API Gateway + LIs it possible to narrow an output schema if the query optionally doesn't return all fields?I have a router procedure that has an input schema that has an optional `filter` that changes the shFetching different server url than defined in configIs it possible to access the reactQuery instance and fetch different server url? I would like to useinput using z.or not working properlyi have an input like this let input = z.object({ name: z.string().optional() }).or(z.object({ How can I disable batching with fastify adapter?I cant seem to find a way to disable batching for my server, and this link doesnt help me much httpsIssue with monorepo architecture ant tRPCHi, we had an issue with batched requests that if we batch some requests they produce a TRPCClientErUsing tRPC in CRON jobsHey everyone, this might be a very stupid question, but is it possible to use tRPC inside a CRON joasync createContext for Express AdapterBeen debugging an odd behavior for the past hour, it seems like that an async function does not workIs it possible to split the router definition with the imlementation?I want to define the server router(input\output\meta) in a separate package from the server package Cache not working for `useQuery`I have a query like this: ```js const { data: article, isFetching } = api.public.getArticle.useQueryZod File Upload Validation with Open-Api Support?Hi guys, anyone know how to validate file upload with zod and get also open-api support?Zod validation and open-api support for File on the server?Hi guys, anyone know how to validate a File upload using zod? and also have open-api support?is context cached?If I put an object on the context that represents the User record from my database... ``` export asyJSON inferred router output not matchingHello. I have a procedure query which is returning Json from a postgresql db using Prisma. The type Best way to implement input based validation on a router procedureHi guys, bit of a noob. I have already created a 'protectedProcedure', ensuring the user is logged