Skip to main content

cratestack_client_rust/rpc/
batch_call.rs

1// -----------------------------------------------------------------------------
2// `BatchableCall` + `BatchHandle` — the prepared-call / typed-key duo
3// that the typed batch surface is built around. Sits alongside
4// `rpc::batch::{BatchBuilder, BatchResults}`, which consume them.
5// -----------------------------------------------------------------------------
6
7use cratestack_core::CoolError;
8
9use crate::codec::HttpClientCodec;
10use crate::rpc::batch::BatchBuilder;
11use crate::rpc::client::RpcClient;
12use crate::rpc::error::RpcClientError;
13
14/// Recursively remove `null`-valued entries from JSON objects, descending into
15/// nested objects and array elements. Array `null` *elements* are left intact
16/// (their position is significant); only object *entries* are dropped — the
17/// shape that `None` optional fields serialize to. Keeps `serde_json::Value::Null`
18/// off the CBOR wire, where it would otherwise mis-encode as an empty array.
19fn strip_json_null_entries(value: &mut serde_json::Value) {
20    match value {
21        serde_json::Value::Object(map) => {
22            map.retain(|_, child| !child.is_null());
23            for child in map.values_mut() {
24                strip_json_null_entries(child);
25            }
26        }
27        serde_json::Value::Array(items) => {
28            for item in items.iter_mut() {
29                strip_json_null_entries(item);
30            }
31        }
32        _ => {}
33    }
34}
35
36/// A typed unary RPC call that has been *prepared* but not yet sent.
37///
38/// Produced by every macro-generated unary RPC method on the typed
39/// client (model CRUD + unary procedures). Two consumption modes:
40///
41/// - **Eager.** `.await` directly — `IntoFuture` desugars to the same
42///   HTTP request `RpcClient::call` would have made.
43/// - **Batched.** `.queue(&mut batch)` registers the call with a
44///   [`BatchBuilder`] for a single multiplexed `POST /rpc/batch`.
45///   Returns a typed [`BatchHandle`] for `.take(...)` on the results
46///   after `batch.send().await` resolves.
47///
48/// The input is eagerly converted to `serde_json::Value` at
49/// construction time so the same prepared call can flow into either
50/// consumption mode without re-borrowing the input. Conversion errors
51/// surface lazily — eagerly on `.await`, per-handle on the batch path.
52#[must_use = "BatchableCall does nothing until `.await`ed or `.queue(&mut batch)`d"]
53pub struct BatchableCall<C, O> {
54    rpc: RpcClient<C>,
55    op_id: String,
56    input_value: Result<serde_json::Value, CoolError>,
57    /// `fn() -> O` instead of `O` so `BatchableCall` is `Send` + `Sync`
58    /// regardless of whether `O` is — the marker is variance-only.
59    _output: std::marker::PhantomData<fn() -> O>,
60}
61
62impl<C, O> BatchableCall<C, O>
63where
64    C: HttpClientCodec + Clone + Send + 'static,
65    O: serde::de::DeserializeOwned + Send + 'static,
66{
67    /// Construct a prepared call. Callers should generally use the
68    /// macro-generated typed methods rather than building this by hand.
69    pub fn new<I>(rpc: RpcClient<C>, op_id: impl Into<String>, input: &I) -> Self
70    where
71        I: serde::Serialize,
72    {
73        let input_value = serde_json::to_value(input)
74            .map(|mut value| {
75                // Strip `null` object entries before the value is handed to the
76                // codec. `serde::Serialize` emits `None` optional fields as
77                // `serde_json::Value::Null`, and the CBOR codec encodes
78                // `serde_json::Value::Null` as the empty-array marker (`0x80`),
79                // NOT CBOR null (`0xf6`) — see `cratestack-codec-cbor`. A server
80                // decoding the corresponding `Option<T>` field then fails with
81                // "expected text, got array". The generated request structs
82                // carry `#[serde(default)]` on optional fields, so an absent key
83                // decodes as `None` exactly as an explicit null would have. This
84                // mirrors the server-side projection that strips null map
85                // entries before its own encode, keeping both directions
86                // null-free on the wire.
87                strip_json_null_entries(&mut value);
88                value
89            })
90            .map_err(|error| CoolError::Codec(format!("encode batch input: {error}")));
91        Self {
92            rpc,
93            op_id: op_id.into(),
94            input_value,
95            _output: std::marker::PhantomData,
96        }
97    }
98
99    /// Queue this call into a [`BatchBuilder`] for deferred
100    /// execution. The returned [`BatchHandle`] is the key to
101    /// retrieve the typed result via [`BatchResults::take`] after
102    /// [`BatchBuilder::send`] resolves.
103    ///
104    /// Input-encoding errors observed at construction time are
105    /// preserved per-handle, so a single bad input in a batch
106    /// produces a per-handle `take(...)?` error rather than
107    /// poisoning the whole batch.
108    pub fn queue(self, batch: &mut BatchBuilder<C>) -> BatchHandle<O> {
109        let id = match self.input_value {
110            Ok(value) => batch.push_frame(self.op_id, value),
111            Err(error) => batch.push_failed_frame(error),
112        };
113        BatchHandle {
114            id,
115            _output: std::marker::PhantomData,
116        }
117    }
118}
119
120impl<C, O> std::future::IntoFuture for BatchableCall<C, O>
121where
122    C: HttpClientCodec + Clone + Send + 'static,
123    O: serde::de::DeserializeOwned + Send + 'static,
124{
125    type Output = Result<O, RpcClientError>;
126    type IntoFuture = std::pin::Pin<Box<dyn std::future::Future<Output = Self::Output> + Send>>;
127
128    fn into_future(self) -> Self::IntoFuture {
129        Box::pin(async move {
130            let value = self.input_value.map_err(RpcClientError::Codec)?;
131            self.rpc
132                .call::<serde_json::Value, O>(&self.op_id, &value)
133                .await
134        })
135    }
136}
137
138/// A typed key returned by [`BatchableCall::queue`]. Pair it with
139/// [`BatchResults::take`] to extract the typed output for that op
140/// from the batch response.
141///
142/// Carries `O` only as a phantom type — there's no runtime overhead.
143/// Cheap to clone; clones share identity (you can `take(handle)` only
144/// once, but the type tracks across passes).
145pub struct BatchHandle<O> {
146    pub(crate) id: u64,
147    pub(crate) _output: std::marker::PhantomData<fn() -> O>,
148}
149
150impl<O> Clone for BatchHandle<O> {
151    // Hand-written (not derived) so the impl doesn't pick up a spurious
152    // `O: Clone` bound — `O` is purely a phantom. `BatchHandle` is `Copy`,
153    // so the canonical clone is just a copy of `*self`.
154    fn clone(&self) -> Self {
155        *self
156    }
157}
158
159impl<O> Copy for BatchHandle<O> {}
160
161impl<O> std::fmt::Debug for BatchHandle<O> {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        f.debug_struct("BatchHandle").field("id", &self.id).finish()
164    }
165}
166
167#[cfg(test)]
168mod null_strip_tests {
169    use super::strip_json_null_entries;
170    use cratestack_codec_cbor::CborCodec;
171    use cratestack_core::CoolCodec;
172    use serde::{Deserialize, Serialize};
173
174    #[derive(Serialize, Deserialize, PartialEq, Debug)]
175    struct Req {
176        required: String,
177        // Generated request structs carry `#[serde(default)]` on optionals so an
178        // absent key decodes as `None` — the property the strip relies on.
179        #[serde(default)]
180        optional: Option<String>,
181        #[serde(default)]
182        nested: Option<Inner>,
183    }
184
185    #[derive(Serialize, Deserialize, PartialEq, Debug)]
186    struct Inner {
187        #[serde(default)]
188        maybe: Option<String>,
189        kept: String,
190    }
191
192    /// The `BatchableCall::new` path: serde -> `serde_json::Value` -> strip nulls.
193    /// After stripping, the value encodes to CBOR cleanly and the typed struct
194    /// decodes its `None` optionals back from the absent keys.
195    #[test]
196    fn stripped_none_optionals_round_trip_through_cbor() {
197        let req = Req {
198            required: "x".to_owned(),
199            optional: None,
200            nested: Some(Inner {
201                maybe: None,
202                kept: "k".to_owned(),
203            }),
204        };
205        let mut value = serde_json::to_value(&req).expect("to_value");
206        assert!(value.get("optional").expect("present").is_null());
207        strip_json_null_entries(&mut value);
208        assert!(
209            value.get("optional").is_none(),
210            "top-level null entry dropped"
211        );
212        assert!(
213            value["nested"].get("maybe").is_none(),
214            "nested null entry dropped"
215        );
216        assert_eq!(value["nested"]["kept"], serde_json::json!("k"));
217
218        let bytes = CborCodec.encode(&value).expect("encode");
219        let decoded: Req = CborCodec.decode(&bytes).expect("decode");
220        assert_eq!(decoded, req);
221    }
222
223    /// Regression guard: WITHOUT the strip, a `serde_json::Value::Null` optional
224    /// mis-encodes as the CBOR empty-array marker and the typed `Option<String>`
225    /// decode fails ("expected text, got array") — the exact cross-service bug.
226    #[test]
227    fn unstripped_null_breaks_typed_decode() {
228        let value = serde_json::json!({ "required": "x", "optional": null });
229        let bytes = CborCodec.encode(&value).expect("encode");
230        assert!(
231            CborCodec.decode::<Req>(&bytes).is_err(),
232            "unstripped Value::Null must break the typed decode"
233        );
234    }
235}