Donald Biden, 5mo ago

A simple example of using SSE and subscriptions

Hello! Congrats on launching v11 🚀 I'm having a hard time understanding subscriptions (https://trpc.io/docs/server/subscriptions), especially the full-stack example, which is already a bit complex. Is there a simpler example somewhere that doesn't use React, just a basic subscription that fires when a todo item is added, for example? Also, is it just me, or were the onComplete, onData, etc. options removed from plain subscriptions, so that they are now only available in useSubscription()? The types from v10 have them: https://github.com/trpc/trpc/blob/3bc06d055d3b860d978e9c465bce56fdcf95b71f/packages/client/src/internals/TRPCUntypedClient.ts#L32 Thanks!
4 Replies
Sploopy, 5mo ago
Which part are you confused about? The .subscription tRPC component, or using it on the frontend?
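// Assumes ioredis and Node's built-in events module; publicProcedure comes from your own tRPC setup
import { EventEmitter, on } from "node:events"
import Redis from "ioredis"

export const SubscriptionEndpoint = publicProcedure
  .subscription(async function* (opts) {
    const redisSubscriber = new Redis(...)
    const emitter = new EventEmitter()

    try {
      // Optional: only needed if you want to use Redis
      await redisSubscriber.subscribe("some_channel_name")
      // Forward every Redis message to the local emitter
      redisSubscriber.on("message", (_channel, message) => {
        // If you don't want to use Redis, emit on the event emitter directly instead
        emitter.emit("update", message)
      })

      // on() turns emitter events into an async iterator, which keeps the subscription open;
      // passing opts.signal ends the loop when the client disconnects
      for await (const output of on(emitter, "update", { signal: opts.signal })) {
        // Each item is the array of emitted arguments; yield the first one to your frontend subscribers
        yield output[0]
      }
    } finally {
      // Runs on disconnect or error, so the Redis connection is always released
      await redisSubscriber.unsubscribe("some_channel_name")
      await redisSubscriber.quit()
    }
  })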
export const SubscriptionComponent = () => {
  trpc.SubscriptionRouter.SubscriptionEndpoint.useSubscription(
    // Input (props) here
    {},
    {
      onData: (data) => {
        // Do the thing with the data
      },
    },
  )
}
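For readers who want the core of the pattern above without Redis or React, here is a minimal sketch of the "todo item added" case from the original question. It only uses Node's built-in `events` module; the `Todo` type, `todoEvents`, `addTodo`, and `onTodoAdded` are hypothetical names made up for illustration, and the tRPC wiring itself is left out.

```typescript
import { EventEmitter, on } from "node:events";

// Hypothetical todo shape and in-memory store, for illustration only.
type Todo = { id: number; title: string };

const todoEvents = new EventEmitter();
const todos: Todo[] = [];

// What a mutation would do: store the todo, then announce it.
function addTodo(title: string): Todo {
  const todo: Todo = { id: todos.length + 1, title };
  todos.push(todo);
  todoEvents.emit("add", todo);
  return todo;
}

// The kind of body you would hand to `.subscription(...)`: an async generator
// that yields every todo emitted after the caller starts listening.
async function* onTodoAdded(signal?: AbortSignal): AsyncGenerator<Todo> {
  // on() yields the array of arguments passed to emit(), hence the [todo] destructuring.
  for await (const [todo] of on(todoEvents, "add", { signal })) {
    yield todo as Todo;
  }
}
```

Inside a router this would presumably be wired up as `.subscription(async function* (opts) { yield* onTodoAdded(opts.signal) })`, with `addTodo` called from a mutation. Note that the listener is only attached once the generator starts running, so todos added before that point are not replayed.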
Mugetsu, 3mo ago
@Sploopy Man, you have rescued me! I've been fighting exactly this use case and could not figure out why my event emitter was not sending the data... Your example worked like a charm. THANKS! ❤️
onCacheUpdate: locationsProcedure.subscription(async function* (opts) {
  const redisSubscriber = redis.duplicate();
  const emitter = new EventEmitter();

  try {
    await redisSubscriber.subscribe(LOCATIONS_UPDATE_CHANNEL);

    redisSubscriber.on('message', (channel, message) => {
      if (channel === LOCATIONS_UPDATE_CHANNEL) {
        emitter.emit('update', JSON.parse(message));
      }
    });

    for await (const [data] of on(emitter, 'update', opts)) {
      yield data as Awaited<
        ReturnType<typeof cacheManager.getLocationsByIds>
      >;
    }
  } catch (e) {
    throw e;
  } finally {
    await redisSubscriber.unsubscribe();
    await redisSubscriber.quit();
  }
}),
I do agree with @Donald Biden that there should be a less complicated example of this in the docs, as I could not get it to work with the current chat pub/sub example there.
Alex / KATT 🐱
Feel free to make a PR to the docs 🙂 If you figure out how to...
- use explicit resource management, and
- make your redis client compatible with async iterables rather than .on('x')
... you should be able to rewrite your thing into something like
onCacheUpdate: locationsProcedure.subscription(async function* (opts) {
  using redisSubscriber = redis.duplicate();
  await using messages = redisSubscriber.subscribe(LOCATIONS_UPDATE_CHANNEL)
  for await (const message of messages) {
    yield message as SomeType
  }
}),
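One way to read the second bullet: wrap the callback-style client in an async generator so the procedure body can simply `for await` over messages. The sketch below is an approximation under stated assumptions. `CallbackSubscriber` is a hypothetical interface (not a real Redis client API), `messagesOf` is a made-up helper, and a `finally` block stands in for `using`, so it runs on runtimes without explicit resource management.

```typescript
// Hypothetical minimal shape of a callback-style subscriber (not a real Redis client API).
interface CallbackSubscriber {
  onMessage(listener: (message: string) => void): void;
  offMessage(listener: (message: string) => void): void;
  close(): Promise<void>;
}

// Adapts the callback API to an async generator. The finally block runs when the
// consumer breaks out of its for-await loop, so cleanup lives in one place.
async function* messagesOf(sub: CallbackSubscriber): AsyncGenerator<string> {
  const queue: string[] = [];
  let wake: (() => void) | null = null;

  const listener = (message: string) => {
    queue.push(message);
    wake?.(); // resume the generator if it is waiting for data
  };
  sub.onMessage(listener);

  try {
    while (true) {
      while (queue.length > 0) {
        yield queue.shift()!;
      }
      // Sleep until the listener pushes the next message.
      await new Promise<void>((resolve) => (wake = resolve));
      wake = null;
    }
  } finally {
    sub.offMessage(listener);
    await sub.close();
  }
}
```

A consumer would then write `for await (const message of messagesOf(subscriber)) { ... }`. One limitation of this sketch: if the consumer stops while the generator is idle and waiting for data, cleanup is delayed until the next message arrives, so a fuller version would also want to react to an AbortSignal.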
Revan, 3mo ago
tRPC also exports a function called observableToAsyncIterator that works with RxJS observables, which might also be worth mentioning. Also, in the next version of RxJS, observables will implement Symbol.asyncIterator, so they should be able to be returned directly.
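To make the idea concrete, here is a rough sketch of what converting an observable into an async iterable involves. This is not tRPC's or RxJS's actual implementation: `MiniObservable` and `toAsyncIterable` are hypothetical names, and the observable shape is only loosely modelled on RxJS. The point is that completion ends the loop, errors are rethrown to the consumer, and the subscription is released afterwards.

```typescript
// Hypothetical minimal observable shape, loosely modelled on RxJS; not a real library type.
interface MiniObservable<T> {
  subscribe(observer: {
    next(value: T): void;
    error(err: unknown): void;
    complete(): void;
  }): { unsubscribe(): void };
}

// Hypothetical helper: buffers pushed notifications and replays them through an async generator.
async function* toAsyncIterable<T>(source: MiniObservable<T>): AsyncGenerator<T> {
  type Item =
    | { kind: "value"; value: T }
    | { kind: "error"; error: unknown }
    | { kind: "done" };
  const buffer: Item[] = [];
  let wake: (() => void) | null = null;
  const push = (item: Item) => {
    buffer.push(item);
    wake?.(); // resume the generator if it is waiting
  };

  const subscription = source.subscribe({
    next: (value) => push({ kind: "value", value }),
    error: (error) => push({ kind: "error", error }),
    complete: () => push({ kind: "done" }),
  });

  try {
    while (true) {
      if (buffer.length === 0) {
        // Nothing buffered yet: wait for the next notification.
        await new Promise<void>((resolve) => (wake = resolve));
        wake = null;
      }
      const item = buffer.shift()!;
      if (item.kind === "done") return; // completion ends the for-await loop
      if (item.kind === "error") throw item.error; // errors surface to the consumer
      yield item.value;
    }
  } finally {
    subscription.unsubscribe();
  }
}
```

With a helper along these lines, a subscription procedure could `yield*` the converted observable instead of returning the observable itself.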
