nyx_space/od/msr/trackingdata/
io_ccsds_tdm.rs1use crate::io::watermark::prj_name_ver;
20use crate::io::ExportCfg;
21use crate::io::{InputOutputError, StdIOSnafu};
22use crate::od::msr::{Measurement, MeasurementType};
23use anise::constants::SPEED_OF_LIGHT_KM_S;
24use hifitime::efmt::{Format, Formatter};
25use hifitime::prelude::Epoch;
26use hifitime::TimeScale;
27use indexmap::{IndexMap, IndexSet};
28use log::{error, info, warn};
29use snafu::ResultExt;
30use std::collections::{BTreeMap, HashMap};
31use std::fs::File;
32use std::io::Write;
33use std::io::{BufRead, BufReader, BufWriter};
34use std::path::{Path, PathBuf};
35use std::str::FromStr;
36
37use super::TrackingDataArc;
38
39impl TrackingDataArc {
40 pub fn from_tdm<P: AsRef<Path>>(
88 path: P,
89 aliases: Option<HashMap<String, String>>,
90 ) -> Result<Self, InputOutputError> {
91 let file = File::open(&path).context(StdIOSnafu {
92 action: "opening CCSDS TDM file for tracking arc",
93 })?;
94
95 let source = path.as_ref().to_path_buf().display().to_string();
96 info!("parsing CCSDS TDM {source}");
97
98 let mut measurements = BTreeMap::new();
99 let mut metadata = HashMap::new();
100
101 let reader = BufReader::new(file);
102
103 let mut in_data_section = false;
104 let mut current_tracker = String::new();
105 let mut time_system = TimeScale::UTC;
106 let mut has_freq_data = false;
107 let mut msr_divider = 1.0;
108
109 for line in reader.lines() {
110 let line = line.context(StdIOSnafu {
111 action: "reading CCSDS TDM file",
112 })?;
113 let line = line.trim();
114
115 if line == "DATA_START" {
116 in_data_section = true;
117 continue;
118 } else if line == "DATA_STOP" {
119 in_data_section = false;
120 }
121
122 if !in_data_section {
123 if line.starts_with("PARTICIPANT_1") {
124 current_tracker = line.split('=').nth(1).unwrap_or("").trim().to_string();
125 if let Some(aliases) = &aliases {
127 if let Some(alias) = aliases.get(¤t_tracker) {
128 current_tracker = alias.clone();
129 }
130 }
131 } else if line.starts_with("TIME_SYSTEM") {
132 let ts = line.split('=').nth(1).unwrap_or("UTC").trim();
133 if let Ok(ts) = TimeScale::from_str(ts) {
135 time_system = ts;
136 } else {
137 return Err(InputOutputError::UnsupportedData {
138 which: format!("time scale `{ts}` not supported"),
139 });
140 }
141 } else if line.starts_with("PATH") {
142 match line.split(",").count() {
143 2 => msr_divider = 1.0,
144 3 => msr_divider = 2.0,
145 cnt => {
146 return Err(InputOutputError::UnsupportedData {
147 which: format!(
148 "found {cnt} paths in TDM, only 1 or 2 are supported"
149 ),
150 })
151 }
152 }
153 }
154
155 let mut splt = line.split('=');
156 if let Some(keyword) = splt.nth(0) {
157 if let Some(value) = splt.nth(0) {
159 metadata.insert(keyword.trim().to_string(), value.trim().to_string());
160 }
161 }
162
163 continue;
164 }
165
166 if let Some((mtype, epoch, value)) = parse_measurement_line(line, time_system)? {
167 if [
168 MeasurementType::ReceiveFrequency,
169 MeasurementType::TransmitFrequency,
170 ]
171 .contains(&mtype)
172 {
173 has_freq_data = true;
174 msr_divider = 1.0;
176 }
177 measurements
178 .entry(epoch)
179 .or_insert_with(|| Measurement {
180 tracker: current_tracker.clone(),
181 epoch,
182 data: IndexMap::new(),
183 rejected: false,
184 })
185 .data
186 .insert(mtype, value / msr_divider);
187 }
188 }
189
190 let mut turnaround_ratio = None;
191 let drop_freq_data;
192 if has_freq_data {
193 if let Some(ta_num_str) = metadata.get("TURNAROUND_NUMERATOR") {
195 if let Some(ta_denom_str) = metadata.get("TURNAROUND_DENOMINATOR") {
196 if let Ok(ta_num) = ta_num_str.parse::<i32>() {
197 if let Ok(ta_denom) = ta_denom_str.parse::<i32>() {
198 turnaround_ratio = Some(f64::from(ta_num) / f64::from(ta_denom));
200 info!("turn-around ratio is {ta_num}/{ta_denom}");
201 drop_freq_data = false;
202 } else {
203 error!("turn-around denominator `{ta_denom_str}` is not a valid double precision float");
204 drop_freq_data = true;
205 }
206 } else {
207 error!("turn-around numerator `{ta_num_str}` is not a valid double precision float");
208 drop_freq_data = true;
209 }
210 } else {
211 error!("required turn-around denominator missing from metadata -- dropping ALL RECEIVE/TRANSMIT data");
212 drop_freq_data = true;
213 }
214 } else {
215 error!("required turn-around numerator missing from metadata -- dropping ALL RECEIVE/TRANSMIT data");
216 drop_freq_data = true;
217 }
218 } else {
219 drop_freq_data = true;
220 }
221
222 let mut freq_types = IndexSet::new();
225 freq_types.insert(MeasurementType::ReceiveFrequency);
226 freq_types.insert(MeasurementType::TransmitFrequency);
227 let mut latest_transmit_freq = None;
228 let mut malformed_warning = 0;
229 for (epoch, measurement) in measurements.iter_mut() {
230 if drop_freq_data {
231 for freq in &freq_types {
232 measurement.data.swap_remove(freq);
233 }
234 continue;
235 }
236
237 let avail = measurement.availability(&freq_types);
238 let use_prev_transmit_freq: bool;
239 let num_freq_msr = avail
240 .iter()
241 .copied()
242 .map(|v| if v { 1 } else { 0 })
243 .sum::<u8>();
244 if num_freq_msr == 0 {
245 continue;
247 } else if num_freq_msr == 1 {
248 if let Some(latest_transmit_freq) = latest_transmit_freq {
253 if avail[0] {
254 use_prev_transmit_freq = true;
255 if malformed_warning == 0 {
256 warn!(
257 "no transmit frequency at {epoch}, using {latest_transmit_freq} Hz",
258 );
259 }
260 malformed_warning += 1;
261 } else {
262 use_prev_transmit_freq = false;
263 }
264 } else {
265 warn!("only one of receive or transmit frequencies found at {epoch}, ignoring");
266 for freq in &freq_types {
267 measurement.data.swap_remove(freq);
268 }
269 continue;
270 }
271 } else {
272 use_prev_transmit_freq = false;
273 }
274
275 if !use_prev_transmit_freq {
276 latest_transmit_freq = Some(
278 *measurement
279 .data
280 .get(&MeasurementType::TransmitFrequency)
281 .unwrap(),
282 );
283 }
284
285 let transmit_freq_hz = latest_transmit_freq.unwrap();
286 let receive_freq_hz = *measurement
287 .data
288 .get(&MeasurementType::ReceiveFrequency)
289 .unwrap();
290
291 let doppler_shift_hz = transmit_freq_hz * turnaround_ratio.unwrap() - receive_freq_hz;
293 let rho_dot_km_s = (doppler_shift_hz * SPEED_OF_LIGHT_KM_S)
295 / (2.0 * transmit_freq_hz * turnaround_ratio.unwrap());
296
297 for freq in &freq_types {
299 measurement.data.swap_remove(freq);
300 }
301 measurement
302 .data
303 .insert(MeasurementType::Doppler, rho_dot_km_s);
304 }
305
306 if malformed_warning > 1 {
307 warn!("missing transmit frequency warning occured {malformed_warning} times",);
308 }
309
310 let moduli = if let Some(range_modulus) = metadata.get("RANGE_MODULUS") {
311 if let Ok(value) = range_modulus.parse::<f64>() {
312 if value > 0.0 {
313 let mut modulos = IndexMap::new();
314 modulos.insert(MeasurementType::Range, value);
315 Some(modulos)
317 } else {
318 None
320 }
321 } else {
322 warn!("could not parse RANGE_MODULUS of `{range_modulus}` as a double");
323 None
324 }
325 } else {
326 None
327 };
328
329 let trk = Self {
330 measurements,
331 source: Some(source),
332 moduli,
333 force_reject: false,
334 };
335
336 if trk.unique_types().is_empty() {
337 Err(InputOutputError::EmptyDataset {
338 action: "CCSDS TDM file",
339 })
340 } else {
341 Ok(trk)
342 }
343 }
344
345 pub fn to_tdm_file<P: AsRef<Path>>(
347 mut self,
348 spacecraft_name: String,
349 aliases: Option<HashMap<String, String>>,
350 path: P,
351 cfg: ExportCfg,
352 ) -> Result<PathBuf, InputOutputError> {
353 if self.is_empty() {
354 return Err(InputOutputError::MissingData {
355 which: " - empty tracking data cannot be exported to TDM".to_string(),
356 });
357 }
358
359 if let Some(start_epoch) = cfg.start_epoch {
361 if let Some(end_epoch) = cfg.end_epoch {
362 self = self.filter_by_epoch(start_epoch..end_epoch);
363 } else {
364 self = self.filter_by_epoch(start_epoch..);
365 }
366 } else if let Some(end_epoch) = cfg.end_epoch {
367 self = self.filter_by_epoch(..end_epoch);
368 }
369
370 let tick = Epoch::now().unwrap();
371 info!("Exporting tracking data to CCSDS TDM file...");
372
373 let path_buf = cfg.actual_path(path);
375
376 let metadata = cfg.metadata.unwrap_or_default();
377
378 let file = File::create(&path_buf).context(StdIOSnafu {
379 action: "creating CCSDS TDM file for tracking arc",
380 })?;
381 let mut writer = BufWriter::new(file);
382
383 let err_hdlr = |source| InputOutputError::StdIOError {
384 source,
385 action: "writing data to TDM file",
386 };
387
388 let iso8601_no_ts = Format::from_str("%Y-%m-%dT%H:%M:%S.%f").unwrap();
390
391 writeln!(writer, "CCSDS_TDM_VERS = 2.0").map_err(err_hdlr)?;
393 writeln!(
394 writer,
395 "\nCOMMENT Build by {} -- https://nyxspace.com",
396 prj_name_ver()
397 )
398 .map_err(err_hdlr)?;
399 writeln!(
400 writer,
401 "COMMENT Nyx Space provided under the AGPL v3 open source license -- https://nyxspace.com/pricing\n"
402 )
403 .map_err(err_hdlr)?;
404 writeln!(
405 writer,
406 "CREATION_DATE = {}",
407 Formatter::new(Epoch::now().unwrap(), iso8601_no_ts)
408 )
409 .map_err(err_hdlr)?;
410 writeln!(
411 writer,
412 "ORIGINATOR = {}\n",
413 metadata
414 .get("originator")
415 .unwrap_or(&"Nyx Space".to_string())
416 )
417 .map_err(err_hdlr)?;
418
419 let trackers = self.unique_aliases();
422
423 for tracker in trackers {
424 let tracker_data = self.clone().filter_by_tracker(tracker.clone());
425
426 let types = tracker_data.unique_types();
427
428 let two_way_types = types
429 .iter()
430 .filter(|msr_type| msr_type.may_be_two_way())
431 .copied()
432 .collect::<Vec<_>>();
433
434 let one_way_types = types
435 .iter()
436 .filter(|msr_type| !msr_type.may_be_two_way())
437 .copied()
438 .collect::<Vec<_>>();
439
440 for (tno, types) in [two_way_types, one_way_types].iter().enumerate() {
442 writeln!(writer, "META_START").map_err(err_hdlr)?;
443 writeln!(writer, "\tTIME_SYSTEM = UTC").map_err(err_hdlr)?;
444 writeln!(
445 writer,
446 "\tSTART_TIME = {}",
447 Formatter::new(tracker_data.start_epoch().unwrap(), iso8601_no_ts)
448 )
449 .map_err(err_hdlr)?;
450 writeln!(
451 writer,
452 "\tSTOP_TIME = {}",
453 Formatter::new(tracker_data.end_epoch().unwrap(), iso8601_no_ts)
454 )
455 .map_err(err_hdlr)?;
456
457 let multiplier = if tno == 0 {
458 writeln!(writer, "\tPATH = 1,2,1").map_err(err_hdlr)?;
459 2.0
460 } else {
461 writeln!(writer, "\tPATH = 1,2").map_err(err_hdlr)?;
462 1.0
463 };
464
465 writeln!(
466 writer,
467 "\tPARTICIPANT_1 = {}",
468 if let Some(aliases) = &aliases {
469 if let Some(alias) = aliases.get(&tracker) {
470 alias
471 } else {
472 &tracker
473 }
474 } else {
475 &tracker
476 }
477 )
478 .map_err(err_hdlr)?;
479
480 writeln!(writer, "\tPARTICIPANT_2 = {spacecraft_name}").map_err(err_hdlr)?;
481
482 writeln!(writer, "\tMODE = SEQUENTIAL").map_err(err_hdlr)?;
483
484 for (k, v) in &metadata {
486 if k != "originator" {
487 writeln!(writer, "\t{k} = {v}").map_err(err_hdlr)?;
488 }
489 }
490
491 if types.contains(&MeasurementType::Range) {
492 writeln!(writer, "\tRANGE_UNITS = km").map_err(err_hdlr)?;
493
494 if let Some(moduli) = &self.moduli {
495 if let Some(range_modulus) = moduli.get(&MeasurementType::Range) {
496 writeln!(writer, "\tRANGE_MODULUS = {range_modulus:E}")
497 .map_err(err_hdlr)?;
498 }
499 }
500 }
501
502 if types.contains(&MeasurementType::Azimuth)
503 || types.contains(&MeasurementType::Elevation)
504 {
505 writeln!(writer, "\tANGLE_TYPE = AZEL").map_err(err_hdlr)?;
506 }
507
508 writeln!(writer, "META_STOP\n").map_err(err_hdlr)?;
509
510 writeln!(writer, "DATA_START").map_err(err_hdlr)?;
512
513 for (epoch, measurement) in &tracker_data.measurements {
515 for (mtype, value) in &measurement.data {
516 if !types.contains(mtype) {
517 continue;
518 }
519 let type_str = match mtype {
520 MeasurementType::Range => "RANGE",
521 MeasurementType::Doppler => "DOPPLER_INTEGRATED",
522 MeasurementType::Azimuth => "ANGLE_1",
523 MeasurementType::Elevation => "ANGLE_2",
524 MeasurementType::ReceiveFrequency => "RECEIVE_FREQ",
525 MeasurementType::TransmitFrequency => "TRANSMIT_FREQ",
526 };
527
528 writeln!(
529 writer,
530 "\t{:<20} = {:<23}\t{:.12}",
531 type_str,
532 Formatter::new(*epoch, iso8601_no_ts),
533 value * multiplier
534 )
535 .map_err(err_hdlr)?;
536 }
537 }
538
539 writeln!(writer, "DATA_STOP\n").map_err(err_hdlr)?;
540 }
541 }
542
543 #[allow(clippy::writeln_empty_string)]
544 writeln!(writer, "").map_err(err_hdlr)?;
545
546 let tock_time = Epoch::now().unwrap() - tick;
548 info!("CCSDS TDM written to {} in {tock_time}", path_buf.display());
549 Ok(path_buf)
550 }
551}
552
553fn parse_measurement_line(
554 line: &str,
555 time_system: TimeScale,
556) -> Result<Option<(MeasurementType, Epoch, f64)>, InputOutputError> {
557 let parts: Vec<&str> = line.split('=').collect();
558 if parts.len() != 2 {
559 return Ok(None);
560 }
561
562 let (mtype_str, data) = (parts[0].trim(), parts[1].trim());
563 let mtype = match mtype_str {
564 "RANGE" => MeasurementType::Range,
565 "DOPPLER_INSTANTANEOUS" | "DOPPLER_INTEGRATED" => MeasurementType::Doppler,
566 "ANGLE_1" => MeasurementType::Azimuth,
567 "ANGLE_2" => MeasurementType::Elevation,
568 "RECEIVE_FREQ" | "RECEIVE_FREQ_1" | "RECEIVE_FREQ_2" | "RECEIVE_FREQ_3"
569 | "RECEIVE_FREQ_4" | "RECEIVE_FREQ_5" => MeasurementType::ReceiveFrequency,
570 "TRANSMIT_FREQ" | "TRANSMIT_FREQ_1" | "TRANSMIT_FREQ_2" | "TRANSMIT_FREQ_3"
571 | "TRANSMIT_FREQ_4" | "TRANSMIT_FREQ_5" => MeasurementType::TransmitFrequency,
572 _ => {
573 return Err(InputOutputError::UnsupportedData {
574 which: mtype_str.to_string(),
575 })
576 }
577 };
578
579 let data_parts: Vec<&str> = data.split_whitespace().collect();
580 if data_parts.len() != 2 {
581 return Ok(None);
582 }
583
584 let epoch =
585 Epoch::from_gregorian_str(&format!("{} {time_system}", data_parts[0])).map_err(|e| {
586 InputOutputError::Inconsistency {
587 msg: format!("{e} when parsing epoch"),
588 }
589 })?;
590
591 let value = data_parts[1]
592 .parse::<f64>()
593 .map_err(|e| InputOutputError::UnsupportedData {
594 which: format!("`{}` is not a float: {e}", data_parts[1]),
595 })?;
596
597 Ok(Some((mtype, epoch, value)))
598}