metio/client/
mod.rs

1/*
2 * Copyright 2024 Bagaluten GmbH <contact@bagaluten.email>
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *   http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17pub mod error;
18
19use tracing::Instrument;
20
21use self::error::Error;
22use crate::types::Event;
23use async_nats as nats;
24
/// Connection settings used by [`connect`] to build a [`Metio`] client.
#[derive(Debug, Default, Clone)]
pub struct Config {
    /// Address handed to the NATS client when establishing the connection.
    pub host: String,
    /// Optional subject prefix. When set, published subjects become
    /// `<prefix>.<subject>`.
    pub prefix: Option<String>,
}
30
31/// The Client struct holds the information to which metio cluster we are currently talking.
32#[derive(Debug, Clone)]
33pub struct Metio {
34    client: nats::Client,
35    prefix: Option<String>,
36}
37
38impl Metio {
39    /// This function returns the underlying NATS client.
40    pub fn get_underlying(&self) -> nats::Client {
41        self.client.clone()
42    }
43
44    /// Publish a list of events to a subject.
45    pub async fn publish<I>(&self, subject: String, data: I) -> Result<(), Error>
46    where
47        I: IntoIterator<Item = Event>,
48    {
49        let subject = match &self.prefix {
50            Some(prefix) => format!("{}.{}", prefix, subject),
51            None => subject,
52        };
53        let mut failed_events: Vec<(Event, String)> = Vec::new();
54        for event in data {
55            let res: Result<(), String> = async {
56                let bytes = serde_json::to_vec(&event).map_err(|e| e.to_string())?;
57                self.internal_publish(&subject, bytes).await?;
58                Ok(())
59            }
60            .instrument(tracing::trace_span!("publish", event_id = event.event_id))
61            .await;
62
63            if let Err(e) = res {
64                failed_events.push((event, e));
65            }
66        }
67
68        if !failed_events.is_empty() {
69            return Err(Error::new_with_related(
70                error::Kind::Send,
71                format!("Failed to publish events: {:?}", failed_events),
72                failed_events.into_iter().map(|(_, e)| e).collect(),
73            ));
74        }
75
76        Ok(())
77    }
78
79    #[tracing::instrument(level = "trace", skip(data))]
80    async fn internal_publish(&self, subject: &String, data: Vec<u8>) -> Result<(), String> {
81        self.client
82            .publish(subject.clone(), data.into())
83            .await
84            .map_err(|e| e.to_string())?;
85
86        self.client.flush().await.map_err(|e| e.to_string())?;
87        Ok(())
88    }
89}
90
91/// Connect to a Metio Server.
92/// If the connection was successfull a client will be returned.
93///
94/// # Example
95/// ```no_run
96/// use metio::client::{connect, Config, error::Error};
97/// # async fn example() -> Result<(), Error> {
98/// let cfg = Config::default();
99/// let client = connect(cfg).await?;
100/// // Do something with the client
101/// # Ok(())
102/// # }
103/// ```
104pub async fn connect<C>(cfg: C) -> Result<Metio, error::Error>
105where
106    C: Into<Config>,
107{
108    let cfg = cfg.into();
109
110    let client = nats::connect(&cfg.host)
111        .await
112        .map_err(|e| error::Error::new(error::Kind::Connect, e.to_string()))?;
113
114    tracing::info!("Connecting to server with config: {:?}", cfg);
115
116    Ok(Metio {
117        client,
118        prefix: cfg.prefix,
119    })
120}
121
/// Shorthand for calling `client::connect` with an inline `Config`.
///
/// Accepts a host expression and, optionally, a prefix expression; both are
/// converted with `to_string()`. The macro expands to the call itself, so the
/// result still has to be awaited by the caller.
///
/// # Example
/// ```no_run
/// # async fn example() -> Result<(), metio::client::error::Error> {
/// let _plain = metio::connect!("localhost:4222").await?;
/// let _prefixed = metio::connect!("localhost:4222", "my.prefix").await?;
/// # Ok(())
/// # }
/// ```
#[macro_export]
macro_rules! connect {
    // Host only: no subject prefix.
    ($address:expr) => {
        $crate::client::connect($crate::client::Config {
            host: $address.to_string(),
            prefix: None,
        })
    };
    // Host plus subject prefix.
    ($address:expr, $namespace:expr) => {
        $crate::client::connect($crate::client::Config {
            host: $address.to_string(),
            prefix: Some($namespace.to_string()),
        })
    };
}