cratestack_client_rust/streaming_callback.rs
1// -----------------------------------------------------------------------------
2// FFI / callback-shaped streaming consumer
3//
4// Synchronous from the caller's perspective: pass a callback, return
5// when the stream is done. The callback receives raw item bytes
6// (one CBOR-encoded value per call) so the FFI side can decode using
7// whatever native CBOR library it prefers.
8// -----------------------------------------------------------------------------
9
10use serde::{Deserialize, Serialize};
11
12use crate::error::ClientError;
13use crate::runtime::wire::{RuntimeErrorCode, RuntimeErrorWire};
14use crate::streaming::CborSeqChunkDecoder;
15
16/// FFI-shaped chunk delivered to the `execute_streamed` callback.
17/// `Item` carries one CBOR-encoded item's raw bytes; `Error` is
18/// terminal; `End` is terminal and indicates a clean stream close.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub enum RuntimeChunkWire {
21 /// One complete cbor-seq item. The bytes are CBOR-encoded — decode
22 /// on the FFI side with whatever native library the host has.
23 Item(Vec<u8>),
24 /// Terminal: the stream ended cleanly. No further chunks follow.
25 End,
26 /// Terminal: the stream failed mid-flight. No further chunks follow.
27 Error(RuntimeErrorWire),
28}
29
30/// Drive a streaming response through a callback. Used by
31/// `RuntimeHandle::execute_streamed` — the callback returns `false` to
32/// cancel the stream early. The function returns once the stream is
33/// done (by completion, error, or cancellation).
34pub(crate) async fn pump_streamed_response_callback<F>(
35 response: reqwest::Response,
36 mut on_chunk: F,
37) -> Result<(), RuntimeErrorWire>
38where
39 F: FnMut(RuntimeChunkWire) -> bool,
40{
41 use futures_util::StreamExt;
42
43 let mut byte_stream = response.bytes_stream();
44 let mut decoder = CborSeqChunkDecoder::new();
45 loop {
46 let chunk_result = byte_stream.next().await;
47 let chunk = match chunk_result {
48 Some(Ok(c)) => c,
49 Some(Err(error)) => {
50 let err = RuntimeErrorWire::from(ClientError::Transport(error));
51 on_chunk(RuntimeChunkWire::Error(err.clone()));
52 return Err(err);
53 }
54 None => {
55 if decoder.pending_len() > 0 {
56 let err = RuntimeErrorWire {
57 code: RuntimeErrorCode::InvalidResponse,
58 http_status: None,
59 message: format!(
60 "stream ended with {} bytes buffered (incomplete final item)",
61 decoder.pending_len(),
62 ),
63 remote_code: None,
64 remote_body: None,
65 };
66 on_chunk(RuntimeChunkWire::Error(err.clone()));
67 return Err(err);
68 }
69 on_chunk(RuntimeChunkWire::End);
70 return Ok(());
71 }
72 };
73 let items = match decoder.feed_chunk(&chunk) {
74 Ok(items) => items,
75 Err(error) => {
76 let err = RuntimeErrorWire::from(ClientError::Codec(error));
77 on_chunk(RuntimeChunkWire::Error(err.clone()));
78 return Err(err);
79 }
80 };
81 for item_bytes in items {
82 if !on_chunk(RuntimeChunkWire::Item(item_bytes)) {
83 // Caller cancelled.
84 return Ok(());
85 }
86 }
87 }
88}