Skip to main content

nyx_space/mc/
results.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation, either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19use log::{info, warn};
20use std::collections::HashMap;
21use std::error::Error;
22use std::fs::File;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25
26use crate::errors::{MonteCarloError, NoSuccessfulRunsSnafu, StateError};
27use crate::io::watermark::pq_writer;
28use crate::io::{ExportCfg, InputOutputError};
29use crate::linalg::allocator::Allocator;
30use crate::linalg::DefaultAllocator;
31use crate::md::prelude::GuidanceMode;
32use crate::md::trajectory::{Interpolatable, Traj};
33use crate::md::StateParameter;
34use crate::propagators::PropagationError;
35use crate::time::{Duration, Epoch, TimeUnits};
36use anise::constants::frames::EARTH_J2000;
37use arrow::array::{Array, Float64Builder, Int32Builder, StringBuilder};
38use arrow::datatypes::{DataType, Field, Schema};
39use arrow::record_batch::RecordBatch;
40use hifitime::TimeScale;
41use parquet::arrow::ArrowWriter;
42pub use rstats::Stats;
43use snafu::ensure;
44
45use super::DispersedState;
46
47/// A structure storing the result of a single Monte Carlo run
48pub struct Run<S: Interpolatable, R>
49where
50    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
51    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
52{
53    /// The index of this run
54    pub index: usize,
55    /// The original dispersed state
56    pub dispersed_state: DispersedState<S>,
57    /// The result from this run
58    pub result: Result<R, PropagationError>,
59}
60
61/// A structure of Monte Carlo results
62pub struct Results<S: Interpolatable, R>
63where
64    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
65    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
66{
67    /// Raw data from each run, sorted by run index for O(1) access to each run
68    pub runs: Vec<Run<S, R>>,
69    /// Name of this scenario
70    pub scenario: String,
71}
72
73/// A structure that stores the result of a propagation segment of a Monte Carlo.
74pub struct PropResult<S: Interpolatable>
75where
76    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
77    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
78{
79    pub state: S,
80    pub traj: Traj<S>,
81}
82
83impl<S: Interpolatable> Results<S, PropResult<S>>
84where
85    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
86    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
87{
88    /// Returns the value of the requested state parameter for all trajectories from `start` to `end` every `step` and
89    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
90    pub fn every_value_of_between(
91        &self,
92        param: StateParameter,
93        step: Duration,
94        start: Epoch,
95        end: Epoch,
96        value_if_run_failed: Option<f64>,
97    ) -> Vec<f64> {
98        let mut report = Vec::with_capacity(self.runs.len());
99        for run in &self.runs {
100            match &run.result {
101                Ok(r) => {
102                    for state in r.traj.every_between(step, start, end) {
103                        match state.value(param) {
104                            Ok(val) => report.push(val),
105                            Err(e) => match value_if_run_failed {
106                                Some(val) => report.push(val),
107                                None => {
108                                    info!("run #{}: {}, skipping {} in report", run.index, e, param)
109                                }
110                            },
111                        }
112                    }
113                }
114                Err(e) => match value_if_run_failed {
115                    Some(val) => report.push(val),
116                    None => warn!(
117                        "run #{} failed with {}, skipping {} in report",
118                        run.index, e, param
119                    ),
120                },
121            }
122        }
123        report
124    }
125
126    /// Returns the value of the requested state parameter for all trajectories from the start to the end of each trajectory and
127    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
128    pub fn every_value_of(
129        &self,
130        param: StateParameter,
131        step: Duration,
132        value_if_run_failed: Option<f64>,
133    ) -> Vec<f64> {
134        let mut report = Vec::with_capacity(self.runs.len());
135        for run in &self.runs {
136            match &run.result {
137                Ok(r) => {
138                    for state in r.traj.every(step) {
139                        match state.value(param) {
140                            Ok(val) => report.push(val),
141                            Err(e) => match value_if_run_failed {
142                                Some(val) => report.push(val),
143                                None => {
144                                    warn!("run #{}: {}, skipping {} in report", run.index, e, param)
145                                }
146                            },
147                        }
148                    }
149                }
150                Err(e) => match value_if_run_failed {
151                    Some(val) => report.push(val),
152                    None => warn!(
153                        "run #{} failed with {}, skipping {} in report",
154                        run.index, e, param
155                    ),
156                },
157            }
158        }
159        report
160    }
161
162    /// Returns the value of the requested state parameter for the first state and
163    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
164    pub fn first_values_of(
165        &self,
166        param: StateParameter,
167        value_if_run_failed: Option<f64>,
168    ) -> Vec<f64> {
169        let mut report = Vec::with_capacity(self.runs.len());
170        for run in &self.runs {
171            match &run.result {
172                Ok(r) => match r.traj.first().value(param) {
173                    Ok(val) => report.push(val),
174                    Err(e) => match value_if_run_failed {
175                        Some(val) => report.push(val),
176                        None => {
177                            warn!("run #{}: {}, skipping {} in report", run.index, e, param)
178                        }
179                    },
180                },
181                Err(e) => match value_if_run_failed {
182                    Some(val) => report.push(val),
183                    None => warn!(
184                        "run #{} failed with {}, skipping {} in report",
185                        run.index, e, param
186                    ),
187                },
188            }
189        }
190        report
191    }
192
193    /// Returns the value of the requested state parameter for the first state and
194    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
195    pub fn last_values_of(
196        &self,
197        param: StateParameter,
198        value_if_run_failed: Option<f64>,
199    ) -> Vec<f64> {
200        let mut report = Vec::with_capacity(self.runs.len());
201        for run in &self.runs {
202            match &run.result {
203                Ok(r) => match r.traj.last().value(param) {
204                    Ok(val) => report.push(val),
205                    Err(e) => match value_if_run_failed {
206                        Some(val) => report.push(val),
207                        None => {
208                            warn!("run #{}: {}, skipping {} in report", run.index, e, param)
209                        }
210                    },
211                },
212                Err(e) => match value_if_run_failed {
213                    Some(val) => report.push(val),
214                    None => warn!(
215                        "run #{} failed with {}, skipping {} in report",
216                        run.index, e, param
217                    ),
218                },
219            }
220        }
221        report
222    }
223
224    /// Returns the dispersion values of the requested state parameter
225    pub fn dispersion_values_of(&self, param: StateParameter) -> Result<Vec<f64>, MonteCarloError> {
226        let mut report = Vec::with_capacity(self.runs.len());
227        'run_loop: for run in &self.runs {
228            for (dparam, val) in &run.dispersed_state.actual_dispersions {
229                if dparam == &param {
230                    report.push(*val);
231                    continue 'run_loop;
232                }
233            }
234            // Oh, this parameter was not found!
235            return Err(MonteCarloError::StateError {
236                source: StateError::Unavailable { param },
237            });
238        }
239        Ok(report)
240    }
241
242    pub fn to_parquet<P: AsRef<Path>>(
243        &self,
244        path: P,
245        cfg: ExportCfg,
246    ) -> Result<PathBuf, Box<dyn Error>> {
247        let tick = Epoch::now().unwrap();
248        info!("Exporting Monte Carlo results to parquet file...");
249
250        // Grab the path here before we move stuff.
251        let path_buf = cfg.actual_path(path);
252
253        // Build the schema
254        let mut hdrs = vec![
255            Field::new("Epoch (UTC)", DataType::Utf8, false),
256            Field::new("Monte Carlo Run Index", DataType::Int32, false),
257        ];
258
259        // Use the first successful run to build up some data shared for all
260        let mut frame = EARTH_J2000;
261        let mut fields = match cfg.fields {
262            Some(fields) => fields,
263            None => S::export_params(),
264        };
265
266        let mut start = None;
267        let mut end = None;
268
269        // Literally all of the states of all the successful runs.
270        let mut all_states: Vec<S> = vec![];
271        let mut run_indexes: Vec<i32> = vec![];
272
273        for run in &self.runs {
274            if let Ok(success) = &run.result {
275                if start.is_none() {
276                    // No need to check other states.
277                    frame = success.state.frame();
278
279                    // Check that we can retrieve this information
280                    fields.retain(|param| success.state.value(*param).is_ok());
281
282                    start = Some(success.traj.first().epoch());
283                    end = Some(success.state.epoch());
284                }
285
286                // Build the states iterator.
287                let states =
288                    if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
289                        // Must interpolate the data!
290                        let start = cfg.start_epoch.unwrap_or_else(|| start.unwrap());
291                        let end = cfg.end_epoch.unwrap_or_else(|| end.unwrap());
292                        let step = cfg.step.unwrap_or_else(|| 1.minutes());
293                        success
294                            .traj
295                            .every_between(step, start, end)
296                            .collect::<Vec<S>>()
297                    } else {
298                        success.traj.states.to_vec()
299                    };
300                // Mark all of these states as part of this run index.
301                for _ in 0..states.len() {
302                    run_indexes.push(run.index as i32);
303                }
304                all_states.extend(states.iter());
305            }
306        }
307
308        ensure!(
309            start.is_some(),
310            NoSuccessfulRunsSnafu {
311                action: "export",
312                num_runs: self.runs.len()
313            }
314        );
315
316        let more_meta = Some(vec![(
317            "Frame".to_string(),
318            serde_dhall::serialize(&frame)
319                .static_type_annotation()
320                .to_string()
321                .map_err(|e| {
322                    Box::new(InputOutputError::SerializeDhall {
323                        what: format!("frame `{frame}`"),
324                        err: e.to_string(),
325                    })
326                })?,
327        )]);
328
329        for field in &fields {
330            hdrs.push(field.to_field(more_meta.clone()));
331        }
332
333        // Build the schema
334        let schema = Arc::new(Schema::new(hdrs));
335        let mut record: Vec<Arc<dyn Array>> = Vec::new();
336
337        // Build all of the records
338
339        // Epochs
340        let mut utc_epoch = StringBuilder::new();
341        let mut idx_col = Int32Builder::new();
342        for (sno, s) in all_states.iter().enumerate() {
343            utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
344
345            // Copy this a bunch of times because all columns must have the same length
346            idx_col.append_value(run_indexes[sno]);
347        }
348        record.push(Arc::new(utc_epoch.finish()));
349        record.push(Arc::new(idx_col.finish()));
350
351        // Add all of the fields
352        for field in fields {
353            if field == StateParameter::GuidanceMode {
354                let mut guid_mode = StringBuilder::new();
355                for s in &all_states {
356                    guid_mode
357                        .append_value(format!("{:?}", GuidanceMode::from(s.value(field).unwrap())));
358                }
359                record.push(Arc::new(guid_mode.finish()));
360            } else {
361                let mut data = Float64Builder::new();
362                for s in &all_states {
363                    data.append_value(s.value(field).unwrap());
364                }
365                record.push(Arc::new(data.finish()));
366            }
367        }
368
369        info!(
370            "Serialized {} states from {} to {}",
371            all_states.len(),
372            start.unwrap(),
373            end.unwrap()
374        );
375
376        // Serialize all of the devices and add that to the parquet file too.
377        let mut metadata = HashMap::new();
378        metadata.insert(
379            "Purpose".to_string(),
380            "Monte Carlo Trajectory data".to_string(),
381        );
382        if let Some(add_meta) = cfg.metadata {
383            for (k, v) in add_meta {
384                metadata.insert(k, v);
385            }
386        }
387
388        let props = pq_writer(Some(metadata));
389
390        let file = File::create(&path_buf)?;
391        let mut writer = ArrowWriter::try_new(file, schema.clone(), props).unwrap();
392
393        let batch = RecordBatch::try_new(schema, record)?;
394        writer.write(&batch)?;
395        writer.close()?;
396
397        // Return the path this was written to
398        let tock_time = Epoch::now().unwrap() - tick;
399        info!(
400            "Trajectory written to {} in {tock_time}",
401            path_buf.display()
402        );
403        Ok(path_buf)
404    }
405}