nyx_space/od/process/solution/
export.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19use crate::io::watermark::pq_writer;
20use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
21use crate::linalg::allocator::Allocator;
22use crate::linalg::{DefaultAllocator, DimName};
23use crate::md::trajectory::Interpolatable;
24use crate::md::StateParameter;
25use crate::od::estimate::*;
26use crate::State;
27use crate::{od::*, Spacecraft};
28use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
29use arrow::datatypes::{DataType, Field, Schema};
30use arrow::record_batch::RecordBatch;
31use hifitime::TimeScale;
32use log::{info, warn};
33use msr::sensitivity::TrackerSensitivity;
34use nalgebra::Const;
35use parquet::arrow::ArrowWriter;
36use snafu::prelude::*;
37use std::collections::HashMap;
38use std::fs::File;
39use std::path::{Path, PathBuf};
40
41use super::ODSolution;
42
43impl<MsrSize: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
44    ODSolution<Spacecraft, KfEstimate<Spacecraft>, MsrSize, Trk>
45where
46    DefaultAllocator: Allocator<MsrSize>
47        + Allocator<MsrSize, <Spacecraft as State>::Size>
48        + Allocator<Const<1>, MsrSize>
49        + Allocator<<Spacecraft as State>::Size>
50        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
51        + Allocator<MsrSize, MsrSize>
52        + Allocator<MsrSize, <Spacecraft as State>::Size>
53        + Allocator<<Spacecraft as State>::Size, MsrSize>
54        + Allocator<<Spacecraft as State>::Size>
55        + Allocator<<Spacecraft as State>::VecLength>
56        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
57{
58    /// Store the estimates and residuals in a parquet file
59    pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
60        ensure!(
61            !self.estimates.is_empty(),
62            TooFewMeasurementsSnafu {
63                need: 1_usize,
64                action: "exporting OD results"
65            }
66        );
67
68        if self.estimates.len() != self.residuals.len() {
69            return Err(ODError::ODConfigError {
70                source: ConfigError::InvalidConfig {
71                    msg: format!(
72                        "Estimates ({}) and residuals ({}) are not aligned.",
73                        self.estimates.len(),
74                        self.residuals.len()
75                    ),
76                },
77            });
78        }
79
80        if self.estimates.len() != self.gains.len() {
81            return Err(ODError::ODConfigError {
82                source: ConfigError::InvalidConfig {
83                    msg: format!(
84                        "Estimates ({}) and filter gains ({}) are not aligned.",
85                        self.estimates.len(),
86                        self.gains.len()
87                    ),
88                },
89            });
90        }
91
92        if self.estimates.len() != self.filter_smoother_ratios.len() {
93            return Err(ODError::ODConfigError {
94                source: ConfigError::InvalidConfig {
95                    msg: format!(
96                        "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
97                        self.estimates.len(),
98                        self.filter_smoother_ratios.len()
99                    ),
100                },
101            });
102        }
103
104        let tick = Epoch::now().unwrap();
105        info!("Exporting orbit determination result to parquet file...");
106
107        if cfg.step.is_some() {
108            warn!("The `step` parameter in the export is not supported for orbit determination exports.");
109        }
110
111        // Grab the path here before we move stuff.
112        let path_buf = cfg.actual_path(path);
113
114        // Build the schema
115        let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
116
117        let frame = self.estimates[0].state().frame();
118
119        let more_meta = Some(vec![
120            (
121                "Frame".to_string(),
122                serde_dhall::serialize(&frame)
123                    .static_type_annotation()
124                    .to_string()
125                    .map_err(|e| ODError::ODIOError {
126                        source: InputOutputError::SerializeDhall {
127                            what: format!("frame `{frame}`"),
128                            err: e.to_string(),
129                        },
130                    })?,
131            ),
132            (
133                "SRP Area (m2)".to_string(),
134                self.estimates
135                    .first()
136                    .as_ref()
137                    .unwrap()
138                    .nominal_state()
139                    .srp
140                    .area_m2
141                    .to_string(),
142            ),
143            (
144                "Drag Area (m2)".to_string(),
145                self.estimates
146                    .first()
147                    .as_ref()
148                    .unwrap()
149                    .nominal_state()
150                    .drag
151                    .area_m2
152                    .to_string(),
153            ),
154        ]);
155
156        let mut fields = match cfg.fields {
157            Some(fields) => fields,
158            None => Spacecraft::export_params(),
159        };
160
161        // Check that we can retrieve this information
162        fields.retain(|param| match self.estimates[0].state().value(*param) {
163            Ok(_) => param != &StateParameter::GuidanceMode,
164            Err(_) => false,
165        });
166
167        for field in &fields {
168            hdrs.push(field.to_field(more_meta.clone()));
169        }
170
171        let mut sigma_fields = fields.clone();
172        // Check that we can retrieve this information
173        sigma_fields.retain(|param| {
174            !matches!(
175                param,
176                &StateParameter::X
177                    | &StateParameter::Y
178                    | &StateParameter::Z
179                    | &StateParameter::VX
180                    | &StateParameter::VY
181                    | &StateParameter::VZ
182            ) && self.estimates[0].sigma_for(*param).is_ok()
183        });
184
185        for field in &sigma_fields {
186            hdrs.push(field.to_cov_field(more_meta.clone()));
187        }
188
189        let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
190        let state_units = [
191            "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
192        ];
193        let mut cov_units = vec![];
194
195        for i in 0..state_items.len() {
196            for j in i..state_items.len() {
197                let cov_unit = if i < 3 {
198                    if j < 3 {
199                        "km^2"
200                    } else if (3..6).contains(&j) {
201                        "km^2/s"
202                    } else if j == 8 {
203                        "km*kg"
204                    } else {
205                        "km"
206                    }
207                } else if (3..6).contains(&i) {
208                    if (3..6).contains(&j) {
209                        "km^2/s^2"
210                    } else if j == 8 {
211                        "km/s*kg"
212                    } else {
213                        "km/s"
214                    }
215                } else if i == 8 || j == 8 {
216                    "kg^2"
217                } else {
218                    "unitless"
219                };
220
221                cov_units.push(cov_unit);
222            }
223        }
224
225        let est_size = <Spacecraft as State>::Size::dim();
226
227        let mut idx = 0;
228        for i in 0..state_items.len() {
229            for j in i..state_items.len() {
230                hdrs.push(Field::new(
231                    format!(
232                        "Covariance {}*{} ({frame:x}) ({})",
233                        state_items[i], state_items[j], cov_units[idx]
234                    ),
235                    DataType::Float64,
236                    false,
237                ));
238                idx += 1;
239            }
240        }
241
242        // Add the uncertainty in the integration frame
243        for (i, coord) in state_items.iter().enumerate() {
244            hdrs.push(Field::new(
245                format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
246                DataType::Float64,
247                false,
248            ));
249        }
250
251        // Add the position and velocity uncertainty in the RIC frame
252        for (i, coord) in state_items.iter().enumerate().take(6) {
253            hdrs.push(Field::new(
254                format!("Sigma {coord} (RIC) ({})", state_units[i]),
255                DataType::Float64,
256                false,
257            ));
258        }
259
260        // Add the fields of the residuals
261        let mut msr_fields = Vec::new();
262        for f in &self.measurement_types {
263            msr_fields.push(
264                f.to_field()
265                    .with_nullable(true)
266                    .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
267            );
268        }
269        for f in &self.measurement_types {
270            msr_fields.push(
271                f.to_field()
272                    .with_nullable(true)
273                    .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
274            );
275        }
276        for f in &self.measurement_types {
277            msr_fields.push(
278                f.to_field()
279                    .with_nullable(true)
280                    .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
281            );
282        }
283        for f in &self.measurement_types {
284            msr_fields.push(
285                f.to_field()
286                    .with_nullable(true)
287                    .with_name(format!("Real observation: {f:?} ({})", f.unit())),
288            );
289        }
290        for f in &self.measurement_types {
291            msr_fields.push(
292                f.to_field()
293                    .with_nullable(true)
294                    .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
295            );
296        }
297
298        msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
299        msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
300        msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
301
302        hdrs.append(&mut msr_fields);
303
304        // Add the filter gain columns
305        if self.measurement_types.len() == MsrSize::DIM {
306            for i in 0..state_items.len() {
307                for f in &self.measurement_types {
308                    hdrs.push(Field::new(
309                        format!(
310                            "Gain {}*{f:?} ({}*{})",
311                            state_items[i],
312                            cov_units[i],
313                            f.unit()
314                        ),
315                        DataType::Float64,
316                        true,
317                    ));
318                }
319            }
320        } else {
321            for state_item in &state_items {
322                for j in 0..MsrSize::DIM {
323                    hdrs.push(Field::new(
324                        format!("Gain {state_item}*[{j}]"),
325                        DataType::Float64,
326                        true,
327                    ));
328                }
329            }
330        }
331
332        // Add the filter-smoother ratio columns
333        for i in 0..state_items.len() {
334            hdrs.push(Field::new(
335                format!(
336                    "Filter-smoother ratio {} ({})",
337                    state_items[i], cov_units[i],
338                ),
339                DataType::Float64,
340                true,
341            ));
342        }
343
344        // Build the schema
345        let schema = Arc::new(Schema::new(hdrs));
346        let mut record: Vec<Arc<dyn Array>> = Vec::new();
347
348        // Build the states iterator -- this does require copying the current states but I can't either get a reference or a copy of all the states.
349        let (estimates, residuals) =
350            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
351                // Must interpolate the data!
352                let start = cfg
353                    .start_epoch
354                    .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
355                let end = cfg
356                    .end_epoch
357                    .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
358
359                let mut residuals: Vec<Option<Residual<MsrSize>>> =
360                    Vec::with_capacity(self.residuals.len());
361                let mut estimates = Vec::with_capacity(self.estimates.len());
362
363                for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
364                    if estimate.epoch() >= start && estimate.epoch() <= end {
365                        estimates.push(*estimate);
366                        residuals.push(residual.clone());
367                    }
368                }
369
370                (estimates, residuals)
371            } else {
372                (self.estimates.to_vec(), self.residuals.to_vec())
373            };
374
375        // Build all of the records
376
377        // Epochs
378        let mut utc_epoch = StringBuilder::new();
379        for s in &estimates {
380            utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
381        }
382        record.push(Arc::new(utc_epoch.finish()));
383
384        // Add all of the fields
385        for field in fields {
386            let mut data = Float64Builder::new();
387            for s in &estimates {
388                data.append_value(
389                    s.state()
390                        .value(field)
391                        .context(ODStateSnafu { action: "export" })?,
392                );
393            }
394            record.push(Arc::new(data.finish()));
395        }
396
397        // Add all of the 1-sigma uncertainties
398        for field in sigma_fields {
399            let mut data = Float64Builder::new();
400            for s in &estimates {
401                data.append_value(s.sigma_for(field).unwrap());
402            }
403            record.push(Arc::new(data.finish()));
404        }
405
406        // Add the 1-sigma covariance in the integration frame
407        for i in 0..est_size {
408            for j in i..est_size {
409                let mut data = Float64Builder::new();
410                for s in &estimates {
411                    data.append_value(s.covar()[(i, j)]);
412                }
413                record.push(Arc::new(data.finish()));
414            }
415        }
416
417        // Add the sigma/uncertainty in the integration frame
418        for i in 0..est_size {
419            let mut data = Float64Builder::new();
420            for s in &estimates {
421                data.append_value(s.covar()[(i, i)].sqrt());
422            }
423            record.push(Arc::new(data.finish()));
424        }
425
426        // Add the sigma/uncertainty covariance in the RIC frame
427        let mut ric_covariances = Vec::new();
428
429        for s in &estimates {
430            let dcm_ric2inertial = s
431                .state()
432                .orbit()
433                .dcm_from_ric_to_inertial()
434                .unwrap()
435                .state_dcm();
436
437            // Build the matrix view of the orbit part of the covariance.
438            let cov = s.covar();
439            let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
440
441            // Rotate back into the RIC frame
442            let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
443            ric_covariances.push(ric_covar);
444        }
445
446        // Now store the RIC covariance data.
447        for i in 0..6 {
448            let mut data = Float64Builder::new();
449            for cov in ric_covariances.iter().take(estimates.len()) {
450                data.append_value(cov[(i, i)].sqrt());
451            }
452            record.push(Arc::new(data.finish()));
453        }
454
455        // Finally, add the residuals.
456        // Prefits
457        for msr_type in &self.measurement_types {
458            let mut data = Float64Builder::new();
459            for resid_opt in &residuals {
460                if let Some(resid) = resid_opt {
461                    match resid.prefit(*msr_type) {
462                        Some(prefit) => data.append_value(prefit),
463                        None => data.append_null(),
464                    };
465                } else {
466                    data.append_null();
467                }
468            }
469            record.push(Arc::new(data.finish()));
470        }
471        // Postfit
472        for msr_type in &self.measurement_types {
473            let mut data = Float64Builder::new();
474            for resid_opt in &residuals {
475                if let Some(resid) = resid_opt {
476                    match resid.postfit(*msr_type) {
477                        Some(postfit) => data.append_value(postfit),
478                        None => data.append_null(),
479                    };
480                } else {
481                    data.append_null();
482                }
483            }
484            record.push(Arc::new(data.finish()));
485        }
486        // Measurement noise
487        for msr_type in &self.measurement_types {
488            let mut data = Float64Builder::new();
489            for resid_opt in &residuals {
490                if let Some(resid) = resid_opt {
491                    match resid.trk_noise(*msr_type) {
492                        Some(noise) => data.append_value(noise),
493                        None => data.append_null(),
494                    };
495                } else {
496                    data.append_null();
497                }
498            }
499            record.push(Arc::new(data.finish()));
500        }
501        // Real observation
502        for msr_type in &self.measurement_types {
503            let mut data = Float64Builder::new();
504            for resid_opt in &residuals {
505                if let Some(resid) = resid_opt {
506                    match resid.real_obs(*msr_type) {
507                        Some(postfit) => data.append_value(postfit),
508                        None => data.append_null(),
509                    };
510                } else {
511                    data.append_null();
512                }
513            }
514            record.push(Arc::new(data.finish()));
515        }
516        // Computed observation
517        for msr_type in &self.measurement_types {
518            let mut data = Float64Builder::new();
519            for resid_opt in &residuals {
520                if let Some(resid) = resid_opt {
521                    match resid.computed_obs(*msr_type) {
522                        Some(postfit) => data.append_value(postfit),
523                        None => data.append_null(),
524                    };
525                } else {
526                    data.append_null();
527                }
528            }
529            record.push(Arc::new(data.finish()));
530        }
531        // Residual ratio (unique entry regardless of the size)
532        let mut data = Float64Builder::new();
533        for resid_opt in &residuals {
534            if let Some(resid) = resid_opt {
535                data.append_value(resid.ratio);
536            } else {
537                data.append_null();
538            }
539        }
540        record.push(Arc::new(data.finish()));
541
542        // Residual acceptance (unique entry regardless of the size)
543        let mut data = BooleanBuilder::new();
544        for resid_opt in &residuals {
545            if let Some(resid) = resid_opt {
546                data.append_value(resid.rejected);
547            } else {
548                data.append_null();
549            }
550        }
551        record.push(Arc::new(data.finish()));
552
553        // Residual tracker (unique entry regardless of the size)
554        let mut data = StringBuilder::new();
555        for resid_opt in &residuals {
556            if let Some(resid) = resid_opt {
557                data.append_value(
558                    resid
559                        .tracker
560                        .clone()
561                        .unwrap_or("Undefined tracker".to_string()),
562                );
563            } else {
564                data.append_null();
565            }
566        }
567        record.push(Arc::new(data.finish()));
568
569        // Add the filter gains
570        for i in 0..est_size {
571            for j in 0..MsrSize::DIM {
572                let mut data = Float64Builder::new();
573                for opt_k in &self.gains {
574                    if let Some(k) = opt_k {
575                        data.append_value(k[(i, j)]);
576                    } else {
577                        data.append_null();
578                    }
579                }
580                record.push(Arc::new(data.finish()));
581            }
582        }
583
584        // Add the filter-smoother consistency ratios
585        for i in 0..est_size {
586            let mut data = Float64Builder::new();
587            for opt_fsr in &self.filter_smoother_ratios {
588                if let Some(fsr) = opt_fsr {
589                    data.append_value(fsr[i]);
590                } else {
591                    data.append_null();
592                }
593            }
594            record.push(Arc::new(data.finish()));
595        }
596
597        info!("Serialized {} estimates and residuals", estimates.len());
598
599        // Serialize all of the devices and add that to the parquet file too.
600        let mut metadata = HashMap::new();
601        metadata.insert(
602            "Purpose".to_string(),
603            "Orbit determination results".to_string(),
604        );
605        if let Some(add_meta) = cfg.metadata {
606            for (k, v) in add_meta {
607                metadata.insert(k, v);
608            }
609        }
610
611        let props = pq_writer(Some(metadata));
612
613        let file = File::create(&path_buf)
614            .context(StdIOSnafu {
615                action: "creating OD results file",
616            })
617            .context(ODIOSnafu)?;
618
619        let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
620            .context(ParquetSnafu {
621                action: "exporting OD results",
622            })
623            .context(ODIOSnafu)?;
624
625        let batch = RecordBatch::try_new(schema, record)
626            .context(ArrowSnafu {
627                action: "writing OD results (building batch record)",
628            })
629            .context(ODIOSnafu)?;
630
631        writer
632            .write(&batch)
633            .context(ParquetSnafu {
634                action: "writing OD results",
635            })
636            .context(ODIOSnafu)?;
637
638        writer
639            .close()
640            .context(ParquetSnafu {
641                action: "closing OD results file",
642            })
643            .context(ODIOSnafu)?;
644
645        // Return the path this was written to
646        let tock_time = Epoch::now().unwrap() - tick;
647        info!(
648            "Orbit determination results written to {} in {tock_time}",
649            path_buf.display()
650        );
651        Ok(path_buf)
652    }
653}