Skip to main content

nyx_space/od/groundpnt/
solution.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation, either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19use crate::State;
20use crate::io::watermark::pq_writer;
21use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
22use crate::linalg::DefaultAllocator;
23use crate::linalg::allocator::Allocator;
24use crate::md::StateParameter;
25use crate::md::trajectory::Interpolatable;
26use crate::od::estimate::*;
27use crate::od::groundpnt::GroundAsset;
28use crate::od::interlink::InterlinkTxSpacecraft;
29use crate::od::process::ODSolution;
30use crate::{Spacecraft, od::*};
31use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
32use arrow::datatypes::{DataType, Field, Schema};
33use arrow::record_batch::RecordBatch;
34use hifitime::TimeScale;
35use log::{info, warn};
36use nalgebra::{Const, U2};
37use parquet::arrow::ArrowWriter;
38use snafu::prelude::*;
39use std::collections::HashMap;
40use std::fs::File;
41use std::path::{Path, PathBuf};
42
43impl ODSolution<GroundAsset, KfEstimate<GroundAsset>, U2, InterlinkTxSpacecraft>
44where
45    DefaultAllocator: Allocator<U2>
46        + Allocator<U2, <Spacecraft as State>::Size>
47        + Allocator<Const<1>, U2>
48        + Allocator<<Spacecraft as State>::Size>
49        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
50        + Allocator<U2, U2>
51        + Allocator<U2, <Spacecraft as State>::Size>
52        + Allocator<<Spacecraft as State>::Size, U2>
53        + Allocator<<Spacecraft as State>::Size>
54        + Allocator<<Spacecraft as State>::VecLength>
55        + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
56{
57    /// Store the estimates and residuals in a parquet file
58    pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
59        ensure!(
60            !self.estimates.is_empty(),
61            TooFewMeasurementsSnafu {
62                need: 1_usize,
63                action: "exporting PNT results"
64            }
65        );
66
67        if self.estimates.len() != self.residuals.len() {
68            return Err(ODError::ODConfigError {
69                source: ConfigError::InvalidConfig {
70                    msg: format!(
71                        "Estimates ({}) and residuals ({}) are not aligned.",
72                        self.estimates.len(),
73                        self.residuals.len()
74                    ),
75                },
76            });
77        }
78
79        if self.estimates.len() != self.gains.len() {
80            return Err(ODError::ODConfigError {
81                source: ConfigError::InvalidConfig {
82                    msg: format!(
83                        "Estimates ({}) and filter gains ({}) are not aligned.",
84                        self.estimates.len(),
85                        self.gains.len()
86                    ),
87                },
88            });
89        }
90
91        if self.estimates.len() != self.filter_smoother_ratios.len() {
92            return Err(ODError::ODConfigError {
93                source: ConfigError::InvalidConfig {
94                    msg: format!(
95                        "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
96                        self.estimates.len(),
97                        self.filter_smoother_ratios.len()
98                    ),
99                },
100            });
101        }
102
103        let tick = Epoch::now().unwrap();
104        info!("Exporting orbit determination result to parquet file...");
105
106        if cfg.step.is_some() {
107            warn!(
108                "The `step` parameter in the export is not supported for orbit determination exports."
109            );
110        }
111
112        // Grab the path here before we move stuff.
113        let path_buf = cfg.actual_path(path);
114
115        // Build the schema
116        let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
117
118        let frame = self.estimates[0].state().frame();
119
120        let more_meta = Some(vec![(
121            "Frame".to_string(),
122            serde_dhall::serialize(&frame)
123                .static_type_annotation()
124                .to_string()
125                .map_err(|e| ODError::ODIOError {
126                    source: InputOutputError::SerializeDhall {
127                        what: format!("frame `{frame}`"),
128                        err: e.to_string(),
129                    },
130                })?,
131        )]);
132
133        let mut fields = GroundAsset::export_params();
134
135        // Check that we can retrieve this information
136        fields.retain(|param| match self.estimates[0].state().value(*param) {
137            Ok(_) => param != &StateParameter::GuidanceMode(),
138            Err(_) => false,
139        });
140
141        for field in &fields {
142            hdrs.push(field.to_field(more_meta.clone()));
143        }
144
145        let mut sigma_fields = fields.clone();
146        // Check that we can retrieve this information
147        sigma_fields.retain(|param| matches!(param, StateParameter::Element(_oe)));
148
149        for field in &sigma_fields {
150            hdrs.push(field.to_cov_field(more_meta.clone()));
151        }
152
153        // Don't export the lat/long/alt rate because we don't have the associated state parameters.
154        let state_items = [
155            "Latitude",
156            "Longitude",
157            "Altitude",
158            // "Latitude rate",
159            // "Longitude rate",
160            // "Altitude rate",
161        ];
162
163        let state_units = ["deg", "deg", "km"];
164        let mut cov_units = vec![];
165
166        for i in 0..state_items.len() {
167            for j in i..state_items.len() {
168                let cov_unit = format!("{}*{}", state_units[i], state_units[j]);
169
170                cov_units.push(cov_unit);
171            }
172        }
173
174        let mut idx = 0;
175        for i in 0..state_items.len() {
176            for j in i..state_items.len() {
177                hdrs.push(Field::new(
178                    format!(
179                        "Covariance {}*{} ({frame:x}) ({})",
180                        state_items[i], state_items[j], cov_units[idx]
181                    ),
182                    DataType::Float64,
183                    false,
184                ));
185                idx += 1;
186            }
187        }
188
189        // Add the fields of the residuals
190        let mut msr_fields = Vec::new();
191        for f in &self.measurement_types {
192            msr_fields.push(
193                f.to_field()
194                    .with_nullable(true)
195                    .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
196            );
197        }
198        for f in &self.measurement_types {
199            msr_fields.push(
200                f.to_field()
201                    .with_nullable(true)
202                    .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
203            );
204        }
205        for f in &self.measurement_types {
206            msr_fields.push(
207                f.to_field()
208                    .with_nullable(true)
209                    .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
210            );
211        }
212        for f in &self.measurement_types {
213            msr_fields.push(
214                f.to_field()
215                    .with_nullable(true)
216                    .with_name(format!("Real observation: {f:?} ({})", f.unit())),
217            );
218        }
219        for f in &self.measurement_types {
220            msr_fields.push(
221                f.to_field()
222                    .with_nullable(true)
223                    .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
224            );
225        }
226
227        msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
228        msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
229        msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
230
231        hdrs.append(&mut msr_fields);
232
233        // Add the filter gain columns
234        for i in 0..state_items.len() {
235            for f in &self.measurement_types {
236                hdrs.push(Field::new(
237                    format!(
238                        "Gain {}*{f:?} ({}*{}*{})",
239                        state_items[i],
240                        state_units[i],
241                        state_units[i],
242                        f.unit()
243                    ),
244                    DataType::Float64,
245                    true,
246                ));
247            }
248        }
249
250        // Add the filter-smoother ratio columns
251        for i in 0..state_items.len() {
252            hdrs.push(Field::new(
253                format!(
254                    "Filter-smoother ratio {} ({}*{})",
255                    state_items[i], state_units[i], state_units[i],
256                ),
257                DataType::Float64,
258                true,
259            ));
260        }
261
262        // Build the schema
263        let schema = Arc::new(Schema::new(hdrs));
264        let mut record: Vec<Arc<dyn Array>> = Vec::new();
265
266        // Build the states iterator -- this does require copying the current states but I can't either get a reference or a copy of all the states.
267        let (estimates, residuals) =
268            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
269                // Must interpolate the data!
270                let start = cfg
271                    .start_epoch
272                    .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
273                let end = cfg
274                    .end_epoch
275                    .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
276
277                let mut residuals: Vec<Option<Residual<U2>>> =
278                    Vec::with_capacity(self.residuals.len());
279                let mut estimates = Vec::with_capacity(self.estimates.len());
280
281                for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
282                    if estimate.epoch() >= start && estimate.epoch() <= end {
283                        estimates.push(*estimate);
284                        residuals.push(residual.clone());
285                    }
286                }
287
288                (estimates, residuals)
289            } else {
290                (self.estimates.to_vec(), self.residuals.to_vec())
291            };
292
293        // Build all of the records
294
295        // Epochs
296        let mut utc_epoch = StringBuilder::new();
297        for s in &estimates {
298            utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
299        }
300        record.push(Arc::new(utc_epoch.finish()));
301
302        // Add all of the fields
303        for field in fields {
304            let mut data = Float64Builder::new();
305            for s in &estimates {
306                data.append_value(
307                    s.state()
308                        .value(field)
309                        .context(ODStateSnafu { action: "export" })?,
310                );
311            }
312            record.push(Arc::new(data.finish()));
313        }
314
315        // Add the sigma/uncertainty in the integration frame
316        for i in 0..state_items.len() {
317            let mut data = Float64Builder::new();
318            for s in &estimates {
319                data.append_value(s.covar()[(i, i)].sqrt());
320            }
321            record.push(Arc::new(data.finish()));
322        }
323
324        // Add the 1-sigma covariance in the integration frame
325        for i in 0..state_items.len() {
326            for j in i..state_items.len() {
327                let mut data = Float64Builder::new();
328                for s in &estimates {
329                    data.append_value(s.covar()[(i, j)]);
330                }
331                record.push(Arc::new(data.finish()));
332            }
333        }
334
335        // Finally, add the residuals.
336        // Prefits
337        for msr_type in &self.measurement_types {
338            let mut data = Float64Builder::new();
339            for resid_opt in &residuals {
340                if let Some(resid) = resid_opt {
341                    match resid.prefit(*msr_type) {
342                        Some(prefit) => data.append_value(prefit),
343                        None => data.append_null(),
344                    };
345                } else {
346                    data.append_null();
347                }
348            }
349            record.push(Arc::new(data.finish()));
350        }
351        // Postfit
352        for msr_type in &self.measurement_types {
353            let mut data = Float64Builder::new();
354            for resid_opt in &residuals {
355                if let Some(resid) = resid_opt {
356                    match resid.postfit(*msr_type) {
357                        Some(postfit) => data.append_value(postfit),
358                        None => data.append_null(),
359                    };
360                } else {
361                    data.append_null();
362                }
363            }
364            record.push(Arc::new(data.finish()));
365        }
366
367        // Measurement noise
368        for msr_type in &self.measurement_types {
369            let mut data = Float64Builder::new();
370            for resid_opt in &residuals {
371                if let Some(resid) = resid_opt {
372                    match resid.trk_noise(*msr_type) {
373                        Some(noise) => data.append_value(noise),
374                        None => data.append_null(),
375                    };
376                } else {
377                    data.append_null();
378                }
379            }
380            record.push(Arc::new(data.finish()));
381        }
382
383        // Real observation
384        for msr_type in &self.measurement_types {
385            let mut data = Float64Builder::new();
386            for resid_opt in &residuals {
387                if let Some(resid) = resid_opt {
388                    match resid.real_obs(*msr_type) {
389                        Some(postfit) => data.append_value(postfit),
390                        None => data.append_null(),
391                    };
392                } else {
393                    data.append_null();
394                }
395            }
396            record.push(Arc::new(data.finish()));
397        }
398
399        // Computed observation
400        for msr_type in &self.measurement_types {
401            let mut data = Float64Builder::new();
402            for resid_opt in &residuals {
403                if let Some(resid) = resid_opt {
404                    match resid.computed_obs(*msr_type) {
405                        Some(postfit) => data.append_value(postfit),
406                        None => data.append_null(),
407                    };
408                } else {
409                    data.append_null();
410                }
411            }
412            record.push(Arc::new(data.finish()));
413        }
414
415        // Residual ratio (unique entry regardless of the size)
416        let mut data = Float64Builder::new();
417        for resid_opt in &residuals {
418            if let Some(resid) = resid_opt {
419                data.append_value(resid.ratio);
420            } else {
421                data.append_null();
422            }
423        }
424        record.push(Arc::new(data.finish()));
425
426        // Residual acceptance (unique entry regardless of the size)
427        let mut data = BooleanBuilder::new();
428        for resid_opt in &residuals {
429            if let Some(resid) = resid_opt {
430                data.append_value(resid.rejected);
431            } else {
432                data.append_null();
433            }
434        }
435        record.push(Arc::new(data.finish()));
436
437        // Residual tracker (unique entry regardless of the size)
438        let mut data = StringBuilder::new();
439        for resid_opt in &residuals {
440            if let Some(resid) = resid_opt {
441                data.append_value(
442                    resid
443                        .tracker
444                        .clone()
445                        .unwrap_or("Undefined tracker".to_string()),
446                );
447            } else {
448                data.append_null();
449            }
450        }
451        record.push(Arc::new(data.finish()));
452
453        // Add the filter gains
454        for i in 0..state_items.len() {
455            for j in 0..2 {
456                let mut data = Float64Builder::new();
457                for opt_k in &self.gains {
458                    if let Some(k) = opt_k {
459                        data.append_value(k[(i, j)]);
460                    } else {
461                        data.append_null();
462                    }
463                }
464                record.push(Arc::new(data.finish()));
465            }
466        }
467
468        // Add the filter-smoother consistency ratios
469        for i in 0..state_items.len() {
470            let mut data = Float64Builder::new();
471            for opt_fsr in &self.filter_smoother_ratios {
472                if let Some(fsr) = opt_fsr {
473                    data.append_value(fsr[i]);
474                } else {
475                    data.append_null();
476                }
477            }
478            record.push(Arc::new(data.finish()));
479        }
480
481        info!("Serialized {} estimates and residuals", estimates.len());
482
483        // Serialize all of the devices and add that to the parquet file too.
484        let mut metadata = HashMap::new();
485        metadata.insert("Purpose".to_string(), "PNT results".to_string());
486        if let Some(add_meta) = cfg.metadata {
487            for (k, v) in add_meta {
488                metadata.insert(k, v);
489            }
490        }
491
492        let props = pq_writer(Some(metadata));
493
494        let file = File::create(&path_buf)
495            .context(StdIOSnafu {
496                action: "creating PNT results file",
497            })
498            .context(ODIOSnafu)?;
499
500        let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
501            .context(ParquetSnafu {
502                action: "exporting PNT results",
503            })
504            .context(ODIOSnafu)?;
505
506        let batch = RecordBatch::try_new(schema, record)
507            .context(ArrowSnafu {
508                action: "writing PNT results (building batch record)",
509            })
510            .context(ODIOSnafu)?;
511
512        writer
513            .write(&batch)
514            .context(ParquetSnafu {
515                action: "writing PNT results",
516            })
517            .context(ODIOSnafu)?;
518
519        writer
520            .close()
521            .context(ParquetSnafu {
522                action: "closing PNT results file",
523            })
524            .context(ODIOSnafu)?;
525
526        // Return the path this was written to
527        let tock_time = Epoch::now().unwrap() - tick;
528        info!(
529            "PNT results written to {} in {tock_time}",
530            path_buf.display()
531        );
532        Ok(path_buf)
533    }
534}