cratestack_client_rust/client/
streaming.rs1use reqwest::Method;
2use serde::Serialize;
3use serde::de::DeserializeOwned;
4
5use crate::client::core::CratestackClient;
6use crate::client::decode::decode_sequence_response;
7use crate::codec::HttpClientCodec;
8use crate::error::{ClientError, HeaderPair};
9use crate::runtime::wire::{RuntimeRequestWire, RuntimeResponseWire};
10use crate::streaming::pump_streamed_response_typed;
11
12impl<C> CratestackClient<C>
13where
14 C: HttpClientCodec,
15{
16 pub async fn post_list<Input, Output>(
17 &self,
18 path: &str,
19 input: &Input,
20 headers: &[HeaderPair<'_>],
21 ) -> Result<Vec<Output>, ClientError>
22 where
23 Input: Serialize,
24 Output: DeserializeOwned,
25 {
26 let body = self.codec.encode(input)?;
27 let response = self
28 .request_raw_with_query_and_accept(
29 Method::POST,
30 path,
31 Some(body),
32 None,
33 headers,
34 Some(self.codec.sequence_accept_header_value()),
35 )
36 .await?;
37 decode_sequence_response(&self.codec, &response)
38 }
39
40 pub async fn post_list_streamed<Input, Output>(
57 &self,
58 path: &str,
59 input: &Input,
60 headers: &[HeaderPair<'_>],
61 ) -> Result<tokio::sync::mpsc::Receiver<Result<Output, ClientError>>, ClientError>
62 where
63 Input: Serialize,
64 Output: DeserializeOwned + Send + 'static,
65 {
66 let body = self.codec.encode(input)?;
67 let response = self
68 .request_streamed_with_query_and_accept(
69 Method::POST,
70 path,
71 Some(body),
72 None,
73 headers,
74 self.codec.sequence_accept_header_value(),
75 )
76 .await?;
77
78 let (tx, rx) = tokio::sync::mpsc::channel(16);
81 tokio::spawn(pump_streamed_response_typed::<Output, ClientError, _>(
82 response,
83 tx,
84 std::convert::identity,
85 ));
86 Ok(rx)
87 }
88
89 pub async fn execute_raw_transport(
90 &self,
91 request: RuntimeRequestWire,
92 ) -> Result<RuntimeResponseWire, ClientError> {
93 let method = Method::from_bytes(request.method.as_bytes()).map_err(|error| {
94 ClientError::BadInput(format!("invalid HTTP method '{}': {error}", request.method))
95 })?;
96 let header_pairs = request
97 .headers
98 .iter()
99 .map(|header| (header.name.as_str(), header.value.as_str()))
100 .collect::<Vec<_>>();
101 self.request_raw_with_query(
102 method,
103 &request.path,
104 if request.body.is_empty() {
105 None
106 } else {
107 Some(request.body)
108 },
109 request.canonical_query.as_deref(),
110 &header_pairs,
111 )
112 .await
113 }
114}