Skip to main content

cratestack_axum/rpc/
batch.rs

1//! Per-frame conversion for the batch path.
2
3use axum::http::HeaderMap;
4use cratestack_core::CoolError;
5use cratestack_core::rpc::{RpcErrorBody, RpcResponseFrame};
6
7use crate::HttpTransport;
8
9use super::codec_helpers::decode_rpc_body;
10use super::util::synthesize_error_for_status;
11
12/// Convert an [`axum::Response`] returned by an inner dispatch arm into a
13/// single batch response frame.
14///
15/// Success bodies (2xx) are decoded as `serde_json::Value` via the same
16/// codec the request used and become `RpcResponseFrame::ok`. Error
17/// bodies (4xx/5xx) — which have already been post-processed by
18/// [`super::convert_handler_error_response`] inside `rpc_dispatch_inner` —
19/// are decoded as [`RpcErrorBody`] and inlined into
20/// `RpcResponseFrame::error` directly.
21///
22/// Wire limitation: success outputs must be representable as
23/// `serde_json::Value`. For CRUD/procedure outputs this is fine; if a
24/// future op returns CBOR-only types (e.g. raw byte strings without a
25/// JSON representation) the frame becomes an `internal` error.
26pub async fn response_to_frame<C>(
27    id: u64,
28    response: axum::response::Response,
29    codec: &C,
30    headers: &HeaderMap,
31) -> RpcResponseFrame
32where
33    C: HttpTransport,
34{
35    let status = response.status();
36    let body_bytes = match axum::body::to_bytes(response.into_body(), usize::MAX).await {
37        Ok(bytes) => bytes.to_vec(),
38        Err(error) => {
39            return RpcResponseFrame::err(
40                id,
41                &CoolError::Internal(format!("buffer batch frame body: {error}")),
42            );
43        }
44    };
45
46    if status.is_success() {
47        match decode_rpc_body::<_, serde_json::Value>(codec, headers, &body_bytes) {
48            Ok(value) => RpcResponseFrame::ok(id, value),
49            Err(error) => RpcResponseFrame::err(id, &error),
50        }
51    } else {
52        // Body is already RpcErrorBody-shaped — `rpc_dispatch_inner`
53        // post-processes handler errors before they reach us.
54        match decode_rpc_body::<_, RpcErrorBody>(codec, headers, &body_bytes) {
55            Ok(body) => RpcResponseFrame {
56                id,
57                output: None,
58                error: Some(body),
59            },
60            Err(_) => {
61                // Defensive: a handler/dispatcher returned a non-2xx
62                // body that isn't RpcErrorBody-shaped. Synthesize one
63                // from the status alone rather than corrupting the
64                // batch envelope.
65                let synthetic = synthesize_error_for_status(status);
66                RpcResponseFrame::err(id, &synthetic)
67            }
68        }
69    }
70}