nyx_space/od/process/solution/
export.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19use crate::io::watermark::pq_writer;
20use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
21use crate::linalg::allocator::Allocator;
22use crate::linalg::{DefaultAllocator, DimName};
23use crate::md::trajectory::Interpolatable;
24use crate::md::StateParameter;
25use crate::od::estimate::*;
26use crate::State;
27use crate::{od::*, Spacecraft};
28use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
29use arrow::datatypes::{DataType, Field, Schema};
30use arrow::record_batch::RecordBatch;
31use hifitime::TimeScale;
32use msr::sensitivity::TrackerSensitivity;
33use nalgebra::Const;
34use parquet::arrow::ArrowWriter;
35use snafu::prelude::*;
36use std::collections::HashMap;
37use std::fs::File;
38use std::path::{Path, PathBuf};
39
40use super::ODSolution;
41
42impl<MsrSize: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
43    ODSolution<Spacecraft, KfEstimate<Spacecraft>, MsrSize, Trk>
44where
45    DefaultAllocator: Allocator<MsrSize>
46        + Allocator<MsrSize, <Spacecraft as State>::Size>
47        + Allocator<Const<1>, MsrSize>
48        + Allocator<<Spacecraft as State>::Size>
49        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
50        + Allocator<MsrSize, MsrSize>
51        + Allocator<MsrSize, <Spacecraft as State>::Size>
52        + Allocator<<Spacecraft as State>::Size, MsrSize>
53        + Allocator<<Spacecraft as State>::Size>
54        + Allocator<<Spacecraft as State>::VecLength>
55        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
56{
57    /// Store the estimates and residuals in a parquet file
58    pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
59        ensure!(
60            !self.estimates.is_empty(),
61            TooFewMeasurementsSnafu {
62                need: 1_usize,
63                action: "exporting OD results"
64            }
65        );
66
67        if self.estimates.len() != self.residuals.len() {
68            return Err(ODError::ODConfigError {
69                source: ConfigError::InvalidConfig {
70                    msg: format!(
71                        "Estimates ({}) and residuals ({}) are not aligned.",
72                        self.estimates.len(),
73                        self.residuals.len()
74                    ),
75                },
76            });
77        }
78
79        if self.estimates.len() != self.gains.len() {
80            return Err(ODError::ODConfigError {
81                source: ConfigError::InvalidConfig {
82                    msg: format!(
83                        "Estimates ({}) and filter gains ({}) are not aligned.",
84                        self.estimates.len(),
85                        self.gains.len()
86                    ),
87                },
88            });
89        }
90
91        if self.estimates.len() != self.filter_smoother_ratios.len() {
92            return Err(ODError::ODConfigError {
93                source: ConfigError::InvalidConfig {
94                    msg: format!(
95                        "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
96                        self.estimates.len(),
97                        self.filter_smoother_ratios.len()
98                    ),
99                },
100            });
101        }
102
103        let tick = Epoch::now().unwrap();
104        info!("Exporting orbit determination result to parquet file...");
105
106        if cfg.step.is_some() {
107            warn!("The `step` parameter in the export is not supported for orbit determination exports.");
108        }
109
110        // Grab the path here before we move stuff.
111        let path_buf = cfg.actual_path(path);
112
113        // Build the schema
114        let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
115
116        let frame = self.estimates[0].state().frame();
117
118        let more_meta = Some(vec![
119            (
120                "Frame".to_string(),
121                serde_dhall::serialize(&frame)
122                    .to_string()
123                    .map_err(|e| ODError::ODIOError {
124                        source: InputOutputError::SerializeDhall {
125                            what: format!("frame `{frame}`"),
126                            err: e.to_string(),
127                        },
128                    })?,
129            ),
130            (
131                "SRP Area (m2)".to_string(),
132                self.estimates
133                    .first()
134                    .as_ref()
135                    .unwrap()
136                    .nominal_state()
137                    .srp
138                    .area_m2
139                    .to_string(),
140            ),
141            (
142                "Drag Area (m2)".to_string(),
143                self.estimates
144                    .first()
145                    .as_ref()
146                    .unwrap()
147                    .nominal_state()
148                    .drag
149                    .area_m2
150                    .to_string(),
151            ),
152        ]);
153
154        let mut fields = match cfg.fields {
155            Some(fields) => fields,
156            None => Spacecraft::export_params(),
157        };
158
159        // Check that we can retrieve this information
160        fields.retain(|param| match self.estimates[0].state().value(*param) {
161            Ok(_) => param != &StateParameter::GuidanceMode,
162            Err(_) => false,
163        });
164
165        for field in &fields {
166            hdrs.push(field.to_field(more_meta.clone()));
167        }
168
169        let mut sigma_fields = fields.clone();
170        // Check that we can retrieve this information
171        sigma_fields.retain(|param| {
172            !matches!(
173                param,
174                &StateParameter::X
175                    | &StateParameter::Y
176                    | &StateParameter::Z
177                    | &StateParameter::VX
178                    | &StateParameter::VY
179                    | &StateParameter::VZ
180            ) && self.estimates[0].sigma_for(*param).is_ok()
181        });
182
183        for field in &sigma_fields {
184            hdrs.push(field.to_cov_field(more_meta.clone()));
185        }
186
187        let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
188        let state_units = [
189            "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
190        ];
191        let mut cov_units = vec![];
192
193        for i in 0..state_items.len() {
194            for j in i..state_items.len() {
195                let cov_unit = if i < 3 {
196                    if j < 3 {
197                        "km^2"
198                    } else if (3..6).contains(&j) {
199                        "km^2/s"
200                    } else if j == 8 {
201                        "km*kg"
202                    } else {
203                        "km"
204                    }
205                } else if (3..6).contains(&i) {
206                    if (3..6).contains(&j) {
207                        "km^2/s^2"
208                    } else if j == 8 {
209                        "km/s*kg"
210                    } else {
211                        "km/s"
212                    }
213                } else if i == 8 || j == 8 {
214                    "kg^2"
215                } else {
216                    "unitless"
217                };
218
219                cov_units.push(cov_unit);
220            }
221        }
222
223        let est_size = <Spacecraft as State>::Size::dim();
224
225        let mut idx = 0;
226        for i in 0..state_items.len() {
227            for j in i..state_items.len() {
228                hdrs.push(Field::new(
229                    format!(
230                        "Covariance {}*{} ({frame:x}) ({})",
231                        state_items[i], state_items[j], cov_units[idx]
232                    ),
233                    DataType::Float64,
234                    false,
235                ));
236                idx += 1;
237            }
238        }
239
240        // Add the uncertainty in the integration frame
241        for (i, coord) in state_items.iter().enumerate() {
242            hdrs.push(Field::new(
243                format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
244                DataType::Float64,
245                false,
246            ));
247        }
248
249        // Add the position and velocity uncertainty in the RIC frame
250        for (i, coord) in state_items.iter().enumerate().take(6) {
251            hdrs.push(Field::new(
252                format!("Sigma {coord} (RIC) ({})", state_units[i]),
253                DataType::Float64,
254                false,
255            ));
256        }
257
258        // Add the fields of the residuals
259        let mut msr_fields = Vec::new();
260        for f in &self.measurement_types {
261            msr_fields.push(
262                f.to_field()
263                    .with_nullable(true)
264                    .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
265            );
266        }
267        for f in &self.measurement_types {
268            msr_fields.push(
269                f.to_field()
270                    .with_nullable(true)
271                    .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
272            );
273        }
274        for f in &self.measurement_types {
275            msr_fields.push(
276                f.to_field()
277                    .with_nullable(true)
278                    .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
279            );
280        }
281        for f in &self.measurement_types {
282            msr_fields.push(
283                f.to_field()
284                    .with_nullable(true)
285                    .with_name(format!("Real observation: {f:?} ({})", f.unit())),
286            );
287        }
288        for f in &self.measurement_types {
289            msr_fields.push(
290                f.to_field()
291                    .with_nullable(true)
292                    .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
293            );
294        }
295
296        msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
297        msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
298        msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
299
300        hdrs.append(&mut msr_fields);
301
302        // Add the filter gain columns
303        if self.measurement_types.len() == MsrSize::USIZE {
304            for i in 0..state_items.len() {
305                for f in &self.measurement_types {
306                    hdrs.push(Field::new(
307                        format!(
308                            "Gain {}*{f:?} ({}*{})",
309                            state_items[i],
310                            cov_units[i],
311                            f.unit()
312                        ),
313                        DataType::Float64,
314                        true,
315                    ));
316                }
317            }
318        } else {
319            for state_item in &state_items {
320                for j in 0..MsrSize::USIZE {
321                    hdrs.push(Field::new(
322                        format!("Gain {}*[{j}]", state_item),
323                        DataType::Float64,
324                        true,
325                    ));
326                }
327            }
328        }
329
330        // Add the filter-smoother ratio columns
331        for i in 0..state_items.len() {
332            hdrs.push(Field::new(
333                format!(
334                    "Filter-smoother ratio {} ({})",
335                    state_items[i], cov_units[i],
336                ),
337                DataType::Float64,
338                true,
339            ));
340        }
341
342        // Build the schema
343        let schema = Arc::new(Schema::new(hdrs));
344        let mut record: Vec<Arc<dyn Array>> = Vec::new();
345
346        // Build the states iterator -- this does require copying the current states but I can't either get a reference or a copy of all the states.
347        let (estimates, residuals) =
348            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
349                // Must interpolate the data!
350                let start = cfg
351                    .start_epoch
352                    .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
353                let end = cfg
354                    .end_epoch
355                    .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
356
357                let mut residuals: Vec<Option<Residual<MsrSize>>> =
358                    Vec::with_capacity(self.residuals.len());
359                let mut estimates = Vec::with_capacity(self.estimates.len());
360
361                for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
362                    if estimate.epoch() >= start && estimate.epoch() <= end {
363                        estimates.push(*estimate);
364                        residuals.push(residual.clone());
365                    }
366                }
367
368                (estimates, residuals)
369            } else {
370                (self.estimates.to_vec(), self.residuals.to_vec())
371            };
372
373        // Build all of the records
374
375        // Epochs
376        let mut utc_epoch = StringBuilder::new();
377        for s in &estimates {
378            utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
379        }
380        record.push(Arc::new(utc_epoch.finish()));
381
382        // Add all of the fields
383        for field in fields {
384            let mut data = Float64Builder::new();
385            for s in &estimates {
386                data.append_value(
387                    s.state()
388                        .value(field)
389                        .context(ODStateSnafu { action: "export" })?,
390                );
391            }
392            record.push(Arc::new(data.finish()));
393        }
394
395        // Add all of the 1-sigma uncertainties
396        for field in sigma_fields {
397            let mut data = Float64Builder::new();
398            for s in &estimates {
399                data.append_value(s.sigma_for(field).unwrap());
400            }
401            record.push(Arc::new(data.finish()));
402        }
403
404        // Add the 1-sigma covariance in the integration frame
405        for i in 0..est_size {
406            for j in i..est_size {
407                let mut data = Float64Builder::new();
408                for s in &estimates {
409                    data.append_value(s.covar()[(i, j)]);
410                }
411                record.push(Arc::new(data.finish()));
412            }
413        }
414
415        // Add the sigma/uncertainty in the integration frame
416        for i in 0..est_size {
417            let mut data = Float64Builder::new();
418            for s in &estimates {
419                data.append_value(s.covar()[(i, i)].sqrt());
420            }
421            record.push(Arc::new(data.finish()));
422        }
423
424        // Add the sigma/uncertainty covariance in the RIC frame
425        let mut ric_covariances = Vec::new();
426
427        for s in &estimates {
428            let dcm_ric2inertial = s
429                .state()
430                .orbit()
431                .dcm_from_ric_to_inertial()
432                .unwrap()
433                .state_dcm();
434
435            // Build the matrix view of the orbit part of the covariance.
436            let cov = s.covar();
437            let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
438
439            // Rotate back into the RIC frame
440            let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
441            ric_covariances.push(ric_covar);
442        }
443
444        // Now store the RIC covariance data.
445        for i in 0..6 {
446            let mut data = Float64Builder::new();
447            for cov in ric_covariances.iter().take(estimates.len()) {
448                data.append_value(cov[(i, i)].sqrt());
449            }
450            record.push(Arc::new(data.finish()));
451        }
452
453        // Finally, add the residuals.
454        // Prefits
455        for msr_type in &self.measurement_types {
456            let mut data = Float64Builder::new();
457            for resid_opt in &residuals {
458                if let Some(resid) = resid_opt {
459                    match resid.prefit(*msr_type) {
460                        Some(prefit) => data.append_value(prefit),
461                        None => data.append_null(),
462                    };
463                } else {
464                    data.append_null();
465                }
466            }
467            record.push(Arc::new(data.finish()));
468        }
469        // Postfit
470        for msr_type in &self.measurement_types {
471            let mut data = Float64Builder::new();
472            for resid_opt in &residuals {
473                if let Some(resid) = resid_opt {
474                    match resid.postfit(*msr_type) {
475                        Some(postfit) => data.append_value(postfit),
476                        None => data.append_null(),
477                    };
478                } else {
479                    data.append_null();
480                }
481            }
482            record.push(Arc::new(data.finish()));
483        }
484        // Measurement noise
485        for msr_type in &self.measurement_types {
486            let mut data = Float64Builder::new();
487            for resid_opt in &residuals {
488                if let Some(resid) = resid_opt {
489                    match resid.trk_noise(*msr_type) {
490                        Some(noise) => data.append_value(noise),
491                        None => data.append_null(),
492                    };
493                } else {
494                    data.append_null();
495                }
496            }
497            record.push(Arc::new(data.finish()));
498        }
499        // Real observation
500        for msr_type in &self.measurement_types {
501            let mut data = Float64Builder::new();
502            for resid_opt in &residuals {
503                if let Some(resid) = resid_opt {
504                    match resid.real_obs(*msr_type) {
505                        Some(postfit) => data.append_value(postfit),
506                        None => data.append_null(),
507                    };
508                } else {
509                    data.append_null();
510                }
511            }
512            record.push(Arc::new(data.finish()));
513        }
514        // Computed observation
515        for msr_type in &self.measurement_types {
516            let mut data = Float64Builder::new();
517            for resid_opt in &residuals {
518                if let Some(resid) = resid_opt {
519                    match resid.computed_obs(*msr_type) {
520                        Some(postfit) => data.append_value(postfit),
521                        None => data.append_null(),
522                    };
523                } else {
524                    data.append_null();
525                }
526            }
527            record.push(Arc::new(data.finish()));
528        }
529        // Residual ratio (unique entry regardless of the size)
530        let mut data = Float64Builder::new();
531        for resid_opt in &residuals {
532            if let Some(resid) = resid_opt {
533                data.append_value(resid.ratio);
534            } else {
535                data.append_null();
536            }
537        }
538        record.push(Arc::new(data.finish()));
539
540        // Residual acceptance (unique entry regardless of the size)
541        let mut data = BooleanBuilder::new();
542        for resid_opt in &residuals {
543            if let Some(resid) = resid_opt {
544                data.append_value(resid.rejected);
545            } else {
546                data.append_null();
547            }
548        }
549        record.push(Arc::new(data.finish()));
550
551        // Residual tracker (unique entry regardless of the size)
552        let mut data = StringBuilder::new();
553        for resid_opt in &residuals {
554            if let Some(resid) = resid_opt {
555                data.append_value(
556                    resid
557                        .tracker
558                        .clone()
559                        .unwrap_or("Undefined tracker".to_string()),
560                );
561            } else {
562                data.append_null();
563            }
564        }
565        record.push(Arc::new(data.finish()));
566
567        // Add the filter gains
568        for i in 0..est_size {
569            for j in 0..MsrSize::USIZE {
570                let mut data = Float64Builder::new();
571                for opt_k in &self.gains {
572                    if let Some(k) = opt_k {
573                        data.append_value(k[(i, j)]);
574                    } else {
575                        data.append_null();
576                    }
577                }
578                record.push(Arc::new(data.finish()));
579            }
580        }
581
582        // Add the filter-smoother consistency ratios
583        for i in 0..est_size {
584            let mut data = Float64Builder::new();
585            for opt_fsr in &self.filter_smoother_ratios {
586                if let Some(fsr) = opt_fsr {
587                    data.append_value(fsr[i]);
588                } else {
589                    data.append_null();
590                }
591            }
592            record.push(Arc::new(data.finish()));
593        }
594
595        info!("Serialized {} estimates and residuals", estimates.len());
596
597        // Serialize all of the devices and add that to the parquet file too.
598        let mut metadata = HashMap::new();
599        metadata.insert(
600            "Purpose".to_string(),
601            "Orbit determination results".to_string(),
602        );
603        if let Some(add_meta) = cfg.metadata {
604            for (k, v) in add_meta {
605                metadata.insert(k, v);
606            }
607        }
608
609        let props = pq_writer(Some(metadata));
610
611        let file = File::create(&path_buf)
612            .context(StdIOSnafu {
613                action: "creating OD results file",
614            })
615            .context(ODIOSnafu)?;
616
617        let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
618            .context(ParquetSnafu {
619                action: "exporting OD results",
620            })
621            .context(ODIOSnafu)?;
622
623        let batch = RecordBatch::try_new(schema, record)
624            .context(ArrowSnafu {
625                action: "writing OD results (building batch record)",
626            })
627            .context(ODIOSnafu)?;
628
629        writer
630            .write(&batch)
631            .context(ParquetSnafu {
632                action: "writing OD results",
633            })
634            .context(ODIOSnafu)?;
635
636        writer
637            .close()
638            .context(ParquetSnafu {
639                action: "closing OD results file",
640            })
641            .context(ODIOSnafu)?;
642
643        // Return the path this was written to
644        let tock_time = Epoch::now().unwrap() - tick;
645        info!(
646            "Orbit determination results written to {} in {tock_time}",
647            path_buf.display()
648        );
649        Ok(path_buf)
650    }
651}