nyx_space/od/process/solution/
export.rs1use crate::io::watermark::pq_writer;
20use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
21use crate::linalg::allocator::Allocator;
22use crate::linalg::{DefaultAllocator, DimName};
23use crate::md::trajectory::Interpolatable;
24use crate::md::StateParameter;
25use crate::od::estimate::*;
26use crate::State;
27use crate::{od::*, Spacecraft};
28use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
29use arrow::datatypes::{DataType, Field, Schema};
30use arrow::record_batch::RecordBatch;
31use hifitime::TimeScale;
32use msr::sensitivity::TrackerSensitivity;
33use nalgebra::Const;
34use parquet::arrow::ArrowWriter;
35use snafu::prelude::*;
36use std::collections::HashMap;
37use std::fs::File;
38use std::path::{Path, PathBuf};
39
40use super::ODSolution;
41
42impl<MsrSize: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
43 ODSolution<Spacecraft, KfEstimate<Spacecraft>, MsrSize, Trk>
44where
45 DefaultAllocator: Allocator<MsrSize>
46 + Allocator<MsrSize, <Spacecraft as State>::Size>
47 + Allocator<Const<1>, MsrSize>
48 + Allocator<<Spacecraft as State>::Size>
49 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
50 + Allocator<MsrSize, MsrSize>
51 + Allocator<MsrSize, <Spacecraft as State>::Size>
52 + Allocator<<Spacecraft as State>::Size, MsrSize>
53 + Allocator<<Spacecraft as State>::Size>
54 + Allocator<<Spacecraft as State>::VecLength>
55 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
56{
57 pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
59 ensure!(
60 !self.estimates.is_empty(),
61 TooFewMeasurementsSnafu {
62 need: 1_usize,
63 action: "exporting OD results"
64 }
65 );
66
67 if self.estimates.len() != self.residuals.len() {
68 return Err(ODError::ODConfigError {
69 source: ConfigError::InvalidConfig {
70 msg: format!(
71 "Estimates ({}) and residuals ({}) are not aligned.",
72 self.estimates.len(),
73 self.residuals.len()
74 ),
75 },
76 });
77 }
78
79 if self.estimates.len() != self.gains.len() {
80 return Err(ODError::ODConfigError {
81 source: ConfigError::InvalidConfig {
82 msg: format!(
83 "Estimates ({}) and filter gains ({}) are not aligned.",
84 self.estimates.len(),
85 self.gains.len()
86 ),
87 },
88 });
89 }
90
91 if self.estimates.len() != self.filter_smoother_ratios.len() {
92 return Err(ODError::ODConfigError {
93 source: ConfigError::InvalidConfig {
94 msg: format!(
95 "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
96 self.estimates.len(),
97 self.filter_smoother_ratios.len()
98 ),
99 },
100 });
101 }
102
103 let tick = Epoch::now().unwrap();
104 info!("Exporting orbit determination result to parquet file...");
105
106 if cfg.step.is_some() {
107 warn!("The `step` parameter in the export is not supported for orbit determination exports.");
108 }
109
110 let path_buf = cfg.actual_path(path);
112
113 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
115
116 let frame = self.estimates[0].state().frame();
117
118 let more_meta = Some(vec![
119 (
120 "Frame".to_string(),
121 serde_dhall::serialize(&frame)
122 .static_type_annotation()
123 .to_string()
124 .map_err(|e| ODError::ODIOError {
125 source: InputOutputError::SerializeDhall {
126 what: format!("frame `{frame}`"),
127 err: e.to_string(),
128 },
129 })?,
130 ),
131 (
132 "SRP Area (m2)".to_string(),
133 self.estimates
134 .first()
135 .as_ref()
136 .unwrap()
137 .nominal_state()
138 .srp
139 .area_m2
140 .to_string(),
141 ),
142 (
143 "Drag Area (m2)".to_string(),
144 self.estimates
145 .first()
146 .as_ref()
147 .unwrap()
148 .nominal_state()
149 .drag
150 .area_m2
151 .to_string(),
152 ),
153 ]);
154
155 let mut fields = match cfg.fields {
156 Some(fields) => fields,
157 None => Spacecraft::export_params(),
158 };
159
160 fields.retain(|param| match self.estimates[0].state().value(*param) {
162 Ok(_) => param != &StateParameter::GuidanceMode,
163 Err(_) => false,
164 });
165
166 for field in &fields {
167 hdrs.push(field.to_field(more_meta.clone()));
168 }
169
170 let mut sigma_fields = fields.clone();
171 sigma_fields.retain(|param| {
173 !matches!(
174 param,
175 &StateParameter::X
176 | &StateParameter::Y
177 | &StateParameter::Z
178 | &StateParameter::VX
179 | &StateParameter::VY
180 | &StateParameter::VZ
181 ) && self.estimates[0].sigma_for(*param).is_ok()
182 });
183
184 for field in &sigma_fields {
185 hdrs.push(field.to_cov_field(more_meta.clone()));
186 }
187
188 let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
189 let state_units = [
190 "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
191 ];
192 let mut cov_units = vec![];
193
194 for i in 0..state_items.len() {
195 for j in i..state_items.len() {
196 let cov_unit = if i < 3 {
197 if j < 3 {
198 "km^2"
199 } else if (3..6).contains(&j) {
200 "km^2/s"
201 } else if j == 8 {
202 "km*kg"
203 } else {
204 "km"
205 }
206 } else if (3..6).contains(&i) {
207 if (3..6).contains(&j) {
208 "km^2/s^2"
209 } else if j == 8 {
210 "km/s*kg"
211 } else {
212 "km/s"
213 }
214 } else if i == 8 || j == 8 {
215 "kg^2"
216 } else {
217 "unitless"
218 };
219
220 cov_units.push(cov_unit);
221 }
222 }
223
224 let est_size = <Spacecraft as State>::Size::dim();
225
226 let mut idx = 0;
227 for i in 0..state_items.len() {
228 for j in i..state_items.len() {
229 hdrs.push(Field::new(
230 format!(
231 "Covariance {}*{} ({frame:x}) ({})",
232 state_items[i], state_items[j], cov_units[idx]
233 ),
234 DataType::Float64,
235 false,
236 ));
237 idx += 1;
238 }
239 }
240
241 for (i, coord) in state_items.iter().enumerate() {
243 hdrs.push(Field::new(
244 format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
245 DataType::Float64,
246 false,
247 ));
248 }
249
250 for (i, coord) in state_items.iter().enumerate().take(6) {
252 hdrs.push(Field::new(
253 format!("Sigma {coord} (RIC) ({})", state_units[i]),
254 DataType::Float64,
255 false,
256 ));
257 }
258
259 let mut msr_fields = Vec::new();
261 for f in &self.measurement_types {
262 msr_fields.push(
263 f.to_field()
264 .with_nullable(true)
265 .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
266 );
267 }
268 for f in &self.measurement_types {
269 msr_fields.push(
270 f.to_field()
271 .with_nullable(true)
272 .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
273 );
274 }
275 for f in &self.measurement_types {
276 msr_fields.push(
277 f.to_field()
278 .with_nullable(true)
279 .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
280 );
281 }
282 for f in &self.measurement_types {
283 msr_fields.push(
284 f.to_field()
285 .with_nullable(true)
286 .with_name(format!("Real observation: {f:?} ({})", f.unit())),
287 );
288 }
289 for f in &self.measurement_types {
290 msr_fields.push(
291 f.to_field()
292 .with_nullable(true)
293 .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
294 );
295 }
296
297 msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
298 msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
299 msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
300
301 hdrs.append(&mut msr_fields);
302
303 if self.measurement_types.len() == MsrSize::USIZE {
305 for i in 0..state_items.len() {
306 for f in &self.measurement_types {
307 hdrs.push(Field::new(
308 format!(
309 "Gain {}*{f:?} ({}*{})",
310 state_items[i],
311 cov_units[i],
312 f.unit()
313 ),
314 DataType::Float64,
315 true,
316 ));
317 }
318 }
319 } else {
320 for state_item in &state_items {
321 for j in 0..MsrSize::USIZE {
322 hdrs.push(Field::new(
323 format!("Gain {state_item}*[{j}]"),
324 DataType::Float64,
325 true,
326 ));
327 }
328 }
329 }
330
331 for i in 0..state_items.len() {
333 hdrs.push(Field::new(
334 format!(
335 "Filter-smoother ratio {} ({})",
336 state_items[i], cov_units[i],
337 ),
338 DataType::Float64,
339 true,
340 ));
341 }
342
343 let schema = Arc::new(Schema::new(hdrs));
345 let mut record: Vec<Arc<dyn Array>> = Vec::new();
346
347 let (estimates, residuals) =
349 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
350 let start = cfg
352 .start_epoch
353 .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
354 let end = cfg
355 .end_epoch
356 .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
357
358 let mut residuals: Vec<Option<Residual<MsrSize>>> =
359 Vec::with_capacity(self.residuals.len());
360 let mut estimates = Vec::with_capacity(self.estimates.len());
361
362 for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
363 if estimate.epoch() >= start && estimate.epoch() <= end {
364 estimates.push(*estimate);
365 residuals.push(residual.clone());
366 }
367 }
368
369 (estimates, residuals)
370 } else {
371 (self.estimates.to_vec(), self.residuals.to_vec())
372 };
373
374 let mut utc_epoch = StringBuilder::new();
378 for s in &estimates {
379 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
380 }
381 record.push(Arc::new(utc_epoch.finish()));
382
383 for field in fields {
385 let mut data = Float64Builder::new();
386 for s in &estimates {
387 data.append_value(
388 s.state()
389 .value(field)
390 .context(ODStateSnafu { action: "export" })?,
391 );
392 }
393 record.push(Arc::new(data.finish()));
394 }
395
396 for field in sigma_fields {
398 let mut data = Float64Builder::new();
399 for s in &estimates {
400 data.append_value(s.sigma_for(field).unwrap());
401 }
402 record.push(Arc::new(data.finish()));
403 }
404
405 for i in 0..est_size {
407 for j in i..est_size {
408 let mut data = Float64Builder::new();
409 for s in &estimates {
410 data.append_value(s.covar()[(i, j)]);
411 }
412 record.push(Arc::new(data.finish()));
413 }
414 }
415
416 for i in 0..est_size {
418 let mut data = Float64Builder::new();
419 for s in &estimates {
420 data.append_value(s.covar()[(i, i)].sqrt());
421 }
422 record.push(Arc::new(data.finish()));
423 }
424
425 let mut ric_covariances = Vec::new();
427
428 for s in &estimates {
429 let dcm_ric2inertial = s
430 .state()
431 .orbit()
432 .dcm_from_ric_to_inertial()
433 .unwrap()
434 .state_dcm();
435
436 let cov = s.covar();
438 let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
439
440 let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
442 ric_covariances.push(ric_covar);
443 }
444
445 for i in 0..6 {
447 let mut data = Float64Builder::new();
448 for cov in ric_covariances.iter().take(estimates.len()) {
449 data.append_value(cov[(i, i)].sqrt());
450 }
451 record.push(Arc::new(data.finish()));
452 }
453
454 for msr_type in &self.measurement_types {
457 let mut data = Float64Builder::new();
458 for resid_opt in &residuals {
459 if let Some(resid) = resid_opt {
460 match resid.prefit(*msr_type) {
461 Some(prefit) => data.append_value(prefit),
462 None => data.append_null(),
463 };
464 } else {
465 data.append_null();
466 }
467 }
468 record.push(Arc::new(data.finish()));
469 }
470 for msr_type in &self.measurement_types {
472 let mut data = Float64Builder::new();
473 for resid_opt in &residuals {
474 if let Some(resid) = resid_opt {
475 match resid.postfit(*msr_type) {
476 Some(postfit) => data.append_value(postfit),
477 None => data.append_null(),
478 };
479 } else {
480 data.append_null();
481 }
482 }
483 record.push(Arc::new(data.finish()));
484 }
485 for msr_type in &self.measurement_types {
487 let mut data = Float64Builder::new();
488 for resid_opt in &residuals {
489 if let Some(resid) = resid_opt {
490 match resid.trk_noise(*msr_type) {
491 Some(noise) => data.append_value(noise),
492 None => data.append_null(),
493 };
494 } else {
495 data.append_null();
496 }
497 }
498 record.push(Arc::new(data.finish()));
499 }
500 for msr_type in &self.measurement_types {
502 let mut data = Float64Builder::new();
503 for resid_opt in &residuals {
504 if let Some(resid) = resid_opt {
505 match resid.real_obs(*msr_type) {
506 Some(postfit) => data.append_value(postfit),
507 None => data.append_null(),
508 };
509 } else {
510 data.append_null();
511 }
512 }
513 record.push(Arc::new(data.finish()));
514 }
515 for msr_type in &self.measurement_types {
517 let mut data = Float64Builder::new();
518 for resid_opt in &residuals {
519 if let Some(resid) = resid_opt {
520 match resid.computed_obs(*msr_type) {
521 Some(postfit) => data.append_value(postfit),
522 None => data.append_null(),
523 };
524 } else {
525 data.append_null();
526 }
527 }
528 record.push(Arc::new(data.finish()));
529 }
530 let mut data = Float64Builder::new();
532 for resid_opt in &residuals {
533 if let Some(resid) = resid_opt {
534 data.append_value(resid.ratio);
535 } else {
536 data.append_null();
537 }
538 }
539 record.push(Arc::new(data.finish()));
540
541 let mut data = BooleanBuilder::new();
543 for resid_opt in &residuals {
544 if let Some(resid) = resid_opt {
545 data.append_value(resid.rejected);
546 } else {
547 data.append_null();
548 }
549 }
550 record.push(Arc::new(data.finish()));
551
552 let mut data = StringBuilder::new();
554 for resid_opt in &residuals {
555 if let Some(resid) = resid_opt {
556 data.append_value(
557 resid
558 .tracker
559 .clone()
560 .unwrap_or("Undefined tracker".to_string()),
561 );
562 } else {
563 data.append_null();
564 }
565 }
566 record.push(Arc::new(data.finish()));
567
568 for i in 0..est_size {
570 for j in 0..MsrSize::USIZE {
571 let mut data = Float64Builder::new();
572 for opt_k in &self.gains {
573 if let Some(k) = opt_k {
574 data.append_value(k[(i, j)]);
575 } else {
576 data.append_null();
577 }
578 }
579 record.push(Arc::new(data.finish()));
580 }
581 }
582
583 for i in 0..est_size {
585 let mut data = Float64Builder::new();
586 for opt_fsr in &self.filter_smoother_ratios {
587 if let Some(fsr) = opt_fsr {
588 data.append_value(fsr[i]);
589 } else {
590 data.append_null();
591 }
592 }
593 record.push(Arc::new(data.finish()));
594 }
595
596 info!("Serialized {} estimates and residuals", estimates.len());
597
598 let mut metadata = HashMap::new();
600 metadata.insert(
601 "Purpose".to_string(),
602 "Orbit determination results".to_string(),
603 );
604 if let Some(add_meta) = cfg.metadata {
605 for (k, v) in add_meta {
606 metadata.insert(k, v);
607 }
608 }
609
610 let props = pq_writer(Some(metadata));
611
612 let file = File::create(&path_buf)
613 .context(StdIOSnafu {
614 action: "creating OD results file",
615 })
616 .context(ODIOSnafu)?;
617
618 let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
619 .context(ParquetSnafu {
620 action: "exporting OD results",
621 })
622 .context(ODIOSnafu)?;
623
624 let batch = RecordBatch::try_new(schema, record)
625 .context(ArrowSnafu {
626 action: "writing OD results (building batch record)",
627 })
628 .context(ODIOSnafu)?;
629
630 writer
631 .write(&batch)
632 .context(ParquetSnafu {
633 action: "writing OD results",
634 })
635 .context(ODIOSnafu)?;
636
637 writer
638 .close()
639 .context(ParquetSnafu {
640 action: "closing OD results file",
641 })
642 .context(ODIOSnafu)?;
643
644 let tock_time = Epoch::now().unwrap() - tick;
646 info!(
647 "Orbit determination results written to {} in {tock_time}",
648 path_buf.display()
649 );
650 Ok(path_buf)
651 }
652}