nyx_space/od/process/solution/
export.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19use crate::io::watermark::pq_writer;
20use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
21use crate::linalg::allocator::Allocator;
22use crate::linalg::{DefaultAllocator, DimName};
23use crate::md::trajectory::Interpolatable;
24use crate::md::StateParameter;
25use crate::od::estimate::*;
26use crate::State;
27use crate::{od::*, Spacecraft};
28use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
29use arrow::datatypes::{DataType, Field, Schema};
30use arrow::record_batch::RecordBatch;
31use hifitime::TimeScale;
32use msr::sensitivity::TrackerSensitivity;
33use nalgebra::Const;
34use parquet::arrow::ArrowWriter;
35use snafu::prelude::*;
36use std::collections::HashMap;
37use std::fs::File;
38use std::path::{Path, PathBuf};
39
40use super::ODSolution;
41
42impl<MsrSize: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
43    ODSolution<Spacecraft, KfEstimate<Spacecraft>, MsrSize, Trk>
44where
45    DefaultAllocator: Allocator<MsrSize>
46        + Allocator<MsrSize, <Spacecraft as State>::Size>
47        + Allocator<Const<1>, MsrSize>
48        + Allocator<<Spacecraft as State>::Size>
49        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
50        + Allocator<MsrSize, MsrSize>
51        + Allocator<MsrSize, <Spacecraft as State>::Size>
52        + Allocator<<Spacecraft as State>::Size, MsrSize>
53        + Allocator<<Spacecraft as State>::Size>
54        + Allocator<<Spacecraft as State>::VecLength>
55        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
56{
57    /// Store the estimates and residuals in a parquet file
58    pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
59        ensure!(
60            !self.estimates.is_empty(),
61            TooFewMeasurementsSnafu {
62                need: 1_usize,
63                action: "exporting OD results"
64            }
65        );
66
67        if self.estimates.len() != self.residuals.len() {
68            return Err(ODError::ODConfigError {
69                source: ConfigError::InvalidConfig {
70                    msg: format!(
71                        "Estimates ({}) and residuals ({}) are not aligned.",
72                        self.estimates.len(),
73                        self.residuals.len()
74                    ),
75                },
76            });
77        }
78
79        if self.estimates.len() != self.gains.len() {
80            return Err(ODError::ODConfigError {
81                source: ConfigError::InvalidConfig {
82                    msg: format!(
83                        "Estimates ({}) and filter gains ({}) are not aligned.",
84                        self.estimates.len(),
85                        self.gains.len()
86                    ),
87                },
88            });
89        }
90
91        if self.estimates.len() != self.filter_smoother_ratios.len() {
92            return Err(ODError::ODConfigError {
93                source: ConfigError::InvalidConfig {
94                    msg: format!(
95                        "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
96                        self.estimates.len(),
97                        self.filter_smoother_ratios.len()
98                    ),
99                },
100            });
101        }
102
103        let tick = Epoch::now().unwrap();
104        info!("Exporting orbit determination result to parquet file...");
105
106        if cfg.step.is_some() {
107            warn!("The `step` parameter in the export is not supported for orbit determination exports.");
108        }
109
110        // Grab the path here before we move stuff.
111        let path_buf = cfg.actual_path(path);
112
113        // Build the schema
114        let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
115
116        let frame = self.estimates[0].state().frame();
117
118        let more_meta = Some(vec![
119            (
120                "Frame".to_string(),
121                serde_dhall::serialize(&frame)
122                    .static_type_annotation()
123                    .to_string()
124                    .map_err(|e| ODError::ODIOError {
125                        source: InputOutputError::SerializeDhall {
126                            what: format!("frame `{frame}`"),
127                            err: e.to_string(),
128                        },
129                    })?,
130            ),
131            (
132                "SRP Area (m2)".to_string(),
133                self.estimates
134                    .first()
135                    .as_ref()
136                    .unwrap()
137                    .nominal_state()
138                    .srp
139                    .area_m2
140                    .to_string(),
141            ),
142            (
143                "Drag Area (m2)".to_string(),
144                self.estimates
145                    .first()
146                    .as_ref()
147                    .unwrap()
148                    .nominal_state()
149                    .drag
150                    .area_m2
151                    .to_string(),
152            ),
153        ]);
154
155        let mut fields = match cfg.fields {
156            Some(fields) => fields,
157            None => Spacecraft::export_params(),
158        };
159
160        // Check that we can retrieve this information
161        fields.retain(|param| match self.estimates[0].state().value(*param) {
162            Ok(_) => param != &StateParameter::GuidanceMode,
163            Err(_) => false,
164        });
165
166        for field in &fields {
167            hdrs.push(field.to_field(more_meta.clone()));
168        }
169
170        let mut sigma_fields = fields.clone();
171        // Check that we can retrieve this information
172        sigma_fields.retain(|param| {
173            !matches!(
174                param,
175                &StateParameter::X
176                    | &StateParameter::Y
177                    | &StateParameter::Z
178                    | &StateParameter::VX
179                    | &StateParameter::VY
180                    | &StateParameter::VZ
181            ) && self.estimates[0].sigma_for(*param).is_ok()
182        });
183
184        for field in &sigma_fields {
185            hdrs.push(field.to_cov_field(more_meta.clone()));
186        }
187
188        let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
189        let state_units = [
190            "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
191        ];
192        let mut cov_units = vec![];
193
194        for i in 0..state_items.len() {
195            for j in i..state_items.len() {
196                let cov_unit = if i < 3 {
197                    if j < 3 {
198                        "km^2"
199                    } else if (3..6).contains(&j) {
200                        "km^2/s"
201                    } else if j == 8 {
202                        "km*kg"
203                    } else {
204                        "km"
205                    }
206                } else if (3..6).contains(&i) {
207                    if (3..6).contains(&j) {
208                        "km^2/s^2"
209                    } else if j == 8 {
210                        "km/s*kg"
211                    } else {
212                        "km/s"
213                    }
214                } else if i == 8 || j == 8 {
215                    "kg^2"
216                } else {
217                    "unitless"
218                };
219
220                cov_units.push(cov_unit);
221            }
222        }
223
224        let est_size = <Spacecraft as State>::Size::dim();
225
226        let mut idx = 0;
227        for i in 0..state_items.len() {
228            for j in i..state_items.len() {
229                hdrs.push(Field::new(
230                    format!(
231                        "Covariance {}*{} ({frame:x}) ({})",
232                        state_items[i], state_items[j], cov_units[idx]
233                    ),
234                    DataType::Float64,
235                    false,
236                ));
237                idx += 1;
238            }
239        }
240
241        // Add the uncertainty in the integration frame
242        for (i, coord) in state_items.iter().enumerate() {
243            hdrs.push(Field::new(
244                format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
245                DataType::Float64,
246                false,
247            ));
248        }
249
250        // Add the position and velocity uncertainty in the RIC frame
251        for (i, coord) in state_items.iter().enumerate().take(6) {
252            hdrs.push(Field::new(
253                format!("Sigma {coord} (RIC) ({})", state_units[i]),
254                DataType::Float64,
255                false,
256            ));
257        }
258
259        // Add the fields of the residuals
260        let mut msr_fields = Vec::new();
261        for f in &self.measurement_types {
262            msr_fields.push(
263                f.to_field()
264                    .with_nullable(true)
265                    .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
266            );
267        }
268        for f in &self.measurement_types {
269            msr_fields.push(
270                f.to_field()
271                    .with_nullable(true)
272                    .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
273            );
274        }
275        for f in &self.measurement_types {
276            msr_fields.push(
277                f.to_field()
278                    .with_nullable(true)
279                    .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
280            );
281        }
282        for f in &self.measurement_types {
283            msr_fields.push(
284                f.to_field()
285                    .with_nullable(true)
286                    .with_name(format!("Real observation: {f:?} ({})", f.unit())),
287            );
288        }
289        for f in &self.measurement_types {
290            msr_fields.push(
291                f.to_field()
292                    .with_nullable(true)
293                    .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
294            );
295        }
296
297        msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
298        msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
299        msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
300
301        hdrs.append(&mut msr_fields);
302
303        // Add the filter gain columns
304        if self.measurement_types.len() == MsrSize::USIZE {
305            for i in 0..state_items.len() {
306                for f in &self.measurement_types {
307                    hdrs.push(Field::new(
308                        format!(
309                            "Gain {}*{f:?} ({}*{})",
310                            state_items[i],
311                            cov_units[i],
312                            f.unit()
313                        ),
314                        DataType::Float64,
315                        true,
316                    ));
317                }
318            }
319        } else {
320            for state_item in &state_items {
321                for j in 0..MsrSize::USIZE {
322                    hdrs.push(Field::new(
323                        format!("Gain {state_item}*[{j}]"),
324                        DataType::Float64,
325                        true,
326                    ));
327                }
328            }
329        }
330
331        // Add the filter-smoother ratio columns
332        for i in 0..state_items.len() {
333            hdrs.push(Field::new(
334                format!(
335                    "Filter-smoother ratio {} ({})",
336                    state_items[i], cov_units[i],
337                ),
338                DataType::Float64,
339                true,
340            ));
341        }
342
343        // Build the schema
344        let schema = Arc::new(Schema::new(hdrs));
345        let mut record: Vec<Arc<dyn Array>> = Vec::new();
346
347        // Build the states iterator -- this does require copying the current states but I can't either get a reference or a copy of all the states.
348        let (estimates, residuals) =
349            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
350                // Must interpolate the data!
351                let start = cfg
352                    .start_epoch
353                    .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
354                let end = cfg
355                    .end_epoch
356                    .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
357
358                let mut residuals: Vec<Option<Residual<MsrSize>>> =
359                    Vec::with_capacity(self.residuals.len());
360                let mut estimates = Vec::with_capacity(self.estimates.len());
361
362                for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
363                    if estimate.epoch() >= start && estimate.epoch() <= end {
364                        estimates.push(*estimate);
365                        residuals.push(residual.clone());
366                    }
367                }
368
369                (estimates, residuals)
370            } else {
371                (self.estimates.to_vec(), self.residuals.to_vec())
372            };
373
374        // Build all of the records
375
376        // Epochs
377        let mut utc_epoch = StringBuilder::new();
378        for s in &estimates {
379            utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
380        }
381        record.push(Arc::new(utc_epoch.finish()));
382
383        // Add all of the fields
384        for field in fields {
385            let mut data = Float64Builder::new();
386            for s in &estimates {
387                data.append_value(
388                    s.state()
389                        .value(field)
390                        .context(ODStateSnafu { action: "export" })?,
391                );
392            }
393            record.push(Arc::new(data.finish()));
394        }
395
396        // Add all of the 1-sigma uncertainties
397        for field in sigma_fields {
398            let mut data = Float64Builder::new();
399            for s in &estimates {
400                data.append_value(s.sigma_for(field).unwrap());
401            }
402            record.push(Arc::new(data.finish()));
403        }
404
405        // Add the 1-sigma covariance in the integration frame
406        for i in 0..est_size {
407            for j in i..est_size {
408                let mut data = Float64Builder::new();
409                for s in &estimates {
410                    data.append_value(s.covar()[(i, j)]);
411                }
412                record.push(Arc::new(data.finish()));
413            }
414        }
415
416        // Add the sigma/uncertainty in the integration frame
417        for i in 0..est_size {
418            let mut data = Float64Builder::new();
419            for s in &estimates {
420                data.append_value(s.covar()[(i, i)].sqrt());
421            }
422            record.push(Arc::new(data.finish()));
423        }
424
425        // Add the sigma/uncertainty covariance in the RIC frame
426        let mut ric_covariances = Vec::new();
427
428        for s in &estimates {
429            let dcm_ric2inertial = s
430                .state()
431                .orbit()
432                .dcm_from_ric_to_inertial()
433                .unwrap()
434                .state_dcm();
435
436            // Build the matrix view of the orbit part of the covariance.
437            let cov = s.covar();
438            let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
439
440            // Rotate back into the RIC frame
441            let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
442            ric_covariances.push(ric_covar);
443        }
444
445        // Now store the RIC covariance data.
446        for i in 0..6 {
447            let mut data = Float64Builder::new();
448            for cov in ric_covariances.iter().take(estimates.len()) {
449                data.append_value(cov[(i, i)].sqrt());
450            }
451            record.push(Arc::new(data.finish()));
452        }
453
454        // Finally, add the residuals.
455        // Prefits
456        for msr_type in &self.measurement_types {
457            let mut data = Float64Builder::new();
458            for resid_opt in &residuals {
459                if let Some(resid) = resid_opt {
460                    match resid.prefit(*msr_type) {
461                        Some(prefit) => data.append_value(prefit),
462                        None => data.append_null(),
463                    };
464                } else {
465                    data.append_null();
466                }
467            }
468            record.push(Arc::new(data.finish()));
469        }
470        // Postfit
471        for msr_type in &self.measurement_types {
472            let mut data = Float64Builder::new();
473            for resid_opt in &residuals {
474                if let Some(resid) = resid_opt {
475                    match resid.postfit(*msr_type) {
476                        Some(postfit) => data.append_value(postfit),
477                        None => data.append_null(),
478                    };
479                } else {
480                    data.append_null();
481                }
482            }
483            record.push(Arc::new(data.finish()));
484        }
485        // Measurement noise
486        for msr_type in &self.measurement_types {
487            let mut data = Float64Builder::new();
488            for resid_opt in &residuals {
489                if let Some(resid) = resid_opt {
490                    match resid.trk_noise(*msr_type) {
491                        Some(noise) => data.append_value(noise),
492                        None => data.append_null(),
493                    };
494                } else {
495                    data.append_null();
496                }
497            }
498            record.push(Arc::new(data.finish()));
499        }
500        // Real observation
501        for msr_type in &self.measurement_types {
502            let mut data = Float64Builder::new();
503            for resid_opt in &residuals {
504                if let Some(resid) = resid_opt {
505                    match resid.real_obs(*msr_type) {
506                        Some(postfit) => data.append_value(postfit),
507                        None => data.append_null(),
508                    };
509                } else {
510                    data.append_null();
511                }
512            }
513            record.push(Arc::new(data.finish()));
514        }
515        // Computed observation
516        for msr_type in &self.measurement_types {
517            let mut data = Float64Builder::new();
518            for resid_opt in &residuals {
519                if let Some(resid) = resid_opt {
520                    match resid.computed_obs(*msr_type) {
521                        Some(postfit) => data.append_value(postfit),
522                        None => data.append_null(),
523                    };
524                } else {
525                    data.append_null();
526                }
527            }
528            record.push(Arc::new(data.finish()));
529        }
530        // Residual ratio (unique entry regardless of the size)
531        let mut data = Float64Builder::new();
532        for resid_opt in &residuals {
533            if let Some(resid) = resid_opt {
534                data.append_value(resid.ratio);
535            } else {
536                data.append_null();
537            }
538        }
539        record.push(Arc::new(data.finish()));
540
541        // Residual acceptance (unique entry regardless of the size)
542        let mut data = BooleanBuilder::new();
543        for resid_opt in &residuals {
544            if let Some(resid) = resid_opt {
545                data.append_value(resid.rejected);
546            } else {
547                data.append_null();
548            }
549        }
550        record.push(Arc::new(data.finish()));
551
552        // Residual tracker (unique entry regardless of the size)
553        let mut data = StringBuilder::new();
554        for resid_opt in &residuals {
555            if let Some(resid) = resid_opt {
556                data.append_value(
557                    resid
558                        .tracker
559                        .clone()
560                        .unwrap_or("Undefined tracker".to_string()),
561                );
562            } else {
563                data.append_null();
564            }
565        }
566        record.push(Arc::new(data.finish()));
567
568        // Add the filter gains
569        for i in 0..est_size {
570            for j in 0..MsrSize::USIZE {
571                let mut data = Float64Builder::new();
572                for opt_k in &self.gains {
573                    if let Some(k) = opt_k {
574                        data.append_value(k[(i, j)]);
575                    } else {
576                        data.append_null();
577                    }
578                }
579                record.push(Arc::new(data.finish()));
580            }
581        }
582
583        // Add the filter-smoother consistency ratios
584        for i in 0..est_size {
585            let mut data = Float64Builder::new();
586            for opt_fsr in &self.filter_smoother_ratios {
587                if let Some(fsr) = opt_fsr {
588                    data.append_value(fsr[i]);
589                } else {
590                    data.append_null();
591                }
592            }
593            record.push(Arc::new(data.finish()));
594        }
595
596        info!("Serialized {} estimates and residuals", estimates.len());
597
598        // Serialize all of the devices and add that to the parquet file too.
599        let mut metadata = HashMap::new();
600        metadata.insert(
601            "Purpose".to_string(),
602            "Orbit determination results".to_string(),
603        );
604        if let Some(add_meta) = cfg.metadata {
605            for (k, v) in add_meta {
606                metadata.insert(k, v);
607            }
608        }
609
610        let props = pq_writer(Some(metadata));
611
612        let file = File::create(&path_buf)
613            .context(StdIOSnafu {
614                action: "creating OD results file",
615            })
616            .context(ODIOSnafu)?;
617
618        let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
619            .context(ParquetSnafu {
620                action: "exporting OD results",
621            })
622            .context(ODIOSnafu)?;
623
624        let batch = RecordBatch::try_new(schema, record)
625            .context(ArrowSnafu {
626                action: "writing OD results (building batch record)",
627            })
628            .context(ODIOSnafu)?;
629
630        writer
631            .write(&batch)
632            .context(ParquetSnafu {
633                action: "writing OD results",
634            })
635            .context(ODIOSnafu)?;
636
637        writer
638            .close()
639            .context(ParquetSnafu {
640                action: "closing OD results file",
641            })
642            .context(ODIOSnafu)?;
643
644        // Return the path this was written to
645        let tock_time = Epoch::now().unwrap() - tick;
646        info!(
647            "Orbit determination results written to {} in {tock_time}",
648            path_buf.display()
649        );
650        Ok(path_buf)
651    }
652}