nyx_space/od/msr/trackingdata/
mod.rs1use super::{measurement::Measurement, MeasurementType};
19use core::fmt;
20use hifitime::prelude::{Duration, Epoch};
21use indexmap::{IndexMap, IndexSet};
22use log::{error, info, warn};
23use std::collections::btree_map::Entry;
24use std::collections::BTreeMap;
25use std::ops::Bound::{self, Excluded, Included, Unbounded};
26use std::ops::{Add, AddAssign, RangeBounds};
27
28mod io_ccsds_tdm;
29mod io_parquet;
30
31#[cfg(feature = "python")]
32use pyo3::prelude::*;
33#[cfg(feature = "python")]
34mod python;
35
36#[derive(Clone, Default)]
77#[cfg_attr(feature = "python", pyclass)]
78pub struct TrackingDataArc {
79 pub measurements: BTreeMap<Epoch, Measurement>, pub source: Option<String>,
83 pub moduli: Option<IndexMap<MeasurementType, f64>>,
85 pub force_reject: bool,
87}
88
89#[cfg_attr(feature = "python", pymethods)]
90impl TrackingDataArc {
91 pub fn start_epoch(&self) -> Option<Epoch> {
93 self.measurements.first_key_value().map(|(k, _)| *k)
94 }
95
96 pub fn end_epoch(&self) -> Option<Epoch> {
98 self.measurements.last_key_value().map(|(k, _)| *k)
99 }
100
101 pub fn duration(&self) -> Option<Duration> {
103 match self.start_epoch() {
104 Some(start) => self.end_epoch().map(|end| end - start),
105 None => None,
106 }
107 }
108
109 pub fn len(&self) -> usize {
111 self.measurements.len()
112 }
113
114 pub fn is_empty(&self) -> bool {
116 self.measurements.is_empty()
117 }
118
119 pub fn min_duration_sep(&self) -> Option<Duration> {
121 if self.is_empty() {
122 None
123 } else {
124 let mut min_sep = Duration::MAX;
125 let mut prev_epoch = self.start_epoch().unwrap();
126 for (epoch, _) in self.measurements.iter().skip(1) {
127 let this_sep = *epoch - prev_epoch;
128 min_sep = min_sep.min(this_sep);
129 prev_epoch = *epoch;
130 }
131 Some(min_sep)
132 }
133 }
134}
135
136impl TrackingDataArc {
137 pub fn set_moduli(&mut self, msr_type: MeasurementType, modulus: f64) {
139 if modulus.is_nan() || modulus.abs() < f64::EPSILON {
140 warn!("cannot set modulus for {msr_type:?} to {modulus}");
141 return;
142 }
143 if self.moduli.is_none() {
144 self.moduli = Some(IndexMap::new());
145 }
146
147 self.moduli.as_mut().unwrap().insert(msr_type, modulus);
148 }
149
150 pub fn apply_moduli(&mut self) {
152 if let Some(moduli) = &self.moduli {
153 for msr in self.measurements.values_mut() {
154 for (msr_type, modulus) in moduli {
155 if let Some(msr_value) = msr.data.get_mut(msr_type) {
156 *msr_value %= *modulus;
157 }
158 }
159 }
160 }
161 }
162
163 pub fn unique_aliases(&self) -> IndexSet<String> {
165 self.unique().0
166 }
167
168 pub fn unique_types(&self) -> IndexSet<MeasurementType> {
170 self.unique().1
171 }
172
173 pub fn unique(&self) -> (IndexSet<String>, IndexSet<MeasurementType>) {
175 let mut aliases = IndexSet::new();
176 let mut types = IndexSet::new();
177 for msr in self.measurements.values() {
178 aliases.insert(msr.tracker.clone());
179 for k in msr.data.keys() {
180 types.insert(*k);
181 }
182 }
183 (aliases, types)
184 }
185
186 pub fn filter_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
188 self.measurements = self
189 .measurements
190 .range(bound)
191 .map(|(epoch, msr)| (*epoch, msr.clone()))
192 .collect::<BTreeMap<Epoch, Measurement>>();
193 self
194 }
195
196 pub fn filter_by_offset<R: RangeBounds<Duration>>(self, bound: R) -> Self {
199 if self.is_empty() {
200 return self;
201 }
202 let start = match bound.start_bound() {
204 Unbounded => self.start_epoch().unwrap(),
205 Included(offset) | Excluded(offset) => self.start_epoch().unwrap() + *offset,
206 };
207
208 let end = match bound.end_bound() {
209 Unbounded => self.end_epoch().unwrap(),
210 Included(offset) | Excluded(offset) => self.start_epoch().unwrap() + *offset,
211 };
212
213 self.filter_by_epoch(start..end)
214 }
215
216 pub fn filter_by_tracker(mut self, tracker: String) -> Self {
218 self.measurements = self
219 .measurements
220 .iter()
221 .filter_map(|(epoch, msr)| {
222 if msr.tracker == tracker {
223 Some((*epoch, msr.clone()))
224 } else {
225 None
226 }
227 })
228 .collect::<BTreeMap<Epoch, Measurement>>();
229 self
230 }
231
232 pub fn filter_by_measurement_type(mut self, included_type: MeasurementType) -> Self {
234 self.measurements.retain(|_epoch, msr| {
235 msr.data.retain(|msr_type, _| *msr_type == included_type);
236 !msr.data.is_empty()
237 });
238 self
239 }
240
241 pub fn exclude_tracker(mut self, excluded_tracker: String) -> Self {
243 self.measurements = self
244 .measurements
245 .iter()
246 .filter_map(|(epoch, msr)| {
247 if msr.tracker != excluded_tracker {
248 Some((*epoch, msr.clone()))
249 } else {
250 None
251 }
252 })
253 .collect::<BTreeMap<Epoch, Measurement>>();
254 self
255 }
256
257 pub fn exclude_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
259 info!(
260 "Excluding measurements from {:?} to {:?}",
261 bound.start_bound(),
262 bound.end_bound()
263 );
264
265 let mut new_measurements = BTreeMap::new();
266
267 new_measurements.extend(
269 self.measurements
270 .range((Bound::Unbounded, bound.start_bound()))
271 .map(|(epoch, msr)| (*epoch, msr.clone())),
272 );
273
274 new_measurements.extend(
276 self.measurements
277 .range((bound.end_bound(), Bound::Unbounded))
278 .map(|(epoch, msr)| (*epoch, msr.clone())),
279 );
280
281 self.measurements = new_measurements;
282 self
283 }
284
285 pub fn exclude_measurement_type(mut self, excluded_type: MeasurementType) -> Self {
287 self.measurements = self
288 .measurements
289 .iter_mut()
290 .map(|(epoch, msr)| {
291 msr.data.retain(|msr_type, _| *msr_type != excluded_type);
292
293 (*epoch, msr.clone())
294 })
295 .collect::<BTreeMap<Epoch, Measurement>>();
296 self
297 }
298
299 pub fn reject_by_epoch<R: RangeBounds<Epoch>>(mut self, bound: R) -> Self {
301 for (_epoch, msr) in self.measurements.range_mut(bound) {
302 msr.rejected = true;
303 }
304 self
305 }
306
307 pub fn reject_by_tracker(mut self, tracker: String) -> Self {
309 for msr in self.measurements.values_mut() {
310 if msr.tracker == tracker {
311 msr.rejected = true;
312 }
313 }
314 self
315 }
316
317 pub fn downsample(self, target_step: Duration) -> Self {
345 if self.is_empty() {
346 return self;
347 }
348 let current_step = self.min_duration_sep().unwrap();
349
350 if current_step >= target_step {
351 warn!("cannot downsample tracking data from {current_step} to {target_step} (that would be upsampling)");
352 return self;
353 }
354
355 let current_hz = 1.0 / current_step.to_seconds();
356 let target_hz = 1.0 / target_step.to_seconds();
357
358 let window_size = (current_hz / target_hz).round() as usize;
360
361 info!("downsampling tracking data from {current_step} ({current_hz:.6} Hz) to {target_step} ({target_hz:.6} Hz) (N = {window_size})");
362
363 let mut result = TrackingDataArc {
364 source: self.source.clone(),
365 ..Default::default()
366 };
367
368 let measurements: Vec<_> = self.measurements.iter().collect();
369
370 for (i, (epoch, _)) in measurements.iter().enumerate().step_by(window_size) {
371 let start = i.saturating_sub(window_size / 2);
372 let end = (i + window_size / 2 + 1).min(measurements.len());
373 let window = &measurements[start..end];
374
375 let mut filtered_measurement = Measurement {
376 tracker: window[0].1.tracker.clone(),
377 epoch: **epoch,
378 data: IndexMap::new(),
379 rejected: false,
380 };
381
382 for mtype in self.unique_types() {
384 let sum: f64 = window.iter().filter_map(|(_, m)| m.data.get(&mtype)).sum();
385 let count = window
386 .iter()
387 .filter(|(_, m)| m.data.contains_key(&mtype))
388 .count();
389
390 if count > 0 {
391 filtered_measurement.data.insert(mtype, sum / count as f64);
392 }
393 }
394
395 result.measurements.insert(**epoch, filtered_measurement);
396 }
397 result
398 }
399
400 pub fn resid_vs_ref_check(mut self) -> Self {
401 self.force_reject = true;
402 self
403 }
404}
405
406impl fmt::Display for TrackingDataArc {
407 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
408 if self.is_empty() {
409 write!(f, "Empty tracking arc")
410 } else {
411 let start = self.start_epoch().unwrap();
412 let end = self.end_epoch().unwrap();
413 let src = match &self.source {
414 Some(src) => format!(" (source: {src})"),
415 None => String::new(),
416 };
417 write!(
418 f,
419 "Tracking arc with {} measurements of type {:?} over {} (from {start} to {end}) with trackers {:?}{src}",
420 self.len(),
421 self.unique_types(),
422 end - start,
423 self.unique_aliases()
424 )
425 }
426 }
427}
428
429impl fmt::Debug for TrackingDataArc {
430 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431 write!(f, "{self} @ {self:p}")
432 }
433}
434
435impl PartialEq for TrackingDataArc {
436 fn eq(&self, other: &Self) -> bool {
437 self.measurements == other.measurements
438 }
439}
440
441impl Add for TrackingDataArc {
442 type Output = Self;
443
444 fn add(mut self, rhs: Self) -> Self::Output {
445 self.force_reject = false;
446 for (epoch, msr) in rhs.measurements {
447 if let Entry::Vacant(e) = self.measurements.entry(epoch) {
448 e.insert(msr);
449 } else {
450 error!("merging tracking data with overlapping epoch is not supported");
451 }
452 }
453
454 self
455 }
456}
457
458impl AddAssign for TrackingDataArc {
459 fn add_assign(&mut self, rhs: Self) {
460 *self = self.clone() + rhs;
461 }
462}