nyx_space/od/process/solution/
export.rs1use crate::io::watermark::pq_writer;
20use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
21use crate::linalg::allocator::Allocator;
22use crate::linalg::{DefaultAllocator, DimName};
23use crate::md::trajectory::Interpolatable;
24use crate::md::StateParameter;
25use crate::od::estimate::*;
26use crate::State;
27use crate::{od::*, Spacecraft};
28use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
29use arrow::datatypes::{DataType, Field, Schema};
30use arrow::record_batch::RecordBatch;
31use hifitime::TimeScale;
32use log::{info, warn};
33use msr::sensitivity::TrackerSensitivity;
34use nalgebra::Const;
35use parquet::arrow::ArrowWriter;
36use snafu::prelude::*;
37use std::collections::HashMap;
38use std::fs::File;
39use std::path::{Path, PathBuf};
40
41use super::ODSolution;
42
43impl<MsrSize: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
44 ODSolution<Spacecraft, KfEstimate<Spacecraft>, MsrSize, Trk>
45where
46 DefaultAllocator: Allocator<MsrSize>
47 + Allocator<MsrSize, <Spacecraft as State>::Size>
48 + Allocator<Const<1>, MsrSize>
49 + Allocator<<Spacecraft as State>::Size>
50 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
51 + Allocator<MsrSize, MsrSize>
52 + Allocator<MsrSize, <Spacecraft as State>::Size>
53 + Allocator<<Spacecraft as State>::Size, MsrSize>
54 + Allocator<<Spacecraft as State>::Size>
55 + Allocator<<Spacecraft as State>::VecLength>
56 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
57{
58 pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
60 ensure!(
61 !self.estimates.is_empty(),
62 TooFewMeasurementsSnafu {
63 need: 1_usize,
64 action: "exporting OD results"
65 }
66 );
67
68 if self.estimates.len() != self.residuals.len() {
69 return Err(ODError::ODConfigError {
70 source: ConfigError::InvalidConfig {
71 msg: format!(
72 "Estimates ({}) and residuals ({}) are not aligned.",
73 self.estimates.len(),
74 self.residuals.len()
75 ),
76 },
77 });
78 }
79
80 if self.estimates.len() != self.gains.len() {
81 return Err(ODError::ODConfigError {
82 source: ConfigError::InvalidConfig {
83 msg: format!(
84 "Estimates ({}) and filter gains ({}) are not aligned.",
85 self.estimates.len(),
86 self.gains.len()
87 ),
88 },
89 });
90 }
91
92 if self.estimates.len() != self.filter_smoother_ratios.len() {
93 return Err(ODError::ODConfigError {
94 source: ConfigError::InvalidConfig {
95 msg: format!(
96 "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
97 self.estimates.len(),
98 self.filter_smoother_ratios.len()
99 ),
100 },
101 });
102 }
103
104 let tick = Epoch::now().unwrap();
105 info!("Exporting orbit determination result to parquet file...");
106
107 if cfg.step.is_some() {
108 warn!("The `step` parameter in the export is not supported for orbit determination exports.");
109 }
110
111 let path_buf = cfg.actual_path(path);
113
114 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
116
117 let frame = self.estimates[0].state().frame();
118
119 let more_meta = Some(vec![
120 (
121 "Frame".to_string(),
122 serde_dhall::serialize(&frame)
123 .static_type_annotation()
124 .to_string()
125 .map_err(|e| ODError::ODIOError {
126 source: InputOutputError::SerializeDhall {
127 what: format!("frame `{frame}`"),
128 err: e.to_string(),
129 },
130 })?,
131 ),
132 (
133 "SRP Area (m2)".to_string(),
134 self.estimates
135 .first()
136 .as_ref()
137 .unwrap()
138 .nominal_state()
139 .srp
140 .area_m2
141 .to_string(),
142 ),
143 (
144 "Drag Area (m2)".to_string(),
145 self.estimates
146 .first()
147 .as_ref()
148 .unwrap()
149 .nominal_state()
150 .drag
151 .area_m2
152 .to_string(),
153 ),
154 ]);
155
156 let mut fields = match cfg.fields {
157 Some(fields) => fields,
158 None => Spacecraft::export_params(),
159 };
160
161 fields.retain(|param| match self.estimates[0].state().value(*param) {
163 Ok(_) => param != &StateParameter::GuidanceMode,
164 Err(_) => false,
165 });
166
167 for field in &fields {
168 hdrs.push(field.to_field(more_meta.clone()));
169 }
170
171 let mut sigma_fields = fields.clone();
172 sigma_fields.retain(|param| {
174 !matches!(
175 param,
176 &StateParameter::X
177 | &StateParameter::Y
178 | &StateParameter::Z
179 | &StateParameter::VX
180 | &StateParameter::VY
181 | &StateParameter::VZ
182 ) && self.estimates[0].sigma_for(*param).is_ok()
183 });
184
185 for field in &sigma_fields {
186 hdrs.push(field.to_cov_field(more_meta.clone()));
187 }
188
189 let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
190 let state_units = [
191 "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
192 ];
193 let mut cov_units = vec![];
194
195 for i in 0..state_items.len() {
196 for j in i..state_items.len() {
197 let cov_unit = if i < 3 {
198 if j < 3 {
199 "km^2"
200 } else if (3..6).contains(&j) {
201 "km^2/s"
202 } else if j == 8 {
203 "km*kg"
204 } else {
205 "km"
206 }
207 } else if (3..6).contains(&i) {
208 if (3..6).contains(&j) {
209 "km^2/s^2"
210 } else if j == 8 {
211 "km/s*kg"
212 } else {
213 "km/s"
214 }
215 } else if i == 8 || j == 8 {
216 "kg^2"
217 } else {
218 "unitless"
219 };
220
221 cov_units.push(cov_unit);
222 }
223 }
224
225 let est_size = <Spacecraft as State>::Size::dim();
226
227 let mut idx = 0;
228 for i in 0..state_items.len() {
229 for j in i..state_items.len() {
230 hdrs.push(Field::new(
231 format!(
232 "Covariance {}*{} ({frame:x}) ({})",
233 state_items[i], state_items[j], cov_units[idx]
234 ),
235 DataType::Float64,
236 false,
237 ));
238 idx += 1;
239 }
240 }
241
242 for (i, coord) in state_items.iter().enumerate() {
244 hdrs.push(Field::new(
245 format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
246 DataType::Float64,
247 false,
248 ));
249 }
250
251 for (i, coord) in state_items.iter().enumerate().take(6) {
253 hdrs.push(Field::new(
254 format!("Sigma {coord} (RIC) ({})", state_units[i]),
255 DataType::Float64,
256 false,
257 ));
258 }
259
260 let mut msr_fields = Vec::new();
262 for f in &self.measurement_types {
263 msr_fields.push(
264 f.to_field()
265 .with_nullable(true)
266 .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
267 );
268 }
269 for f in &self.measurement_types {
270 msr_fields.push(
271 f.to_field()
272 .with_nullable(true)
273 .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
274 );
275 }
276 for f in &self.measurement_types {
277 msr_fields.push(
278 f.to_field()
279 .with_nullable(true)
280 .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
281 );
282 }
283 for f in &self.measurement_types {
284 msr_fields.push(
285 f.to_field()
286 .with_nullable(true)
287 .with_name(format!("Real observation: {f:?} ({})", f.unit())),
288 );
289 }
290 for f in &self.measurement_types {
291 msr_fields.push(
292 f.to_field()
293 .with_nullable(true)
294 .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
295 );
296 }
297
298 msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
299 msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
300 msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
301
302 hdrs.append(&mut msr_fields);
303
304 if self.measurement_types.len() == MsrSize::DIM {
306 for i in 0..state_items.len() {
307 for f in &self.measurement_types {
308 hdrs.push(Field::new(
309 format!(
310 "Gain {}*{f:?} ({}*{})",
311 state_items[i],
312 cov_units[i],
313 f.unit()
314 ),
315 DataType::Float64,
316 true,
317 ));
318 }
319 }
320 } else {
321 for state_item in &state_items {
322 for j in 0..MsrSize::DIM {
323 hdrs.push(Field::new(
324 format!("Gain {state_item}*[{j}]"),
325 DataType::Float64,
326 true,
327 ));
328 }
329 }
330 }
331
332 for i in 0..state_items.len() {
334 hdrs.push(Field::new(
335 format!(
336 "Filter-smoother ratio {} ({})",
337 state_items[i], cov_units[i],
338 ),
339 DataType::Float64,
340 true,
341 ));
342 }
343
344 let schema = Arc::new(Schema::new(hdrs));
346 let mut record: Vec<Arc<dyn Array>> = Vec::new();
347
348 let (estimates, residuals) =
350 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
351 let start = cfg
353 .start_epoch
354 .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
355 let end = cfg
356 .end_epoch
357 .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
358
359 let mut residuals: Vec<Option<Residual<MsrSize>>> =
360 Vec::with_capacity(self.residuals.len());
361 let mut estimates = Vec::with_capacity(self.estimates.len());
362
363 for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
364 if estimate.epoch() >= start && estimate.epoch() <= end {
365 estimates.push(*estimate);
366 residuals.push(residual.clone());
367 }
368 }
369
370 (estimates, residuals)
371 } else {
372 (self.estimates.to_vec(), self.residuals.to_vec())
373 };
374
375 let mut utc_epoch = StringBuilder::new();
379 for s in &estimates {
380 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
381 }
382 record.push(Arc::new(utc_epoch.finish()));
383
384 for field in fields {
386 let mut data = Float64Builder::new();
387 for s in &estimates {
388 data.append_value(
389 s.state()
390 .value(field)
391 .context(ODStateSnafu { action: "export" })?,
392 );
393 }
394 record.push(Arc::new(data.finish()));
395 }
396
397 for field in sigma_fields {
399 let mut data = Float64Builder::new();
400 for s in &estimates {
401 data.append_value(s.sigma_for(field).unwrap());
402 }
403 record.push(Arc::new(data.finish()));
404 }
405
406 for i in 0..est_size {
408 for j in i..est_size {
409 let mut data = Float64Builder::new();
410 for s in &estimates {
411 data.append_value(s.covar()[(i, j)]);
412 }
413 record.push(Arc::new(data.finish()));
414 }
415 }
416
417 for i in 0..est_size {
419 let mut data = Float64Builder::new();
420 for s in &estimates {
421 data.append_value(s.covar()[(i, i)].sqrt());
422 }
423 record.push(Arc::new(data.finish()));
424 }
425
426 let mut ric_covariances = Vec::new();
428
429 for s in &estimates {
430 let dcm_ric2inertial = s
431 .state()
432 .orbit()
433 .dcm_from_ric_to_inertial()
434 .unwrap()
435 .state_dcm();
436
437 let cov = s.covar();
439 let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
440
441 let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
443 ric_covariances.push(ric_covar);
444 }
445
446 for i in 0..6 {
448 let mut data = Float64Builder::new();
449 for cov in ric_covariances.iter().take(estimates.len()) {
450 data.append_value(cov[(i, i)].sqrt());
451 }
452 record.push(Arc::new(data.finish()));
453 }
454
455 for msr_type in &self.measurement_types {
458 let mut data = Float64Builder::new();
459 for resid_opt in &residuals {
460 if let Some(resid) = resid_opt {
461 match resid.prefit(*msr_type) {
462 Some(prefit) => data.append_value(prefit),
463 None => data.append_null(),
464 };
465 } else {
466 data.append_null();
467 }
468 }
469 record.push(Arc::new(data.finish()));
470 }
471 for msr_type in &self.measurement_types {
473 let mut data = Float64Builder::new();
474 for resid_opt in &residuals {
475 if let Some(resid) = resid_opt {
476 match resid.postfit(*msr_type) {
477 Some(postfit) => data.append_value(postfit),
478 None => data.append_null(),
479 };
480 } else {
481 data.append_null();
482 }
483 }
484 record.push(Arc::new(data.finish()));
485 }
486 for msr_type in &self.measurement_types {
488 let mut data = Float64Builder::new();
489 for resid_opt in &residuals {
490 if let Some(resid) = resid_opt {
491 match resid.trk_noise(*msr_type) {
492 Some(noise) => data.append_value(noise),
493 None => data.append_null(),
494 };
495 } else {
496 data.append_null();
497 }
498 }
499 record.push(Arc::new(data.finish()));
500 }
501 for msr_type in &self.measurement_types {
503 let mut data = Float64Builder::new();
504 for resid_opt in &residuals {
505 if let Some(resid) = resid_opt {
506 match resid.real_obs(*msr_type) {
507 Some(postfit) => data.append_value(postfit),
508 None => data.append_null(),
509 };
510 } else {
511 data.append_null();
512 }
513 }
514 record.push(Arc::new(data.finish()));
515 }
516 for msr_type in &self.measurement_types {
518 let mut data = Float64Builder::new();
519 for resid_opt in &residuals {
520 if let Some(resid) = resid_opt {
521 match resid.computed_obs(*msr_type) {
522 Some(postfit) => data.append_value(postfit),
523 None => data.append_null(),
524 };
525 } else {
526 data.append_null();
527 }
528 }
529 record.push(Arc::new(data.finish()));
530 }
531 let mut data = Float64Builder::new();
533 for resid_opt in &residuals {
534 if let Some(resid) = resid_opt {
535 data.append_value(resid.ratio);
536 } else {
537 data.append_null();
538 }
539 }
540 record.push(Arc::new(data.finish()));
541
542 let mut data = BooleanBuilder::new();
544 for resid_opt in &residuals {
545 if let Some(resid) = resid_opt {
546 data.append_value(resid.rejected);
547 } else {
548 data.append_null();
549 }
550 }
551 record.push(Arc::new(data.finish()));
552
553 let mut data = StringBuilder::new();
555 for resid_opt in &residuals {
556 if let Some(resid) = resid_opt {
557 data.append_value(
558 resid
559 .tracker
560 .clone()
561 .unwrap_or("Undefined tracker".to_string()),
562 );
563 } else {
564 data.append_null();
565 }
566 }
567 record.push(Arc::new(data.finish()));
568
569 for i in 0..est_size {
571 for j in 0..MsrSize::DIM {
572 let mut data = Float64Builder::new();
573 for opt_k in &self.gains {
574 if let Some(k) = opt_k {
575 data.append_value(k[(i, j)]);
576 } else {
577 data.append_null();
578 }
579 }
580 record.push(Arc::new(data.finish()));
581 }
582 }
583
584 for i in 0..est_size {
586 let mut data = Float64Builder::new();
587 for opt_fsr in &self.filter_smoother_ratios {
588 if let Some(fsr) = opt_fsr {
589 data.append_value(fsr[i]);
590 } else {
591 data.append_null();
592 }
593 }
594 record.push(Arc::new(data.finish()));
595 }
596
597 info!("Serialized {} estimates and residuals", estimates.len());
598
599 let mut metadata = HashMap::new();
601 metadata.insert(
602 "Purpose".to_string(),
603 "Orbit determination results".to_string(),
604 );
605 if let Some(add_meta) = cfg.metadata {
606 for (k, v) in add_meta {
607 metadata.insert(k, v);
608 }
609 }
610
611 let props = pq_writer(Some(metadata));
612
613 let file = File::create(&path_buf)
614 .context(StdIOSnafu {
615 action: "creating OD results file",
616 })
617 .context(ODIOSnafu)?;
618
619 let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
620 .context(ParquetSnafu {
621 action: "exporting OD results",
622 })
623 .context(ODIOSnafu)?;
624
625 let batch = RecordBatch::try_new(schema, record)
626 .context(ArrowSnafu {
627 action: "writing OD results (building batch record)",
628 })
629 .context(ODIOSnafu)?;
630
631 writer
632 .write(&batch)
633 .context(ParquetSnafu {
634 action: "writing OD results",
635 })
636 .context(ODIOSnafu)?;
637
638 writer
639 .close()
640 .context(ParquetSnafu {
641 action: "closing OD results file",
642 })
643 .context(ODIOSnafu)?;
644
645 let tock_time = Epoch::now().unwrap() - tick;
647 info!(
648 "Orbit determination results written to {} in {tock_time}",
649 path_buf.display()
650 );
651 Ok(path_buf)
652 }
653}