1use crate::io::watermark::pq_writer;
20use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
21use crate::linalg::allocator::Allocator;
22use crate::linalg::DefaultAllocator;
23use crate::md::trajectory::Interpolatable;
24use crate::md::StateParameter;
25use crate::od::estimate::*;
26use crate::od::groundpnt::GroundAsset;
27use crate::od::interlink::InterlinkTxSpacecraft;
28use crate::od::process::ODSolution;
29use crate::State;
30use crate::{od::*, Spacecraft};
31use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
32use arrow::datatypes::{DataType, Field, Schema};
33use arrow::record_batch::RecordBatch;
34use hifitime::TimeScale;
35use log::{info, warn};
36use nalgebra::{Const, U2};
37use parquet::arrow::ArrowWriter;
38use snafu::prelude::*;
39use std::collections::HashMap;
40use std::fs::File;
41use std::path::{Path, PathBuf};
42
43impl ODSolution<GroundAsset, KfEstimate<GroundAsset>, U2, InterlinkTxSpacecraft>
44where
45 DefaultAllocator: Allocator<U2>
46 + Allocator<U2, <Spacecraft as State>::Size>
47 + Allocator<Const<1>, U2>
48 + Allocator<<Spacecraft as State>::Size>
49 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
50 + Allocator<U2, U2>
51 + Allocator<U2, <Spacecraft as State>::Size>
52 + Allocator<<Spacecraft as State>::Size, U2>
53 + Allocator<<Spacecraft as State>::Size>
54 + Allocator<<Spacecraft as State>::VecLength>
55 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
56{
57 pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
59 ensure!(
60 !self.estimates.is_empty(),
61 TooFewMeasurementsSnafu {
62 need: 1_usize,
63 action: "exporting PNT results"
64 }
65 );
66
67 if self.estimates.len() != self.residuals.len() {
68 return Err(ODError::ODConfigError {
69 source: ConfigError::InvalidConfig {
70 msg: format!(
71 "Estimates ({}) and residuals ({}) are not aligned.",
72 self.estimates.len(),
73 self.residuals.len()
74 ),
75 },
76 });
77 }
78
79 if self.estimates.len() != self.gains.len() {
80 return Err(ODError::ODConfigError {
81 source: ConfigError::InvalidConfig {
82 msg: format!(
83 "Estimates ({}) and filter gains ({}) are not aligned.",
84 self.estimates.len(),
85 self.gains.len()
86 ),
87 },
88 });
89 }
90
91 if self.estimates.len() != self.filter_smoother_ratios.len() {
92 return Err(ODError::ODConfigError {
93 source: ConfigError::InvalidConfig {
94 msg: format!(
95 "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
96 self.estimates.len(),
97 self.filter_smoother_ratios.len()
98 ),
99 },
100 });
101 }
102
103 let tick = Epoch::now().unwrap();
104 info!("Exporting orbit determination result to parquet file...");
105
106 if cfg.step.is_some() {
107 warn!("The `step` parameter in the export is not supported for orbit determination exports.");
108 }
109
110 let path_buf = cfg.actual_path(path);
112
113 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
115
116 let frame = self.estimates[0].state().frame();
117
118 let more_meta = Some(vec![(
119 "Frame".to_string(),
120 serde_dhall::serialize(&frame)
121 .static_type_annotation()
122 .to_string()
123 .map_err(|e| ODError::ODIOError {
124 source: InputOutputError::SerializeDhall {
125 what: format!("frame `{frame}`"),
126 err: e.to_string(),
127 },
128 })?,
129 )]);
130
131 let mut fields = GroundAsset::export_params();
132
133 fields.retain(|param| match self.estimates[0].state().value(*param) {
135 Ok(_) => param != &StateParameter::GuidanceMode(),
136 Err(_) => false,
137 });
138
139 for field in &fields {
140 hdrs.push(field.to_field(more_meta.clone()));
141 }
142
143 let mut sigma_fields = fields.clone();
144 sigma_fields.retain(|param| matches!(param, StateParameter::Element(_oe)));
146
147 for field in &sigma_fields {
148 hdrs.push(field.to_cov_field(more_meta.clone()));
149 }
150
151 let state_items = [
153 "Latitude",
154 "Longitude",
155 "Altitude",
156 ];
160
161 let state_units = ["deg", "deg", "km"];
162 let mut cov_units = vec![];
163
164 for i in 0..state_items.len() {
165 for j in i..state_items.len() {
166 let cov_unit = format!("{}*{}", state_units[i], state_units[j]);
167
168 cov_units.push(cov_unit);
169 }
170 }
171
172 let mut idx = 0;
173 for i in 0..state_items.len() {
174 for j in i..state_items.len() {
175 hdrs.push(Field::new(
176 format!(
177 "Covariance {}*{} ({frame:x}) ({})",
178 state_items[i], state_items[j], cov_units[idx]
179 ),
180 DataType::Float64,
181 false,
182 ));
183 idx += 1;
184 }
185 }
186
187 let mut msr_fields = Vec::new();
189 for f in &self.measurement_types {
190 msr_fields.push(
191 f.to_field()
192 .with_nullable(true)
193 .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
194 );
195 }
196 for f in &self.measurement_types {
197 msr_fields.push(
198 f.to_field()
199 .with_nullable(true)
200 .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
201 );
202 }
203 for f in &self.measurement_types {
204 msr_fields.push(
205 f.to_field()
206 .with_nullable(true)
207 .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
208 );
209 }
210 for f in &self.measurement_types {
211 msr_fields.push(
212 f.to_field()
213 .with_nullable(true)
214 .with_name(format!("Real observation: {f:?} ({})", f.unit())),
215 );
216 }
217 for f in &self.measurement_types {
218 msr_fields.push(
219 f.to_field()
220 .with_nullable(true)
221 .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
222 );
223 }
224
225 msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
226 msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
227 msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
228
229 hdrs.append(&mut msr_fields);
230
231 for i in 0..state_items.len() {
233 for f in &self.measurement_types {
234 hdrs.push(Field::new(
235 format!(
236 "Gain {}*{f:?} ({}*{}*{})",
237 state_items[i],
238 state_units[i],
239 state_units[i],
240 f.unit()
241 ),
242 DataType::Float64,
243 true,
244 ));
245 }
246 }
247
248 for i in 0..state_items.len() {
250 hdrs.push(Field::new(
251 format!(
252 "Filter-smoother ratio {} ({}*{})",
253 state_items[i], state_units[i], state_units[i],
254 ),
255 DataType::Float64,
256 true,
257 ));
258 }
259
260 let schema = Arc::new(Schema::new(hdrs));
262 let mut record: Vec<Arc<dyn Array>> = Vec::new();
263
264 let (estimates, residuals) =
266 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
267 let start = cfg
269 .start_epoch
270 .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
271 let end = cfg
272 .end_epoch
273 .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
274
275 let mut residuals: Vec<Option<Residual<U2>>> =
276 Vec::with_capacity(self.residuals.len());
277 let mut estimates = Vec::with_capacity(self.estimates.len());
278
279 for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
280 if estimate.epoch() >= start && estimate.epoch() <= end {
281 estimates.push(*estimate);
282 residuals.push(residual.clone());
283 }
284 }
285
286 (estimates, residuals)
287 } else {
288 (self.estimates.to_vec(), self.residuals.to_vec())
289 };
290
291 let mut utc_epoch = StringBuilder::new();
295 for s in &estimates {
296 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
297 }
298 record.push(Arc::new(utc_epoch.finish()));
299
300 for field in fields {
302 let mut data = Float64Builder::new();
303 for s in &estimates {
304 data.append_value(
305 s.state()
306 .value(field)
307 .context(ODStateSnafu { action: "export" })?,
308 );
309 }
310 record.push(Arc::new(data.finish()));
311 }
312
313 for i in 0..state_items.len() {
315 let mut data = Float64Builder::new();
316 for s in &estimates {
317 data.append_value(s.covar()[(i, i)].sqrt());
318 }
319 record.push(Arc::new(data.finish()));
320 }
321
322 for i in 0..state_items.len() {
324 for j in i..state_items.len() {
325 let mut data = Float64Builder::new();
326 for s in &estimates {
327 data.append_value(s.covar()[(i, j)]);
328 }
329 record.push(Arc::new(data.finish()));
330 }
331 }
332
333 for msr_type in &self.measurement_types {
336 let mut data = Float64Builder::new();
337 for resid_opt in &residuals {
338 if let Some(resid) = resid_opt {
339 match resid.prefit(*msr_type) {
340 Some(prefit) => data.append_value(prefit),
341 None => data.append_null(),
342 };
343 } else {
344 data.append_null();
345 }
346 }
347 record.push(Arc::new(data.finish()));
348 }
349 for msr_type in &self.measurement_types {
351 let mut data = Float64Builder::new();
352 for resid_opt in &residuals {
353 if let Some(resid) = resid_opt {
354 match resid.postfit(*msr_type) {
355 Some(postfit) => data.append_value(postfit),
356 None => data.append_null(),
357 };
358 } else {
359 data.append_null();
360 }
361 }
362 record.push(Arc::new(data.finish()));
363 }
364
365 for msr_type in &self.measurement_types {
367 let mut data = Float64Builder::new();
368 for resid_opt in &residuals {
369 if let Some(resid) = resid_opt {
370 match resid.trk_noise(*msr_type) {
371 Some(noise) => data.append_value(noise),
372 None => data.append_null(),
373 };
374 } else {
375 data.append_null();
376 }
377 }
378 record.push(Arc::new(data.finish()));
379 }
380
381 for msr_type in &self.measurement_types {
383 let mut data = Float64Builder::new();
384 for resid_opt in &residuals {
385 if let Some(resid) = resid_opt {
386 match resid.real_obs(*msr_type) {
387 Some(postfit) => data.append_value(postfit),
388 None => data.append_null(),
389 };
390 } else {
391 data.append_null();
392 }
393 }
394 record.push(Arc::new(data.finish()));
395 }
396
397 for msr_type in &self.measurement_types {
399 let mut data = Float64Builder::new();
400 for resid_opt in &residuals {
401 if let Some(resid) = resid_opt {
402 match resid.computed_obs(*msr_type) {
403 Some(postfit) => data.append_value(postfit),
404 None => data.append_null(),
405 };
406 } else {
407 data.append_null();
408 }
409 }
410 record.push(Arc::new(data.finish()));
411 }
412
413 let mut data = Float64Builder::new();
415 for resid_opt in &residuals {
416 if let Some(resid) = resid_opt {
417 data.append_value(resid.ratio);
418 } else {
419 data.append_null();
420 }
421 }
422 record.push(Arc::new(data.finish()));
423
424 let mut data = BooleanBuilder::new();
426 for resid_opt in &residuals {
427 if let Some(resid) = resid_opt {
428 data.append_value(resid.rejected);
429 } else {
430 data.append_null();
431 }
432 }
433 record.push(Arc::new(data.finish()));
434
435 let mut data = StringBuilder::new();
437 for resid_opt in &residuals {
438 if let Some(resid) = resid_opt {
439 data.append_value(
440 resid
441 .tracker
442 .clone()
443 .unwrap_or("Undefined tracker".to_string()),
444 );
445 } else {
446 data.append_null();
447 }
448 }
449 record.push(Arc::new(data.finish()));
450
451 for i in 0..state_items.len() {
453 for j in 0..2 {
454 let mut data = Float64Builder::new();
455 for opt_k in &self.gains {
456 if let Some(k) = opt_k {
457 data.append_value(k[(i, j)]);
458 } else {
459 data.append_null();
460 }
461 }
462 record.push(Arc::new(data.finish()));
463 }
464 }
465
466 for i in 0..state_items.len() {
468 let mut data = Float64Builder::new();
469 for opt_fsr in &self.filter_smoother_ratios {
470 if let Some(fsr) = opt_fsr {
471 data.append_value(fsr[i]);
472 } else {
473 data.append_null();
474 }
475 }
476 record.push(Arc::new(data.finish()));
477 }
478
479 info!("Serialized {} estimates and residuals", estimates.len());
480
481 let mut metadata = HashMap::new();
483 metadata.insert("Purpose".to_string(), "PNT results".to_string());
484 if let Some(add_meta) = cfg.metadata {
485 for (k, v) in add_meta {
486 metadata.insert(k, v);
487 }
488 }
489
490 let props = pq_writer(Some(metadata));
491
492 let file = File::create(&path_buf)
493 .context(StdIOSnafu {
494 action: "creating PNT results file",
495 })
496 .context(ODIOSnafu)?;
497
498 let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
499 .context(ParquetSnafu {
500 action: "exporting PNT results",
501 })
502 .context(ODIOSnafu)?;
503
504 let batch = RecordBatch::try_new(schema, record)
505 .context(ArrowSnafu {
506 action: "writing PNT results (building batch record)",
507 })
508 .context(ODIOSnafu)?;
509
510 writer
511 .write(&batch)
512 .context(ParquetSnafu {
513 action: "writing PNT results",
514 })
515 .context(ODIOSnafu)?;
516
517 writer
518 .close()
519 .context(ParquetSnafu {
520 action: "closing PNT results file",
521 })
522 .context(ODIOSnafu)?;
523
524 let tock_time = Epoch::now().unwrap() - tick;
526 info!(
527 "PNT results written to {} in {tock_time}",
528 path_buf.display()
529 );
530 Ok(path_buf)
531 }
532}