1use log::{info, warn};
20use std::collections::HashMap;
21use std::error::Error;
22use std::fs::File;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25
26use crate::errors::{MonteCarloError, NoSuccessfulRunsSnafu, StateError};
27use crate::io::watermark::pq_writer;
28use crate::io::{ExportCfg, InputOutputError};
29use crate::linalg::allocator::Allocator;
30use crate::linalg::DefaultAllocator;
31use crate::md::prelude::GuidanceMode;
32use crate::md::trajectory::{Interpolatable, Traj};
33use crate::md::StateParameter;
34use crate::propagators::PropagationError;
35use crate::time::{Duration, Epoch, TimeUnits};
36use anise::constants::frames::EARTH_J2000;
37use arrow::array::{Array, Float64Builder, Int32Builder, StringBuilder};
38use arrow::datatypes::{DataType, Field, Schema};
39use arrow::record_batch::RecordBatch;
40use hifitime::TimeScale;
41use parquet::arrow::ArrowWriter;
42pub use rstats::Stats;
43use snafu::ensure;
44
45use super::DispersedState;
46
47pub struct Run<S: Interpolatable, R>
49where
50 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
51 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
52{
53 pub index: usize,
55 pub dispersed_state: DispersedState<S>,
57 pub result: Result<R, PropagationError>,
59}
60
61pub struct Results<S: Interpolatable, R>
63where
64 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
65 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
66{
67 pub runs: Vec<Run<S, R>>,
69 pub scenario: String,
71}
72
73pub struct PropResult<S: Interpolatable>
75where
76 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
77 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
78{
79 pub state: S,
80 pub traj: Traj<S>,
81}
82
83impl<S: Interpolatable> Results<S, PropResult<S>>
84where
85 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
86 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
87{
88 pub fn every_value_of_between(
91 &self,
92 param: StateParameter,
93 step: Duration,
94 start: Epoch,
95 end: Epoch,
96 value_if_run_failed: Option<f64>,
97 ) -> Vec<f64> {
98 let mut report = Vec::with_capacity(self.runs.len());
99 for run in &self.runs {
100 match &run.result {
101 Ok(r) => {
102 for state in r.traj.every_between(step, start, end) {
103 match state.value(param) {
104 Ok(val) => report.push(val),
105 Err(e) => match value_if_run_failed {
106 Some(val) => report.push(val),
107 None => {
108 info!("run #{}: {}, skipping {} in report", run.index, e, param)
109 }
110 },
111 }
112 }
113 }
114 Err(e) => match value_if_run_failed {
115 Some(val) => report.push(val),
116 None => warn!(
117 "run #{} failed with {}, skipping {} in report",
118 run.index, e, param
119 ),
120 },
121 }
122 }
123 report
124 }
125
126 pub fn every_value_of(
129 &self,
130 param: StateParameter,
131 step: Duration,
132 value_if_run_failed: Option<f64>,
133 ) -> Vec<f64> {
134 let mut report = Vec::with_capacity(self.runs.len());
135 for run in &self.runs {
136 match &run.result {
137 Ok(r) => {
138 for state in r.traj.every(step) {
139 match state.value(param) {
140 Ok(val) => report.push(val),
141 Err(e) => match value_if_run_failed {
142 Some(val) => report.push(val),
143 None => {
144 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
145 }
146 },
147 }
148 }
149 }
150 Err(e) => match value_if_run_failed {
151 Some(val) => report.push(val),
152 None => warn!(
153 "run #{} failed with {}, skipping {} in report",
154 run.index, e, param
155 ),
156 },
157 }
158 }
159 report
160 }
161
162 pub fn first_values_of(
165 &self,
166 param: StateParameter,
167 value_if_run_failed: Option<f64>,
168 ) -> Vec<f64> {
169 let mut report = Vec::with_capacity(self.runs.len());
170 for run in &self.runs {
171 match &run.result {
172 Ok(r) => match r.traj.first().value(param) {
173 Ok(val) => report.push(val),
174 Err(e) => match value_if_run_failed {
175 Some(val) => report.push(val),
176 None => {
177 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
178 }
179 },
180 },
181 Err(e) => match value_if_run_failed {
182 Some(val) => report.push(val),
183 None => warn!(
184 "run #{} failed with {}, skipping {} in report",
185 run.index, e, param
186 ),
187 },
188 }
189 }
190 report
191 }
192
193 pub fn last_values_of(
196 &self,
197 param: StateParameter,
198 value_if_run_failed: Option<f64>,
199 ) -> Vec<f64> {
200 let mut report = Vec::with_capacity(self.runs.len());
201 for run in &self.runs {
202 match &run.result {
203 Ok(r) => match r.traj.last().value(param) {
204 Ok(val) => report.push(val),
205 Err(e) => match value_if_run_failed {
206 Some(val) => report.push(val),
207 None => {
208 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
209 }
210 },
211 },
212 Err(e) => match value_if_run_failed {
213 Some(val) => report.push(val),
214 None => warn!(
215 "run #{} failed with {}, skipping {} in report",
216 run.index, e, param
217 ),
218 },
219 }
220 }
221 report
222 }
223
224 pub fn dispersion_values_of(&self, param: StateParameter) -> Result<Vec<f64>, MonteCarloError> {
226 let mut report = Vec::with_capacity(self.runs.len());
227 'run_loop: for run in &self.runs {
228 for (dparam, val) in &run.dispersed_state.actual_dispersions {
229 if dparam == ¶m {
230 report.push(*val);
231 continue 'run_loop;
232 }
233 }
234 return Err(MonteCarloError::StateError {
236 source: StateError::Unavailable { param },
237 });
238 }
239 Ok(report)
240 }
241
242 pub fn to_parquet<P: AsRef<Path>>(
243 &self,
244 path: P,
245 cfg: ExportCfg,
246 ) -> Result<PathBuf, Box<dyn Error>> {
247 let tick = Epoch::now().unwrap();
248 info!("Exporting Monte Carlo results to parquet file...");
249
250 let path_buf = cfg.actual_path(path);
252
253 let mut hdrs = vec![
255 Field::new("Epoch (UTC)", DataType::Utf8, false),
256 Field::new("Monte Carlo Run Index", DataType::Int32, false),
257 ];
258
259 let mut frame = EARTH_J2000;
261 let mut fields = match cfg.fields {
262 Some(fields) => fields,
263 None => S::export_params(),
264 };
265
266 let mut start = None;
267 let mut end = None;
268
269 let mut all_states: Vec<S> = vec![];
271 let mut run_indexes: Vec<i32> = vec![];
272
273 for run in &self.runs {
274 if let Ok(success) = &run.result {
275 if start.is_none() {
276 frame = success.state.frame();
278
279 fields.retain(|param| success.state.value(*param).is_ok());
281
282 start = Some(success.traj.first().epoch());
283 end = Some(success.state.epoch());
284 }
285
286 let states =
288 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
289 let start = cfg.start_epoch.unwrap_or_else(|| start.unwrap());
291 let end = cfg.end_epoch.unwrap_or_else(|| end.unwrap());
292 let step = cfg.step.unwrap_or_else(|| 1.minutes());
293 success
294 .traj
295 .every_between(step, start, end)
296 .collect::<Vec<S>>()
297 } else {
298 success.traj.states.to_vec()
299 };
300 for _ in 0..states.len() {
302 run_indexes.push(run.index as i32);
303 }
304 all_states.extend(states.iter());
305 }
306 }
307
308 ensure!(
309 start.is_some(),
310 NoSuccessfulRunsSnafu {
311 action: "export",
312 num_runs: self.runs.len()
313 }
314 );
315
316 let more_meta = Some(vec![(
317 "Frame".to_string(),
318 serde_dhall::serialize(&frame)
319 .static_type_annotation()
320 .to_string()
321 .map_err(|e| {
322 Box::new(InputOutputError::SerializeDhall {
323 what: format!("frame `{frame}`"),
324 err: e.to_string(),
325 })
326 })?,
327 )]);
328
329 for field in &fields {
330 hdrs.push(field.to_field(more_meta.clone()));
331 }
332
333 let schema = Arc::new(Schema::new(hdrs));
335 let mut record: Vec<Arc<dyn Array>> = Vec::new();
336
337 let mut utc_epoch = StringBuilder::new();
341 let mut idx_col = Int32Builder::new();
342 for (sno, s) in all_states.iter().enumerate() {
343 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
344
345 idx_col.append_value(run_indexes[sno]);
347 }
348 record.push(Arc::new(utc_epoch.finish()));
349 record.push(Arc::new(idx_col.finish()));
350
351 for field in fields {
353 if field == StateParameter::GuidanceMode {
354 let mut guid_mode = StringBuilder::new();
355 for s in &all_states {
356 guid_mode
357 .append_value(format!("{:?}", GuidanceMode::from(s.value(field).unwrap())));
358 }
359 record.push(Arc::new(guid_mode.finish()));
360 } else {
361 let mut data = Float64Builder::new();
362 for s in &all_states {
363 data.append_value(s.value(field).unwrap());
364 }
365 record.push(Arc::new(data.finish()));
366 }
367 }
368
369 info!(
370 "Serialized {} states from {} to {}",
371 all_states.len(),
372 start.unwrap(),
373 end.unwrap()
374 );
375
376 let mut metadata = HashMap::new();
378 metadata.insert(
379 "Purpose".to_string(),
380 "Monte Carlo Trajectory data".to_string(),
381 );
382 if let Some(add_meta) = cfg.metadata {
383 for (k, v) in add_meta {
384 metadata.insert(k, v);
385 }
386 }
387
388 let props = pq_writer(Some(metadata));
389
390 let file = File::create(&path_buf)?;
391 let mut writer = ArrowWriter::try_new(file, schema.clone(), props).unwrap();
392
393 let batch = RecordBatch::try_new(schema, record)?;
394 writer.write(&batch)?;
395 writer.close()?;
396
397 let tock_time = Epoch::now().unwrap() - tick;
399 info!(
400 "Trajectory written to {} in {tock_time}",
401 path_buf.display()
402 );
403 Ok(path_buf)
404 }
405}