nyx_space/od/process/solution/
export.rs1use crate::io::watermark::pq_writer;
20use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
21use crate::linalg::allocator::Allocator;
22use crate::linalg::{DefaultAllocator, DimName};
23use crate::md::trajectory::Interpolatable;
24use crate::md::StateParameter;
25use crate::od::estimate::*;
26use crate::State;
27use crate::{od::*, Spacecraft};
28use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
29use arrow::datatypes::{DataType, Field, Schema};
30use arrow::record_batch::RecordBatch;
31use hifitime::TimeScale;
32use msr::sensitivity::TrackerSensitivity;
33use nalgebra::Const;
34use parquet::arrow::ArrowWriter;
35use snafu::prelude::*;
36use std::collections::HashMap;
37use std::fs::File;
38use std::path::{Path, PathBuf};
39
40use super::ODSolution;
41
42impl<MsrSize: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
43 ODSolution<Spacecraft, KfEstimate<Spacecraft>, MsrSize, Trk>
44where
45 DefaultAllocator: Allocator<MsrSize>
46 + Allocator<MsrSize, <Spacecraft as State>::Size>
47 + Allocator<Const<1>, MsrSize>
48 + Allocator<<Spacecraft as State>::Size>
49 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
50 + Allocator<MsrSize, MsrSize>
51 + Allocator<MsrSize, <Spacecraft as State>::Size>
52 + Allocator<<Spacecraft as State>::Size, MsrSize>
53 + Allocator<<Spacecraft as State>::Size>
54 + Allocator<<Spacecraft as State>::VecLength>
55 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
56{
57 pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
59 ensure!(
60 !self.estimates.is_empty(),
61 TooFewMeasurementsSnafu {
62 need: 1_usize,
63 action: "exporting OD results"
64 }
65 );
66
67 if self.estimates.len() != self.residuals.len() {
68 return Err(ODError::ODConfigError {
69 source: ConfigError::InvalidConfig {
70 msg: format!(
71 "Estimates ({}) and residuals ({}) are not aligned.",
72 self.estimates.len(),
73 self.residuals.len()
74 ),
75 },
76 });
77 }
78
79 if self.estimates.len() != self.gains.len() {
80 return Err(ODError::ODConfigError {
81 source: ConfigError::InvalidConfig {
82 msg: format!(
83 "Estimates ({}) and filter gains ({}) are not aligned.",
84 self.estimates.len(),
85 self.gains.len()
86 ),
87 },
88 });
89 }
90
91 if self.estimates.len() != self.filter_smoother_ratios.len() {
92 return Err(ODError::ODConfigError {
93 source: ConfigError::InvalidConfig {
94 msg: format!(
95 "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
96 self.estimates.len(),
97 self.filter_smoother_ratios.len()
98 ),
99 },
100 });
101 }
102
103 let tick = Epoch::now().unwrap();
104 info!("Exporting orbit determination result to parquet file...");
105
106 if cfg.step.is_some() {
107 warn!("The `step` parameter in the export is not supported for orbit determination exports.");
108 }
109
110 let path_buf = cfg.actual_path(path);
112
113 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
115
116 let frame = self.estimates[0].state().frame();
117
118 let more_meta = Some(vec![
119 (
120 "Frame".to_string(),
121 serde_dhall::serialize(&frame)
122 .to_string()
123 .map_err(|e| ODError::ODIOError {
124 source: InputOutputError::SerializeDhall {
125 what: format!("frame `{frame}`"),
126 err: e.to_string(),
127 },
128 })?,
129 ),
130 (
131 "SRP Area (m2)".to_string(),
132 self.estimates
133 .first()
134 .as_ref()
135 .unwrap()
136 .nominal_state()
137 .srp
138 .area_m2
139 .to_string(),
140 ),
141 (
142 "Drag Area (m2)".to_string(),
143 self.estimates
144 .first()
145 .as_ref()
146 .unwrap()
147 .nominal_state()
148 .drag
149 .area_m2
150 .to_string(),
151 ),
152 ]);
153
154 let mut fields = match cfg.fields {
155 Some(fields) => fields,
156 None => Spacecraft::export_params(),
157 };
158
159 fields.retain(|param| match self.estimates[0].state().value(*param) {
161 Ok(_) => param != &StateParameter::GuidanceMode,
162 Err(_) => false,
163 });
164
165 for field in &fields {
166 hdrs.push(field.to_field(more_meta.clone()));
167 }
168
169 let mut sigma_fields = fields.clone();
170 sigma_fields.retain(|param| {
172 !matches!(
173 param,
174 &StateParameter::X
175 | &StateParameter::Y
176 | &StateParameter::Z
177 | &StateParameter::VX
178 | &StateParameter::VY
179 | &StateParameter::VZ
180 ) && self.estimates[0].sigma_for(*param).is_ok()
181 });
182
183 for field in &sigma_fields {
184 hdrs.push(field.to_cov_field(more_meta.clone()));
185 }
186
187 let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
188 let state_units = [
189 "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
190 ];
191 let mut cov_units = vec![];
192
193 for i in 0..state_items.len() {
194 for j in i..state_items.len() {
195 let cov_unit = if i < 3 {
196 if j < 3 {
197 "km^2"
198 } else if (3..6).contains(&j) {
199 "km^2/s"
200 } else if j == 8 {
201 "km*kg"
202 } else {
203 "km"
204 }
205 } else if (3..6).contains(&i) {
206 if (3..6).contains(&j) {
207 "km^2/s^2"
208 } else if j == 8 {
209 "km/s*kg"
210 } else {
211 "km/s"
212 }
213 } else if i == 8 || j == 8 {
214 "kg^2"
215 } else {
216 "unitless"
217 };
218
219 cov_units.push(cov_unit);
220 }
221 }
222
223 let est_size = <Spacecraft as State>::Size::dim();
224
225 let mut idx = 0;
226 for i in 0..state_items.len() {
227 for j in i..state_items.len() {
228 hdrs.push(Field::new(
229 format!(
230 "Covariance {}*{} ({frame:x}) ({})",
231 state_items[i], state_items[j], cov_units[idx]
232 ),
233 DataType::Float64,
234 false,
235 ));
236 idx += 1;
237 }
238 }
239
240 for (i, coord) in state_items.iter().enumerate() {
242 hdrs.push(Field::new(
243 format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
244 DataType::Float64,
245 false,
246 ));
247 }
248
249 for (i, coord) in state_items.iter().enumerate().take(6) {
251 hdrs.push(Field::new(
252 format!("Sigma {coord} (RIC) ({})", state_units[i]),
253 DataType::Float64,
254 false,
255 ));
256 }
257
258 let mut msr_fields = Vec::new();
260 for f in &self.measurement_types {
261 msr_fields.push(
262 f.to_field()
263 .with_nullable(true)
264 .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
265 );
266 }
267 for f in &self.measurement_types {
268 msr_fields.push(
269 f.to_field()
270 .with_nullable(true)
271 .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
272 );
273 }
274 for f in &self.measurement_types {
275 msr_fields.push(
276 f.to_field()
277 .with_nullable(true)
278 .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
279 );
280 }
281 for f in &self.measurement_types {
282 msr_fields.push(
283 f.to_field()
284 .with_nullable(true)
285 .with_name(format!("Real observation: {f:?} ({})", f.unit())),
286 );
287 }
288 for f in &self.measurement_types {
289 msr_fields.push(
290 f.to_field()
291 .with_nullable(true)
292 .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
293 );
294 }
295
296 msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
297 msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
298 msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
299
300 hdrs.append(&mut msr_fields);
301
302 if self.measurement_types.len() == MsrSize::USIZE {
304 for i in 0..state_items.len() {
305 for f in &self.measurement_types {
306 hdrs.push(Field::new(
307 format!(
308 "Gain {}*{f:?} ({}*{})",
309 state_items[i],
310 cov_units[i],
311 f.unit()
312 ),
313 DataType::Float64,
314 true,
315 ));
316 }
317 }
318 } else {
319 for state_item in &state_items {
320 for j in 0..MsrSize::USIZE {
321 hdrs.push(Field::new(
322 format!("Gain {}*[{j}]", state_item),
323 DataType::Float64,
324 true,
325 ));
326 }
327 }
328 }
329
330 for i in 0..state_items.len() {
332 hdrs.push(Field::new(
333 format!(
334 "Filter-smoother ratio {} ({})",
335 state_items[i], cov_units[i],
336 ),
337 DataType::Float64,
338 true,
339 ));
340 }
341
342 let schema = Arc::new(Schema::new(hdrs));
344 let mut record: Vec<Arc<dyn Array>> = Vec::new();
345
346 let (estimates, residuals) =
348 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
349 let start = cfg
351 .start_epoch
352 .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
353 let end = cfg
354 .end_epoch
355 .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
356
357 let mut residuals: Vec<Option<Residual<MsrSize>>> =
358 Vec::with_capacity(self.residuals.len());
359 let mut estimates = Vec::with_capacity(self.estimates.len());
360
361 for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
362 if estimate.epoch() >= start && estimate.epoch() <= end {
363 estimates.push(*estimate);
364 residuals.push(residual.clone());
365 }
366 }
367
368 (estimates, residuals)
369 } else {
370 (self.estimates.to_vec(), self.residuals.to_vec())
371 };
372
373 let mut utc_epoch = StringBuilder::new();
377 for s in &estimates {
378 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
379 }
380 record.push(Arc::new(utc_epoch.finish()));
381
382 for field in fields {
384 let mut data = Float64Builder::new();
385 for s in &estimates {
386 data.append_value(
387 s.state()
388 .value(field)
389 .context(ODStateSnafu { action: "export" })?,
390 );
391 }
392 record.push(Arc::new(data.finish()));
393 }
394
395 for field in sigma_fields {
397 let mut data = Float64Builder::new();
398 for s in &estimates {
399 data.append_value(s.sigma_for(field).unwrap());
400 }
401 record.push(Arc::new(data.finish()));
402 }
403
404 for i in 0..est_size {
406 for j in i..est_size {
407 let mut data = Float64Builder::new();
408 for s in &estimates {
409 data.append_value(s.covar()[(i, j)]);
410 }
411 record.push(Arc::new(data.finish()));
412 }
413 }
414
415 for i in 0..est_size {
417 let mut data = Float64Builder::new();
418 for s in &estimates {
419 data.append_value(s.covar()[(i, i)].sqrt());
420 }
421 record.push(Arc::new(data.finish()));
422 }
423
424 let mut ric_covariances = Vec::new();
426
427 for s in &estimates {
428 let dcm_ric2inertial = s
429 .state()
430 .orbit()
431 .dcm_from_ric_to_inertial()
432 .unwrap()
433 .state_dcm();
434
435 let cov = s.covar();
437 let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
438
439 let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
441 ric_covariances.push(ric_covar);
442 }
443
444 for i in 0..6 {
446 let mut data = Float64Builder::new();
447 for cov in ric_covariances.iter().take(estimates.len()) {
448 data.append_value(cov[(i, i)].sqrt());
449 }
450 record.push(Arc::new(data.finish()));
451 }
452
453 for msr_type in &self.measurement_types {
456 let mut data = Float64Builder::new();
457 for resid_opt in &residuals {
458 if let Some(resid) = resid_opt {
459 match resid.prefit(*msr_type) {
460 Some(prefit) => data.append_value(prefit),
461 None => data.append_null(),
462 };
463 } else {
464 data.append_null();
465 }
466 }
467 record.push(Arc::new(data.finish()));
468 }
469 for msr_type in &self.measurement_types {
471 let mut data = Float64Builder::new();
472 for resid_opt in &residuals {
473 if let Some(resid) = resid_opt {
474 match resid.postfit(*msr_type) {
475 Some(postfit) => data.append_value(postfit),
476 None => data.append_null(),
477 };
478 } else {
479 data.append_null();
480 }
481 }
482 record.push(Arc::new(data.finish()));
483 }
484 for msr_type in &self.measurement_types {
486 let mut data = Float64Builder::new();
487 for resid_opt in &residuals {
488 if let Some(resid) = resid_opt {
489 match resid.trk_noise(*msr_type) {
490 Some(noise) => data.append_value(noise),
491 None => data.append_null(),
492 };
493 } else {
494 data.append_null();
495 }
496 }
497 record.push(Arc::new(data.finish()));
498 }
499 for msr_type in &self.measurement_types {
501 let mut data = Float64Builder::new();
502 for resid_opt in &residuals {
503 if let Some(resid) = resid_opt {
504 match resid.real_obs(*msr_type) {
505 Some(postfit) => data.append_value(postfit),
506 None => data.append_null(),
507 };
508 } else {
509 data.append_null();
510 }
511 }
512 record.push(Arc::new(data.finish()));
513 }
514 for msr_type in &self.measurement_types {
516 let mut data = Float64Builder::new();
517 for resid_opt in &residuals {
518 if let Some(resid) = resid_opt {
519 match resid.computed_obs(*msr_type) {
520 Some(postfit) => data.append_value(postfit),
521 None => data.append_null(),
522 };
523 } else {
524 data.append_null();
525 }
526 }
527 record.push(Arc::new(data.finish()));
528 }
529 let mut data = Float64Builder::new();
531 for resid_opt in &residuals {
532 if let Some(resid) = resid_opt {
533 data.append_value(resid.ratio);
534 } else {
535 data.append_null();
536 }
537 }
538 record.push(Arc::new(data.finish()));
539
540 let mut data = BooleanBuilder::new();
542 for resid_opt in &residuals {
543 if let Some(resid) = resid_opt {
544 data.append_value(resid.rejected);
545 } else {
546 data.append_null();
547 }
548 }
549 record.push(Arc::new(data.finish()));
550
551 let mut data = StringBuilder::new();
553 for resid_opt in &residuals {
554 if let Some(resid) = resid_opt {
555 data.append_value(
556 resid
557 .tracker
558 .clone()
559 .unwrap_or("Undefined tracker".to_string()),
560 );
561 } else {
562 data.append_null();
563 }
564 }
565 record.push(Arc::new(data.finish()));
566
567 for i in 0..est_size {
569 for j in 0..MsrSize::USIZE {
570 let mut data = Float64Builder::new();
571 for opt_k in &self.gains {
572 if let Some(k) = opt_k {
573 data.append_value(k[(i, j)]);
574 } else {
575 data.append_null();
576 }
577 }
578 record.push(Arc::new(data.finish()));
579 }
580 }
581
582 for i in 0..est_size {
584 let mut data = Float64Builder::new();
585 for opt_fsr in &self.filter_smoother_ratios {
586 if let Some(fsr) = opt_fsr {
587 data.append_value(fsr[i]);
588 } else {
589 data.append_null();
590 }
591 }
592 record.push(Arc::new(data.finish()));
593 }
594
595 info!("Serialized {} estimates and residuals", estimates.len());
596
597 let mut metadata = HashMap::new();
599 metadata.insert(
600 "Purpose".to_string(),
601 "Orbit determination results".to_string(),
602 );
603 if let Some(add_meta) = cfg.metadata {
604 for (k, v) in add_meta {
605 metadata.insert(k, v);
606 }
607 }
608
609 let props = pq_writer(Some(metadata));
610
611 let file = File::create(&path_buf)
612 .context(StdIOSnafu {
613 action: "creating OD results file",
614 })
615 .context(ODIOSnafu)?;
616
617 let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
618 .context(ParquetSnafu {
619 action: "exporting OD results",
620 })
621 .context(ODIOSnafu)?;
622
623 let batch = RecordBatch::try_new(schema, record)
624 .context(ArrowSnafu {
625 action: "writing OD results (building batch record)",
626 })
627 .context(ODIOSnafu)?;
628
629 writer
630 .write(&batch)
631 .context(ParquetSnafu {
632 action: "writing OD results",
633 })
634 .context(ODIOSnafu)?;
635
636 writer
637 .close()
638 .context(ParquetSnafu {
639 action: "closing OD results file",
640 })
641 .context(ODIOSnafu)?;
642
643 let tock_time = Epoch::now().unwrap() - tick;
645 info!(
646 "Orbit determination results written to {} in {tock_time}",
647 path_buf.display()
648 );
649 Ok(path_buf)
650 }
651}