1use log::{info, warn};
20use std::collections::HashMap;
21use std::error::Error;
22use std::fs::File;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25
26use crate::errors::{MonteCarloError, NoSuccessfulRunsSnafu, StateError};
27use crate::io::watermark::pq_writer;
28use crate::io::{ExportCfg, InputOutputError};
29use crate::linalg::allocator::Allocator;
30use crate::linalg::DefaultAllocator;
31use crate::md::prelude::GuidanceMode;
32use crate::md::trajectory::{Interpolatable, Traj};
33use crate::md::{EventEvaluator, StateParameter};
34use crate::propagators::PropagationError;
35use crate::time::{Duration, Epoch, TimeUnits};
36use anise::almanac::Almanac;
37use anise::constants::frames::EARTH_J2000;
38use arrow::array::{Array, Float64Builder, Int32Builder, StringBuilder};
39use arrow::datatypes::{DataType, Field, Schema};
40use arrow::record_batch::RecordBatch;
41use hifitime::TimeScale;
42use parquet::arrow::ArrowWriter;
43pub use rstats::Stats;
44use snafu::ensure;
45
46use super::DispersedState;
47
48pub struct Run<S: Interpolatable, R>
50where
51 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
52 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
53{
54 pub index: usize,
56 pub dispersed_state: DispersedState<S>,
58 pub result: Result<R, PropagationError>,
60}
61
62pub struct Results<S: Interpolatable, R>
64where
65 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
66 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
67{
68 pub runs: Vec<Run<S, R>>,
70 pub scenario: String,
72}
73
74pub struct PropResult<S: Interpolatable>
76where
77 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
78 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
79{
80 pub state: S,
81 pub traj: Traj<S>,
82}
83
84impl<S: Interpolatable> Results<S, PropResult<S>>
85where
86 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
87 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
88{
89 pub fn every_value_of_between(
92 &self,
93 param: StateParameter,
94 step: Duration,
95 start: Epoch,
96 end: Epoch,
97 value_if_run_failed: Option<f64>,
98 ) -> Vec<f64> {
99 let mut report = Vec::with_capacity(self.runs.len());
100 for run in &self.runs {
101 match &run.result {
102 Ok(r) => {
103 for state in r.traj.every_between(step, start, end) {
104 match state.value(param) {
105 Ok(val) => report.push(val),
106 Err(e) => match value_if_run_failed {
107 Some(val) => report.push(val),
108 None => {
109 info!("run #{}: {}, skipping {} in report", run.index, e, param)
110 }
111 },
112 }
113 }
114 }
115 Err(e) => match value_if_run_failed {
116 Some(val) => report.push(val),
117 None => warn!(
118 "run #{} failed with {}, skipping {} in report",
119 run.index, e, param
120 ),
121 },
122 }
123 }
124 report
125 }
126
127 pub fn every_value_of(
130 &self,
131 param: StateParameter,
132 step: Duration,
133 value_if_run_failed: Option<f64>,
134 ) -> Vec<f64> {
135 let mut report = Vec::with_capacity(self.runs.len());
136 for run in &self.runs {
137 match &run.result {
138 Ok(r) => {
139 for state in r.traj.every(step) {
140 match state.value(param) {
141 Ok(val) => report.push(val),
142 Err(e) => match value_if_run_failed {
143 Some(val) => report.push(val),
144 None => {
145 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
146 }
147 },
148 }
149 }
150 }
151 Err(e) => match value_if_run_failed {
152 Some(val) => report.push(val),
153 None => warn!(
154 "run #{} failed with {}, skipping {} in report",
155 run.index, e, param
156 ),
157 },
158 }
159 }
160 report
161 }
162
163 pub fn first_values_of(
166 &self,
167 param: StateParameter,
168 value_if_run_failed: Option<f64>,
169 ) -> Vec<f64> {
170 let mut report = Vec::with_capacity(self.runs.len());
171 for run in &self.runs {
172 match &run.result {
173 Ok(r) => match r.traj.first().value(param) {
174 Ok(val) => report.push(val),
175 Err(e) => match value_if_run_failed {
176 Some(val) => report.push(val),
177 None => {
178 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
179 }
180 },
181 },
182 Err(e) => match value_if_run_failed {
183 Some(val) => report.push(val),
184 None => warn!(
185 "run #{} failed with {}, skipping {} in report",
186 run.index, e, param
187 ),
188 },
189 }
190 }
191 report
192 }
193
194 pub fn last_values_of(
197 &self,
198 param: StateParameter,
199 value_if_run_failed: Option<f64>,
200 ) -> Vec<f64> {
201 let mut report = Vec::with_capacity(self.runs.len());
202 for run in &self.runs {
203 match &run.result {
204 Ok(r) => match r.traj.last().value(param) {
205 Ok(val) => report.push(val),
206 Err(e) => match value_if_run_failed {
207 Some(val) => report.push(val),
208 None => {
209 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
210 }
211 },
212 },
213 Err(e) => match value_if_run_failed {
214 Some(val) => report.push(val),
215 None => warn!(
216 "run #{} failed with {}, skipping {} in report",
217 run.index, e, param
218 ),
219 },
220 }
221 }
222 report
223 }
224
225 pub fn dispersion_values_of(&self, param: StateParameter) -> Result<Vec<f64>, MonteCarloError> {
227 let mut report = Vec::with_capacity(self.runs.len());
228 'run_loop: for run in &self.runs {
229 for (dparam, val) in &run.dispersed_state.actual_dispersions {
230 if dparam == ¶m {
231 report.push(*val);
232 continue 'run_loop;
233 }
234 }
235 return Err(MonteCarloError::StateError {
237 source: StateError::Unavailable { param },
238 });
239 }
240 Ok(report)
241 }
242
243 pub fn to_parquet<P: AsRef<Path>>(
244 &self,
245 path: P,
246 events: Option<Vec<&dyn EventEvaluator<S>>>,
247 cfg: ExportCfg,
248 almanac: Arc<Almanac>,
249 ) -> Result<PathBuf, Box<dyn Error>> {
250 let tick = Epoch::now().unwrap();
251 info!("Exporting Monte Carlo results to parquet file...");
252
253 let path_buf = cfg.actual_path(path);
255
256 let mut hdrs = vec![
258 Field::new("Epoch (UTC)", DataType::Utf8, false),
259 Field::new("Monte Carlo Run Index", DataType::Int32, false),
260 ];
261
262 let mut frame = EARTH_J2000;
264 let mut fields = match cfg.fields {
265 Some(fields) => fields,
266 None => S::export_params(),
267 };
268
269 let mut start = None;
270 let mut end = None;
271
272 let mut all_states: Vec<S> = vec![];
274 let mut run_indexes: Vec<i32> = vec![];
275
276 for run in &self.runs {
277 if let Ok(success) = &run.result {
278 if start.is_none() {
279 frame = success.state.frame();
281
282 fields.retain(|param| success.state.value(*param).is_ok());
284
285 start = Some(success.traj.first().epoch());
286 end = Some(success.state.epoch());
287 }
288
289 let states =
291 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
292 let start = cfg.start_epoch.unwrap_or_else(|| start.unwrap());
294 let end = cfg.end_epoch.unwrap_or_else(|| end.unwrap());
295 let step = cfg.step.unwrap_or_else(|| 1.minutes());
296 success
297 .traj
298 .every_between(step, start, end)
299 .collect::<Vec<S>>()
300 } else {
301 success.traj.states.to_vec()
302 };
303 for _ in 0..states.len() {
305 run_indexes.push(run.index as i32);
306 }
307 all_states.extend(states.iter());
308 }
309 }
310
311 ensure!(
312 start.is_some(),
313 NoSuccessfulRunsSnafu {
314 action: "export",
315 num_runs: self.runs.len()
316 }
317 );
318
319 let more_meta = Some(vec![(
320 "Frame".to_string(),
321 serde_dhall::serialize(&frame)
322 .static_type_annotation()
323 .to_string()
324 .map_err(|e| {
325 Box::new(InputOutputError::SerializeDhall {
326 what: format!("frame `{frame}`"),
327 err: e.to_string(),
328 })
329 })?,
330 )]);
331
332 for field in &fields {
333 hdrs.push(field.to_field(more_meta.clone()));
334 }
335
336 if let Some(events) = events.as_ref() {
337 for event in events {
338 let field = Field::new(format!("{event}"), DataType::Float64, false);
339 hdrs.push(field);
340 }
341 }
342
343 let schema = Arc::new(Schema::new(hdrs));
345 let mut record: Vec<Arc<dyn Array>> = Vec::new();
346
347 let mut utc_epoch = StringBuilder::new();
351 let mut idx_col = Int32Builder::new();
352 for (sno, s) in all_states.iter().enumerate() {
353 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
354
355 idx_col.append_value(run_indexes[sno]);
357 }
358 record.push(Arc::new(utc_epoch.finish()));
359 record.push(Arc::new(idx_col.finish()));
360
361 for field in fields {
363 if field == StateParameter::GuidanceMode {
364 let mut guid_mode = StringBuilder::new();
365 for s in &all_states {
366 guid_mode
367 .append_value(format!("{:?}", GuidanceMode::from(s.value(field).unwrap())));
368 }
369 record.push(Arc::new(guid_mode.finish()));
370 } else {
371 let mut data = Float64Builder::new();
372 for s in &all_states {
373 data.append_value(s.value(field).unwrap());
374 }
375 record.push(Arc::new(data.finish()));
376 }
377 }
378
379 info!(
380 "Serialized {} states from {} to {}",
381 all_states.len(),
382 start.unwrap(),
383 end.unwrap()
384 );
385
386 if let Some(events) = events {
388 info!("Evaluating {} event(s)", events.len());
389 for event in events {
390 let mut data = Float64Builder::new();
391 for s in &all_states {
392 data.append_value(event.eval(s, almanac.clone()).map_err(Box::new)?);
393 }
394 record.push(Arc::new(data.finish()));
395 }
396 }
397
398 let mut metadata = HashMap::new();
400 metadata.insert(
401 "Purpose".to_string(),
402 "Monte Carlo Trajectory data".to_string(),
403 );
404 if let Some(add_meta) = cfg.metadata {
405 for (k, v) in add_meta {
406 metadata.insert(k, v);
407 }
408 }
409
410 let props = pq_writer(Some(metadata));
411
412 let file = File::create(&path_buf)?;
413 let mut writer = ArrowWriter::try_new(file, schema.clone(), props).unwrap();
414
415 let batch = RecordBatch::try_new(schema, record)?;
416 writer.write(&batch)?;
417 writer.close()?;
418
419 let tock_time = Epoch::now().unwrap() - tick;
421 info!(
422 "Trajectory written to {} in {tock_time}",
423 path_buf.display()
424 );
425 Ok(path_buf)
426 }
427}