Flaze
Flaze4d ago

How to handle a SSE exactly ?

Needing help for that
7 Replies
Flaze
FlazeOP4d ago
with the help of doc I came up with:
@Query({
input: NotebookEnhanceEntrySchema,
output: NotebookEnhanceEntryResponseSchema,
})
async enhanceEntry(@Ctx() ctx: AuthContext, @Input() input: { id: string }): Promise<Observable<string>> {
const { user } = ctx.auth;
const requestId = generate();
await this.notebookService.enhanceEntry(input.id, user, requestId);
return new Observable<string>(subscriber => {
const onChunk = (chunk: { id: string; delta: string; isLast: boolean }) => {
if (chunk.id !== requestId) return;
subscriber.next(chunk.delta);
if (chunk.isLast) {
subscriber.complete();
}
};

this.notebookService.eventEmitter.on('notebook.enhance.chunk', onChunk);

return () => {
this.notebookService.eventEmitter.off('notebook.enhance.chunk', onChunk);
};
});
}
@Query({
input: NotebookEnhanceEntrySchema,
output: NotebookEnhanceEntryResponseSchema,
})
async enhanceEntry(@Ctx() ctx: AuthContext, @Input() input: { id: string }): Promise<Observable<string>> {
const { user } = ctx.auth;
const requestId = generate();
await this.notebookService.enhanceEntry(input.id, user, requestId);
return new Observable<string>(subscriber => {
const onChunk = (chunk: { id: string; delta: string; isLast: boolean }) => {
if (chunk.id !== requestId) return;
subscriber.next(chunk.delta);
if (chunk.isLast) {
subscriber.complete();
}
};

this.notebookService.eventEmitter.on('notebook.enhance.chunk', onChunk);

return () => {
this.notebookService.eventEmitter.off('notebook.enhance.chunk', onChunk);
};
});
}
in my router
const [trpcClient] = useState(() =>
trpc.createClient({
links: [
splitLink({
condition: op => op.type === 'subscription',
false: httpBatchLink({
url: getAPIEndpoint('/api/trpc'),
headers: async () => {
const accessToken = await SessionPersistanceManager.getInstance().getJWT();
const googleAuth = await SessionPersistanceManager.getInstance().getProviderData();

return {
...(accessToken && { Authorization: `Bearer ${accessToken}` }),
...(googleAuth && { 'x-google-tokens': JSON.stringify(googleAuth) }),
};
},
}),
true: unstable_httpSubscriptionLink({
url: getAPIEndpoint('/api/trpc'),
eventSourceOptions: async () => {
const accessToken = await SessionPersistanceManager.getInstance().getJWT();
const googleAuth = await SessionPersistanceManager.getInstance().getProviderData();

return {
headers: {
...(accessToken && { Authorization: `Bearer ${accessToken}` }),
...(googleAuth && { 'x-google-tokens': JSON.stringify(googleAuth) }),
},
};
},
}),
}),
],
}),
);
const [trpcClient] = useState(() =>
trpc.createClient({
links: [
splitLink({
condition: op => op.type === 'subscription',
false: httpBatchLink({
url: getAPIEndpoint('/api/trpc'),
headers: async () => {
const accessToken = await SessionPersistanceManager.getInstance().getJWT();
const googleAuth = await SessionPersistanceManager.getInstance().getProviderData();

return {
...(accessToken && { Authorization: `Bearer ${accessToken}` }),
...(googleAuth && { 'x-google-tokens': JSON.stringify(googleAuth) }),
};
},
}),
true: unstable_httpSubscriptionLink({
url: getAPIEndpoint('/api/trpc'),
eventSourceOptions: async () => {
const accessToken = await SessionPersistanceManager.getInstance().getJWT();
const googleAuth = await SessionPersistanceManager.getInstance().getProviderData();

return {
headers: {
...(accessToken && { Authorization: `Bearer ${accessToken}` }),
...(googleAuth && { 'x-google-tokens': JSON.stringify(googleAuth) }),
},
};
},
}),
}),
],
}),
);
in my react app
const utils = trpc.useUtils();
const [isStreaming, setIsStreaming] = useState(false);
const [streamedContent, setStreamedContent] = useState('');

const handleEnhancement = useCallback(
async (_e: MouseEvent<HTMLButtonElement>): Promise<void> => {
try {
setIsStreaming(true);

setStreamedContent('');

return new Promise<void>((resolve, reject) => {
const subscription = utils.client.notebook.enhanceEntry.subscribe(
{ id: bookmark.id },
{
onComplete() {
setIsStreaming(false);

resolve();
},
onError(error: Error) {
setIsStreaming(false);

reject(error);
},
onData(chunk: string) {
try {
const parsed = parse(chunk, ALL) as {
content?: string;
isComplete?: boolean;
};

if (parsed.content) {
setStreamedContent(prev => prev + parsed.content);
}

if (parsed.isComplete) {
utils.notebook.getEntries.setData({ sortOrder: 'desc', sortBy: 'created_at' }, prev => {
if (!prev) return prev;

return prev.map(entry =>
entry.id === bookmark.id ? { ...entry, content: streamedContent } : entry,
);
});

resolve();
}
} catch {
// Continue buffering if parse fails
}
},
},
);

return () => {
subscription?.unsubscribe?.();
};
});
} catch (error) {
setIsStreaming(false);
throw error;
}
},
[bookmark.id, streamedContent, utils],
);
};
const utils = trpc.useUtils();
const [isStreaming, setIsStreaming] = useState(false);
const [streamedContent, setStreamedContent] = useState('');

const handleEnhancement = useCallback(
async (_e: MouseEvent<HTMLButtonElement>): Promise<void> => {
try {
setIsStreaming(true);

setStreamedContent('');

return new Promise<void>((resolve, reject) => {
const subscription = utils.client.notebook.enhanceEntry.subscribe(
{ id: bookmark.id },
{
onComplete() {
setIsStreaming(false);

resolve();
},
onError(error: Error) {
setIsStreaming(false);

reject(error);
},
onData(chunk: string) {
try {
const parsed = parse(chunk, ALL) as {
content?: string;
isComplete?: boolean;
};

if (parsed.content) {
setStreamedContent(prev => prev + parsed.content);
}

if (parsed.isComplete) {
utils.notebook.getEntries.setData({ sortOrder: 'desc', sortBy: 'created_at' }, prev => {
if (!prev) return prev;

return prev.map(entry =>
entry.id === bookmark.id ? { ...entry, content: streamedContent } : entry,
);
});

resolve();
}
} catch {
// Continue buffering if parse fails
}
},
},
);

return () => {
subscription?.unsubscribe?.();
};
});
} catch (error) {
setIsStreaming(false);
throw error;
}
},
[bookmark.id, streamedContent, utils],
);
};
in my my react component Help is greatly appreciated! I'm getting : access-control-allow-origin: * connection: keep-alive content-type: application/json date: Tue, 14 Jan 2025 16:21:09 GMT keep-alive: timeout=5 transfer-encoding: chunked vary: trpc-accept x-powered-by: Express response headers from my backend and here is the error: EventSource's response has a MIME type ("application/json") that is not "text/event-stream". Aborting the connection. How do I make sure the backend nestjsTRPC put that header accordingly ? I've tried to put @Subscription({ input?: ZodSchema, output?: ZodSchema }) But I can't seem to import that decorator
Flaze
FlazeOP4d ago
and it seems it's not present
No description
Flaze
FlazeOP4d ago
why is it not in the package?
No description
Flaze
FlazeOP4d ago
NestJS-tRPC: Bringing type-safety to NestJS
NestJS-tRPC Documentation - Routers
NestJS tRPC is a library designed to integrate the capabilities of tRPC into the NestJS framework. It aims to provide native support for decorators and implement an opinionated approach that aligns with NestJS conventions.
Flaze
FlazeOP4d ago
@julius Could you give me a hand ?
julius
julius4d ago
#🐯-nestjs-trpc for nestjs questsions
Flaze
FlazeOP4d ago
thanks!

Did you find this page helpful?