1use crate::linalg::allocator::Allocator;
20use crate::linalg::{DefaultAllocator, DimName};
21use crate::md::trajectory::{Interpolatable, Traj};
22pub use crate::od::estimate::*;
23pub use crate::od::ground_station::*;
24pub use crate::od::snc::*;
25pub use crate::od::*;
26use crate::propagators::PropInstance;
27pub use crate::time::{Duration, Unit};
28use anise::prelude::Almanac;
29use indexmap::IndexSet;
30use msr::sensitivity::TrackerSensitivity;
31use snafu::prelude::*;
32mod conf;
33pub use conf::{IterationConf, SmoothingArc};
34mod trigger;
35pub use trigger::EkfTrigger;
36mod rejectcrit;
37use self::msr::TrackingDataArc;
38pub use self::rejectcrit::ResidRejectCrit;
39use std::collections::BTreeMap;
40use std::iter::Zip;
41use std::marker::PhantomData;
42use std::ops::Add;
43use std::slice::Iter;
44mod export;
45
46#[allow(clippy::upper_case_acronyms)]
63pub struct ODProcess<
64 'a,
65 D: Dynamics,
66 MsrSize: DimName,
67 Accel: DimName,
68 K: Filter<D::StateType, Accel, MsrSize>,
69 Trk: TrackerSensitivity<D::StateType, D::StateType>,
70> where
71 D::StateType:
72 Interpolatable + Add<OVector<f64, <D::StateType as State>::Size>, Output = D::StateType>,
73 <DefaultAllocator as Allocator<<D::StateType as State>::VecLength>>::Buffer<f64>: Send,
74 DefaultAllocator: Allocator<<D::StateType as State>::Size>
75 + Allocator<<D::StateType as State>::VecLength>
76 + Allocator<MsrSize>
77 + Allocator<MsrSize, <D::StateType as State>::Size>
78 + Allocator<MsrSize, MsrSize>
79 + Allocator<<D::StateType as State>::Size, <D::StateType as State>::Size>
80 + Allocator<Accel>
81 + Allocator<Accel, Accel>
82 + Allocator<<D::StateType as State>::Size, Accel>
83 + Allocator<Accel, <D::StateType as State>::Size>,
84{
85 pub prop: PropInstance<'a, D>,
87 pub kf: K,
89 pub devices: BTreeMap<String, Trk>,
91 pub estimates: Vec<K::Estimate>,
93 pub residuals: Vec<Option<Residual<MsrSize>>>,
95 pub ekf_trigger: Option<EkfTrigger>,
96 pub resid_crit: Option<ResidRejectCrit>,
98 pub almanac: Arc<Almanac>,
99 init_state: D::StateType,
100 _marker: PhantomData<Accel>,
101}
102
103impl<
104 'a,
105 D: Dynamics,
106 MsrSize: DimName,
107 Accel: DimName,
108 K: Filter<D::StateType, Accel, MsrSize>,
109 Trk: TrackerSensitivity<D::StateType, D::StateType>,
110 > ODProcess<'a, D, MsrSize, Accel, K, Trk>
111where
112 D::StateType:
113 Interpolatable + Add<OVector<f64, <D::StateType as State>::Size>, Output = D::StateType>,
114 <DefaultAllocator as Allocator<<D::StateType as State>::VecLength>>::Buffer<f64>: Send,
115 DefaultAllocator: Allocator<<D::StateType as State>::Size>
116 + Allocator<<D::StateType as State>::VecLength>
117 + Allocator<MsrSize>
118 + Allocator<MsrSize, <D::StateType as State>::Size>
119 + Allocator<MsrSize, MsrSize>
120 + Allocator<<D::StateType as State>::Size, <D::StateType as State>::Size>
121 + Allocator<Accel>
122 + Allocator<Accel, Accel>
123 + Allocator<<D::StateType as State>::Size, Accel>
124 + Allocator<Accel, <D::StateType as State>::Size>,
125{
126 pub fn new(
128 prop: PropInstance<'a, D>,
129 kf: K,
130 devices: BTreeMap<String, Trk>,
131 ekf_trigger: Option<EkfTrigger>,
132 resid_crit: Option<ResidRejectCrit>,
133 almanac: Arc<Almanac>,
134 ) -> Self {
135 let init_state = prop.state;
136 Self {
137 prop: prop.quiet(),
138 kf,
139 devices,
140 estimates: Vec::with_capacity(10_000),
141 residuals: Vec::with_capacity(10_000),
142 ekf_trigger,
143 resid_crit,
144 almanac,
145 init_state,
146 _marker: PhantomData::<Accel>,
147 }
148 }
149
150 pub fn ekf(
152 prop: PropInstance<'a, D>,
153 kf: K,
154 devices: BTreeMap<String, Trk>,
155 trigger: EkfTrigger,
156 resid_crit: Option<ResidRejectCrit>,
157 almanac: Arc<Almanac>,
158 ) -> Self {
159 let init_state = prop.state;
160 Self {
161 prop: prop.quiet(),
162 kf,
163 devices,
164 estimates: Vec::with_capacity(10_000),
165 residuals: Vec::with_capacity(10_000),
166 ekf_trigger: Some(trigger),
167 resid_crit,
168 almanac,
169 init_state,
170 _marker: PhantomData::<Accel>,
171 }
172 }
173
174 pub fn smooth(&self, condition: SmoothingArc) -> Result<Vec<K::Estimate>, ODError> {
179 let l = self.estimates.len() - 1;
180
181 info!("Smoothing {} estimates until {}", l + 1, condition);
182 let mut smoothed = Vec::with_capacity(self.estimates.len());
183 smoothed.push(self.estimates.last().unwrap().clone());
185
186 loop {
187 let k = l - smoothed.len();
188 let sm_est_kp1 = &self.estimates[k + 1];
190 let x_kp1_l = sm_est_kp1.state_deviation();
191 let p_kp1_l = sm_est_kp1.covar();
192 let est_k = &self.estimates[k];
194 let est_kp1 = &self.estimates[k + 1];
196
197 match condition {
199 SmoothingArc::Epoch(e) => {
200 if est_kp1.epoch() < e {
202 break;
203 }
204 }
205 SmoothingArc::TimeGap(gap_s) => {
206 if est_kp1.epoch() - est_k.epoch() > gap_s {
207 break;
208 }
209 }
210 SmoothingArc::Prediction => {
211 if est_kp1.predicted() {
212 break;
213 }
214 }
215 SmoothingArc::All => {}
216 }
217
218 let phi_kp1_k = &est_kp1
224 .stm()
225 .clone()
226 .try_inverse()
227 .ok_or(ODError::SingularStateTransitionMatrix)?;
228
229 let x_k_l = phi_kp1_k * x_kp1_l;
231 let p_k_l = phi_kp1_k * p_kp1_l * phi_kp1_k.transpose();
233 let mut smoothed_est_k = est_k.clone();
235 smoothed_est_k.set_state_deviation(x_k_l);
237 smoothed_est_k.set_covar(p_k_l);
239
240 smoothed.push(smoothed_est_k);
242 if smoothed.len() == self.estimates.len() {
243 break;
244 }
245 }
246
247 info!(
249 "Smoothed {} estimates (from {} to {})",
250 smoothed.len(),
251 smoothed.last().unwrap().epoch(),
252 smoothed[0].epoch(),
253 );
254
255 if smoothed.len() < self.estimates.len() {
258 let mut k = self.estimates.len() - smoothed.len();
260 loop {
261 smoothed.push(self.estimates[k].clone());
262 if k == 0 {
263 break;
264 }
265 k -= 1;
266 }
267 }
268
269 smoothed.reverse();
271
272 Ok(smoothed)
273 }
274
275 pub fn rms_residual_ratios(&self) -> f64 {
277 let mut sum = 0.0;
278 for residual in self.residuals.iter().flatten() {
279 sum += residual.ratio.powi(2);
280 }
281 (sum / (self.residuals.len() as f64)).sqrt()
282 }
283
284 pub fn iterate_arc(
286 &mut self,
287 arc: &TrackingDataArc,
288 config: IterationConf,
289 ) -> Result<(), ODError> {
290 let mut best_rms = self.rms_residual_ratios();
291 let mut previous_rms = best_rms;
292 let mut divergence_cnt = 0;
293 let mut iter_cnt = 0;
294 loop {
295 if best_rms <= config.absolute_tol {
296 info!("*****************");
297 info!("*** CONVERGED ***");
298 info!("*****************");
299
300 info!(
301 "Filter converged to absolute tolerance ({:.2e} < {:.2e}) after {} iterations",
302 best_rms, config.absolute_tol, iter_cnt
303 );
304 break;
305 }
306
307 iter_cnt += 1;
308
309 if let Some(trigger) = &mut self.ekf_trigger {
311 trigger.reset();
312 }
313
314 info!("***************************");
315 info!("*** Iteration number {iter_cnt:02} ***");
316 info!("***************************");
317
318 let smoothed = self.smooth(config.smoother)?;
320 self.prop.state = self.init_state;
322 self.estimates = Vec::with_capacity(arc.measurements.len().max(self.estimates.len()));
324 self.residuals = Vec::with_capacity(arc.measurements.len().max(self.estimates.len()));
325
326 self.kf.set_previous_estimate(&smoothed[0]);
327 self.process_arc(arc)?;
329
330 let new_rms = self.rms_residual_ratios();
332 let cur_rms_num = (new_rms - previous_rms).abs();
333 let cur_rel_rms = cur_rms_num / previous_rms;
334 if cur_rel_rms < config.relative_tol || cur_rms_num < config.absolute_tol * best_rms {
335 if previous_rms < best_rms {
336 best_rms = previous_rms;
337 }
338 info!("*****************");
339 info!("*** CONVERGED ***");
340 info!("*****************");
341 info!(
342 "New residual RMS: {:.5}\tPrevious RMS: {:.5}\tBest RMS: {:.5}",
343 new_rms, previous_rms, best_rms
344 );
345 if cur_rel_rms < config.relative_tol {
346 info!(
347 "Filter converged on relative tolerance ({:.2e} < {:.2e}) after {} iterations",
348 cur_rel_rms, config.relative_tol, iter_cnt
349 );
350 } else {
351 info!(
352 "Filter converged on relative change ({:.2e} < {:.2e} * {:.2e}) after {} iterations",
353 cur_rms_num, config.absolute_tol, best_rms, iter_cnt
354 );
355 }
356 break;
357 } else if new_rms > previous_rms {
358 warn!(
359 "New residual RMS: {:.5}\tPrevious RMS: {:.5}\tBest RMS: {:.5} ({cur_rel_rms:.2e} > {:.2e})",
360 new_rms, previous_rms, best_rms, config.relative_tol
361 );
362 divergence_cnt += 1;
363 previous_rms = new_rms;
364 if divergence_cnt >= config.max_divergences {
365 let msg = format!(
366 "Filter iterations have continuously diverged {} times: {}",
367 config.max_divergences, config
368 );
369 if config.force_failure {
370 return Err(ODError::Diverged {
371 loops: config.max_divergences,
372 });
373 } else {
374 error!("{}", msg);
375 break;
376 }
377 } else {
378 warn!("Filter iteration caused divergence {} of {} acceptable subsequent divergences", divergence_cnt, config.max_divergences);
379 }
380 } else {
381 info!(
382 "New residual RMS: {:.5}\tPrevious RMS: {:.5}\tBest RMS: {:.5} ({cur_rel_rms:.2e} > {:.2e})",
383 new_rms, previous_rms, best_rms, config.relative_tol
384 );
385 divergence_cnt = 0;
387 previous_rms = new_rms;
388 if previous_rms < best_rms {
389 best_rms = previous_rms;
390 }
391 }
392
393 if iter_cnt >= config.max_iterations {
394 let msg = format!(
395 "Filter has iterated {} times but failed to reach filter convergence criteria: {}",
396 config.max_iterations, config
397 );
398 if config.force_failure {
399 return Err(ODError::Diverged {
400 loops: config.max_divergences,
401 });
402 } else {
403 error!("{}", msg);
404 break;
405 }
406 }
407 }
408
409 Ok(())
410 }
411
412 #[allow(clippy::erasing_op)]
419 pub fn process_arc(&mut self, arc: &TrackingDataArc) -> Result<(), ODError> {
420 let measurements = &arc.measurements;
421 ensure!(
422 measurements.len() >= 2,
423 TooFewMeasurementsSnafu {
424 need: 2_usize,
425 action: "running a Kalman filter"
426 }
427 );
428
429 let max_step = match arc.min_duration_sep() {
430 Some(step_size) => step_size,
431 None => {
432 return Err(ODError::TooFewMeasurements {
433 action: "determining the minimum step size",
434 need: 2,
435 })
436 }
437 };
438
439 ensure!(
440 !max_step.is_negative() && max_step != Duration::ZERO,
441 StepSizeSnafu { step: max_step }
442 );
443
444 if MsrSize::USIZE > arc.unique_types().len() {
446 error!("Filter misconfigured: expect high rejection count!");
447 error!(
448 "Arc only contains {} measurement types, but filter configured for {}.",
449 arc.unique_types().len(),
450 MsrSize::USIZE
451 );
452 error!("Filter should be configured for these numbers to match.");
453 error!("Consider running subsequent arcs if ground stations provide different measurements.")
454 }
455
456 let num_msrs = measurements.len();
458
459 if !self.prop.fixed_step {
461 self.prop.set_step(max_step, false);
462 }
463
464 let prop_time = arc.end_epoch().unwrap() - self.kf.previous_estimate().epoch();
465 info!("Navigation propagating for a total of {prop_time} with step size {max_step}");
466
467 let resid_crit = if arc.force_reject {
468 warn!("Rejecting all measurements from {arc} as requested");
469 Some(ResidRejectCrit { num_sigmas: 0.0 })
470 } else {
471 self.resid_crit
472 };
473
474 let mut epoch = self.prop.state.epoch();
475
476 let mut reported = [false; 11];
477 reported[0] = true; info!(
479 "Processing {num_msrs} measurements from {:?}",
480 arc.unique_aliases()
481 );
482
483 let mut traj: Traj<D::StateType> = Traj::new();
485
486 let mut msr_accepted_cnt: usize = 0;
487 let tick = Epoch::now().unwrap();
488
489 for (msr_cnt, (epoch_ref, msr)) in measurements.iter().enumerate() {
490 let next_msr_epoch = *epoch_ref;
491
492 loop {
494 let delta_t = next_msr_epoch - epoch;
495
496 let next_step_size = delta_t.min(self.prop.step_size).min(max_step);
498
499 let mut index = traj.states.len();
502 while index > 0 {
503 index -= 1;
504 if traj.states[index].epoch() >= epoch {
505 break;
506 }
507 }
508 traj.states.truncate(index);
509
510 debug!("propagate for {next_step_size} (Δt to next msr: {delta_t})");
511 let (_, traj_covar) = self
512 .prop
513 .for_duration_with_traj(next_step_size)
514 .context(ODPropSnafu)?;
515
516 for state in traj_covar.states {
517 traj.states.push(state);
520 }
521
522 let nominal_state = self.prop.state;
526 epoch = nominal_state.epoch();
528
529 if (nominal_state.epoch() - next_msr_epoch).abs() < Unit::Microsecond * 1 {
531 match self.devices.get_mut(&msr.tracker) {
533 Some(device) => {
534 if let Some(computed_meas) =
535 device.measure(epoch, &traj, None, self.almanac.clone())?
536 {
537 let msr_types = device.measurement_types();
538
539 if let Some(trigger) = &mut self.ekf_trigger {
541 if self.kf.is_extended() && trigger.disable_ekf(epoch) {
542 self.kf.set_extended(false);
543 info!("EKF disabled @ {epoch}");
544 }
545 }
546
547 let windows = msr_types.len() / MsrSize::USIZE;
549 let mut msr_rejected = false;
550 for wno in 0..=windows {
551 let mut cur_msr_types = IndexSet::new();
552 for msr_type in msr_types
553 .iter()
554 .copied()
555 .skip(wno * MsrSize::USIZE)
556 .take(MsrSize::USIZE)
557 {
558 cur_msr_types.insert(msr_type);
559 }
560
561 if cur_msr_types.is_empty() {
562 break;
564 }
565
566 if !msr.availability(&cur_msr_types).iter().any(|avail| *avail)
568 {
569 continue;
570 }
571
572 let mut real_obs: OVector<f64, MsrSize> =
574 msr.observation(&cur_msr_types);
575
576 for val in real_obs.iter().copied() {
578 ensure!(
579 val.is_finite(),
580 InvalidMeasurementSnafu {
581 epoch: *epoch_ref,
582 val
583 }
584 );
585 }
586
587 let h_tilde = device
589 .h_tilde::<MsrSize>(
590 msr,
591 &cur_msr_types,
592 &nominal_state,
593 self.almanac.clone(),
594 )
595 .unwrap();
596
597 let measurement_covar =
598 device.measurement_covar_matrix(&cur_msr_types, epoch)?;
599
600 let computed_obs = computed_meas
602 .observation::<MsrSize>(&cur_msr_types)
603 - device.measurement_bias_vector::<MsrSize>(
604 &cur_msr_types,
605 epoch,
606 )?;
607
608 if let Some(moduli) = &arc.moduli {
610 let mut obs_ambiguity = OVector::<f64, MsrSize>::zeros();
611
612 for (i, msr_type) in cur_msr_types.iter().enumerate() {
613 if let Some(modulus) = moduli.get(msr_type) {
614 let k = computed_obs[i].div_euclid(*modulus);
615 obs_ambiguity[i] = k * *modulus;
617 }
618 }
619 real_obs += obs_ambiguity;
620 }
621
622 match self.kf.measurement_update(
623 nominal_state,
624 real_obs,
625 computed_obs,
626 measurement_covar,
627 h_tilde,
628 resid_crit,
629 ) {
630 Ok((estimate, mut residual)) => {
631 debug!("processed measurement #{msr_cnt} for {cur_msr_types:?} @ {epoch} from {}", device.name());
632
633 residual.tracker = Some(device.name());
634 residual.msr_types = cur_msr_types;
635
636 if residual.rejected {
637 msr_rejected = true;
638 }
639
640 if let Some(trigger) = &mut self.ekf_trigger {
646 if !msr_rejected
647 && trigger.enable_ekf(&estimate)
648 && !self.kf.is_extended()
649 {
650 self.kf.set_extended(true);
651 if !estimate.within_3sigma() {
652 warn!("EKF enabled @ {epoch} but filter DIVERGING");
653 } else {
654 info!("EKF enabled @ {epoch}");
655 }
656 }
657 if self.kf.is_extended() {
658 self.prop.state = estimate.state();
659 }
660 }
661
662 self.prop.state.reset_stm();
663
664 self.estimates.push(estimate);
665 self.residuals.push(Some(residual));
666 }
667 Err(e) => return Err(e),
668 }
669 }
670 if !msr_rejected {
671 msr_accepted_cnt += 1;
672 }
673 } else {
674 debug!("ignoring observation @ {epoch} because simulated {} does not expect it", msr.tracker);
675 }
676 }
677 None => {
678 error!(
679 "Tracker {} is not in the list of configured devices",
680 msr.tracker
681 )
682 }
683 }
684
685 let msr_prct = (10.0 * (msr_cnt as f64) / (num_msrs as f64)) as usize;
686 if !reported[msr_prct] {
687 let num_rejected = msr_cnt - msr_accepted_cnt.saturating_sub(1);
688 let msg = format!(
689 "{:>3}% done - {msr_accepted_cnt:.0} measurements accepted, {:.0} rejected",
690 10 * msr_prct, num_rejected
691 );
692 if msr_accepted_cnt < num_rejected {
693 warn!("{msg}");
694 } else {
695 info!("{msg}");
696 }
697 reported[msr_prct] = true;
698 }
699
700 break;
701 } else {
702 debug!("time update {epoch:?}, next msr {next_msr_epoch:?}");
704 match self.kf.time_update(nominal_state) {
705 Ok(est) => {
706 self.estimates.push(est);
709 self.residuals.push(None);
711 }
712 Err(e) => return Err(e),
713 }
714 self.prop.state.reset_stm();
715 }
716 }
717 }
718
719 if !reported[10] {
721 let tock_time = Epoch::now().unwrap() - tick;
722 info!(
723 "100% done - {msr_accepted_cnt:.0} measurements accepted, {:.0} rejected (done in {tock_time})",
724 num_msrs - msr_accepted_cnt
725 );
726 }
727
728 Ok(())
729 }
730
731 pub fn predict_until(&mut self, step: Duration, end_epoch: Epoch) -> Result<(), ODError> {
733 let prop_time = end_epoch - self.kf.previous_estimate().epoch();
734 info!("Mapping covariance for {prop_time} until {end_epoch} with {step} step");
735
736 loop {
737 let nominal_state = self.prop.for_duration(step).context(ODPropSnafu)?;
738 let epoch = nominal_state.epoch();
741 debug!("time update {epoch}");
743 match self.kf.time_update(nominal_state) {
744 Ok(est) => {
745 self.estimates.push(est);
748 self.residuals.push(None);
749 }
750 Err(e) => return Err(e),
751 }
752 self.prop.state.reset_stm();
753 if epoch >= end_epoch {
754 break;
755 }
756 }
757
758 Ok(())
759 }
760
761 pub fn predict_for(&mut self, step: Duration, duration: Duration) -> Result<(), ODError> {
763 let end_epoch = self.kf.previous_estimate().epoch() + duration;
764 self.predict_until(step, end_epoch)
765 }
766
767 pub fn to_traj(&self) -> Result<Traj<D::StateType>, NyxError>
769 where
770 DefaultAllocator: Allocator<<D::StateType as State>::VecLength>,
771 {
772 if self.estimates.is_empty() {
773 Err(NyxError::NoStateData {
774 msg: "No navigation trajectory to generate: run the OD process first".to_string(),
775 })
776 } else {
777 let mut traj = Traj {
779 states: self.estimates.iter().map(|est| est.state()).collect(),
780 name: None,
781 };
782 traj.finalize();
783 Ok(traj)
784 }
785 }
786
787 pub fn results(&self) -> Zip<Iter<'_, K::Estimate>, Iter<'_, Option<Residual<MsrSize>>>> {
789 self.estimates.iter().zip(self.residuals.iter())
790 }
791}
792
793impl<
794 'a,
795 D: Dynamics,
796 MsrSize: DimName,
797 Accel: DimName,
798 K: Filter<D::StateType, Accel, MsrSize>,
799 Trk: TrackerSensitivity<D::StateType, D::StateType>,
800 > ODProcess<'a, D, MsrSize, Accel, K, Trk>
801where
802 D::StateType:
803 Interpolatable + Add<OVector<f64, <D::StateType as State>::Size>, Output = D::StateType>,
804 <DefaultAllocator as Allocator<<D::StateType as State>::VecLength>>::Buffer<f64>: Send,
805 DefaultAllocator: Allocator<<D::StateType as State>::Size>
806 + Allocator<<D::StateType as State>::VecLength>
807 + Allocator<MsrSize>
808 + Allocator<MsrSize, <D::StateType as State>::Size>
809 + Allocator<MsrSize, MsrSize>
810 + Allocator<<D::StateType as State>::Size, <D::StateType as State>::Size>
811 + Allocator<Accel>
812 + Allocator<Accel, Accel>
813 + Allocator<<D::StateType as State>::Size, Accel>
814 + Allocator<Accel, <D::StateType as State>::Size>,
815{
816 pub fn ckf(
817 prop: PropInstance<'a, D>,
818 kf: K,
819 devices: BTreeMap<String, Trk>,
820 resid_crit: Option<ResidRejectCrit>,
821 almanac: Arc<Almanac>,
822 ) -> Self {
823 let init_state = prop.state;
824 Self {
825 prop: prop.quiet(),
826 kf,
827 devices,
828 estimates: Vec::with_capacity(10_000),
829 residuals: Vec::with_capacity(10_000),
830 resid_crit,
831 ekf_trigger: None,
832 init_state,
833 almanac,
834 _marker: PhantomData::<Accel>,
835 }
836 }
837}