1use crate::linalg::allocator::Allocator;
20use crate::linalg::{DefaultAllocator, DimName};
21use crate::md::trajectory::{Interpolatable, Traj};
22pub use crate::od::estimate::*;
23pub use crate::od::ground_station::*;
24pub use crate::od::snc::*;
25pub use crate::od::*;
26use crate::propagators::Propagator;
27pub use crate::time::{Duration, Unit};
28use anise::prelude::Almanac;
29use indexmap::IndexSet;
30use log::{debug, error, info, warn};
31use msr::sensitivity::TrackerSensitivity;
32use snafu::prelude::*;
33use solution::kalman::KalmanVariant;
34use std::collections::BTreeMap;
35use std::marker::PhantomData;
36use std::ops::Add;
37use typed_builder::TypedBuilder;
38
39mod rejectcrit;
40use self::kalman::KalmanFilter;
41use self::msr::TrackingDataArc;
42pub use self::rejectcrit::ResidRejectCrit;
43mod solution;
44pub use solution::ODSolution;
45mod initializers;
46
47#[derive(Clone, TypedBuilder)]
49#[builder(doc)]
50#[allow(clippy::upper_case_acronyms)]
51pub struct KalmanODProcess<
52 D: Dynamics,
53 MsrSize: DimName,
54 Accel: DimName,
55 Trk: TrackerSensitivity<D::StateType, D::StateType>,
56> where
57 D::StateType:
58 Interpolatable + Add<OVector<f64, <D::StateType as State>::Size>, Output = D::StateType>,
59 <DefaultAllocator as Allocator<<D::StateType as State>::VecLength>>::Buffer<f64>: Send,
60 <DefaultAllocator as Allocator<<D::StateType as State>::Size>>::Buffer<f64>: Copy,
61 <DefaultAllocator as Allocator<<D::StateType as State>::Size, <D::StateType as State>::Size>>::Buffer<f64>: Copy,
62 DefaultAllocator: Allocator<<D::StateType as State>::Size>
63 + Allocator<<D::StateType as State>::VecLength>
64 + Allocator<MsrSize>
65 + Allocator<MsrSize, <D::StateType as State>::Size>
66 + Allocator<<D::StateType as State>::Size, MsrSize>
67 + Allocator<MsrSize, MsrSize>
68 + Allocator<<D::StateType as State>::Size, <D::StateType as State>::Size>
69 + Allocator<Accel>
70 + Allocator<Accel, Accel>
71 + Allocator<<D::StateType as State>::Size, Accel>
72 + Allocator<Accel, <D::StateType as State>::Size>,
73{
74 pub prop: Propagator<D>,
76 #[builder(default)]
78 pub kf_variant: KalmanVariant,
79 #[builder(default, setter(strip_option))]
81 pub resid_crit: Option<ResidRejectCrit>,
82 #[builder(default_code = "BTreeMap::new()")]
84 pub devices: BTreeMap<String, Trk>,
85 #[builder(default_code = "vec![]")]
87 pub process_noise: Vec<ProcessNoise<Accel>>,
88 #[builder(default_code = "1 * Unit::Minute")]
90 pub max_step: Duration,
91 #[builder(default_code = "1 * Unit::Microsecond")]
93 pub epoch_precision: Duration,
94 pub almanac: Arc<Almanac>,
95 #[builder(default_code = "PhantomData::<MsrSize>")]
96 _msr_size: PhantomData<MsrSize>,
97}
98
99impl<
100 D: Dynamics,
101 MsrSize: DimName,
102 Accel: DimName,
103 Trk: TrackerSensitivity<D::StateType, D::StateType>,
104 > KalmanODProcess<D, MsrSize, Accel, Trk>
105where
106 D::StateType:
107 Interpolatable + Add<OVector<f64, <D::StateType as State>::Size>, Output = D::StateType>,
108 <DefaultAllocator as Allocator<<D::StateType as State>::VecLength>>::Buffer<f64>: Send,
109 <DefaultAllocator as Allocator<<D::StateType as State>::Size>>::Buffer<f64>: Copy,
110 <DefaultAllocator as Allocator<<D::StateType as State>::Size, <D::StateType as State>::Size>>::Buffer<f64>: Copy,
111 DefaultAllocator: Allocator<<D::StateType as State>::Size>
112 + Allocator<<D::StateType as State>::VecLength>
113 + Allocator<MsrSize>
114 + Allocator<MsrSize, <D::StateType as State>::Size>
115 + Allocator<<D::StateType as State>::Size, MsrSize>
116 + Allocator<MsrSize, MsrSize>
117 + Allocator<<D::StateType as State>::Size, <D::StateType as State>::Size>
118 + Allocator<Accel>
119 + Allocator<Accel, Accel>
120 + Allocator<<D::StateType as State>::Size, Accel>
121 + Allocator<Accel, <D::StateType as State>::Size>
122 + Allocator<nalgebra::Const<1>, MsrSize>,
123{
124 #[allow(clippy::erasing_op)]
126 pub fn process_arc(
127 &self,
128 initial_estimate: KfEstimate<D::StateType>,
129 arc: &TrackingDataArc,
130 ) -> Result<ODSolution<D::StateType, KfEstimate<D::StateType>, MsrSize, Trk>, ODError> {
131 let mut od_sol = ODSolution::new(self.devices.clone(), arc.unique_types());
133
134 let measurements = &arc.measurements;
135 ensure!(
136 measurements.len() >= 2,
137 TooFewMeasurementsSnafu {
138 need: 2_usize,
139 action: "running a Kalman filter"
140 }
141 );
142
143 ensure!(
144 !self.max_step.is_negative() && self.max_step != Duration::ZERO,
145 StepSizeSnafu { step: self.max_step }
146 );
147
148 if MsrSize::DIM > arc.unique_types().len() {
150 error!("Filter misconfigured: expect high rejection count!");
151 error!(
152 "Arc only contains {} measurement types, but filter configured for {}.",
153 arc.unique_types().len(),
154 MsrSize::DIM
155 );
156 error!("Filter should be configured for these numbers to match.");
157 error!("Consider running subsequent arcs if ground stations provide different measurements.")
158 }
159
160 let num_msrs = measurements.len();
162
163 let prop = self.prop.clone();
165 let mut prop_instance = prop.with(initial_estimate.nominal_state().with_stm(), self.almanac.clone()).quiet();
166
167 if !prop_instance.fixed_step {
169 prop_instance.set_step(self.max_step, false);
170 }
171
172 let prop_time = arc.end_epoch().unwrap() - initial_estimate.epoch();
173 info!("Navigation propagating for a total of {prop_time} with step size {}", self.max_step);
174
175 let resid_crit = if arc.force_reject {
176 warn!("Rejecting all measurements from {arc} as requested");
177 Some(ResidRejectCrit { num_sigmas: 0.0 })
178 } else {
179 self.resid_crit
180 };
181
182 let mut epoch = prop_instance.state.epoch();
183
184 let mut reported = [false; 11];
185 reported[0] = true; info!(
187 "Processing {num_msrs} measurements from {:?}",
188 arc.unique_aliases()
189 );
190
191 let mut kf = KalmanFilter::<D::StateType, Accel> {
193 prev_estimate: initial_estimate,
194 process_noise: self.process_noise.clone(),
195 variant: self.kf_variant,
196 prev_used_snc: 0,
197 };
198
199 kf.initialize_process_noises();
200
201 let mut devices = self.devices.clone();
202
203 let mut traj: Traj<D::StateType> = Traj::new();
205
206 let mut msr_accepted_cnt: usize = 0;
207 let mut msr_rejected_cnt: usize = 0;
208 let mut unknown_trackers = IndexSet::new();
209 let tick = Epoch::now().unwrap();
210
211 for (msr_cnt, (epoch_ref, msr)) in measurements.iter().enumerate() {
212 let next_msr_epoch = *epoch_ref;
213
214 loop {
216 let delta_t = next_msr_epoch - epoch;
217
218 let next_step_size = delta_t.min(prop_instance.step_size).min(self.max_step);
220
221 let mut index = traj.states.len();
224 while index > 0 {
225 index -= 1;
226 if traj.states[index].epoch() >= epoch {
227 break;
228 }
229 }
230 traj.states.truncate(index);
231
232 debug!("propagate for {next_step_size} (Δt to next msr: {delta_t})");
233 let (_, traj_covar) = prop_instance
234 .for_duration_with_traj(next_step_size)
235 .context(ODPropSnafu)?;
236
237 for state in traj_covar.states {
238 traj.states.push(state);
241 }
242
243 let nominal_state = prop_instance.state;
247 epoch = nominal_state.epoch();
249
250 if (nominal_state.epoch() - next_msr_epoch).abs() < self.epoch_precision {
252 prop_instance.state.set_epoch(next_msr_epoch);
256
257 if msr.rejected {
258 debug!("Skipping manually rejected measurement at {}", epoch);
259 match kf.time_update(nominal_state) {
260 Ok(est) => {
261 od_sol.push_time_update(est);
262 }
263 Err(e) => return Err(e),
264 }
265 prop_instance.state.reset_stm();
266 msr_rejected_cnt += 1;
267 } else {
268 match devices.get_mut(&msr.tracker) {
270 Some(device) => {
271 if let Some(computed_meas) =
272 device.measure(epoch, &traj, None, self.almanac.clone())?
273 {
274 let msr_types = device.measurement_types();
275
276 let windows = msr_types.len() / MsrSize::DIM;
278 let mut msr_rejected = false;
279 for wno in 0..=windows {
280 let mut cur_msr_types = IndexSet::new();
281 for msr_type in msr_types
282 .iter()
283 .copied()
284 .skip(wno * MsrSize::DIM)
285 .take(MsrSize::DIM)
286 {
287 cur_msr_types.insert(msr_type);
288 }
289
290 if cur_msr_types.is_empty() {
291 break;
293 }
294
295 if !msr.availability(&cur_msr_types)
297 .iter()
298 .any(|avail| *avail)
299 {
300 continue;
301 }
302
303 let mut real_obs: OVector<f64, MsrSize> =
305 msr.observation(&cur_msr_types);
306
307 for val in real_obs.iter().copied() {
309 ensure!(
310 val.is_finite(),
311 InvalidMeasurementSnafu {
312 epoch: *epoch_ref,
313 val
314 }
315 );
316 }
317
318 let h_tilde = device.h_tilde::<MsrSize>(
320 msr,
321 &cur_msr_types,
322 &nominal_state,
323 self.almanac.clone(),
324 )?;
325
326 let measurement_covar = device
327 .measurement_covar_matrix(&cur_msr_types, epoch)?;
328
329 let computed_obs = computed_meas
331 .observation::<MsrSize>(&cur_msr_types)
332 - device.measurement_bias_vector::<MsrSize>(
333 &cur_msr_types,
334 epoch,
335 )?;
336
337 if let Some(moduli) = &arc.moduli {
339 let mut obs_ambiguity =
340 OVector::<f64, MsrSize>::zeros();
341
342 for (i, msr_type) in cur_msr_types.iter().enumerate() {
343 if let Some(modulus) = moduli.get(msr_type) {
344 let k = computed_obs[i].div_euclid(*modulus);
345 obs_ambiguity[i] = k * *modulus;
347 }
348 }
349 real_obs += obs_ambiguity;
350 }
351
352 let (estimate, mut residual, gain) = kf.measurement_update(
353 nominal_state,
354 real_obs,
355 computed_obs,
356 measurement_covar,
357 h_tilde,
358 resid_crit,
359 )?;
360
361 debug!(
362 "processed measurement #{msr_cnt} for {cur_msr_types:?} @ {epoch} from {}",
363 device.name()
364 );
365
366 residual.tracker = Some(device.name());
367 residual.msr_types = cur_msr_types;
368
369 if residual.rejected {
370 msr_rejected = true;
371 }
372
373 if kf.replace_state() {
374 prop_instance.state = estimate.state();
375 }
376
377 prop_instance.state.reset_stm();
378
379 od_sol.push_measurement_update(estimate, residual, gain);
380 }
381 if msr_rejected {
382 msr_rejected_cnt += 1;
383 } else {
384 msr_accepted_cnt += 1;
385 }
386 } else {
387 debug!(
388 "Device {} does not expect measurement at {epoch}, skipping",
389 msr.tracker
390 );
391 }
392 }
393 None => {
394 if !unknown_trackers.contains(&msr.tracker) {
395 error!(
396 "Tracker {} is not in the list of configured devices",
397 msr.tracker
398 );
399 }
400 unknown_trackers.insert(msr.tracker.clone());
401 }
402 }
403 }
404
405 let msr_prct = (10.0 * (msr_cnt as f64) / (num_msrs as f64)) as usize;
406 if !reported[msr_prct] {
407 let msg = format!(
408 "{:>3}% done - {msr_accepted_cnt:.0} measurements accepted, {:.0} rejected",
409 10 * msr_prct, msr_rejected_cnt
410 );
411 if msr_accepted_cnt < msr_rejected_cnt {
412 warn!("{msg}");
413 } else {
414 info!("{msg}");
415 }
416 reported[msr_prct] = true;
417 }
418
419 break;
420 } else {
421 debug!("time update {epoch:?}, next msr {next_msr_epoch:?}");
423 match kf.time_update(nominal_state) {
424 Ok(est) => {
425 od_sol.push_time_update(est);
427 }
428 Err(e) => return Err(e),
429 }
430 prop_instance.state.reset_stm();
431 }
432 }
433 }
434
435 if !reported[10] {
437 let tock_time = Epoch::now().unwrap() - tick;
438 info!(
439 "100% done - {msr_accepted_cnt} measurements accepted, {msr_rejected_cnt} rejected (done in {tock_time})",
440 );
441 }
442
443 Ok(od_sol)
444 }
445
446 pub fn predict_until(
448 &self,
449 initial_estimate: KfEstimate<D::StateType>,
450 end_epoch: Epoch,
451 ) -> Result<ODSolution<D::StateType, KfEstimate<D::StateType>, MsrSize, Trk>, ODError> {
452 let mut od_sol = ODSolution::new(self.devices.clone(), IndexSet::new());
454
455 od_sol.push_time_update(initial_estimate);
456
457 let prop = self.prop.clone();
459 let mut prop_instance = prop.with(initial_estimate.nominal_state().with_stm(), self.almanac.clone()).quiet();
460
461
462 let mut kf = KalmanFilter::<D::StateType, Accel> {
464 prev_estimate: initial_estimate,
465 process_noise: self.process_noise.clone(),
466 variant: self.kf_variant,
467 prev_used_snc: 0,
468 };
469
470 let prop_time = end_epoch - kf.previous_estimate().epoch();
471 info!("Mapping covariance for {prop_time} every {} until {end_epoch}", self.max_step);
472
473 loop {
474 let nominal_state = prop_instance.for_duration(self.max_step).context(ODPropSnafu)?;
475 let epoch = nominal_state.epoch();
478 debug!("time update {epoch}");
480 match kf.time_update(nominal_state) {
481 Ok(est) => {
482 od_sol.push_time_update(est);
483 }
484 Err(e) => return Err(e),
485 }
486 prop_instance.state.reset_stm();
487 if epoch >= end_epoch {
488 break;
489 }
490 }
491
492 Ok(od_sol)
493 }
494
495 pub fn predict_for(
497 &self,
498 initial_estimate: KfEstimate<D::StateType>,
499 duration: Duration,
500 ) -> Result<ODSolution<D::StateType, KfEstimate<D::StateType>, MsrSize, Trk>, ODError> {
501 let end_epoch = initial_estimate.nominal_state().epoch() + duration;
502 self.predict_until(initial_estimate, end_epoch)
503 }
504}