cratestack_client_rust/rpc/batch_call.rs
1// -----------------------------------------------------------------------------
2// `BatchableCall` + `BatchHandle` — the prepared-call / typed-key duo
3// that the typed batch surface is built around. Sits alongside
4// `rpc::batch::{BatchBuilder, BatchResults}`, which consume them.
5// -----------------------------------------------------------------------------
6
7use cratestack_core::CoolError;
8
9use crate::codec::HttpClientCodec;
10use crate::rpc::batch::BatchBuilder;
11use crate::rpc::client::RpcClient;
12use crate::rpc::error::RpcClientError;
13
14/// Recursively remove `null`-valued entries from JSON objects, descending into
15/// nested objects and array elements. Array `null` *elements* are left intact
16/// (their position is significant); only object *entries* are dropped — the
17/// shape that `None` optional fields serialize to. Keeps `serde_json::Value::Null`
18/// off the CBOR wire, where it would otherwise mis-encode as an empty array.
19fn strip_json_null_entries(value: &mut serde_json::Value) {
20 match value {
21 serde_json::Value::Object(map) => {
22 map.retain(|_, child| !child.is_null());
23 for child in map.values_mut() {
24 strip_json_null_entries(child);
25 }
26 }
27 serde_json::Value::Array(items) => {
28 for item in items.iter_mut() {
29 strip_json_null_entries(item);
30 }
31 }
32 _ => {}
33 }
34}
35
36/// A typed unary RPC call that has been *prepared* but not yet sent.
37///
38/// Produced by every macro-generated unary RPC method on the typed
39/// client (model CRUD + unary procedures). Two consumption modes:
40///
41/// - **Eager.** `.await` directly — `IntoFuture` desugars to the same
42/// HTTP request `RpcClient::call` would have made.
43/// - **Batched.** `.queue(&mut batch)` registers the call with a
44/// [`BatchBuilder`] for a single multiplexed `POST /rpc/batch`.
45/// Returns a typed [`BatchHandle`] for `.take(...)` on the results
46/// after `batch.send().await` resolves.
47///
48/// The input is eagerly converted to `serde_json::Value` at
49/// construction time so the same prepared call can flow into either
50/// consumption mode without re-borrowing the input. Conversion errors
51/// surface lazily — eagerly on `.await`, per-handle on the batch path.
52#[must_use = "BatchableCall does nothing until `.await`ed or `.queue(&mut batch)`d"]
53pub struct BatchableCall<C, O> {
54 rpc: RpcClient<C>,
55 op_id: String,
56 input_value: Result<serde_json::Value, CoolError>,
57 /// `fn() -> O` instead of `O` so `BatchableCall` is `Send` + `Sync`
58 /// regardless of whether `O` is — the marker is variance-only.
59 _output: std::marker::PhantomData<fn() -> O>,
60}
61
62impl<C, O> BatchableCall<C, O>
63where
64 C: HttpClientCodec + Clone + Send + 'static,
65 O: serde::de::DeserializeOwned + Send + 'static,
66{
67 /// Construct a prepared call. Callers should generally use the
68 /// macro-generated typed methods rather than building this by hand.
69 pub fn new<I>(rpc: RpcClient<C>, op_id: impl Into<String>, input: &I) -> Self
70 where
71 I: serde::Serialize,
72 {
73 let input_value = serde_json::to_value(input)
74 .map(|mut value| {
75 // Strip `null` object entries before the value is handed to the
76 // codec. `serde::Serialize` emits `None` optional fields as
77 // `serde_json::Value::Null`, and the CBOR codec encodes
78 // `serde_json::Value::Null` as the empty-array marker (`0x80`),
79 // NOT CBOR null (`0xf6`) — see `cratestack-codec-cbor`. A server
80 // decoding the corresponding `Option<T>` field then fails with
81 // "expected text, got array". The generated request structs
82 // carry `#[serde(default)]` on optional fields, so an absent key
83 // decodes as `None` exactly as an explicit null would have. This
84 // mirrors the server-side projection that strips null map
85 // entries before its own encode, keeping both directions
86 // null-free on the wire.
87 strip_json_null_entries(&mut value);
88 value
89 })
90 .map_err(|error| CoolError::Codec(format!("encode batch input: {error}")));
91 Self {
92 rpc,
93 op_id: op_id.into(),
94 input_value,
95 _output: std::marker::PhantomData,
96 }
97 }
98
99 /// Queue this call into a [`BatchBuilder`] for deferred
100 /// execution. The returned [`BatchHandle`] is the key to
101 /// retrieve the typed result via [`BatchResults::take`] after
102 /// [`BatchBuilder::send`] resolves.
103 ///
104 /// Input-encoding errors observed at construction time are
105 /// preserved per-handle, so a single bad input in a batch
106 /// produces a per-handle `take(...)?` error rather than
107 /// poisoning the whole batch.
108 pub fn queue(self, batch: &mut BatchBuilder<C>) -> BatchHandle<O> {
109 let id = match self.input_value {
110 Ok(value) => batch.push_frame(self.op_id, value),
111 Err(error) => batch.push_failed_frame(error),
112 };
113 BatchHandle {
114 id,
115 _output: std::marker::PhantomData,
116 }
117 }
118}
119
120impl<C, O> std::future::IntoFuture for BatchableCall<C, O>
121where
122 C: HttpClientCodec + Clone + Send + 'static,
123 O: serde::de::DeserializeOwned + Send + 'static,
124{
125 type Output = Result<O, RpcClientError>;
126 type IntoFuture = std::pin::Pin<Box<dyn std::future::Future<Output = Self::Output> + Send>>;
127
128 fn into_future(self) -> Self::IntoFuture {
129 Box::pin(async move {
130 let value = self.input_value.map_err(RpcClientError::Codec)?;
131 self.rpc
132 .call::<serde_json::Value, O>(&self.op_id, &value)
133 .await
134 })
135 }
136}
137
/// The typed key handed back by [`BatchableCall::queue`].
///
/// Pass it to [`BatchResults::take`] to pull the typed output of that one
/// operation out of the batch response.
///
/// `O` is present only as a phantom type, so the handle is nothing more than
/// its numeric id at runtime. Cloning is cheap, and clones share identity:
/// `take(handle)` succeeds only once, but the output type stays attached to
/// every copy.
pub struct BatchHandle<O> {
    /// Identifier of the frame this handle refers to.
    pub(crate) id: u64,
    /// Compile-time link to the output type; no `O` is stored.
    pub(crate) _output: std::marker::PhantomData<fn() -> O>,
}

impl<O> Clone for BatchHandle<O> {
    // Written by hand because `#[derive(Clone)]` would add an `O: Clone`
    // bound, which makes no sense for a phantom parameter. The type is
    // `Copy`, so the canonical clone is a plain copy of `*self`.
    fn clone(&self) -> Self {
        *self
    }
}

// Hand-written for the same reason as `Clone`: no bound on `O` is wanted.
impl<O> Copy for BatchHandle<O> {}

impl<O> std::fmt::Debug for BatchHandle<O> {
    /// Prints only the `id`; the phantom marker is left out, and no
    /// `O: Debug` bound is required.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("BatchHandle");
        repr.field("id", &self.id);
        repr.finish()
    }
}
166
#[cfg(test)]
mod null_strip_tests {
    use super::strip_json_null_entries;
    use cratestack_codec_cbor::CborCodec;
    use cratestack_core::CoolCodec;
    use serde::{Deserialize, Serialize};

    /// Stand-in for a generated request struct: one required field plus
    /// optional fields at two nesting levels.
    #[derive(Serialize, Deserialize, PartialEq, Debug)]
    struct Req {
        required: String,
        // `#[serde(default)]` is what generated request structs carry on
        // optionals; it lets a missing key decode as `None`, which is the
        // property the null-strip depends on.
        #[serde(default)]
        optional: Option<String>,
        #[serde(default)]
        nested: Option<Inner>,
    }

    /// Nested payload used to check that stripping reaches inner objects.
    #[derive(Serialize, Deserialize, PartialEq, Debug)]
    struct Inner {
        #[serde(default)]
        maybe: Option<String>,
        kept: String,
    }

    /// Follows the `BatchableCall::new` path: serde -> `serde_json::Value`
    /// -> strip nulls. The stripped value must encode to CBOR, and the typed
    /// struct must get its `None` optionals back from the missing keys.
    #[test]
    fn stripped_none_optionals_round_trip_through_cbor() {
        let original = Req {
            required: String::from("x"),
            optional: None,
            nested: Some(Inner {
                maybe: None,
                kept: String::from("k"),
            }),
        };

        // Before stripping, the `None` is an explicit JSON null.
        let mut json = serde_json::to_value(&original).expect("to_value");
        assert!(json.get("optional").expect("present").is_null());

        strip_json_null_entries(&mut json);
        assert!(json.get("optional").is_none(), "top-level null entry dropped");
        assert!(
            json["nested"].get("maybe").is_none(),
            "nested null entry dropped"
        );
        assert_eq!(json["nested"]["kept"], serde_json::json!("k"));

        let wire = CborCodec.encode(&json).expect("encode");
        let round_tripped: Req = CborCodec.decode(&wire).expect("decode");
        assert_eq!(round_tripped, original);
    }

    /// Regression guard for the cross-service bug: if the strip is skipped,
    /// a `serde_json::Value::Null` optional is written as the CBOR
    /// empty-array marker and decoding the typed `Option<String>` fails
    /// ("expected text, got array").
    #[test]
    fn unstripped_null_breaks_typed_decode() {
        let with_null = serde_json::json!({ "required": "x", "optional": null });
        let wire = CborCodec.encode(&with_null).expect("encode");
        let outcome = CborCodec.decode::<Req>(&wire);
        assert!(
            outcome.is_err(),
            "unstripped Value::Null must break the typed decode"
        );
    }
}
235}