nyx_space/mc/
results.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation, either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19use log::{info, warn};
20use std::collections::HashMap;
21use std::error::Error;
22use std::fs::File;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25
26use crate::errors::{MonteCarloError, NoSuccessfulRunsSnafu, StateError};
27use crate::io::watermark::pq_writer;
28use crate::io::{ExportCfg, InputOutputError};
29use crate::linalg::allocator::Allocator;
30use crate::linalg::DefaultAllocator;
31use crate::md::prelude::GuidanceMode;
32use crate::md::trajectory::{Interpolatable, Traj};
33use crate::md::{EventEvaluator, StateParameter};
34use crate::propagators::PropagationError;
35use crate::time::{Duration, Epoch, TimeUnits};
36use anise::almanac::Almanac;
37use anise::constants::frames::EARTH_J2000;
38use arrow::array::{Array, Float64Builder, Int32Builder, StringBuilder};
39use arrow::datatypes::{DataType, Field, Schema};
40use arrow::record_batch::RecordBatch;
41use hifitime::TimeScale;
42use parquet::arrow::ArrowWriter;
43pub use rstats::Stats;
44use snafu::ensure;
45
46use super::DispersedState;
47
48/// A structure storing the result of a single Monte Carlo run
49pub struct Run<S: Interpolatable, R>
50where
51    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
52    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
53{
54    /// The index of this run
55    pub index: usize,
56    /// The original dispersed state
57    pub dispersed_state: DispersedState<S>,
58    /// The result from this run
59    pub result: Result<R, PropagationError>,
60}
61
62/// A structure of Monte Carlo results
63pub struct Results<S: Interpolatable, R>
64where
65    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
66    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
67{
68    /// Raw data from each run, sorted by run index for O(1) access to each run
69    pub runs: Vec<Run<S, R>>,
70    /// Name of this scenario
71    pub scenario: String,
72}
73
74/// A structure that stores the result of a propagation segment of a Monte Carlo.
75pub struct PropResult<S: Interpolatable>
76where
77    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
78    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
79{
80    pub state: S,
81    pub traj: Traj<S>,
82}
83
84impl<S: Interpolatable> Results<S, PropResult<S>>
85where
86    DefaultAllocator: Allocator<S::Size> + Allocator<S::Size, S::Size> + Allocator<S::VecLength>,
87    <DefaultAllocator as Allocator<S::VecLength>>::Buffer<f64>: Send,
88{
89    /// Returns the value of the requested state parameter for all trajectories from `start` to `end` every `step` and
90    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
91    pub fn every_value_of_between(
92        &self,
93        param: StateParameter,
94        step: Duration,
95        start: Epoch,
96        end: Epoch,
97        value_if_run_failed: Option<f64>,
98    ) -> Vec<f64> {
99        let mut report = Vec::with_capacity(self.runs.len());
100        for run in &self.runs {
101            match &run.result {
102                Ok(r) => {
103                    for state in r.traj.every_between(step, start, end) {
104                        match state.value(param) {
105                            Ok(val) => report.push(val),
106                            Err(e) => match value_if_run_failed {
107                                Some(val) => report.push(val),
108                                None => {
109                                    info!("run #{}: {}, skipping {} in report", run.index, e, param)
110                                }
111                            },
112                        }
113                    }
114                }
115                Err(e) => match value_if_run_failed {
116                    Some(val) => report.push(val),
117                    None => warn!(
118                        "run #{} failed with {}, skipping {} in report",
119                        run.index, e, param
120                    ),
121                },
122            }
123        }
124        report
125    }
126
127    /// Returns the value of the requested state parameter for all trajectories from the start to the end of each trajectory and
128    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
129    pub fn every_value_of(
130        &self,
131        param: StateParameter,
132        step: Duration,
133        value_if_run_failed: Option<f64>,
134    ) -> Vec<f64> {
135        let mut report = Vec::with_capacity(self.runs.len());
136        for run in &self.runs {
137            match &run.result {
138                Ok(r) => {
139                    for state in r.traj.every(step) {
140                        match state.value(param) {
141                            Ok(val) => report.push(val),
142                            Err(e) => match value_if_run_failed {
143                                Some(val) => report.push(val),
144                                None => {
145                                    warn!("run #{}: {}, skipping {} in report", run.index, e, param)
146                                }
147                            },
148                        }
149                    }
150                }
151                Err(e) => match value_if_run_failed {
152                    Some(val) => report.push(val),
153                    None => warn!(
154                        "run #{} failed with {}, skipping {} in report",
155                        run.index, e, param
156                    ),
157                },
158            }
159        }
160        report
161    }
162
163    /// Returns the value of the requested state parameter for the first state and
164    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
165    pub fn first_values_of(
166        &self,
167        param: StateParameter,
168        value_if_run_failed: Option<f64>,
169    ) -> Vec<f64> {
170        let mut report = Vec::with_capacity(self.runs.len());
171        for run in &self.runs {
172            match &run.result {
173                Ok(r) => match r.traj.first().value(param) {
174                    Ok(val) => report.push(val),
175                    Err(e) => match value_if_run_failed {
176                        Some(val) => report.push(val),
177                        None => {
178                            warn!("run #{}: {}, skipping {} in report", run.index, e, param)
179                        }
180                    },
181                },
182                Err(e) => match value_if_run_failed {
183                    Some(val) => report.push(val),
184                    None => warn!(
185                        "run #{} failed with {}, skipping {} in report",
186                        run.index, e, param
187                    ),
188                },
189            }
190        }
191        report
192    }
193
194    /// Returns the value of the requested state parameter for the first state and
195    /// using the value of `value_if_run_failed` if set and skipping that run if the run failed
196    pub fn last_values_of(
197        &self,
198        param: StateParameter,
199        value_if_run_failed: Option<f64>,
200    ) -> Vec<f64> {
201        let mut report = Vec::with_capacity(self.runs.len());
202        for run in &self.runs {
203            match &run.result {
204                Ok(r) => match r.traj.last().value(param) {
205                    Ok(val) => report.push(val),
206                    Err(e) => match value_if_run_failed {
207                        Some(val) => report.push(val),
208                        None => {
209                            warn!("run #{}: {}, skipping {} in report", run.index, e, param)
210                        }
211                    },
212                },
213                Err(e) => match value_if_run_failed {
214                    Some(val) => report.push(val),
215                    None => warn!(
216                        "run #{} failed with {}, skipping {} in report",
217                        run.index, e, param
218                    ),
219                },
220            }
221        }
222        report
223    }
224
225    /// Returns the dispersion values of the requested state parameter
226    pub fn dispersion_values_of(&self, param: StateParameter) -> Result<Vec<f64>, MonteCarloError> {
227        let mut report = Vec::with_capacity(self.runs.len());
228        'run_loop: for run in &self.runs {
229            for (dparam, val) in &run.dispersed_state.actual_dispersions {
230                if dparam == &param {
231                    report.push(*val);
232                    continue 'run_loop;
233                }
234            }
235            // Oh, this parameter was not found!
236            return Err(MonteCarloError::StateError {
237                source: StateError::Unavailable { param },
238            });
239        }
240        Ok(report)
241    }
242
243    pub fn to_parquet<P: AsRef<Path>>(
244        &self,
245        path: P,
246        events: Option<Vec<&dyn EventEvaluator<S>>>,
247        cfg: ExportCfg,
248        almanac: Arc<Almanac>,
249    ) -> Result<PathBuf, Box<dyn Error>> {
250        let tick = Epoch::now().unwrap();
251        info!("Exporting Monte Carlo results to parquet file...");
252
253        // Grab the path here before we move stuff.
254        let path_buf = cfg.actual_path(path);
255
256        // Build the schema
257        let mut hdrs = vec![
258            Field::new("Epoch (UTC)", DataType::Utf8, false),
259            Field::new("Monte Carlo Run Index", DataType::Int32, false),
260        ];
261
262        // Use the first successful run to build up some data shared for all
263        let mut frame = EARTH_J2000;
264        let mut fields = match cfg.fields {
265            Some(fields) => fields,
266            None => S::export_params(),
267        };
268
269        let mut start = None;
270        let mut end = None;
271
272        // Literally all of the states of all the successful runs.
273        let mut all_states: Vec<S> = vec![];
274        let mut run_indexes: Vec<i32> = vec![];
275
276        for run in &self.runs {
277            if let Ok(success) = &run.result {
278                if start.is_none() {
279                    // No need to check other states.
280                    frame = success.state.frame();
281
282                    // Check that we can retrieve this information
283                    fields.retain(|param| success.state.value(*param).is_ok());
284
285                    start = Some(success.traj.first().epoch());
286                    end = Some(success.state.epoch());
287                }
288
289                // Build the states iterator.
290                let states =
291                    if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
292                        // Must interpolate the data!
293                        let start = cfg.start_epoch.unwrap_or_else(|| start.unwrap());
294                        let end = cfg.end_epoch.unwrap_or_else(|| end.unwrap());
295                        let step = cfg.step.unwrap_or_else(|| 1.minutes());
296                        success
297                            .traj
298                            .every_between(step, start, end)
299                            .collect::<Vec<S>>()
300                    } else {
301                        success.traj.states.to_vec()
302                    };
303                // Mark all of these states as part of this run index.
304                for _ in 0..states.len() {
305                    run_indexes.push(run.index as i32);
306                }
307                all_states.extend(states.iter());
308            }
309        }
310
311        ensure!(
312            start.is_some(),
313            NoSuccessfulRunsSnafu {
314                action: "export",
315                num_runs: self.runs.len()
316            }
317        );
318
319        let more_meta = Some(vec![(
320            "Frame".to_string(),
321            serde_dhall::serialize(&frame)
322                .static_type_annotation()
323                .to_string()
324                .map_err(|e| {
325                    Box::new(InputOutputError::SerializeDhall {
326                        what: format!("frame `{frame}`"),
327                        err: e.to_string(),
328                    })
329                })?,
330        )]);
331
332        for field in &fields {
333            hdrs.push(field.to_field(more_meta.clone()));
334        }
335
336        if let Some(events) = events.as_ref() {
337            for event in events {
338                let field = Field::new(format!("{event}"), DataType::Float64, false);
339                hdrs.push(field);
340            }
341        }
342
343        // Build the schema
344        let schema = Arc::new(Schema::new(hdrs));
345        let mut record: Vec<Arc<dyn Array>> = Vec::new();
346
347        // Build all of the records
348
349        // Epochs
350        let mut utc_epoch = StringBuilder::new();
351        let mut idx_col = Int32Builder::new();
352        for (sno, s) in all_states.iter().enumerate() {
353            utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
354
355            // Copy this a bunch of times because all columns must have the same length
356            idx_col.append_value(run_indexes[sno]);
357        }
358        record.push(Arc::new(utc_epoch.finish()));
359        record.push(Arc::new(idx_col.finish()));
360
361        // Add all of the fields
362        for field in fields {
363            if field == StateParameter::GuidanceMode {
364                let mut guid_mode = StringBuilder::new();
365                for s in &all_states {
366                    guid_mode
367                        .append_value(format!("{:?}", GuidanceMode::from(s.value(field).unwrap())));
368                }
369                record.push(Arc::new(guid_mode.finish()));
370            } else {
371                let mut data = Float64Builder::new();
372                for s in &all_states {
373                    data.append_value(s.value(field).unwrap());
374                }
375                record.push(Arc::new(data.finish()));
376            }
377        }
378
379        info!(
380            "Serialized {} states from {} to {}",
381            all_states.len(),
382            start.unwrap(),
383            end.unwrap()
384        );
385
386        // Add all of the evaluated events
387        if let Some(events) = events {
388            info!("Evaluating {} event(s)", events.len());
389            for event in events {
390                let mut data = Float64Builder::new();
391                for s in &all_states {
392                    data.append_value(event.eval(s, almanac.clone()).map_err(Box::new)?);
393                }
394                record.push(Arc::new(data.finish()));
395            }
396        }
397
398        // Serialize all of the devices and add that to the parquet file too.
399        let mut metadata = HashMap::new();
400        metadata.insert(
401            "Purpose".to_string(),
402            "Monte Carlo Trajectory data".to_string(),
403        );
404        if let Some(add_meta) = cfg.metadata {
405            for (k, v) in add_meta {
406                metadata.insert(k, v);
407            }
408        }
409
410        let props = pq_writer(Some(metadata));
411
412        let file = File::create(&path_buf)?;
413        let mut writer = ArrowWriter::try_new(file, schema.clone(), props).unwrap();
414
415        let batch = RecordBatch::try_new(schema, record)?;
416        writer.write(&batch)?;
417        writer.close()?;
418
419        // Return the path this was written to
420        let tock_time = Epoch::now().unwrap() - tick;
421        info!(
422            "Trajectory written to {} in {tock_time}",
423            path_buf.display()
424        );
425        Ok(path_buf)
426    }
427}