Flaze
Flaze
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
thanks!
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
@julius Could you give me a hand ?
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
No description
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
No description
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
But I can't seem to import that decorator
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
I've tried to put @Subscription({ input?: ZodSchema, output?: ZodSchema })
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
How do I make sure the NestJS tRPC backend sets that header accordingly?
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
here is the error: EventSource's response has a MIME type ("application/json") that is not "text/event-stream". Aborting the connection.
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
I'm getting the following response headers from my backend: access-control-allow-origin: *; connection: keep-alive; content-type: application/json; date: Tue, 14 Jan 2025 16:21:09 GMT; keep-alive: timeout=5; transfer-encoding: chunked; vary: trpc-accept; x-powered-by: Express — and
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
Help is greatly appreciated!
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
const utils = trpc.useUtils();
const [isStreaming, setIsStreaming] = useState(false);
const [streamedContent, setStreamedContent] = useState('');

/**
 * Click handler that starts the `notebook.enhanceEntry` subscription for the
 * current bookmark and streams the received content into `streamedContent`.
 *
 * The returned promise resolves when the subscription completes or a chunk
 * flagged `isComplete` arrives, and rejects when the subscription reports an
 * error or cannot be started. In every case `isStreaming` is reset to `false`
 * and the subscription is closed.
 *
 * @param _e - The button click event (unused).
 * @returns A promise that settles once the stream has finished or failed.
 */
const handleEnhancement = useCallback(
  (_e: MouseEvent<HTMLButtonElement>): Promise<void> =>
    new Promise<void>((resolve, reject) => {
      setIsStreaming(true);
      setStreamedContent('');

      // Accumulate locally: the `streamedContent` state visible to this closure
      // is the value from the render that created the callback, so it never
      // reflects the chunks appended while this stream is running.
      let accumulated = '';
      let settled = false;
      let subscription: { unsubscribe: () => void } | undefined;

      // Settles the promise exactly once and releases the subscription.
      const finish = (error?: unknown): void => {
        if (settled) return;
        settled = true;
        setIsStreaming(false);
        subscription?.unsubscribe();

        if (error === undefined) {
          resolve();
        } else {
          reject(error);
        }
      };

      try {
        subscription = utils.client.notebook.enhanceEntry.subscribe(
          { id: bookmark.id },
          {
            onComplete() {
              finish();
            },
            onError(error: Error) {
              finish(error);
            },
            onData(chunk: string) {
              if (settled) return;

              let parsed: { content?: string; isComplete?: boolean };

              try {
                // NOTE(review): `parse`/`ALL` are presumably a partial-JSON
                // parser imported elsewhere in the file; confirm.
                parsed = parse(chunk, ALL) as {
                  content?: string;
                  isComplete?: boolean;
                };
              } catch {
                // Best effort: skip chunks that cannot be parsed.
                return;
              }

              const { content, isComplete } = parsed;

              if (content) {
                accumulated += content;
                setStreamedContent(prev => prev + content);
              }

              if (isComplete) {
                const finalContent = accumulated;

                utils.notebook.getEntries.setData({ sortOrder: 'desc', sortBy: 'created_at' }, prev => {
                  if (!prev) return prev;

                  return prev.map(entry =>
                    entry.id === bookmark.id ? { ...entry, content: finalContent } : entry,
                  );
                });

                finish();
              }
            },
          },
        );
      } catch (error) {
        // `subscribe` itself threw: make sure the streaming flag is cleared.
        finish(error);
        return;
      }

      // A callback may have settled synchronously, before `subscription` was
      // assigned; close the subscription now in that case.
      if (settled) {
        subscription.unsubscribe();
      }
    }),
  [bookmark.id, utils],
);
};
const utils = trpc.useUtils();
const [isStreaming, setIsStreaming] = useState(false);
const [streamedContent, setStreamedContent] = useState('');

/**
 * Click handler that starts the `notebook.enhanceEntry` subscription for the
 * current bookmark and streams the received content into `streamedContent`.
 *
 * The returned promise resolves when the subscription completes or a chunk
 * flagged `isComplete` arrives, and rejects when the subscription reports an
 * error or cannot be started. In every case `isStreaming` is reset to `false`
 * and the subscription is closed.
 *
 * @param _e - The button click event (unused).
 * @returns A promise that settles once the stream has finished or failed.
 */
const handleEnhancement = useCallback(
  (_e: MouseEvent<HTMLButtonElement>): Promise<void> =>
    new Promise<void>((resolve, reject) => {
      setIsStreaming(true);
      setStreamedContent('');

      // Accumulate locally: the `streamedContent` state visible to this closure
      // is the value from the render that created the callback, so it never
      // reflects the chunks appended while this stream is running.
      let accumulated = '';
      let settled = false;
      let subscription: { unsubscribe: () => void } | undefined;

      // Settles the promise exactly once and releases the subscription.
      const finish = (error?: unknown): void => {
        if (settled) return;
        settled = true;
        setIsStreaming(false);
        subscription?.unsubscribe();

        if (error === undefined) {
          resolve();
        } else {
          reject(error);
        }
      };

      try {
        subscription = utils.client.notebook.enhanceEntry.subscribe(
          { id: bookmark.id },
          {
            onComplete() {
              finish();
            },
            onError(error: Error) {
              finish(error);
            },
            onData(chunk: string) {
              if (settled) return;

              let parsed: { content?: string; isComplete?: boolean };

              try {
                // NOTE(review): `parse`/`ALL` are presumably a partial-JSON
                // parser imported elsewhere in the file; confirm.
                parsed = parse(chunk, ALL) as {
                  content?: string;
                  isComplete?: boolean;
                };
              } catch {
                // Best effort: skip chunks that cannot be parsed.
                return;
              }

              const { content, isComplete } = parsed;

              if (content) {
                accumulated += content;
                setStreamedContent(prev => prev + content);
              }

              if (isComplete) {
                const finalContent = accumulated;

                utils.notebook.getEntries.setData({ sortOrder: 'desc', sortBy: 'created_at' }, prev => {
                  if (!prev) return prev;

                  return prev.map(entry =>
                    entry.id === bookmark.id ? { ...entry, content: finalContent } : entry,
                  );
                });

                finish();
              }
            },
          },
        );
      } catch (error) {
        // `subscribe` itself threw: make sure the streaming flag is cleared.
        finish(error);
        return;
      }

      // A callback may have settled synchronously, before `subscription` was
      // assigned; close the subscription now in that case.
      if (settled) {
        subscription.unsubscribe();
      }
    }),
  [bookmark.id, utils],
);
};
in my React component
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
// Builds the tRPC client once per component instance. Subscription operations
// go through the HTTP subscription link; every other operation goes through
// the batching HTTP link. Both links attach the same auth headers.
const [trpcClient] = useState(() => {
  // Reads the JWT and the provider data from the session manager and turns
  // whichever of them is present into request headers.
  const readAuthHeaders = async () => {
    const accessToken = await SessionPersistanceManager.getInstance().getJWT();
    const googleAuth = await SessionPersistanceManager.getInstance().getProviderData();

    return {
      ...(accessToken && { Authorization: `Bearer ${accessToken}` }),
      ...(googleAuth && { 'x-google-tokens': JSON.stringify(googleAuth) }),
    };
  };

  const batchLink = httpBatchLink({
    url: getAPIEndpoint('/api/trpc'),
    headers: () => readAuthHeaders(),
  });

  // NOTE(review): a native browser EventSource presumably cannot send custom
  // headers; confirm whether an EventSource ponyfill has to be supplied here.
  const subscriptionLink = unstable_httpSubscriptionLink({
    url: getAPIEndpoint('/api/trpc'),
    eventSourceOptions: async () => ({
      headers: await readAuthHeaders(),
    }),
  });

  return trpc.createClient({
    links: [
      splitLink({
        condition: op => op.type === 'subscription',
        false: batchLink,
        true: subscriptionLink,
      }),
    ],
  });
});
// Builds the tRPC client once per component instance. Subscription operations
// go through the HTTP subscription link; every other operation goes through
// the batching HTTP link. Both links attach the same auth headers.
const [trpcClient] = useState(() => {
  // Reads the JWT and the provider data from the session manager and turns
  // whichever of them is present into request headers.
  const readAuthHeaders = async () => {
    const accessToken = await SessionPersistanceManager.getInstance().getJWT();
    const googleAuth = await SessionPersistanceManager.getInstance().getProviderData();

    return {
      ...(accessToken && { Authorization: `Bearer ${accessToken}` }),
      ...(googleAuth && { 'x-google-tokens': JSON.stringify(googleAuth) }),
    };
  };

  const batchLink = httpBatchLink({
    url: getAPIEndpoint('/api/trpc'),
    headers: () => readAuthHeaders(),
  });

  // NOTE(review): a native browser EventSource presumably cannot send custom
  // headers; confirm whether an EventSource ponyfill has to be supplied here.
  const subscriptionLink = unstable_httpSubscriptionLink({
    url: getAPIEndpoint('/api/trpc'),
    eventSourceOptions: async () => ({
      headers: await readAuthHeaders(),
    }),
  });

  return trpc.createClient({
    links: [
      splitLink({
        condition: op => op.type === 'subscription',
        false: batchLink,
        true: subscriptionLink,
      }),
    ],
  });
});
in my react app
17 replies
TtRPC
Created by Flaze on 1/14/2025 in #❓-help
How to handle a SSE exactly ?
With the help of the docs, I came up with:
/**
 * Starts an enhancement run for a notebook entry and returns an observable
 * that emits the text deltas produced for that run.
 *
 * The chunk listener is attached BEFORE the enhancement is started, and chunks
 * that arrive before anyone subscribes are buffered and replayed on subscribe,
 * so no delta emitted while `notebookService.enhanceEntry` is awaited is lost.
 * The returned observable is intended for a single subscription.
 *
 * NOTE(review): this procedure is declared with `@Query`, yet it returns a
 * stream; that may be why the client sees `application/json` instead of
 * `text/event-stream`. Confirm which decorator the router library expects for
 * subscriptions.
 *
 * @param ctx - Request context; `ctx.auth.user` is passed to the service.
 * @param input - Identifies the entry to enhance.
 * @returns An observable of deltas that completes after the chunk flagged `isLast`.
 * @throws Whatever `notebookService.enhanceEntry` rejects with.
 */
@Query({
  input: NotebookEnhanceEntrySchema,
  output: NotebookEnhanceEntryResponseSchema,
})
async enhanceEntry(@Ctx() ctx: AuthContext, @Input() input: { id: string }): Promise<Observable<string>> {
  type EnhanceChunk = { id: string; delta: string; isLast: boolean };

  const { user } = ctx.auth;
  const requestId = generate();
  const emitter = this.notebookService.eventEmitter;

  // Until a subscriber is attached, chunks are parked here.
  const pending: EnhanceChunk[] = [];
  let deliver: (chunk: EnhanceChunk) => void = chunk => {
    pending.push(chunk);
  };

  const onChunk = (chunk: EnhanceChunk): void => {
    // The emitter is shared: ignore chunks that belong to other requests.
    if (chunk.id !== requestId) return;

    deliver(chunk);

    if (chunk.isLast) {
      // Nothing more will arrive for this request; release the listener even
      // if nobody ever subscribes.
      emitter.off('notebook.enhance.chunk', onChunk);
    }
  };

  emitter.on('notebook.enhance.chunk', onChunk);

  try {
    await this.notebookService.enhanceEntry(input.id, user, requestId);
  } catch (error) {
    emitter.off('notebook.enhance.chunk', onChunk);
    throw error;
  }

  return new Observable<string>(subscriber => {
    const forward = (chunk: EnhanceChunk): void => {
      subscriber.next(chunk.delta);
      if (chunk.isLast) {
        subscriber.complete();
      }
    };

    // Replay what arrived before the subscription, then switch to live delivery.
    for (const chunk of pending.splice(0)) {
      forward(chunk);
    }
    deliver = forward;

    return () => {
      emitter.off('notebook.enhance.chunk', onChunk);
    };
  });
}
/**
 * Starts an enhancement run for a notebook entry and returns an observable
 * that emits the text deltas produced for that run.
 *
 * The chunk listener is attached BEFORE the enhancement is started, and chunks
 * that arrive before anyone subscribes are buffered and replayed on subscribe,
 * so no delta emitted while `notebookService.enhanceEntry` is awaited is lost.
 * The returned observable is intended for a single subscription.
 *
 * NOTE(review): this procedure is declared with `@Query`, yet it returns a
 * stream; that may be why the client sees `application/json` instead of
 * `text/event-stream`. Confirm which decorator the router library expects for
 * subscriptions.
 *
 * @param ctx - Request context; `ctx.auth.user` is passed to the service.
 * @param input - Identifies the entry to enhance.
 * @returns An observable of deltas that completes after the chunk flagged `isLast`.
 * @throws Whatever `notebookService.enhanceEntry` rejects with.
 */
@Query({
  input: NotebookEnhanceEntrySchema,
  output: NotebookEnhanceEntryResponseSchema,
})
async enhanceEntry(@Ctx() ctx: AuthContext, @Input() input: { id: string }): Promise<Observable<string>> {
  type EnhanceChunk = { id: string; delta: string; isLast: boolean };

  const { user } = ctx.auth;
  const requestId = generate();
  const emitter = this.notebookService.eventEmitter;

  // Until a subscriber is attached, chunks are parked here.
  const pending: EnhanceChunk[] = [];
  let deliver: (chunk: EnhanceChunk) => void = chunk => {
    pending.push(chunk);
  };

  const onChunk = (chunk: EnhanceChunk): void => {
    // The emitter is shared: ignore chunks that belong to other requests.
    if (chunk.id !== requestId) return;

    deliver(chunk);

    if (chunk.isLast) {
      // Nothing more will arrive for this request; release the listener even
      // if nobody ever subscribes.
      emitter.off('notebook.enhance.chunk', onChunk);
    }
  };

  emitter.on('notebook.enhance.chunk', onChunk);

  try {
    await this.notebookService.enhanceEntry(input.id, user, requestId);
  } catch (error) {
    emitter.off('notebook.enhance.chunk', onChunk);
    throw error;
  }

  return new Observable<string>(subscriber => {
    const forward = (chunk: EnhanceChunk): void => {
      subscriber.next(chunk.delta);
      if (chunk.isLast) {
        subscriber.complete();
      }
    };

    // Replay what arrived before the subscription, then switch to live delivery.
    for (const chunk of pending.splice(0)) {
      forward(chunk);
    }
    deliver = forward;

    return () => {
      emitter.off('notebook.enhance.chunk', onChunk);
    };
  });
}
in my router
17 replies