ChronicStone
ChronicStone16mo ago

tRPC subscription : Access to socket ID from subscription

I'm trying to implement user online status in a reliable way on my app, on the "live" part of my app. It's done & working well, but on some cases where the connexion is cut & not restored, I can't trigger the mutation marking the user offline. Right now I'm handling this through the client only right now but in some instances, it's not reliable. What I'd like to do would be something like this : (For subscription I need to pass my authToken as an input since wsLink does not support dynamic header resolver yet to my knowledge).
userOnlineStatusChanged: publicProcedure
.input(z.object({ authToken: z.string() }))
.subscription(async ({ input, ctx, socket }) => {
const user = await getUserFromJwt()
if(!user) throw new TRPCError({ code: 'UNAUTHORIZED', message: '...' })

// SOMEHOW GET SOCKET ID & ASSOCIATE IT TO USER :
const storage = useStorage('db')
storage.set(socket.id, user._id)


return observable<{ userId: string; online: boolean }>((emit) => {
mapEvents.on(
'userOnlineStatusChanged',
(params) => params.organizationId === user.organizationId && emit.next(params),
);

return () => {
mapEvents.off(
'userOnlineStatusChanged',
(params) => params.organizationId === user.organizationId && emit.next(params),
);
};
});
})
userOnlineStatusChanged: publicProcedure
.input(z.object({ authToken: z.string() }))
.subscription(async ({ input, ctx, socket }) => {
const user = await getUserFromJwt()
if(!user) throw new TRPCError({ code: 'UNAUTHORIZED', message: '...' })

// SOMEHOW GET SOCKET ID & ASSOCIATE IT TO USER :
const storage = useStorage('db')
storage.set(socket.id, user._id)


return observable<{ userId: string; online: boolean }>((emit) => {
mapEvents.on(
'userOnlineStatusChanged',
(params) => params.organizationId === user.organizationId && emit.next(params),
);

return () => {
mapEvents.off(
'userOnlineStatusChanged',
(params) => params.organizationId === user.organizationId && emit.next(params),
);
};
});
})
The goal being that on disconnexion of the socket, I mark the user as disconnected from the server to make the whole thing more reliable :
2 Replies
ChronicStone
ChronicStoneOP16mo ago
// wsServer.ts
const WebSocketServer = WebSocket.Server || WSWebSocketServer;
const wss = new WebSocketServer({
port: 3002,
});

const handler = applyWSSHandler({ wss, router: appRouter, createContext: () => ({}) as Context });

wss.on('connection', (ws) => {
consola.log(`➕➕ Connection (${wss.clients.size})`);
ws.once('close', () => {
consola.log(`➖➖ Connection (${wss.clients.size})`);
const storage = useStorage('db')
// HERE, SOMEHOW GET UNIQUE SOCKET ID
const socketId = ...
const linkedUserId = storage.get(socketId)

// If present linked user ID, mark user offline
if(linkedUserId) db.update(userOrganizationsTable)....

});
});

consola.log('✅ WebSocket Server listening on ws://localhost:3002');
// wsServer.ts
const WebSocketServer = WebSocket.Server || WSWebSocketServer;
const wss = new WebSocketServer({
port: 3002,
});

const handler = applyWSSHandler({ wss, router: appRouter, createContext: () => ({}) as Context });

wss.on('connection', (ws) => {
consola.log(`➕➕ Connection (${wss.clients.size})`);
ws.once('close', () => {
consola.log(`➖➖ Connection (${wss.clients.size})`);
const storage = useStorage('db')
// HERE, SOMEHOW GET UNIQUE SOCKET ID
const socketId = ...
const linkedUserId = storage.get(socketId)

// If present linked user ID, mark user offline
if(linkedUserId) db.update(userOrganizationsTable)....

});
});

consola.log('✅ WebSocket Server listening on ws://localhost:3002');
That's the idea of what I'm trying to do... I managed to get this working
ChronicStone
ChronicStoneOP16mo ago
I strongly inspired from the ideas presented in this article : https://preciselab.io/trpc/
Precise Lab
tRPC - super fast development cycle for fullstack typescript apps
We building tRPC client and server with query, mutation, authentication and subscriptions. Authentication for websocket can be tricky and it is in this case so there are presented three approaches to solve this problem.