1use super::traj_it::TrajIterator;
20use super::{ExportCfg, InterpolationSnafu, INTERPOLATION_SAMPLES};
21use super::{Interpolatable, TrajError};
22use crate::errors::NyxError;
23use crate::io::watermark::pq_writer;
24use crate::io::InputOutputError;
25use crate::linalg::allocator::Allocator;
26use crate::linalg::DefaultAllocator;
27use crate::md::prelude::{GuidanceMode, StateParameter};
28use crate::md::trajectory::smooth_state_diff_in_place;
29use crate::md::EventEvaluator;
30use crate::time::{Duration, Epoch, TimeSeries, TimeUnits};
31use anise::almanac::Almanac;
32use arrow::array::{Array, Float64Builder, StringBuilder};
33use arrow::datatypes::{DataType, Field, Schema};
34use arrow::record_batch::RecordBatch;
35use hifitime::TimeScale;
36use parquet::arrow::ArrowWriter;
37use snafu::ResultExt;
38use std::collections::HashMap;
39use std::error::Error;
40use std::fmt;
41use std::fs::File;
42use std::iter::Iterator;
43use std::ops;
44use std::path::{Path, PathBuf};
45use std::sync::Arc;
46
47#[derive(Clone, PartialEq)]
49pub struct Traj<S: Interpolatable>
50where
51 DefaultAllocator: Allocator<S::VecLength> + Allocator<S::Size> + Allocator<S::Size, S::Size>,
52{
53 pub name: Option<String>,
55 pub states: Vec<S>,
57}
58
59impl<S: Interpolatable> Traj<S>
60where
61 DefaultAllocator: Allocator<S::VecLength> + Allocator<S::Size> + Allocator<S::Size, S::Size>,
62{
63 pub fn new() -> Self {
64 Self {
65 name: None,
66 states: Vec::new(),
67 }
68 }
69 pub fn finalize(&mut self) {
71 self.states.dedup_by(|a, b| a.epoch().eq(&b.epoch()));
73 self.states.sort_by_key(|a| a.epoch());
75 }
76
77 pub fn at(&self, epoch: Epoch) -> Result<S, TrajError> {
79 if self.states.is_empty() || self.first().epoch() > epoch || self.last().epoch() < epoch {
80 return Err(TrajError::NoInterpolationData { epoch });
81 }
82 match self
83 .states
84 .binary_search_by(|state| state.epoch().cmp(&epoch))
85 {
86 Ok(idx) => {
87 Ok(self.states[idx])
89 }
90 Err(idx) => {
91 if idx == 0 || idx >= self.states.len() {
92 return Err(TrajError::NoInterpolationData { epoch });
95 }
96 let num_left = INTERPOLATION_SAMPLES / 2;
101
102 let mut first_idx = idx.saturating_sub(num_left);
104 let last_idx = self.states.len().min(first_idx + INTERPOLATION_SAMPLES);
105
106 if last_idx == self.states.len() {
108 first_idx = last_idx.saturating_sub(2 * num_left);
109 }
110
111 let mut states = Vec::with_capacity(last_idx - first_idx);
112 for idx in first_idx..last_idx {
113 states.push(self.states[idx]);
114 }
115
116 self.states[idx]
117 .interpolate(epoch, &states)
118 .context(InterpolationSnafu)
119 }
120 }
121 }
122
123 pub fn first(&self) -> &S {
125 self.states.first().unwrap()
127 }
128
129 pub fn last(&self) -> &S {
131 self.states.last().unwrap()
132 }
133
134 pub fn every(&self, step: Duration) -> TrajIterator<S> {
136 self.every_between(step, self.first().epoch(), self.last().epoch())
137 }
138
139 pub fn every_between(&self, step: Duration, start: Epoch, end: Epoch) -> TrajIterator<S> {
141 TrajIterator {
142 time_series: TimeSeries::inclusive(
143 start.max(self.first().epoch()),
144 end.min(self.last().epoch()),
145 step,
146 ),
147 traj: self,
148 }
149 }
150
151 pub fn to_parquet_simple<P: AsRef<Path>>(
153 &self,
154 path: P,
155 almanac: Arc<Almanac>,
156 ) -> Result<PathBuf, Box<dyn Error>> {
157 self.to_parquet(path, None, ExportCfg::default(), almanac)
158 }
159
160 pub fn to_parquet_with_cfg<P: AsRef<Path>>(
162 &self,
163 path: P,
164 cfg: ExportCfg,
165 almanac: Arc<Almanac>,
166 ) -> Result<PathBuf, Box<dyn Error>> {
167 self.to_parquet(path, None, cfg, almanac)
168 }
169
170 pub fn to_parquet_with_step<P: AsRef<Path>>(
172 &self,
173 path: P,
174 step: Duration,
175 almanac: Arc<Almanac>,
176 ) -> Result<(), Box<dyn Error>> {
177 self.to_parquet_with_cfg(
178 path,
179 ExportCfg {
180 step: Some(step),
181 ..Default::default()
182 },
183 almanac,
184 )?;
185
186 Ok(())
187 }
188
189 pub fn to_parquet<P: AsRef<Path>>(
191 &self,
192 path: P,
193 events: Option<Vec<&dyn EventEvaluator<S>>>,
194 cfg: ExportCfg,
195 almanac: Arc<Almanac>,
196 ) -> Result<PathBuf, Box<dyn Error>> {
197 let tick = Epoch::now().unwrap();
198 info!("Exporting trajectory to parquet file...");
199
200 let path_buf = cfg.actual_path(path);
202
203 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
205
206 let frame = self.states[0].frame();
207 let more_meta = Some(vec![(
208 "Frame".to_string(),
209 serde_dhall::serialize(&frame).to_string().map_err(|e| {
210 Box::new(InputOutputError::SerializeDhall {
211 what: format!("frame `{frame}`"),
212 err: e.to_string(),
213 })
214 })?,
215 )]);
216
217 let mut fields = match cfg.fields {
218 Some(fields) => fields,
219 None => S::export_params(),
220 };
221
222 fields.retain(|param| self.first().value(*param).is_ok());
224
225 for field in &fields {
226 hdrs.push(field.to_field(more_meta.clone()));
227 }
228
229 if let Some(events) = events.as_ref() {
230 for event in events {
231 let field = Field::new(format!("{event}"), DataType::Float64, false);
232 hdrs.push(field);
233 }
234 }
235
236 let schema = Arc::new(Schema::new(hdrs));
238 let mut record: Vec<Arc<dyn Array>> = Vec::new();
239
240 let states = if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
242 let start = cfg.start_epoch.unwrap_or_else(|| self.first().epoch());
244 let end = cfg.end_epoch.unwrap_or_else(|| self.last().epoch());
245 let step = cfg.step.unwrap_or_else(|| 1.minutes());
246 self.every_between(step, start, end).collect::<Vec<S>>()
247 } else {
248 self.states.to_vec()
249 };
250
251 let mut utc_epoch = StringBuilder::new();
255 for s in &states {
256 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
257 }
258 record.push(Arc::new(utc_epoch.finish()));
259
260 for field in fields {
262 if field == StateParameter::GuidanceMode {
263 let mut guid_mode = StringBuilder::new();
264 for s in &states {
265 guid_mode
266 .append_value(format!("{:?}", GuidanceMode::from(s.value(field).unwrap())));
267 }
268 record.push(Arc::new(guid_mode.finish()));
269 } else {
270 let mut data = Float64Builder::new();
271 for s in &states {
272 data.append_value(s.value(field).unwrap());
273 }
274 record.push(Arc::new(data.finish()));
275 }
276 }
277
278 info!(
279 "Serialized {} states from {} to {}",
280 states.len(),
281 states.first().unwrap().epoch(),
282 states.last().unwrap().epoch()
283 );
284
285 if let Some(events) = events {
287 info!("Evaluating {} event(s)", events.len());
288 for event in events {
289 let mut data = Float64Builder::new();
290 for s in &states {
291 data.append_value(event.eval(s, almanac.clone()).map_err(Box::new)?);
292 }
293 record.push(Arc::new(data.finish()));
294 }
295 }
296
297 let mut metadata = HashMap::new();
299 metadata.insert("Purpose".to_string(), "Trajectory data".to_string());
300 if let Some(add_meta) = cfg.metadata {
301 for (k, v) in add_meta {
302 metadata.insert(k, v);
303 }
304 }
305
306 let props = pq_writer(Some(metadata));
307
308 let file = File::create(&path_buf)?;
309 let mut writer = ArrowWriter::try_new(file, schema.clone(), props).unwrap();
310
311 let batch = RecordBatch::try_new(schema, record)?;
312 writer.write(&batch)?;
313 writer.close()?;
314
315 let tock_time = Epoch::now().unwrap() - tick;
317 info!(
318 "Trajectory written to {} in {tock_time}",
319 path_buf.display()
320 );
321 Ok(path_buf)
322 }
323
324 pub fn resample(&self, step: Duration) -> Result<Self, NyxError> {
327 if self.states.is_empty() {
328 return Err(NyxError::Trajectory {
329 source: TrajError::CreationError {
330 msg: "No trajectory to convert".to_string(),
331 },
332 });
333 }
334
335 let mut traj = Self::new();
336 for state in self.every(step) {
337 traj.states.push(state);
338 }
339
340 traj.finalize();
341
342 Ok(traj)
343 }
344
345 pub fn rebuild(&self, epochs: &[Epoch]) -> Result<Self, NyxError> {
348 if self.states.is_empty() {
349 return Err(NyxError::Trajectory {
350 source: TrajError::CreationError {
351 msg: "No trajectory to convert".to_string(),
352 },
353 });
354 }
355
356 let mut traj = Self::new();
357 for epoch in epochs {
358 traj.states.push(self.at(*epoch)?);
359 }
360
361 traj.finalize();
362
363 Ok(traj)
364 }
365
366 pub fn ric_diff_to_parquet<P: AsRef<Path>>(
371 &self,
372 other: &Self,
373 path: P,
374 cfg: ExportCfg,
375 ) -> Result<PathBuf, Box<dyn Error>> {
376 let tick = Epoch::now().unwrap();
377 info!("Exporting trajectory to parquet file...");
378
379 let path_buf = cfg.actual_path(path);
381
382 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
384
385 for coord in ["X", "Y", "Z"] {
387 let mut meta = HashMap::new();
388 meta.insert("unit".to_string(), "km".to_string());
389
390 let field = Field::new(
391 format!("Delta {coord} (RIC) (km)"),
392 DataType::Float64,
393 false,
394 )
395 .with_metadata(meta);
396
397 hdrs.push(field);
398 }
399
400 for coord in ["x", "y", "z"] {
401 let mut meta = HashMap::new();
402 meta.insert("unit".to_string(), "km/s".to_string());
403
404 let field = Field::new(
405 format!("Delta V{coord} (RIC) (km/s)"),
406 DataType::Float64,
407 false,
408 )
409 .with_metadata(meta);
410
411 hdrs.push(field);
412 }
413
414 let frame = self.states[0].frame();
415 let more_meta = Some(vec![(
416 "Frame".to_string(),
417 serde_dhall::serialize(&frame)
418 .to_string()
419 .unwrap_or(frame.to_string()),
420 )]);
421
422 let mut cfg = cfg;
423
424 let mut fields = match cfg.fields {
425 Some(fields) => fields,
426 None => S::export_params(),
427 };
428
429 fields.retain(|param| {
431 param != &StateParameter::GuidanceMode && self.first().value(*param).is_ok()
432 });
433
434 for field in &fields {
435 hdrs.push(field.to_field(more_meta.clone()));
436 }
437
438 let schema = Arc::new(Schema::new(hdrs));
440 let mut record: Vec<Arc<dyn Array>> = Vec::new();
441
442 cfg.start_epoch = if self.first().epoch() > other.first().epoch() {
444 Some(self.first().epoch())
445 } else {
446 Some(other.first().epoch())
447 };
448
449 cfg.end_epoch = if self.last().epoch() > other.last().epoch() {
450 Some(other.last().epoch())
451 } else {
452 Some(self.last().epoch())
453 };
454
455 let step = cfg.step.unwrap_or_else(|| 1.minutes());
457 let self_states = self
458 .every_between(step, cfg.start_epoch.unwrap(), cfg.end_epoch.unwrap())
459 .collect::<Vec<S>>();
460
461 let other_states = other
462 .every_between(step, cfg.start_epoch.unwrap(), cfg.end_epoch.unwrap())
463 .collect::<Vec<S>>();
464
465 let mut ric_diff = Vec::with_capacity(other_states.len());
467 for (other_state, self_state) in other_states.iter().zip(self_states.iter()) {
468 let self_orbit = self_state.orbit();
469 let other_orbit = other_state.orbit();
470
471 let this_ric_diff = self_orbit.ric_difference(&other_orbit).map_err(Box::new)?;
472
473 ric_diff.push(this_ric_diff);
474 }
475
476 smooth_state_diff_in_place(&mut ric_diff, if other_states.len() > 5 { 5 } else { 1 });
477
478 let mut utc_epoch = StringBuilder::new();
482 for s in &self_states {
483 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
484 }
485 record.push(Arc::new(utc_epoch.finish()));
486
487 for coord_no in 0..6 {
489 let mut data = Float64Builder::new();
490 for this_ric_dff in &ric_diff {
491 data.append_value(this_ric_dff.to_cartesian_pos_vel()[coord_no]);
492 }
493 record.push(Arc::new(data.finish()));
494 }
495
496 for field in fields {
498 let mut data = Float64Builder::new();
499 for (other_state, self_state) in other_states.iter().zip(self_states.iter()) {
500 let self_val = self_state.value(field).map_err(Box::new)?;
501 let other_val = other_state.value(field).map_err(Box::new)?;
502
503 data.append_value(self_val - other_val);
504 }
505
506 record.push(Arc::new(data.finish()));
507 }
508
509 info!("Serialized {} states differences", self_states.len());
510
511 let mut metadata = HashMap::new();
513 metadata.insert(
514 "Purpose".to_string(),
515 "Trajectory difference data".to_string(),
516 );
517 if let Some(add_meta) = cfg.metadata {
518 for (k, v) in add_meta {
519 metadata.insert(k, v);
520 }
521 }
522
523 let props = pq_writer(Some(metadata));
524
525 let file = File::create(&path_buf)?;
526 let mut writer = ArrowWriter::try_new(file, schema.clone(), props).unwrap();
527
528 let batch = RecordBatch::try_new(schema, record)?;
529 writer.write(&batch)?;
530 writer.close()?;
531
532 let tock_time = Epoch::now().unwrap() - tick;
534 info!(
535 "Trajectory written to {} in {tock_time}",
536 path_buf.display()
537 );
538 Ok(path_buf)
539 }
540}
541
542impl<S: Interpolatable> ops::Add for Traj<S>
543where
544 DefaultAllocator: Allocator<S::VecLength> + Allocator<S::Size> + Allocator<S::Size, S::Size>,
545{
546 type Output = Result<Traj<S>, NyxError>;
547
548 fn add(self, other: Traj<S>) -> Self::Output {
550 &self + &other
551 }
552}
553
554impl<S: Interpolatable> ops::Add<&Traj<S>> for &Traj<S>
555where
556 DefaultAllocator: Allocator<S::VecLength> + Allocator<S::Size> + Allocator<S::Size, S::Size>,
557{
558 type Output = Result<Traj<S>, NyxError>;
559
560 fn add(self, other: &Traj<S>) -> Self::Output {
562 if self.first().frame() != other.first().frame() {
563 Err(NyxError::Trajectory {
564 source: TrajError::CreationError {
565 msg: format!(
566 "Frame mismatch in add operation: {} != {}",
567 self.first().frame(),
568 other.first().frame()
569 ),
570 },
571 })
572 } else {
573 if self.last().epoch() < other.first().epoch() {
574 let gap = other.first().epoch() - self.last().epoch();
575 warn!(
576 "Resulting merged trajectory will have a time-gap of {} starting at {}",
577 gap,
578 self.last().epoch()
579 );
580 }
581
582 let mut me = self.clone();
583 for state in &other
585 .states
586 .iter()
587 .copied()
588 .filter(|s| s.epoch() > self.last().epoch())
589 .collect::<Vec<S>>()
590 {
591 me.states.push(*state);
592 }
593 me.finalize();
594
595 Ok(me)
596 }
597 }
598}
599
600impl<S: Interpolatable> ops::AddAssign<&Traj<S>> for Traj<S>
601where
602 DefaultAllocator: Allocator<S::VecLength> + Allocator<S::Size> + Allocator<S::Size, S::Size>,
603{
604 fn add_assign(&mut self, rhs: &Self) {
610 *self = (self.clone() + rhs.clone()).unwrap();
611 }
612}
613
614impl<S: Interpolatable> fmt::Display for Traj<S>
615where
616 DefaultAllocator: Allocator<S::VecLength> + Allocator<S::Size> + Allocator<S::Size, S::Size>,
617{
618 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
619 if self.states.is_empty() {
620 write!(f, "Empty Trajectory!")
621 } else {
622 let dur = self.last().epoch() - self.first().epoch();
623 write!(
624 f,
625 "Trajectory {}in {} from {} to {} ({}, or {:.3} s) [{} states]",
626 match &self.name {
627 Some(name) => format!("of {name} "),
628 None => String::new(),
629 },
630 self.first().frame(),
631 self.first().epoch(),
632 self.last().epoch(),
633 dur,
634 dur.to_seconds(),
635 self.states.len()
636 )
637 }
638 }
639}
640
641impl<S: Interpolatable> fmt::Debug for Traj<S>
642where
643 DefaultAllocator: Allocator<S::VecLength> + Allocator<S::Size> + Allocator<S::Size, S::Size>,
644{
645 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
646 write!(f, "{self}",)
647 }
648}
649
650impl<S: Interpolatable> Default for Traj<S>
651where
652 DefaultAllocator: Allocator<S::VecLength> + Allocator<S::Size> + Allocator<S::Size, S::Size>,
653{
654 fn default() -> Self {
655 Self::new()
656 }
657}