Skip to main content

nyx_space/od/groundpnt/
solution.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation, either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19use crate::io::watermark::pq_writer;
20use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
21use crate::linalg::allocator::Allocator;
22use crate::linalg::DefaultAllocator;
23use crate::md::trajectory::Interpolatable;
24use crate::md::StateParameter;
25use crate::od::estimate::*;
26use crate::od::groundpnt::GroundAsset;
27use crate::od::interlink::InterlinkTxSpacecraft;
28use crate::od::process::ODSolution;
29use crate::State;
30use crate::{od::*, Spacecraft};
31use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
32use arrow::datatypes::{DataType, Field, Schema};
33use arrow::record_batch::RecordBatch;
34use hifitime::TimeScale;
35use log::{info, warn};
36use nalgebra::{Const, U2};
37use parquet::arrow::ArrowWriter;
38use snafu::prelude::*;
39use std::collections::HashMap;
40use std::fs::File;
41use std::path::{Path, PathBuf};
42
43impl ODSolution<GroundAsset, KfEstimate<GroundAsset>, U2, InterlinkTxSpacecraft>
44where
45    DefaultAllocator: Allocator<U2>
46        + Allocator<U2, <Spacecraft as State>::Size>
47        + Allocator<Const<1>, U2>
48        + Allocator<<Spacecraft as State>::Size>
49        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
50        + Allocator<U2, U2>
51        + Allocator<U2, <Spacecraft as State>::Size>
52        + Allocator<<Spacecraft as State>::Size, U2>
53        + Allocator<<Spacecraft as State>::Size>
54        + Allocator<<Spacecraft as State>::VecLength>
55        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
56{
57    /// Store the estimates and residuals in a parquet file
58    pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
59        ensure!(
60            !self.estimates.is_empty(),
61            TooFewMeasurementsSnafu {
62                need: 1_usize,
63                action: "exporting PNT results"
64            }
65        );
66
67        if self.estimates.len() != self.residuals.len() {
68            return Err(ODError::ODConfigError {
69                source: ConfigError::InvalidConfig {
70                    msg: format!(
71                        "Estimates ({}) and residuals ({}) are not aligned.",
72                        self.estimates.len(),
73                        self.residuals.len()
74                    ),
75                },
76            });
77        }
78
79        if self.estimates.len() != self.gains.len() {
80            return Err(ODError::ODConfigError {
81                source: ConfigError::InvalidConfig {
82                    msg: format!(
83                        "Estimates ({}) and filter gains ({}) are not aligned.",
84                        self.estimates.len(),
85                        self.gains.len()
86                    ),
87                },
88            });
89        }
90
91        if self.estimates.len() != self.filter_smoother_ratios.len() {
92            return Err(ODError::ODConfigError {
93                source: ConfigError::InvalidConfig {
94                    msg: format!(
95                        "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
96                        self.estimates.len(),
97                        self.filter_smoother_ratios.len()
98                    ),
99                },
100            });
101        }
102
103        let tick = Epoch::now().unwrap();
104        info!("Exporting orbit determination result to parquet file...");
105
106        if cfg.step.is_some() {
107            warn!("The `step` parameter in the export is not supported for orbit determination exports.");
108        }
109
110        // Grab the path here before we move stuff.
111        let path_buf = cfg.actual_path(path);
112
113        // Build the schema
114        let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
115
116        let frame = self.estimates[0].state().frame();
117
118        let more_meta = Some(vec![(
119            "Frame".to_string(),
120            serde_dhall::serialize(&frame)
121                .static_type_annotation()
122                .to_string()
123                .map_err(|e| ODError::ODIOError {
124                    source: InputOutputError::SerializeDhall {
125                        what: format!("frame `{frame}`"),
126                        err: e.to_string(),
127                    },
128                })?,
129        )]);
130
131        let mut fields = GroundAsset::export_params();
132
133        // Check that we can retrieve this information
134        fields.retain(|param| match self.estimates[0].state().value(*param) {
135            Ok(_) => param != &StateParameter::GuidanceMode(),
136            Err(_) => false,
137        });
138
139        for field in &fields {
140            hdrs.push(field.to_field(more_meta.clone()));
141        }
142
143        let mut sigma_fields = fields.clone();
144        // Check that we can retrieve this information
145        sigma_fields.retain(|param| matches!(param, StateParameter::Element(_oe)));
146
147        for field in &sigma_fields {
148            hdrs.push(field.to_cov_field(more_meta.clone()));
149        }
150
151        // Don't export the lat/long/alt rate because we don't have the associated state parameters.
152        let state_items = [
153            "Latitude",
154            "Longitude",
155            "Altitude",
156            // "Latitude rate",
157            // "Longitude rate",
158            // "Altitude rate",
159        ];
160
161        let state_units = ["deg", "deg", "km"];
162        let mut cov_units = vec![];
163
164        for i in 0..state_items.len() {
165            for j in i..state_items.len() {
166                let cov_unit = format!("{}*{}", state_units[i], state_units[j]);
167
168                cov_units.push(cov_unit);
169            }
170        }
171
172        let mut idx = 0;
173        for i in 0..state_items.len() {
174            for j in i..state_items.len() {
175                hdrs.push(Field::new(
176                    format!(
177                        "Covariance {}*{} ({frame:x}) ({})",
178                        state_items[i], state_items[j], cov_units[idx]
179                    ),
180                    DataType::Float64,
181                    false,
182                ));
183                idx += 1;
184            }
185        }
186
187        // Add the fields of the residuals
188        let mut msr_fields = Vec::new();
189        for f in &self.measurement_types {
190            msr_fields.push(
191                f.to_field()
192                    .with_nullable(true)
193                    .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
194            );
195        }
196        for f in &self.measurement_types {
197            msr_fields.push(
198                f.to_field()
199                    .with_nullable(true)
200                    .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
201            );
202        }
203        for f in &self.measurement_types {
204            msr_fields.push(
205                f.to_field()
206                    .with_nullable(true)
207                    .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
208            );
209        }
210        for f in &self.measurement_types {
211            msr_fields.push(
212                f.to_field()
213                    .with_nullable(true)
214                    .with_name(format!("Real observation: {f:?} ({})", f.unit())),
215            );
216        }
217        for f in &self.measurement_types {
218            msr_fields.push(
219                f.to_field()
220                    .with_nullable(true)
221                    .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
222            );
223        }
224
225        msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
226        msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
227        msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
228
229        hdrs.append(&mut msr_fields);
230
231        // Add the filter gain columns
232        for i in 0..state_items.len() {
233            for f in &self.measurement_types {
234                hdrs.push(Field::new(
235                    format!(
236                        "Gain {}*{f:?} ({}*{}*{})",
237                        state_items[i],
238                        state_units[i],
239                        state_units[i],
240                        f.unit()
241                    ),
242                    DataType::Float64,
243                    true,
244                ));
245            }
246        }
247
248        // Add the filter-smoother ratio columns
249        for i in 0..state_items.len() {
250            hdrs.push(Field::new(
251                format!(
252                    "Filter-smoother ratio {} ({}*{})",
253                    state_items[i], state_units[i], state_units[i],
254                ),
255                DataType::Float64,
256                true,
257            ));
258        }
259
260        // Build the schema
261        let schema = Arc::new(Schema::new(hdrs));
262        let mut record: Vec<Arc<dyn Array>> = Vec::new();
263
264        // Build the states iterator -- this does require copying the current states but I can't either get a reference or a copy of all the states.
265        let (estimates, residuals) =
266            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
267                // Must interpolate the data!
268                let start = cfg
269                    .start_epoch
270                    .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
271                let end = cfg
272                    .end_epoch
273                    .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
274
275                let mut residuals: Vec<Option<Residual<U2>>> =
276                    Vec::with_capacity(self.residuals.len());
277                let mut estimates = Vec::with_capacity(self.estimates.len());
278
279                for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
280                    if estimate.epoch() >= start && estimate.epoch() <= end {
281                        estimates.push(*estimate);
282                        residuals.push(residual.clone());
283                    }
284                }
285
286                (estimates, residuals)
287            } else {
288                (self.estimates.to_vec(), self.residuals.to_vec())
289            };
290
291        // Build all of the records
292
293        // Epochs
294        let mut utc_epoch = StringBuilder::new();
295        for s in &estimates {
296            utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
297        }
298        record.push(Arc::new(utc_epoch.finish()));
299
300        // Add all of the fields
301        for field in fields {
302            let mut data = Float64Builder::new();
303            for s in &estimates {
304                data.append_value(
305                    s.state()
306                        .value(field)
307                        .context(ODStateSnafu { action: "export" })?,
308                );
309            }
310            record.push(Arc::new(data.finish()));
311        }
312
313        // Add the sigma/uncertainty in the integration frame
314        for i in 0..state_items.len() {
315            let mut data = Float64Builder::new();
316            for s in &estimates {
317                data.append_value(s.covar()[(i, i)].sqrt());
318            }
319            record.push(Arc::new(data.finish()));
320        }
321
322        // Add the 1-sigma covariance in the integration frame
323        for i in 0..state_items.len() {
324            for j in i..state_items.len() {
325                let mut data = Float64Builder::new();
326                for s in &estimates {
327                    data.append_value(s.covar()[(i, j)]);
328                }
329                record.push(Arc::new(data.finish()));
330            }
331        }
332
333        // Finally, add the residuals.
334        // Prefits
335        for msr_type in &self.measurement_types {
336            let mut data = Float64Builder::new();
337            for resid_opt in &residuals {
338                if let Some(resid) = resid_opt {
339                    match resid.prefit(*msr_type) {
340                        Some(prefit) => data.append_value(prefit),
341                        None => data.append_null(),
342                    };
343                } else {
344                    data.append_null();
345                }
346            }
347            record.push(Arc::new(data.finish()));
348        }
349        // Postfit
350        for msr_type in &self.measurement_types {
351            let mut data = Float64Builder::new();
352            for resid_opt in &residuals {
353                if let Some(resid) = resid_opt {
354                    match resid.postfit(*msr_type) {
355                        Some(postfit) => data.append_value(postfit),
356                        None => data.append_null(),
357                    };
358                } else {
359                    data.append_null();
360                }
361            }
362            record.push(Arc::new(data.finish()));
363        }
364
365        // Measurement noise
366        for msr_type in &self.measurement_types {
367            let mut data = Float64Builder::new();
368            for resid_opt in &residuals {
369                if let Some(resid) = resid_opt {
370                    match resid.trk_noise(*msr_type) {
371                        Some(noise) => data.append_value(noise),
372                        None => data.append_null(),
373                    };
374                } else {
375                    data.append_null();
376                }
377            }
378            record.push(Arc::new(data.finish()));
379        }
380
381        // Real observation
382        for msr_type in &self.measurement_types {
383            let mut data = Float64Builder::new();
384            for resid_opt in &residuals {
385                if let Some(resid) = resid_opt {
386                    match resid.real_obs(*msr_type) {
387                        Some(postfit) => data.append_value(postfit),
388                        None => data.append_null(),
389                    };
390                } else {
391                    data.append_null();
392                }
393            }
394            record.push(Arc::new(data.finish()));
395        }
396
397        // Computed observation
398        for msr_type in &self.measurement_types {
399            let mut data = Float64Builder::new();
400            for resid_opt in &residuals {
401                if let Some(resid) = resid_opt {
402                    match resid.computed_obs(*msr_type) {
403                        Some(postfit) => data.append_value(postfit),
404                        None => data.append_null(),
405                    };
406                } else {
407                    data.append_null();
408                }
409            }
410            record.push(Arc::new(data.finish()));
411        }
412
413        // Residual ratio (unique entry regardless of the size)
414        let mut data = Float64Builder::new();
415        for resid_opt in &residuals {
416            if let Some(resid) = resid_opt {
417                data.append_value(resid.ratio);
418            } else {
419                data.append_null();
420            }
421        }
422        record.push(Arc::new(data.finish()));
423
424        // Residual acceptance (unique entry regardless of the size)
425        let mut data = BooleanBuilder::new();
426        for resid_opt in &residuals {
427            if let Some(resid) = resid_opt {
428                data.append_value(resid.rejected);
429            } else {
430                data.append_null();
431            }
432        }
433        record.push(Arc::new(data.finish()));
434
435        // Residual tracker (unique entry regardless of the size)
436        let mut data = StringBuilder::new();
437        for resid_opt in &residuals {
438            if let Some(resid) = resid_opt {
439                data.append_value(
440                    resid
441                        .tracker
442                        .clone()
443                        .unwrap_or("Undefined tracker".to_string()),
444                );
445            } else {
446                data.append_null();
447            }
448        }
449        record.push(Arc::new(data.finish()));
450
451        // Add the filter gains
452        for i in 0..state_items.len() {
453            for j in 0..2 {
454                let mut data = Float64Builder::new();
455                for opt_k in &self.gains {
456                    if let Some(k) = opt_k {
457                        data.append_value(k[(i, j)]);
458                    } else {
459                        data.append_null();
460                    }
461                }
462                record.push(Arc::new(data.finish()));
463            }
464        }
465
466        // Add the filter-smoother consistency ratios
467        for i in 0..state_items.len() {
468            let mut data = Float64Builder::new();
469            for opt_fsr in &self.filter_smoother_ratios {
470                if let Some(fsr) = opt_fsr {
471                    data.append_value(fsr[i]);
472                } else {
473                    data.append_null();
474                }
475            }
476            record.push(Arc::new(data.finish()));
477        }
478
479        info!("Serialized {} estimates and residuals", estimates.len());
480
481        // Serialize all of the devices and add that to the parquet file too.
482        let mut metadata = HashMap::new();
483        metadata.insert("Purpose".to_string(), "PNT results".to_string());
484        if let Some(add_meta) = cfg.metadata {
485            for (k, v) in add_meta {
486                metadata.insert(k, v);
487            }
488        }
489
490        let props = pq_writer(Some(metadata));
491
492        let file = File::create(&path_buf)
493            .context(StdIOSnafu {
494                action: "creating PNT results file",
495            })
496            .context(ODIOSnafu)?;
497
498        let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
499            .context(ParquetSnafu {
500                action: "exporting PNT results",
501            })
502            .context(ODIOSnafu)?;
503
504        let batch = RecordBatch::try_new(schema, record)
505            .context(ArrowSnafu {
506                action: "writing PNT results (building batch record)",
507            })
508            .context(ODIOSnafu)?;
509
510        writer
511            .write(&batch)
512            .context(ParquetSnafu {
513                action: "writing PNT results",
514            })
515            .context(ODIOSnafu)?;
516
517        writer
518            .close()
519            .context(ParquetSnafu {
520                action: "closing PNT results file",
521            })
522            .context(ODIOSnafu)?;
523
524        // Return the path this was written to
525        let tock_time = Epoch::now().unwrap() - tick;
526        info!(
527            "PNT results written to {} in {tock_time}",
528            path_buf.display()
529        );
530        Ok(path_buf)
531    }
532}