cratestack_client_rust/rpc/client.rs
1use cratestack_codec_cbor::CborCodec;
2use cratestack_core::rpc::{RPC_BATCH_PATH, RpcRequest, RpcResponseFrame};
3use reqwest::Method;
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6
7use crate::client::CratestackClient;
8use crate::codec::HttpClientCodec;
9use crate::config::ClientConfig;
10use crate::rpc::batch::BatchBuilder;
11use crate::rpc::error::{RpcClientError, client_error_to_rpc, decode_rpc_unary_response};
12use crate::streaming::pump_streamed_response_typed;
13
14// `RPC_BATCH_PATH` from core is the axum-template form `"/rpc/batch"`,
15// so we just reuse it. The unary path is templated (`/rpc/{op_id}`) so
16// we format it per call instead of using the constant directly.
17const RPC_BATCH_PATH_PLAIN: &str = RPC_BATCH_PATH;
18
19/// Thin RPC client built on top of the REST client's transport + codec
20/// plumbing.
21///
22/// Shares a `reqwest::Client` and a codec impl with `CratestackClient`,
23/// but speaks the `/rpc/...` URL space instead of REST routes. Both
24/// clients can be used side-by-side against the same server.
25#[derive(Clone)]
26pub struct RpcClient<C = CborCodec> {
27 pub(crate) inner: CratestackClient<C>,
28}
29
30impl RpcClient<CborCodec> {
31 pub fn cbor(config: ClientConfig) -> Self {
32 Self::new(CratestackClient::cbor(config))
33 }
34}
35
36impl<C> RpcClient<C>
37where
38 C: HttpClientCodec + Clone,
39{
40 /// Build an RPC client on top of an existing REST client. The two
41 /// share their `reqwest::Client`, codec, and state store.
42 pub fn new(inner: CratestackClient<C>) -> Self {
43 Self { inner }
44 }
45
46 /// Underlying REST client. Exposed for callers that want REST + RPC
47 /// side-by-side (e.g. a long migration window between the two).
48 pub fn inner(&self) -> &CratestackClient<C> {
49 &self.inner
50 }
51
52 /// Start a new typed batch. Use with [`BatchableCall::queue`] from
53 /// the macro-generated typed methods (or any hand-built
54 /// [`BatchableCall`]) to compose a heterogeneous batch, then
55 /// `batch.send().await` for a single `POST /rpc/batch` round-trip.
56 pub fn batch_builder(&self) -> BatchBuilder<C> {
57 BatchBuilder::new(self.clone())
58 }
59
60 /// POST /rpc/{op_id} — unary call.
61 ///
62 /// `op_id` is the dotted dispatch key the server emits — `model.X.list`
63 /// / `model.X.get` / `model.X.create` / `model.X.update` /
64 /// `model.X.delete` for CRUD verbs and `procedure.<name>` for procedures.
65 pub async fn call<I, O>(&self, op_id: &str, input: &I) -> Result<O, RpcClientError>
66 where
67 I: Serialize,
68 O: DeserializeOwned,
69 {
70 let body = self
71 .inner
72 .codec
73 .encode(input)
74 .map_err(RpcClientError::Codec)?;
75 let path = format!("/rpc/{}", op_id);
76 let response = self
77 .inner
78 .request_raw_with_query_and_accept(Method::POST, &path, Some(body), None, &[], None)
79 .await
80 .map_err(client_error_to_rpc)?;
81 decode_rpc_unary_response(&self.inner.codec, &response)
82 }
83
84 /// POST /rpc/batch — sequence of `RpcRequest` frames in, sequence of
85 /// `RpcResponseFrame` frames out. Per-frame errors do not poison the
86 /// batch (each frame's `output` / `error` is reported independently).
87 pub async fn batch(
88 &self,
89 requests: &[RpcRequest],
90 ) -> Result<Vec<RpcResponseFrame>, RpcClientError> {
91 let body = self
92 .inner
93 .codec
94 .encode(&requests)
95 .map_err(RpcClientError::Codec)?;
96 let response = self
97 .inner
98 .request_raw_with_query_and_accept(
99 Method::POST,
100 RPC_BATCH_PATH_PLAIN,
101 Some(body),
102 None,
103 &[],
104 None,
105 )
106 .await
107 .map_err(client_error_to_rpc)?;
108 decode_rpc_unary_response::<C, Vec<RpcResponseFrame>>(&self.inner.codec, &response)
109 }
110
111 /// POST /rpc/{op_id} — sequence response, item-at-a-time.
112 ///
113 /// Returns a bounded `mpsc::Receiver` that yields each cbor-seq
114 /// item as bytes arrive over the network — no full-body buffering
115 /// before the first item reaches the caller. Transport / decode
116 /// failures appear as terminal `Err` items on the channel; the
117 /// receiver returning `None` indicates a clean stream close.
118 ///
119 /// Non-2xx responses are buffered and surfaced as a single
120 /// `RpcClientError::Remote(RpcRemoteError { ... })` from this
121 /// function (the channel is never opened) — same shape as the
122 /// unary `call` path. The server must return `application/cbor-seq`
123 /// for streaming; on a buffered `application/cbor` response this
124 /// method will misframe the body.
125 pub async fn call_streaming<I, O>(
126 &self,
127 op_id: &str,
128 input: &I,
129 ) -> Result<tokio::sync::mpsc::Receiver<Result<O, RpcClientError>>, RpcClientError>
130 where
131 I: Serialize,
132 O: DeserializeOwned + Send + 'static,
133 {
134 let body = self
135 .inner
136 .codec
137 .encode(input)
138 .map_err(RpcClientError::Codec)?;
139 let path = format!("/rpc/{}", op_id);
140 let response = self
141 .inner
142 .request_streamed_with_query_and_accept(
143 Method::POST,
144 &path,
145 Some(body),
146 None,
147 &[],
148 self.inner.codec.sequence_accept_header_value(),
149 )
150 .await
151 .map_err(client_error_to_rpc)?;
152
153 // Bounded channel — 16 in-flight items matches the REST
154 // `post_list_streamed` shape and keeps consumer memory tight.
155 let (tx, rx) = tokio::sync::mpsc::channel(16);
156 tokio::spawn(pump_streamed_response_typed::<O, RpcClientError, _>(
157 response,
158 tx,
159 client_error_to_rpc,
160 ));
161 Ok(rx)
162 }
163}