1use crate::errors::NyxError;
20use crate::md::StateParameter;
21use crate::time::Epoch;
22
23use arrow::error::ArrowError;
24use parquet::errors::ParquetError;
25use snafu::prelude::*;
26pub(crate) mod watermark;
27use hifitime::prelude::{Format, Formatter};
28use hifitime::Duration;
29use serde::de::DeserializeOwned;
30use serde::{Deserialize, Deserializer};
31use serde::{Serialize, Serializer};
32use serde_yml::Error as YamlError;
33use std::collections::{BTreeMap, HashMap};
34use std::convert::From;
35use std::fmt::Debug;
36use std::fs::File;
37use std::io::BufReader;
38use std::io::Error as IoError;
39use std::path::{Path, PathBuf};
40use std::str::FromStr;
41use typed_builder::TypedBuilder;
42
43pub mod gravity;
45
46use std::io;
47
48#[derive(Clone, Default, Serialize, Deserialize, TypedBuilder)]
50#[builder(doc)]
51pub struct ExportCfg {
52 #[builder(default, setter(strip_option))]
54 pub fields: Option<Vec<StateParameter>>,
55 #[builder(default, setter(strip_option))]
57 pub start_epoch: Option<Epoch>,
58 #[builder(default, setter(strip_option))]
60 pub end_epoch: Option<Epoch>,
61 #[builder(default, setter(strip_option))]
63 pub step: Option<Duration>,
64 #[builder(default, setter(strip_option))]
66 pub metadata: Option<HashMap<String, String>>,
67 #[builder(default)]
69 pub timestamp: bool,
70}
71
72impl ExportCfg {
73 pub fn from_metadata(metadata: Vec<(String, String)>) -> Self {
75 let mut me = ExportCfg {
76 metadata: Some(HashMap::new()),
77 ..Default::default()
78 };
79 for (k, v) in metadata {
80 me.metadata.as_mut().unwrap().insert(k, v);
81 }
82 me
83 }
84
85 pub fn timestamped() -> Self {
87 Self {
88 timestamp: true,
89 ..Default::default()
90 }
91 }
92
93 pub fn append_field(&mut self, field: StateParameter) {
94 if let Some(fields) = self.fields.as_mut() {
95 fields.push(field);
96 } else {
97 self.fields = Some(vec![field]);
98 }
99 }
100
101 pub(crate) fn actual_path<P: AsRef<Path>>(&self, path: P) -> PathBuf {
103 let mut path_buf = path.as_ref().to_path_buf();
104 if self.timestamp {
105 if let Some(file_name) = path_buf.file_name() {
106 if let Some(file_name_str) = file_name.to_str() {
107 if let Some(extension) = path_buf.extension() {
108 let stamp = Formatter::new(
109 Epoch::now().unwrap(),
110 Format::from_str("%Y-%m-%dT%H-%M-%S").unwrap(),
111 );
112 let ext = extension.to_str().unwrap();
113 let file_name = file_name_str.replace(&format!(".{ext}"), "");
114 let new_file_name = format!("{file_name}-{stamp}.{}", ext);
115 path_buf.set_file_name(new_file_name);
116 }
117 }
118 }
119 };
120 path_buf
121 }
122}
123
124#[derive(Debug, Snafu)]
125#[snafu(visibility(pub(crate)))]
126pub enum ConfigError {
127 #[snafu(display("failed to read configuration file: {source}"))]
128 ReadError { source: io::Error },
129
130 #[snafu(display("failed to parse YAML configuration file: {source}"))]
131 ParseError { source: serde_yml::Error },
132
133 #[snafu(display("of invalid configuration: {msg}"))]
134 InvalidConfig { msg: String },
135}
136
137impl PartialEq for ConfigError {
138 fn eq(&self, _other: &Self) -> bool {
140 false
141 }
142}
143
144#[derive(Debug, Snafu)]
145#[snafu(visibility(pub(crate)))]
146pub enum InputOutputError {
147 #[snafu(display("{action} encountered i/o error: {source}"))]
148 StdIOError {
149 source: io::Error,
150 action: &'static str,
151 },
152 #[snafu(display("missing required data {which}"))]
153 MissingData { which: String },
154 #[snafu(display("unknown data `{which}`"))]
155 UnsupportedData { which: String },
156 #[snafu(display("{action} encountered a Parquet error: {source}"))]
157 ParquetError {
158 source: ParquetError,
159 action: &'static str,
160 },
161 #[snafu(display("inconsistency detected: {msg}"))]
162 Inconsistency { msg: String },
163 #[snafu(display("{action} encountered an Arrow error: {source}"))]
164 ArrowError {
165 source: ArrowError,
166 action: &'static str,
167 },
168 #[snafu(display("error parsing `{data}` as Dhall config: {err}"))]
169 ParseDhall { data: String, err: String },
170 #[snafu(display("error serializing {what} to Dhall: {err}"))]
171 SerializeDhall { what: String, err: String },
172 #[snafu(display("empty dataset error when (de)serializing {action}"))]
173 EmptyDataset { action: &'static str },
174}
175
176impl PartialEq for InputOutputError {
177 fn eq(&self, _other: &Self) -> bool {
178 false
179 }
180}
181
182pub trait ConfigRepr: Debug + Sized + Serialize + DeserializeOwned {
183 fn load<P>(path: P) -> Result<Self, ConfigError>
185 where
186 P: AsRef<Path>,
187 {
188 let file = File::open(path).context(ReadSnafu)?;
189 let reader = BufReader::new(file);
190
191 serde_yml::from_reader(reader).context(ParseSnafu)
192 }
193
194 fn load_many<P>(path: P) -> Result<Vec<Self>, ConfigError>
196 where
197 P: AsRef<Path>,
198 {
199 let file = File::open(path).context(ReadSnafu)?;
200 let reader = BufReader::new(file);
201
202 serde_yml::from_reader(reader).context(ParseSnafu)
203 }
204
205 fn load_named<P>(path: P) -> Result<BTreeMap<String, Self>, ConfigError>
207 where
208 P: AsRef<Path>,
209 {
210 let file = File::open(path).context(ReadSnafu)?;
211 let reader = BufReader::new(file);
212
213 serde_yml::from_reader(reader).context(ParseSnafu)
214 }
215
216 fn loads_many(data: &str) -> Result<Vec<Self>, ConfigError> {
218 debug!("Loading YAML:\n{data}");
219 serde_yml::from_str(data).context(ParseSnafu)
220 }
221
222 fn loads_named(data: &str) -> Result<BTreeMap<String, Self>, ConfigError> {
224 debug!("Loading YAML:\n{data}");
225 serde_yml::from_str(data).context(ParseSnafu)
226 }
227}
228
229pub(crate) fn epoch_to_str<S>(epoch: &Epoch, serializer: S) -> Result<S::Ok, S::Error>
230where
231 S: Serializer,
232{
233 serializer.serialize_str(&format!("{epoch}"))
234}
235
236pub(crate) fn epoch_from_str<'de, D>(deserializer: D) -> Result<Epoch, D::Error>
238where
239 D: Deserializer<'de>,
240{
241 let s = String::deserialize(deserializer)?;
243 Epoch::from_str(&s).map_err(serde::de::Error::custom)
244}
245
246pub(crate) fn duration_to_str<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
247where
248 S: Serializer,
249{
250 serializer.serialize_str(&format!("{duration}"))
251}
252
253pub(crate) fn duration_from_str<'de, D>(deserializer: D) -> Result<Duration, D::Error>
255where
256 D: Deserializer<'de>,
257{
258 let s = String::deserialize(deserializer)?;
260 Duration::from_str(&s).map_err(serde::de::Error::custom)
261}
262
263pub(crate) fn maybe_duration_to_str<S>(
264 duration: &Option<Duration>,
265 serializer: S,
266) -> Result<S::Ok, S::Error>
267where
268 S: Serializer,
269{
270 if let Some(duration) = duration {
271 duration_to_str(duration, serializer)
272 } else {
273 serializer.serialize_none()
274 }
275}
276
277pub(crate) fn maybe_duration_from_str<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
278where
279 D: Deserializer<'de>,
280{
281 if let Ok(s) = String::deserialize(deserializer) {
282 if let Ok(duration) = Duration::from_str(&s) {
283 Ok(Some(duration))
284 } else {
285 Ok(None)
286 }
287 } else {
288 Ok(None)
289 }
290}
291
292#[allow(clippy::upper_case_acronyms)]
293#[derive(Debug)]
294pub enum ParsingError {
295 MD(String),
296 OD(String),
297 UseOdInstead,
298 UseMdInstead,
299 FileNotFound(String),
300 FileNotUTF8(String),
301 SequenceNotFound(String),
302 LoadingError(String),
303 PropagatorNotFound(String),
304 Duration(String),
305 Quantity(String),
306 Distance(String),
307 Velocity(String),
308 IllDefined(String),
309 ExecutionError(NyxError),
310 IoError(IoError),
311 Yaml(YamlError),
312}
313
314impl From<NyxError> for ParsingError {
315 fn from(error: NyxError) -> Self {
316 Self::ExecutionError(error)
317 }
318}