hasanaktasTR (4d ago)

Help for subscription with redis

My subscription currently looks like the function below. It works, but it doesn't make sense to check for messages from Redis once every second. How can I make the subscription more performant? I couldn't find an example using Redis or RabbitMQ.
const listSubscription = profileProcedure
  .input(
    z.object({
      matchId: z.string(),
    }),
  )
  .subscription(async function* ({ signal, input, ctx }) {
    console.log("connection opened");

    await ctx.eventSub.subscribe(input.matchId); // eventSub is a redis client

    const messageQueue: any[] = [];

    const messageHandler = (channel: string, message: string) => {
      const post = JSON.parse(message);
      messageQueue.push(post);
    };

    ctx.eventSub.on("message", messageHandler);

    try {
      while (!signal?.aborted) {
        if (messageQueue.length > 0) {
          yield messageQueue.shift();
        }
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    } finally {
      console.log("connection closed");
      ctx.eventSub.off("message", messageHandler);
      ctx.eventSub.unsubscribe(input.matchId);
    }
  })
7 Replies
BeBoRE (4d ago)
Why are you doing the timeout and the weird while loop?
BeBoRE (4d ago)
If you take a look at https://trpc.io/docs/server/subscriptions, you should be able to turn your subscriber into an iterable using import { on } from "node:events". Then you can for await that result instead of the message queue stuff you are doing.
hasanaktasTR (4d ago)
@BeBoRE I had looked at the example in the docs, but it had not occurred to me that the on function from node:events could work with Redis. I have never used EventEmitter before. Do you think the following setup is correct?
const listSubscription = profileProcedure
  .input(
    z.object({
      matchId: z.string(),
    }),
  )
  .subscription(async function* ({ signal, input, ctx }) {
    console.log("connection opened");
    await ctx.eventSub.subscribe(input.matchId);

    try {
      for await (const [_, message] of on(ctx.eventSub, "message", {
        signal: signal,
      })) {
        const post = JSON.parse(message);
        yield post;
      }
    } finally {
      console.log("connection closed");
      ctx.eventSub.unsubscribe(input.matchId);
    }
  })
BeBoRE (4d ago)
Try it out. The Redis client implements or extends EventEmitter, so it should work in theory.
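One thing to watch for with this approach: if a single Redis client is shared by several subscriptions, its "message" event fires for every subscribed channel, not only the one a given caller asked for. A minimal sketch of filtering by channel, assuming the client emits ("message", channel, payload) the way ioredis does. channelMessages is a hypothetical helper name, and a plain EventEmitter stands in for the Redis client so the snippet runs on its own:

```typescript
import { EventEmitter, on } from "node:events";

// Yields parsed messages for one channel only.
// `client` is any EventEmitter that emits ("message", channel, payload).
async function* channelMessages<T>(
  client: EventEmitter,
  channel: string,
  signal: AbortSignal,
): AsyncGenerator<T> {
  try {
    for await (const [incoming, payload] of on(client, "message", { signal })) {
      // Skip messages that belong to other channels on the shared client.
      if (incoming !== channel) continue;
      yield JSON.parse(payload as string) as T;
    }
  } catch (err) {
    // on() rejects with an AbortError once the signal fires;
    // treat that as a normal end of the stream and rethrow anything else.
    if (!signal.aborted) throw err;
  }
}
```

Inside the subscription this could be consumed with yield* channelMessages(ctx.eventSub, input.matchId, signal), keeping the subscribe/unsubscribe calls around it as before.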
wleistra (3d ago)
This was the exact issue I ran into today. We have a Redis client too, and I noticed the deprecation notice on our subscriptions that now just return a resolver directly. In the WebSocket examples I got lost between the deprecation warning and the examples because they looked identical. Then I figured that we only use one-way messaging, so WebSockets are overkill. I would love to see an example for Redis with an event emitter (or maybe the one above is the example I missed). For some reason our Redis client is not in the ctx. Is there a pattern or reason for it being there?
BeBoRE (3d ago)
This is just a pattern they are using, where they pass the client to the subscription handler by context. https://trpc.io/docs/server/context. There is no one way of getting the client to the handler, just make sure you aren’t initializing a new client with every request.
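A minimal sketch of that idea: create the client once at module level and hand the same instance to every request through the context. createSubscriberClient, getSubscriber, and this createContext are hypothetical names, and a plain EventEmitter stands in for the real Redis client (with ioredis it would be something like new Redis(url)):

```typescript
import { EventEmitter } from "node:events";

// Stand-in for constructing a real Redis subscriber client.
function createSubscriberClient(): EventEmitter {
  return new EventEmitter();
}

// Module-level cache: lives as long as the server process.
let subscriber: EventEmitter | undefined;

// Creates the client on first use and reuses it afterwards.
function getSubscriber(): EventEmitter {
  subscriber ??= createSubscriberClient();
  return subscriber;
}

// Hypothetical context factory: every request receives the same client.
function createContext() {
  return { eventSub: getSubscriber() };
}
```

Importing the client directly from a shared module inside the handler would work just as well; the point is only that the constructor runs once, not per request.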
wleistra (15h ago)
Good call... I have to check that indeed! I gave it a shot but did not get it to work with ioredis. 😦 I have to do some more digging. The examples on the tRPC website are great, but they assume there is only one instance of the Node server running, so if anyone has an example of using Redis pub/sub with WebSockets/SSE, that would be fantastic to add.
export const onReview = publicProcedure.input(zCredentials.nullable()).subscription(async function* ({
  input,
  signal,
}) {
  // Forward Redis messages to a local EventEmitter, keyed by channel name.
  const onReview = async (channel: string, message: string) => {
    try {
      ee.emit(channel, message);
    } catch (err) {
      // eslint-disable-next-line no-console
      console.error('err', err);
    }
  };

  if (!pubsub.subscriber) {
    throw new TRPCError({
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Failed to subscribe to review changes',
    });
  }

  await pubsub.subscriber?.subscribe(SubscriptionEventChannel.WORK_REPORT_REVIEW_CHANGE);
  pubsub.subscriber?.on('message', onReview);

  try {
    for await (const [message] of on(ee, SubscriptionEventChannel.WORK_REPORT_REVIEW_CHANGE, {
      signal,
    })) {
      if (!input) return;

      await tryAuthenticate(input);

      // JSON.parse expects a string, so fall back to an empty JSON object literal.
      const successfulReviews = JSON.parse(message ?? '{}');
      yield successfulReviews;
    }
  } finally {
    pubsub.subscriber?.off('message', onReview);
    pubsub.subscriber?.unsubscribe(SubscriptionEventChannel.WORK_REPORT_REVIEW_CHANGE);
  }
});
This seems to work for me, but I have no idea if it is best practice. Unfortunately, you cannot provide something like trpc.context.someMeta to a subscription for use in the link, which I wanted so I could opt for WebSocket vs. SSE. Workaround: use op.input and check for a key in the object.
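A rough sketch of what that op.input check could look like. The key name transport and the function prefersWebSocket are made up for illustration, and OperationLike only mirrors the two fields of the link operation that are used here:

```typescript
// Minimal shape of the operation object a link condition receives.
type OperationLike = {
  type: "query" | "mutation" | "subscription";
  input: unknown;
};

// Returns true when a subscription's input carries the
// (hypothetical) marker `transport: "ws"`.
function prefersWebSocket(op: OperationLike): boolean {
  if (op.type !== "subscription") return false;
  const input = op.input;
  return (
    typeof input === "object" &&
    input !== null &&
    "transport" in input &&
    (input as { transport?: unknown }).transport === "ws"
  );
}
```

A predicate like this could be passed as the condition of splitLink from @trpc/client, with the WebSocket link on the true branch and the SSE link on the false branch. The procedure's input schema would also have to accept the extra key.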