nyx_space/od/msr/trackingdata/
io_ccsds_tdm.rs1use crate::io::watermark::prj_name_ver;
20use crate::io::ExportCfg;
21use crate::io::{InputOutputError, StdIOSnafu};
22use crate::od::msr::{Measurement, MeasurementType};
23use anise::constants::SPEED_OF_LIGHT_KM_S;
24use hifitime::efmt::{Format, Formatter};
25use hifitime::prelude::Epoch;
26use hifitime::TimeScale;
27use indexmap::{IndexMap, IndexSet};
28use log::{error, info, warn};
29use snafu::ResultExt;
30use std::collections::{BTreeMap, HashMap};
31use std::fs::File;
32use std::io::Write;
33use std::io::{BufRead, BufReader, BufWriter};
34use std::path::{Path, PathBuf};
35use std::str::FromStr;
36
37use super::TrackingDataArc;
38
39impl TrackingDataArc {
40 pub fn from_tdm<P: AsRef<Path>>(
88 path: P,
89 aliases: Option<HashMap<String, String>>,
90 ) -> Result<Self, InputOutputError> {
91 let file = File::open(&path).context(StdIOSnafu {
92 action: "opening CCSDS TDM file for tracking arc",
93 })?;
94
95 let source = path.as_ref().to_path_buf().display().to_string();
96 info!("parsing CCSDS TDM {source}");
97
98 let mut measurements = BTreeMap::new();
99 let mut metadata = HashMap::new();
100
101 let reader = BufReader::new(file);
102
103 let mut in_data_section = false;
104 let mut current_tracker = String::new();
105 let mut time_system = TimeScale::UTC;
106 let mut has_freq_data = false;
107 let mut msr_divider = 1.0;
108
109 for line in reader.lines() {
110 let line = line.context(StdIOSnafu {
111 action: "reading CCSDS TDM file",
112 })?;
113 let line = line.trim();
114
115 if line == "DATA_START" {
116 in_data_section = true;
117 continue;
118 } else if line == "DATA_STOP" {
119 in_data_section = false;
120 }
121
122 if !in_data_section {
123 if line.starts_with("PARTICIPANT_1") {
124 current_tracker = line.split('=').nth(1).unwrap_or("").trim().to_string();
125 if let Some(aliases) = &aliases {
127 if let Some(alias) = aliases.get(¤t_tracker) {
128 current_tracker = alias.clone();
129 }
130 }
131 } else if line.starts_with("TIME_SYSTEM") {
132 let ts = line.split('=').nth(1).unwrap_or("UTC").trim();
133 if let Ok(ts) = TimeScale::from_str(ts) {
135 time_system = ts;
136 } else {
137 return Err(InputOutputError::UnsupportedData {
138 which: format!("time scale `{ts}` not supported"),
139 });
140 }
141 } else if line.starts_with("PATH") {
142 match line.split(",").count() {
143 2 => msr_divider = 1.0,
144 3 => msr_divider = 2.0,
145 cnt => {
146 return Err(InputOutputError::UnsupportedData {
147 which: format!(
148 "found {cnt} paths in TDM, only 1 or 2 are supported"
149 ),
150 })
151 }
152 }
153 }
154
155 let mut splt = line.split('=');
156 if let Some(keyword) = splt.nth(0) {
157 if let Some(value) = splt.nth(0) {
159 metadata.insert(keyword.trim().to_string(), value.trim().to_string());
160 }
161 }
162
163 continue;
164 }
165
166 if let Some((mtype, epoch, value)) = parse_measurement_line(line, time_system)? {
167 if [
168 MeasurementType::ReceiveFrequency,
169 MeasurementType::TransmitFrequency,
170 ]
171 .contains(&mtype)
172 {
173 has_freq_data = true;
174 msr_divider = 1.0;
176 }
177 measurements
178 .entry(epoch)
179 .or_insert_with(|| Measurement {
180 tracker: current_tracker.clone(),
181 epoch,
182 data: IndexMap::new(),
183 rejected: false,
184 })
185 .data
186 .insert(mtype, value / msr_divider);
187 }
188 }
189
190 let mut turnaround_ratio = None;
191 let drop_freq_data;
192 if has_freq_data {
193 if let Some(ta_num_str) = metadata.get("TURNAROUND_NUMERATOR") {
195 if let Some(ta_denom_str) = metadata.get("TURNAROUND_DENOMINATOR") {
196 if let Ok(ta_num) = ta_num_str.parse::<i32>() {
197 if let Ok(ta_denom) = ta_denom_str.parse::<i32>() {
198 turnaround_ratio = Some(f64::from(ta_num) / f64::from(ta_denom));
200 info!("turn-around ratio is {ta_num}/{ta_denom}");
201 drop_freq_data = false;
202 } else {
203 error!("turn-around denominator `{ta_denom_str}` is not a valid double precision float");
204 drop_freq_data = true;
205 }
206 } else {
207 error!("turn-around numerator `{ta_num_str}` is not a valid double precision float");
208 drop_freq_data = true;
209 }
210 } else {
211 error!("required turn-around denominator missing from metadata -- dropping ALL RECEIVE/TRANSMIT data");
212 drop_freq_data = true;
213 }
214 } else {
215 error!("required turn-around numerator missing from metadata -- dropping ALL RECEIVE/TRANSMIT data");
216 drop_freq_data = true;
217 }
218 } else {
219 drop_freq_data = true;
220 }
221
222 let corrections_applied = if let Some(corr_flag) = metadata.get("CORRECTIONS_APPLIED") {
223 match corr_flag.trim().to_lowercase().as_str() {
224 "no" => false,
225 "yes" => true,
226 _ => {
227 warn!("invalid CORRECTIONS_APPLIED `{corr_flag}`");
228 true
229 }
230 }
231 } else {
232 true
233 };
234
235 let mut freq_types = IndexSet::new();
238 freq_types.insert(MeasurementType::ReceiveFrequency);
239 freq_types.insert(MeasurementType::TransmitFrequency);
240
241 let mut latest_transmit_freq = None;
242 let mut malformed_warning = 0;
243
244 let mut all_applied_corrections = IndexSet::new();
245
246 for (epoch, measurement) in measurements.iter_mut() {
247 if !corrections_applied {
249 for msr_type in [
250 MeasurementType::Range,
251 MeasurementType::Doppler,
252 MeasurementType::Azimuth,
253 MeasurementType::Elevation,
254 MeasurementType::ReceiveFrequency,
255 MeasurementType::TransmitFrequency,
256 ] {
257 let kw = format!("CORRECTION_{}", msr_type.ccsds_tdm_name());
258 if let Some(correction_str) = metadata.get(&kw) {
259 if let Ok(correction) = correction_str.parse::<f64>() {
260 measurement.correct(msr_type, correction);
261 all_applied_corrections.insert(msr_type);
262 } else {
263 warn!("invalid correction value for {kw}");
264 }
265 }
266 }
267 }
268
269 if drop_freq_data {
270 for freq in &freq_types {
271 measurement.data.swap_remove(freq);
272 }
273 continue;
274 }
275
276 let avail = measurement.availability(&freq_types);
277 let use_prev_transmit_freq: bool;
278 let num_freq_msr = avail
279 .iter()
280 .copied()
281 .map(|v| if v { 1 } else { 0 })
282 .sum::<u8>();
283 if num_freq_msr == 0 {
284 continue;
286 } else if num_freq_msr == 1 {
287 if let Some(latest_transmit_freq) = latest_transmit_freq {
292 if avail[0] {
293 use_prev_transmit_freq = true;
294 if malformed_warning == 0 {
295 warn!(
296 "no transmit frequency at {epoch}, using {latest_transmit_freq} Hz",
297 );
298 }
299 malformed_warning += 1;
300 } else {
301 use_prev_transmit_freq = false;
302 }
303 } else {
304 warn!("only one of receive or transmit frequencies found at {epoch}, ignoring");
305 for freq in &freq_types {
306 measurement.data.swap_remove(freq);
307 }
308 continue;
309 }
310 } else {
311 use_prev_transmit_freq = false;
312 }
313
314 if !use_prev_transmit_freq {
315 latest_transmit_freq = Some(
317 *measurement
318 .data
319 .get(&MeasurementType::TransmitFrequency)
320 .unwrap(),
321 );
322 }
323
324 let transmit_freq_hz = latest_transmit_freq.unwrap();
325 let receive_freq_hz = *measurement
326 .data
327 .get(&MeasurementType::ReceiveFrequency)
328 .unwrap();
329
330 let doppler_shift_hz = transmit_freq_hz * turnaround_ratio.unwrap() - receive_freq_hz;
332 let rho_dot_km_s = (doppler_shift_hz * SPEED_OF_LIGHT_KM_S)
334 / (2.0 * transmit_freq_hz * turnaround_ratio.unwrap());
335
336 for freq in &freq_types {
338 measurement.data.swap_remove(freq);
339 }
340 measurement
341 .data
342 .insert(MeasurementType::Doppler, rho_dot_km_s);
343 }
344
345 if malformed_warning > 1 {
346 warn!("missing transmit frequency warning occured {malformed_warning} times",);
347 }
348
349 if !all_applied_corrections.is_empty() {
350 info!("applied corrections for {all_applied_corrections:?}");
351 }
352
353 let moduli = if let Some(range_modulus) = metadata.get("RANGE_MODULUS") {
354 if let Ok(value) = range_modulus.parse::<f64>() {
355 if value > 0.0 {
356 let mut modulos = IndexMap::new();
357 modulos.insert(MeasurementType::Range, value);
358 Some(modulos)
360 } else {
361 None
363 }
364 } else {
365 warn!("could not parse RANGE_MODULUS of `{range_modulus}` as a double");
366 None
367 }
368 } else {
369 None
370 };
371
372 let trk = Self {
373 measurements,
374 source: Some(source),
375 moduli,
376 force_reject: false,
377 };
378
379 if trk.unique_types().is_empty() {
380 Err(InputOutputError::EmptyDataset {
381 action: "CCSDS TDM file",
382 })
383 } else {
384 Ok(trk)
385 }
386 }
387
388 pub fn to_tdm_file<P: AsRef<Path>>(
390 mut self,
391 spacecraft_name: String,
392 aliases: Option<HashMap<String, String>>,
393 path: P,
394 cfg: ExportCfg,
395 ) -> Result<PathBuf, InputOutputError> {
396 if self.is_empty() {
397 return Err(InputOutputError::MissingData {
398 which: " - empty tracking data cannot be exported to TDM".to_string(),
399 });
400 }
401
402 if let Some(start_epoch) = cfg.start_epoch {
404 if let Some(end_epoch) = cfg.end_epoch {
405 self = self.filter_by_epoch(start_epoch..end_epoch);
406 } else {
407 self = self.filter_by_epoch(start_epoch..);
408 }
409 } else if let Some(end_epoch) = cfg.end_epoch {
410 self = self.filter_by_epoch(..end_epoch);
411 }
412
413 let tick = Epoch::now().unwrap();
414 info!("Exporting tracking data to CCSDS TDM file...");
415
416 let path_buf = cfg.actual_path(path);
418
419 let metadata = cfg.metadata.unwrap_or_default();
420
421 let file = File::create(&path_buf).context(StdIOSnafu {
422 action: "creating CCSDS TDM file for tracking arc",
423 })?;
424 let mut writer = BufWriter::new(file);
425
426 let err_hdlr = |source| InputOutputError::StdIOError {
427 source,
428 action: "writing data to TDM file",
429 };
430
431 let iso8601_no_ts = Format::from_str("%Y-%m-%dT%H:%M:%S.%f").unwrap();
433
434 writeln!(writer, "CCSDS_TDM_VERS = 2.0").map_err(err_hdlr)?;
436 writeln!(
437 writer,
438 "\nCOMMENT Build by {} -- https://nyxspace.com",
439 prj_name_ver()
440 )
441 .map_err(err_hdlr)?;
442 writeln!(
443 writer,
444 "COMMENT Nyx Space provided under the AGPL v3 open source license -- https://nyxspace.com/pricing\n"
445 )
446 .map_err(err_hdlr)?;
447 writeln!(
448 writer,
449 "CREATION_DATE = {}",
450 Formatter::new(Epoch::now().unwrap(), iso8601_no_ts)
451 )
452 .map_err(err_hdlr)?;
453 writeln!(
454 writer,
455 "ORIGINATOR = {}\n",
456 metadata
457 .get("originator")
458 .unwrap_or(&"Nyx Space".to_string())
459 )
460 .map_err(err_hdlr)?;
461
462 let trackers = self.unique_aliases();
465
466 for tracker in trackers {
467 let tracker_data = self.clone().filter_by_tracker(tracker.clone());
468
469 let types = tracker_data.unique_types();
470
471 let two_way_types = types
472 .iter()
473 .filter(|msr_type| msr_type.may_be_two_way())
474 .copied()
475 .collect::<Vec<_>>();
476
477 let one_way_types = types
478 .iter()
479 .filter(|msr_type| !msr_type.may_be_two_way())
480 .copied()
481 .collect::<Vec<_>>();
482
483 for (tno, types) in [two_way_types, one_way_types].iter().enumerate() {
485 writeln!(writer, "META_START").map_err(err_hdlr)?;
486 writeln!(writer, "\tTIME_SYSTEM = UTC").map_err(err_hdlr)?;
487 writeln!(
488 writer,
489 "\tSTART_TIME = {}",
490 Formatter::new(tracker_data.start_epoch().unwrap(), iso8601_no_ts)
491 )
492 .map_err(err_hdlr)?;
493 writeln!(
494 writer,
495 "\tSTOP_TIME = {}",
496 Formatter::new(tracker_data.end_epoch().unwrap(), iso8601_no_ts)
497 )
498 .map_err(err_hdlr)?;
499
500 let multiplier = if tno == 0 {
501 writeln!(writer, "\tPATH = 1,2,1").map_err(err_hdlr)?;
502 2.0
503 } else {
504 writeln!(writer, "\tPATH = 1,2").map_err(err_hdlr)?;
505 1.0
506 };
507
508 writeln!(
509 writer,
510 "\tPARTICIPANT_1 = {}",
511 if let Some(aliases) = &aliases {
512 if let Some(alias) = aliases.get(&tracker) {
513 alias
514 } else {
515 &tracker
516 }
517 } else {
518 &tracker
519 }
520 )
521 .map_err(err_hdlr)?;
522
523 writeln!(writer, "\tPARTICIPANT_2 = {spacecraft_name}").map_err(err_hdlr)?;
524
525 writeln!(writer, "\tMODE = SEQUENTIAL").map_err(err_hdlr)?;
526
527 for (k, v) in &metadata {
529 if k != "originator" {
530 writeln!(writer, "\t{k} = {v}").map_err(err_hdlr)?;
531 }
532 }
533
534 if types.contains(&MeasurementType::Range) {
535 writeln!(writer, "\tRANGE_UNITS = km").map_err(err_hdlr)?;
536
537 if let Some(moduli) = &self.moduli {
538 if let Some(range_modulus) = moduli.get(&MeasurementType::Range) {
539 writeln!(writer, "\tRANGE_MODULUS = {range_modulus:E}")
540 .map_err(err_hdlr)?;
541 }
542 }
543 }
544
545 if types.contains(&MeasurementType::Azimuth)
546 || types.contains(&MeasurementType::Elevation)
547 {
548 writeln!(writer, "\tANGLE_TYPE = AZEL").map_err(err_hdlr)?;
549 }
550
551 writeln!(writer, "META_STOP\n").map_err(err_hdlr)?;
552
553 writeln!(writer, "DATA_START").map_err(err_hdlr)?;
555
556 for (epoch, measurement) in &tracker_data.measurements {
558 for (mtype, value) in &measurement.data {
559 if !types.contains(mtype) {
560 continue;
561 }
562
563 writeln!(
564 writer,
565 "\t{:<20} = {:<23}\t{:.12}",
566 mtype.ccsds_tdm_name(),
567 Formatter::new(*epoch, iso8601_no_ts),
568 value * multiplier
569 )
570 .map_err(err_hdlr)?;
571 }
572 }
573
574 writeln!(writer, "DATA_STOP\n").map_err(err_hdlr)?;
575 }
576 }
577
578 #[allow(clippy::writeln_empty_string)]
579 writeln!(writer, "").map_err(err_hdlr)?;
580
581 let tock_time = Epoch::now().unwrap() - tick;
583 info!("CCSDS TDM written to {} in {tock_time}", path_buf.display());
584 Ok(path_buf)
585 }
586}
587
588fn parse_measurement_line(
589 line: &str,
590 time_system: TimeScale,
591) -> Result<Option<(MeasurementType, Epoch, f64)>, InputOutputError> {
592 let parts: Vec<&str> = line.split('=').collect();
593 if parts.len() != 2 {
594 return Ok(None);
595 }
596
597 let (mtype_str, data) = (parts[0].trim(), parts[1].trim());
598 let mtype = match mtype_str {
599 "RANGE" => MeasurementType::Range,
600 "DOPPLER_INSTANTANEOUS" | "DOPPLER_INTEGRATED" => MeasurementType::Doppler,
601 "ANGLE_1" => MeasurementType::Azimuth,
602 "ANGLE_2" => MeasurementType::Elevation,
603 "RECEIVE_FREQ" | "RECEIVE_FREQ_1" | "RECEIVE_FREQ_2" | "RECEIVE_FREQ_3"
604 | "RECEIVE_FREQ_4" | "RECEIVE_FREQ_5" => MeasurementType::ReceiveFrequency,
605 "TRANSMIT_FREQ" | "TRANSMIT_FREQ_1" | "TRANSMIT_FREQ_2" | "TRANSMIT_FREQ_3"
606 | "TRANSMIT_FREQ_4" | "TRANSMIT_FREQ_5" => MeasurementType::TransmitFrequency,
607 _ => {
608 return Err(InputOutputError::UnsupportedData {
609 which: mtype_str.to_string(),
610 })
611 }
612 };
613
614 let data_parts: Vec<&str> = data.split_whitespace().collect();
615 if data_parts.len() != 2 {
616 return Ok(None);
617 }
618
619 let epoch =
620 Epoch::from_gregorian_str(&format!("{} {time_system}", data_parts[0])).map_err(|e| {
621 InputOutputError::Inconsistency {
622 msg: format!("{e} when parsing epoch"),
623 }
624 })?;
625
626 let value = data_parts[1]
627 .parse::<f64>()
628 .map_err(|e| InputOutputError::UnsupportedData {
629 which: format!("`{}` is not a float: {e}", data_parts[1]),
630 })?;
631
632 Ok(Some((mtype, epoch, value)))
633}