tRPCttRPC
Powered by
boneyB
tRPCβ€’3y agoβ€’
5 replies
boney

Unable to get mutation to trigger subscription because EventEmitter not being shared

Hey folks,

Been struggling with this for a few hours now hopelessly and trying random things - read all related posts in this forum, on github issues, and stackoverflow - and still don't understand what is going on.

I have a next app with a custom HTTP server and using tRPC. WSLink etc is all fine - i'm doing everything the proper way.

I have a router with these two functions:
 sendMessage: protectedProcedure
    .input(z.string())
    .mutation(async ({ ctx, input }) => {
      try {
        const chatRepo = new CommunityChatRepository(ctx.prisma);
        const result = await chatRepo.sendMessage(ctx.session!.user.id, input);

        if (!result.ok) throw result.error;
        console.log({ namesSendMsg: wsee.eventNames() });

        wsee.emit('onNewMessage', result.value);

        return result.value;
      } catch (error) {
        if (error instanceof TRPCError) {
          throw error;
        } else {
          console.error(error);
          throw new TRPCError({
            code: 'INTERNAL_SERVER_ERROR',
            message: 'Internal server error',
          });
        }
      }
    }),

  onNewMessage: publicProcedure.subscription(() => {
    return observable<ChatMessageWithSenderInformation>((emit) => {
      const onNewMessage = (data: ChatMessageWithSenderInformation) => {
        emit.next(data);
      };

      console.log({ names: wsee.eventNames() });
      wsee.on('onNewMessage', onNewMessage);
      console.log({ names: wsee.eventNames() });

      console.log(`added listener ${wsee.listenerCount('onNewMessage')}`);

      return () => {
        console.log(`removed listener ${wsee.listenerCount('onNewMessage')}`);
        wsee.off('onNewMessage', onNewMessage);
      };
    });
  }),
 sendMessage: protectedProcedure
    .input(z.string())
    .mutation(async ({ ctx, input }) => {
      try {
        const chatRepo = new CommunityChatRepository(ctx.prisma);
        const result = await chatRepo.sendMessage(ctx.session!.user.id, input);

        if (!result.ok) throw result.error;
        console.log({ namesSendMsg: wsee.eventNames() });

        wsee.emit('onNewMessage', result.value);

        return result.value;
      } catch (error) {
        if (error instanceof TRPCError) {
          throw error;
        } else {
          console.error(error);
          throw new TRPCError({
            code: 'INTERNAL_SERVER_ERROR',
            message: 'Internal server error',
          });
        }
      }
    }),

  onNewMessage: publicProcedure.subscription(() => {
    return observable<ChatMessageWithSenderInformation>((emit) => {
      const onNewMessage = (data: ChatMessageWithSenderInformation) => {
        emit.next(data);
      };

      console.log({ names: wsee.eventNames() });
      wsee.on('onNewMessage', onNewMessage);
      console.log({ names: wsee.eventNames() });

      console.log(`added listener ${wsee.listenerCount('onNewMessage')}`);

      return () => {
        console.log(`removed listener ${wsee.listenerCount('onNewMessage')}`);
        wsee.off('onNewMessage', onNewMessage);
      };
    });
  }),


I've added a bunch of console logs to help explain.

So, in my logs I see that when the app loads up I get a WS Connection to the server.

On the client side, I have
trpc.chat.onNewMessage.useSubscription
trpc.chat.onNewMessage.useSubscription
which console logs
{names: []}
{names: []}
first and then
{names: ['onNewMessage']}
{names: ['onNewMessage']}
as it should based on the code for the router.

But, when I send a message, the mutation fails to trigger the subscription because it logs
{namesSendMsg: []}
{namesSendMsg: []}
i.e. for some reason it does;n't recognize the attached listener

I thought it was because maybe somehow multiple
EventEmitter
EventEmitter
instances are being created due to HMR - so I did a workaround similar to what we do with Prisma for next dev mode:

src/eventEmitter.ts
src/eventEmitter.ts
:
import { EventEmitter } from 'events';

import type { ChatMessageWithSenderInformation } from './server/repositories/CommunityChatRepository';

interface WSEvents {
  // Community Chat
  onNewMessage: (message: ChatMessageWithSenderInformation) => void;

  // Trivia
  join: () => void;
  getQuestions: () => void;
  onRankingUpdate: () => void;
  onChoiceUpdate: () => void;
}

export declare interface WSEventEmitter {
  on<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  off<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  once<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  emit<WEv extends keyof WSEvents>(
    event: WEv,
    ...args: Parameters<WSEvents[WEv]>
  ): boolean;
}

export class WSEventEmitter extends EventEmitter {}

const globalForWSEE = globalThis as unknown as { wsee: WSEventEmitter };
const isDevMode = process.env.NODE_ENV !== 'production';

export const wsee = globalForWSEE.wsee || new WSEventEmitter();

if (isDevMode) globalForWSEE.wsee = wsee;
import { EventEmitter } from 'events';

import type { ChatMessageWithSenderInformation } from './server/repositories/CommunityChatRepository';

interface WSEvents {
  // Community Chat
  onNewMessage: (message: ChatMessageWithSenderInformation) => void;

  // Trivia
  join: () => void;
  getQuestions: () => void;
  onRankingUpdate: () => void;
  onChoiceUpdate: () => void;
}

export declare interface WSEventEmitter {
  on<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  off<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  once<WEv extends keyof WSEvents>(event: WEv, listener: WSEvents[WEv]): this;
  emit<WEv extends keyof WSEvents>(
    event: WEv,
    ...args: Parameters<WSEvents[WEv]>
  ): boolean;
}

export class WSEventEmitter extends EventEmitter {}

const globalForWSEE = globalThis as unknown as { wsee: WSEventEmitter };
const isDevMode = process.env.NODE_ENV !== 'production';

export const wsee = globalForWSEE.wsee || new WSEventEmitter();

if (isDevMode) globalForWSEE.wsee = wsee;


this did not help either. using this
wsee
wsee
everywhere doesn't change anything. im lost and have no idea WHY there are supposedly two different event emitter instances being used here?
tRPCJoin
Move Fast & Break Nothing. End-to-end typesafe APIs made easy.
5,015Members
Resources
Recent Announcements

Similar Threads

Was this page helpful?

Similar Threads

onSuccess mutation not being called
PeformPPeform / ❓-help
17mo ago
Types not being shared with the frontend
scrubbuddySscrubbuddy / ❓-help
2y ago
Cannot get subscription event to fire
molochMmoloch / ❓-help
3y ago
event emitter doesnt trigger subscription from webhook
nikoNniko / ❓-help
9mo ago