nyx_space/od/process/
export.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19use crate::dynamics::SpacecraftDynamics;
20use crate::io::watermark::pq_writer;
21use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
22use crate::linalg::allocator::Allocator;
23use crate::linalg::{DefaultAllocator, DimName};
24use crate::md::trajectory::Interpolatable;
25use crate::md::StateParameter;
26use crate::od::estimate::*;
27use crate::State;
28use crate::{od::*, Spacecraft};
29use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
30use arrow::datatypes::{DataType, Field, Schema};
31use arrow::record_batch::RecordBatch;
32use filter::kalman::KF;
33use hifitime::TimeScale;
34use msr::sensitivity::TrackerSensitivity;
35use msr::TrackingDataArc;
36use nalgebra::Const;
37use parquet::arrow::ArrowWriter;
38use snafu::prelude::*;
39use std::collections::HashMap;
40use std::fs::File;
41use std::path::{Path, PathBuf};
42
43use super::ODProcess;
44
45impl<MsrSize: DimName, Accel: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
46    ODProcess<'_, SpacecraftDynamics, MsrSize, Accel, KF<Spacecraft, Accel, MsrSize>, Trk>
47where
48    DefaultAllocator: Allocator<MsrSize>
49        + Allocator<MsrSize, <Spacecraft as State>::Size>
50        + Allocator<Const<1>, MsrSize>
51        + Allocator<<Spacecraft as State>::Size>
52        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
53        + Allocator<MsrSize, MsrSize>
54        + Allocator<MsrSize, <Spacecraft as State>::Size>
55        + Allocator<<Spacecraft as State>::Size, MsrSize>
56        + Allocator<Accel>
57        + Allocator<Accel, Accel>
58        + Allocator<<Spacecraft as State>::Size>
59        + Allocator<<Spacecraft as State>::VecLength>
60        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
61        + Allocator<<Spacecraft as State>::Size, Accel>
62        + Allocator<Accel, <Spacecraft as State>::Size>,
63{
64    /// Store the estimates and residuals in a parquet file
65    pub fn to_parquet<P: AsRef<Path>>(
66        &self,
67        arc: &TrackingDataArc,
68        path: P,
69        cfg: ExportCfg,
70    ) -> Result<PathBuf, ODError> {
71        ensure!(
72            !self.estimates.is_empty(),
73            TooFewMeasurementsSnafu {
74                need: 1_usize,
75                action: "exporting OD results"
76            }
77        );
78
79        if self.estimates.len() != self.residuals.len() {
80            return Err(ODError::ODConfigError {
81                source: ConfigError::InvalidConfig {
82                    msg: "Estimates and residuals are not aligned.".to_string(),
83                },
84            });
85        }
86
87        let tick = Epoch::now().unwrap();
88        info!("Exporting orbit determination result to parquet file...");
89
90        if cfg.step.is_some() {
91            warn!("The `step` parameter in the export is not supported for orbit determination exports.");
92        }
93
94        // Grab the path here before we move stuff.
95        let path_buf = cfg.actual_path(path);
96
97        // Build the schema
98        let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
99
100        let frame = self.estimates[0].state().frame();
101
102        let more_meta = Some(vec![(
103            "Frame".to_string(),
104            serde_dhall::serialize(&frame)
105                .to_string()
106                .map_err(|e| ODError::ODIOError {
107                    source: InputOutputError::SerializeDhall {
108                        what: format!("frame `{frame}`"),
109                        err: e.to_string(),
110                    },
111                })?,
112        )]);
113
114        let mut fields = match cfg.fields {
115            Some(fields) => fields,
116            None => Spacecraft::export_params(),
117        };
118
119        // Check that we can retrieve this information
120        fields.retain(|param| match self.estimates[0].state().value(*param) {
121            Ok(_) => param != &StateParameter::GuidanceMode,
122            Err(_) => false,
123        });
124
125        for field in &fields {
126            hdrs.push(field.to_field(more_meta.clone()));
127        }
128
129        let mut sigma_fields = fields.clone();
130        // Check that we can retrieve this information
131        sigma_fields.retain(|param| {
132            !matches!(
133                param,
134                &StateParameter::X
135                    | &StateParameter::Y
136                    | &StateParameter::Z
137                    | &StateParameter::VX
138                    | &StateParameter::VY
139                    | &StateParameter::VZ
140            ) && self.estimates[0].sigma_for(*param).is_ok()
141        });
142
143        for field in &sigma_fields {
144            hdrs.push(field.to_cov_field(more_meta.clone()));
145        }
146
147        let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
148        let state_units = [
149            "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
150        ];
151        let mut cov_units = vec![];
152
153        for i in 0..state_items.len() {
154            for j in i..state_items.len() {
155                let cov_unit = if i < 3 {
156                    if j < 3 {
157                        "km^2"
158                    } else if (3..6).contains(&j) {
159                        "km^2/s"
160                    } else if j == 8 {
161                        "km*kg"
162                    } else {
163                        "km"
164                    }
165                } else if (3..6).contains(&i) {
166                    if (3..6).contains(&j) {
167                        "km^2/s^2"
168                    } else if j == 8 {
169                        "km/s*kg"
170                    } else {
171                        "km/s"
172                    }
173                } else if i == 8 || j == 8 {
174                    "kg^2"
175                } else {
176                    "unitless"
177                };
178
179                cov_units.push(cov_unit);
180            }
181        }
182
183        let est_size = <Spacecraft as State>::Size::dim();
184
185        let mut idx = 0;
186        for i in 0..state_items.len() {
187            for j in i..state_items.len() {
188                hdrs.push(Field::new(
189                    format!(
190                        "Covariance {}*{} ({frame:x}) ({})",
191                        state_items[i], state_items[j], cov_units[idx]
192                    ),
193                    DataType::Float64,
194                    false,
195                ));
196                idx += 1;
197            }
198        }
199
200        // Add the uncertainty in the integration frame
201        for (i, coord) in state_items.iter().enumerate() {
202            hdrs.push(Field::new(
203                format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
204                DataType::Float64,
205                false,
206            ));
207        }
208
209        // Add the position and velocity uncertainty in the RIC frame
210        for (i, coord) in state_items.iter().enumerate().take(6) {
211            hdrs.push(Field::new(
212                format!("Sigma {coord} (RIC) ({})", state_units[i]),
213                DataType::Float64,
214                false,
215            ));
216        }
217
218        // Add the fields of the residuals
219        let mut msr_fields = Vec::new();
220        for f in arc.unique_types() {
221            msr_fields.push(
222                f.to_field()
223                    .with_nullable(true)
224                    .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
225            );
226        }
227        for f in arc.unique_types() {
228            msr_fields.push(
229                f.to_field()
230                    .with_nullable(true)
231                    .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
232            );
233        }
234        for f in arc.unique_types() {
235            msr_fields.push(
236                f.to_field()
237                    .with_nullable(true)
238                    .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
239            );
240        }
241
242        msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
243        msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
244        msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
245
246        hdrs.append(&mut msr_fields);
247
248        // Build the schema
249        let schema = Arc::new(Schema::new(hdrs));
250        let mut record: Vec<Arc<dyn Array>> = Vec::new();
251
252        // Build the states iterator -- this does require copying the current states but I can't either get a reference or a copy of all the states.
253        let (estimates, residuals) =
254            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
255                // Must interpolate the data!
256                let start = cfg
257                    .start_epoch
258                    .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
259                let end = cfg
260                    .end_epoch
261                    .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
262
263                let mut residuals: Vec<Option<Residual<MsrSize>>> =
264                    Vec::with_capacity(self.residuals.len());
265                let mut estimates = Vec::with_capacity(self.estimates.len());
266
267                for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
268                    if estimate.epoch() >= start && estimate.epoch() <= end {
269                        estimates.push(*estimate);
270                        residuals.push(residual.clone());
271                    }
272                }
273
274                (estimates, residuals)
275            } else {
276                (self.estimates.to_vec(), self.residuals.to_vec())
277            };
278
279        // Build all of the records
280
281        // Epochs
282        let mut utc_epoch = StringBuilder::new();
283        for s in &estimates {
284            utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
285        }
286        record.push(Arc::new(utc_epoch.finish()));
287
288        // Add all of the fields
289        for field in fields {
290            let mut data = Float64Builder::new();
291            for s in &estimates {
292                data.append_value(s.state().value(field).unwrap());
293            }
294            record.push(Arc::new(data.finish()));
295        }
296
297        // Add all of the 1-sigma uncertainties
298        for field in sigma_fields {
299            let mut data = Float64Builder::new();
300            for s in &estimates {
301                data.append_value(s.sigma_for(field).unwrap());
302            }
303            record.push(Arc::new(data.finish()));
304        }
305
306        // Add the 1-sigma covariance in the integration frame
307        for i in 0..est_size {
308            for j in i..est_size {
309                let mut data = Float64Builder::new();
310                for s in &estimates {
311                    data.append_value(s.covar()[(i, j)]);
312                }
313                record.push(Arc::new(data.finish()));
314            }
315        }
316
317        // Add the sigma/uncertainty in the integration frame
318        for i in 0..est_size {
319            let mut data = Float64Builder::new();
320            for s in &estimates {
321                data.append_value(s.covar()[(i, i)].sqrt());
322            }
323            record.push(Arc::new(data.finish()));
324        }
325
326        // Add the sigma/uncertainty covariance in the RIC frame
327        let mut ric_covariances = Vec::new();
328
329        for s in &estimates {
330            let dcm_ric2inertial = s
331                .state()
332                .orbit()
333                .dcm_from_ric_to_inertial()
334                .unwrap()
335                .state_dcm();
336
337            // Build the matrix view of the orbit part of the covariance.
338            let cov = s.covar();
339            let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
340
341            // Rotate back into the RIC frame
342            let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
343            ric_covariances.push(ric_covar);
344        }
345
346        // Now store the RIC covariance data.
347        for i in 0..6 {
348            let mut data = Float64Builder::new();
349            for cov in ric_covariances.iter().take(estimates.len()) {
350                data.append_value(cov[(i, i)].sqrt());
351            }
352            record.push(Arc::new(data.finish()));
353        }
354
355        // Finally, add the residuals.
356        // Prefits
357        for msr_type in arc.unique_types() {
358            let mut data = Float64Builder::new();
359            for resid_opt in &residuals {
360                if let Some(resid) = resid_opt {
361                    match resid.prefit(msr_type) {
362                        Some(prefit) => data.append_value(prefit),
363                        None => data.append_null(),
364                    };
365                } else {
366                    data.append_null();
367                }
368            }
369            record.push(Arc::new(data.finish()));
370        }
371        // Postfit
372        for msr_type in arc.unique_types() {
373            let mut data = Float64Builder::new();
374            for resid_opt in &residuals {
375                if let Some(resid) = resid_opt {
376                    match resid.postfit(msr_type) {
377                        Some(postfit) => data.append_value(postfit),
378                        None => data.append_null(),
379                    };
380                } else {
381                    data.append_null();
382                }
383            }
384            record.push(Arc::new(data.finish()));
385        }
386        // Measurement noise
387        for msr_type in arc.unique_types() {
388            let mut data = Float64Builder::new();
389            for resid_opt in &residuals {
390                if let Some(resid) = resid_opt {
391                    match resid.trk_noise(msr_type) {
392                        Some(noise) => data.append_value(noise),
393                        None => data.append_null(),
394                    };
395                } else {
396                    data.append_null();
397                }
398            }
399            record.push(Arc::new(data.finish()));
400        }
401        // Residual ratio (unique entry regardless of the size)
402        let mut data = Float64Builder::new();
403        for resid_opt in &residuals {
404            if let Some(resid) = resid_opt {
405                data.append_value(resid.ratio);
406            } else {
407                data.append_null();
408            }
409        }
410        record.push(Arc::new(data.finish()));
411
412        // Residual acceptance (unique entry regardless of the size)
413        let mut data = BooleanBuilder::new();
414        for resid_opt in &residuals {
415            if let Some(resid) = resid_opt {
416                data.append_value(resid.rejected);
417            } else {
418                data.append_null();
419            }
420        }
421        record.push(Arc::new(data.finish()));
422
423        // Residual tracker (unique entry regardless of the size)
424        let mut data = StringBuilder::new();
425        for resid_opt in &residuals {
426            if let Some(resid) = resid_opt {
427                data.append_value(
428                    resid
429                        .tracker
430                        .clone()
431                        .unwrap_or("Undefined tracker".to_string()),
432                );
433            } else {
434                data.append_null();
435            }
436        }
437        record.push(Arc::new(data.finish()));
438
439        info!("Serialized {} estimates and residuals", estimates.len());
440
441        // Serialize all of the devices and add that to the parquet file too.
442        let mut metadata = HashMap::new();
443        metadata.insert(
444            "Purpose".to_string(),
445            "Orbit determination results".to_string(),
446        );
447        if let Some(add_meta) = cfg.metadata {
448            for (k, v) in add_meta {
449                metadata.insert(k, v);
450            }
451        }
452
453        let props = pq_writer(Some(metadata));
454
455        let file = File::create(&path_buf)
456            .context(StdIOSnafu {
457                action: "creating OD results file",
458            })
459            .context(ODIOSnafu)?;
460
461        let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
462            .context(ParquetSnafu {
463                action: "exporting OD results",
464            })
465            .context(ODIOSnafu)?;
466
467        let batch = RecordBatch::try_new(schema, record)
468            .context(ArrowSnafu {
469                action: "writing OD results (building batch record)",
470            })
471            .context(ODIOSnafu)?;
472
473        writer
474            .write(&batch)
475            .context(ParquetSnafu {
476                action: "writing OD results",
477            })
478            .context(ODIOSnafu)?;
479
480        writer
481            .close()
482            .context(ParquetSnafu {
483                action: "closing OD results file",
484            })
485            .context(ODIOSnafu)?;
486
487        // Return the path this was written to
488        let tock_time = Epoch::now().unwrap() - tick;
489        info!(
490            "Orbit determination results written to {} in {tock_time}",
491            path_buf.display()
492        );
493        Ok(path_buf)
494    }
495}