1pub mod error;
18
19use tracing::Instrument;
20
21use self::error::Error;
22use crate::types::Event;
23use async_nats as nats;
24
/// Settings used by `connect` to build a client.
#[derive(Debug, Default, Clone)]
pub struct Config {
    /// Server address handed to the NATS connect call.
    pub host: String,
    /// Optional prefix; when present, published subjects become
    /// `"<prefix>.<subject>"`.
    pub prefix: Option<String>,
}
30
31#[derive(Debug, Clone)]
33pub struct Metio {
34 client: nats::Client,
35 prefix: Option<String>,
36}
37
38impl Metio {
39 pub fn get_underlying(&self) -> nats::Client {
41 self.client.clone()
42 }
43
44 pub async fn publish<I>(&self, subject: String, data: I) -> Result<(), Error>
46 where
47 I: IntoIterator<Item = Event>,
48 {
49 let subject = match &self.prefix {
50 Some(prefix) => format!("{}.{}", prefix, subject),
51 None => subject,
52 };
53 let mut failed_events: Vec<(Event, String)> = Vec::new();
54 for event in data {
55 let res: Result<(), String> = async {
56 let bytes = serde_json::to_vec(&event).map_err(|e| e.to_string())?;
57 self.internal_publish(&subject, bytes).await?;
58 Ok(())
59 }
60 .instrument(tracing::trace_span!("publish", event_id = event.event_id))
61 .await;
62
63 if let Err(e) = res {
64 failed_events.push((event, e));
65 }
66 }
67
68 if !failed_events.is_empty() {
69 return Err(Error::new_with_related(
70 error::Kind::Send,
71 format!("Failed to publish events: {:?}", failed_events),
72 failed_events.into_iter().map(|(_, e)| e).collect(),
73 ));
74 }
75
76 Ok(())
77 }
78
79 #[tracing::instrument(level = "trace", skip(data))]
80 async fn internal_publish(&self, subject: &String, data: Vec<u8>) -> Result<(), String> {
81 self.client
82 .publish(subject.clone(), data.into())
83 .await
84 .map_err(|e| e.to_string())?;
85
86 self.client.flush().await.map_err(|e| e.to_string())?;
87 Ok(())
88 }
89}
90
91pub async fn connect<C>(cfg: C) -> Result<Metio, error::Error>
105where
106 C: Into<Config>,
107{
108 let cfg = cfg.into();
109
110 let client = nats::connect(&cfg.host)
111 .await
112 .map_err(|e| error::Error::new(error::Kind::Connect, e.to_string()))?;
113
114 tracing::info!("Connecting to server with config: {:?}", cfg);
115
116 Ok(Metio {
117 client,
118 prefix: cfg.prefix,
119 })
120}
121
/// Shorthand for calling `$crate::client::connect` with a freshly built
/// `$crate::client::Config`.
///
/// * `connect!(host)` — no subject prefix.
/// * `connect!(host, prefix)` — with a subject prefix.
///
/// Both arguments are converted with `.to_string()`. A trailing comma is
/// accepted in either form. The macro expands to the `connect(..)` call itself,
/// so the result still has to be awaited by the caller.
#[macro_export]
macro_rules! connect {
    ($host:expr $(,)?) => {
        $crate::client::connect($crate::client::Config {
            host: $host.to_string(),
            prefix: None,
        })
    };
    ($host:expr, $prefix:expr $(,)?) => {
        $crate::client::connect($crate::client::Config {
            host: $host.to_string(),
            prefix: Some($prefix.to_string()),
        })
    };
}