SSE Subscriptions freeze app when multiple tabs open
So I just found out that whenever I open multiple tabs of my web app, it stops working altogether across all the tabs because of the subscription. I'm not quite sure how to debug this or how to find a solution.
`CacheUpdateSubscriber` mounts in my root layout. `apiUtils.location.findLocationsInCache` is a query with `staleTime: Infinity`.
'use client';
import { api } from '$/lib/trpc/react';
/**
 * Render-less component that listens to the `location.onCacheUpdate`
 * subscription and merges every pushed batch into the cached result of the
 * `location.findLocationsInCache` query, keyed by `item.id`.
 *
 * @returns `null` — the component renders nothing.
 */
export const CacheUpdateSubscriber = () => {
  const apiUtils = api.useUtils();

  api.location.onCacheUpdate.useSubscription(undefined, {
    onData: (newData) => {
      apiUtils.location.findLocationsInCache.setData(undefined, (oldData) => {
        // The query has not been fetched yet. Returning `undefined` leaves the
        // cache untouched; seeding it with `[]` here would drop this update and
        // mark an empty list as the query's data, so the real list might never
        // be fetched when the query uses an infinite stale time.
        if (oldData === undefined) return undefined;

        // Nothing was pushed: keep the current list as-is.
        if (!newData?.length) return oldData;

        // Upsert by id: existing entries keep their position, updated entries
        // replace the old object, unseen ids are appended.
        const dataMap = new Map(oldData.map((item) => [item.id, item]));
        for (const item of newData) {
          dataMap.set(item.id, item);
        }
        return Array.from(dataMap.values());
      });
    },
  });

  return null;
};
'use client';
import { api } from '$/lib/trpc/react';
/**
 * Render-less component that listens to the `location.onCacheUpdate`
 * subscription and merges every pushed batch into the cached result of the
 * `location.findLocationsInCache` query, keyed by `item.id`.
 *
 * @returns `null` — the component renders nothing.
 */
export const CacheUpdateSubscriber = () => {
  const apiUtils = api.useUtils();

  api.location.onCacheUpdate.useSubscription(undefined, {
    onData: (newData) => {
      apiUtils.location.findLocationsInCache.setData(undefined, (oldData) => {
        // The query has not been fetched yet. Returning `undefined` leaves the
        // cache untouched; seeding it with `[]` here would drop this update and
        // mark an empty list as the query's data, so the real list might never
        // be fetched when the query uses an infinite stale time.
        if (oldData === undefined) return undefined;

        // Nothing was pushed: keep the current list as-is.
        if (!newData?.length) return oldData;

        // Upsert by id: existing entries keep their position, updated entries
        // replace the old object, unseen ids are appended.
        const dataMap = new Map(oldData.map((item) => [item.id, item]));
        for (const item of newData) {
          dataMap.set(item.id, item);
        }
        return Array.from(dataMap.values());
      });
    },
  });

  return null;
};

/**
 * Subscription procedure that forwards messages published on
 * `LOCATIONS_UPDATE_CHANNEL` to the subscribed client.
 *
 * Each call duplicates the shared Redis client, subscribes that duplicate to
 * the channel, and yields the JSON-decoded payload of every message until
 * `opts.signal` aborts or an error is thrown. The duplicated connection is
 * always closed on exit.
 *
 * NOTE(review): every open subscription holds its own Redis connection and,
 * presumably, one long-lived HTTP response. Browsers cap concurrent HTTP/1.1
 * connections per origin, which may be what stalls the app with several tabs
 * open — confirm the transport (HTTP/2 or a shared connection avoids the cap).
 */
onCacheUpdate: locationsProcedure.subscription(async function* (opts) {
  const redisSubscriber = redis.duplicate();
  const emitter = new EventEmitter();

  try {
    await redisSubscriber.subscribe(LOCATIONS_UPDATE_CHANNEL);

    redisSubscriber.on('message', (channel, message) => {
      if (channel !== LOCATIONS_UPDATE_CHANNEL) return;

      // A throw inside an event listener would escape as an uncaught
      // exception, so a malformed payload is logged and skipped instead.
      let payload;
      try {
        payload = JSON.parse(message);
      } catch (err) {
        console.error(
          `Ignoring malformed message on ${LOCATIONS_UPDATE_CHANNEL}`,
          err,
        );
        return;
      }
      emitter.emit('update', payload);
    });

    // Only the abort signal is handed to `on`; it ends the loop when the
    // subscription is cancelled.
    for await (const [data] of on(emitter, 'update', { signal: opts.signal })) {
      yield data as Awaited<
        ReturnType<typeof cacheManager.getLocationsByIds>
      >;
    }
  } finally {
    emitter.removeAllListeners();
    try {
      await redisSubscriber.unsubscribe();
    } finally {
      // Close the duplicated connection even if unsubscribing failed, so it
      // cannot leak.
      await redisSubscriber.quit();
    }
  }
})

/**
 * Subscription procedure that forwards messages published on
 * `LOCATIONS_UPDATE_CHANNEL` to the subscribed client.
 *
 * Each call duplicates the shared Redis client, subscribes that duplicate to
 * the channel, and yields the JSON-decoded payload of every message until
 * `opts.signal` aborts or an error is thrown. The duplicated connection is
 * always closed on exit.
 *
 * NOTE(review): every open subscription holds its own Redis connection and,
 * presumably, one long-lived HTTP response. Browsers cap concurrent HTTP/1.1
 * connections per origin, which may be what stalls the app with several tabs
 * open — confirm the transport (HTTP/2 or a shared connection avoids the cap).
 */
onCacheUpdate: locationsProcedure.subscription(async function* (opts) {
  const redisSubscriber = redis.duplicate();
  const emitter = new EventEmitter();

  try {
    await redisSubscriber.subscribe(LOCATIONS_UPDATE_CHANNEL);

    redisSubscriber.on('message', (channel, message) => {
      if (channel !== LOCATIONS_UPDATE_CHANNEL) return;

      // A throw inside an event listener would escape as an uncaught
      // exception, so a malformed payload is logged and skipped instead.
      let payload;
      try {
        payload = JSON.parse(message);
      } catch (err) {
        console.error(
          `Ignoring malformed message on ${LOCATIONS_UPDATE_CHANNEL}`,
          err,
        );
        return;
      }
      emitter.emit('update', payload);
    });

    // Only the abort signal is handed to `on`; it ends the loop when the
    // subscription is cancelled.
    for await (const [data] of on(emitter, 'update', { signal: opts.signal })) {
      yield data as Awaited<
        ReturnType<typeof cacheManager.getLocationsByIds>
      >;
    }
  } finally {
    emitter.removeAllListeners();
    try {
      await redisSubscriber.unsubscribe();
    } finally {
      // Close the duplicated connection even if unsubscribing failed, so it
      // cannot leak.
      await redisSubscriber.quit();
    }
  }
})