1use crate::errors::NyxError;
20use crate::md::StateParameter;
21use crate::time::Epoch;
22use arrow::error::ArrowError;
23use log::debug;
24use parquet::errors::ParquetError;
25use snafu::prelude::*;
26pub(crate) mod watermark;
27use hifitime::Duration;
28use hifitime::prelude::{Format, Formatter};
29use serde::de::DeserializeOwned;
30use serde::{Deserialize, Deserializer};
31use serde::{Serialize, Serializer};
32use serde_yml::Error as YamlError;
33use std::collections::{BTreeMap, HashMap};
34use std::convert::From;
35use std::fmt::Debug;
36use std::fs::File;
37use std::io::BufReader;
38use std::io::Error as IoError;
39use std::path::{Path, PathBuf};
40use std::str::FromStr;
41use typed_builder::TypedBuilder;
42
43pub mod gravity;
45
46use std::io;
47
48#[derive(Clone, Default, Serialize, Deserialize, TypedBuilder)]
50#[builder(doc)]
51pub struct ExportCfg {
52 #[builder(default, setter(strip_option))]
54 pub fields: Option<Vec<StateParameter>>,
55 #[builder(default, setter(strip_option))]
57 pub start_epoch: Option<Epoch>,
58 #[builder(default, setter(strip_option))]
60 pub end_epoch: Option<Epoch>,
61 #[builder(default, setter(strip_option))]
63 pub step: Option<Duration>,
64 #[builder(default, setter(strip_option))]
66 pub metadata: Option<HashMap<String, String>>,
67 #[builder(default)]
69 pub timestamp: bool,
70}
71
72impl ExportCfg {
73 pub fn from_metadata(metadata: Vec<(String, String)>) -> Self {
75 let mut me = ExportCfg {
76 metadata: Some(HashMap::new()),
77 ..Default::default()
78 };
79 for (k, v) in metadata {
80 me.metadata.as_mut().unwrap().insert(k, v);
81 }
82 me
83 }
84
85 pub fn timestamped() -> Self {
87 Self {
88 timestamp: true,
89 ..Default::default()
90 }
91 }
92
93 pub fn append_field(&mut self, field: StateParameter) {
94 if let Some(fields) = self.fields.as_mut() {
95 fields.push(field);
96 } else {
97 self.fields = Some(vec![field]);
98 }
99 }
100
101 pub(crate) fn actual_path<P: AsRef<Path>>(&self, path: P) -> PathBuf {
103 let mut path_buf = path.as_ref().to_path_buf();
104 if self.timestamp
105 && let Some(file_name) = path_buf.file_name()
106 && let Some(file_name_str) = file_name.to_str()
107 && let Some(extension) = path_buf.extension()
108 {
109 let stamp = Formatter::new(
110 Epoch::now().unwrap(),
111 Format::from_str("%Y-%m-%dT%H-%M-%S").unwrap(),
112 );
113 let ext = extension.to_str().unwrap();
114 let file_name = file_name_str.replace(&format!(".{ext}"), "");
115 let new_file_name = format!("{file_name}-{stamp}.{ext}");
116 path_buf.set_file_name(new_file_name);
117 };
118 path_buf
119 }
120}
121
122#[derive(Debug, Snafu)]
123#[snafu(visibility(pub(crate)))]
124pub enum ConfigError {
125 #[snafu(display("failed to read configuration file: {source}"))]
126 ReadError { source: io::Error },
127
128 #[snafu(display("failed to parse YAML configuration file: {source}"))]
129 ParseError { source: serde_yml::Error },
130
131 #[snafu(display("of invalid configuration: {msg}"))]
132 InvalidConfig { msg: String },
133}
134
135impl PartialEq for ConfigError {
136 fn eq(&self, _other: &Self) -> bool {
138 false
139 }
140}
141
142#[derive(Debug, Snafu)]
143#[snafu(visibility(pub(crate)))]
144pub enum InputOutputError {
145 #[snafu(display("{action} encountered i/o error: {source}"))]
146 StdIOError {
147 source: io::Error,
148 action: &'static str,
149 },
150 #[snafu(display("missing required data {which}"))]
151 MissingData { which: String },
152 #[snafu(display("unknown data `{which}`"))]
153 UnsupportedData { which: String },
154 #[snafu(display("{action} encountered a Parquet error: {source}"))]
155 ParquetError {
156 source: ParquetError,
157 action: &'static str,
158 },
159 #[snafu(display("inconsistency detected: {msg}"))]
160 Inconsistency { msg: String },
161 #[snafu(display("{action} encountered an Arrow error: {source}"))]
162 ArrowError {
163 source: ArrowError,
164 action: &'static str,
165 },
166 #[snafu(display("error parsing `{data}` as Dhall config: {err}"))]
167 ParseDhall { data: String, err: String },
168 #[snafu(display("error serializing {what} to Dhall: {err}"))]
169 SerializeDhall { what: String, err: String },
170 #[snafu(display("empty dataset error when (de)serializing {action}"))]
171 EmptyDataset { action: &'static str },
172}
173
174impl PartialEq for InputOutputError {
175 fn eq(&self, _other: &Self) -> bool {
176 false
177 }
178}
179
180pub trait ConfigRepr: Debug + Sized + Serialize + DeserializeOwned {
181 fn load<P>(path: P) -> Result<Self, ConfigError>
183 where
184 P: AsRef<Path>,
185 {
186 let file = File::open(path).context(ReadSnafu)?;
187 let reader = BufReader::new(file);
188
189 serde_yml::from_reader(reader).context(ParseSnafu)
190 }
191
192 fn load_many<P>(path: P) -> Result<Vec<Self>, ConfigError>
194 where
195 P: AsRef<Path>,
196 {
197 let file = File::open(path).context(ReadSnafu)?;
198 let reader = BufReader::new(file);
199
200 serde_yml::from_reader(reader).context(ParseSnafu)
201 }
202
203 fn load_named<P>(path: P) -> Result<BTreeMap<String, Self>, ConfigError>
205 where
206 P: AsRef<Path>,
207 {
208 let file = File::open(path).context(ReadSnafu)?;
209 let reader = BufReader::new(file);
210
211 serde_yml::from_reader(reader).context(ParseSnafu)
212 }
213
214 fn loads_many(data: &str) -> Result<Vec<Self>, ConfigError> {
216 debug!("Loading YAML:\n{data}");
217 serde_yml::from_str(data).context(ParseSnafu)
218 }
219
220 fn loads_named(data: &str) -> Result<BTreeMap<String, Self>, ConfigError> {
222 debug!("Loading YAML:\n{data}");
223 serde_yml::from_str(data).context(ParseSnafu)
224 }
225}
226
227pub(crate) fn epoch_to_str<S>(epoch: &Epoch, serializer: S) -> Result<S::Ok, S::Error>
228where
229 S: Serializer,
230{
231 serializer.serialize_str(&format!("{epoch}"))
232}
233
234pub(crate) fn epoch_from_str<'de, D>(deserializer: D) -> Result<Epoch, D::Error>
236where
237 D: Deserializer<'de>,
238{
239 let s = String::deserialize(deserializer)?;
241 Epoch::from_str(&s).map_err(serde::de::Error::custom)
242}
243
244pub(crate) fn duration_to_str<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
245where
246 S: Serializer,
247{
248 serializer.serialize_str(&format!("{duration}"))
249}
250
251pub(crate) fn duration_from_str<'de, D>(deserializer: D) -> Result<Duration, D::Error>
253where
254 D: Deserializer<'de>,
255{
256 let s = String::deserialize(deserializer)?;
258 Duration::from_str(&s).map_err(serde::de::Error::custom)
259}
260
261pub(crate) fn maybe_duration_to_str<S>(
262 duration: &Option<Duration>,
263 serializer: S,
264) -> Result<S::Ok, S::Error>
265where
266 S: Serializer,
267{
268 if let Some(duration) = duration {
269 duration_to_str(duration, serializer)
270 } else {
271 serializer.serialize_none()
272 }
273}
274
275pub(crate) fn maybe_duration_from_str<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
276where
277 D: Deserializer<'de>,
278{
279 if let Ok(s) = String::deserialize(deserializer) {
280 if let Ok(duration) = Duration::from_str(&s) {
281 Ok(Some(duration))
282 } else {
283 Ok(None)
284 }
285 } else {
286 Ok(None)
287 }
288}
289
290#[allow(clippy::upper_case_acronyms)]
291#[derive(Debug)]
292pub enum ParsingError {
293 MD(String),
294 OD(String),
295 UseOdInstead,
296 UseMdInstead,
297 FileNotFound(String),
298 FileNotUTF8(String),
299 SequenceNotFound(String),
300 LoadingError(String),
301 PropagatorNotFound(String),
302 Duration(String),
303 Quantity(String),
304 Distance(String),
305 Velocity(String),
306 IllDefined(String),
307 ExecutionError(NyxError),
308 IoError(IoError),
309 Yaml(YamlError),
310}
311
312impl From<NyxError> for ParsingError {
313 fn from(error: NyxError) -> Self {
314 Self::ExecutionError(error)
315 }
316}