Skip to main content

nyx_space/od/msr/trackingdata/
python.rs

1/*
2    Nyx, blazing fast astrodynamics
3    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>
4
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU Affero General Public License as published
7    by the Free Software Foundation, either version 3 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU Affero General Public License for more details.
14
15    You should have received a copy of the GNU Affero General Public License
16    along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19use super::{Measurement, MeasurementType, TrackingDataArc};
20use crate::io::{ExportCfg, InputOutputError};
21use hifitime::{Duration, Epoch};
22use pyo3::prelude::*;
23use pyo3::types::PyType;
24use std::collections::{BTreeMap, HashMap};
25use std::ops::Bound::{Excluded, Included, Unbounded};
26
27#[pymethods]
28impl TrackingDataArc {
29    #[new]
30    fn py_new(msrs: HashMap<Epoch, Measurement>) -> Self {
31        let mut measurements = BTreeMap::new();
32        for (epoch, msr) in msrs.iter() {
33            measurements.insert(*epoch, msr.clone());
34        }
35
36        Self {
37            measurements,
38            source: None,
39            moduli: None,
40            force_reject: false,
41        }
42    }
43
44    /// Initializes a new Almanac from a file path to CCSDS OEM file, after converting to to SPICE SPK/BSP
45    ///
46    /// :type path: str
47    /// :type aliases: dict
48    /// :rtype: nyx_space.od.TrackingDataArc
49    #[classmethod]
50    #[pyo3(name = "from_ccsds_tdm")]
51    fn py_from_ccsds_tdm_file(
52        _cls: Bound<'_, PyType>,
53        path: &str,
54        aliases: Option<HashMap<String, String>>,
55    ) -> Result<Self, InputOutputError> {
56        TrackingDataArc::from_tdm(path, aliases)
57    }
58
59    #[pyo3(name = "write_ccsds_tdm")]
60    fn py_write_ccsds_tdm(
61        &self,
62        spacecraft_name: String,
63        aliases: Option<HashMap<String, String>>,
64        path: &str,
65    ) -> Result<String, InputOutputError> {
66        Ok(self
67            .clone()
68            .to_tdm_file(spacecraft_name, aliases, path, ExportCfg::default())?
69            .to_str()
70            .unwrap_or("woah_bug_building_path")
71            .to_string())
72    }
73
74    #[pyo3(name = "unique_aliases")]
75    fn py_unique_aliases(&self) -> Vec<String> {
76        self.unique_aliases().iter().cloned().collect()
77    }
78    #[pyo3(name = "unique_types")]
79    fn py_unique_types(&self) -> Vec<MeasurementType> {
80        self.unique_types().iter().cloned().collect()
81    }
82
83    fn __str__(&self) -> String {
84        format!("{self}")
85    }
86
87    fn __repr__(&self) -> String {
88        format!("{self} @ {self:p}")
89    }
90
91    #[getter]
92    fn get_force_reject(&self) -> bool {
93        self.force_reject
94    }
95
96    #[setter]
97    fn set_force_reject(&mut self, reject: bool) {
98        self.force_reject = reject;
99    }
100
101    #[pyo3(name = "filter_by_epoch")]
102    fn py_filter_by_epoch(&self, start: Option<Epoch>, end: Option<Epoch>) -> Self {
103        let start_bound = start.map(Included).unwrap_or(Unbounded);
104        let end_bound = end.map(Excluded).unwrap_or(Unbounded);
105        self.clone().filter_by_epoch((start_bound, end_bound))
106    }
107
108    #[pyo3(name = "filter_by_offset")]
109    fn py_filter_by_offset(&self, start: Option<Duration>, end: Option<Duration>) -> Self {
110        let start_bound = match start {
111            Some(s) => Included(s),
112            None => Unbounded,
113        };
114        let end_bound = match end {
115            Some(e) => Excluded(e),
116            None => Unbounded,
117        };
118        self.clone().filter_by_offset((start_bound, end_bound))
119    }
120
121    #[pyo3(name = "filter_by_tracker")]
122    fn py_filter_by_tracker(&self, tracker: String) -> Self {
123        self.clone().filter_by_tracker(tracker)
124    }
125
126    #[pyo3(name = "filter_by_measurement_type")]
127    fn py_filter_by_measurement_type(&self, msr_type: MeasurementType) -> Self {
128        self.clone().filter_by_measurement_type(msr_type)
129    }
130
131    #[pyo3(name = "exclude_tracker")]
132    fn py_exclude_tracker(&self, tracker: String) -> Self {
133        self.clone().exclude_tracker(tracker)
134    }
135
136    #[pyo3(name = "exclude_by_epoch")]
137    fn py_exclude_by_epoch(&self, start: Option<Epoch>, end: Option<Epoch>) -> Self {
138        let start_bound = match start {
139            Some(s) => Included(s),
140            None => Unbounded,
141        };
142        let end_bound = match end {
143            Some(e) => Excluded(e),
144            None => Unbounded,
145        };
146        self.clone().exclude_by_epoch((start_bound, end_bound))
147    }
148
149    #[pyo3(name = "exclude_measurement_type")]
150    fn py_exclude_measurement_type(&self, msr_type: MeasurementType) -> Self {
151        self.clone().exclude_measurement_type(msr_type)
152    }
153
154    #[pyo3(name = "downsample")]
155    fn py_downsample(&self, step: Duration) -> Self {
156        self.clone().downsample(step)
157    }
158
159    #[pyo3(name = "resid_vs_ref_check")]
160    fn py_resid_vs_ref_check(&self) -> Self {
161        self.clone().resid_vs_ref_check()
162    }
163}