nyx_space/od/process/solution/
import.rs1use crate::Spacecraft;
24use crate::io::{ArrowSnafu, InputOutputError, MissingDataSnafu, ParquetSnafu, StdIOSnafu};
25use crate::linalg::allocator::Allocator;
26use crate::linalg::{Const, DefaultAllocator, DimName, OMatrix, OVector, SMatrix};
27use crate::od::estimate::*;
28use crate::od::msr::MeasurementType;
29use crate::od::*;
30use anise::frames::Frame;
31use anise::prelude::Orbit;
32use anise::structure::spacecraft::{DragData, Mass, SRPData};
33use arrow::array::RecordBatchReader;
34use arrow::array::{Array, BooleanArray, Float64Array, StringArray};
35use hifitime::Epoch;
36use indexmap::IndexSet;
37use log::{info, warn};
38use msr::sensitivity::TrackerSensitivity;
39use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
40use snafu::prelude::*;
41use std::collections::{BTreeMap, HashMap};
42use std::fs::File;
43use std::path::Path;
44use std::str::FromStr;
45use std::sync::Arc;
46
47use super::ODSolution; impl<MsrSize, Trk> ODSolution<Spacecraft, KfEstimate<Spacecraft>, MsrSize, Trk>
52where
53 MsrSize: DimName + std::fmt::Debug + Clone, Trk: TrackerSensitivity<Spacecraft, Spacecraft> + Clone, DefaultAllocator: Allocator<MsrSize>
57 + Allocator<MsrSize, <Spacecraft as State>::Size>
58 + Allocator<Const<1>, MsrSize>
59 + Allocator<<Spacecraft as State>::Size>
60 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
61 + Allocator<MsrSize, MsrSize>
62 + Allocator<<Spacecraft as State>::Size, MsrSize>
63 + Allocator<<Spacecraft as State>::VecLength>,
64 <DefaultAllocator as Allocator<<Spacecraft as State>::VecLength>>::Buffer<f64>: Send,
65 <DefaultAllocator as Allocator<<Spacecraft as State>::Size>>::Buffer<f64>: Copy,
66 <DefaultAllocator as Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>>::Buffer<f64>: Copy,
67
68{
69 pub fn from_parquet<P: AsRef<Path>>(
77 path: P,
78 devices: BTreeMap<String, Trk>,
79 ) -> Result<Self, InputOutputError> {
80
81
82 let file = File::open(&path).context(StdIOSnafu {
83 action: "opening OD solution file",
84 })?;
85
86 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
87
88 let mut metadata = HashMap::new();
89 if let Some(file_metadata) = builder.metadata().file_metadata().key_value_metadata() {
91 for key_value in file_metadata {
92 if !key_value.key.starts_with("ARROW:") {
93 metadata.insert(
94 key_value.key.clone(),
95 key_value.value.clone().unwrap_or("[unset]".to_string()),
96 );
97 }
98 }
99 }
100
101 let mut has_epoch = false; let mut frame = None;
104 let mut srp_area_m2 = None;
105 let mut drag_area_m2 = None;
106
107 let schema = builder.schema().clone();
108
109 let reader = builder.build().context(ParquetSnafu {
110 action: "building Parquet reader for OD results",
111 })?;
112
113 for field in &reader.schema().fields {
114 if field.name().as_str() == "Epoch (UTC)" {
115 has_epoch = true;
116 } else {
117 if let Some(frame_info) = field.metadata().get("Frame") {
118 match serde_dhall::from_str(frame_info).parse::<Frame>() {
120 Err(e) => {
121 return Err(InputOutputError::ParseDhall {
122 data: frame_info.to_string(),
123 err: format!("{e}"),
124 })
125 }
126 Ok(deser_frame) => frame = Some(deser_frame),
127 };
128 }
129 if let Some(info) = field.metadata().get("SRP Area (m2)") {
130 srp_area_m2 = Some(info.parse::<f64>().unwrap_or(0.0));
131 }
132 if let Some(info) = field.metadata().get("Drag Area (m2)"){
133 drag_area_m2 = Some(info.parse::<f64>().unwrap_or(0.0));
134 }
135 }
136 }
137
138 ensure!(
139 has_epoch,
140 MissingDataSnafu {
141 which: "Epoch (UTC)"
142 }
143 );
144
145 ensure!(
146 frame.is_some(),
147 MissingDataSnafu {
148 which: "Frame in metadata"
149 }
150 );
151
152
153 let mut estimates: Vec<KfEstimate<Spacecraft>> = Vec::new();
154 let mut residuals: Vec<Option<Residual<MsrSize>>> = Vec::new();
155 let mut gains: Vec<Option<OMatrix<f64, <Spacecraft as State>::Size, MsrSize>>> = Vec::new();
156 let mut filter_smoother_ratios: Vec<Option<OVector<f64, <Spacecraft as State>::Size>>> =
157 Vec::new();
158 let mut measurement_types_found = IndexSet::new();
159
160 let state_size = <Spacecraft as State>::Size::DIM;
161
162 let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
164 let mut cov_units = vec![];
165
166 for i in 0..state_items.len() {
167 for j in i..state_items.len() {
168 let cov_unit = if i < 3 {
169 if j < 3 {
170 "km^2"
171 } else if (3..6).contains(&j) {
172 "km^2/s"
173 } else if j == 8 {
174 "km*kg"
175 } else {
176 "km"
177 }
178 } else if (3..6).contains(&i) {
179 if (3..6).contains(&j) {
180 "km^2/s^2"
181 } else if j == 8 {
182 "km/s*kg"
183 } else {
184 "km/s"
185 }
186 } else if i == 8 || j == 8 {
187 "kg^2"
188 } else {
189 "unitless"
190 };
191
192 cov_units.push(cov_unit);
193 }
194 }
195
196 for field in schema.fields() {
199 if let Some(msr_type_str) = field.name().strip_prefix("Prefit residual: ")
200 && let Some(bracket_pos) = msr_type_str.find(" (") {
201 let type_name = &msr_type_str[..bracket_pos];
202 if let Ok(msr_type) = MeasurementType::from_str(type_name) {
203 measurement_types_found.insert(msr_type);
204 } else {
205 warn!("Could not parse measurement type from column: {}", field.name());
206 }
207 }
208 }
209 if measurement_types_found.is_empty() {
210 warn!("Could not automatically detect any measurement types from Parquet column names. Residuals may not be loaded correctly.");
211 } else {
212 info!("Detected measurement types: {measurement_types_found:?}");
213 }
214
215
216
217 for record_batch in reader {
219 let batch = record_batch.context(ArrowSnafu {
220 action: "reading record batch from OD results",
221 })?;
222
223 let num_rows = batch.num_rows();
224
225 let get_col = |name: &str| -> Result<Arc<dyn Array>, InputOutputError> {
227 batch
228 .column_by_name(name)
229 .ok_or_else(|| InputOutputError::MissingData {
230 which: format!("column '{name}' in OD results"),
231 })
232 .cloned() };
234
235 let epoch_col = get_col("Epoch (UTC)")?
238 .as_any()
239 .downcast_ref::<StringArray>()
240 .ok_or_else(|| InputOutputError::ArrowError {
241 action: "downcasting Epoch column",
242 source: arrow::error::ArrowError::CastError("Could not cast Epoch to StringArray".to_string()),
243 })?.clone(); let x_col = get_col("X (km)")?.as_any().downcast_ref::<Float64Array>().ok_or_else(|| InputOutputError::ArrowError{action: "downcasting X", source: arrow::error::ArrowError::CastError("".to_string())})?.clone();
247 let y_col = get_col("Y (km)")?.as_any().downcast_ref::<Float64Array>().ok_or_else(|| InputOutputError::ArrowError{action: "downcasting Y", source: arrow::error::ArrowError::CastError("".to_string())})?.clone();
248 let z_col = get_col("Z (km)")?.as_any().downcast_ref::<Float64Array>().ok_or_else(|| InputOutputError::ArrowError{action: "downcasting Z", source: arrow::error::ArrowError::CastError("".to_string())})?.clone();
249 let vx_col = get_col("VX (km/s)")?.as_any().downcast_ref::<Float64Array>().ok_or_else(|| InputOutputError::ArrowError{action: "downcasting VX", source: arrow::error::ArrowError::CastError("".to_string())})?.clone();
250 let vy_col = get_col("VY (km/s)")?.as_any().downcast_ref::<Float64Array>().ok_or_else(|| InputOutputError::ArrowError{action: "downcasting VY", source: arrow::error::ArrowError::CastError("".to_string())})?.clone();
251 let vz_col = get_col("VZ (km/s)")?.as_any().downcast_ref::<Float64Array>().ok_or_else(|| InputOutputError::ArrowError{action: "downcasting VZ", source: arrow::error::ArrowError::CastError("".to_string())})?.clone();
252
253 let cr_col = get_col("cr")?.as_any().downcast_ref::<Float64Array>().ok_or_else(|| InputOutputError::ArrowError{action: "downcasting Cr (unitless)", source: arrow::error::ArrowError::CastError("".to_string())})?.clone();
254 let cd_col = get_col("cd")?.as_any().downcast_ref::<Float64Array>().ok_or_else(|| InputOutputError::ArrowError{action: "downcasting Cd (unitless)", source: arrow::error::ArrowError::CastError("".to_string())})?.clone();
255
256 let dry_mass_col = get_col("dry_mass (kg)")?.as_any().downcast_ref::<Float64Array>().ok_or_else(|| InputOutputError::ArrowError{action: "downcasting dry_mass (kg)", source: arrow::error::ArrowError::CastError("".to_string())})?.clone();
257 let prop_mass_col = get_col("prop_mass (kg)")?.as_any().downcast_ref::<Float64Array>().ok_or_else(|| InputOutputError::ArrowError{action: "downcasting prop_mass (kg)", source: arrow::error::ArrowError::CastError("".to_string())})?.clone();
258
259 let mut cov_cols = Vec::new();
261 for i in 0..state_size {
262 for j in i..state_size {
263 let base_name = format!("Covariance {}*{}", state_items[i], state_items[j]);
266 let col_name = schema.fields().iter()
268 .find(|f| f.name().starts_with(&base_name))
269 .map(|f| f.name().as_str())
270 .ok_or_else(|| InputOutputError::ParquetError {
271 action: "seeking covariance column",
272 source: parquet::errors::ParquetError::General("Column not found".to_string()),
273 })?;
274 cov_cols.push(get_col(col_name)?.as_any().downcast_ref::<Float64Array>().ok_or_else(|| InputOutputError::ArrowError{action: "downcasting covariance column", source: arrow::error::ArrowError::CastError("".to_string())})?.clone());
275 }
276 }
277
278
279 let rejected_col = get_col("Residual Rejected").ok().and_then(|arr| arr.as_any().downcast_ref::<BooleanArray>().cloned());
281 let tracker_col = get_col("Tracker").ok().and_then(|arr| arr.as_any().downcast_ref::<StringArray>().cloned());
282 let ratio_col = get_col("Residual ratio").ok().and_then(|arr| arr.as_any().downcast_ref::<Float64Array>().cloned());
283
284 let mut residual_data_cols: HashMap<MeasurementType, BTreeMap<String, Float64Array>> = HashMap::new();
285 for msr_type in &measurement_types_found {
286 let mut type_cols = BTreeMap::new();
287 let prefixes = ["Prefit residual", "Whitened residual", "Postfit residual", "Measurement noise", "Real observation", "Computed observation"];
288 for prefix in prefixes {
289 let base_name = format!("{}: {:?} ({})", prefix, msr_type, msr_type.unit());
291 if let Ok(col) = get_col(&base_name)
292 && let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
293 type_cols.insert(prefix.to_string(), arr.clone());
294 }
295 }
296 residual_data_cols.insert(*msr_type, type_cols);
297 }
298
299 let mut gain_cols: Vec<Option<Float64Array>> = Vec::new();
301 let mut gain_available = true;
302 for i in 0..state_size {
303 for f in &measurement_types_found {
304 let base_name = format!(
306 "Gain {}*{f:?} ({}*{})",
307 state_items[i],
308 cov_units[i],
309 f.unit()
310 );
311 let col_name = schema.fields().iter()
312 .find(|f| f.name().starts_with(&base_name))
313 .map(|f| f.name().as_str());
314
315 if let Some(name) = col_name {
316 if let Ok(col) = get_col(name) {
317 gain_cols.push(col.as_any().downcast_ref::<Float64Array>().cloned());
318 } else {
319 gain_cols.push(None); gain_available = false;
321 }
322 } else {
323 gain_available = false;
325 break; }
327 }
328 if !gain_available { break; }
329 }
330 if !gain_available { gain_cols.clear(); } let mut fsr_cols: Vec<Option<Float64Array>> = Vec::new();
335 let mut fsr_available = true;
336 for state_item in state_items.iter().take(state_size) {
338 let base_name = format!("Filter-smoother ratio {state_item}");
340 let col_name = schema.fields().iter()
341 .find(|f| f.name().starts_with(&base_name))
342 .map(|f| f.name().as_str());
343
344 if let Some(name) = col_name {
345 if let Ok(col) = get_col(name) {
346 fsr_cols.push(col.as_any().downcast_ref::<Float64Array>().cloned());
347 } else {
348 fsr_cols.push(None);
349 fsr_available = false;
350 }
351 } else {
352 fsr_available = false;
353 break;
354 }
355 }
356 if !fsr_available { fsr_cols.clear(); }
357
358
359 for i in 0..num_rows {
361
362 let epoch = Epoch::from_gregorian_str(epoch_col.value(i)).map_err(|e| {
363 InputOutputError::Inconsistency {
364 msg: format!("{e} when parsing epoch"),
365 }
366 })?;
367
368 let nominal_state = Spacecraft::builder().orbit(
370 Orbit::cartesian(
371 x_col.value(i),
372 y_col.value(i),
373 z_col.value(i),
374 vx_col.value(i),
375 vy_col.value(i),
376 vz_col.value(i),
377 epoch,
378 frame.expect("somehow frame isn't set")
379 )).mass(
380 Mass::from_dry_and_prop_masses(
381 dry_mass_col.value(i),
382 prop_mass_col.value(i))
383 ).srp(SRPData {
384 area_m2: srp_area_m2.expect("somehow srp area isn't set"),
385 coeff_reflectivity: cr_col.value(i)
386 }).drag(DragData{
387 area_m2: drag_area_m2.expect("somehow dragarea isn't set"),
388 coeff_drag: cd_col.value(i)
389 }).build();
390
391 let mut covar = SMatrix::<f64, 9, 9>::zeros();
393 let mut cov_col_idx = 0;
394 for row in 0..state_size {
395 for col in row..state_size {
396 let val = cov_cols[cov_col_idx].value(i);
397 covar[(row, col)] = val;
398 if row != col {
399 covar[(col, row)] = val; }
401 cov_col_idx += 1;
402 }
403 }
404
405 let estimate = KfEstimate {
407 nominal_state,
408 state_deviation: OVector::<f64, Const<9>>::zeros(), covar,
410 covar_bar: covar, stm: OMatrix::<f64, Const<9>, Const<9>>::identity(), predicted: false, };
414 estimates.push(estimate);
415
416 let is_rejected_opt = rejected_col.as_ref().and_then(|col| if col.is_valid(i) { Some(col.value(i)) } else { None });
418 let tracker_opt = tracker_col.as_ref().and_then(|col| if col.is_valid(i) { Some(col.value(i).to_string()) } else { None });
419 let ratio_opt = ratio_col.as_ref().and_then(|col| if col.is_valid(i) { Some(col.value(i)) } else { None });
420
421 let current_residual: Option<Residual<MsrSize>> = if let (Some(is_rejected), Some(tracker), Some(ratio)) = (is_rejected_opt, tracker_opt.clone(), ratio_opt) {
422 let mut prefit_vec = OVector::<f64, MsrSize>::zeros();
424 let mut whitened_vec = OVector::<f64, MsrSize>::zeros();
425 let mut postfit_vec = OVector::<f64, MsrSize>::zeros();
426 let mut noise_vec = OVector::<f64, MsrSize>::zeros();
427 let mut real_obs_vec = OVector::<f64, MsrSize>::zeros();
428 let mut comp_obs_vec = OVector::<f64, MsrSize>::zeros();
429 let mut current_msr_types = IndexSet::with_capacity(MsrSize::DIM);
430
431 let mut msr_idx = 0;
432 for (msr_type, type_cols) in &residual_data_cols {
433 if msr_idx >= MsrSize::DIM { break; } let prefit_val = type_cols.get("Prefit residual").and_then(|col| if col.is_valid(i) { Some(col.value(i)) } else { None });
437 let whitened_resid_val = type_cols.get("Whitened residual").and_then(|col| if col.is_valid(i) { Some(col.value(i)) } else { None });
438 let postfit_val = type_cols.get("Postfit residual").and_then(|col| if col.is_valid(i) { Some(col.value(i)) } else { None });
439 let noise_val = type_cols.get("Measurement noise").and_then(|col| if col.is_valid(i) { Some(col.value(i)) } else { None });
440 let real_val = type_cols.get("Real observation").and_then(|col| if col.is_valid(i) { Some(col.value(i)) } else { None });
441 let comp_val = type_cols.get("Computed observation").and_then(|col| if col.is_valid(i) { Some(col.value(i)) } else { None });
442
443 if prefit_val.is_some() || whitened_resid_val.is_some() || postfit_val.is_some() || noise_val.is_some() || real_val.is_some() || comp_val.is_some() {
445 prefit_vec[msr_idx] = prefit_val.unwrap_or(f64::NAN);
446 whitened_vec[msr_idx] = whitened_resid_val.unwrap_or(f64::NAN);
447 postfit_vec[msr_idx] = postfit_val.unwrap_or(f64::NAN);
448 noise_vec[msr_idx] = noise_val.unwrap_or(f64::NAN);
449 real_obs_vec[msr_idx] = real_val.unwrap_or(f64::NAN);
450 comp_obs_vec[msr_idx] = comp_val.unwrap_or(f64::NAN);
451 current_msr_types.insert(*msr_type);
452 msr_idx += 1;
453 }
454 }
455
456 let resid = Residual {
461 epoch,
462 prefit: prefit_vec,
463 whitened_resid: whitened_vec,
464 postfit: postfit_vec,
465 tracker_msr_noise: noise_vec,
466 ratio,
467 real_obs: real_obs_vec,
468 computed_obs: comp_obs_vec,
469 msr_types: current_msr_types, rejected: is_rejected,
471 tracker: Some(tracker),
472 };
473 Some(resid)
474
475 } else {
476 None
478 };
479 residuals.push(current_residual);
480
481
482 let current_gain: Option<OMatrix<f64, <Spacecraft as State>::Size, MsrSize>> = if gain_available && !gain_cols.is_empty() {
484 let mut gain_mat = OMatrix::<f64, <Spacecraft as State>::Size, MsrSize>::zeros();
485 let mut all_valid = true;
486 let mut col_idx = 0;
487 'gain_outer: for row in 0..state_size {
488 for col in 0..MsrSize::DIM {
489 if let Some(gain_col) = &gain_cols[col_idx] {
490 if gain_col.is_valid(i) {
491 gain_mat[(row, col)] = gain_col.value(i);
492 } else {
493 all_valid = false;
494 break 'gain_outer; }
496 } else {
497 all_valid = false;
499 break 'gain_outer;
500 }
501 col_idx += 1;
502 }
503 }
504 if all_valid { Some(gain_mat) } else { None }
505 } else { None };
506 gains.push(current_gain);
507
508 let current_fsr: Option<OVector<f64, <Spacecraft as State>::Size>> = if fsr_available && !fsr_cols.is_empty() {
510 let mut fsr_vec = OVector::<f64, <Spacecraft as State>::Size>::zeros();
511 let mut all_valid = true;
512 for k in 0..state_size {
513 if let Some(fsr_col) = &fsr_cols[k] {
514 if fsr_col.is_valid(i) {
515 fsr_vec[k] = fsr_col.value(i);
516 } else {
517 all_valid = false;
518 break;
519 }
520 } else {
521 all_valid = false;
522 break;
523 }
524 }
525 if all_valid { Some(fsr_vec) } else { None }
526 } else { None };
527 filter_smoother_ratios.push(current_fsr);
528
529 } } Ok(ODSolution {
535 estimates,
536 residuals,
537 gains,
538 filter_smoother_ratios,
539 devices, measurement_types: measurement_types_found, })
542 }
543}