1use std::collections::HashMap;
20use std::error::Error;
21use std::fs::File;
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24
25use crate::errors::{MonteCarloError, NoSuccessfulRunsSnafu, StateError};
26use crate::io::watermark::pq_writer;
27use crate::io::{ExportCfg, InputOutputError};
28use crate::linalg::allocator::Allocator;
29use crate::linalg::DefaultAllocator;
30use crate::md::prelude::GuidanceMode;
31use crate::md::trajectory::{Interpolatable, Traj};
32use crate::md::{EventEvaluator, StateParameter};
33use crate::propagators::PropagationError;
34use crate::time::{Duration, Epoch, TimeUnits};
35use anise::almanac::Almanac;
36use anise::constants::frames::EARTH_J2000;
37use arrow::array::{Array, Float64Builder, Int32Builder, StringBuilder};
38use arrow::datatypes::{DataType, Field, Schema};
39use arrow::record_batch::RecordBatch;
40use hifitime::TimeScale;
41use parquet::arrow::ArrowWriter;
42pub use rstats::Stats;
43use snafu::ensure;
44
45use super::DispersedState;
46
47pub struct Run<S: Interpolatable, R>
49where
50 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
51 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
52{
53 pub index: usize,
55 pub dispersed_state: DispersedState<S>,
57 pub result: Result<R, PropagationError>,
59}
60
61pub struct Results<S: Interpolatable, R>
63where
64 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
65 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
66{
67 pub runs: Vec<Run<S, R>>,
69 pub scenario: String,
71}
72
73pub struct PropResult<S: Interpolatable>
75where
76 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
77 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
78{
79 pub state: S,
80 pub traj: Traj<S>,
81}
82
83impl<S: Interpolatable> Results<S, PropResult<S>>
84where
85 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
86 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
87{
88 pub fn every_value_of_between(
91 &self,
92 param: StateParameter,
93 step: Duration,
94 start: Epoch,
95 end: Epoch,
96 value_if_run_failed: Option<f64>,
97 ) -> Vec<f64> {
98 let mut report = Vec::with_capacity(self.runs.len());
99 for run in &self.runs {
100 match &run.result {
101 Ok(r) => {
102 for state in r.traj.every_between(step, start, end) {
103 match state.value(param) {
104 Ok(val) => report.push(val),
105 Err(e) => match value_if_run_failed {
106 Some(val) => report.push(val),
107 None => {
108 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
109 }
110 },
111 }
112 }
113 }
114 Err(e) => match value_if_run_failed {
115 Some(val) => report.push(val),
116 None => warn!(
117 "run #{} failed with {}, skipping {} in report",
118 run.index, e, param
119 ),
120 },
121 }
122 }
123 report
124 }
125
126 pub fn every_value_of(
129 &self,
130 param: StateParameter,
131 step: Duration,
132 value_if_run_failed: Option<f64>,
133 ) -> Vec<f64> {
134 let mut report = Vec::with_capacity(self.runs.len());
135 for run in &self.runs {
136 match &run.result {
137 Ok(r) => {
138 for state in r.traj.every(step) {
139 match state.value(param) {
140 Ok(val) => report.push(val),
141 Err(e) => match value_if_run_failed {
142 Some(val) => report.push(val),
143 None => {
144 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
145 }
146 },
147 }
148 }
149 }
150 Err(e) => match value_if_run_failed {
151 Some(val) => report.push(val),
152 None => warn!(
153 "run #{} failed with {}, skipping {} in report",
154 run.index, e, param
155 ),
156 },
157 }
158 }
159 report
160 }
161
162 pub fn first_values_of(
165 &self,
166 param: StateParameter,
167 value_if_run_failed: Option<f64>,
168 ) -> Vec<f64> {
169 let mut report = Vec::with_capacity(self.runs.len());
170 for run in &self.runs {
171 match &run.result {
172 Ok(r) => match r.traj.first().value(param) {
173 Ok(val) => report.push(val),
174 Err(e) => match value_if_run_failed {
175 Some(val) => report.push(val),
176 None => {
177 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
178 }
179 },
180 },
181 Err(e) => match value_if_run_failed {
182 Some(val) => report.push(val),
183 None => warn!(
184 "run #{} failed with {}, skipping {} in report",
185 run.index, e, param
186 ),
187 },
188 }
189 }
190 report
191 }
192
193 pub fn last_values_of(
196 &self,
197 param: StateParameter,
198 value_if_run_failed: Option<f64>,
199 ) -> Vec<f64> {
200 let mut report = Vec::with_capacity(self.runs.len());
201 for run in &self.runs {
202 match &run.result {
203 Ok(r) => match r.traj.last().value(param) {
204 Ok(val) => report.push(val),
205 Err(e) => match value_if_run_failed {
206 Some(val) => report.push(val),
207 None => {
208 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
209 }
210 },
211 },
212 Err(e) => match value_if_run_failed {
213 Some(val) => report.push(val),
214 None => warn!(
215 "run #{} failed with {}, skipping {} in report",
216 run.index, e, param
217 ),
218 },
219 }
220 }
221 report
222 }
223
224 pub fn dispersion_values_of(&self, param: StateParameter) -> Result<Vec<f64>, MonteCarloError> {
226 let mut report = Vec::with_capacity(self.runs.len());
227 'run_loop: for run in &self.runs {
228 for (dparam, val) in &run.dispersed_state.actual_dispersions {
229 if dparam == ¶m {
230 report.push(*val);
231 continue 'run_loop;
232 }
233 }
234 return Err(MonteCarloError::StateError {
236 source: StateError::Unavailable { param },
237 });
238 }
239 Ok(report)
240 }
241
242 pub fn to_parquet<P: AsRef<Path>>(
243 &self,
244 path: P,
245 events: Option<Vec<&dyn EventEvaluator<S>>>,
246 cfg: ExportCfg,
247 almanac: Arc<Almanac>,
248 ) -> Result<PathBuf, Box<dyn Error>> {
249 let tick = Epoch::now().unwrap();
250 info!("Exporting Monte Carlo results to parquet file...");
251
252 let path_buf = cfg.actual_path(path);
254
255 let mut hdrs = vec![
257 Field::new("Epoch (UTC)", DataType::Utf8, false),
258 Field::new("Monte Carlo Run Index", DataType::Int32, false),
259 ];
260
261 let mut frame = EARTH_J2000;
263 let mut fields = match cfg.fields {
264 Some(fields) => fields,
265 None => S::export_params(),
266 };
267
268 let mut start = None;
269 let mut end = None;
270
271 let mut all_states: Vec<S> = vec![];
273 let mut run_indexes: Vec<i32> = vec![];
274
275 for run in &self.runs {
276 if let Ok(success) = &run.result {
277 if start.is_none() {
278 frame = success.state.frame();
280
281 fields.retain(|param| success.state.value(*param).is_ok());
283
284 start = Some(success.traj.first().epoch());
285 end = Some(success.state.epoch());
286 }
287
288 let states =
290 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
291 let start = cfg.start_epoch.unwrap_or_else(|| start.unwrap());
293 let end = cfg.end_epoch.unwrap_or_else(|| end.unwrap());
294 let step = cfg.step.unwrap_or_else(|| 1.minutes());
295 success
296 .traj
297 .every_between(step, start, end)
298 .collect::<Vec<S>>()
299 } else {
300 success.traj.states.to_vec()
301 };
302 for _ in 0..states.len() {
304 run_indexes.push(run.index as i32);
305 }
306 all_states.extend(states.iter());
307 }
308 }
309
310 ensure!(
311 start.is_some(),
312 NoSuccessfulRunsSnafu {
313 action: "export",
314 num_runs: self.runs.len()
315 }
316 );
317
318 let more_meta = Some(vec![(
319 "Frame".to_string(),
320 serde_dhall::serialize(&frame).to_string().map_err(|e| {
321 Box::new(InputOutputError::SerializeDhall {
322 what: format!("frame `{frame}`"),
323 err: e.to_string(),
324 })
325 })?,
326 )]);
327
328 for field in &fields {
329 hdrs.push(field.to_field(more_meta.clone()));
330 }
331
332 if let Some(events) = events.as_ref() {
333 for event in events {
334 let field = Field::new(format!("{event}"), DataType::Float64, false);
335 hdrs.push(field);
336 }
337 }
338
339 let schema = Arc::new(Schema::new(hdrs));
341 let mut record: Vec<Arc<dyn Array>> = Vec::new();
342
343 let mut utc_epoch = StringBuilder::new();
347 let mut idx_col = Int32Builder::new();
348 for (sno, s) in all_states.iter().enumerate() {
349 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
350
351 idx_col.append_value(run_indexes[sno]);
353 }
354 record.push(Arc::new(utc_epoch.finish()));
355 record.push(Arc::new(idx_col.finish()));
356
357 for field in fields {
359 if field == StateParameter::GuidanceMode {
360 let mut guid_mode = StringBuilder::new();
361 for s in &all_states {
362 guid_mode
363 .append_value(format!("{:?}", GuidanceMode::from(s.value(field).unwrap())));
364 }
365 record.push(Arc::new(guid_mode.finish()));
366 } else {
367 let mut data = Float64Builder::new();
368 for s in &all_states {
369 data.append_value(s.value(field).unwrap());
370 }
371 record.push(Arc::new(data.finish()));
372 }
373 }
374
375 info!(
376 "Serialized {} states from {} to {}",
377 all_states.len(),
378 start.unwrap(),
379 end.unwrap()
380 );
381
382 if let Some(events) = events {
384 info!("Evaluating {} event(s)", events.len());
385 for event in events {
386 let mut data = Float64Builder::new();
387 for s in &all_states {
388 data.append_value(event.eval(s, almanac.clone()).map_err(Box::new)?);
389 }
390 record.push(Arc::new(data.finish()));
391 }
392 }
393
394 let mut metadata = HashMap::new();
396 metadata.insert(
397 "Purpose".to_string(),
398 "Monte Carlo Trajectory data".to_string(),
399 );
400 if let Some(add_meta) = cfg.metadata {
401 for (k, v) in add_meta {
402 metadata.insert(k, v);
403 }
404 }
405
406 let props = pq_writer(Some(metadata));
407
408 let file = File::create(&path_buf)?;
409 let mut writer = ArrowWriter::try_new(file, schema.clone(), props).unwrap();
410
411 let batch = RecordBatch::try_new(schema, record)?;
412 writer.write(&batch)?;
413 writer.close()?;
414
415 let tock_time = Epoch::now().unwrap() - tick;
417 info!(
418 "Trajectory written to {} in {tock_time}",
419 path_buf.display()
420 );
421 Ok(path_buf)
422 }
423}