b0o
b0o
tRPC
Created by b0o on 11/16/2023 in #❓-help
Use RxJS Observable for subscription procedure
On the server, I have an RxJS Observable that I'd like to use for one of my subscription procedures. Is this possible without writing a converter function that takes an RxJS Observable and converts it to a tRPC Observable? For example, what I'd like to do looks something like this:
import { Observable } from 'rxjs'
import { authedProcedure, createTRPCRouter } from '../../api/trpc'

/**
 * Demo router exposing a single `fizzBuzz` subscription procedure.
 *
 * The subscription handler returns an RxJS `Observable<string>` that emits one
 * value every 1000 ms, counting up from 1: 'FizzBuzz' for multiples of 15,
 * 'Fizz' for multiples of 3, 'Buzz' for multiples of 5, and otherwise the
 * number itself rendered as a string.
 */
export const demoRouter = createTRPCRouter({
  fizzBuzz: authedProcedure.subscription(() => {
    return new Observable<string>((observer) => {
      let counter = 1

      // Maps a counter value to the string that should be emitted for it.
      const labelFor = (n: number): string => {
        const byThree = n % 3 === 0
        const byFive = n % 5 === 0
        if (byThree && byFive) return 'FizzBuzz'
        if (byThree) return 'Fizz'
        if (byFive) return 'Buzz'
        return String(n)
      }

      const timer = setInterval(() => {
        observer.next(labelFor(counter))
        counter += 1
      }, 1000)

      // The returned function stops the interval timer.
      return () => clearInterval(timer)
    })
  }),
})
import { Observable } from 'rxjs'
import { authedProcedure, createTRPCRouter } from '../../api/trpc'

/**
 * Demo router exposing a single `fizzBuzz` subscription procedure.
 *
 * The subscription handler returns an RxJS `Observable<string>` that emits one
 * value every 1000 ms, counting up from 1: 'FizzBuzz' for multiples of 15,
 * 'Fizz' for multiples of 3, 'Buzz' for multiples of 5, and otherwise the
 * number itself rendered as a string.
 */
export const demoRouter = createTRPCRouter({
  fizzBuzz: authedProcedure.subscription(() => {
    return new Observable<string>((observer) => {
      let counter = 1

      // Maps a counter value to the string that should be emitted for it.
      const labelFor = (n: number): string => {
        const byThree = n % 3 === 0
        const byFive = n % 5 === 0
        if (byThree && byFive) return 'FizzBuzz'
        if (byThree) return 'Fizz'
        if (byFive) return 'Buzz'
        return String(n)
      }

      const timer = setInterval(() => {
        observer.next(labelFor(counter))
        counter += 1
      }, 1000)

      // The returned function stops the interval timer.
      return () => clearInterval(timer)
    })
  }),
})
1 reply
tRPC
Created by b0o on 5/3/2023 in #❓-help
Can't get wsLink's retryDelayMs to work
I'm trying to add some backoff for when connecting to my websocket server fails. I've got the following code, but the retryDelayMs function never seems to be called (I don't see any console messages), and the wsLink keeps trying to reconnect immediately:
/**
 * Supplies the tRPC client and the React Query client to `children`.
 *
 * Both clients are created through lazy `useState` initializers. The tRPC
 * client is configured with the superjson transformer, a logger link, and a
 * WebSocket link whose client is given a custom `retryDelayMs` callback.
 */
const TrpcProviderInner: React.FC<{ children: React.ReactNode }> = ({ children }) => {
  const [queryClient] = useState(() => new QueryClient())
  const [trpcClient] = useState(() => {
    // Delays (ms) for attempt indices 0 through 4; every other index gets the fallback.
    const retryDelays: readonly number[] = [50, 100, 200, 500, 1000]
    const fallbackRetryDelay = 5000

    // Logs everything in development; otherwise only downstream results that are errors.
    const logger = loggerLink({
      enabled: (opts) => {
        if (process.env.NODE_ENV === 'development') {
          return true
        }
        return opts.direction === 'down' && opts.result instanceof Error
      },
    })

    const socketLink = wsLink<Router>({
      client: createWSClient({
        url: getWsUrl(),
        retryDelayMs: (attemptIndex: number) => {
          console.log('retrying', attemptIndex)
          return retryDelays[attemptIndex] ?? fallbackRetryDelay
        },
      }),
    })

    return api.createClient({
      transformer: superjson,
      links: [logger, socketLink],
    })
  })

  return (
    <api.Provider client={trpcClient} queryClient={queryClient}>
      <QueryClientProvider client={queryClient}>{children}</QueryClientProvider>
    </api.Provider>
  )
}
/**
 * Supplies the tRPC client and the React Query client to `children`.
 *
 * Both clients are created through lazy `useState` initializers. The tRPC
 * client is configured with the superjson transformer, a logger link, and a
 * WebSocket link whose client is given a custom `retryDelayMs` callback.
 */
const TrpcProviderInner: React.FC<{ children: React.ReactNode }> = ({ children }) => {
  const [queryClient] = useState(() => new QueryClient())
  const [trpcClient] = useState(() => {
    // Delays (ms) for attempt indices 0 through 4; every other index gets the fallback.
    const retryDelays: readonly number[] = [50, 100, 200, 500, 1000]
    const fallbackRetryDelay = 5000

    // Logs everything in development; otherwise only downstream results that are errors.
    const logger = loggerLink({
      enabled: (opts) => {
        if (process.env.NODE_ENV === 'development') {
          return true
        }
        return opts.direction === 'down' && opts.result instanceof Error
      },
    })

    const socketLink = wsLink<Router>({
      client: createWSClient({
        url: getWsUrl(),
        retryDelayMs: (attemptIndex: number) => {
          console.log('retrying', attemptIndex)
          return retryDelays[attemptIndex] ?? fallbackRetryDelay
        },
      }),
    })

    return api.createClient({
      transformer: superjson,
      links: [logger, socketLink],
    })
  })

  return (
    <api.Provider client={trpcClient} queryClient={queryClient}>
      <QueryClientProvider client={queryClient}>{children}</QueryClientProvider>
    </api.Provider>
  )
}
3 replies