Skip to main content

cratestack_client_rust/rpc/
client.rs

1use cratestack_codec_cbor::CborCodec;
2use cratestack_core::rpc::{RPC_BATCH_PATH, RpcRequest, RpcResponseFrame};
3use reqwest::Method;
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6
7use crate::client::CratestackClient;
8use crate::codec::HttpClientCodec;
9use crate::config::ClientConfig;
10use crate::rpc::batch::BatchBuilder;
11use crate::rpc::error::{RpcClientError, client_error_to_rpc, decode_rpc_unary_response};
12use crate::streaming::pump_streamed_response_typed;
13
14// `RPC_BATCH_PATH` from core is the axum-template form `"/rpc/batch"`,
15// so we just reuse it. The unary path is templated (`/rpc/{op_id}`) so
16// we format it per call instead of using the constant directly.
17const RPC_BATCH_PATH_PLAIN: &str = RPC_BATCH_PATH;
18
19/// Thin RPC client built on top of the REST client's transport + codec
20/// plumbing.
21///
22/// Shares a `reqwest::Client` and a codec impl with `CratestackClient`,
23/// but speaks the `/rpc/...` URL space instead of REST routes. Both
24/// clients can be used side-by-side against the same server.
25#[derive(Clone)]
26pub struct RpcClient<C = CborCodec> {
27    pub(crate) inner: CratestackClient<C>,
28}
29
30impl RpcClient<CborCodec> {
31    pub fn cbor(config: ClientConfig) -> Self {
32        Self::new(CratestackClient::cbor(config))
33    }
34}
35
36impl<C> RpcClient<C>
37where
38    C: HttpClientCodec + Clone,
39{
40    /// Build an RPC client on top of an existing REST client. The two
41    /// share their `reqwest::Client`, codec, and state store.
42    pub fn new(inner: CratestackClient<C>) -> Self {
43        Self { inner }
44    }
45
46    /// Underlying REST client. Exposed for callers that want REST + RPC
47    /// side-by-side (e.g. a long migration window between the two).
48    pub fn inner(&self) -> &CratestackClient<C> {
49        &self.inner
50    }
51
52    /// Start a new typed batch. Use with [`BatchableCall::queue`] from
53    /// the macro-generated typed methods (or any hand-built
54    /// [`BatchableCall`]) to compose a heterogeneous batch, then
55    /// `batch.send().await` for a single `POST /rpc/batch` round-trip.
56    pub fn batch_builder(&self) -> BatchBuilder<C> {
57        BatchBuilder::new(self.clone())
58    }
59
60    /// POST /rpc/{op_id} — unary call.
61    ///
62    /// `op_id` is the dotted dispatch key the server emits — `model.X.list`
63    /// / `model.X.get` / `model.X.create` / `model.X.update` /
64    /// `model.X.delete` for CRUD verbs and `procedure.<name>` for procedures.
65    pub async fn call<I, O>(&self, op_id: &str, input: &I) -> Result<O, RpcClientError>
66    where
67        I: Serialize,
68        O: DeserializeOwned,
69    {
70        let body = self
71            .inner
72            .codec
73            .encode(input)
74            .map_err(RpcClientError::Codec)?;
75        let path = format!("/rpc/{}", op_id);
76        let response = self
77            .inner
78            .request_raw_with_query_and_accept(Method::POST, &path, Some(body), None, &[], None)
79            .await
80            .map_err(client_error_to_rpc)?;
81        decode_rpc_unary_response(&self.inner.codec, &response)
82    }
83
84    /// POST /rpc/batch — sequence of `RpcRequest` frames in, sequence of
85    /// `RpcResponseFrame` frames out. Per-frame errors do not poison the
86    /// batch (each frame's `output` / `error` is reported independently).
87    pub async fn batch(
88        &self,
89        requests: &[RpcRequest],
90    ) -> Result<Vec<RpcResponseFrame>, RpcClientError> {
91        let body = self
92            .inner
93            .codec
94            .encode(&requests)
95            .map_err(RpcClientError::Codec)?;
96        let response = self
97            .inner
98            .request_raw_with_query_and_accept(
99                Method::POST,
100                RPC_BATCH_PATH_PLAIN,
101                Some(body),
102                None,
103                &[],
104                None,
105            )
106            .await
107            .map_err(client_error_to_rpc)?;
108        decode_rpc_unary_response::<C, Vec<RpcResponseFrame>>(&self.inner.codec, &response)
109    }
110
111    /// POST /rpc/{op_id} — sequence response, item-at-a-time.
112    ///
113    /// Returns a bounded `mpsc::Receiver` that yields each cbor-seq
114    /// item as bytes arrive over the network — no full-body buffering
115    /// before the first item reaches the caller. Transport / decode
116    /// failures appear as terminal `Err` items on the channel; the
117    /// receiver returning `None` indicates a clean stream close.
118    ///
119    /// Non-2xx responses are buffered and surfaced as a single
120    /// `RpcClientError::Remote(RpcRemoteError { ... })` from this
121    /// function (the channel is never opened) — same shape as the
122    /// unary `call` path. The server must return `application/cbor-seq`
123    /// for streaming; on a buffered `application/cbor` response this
124    /// method will misframe the body.
125    pub async fn call_streaming<I, O>(
126        &self,
127        op_id: &str,
128        input: &I,
129    ) -> Result<tokio::sync::mpsc::Receiver<Result<O, RpcClientError>>, RpcClientError>
130    where
131        I: Serialize,
132        O: DeserializeOwned + Send + 'static,
133    {
134        let body = self
135            .inner
136            .codec
137            .encode(input)
138            .map_err(RpcClientError::Codec)?;
139        let path = format!("/rpc/{}", op_id);
140        let response = self
141            .inner
142            .request_streamed_with_query_and_accept(
143                Method::POST,
144                &path,
145                Some(body),
146                None,
147                &[],
148                self.inner.codec.sequence_accept_header_value(),
149            )
150            .await
151            .map_err(client_error_to_rpc)?;
152
153        // Bounded channel — 16 in-flight items matches the REST
154        // `post_list_streamed` shape and keeps consumer memory tight.
155        let (tx, rx) = tokio::sync::mpsc::channel(16);
156        tokio::spawn(pump_streamed_response_typed::<O, RpcClientError, _>(
157            response,
158            tx,
159            client_error_to_rpc,
160        ));
161        Ok(rx)
162    }
163}