Skip to main content

cratestack_client_rust/
streaming_callback.rs

1// -----------------------------------------------------------------------------
2// FFI / callback-shaped streaming consumer
3//
4// Synchronous from the caller's perspective: pass a callback, return
5// when the stream is done. The callback receives raw item bytes
6// (one CBOR-encoded value per call) so the FFI side can decode using
7// whatever native CBOR library it prefers.
8// -----------------------------------------------------------------------------
9
10use serde::{Deserialize, Serialize};
11
12use crate::error::ClientError;
13use crate::runtime::wire::{RuntimeErrorCode, RuntimeErrorWire};
14use crate::streaming::CborSeqChunkDecoder;
15
16/// FFI-shaped chunk delivered to the `execute_streamed` callback.
17/// `Item` carries one CBOR-encoded item's raw bytes; `Error` is
18/// terminal; `End` is terminal and indicates a clean stream close.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub enum RuntimeChunkWire {
21    /// One complete cbor-seq item. The bytes are CBOR-encoded — decode
22    /// on the FFI side with whatever native library the host has.
23    Item(Vec<u8>),
24    /// Terminal: the stream ended cleanly. No further chunks follow.
25    End,
26    /// Terminal: the stream failed mid-flight. No further chunks follow.
27    Error(RuntimeErrorWire),
28}
29
30/// Drive a streaming response through a callback. Used by
31/// `RuntimeHandle::execute_streamed` — the callback returns `false` to
32/// cancel the stream early. The function returns once the stream is
33/// done (by completion, error, or cancellation).
34pub(crate) async fn pump_streamed_response_callback<F>(
35    response: reqwest::Response,
36    mut on_chunk: F,
37) -> Result<(), RuntimeErrorWire>
38where
39    F: FnMut(RuntimeChunkWire) -> bool,
40{
41    use futures_util::StreamExt;
42
43    let mut byte_stream = response.bytes_stream();
44    let mut decoder = CborSeqChunkDecoder::new();
45    loop {
46        let chunk_result = byte_stream.next().await;
47        let chunk = match chunk_result {
48            Some(Ok(c)) => c,
49            Some(Err(error)) => {
50                let err = RuntimeErrorWire::from(ClientError::Transport(error));
51                on_chunk(RuntimeChunkWire::Error(err.clone()));
52                return Err(err);
53            }
54            None => {
55                if decoder.pending_len() > 0 {
56                    let err = RuntimeErrorWire {
57                        code: RuntimeErrorCode::InvalidResponse,
58                        http_status: None,
59                        message: format!(
60                            "stream ended with {} bytes buffered (incomplete final item)",
61                            decoder.pending_len(),
62                        ),
63                        remote_code: None,
64                        remote_body: None,
65                    };
66                    on_chunk(RuntimeChunkWire::Error(err.clone()));
67                    return Err(err);
68                }
69                on_chunk(RuntimeChunkWire::End);
70                return Ok(());
71            }
72        };
73        let items = match decoder.feed_chunk(&chunk) {
74            Ok(items) => items,
75            Err(error) => {
76                let err = RuntimeErrorWire::from(ClientError::Codec(error));
77                on_chunk(RuntimeChunkWire::Error(err.clone()));
78                return Err(err);
79            }
80        };
81        for item_bytes in items {
82            if !on_chunk(RuntimeChunkWire::Item(item_bytes)) {
83                // Caller cancelled.
84                return Ok(());
85            }
86        }
87    }
88}