1use std::fs;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex};
4
5use chrono::{DateTime, Utc};
6pub use cratestack_codec_cbor::CborCodec;
7pub use cratestack_codec_json::JsonCodec;
8use cratestack_core::{
9 CoolCodec, CoolError, CoolErrorResponse, Page, SelectionQuery, canonical_request_string,
10};
11use reqwest::header::{ACCEPT, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
12use reqwest::{Method, StatusCode, Url};
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15use serde_json::Value as JsonValue;
16
/// Media type stamped onto bridge responses by `replace_bridge_content_type`.
const BRIDGE_CONTENT_TYPE: &str = "application/json";
/// Media type that the `decode_sequence_response` codec methods treat as a CBOR sequence body.
const CBOR_SEQUENCE_CONTENT_TYPE: &str = "application/cbor-seq";
19
20pub trait Projection {
21 type Output;
22
23 fn selection_query(&self) -> SelectionQuery;
24
25 fn decode_one(&self, value: JsonValue) -> Result<Self::Output, CoolError>;
26
27 fn decode_many(&self, value: JsonValue) -> Result<Vec<Self::Output>, CoolError> {
28 match value {
29 JsonValue::Array(values) => values
30 .into_iter()
31 .map(|value| self.decode_one(value))
32 .collect(),
33 other => Err(CoolError::Internal(format!(
34 "projected list payload must be an array, got {other:?}"
35 ))),
36 }
37 }
38
39 fn decode_page(&self, value: JsonValue) -> Result<Page<Self::Output>, CoolError> {
40 let page = serde_json::from_value::<Page<JsonValue>>(value).map_err(|error| {
41 CoolError::Codec(format!("failed to decode projected page payload: {error}"))
42 })?;
43 let items = page
44 .items
45 .into_iter()
46 .map(|value| self.decode_one(value))
47 .collect::<Result<Vec<_>, _>>()?;
48 Ok(Page::new(items, page.page_info).with_total_count(page.total_count))
49 }
50}
51
52impl Projection for SelectionQuery {
53 type Output = JsonValue;
54
55 fn selection_query(&self) -> SelectionQuery {
56 self.clone()
57 }
58
59 fn decode_one(&self, value: JsonValue) -> Result<Self::Output, CoolError> {
60 Ok(value)
61 }
62}
63
/// Codec that `RuntimeHandle::new` uses when building its transport client.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum RuntimeCodecConfig {
    /// Build the client with `CborCodec` (the default).
    #[default]
    Cbor,
    /// Build the client with `JsonCodec`.
    Json,
}
70
/// Message envelope requested for the transport.
///
/// `RuntimeHandle::new` currently rejects every value other than `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum RuntimeEnvelopeConfig {
    /// No envelope (the default).
    #[default]
    None,
    /// COSE Sign1 envelope; rejected by `RuntimeHandle::new` as not implemented yet.
    CoseSign1,
}
77
/// Transport settings consumed by `RuntimeHandle::new`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct RuntimeTransportConfig {
    /// Codec the runtime client is built with.
    pub codec: RuntimeCodecConfig,
    /// Envelope requested for the transport.
    pub envelope: RuntimeEnvelopeConfig,
}
83
/// A [`CoolCodec`] that additionally knows how to negotiate and decode HTTP
/// responses based on their `Content-Type`.
pub trait HttpClientCodec: CoolCodec {
    /// `Accept` header value sent when the request does not override it.
    fn accept_header_value(&self) -> &'static str;

    /// `Accept` header value sent by requests that expect a sequence of
    /// values (see `CratestackClient::post_list`).
    fn sequence_accept_header_value(&self) -> &'static str;

    /// Decodes a single value from `body`, choosing the wire format from
    /// `content_type`.
    fn decode_response<T>(&self, content_type: &str, body: &[u8]) -> Result<T, CoolError>
    where
        T: DeserializeOwned;

    /// Decodes a list of values from `body`, choosing the wire format from
    /// `content_type`.
    fn decode_sequence_response<T>(
        &self,
        content_type: &str,
        body: &[u8],
    ) -> Result<Vec<T>, CoolError>
    where
        T: DeserializeOwned;
}
101
102impl HttpClientCodec for CborCodec {
103 fn accept_header_value(&self) -> &'static str {
104 "application/cbor, application/json"
105 }
106
107 fn sequence_accept_header_value(&self) -> &'static str {
108 "application/cbor-seq, application/cbor, application/json"
109 }
110
111 fn decode_response<T>(&self, content_type: &str, body: &[u8]) -> Result<T, CoolError>
112 where
113 T: DeserializeOwned,
114 {
115 if media_type_matches(content_type, CborCodec::CONTENT_TYPE) {
116 self.decode(body)
117 } else if media_type_matches(content_type, JsonCodec::CONTENT_TYPE) {
118 JsonCodec.decode(body)
119 } else {
120 Err(CoolError::Codec(format!(
121 "unsupported response Content-Type {content_type}"
122 )))
123 }
124 }
125
126 fn decode_sequence_response<T>(
127 &self,
128 content_type: &str,
129 body: &[u8],
130 ) -> Result<Vec<T>, CoolError>
131 where
132 T: DeserializeOwned,
133 {
134 if media_type_matches(content_type, CBOR_SEQUENCE_CONTENT_TYPE) {
135 decode_cbor_sequence(body)
136 } else if media_type_matches(content_type, CborCodec::CONTENT_TYPE) {
137 self.decode(body)
138 } else if media_type_matches(content_type, JsonCodec::CONTENT_TYPE) {
139 JsonCodec.decode(body)
140 } else {
141 Err(CoolError::Codec(format!(
142 "unsupported response Content-Type {content_type}"
143 )))
144 }
145 }
146}
147
148impl HttpClientCodec for JsonCodec {
149 fn accept_header_value(&self) -> &'static str {
150 "application/json, application/cbor"
151 }
152
153 fn sequence_accept_header_value(&self) -> &'static str {
154 "application/cbor-seq, application/json, application/cbor"
155 }
156
157 fn decode_response<T>(&self, content_type: &str, body: &[u8]) -> Result<T, CoolError>
158 where
159 T: DeserializeOwned,
160 {
161 if media_type_matches(content_type, JsonCodec::CONTENT_TYPE) {
162 self.decode(body)
163 } else if media_type_matches(content_type, CborCodec::CONTENT_TYPE) {
164 CborCodec.decode(body)
165 } else {
166 Err(CoolError::Codec(format!(
167 "unsupported response Content-Type {content_type}"
168 )))
169 }
170 }
171
172 fn decode_sequence_response<T>(
173 &self,
174 content_type: &str,
175 body: &[u8],
176 ) -> Result<Vec<T>, CoolError>
177 where
178 T: DeserializeOwned,
179 {
180 if media_type_matches(content_type, CBOR_SEQUENCE_CONTENT_TYPE) {
181 decode_cbor_sequence(body)
182 } else if media_type_matches(content_type, JsonCodec::CONTENT_TYPE) {
183 self.decode(body)
184 } else if media_type_matches(content_type, CborCodec::CONTENT_TYPE) {
185 CborCodec.decode(body)
186 } else {
187 Err(CoolError::Codec(format!(
188 "unsupported response Content-Type {content_type}"
189 )))
190 }
191 }
192}
193
/// One journal record, written after a response has been received
/// (see `CratestackClient::record_request`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RequestJournalEntry {
    /// HTTP method of the request.
    pub method: String,
    /// Request path as passed to the client (not the resolved URL).
    pub path: String,
    /// HTTP status code of the response.
    pub status_code: u16,
    /// Response `Content-Type`, when present and representable as a string.
    pub content_type: Option<String>,
    /// Time at which the entry was recorded.
    pub recorded_at: DateTime<Utc>,
}
202
/// Schema version assumed for persisted state that does not declare one.
fn default_schema_version() -> u32 {
    const INITIAL_SCHEMA_VERSION: u32 = 1;
    INITIAL_SCHEMA_VERSION
}
206
/// Client state persisted through a `ClientStateStore`.
///
/// Every field has a serde default, so serialized state that omits a field
/// still deserializes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedClientState {
    /// Schema version of the persisted layout; defaults to
    /// `default_schema_version()` when absent.
    #[serde(default = "default_schema_version")]
    pub schema_version: u32,
    /// Counter bumped by `ClientStateStore::append_request_journal`.
    #[serde(default)]
    pub state_version: u64,
    /// Recorded requests, in the order they were appended.
    #[serde(default)]
    pub request_journal: Vec<RequestJournalEntry>,
}
216
217impl Default for PersistedClientState {
218 fn default() -> Self {
219 Self {
220 schema_version: default_schema_version(),
221 state_version: 0,
222 request_journal: Vec::new(),
223 }
224 }
225}
226
227pub trait ClientStateStore: Send + Sync {
228 fn load(&self) -> Result<PersistedClientState, ClientError>;
229 fn save(&self, state: &PersistedClientState) -> Result<(), ClientError>;
230
231 fn append_request_journal(&self, entry: &RequestJournalEntry) -> Result<(), ClientError> {
232 let mut state = self.load()?;
233 state.request_journal.push(entry.clone());
234 state.state_version = state.state_version.saturating_add(1);
235 self.save(&state)
236 }
237}
238
/// `ClientStateStore` that keeps the state behind a mutex in process memory.
#[derive(Debug, Default)]
pub struct InMemoryStateStore {
    /// Current state; every access goes through the mutex.
    state: Mutex<PersistedClientState>,
}
243
244impl ClientStateStore for InMemoryStateStore {
245 fn load(&self) -> Result<PersistedClientState, ClientError> {
246 self.state
247 .lock()
248 .map_err(|error| ClientError::State(format!("failed to lock state store: {error}")))
249 .map(|state| state.clone())
250 }
251
252 fn save(&self, state: &PersistedClientState) -> Result<(), ClientError> {
253 let mut guard = self
254 .state
255 .lock()
256 .map_err(|error| ClientError::State(format!("failed to lock state store: {error}")))?;
257 *guard = state.clone();
258 Ok(())
259 }
260}
261
/// State store backed by a single JSON file on disk.
#[derive(Debug, Clone)]
pub struct JsonFileStateStore {
    /// Location of the state file.
    path: PathBuf,
}

impl JsonFileStateStore {
    /// Creates a store for the file at `path`; nothing is read or written yet.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        let path: PathBuf = path.into();
        JsonFileStateStore { path }
    }

    /// Location of the state file.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}
276
277impl ClientStateStore for JsonFileStateStore {
278 fn load(&self) -> Result<PersistedClientState, ClientError> {
279 match fs::read(&self.path) {
280 Ok(bytes) => serde_json::from_slice(&bytes).map_err(|error| {
281 ClientError::State(format!(
282 "failed to decode state file {}: {error}",
283 self.path.display()
284 ))
285 }),
286 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
287 Ok(PersistedClientState::default())
288 }
289 Err(error) => Err(ClientError::State(format!(
290 "failed to read state file {}: {error}",
291 self.path.display()
292 ))),
293 }
294 }
295
296 fn save(&self, state: &PersistedClientState) -> Result<(), ClientError> {
297 if let Some(parent) = self.path.parent() {
298 fs::create_dir_all(parent).map_err(|error| {
299 ClientError::State(format!(
300 "failed to create state directory {}: {error}",
301 parent.display()
302 ))
303 })?;
304 }
305 let bytes = serde_json::to_vec_pretty(state).map_err(|error| {
306 ClientError::State(format!(
307 "failed to encode state file {}: {error}",
308 self.path.display()
309 ))
310 })?;
311 fs::write(&self.path, bytes).map_err(|error| {
312 ClientError::State(format!(
313 "failed to write state file {}: {error}",
314 self.path.display()
315 ))
316 })
317 }
318}
319
320#[derive(Debug, Clone)]
321pub struct ClientConfig {
322 pub base_url: Url,
323}
324
325impl ClientConfig {
326 pub fn new(base_url: Url) -> Self {
327 Self { base_url }
328 }
329}
330
/// Errors produced by the client and its state stores.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// The HTTP layer (`reqwest`) failed.
    #[error("transport error: {0}")]
    Transport(#[from] reqwest::Error),
    /// Encoding or decoding a payload failed.
    #[error("codec error: {0}")]
    Codec(#[from] CoolError),
    /// Loading or saving client state failed.
    #[error("state error: {0}")]
    State(String),
    /// The response could not be interpreted (for example, a missing
    /// `Content-Type` header).
    #[error("invalid response: {0}")]
    InvalidResponse(String),
    /// The caller supplied something unusable (method, header, query, ...).
    #[error("bad input: {0}")]
    BadInput(String),
    /// The server answered with a non-2xx status.
    #[error("remote call failed with status {status}: {message}")]
    Remote {
        /// HTTP status of the response.
        status: StatusCode,
        /// Decoded error body, when the body could be decoded.
        error: Option<CoolErrorResponse>,
        /// Message from the error body, or a generic fallback.
        message: String,
    },
}
350
/// Borrowed `(name, value)` pair for a request header.
pub type HeaderPair<'a> = (&'a str, &'a str);
/// Borrowed `(key, value)` pair for a query-string parameter.
pub type QueryPair<'a> = (&'a str, &'a str);
353
/// Description of an outgoing request handed to a `RequestAuthorizer`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthorizationRequest {
    /// HTTP method of the request.
    pub method: String,
    /// Request path as passed to the client.
    pub path: String,
    /// Query string of the request, if any.
    pub canonical_query: Option<String>,
    /// `Content-Type` of the body; `None` when the request has no body.
    pub content_type: Option<String>,
    /// Encoded request body; empty when the request has no body.
    pub body: Vec<u8>,
    /// Output of `canonical_request_string` for the fields above.
    pub canonical_request: String,
}
363
/// Hook that produces extra headers (for example, credentials) for a request.
pub trait RequestAuthorizer: Send + Sync {
    /// Returns `(name, value)` header pairs to add to the described request.
    ///
    /// The client inserts the returned headers before the caller-supplied
    /// headers, so a caller-supplied header with the same name replaces them.
    fn authorize(
        &self,
        request: &AuthorizationRequest,
    ) -> Result<Vec<(String, String)>, ClientError>;
}
370
/// A single header in wire form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RuntimeHeader {
    /// Header name.
    pub name: String,
    /// Header value.
    pub value: String,
}
376
/// A request in wire form, as accepted by `RuntimeHandle::execute`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RuntimeRequestWire {
    /// HTTP method name; must parse as a valid method.
    pub method: String,
    /// Path resolved against the client's base URL.
    pub path: String,
    /// Query string to attach, if any.
    pub canonical_query: Option<String>,
    /// Headers to send with the request.
    pub headers: Vec<RuntimeHeader>,
    /// Request body; an empty body is sent as "no body".
    pub body: Vec<u8>,
}
385
/// A response in wire form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RuntimeResponseWire {
    /// HTTP status code.
    pub status_code: u16,
    /// Response headers whose values are representable as strings.
    pub headers: Vec<RuntimeHeader>,
    /// Raw response body.
    pub body: Vec<u8>,
}
392
/// Numeric category of a `RuntimeErrorWire`; each variant mirrors the
/// `ClientError` variant of the same name.
#[repr(u32)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum RuntimeErrorCode {
    /// See `ClientError::Transport`.
    Transport = 1,
    /// See `ClientError::Codec`.
    Codec = 2,
    /// See `ClientError::State`.
    State = 3,
    /// See `ClientError::InvalidResponse`.
    InvalidResponse = 4,
    /// See `ClientError::Remote`.
    Remote = 5,
    /// See `ClientError::BadInput`.
    BadInput = 6,
}
403
/// An error in wire form, produced from a `ClientError`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RuntimeErrorWire {
    /// Error category.
    pub code: RuntimeErrorCode,
    /// HTTP status associated with the error; set for codec and remote errors.
    pub http_status: Option<u16>,
    /// Human-readable description.
    pub message: String,
    /// Error code reported by the codec error or the remote error body.
    pub remote_code: Option<String>,
    /// JSON serialization of the remote error body, when one was decoded.
    pub remote_body: Option<Vec<u8>>,
}
412
413impl From<ClientError> for RuntimeErrorWire {
414 fn from(value: ClientError) -> Self {
415 match value {
416 ClientError::Transport(error) => Self {
417 code: RuntimeErrorCode::Transport,
418 http_status: None,
419 message: error.to_string(),
420 remote_code: None,
421 remote_body: None,
422 },
423 ClientError::Codec(error) => Self {
424 code: RuntimeErrorCode::Codec,
425 http_status: Some(error.status_code().as_u16()),
426 message: error.to_string(),
427 remote_code: Some(error.code().to_owned()),
428 remote_body: None,
429 },
430 ClientError::State(message) => Self {
431 code: RuntimeErrorCode::State,
432 http_status: None,
433 message,
434 remote_code: None,
435 remote_body: None,
436 },
437 ClientError::InvalidResponse(message) => Self {
438 code: RuntimeErrorCode::InvalidResponse,
439 http_status: None,
440 message,
441 remote_code: None,
442 remote_body: None,
443 },
444 ClientError::BadInput(message) => Self {
445 code: RuntimeErrorCode::BadInput,
446 http_status: None,
447 message,
448 remote_code: None,
449 remote_body: None,
450 },
451 ClientError::Remote {
452 status,
453 error,
454 message,
455 } => Self {
456 code: RuntimeErrorCode::Remote,
457 http_status: Some(status.as_u16()),
458 remote_code: error.as_ref().map(|value| value.code.clone()),
459 remote_body: error
460 .as_ref()
461 .and_then(|value| serde_json::to_vec(value).ok()),
462 message,
463 },
464 }
465 }
466}
467
/// Selects the `ClientStateStore` that `RuntimeHandle::new` creates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeStateStoreConfig {
    /// Use an `InMemoryStateStore`.
    InMemory,
    /// Use a `JsonFileStateStore` at `path`.
    JsonFile { path: PathBuf },
}
473
/// Configuration accepted by `RuntimeHandle::new`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimeConfigWire {
    /// Base URL as text; it is parsed (and may be rejected) by `RuntimeHandle::new`.
    pub base_url: String,
    /// Which state store to create.
    pub state_store: RuntimeStateStoreConfig,
    /// Codec and envelope settings.
    pub transport: RuntimeTransportConfig,
}
480
/// Blocking facade over the async client: owns a Tokio runtime and drives
/// requests to completion on it.
pub struct RuntimeHandle {
    /// Runtime used by `execute` to block on the request future.
    runtime: tokio::runtime::Runtime,
    /// Codec-specific client chosen at construction time.
    client: RuntimeTransportClient,
}
485
/// A `CratestackClient` for one of the supported codecs.
enum RuntimeTransportClient {
    /// Client that encodes request bodies as CBOR.
    Cbor(CratestackClient<CborCodec>),
    /// Client that encodes request bodies as JSON.
    Json(CratestackClient<JsonCodec>),
}
490
491impl RuntimeHandle {
492 pub fn new(config: RuntimeConfigWire) -> Result<Self, RuntimeErrorWire> {
493 let base_url = Url::parse(&config.base_url).map_err(|error| RuntimeErrorWire {
494 code: RuntimeErrorCode::BadInput,
495 http_status: None,
496 message: format!("invalid base URL '{}': {error}", config.base_url),
497 remote_code: None,
498 remote_body: None,
499 })?;
500 let state_store: Arc<dyn ClientStateStore> = match config.state_store {
501 RuntimeStateStoreConfig::InMemory => Arc::new(InMemoryStateStore::default()),
502 RuntimeStateStoreConfig::JsonFile { path } => Arc::new(JsonFileStateStore::new(path)),
503 };
504 if config.transport.envelope != RuntimeEnvelopeConfig::None {
505 return Err(RuntimeErrorWire {
506 code: RuntimeErrorCode::BadInput,
507 http_status: None,
508 message: "COSE envelope support is not implemented yet".to_owned(),
509 remote_code: None,
510 remote_body: None,
511 });
512 }
513 let client = match config.transport.codec {
514 RuntimeCodecConfig::Cbor => RuntimeTransportClient::Cbor(
515 CratestackClient::new(ClientConfig::new(base_url.clone()), CborCodec)
516 .with_state_store(state_store.clone()),
517 ),
518 RuntimeCodecConfig::Json => RuntimeTransportClient::Json(
519 CratestackClient::new(ClientConfig::new(base_url), JsonCodec)
520 .with_state_store(state_store),
521 ),
522 };
523 let runtime = tokio::runtime::Builder::new_current_thread()
524 .enable_all()
525 .build()
526 .map_err(|error| RuntimeErrorWire {
527 code: RuntimeErrorCode::State,
528 http_status: None,
529 message: format!("failed to build runtime: {error}"),
530 remote_code: None,
531 remote_body: None,
532 })?;
533
534 Ok(Self { runtime, client })
535 }
536
537 pub fn execute(
538 &self,
539 request: RuntimeRequestWire,
540 ) -> Result<RuntimeResponseWire, RuntimeErrorWire> {
541 self.runtime
542 .block_on(self.client.execute_raw(request))
543 .map_err(RuntimeErrorWire::from)
544 }
545
546 pub fn state(&self) -> Result<PersistedClientState, ClientError> {
547 self.client.state()
548 }
549}
550
551fn replace_bridge_content_type(headers: &mut Vec<RuntimeHeader>) {
552 headers.retain(|header| !header.name.eq_ignore_ascii_case("content-type"));
553 headers.push(RuntimeHeader {
554 name: "content-type".to_owned(),
555 value: BRIDGE_CONTENT_TYPE.to_owned(),
556 });
557}
558
559impl RuntimeTransportClient {
560 async fn execute_raw(
561 &self,
562 request: RuntimeRequestWire,
563 ) -> Result<RuntimeResponseWire, ClientError> {
564 let request = self.bridge_request_to_transport(request)?;
565 match self {
566 Self::Cbor(client) => client.execute_raw_transport(request).await,
567 Self::Json(client) => client.execute_raw_transport(request).await,
568 }
569 .and_then(|response| self.transport_response_to_bridge(response))
570 }
571
572 fn bridge_request_to_transport(
573 &self,
574 request: RuntimeRequestWire,
575 ) -> Result<RuntimeRequestWire, ClientError> {
576 if request.body.is_empty() {
577 return Ok(request);
578 }
579
580 let value: JsonValue = serde_json::from_slice(&request.body).map_err(|error| {
581 ClientError::BadInput(format!("invalid bridge payload JSON: {error}"))
582 })?;
583 let body = match self {
584 Self::Cbor(_) => CborCodec.encode(&value)?,
585 Self::Json(_) => JsonCodec.encode(&value)?,
586 };
587
588 Ok(RuntimeRequestWire { body, ..request })
589 }
590
591 fn transport_response_to_bridge(
592 &self,
593 mut response: RuntimeResponseWire,
594 ) -> Result<RuntimeResponseWire, ClientError> {
595 if response.body.is_empty() {
596 replace_bridge_content_type(&mut response.headers);
597 return Ok(response);
598 }
599
600 let value = match self {
601 Self::Cbor(_) => CborCodec.decode::<JsonValue>(&response.body)?,
602 Self::Json(_) => JsonCodec.decode::<JsonValue>(&response.body)?,
603 };
604
605 response.body = serde_json::to_vec(&value).map_err(|error| {
606 ClientError::InvalidResponse(format!("failed to encode bridge payload JSON: {error}"))
607 })?;
608 replace_bridge_content_type(&mut response.headers);
609 Ok(response)
610 }
611
612 fn state(&self) -> Result<PersistedClientState, ClientError> {
613 match self {
614 Self::Cbor(client) => client.state(),
615 Self::Json(client) => client.state(),
616 }
617 }
618}
619
/// HTTP client that encodes requests with codec `C` and journals every
/// completed request into a `ClientStateStore`.
#[derive(Clone)]
pub struct CratestackClient<C = CborCodec> {
    /// Underlying HTTP client.
    http: reqwest::Client,
    /// Static configuration (base URL).
    config: ClientConfig,
    /// Codec used for request bodies and response negotiation.
    codec: C,
    /// Where the request journal is stored.
    state_store: Arc<dyn ClientStateStore>,
    /// Optional hook that contributes extra headers to each request.
    request_authorizer: Option<Arc<dyn RequestAuthorizer>>,
}
628
629impl CratestackClient<CborCodec> {
630 pub fn cbor(config: ClientConfig) -> Self {
631 Self::new(config, CborCodec)
632 }
633}
634
635impl<C> CratestackClient<C>
636where
637 C: HttpClientCodec,
638{
639 pub fn new(config: ClientConfig, codec: C) -> Self {
640 Self {
641 http: reqwest::Client::new(),
642 config,
643 codec,
644 state_store: Arc::new(InMemoryStateStore::default()),
645 request_authorizer: None,
646 }
647 }
648
649 pub fn with_http_client(config: ClientConfig, codec: C, http: reqwest::Client) -> Self {
650 Self {
651 http,
652 config,
653 codec,
654 state_store: Arc::new(InMemoryStateStore::default()),
655 request_authorizer: None,
656 }
657 }
658
659 pub fn with_state_store(mut self, state_store: Arc<dyn ClientStateStore>) -> Self {
660 self.state_store = state_store;
661 self
662 }
663
664 pub fn with_optional_state_store(self, state_store: Option<Arc<dyn ClientStateStore>>) -> Self {
665 match state_store {
666 Some(state_store) => self.with_state_store(state_store),
667 None => self,
668 }
669 }
670
671 pub fn with_request_authorizer(
672 mut self,
673 request_authorizer: Arc<dyn RequestAuthorizer>,
674 ) -> Self {
675 self.request_authorizer = Some(request_authorizer);
676 self
677 }
678
679 pub fn state(&self) -> Result<PersistedClientState, ClientError> {
680 self.state_store.load()
681 }
682
683 pub async fn get<Output>(
684 &self,
685 path: &str,
686 query: &[QueryPair<'_>],
687 headers: &[HeaderPair<'_>],
688 ) -> Result<Output, ClientError>
689 where
690 Output: DeserializeOwned,
691 {
692 let response = self
693 .request_raw(Method::GET, path, None, query, headers)
694 .await?;
695 decode_typed_response(&self.codec, &response)
696 }
697
698 pub async fn get_view<P>(
699 &self,
700 path: &str,
701 projection: &P,
702 headers: &[HeaderPair<'_>],
703 ) -> Result<P::Output, ClientError>
704 where
705 P: Projection,
706 {
707 let selection = projection.selection_query();
708 let canonical_query = canonical_query_from_selection(&selection, &[])?;
709 let response = self
710 .request_raw_with_query_and_accept(
711 Method::GET,
712 path,
713 None,
714 canonical_query.as_deref(),
715 headers,
716 Some(JsonCodec::CONTENT_TYPE),
717 )
718 .await?;
719 let value = decode_json_value_response(&JsonCodec, &response)?;
720 projection.decode_one(value).map_err(ClientError::from)
721 }
722
723 pub async fn list_view<P>(
724 &self,
725 path: &str,
726 projection: &P,
727 extra_query: &[QueryPair<'_>],
728 headers: &[HeaderPair<'_>],
729 ) -> Result<Vec<P::Output>, ClientError>
730 where
731 P: Projection,
732 {
733 let selection = projection.selection_query();
734 let canonical_query = canonical_query_from_selection(&selection, extra_query)?;
735 let response = self
736 .request_raw_with_query_and_accept(
737 Method::GET,
738 path,
739 None,
740 canonical_query.as_deref(),
741 headers,
742 Some(JsonCodec::CONTENT_TYPE),
743 )
744 .await?;
745 let value = decode_json_value_response(&JsonCodec, &response)?;
746 projection.decode_many(value).map_err(ClientError::from)
747 }
748
749 pub async fn list_view_paged<P>(
750 &self,
751 path: &str,
752 projection: &P,
753 extra_query: &[QueryPair<'_>],
754 headers: &[HeaderPair<'_>],
755 ) -> Result<Page<P::Output>, ClientError>
756 where
757 P: Projection,
758 {
759 let selection = projection.selection_query();
760 let canonical_query = canonical_query_from_selection(&selection, extra_query)?;
761 let response = self
762 .request_raw_with_query_and_accept(
763 Method::GET,
764 path,
765 None,
766 canonical_query.as_deref(),
767 headers,
768 Some(JsonCodec::CONTENT_TYPE),
769 )
770 .await?;
771 let value = decode_json_value_response(&JsonCodec, &response)?;
772 projection.decode_page(value).map_err(ClientError::from)
773 }
774
775 pub async fn post<Input, Output>(
776 &self,
777 path: &str,
778 input: &Input,
779 headers: &[HeaderPair<'_>],
780 ) -> Result<Output, ClientError>
781 where
782 Input: Serialize,
783 Output: DeserializeOwned,
784 {
785 let body = self.codec.encode(input)?;
786 let response = self
787 .request_raw(Method::POST, path, Some(body), &[], headers)
788 .await?;
789 decode_typed_response(&self.codec, &response)
790 }
791
792 pub async fn post_list<Input, Output>(
793 &self,
794 path: &str,
795 input: &Input,
796 headers: &[HeaderPair<'_>],
797 ) -> Result<Vec<Output>, ClientError>
798 where
799 Input: Serialize,
800 Output: DeserializeOwned,
801 {
802 let body = self.codec.encode(input)?;
803 let response = self
804 .request_raw_with_query_and_accept(
805 Method::POST,
806 path,
807 Some(body),
808 None,
809 headers,
810 Some(self.codec.sequence_accept_header_value()),
811 )
812 .await?;
813 decode_sequence_response(&self.codec, &response)
814 }
815
816 pub async fn patch<Input, Output>(
817 &self,
818 path: &str,
819 input: &Input,
820 headers: &[HeaderPair<'_>],
821 ) -> Result<Output, ClientError>
822 where
823 Input: Serialize,
824 Output: DeserializeOwned,
825 {
826 let body = self.codec.encode(input)?;
827 let response = self
828 .request_raw(Method::PATCH, path, Some(body), &[], headers)
829 .await?;
830 decode_typed_response(&self.codec, &response)
831 }
832
833 pub async fn delete<Output>(
834 &self,
835 path: &str,
836 headers: &[HeaderPair<'_>],
837 ) -> Result<Output, ClientError>
838 where
839 Output: DeserializeOwned,
840 {
841 let response = self
842 .request_raw(Method::DELETE, path, None, &[], headers)
843 .await?;
844 decode_typed_response(&self.codec, &response)
845 }
846
847 pub async fn execute_raw_transport(
848 &self,
849 request: RuntimeRequestWire,
850 ) -> Result<RuntimeResponseWire, ClientError> {
851 let method = Method::from_bytes(request.method.as_bytes()).map_err(|error| {
852 ClientError::BadInput(format!("invalid HTTP method '{}': {error}", request.method))
853 })?;
854 let header_pairs = request
855 .headers
856 .iter()
857 .map(|header| (header.name.as_str(), header.value.as_str()))
858 .collect::<Vec<_>>();
859 self.request_raw_with_query(
860 method,
861 &request.path,
862 if request.body.is_empty() {
863 None
864 } else {
865 Some(request.body)
866 },
867 request.canonical_query.as_deref(),
868 &header_pairs,
869 )
870 .await
871 }
872
873 async fn request_raw(
874 &self,
875 method: Method,
876 path: &str,
877 body: Option<Vec<u8>>,
878 query: &[QueryPair<'_>],
879 headers: &[HeaderPair<'_>],
880 ) -> Result<RuntimeResponseWire, ClientError> {
881 let canonical_query =
882 if query.is_empty() {
883 None
884 } else {
885 Some(serde_urlencoded::to_string(query).map_err(|error| {
886 ClientError::BadInput(format!("invalid query pairs: {error}"))
887 })?)
888 };
889 self.request_raw_with_query(method, path, body, canonical_query.as_deref(), headers)
890 .await
891 }
892
893 async fn request_raw_with_query_and_accept(
894 &self,
895 method: Method,
896 path: &str,
897 body: Option<Vec<u8>>,
898 canonical_query: Option<&str>,
899 headers: &[HeaderPair<'_>],
900 accept_override: Option<&'static str>,
901 ) -> Result<RuntimeResponseWire, ClientError> {
902 let url = build_url(&self.config.base_url, path, canonical_query)?;
903 let mut header_map = HeaderMap::new();
904 header_map.insert(
905 ACCEPT,
906 HeaderValue::from_static(
907 accept_override.unwrap_or_else(|| self.codec.accept_header_value()),
908 ),
909 );
910 let content_type = if body.is_some() {
911 header_map.insert(CONTENT_TYPE, HeaderValue::from_static(C::CONTENT_TYPE));
912 Some(C::CONTENT_TYPE.to_owned())
913 } else {
914 None
915 };
916 if let Some(authorizer) = &self.request_authorizer {
917 let canonical_request = canonical_request_string(
918 method.as_str(),
919 path,
920 canonical_query,
921 content_type.as_deref(),
922 body.as_deref().unwrap_or(&[]),
923 );
924 let authorization_request = AuthorizationRequest {
925 method: method.as_str().to_owned(),
926 path: path.to_owned(),
927 canonical_query: canonical_query.map(str::to_owned),
928 content_type: content_type.clone(),
929 body: body.clone().unwrap_or_default(),
930 canonical_request,
931 };
932 for (name, value) in authorizer.authorize(&authorization_request)? {
933 header_map.insert(
934 HeaderName::from_bytes(name.as_bytes()).map_err(|error| {
935 ClientError::BadInput(format!("invalid header name '{name}': {error}"))
936 })?,
937 HeaderValue::from_str(&value).map_err(|error| {
938 ClientError::BadInput(format!("invalid header value for '{name}': {error}"))
939 })?,
940 );
941 }
942 }
943 for (name, value) in headers {
944 header_map.insert(
945 HeaderName::from_bytes(name.as_bytes()).map_err(|error| {
946 ClientError::BadInput(format!("invalid header name '{name}': {error}"))
947 })?,
948 HeaderValue::from_str(value).map_err(|error| {
949 ClientError::BadInput(format!("invalid header value for '{name}': {error}"))
950 })?,
951 );
952 }
953
954 let mut request = self.http.request(method.clone(), url).headers(header_map);
955 if let Some(body) = body {
956 request = request.body(body);
957 }
958
959 let response = request.send().await?;
960 let status = response.status();
961 let headers = response.headers().clone();
962 let bytes = response.bytes().await?;
963 let response_wire = RuntimeResponseWire {
964 status_code: status.as_u16(),
965 headers: headers_to_runtime(&headers),
966 body: bytes.to_vec(),
967 };
968
969 self.record_request(method.as_str(), path, status, &headers)?;
970
971 Ok(response_wire)
972 }
973
974 async fn request_raw_with_query(
975 &self,
976 method: Method,
977 path: &str,
978 body: Option<Vec<u8>>,
979 canonical_query: Option<&str>,
980 headers: &[HeaderPair<'_>],
981 ) -> Result<RuntimeResponseWire, ClientError> {
982 self.request_raw_with_query_and_accept(method, path, body, canonical_query, headers, None)
983 .await
984 }
985
986 fn record_request(
987 &self,
988 method: &str,
989 path: &str,
990 status: StatusCode,
991 headers: &HeaderMap,
992 ) -> Result<(), ClientError> {
993 self.state_store
994 .append_request_journal(&RequestJournalEntry {
995 method: method.to_owned(),
996 path: path.to_owned(),
997 status_code: status.as_u16(),
998 content_type: headers
999 .get(CONTENT_TYPE)
1000 .and_then(|value| value.to_str().ok())
1001 .map(ToOwned::to_owned),
1002 recorded_at: Utc::now(),
1003 })
1004 }
1005}
1006
1007fn decode_typed_response<C, Output>(
1008 codec: &C,
1009 response: &RuntimeResponseWire,
1010) -> Result<Output, ClientError>
1011where
1012 C: HttpClientCodec,
1013 Output: DeserializeOwned,
1014{
1015 let content_type = response
1016 .headers
1017 .iter()
1018 .find(|header| header.name.eq_ignore_ascii_case("content-type"))
1019 .map(|header| header.value.as_str())
1020 .ok_or_else(|| {
1021 ClientError::InvalidResponse("response is missing Content-Type header".to_owned())
1022 })?;
1023
1024 if (200..=299).contains(&response.status_code) {
1025 codec
1026 .decode_response::<Output>(content_type, &response.body)
1027 .map_err(ClientError::from)
1028 } else {
1029 let error = codec
1030 .decode_response::<CoolErrorResponse>(content_type, &response.body)
1031 .ok();
1032 let message = error
1033 .as_ref()
1034 .map(|value| value.message.clone())
1035 .unwrap_or_else(|| {
1036 format!("unexpected error body for status {}", response.status_code)
1037 });
1038 Err(ClientError::Remote {
1039 status: StatusCode::from_u16(response.status_code)
1040 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
1041 error,
1042 message,
1043 })
1044 }
1045}
1046
1047fn decode_json_value_response<C>(
1048 codec: &C,
1049 response: &RuntimeResponseWire,
1050) -> Result<JsonValue, ClientError>
1051where
1052 C: HttpClientCodec,
1053{
1054 decode_typed_response(codec, response)
1055}
1056
1057fn decode_sequence_response<C, Output>(
1058 codec: &C,
1059 response: &RuntimeResponseWire,
1060) -> Result<Vec<Output>, ClientError>
1061where
1062 C: HttpClientCodec,
1063 Output: DeserializeOwned,
1064{
1065 let content_type = response
1066 .headers
1067 .iter()
1068 .find(|header| header.name.eq_ignore_ascii_case("content-type"))
1069 .map(|header| header.value.as_str())
1070 .ok_or_else(|| {
1071 ClientError::InvalidResponse("response is missing Content-Type header".to_owned())
1072 })?;
1073
1074 if (200..=299).contains(&response.status_code) {
1075 codec
1076 .decode_sequence_response::<Output>(content_type, &response.body)
1077 .map_err(ClientError::from)
1078 } else {
1079 let error = if media_type_matches(content_type, CBOR_SEQUENCE_CONTENT_TYPE) {
1080 decode_cbor_sequence::<CoolErrorResponse>(&response.body)
1081 .ok()
1082 .and_then(|mut values| {
1083 if values.len() == 1 {
1084 values.pop()
1085 } else {
1086 None
1087 }
1088 })
1089 } else {
1090 codec
1091 .decode_response::<CoolErrorResponse>(content_type, &response.body)
1092 .ok()
1093 };
1094 let message = error
1095 .as_ref()
1096 .map(|value| value.message.clone())
1097 .unwrap_or_else(|| {
1098 format!("unexpected error body for status {}", response.status_code)
1099 });
1100 Err(ClientError::Remote {
1101 status: StatusCode::from_u16(response.status_code)
1102 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
1103 error,
1104 message,
1105 })
1106 }
1107}
1108
1109fn canonical_query_from_selection(
1110 selection: &SelectionQuery,
1111 extra_query: &[QueryPair<'_>],
1112) -> Result<Option<String>, ClientError> {
1113 let mut query: Vec<(String, String)> = Vec::new();
1114 if !selection.fields.is_empty() {
1115 query.push(("fields".to_owned(), selection.fields.join(",")));
1116 }
1117 if !selection.includes.is_empty() {
1118 query.push(("include".to_owned(), selection.includes.join(",")));
1119 }
1120 for (include, fields) in &selection.include_fields {
1121 if !fields.is_empty() {
1122 query.push((format!("includeFields[{include}]"), fields.join(",")));
1123 }
1124 }
1125 for (key, value) in extra_query {
1126 if *key == "fields" || *key == "include" || key.starts_with("includeFields[") {
1127 return Err(ClientError::BadInput(format!(
1128 "projection query parameter '{key}' must come from SelectionQuery, not extra_query"
1129 )));
1130 }
1131 query.push(((*key).to_owned(), (*value).to_owned()));
1132 }
1133 if query.is_empty() {
1134 return Ok(None);
1135 }
1136 serde_urlencoded::to_string(&query)
1137 .map(Some)
1138 .map_err(|error| ClientError::BadInput(format!("invalid selection query: {error}")))
1139}
1140
1141fn headers_to_runtime(headers: &HeaderMap) -> Vec<RuntimeHeader> {
1142 headers
1143 .iter()
1144 .filter_map(|(name, value)| {
1145 value.to_str().ok().map(|value| RuntimeHeader {
1146 name: name.as_str().to_owned(),
1147 value: value.to_owned(),
1148 })
1149 })
1150 .collect()
1151}
1152
1153fn build_url(
1154 base_url: &Url,
1155 path: &str,
1156 canonical_query: Option<&str>,
1157) -> Result<Url, ClientError> {
1158 let mut base = base_url.clone();
1159 if !base.path().ends_with('/') {
1160 let next_path = format!("{}/", base.path());
1161 base.set_path(&next_path);
1162 }
1163 let mut url = base.join(path.trim_start_matches('/')).map_err(|error| {
1164 ClientError::InvalidResponse(format!(
1165 "failed to resolve path '{path}' against {}: {error}",
1166 base
1167 ))
1168 })?;
1169 match canonical_query {
1170 Some(query) if !query.is_empty() => url.set_query(Some(query)),
1171 _ => url.set_query(None),
1172 }
1173 Ok(url)
1174}
1175
/// Reports whether the media type in `candidate` equals `expected`.
///
/// `candidate` is typically a raw `Content-Type` header value: everything from
/// the first `;` onwards (parameters such as `charset`) is ignored and the
/// remaining text is trimmed of surrounding whitespace. `expected` is compared
/// as given.
///
/// The comparison ignores ASCII case because HTTP media type and subtype
/// tokens are case-insensitive, so `Application/CBOR-Seq` matches
/// `application/cbor-seq`.
fn media_type_matches(candidate: &str, expected: &str) -> bool {
    // `split` always yields at least one item; the fallback only satisfies the type.
    let essence = candidate.split(';').next().unwrap_or(candidate).trim();
    essence.eq_ignore_ascii_case(expected)
}
1179
1180fn decode_cbor_sequence<T>(bytes: &[u8]) -> Result<Vec<T>, CoolError>
1181where
1182 T: DeserializeOwned,
1183{
1184 let mut values = Vec::new();
1185 let mut offset = 0usize;
1186 while offset < bytes.len() {
1187 let mut deserializer = minicbor_serde::Deserializer::new(&bytes[offset..]);
1188 values.push(T::deserialize(&mut deserializer).map_err(|error| {
1189 CoolError::Codec(format!("failed to decode CBOR sequence body: {error}"))
1190 })?);
1191 let consumed = deserializer.decoder().position();
1192 if consumed == 0 {
1193 return Err(CoolError::Codec(
1194 "failed to decode CBOR sequence body: decoder made no progress".to_owned(),
1195 ));
1196 }
1197 offset += consumed;
1198 }
1199 Ok(values)
1200}
1201
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::{
        ClientStateStore, JsonFileStateStore, PersistedClientState, RequestJournalEntry,
        RuntimeCodecConfig, RuntimeConfigWire, RuntimeEnvelopeConfig, RuntimeErrorCode,
        RuntimeHandle, RuntimeRequestWire, RuntimeStateStoreConfig, RuntimeTransportConfig,
    };

    /// Returns the location of `file_name` inside the `tmp/client-rust-tests`
    /// directory two levels above this crate's manifest directory.
    fn project_tmp_path(file_name: &str) -> PathBuf {
        let mut location = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        location.push("../../tmp/client-rust-tests");
        location.push(file_name);
        location
    }

    /// Appending one journal entry to a fresh file store must be visible on reload.
    #[test]
    fn json_file_store_round_trips_state_under_project_tmp() {
        let state_file = project_tmp_path("state-store-unit.json");
        // Remove leftovers from a previous run so the counts below start from scratch.
        if state_file.exists() {
            std::fs::remove_file(&state_file).expect("existing tmp file should be removable");
        }

        let store = JsonFileStateStore::new(&state_file);
        let entry = RequestJournalEntry {
            method: "GET".to_owned(),
            path: "/posts".to_owned(),
            status_code: 200,
            content_type: Some("application/cbor".to_owned()),
            recorded_at: chrono::Utc::now(),
        };
        store
            .append_request_journal(&entry)
            .expect("journal entry should append");

        let reloaded = store.load().expect("state should load");
        assert_eq!(reloaded.schema_version, 1);
        assert_eq!(reloaded.state_version, 1);
        assert_eq!(reloaded.request_journal.len(), 1);

        std::fs::remove_file(&state_file).expect("tmp file should be removable");
    }

    /// A syntactically invalid HTTP method must be reported as `BadInput`.
    #[test]
    fn runtime_handle_rejects_invalid_method_without_running_http() {
        let config = RuntimeConfigWire {
            base_url: "http://127.0.0.1:1/".to_owned(),
            state_store: RuntimeStateStoreConfig::InMemory,
            transport: RuntimeTransportConfig::default(),
        };
        let handle = RuntimeHandle::new(config).expect("runtime handle should build");

        let request = RuntimeRequestWire {
            method: "BAD METHOD".to_owned(),
            path: "/posts".to_owned(),
            canonical_query: None,
            headers: Vec::new(),
            body: Vec::new(),
        };
        let error = handle
            .execute(request)
            .expect_err("invalid method should fail before transport");

        assert_eq!(error.code as u32, RuntimeErrorCode::BadInput as u32);
    }

    /// A payload without `state_version` must still decode, with the field falling back to 0.
    #[test]
    fn persisted_state_defaults_missing_state_version() {
        let legacy_payload = r#"{"schema_version":1,"request_journal":[]}"#;
        let state = serde_json::from_str::<PersistedClientState>(legacy_payload)
            .expect("legacy state should decode");

        assert_eq!(state.state_version, 0);
    }

    /// Requesting the `CoseSign1` envelope must make handle construction fail with `BadInput`.
    #[test]
    fn runtime_handle_rejects_unsupported_envelope_config() {
        let transport = RuntimeTransportConfig {
            codec: RuntimeCodecConfig::Cbor,
            envelope: RuntimeEnvelopeConfig::CoseSign1,
        };
        let outcome = RuntimeHandle::new(RuntimeConfigWire {
            base_url: "http://127.0.0.1:1/".to_owned(),
            state_store: RuntimeStateStoreConfig::InMemory,
            transport,
        });

        // `.err()` discards the success value, so the handle type needs no `Debug` impl here.
        let error = outcome.err().expect("unsupported envelope should fail");

        assert_eq!(error.code, RuntimeErrorCode::BadInput);
    }
}