cratestack_client_rust/rpc/batch.rs
1// -----------------------------------------------------------------------------
2// Typed batch surface
3//
4// Lets callers compose heterogeneous batches of typed RPC ops into a
5// single `POST /rpc/batch` round-trip without dropping to the raw
6// `RpcRequest` / `RpcResponseFrame` wire types. Typical use through
7// the macro-generated client:
8//
9// let mut batch = client.batch();
10// let h_widgets = client.widgets().list(&list_input).queue(&mut batch);
11// let h_ping = client.procedures().ping(&args).queue(&mut batch);
12// let h_created = client.widgets().create(&new).queue(&mut batch);
13//
14// let mut results = batch.send().await?; // one HTTP call
15// let widgets: Vec<Widget> = results.take(h_widgets)?;
16// let echoed: PingArgs = results.take(h_ping)?;
17// let created: Widget = results.take(h_created)?;
18//
19// `BatchableCall<C, O>` is what every macro-generated unary RPC method
20// now returns. It implements `IntoFuture`, so `.await` on it fires the
21// call immediately exactly like before — `.queue(&mut batch)` is the
22// opt-in deferral path. No `_batched` API duplication; same method,
23// two consumption modes.
24//
25// Sequence-streaming methods (`call_streaming` under the hood) stay as
26// `async fn -> Result<RpcStream<O>, _>` and do NOT participate in
27// batches — `/rpc/batch` is unary by construction.
28// -----------------------------------------------------------------------------
29
30use cratestack_core::CoolError;
31
32use crate::codec::HttpClientCodec;
33use crate::rpc::batch_call::BatchHandle;
34use crate::rpc::client::RpcClient;
35use crate::rpc::error::{RpcClientError, RpcRemoteError, http_status_for_rpc_code};
36
37/// Accumulates queued [`BatchableCall`]s into a single
38/// `POST /rpc/batch` round-trip. Build via [`RpcClient::batch_builder`]
39/// or the macro-generated `Client::batch()`.
40///
41/// Send-on-drop is intentionally *not* implemented — the batch only
42/// fires when you call `.send().await`. Drops without sending are
43/// silent (queued calls just discarded).
44#[must_use = "BatchBuilder does nothing until `.send().await`ed"]
45pub struct BatchBuilder<C> {
46 rpc: RpcClient<C>,
47 frames: Vec<cratestack_core::rpc::RpcRequest>,
48 /// Frames whose input failed to encode pre-send — recorded by id
49 /// so [`BatchResults::take`] can surface the error per-handle
50 /// instead of poisoning the whole batch.
51 encode_errors: std::collections::HashMap<u64, CoolError>,
52 next_id: u64,
53}
54
55impl<C> BatchBuilder<C>
56where
57 C: HttpClientCodec + Clone + Send + 'static,
58{
59 pub(crate) fn new(rpc: RpcClient<C>) -> Self {
60 Self {
61 rpc,
62 frames: Vec::new(),
63 encode_errors: std::collections::HashMap::new(),
64 next_id: 0,
65 }
66 }
67
68 /// Number of queued frames (including ones whose input failed
69 /// to encode — those still consume an id and will surface their
70 /// error on the matching `take`).
71 pub fn len(&self) -> usize {
72 self.frames.len() + self.encode_errors.len()
73 }
74
75 pub fn is_empty(&self) -> bool {
76 self.len() == 0
77 }
78
79 fn next_id(&mut self) -> u64 {
80 let id = self.next_id;
81 self.next_id += 1;
82 id
83 }
84
85 pub(crate) fn push_frame(&mut self, op_id: String, input: serde_json::Value) -> u64 {
86 let id = self.next_id();
87 self.frames.push(cratestack_core::rpc::RpcRequest {
88 id,
89 op: op_id,
90 input,
91 idem: None,
92 });
93 id
94 }
95
96 pub(crate) fn push_failed_frame(&mut self, error: CoolError) -> u64 {
97 let id = self.next_id();
98 self.encode_errors.insert(id, error);
99 id
100 }
101
102 /// Fire the batch as a single `POST /rpc/batch` and return a
103 /// [`BatchResults`] keyed by handle for per-op output extraction.
104 ///
105 /// The outer `Result` only fails on transport / batch-envelope
106 /// errors (the whole batch couldn't be sent or the response
107 /// couldn't be parsed). Per-frame failures — both pre-send input
108 /// encoding errors and server-side `RpcErrorBody` — are deferred
109 /// to the matching `BatchResults::take(handle)?` call.
110 pub async fn send(self) -> Result<BatchResults, RpcClientError> {
111 let encode_errors = self.encode_errors;
112 let frames = if self.frames.is_empty() {
113 std::collections::HashMap::new()
114 } else {
115 let response_frames = self.rpc.batch(&self.frames).await?;
116 response_frames.into_iter().map(|f| (f.id, f)).collect()
117 };
118 Ok(BatchResults {
119 frames,
120 encode_errors,
121 })
122 }
123}
124
125/// Per-handle results from a sent batch. Each handle can be `take`n
126/// exactly once.
127pub struct BatchResults {
128 frames: std::collections::HashMap<u64, cratestack_core::rpc::RpcResponseFrame>,
129 encode_errors: std::collections::HashMap<u64, CoolError>,
130}
131
132impl BatchResults {
133 /// Extract the typed output for one queued op. Returns:
134 ///
135 /// - `Ok(output)` — the server emitted an `output` for this frame
136 /// and it decoded into `O`.
137 /// - `Err(RpcClientError::Codec(_))` — the input failed to encode
138 /// before send, or the output failed to decode.
139 /// - `Err(RpcClientError::Remote(RpcRemoteError { body, .. }))` —
140 /// the server emitted an `error` frame for this op. The
141 /// `status` field is derived from the gRPC-style code in the
142 /// body since `/rpc/batch` returns 200 at the HTTP level
143 /// regardless of per-frame outcomes.
144 /// - `Err(RpcClientError::InvalidResponse(_))` — the server
145 /// omitted this frame entirely or the frame had neither
146 /// `output` nor `error` set.
147 pub fn take<O>(&mut self, handle: BatchHandle<O>) -> Result<O, RpcClientError>
148 where
149 O: serde::de::DeserializeOwned,
150 {
151 if let Some(error) = self.encode_errors.remove(&handle.id) {
152 return Err(RpcClientError::Codec(error));
153 }
154 let frame = self.frames.remove(&handle.id).ok_or_else(|| {
155 RpcClientError::InvalidResponse(format!(
156 "batch response missing frame for id {}",
157 handle.id,
158 ))
159 })?;
160 match (frame.output, frame.error) {
161 (Some(output), None) => serde_json::from_value::<O>(output).map_err(|error| {
162 RpcClientError::Codec(CoolError::Codec(format!(
163 "decode batch output for id {}: {error}",
164 handle.id,
165 )))
166 }),
167 (None, Some(body)) => Err(RpcClientError::Remote(RpcRemoteError {
168 status: http_status_for_rpc_code(&body.code),
169 body,
170 })),
171 (Some(_), Some(_)) | (None, None) => Err(RpcClientError::InvalidResponse(format!(
172 "batch frame {} has both `output` and `error` set, or neither",
173 handle.id,
174 ))),
175 }
176 }
177}