Skip to main content

cratestack_client_rust/
lib.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex};
4
5use chrono::{DateTime, Utc};
6pub use cratestack_codec_cbor::CborCodec;
7pub use cratestack_codec_json::JsonCodec;
8use cratestack_core::{
9    CoolCodec, CoolError, CoolErrorResponse, Page, SelectionQuery, canonical_request_string,
10};
11use reqwest::header::{ACCEPT, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
12use reqwest::{Method, StatusCode, Url};
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15use serde_json::Value as JsonValue;
16
17const BRIDGE_CONTENT_TYPE: &str = "application/json";
18const CBOR_SEQUENCE_CONTENT_TYPE: &str = "application/cbor-seq";
19
20pub trait Projection {
21    type Output;
22
23    fn selection_query(&self) -> SelectionQuery;
24
25    fn decode_one(&self, value: JsonValue) -> Result<Self::Output, CoolError>;
26
27    fn decode_many(&self, value: JsonValue) -> Result<Vec<Self::Output>, CoolError> {
28        match value {
29            JsonValue::Array(values) => values
30                .into_iter()
31                .map(|value| self.decode_one(value))
32                .collect(),
33            other => Err(CoolError::Internal(format!(
34                "projected list payload must be an array, got {other:?}"
35            ))),
36        }
37    }
38
39    fn decode_page(&self, value: JsonValue) -> Result<Page<Self::Output>, CoolError> {
40        let page = serde_json::from_value::<Page<JsonValue>>(value).map_err(|error| {
41            CoolError::Codec(format!("failed to decode projected page payload: {error}"))
42        })?;
43        let items = page
44            .items
45            .into_iter()
46            .map(|value| self.decode_one(value))
47            .collect::<Result<Vec<_>, _>>()?;
48        Ok(Page::new(items, page.page_info).with_total_count(page.total_count))
49    }
50}
51
52impl Projection for SelectionQuery {
53    type Output = JsonValue;
54
55    fn selection_query(&self) -> SelectionQuery {
56        self.clone()
57    }
58
59    fn decode_one(&self, value: JsonValue) -> Result<Self::Output, CoolError> {
60        Ok(value)
61    }
62}
63
64#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
65pub enum RuntimeCodecConfig {
66    #[default]
67    Cbor,
68    Json,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
72pub enum RuntimeEnvelopeConfig {
73    #[default]
74    None,
75    CoseSign1,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
79pub struct RuntimeTransportConfig {
80    pub codec: RuntimeCodecConfig,
81    pub envelope: RuntimeEnvelopeConfig,
82}
83
84pub trait HttpClientCodec: CoolCodec {
85    fn accept_header_value(&self) -> &'static str;
86
87    fn sequence_accept_header_value(&self) -> &'static str;
88
89    fn decode_response<T>(&self, content_type: &str, body: &[u8]) -> Result<T, CoolError>
90    where
91        T: DeserializeOwned;
92
93    fn decode_sequence_response<T>(
94        &self,
95        content_type: &str,
96        body: &[u8],
97    ) -> Result<Vec<T>, CoolError>
98    where
99        T: DeserializeOwned;
100}
101
102impl HttpClientCodec for CborCodec {
103    fn accept_header_value(&self) -> &'static str {
104        "application/cbor, application/json"
105    }
106
107    fn sequence_accept_header_value(&self) -> &'static str {
108        "application/cbor-seq, application/cbor, application/json"
109    }
110
111    fn decode_response<T>(&self, content_type: &str, body: &[u8]) -> Result<T, CoolError>
112    where
113        T: DeserializeOwned,
114    {
115        if media_type_matches(content_type, CborCodec::CONTENT_TYPE) {
116            self.decode(body)
117        } else if media_type_matches(content_type, JsonCodec::CONTENT_TYPE) {
118            JsonCodec.decode(body)
119        } else {
120            Err(CoolError::Codec(format!(
121                "unsupported response Content-Type {content_type}"
122            )))
123        }
124    }
125
126    fn decode_sequence_response<T>(
127        &self,
128        content_type: &str,
129        body: &[u8],
130    ) -> Result<Vec<T>, CoolError>
131    where
132        T: DeserializeOwned,
133    {
134        if media_type_matches(content_type, CBOR_SEQUENCE_CONTENT_TYPE) {
135            decode_cbor_sequence(body)
136        } else if media_type_matches(content_type, CborCodec::CONTENT_TYPE) {
137            self.decode(body)
138        } else if media_type_matches(content_type, JsonCodec::CONTENT_TYPE) {
139            JsonCodec.decode(body)
140        } else {
141            Err(CoolError::Codec(format!(
142                "unsupported response Content-Type {content_type}"
143            )))
144        }
145    }
146}
147
148impl HttpClientCodec for JsonCodec {
149    fn accept_header_value(&self) -> &'static str {
150        "application/json, application/cbor"
151    }
152
153    fn sequence_accept_header_value(&self) -> &'static str {
154        "application/cbor-seq, application/json, application/cbor"
155    }
156
157    fn decode_response<T>(&self, content_type: &str, body: &[u8]) -> Result<T, CoolError>
158    where
159        T: DeserializeOwned,
160    {
161        if media_type_matches(content_type, JsonCodec::CONTENT_TYPE) {
162            self.decode(body)
163        } else if media_type_matches(content_type, CborCodec::CONTENT_TYPE) {
164            CborCodec.decode(body)
165        } else {
166            Err(CoolError::Codec(format!(
167                "unsupported response Content-Type {content_type}"
168            )))
169        }
170    }
171
172    fn decode_sequence_response<T>(
173        &self,
174        content_type: &str,
175        body: &[u8],
176    ) -> Result<Vec<T>, CoolError>
177    where
178        T: DeserializeOwned,
179    {
180        if media_type_matches(content_type, CBOR_SEQUENCE_CONTENT_TYPE) {
181            decode_cbor_sequence(body)
182        } else if media_type_matches(content_type, JsonCodec::CONTENT_TYPE) {
183            self.decode(body)
184        } else if media_type_matches(content_type, CborCodec::CONTENT_TYPE) {
185            CborCodec.decode(body)
186        } else {
187            Err(CoolError::Codec(format!(
188                "unsupported response Content-Type {content_type}"
189            )))
190        }
191    }
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
195pub struct RequestJournalEntry {
196    pub method: String,
197    pub path: String,
198    pub status_code: u16,
199    pub content_type: Option<String>,
200    pub recorded_at: DateTime<Utc>,
201}
202
203fn default_schema_version() -> u32 {
204    1
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
208pub struct PersistedClientState {
209    #[serde(default = "default_schema_version")]
210    pub schema_version: u32,
211    #[serde(default)]
212    pub state_version: u64,
213    #[serde(default)]
214    pub request_journal: Vec<RequestJournalEntry>,
215}
216
217impl Default for PersistedClientState {
218    fn default() -> Self {
219        Self {
220            schema_version: default_schema_version(),
221            state_version: 0,
222            request_journal: Vec::new(),
223        }
224    }
225}
226
227pub trait ClientStateStore: Send + Sync {
228    fn load(&self) -> Result<PersistedClientState, ClientError>;
229    fn save(&self, state: &PersistedClientState) -> Result<(), ClientError>;
230
231    fn append_request_journal(&self, entry: &RequestJournalEntry) -> Result<(), ClientError> {
232        let mut state = self.load()?;
233        state.request_journal.push(entry.clone());
234        state.state_version = state.state_version.saturating_add(1);
235        self.save(&state)
236    }
237}
238
239#[derive(Debug, Default)]
240pub struct InMemoryStateStore {
241    state: Mutex<PersistedClientState>,
242}
243
244impl ClientStateStore for InMemoryStateStore {
245    fn load(&self) -> Result<PersistedClientState, ClientError> {
246        self.state
247            .lock()
248            .map_err(|error| ClientError::State(format!("failed to lock state store: {error}")))
249            .map(|state| state.clone())
250    }
251
252    fn save(&self, state: &PersistedClientState) -> Result<(), ClientError> {
253        let mut guard = self
254            .state
255            .lock()
256            .map_err(|error| ClientError::State(format!("failed to lock state store: {error}")))?;
257        *guard = state.clone();
258        Ok(())
259    }
260}
261
262#[derive(Debug, Clone)]
263pub struct JsonFileStateStore {
264    path: PathBuf,
265}
266
267impl JsonFileStateStore {
268    pub fn new(path: impl Into<PathBuf>) -> Self {
269        Self { path: path.into() }
270    }
271
272    pub fn path(&self) -> &Path {
273        &self.path
274    }
275}
276
277impl ClientStateStore for JsonFileStateStore {
278    fn load(&self) -> Result<PersistedClientState, ClientError> {
279        match fs::read(&self.path) {
280            Ok(bytes) => serde_json::from_slice(&bytes).map_err(|error| {
281                ClientError::State(format!(
282                    "failed to decode state file {}: {error}",
283                    self.path.display()
284                ))
285            }),
286            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
287                Ok(PersistedClientState::default())
288            }
289            Err(error) => Err(ClientError::State(format!(
290                "failed to read state file {}: {error}",
291                self.path.display()
292            ))),
293        }
294    }
295
296    fn save(&self, state: &PersistedClientState) -> Result<(), ClientError> {
297        if let Some(parent) = self.path.parent() {
298            fs::create_dir_all(parent).map_err(|error| {
299                ClientError::State(format!(
300                    "failed to create state directory {}: {error}",
301                    parent.display()
302                ))
303            })?;
304        }
305        let bytes = serde_json::to_vec_pretty(state).map_err(|error| {
306            ClientError::State(format!(
307                "failed to encode state file {}: {error}",
308                self.path.display()
309            ))
310        })?;
311        fs::write(&self.path, bytes).map_err(|error| {
312            ClientError::State(format!(
313                "failed to write state file {}: {error}",
314                self.path.display()
315            ))
316        })
317    }
318}
319
320#[derive(Debug, Clone)]
321pub struct ClientConfig {
322    pub base_url: Url,
323}
324
325impl ClientConfig {
326    pub fn new(base_url: Url) -> Self {
327        Self { base_url }
328    }
329}
330
331#[derive(Debug, thiserror::Error)]
332pub enum ClientError {
333    #[error("transport error: {0}")]
334    Transport(#[from] reqwest::Error),
335    #[error("codec error: {0}")]
336    Codec(#[from] CoolError),
337    #[error("state error: {0}")]
338    State(String),
339    #[error("invalid response: {0}")]
340    InvalidResponse(String),
341    #[error("bad input: {0}")]
342    BadInput(String),
343    #[error("remote call failed with status {status}: {message}")]
344    Remote {
345        status: StatusCode,
346        error: Option<CoolErrorResponse>,
347        message: String,
348    },
349}
350
351pub type HeaderPair<'a> = (&'a str, &'a str);
352pub type QueryPair<'a> = (&'a str, &'a str);
353
354#[derive(Debug, Clone, PartialEq, Eq)]
355pub struct AuthorizationRequest {
356    pub method: String,
357    pub path: String,
358    pub canonical_query: Option<String>,
359    pub content_type: Option<String>,
360    pub body: Vec<u8>,
361    pub canonical_request: String,
362}
363
364pub trait RequestAuthorizer: Send + Sync {
365    fn authorize(
366        &self,
367        request: &AuthorizationRequest,
368    ) -> Result<Vec<(String, String)>, ClientError>;
369}
370
371#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
372pub struct RuntimeHeader {
373    pub name: String,
374    pub value: String,
375}
376
377#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
378pub struct RuntimeRequestWire {
379    pub method: String,
380    pub path: String,
381    pub canonical_query: Option<String>,
382    pub headers: Vec<RuntimeHeader>,
383    pub body: Vec<u8>,
384}
385
386#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
387pub struct RuntimeResponseWire {
388    pub status_code: u16,
389    pub headers: Vec<RuntimeHeader>,
390    pub body: Vec<u8>,
391}
392
393#[repr(u32)]
394#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
395pub enum RuntimeErrorCode {
396    Transport = 1,
397    Codec = 2,
398    State = 3,
399    InvalidResponse = 4,
400    Remote = 5,
401    BadInput = 6,
402}
403
404#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
405pub struct RuntimeErrorWire {
406    pub code: RuntimeErrorCode,
407    pub http_status: Option<u16>,
408    pub message: String,
409    pub remote_code: Option<String>,
410    pub remote_body: Option<Vec<u8>>,
411}
412
413impl From<ClientError> for RuntimeErrorWire {
414    fn from(value: ClientError) -> Self {
415        match value {
416            ClientError::Transport(error) => Self {
417                code: RuntimeErrorCode::Transport,
418                http_status: None,
419                message: error.to_string(),
420                remote_code: None,
421                remote_body: None,
422            },
423            ClientError::Codec(error) => Self {
424                code: RuntimeErrorCode::Codec,
425                http_status: Some(error.status_code().as_u16()),
426                message: error.to_string(),
427                remote_code: Some(error.code().to_owned()),
428                remote_body: None,
429            },
430            ClientError::State(message) => Self {
431                code: RuntimeErrorCode::State,
432                http_status: None,
433                message,
434                remote_code: None,
435                remote_body: None,
436            },
437            ClientError::InvalidResponse(message) => Self {
438                code: RuntimeErrorCode::InvalidResponse,
439                http_status: None,
440                message,
441                remote_code: None,
442                remote_body: None,
443            },
444            ClientError::BadInput(message) => Self {
445                code: RuntimeErrorCode::BadInput,
446                http_status: None,
447                message,
448                remote_code: None,
449                remote_body: None,
450            },
451            ClientError::Remote {
452                status,
453                error,
454                message,
455            } => Self {
456                code: RuntimeErrorCode::Remote,
457                http_status: Some(status.as_u16()),
458                remote_code: error.as_ref().map(|value| value.code.clone()),
459                remote_body: error
460                    .as_ref()
461                    .and_then(|value| serde_json::to_vec(value).ok()),
462                message,
463            },
464        }
465    }
466}
467
468#[derive(Debug, Clone, PartialEq, Eq)]
469pub enum RuntimeStateStoreConfig {
470    InMemory,
471    JsonFile { path: PathBuf },
472}
473
474#[derive(Debug, Clone, PartialEq, Eq)]
475pub struct RuntimeConfigWire {
476    pub base_url: String,
477    pub state_store: RuntimeStateStoreConfig,
478    pub transport: RuntimeTransportConfig,
479}
480
481pub struct RuntimeHandle {
482    runtime: tokio::runtime::Runtime,
483    client: RuntimeTransportClient,
484}
485
486enum RuntimeTransportClient {
487    Cbor(CratestackClient<CborCodec>),
488    Json(CratestackClient<JsonCodec>),
489}
490
491impl RuntimeHandle {
492    pub fn new(config: RuntimeConfigWire) -> Result<Self, RuntimeErrorWire> {
493        let base_url = Url::parse(&config.base_url).map_err(|error| RuntimeErrorWire {
494            code: RuntimeErrorCode::BadInput,
495            http_status: None,
496            message: format!("invalid base URL '{}': {error}", config.base_url),
497            remote_code: None,
498            remote_body: None,
499        })?;
500        let state_store: Arc<dyn ClientStateStore> = match config.state_store {
501            RuntimeStateStoreConfig::InMemory => Arc::new(InMemoryStateStore::default()),
502            RuntimeStateStoreConfig::JsonFile { path } => Arc::new(JsonFileStateStore::new(path)),
503        };
504        if config.transport.envelope != RuntimeEnvelopeConfig::None {
505            return Err(RuntimeErrorWire {
506                code: RuntimeErrorCode::BadInput,
507                http_status: None,
508                message: "COSE envelope support is not implemented yet".to_owned(),
509                remote_code: None,
510                remote_body: None,
511            });
512        }
513        let client = match config.transport.codec {
514            RuntimeCodecConfig::Cbor => RuntimeTransportClient::Cbor(
515                CratestackClient::new(ClientConfig::new(base_url.clone()), CborCodec)
516                    .with_state_store(state_store.clone()),
517            ),
518            RuntimeCodecConfig::Json => RuntimeTransportClient::Json(
519                CratestackClient::new(ClientConfig::new(base_url), JsonCodec)
520                    .with_state_store(state_store),
521            ),
522        };
523        let runtime = tokio::runtime::Builder::new_current_thread()
524            .enable_all()
525            .build()
526            .map_err(|error| RuntimeErrorWire {
527                code: RuntimeErrorCode::State,
528                http_status: None,
529                message: format!("failed to build runtime: {error}"),
530                remote_code: None,
531                remote_body: None,
532            })?;
533
534        Ok(Self { runtime, client })
535    }
536
537    pub fn execute(
538        &self,
539        request: RuntimeRequestWire,
540    ) -> Result<RuntimeResponseWire, RuntimeErrorWire> {
541        self.runtime
542            .block_on(self.client.execute_raw(request))
543            .map_err(RuntimeErrorWire::from)
544    }
545
546    pub fn state(&self) -> Result<PersistedClientState, ClientError> {
547        self.client.state()
548    }
549}
550
551fn replace_bridge_content_type(headers: &mut Vec<RuntimeHeader>) {
552    headers.retain(|header| !header.name.eq_ignore_ascii_case("content-type"));
553    headers.push(RuntimeHeader {
554        name: "content-type".to_owned(),
555        value: BRIDGE_CONTENT_TYPE.to_owned(),
556    });
557}
558
559impl RuntimeTransportClient {
560    async fn execute_raw(
561        &self,
562        request: RuntimeRequestWire,
563    ) -> Result<RuntimeResponseWire, ClientError> {
564        let request = self.bridge_request_to_transport(request)?;
565        match self {
566            Self::Cbor(client) => client.execute_raw_transport(request).await,
567            Self::Json(client) => client.execute_raw_transport(request).await,
568        }
569        .and_then(|response| self.transport_response_to_bridge(response))
570    }
571
572    fn bridge_request_to_transport(
573        &self,
574        request: RuntimeRequestWire,
575    ) -> Result<RuntimeRequestWire, ClientError> {
576        if request.body.is_empty() {
577            return Ok(request);
578        }
579
580        let value: JsonValue = serde_json::from_slice(&request.body).map_err(|error| {
581            ClientError::BadInput(format!("invalid bridge payload JSON: {error}"))
582        })?;
583        let body = match self {
584            Self::Cbor(_) => CborCodec.encode(&value)?,
585            Self::Json(_) => JsonCodec.encode(&value)?,
586        };
587
588        Ok(RuntimeRequestWire { body, ..request })
589    }
590
591    fn transport_response_to_bridge(
592        &self,
593        mut response: RuntimeResponseWire,
594    ) -> Result<RuntimeResponseWire, ClientError> {
595        if response.body.is_empty() {
596            replace_bridge_content_type(&mut response.headers);
597            return Ok(response);
598        }
599
600        let value = match self {
601            Self::Cbor(_) => CborCodec.decode::<JsonValue>(&response.body)?,
602            Self::Json(_) => JsonCodec.decode::<JsonValue>(&response.body)?,
603        };
604
605        response.body = serde_json::to_vec(&value).map_err(|error| {
606            ClientError::InvalidResponse(format!("failed to encode bridge payload JSON: {error}"))
607        })?;
608        replace_bridge_content_type(&mut response.headers);
609        Ok(response)
610    }
611
612    fn state(&self) -> Result<PersistedClientState, ClientError> {
613        match self {
614            Self::Cbor(client) => client.state(),
615            Self::Json(client) => client.state(),
616        }
617    }
618}
619
620#[derive(Clone)]
621pub struct CratestackClient<C = CborCodec> {
622    http: reqwest::Client,
623    config: ClientConfig,
624    codec: C,
625    state_store: Arc<dyn ClientStateStore>,
626    request_authorizer: Option<Arc<dyn RequestAuthorizer>>,
627}
628
629impl CratestackClient<CborCodec> {
630    pub fn cbor(config: ClientConfig) -> Self {
631        Self::new(config, CborCodec)
632    }
633}
634
635impl<C> CratestackClient<C>
636where
637    C: HttpClientCodec,
638{
639    pub fn new(config: ClientConfig, codec: C) -> Self {
640        Self {
641            http: reqwest::Client::new(),
642            config,
643            codec,
644            state_store: Arc::new(InMemoryStateStore::default()),
645            request_authorizer: None,
646        }
647    }
648
649    pub fn with_http_client(config: ClientConfig, codec: C, http: reqwest::Client) -> Self {
650        Self {
651            http,
652            config,
653            codec,
654            state_store: Arc::new(InMemoryStateStore::default()),
655            request_authorizer: None,
656        }
657    }
658
659    pub fn with_state_store(mut self, state_store: Arc<dyn ClientStateStore>) -> Self {
660        self.state_store = state_store;
661        self
662    }
663
664    pub fn with_optional_state_store(self, state_store: Option<Arc<dyn ClientStateStore>>) -> Self {
665        match state_store {
666            Some(state_store) => self.with_state_store(state_store),
667            None => self,
668        }
669    }
670
671    pub fn with_request_authorizer(
672        mut self,
673        request_authorizer: Arc<dyn RequestAuthorizer>,
674    ) -> Self {
675        self.request_authorizer = Some(request_authorizer);
676        self
677    }
678
679    pub fn state(&self) -> Result<PersistedClientState, ClientError> {
680        self.state_store.load()
681    }
682
683    pub async fn get<Output>(
684        &self,
685        path: &str,
686        query: &[QueryPair<'_>],
687        headers: &[HeaderPair<'_>],
688    ) -> Result<Output, ClientError>
689    where
690        Output: DeserializeOwned,
691    {
692        let response = self
693            .request_raw(Method::GET, path, None, query, headers)
694            .await?;
695        decode_typed_response(&self.codec, &response)
696    }
697
698    pub async fn get_view<P>(
699        &self,
700        path: &str,
701        projection: &P,
702        headers: &[HeaderPair<'_>],
703    ) -> Result<P::Output, ClientError>
704    where
705        P: Projection,
706    {
707        let selection = projection.selection_query();
708        let canonical_query = canonical_query_from_selection(&selection, &[])?;
709        let response = self
710            .request_raw_with_query_and_accept(
711                Method::GET,
712                path,
713                None,
714                canonical_query.as_deref(),
715                headers,
716                Some(JsonCodec::CONTENT_TYPE),
717            )
718            .await?;
719        let value = decode_json_value_response(&JsonCodec, &response)?;
720        projection.decode_one(value).map_err(ClientError::from)
721    }
722
723    pub async fn list_view<P>(
724        &self,
725        path: &str,
726        projection: &P,
727        extra_query: &[QueryPair<'_>],
728        headers: &[HeaderPair<'_>],
729    ) -> Result<Vec<P::Output>, ClientError>
730    where
731        P: Projection,
732    {
733        let selection = projection.selection_query();
734        let canonical_query = canonical_query_from_selection(&selection, extra_query)?;
735        let response = self
736            .request_raw_with_query_and_accept(
737                Method::GET,
738                path,
739                None,
740                canonical_query.as_deref(),
741                headers,
742                Some(JsonCodec::CONTENT_TYPE),
743            )
744            .await?;
745        let value = decode_json_value_response(&JsonCodec, &response)?;
746        projection.decode_many(value).map_err(ClientError::from)
747    }
748
749    pub async fn list_view_paged<P>(
750        &self,
751        path: &str,
752        projection: &P,
753        extra_query: &[QueryPair<'_>],
754        headers: &[HeaderPair<'_>],
755    ) -> Result<Page<P::Output>, ClientError>
756    where
757        P: Projection,
758    {
759        let selection = projection.selection_query();
760        let canonical_query = canonical_query_from_selection(&selection, extra_query)?;
761        let response = self
762            .request_raw_with_query_and_accept(
763                Method::GET,
764                path,
765                None,
766                canonical_query.as_deref(),
767                headers,
768                Some(JsonCodec::CONTENT_TYPE),
769            )
770            .await?;
771        let value = decode_json_value_response(&JsonCodec, &response)?;
772        projection.decode_page(value).map_err(ClientError::from)
773    }
774
775    pub async fn post<Input, Output>(
776        &self,
777        path: &str,
778        input: &Input,
779        headers: &[HeaderPair<'_>],
780    ) -> Result<Output, ClientError>
781    where
782        Input: Serialize,
783        Output: DeserializeOwned,
784    {
785        let body = self.codec.encode(input)?;
786        let response = self
787            .request_raw(Method::POST, path, Some(body), &[], headers)
788            .await?;
789        decode_typed_response(&self.codec, &response)
790    }
791
792    pub async fn post_list<Input, Output>(
793        &self,
794        path: &str,
795        input: &Input,
796        headers: &[HeaderPair<'_>],
797    ) -> Result<Vec<Output>, ClientError>
798    where
799        Input: Serialize,
800        Output: DeserializeOwned,
801    {
802        let body = self.codec.encode(input)?;
803        let response = self
804            .request_raw_with_query_and_accept(
805                Method::POST,
806                path,
807                Some(body),
808                None,
809                headers,
810                Some(self.codec.sequence_accept_header_value()),
811            )
812            .await?;
813        decode_sequence_response(&self.codec, &response)
814    }
815
816    pub async fn patch<Input, Output>(
817        &self,
818        path: &str,
819        input: &Input,
820        headers: &[HeaderPair<'_>],
821    ) -> Result<Output, ClientError>
822    where
823        Input: Serialize,
824        Output: DeserializeOwned,
825    {
826        let body = self.codec.encode(input)?;
827        let response = self
828            .request_raw(Method::PATCH, path, Some(body), &[], headers)
829            .await?;
830        decode_typed_response(&self.codec, &response)
831    }
832
833    pub async fn delete<Output>(
834        &self,
835        path: &str,
836        headers: &[HeaderPair<'_>],
837    ) -> Result<Output, ClientError>
838    where
839        Output: DeserializeOwned,
840    {
841        let response = self
842            .request_raw(Method::DELETE, path, None, &[], headers)
843            .await?;
844        decode_typed_response(&self.codec, &response)
845    }
846
847    pub async fn execute_raw_transport(
848        &self,
849        request: RuntimeRequestWire,
850    ) -> Result<RuntimeResponseWire, ClientError> {
851        let method = Method::from_bytes(request.method.as_bytes()).map_err(|error| {
852            ClientError::BadInput(format!("invalid HTTP method '{}': {error}", request.method))
853        })?;
854        let header_pairs = request
855            .headers
856            .iter()
857            .map(|header| (header.name.as_str(), header.value.as_str()))
858            .collect::<Vec<_>>();
859        self.request_raw_with_query(
860            method,
861            &request.path,
862            if request.body.is_empty() {
863                None
864            } else {
865                Some(request.body)
866            },
867            request.canonical_query.as_deref(),
868            &header_pairs,
869        )
870        .await
871    }
872
873    async fn request_raw(
874        &self,
875        method: Method,
876        path: &str,
877        body: Option<Vec<u8>>,
878        query: &[QueryPair<'_>],
879        headers: &[HeaderPair<'_>],
880    ) -> Result<RuntimeResponseWire, ClientError> {
881        let canonical_query =
882            if query.is_empty() {
883                None
884            } else {
885                Some(serde_urlencoded::to_string(query).map_err(|error| {
886                    ClientError::BadInput(format!("invalid query pairs: {error}"))
887                })?)
888            };
889        self.request_raw_with_query(method, path, body, canonical_query.as_deref(), headers)
890            .await
891    }
892
893    async fn request_raw_with_query_and_accept(
894        &self,
895        method: Method,
896        path: &str,
897        body: Option<Vec<u8>>,
898        canonical_query: Option<&str>,
899        headers: &[HeaderPair<'_>],
900        accept_override: Option<&'static str>,
901    ) -> Result<RuntimeResponseWire, ClientError> {
902        let url = build_url(&self.config.base_url, path, canonical_query)?;
903        let mut header_map = HeaderMap::new();
904        header_map.insert(
905            ACCEPT,
906            HeaderValue::from_static(
907                accept_override.unwrap_or_else(|| self.codec.accept_header_value()),
908            ),
909        );
910        let content_type = if body.is_some() {
911            header_map.insert(CONTENT_TYPE, HeaderValue::from_static(C::CONTENT_TYPE));
912            Some(C::CONTENT_TYPE.to_owned())
913        } else {
914            None
915        };
916        if let Some(authorizer) = &self.request_authorizer {
917            let canonical_request = canonical_request_string(
918                method.as_str(),
919                path,
920                canonical_query,
921                content_type.as_deref(),
922                body.as_deref().unwrap_or(&[]),
923            );
924            let authorization_request = AuthorizationRequest {
925                method: method.as_str().to_owned(),
926                path: path.to_owned(),
927                canonical_query: canonical_query.map(str::to_owned),
928                content_type: content_type.clone(),
929                body: body.clone().unwrap_or_default(),
930                canonical_request,
931            };
932            for (name, value) in authorizer.authorize(&authorization_request)? {
933                header_map.insert(
934                    HeaderName::from_bytes(name.as_bytes()).map_err(|error| {
935                        ClientError::BadInput(format!("invalid header name '{name}': {error}"))
936                    })?,
937                    HeaderValue::from_str(&value).map_err(|error| {
938                        ClientError::BadInput(format!("invalid header value for '{name}': {error}"))
939                    })?,
940                );
941            }
942        }
943        for (name, value) in headers {
944            header_map.insert(
945                HeaderName::from_bytes(name.as_bytes()).map_err(|error| {
946                    ClientError::BadInput(format!("invalid header name '{name}': {error}"))
947                })?,
948                HeaderValue::from_str(value).map_err(|error| {
949                    ClientError::BadInput(format!("invalid header value for '{name}': {error}"))
950                })?,
951            );
952        }
953
954        let mut request = self.http.request(method.clone(), url).headers(header_map);
955        if let Some(body) = body {
956            request = request.body(body);
957        }
958
959        let response = request.send().await?;
960        let status = response.status();
961        let headers = response.headers().clone();
962        let bytes = response.bytes().await?;
963        let response_wire = RuntimeResponseWire {
964            status_code: status.as_u16(),
965            headers: headers_to_runtime(&headers),
966            body: bytes.to_vec(),
967        };
968
969        self.record_request(method.as_str(), path, status, &headers)?;
970
971        Ok(response_wire)
972    }
973
974    async fn request_raw_with_query(
975        &self,
976        method: Method,
977        path: &str,
978        body: Option<Vec<u8>>,
979        canonical_query: Option<&str>,
980        headers: &[HeaderPair<'_>],
981    ) -> Result<RuntimeResponseWire, ClientError> {
982        self.request_raw_with_query_and_accept(method, path, body, canonical_query, headers, None)
983            .await
984    }
985
986    fn record_request(
987        &self,
988        method: &str,
989        path: &str,
990        status: StatusCode,
991        headers: &HeaderMap,
992    ) -> Result<(), ClientError> {
993        self.state_store
994            .append_request_journal(&RequestJournalEntry {
995                method: method.to_owned(),
996                path: path.to_owned(),
997                status_code: status.as_u16(),
998                content_type: headers
999                    .get(CONTENT_TYPE)
1000                    .and_then(|value| value.to_str().ok())
1001                    .map(ToOwned::to_owned),
1002                recorded_at: Utc::now(),
1003            })
1004    }
1005}
1006
1007fn decode_typed_response<C, Output>(
1008    codec: &C,
1009    response: &RuntimeResponseWire,
1010) -> Result<Output, ClientError>
1011where
1012    C: HttpClientCodec,
1013    Output: DeserializeOwned,
1014{
1015    let content_type = response
1016        .headers
1017        .iter()
1018        .find(|header| header.name.eq_ignore_ascii_case("content-type"))
1019        .map(|header| header.value.as_str())
1020        .ok_or_else(|| {
1021            ClientError::InvalidResponse("response is missing Content-Type header".to_owned())
1022        })?;
1023
1024    if (200..=299).contains(&response.status_code) {
1025        codec
1026            .decode_response::<Output>(content_type, &response.body)
1027            .map_err(ClientError::from)
1028    } else {
1029        let error = codec
1030            .decode_response::<CoolErrorResponse>(content_type, &response.body)
1031            .ok();
1032        let message = error
1033            .as_ref()
1034            .map(|value| value.message.clone())
1035            .unwrap_or_else(|| {
1036                format!("unexpected error body for status {}", response.status_code)
1037            });
1038        Err(ClientError::Remote {
1039            status: StatusCode::from_u16(response.status_code)
1040                .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
1041            error,
1042            message,
1043        })
1044    }
1045}
1046
1047fn decode_json_value_response<C>(
1048    codec: &C,
1049    response: &RuntimeResponseWire,
1050) -> Result<JsonValue, ClientError>
1051where
1052    C: HttpClientCodec,
1053{
1054    decode_typed_response(codec, response)
1055}
1056
1057fn decode_sequence_response<C, Output>(
1058    codec: &C,
1059    response: &RuntimeResponseWire,
1060) -> Result<Vec<Output>, ClientError>
1061where
1062    C: HttpClientCodec,
1063    Output: DeserializeOwned,
1064{
1065    let content_type = response
1066        .headers
1067        .iter()
1068        .find(|header| header.name.eq_ignore_ascii_case("content-type"))
1069        .map(|header| header.value.as_str())
1070        .ok_or_else(|| {
1071            ClientError::InvalidResponse("response is missing Content-Type header".to_owned())
1072        })?;
1073
1074    if (200..=299).contains(&response.status_code) {
1075        codec
1076            .decode_sequence_response::<Output>(content_type, &response.body)
1077            .map_err(ClientError::from)
1078    } else {
1079        let error = if media_type_matches(content_type, CBOR_SEQUENCE_CONTENT_TYPE) {
1080            decode_cbor_sequence::<CoolErrorResponse>(&response.body)
1081                .ok()
1082                .and_then(|mut values| {
1083                    if values.len() == 1 {
1084                        values.pop()
1085                    } else {
1086                        None
1087                    }
1088                })
1089        } else {
1090            codec
1091                .decode_response::<CoolErrorResponse>(content_type, &response.body)
1092                .ok()
1093        };
1094        let message = error
1095            .as_ref()
1096            .map(|value| value.message.clone())
1097            .unwrap_or_else(|| {
1098                format!("unexpected error body for status {}", response.status_code)
1099            });
1100        Err(ClientError::Remote {
1101            status: StatusCode::from_u16(response.status_code)
1102                .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
1103            error,
1104            message,
1105        })
1106    }
1107}
1108
1109fn canonical_query_from_selection(
1110    selection: &SelectionQuery,
1111    extra_query: &[QueryPair<'_>],
1112) -> Result<Option<String>, ClientError> {
1113    let mut query: Vec<(String, String)> = Vec::new();
1114    if !selection.fields.is_empty() {
1115        query.push(("fields".to_owned(), selection.fields.join(",")));
1116    }
1117    if !selection.includes.is_empty() {
1118        query.push(("include".to_owned(), selection.includes.join(",")));
1119    }
1120    for (include, fields) in &selection.include_fields {
1121        if !fields.is_empty() {
1122            query.push((format!("includeFields[{include}]"), fields.join(",")));
1123        }
1124    }
1125    for (key, value) in extra_query {
1126        if *key == "fields" || *key == "include" || key.starts_with("includeFields[") {
1127            return Err(ClientError::BadInput(format!(
1128                "projection query parameter '{key}' must come from SelectionQuery, not extra_query"
1129            )));
1130        }
1131        query.push(((*key).to_owned(), (*value).to_owned()));
1132    }
1133    if query.is_empty() {
1134        return Ok(None);
1135    }
1136    serde_urlencoded::to_string(&query)
1137        .map(Some)
1138        .map_err(|error| ClientError::BadInput(format!("invalid selection query: {error}")))
1139}
1140
1141fn headers_to_runtime(headers: &HeaderMap) -> Vec<RuntimeHeader> {
1142    headers
1143        .iter()
1144        .filter_map(|(name, value)| {
1145            value.to_str().ok().map(|value| RuntimeHeader {
1146                name: name.as_str().to_owned(),
1147                value: value.to_owned(),
1148            })
1149        })
1150        .collect()
1151}
1152
1153fn build_url(
1154    base_url: &Url,
1155    path: &str,
1156    canonical_query: Option<&str>,
1157) -> Result<Url, ClientError> {
1158    let mut base = base_url.clone();
1159    if !base.path().ends_with('/') {
1160        let next_path = format!("{}/", base.path());
1161        base.set_path(&next_path);
1162    }
1163    let mut url = base.join(path.trim_start_matches('/')).map_err(|error| {
1164        ClientError::InvalidResponse(format!(
1165            "failed to resolve path '{path}' against {}: {error}",
1166            base
1167        ))
1168    })?;
1169    match canonical_query {
1170        Some(query) if !query.is_empty() => url.set_query(Some(query)),
1171        _ => url.set_query(None),
1172    }
1173    Ok(url)
1174}
1175
1176fn media_type_matches(candidate: &str, expected: &str) -> bool {
1177    candidate.split(';').next().unwrap_or(candidate).trim() == expected
1178}
1179
1180fn decode_cbor_sequence<T>(bytes: &[u8]) -> Result<Vec<T>, CoolError>
1181where
1182    T: DeserializeOwned,
1183{
1184    let mut values = Vec::new();
1185    let mut offset = 0usize;
1186    while offset < bytes.len() {
1187        let mut deserializer = minicbor_serde::Deserializer::new(&bytes[offset..]);
1188        values.push(T::deserialize(&mut deserializer).map_err(|error| {
1189            CoolError::Codec(format!("failed to decode CBOR sequence body: {error}"))
1190        })?);
1191        let consumed = deserializer.decoder().position();
1192        if consumed == 0 {
1193            return Err(CoolError::Codec(
1194                "failed to decode CBOR sequence body: decoder made no progress".to_owned(),
1195            ));
1196        }
1197        offset += consumed;
1198    }
1199    Ok(values)
1200}
1201
1202#[cfg(test)]
1203mod tests {
1204    use std::path::PathBuf;
1205
1206    use super::{
1207        ClientStateStore, JsonFileStateStore, PersistedClientState, RequestJournalEntry,
1208        RuntimeCodecConfig, RuntimeConfigWire, RuntimeEnvelopeConfig, RuntimeErrorCode,
1209        RuntimeHandle, RuntimeRequestWire, RuntimeStateStoreConfig, RuntimeTransportConfig,
1210    };
1211
1212    #[test]
1213    fn json_file_store_round_trips_state_under_project_tmp() {
1214        let path = project_tmp_path("state-store-unit.json");
1215        if path.exists() {
1216            std::fs::remove_file(&path).expect("existing tmp file should be removable");
1217        }
1218
1219        let store = JsonFileStateStore::new(&path);
1220        store
1221            .append_request_journal(&RequestJournalEntry {
1222                method: "GET".to_owned(),
1223                path: "/posts".to_owned(),
1224                status_code: 200,
1225                content_type: Some("application/cbor".to_owned()),
1226                recorded_at: chrono::Utc::now(),
1227            })
1228            .expect("journal entry should append");
1229
1230        let loaded = store.load().expect("state should load");
1231        assert_eq!(loaded.schema_version, 1);
1232        assert_eq!(loaded.state_version, 1);
1233        assert_eq!(loaded.request_journal.len(), 1);
1234
1235        std::fs::remove_file(&path).expect("tmp file should be removable");
1236    }
1237
1238    #[test]
1239    fn runtime_handle_rejects_invalid_method_without_running_http() {
1240        let handle = RuntimeHandle::new(RuntimeConfigWire {
1241            base_url: "http://127.0.0.1:1/".to_owned(),
1242            state_store: RuntimeStateStoreConfig::InMemory,
1243            transport: RuntimeTransportConfig::default(),
1244        })
1245        .expect("runtime handle should build");
1246
1247        let error = handle
1248            .execute(RuntimeRequestWire {
1249                method: "BAD METHOD".to_owned(),
1250                path: "/posts".to_owned(),
1251                canonical_query: None,
1252                headers: Vec::new(),
1253                body: Vec::new(),
1254            })
1255            .expect_err("invalid method should fail before transport");
1256
1257        assert_eq!(error.code as u32, super::RuntimeErrorCode::BadInput as u32);
1258    }
1259
1260    #[test]
1261    fn persisted_state_defaults_missing_state_version() {
1262        let state: PersistedClientState =
1263            serde_json::from_str(r#"{"schema_version":1,"request_journal":[]}"#)
1264                .expect("legacy state should decode");
1265
1266        assert_eq!(state.state_version, 0);
1267    }
1268
1269    #[test]
1270    fn runtime_handle_rejects_unsupported_envelope_config() {
1271        let result = RuntimeHandle::new(RuntimeConfigWire {
1272            base_url: "http://127.0.0.1:1/".to_owned(),
1273            state_store: RuntimeStateStoreConfig::InMemory,
1274            transport: RuntimeTransportConfig {
1275                codec: RuntimeCodecConfig::Cbor,
1276                envelope: RuntimeEnvelopeConfig::CoseSign1,
1277            },
1278        });
1279
1280        let error = match result {
1281            Ok(_) => panic!("unsupported envelope should fail"),
1282            Err(error) => error,
1283        };
1284
1285        assert_eq!(error.code, RuntimeErrorCode::BadInput);
1286    }
1287
1288    fn project_tmp_path(file_name: &str) -> PathBuf {
1289        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1290            .join("../../tmp/client-rust-tests")
1291            .join(file_name)
1292    }
1293}