nyx_space/mc/
results.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation, either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19use std::collections::HashMap;
20use std::error::Error;
21use std::fs::File;
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24
25use crate::errors::{MonteCarloError, NoSuccessfulRunsSnafu, StateError};
26use crate::io::watermark::pq_writer;
27use crate::io::{ExportCfg, InputOutputError};
28use crate::linalg::allocator::Allocator;
29use crate::linalg::DefaultAllocator;
30use crate::md::prelude::GuidanceMode;
31use crate::md::trajectory::{Interpolatable, Traj};
32use crate::md::{EventEvaluator, StateParameter};
33use crate::propagators::PropagationError;
34use crate::time::{Duration, Epoch, TimeUnits};
35use anise::almanac::Almanac;
36use anise::constants::frames::EARTH_J2000;
37use arrow::array::{Array, Float64Builder, Int32Builder, StringBuilder};
38use arrow::datatypes::{DataType, Field, Schema};
39use arrow::record_batch::RecordBatch;
40use hifitime::TimeScale;
41use parquet::arrow::ArrowWriter;
42pub use rstats::Stats;
43use snafu::ensure;
44
45use super::DispersedState;
46
47/// A structure storing the result of a single Monte Carlo run
48pub struct Run<S: Interpolatable, R>
49where
50    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
51    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
52{
53    /// The index of this run
54    pub index: usize,
55    /// The original dispersed state
56    pub dispersed_state: DispersedState<S>,
57    /// The result from this run
58    pub result: Result<R, PropagationError>,
59}
60
61/// A structure of Monte Carlo results
62pub struct Results<S: Interpolatable, R>
63where
64    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
65    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
66{
67    /// Raw data from each run, sorted by run index for O(1) access to each run
68    pub runs: Vec<Run<S, R>>,
69    /// Name of this scenario
70    pub scenario: String,
71}
72
73/// A structure that stores the result of a propagation segment of a Monte Carlo.
74pub struct PropResult<S: Interpolatable>
75where
76    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
77    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
78{
79    pub state: S,
80    pub traj: Traj<S>,
81}
82
83impl<S: Interpolatable> Results<S, PropResult<S>>
84where
85    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
86    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
87{
88    /// Returns the value of the requested state parameter for all trajectories from `start` to `end` every `step` and
89    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
90    pub fn every_value_of_between(
91        &self,
92        param: StateParameter,
93        step: Duration,
94        start: Epoch,
95        end: Epoch,
96        value_if_run_failed: Option<f64>,
97    ) -> Vec<f64> {
98        let mut report = Vec::with_capacity(self.runs.len());
99        for run in &self.runs {
100            match &run.result {
101                Ok(r) => {
102                    for state in r.traj.every_between(step, start, end) {
103                        match state.value(param) {
104                            Ok(val) => report.push(val),
105                            Err(e) => match value_if_run_failed {
106                                Some(val) => report.push(val),
107                                None => {
108                                    warn!("run #{}: {}, skipping {} in report", run.index, e, param)
109                                }
110                            },
111                        }
112                    }
113                }
114                Err(e) => match value_if_run_failed {
115                    Some(val) => report.push(val),
116                    None => warn!(
117                        "run #{} failed with {}, skipping {} in report",
118                        run.index, e, param
119                    ),
120                },
121            }
122        }
123        report
124    }
125
126    /// Returns the value of the requested state parameter for all trajectories from the start to the end of each trajectory and
127    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
128    pub fn every_value_of(
129        &self,
130        param: StateParameter,
131        step: Duration,
132        value_if_run_failed: Option<f64>,
133    ) -> Vec<f64> {
134        let mut report = Vec::with_capacity(self.runs.len());
135        for run in &self.runs {
136            match &run.result {
137                Ok(r) => {
138                    for state in r.traj.every(step) {
139                        match state.value(param) {
140                            Ok(val) => report.push(val),
141                            Err(e) => match value_if_run_failed {
142                                Some(val) => report.push(val),
143                                None => {
144                                    warn!("run #{}: {}, skipping {} in report", run.index, e, param)
145                                }
146                            },
147                        }
148                    }
149                }
150                Err(e) => match value_if_run_failed {
151                    Some(val) => report.push(val),
152                    None => warn!(
153                        "run #{} failed with {}, skipping {} in report",
154                        run.index, e, param
155                    ),
156                },
157            }
158        }
159        report
160    }
161
162    /// Returns the value of the requested state parameter for the first state and
163    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
164    pub fn first_values_of(
165        &self,
166        param: StateParameter,
167        value_if_run_failed: Option<f64>,
168    ) -> Vec<f64> {
169        let mut report = Vec::with_capacity(self.runs.len());
170        for run in &self.runs {
171            match &run.result {
172                Ok(r) => match r.traj.first().value(param) {
173                    Ok(val) => report.push(val),
174                    Err(e) => match value_if_run_failed {
175                        Some(val) => report.push(val),
176                        None => {
177                            warn!("run #{}: {}, skipping {} in report", run.index, e, param)
178                        }
179                    },
180                },
181                Err(e) => match value_if_run_failed {
182                    Some(val) => report.push(val),
183                    None => warn!(
184                        "run #{} failed with {}, skipping {} in report",
185                        run.index, e, param
186                    ),
187                },
188            }
189        }
190        report
191    }
192
193    /// Returns the value of the requested state parameter for the first state and
194    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
195    pub fn last_values_of(
196        &self,
197        param: StateParameter,
198        value_if_run_failed: Option<f64>,
199    ) -> Vec<f64> {
200        let mut report = Vec::with_capacity(self.runs.len());
201        for run in &self.runs {
202            match &run.result {
203                Ok(r) => match r.traj.last().value(param) {
204                    Ok(val) => report.push(val),
205                    Err(e) => match value_if_run_failed {
206                        Some(val) => report.push(val),
207                        None => {
208                            warn!("run #{}: {}, skipping {} in report", run.index, e, param)
209                        }
210                    },
211                },
212                Err(e) => match value_if_run_failed {
213                    Some(val) => report.push(val),
214                    None => warn!(
215                        "run #{} failed with {}, skipping {} in report",
216                        run.index, e, param
217                    ),
218                },
219            }
220        }
221        report
222    }
223
224    /// Returns the dispersion values of the requested state parameter
225    pub fn dispersion_values_of(&self, param: StateParameter) -> Result<Vec<f64>, MonteCarloError> {
226        let mut report = Vec::with_capacity(self.runs.len());
227        'run_loop: for run in &self.runs {
228            for (dparam, val) in &run.dispersed_state.actual_dispersions {
229                if dparam == &param {
230                    report.push(*val);
231                    continue 'run_loop;
232                }
233            }
234            // Oh, this parameter was not found!
235            return Err(MonteCarloError::StateError {
236                source: StateError::Unavailable { param },
237            });
238        }
239        Ok(report)
240    }
241
242    pub fn to_parquet<P: AsRef<Path>>(
243        &self,
244        path: P,
245        events: Option<Vec<&dyn EventEvaluator<S>>>,
246        cfg: ExportCfg,
247        almanac: Arc<Almanac>,
248    ) -> Result<PathBuf, Box<dyn Error>> {
249        let tick = Epoch::now().unwrap();
250        info!("Exporting Monte Carlo results to parquet file...");
251
252        // Grab the path here before we move stuff.
253        let path_buf = cfg.actual_path(path);
254
255        // Build the schema
256        let mut hdrs = vec![
257            Field::new("Epoch (UTC)", DataType::Utf8, false),
258            Field::new("Monte Carlo Run Index", DataType::Int32, false),
259        ];
260
261        // Use the first successful run to build up some data shared for all
262        let mut frame = EARTH_J2000;
263        let mut fields = match cfg.fields {
264            Some(fields) => fields,
265            None => S::export_params(),
266        };
267
268        let mut start = None;
269        let mut end = None;
270
271        // Literally all of the states of all the successful runs.
272        let mut all_states: Vec<S> = vec![];
273        let mut run_indexes: Vec<i32> = vec![];
274
275        for run in &self.runs {
276            if let Ok(success) = &run.result {
277                if start.is_none() {
278                    // No need to check other states.
279                    frame = success.state.frame();
280
281                    // Check that we can retrieve this information
282                    fields.retain(|param| success.state.value(*param).is_ok());
283
284                    start = Some(success.traj.first().epoch());
285                    end = Some(success.state.epoch());
286                }
287
288                // Build the states iterator.
289                let states =
290                    if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
291                        // Must interpolate the data!
292                        let start = cfg.start_epoch.unwrap_or_else(|| start.unwrap());
293                        let end = cfg.end_epoch.unwrap_or_else(|| end.unwrap());
294                        let step = cfg.step.unwrap_or_else(|| 1.minutes());
295                        success
296                            .traj
297                            .every_between(step, start, end)
298                            .collect::<Vec<S>>()
299                    } else {
300                        success.traj.states.to_vec()
301                    };
302                // Mark all of these states as part of this run index.
303                for _ in 0..states.len() {
304                    run_indexes.push(run.index as i32);
305                }
306                all_states.extend(states.iter());
307            }
308        }
309
310        ensure!(
311            start.is_some(),
312            NoSuccessfulRunsSnafu {
313                action: "export",
314                num_runs: self.runs.len()
315            }
316        );
317
318        let more_meta = Some(vec![(
319            "Frame".to_string(),
320            serde_dhall::serialize(&frame).to_string().map_err(|e| {
321                Box::new(InputOutputError::SerializeDhall {
322                    what: format!("frame `{frame}`"),
323                    err: e.to_string(),
324                })
325            })?,
326        )]);
327
328        for field in &fields {
329            hdrs.push(field.to_field(more_meta.clone()));
330        }
331
332        if let Some(events) = events.as_ref() {
333            for event in events {
334                let field = Field::new(format!("{event}"), DataType::Float64, false);
335                hdrs.push(field);
336            }
337        }
338
339        // Build the schema
340        let schema = Arc::new(Schema::new(hdrs));
341        let mut record: Vec<Arc<dyn Array>> = Vec::new();
342
343        // Build all of the records
344
345        // Epochs
346        let mut utc_epoch = StringBuilder::new();
347        let mut idx_col = Int32Builder::new();
348        for (sno, s) in all_states.iter().enumerate() {
349            utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
350
351            // Copy this a bunch of times because all columns must have the same length
352            idx_col.append_value(run_indexes[sno]);
353        }
354        record.push(Arc::new(utc_epoch.finish()));
355        record.push(Arc::new(idx_col.finish()));
356
357        // Add all of the fields
358        for field in fields {
359            if field == StateParameter::GuidanceMode {
360                let mut guid_mode = StringBuilder::new();
361                for s in &all_states {
362                    guid_mode
363                        .append_value(format!("{:?}", GuidanceMode::from(s.value(field).unwrap())));
364                }
365                record.push(Arc::new(guid_mode.finish()));
366            } else {
367                let mut data = Float64Builder::new();
368                for s in &all_states {
369                    data.append_value(s.value(field).unwrap());
370                }
371                record.push(Arc::new(data.finish()));
372            }
373        }
374
375        info!(
376            "Serialized {} states from {} to {}",
377            all_states.len(),
378            start.unwrap(),
379            end.unwrap()
380        );
381
382        // Add all of the evaluated events
383        if let Some(events) = events {
384            info!("Evaluating {} event(s)", events.len());
385            for event in events {
386                let mut data = Float64Builder::new();
387                for s in &all_states {
388                    data.append_value(event.eval(s, almanac.clone()).map_err(Box::new)?);
389                }
390                record.push(Arc::new(data.finish()));
391            }
392        }
393
394        // Serialize all of the devices and add that to the parquet file too.
395        let mut metadata = HashMap::new();
396        metadata.insert(
397            "Purpose".to_string(),
398            "Monte Carlo Trajectory data".to_string(),
399        );
400        if let Some(add_meta) = cfg.metadata {
401            for (k, v) in add_meta {
402                metadata.insert(k, v);
403            }
404        }
405
406        let props = pq_writer(Some(metadata));
407
408        let file = File::create(&path_buf)?;
409        let mut writer = ArrowWriter::try_new(file, schema.clone(), props).unwrap();
410
411        let batch = RecordBatch::try_new(schema, record)?;
412        writer.write(&batch)?;
413        writer.close()?;
414
415        // Return the path this was written to
416        let tock_time = Epoch::now().unwrap() - tick;
417        info!(
418            "Trajectory written to {} in {tock_time}",
419            path_buf.display()
420        );
421        Ok(path_buf)
422    }
423}