Skip to main content

cratestack_client_rust/rpc/
batch.rs

1// -----------------------------------------------------------------------------
2// Typed batch surface
3//
4// Lets callers compose heterogeneous batches of typed RPC ops into a
5// single `POST /rpc/batch` round-trip without dropping to the raw
6// `RpcRequest` / `RpcResponseFrame` wire types. Typical use through
7// the macro-generated client:
8//
9//   let mut batch = client.batch();
10//   let h_widgets = client.widgets().list(&list_input).queue(&mut batch);
11//   let h_ping    = client.procedures().ping(&args).queue(&mut batch);
12//   let h_created = client.widgets().create(&new).queue(&mut batch);
13//
14//   let mut results = batch.send().await?;            // one HTTP call
15//   let widgets:  Vec<Widget> = results.take(h_widgets)?;
16//   let echoed:   PingArgs    = results.take(h_ping)?;
17//   let created:  Widget      = results.take(h_created)?;
18//
19// `BatchableCall<C, O>` is what every macro-generated unary RPC method
20// now returns. It implements `IntoFuture`, so `.await` on it fires the
21// call immediately exactly like before — `.queue(&mut batch)` is the
22// opt-in deferral path. No `_batched` API duplication; same method,
23// two consumption modes.
24//
25// Sequence-streaming methods (`call_streaming` under the hood) stay as
26// `async fn -> Result<RpcStream<O>, _>` and do NOT participate in
27// batches — `/rpc/batch` is unary by construction.
28// -----------------------------------------------------------------------------
29
30use cratestack_core::CoolError;
31
32use crate::codec::HttpClientCodec;
33use crate::rpc::batch_call::BatchHandle;
34use crate::rpc::client::RpcClient;
35use crate::rpc::error::{RpcClientError, RpcRemoteError, http_status_for_rpc_code};
36
37/// Accumulates queued [`BatchableCall`]s into a single
38/// `POST /rpc/batch` round-trip. Build via [`RpcClient::batch_builder`]
39/// or the macro-generated `Client::batch()`.
40///
41/// Send-on-drop is intentionally *not* implemented — the batch only
42/// fires when you call `.send().await`. Drops without sending are
43/// silent (queued calls just discarded).
44#[must_use = "BatchBuilder does nothing until `.send().await`ed"]
45pub struct BatchBuilder<C> {
46    rpc: RpcClient<C>,
47    frames: Vec<cratestack_core::rpc::RpcRequest>,
48    /// Frames whose input failed to encode pre-send — recorded by id
49    /// so [`BatchResults::take`] can surface the error per-handle
50    /// instead of poisoning the whole batch.
51    encode_errors: std::collections::HashMap<u64, CoolError>,
52    next_id: u64,
53}
54
55impl<C> BatchBuilder<C>
56where
57    C: HttpClientCodec + Clone + Send + 'static,
58{
59    pub(crate) fn new(rpc: RpcClient<C>) -> Self {
60        Self {
61            rpc,
62            frames: Vec::new(),
63            encode_errors: std::collections::HashMap::new(),
64            next_id: 0,
65        }
66    }
67
68    /// Number of queued frames (including ones whose input failed
69    /// to encode — those still consume an id and will surface their
70    /// error on the matching `take`).
71    pub fn len(&self) -> usize {
72        self.frames.len() + self.encode_errors.len()
73    }
74
75    pub fn is_empty(&self) -> bool {
76        self.len() == 0
77    }
78
79    fn next_id(&mut self) -> u64 {
80        let id = self.next_id;
81        self.next_id += 1;
82        id
83    }
84
85    pub(crate) fn push_frame(&mut self, op_id: String, input: serde_json::Value) -> u64 {
86        let id = self.next_id();
87        self.frames.push(cratestack_core::rpc::RpcRequest {
88            id,
89            op: op_id,
90            input,
91            idem: None,
92        });
93        id
94    }
95
96    pub(crate) fn push_failed_frame(&mut self, error: CoolError) -> u64 {
97        let id = self.next_id();
98        self.encode_errors.insert(id, error);
99        id
100    }
101
102    /// Fire the batch as a single `POST /rpc/batch` and return a
103    /// [`BatchResults`] keyed by handle for per-op output extraction.
104    ///
105    /// The outer `Result` only fails on transport / batch-envelope
106    /// errors (the whole batch couldn't be sent or the response
107    /// couldn't be parsed). Per-frame failures — both pre-send input
108    /// encoding errors and server-side `RpcErrorBody` — are deferred
109    /// to the matching `BatchResults::take(handle)?` call.
110    pub async fn send(self) -> Result<BatchResults, RpcClientError> {
111        let encode_errors = self.encode_errors;
112        let frames = if self.frames.is_empty() {
113            std::collections::HashMap::new()
114        } else {
115            let response_frames = self.rpc.batch(&self.frames).await?;
116            response_frames.into_iter().map(|f| (f.id, f)).collect()
117        };
118        Ok(BatchResults {
119            frames,
120            encode_errors,
121        })
122    }
123}
124
125/// Per-handle results from a sent batch. Each handle can be `take`n
126/// exactly once.
127pub struct BatchResults {
128    frames: std::collections::HashMap<u64, cratestack_core::rpc::RpcResponseFrame>,
129    encode_errors: std::collections::HashMap<u64, CoolError>,
130}
131
132impl BatchResults {
133    /// Extract the typed output for one queued op. Returns:
134    ///
135    /// - `Ok(output)` — the server emitted an `output` for this frame
136    ///   and it decoded into `O`.
137    /// - `Err(RpcClientError::Codec(_))` — the input failed to encode
138    ///   before send, or the output failed to decode.
139    /// - `Err(RpcClientError::Remote(RpcRemoteError { body, .. }))` —
140    ///   the server emitted an `error` frame for this op. The
141    ///   `status` field is derived from the gRPC-style code in the
142    ///   body since `/rpc/batch` returns 200 at the HTTP level
143    ///   regardless of per-frame outcomes.
144    /// - `Err(RpcClientError::InvalidResponse(_))` — the server
145    ///   omitted this frame entirely or the frame had neither
146    ///   `output` nor `error` set.
147    pub fn take<O>(&mut self, handle: BatchHandle<O>) -> Result<O, RpcClientError>
148    where
149        O: serde::de::DeserializeOwned,
150    {
151        if let Some(error) = self.encode_errors.remove(&handle.id) {
152            return Err(RpcClientError::Codec(error));
153        }
154        let frame = self.frames.remove(&handle.id).ok_or_else(|| {
155            RpcClientError::InvalidResponse(format!(
156                "batch response missing frame for id {}",
157                handle.id,
158            ))
159        })?;
160        match (frame.output, frame.error) {
161            (Some(output), None) => serde_json::from_value::<O>(output).map_err(|error| {
162                RpcClientError::Codec(CoolError::Codec(format!(
163                    "decode batch output for id {}: {error}",
164                    handle.id,
165                )))
166            }),
167            (None, Some(body)) => Err(RpcClientError::Remote(RpcRemoteError {
168                status: http_status_for_rpc_code(&body.code),
169                body,
170            })),
171            (Some(_), Some(_)) | (None, None) => Err(RpcClientError::InvalidResponse(format!(
172                "batch frame {} has both `output` and `error` set, or neither",
173                handle.id,
174            ))),
175        }
176    }
177}