cratestack_client_rust/streaming.rs
1// -----------------------------------------------------------------------------
2// Chunked cbor-seq decoder + typed streaming response consumer
3//
4// The buffered path (`decode_cbor_sequence` in `codec`) needs the full
5// response body before yielding the first item. On a flaky / metered
6// network — typical for mobile clients — that costs time-to-first-byte
7// AND memory: a 5 MB streamed list buffers all 5 MB before any item
8// reaches the UI.
9//
10// `CborSeqChunkDecoder` does the boundary detection; the typed pump
11// `pump_streamed_response_typed` feeds decoded `T` items into an
12// `mpsc::Sender`. The FFI/callback shape lives in
13// `streaming_callback.rs`.
14// -----------------------------------------------------------------------------
15
16use cratestack_core::CoolError;
17use serde::de::DeserializeOwned;
18
19use crate::error::ClientError;
20
21/// Stateful boundary scanner for `application/cbor-seq` streams. Bytes
22/// arrive in arbitrary chunks; this type buffers them and emits the
23/// byte ranges of any complete top-level CBOR items observed so far.
24/// The CBOR-level parse uses `minicbor::Decoder::skip` for boundary
25/// detection (cheap, doesn't allocate); the per-item serde decode
26/// happens at the caller's leisure on each returned slice.
27///
28/// Exposed publicly so non-`RuntimeHandle` callers — e.g. apps that
29/// run the HTTP request themselves (dio in Flutter, `fetch` in Wasm,
30/// platform networking on iOS/Android) — can reuse the
31/// boundary-detection logic without re-implementing it.
32pub struct CborSeqChunkDecoder {
33 buffer: Vec<u8>,
34}
35
36impl CborSeqChunkDecoder {
37 pub fn new() -> Self {
38 Self { buffer: Vec::new() }
39 }
40
41 /// Append `chunk` to the internal buffer and return the bytes of
42 /// every complete top-level CBOR item now in it. Drains those bytes
43 /// from the buffer; any trailing bytes that don't yet form a
44 /// complete item stay buffered for the next call.
45 pub fn feed_chunk(&mut self, chunk: &[u8]) -> Result<Vec<Vec<u8>>, CoolError> {
46 self.buffer.extend_from_slice(chunk);
47 let mut items: Vec<Vec<u8>> = Vec::new();
48 let mut consumed = 0;
49 loop {
50 let remaining = &self.buffer[consumed..];
51 if remaining.is_empty() {
52 break;
53 }
54 let mut decoder = minicbor::decode::Decoder::new(remaining);
55 match decoder.skip() {
56 Ok(()) => {
57 let item_len = decoder.position();
58 if item_len == 0 {
59 return Err(CoolError::Codec(
60 "cbor-seq decoder made no progress".to_owned(),
61 ));
62 }
63 items.push(remaining[..item_len].to_vec());
64 consumed += item_len;
65 }
66 Err(error) if error.is_end_of_input() => {
67 // Truncated final item — wait for the next chunk.
68 break;
69 }
70 Err(error) => {
71 return Err(CoolError::Codec(
72 format!("cbor-seq decode failed: {error}",),
73 ));
74 }
75 }
76 }
77 if consumed > 0 {
78 self.buffer.drain(..consumed);
79 }
80 Ok(items)
81 }
82
83 /// Bytes currently buffered (waiting for frame completion). After
84 /// the upstream stream closes, a non-zero value here indicates a
85 /// truncated final frame — the server hung up mid-item.
86 pub fn pending_len(&self) -> usize {
87 self.buffer.len()
88 }
89}
90
91impl Default for CborSeqChunkDecoder {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
97/// Pump a reqwest streaming response into an `mpsc::Sender`. Each
98/// complete cbor-seq item gets deserialized to `T` and sent through;
99/// transport / decode errors become terminal `Err` items.
100///
101/// Generic over the consumer-facing error type `E`. REST callers pass
102/// `std::convert::identity` (so `E = ClientError`); RPC callers pass
103/// `client_error_to_rpc` (so `E = RpcClientError`). Keeping a single
104/// pump avoids a second forwarding task per stream.
105pub(crate) async fn pump_streamed_response_typed<T, E, F>(
106 response: reqwest::Response,
107 tx: tokio::sync::mpsc::Sender<Result<T, E>>,
108 convert_error: F,
109) where
110 T: DeserializeOwned + Send + 'static,
111 E: Send + 'static,
112 F: Fn(ClientError) -> E + Send + 'static,
113{
114 use futures_util::StreamExt;
115
116 let mut byte_stream = response.bytes_stream();
117 let mut decoder = CborSeqChunkDecoder::new();
118 while let Some(chunk_result) = byte_stream.next().await {
119 let chunk = match chunk_result {
120 Ok(c) => c,
121 Err(error) => {
122 let _ = tx
123 .send(Err(convert_error(ClientError::Transport(error))))
124 .await;
125 return;
126 }
127 };
128 let items = match decoder.feed_chunk(&chunk) {
129 Ok(items) => items,
130 Err(error) => {
131 let _ = tx.send(Err(convert_error(ClientError::Codec(error)))).await;
132 return;
133 }
134 };
135 for item_bytes in items {
136 let decoded: Result<T, E> = minicbor_serde::from_slice(&item_bytes).map_err(|error| {
137 convert_error(ClientError::Codec(CoolError::Codec(format!(
138 "decode cbor-seq item: {error}",
139 ))))
140 });
141 if tx.send(decoded).await.is_err() {
142 // Receiver dropped — caller cancelled, stop work.
143 return;
144 }
145 }
146 }
147
148 if decoder.pending_len() > 0 {
149 let _ = tx
150 .send(Err(convert_error(ClientError::InvalidResponse(format!(
151 "stream ended with {} bytes buffered (incomplete final item)",
152 decoder.pending_len(),
153 )))))
154 .await;
155 }
156}