Unable to get mutation to trigger subscription because EventEmitter not being shared

Hey folks, I've been struggling with this for a few hours now, hopelessly trying random things. I've read all the related posts in this forum, on GitHub issues, and on Stack Overflow, and I still don't understand what is going on. I have a Next.js app with a custom HTTP server, using tRPC. The wsLink setup etc. is all fine; I'm doing everything the proper way. I have a router with these two procedures:
sendMessage: protectedProcedure
  .input(z.string())
  .mutation(async ({ ctx, input }) => {
    try {
      const chatRepo = new CommunityChatRepository(ctx.prisma);
      const result = await chatRepo.sendMessage(ctx.session!.user.id, input);

      if (!result.ok) throw result.error;
      console.log({ namesSendMsg: wsee.eventNames() });

      wsee.emit('onNewMessage', result.value);

      return result.value;
    } catch (error) {
      if (error instanceof TRPCError) {
        throw error;
      } else {
        console.error(error);
        throw new TRPCError({
          code: 'INTERNAL_SERVER_ERROR',
          message: 'Internal server error',
        });
      }
    }
  }),

onNewMessage: publicProcedure.subscription(() => {
  return observable<ChatMessageWithSenderInformation>((emit) => {
    const onNewMessage = (data: ChatMessageWithSenderInformation) => {
      emit.next(data);
    };

    console.log({ names: wsee.eventNames() });
    wsee.on('onNewMessage', onNewMessage);
    console.log({ names: wsee.eventNames() });

    console.log(`added listener ${wsee.listenerCount('onNewMessage')}`);

    return () => {
      console.log(`removed listener ${wsee.listenerCount('onNewMessage')}`);
      wsee.off('onNewMessage', onNewMessage);
    };
  });
}),
I've added a bunch of console logs to help explain. In my logs I see that when the app loads, I get a WS connection to the server. On the client side, I have trpc.chat.onNewMessage.useSubscription, which logs {names: []} first and then {names: ['onNewMessage']}, as it should based on the router code.
But when I send a message, the mutation fails to trigger the subscription, because it logs {namesSendMsg: []}, i.e. for some reason it doesn't recognize the attached listener.
I thought maybe multiple EventEmitter instances were somehow being created due to HMR, so I did a workaround similar to what we do with Prisma for Next dev mode. src/eventEmitter.ts:
import { EventEmitter } from 'events';

import type { ChatMessageWithSenderInformation } from './server/repositories/CommunityChatRepository';

interface WSEvents {
  // Community Chat
  onNewMessage: (message: ChatMessageWithSenderInformation) => void;

  // Trivia
  join: () => void;
  getQuestions: () => void;
  onRankingUpdate: () => void;
  onChoiceUpdate: () => void;
}

export declare interface WSEventEmitter {
  on<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  off<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  once<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  emit<WEv extends keyof WSEvents>(
    event: WEv,
    ...args: Parameters<WSEvents[WEv]>
  ): boolean;
}

export class WSEventEmitter extends EventEmitter {}

const globalForWSEE = globalThis as unknown as { wsee: WSEventEmitter };
const isDevMode = process.env.NODE_ENV !== 'production';

export const wsee = globalForWSEE.wsee || new WSEventEmitter();

if (isDevMode) globalForWSEE.wsee = wsee;
This did not help either. Using this wsee everywhere doesn't change anything. I'm lost and have no idea WHY there are supposedly two different event emitter instances being used here.
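For comparison, here is a minimal sketch of the same globalThis idea that is not gated on NODE_ENV. The key name `__wsee_singleton__` and the helper `getSharedEmitter` are hypothetical and not part of the code above. A global like this can only unify instances inside a single Node process; if the mutation and the subscription are handled by different processes (nothing in this post confirms or rules that out), no global can bridge them.

```typescript
import { EventEmitter } from 'events';

// Hypothetical key name; any string unlikely to collide would do.
const GLOBAL_KEY = '__wsee_singleton__';

// View globalThis as a loose bag so the emitter can be stored on it.
const globalBag = globalThis as unknown as Record<string, EventEmitter | undefined>;

// Returns the emitter stored on globalThis, creating it on first use.
// Unlike the version above, this is not conditional on NODE_ENV.
function getSharedEmitter(): EventEmitter {
  const existing = globalBag[GLOBAL_KEY];
  if (existing) return existing;
  const created = new EventEmitter();
  globalBag[GLOBAL_KEY] = created;
  return created;
}

// Two lookups in the same process resolve to the same object,
// so a listener added through one is visible through the other.
const a = getSharedEmitter();
const b = getSharedEmitter();
a.on('onNewMessage', () => {});
console.log(a === b, b.listenerCount('onNewMessage')); // prints: true 1
```

If both handlers call a helper like this and still see different listener counts, that would point away from HMR and towards separate processes or runtimes.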
haardik | LearnWeb3, 234d ago
My terminal output basically looks like this:
[screenshot of terminal output, not preserved]
haardik | LearnWeb3, 234d ago
FWIW, I cloned the example WebSockets repo and that works just fine. So something somewhere is wrong, but I have absolutely no idea what. Any hints would be great. My tRPC client is initialized like this:
function getBaseUrl() {
  if (typeof window !== 'undefined') return '';
  return `${process.env.NEXT_PUBLIC_APP_URL}`;
}

function getEndingLink() {
  const url =
    process.env.NODE_ENV !== 'production'
      ? 'ws://localhost:3001'
      : process.env.NEXT_PUBLIC_WS_URL || 'ws://localhost:3001';

  const client = createWSClient({
    url,
  });

  return wsLink<AppRouter>({
    client,
  });
}

export const trpc = createTRPCNext<AppRouter>({
  config({ ctx }) {
    if (typeof window !== 'undefined') {
      return {
        transformer: SuperJSON,
        links: [
          splitLink({
            condition: (op) => op.type === 'subscription',
            true: getEndingLink(),
            false: splitLink({
              condition: (op) => op.context.skipBatch === true,
              true: httpLink({
                url: '/api/trpc',
              }),
              false: httpBatchLink({
                url: '/api/trpc',
              }),
            }),
          }),
        ],
      };
    }
    return {
      transformer: SuperJSON,
      links: [
        splitLink({
          condition(op) {
            return op.context.skipBatch === true;
          },
          true: httpLink({
            url: `${getBaseUrl()}/api/trpc`,
            headers() {
              if (!ctx?.req?.headers) {
                return {};
              }
              const {
                // If you're using Node 18 before 18.15.0, omit the "connection" header
                // eslint-disable-next-line @typescript-eslint/naming-convention, unused-imports/no-unused-vars
                connection: _connection,
                ...headers
              } = ctx.req.headers;
              return headers;
            },
          }),
          false: httpBatchLink({
            url: `${getBaseUrl()}/api/trpc`,
            headers() {
              if (!ctx?.req?.headers) {
                return {};
              }
              const {
                // If you're using Node 18 before 18.15.0, omit the "connection" header
                // eslint-disable-next-line @typescript-eslint/naming-convention, unused-imports/no-unused-vars
                connection: _connection,
                ...headers
              } = ctx.req.headers;
              return headers;
            },
          }),
        }),
      ],
    };
  },
  ssr: true,
  abortOnUnmount: true,
});
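To make the routing in this config easier to reason about, here is a small model of what the browser-side splitLink chain decides for each operation. It is a sketch, not tRPC's implementation: `Op`, `Transport`, and `transportFor` are made-up names. Under this config, subscriptions go over the WebSocket link (ws://localhost:3001 in dev), while a mutation such as sendMessage goes over HTTP to /api/trpc. If those two endpoints happen to be served by different processes or separately loaded module graphs (an assumption; the server setup is not shown here), each side would hold its own copy of module-level state such as wsee.

```typescript
// Simplified stand-ins for the operation shape the split conditions inspect.
type OpType = 'query' | 'mutation' | 'subscription';

interface Op {
  type: OpType;
  context: { skipBatch?: boolean };
}

// Which of the three configured links would handle the operation.
type Transport = 'ws' | 'http' | 'httpBatch';

// Mirrors the two nested split conditions from the browser branch above:
// first on op.type, then on op.context.skipBatch.
function transportFor(op: Op): Transport {
  if (op.type === 'subscription') return 'ws';
  return op.context.skipBatch === true ? 'http' : 'httpBatch';
}

// The subscription and the mutation from the router take different paths.
console.log(transportFor({ type: 'subscription', context: {} })); // prints: ws
console.log(transportFor({ type: 'mutation', context: {} })); // prints: httpBatch
```

One way to test this theory would be to log process.pid inside both procedures and compare the values.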
Sorry to tag, but cc @Alex / KATT 🐱. Another person mentioned this issue in another thread with no response, and I feel like I've tried everything short of digging into the tRPC codebase to figure out what's going on.
Edit: the other issue is https://discord.com/channels/867764511159091230/1125838439393271958
I also tested by creating a simple subscription route that just sets an interval and emits random data to the frontend, and this works just fine.
The problem is simply that two different instances of the event emitter seem to be used in the mutation vs. the subscription, causing the mutation route to believe there are no attached listeners when there are. And I have no idea why there are two different instances :/
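A rough way to confirm the two-instance theory is to stamp each emitter with a random id at construction and log it together with process.pid from both procedures. The sketch below uses a hypothetical `TaggedEmitter` class and `describeEmitter` helper, and simulates the two sides with two local instances rather than the real router.

```typescript
import { EventEmitter } from 'events';
import { randomUUID } from 'crypto';

// Hypothetical subclass: every instance gets its own id when constructed.
class TaggedEmitter extends EventEmitter {
  readonly instanceId = randomUUID();
}

// Builds a log line that identifies the process, the emitter instance,
// and how many listeners that instance has for the given event.
function describeEmitter(label: string, ee: TaggedEmitter, event: string): string {
  return `[${label}] pid=${process.pid} emitter=${ee.instanceId} listeners=${ee.listenerCount(event)}`;
}

// Stand-ins for the emitter seen by the subscription and by the mutation.
const subscriptionSide = new TaggedEmitter();
const mutationSide = new TaggedEmitter();

// The subscription attaches its listener to its own instance only.
subscriptionSide.on('onNewMessage', () => {});

console.log(describeEmitter('subscription', subscriptionSide, 'onNewMessage'));
console.log(describeEmitter('mutation', mutationSide, 'onNewMessage'));

// emit() returns false when the instance has no listeners for the event.
const delivered = mutationSide.emit('onNewMessage', { text: 'hi' });
console.log({ delivered }); // prints: { delivered: false }
```

In the real app, differing emitter ids with the same pid would suggest the module is being loaded twice in one process, while differing pids would suggest the two procedures run in separate processes.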