1use crate::State;
20use crate::io::watermark::pq_writer;
21use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
22use crate::linalg::DefaultAllocator;
23use crate::linalg::allocator::Allocator;
24use crate::md::StateParameter;
25use crate::md::trajectory::Interpolatable;
26use crate::od::estimate::*;
27use crate::od::groundpnt::GroundAsset;
28use crate::od::interlink::InterlinkTxSpacecraft;
29use crate::od::process::ODSolution;
30use crate::{Spacecraft, od::*};
31use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
32use arrow::datatypes::{DataType, Field, Schema};
33use arrow::record_batch::RecordBatch;
34use hifitime::TimeScale;
35use log::{info, warn};
36use nalgebra::{Const, U2};
37use parquet::arrow::ArrowWriter;
38use snafu::prelude::*;
39use std::collections::HashMap;
40use std::fs::File;
41use std::path::{Path, PathBuf};
42
43impl ODSolution<GroundAsset, KfEstimate<GroundAsset>, U2, InterlinkTxSpacecraft>
44where
45 DefaultAllocator: Allocator<U2>
46 + Allocator<U2, <Spacecraft as State>::Size>
47 + Allocator<Const<1>, U2>
48 + Allocator<<Spacecraft as State>::Size>
49 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
50 + Allocator<U2, U2>
51 + Allocator<U2, <Spacecraft as State>::Size>
52 + Allocator<<Spacecraft as State>::Size, U2>
53 + Allocator<<Spacecraft as State>::Size>
54 + Allocator<<Spacecraft as State>::VecLength>
55 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
56{
57 pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
59 ensure!(
60 !self.estimates.is_empty(),
61 TooFewMeasurementsSnafu {
62 need: 1_usize,
63 action: "exporting PNT results"
64 }
65 );
66
67 if self.estimates.len() != self.residuals.len() {
68 return Err(ODError::ODConfigError {
69 source: ConfigError::InvalidConfig {
70 msg: format!(
71 "Estimates ({}) and residuals ({}) are not aligned.",
72 self.estimates.len(),
73 self.residuals.len()
74 ),
75 },
76 });
77 }
78
79 if self.estimates.len() != self.gains.len() {
80 return Err(ODError::ODConfigError {
81 source: ConfigError::InvalidConfig {
82 msg: format!(
83 "Estimates ({}) and filter gains ({}) are not aligned.",
84 self.estimates.len(),
85 self.gains.len()
86 ),
87 },
88 });
89 }
90
91 if self.estimates.len() != self.filter_smoother_ratios.len() {
92 return Err(ODError::ODConfigError {
93 source: ConfigError::InvalidConfig {
94 msg: format!(
95 "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
96 self.estimates.len(),
97 self.filter_smoother_ratios.len()
98 ),
99 },
100 });
101 }
102
103 let tick = Epoch::now().unwrap();
104 info!("Exporting orbit determination result to parquet file...");
105
106 if cfg.step.is_some() {
107 warn!(
108 "The `step` parameter in the export is not supported for orbit determination exports."
109 );
110 }
111
112 let path_buf = cfg.actual_path(path);
114
115 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
117
118 let frame = self.estimates[0].state().frame();
119
120 let more_meta = Some(vec![(
121 "Frame".to_string(),
122 serde_dhall::serialize(&frame)
123 .static_type_annotation()
124 .to_string()
125 .map_err(|e| ODError::ODIOError {
126 source: InputOutputError::SerializeDhall {
127 what: format!("frame `{frame}`"),
128 err: e.to_string(),
129 },
130 })?,
131 )]);
132
133 let mut fields = GroundAsset::export_params();
134
135 fields.retain(|param| match self.estimates[0].state().value(*param) {
137 Ok(_) => param != &StateParameter::GuidanceMode(),
138 Err(_) => false,
139 });
140
141 for field in &fields {
142 hdrs.push(field.to_field(more_meta.clone()));
143 }
144
145 let mut sigma_fields = fields.clone();
146 sigma_fields.retain(|param| matches!(param, StateParameter::Element(_oe)));
148
149 for field in &sigma_fields {
150 hdrs.push(field.to_cov_field(more_meta.clone()));
151 }
152
153 let state_items = [
155 "Latitude",
156 "Longitude",
157 "Altitude",
158 ];
162
163 let state_units = ["deg", "deg", "km"];
164 let mut cov_units = vec![];
165
166 for i in 0..state_items.len() {
167 for j in i..state_items.len() {
168 let cov_unit = format!("{}*{}", state_units[i], state_units[j]);
169
170 cov_units.push(cov_unit);
171 }
172 }
173
174 let mut idx = 0;
175 for i in 0..state_items.len() {
176 for j in i..state_items.len() {
177 hdrs.push(Field::new(
178 format!(
179 "Covariance {}*{} ({frame:x}) ({})",
180 state_items[i], state_items[j], cov_units[idx]
181 ),
182 DataType::Float64,
183 false,
184 ));
185 idx += 1;
186 }
187 }
188
189 let mut msr_fields = Vec::new();
191 for f in &self.measurement_types {
192 msr_fields.push(
193 f.to_field()
194 .with_nullable(true)
195 .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
196 );
197 }
198 for f in &self.measurement_types {
199 msr_fields.push(
200 f.to_field()
201 .with_nullable(true)
202 .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
203 );
204 }
205 for f in &self.measurement_types {
206 msr_fields.push(
207 f.to_field()
208 .with_nullable(true)
209 .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
210 );
211 }
212 for f in &self.measurement_types {
213 msr_fields.push(
214 f.to_field()
215 .with_nullable(true)
216 .with_name(format!("Real observation: {f:?} ({})", f.unit())),
217 );
218 }
219 for f in &self.measurement_types {
220 msr_fields.push(
221 f.to_field()
222 .with_nullable(true)
223 .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
224 );
225 }
226
227 msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
228 msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
229 msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
230
231 hdrs.append(&mut msr_fields);
232
233 for i in 0..state_items.len() {
235 for f in &self.measurement_types {
236 hdrs.push(Field::new(
237 format!(
238 "Gain {}*{f:?} ({}*{}*{})",
239 state_items[i],
240 state_units[i],
241 state_units[i],
242 f.unit()
243 ),
244 DataType::Float64,
245 true,
246 ));
247 }
248 }
249
250 for i in 0..state_items.len() {
252 hdrs.push(Field::new(
253 format!(
254 "Filter-smoother ratio {} ({}*{})",
255 state_items[i], state_units[i], state_units[i],
256 ),
257 DataType::Float64,
258 true,
259 ));
260 }
261
262 let schema = Arc::new(Schema::new(hdrs));
264 let mut record: Vec<Arc<dyn Array>> = Vec::new();
265
266 let (estimates, residuals) =
268 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
269 let start = cfg
271 .start_epoch
272 .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
273 let end = cfg
274 .end_epoch
275 .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
276
277 let mut residuals: Vec<Option<Residual<U2>>> =
278 Vec::with_capacity(self.residuals.len());
279 let mut estimates = Vec::with_capacity(self.estimates.len());
280
281 for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
282 if estimate.epoch() >= start && estimate.epoch() <= end {
283 estimates.push(*estimate);
284 residuals.push(residual.clone());
285 }
286 }
287
288 (estimates, residuals)
289 } else {
290 (self.estimates.to_vec(), self.residuals.to_vec())
291 };
292
293 let mut utc_epoch = StringBuilder::new();
297 for s in &estimates {
298 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
299 }
300 record.push(Arc::new(utc_epoch.finish()));
301
302 for field in fields {
304 let mut data = Float64Builder::new();
305 for s in &estimates {
306 data.append_value(
307 s.state()
308 .value(field)
309 .context(ODStateSnafu { action: "export" })?,
310 );
311 }
312 record.push(Arc::new(data.finish()));
313 }
314
315 for i in 0..state_items.len() {
317 let mut data = Float64Builder::new();
318 for s in &estimates {
319 data.append_value(s.covar()[(i, i)].sqrt());
320 }
321 record.push(Arc::new(data.finish()));
322 }
323
324 for i in 0..state_items.len() {
326 for j in i..state_items.len() {
327 let mut data = Float64Builder::new();
328 for s in &estimates {
329 data.append_value(s.covar()[(i, j)]);
330 }
331 record.push(Arc::new(data.finish()));
332 }
333 }
334
335 for msr_type in &self.measurement_types {
338 let mut data = Float64Builder::new();
339 for resid_opt in &residuals {
340 if let Some(resid) = resid_opt {
341 match resid.prefit(*msr_type) {
342 Some(prefit) => data.append_value(prefit),
343 None => data.append_null(),
344 };
345 } else {
346 data.append_null();
347 }
348 }
349 record.push(Arc::new(data.finish()));
350 }
351 for msr_type in &self.measurement_types {
353 let mut data = Float64Builder::new();
354 for resid_opt in &residuals {
355 if let Some(resid) = resid_opt {
356 match resid.postfit(*msr_type) {
357 Some(postfit) => data.append_value(postfit),
358 None => data.append_null(),
359 };
360 } else {
361 data.append_null();
362 }
363 }
364 record.push(Arc::new(data.finish()));
365 }
366
367 for msr_type in &self.measurement_types {
369 let mut data = Float64Builder::new();
370 for resid_opt in &residuals {
371 if let Some(resid) = resid_opt {
372 match resid.trk_noise(*msr_type) {
373 Some(noise) => data.append_value(noise),
374 None => data.append_null(),
375 };
376 } else {
377 data.append_null();
378 }
379 }
380 record.push(Arc::new(data.finish()));
381 }
382
383 for msr_type in &self.measurement_types {
385 let mut data = Float64Builder::new();
386 for resid_opt in &residuals {
387 if let Some(resid) = resid_opt {
388 match resid.real_obs(*msr_type) {
389 Some(postfit) => data.append_value(postfit),
390 None => data.append_null(),
391 };
392 } else {
393 data.append_null();
394 }
395 }
396 record.push(Arc::new(data.finish()));
397 }
398
399 for msr_type in &self.measurement_types {
401 let mut data = Float64Builder::new();
402 for resid_opt in &residuals {
403 if let Some(resid) = resid_opt {
404 match resid.computed_obs(*msr_type) {
405 Some(postfit) => data.append_value(postfit),
406 None => data.append_null(),
407 };
408 } else {
409 data.append_null();
410 }
411 }
412 record.push(Arc::new(data.finish()));
413 }
414
415 let mut data = Float64Builder::new();
417 for resid_opt in &residuals {
418 if let Some(resid) = resid_opt {
419 data.append_value(resid.ratio);
420 } else {
421 data.append_null();
422 }
423 }
424 record.push(Arc::new(data.finish()));
425
426 let mut data = BooleanBuilder::new();
428 for resid_opt in &residuals {
429 if let Some(resid) = resid_opt {
430 data.append_value(resid.rejected);
431 } else {
432 data.append_null();
433 }
434 }
435 record.push(Arc::new(data.finish()));
436
437 let mut data = StringBuilder::new();
439 for resid_opt in &residuals {
440 if let Some(resid) = resid_opt {
441 data.append_value(
442 resid
443 .tracker
444 .clone()
445 .unwrap_or("Undefined tracker".to_string()),
446 );
447 } else {
448 data.append_null();
449 }
450 }
451 record.push(Arc::new(data.finish()));
452
453 for i in 0..state_items.len() {
455 for j in 0..2 {
456 let mut data = Float64Builder::new();
457 for opt_k in &self.gains {
458 if let Some(k) = opt_k {
459 data.append_value(k[(i, j)]);
460 } else {
461 data.append_null();
462 }
463 }
464 record.push(Arc::new(data.finish()));
465 }
466 }
467
468 for i in 0..state_items.len() {
470 let mut data = Float64Builder::new();
471 for opt_fsr in &self.filter_smoother_ratios {
472 if let Some(fsr) = opt_fsr {
473 data.append_value(fsr[i]);
474 } else {
475 data.append_null();
476 }
477 }
478 record.push(Arc::new(data.finish()));
479 }
480
481 info!("Serialized {} estimates and residuals", estimates.len());
482
483 let mut metadata = HashMap::new();
485 metadata.insert("Purpose".to_string(), "PNT results".to_string());
486 if let Some(add_meta) = cfg.metadata {
487 for (k, v) in add_meta {
488 metadata.insert(k, v);
489 }
490 }
491
492 let props = pq_writer(Some(metadata));
493
494 let file = File::create(&path_buf)
495 .context(StdIOSnafu {
496 action: "creating PNT results file",
497 })
498 .context(ODIOSnafu)?;
499
500 let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
501 .context(ParquetSnafu {
502 action: "exporting PNT results",
503 })
504 .context(ODIOSnafu)?;
505
506 let batch = RecordBatch::try_new(schema, record)
507 .context(ArrowSnafu {
508 action: "writing PNT results (building batch record)",
509 })
510 .context(ODIOSnafu)?;
511
512 writer
513 .write(&batch)
514 .context(ParquetSnafu {
515 action: "writing PNT results",
516 })
517 .context(ODIOSnafu)?;
518
519 writer
520 .close()
521 .context(ParquetSnafu {
522 action: "closing PNT results file",
523 })
524 .context(ODIOSnafu)?;
525
526 let tock_time = Epoch::now().unwrap() - tick;
528 info!(
529 "PNT results written to {} in {tock_time}",
530 path_buf.display()
531 );
532 Ok(path_buf)
533 }
534}