nyx_space/od/msr/trackingdata/
io_ccsds_tdm.rs1use crate::io::watermark::prj_name_ver;
20use crate::io::ExportCfg;
21use crate::io::{InputOutputError, StdIOSnafu};
22use crate::od::msr::{Measurement, MeasurementType};
23use anise::constants::SPEED_OF_LIGHT_KM_S;
24use hifitime::efmt::{Format, Formatter};
25use hifitime::prelude::Epoch;
26use hifitime::TimeScale;
27use indexmap::{IndexMap, IndexSet};
28use log::{error, info, warn};
29use snafu::ResultExt;
30use std::collections::{BTreeMap, HashMap};
31use std::fs::File;
32use std::io::Write;
33use std::io::{BufRead, BufReader, BufWriter};
34use std::path::{Path, PathBuf};
35use std::str::FromStr;
36
37use super::TrackingDataArc;
38
39impl TrackingDataArc {
40 pub fn from_tdm<P: AsRef<Path>>(
88 path: P,
89 aliases: Option<HashMap<String, String>>,
90 ) -> Result<Self, InputOutputError> {
91 let file = File::open(&path).context(StdIOSnafu {
92 action: "opening CCSDS TDM file for tracking arc",
93 })?;
94
95 let source = path.as_ref().to_path_buf().display().to_string();
96 info!("parsing CCSDS TDM {source}");
97
98 let mut measurements = BTreeMap::new();
99 let mut metadata = HashMap::new();
100
101 let reader = BufReader::new(file);
102
103 let mut in_data_section = false;
104 let mut current_tracker = String::new();
105 let mut time_system = TimeScale::UTC;
106 let mut has_freq_data = false;
107 let mut msr_divider = 1.0;
108
109 for line in reader.lines() {
110 let line = line.context(StdIOSnafu {
111 action: "reading CCSDS TDM file",
112 })?;
113 let line = line.trim();
114
115 if line == "DATA_START" {
116 in_data_section = true;
117 continue;
118 } else if line == "DATA_STOP" {
119 in_data_section = false;
120 }
121
122 if !in_data_section {
123 if line.starts_with("PARTICIPANT_1") {
124 current_tracker = line.split('=').nth(1).unwrap_or("").trim().to_string();
125 if let Some(aliases) = &aliases {
127 if let Some(alias) = aliases.get(¤t_tracker) {
128 current_tracker = alias.clone();
129 }
130 }
131 } else if line.starts_with("TIME_SYSTEM") {
132 let ts = line.split('=').nth(1).unwrap_or("UTC").trim();
133 if let Ok(ts) = TimeScale::from_str(ts) {
135 time_system = ts;
136 } else {
137 return Err(InputOutputError::UnsupportedData {
138 which: format!("time scale `{ts}` not supported"),
139 });
140 }
141 } else if line.starts_with("PATH") {
142 match line.split(",").count() {
143 2 => msr_divider = 1.0,
144 3 => msr_divider = 2.0,
145 cnt => {
146 return Err(InputOutputError::UnsupportedData {
147 which: format!(
148 "found {cnt} paths in TDM, only 1 or 2 are supported"
149 ),
150 })
151 }
152 }
153 }
154
155 let mut splt = line.split('=');
156 if let Some(keyword) = splt.nth(0) {
157 if let Some(value) = splt.nth(0) {
159 metadata.insert(keyword.trim().to_string(), value.trim().to_string());
160 }
161 }
162
163 continue;
164 }
165
166 if let Some((mtype, epoch, value)) = parse_measurement_line(line, time_system)? {
167 if [
168 MeasurementType::ReceiveFrequency,
169 MeasurementType::TransmitFrequency,
170 ]
171 .contains(&mtype)
172 {
173 has_freq_data = true;
174 msr_divider = 1.0;
176 }
177 measurements
178 .entry(epoch)
179 .or_insert_with(|| Measurement {
180 tracker: current_tracker.clone(),
181 epoch,
182 data: IndexMap::new(),
183 })
184 .data
185 .insert(mtype, value / msr_divider);
186 }
187 }
188
189 let mut turnaround_ratio = None;
190 let drop_freq_data;
191 if has_freq_data {
192 if let Some(ta_num_str) = metadata.get("TURNAROUND_NUMERATOR") {
194 if let Some(ta_denom_str) = metadata.get("TURNAROUND_DENOMINATOR") {
195 if let Ok(ta_num) = ta_num_str.parse::<i32>() {
196 if let Ok(ta_denom) = ta_denom_str.parse::<i32>() {
197 turnaround_ratio = Some(f64::from(ta_num) / f64::from(ta_denom));
199 info!("turn-around ratio is {ta_num}/{ta_denom}");
200 drop_freq_data = false;
201 } else {
202 error!("turn-around denominator `{ta_denom_str}` is not a valid double precision float");
203 drop_freq_data = true;
204 }
205 } else {
206 error!("turn-around numerator `{ta_num_str}` is not a valid double precision float");
207 drop_freq_data = true;
208 }
209 } else {
210 error!("required turn-around denominator missing from metadata -- dropping ALL RECEIVE/TRANSMIT data");
211 drop_freq_data = true;
212 }
213 } else {
214 error!("required turn-around numerator missing from metadata -- dropping ALL RECEIVE/TRANSMIT data");
215 drop_freq_data = true;
216 }
217 } else {
218 drop_freq_data = true;
219 }
220
221 let mut freq_types = IndexSet::new();
224 freq_types.insert(MeasurementType::ReceiveFrequency);
225 freq_types.insert(MeasurementType::TransmitFrequency);
226 let mut latest_transmit_freq = None;
227 let mut malformed_warning = 0;
228 for (epoch, measurement) in measurements.iter_mut() {
229 if drop_freq_data {
230 for freq in &freq_types {
231 measurement.data.swap_remove(freq);
232 }
233 continue;
234 }
235
236 let avail = measurement.availability(&freq_types);
237 let use_prev_transmit_freq;
238 let num_freq_msr = avail
239 .iter()
240 .copied()
241 .map(|v| if v { 1 } else { 0 })
242 .sum::<u8>();
243 if num_freq_msr == 0 {
244 continue;
246 } else if num_freq_msr == 1 {
247 if latest_transmit_freq.is_some() && avail[0] {
252 use_prev_transmit_freq = true;
253 if malformed_warning == 0 {
254 warn!(
255 "no transmit frequency at {epoch}, using previous value of {} Hz",
256 latest_transmit_freq.unwrap()
257 );
258 }
259 malformed_warning += 1;
260 } else {
261 warn!("only one of receive or transmit frequencies found at {epoch}, ignoring");
262 for freq in &freq_types {
263 measurement.data.swap_remove(freq);
264 }
265 continue;
266 }
267 } else {
268 use_prev_transmit_freq = false;
269 }
270
271 if !use_prev_transmit_freq {
272 latest_transmit_freq = Some(
274 *measurement
275 .data
276 .get(&MeasurementType::TransmitFrequency)
277 .unwrap(),
278 );
279 }
280
281 let transmit_freq_hz = latest_transmit_freq.unwrap();
282 let receive_freq_hz = *measurement
283 .data
284 .get(&MeasurementType::ReceiveFrequency)
285 .unwrap();
286
287 let doppler_shift_hz = transmit_freq_hz * turnaround_ratio.unwrap() - receive_freq_hz;
289 let rho_dot_km_s = (doppler_shift_hz * SPEED_OF_LIGHT_KM_S)
291 / (2.0 * transmit_freq_hz * turnaround_ratio.unwrap());
292
293 for freq in &freq_types {
295 measurement.data.swap_remove(freq);
296 }
297 measurement
298 .data
299 .insert(MeasurementType::Doppler, rho_dot_km_s);
300 }
301
302 if malformed_warning > 1 {
303 warn!("missing transmit frequency warning occured {malformed_warning} times",);
304 }
305
306 let moduli = if let Some(range_modulus) = metadata.get("RANGE_MODULUS") {
307 if let Ok(value) = range_modulus.parse::<f64>() {
308 if value > 0.0 {
309 let mut modulos = IndexMap::new();
310 modulos.insert(MeasurementType::Range, value);
311 Some(modulos)
313 } else {
314 None
316 }
317 } else {
318 warn!("could not parse RANGE_MODULUS of `{range_modulus}` as a double");
319 None
320 }
321 } else {
322 None
323 };
324
325 let trk = Self {
326 measurements,
327 source: Some(source),
328 moduli,
329 force_reject: false,
330 };
331
332 if trk.unique_types().is_empty() {
333 Err(InputOutputError::EmptyDataset {
334 action: "CCSDS TDM file",
335 })
336 } else {
337 Ok(trk)
338 }
339 }
340
341 pub fn to_tdm_file<P: AsRef<Path>>(
343 mut self,
344 spacecraft_name: String,
345 aliases: Option<HashMap<String, String>>,
346 path: P,
347 cfg: ExportCfg,
348 ) -> Result<PathBuf, InputOutputError> {
349 if self.is_empty() {
350 return Err(InputOutputError::MissingData {
351 which: " - empty tracking data cannot be exported to TDM".to_string(),
352 });
353 }
354
355 if cfg.start_epoch.is_some() && cfg.end_epoch.is_some() {
357 self = self.filter_by_epoch(cfg.start_epoch.unwrap()..cfg.end_epoch.unwrap());
358 } else if cfg.start_epoch.is_some() {
359 self = self.filter_by_epoch(cfg.start_epoch.unwrap()..);
360 } else if cfg.end_epoch.is_some() {
361 self = self.filter_by_epoch(..cfg.end_epoch.unwrap());
362 }
363
364 let tick = Epoch::now().unwrap();
365 info!("Exporting tracking data to CCSDS TDM file...");
366
367 let path_buf = cfg.actual_path(path);
369
370 let metadata = cfg.metadata.unwrap_or_default();
371
372 let file = File::create(&path_buf).context(StdIOSnafu {
373 action: "creating CCSDS TDM file for tracking arc",
374 })?;
375 let mut writer = BufWriter::new(file);
376
377 let err_hdlr = |source| InputOutputError::StdIOError {
378 source,
379 action: "writing data to TDM file",
380 };
381
382 let iso8601_no_ts = Format::from_str("%Y-%m-%dT%H:%M:%S.%f").unwrap();
384
385 writeln!(writer, "CCSDS_TDM_VERS = 2.0").map_err(err_hdlr)?;
387 writeln!(
388 writer,
389 "\nCOMMENT Build by {} -- https://nyxspace.com",
390 prj_name_ver()
391 )
392 .map_err(err_hdlr)?;
393 writeln!(
394 writer,
395 "COMMENT Nyx Space provided under the AGPL v3 open source license -- https://nyxspace.com/pricing\n"
396 )
397 .map_err(err_hdlr)?;
398 writeln!(
399 writer,
400 "CREATION_DATE = {}",
401 Formatter::new(Epoch::now().unwrap(), iso8601_no_ts)
402 )
403 .map_err(err_hdlr)?;
404 writeln!(
405 writer,
406 "ORIGINATOR = {}\n",
407 metadata
408 .get("originator")
409 .unwrap_or(&"Nyx Space".to_string())
410 )
411 .map_err(err_hdlr)?;
412
413 let trackers = self.unique_aliases();
416
417 for tracker in trackers {
418 let tracker_data = self.clone().filter_by_tracker(tracker.clone());
419
420 let types = tracker_data.unique_types();
421
422 let two_way_types = types
423 .iter()
424 .filter(|msr_type| msr_type.may_be_two_way())
425 .copied()
426 .collect::<Vec<_>>();
427
428 let one_way_types = types
429 .iter()
430 .filter(|msr_type| !msr_type.may_be_two_way())
431 .copied()
432 .collect::<Vec<_>>();
433
434 for (tno, types) in [two_way_types, one_way_types].iter().enumerate() {
436 writeln!(writer, "META_START").map_err(err_hdlr)?;
437 writeln!(writer, "\tTIME_SYSTEM = UTC").map_err(err_hdlr)?;
438 writeln!(
439 writer,
440 "\tSTART_TIME = {}",
441 Formatter::new(tracker_data.start_epoch().unwrap(), iso8601_no_ts)
442 )
443 .map_err(err_hdlr)?;
444 writeln!(
445 writer,
446 "\tSTOP_TIME = {}",
447 Formatter::new(tracker_data.end_epoch().unwrap(), iso8601_no_ts)
448 )
449 .map_err(err_hdlr)?;
450
451 let multiplier = if tno == 0 {
452 writeln!(writer, "\tPATH = 1,2,1").map_err(err_hdlr)?;
453 2.0
454 } else {
455 writeln!(writer, "\tPATH = 1,2").map_err(err_hdlr)?;
456 1.0
457 };
458
459 writeln!(
460 writer,
461 "\tPARTICIPANT_1 = {}",
462 if let Some(aliases) = &aliases {
463 if let Some(alias) = aliases.get(&tracker) {
464 alias
465 } else {
466 &tracker
467 }
468 } else {
469 &tracker
470 }
471 )
472 .map_err(err_hdlr)?;
473
474 writeln!(writer, "\tPARTICIPANT_2 = {spacecraft_name}").map_err(err_hdlr)?;
475
476 writeln!(writer, "\tMODE = SEQUENTIAL").map_err(err_hdlr)?;
477
478 for (k, v) in &metadata {
480 if k != "originator" {
481 writeln!(writer, "\t{k} = {v}").map_err(err_hdlr)?;
482 }
483 }
484
485 if types.contains(&MeasurementType::Range) {
486 writeln!(writer, "\tRANGE_UNITS = km").map_err(err_hdlr)?;
487
488 if let Some(moduli) = &self.moduli {
489 if let Some(range_modulus) = moduli.get(&MeasurementType::Range) {
490 writeln!(writer, "\tRANGE_MODULUS = {range_modulus:E}")
491 .map_err(err_hdlr)?;
492 }
493 }
494 }
495
496 if types.contains(&MeasurementType::Azimuth)
497 || types.contains(&MeasurementType::Elevation)
498 {
499 writeln!(writer, "\tANGLE_TYPE = AZEL").map_err(err_hdlr)?;
500 }
501
502 writeln!(writer, "META_STOP\n").map_err(err_hdlr)?;
503
504 writeln!(writer, "DATA_START").map_err(err_hdlr)?;
506
507 for (epoch, measurement) in &tracker_data.measurements {
509 for (mtype, value) in &measurement.data {
510 if !types.contains(mtype) {
511 continue;
512 }
513 let type_str = match mtype {
514 MeasurementType::Range => "RANGE",
515 MeasurementType::Doppler => "DOPPLER_INTEGRATED",
516 MeasurementType::Azimuth => "ANGLE_1",
517 MeasurementType::Elevation => "ANGLE_2",
518 MeasurementType::ReceiveFrequency => "RECEIVE_FREQ",
519 MeasurementType::TransmitFrequency => "TRANSMIT_FREQ",
520 };
521
522 writeln!(
523 writer,
524 "\t{:<20} = {:<23}\t{:.12}",
525 type_str,
526 Formatter::new(*epoch, iso8601_no_ts),
527 value * multiplier
528 )
529 .map_err(err_hdlr)?;
530 }
531 }
532
533 writeln!(writer, "DATA_STOP\n").map_err(err_hdlr)?;
534 }
535 }
536
537 #[allow(clippy::writeln_empty_string)]
538 writeln!(writer, "").map_err(err_hdlr)?;
539
540 let tock_time = Epoch::now().unwrap() - tick;
542 info!("CCSDS TDM written to {} in {tock_time}", path_buf.display());
543 Ok(path_buf)
544 }
545}
546
547fn parse_measurement_line(
548 line: &str,
549 time_system: TimeScale,
550) -> Result<Option<(MeasurementType, Epoch, f64)>, InputOutputError> {
551 let parts: Vec<&str> = line.split('=').collect();
552 if parts.len() != 2 {
553 return Ok(None);
554 }
555
556 let (mtype_str, data) = (parts[0].trim(), parts[1].trim());
557 let mtype = match mtype_str {
558 "RANGE" => MeasurementType::Range,
559 "DOPPLER_INSTANTANEOUS" | "DOPPLER_INTEGRATED" => MeasurementType::Doppler,
560 "ANGLE_1" => MeasurementType::Azimuth,
561 "ANGLE_2" => MeasurementType::Elevation,
562 "RECEIVE_FREQ" | "RECEIVE_FREQ_1" | "RECEIVE_FREQ_2" | "RECEIVE_FREQ_3"
563 | "RECEIVE_FREQ_4" | "RECEIVE_FREQ_5" => MeasurementType::ReceiveFrequency,
564 "TRANSMIT_FREQ" | "TRANSMIT_FREQ_1" | "TRANSMIT_FREQ_2" | "TRANSMIT_FREQ_3"
565 | "TRANSMIT_FREQ_4" | "TRANSMIT_FREQ_5" => MeasurementType::TransmitFrequency,
566 _ => {
567 return Err(InputOutputError::UnsupportedData {
568 which: mtype_str.to_string(),
569 })
570 }
571 };
572
573 let data_parts: Vec<&str> = data.split_whitespace().collect();
574 if data_parts.len() != 2 {
575 return Ok(None);
576 }
577
578 let epoch =
579 Epoch::from_gregorian_str(&format!("{} {time_system}", data_parts[0])).map_err(|e| {
580 InputOutputError::Inconsistency {
581 msg: format!("{e} when parsing epoch"),
582 }
583 })?;
584
585 let value = data_parts[1]
586 .parse::<f64>()
587 .map_err(|e| InputOutputError::UnsupportedData {
588 which: format!("`{}` is not a float: {e}", data_parts[1]),
589 })?;
590
591 Ok(Some((mtype, epoch, value)))
592}