nyx_space/od/process/
export.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19use crate::dynamics::SpacecraftDynamics;
20use crate::io::watermark::pq_writer;
21use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
22use crate::linalg::allocator::Allocator;
23use crate::linalg::{DefaultAllocator, DimName};
24use crate::md::trajectory::Interpolatable;
25use crate::md::StateParameter;
26use crate::od::estimate::*;
27use crate::State;
28use crate::{od::*, Spacecraft};
29use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
30use arrow::datatypes::{DataType, Field, Schema};
31use arrow::record_batch::RecordBatch;
32use filter::kalman::KF;
33use hifitime::TimeScale;
34use msr::sensitivity::TrackerSensitivity;
35use msr::TrackingDataArc;
36use nalgebra::Const;
37use parquet::arrow::ArrowWriter;
38use snafu::prelude::*;
39use std::collections::HashMap;
40use std::fs::File;
41use std::path::{Path, PathBuf};
42
43use super::ODProcess;
44
45impl<MsrSize: DimName, Accel: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
46    ODProcess<'_, SpacecraftDynamics, MsrSize, Accel, KF<Spacecraft, Accel>, Trk>
47where
48    DefaultAllocator: Allocator<MsrSize>
49        + Allocator<MsrSize, <Spacecraft as State>::Size>
50        + Allocator<Const<1>, MsrSize>
51        + Allocator<<Spacecraft as State>::Size>
52        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
53        + Allocator<MsrSize, MsrSize>
54        + Allocator<MsrSize, <Spacecraft as State>::Size>
55        + Allocator<<Spacecraft as State>::Size, MsrSize>
56        + Allocator<Accel>
57        + Allocator<Accel, Accel>
58        + Allocator<<Spacecraft as State>::Size>
59        + Allocator<<Spacecraft as State>::VecLength>
60        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
61        + Allocator<<Spacecraft as State>::Size, Accel>
62        + Allocator<Accel, <Spacecraft as State>::Size>,
63{
64    /// Store the estimates and residuals in a parquet file
65    pub fn to_parquet<P: AsRef<Path>>(
66        &self,
67        arc: &TrackingDataArc,
68        path: P,
69        cfg: ExportCfg,
70    ) -> Result<PathBuf, ODError> {
71        ensure!(
72            !self.estimates.is_empty(),
73            TooFewMeasurementsSnafu {
74                need: 1_usize,
75                action: "exporting OD results"
76            }
77        );
78
79        if self.estimates.len() != self.residuals.len() {
80            return Err(ODError::ODConfigError {
81                source: ConfigError::InvalidConfig {
82                    msg: "Estimates and residuals are not aligned.".to_string(),
83                },
84            });
85        }
86
87        let tick = Epoch::now().unwrap();
88        info!("Exporting orbit determination result to parquet file...");
89
90        if cfg.step.is_some() {
91            warn!("The `step` parameter in the export is not supported for orbit determination exports.");
92        }
93
94        // Grab the path here before we move stuff.
95        let path_buf = cfg.actual_path(path);
96
97        // Build the schema
98        let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
99
100        let frame = self.estimates[0].state().frame();
101
102        let more_meta = Some(vec![(
103            "Frame".to_string(),
104            serde_dhall::serialize(&frame)
105                .to_string()
106                .map_err(|e| ODError::ODIOError {
107                    source: InputOutputError::SerializeDhall {
108                        what: format!("frame `{frame}`"),
109                        err: e.to_string(),
110                    },
111                })?,
112        )]);
113
114        let mut fields = match cfg.fields {
115            Some(fields) => fields,
116            None => Spacecraft::export_params(),
117        };
118
119        // Check that we can retrieve this information
120        fields.retain(|param| match self.estimates[0].state().value(*param) {
121            Ok(_) => param != &StateParameter::GuidanceMode,
122            Err(_) => false,
123        });
124
125        for field in &fields {
126            hdrs.push(field.to_field(more_meta.clone()));
127        }
128
129        let mut sigma_fields = fields.clone();
130        // Check that we can retrieve this information
131        sigma_fields.retain(|param| {
132            !matches!(
133                param,
134                &StateParameter::X
135                    | &StateParameter::Y
136                    | &StateParameter::Z
137                    | &StateParameter::VX
138                    | &StateParameter::VY
139                    | &StateParameter::VZ
140            ) && self.estimates[0].sigma_for(*param).is_ok()
141        });
142
143        for field in &sigma_fields {
144            hdrs.push(field.to_cov_field(more_meta.clone()));
145        }
146
147        let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
148        let state_units = [
149            "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
150        ];
151        let mut cov_units = vec![];
152
153        for i in 0..state_items.len() {
154            for j in i..state_items.len() {
155                let cov_unit = if i < 3 {
156                    if j < 3 {
157                        "km^2"
158                    } else if (3..6).contains(&j) {
159                        "km^2/s"
160                    } else if j == 8 {
161                        "km*kg"
162                    } else {
163                        "km"
164                    }
165                } else if (3..6).contains(&i) {
166                    if (3..6).contains(&j) {
167                        "km^2/s^2"
168                    } else if j == 8 {
169                        "km/s*kg"
170                    } else {
171                        "km/s"
172                    }
173                } else if i == 8 || j == 8 {
174                    "kg^2"
175                } else {
176                    "unitless"
177                };
178
179                cov_units.push(cov_unit);
180            }
181        }
182
183        let est_size = <Spacecraft as State>::Size::dim();
184
185        let mut idx = 0;
186        for i in 0..state_items.len() {
187            for j in i..state_items.len() {
188                hdrs.push(Field::new(
189                    format!(
190                        "Covariance {}*{} ({frame:x}) ({})",
191                        state_items[i], state_items[j], cov_units[idx]
192                    ),
193                    DataType::Float64,
194                    false,
195                ));
196                idx += 1;
197            }
198        }
199
200        // Add the uncertainty in the integration frame
201        for (i, coord) in state_items.iter().enumerate() {
202            hdrs.push(Field::new(
203                format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
204                DataType::Float64,
205                false,
206            ));
207        }
208
209        // Add the position and velocity uncertainty in the RIC frame
210        for (i, coord) in state_items.iter().enumerate().take(6) {
211            hdrs.push(Field::new(
212                format!("Sigma {coord} (RIC) ({})", state_units[i]),
213                DataType::Float64,
214                false,
215            ));
216        }
217
218        // Add the fields of the residuals
219        let mut msr_fields = Vec::new();
220        for f in arc.unique_types() {
221            msr_fields.push(
222                f.to_field()
223                    .with_nullable(true)
224                    .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
225            );
226        }
227        for f in arc.unique_types() {
228            msr_fields.push(
229                f.to_field()
230                    .with_nullable(true)
231                    .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
232            );
233        }
234        for f in arc.unique_types() {
235            msr_fields.push(
236                f.to_field()
237                    .with_nullable(true)
238                    .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
239            );
240        }
241        for f in arc.unique_types() {
242            msr_fields.push(
243                f.to_field()
244                    .with_nullable(true)
245                    .with_name(format!("Real observation: {f:?} ({})", f.unit())),
246            );
247        }
248        for f in arc.unique_types() {
249            msr_fields.push(
250                f.to_field()
251                    .with_nullable(true)
252                    .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
253            );
254        }
255
256        msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
257        msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
258        msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
259
260        hdrs.append(&mut msr_fields);
261
262        // Build the schema
263        let schema = Arc::new(Schema::new(hdrs));
264        let mut record: Vec<Arc<dyn Array>> = Vec::new();
265
266        // Build the states iterator -- this does require copying the current states but I can't either get a reference or a copy of all the states.
267        let (estimates, residuals) =
268            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
269                // Must interpolate the data!
270                let start = cfg
271                    .start_epoch
272                    .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
273                let end = cfg
274                    .end_epoch
275                    .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
276
277                let mut residuals: Vec<Option<Residual<MsrSize>>> =
278                    Vec::with_capacity(self.residuals.len());
279                let mut estimates = Vec::with_capacity(self.estimates.len());
280
281                for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
282                    if estimate.epoch() >= start && estimate.epoch() <= end {
283                        estimates.push(*estimate);
284                        residuals.push(residual.clone());
285                    }
286                }
287
288                (estimates, residuals)
289            } else {
290                (self.estimates.to_vec(), self.residuals.to_vec())
291            };
292
293        // Build all of the records
294
295        // Epochs
296        let mut utc_epoch = StringBuilder::new();
297        for s in &estimates {
298            utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
299        }
300        record.push(Arc::new(utc_epoch.finish()));
301
302        // Add all of the fields
303        for field in fields {
304            let mut data = Float64Builder::new();
305            for s in &estimates {
306                data.append_value(
307                    s.state()
308                        .value(field)
309                        .context(ODStateSnafu { action: "export" })?,
310                );
311            }
312            record.push(Arc::new(data.finish()));
313        }
314
315        // Add all of the 1-sigma uncertainties
316        for field in sigma_fields {
317            let mut data = Float64Builder::new();
318            for s in &estimates {
319                data.append_value(s.sigma_for(field).unwrap());
320            }
321            record.push(Arc::new(data.finish()));
322        }
323
324        // Add the 1-sigma covariance in the integration frame
325        for i in 0..est_size {
326            for j in i..est_size {
327                let mut data = Float64Builder::new();
328                for s in &estimates {
329                    data.append_value(s.covar()[(i, j)]);
330                }
331                record.push(Arc::new(data.finish()));
332            }
333        }
334
335        // Add the sigma/uncertainty in the integration frame
336        for i in 0..est_size {
337            let mut data = Float64Builder::new();
338            for s in &estimates {
339                data.append_value(s.covar()[(i, i)].sqrt());
340            }
341            record.push(Arc::new(data.finish()));
342        }
343
344        // Add the sigma/uncertainty covariance in the RIC frame
345        let mut ric_covariances = Vec::new();
346
347        for s in &estimates {
348            let dcm_ric2inertial = s
349                .state()
350                .orbit()
351                .dcm_from_ric_to_inertial()
352                .unwrap()
353                .state_dcm();
354
355            // Build the matrix view of the orbit part of the covariance.
356            let cov = s.covar();
357            let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
358
359            // Rotate back into the RIC frame
360            let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
361            ric_covariances.push(ric_covar);
362        }
363
364        // Now store the RIC covariance data.
365        for i in 0..6 {
366            let mut data = Float64Builder::new();
367            for cov in ric_covariances.iter().take(estimates.len()) {
368                data.append_value(cov[(i, i)].sqrt());
369            }
370            record.push(Arc::new(data.finish()));
371        }
372
373        // Finally, add the residuals.
374        // Prefits
375        for msr_type in arc.unique_types() {
376            let mut data = Float64Builder::new();
377            for resid_opt in &residuals {
378                if let Some(resid) = resid_opt {
379                    match resid.prefit(msr_type) {
380                        Some(prefit) => data.append_value(prefit),
381                        None => data.append_null(),
382                    };
383                } else {
384                    data.append_null();
385                }
386            }
387            record.push(Arc::new(data.finish()));
388        }
389        // Postfit
390        for msr_type in arc.unique_types() {
391            let mut data = Float64Builder::new();
392            for resid_opt in &residuals {
393                if let Some(resid) = resid_opt {
394                    match resid.postfit(msr_type) {
395                        Some(postfit) => data.append_value(postfit),
396                        None => data.append_null(),
397                    };
398                } else {
399                    data.append_null();
400                }
401            }
402            record.push(Arc::new(data.finish()));
403        }
404        // Measurement noise
405        for msr_type in arc.unique_types() {
406            let mut data = Float64Builder::new();
407            for resid_opt in &residuals {
408                if let Some(resid) = resid_opt {
409                    match resid.trk_noise(msr_type) {
410                        Some(noise) => data.append_value(noise),
411                        None => data.append_null(),
412                    };
413                } else {
414                    data.append_null();
415                }
416            }
417            record.push(Arc::new(data.finish()));
418        }
419        // Real observation
420        for msr_type in arc.unique_types() {
421            let mut data = Float64Builder::new();
422            for resid_opt in &residuals {
423                if let Some(resid) = resid_opt {
424                    match resid.real_obs(msr_type) {
425                        Some(postfit) => data.append_value(postfit),
426                        None => data.append_null(),
427                    };
428                } else {
429                    data.append_null();
430                }
431            }
432            record.push(Arc::new(data.finish()));
433        }
434        // Computed observation
435        for msr_type in arc.unique_types() {
436            let mut data = Float64Builder::new();
437            for resid_opt in &residuals {
438                if let Some(resid) = resid_opt {
439                    match resid.computed_obs(msr_type) {
440                        Some(postfit) => data.append_value(postfit),
441                        None => data.append_null(),
442                    };
443                } else {
444                    data.append_null();
445                }
446            }
447            record.push(Arc::new(data.finish()));
448        }
449        // Residual ratio (unique entry regardless of the size)
450        let mut data = Float64Builder::new();
451        for resid_opt in &residuals {
452            if let Some(resid) = resid_opt {
453                data.append_value(resid.ratio);
454            } else {
455                data.append_null();
456            }
457        }
458        record.push(Arc::new(data.finish()));
459
460        // Residual acceptance (unique entry regardless of the size)
461        let mut data = BooleanBuilder::new();
462        for resid_opt in &residuals {
463            if let Some(resid) = resid_opt {
464                data.append_value(resid.rejected);
465            } else {
466                data.append_null();
467            }
468        }
469        record.push(Arc::new(data.finish()));
470
471        // Residual tracker (unique entry regardless of the size)
472        let mut data = StringBuilder::new();
473        for resid_opt in &residuals {
474            if let Some(resid) = resid_opt {
475                data.append_value(
476                    resid
477                        .tracker
478                        .clone()
479                        .unwrap_or("Undefined tracker".to_string()),
480                );
481            } else {
482                data.append_null();
483            }
484        }
485        record.push(Arc::new(data.finish()));
486
487        info!("Serialized {} estimates and residuals", estimates.len());
488
489        // Serialize all of the devices and add that to the parquet file too.
490        let mut metadata = HashMap::new();
491        metadata.insert(
492            "Purpose".to_string(),
493            "Orbit determination results".to_string(),
494        );
495        if let Some(add_meta) = cfg.metadata {
496            for (k, v) in add_meta {
497                metadata.insert(k, v);
498            }
499        }
500
501        let props = pq_writer(Some(metadata));
502
503        let file = File::create(&path_buf)
504            .context(StdIOSnafu {
505                action: "creating OD results file",
506            })
507            .context(ODIOSnafu)?;
508
509        let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
510            .context(ParquetSnafu {
511                action: "exporting OD results",
512            })
513            .context(ODIOSnafu)?;
514
515        let batch = RecordBatch::try_new(schema, record)
516            .context(ArrowSnafu {
517                action: "writing OD results (building batch record)",
518            })
519            .context(ODIOSnafu)?;
520
521        writer
522            .write(&batch)
523            .context(ParquetSnafu {
524                action: "writing OD results",
525            })
526            .context(ODIOSnafu)?;
527
528        writer
529            .close()
530            .context(ParquetSnafu {
531                action: "closing OD results file",
532            })
533            .context(ODIOSnafu)?;
534
535        // Return the path this was written to
536        let tock_time = Epoch::now().unwrap() - tick;
537        info!(
538            "Orbit determination results written to {} in {tock_time}",
539            path_buf.display()
540        );
541        Ok(path_buf)
542    }
543}