Skip to main content

cratestack_client_rust/client/
streaming.rs

1use reqwest::Method;
2use serde::Serialize;
3use serde::de::DeserializeOwned;
4
5use crate::client::core::CratestackClient;
6use crate::client::decode::decode_sequence_response;
7use crate::codec::HttpClientCodec;
8use crate::error::{ClientError, HeaderPair};
9use crate::runtime::wire::{RuntimeRequestWire, RuntimeResponseWire};
10use crate::streaming::pump_streamed_response_typed;
11
12impl<C> CratestackClient<C>
13where
14    C: HttpClientCodec,
15{
16    pub async fn post_list<Input, Output>(
17        &self,
18        path: &str,
19        input: &Input,
20        headers: &[HeaderPair<'_>],
21    ) -> Result<Vec<Output>, ClientError>
22    where
23        Input: Serialize,
24        Output: DeserializeOwned,
25    {
26        let body = self.codec.encode(input)?;
27        let response = self
28            .request_raw_with_query_and_accept(
29                Method::POST,
30                path,
31                Some(body),
32                None,
33                headers,
34                Some(self.codec.sequence_accept_header_value()),
35            )
36            .await?;
37        decode_sequence_response(&self.codec, &response)
38    }
39
40    /// Streaming variant of [`Self::post_list`]. Returns an
41    /// `mpsc::Receiver` that yields decoded items as they arrive over
42    /// the network — first-item latency drops from "buffer the whole
43    /// body" to "decode one chunk." Useful on mobile / flaky links
44    /// where time-to-first-byte matters more than total throughput.
45    ///
46    /// The receiver yields `Result<Output, ClientError>` per item.
47    /// Transport / decode errors are terminal — the next call to
48    /// `.recv()` returns `None` after one. A clean end-of-stream
49    /// (server closed cleanly after the last item) also surfaces as
50    /// `None` from the next `.recv()`.
51    ///
52    /// The server must return `application/cbor-seq`. If it returns a
53    /// buffered `application/cbor` or `application/json` instead, the
54    /// caller should use [`Self::post_list`] — this method does not
55    /// fall back.
56    pub async fn post_list_streamed<Input, Output>(
57        &self,
58        path: &str,
59        input: &Input,
60        headers: &[HeaderPair<'_>],
61    ) -> Result<tokio::sync::mpsc::Receiver<Result<Output, ClientError>>, ClientError>
62    where
63        Input: Serialize,
64        Output: DeserializeOwned + Send + 'static,
65    {
66        let body = self.codec.encode(input)?;
67        let response = self
68            .request_streamed_with_query_and_accept(
69                Method::POST,
70                path,
71                Some(body),
72                None,
73                headers,
74                self.codec.sequence_accept_header_value(),
75            )
76            .await?;
77
78        // Bounded channel keeps memory tight on the consumer side —
79        // 16 items in flight is plenty for a single subscriber.
80        let (tx, rx) = tokio::sync::mpsc::channel(16);
81        tokio::spawn(pump_streamed_response_typed::<Output, ClientError, _>(
82            response,
83            tx,
84            std::convert::identity,
85        ));
86        Ok(rx)
87    }
88
89    pub async fn execute_raw_transport(
90        &self,
91        request: RuntimeRequestWire,
92    ) -> Result<RuntimeResponseWire, ClientError> {
93        let method = Method::from_bytes(request.method.as_bytes()).map_err(|error| {
94            ClientError::BadInput(format!("invalid HTTP method '{}': {error}", request.method))
95        })?;
96        let header_pairs = request
97            .headers
98            .iter()
99            .map(|header| (header.name.as_str(), header.value.as_str()))
100            .collect::<Vec<_>>();
101        self.request_raw_with_query(
102            method,
103            &request.path,
104            if request.body.is_empty() {
105                None
106            } else {
107                Some(request.body)
108            },
109            request.canonical_query.as_deref(),
110            &header_pairs,
111        )
112        .await
113    }
114}