boney
tRPC · 3y ago
5 replies

Unable to get mutation to trigger subscription because EventEmitter not being shared

Hey folks,

I've been struggling with this hopelessly for a few hours now, trying random things. I've read all the related posts in this forum, in GitHub issues, and on Stack Overflow, and I still don't understand what is going on.

I have a Next.js app with a custom HTTP server, and I'm using tRPC. wsLink etc. is all fine; I'm doing everything the proper way.

I have a router with these two procedures:
  sendMessage: protectedProcedure
    .input(z.string())
    .mutation(async ({ ctx, input }) => {
      try {
        const chatRepo = new CommunityChatRepository(ctx.prisma);
        const result = await chatRepo.sendMessage(ctx.session!.user.id, input);

        if (!result.ok) throw result.error;
        console.log({ namesSendMsg: wsee.eventNames() });

        wsee.emit('onNewMessage', result.value);

        return result.value;
      } catch (error) {
        if (error instanceof TRPCError) {
          throw error;
        } else {
          console.error(error);
          throw new TRPCError({
            code: 'INTERNAL_SERVER_ERROR',
            message: 'Internal server error',
          });
        }
      }
    }),

  onNewMessage: publicProcedure.subscription(() => {
    return observable<ChatMessageWithSenderInformation>((emit) => {
      const onNewMessage = (data: ChatMessageWithSenderInformation) => {
        emit.next(data);
      };

      console.log({ names: wsee.eventNames() });
      wsee.on('onNewMessage', onNewMessage);
      console.log({ names: wsee.eventNames() });

      console.log(`added listener ${wsee.listenerCount('onNewMessage')}`);

      return () => {
        // Remove first so the logged count reflects the state after cleanup.
        wsee.off('onNewMessage', onNewMessage);
        console.log(`removed listener ${wsee.listenerCount('onNewMessage')}`);
      };
    });
  }),


I've added a bunch of console logs to help explain.

In my logs I can see that when the app loads, a WS connection to the server is established.

On the client side, I have trpc.chat.onNewMessage.useSubscription, which logs { names: [] } first and then { names: ['onNewMessage'] }, as it should based on the router code.

But when I send a message, the mutation fails to trigger the subscription: it logs { namesSendMsg: [] }, i.e. for some reason it doesn't see the attached listener.
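A quick way to test the "two instances" theory is to give the emitter a unique id at construction time and log it, together with the process id, from both the subscription and the mutation. The sketch below is only an illustration: `TaggedEmitter` and `describeEmitter` are hypothetical names, not part of the app or of tRPC, and the two `new` calls merely simulate the module being evaluated twice.

```typescript
import { EventEmitter } from 'events';
import { randomUUID } from 'crypto';

// Hypothetical stand-in for WSEventEmitter: each instance gets its own id.
class TaggedEmitter extends EventEmitter {
  readonly instanceId: string = randomUUID();
}

// Builds a log line identifying the process and the emitter instance.
function describeEmitter(label: string, ee: TaggedEmitter): string {
  const listeners = ee.listenerCount('onNewMessage');
  return `${label}: pid=${process.pid} emitter=${ee.instanceId} listeners=${listeners}`;
}

// Simulate the module being evaluated twice (for example, once per bundle).
const seenBySubscription = new TaggedEmitter();
const seenByMutation = new TaggedEmitter();

// Only the subscription-side instance gets a listener.
seenBySubscription.on('onNewMessage', () => {});

console.log(describeEmitter('subscription', seenBySubscription));
console.log(describeEmitter('mutation', seenByMutation));
```

If the two log lines show different emitter ids under the same pid, the module is being loaded twice within one process. If the pids differ as well, the two procedures are running in separate processes.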

I thought maybe multiple EventEmitter instances were somehow being created due to HMR, so I did a workaround similar to what we do with Prisma in Next dev mode:

src/eventEmitter.ts:
import { EventEmitter } from 'events';

import type { ChatMessageWithSenderInformation } from './server/repositories/CommunityChatRepository';

interface WSEvents {
  // Community Chat
  onNewMessage: (message: ChatMessageWithSenderInformation) => void;

  // Trivia
  join: () => void;
  getQuestions: () => void;
  onRankingUpdate: () => void;
  onChoiceUpdate: () => void;
}

export declare interface WSEventEmitter {
  on<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  off<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  once<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  emit<WEv extends keyof WSEvents>(
    event: WEv,
    ...args: Parameters<WSEvents[WEv]>
  ): boolean;
}

export class WSEventEmitter extends EventEmitter {}

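// Cache the emitter on globalThis so HMR reloads reuse it (same pattern as the Prisma client).
const globalForWSEE = globalThis as unknown as { wsee?: WSEventEmitter };
const isDevMode = process.env.NODE_ENV !== 'production';

export const wsee = globalForWSEE.wsee || new WSEventEmitter();

// Only stored in dev mode; in production each evaluation of this module creates its own instance.
if (isDevMode) globalForWSEE.wsee = wsee;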


This did not help either; using this wsee everywhere doesn't change anything. I'm lost and have no idea why there are supposedly two different EventEmitter instances in use here.
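For comparison, here is a minimal sketch of the same global-cache idea with the instance stored unconditionally rather than only in dev mode. It assumes that both copies of the module run inside the same Node process; the `__sharedWsee` property and the `getSharedEmitter` helper are hypothetical names chosen for the illustration.

```typescript
import { EventEmitter } from 'events';

// Hypothetical global slot name; any unique property name would do.
type GlobalWithEmitter = { __sharedWsee?: EventEmitter };

// Returns the process-wide emitter, creating it on first use.
// Unlike a dev-only cache, the instance is stored in every environment.
function getSharedEmitter(): EventEmitter {
  const g = globalThis as unknown as GlobalWithEmitter;
  if (!g.__sharedWsee) {
    g.__sharedWsee = new EventEmitter();
  }
  return g.__sharedWsee;
}

// Simulate two separately evaluated copies of the module in one process.
const subscriptionSide = getSharedEmitter();
const mutationSide = getSharedEmitter();

// A listener registered through one lookup should see events emitted through the other.
const received: string[] = [];
subscriptionSide.on('onNewMessage', (msg: string) => received.push(msg));
mutationSide.emit('onNewMessage', 'hello');

console.log(subscriptionSide === mutationSide, received);
```

This can only help when the subscription and the mutation execute in one process. If they run in separate processes, each process has its own globalThis, and an in-memory EventEmitter cannot carry events between them.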