Skip to main content

cratestack_client_rust/
streaming.rs

1// -----------------------------------------------------------------------------
2// Chunked cbor-seq decoder + typed streaming response consumer
3//
4// The buffered path (`decode_cbor_sequence` in `codec`) needs the full
5// response body before yielding the first item. On a flaky / metered
6// network — typical for mobile clients — that costs time-to-first-byte
7// AND memory: a 5 MB streamed list buffers all 5 MB before any item
8// reaches the UI.
9//
10// `CborSeqChunkDecoder` does the boundary detection; the typed pump
11// `pump_streamed_response_typed` feeds decoded `T` items into an
12// `mpsc::Sender`. The FFI/callback shape lives in
13// `streaming_callback.rs`.
14// -----------------------------------------------------------------------------
15
16use cratestack_core::CoolError;
17use serde::de::DeserializeOwned;
18
19use crate::error::ClientError;
20
/// Stateful boundary scanner for `application/cbor-seq` streams.
///
/// Bytes arrive in arbitrary chunks; this type accumulates them in an
/// internal buffer and hands back the bytes of every complete top-level
/// CBOR item seen so far. Boundary detection relies on
/// `minicbor::Decoder::skip`; turning an item's bytes into a typed value
/// (serde decode) is left to the caller, to be done whenever convenient.
///
/// Exposed publicly so non-`RuntimeHandle` callers — e.g. apps that run
/// the HTTP request themselves (dio in Flutter, `fetch` in Wasm, platform
/// networking on iOS/Android) — can reuse the boundary-detection logic
/// without re-implementing it.
///
/// Implements `Debug` so the decoder can be embedded in `#[derive(Debug)]`
/// types and inspected in logs and test failures.
#[derive(Debug)]
pub struct CborSeqChunkDecoder {
    /// Bytes received so far that have not yet been emitted as part of a
    /// complete item.
    buffer: Vec<u8>,
}
35
36impl CborSeqChunkDecoder {
37    pub fn new() -> Self {
38        Self { buffer: Vec::new() }
39    }
40
41    /// Append `chunk` to the internal buffer and return the bytes of
42    /// every complete top-level CBOR item now in it. Drains those bytes
43    /// from the buffer; any trailing bytes that don't yet form a
44    /// complete item stay buffered for the next call.
45    pub fn feed_chunk(&mut self, chunk: &[u8]) -> Result<Vec<Vec<u8>>, CoolError> {
46        self.buffer.extend_from_slice(chunk);
47        let mut items: Vec<Vec<u8>> = Vec::new();
48        let mut consumed = 0;
49        loop {
50            let remaining = &self.buffer[consumed..];
51            if remaining.is_empty() {
52                break;
53            }
54            let mut decoder = minicbor::decode::Decoder::new(remaining);
55            match decoder.skip() {
56                Ok(()) => {
57                    let item_len = decoder.position();
58                    if item_len == 0 {
59                        return Err(CoolError::Codec(
60                            "cbor-seq decoder made no progress".to_owned(),
61                        ));
62                    }
63                    items.push(remaining[..item_len].to_vec());
64                    consumed += item_len;
65                }
66                Err(error) if error.is_end_of_input() => {
67                    // Truncated final item — wait for the next chunk.
68                    break;
69                }
70                Err(error) => {
71                    return Err(CoolError::Codec(
72                        format!("cbor-seq decode failed: {error}",),
73                    ));
74                }
75            }
76        }
77        if consumed > 0 {
78            self.buffer.drain(..consumed);
79        }
80        Ok(items)
81    }
82
83    /// Bytes currently buffered (waiting for frame completion). After
84    /// the upstream stream closes, a non-zero value here indicates a
85    /// truncated final frame — the server hung up mid-item.
86    pub fn pending_len(&self) -> usize {
87        self.buffer.len()
88    }
89}
90
91impl Default for CborSeqChunkDecoder {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97/// Pump a reqwest streaming response into an `mpsc::Sender`. Each
98/// complete cbor-seq item gets deserialized to `T` and sent through;
99/// transport / decode errors become terminal `Err` items.
100///
101/// Generic over the consumer-facing error type `E`. REST callers pass
102/// `std::convert::identity` (so `E = ClientError`); RPC callers pass
103/// `client_error_to_rpc` (so `E = RpcClientError`). Keeping a single
104/// pump avoids a second forwarding task per stream.
105pub(crate) async fn pump_streamed_response_typed<T, E, F>(
106    response: reqwest::Response,
107    tx: tokio::sync::mpsc::Sender<Result<T, E>>,
108    convert_error: F,
109) where
110    T: DeserializeOwned + Send + 'static,
111    E: Send + 'static,
112    F: Fn(ClientError) -> E + Send + 'static,
113{
114    use futures_util::StreamExt;
115
116    let mut byte_stream = response.bytes_stream();
117    let mut decoder = CborSeqChunkDecoder::new();
118    while let Some(chunk_result) = byte_stream.next().await {
119        let chunk = match chunk_result {
120            Ok(c) => c,
121            Err(error) => {
122                let _ = tx
123                    .send(Err(convert_error(ClientError::Transport(error))))
124                    .await;
125                return;
126            }
127        };
128        let items = match decoder.feed_chunk(&chunk) {
129            Ok(items) => items,
130            Err(error) => {
131                let _ = tx.send(Err(convert_error(ClientError::Codec(error)))).await;
132                return;
133            }
134        };
135        for item_bytes in items {
136            let decoded: Result<T, E> = minicbor_serde::from_slice(&item_bytes).map_err(|error| {
137                convert_error(ClientError::Codec(CoolError::Codec(format!(
138                    "decode cbor-seq item: {error}",
139                ))))
140            });
141            if tx.send(decoded).await.is_err() {
142                // Receiver dropped — caller cancelled, stop work.
143                return;
144            }
145        }
146    }
147
148    if decoder.pending_len() > 0 {
149        let _ = tx
150            .send(Err(convert_error(ClientError::InvalidResponse(format!(
151                "stream ended with {} bytes buffered (incomplete final item)",
152                decoder.pending_len(),
153            )))))
154            .await;
155    }
156}