1use log::{info, warn};
20use std::collections::HashMap;
21use std::error::Error;
22use std::fs::File;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25
26use crate::errors::{MonteCarloError, NoSuccessfulRunsSnafu, StateError};
27use crate::io::watermark::pq_writer;
28use crate::io::{ExportCfg, InputOutputError};
29use crate::linalg::allocator::Allocator;
30use crate::linalg::DefaultAllocator;
31use crate::md::prelude::GuidanceMode;
32use crate::md::trajectory::{Interpolatable, Traj};
33use crate::md::StateParameter;
34use crate::propagators::PropagationError;
35use crate::time::{Duration, Epoch, TimeUnits};
36use anise::constants::frames::EARTH_J2000;
37use arrow::array::{Array, Float64Builder, Int32Builder, StringBuilder};
38use arrow::datatypes::{DataType, Field, Schema};
39use arrow::record_batch::RecordBatch;
40use hifitime::TimeScale;
41use parquet::arrow::ArrowWriter;
42pub use rstats::Stats;
43use snafu::ensure;
44
45use super::DispersedState;
46
47pub struct Run<S: Interpolatable, R>
49where
50 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
51 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
52{
53 pub index: usize,
55 pub dispersed_state: DispersedState<S>,
57 pub result: Result<R, PropagationError>,
59}
60
61pub struct Results<S: Interpolatable, R>
63where
64 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
65 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
66{
67 pub runs: Vec<Run<S, R>>,
69 pub scenario: String,
71}
72
73pub struct PropResult<S: Interpolatable>
75where
76 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
77 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
78{
79 pub state: S,
80 pub traj: Traj<S>,
81}
82
83impl<S: Interpolatable> Results<S, PropResult<S>>
84where
85 DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
86 <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
87{
88 pub fn every_value_of_between(
91 &self,
92 param: StateParameter,
93 step: Duration,
94 start: Epoch,
95 end: Epoch,
96 value_if_run_failed: Option<f64>,
97 ) -> Vec<f64> {
98 let mut report = Vec::with_capacity(self.runs.len());
99 for run in &self.runs {
100 match &run.result {
101 Ok(r) => {
102 for state in r.traj.every_between(step, start, end) {
103 match state.value(param) {
104 Ok(val) => report.push(val),
105 Err(e) => match value_if_run_failed {
106 Some(val) => report.push(val),
107 None => {
108 info!("run #{}: {}, skipping {} in report", run.index, e, param)
109 }
110 },
111 }
112 }
113 }
114 Err(e) => match value_if_run_failed {
115 Some(val) => report.push(val),
116 None => warn!(
117 "run #{} failed with {}, skipping {} in report",
118 run.index, e, param
119 ),
120 },
121 }
122 }
123 report
124 }
125
126 pub fn every_value_of(
129 &self,
130 param: StateParameter,
131 step: Duration,
132 value_if_run_failed: Option<f64>,
133 ) -> Vec<f64> {
134 let mut report = Vec::with_capacity(self.runs.len());
135 for run in &self.runs {
136 match &run.result {
137 Ok(r) => {
138 for state in r.traj.every(step) {
139 match state.value(param) {
140 Ok(val) => report.push(val),
141 Err(e) => match value_if_run_failed {
142 Some(val) => report.push(val),
143 None => {
144 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
145 }
146 },
147 }
148 }
149 }
150 Err(e) => match value_if_run_failed {
151 Some(val) => report.push(val),
152 None => warn!(
153 "run #{} failed with {}, skipping {} in report",
154 run.index, e, param
155 ),
156 },
157 }
158 }
159 report
160 }
161
162 pub fn first_values_of(
165 &self,
166 param: StateParameter,
167 value_if_run_failed: Option<f64>,
168 ) -> Vec<f64> {
169 let mut report = Vec::with_capacity(self.runs.len());
170 for run in &self.runs {
171 match &run.result {
172 Ok(r) => match r.traj.first().value(param) {
173 Ok(val) => report.push(val),
174 Err(e) => match value_if_run_failed {
175 Some(val) => report.push(val),
176 None => {
177 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
178 }
179 },
180 },
181 Err(e) => match value_if_run_failed {
182 Some(val) => report.push(val),
183 None => warn!(
184 "run #{} failed with {}, skipping {} in report",
185 run.index, e, param
186 ),
187 },
188 }
189 }
190 report
191 }
192
193 pub fn last_values_of(
196 &self,
197 param: StateParameter,
198 value_if_run_failed: Option<f64>,
199 ) -> Vec<f64> {
200 let mut report = Vec::with_capacity(self.runs.len());
201 for run in &self.runs {
202 match &run.result {
203 Ok(r) => match r.traj.last().value(param) {
204 Ok(val) => report.push(val),
205 Err(e) => match value_if_run_failed {
206 Some(val) => report.push(val),
207 None => {
208 warn!("run #{}: {}, skipping {} in report", run.index, e, param)
209 }
210 },
211 },
212 Err(e) => match value_if_run_failed {
213 Some(val) => report.push(val),
214 None => warn!(
215 "run #{} failed with {}, skipping {} in report",
216 run.index, e, param
217 ),
218 },
219 }
220 }
221 report
222 }
223
224 pub fn dispersion_values_of(&self, param: StateParameter) -> Result<Vec<f64>, MonteCarloError> {
226 let mut report = Vec::with_capacity(self.runs.len());
227 'run_loop: for run in &self.runs {
228 for (dparam, val) in &run.dispersed_state.actual_dispersions {
229 if dparam == ¶m {
230 report.push(*val);
231 continue 'run_loop;
232 }
233 }
234 return Err(MonteCarloError::StateError {
236 source: StateError::Unavailable { param },
237 });
238 }
239 Ok(report)
240 }
241
242 pub fn to_parquet<P: AsRef<Path>>(
243 &self,
244 path: P,
245 cfg: ExportCfg,
246 ) -> Result<PathBuf, Box<dyn Error>> {
247 let tick = Epoch::now().unwrap();
248 info!("Exporting Monte Carlo results to parquet file...");
249
250 let path_buf = cfg.actual_path(path);
252
253 let mut hdrs = vec![
255 Field::new("Epoch (UTC)", DataType::Utf8, false),
256 Field::new("Monte Carlo Run Index", DataType::Int32, false),
257 ];
258
259 let mut frame = EARTH_J2000;
261 let requested_fields = match cfg.fields {
262 Some(fields) => fields,
263 None => S::export_params(),
264 };
265
266 let mut start = None;
267 let mut end = None;
268
269 let mut all_states: Vec<S> = vec![];
271 let mut run_indexes: Vec<i32> = vec![];
272
273 for run in &self.runs {
274 if let Ok(success) = &run.result {
275 if start.is_none() {
276 frame = success.state.frame();
278
279 start = Some(success.traj.first().epoch());
280 end = Some(success.state.epoch());
281 }
282
283 let states =
285 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
286 let start = cfg.start_epoch.unwrap_or_else(|| start.unwrap());
288 let end = cfg.end_epoch.unwrap_or_else(|| end.unwrap());
289 let step = cfg.step.unwrap_or_else(|| 1.minutes());
290 success
291 .traj
292 .every_between(step, start, end)
293 .collect::<Vec<S>>()
294 } else {
295 success.traj.states.to_vec()
296 };
297 for _ in 0..states.len() {
299 run_indexes.push(run.index as i32);
300 }
301 all_states.extend(states.iter());
302 }
303 }
304
305 ensure!(
306 start.is_some(),
307 NoSuccessfulRunsSnafu {
308 action: "export",
309 num_runs: self.runs.len()
310 }
311 );
312
313 let more_meta = Some(vec![(
314 "Frame".to_string(),
315 serde_dhall::serialize(&frame)
316 .static_type_annotation()
317 .to_string()
318 .map_err(|e| {
319 Box::new(InputOutputError::SerializeDhall {
320 what: format!("frame `{frame}`"),
321 err: e.to_string(),
322 })
323 })?,
324 )]);
325
326 let mut fields = Vec::new();
327 let mut field_nullable = Vec::new();
328 for field in requested_fields {
329 let mut any_ok = false;
330 let mut any_err = false;
331 for state in &all_states {
332 if state.value(field).is_ok() {
333 any_ok = true;
334 } else {
335 any_err = true;
336 }
337 }
338
339 if any_ok {
340 fields.push(field);
341 field_nullable.push(any_err);
342 }
343 }
344
345 for (field, nullable) in fields.iter().zip(field_nullable.iter().copied()) {
346 hdrs.push(field.to_field(more_meta.clone()).with_nullable(nullable));
347 }
348
349 let schema = Arc::new(Schema::new(hdrs));
351 let mut record: Vec<Arc<dyn Array>> = Vec::new();
352
353 let mut utc_epoch = StringBuilder::new();
357 let mut idx_col = Int32Builder::new();
358 for (sno, s) in all_states.iter().enumerate() {
359 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
360
361 idx_col.append_value(run_indexes[sno]);
363 }
364 record.push(Arc::new(utc_epoch.finish()));
365 record.push(Arc::new(idx_col.finish()));
366
367 for field in fields {
369 if field == StateParameter::GuidanceMode() {
370 let mut guid_mode = StringBuilder::new();
371 for s in &all_states {
372 match s.value(field) {
373 Ok(value) => {
374 guid_mode.append_value(format!("{:?}", GuidanceMode::from(value)));
375 }
376 Err(_) => guid_mode.append_null(),
377 }
378 }
379 record.push(Arc::new(guid_mode.finish()));
380 } else {
381 let mut data = Float64Builder::new();
382 for s in &all_states {
383 match s.value(field) {
384 Ok(value) => data.append_value(value),
385 Err(_) => data.append_null(),
386 };
387 }
388 record.push(Arc::new(data.finish()));
389 }
390 }
391
392 info!(
393 "Serialized {} states from {} to {}",
394 all_states.len(),
395 start.unwrap(),
396 end.unwrap()
397 );
398
399 let mut metadata = HashMap::new();
401 metadata.insert(
402 "Purpose".to_string(),
403 "Monte Carlo Trajectory data".to_string(),
404 );
405 if let Some(add_meta) = cfg.metadata {
406 for (k, v) in add_meta {
407 metadata.insert(k, v);
408 }
409 }
410
411 let props = pq_writer(Some(metadata));
412
413 let file = File::create(&path_buf)?;
414 let mut writer = ArrowWriter::try_new(file, schema.clone(), props).unwrap();
415
416 let batch = RecordBatch::try_new(schema, record)?;
417 writer.write(&batch)?;
418 writer.close()?;
419
420 let tock_time = Epoch::now().unwrap() - tick;
422 info!(
423 "Trajectory written to {} in {tock_time}",
424 path_buf.display()
425 );
426 Ok(path_buf)
427 }
428}