Skip to main content

cratestack_client_rust/runtime/
handle.rs

1use std::sync::Arc;
2
3use cratestack_codec_cbor::CborCodec;
4#[cfg(feature = "codec-json")]
5use cratestack_codec_json::JsonCodec;
6use reqwest::Url;
7
8use crate::client::CratestackClient;
9use crate::config::ClientConfig;
10use crate::error::ClientError;
11use crate::runtime::transport::RuntimeTransportClient;
12use crate::runtime::wire::{
13    RuntimeCodecConfig, RuntimeConfigWire, RuntimeEnvelopeConfig, RuntimeErrorCode,
14    RuntimeErrorWire, RuntimeRequestWire, RuntimeResponseWire, RuntimeStateStoreConfig,
15};
16use crate::state::{
17    ClientStateStore, InMemoryStateStore, JsonFileStateStore, PersistedClientState,
18};
19use crate::streaming_callback::RuntimeChunkWire;
20
21pub struct RuntimeHandle {
22    runtime: tokio::runtime::Runtime,
23    pub(crate) client: RuntimeTransportClient,
24}
25
26impl RuntimeHandle {
27    pub fn new(config: RuntimeConfigWire) -> Result<Self, RuntimeErrorWire> {
28        let base_url = Url::parse(&config.base_url).map_err(|error| RuntimeErrorWire {
29            code: RuntimeErrorCode::BadInput,
30            http_status: None,
31            message: format!("invalid base URL '{}': {error}", config.base_url),
32            remote_code: None,
33            remote_body: None,
34        })?;
35        let state_store: Arc<dyn ClientStateStore> = match config.state_store {
36            RuntimeStateStoreConfig::InMemory => Arc::new(InMemoryStateStore::default()),
37            RuntimeStateStoreConfig::JsonFile { path } => Arc::new(JsonFileStateStore::new(path)),
38        };
39        if config.transport.envelope != RuntimeEnvelopeConfig::None {
40            return Err(RuntimeErrorWire {
41                code: RuntimeErrorCode::BadInput,
42                http_status: None,
43                message: "COSE envelope support is not implemented yet".to_owned(),
44                remote_code: None,
45                remote_body: None,
46            });
47        }
48        let client = match config.transport.codec {
49            RuntimeCodecConfig::Cbor => RuntimeTransportClient::Cbor(
50                CratestackClient::new(ClientConfig::new(base_url.clone()), CborCodec)
51                    .with_state_store(state_store.clone()),
52            ),
53            #[cfg(feature = "codec-json")]
54            RuntimeCodecConfig::Json => RuntimeTransportClient::Json(
55                CratestackClient::new(ClientConfig::new(base_url), JsonCodec)
56                    .with_state_store(state_store),
57            ),
58            #[cfg(not(feature = "codec-json"))]
59            RuntimeCodecConfig::Json => {
60                let _ = state_store;
61                return Err(RuntimeErrorWire {
62                    code: RuntimeErrorCode::BadInput,
63                    http_status: None,
64                    message: "JSON codec is not compiled in (cratestack-client-rust built \
65                              with `--no-default-features` or without the `codec-json` \
66                              feature)"
67                        .to_owned(),
68                    remote_code: None,
69                    remote_body: None,
70                });
71            }
72        };
73        let runtime = tokio::runtime::Builder::new_current_thread()
74            .enable_all()
75            .build()
76            .map_err(|error| RuntimeErrorWire {
77                code: RuntimeErrorCode::State,
78                http_status: None,
79                message: format!("failed to build runtime: {error}"),
80                remote_code: None,
81                remote_body: None,
82            })?;
83
84        Ok(Self { runtime, client })
85    }
86
87    pub fn execute(
88        &self,
89        request: RuntimeRequestWire,
90    ) -> Result<RuntimeResponseWire, RuntimeErrorWire> {
91        self.runtime
92            .block_on(self.client.execute_raw(request))
93            .map_err(RuntimeErrorWire::from)
94    }
95
96    /// Streaming companion to [`Self::execute`]. The callback receives
97    /// one [`RuntimeChunkWire`] per complete cbor-seq item as bytes
98    /// arrive on the wire; returning `false` cancels the stream.
99    /// Returns when the stream terminates (clean end, error, or
100    /// cancellation).
101    ///
102    /// Designed for the FFI surface — the callback gets raw CBOR bytes
103    /// per item, so the host language decodes with its native CBOR
104    /// library (Dart, Swift, Kotlin) rather than carrying a typed
105    /// generic across the bridge.
106    pub fn execute_streamed<F>(
107        &self,
108        request: RuntimeRequestWire,
109        on_chunk: F,
110    ) -> Result<(), RuntimeErrorWire>
111    where
112        F: FnMut(RuntimeChunkWire) -> bool + Send,
113    {
114        self.runtime
115            .block_on(self.client.execute_streamed(request, on_chunk))
116    }
117
118    pub fn state(&self) -> Result<PersistedClientState, ClientError> {
119        self.client.state()
120    }
121}