seb1 · 2mo ago

How to abort a streaming mutation

I have a streaming mutation working, but I need to be able to abort it. I've seen this thread but that syntax doesn't appear to be valid for the React version. If I do this:
foo.mutate(
{ exampleInputData: 'hello world' },
{ signal: abortController.signal },
)
then I get the type error that 'signal' does not exist on mutation options. If I try the new tanstack-query client syntax (from this migration guide), well that also doesn't work because the property mutationOptions doesn't exist in this call:
const mutation = useMutation(trpc.threads.example.mutationOptions());
That migration guide doesn't specify how the useTRPC custom hook is meant to be implemented, so I suspect there's something I'm missing there. So my main questions are:
1. Is it possible to abort a streaming mutation client-side with a React tRPC client?
2. If yes, can this be done using the old syntax? Or, if we need to use the new syntax, what code for the useTRPC hook makes the types check out?
Thanks in advance!
7 Replies
seb1 (OP) · 2mo ago
update: I just found these docs which indicate that "@tanstack/react-query only supports aborting queries." does @trpc/tanstack-react-query also have this limitation? or can mutations be aborted if we migrate to @trpc/tanstack-react-query?
Nick · 2mo ago
It’s the same TanStack under the hood for both, so you’re going to have the same limitations. What are you actually trying to achieve?
seb1 (OP) · 2mo ago
I would like to trigger a mutation that streams a response back and be able to abort that stream from the client. For example: send a chat message, where an LLM could stream a response back--but with a "stop" button to abort the streaming response. If it's only queries that can be aborted, then how should I be thinking about the implementation of that stop button in trpc?
Nick · 2mo ago
I would say the chat content is a subscription (maybe an initial query, then a subscription for future updates), and the mutation triggers the subscription to update from the backend. That way you're not limited by timeouts for single requests, etc.
In general, if you're receiving a response from a mutation and updating state manually with it, it's probably not an ideal pattern, except in the case of optimistic updates. Ideally every state in a browser should be reachable by refreshing the page, and depending on a mutation result isn't that.
seb1 (OP) · 2mo ago
I generally agree with those points and appreciate your response! Don't want to go into too much detail here, but the gist of our current setup is:
- The initial chat data (and data on page refresh) is loaded in a static non-streaming query.
- After the user sends a message to a streaming mutation endpoint, in that streaming mutation's onSuccess only the newest message response streams in. The earlier array of messages is in a completely separate component, and they read from that static non-streaming query I mentioned earlier.
- After the streaming mutation is complete, we can invalidate that initial non-streaming query to move the latest message into that array of static, non-streaming ones in the chat history.
However, the stream does need to be abortable. The main reason not to use a subscription in this case is that we'll have a very large number of simultaneous open connections, and it would be wasteful of our server resources to keep these long-lived connections open. While we could probably hack something together that enables the subscription endpoint when a message is sent and disables it to close the connection when the streaming ends, that feels a bit roundabout.
Streaming mutations are the perfect API for this use case, but as far as I can tell they're not manually abortable. You can abort the stream by force-unmounting the component and remounting it, but that also feels like an unnecessary workaround.
Hope this context helps clarify the situation a bit! And if this still seems like the wrong way to be approaching the problem, I am totally open to switching to subscriptions or some other approach. With my current understanding, though, streaming mutations are almost a perfect fit.
Nick · 2mo ago
Fair enough, I would take this over to the tanstack guys as we'll always just support what they do, and I understand they don't support it yet
seb1 (OP) · 3w ago
just to close the loop on this one, I found a way to achieve this! you can use an abort controller client-side and an early return. tanstack-query or trpc seems to correctly clean up the rest, and the server-side ctx.signal correctly fires.
// in your onSuccess(data)
for await (const chunk of data) {
if (signal.aborted) return;
// ... continue processing the stream
}
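For anyone landing here later, here is a slightly fuller sketch of that early-return pattern, written without any tRPC or TanStack Query imports so it can run on its own. Everything named here is hypothetical: fakeStream stands in for the async iterable a streaming mutation resolves to, and consumeStream and demo are illustration-only helpers. It only shows the client-side loop. Whether the server-side ctx.signal fires depends on your tRPC setup, as described above.

```typescript
// Hypothetical stand-in for the async iterable a streaming mutation resolves to.
// onClose lets us observe that the iterator is cleaned up when the loop exits early.
async function* fakeStream(
  chunks: string[],
  onClose: () => void,
): AsyncGenerator<string> {
  try {
    for (const chunk of chunks) {
      await Promise.resolve(); // simulate an async gap between chunks
      yield chunk;
    }
  } finally {
    onClose(); // runs on normal completion and on early return from for-await
  }
}

// The pattern from the post above: check the signal on every chunk and return early.
// Returning from a for-await loop calls the iterator's return(), which closes the stream.
async function consumeStream<T>(
  data: AsyncIterable<T>,
  signal: AbortSignal,
  onChunk: (chunk: T) => void,
): Promise<'aborted' | 'completed'> {
  for await (const chunk of data) {
    if (signal.aborted) return 'aborted';
    onChunk(chunk);
  }
  return 'completed';
}

// Illustration only: the "stop button" is simulated by aborting after two chunks.
async function demo(): Promise<{
  received: string[];
  result: 'aborted' | 'completed';
  closed: boolean;
}> {
  const controller = new AbortController();
  const received: string[] = [];
  let closed = false;

  const result = await consumeStream(
    fakeStream(['Hel', 'lo', ' wor', 'ld'], () => {
      closed = true;
    }),
    controller.signal,
    (chunk) => {
      received.push(chunk);
      if (received.length === 2) controller.abort(); // user pressed "stop"
    },
  );

  return { received, result, closed };
}
```

In a real component, the AbortController would probably live in a ref so the stop button's click handler can call abort() on it, with a fresh controller created for each new message. Note that a chunk arriving after the abort is dropped rather than rendered.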
