1use crate::State;
20use crate::io::watermark::pq_writer;
21use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
22use crate::linalg::allocator::Allocator;
23use crate::linalg::{DefaultAllocator, DimName};
24use crate::md::StateParameter;
25use crate::md::trajectory::Interpolatable;
26use crate::od::estimate::*;
27use crate::{Spacecraft, od::*};
28use anise::ephemerides::ephemeris::{Covariance, Ephemeris, EphemerisRecord};
29use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
30use arrow::datatypes::{DataType, Field, Schema};
31use arrow::record_batch::RecordBatch;
32use hifitime::TimeScale;
33use log::{info, warn};
34use msr::sensitivity::TrackerSensitivity;
35use nalgebra::Const;
36use parquet::arrow::ArrowWriter;
37use snafu::prelude::*;
38use std::collections::HashMap;
39use std::fs::File;
40use std::path::{Path, PathBuf};
41
42use super::ODSolution;
43
44impl<MsrSize: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
45 ODSolution<Spacecraft, KfEstimate<Spacecraft>, MsrSize, Trk>
46where
47 DefaultAllocator: Allocator<MsrSize>
48 + Allocator<MsrSize, <Spacecraft as State>::Size>
49 + Allocator<Const<1>, MsrSize>
50 + Allocator<<Spacecraft as State>::Size>
51 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
52 + Allocator<MsrSize, MsrSize>
53 + Allocator<MsrSize, <Spacecraft as State>::Size>
54 + Allocator<<Spacecraft as State>::Size, MsrSize>
55 + Allocator<<Spacecraft as State>::Size>
56 + Allocator<<Spacecraft as State>::VecLength>
57 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
58{
59 pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
61 ensure!(
62 !self.estimates.is_empty(),
63 TooFewMeasurementsSnafu {
64 need: 1_usize,
65 action: "exporting OD results"
66 }
67 );
68
69 if self.estimates.len() != self.residuals.len() {
70 return Err(ODError::ODConfigError {
71 source: ConfigError::InvalidConfig {
72 msg: format!(
73 "Estimates ({}) and residuals ({}) are not aligned.",
74 self.estimates.len(),
75 self.residuals.len()
76 ),
77 },
78 });
79 }
80
81 if self.estimates.len() != self.gains.len() {
82 return Err(ODError::ODConfigError {
83 source: ConfigError::InvalidConfig {
84 msg: format!(
85 "Estimates ({}) and filter gains ({}) are not aligned.",
86 self.estimates.len(),
87 self.gains.len()
88 ),
89 },
90 });
91 }
92
93 if self.estimates.len() != self.filter_smoother_ratios.len() {
94 return Err(ODError::ODConfigError {
95 source: ConfigError::InvalidConfig {
96 msg: format!(
97 "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
98 self.estimates.len(),
99 self.filter_smoother_ratios.len()
100 ),
101 },
102 });
103 }
104
105 let tick = Epoch::now().unwrap();
106 info!("Exporting orbit determination result to parquet file...");
107
108 if cfg.step.is_some() {
109 warn!(
110 "The `step` parameter in the export is not supported for orbit determination exports."
111 );
112 }
113
114 let path_buf = cfg.actual_path(path);
116
117 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
119
120 let frame = self.estimates[0].state().frame();
121
122 let more_meta = Some(vec![
123 (
124 "Frame".to_string(),
125 serde_dhall::serialize(&frame)
126 .static_type_annotation()
127 .to_string()
128 .map_err(|e| ODError::ODIOError {
129 source: InputOutputError::SerializeDhall {
130 what: format!("frame `{frame}`"),
131 err: e.to_string(),
132 },
133 })?,
134 ),
135 (
136 "SRP Area (m2)".to_string(),
137 self.estimates
138 .first()
139 .as_ref()
140 .unwrap()
141 .nominal_state()
142 .srp
143 .area_m2
144 .to_string(),
145 ),
146 (
147 "Drag Area (m2)".to_string(),
148 self.estimates
149 .first()
150 .as_ref()
151 .unwrap()
152 .nominal_state()
153 .drag
154 .area_m2
155 .to_string(),
156 ),
157 ]);
158
159 let mut fields = match cfg.fields {
160 Some(fields) => fields,
161 None => Spacecraft::export_params(),
162 };
163
164 fields.retain(|param| match self.estimates[0].state().value(*param) {
166 Ok(_) => param != &StateParameter::GuidanceMode(),
167 Err(_) => false,
168 });
169
170 for field in &fields {
171 hdrs.push(field.to_field(more_meta.clone()));
172 }
173
174 let mut sigma_fields = fields.clone();
175 sigma_fields.retain(|param| matches!(param, StateParameter::Element(_oe)));
177
178 for field in &sigma_fields {
179 hdrs.push(field.to_cov_field(more_meta.clone()));
180 }
181
182 let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
183 let state_units = [
184 "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
185 ];
186 let mut cov_units = vec![];
187
188 for i in 0..state_items.len() {
189 for j in i..state_items.len() {
190 let cov_unit = if i < 3 {
191 if j < 3 {
192 "km^2"
193 } else if (3..6).contains(&j) {
194 "km^2/s"
195 } else if j == 8 {
196 "km*kg"
197 } else {
198 "km"
199 }
200 } else if (3..6).contains(&i) {
201 if (3..6).contains(&j) {
202 "km^2/s^2"
203 } else if j == 8 {
204 "km/s*kg"
205 } else {
206 "km/s"
207 }
208 } else if i == 8 || j == 8 {
209 "kg^2"
210 } else {
211 "unitless"
212 };
213
214 cov_units.push(cov_unit);
215 }
216 }
217
218 let est_size = <Spacecraft as State>::Size::dim();
219
220 let mut idx = 0;
221 for i in 0..state_items.len() {
222 for j in i..state_items.len() {
223 hdrs.push(Field::new(
224 format!(
225 "Covariance {}*{} ({frame:x}) ({})",
226 state_items[i], state_items[j], cov_units[idx]
227 ),
228 DataType::Float64,
229 false,
230 ));
231 idx += 1;
232 }
233 }
234
235 for (i, coord) in state_items.iter().enumerate() {
237 hdrs.push(Field::new(
238 format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
239 DataType::Float64,
240 false,
241 ));
242 }
243
244 for (i, coord) in state_items.iter().enumerate().take(6) {
246 hdrs.push(Field::new(
247 format!("Sigma {coord} (RIC) ({})", state_units[i]),
248 DataType::Float64,
249 false,
250 ));
251 }
252
253 let mut msr_fields = Vec::new();
255 for f in &self.measurement_types {
256 msr_fields.push(
257 f.to_field()
258 .with_nullable(true)
259 .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
260 );
261 }
262 for j in 0..MsrSize::DIM {
263 msr_fields.push(Field::new(
264 format!("Whitened residual #{j}"),
265 DataType::Float64,
266 true,
267 ));
268 }
269 for f in &self.measurement_types {
270 msr_fields.push(
271 f.to_field()
272 .with_nullable(true)
273 .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
274 );
275 }
276 for f in &self.measurement_types {
277 msr_fields.push(
278 f.to_field()
279 .with_nullable(true)
280 .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
281 );
282 }
283 for f in &self.measurement_types {
284 msr_fields.push(
285 f.to_field()
286 .with_nullable(true)
287 .with_name(format!("Real observation: {f:?} ({})", f.unit())),
288 );
289 }
290 for f in &self.measurement_types {
291 msr_fields.push(
292 f.to_field()
293 .with_nullable(true)
294 .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
295 );
296 }
297
298 msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
299 msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
300 msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
301
302 hdrs.append(&mut msr_fields);
303
304 if self.measurement_types.len() == MsrSize::DIM {
306 for i in 0..state_items.len() {
307 for f in &self.measurement_types {
308 hdrs.push(Field::new(
309 format!(
310 "Gain {}*{f:?} ({}*{})",
311 state_items[i],
312 cov_units[i],
313 f.unit()
314 ),
315 DataType::Float64,
316 true,
317 ));
318 }
319 }
320 } else {
321 for state_item in &state_items {
322 for j in 0..MsrSize::DIM {
323 hdrs.push(Field::new(
324 format!("Gain {state_item}*[{j}]"),
325 DataType::Float64,
326 true,
327 ));
328 }
329 }
330 }
331
332 for i in 0..state_items.len() {
334 hdrs.push(Field::new(
335 format!(
336 "Filter-smoother ratio {} ({})",
337 state_items[i], cov_units[i],
338 ),
339 DataType::Float64,
340 true,
341 ));
342 }
343
344 let schema = Arc::new(Schema::new(hdrs));
346 let mut record: Vec<Arc<dyn Array>> = Vec::new();
347
348 let (estimates, residuals) =
350 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
351 let start = cfg
353 .start_epoch
354 .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
355 let end = cfg
356 .end_epoch
357 .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
358
359 let mut residuals: Vec<Option<Residual<MsrSize>>> =
360 Vec::with_capacity(self.residuals.len());
361 let mut estimates = Vec::with_capacity(self.estimates.len());
362
363 for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
364 if estimate.epoch() >= start && estimate.epoch() <= end {
365 estimates.push(*estimate);
366 residuals.push(residual.clone());
367 }
368 }
369
370 (estimates, residuals)
371 } else {
372 (self.estimates.to_vec(), self.residuals.to_vec())
373 };
374
375 let mut utc_epoch = StringBuilder::new();
379 for s in &estimates {
380 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
381 }
382 record.push(Arc::new(utc_epoch.finish()));
383
384 for field in fields {
386 let mut data = Float64Builder::new();
387 for s in &estimates {
388 data.append_value(
389 s.state()
390 .value(field)
391 .context(ODStateSnafu { action: "export" })?,
392 );
393 }
394 record.push(Arc::new(data.finish()));
395 }
396
397 for field in sigma_fields {
399 let mut data = Float64Builder::new();
400 for s in &estimates {
401 if let StateParameter::Element(oe) = field {
402 data.append_value(s.sigma_for(oe).unwrap());
403 }
404 }
405 record.push(Arc::new(data.finish()));
406 }
407
408 for i in 0..est_size {
410 for j in i..est_size {
411 let mut data = Float64Builder::new();
412 for s in &estimates {
413 data.append_value(s.covar()[(i, j)]);
414 }
415 record.push(Arc::new(data.finish()));
416 }
417 }
418
419 for i in 0..est_size {
421 let mut data = Float64Builder::new();
422 for s in &estimates {
423 data.append_value(s.covar()[(i, i)].sqrt());
424 }
425 record.push(Arc::new(data.finish()));
426 }
427
428 let mut ric_covariances = Vec::new();
430
431 for s in &estimates {
432 let dcm_ric2inertial = s
433 .state()
434 .orbit()
435 .dcm_from_ric_to_inertial()
436 .unwrap()
437 .state_dcm();
438
439 let cov = s.covar();
441 let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
442
443 let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
445 ric_covariances.push(ric_covar);
446 }
447
448 for i in 0..6 {
450 let mut data = Float64Builder::new();
451 for cov in ric_covariances.iter().take(estimates.len()) {
452 data.append_value(cov[(i, i)].sqrt());
453 }
454 record.push(Arc::new(data.finish()));
455 }
456
457 for msr_type in &self.measurement_types {
460 let mut data = Float64Builder::new();
461 for resid_opt in &residuals {
462 if let Some(resid) = resid_opt {
463 match resid.prefit(*msr_type) {
464 Some(prefit) => data.append_value(prefit),
465 None => data.append_null(),
466 };
467 } else {
468 data.append_null();
469 }
470 }
471 record.push(Arc::new(data.finish()));
472 }
473
474 for j in 0..MsrSize::DIM {
476 let mut data = Float64Builder::new();
477 for resid_opt in &residuals {
478 if let Some(resid) = resid_opt {
479 data.append_value(resid.whitened_resid[j])
480 } else {
481 data.append_null();
482 }
483 }
484 record.push(Arc::new(data.finish()));
485 }
486
487 for msr_type in &self.measurement_types {
489 let mut data = Float64Builder::new();
490 for resid_opt in &residuals {
491 if let Some(resid) = resid_opt {
492 match resid.postfit(*msr_type) {
493 Some(postfit) => data.append_value(postfit),
494 None => data.append_null(),
495 };
496 } else {
497 data.append_null();
498 }
499 }
500 record.push(Arc::new(data.finish()));
501 }
502 for msr_type in &self.measurement_types {
504 let mut data = Float64Builder::new();
505 for resid_opt in &residuals {
506 if let Some(resid) = resid_opt {
507 match resid.trk_noise(*msr_type) {
508 Some(noise) => data.append_value(noise),
509 None => data.append_null(),
510 };
511 } else {
512 data.append_null();
513 }
514 }
515 record.push(Arc::new(data.finish()));
516 }
517 for msr_type in &self.measurement_types {
519 let mut data = Float64Builder::new();
520 for resid_opt in &residuals {
521 if let Some(resid) = resid_opt {
522 match resid.real_obs(*msr_type) {
523 Some(postfit) => data.append_value(postfit),
524 None => data.append_null(),
525 };
526 } else {
527 data.append_null();
528 }
529 }
530 record.push(Arc::new(data.finish()));
531 }
532 for msr_type in &self.measurement_types {
534 let mut data = Float64Builder::new();
535 for resid_opt in &residuals {
536 if let Some(resid) = resid_opt {
537 match resid.computed_obs(*msr_type) {
538 Some(postfit) => data.append_value(postfit),
539 None => data.append_null(),
540 };
541 } else {
542 data.append_null();
543 }
544 }
545 record.push(Arc::new(data.finish()));
546 }
547 let mut data = Float64Builder::new();
549 for resid_opt in &residuals {
550 if let Some(resid) = resid_opt {
551 data.append_value(resid.ratio);
552 } else {
553 data.append_null();
554 }
555 }
556 record.push(Arc::new(data.finish()));
557
558 let mut data = BooleanBuilder::new();
560 for resid_opt in &residuals {
561 if let Some(resid) = resid_opt {
562 data.append_value(resid.rejected);
563 } else {
564 data.append_null();
565 }
566 }
567 record.push(Arc::new(data.finish()));
568
569 let mut data = StringBuilder::new();
571 for resid_opt in &residuals {
572 if let Some(resid) = resid_opt {
573 data.append_value(
574 resid
575 .tracker
576 .clone()
577 .unwrap_or("Undefined tracker".to_string()),
578 );
579 } else {
580 data.append_null();
581 }
582 }
583 record.push(Arc::new(data.finish()));
584
585 for i in 0..est_size {
587 for j in 0..MsrSize::DIM {
588 let mut data = Float64Builder::new();
589 for opt_k in &self.gains {
590 if let Some(k) = opt_k {
591 data.append_value(k[(i, j)]);
592 } else {
593 data.append_null();
594 }
595 }
596 record.push(Arc::new(data.finish()));
597 }
598 }
599
600 for i in 0..est_size {
602 let mut data = Float64Builder::new();
603 for opt_fsr in &self.filter_smoother_ratios {
604 if let Some(fsr) = opt_fsr {
605 data.append_value(fsr[i]);
606 } else {
607 data.append_null();
608 }
609 }
610 record.push(Arc::new(data.finish()));
611 }
612
613 info!("Serialized {} estimates and residuals", estimates.len());
614
615 let mut metadata = HashMap::new();
617 metadata.insert(
618 "Purpose".to_string(),
619 "Orbit determination results".to_string(),
620 );
621 if let Some(add_meta) = cfg.metadata {
622 for (k, v) in add_meta {
623 metadata.insert(k, v);
624 }
625 }
626
627 let props = pq_writer(Some(metadata));
628
629 let file = File::create(&path_buf)
630 .context(StdIOSnafu {
631 action: "creating OD results file",
632 })
633 .context(ODIOSnafu)?;
634
635 let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
636 .context(ParquetSnafu {
637 action: "exporting OD results",
638 })
639 .context(ODIOSnafu)?;
640
641 let batch = RecordBatch::try_new(schema, record)
642 .context(ArrowSnafu {
643 action: "writing OD results (building batch record)",
644 })
645 .context(ODIOSnafu)?;
646
647 writer
648 .write(&batch)
649 .context(ParquetSnafu {
650 action: "writing OD results",
651 })
652 .context(ODIOSnafu)?;
653
654 writer
655 .close()
656 .context(ParquetSnafu {
657 action: "closing OD results file",
658 })
659 .context(ODIOSnafu)?;
660
661 let tock_time = Epoch::now().unwrap() - tick;
663 info!(
664 "Orbit determination results written to {} in {tock_time}",
665 path_buf.display()
666 );
667 Ok(path_buf)
668 }
669
670 pub fn to_ephemeris(&self, object_id: String) -> Ephemeris {
672 let mut ephem = Ephemeris::new(object_id);
673
674 for estimate in &self.estimates {
675 let covar = Covariance {
676 matrix: estimate.covar.fixed_view::<6, 6>(0, 0).into(),
677 local_frame: anise::ephemerides::ephemeris::LocalFrame::Inertial,
678 };
679 let rcrd = EphemerisRecord {
680 orbit: estimate.orbital_state(),
681 covar: Some(covar),
682 };
683 ephem.insert(rcrd);
684 }
685
686 ephem
687 }
688}