MugetsuM
tRPC5mo ago
1 reply
Mugetsu

SSE Subscriptions freeze app when multiple tabs open

So I just found out that whenever I open multiple tabs of my web app it stops working all together across all the tabs due to the subscription. Not quite sure how to debug this and find a solution for this problem.

CacheUpdateSubscriber Mounts in my root layout
apiUtils.location.findLocationsInCache is an staleTime: Infinite query

'use client';

import { api } from '$/lib/trpc/react';

export const CacheUpdateSubscriber = () => {
  const apiUtils = api.useUtils();

  api.location.onCacheUpdate.useSubscription(undefined, {
    onData: (newData) => {
      apiUtils.location.findLocationsInCache.setData(undefined, (oldData) => {
        if (oldData === undefined) return [];
        if (!newData?.length) return oldData;

        const dataMap = new Map(oldData.map((item) => [item.id, item]));

        for (const item of newData) {
          dataMap.set(item.id, item);
        }

        return Array.from(dataMap.values());
      });
    },
  });

  return null;
};



  onCacheUpdate: locationsProcedure.subscription(async function* (opts) {
    const redisSubscriber = redis.duplicate();
    const emitter = new EventEmitter();

    try {
      await redisSubscriber.subscribe(LOCATIONS_UPDATE_CHANNEL);

      redisSubscriber.on('message', (channel, message) => {
        if (channel === LOCATIONS_UPDATE_CHANNEL) {
          emitter.emit('update', JSON.parse(message));
        }
      });

      for await (const [data] of on(emitter, 'update', opts)) {
        yield data as Awaited<
          ReturnType<typeof cacheManager.getLocationsByIds>
        >;
      }
    } catch (e) {
      throw e;
    } finally {
      emitter.removeAllListeners();
      await redisSubscriber.unsubscribe();
      await redisSubscriber.quit();
    }
  })
Was this page helpful?