haardik | LearnWeb3
haardik | LearnWeb3ā€¢16mo ago

Unable to get mutation to trigger subscription because EventEmitter not being shared

Hey folks, Been struggling with this for a few hours now hopelessly and trying random things - read all related posts in this forum, on github issues, and stackoverflow - and still don't understand what is going on. I have a next app with a custom HTTP server and using tRPC. WSLink etc is all fine - i'm doing everything the proper way. I have a router with these two functions:
sendMessage: protectedProcedure
.input(z.string())
.mutation(async ({ ctx, input }) => {
try {
const chatRepo = new CommunityChatRepository(ctx.prisma);
const result = await chatRepo.sendMessage(ctx.session!.user.id, input);

if (!result.ok) throw result.error;
console.log({ namesSendMsg: wsee.eventNames() });

wsee.emit('onNewMessage', result.value);

return result.value;
} catch (error) {
if (error instanceof TRPCError) {
throw error;
} else {
console.error(error);
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Internal server error',
});
}
}
}),

onNewMessage: publicProcedure.subscription(() => {
return observable<ChatMessageWithSenderInformation>((emit) => {
const onNewMessage = (data: ChatMessageWithSenderInformation) => {
emit.next(data);
};

console.log({ names: wsee.eventNames() });
wsee.on('onNewMessage', onNewMessage);
console.log({ names: wsee.eventNames() });

console.log(`added listener ${wsee.listenerCount('onNewMessage')}`);

return () => {
console.log(`removed listener ${wsee.listenerCount('onNewMessage')}`);
wsee.off('onNewMessage', onNewMessage);
};
});
}),
sendMessage: protectedProcedure
.input(z.string())
.mutation(async ({ ctx, input }) => {
try {
const chatRepo = new CommunityChatRepository(ctx.prisma);
const result = await chatRepo.sendMessage(ctx.session!.user.id, input);

if (!result.ok) throw result.error;
console.log({ namesSendMsg: wsee.eventNames() });

wsee.emit('onNewMessage', result.value);

return result.value;
} catch (error) {
if (error instanceof TRPCError) {
throw error;
} else {
console.error(error);
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Internal server error',
});
}
}
}),

onNewMessage: publicProcedure.subscription(() => {
return observable<ChatMessageWithSenderInformation>((emit) => {
const onNewMessage = (data: ChatMessageWithSenderInformation) => {
emit.next(data);
};

console.log({ names: wsee.eventNames() });
wsee.on('onNewMessage', onNewMessage);
console.log({ names: wsee.eventNames() });

console.log(`added listener ${wsee.listenerCount('onNewMessage')}`);

return () => {
console.log(`removed listener ${wsee.listenerCount('onNewMessage')}`);
wsee.off('onNewMessage', onNewMessage);
};
});
}),
I've added a bunch of console logs to help explain. So, in my logs I see that when the app loads up I get a WS Connection to the server. On the client side, I have trpc.chat.onNewMessage.useSubscription which console logs {names: []} first and then {names: ['onNewMessage']} as it should based on the code for the router. But, when I send a message, the mutation fails to trigger the subscription because it logs {namesSendMsg: []} i.e. for some reason it does;n't recognize the attached listener I thought it was because maybe somehow multiple EventEmitter instances are being created due to HMR - so I did a workaround similar to what we do with Prisma for next dev mode: src/eventEmitter.ts:
import { EventEmitter } from 'events';

import type { ChatMessageWithSenderInformation } from './server/repositories/CommunityChatRepository';

interface WSEvents {
// Community Chat
onNewMessage: (message: ChatMessageWithSenderInformation) => void;

// Trivia
join: () => void;
getQuestions: () => void;
onRankingUpdate: () => void;
onChoiceUpdate: () => void;
}

export declare interface WSEventEmitter {
on<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
off<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
once<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
emit<WEv extends keyof WSEvents>(
event: WEv,
...args: Parameters<WSEvents[WEv]>
): boolean;
}

export class WSEventEmitter extends EventEmitter {}

const globalForWSEE = globalThis as unknown as { wsee: WSEventEmitter };
const isDevMode = process.env.NODE_ENV !== 'production';

export const wsee = globalForWSEE.wsee || new WSEventEmitter();

if (isDevMode) globalForWSEE.wsee = wsee;
import { EventEmitter } from 'events';

import type { ChatMessageWithSenderInformation } from './server/repositories/CommunityChatRepository';

interface WSEvents {
// Community Chat
onNewMessage: (message: ChatMessageWithSenderInformation) => void;

// Trivia
join: () => void;
getQuestions: () => void;
onRankingUpdate: () => void;
onChoiceUpdate: () => void;
}

export declare interface WSEventEmitter {
on<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
off<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
once<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
emit<WEv extends keyof WSEvents>(
event: WEv,
...args: Parameters<WSEvents[WEv]>
): boolean;
}

export class WSEventEmitter extends EventEmitter {}

const globalForWSEE = globalThis as unknown as { wsee: WSEventEmitter };
const isDevMode = process.env.NODE_ENV !== 'production';

export const wsee = globalForWSEE.wsee || new WSEventEmitter();

if (isDevMode) globalForWSEE.wsee = wsee;
this did not help either. using this wsee everywhere doesn't change anything. im lost and have no idea WHY there are supposedly two different event emitter instances being used here?
2 Replies
haardik | LearnWeb3
haardik | LearnWeb3OPā€¢16mo ago
my terminal output basically looks like this:
No description
haardik | LearnWeb3
haardik | LearnWeb3OPā€¢16mo ago
FWIW - i cloned the example websockets repo and that works just fine. so something somewhere is wrong, but i have absolutely no idea what. any hints would be great. my tRPC is initialized as such:
function getBaseUrl() {
if (typeof window !== 'undefined') return '';
return `${process.env.NEXT_PUBLIC_APP_URL}`;
}

function getEndingLink() {
const url =
process.env.NODE_ENV !== 'production'
? 'ws://localhost:3001'
: process.env.NEXT_PUBLIC_WS_URL || 'ws://localhost:3001';

const client = createWSClient({
url,
});

return wsLink<AppRouter>({
client,
});
}

export const trpc = createTRPCNext<AppRouter>({
config({ ctx }) {
if (typeof window !== 'undefined') {
return {
transformer: SuperJSON,
links: [
splitLink({
condition: (op) => op.type === 'subscription',
true: getEndingLink(),
false: splitLink({
condition: (op) => op.context.skipBatch === true,
true: httpLink({
url: '/api/trpc',
}),
false: httpBatchLink({
url: '/api/trpc',
}),
}),
}),
],
};
}
return {
transformer: SuperJSON,
links: [
splitLink({
condition(op) {
return op.context.skipBatch === true;
},
true: httpLink({
url: `${getBaseUrl()}/api/trpc`,
headers() {
if (!ctx?.req?.headers) {
return {};
}
const {
// If you're using Node 18 before 18.15.0, omit the "connection" header
// eslint-disable-next-line @typescript-eslint/naming-convention, unused-imports/no-unused-vars
connection: _connection,
...headers
} = ctx.req.headers;
return headers;
},
}),
false: httpBatchLink({
url: `${getBaseUrl()}/api/trpc`,
headers() {
if (!ctx?.req?.headers) {
return {};
}
const {
// If you're using Node 18 before 18.15.0, omit the "connection" header
// eslint-disable-next-line @typescript-eslint/naming-convention, unused-imports/no-unused-vars
connection: _connection,
...headers
} = ctx.req.headers;
return headers;
},
}),
}),
],
};
},
ssr: true,
abortOnUnmount: true,
});
function getBaseUrl() {
if (typeof window !== 'undefined') return '';
return `${process.env.NEXT_PUBLIC_APP_URL}`;
}

function getEndingLink() {
const url =
process.env.NODE_ENV !== 'production'
? 'ws://localhost:3001'
: process.env.NEXT_PUBLIC_WS_URL || 'ws://localhost:3001';

const client = createWSClient({
url,
});

return wsLink<AppRouter>({
client,
});
}

export const trpc = createTRPCNext<AppRouter>({
config({ ctx }) {
if (typeof window !== 'undefined') {
return {
transformer: SuperJSON,
links: [
splitLink({
condition: (op) => op.type === 'subscription',
true: getEndingLink(),
false: splitLink({
condition: (op) => op.context.skipBatch === true,
true: httpLink({
url: '/api/trpc',
}),
false: httpBatchLink({
url: '/api/trpc',
}),
}),
}),
],
};
}
return {
transformer: SuperJSON,
links: [
splitLink({
condition(op) {
return op.context.skipBatch === true;
},
true: httpLink({
url: `${getBaseUrl()}/api/trpc`,
headers() {
if (!ctx?.req?.headers) {
return {};
}
const {
// If you're using Node 18 before 18.15.0, omit the "connection" header
// eslint-disable-next-line @typescript-eslint/naming-convention, unused-imports/no-unused-vars
connection: _connection,
...headers
} = ctx.req.headers;
return headers;
},
}),
false: httpBatchLink({
url: `${getBaseUrl()}/api/trpc`,
headers() {
if (!ctx?.req?.headers) {
return {};
}
const {
// If you're using Node 18 before 18.15.0, omit the "connection" header
// eslint-disable-next-line @typescript-eslint/naming-convention, unused-imports/no-unused-vars
connection: _connection,
...headers
} = ctx.req.headers;
return headers;
},
}),
}),
],
};
},
ssr: true,
abortOnUnmount: true,
});
Sorry to tag but cc @Alex / KATT šŸ± - another person mentioned this issue in another thread with no response and I feel like Iā€™ve tried everything short of digging into Trpc codebase and trying to figure out whatā€™s going on Edit - the other issue is https://discord.com/channels/867764511159091230/1125838439393271958 I also tested with creating a simple subscription route that just sets an interval and emits random data to the frontend - this works just fine The problem is simply that two different instances of the event emitter seem to be used in the mutation vs the subscription, causing the mutation route to believe thereā€™s no attached listeners when there are And I have no idea why there are two different instances :/

Did you find this page helpful?