1use crate::dynamics::SpacecraftDynamics;
20use crate::io::watermark::pq_writer;
21use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
22use crate::linalg::allocator::Allocator;
23use crate::linalg::{DefaultAllocator, DimName};
24use crate::md::trajectory::Interpolatable;
25use crate::md::StateParameter;
26use crate::od::estimate::*;
27use crate::State;
28use crate::{od::*, Spacecraft};
29use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
30use arrow::datatypes::{DataType, Field, Schema};
31use arrow::record_batch::RecordBatch;
32use filter::kalman::KF;
33use hifitime::TimeScale;
34use msr::sensitivity::TrackerSensitivity;
35use msr::TrackingDataArc;
36use nalgebra::Const;
37use parquet::arrow::ArrowWriter;
38use snafu::prelude::*;
39use std::collections::HashMap;
40use std::fs::File;
41use std::path::{Path, PathBuf};
42
43use super::ODProcess;
44
45impl<MsrSize: DimName, Accel: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
46 ODProcess<'_, SpacecraftDynamics, MsrSize, Accel, KF<Spacecraft, Accel>, Trk>
47where
48 DefaultAllocator: Allocator<MsrSize>
49 + Allocator<MsrSize, <Spacecraft as State>::Size>
50 + Allocator<Const<1>, MsrSize>
51 + Allocator<<Spacecraft as State>::Size>
52 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
53 + Allocator<MsrSize, MsrSize>
54 + Allocator<MsrSize, <Spacecraft as State>::Size>
55 + Allocator<<Spacecraft as State>::Size, MsrSize>
56 + Allocator<Accel>
57 + Allocator<Accel, Accel>
58 + Allocator<<Spacecraft as State>::Size>
59 + Allocator<<Spacecraft as State>::VecLength>
60 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
61 + Allocator<<Spacecraft as State>::Size, Accel>
62 + Allocator<Accel, <Spacecraft as State>::Size>,
63{
64 pub fn to_parquet<P: AsRef<Path>>(
66 &self,
67 arc: &TrackingDataArc,
68 path: P,
69 cfg: ExportCfg,
70 ) -> Result<PathBuf, ODError> {
71 ensure!(
72 !self.estimates.is_empty(),
73 TooFewMeasurementsSnafu {
74 need: 1_usize,
75 action: "exporting OD results"
76 }
77 );
78
79 if self.estimates.len() != self.residuals.len() {
80 return Err(ODError::ODConfigError {
81 source: ConfigError::InvalidConfig {
82 msg: "Estimates and residuals are not aligned.".to_string(),
83 },
84 });
85 }
86
87 let tick = Epoch::now().unwrap();
88 info!("Exporting orbit determination result to parquet file...");
89
90 if cfg.step.is_some() {
91 warn!("The `step` parameter in the export is not supported for orbit determination exports.");
92 }
93
94 let path_buf = cfg.actual_path(path);
96
97 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
99
100 let frame = self.estimates[0].state().frame();
101
102 let more_meta = Some(vec![(
103 "Frame".to_string(),
104 serde_dhall::serialize(&frame)
105 .to_string()
106 .map_err(|e| ODError::ODIOError {
107 source: InputOutputError::SerializeDhall {
108 what: format!("frame `{frame}`"),
109 err: e.to_string(),
110 },
111 })?,
112 )]);
113
114 let mut fields = match cfg.fields {
115 Some(fields) => fields,
116 None => Spacecraft::export_params(),
117 };
118
119 fields.retain(|param| match self.estimates[0].state().value(*param) {
121 Ok(_) => param != &StateParameter::GuidanceMode,
122 Err(_) => false,
123 });
124
125 for field in &fields {
126 hdrs.push(field.to_field(more_meta.clone()));
127 }
128
129 let mut sigma_fields = fields.clone();
130 sigma_fields.retain(|param| {
132 !matches!(
133 param,
134 &StateParameter::X
135 | &StateParameter::Y
136 | &StateParameter::Z
137 | &StateParameter::VX
138 | &StateParameter::VY
139 | &StateParameter::VZ
140 ) && self.estimates[0].sigma_for(*param).is_ok()
141 });
142
143 for field in &sigma_fields {
144 hdrs.push(field.to_cov_field(more_meta.clone()));
145 }
146
147 let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
148 let state_units = [
149 "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
150 ];
151 let mut cov_units = vec![];
152
153 for i in 0..state_items.len() {
154 for j in i..state_items.len() {
155 let cov_unit = if i < 3 {
156 if j < 3 {
157 "km^2"
158 } else if (3..6).contains(&j) {
159 "km^2/s"
160 } else if j == 8 {
161 "km*kg"
162 } else {
163 "km"
164 }
165 } else if (3..6).contains(&i) {
166 if (3..6).contains(&j) {
167 "km^2/s^2"
168 } else if j == 8 {
169 "km/s*kg"
170 } else {
171 "km/s"
172 }
173 } else if i == 8 || j == 8 {
174 "kg^2"
175 } else {
176 "unitless"
177 };
178
179 cov_units.push(cov_unit);
180 }
181 }
182
183 let est_size = <Spacecraft as State>::Size::dim();
184
185 let mut idx = 0;
186 for i in 0..state_items.len() {
187 for j in i..state_items.len() {
188 hdrs.push(Field::new(
189 format!(
190 "Covariance {}*{} ({frame:x}) ({})",
191 state_items[i], state_items[j], cov_units[idx]
192 ),
193 DataType::Float64,
194 false,
195 ));
196 idx += 1;
197 }
198 }
199
200 for (i, coord) in state_items.iter().enumerate() {
202 hdrs.push(Field::new(
203 format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
204 DataType::Float64,
205 false,
206 ));
207 }
208
209 for (i, coord) in state_items.iter().enumerate().take(6) {
211 hdrs.push(Field::new(
212 format!("Sigma {coord} (RIC) ({})", state_units[i]),
213 DataType::Float64,
214 false,
215 ));
216 }
217
218 let mut msr_fields = Vec::new();
220 for f in arc.unique_types() {
221 msr_fields.push(
222 f.to_field()
223 .with_nullable(true)
224 .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
225 );
226 }
227 for f in arc.unique_types() {
228 msr_fields.push(
229 f.to_field()
230 .with_nullable(true)
231 .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
232 );
233 }
234 for f in arc.unique_types() {
235 msr_fields.push(
236 f.to_field()
237 .with_nullable(true)
238 .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
239 );
240 }
241 for f in arc.unique_types() {
242 msr_fields.push(
243 f.to_field()
244 .with_nullable(true)
245 .with_name(format!("Real observation: {f:?} ({})", f.unit())),
246 );
247 }
248 for f in arc.unique_types() {
249 msr_fields.push(
250 f.to_field()
251 .with_nullable(true)
252 .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
253 );
254 }
255
256 msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
257 msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
258 msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
259
260 hdrs.append(&mut msr_fields);
261
262 let schema = Arc::new(Schema::new(hdrs));
264 let mut record: Vec<Arc<dyn Array>> = Vec::new();
265
266 let (estimates, residuals) =
268 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
269 let start = cfg
271 .start_epoch
272 .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
273 let end = cfg
274 .end_epoch
275 .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
276
277 let mut residuals: Vec<Option<Residual<MsrSize>>> =
278 Vec::with_capacity(self.residuals.len());
279 let mut estimates = Vec::with_capacity(self.estimates.len());
280
281 for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
282 if estimate.epoch() >= start && estimate.epoch() <= end {
283 estimates.push(*estimate);
284 residuals.push(residual.clone());
285 }
286 }
287
288 (estimates, residuals)
289 } else {
290 (self.estimates.to_vec(), self.residuals.to_vec())
291 };
292
293 let mut utc_epoch = StringBuilder::new();
297 for s in &estimates {
298 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
299 }
300 record.push(Arc::new(utc_epoch.finish()));
301
302 for field in fields {
304 let mut data = Float64Builder::new();
305 for s in &estimates {
306 data.append_value(
307 s.state()
308 .value(field)
309 .context(ODStateSnafu { action: "export" })?,
310 );
311 }
312 record.push(Arc::new(data.finish()));
313 }
314
315 for field in sigma_fields {
317 let mut data = Float64Builder::new();
318 for s in &estimates {
319 data.append_value(s.sigma_for(field).unwrap());
320 }
321 record.push(Arc::new(data.finish()));
322 }
323
324 for i in 0..est_size {
326 for j in i..est_size {
327 let mut data = Float64Builder::new();
328 for s in &estimates {
329 data.append_value(s.covar()[(i, j)]);
330 }
331 record.push(Arc::new(data.finish()));
332 }
333 }
334
335 for i in 0..est_size {
337 let mut data = Float64Builder::new();
338 for s in &estimates {
339 data.append_value(s.covar()[(i, i)].sqrt());
340 }
341 record.push(Arc::new(data.finish()));
342 }
343
344 let mut ric_covariances = Vec::new();
346
347 for s in &estimates {
348 let dcm_ric2inertial = s
349 .state()
350 .orbit()
351 .dcm_from_ric_to_inertial()
352 .unwrap()
353 .state_dcm();
354
355 let cov = s.covar();
357 let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
358
359 let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
361 ric_covariances.push(ric_covar);
362 }
363
364 for i in 0..6 {
366 let mut data = Float64Builder::new();
367 for cov in ric_covariances.iter().take(estimates.len()) {
368 data.append_value(cov[(i, i)].sqrt());
369 }
370 record.push(Arc::new(data.finish()));
371 }
372
373 for msr_type in arc.unique_types() {
376 let mut data = Float64Builder::new();
377 for resid_opt in &residuals {
378 if let Some(resid) = resid_opt {
379 match resid.prefit(msr_type) {
380 Some(prefit) => data.append_value(prefit),
381 None => data.append_null(),
382 };
383 } else {
384 data.append_null();
385 }
386 }
387 record.push(Arc::new(data.finish()));
388 }
389 for msr_type in arc.unique_types() {
391 let mut data = Float64Builder::new();
392 for resid_opt in &residuals {
393 if let Some(resid) = resid_opt {
394 match resid.postfit(msr_type) {
395 Some(postfit) => data.append_value(postfit),
396 None => data.append_null(),
397 };
398 } else {
399 data.append_null();
400 }
401 }
402 record.push(Arc::new(data.finish()));
403 }
404 for msr_type in arc.unique_types() {
406 let mut data = Float64Builder::new();
407 for resid_opt in &residuals {
408 if let Some(resid) = resid_opt {
409 match resid.trk_noise(msr_type) {
410 Some(noise) => data.append_value(noise),
411 None => data.append_null(),
412 };
413 } else {
414 data.append_null();
415 }
416 }
417 record.push(Arc::new(data.finish()));
418 }
419 for msr_type in arc.unique_types() {
421 let mut data = Float64Builder::new();
422 for resid_opt in &residuals {
423 if let Some(resid) = resid_opt {
424 match resid.real_obs(msr_type) {
425 Some(postfit) => data.append_value(postfit),
426 None => data.append_null(),
427 };
428 } else {
429 data.append_null();
430 }
431 }
432 record.push(Arc::new(data.finish()));
433 }
434 for msr_type in arc.unique_types() {
436 let mut data = Float64Builder::new();
437 for resid_opt in &residuals {
438 if let Some(resid) = resid_opt {
439 match resid.computed_obs(msr_type) {
440 Some(postfit) => data.append_value(postfit),
441 None => data.append_null(),
442 };
443 } else {
444 data.append_null();
445 }
446 }
447 record.push(Arc::new(data.finish()));
448 }
449 let mut data = Float64Builder::new();
451 for resid_opt in &residuals {
452 if let Some(resid) = resid_opt {
453 data.append_value(resid.ratio);
454 } else {
455 data.append_null();
456 }
457 }
458 record.push(Arc::new(data.finish()));
459
460 let mut data = BooleanBuilder::new();
462 for resid_opt in &residuals {
463 if let Some(resid) = resid_opt {
464 data.append_value(resid.rejected);
465 } else {
466 data.append_null();
467 }
468 }
469 record.push(Arc::new(data.finish()));
470
471 let mut data = StringBuilder::new();
473 for resid_opt in &residuals {
474 if let Some(resid) = resid_opt {
475 data.append_value(
476 resid
477 .tracker
478 .clone()
479 .unwrap_or("Undefined tracker".to_string()),
480 );
481 } else {
482 data.append_null();
483 }
484 }
485 record.push(Arc::new(data.finish()));
486
487 info!("Serialized {} estimates and residuals", estimates.len());
488
489 let mut metadata = HashMap::new();
491 metadata.insert(
492 "Purpose".to_string(),
493 "Orbit determination results".to_string(),
494 );
495 if let Some(add_meta) = cfg.metadata {
496 for (k, v) in add_meta {
497 metadata.insert(k, v);
498 }
499 }
500
501 let props = pq_writer(Some(metadata));
502
503 let file = File::create(&path_buf)
504 .context(StdIOSnafu {
505 action: "creating OD results file",
506 })
507 .context(ODIOSnafu)?;
508
509 let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
510 .context(ParquetSnafu {
511 action: "exporting OD results",
512 })
513 .context(ODIOSnafu)?;
514
515 let batch = RecordBatch::try_new(schema, record)
516 .context(ArrowSnafu {
517 action: "writing OD results (building batch record)",
518 })
519 .context(ODIOSnafu)?;
520
521 writer
522 .write(&batch)
523 .context(ParquetSnafu {
524 action: "writing OD results",
525 })
526 .context(ODIOSnafu)?;
527
528 writer
529 .close()
530 .context(ParquetSnafu {
531 action: "closing OD results file",
532 })
533 .context(ODIOSnafu)?;
534
535 let tock_time = Epoch::now().unwrap() - tick;
537 info!(
538 "Orbit determination results written to {} in {tock_time}",
539 path_buf.display()
540 );
541 Ok(path_buf)
542 }
543}