1use crate::io::watermark::pq_writer;
20use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
21use crate::linalg::allocator::Allocator;
22use crate::linalg::{DefaultAllocator, DimName};
23use crate::md::trajectory::Interpolatable;
24use crate::md::StateParameter;
25use crate::od::estimate::*;
26use crate::State;
27use crate::{od::*, Spacecraft};
28use anise::ephemerides::ephemeris::{Covariance, Ephemeris, EphemerisRecord};
29use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
30use arrow::datatypes::{DataType, Field, Schema};
31use arrow::record_batch::RecordBatch;
32use hifitime::TimeScale;
33use log::{info, warn};
34use msr::sensitivity::TrackerSensitivity;
35use nalgebra::Const;
36use parquet::arrow::ArrowWriter;
37use snafu::prelude::*;
38use std::collections::HashMap;
39use std::fs::File;
40use std::path::{Path, PathBuf};
41
42use super::ODSolution;
43
44impl<MsrSize: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
45 ODSolution<Spacecraft, KfEstimate<Spacecraft>, MsrSize, Trk>
46where
47 DefaultAllocator: Allocator<MsrSize>
48 + Allocator<MsrSize, <Spacecraft as State>::Size>
49 + Allocator<Const<1>, MsrSize>
50 + Allocator<<Spacecraft as State>::Size>
51 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
52 + Allocator<MsrSize, MsrSize>
53 + Allocator<MsrSize, <Spacecraft as State>::Size>
54 + Allocator<<Spacecraft as State>::Size, MsrSize>
55 + Allocator<<Spacecraft as State>::Size>
56 + Allocator<<Spacecraft as State>::VecLength>
57 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
58{
59 pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
61 ensure!(
62 !self.estimates.is_empty(),
63 TooFewMeasurementsSnafu {
64 need: 1_usize,
65 action: "exporting OD results"
66 }
67 );
68
69 if self.estimates.len() != self.residuals.len() {
70 return Err(ODError::ODConfigError {
71 source: ConfigError::InvalidConfig {
72 msg: format!(
73 "Estimates ({}) and residuals ({}) are not aligned.",
74 self.estimates.len(),
75 self.residuals.len()
76 ),
77 },
78 });
79 }
80
81 if self.estimates.len() != self.gains.len() {
82 return Err(ODError::ODConfigError {
83 source: ConfigError::InvalidConfig {
84 msg: format!(
85 "Estimates ({}) and filter gains ({}) are not aligned.",
86 self.estimates.len(),
87 self.gains.len()
88 ),
89 },
90 });
91 }
92
93 if self.estimates.len() != self.filter_smoother_ratios.len() {
94 return Err(ODError::ODConfigError {
95 source: ConfigError::InvalidConfig {
96 msg: format!(
97 "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
98 self.estimates.len(),
99 self.filter_smoother_ratios.len()
100 ),
101 },
102 });
103 }
104
105 let tick = Epoch::now().unwrap();
106 info!("Exporting orbit determination result to parquet file...");
107
108 if cfg.step.is_some() {
109 warn!("The `step` parameter in the export is not supported for orbit determination exports.");
110 }
111
112 let path_buf = cfg.actual_path(path);
114
115 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
117
118 let frame = self.estimates[0].state().frame();
119
120 let more_meta = Some(vec![
121 (
122 "Frame".to_string(),
123 serde_dhall::serialize(&frame)
124 .static_type_annotation()
125 .to_string()
126 .map_err(|e| ODError::ODIOError {
127 source: InputOutputError::SerializeDhall {
128 what: format!("frame `{frame}`"),
129 err: e.to_string(),
130 },
131 })?,
132 ),
133 (
134 "SRP Area (m2)".to_string(),
135 self.estimates
136 .first()
137 .as_ref()
138 .unwrap()
139 .nominal_state()
140 .srp
141 .area_m2
142 .to_string(),
143 ),
144 (
145 "Drag Area (m2)".to_string(),
146 self.estimates
147 .first()
148 .as_ref()
149 .unwrap()
150 .nominal_state()
151 .drag
152 .area_m2
153 .to_string(),
154 ),
155 ]);
156
157 let mut fields = match cfg.fields {
158 Some(fields) => fields,
159 None => Spacecraft::export_params(),
160 };
161
162 fields.retain(|param| match self.estimates[0].state().value(*param) {
164 Ok(_) => param != &StateParameter::GuidanceMode,
165 Err(_) => false,
166 });
167
168 for field in &fields {
169 hdrs.push(field.to_field(more_meta.clone()));
170 }
171
172 let mut sigma_fields = fields.clone();
173 sigma_fields.retain(|param| matches!(param, StateParameter::Element(_oe)));
175
176 for field in &sigma_fields {
177 hdrs.push(field.to_cov_field(more_meta.clone()));
178 }
179
180 let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
181 let state_units = [
182 "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
183 ];
184 let mut cov_units = vec![];
185
186 for i in 0..state_items.len() {
187 for j in i..state_items.len() {
188 let cov_unit = if i < 3 {
189 if j < 3 {
190 "km^2"
191 } else if (3..6).contains(&j) {
192 "km^2/s"
193 } else if j == 8 {
194 "km*kg"
195 } else {
196 "km"
197 }
198 } else if (3..6).contains(&i) {
199 if (3..6).contains(&j) {
200 "km^2/s^2"
201 } else if j == 8 {
202 "km/s*kg"
203 } else {
204 "km/s"
205 }
206 } else if i == 8 || j == 8 {
207 "kg^2"
208 } else {
209 "unitless"
210 };
211
212 cov_units.push(cov_unit);
213 }
214 }
215
216 let est_size = <Spacecraft as State>::Size::dim();
217
218 let mut idx = 0;
219 for i in 0..state_items.len() {
220 for j in i..state_items.len() {
221 hdrs.push(Field::new(
222 format!(
223 "Covariance {}*{} ({frame:x}) ({})",
224 state_items[i], state_items[j], cov_units[idx]
225 ),
226 DataType::Float64,
227 false,
228 ));
229 idx += 1;
230 }
231 }
232
233 for (i, coord) in state_items.iter().enumerate() {
235 hdrs.push(Field::new(
236 format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
237 DataType::Float64,
238 false,
239 ));
240 }
241
242 for (i, coord) in state_items.iter().enumerate().take(6) {
244 hdrs.push(Field::new(
245 format!("Sigma {coord} (RIC) ({})", state_units[i]),
246 DataType::Float64,
247 false,
248 ));
249 }
250
251 let mut msr_fields = Vec::new();
253 for f in &self.measurement_types {
254 msr_fields.push(
255 f.to_field()
256 .with_nullable(true)
257 .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
258 );
259 }
260 for f in &self.measurement_types {
261 msr_fields.push(
262 f.to_field()
263 .with_nullable(true)
264 .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
265 );
266 }
267 for f in &self.measurement_types {
268 msr_fields.push(
269 f.to_field()
270 .with_nullable(true)
271 .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
272 );
273 }
274 for f in &self.measurement_types {
275 msr_fields.push(
276 f.to_field()
277 .with_nullable(true)
278 .with_name(format!("Real observation: {f:?} ({})", f.unit())),
279 );
280 }
281 for f in &self.measurement_types {
282 msr_fields.push(
283 f.to_field()
284 .with_nullable(true)
285 .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
286 );
287 }
288
289 msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
290 msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
291 msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
292
293 hdrs.append(&mut msr_fields);
294
295 if self.measurement_types.len() == MsrSize::DIM {
297 for i in 0..state_items.len() {
298 for f in &self.measurement_types {
299 hdrs.push(Field::new(
300 format!(
301 "Gain {}*{f:?} ({}*{})",
302 state_items[i],
303 cov_units[i],
304 f.unit()
305 ),
306 DataType::Float64,
307 true,
308 ));
309 }
310 }
311 } else {
312 for state_item in &state_items {
313 for j in 0..MsrSize::DIM {
314 hdrs.push(Field::new(
315 format!("Gain {state_item}*[{j}]"),
316 DataType::Float64,
317 true,
318 ));
319 }
320 }
321 }
322
323 for i in 0..state_items.len() {
325 hdrs.push(Field::new(
326 format!(
327 "Filter-smoother ratio {} ({})",
328 state_items[i], cov_units[i],
329 ),
330 DataType::Float64,
331 true,
332 ));
333 }
334
335 let schema = Arc::new(Schema::new(hdrs));
337 let mut record: Vec<Arc<dyn Array>> = Vec::new();
338
339 let (estimates, residuals) =
341 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
342 let start = cfg
344 .start_epoch
345 .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
346 let end = cfg
347 .end_epoch
348 .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
349
350 let mut residuals: Vec<Option<Residual<MsrSize>>> =
351 Vec::with_capacity(self.residuals.len());
352 let mut estimates = Vec::with_capacity(self.estimates.len());
353
354 for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
355 if estimate.epoch() >= start && estimate.epoch() <= end {
356 estimates.push(*estimate);
357 residuals.push(residual.clone());
358 }
359 }
360
361 (estimates, residuals)
362 } else {
363 (self.estimates.to_vec(), self.residuals.to_vec())
364 };
365
366 let mut utc_epoch = StringBuilder::new();
370 for s in &estimates {
371 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
372 }
373 record.push(Arc::new(utc_epoch.finish()));
374
375 for field in fields {
377 let mut data = Float64Builder::new();
378 for s in &estimates {
379 data.append_value(
380 s.state()
381 .value(field)
382 .context(ODStateSnafu { action: "export" })?,
383 );
384 }
385 record.push(Arc::new(data.finish()));
386 }
387
388 for field in sigma_fields {
390 let mut data = Float64Builder::new();
391 for s in &estimates {
392 if let StateParameter::Element(oe) = field {
393 data.append_value(s.sigma_for(oe).unwrap());
394 }
395 }
396 record.push(Arc::new(data.finish()));
397 }
398
399 for i in 0..est_size {
401 for j in i..est_size {
402 let mut data = Float64Builder::new();
403 for s in &estimates {
404 data.append_value(s.covar()[(i, j)]);
405 }
406 record.push(Arc::new(data.finish()));
407 }
408 }
409
410 for i in 0..est_size {
412 let mut data = Float64Builder::new();
413 for s in &estimates {
414 data.append_value(s.covar()[(i, i)].sqrt());
415 }
416 record.push(Arc::new(data.finish()));
417 }
418
419 let mut ric_covariances = Vec::new();
421
422 for s in &estimates {
423 let dcm_ric2inertial = s
424 .state()
425 .orbit()
426 .dcm_from_ric_to_inertial()
427 .unwrap()
428 .state_dcm();
429
430 let cov = s.covar();
432 let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
433
434 let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
436 ric_covariances.push(ric_covar);
437 }
438
439 for i in 0..6 {
441 let mut data = Float64Builder::new();
442 for cov in ric_covariances.iter().take(estimates.len()) {
443 data.append_value(cov[(i, i)].sqrt());
444 }
445 record.push(Arc::new(data.finish()));
446 }
447
448 for msr_type in &self.measurement_types {
451 let mut data = Float64Builder::new();
452 for resid_opt in &residuals {
453 if let Some(resid) = resid_opt {
454 match resid.prefit(*msr_type) {
455 Some(prefit) => data.append_value(prefit),
456 None => data.append_null(),
457 };
458 } else {
459 data.append_null();
460 }
461 }
462 record.push(Arc::new(data.finish()));
463 }
464 for msr_type in &self.measurement_types {
466 let mut data = Float64Builder::new();
467 for resid_opt in &residuals {
468 if let Some(resid) = resid_opt {
469 match resid.postfit(*msr_type) {
470 Some(postfit) => data.append_value(postfit),
471 None => data.append_null(),
472 };
473 } else {
474 data.append_null();
475 }
476 }
477 record.push(Arc::new(data.finish()));
478 }
479 for msr_type in &self.measurement_types {
481 let mut data = Float64Builder::new();
482 for resid_opt in &residuals {
483 if let Some(resid) = resid_opt {
484 match resid.trk_noise(*msr_type) {
485 Some(noise) => data.append_value(noise),
486 None => data.append_null(),
487 };
488 } else {
489 data.append_null();
490 }
491 }
492 record.push(Arc::new(data.finish()));
493 }
494 for msr_type in &self.measurement_types {
496 let mut data = Float64Builder::new();
497 for resid_opt in &residuals {
498 if let Some(resid) = resid_opt {
499 match resid.real_obs(*msr_type) {
500 Some(postfit) => data.append_value(postfit),
501 None => data.append_null(),
502 };
503 } else {
504 data.append_null();
505 }
506 }
507 record.push(Arc::new(data.finish()));
508 }
509 for msr_type in &self.measurement_types {
511 let mut data = Float64Builder::new();
512 for resid_opt in &residuals {
513 if let Some(resid) = resid_opt {
514 match resid.computed_obs(*msr_type) {
515 Some(postfit) => data.append_value(postfit),
516 None => data.append_null(),
517 };
518 } else {
519 data.append_null();
520 }
521 }
522 record.push(Arc::new(data.finish()));
523 }
524 let mut data = Float64Builder::new();
526 for resid_opt in &residuals {
527 if let Some(resid) = resid_opt {
528 data.append_value(resid.ratio);
529 } else {
530 data.append_null();
531 }
532 }
533 record.push(Arc::new(data.finish()));
534
535 let mut data = BooleanBuilder::new();
537 for resid_opt in &residuals {
538 if let Some(resid) = resid_opt {
539 data.append_value(resid.rejected);
540 } else {
541 data.append_null();
542 }
543 }
544 record.push(Arc::new(data.finish()));
545
546 let mut data = StringBuilder::new();
548 for resid_opt in &residuals {
549 if let Some(resid) = resid_opt {
550 data.append_value(
551 resid
552 .tracker
553 .clone()
554 .unwrap_or("Undefined tracker".to_string()),
555 );
556 } else {
557 data.append_null();
558 }
559 }
560 record.push(Arc::new(data.finish()));
561
562 for i in 0..est_size {
564 for j in 0..MsrSize::DIM {
565 let mut data = Float64Builder::new();
566 for opt_k in &self.gains {
567 if let Some(k) = opt_k {
568 data.append_value(k[(i, j)]);
569 } else {
570 data.append_null();
571 }
572 }
573 record.push(Arc::new(data.finish()));
574 }
575 }
576
577 for i in 0..est_size {
579 let mut data = Float64Builder::new();
580 for opt_fsr in &self.filter_smoother_ratios {
581 if let Some(fsr) = opt_fsr {
582 data.append_value(fsr[i]);
583 } else {
584 data.append_null();
585 }
586 }
587 record.push(Arc::new(data.finish()));
588 }
589
590 info!("Serialized {} estimates and residuals", estimates.len());
591
592 let mut metadata = HashMap::new();
594 metadata.insert(
595 "Purpose".to_string(),
596 "Orbit determination results".to_string(),
597 );
598 if let Some(add_meta) = cfg.metadata {
599 for (k, v) in add_meta {
600 metadata.insert(k, v);
601 }
602 }
603
604 let props = pq_writer(Some(metadata));
605
606 let file = File::create(&path_buf)
607 .context(StdIOSnafu {
608 action: "creating OD results file",
609 })
610 .context(ODIOSnafu)?;
611
612 let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
613 .context(ParquetSnafu {
614 action: "exporting OD results",
615 })
616 .context(ODIOSnafu)?;
617
618 let batch = RecordBatch::try_new(schema, record)
619 .context(ArrowSnafu {
620 action: "writing OD results (building batch record)",
621 })
622 .context(ODIOSnafu)?;
623
624 writer
625 .write(&batch)
626 .context(ParquetSnafu {
627 action: "writing OD results",
628 })
629 .context(ODIOSnafu)?;
630
631 writer
632 .close()
633 .context(ParquetSnafu {
634 action: "closing OD results file",
635 })
636 .context(ODIOSnafu)?;
637
638 let tock_time = Epoch::now().unwrap() - tick;
640 info!(
641 "Orbit determination results written to {} in {tock_time}",
642 path_buf.display()
643 );
644 Ok(path_buf)
645 }
646
647 pub fn to_ephemeris(&self, object_id: String) -> Ephemeris {
649 let mut ephem = Ephemeris::new(object_id);
650
651 for estimate in &self.estimates {
652 let covar = Covariance {
653 matrix: estimate.covar.fixed_view::<6, 6>(0, 0).into(),
654 local_frame: anise::ephemerides::ephemeris::LocalFrame::Inertial,
655 };
656 let rcrd = EphemerisRecord {
657 orbit: estimate.orbital_state(),
658 covar: Some(covar),
659 };
660 ephem.insert(rcrd);
661 }
662
663 ephem
664 }
665}