1use crate::dynamics::SpacecraftDynamics;
20use crate::io::watermark::pq_writer;
21use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
22use crate::linalg::allocator::Allocator;
23use crate::linalg::{DefaultAllocator, DimName};
24use crate::md::trajectory::Interpolatable;
25use crate::md::StateParameter;
26use crate::od::estimate::*;
27use crate::State;
28use crate::{od::*, Spacecraft};
29use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
30use arrow::datatypes::{DataType, Field, Schema};
31use arrow::record_batch::RecordBatch;
32use filter::kalman::KF;
33use hifitime::TimeScale;
34use msr::sensitivity::TrackerSensitivity;
35use msr::TrackingDataArc;
36use nalgebra::Const;
37use parquet::arrow::ArrowWriter;
38use snafu::prelude::*;
39use std::collections::HashMap;
40use std::fs::File;
41use std::path::{Path, PathBuf};
42
43use super::ODProcess;
44
45impl<MsrSize: DimName, Accel: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
46 ODProcess<'_, SpacecraftDynamics, MsrSize, Accel, KF<Spacecraft, Accel, MsrSize>, Trk>
47where
48 DefaultAllocator: Allocator<MsrSize>
49 + Allocator<MsrSize, <Spacecraft as State>::Size>
50 + Allocator<Const<1>, MsrSize>
51 + Allocator<<Spacecraft as State>::Size>
52 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
53 + Allocator<MsrSize, MsrSize>
54 + Allocator<MsrSize, <Spacecraft as State>::Size>
55 + Allocator<<Spacecraft as State>::Size, MsrSize>
56 + Allocator<Accel>
57 + Allocator<Accel, Accel>
58 + Allocator<<Spacecraft as State>::Size>
59 + Allocator<<Spacecraft as State>::VecLength>
60 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
61 + Allocator<<Spacecraft as State>::Size, Accel>
62 + Allocator<Accel, <Spacecraft as State>::Size>,
63{
64 pub fn to_parquet<P: AsRef<Path>>(
66 &self,
67 arc: &TrackingDataArc,
68 path: P,
69 cfg: ExportCfg,
70 ) -> Result<PathBuf, ODError> {
71 ensure!(
72 !self.estimates.is_empty(),
73 TooFewMeasurementsSnafu {
74 need: 1_usize,
75 action: "exporting OD results"
76 }
77 );
78
79 if self.estimates.len() != self.residuals.len() {
80 return Err(ODError::ODConfigError {
81 source: ConfigError::InvalidConfig {
82 msg: "Estimates and residuals are not aligned.".to_string(),
83 },
84 });
85 }
86
87 let tick = Epoch::now().unwrap();
88 info!("Exporting orbit determination result to parquet file...");
89
90 if cfg.step.is_some() {
91 warn!("The `step` parameter in the export is not supported for orbit determination exports.");
92 }
93
94 let path_buf = cfg.actual_path(path);
96
97 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
99
100 let frame = self.estimates[0].state().frame();
101
102 let more_meta = Some(vec![(
103 "Frame".to_string(),
104 serde_dhall::serialize(&frame)
105 .to_string()
106 .map_err(|e| ODError::ODIOError {
107 source: InputOutputError::SerializeDhall {
108 what: format!("frame `{frame}`"),
109 err: e.to_string(),
110 },
111 })?,
112 )]);
113
114 let mut fields = match cfg.fields {
115 Some(fields) => fields,
116 None => Spacecraft::export_params(),
117 };
118
119 fields.retain(|param| match self.estimates[0].state().value(*param) {
121 Ok(_) => param != &StateParameter::GuidanceMode,
122 Err(_) => false,
123 });
124
125 for field in &fields {
126 hdrs.push(field.to_field(more_meta.clone()));
127 }
128
129 let mut sigma_fields = fields.clone();
130 sigma_fields.retain(|param| {
132 !matches!(
133 param,
134 &StateParameter::X
135 | &StateParameter::Y
136 | &StateParameter::Z
137 | &StateParameter::VX
138 | &StateParameter::VY
139 | &StateParameter::VZ
140 ) && self.estimates[0].sigma_for(*param).is_ok()
141 });
142
143 for field in &sigma_fields {
144 hdrs.push(field.to_cov_field(more_meta.clone()));
145 }
146
147 let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
148 let state_units = [
149 "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
150 ];
151 let mut cov_units = vec![];
152
153 for i in 0..state_items.len() {
154 for j in i..state_items.len() {
155 let cov_unit = if i < 3 {
156 if j < 3 {
157 "km^2"
158 } else if (3..6).contains(&j) {
159 "km^2/s"
160 } else if j == 8 {
161 "km*kg"
162 } else {
163 "km"
164 }
165 } else if (3..6).contains(&i) {
166 if (3..6).contains(&j) {
167 "km^2/s^2"
168 } else if j == 8 {
169 "km/s*kg"
170 } else {
171 "km/s"
172 }
173 } else if i == 8 || j == 8 {
174 "kg^2"
175 } else {
176 "unitless"
177 };
178
179 cov_units.push(cov_unit);
180 }
181 }
182
183 let est_size = <Spacecraft as State>::Size::dim();
184
185 let mut idx = 0;
186 for i in 0..state_items.len() {
187 for j in i..state_items.len() {
188 hdrs.push(Field::new(
189 format!(
190 "Covariance {}*{} ({frame:x}) ({})",
191 state_items[i], state_items[j], cov_units[idx]
192 ),
193 DataType::Float64,
194 false,
195 ));
196 idx += 1;
197 }
198 }
199
200 for (i, coord) in state_items.iter().enumerate() {
202 hdrs.push(Field::new(
203 format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
204 DataType::Float64,
205 false,
206 ));
207 }
208
209 for (i, coord) in state_items.iter().enumerate().take(6) {
211 hdrs.push(Field::new(
212 format!("Sigma {coord} (RIC) ({})", state_units[i]),
213 DataType::Float64,
214 false,
215 ));
216 }
217
218 let mut msr_fields = Vec::new();
220 for f in arc.unique_types() {
221 msr_fields.push(
222 f.to_field()
223 .with_nullable(true)
224 .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
225 );
226 }
227 for f in arc.unique_types() {
228 msr_fields.push(
229 f.to_field()
230 .with_nullable(true)
231 .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
232 );
233 }
234 for f in arc.unique_types() {
235 msr_fields.push(
236 f.to_field()
237 .with_nullable(true)
238 .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
239 );
240 }
241
242 msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
243 msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
244 msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
245
246 hdrs.append(&mut msr_fields);
247
248 let schema = Arc::new(Schema::new(hdrs));
250 let mut record: Vec<Arc<dyn Array>> = Vec::new();
251
252 let (estimates, residuals) =
254 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
255 let start = cfg
257 .start_epoch
258 .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
259 let end = cfg
260 .end_epoch
261 .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
262
263 let mut residuals: Vec<Option<Residual<MsrSize>>> =
264 Vec::with_capacity(self.residuals.len());
265 let mut estimates = Vec::with_capacity(self.estimates.len());
266
267 for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
268 if estimate.epoch() >= start && estimate.epoch() <= end {
269 estimates.push(*estimate);
270 residuals.push(residual.clone());
271 }
272 }
273
274 (estimates, residuals)
275 } else {
276 (self.estimates.to_vec(), self.residuals.to_vec())
277 };
278
279 let mut utc_epoch = StringBuilder::new();
283 for s in &estimates {
284 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
285 }
286 record.push(Arc::new(utc_epoch.finish()));
287
288 for field in fields {
290 let mut data = Float64Builder::new();
291 for s in &estimates {
292 data.append_value(s.state().value(field).unwrap());
293 }
294 record.push(Arc::new(data.finish()));
295 }
296
297 for field in sigma_fields {
299 let mut data = Float64Builder::new();
300 for s in &estimates {
301 data.append_value(s.sigma_for(field).unwrap());
302 }
303 record.push(Arc::new(data.finish()));
304 }
305
306 for i in 0..est_size {
308 for j in i..est_size {
309 let mut data = Float64Builder::new();
310 for s in &estimates {
311 data.append_value(s.covar()[(i, j)]);
312 }
313 record.push(Arc::new(data.finish()));
314 }
315 }
316
317 for i in 0..est_size {
319 let mut data = Float64Builder::new();
320 for s in &estimates {
321 data.append_value(s.covar()[(i, i)].sqrt());
322 }
323 record.push(Arc::new(data.finish()));
324 }
325
326 let mut ric_covariances = Vec::new();
328
329 for s in &estimates {
330 let dcm_ric2inertial = s
331 .state()
332 .orbit()
333 .dcm_from_ric_to_inertial()
334 .unwrap()
335 .state_dcm();
336
337 let cov = s.covar();
339 let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
340
341 let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
343 ric_covariances.push(ric_covar);
344 }
345
346 for i in 0..6 {
348 let mut data = Float64Builder::new();
349 for cov in ric_covariances.iter().take(estimates.len()) {
350 data.append_value(cov[(i, i)].sqrt());
351 }
352 record.push(Arc::new(data.finish()));
353 }
354
355 for msr_type in arc.unique_types() {
358 let mut data = Float64Builder::new();
359 for resid_opt in &residuals {
360 if let Some(resid) = resid_opt {
361 match resid.prefit(msr_type) {
362 Some(prefit) => data.append_value(prefit),
363 None => data.append_null(),
364 };
365 } else {
366 data.append_null();
367 }
368 }
369 record.push(Arc::new(data.finish()));
370 }
371 for msr_type in arc.unique_types() {
373 let mut data = Float64Builder::new();
374 for resid_opt in &residuals {
375 if let Some(resid) = resid_opt {
376 match resid.postfit(msr_type) {
377 Some(postfit) => data.append_value(postfit),
378 None => data.append_null(),
379 };
380 } else {
381 data.append_null();
382 }
383 }
384 record.push(Arc::new(data.finish()));
385 }
386 for msr_type in arc.unique_types() {
388 let mut data = Float64Builder::new();
389 for resid_opt in &residuals {
390 if let Some(resid) = resid_opt {
391 match resid.trk_noise(msr_type) {
392 Some(noise) => data.append_value(noise),
393 None => data.append_null(),
394 };
395 } else {
396 data.append_null();
397 }
398 }
399 record.push(Arc::new(data.finish()));
400 }
401 let mut data = Float64Builder::new();
403 for resid_opt in &residuals {
404 if let Some(resid) = resid_opt {
405 data.append_value(resid.ratio);
406 } else {
407 data.append_null();
408 }
409 }
410 record.push(Arc::new(data.finish()));
411
412 let mut data = BooleanBuilder::new();
414 for resid_opt in &residuals {
415 if let Some(resid) = resid_opt {
416 data.append_value(resid.rejected);
417 } else {
418 data.append_null();
419 }
420 }
421 record.push(Arc::new(data.finish()));
422
423 let mut data = StringBuilder::new();
425 for resid_opt in &residuals {
426 if let Some(resid) = resid_opt {
427 data.append_value(
428 resid
429 .tracker
430 .clone()
431 .unwrap_or("Undefined tracker".to_string()),
432 );
433 } else {
434 data.append_null();
435 }
436 }
437 record.push(Arc::new(data.finish()));
438
439 info!("Serialized {} estimates and residuals", estimates.len());
440
441 let mut metadata = HashMap::new();
443 metadata.insert(
444 "Purpose".to_string(),
445 "Orbit determination results".to_string(),
446 );
447 if let Some(add_meta) = cfg.metadata {
448 for (k, v) in add_meta {
449 metadata.insert(k, v);
450 }
451 }
452
453 let props = pq_writer(Some(metadata));
454
455 let file = File::create(&path_buf)
456 .context(StdIOSnafu {
457 action: "creating OD results file",
458 })
459 .context(ODIOSnafu)?;
460
461 let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
462 .context(ParquetSnafu {
463 action: "exporting OD results",
464 })
465 .context(ODIOSnafu)?;
466
467 let batch = RecordBatch::try_new(schema, record)
468 .context(ArrowSnafu {
469 action: "writing OD results (building batch record)",
470 })
471 .context(ODIOSnafu)?;
472
473 writer
474 .write(&batch)
475 .context(ParquetSnafu {
476 action: "writing OD results",
477 })
478 .context(ODIOSnafu)?;
479
480 writer
481 .close()
482 .context(ParquetSnafu {
483 action: "closing OD results file",
484 })
485 .context(ODIOSnafu)?;
486
487 let tock_time = Epoch::now().unwrap() - tick;
489 info!(
490 "Orbit determination results written to {} in {tock_time}",
491 path_buf.display()
492 );
493 Ok(path_buf)
494 }
495}