Mugetsu • tRPC • 6mo ago • 1 reply

SSE Subscriptions freeze app when multiple tabs open

So I just found out that whenever I open multiple tabs of my web app, it stops working altogether across all the tabs because of the subscription. I'm not sure how to debug this or find a solution.

CacheUpdateSubscriber mounts in my root layout. apiUtils.location.findLocationsInCache is a query with staleTime: Infinity.

'use client';

import { api } from '$/lib/trpc/react';

export const CacheUpdateSubscriber = () => {
  const apiUtils = api.useUtils();

  api.location.onCacheUpdate.useSubscription(undefined, {
    onData: (newData) => {
      apiUtils.location.findLocationsInCache.setData(undefined, (oldData) => {
        if (oldData === undefined) return [];
        if (!newData?.length) return oldData;

        const dataMap = new Map(oldData.map((item) => [item.id, item]));

        for (const item of newData) {
          dataMap.set(item.id, item);
        }

        return Array.from(dataMap.values());
      });
    },
  });

  return null;
};
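
One way to narrow this down is to rule out the cache merge itself by pulling it into a pure function and checking it in isolation. The following is only a sketch: `mergeById` and the `LocationItem` shape are hypothetical names introduced for illustration, and the logic mirrors the `setData` updater above, including its behavior of returning an empty list when nothing is cached yet.

```typescript
// Hypothetical item shape; the real type comes from findLocationsInCache.
interface LocationItem {
  id: string;
  name: string;
}

// Hypothetical helper mirroring the setData updater in the post:
// incoming items replace cached items with the same id, new ids are appended.
function mergeById<T extends { id: string }>(
  oldData: T[] | undefined,
  newData: T[] | undefined,
): T[] {
  // Same as the original: with no cached data, the update is dropped.
  if (oldData === undefined) return [];
  // Nothing to merge, so keep the same array reference.
  if (!newData?.length) return oldData;

  const dataMap = new Map<string, T>(
    oldData.map((item): [string, T] => [item.id, item]),
  );

  for (const item of newData) {
    dataMap.set(item.id, item);
  }

  return Array.from(dataMap.values());
}

const cached: LocationItem[] = [
  { id: 'a', name: 'Old A' },
  { id: 'b', name: 'B' },
];
const update: LocationItem[] = [
  { id: 'a', name: 'New A' },
  { id: 'c', name: 'C' },
];

// Item 'a' is replaced in place, 'b' is kept, 'c' is appended.
const merged = mergeById(cached, update);
```

If this behaves as expected, the merge is unlikely to be what stalls the tabs, and the subscription connection itself is the next thing to inspect.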

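  // Assumes: import { EventEmitter, on } from 'node:events';
  onCacheUpdate: locationsProcedure.subscription(async function* (opts) {
    // Each subscription opens its own Redis connection in subscriber mode.
    const redisSubscriber = redis.duplicate();
    const emitter = new EventEmitter();

    try {
      // Attach the listener before subscribing so no early message is missed.
      redisSubscriber.on('message', (channel, message) => {
        if (channel === LOCATIONS_UPDATE_CHANNEL) {
          emitter.emit('update', JSON.parse(message));
        }
      });

      await redisSubscriber.subscribe(LOCATIONS_UPDATE_CHANNEL);

      // The signal stops the loop when tRPC aborts the subscription.
      for await (const [data] of on(emitter, 'update', {
        signal: opts.signal,
      })) {
        yield data as Awaited<
          ReturnType<typeof cacheManager.getLocationsByIds>
        >;
      }
    } finally {
      // Runs on abort or error: release the listeners and the Redis connection.
      emitter.removeAllListeners();
      await redisSubscriber.unsubscribe();
      await redisSubscriber.quit();
    }
  })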