use std::collections::{BTreeMap, HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::fs::File;
use std::ops::RangeBounds;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::io::watermark::pq_writer;
use crate::io::{ConfigError, ExportCfg};
use crate::linalg::allocator::Allocator;
use crate::linalg::{DefaultAllocator, DimName};
use crate::md::trajectory::Interpolatable;
use crate::od::prelude::TrkConfig;
use crate::od::{Measurement, TrackingDeviceSim};
use crate::State;
use arrow::array::{Array, Float64Builder, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use hifitime::prelude::{Duration, Epoch, Unit};
use hifitime::TimeScale;
use parquet::arrow::ArrowWriter;
/// A set of tracking measurements, each paired with the name of the device that produced it,
/// along with the serialized configuration of those devices.
#[derive(Clone, Default, Debug)]
pub struct TrackingArc<Msr: Measurement>
where
    DefaultAllocator: Allocator<Msr::MeasurementSize>,
{
    /// Serialized tracking-device configuration (written by `set_devices` via
    /// `serde_yaml::to_string`, read back by `rebuild_devices` via `loads_named`).
    pub device_cfg: String,
    /// Measurements, each tagged with the name of the device that produced it.
    pub measurements: Vec<(String, Msr)>,
}
impl<Msr> Display for TrackingArc<Msr>
where
    Msr: Measurement,
    DefaultAllocator:
        Allocator<Msr::MeasurementSize> + Allocator<Msr::MeasurementSize, Msr::MeasurementSize>,
{
    /// Renders a short summary: the number of measurements and the (debug-formatted) set of
    /// device names that appear in the arc.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let count = self.measurements.len();
        let devices = self.device_names();
        write!(f, "{count} measurements from {devices:?}")
    }
}
impl<Msr> TrackingArc<Msr>
where
    Msr: Measurement,
    DefaultAllocator:
        Allocator<Msr::MeasurementSize> + Allocator<Msr::MeasurementSize, Msr::MeasurementSize>,
{
    /// Exports this tracking arc to a parquet file at `path` using `ExportCfg::default()`.
    ///
    /// # Errors
    /// Same as [`Self::to_parquet`].
    pub fn to_parquet_simple<P: AsRef<Path> + Debug>(
        &self,
        path: P,
    ) -> Result<PathBuf, Box<dyn Error>> {
        self.to_parquet(path, ExportCfg::default())
    }

    /// Exports this tracking arc to a parquet file.
    ///
    /// The columns are, in order:
    /// - "Epoch (UTC)": the ISO-formatted UTC epoch of each measurement.
    /// - "Tracking device": the device name.
    /// - The fields returned by `Msr::fields()`, one per observation component.
    ///
    /// The file metadata contains `devices` (this arc's `device_cfg`) and `Purpose`. Any entries
    /// from `cfg.metadata` are inserted afterwards and override those keys on collision.
    ///
    /// Only `cfg.start_epoch` and `cfg.end_epoch` are honored; both bounds are inclusive. A missing
    /// bound defaults to the first/last stored measurement epoch. `cfg.step` and `cfg.fields` are
    /// ignored with a warning.
    ///
    /// Returns the path actually written, as computed by `cfg.actual_path`.
    ///
    /// # Errors
    /// Returns an error if the record batch cannot be built, the file cannot be created, or the
    /// parquet writer fails.
    pub fn to_parquet<P: AsRef<Path> + Debug>(
        &self,
        path: P,
        cfg: ExportCfg,
    ) -> Result<PathBuf, Box<dyn Error>> {
        let path_buf = cfg.actual_path(path);

        if cfg.step.is_some() {
            warn!("The `step` parameter in the export is not supported for tracking arcs.");
        }
        if cfg.fields.is_some() {
            warn!("The `fields` parameter in the export is not supported for tracking arcs.");
        }

        let mut hdrs = vec![
            Field::new("Epoch (UTC)", DataType::Utf8, false),
            Field::new("Tracking device", DataType::Utf8, false),
        ];
        hdrs.extend(Msr::fields());
        let schema = Arc::new(Schema::new(hdrs));

        // Borrow the selected measurements rather than cloning the whole arc.
        let measurements: Vec<&(String, Msr)> =
            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() {
                match (self.measurements.first(), self.measurements.last()) {
                    (Some((_, first)), Some((_, last))) => {
                        let start = cfg.start_epoch.unwrap_or_else(|| first.epoch());
                        let end = cfg.end_epoch.unwrap_or_else(|| last.epoch());
                        info!("Exporting measurements from {start} to {end}.");
                        self.measurements
                            .iter()
                            .filter(|(_, msr)| msr.epoch() >= start && msr.epoch() <= end)
                            .collect()
                    }
                    // Empty arc: nothing to select (this used to panic on `unwrap`).
                    _ => Vec::new(),
                }
            } else {
                self.measurements.iter().collect()
            };

        let mut record: Vec<Arc<dyn Array>> = Vec::new();

        let mut utc_epoch = StringBuilder::new();
        let mut device_names = StringBuilder::new();
        for (name, msr) in &measurements {
            utc_epoch.append_value(msr.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
            device_names.append_value(name);
        }
        record.push(Arc::new(utc_epoch.finish()));
        record.push(Arc::new(device_names.finish()));

        // One Float64 column per observation component, in component order.
        for obs_no in 0..Msr::MeasurementSize::USIZE {
            let mut data_builder = Float64Builder::new();
            for (_, msr) in &measurements {
                data_builder.append_value(msr.observation()[obs_no]);
            }
            record.push(Arc::new(data_builder.finish()));
        }

        let mut metadata = HashMap::new();
        metadata.insert("devices".to_string(), self.device_cfg.clone());
        metadata.insert("Purpose".to_string(), "Tracking Arc Data".to_string());
        if let Some(add_meta) = cfg.metadata {
            metadata.extend(add_meta);
        }

        // Build the batch before touching the filesystem, so that a schema/column mismatch does
        // not leave an empty file behind.
        let batch = RecordBatch::try_new(Arc::clone(&schema), record)?;

        let props = pq_writer(Some(metadata));
        let file = File::create(&path_buf)?;
        let mut writer = ArrowWriter::try_new(file, schema, props)?;
        writer.write(&batch)?;
        writer.close()?;

        info!("Serialized {self} to {}", path_buf.display());

        Ok(path_buf)
    }

    /// Returns the set of distinct device names that appear in this arc's measurements.
    pub fn device_names(&self) -> HashSet<&String> {
        self.measurements.iter().map(|(name, _)| name).collect()
    }

    /// Returns the minimum separation between consecutive measurements (in stored order), or
    /// `None` when the arc has fewer than two measurements.
    ///
    /// The first interval seeds the search after being floored at 2 seconds. Later intervals
    /// replace the current minimum only when they are strictly positive and smaller.
    pub fn min_duration_sep(&self) -> Option<Duration> {
        let mut intervals = self
            .measurements
            .windows(2)
            .map(|pair| pair[1].1.epoch() - pair[0].1.epoch());

        // NOTE(review): the 2 s floor is only applied to the first interval. For example, an arc
        // of exactly two measurements 1 s apart yields 2 s, while longer arcs can yield 1 s.
        // Behavior kept as-is; confirm whether this asymmetry is intended.
        let mut min_interval = intervals.next()?.max(2 * Unit::Second);
        for interval in intervals {
            if interval > Duration::ZERO && interval < min_interval {
                min_interval = interval;
            }
        }
        Some(min_interval)
    }

    /// Deserializes the tracking devices from `device_cfg` using `D::loads_named`.
    ///
    /// Loaded devices that have no measurements in this arc are logged at info level, but they
    /// are still returned.
    ///
    /// # Errors
    /// Returns the `ConfigError` from `D::loads_named` if the configuration cannot be loaded.
    pub fn rebuild_devices<MsrIn, D>(&self) -> Result<BTreeMap<String, D>, ConfigError>
    where
        MsrIn: Interpolatable,
        D: TrackingDeviceSim<MsrIn, Msr>,
        DefaultAllocator: Allocator<<MsrIn as State>::Size>
            + Allocator<<MsrIn as State>::Size, <MsrIn as State>::Size>
            + Allocator<<MsrIn as State>::VecLength>,
    {
        let devices = D::loads_named(&self.device_cfg)?;

        // Build the name set once instead of once per device.
        let names_in_arc = self.device_names();
        for device in devices.keys() {
            if !names_in_arc.contains(device) {
                info!("no measurements from {device} in loaded arc");
            }
        }

        Ok(devices)
    }

    /// Returns a new arc containing only the measurements whose epoch lies within `bound`.
    /// The device configuration is copied unchanged.
    pub fn filter_by_epoch<R: RangeBounds<Epoch>>(&self, bound: R) -> Self {
        let measurements = self
            .measurements
            .iter()
            .filter(|(_, msr)| bound.contains(&msr.epoch()))
            .cloned()
            .collect();

        Self {
            measurements,
            device_cfg: self.device_cfg.clone(),
        }
    }

    /// Returns a new arc containing only the measurements whose offset from the first stored
    /// measurement's epoch lies within `bound`. An empty arc yields an empty arc. The device
    /// configuration is copied unchanged.
    pub fn filter_by_offset<R: RangeBounds<Duration>>(&self, bound: R) -> Self {
        let measurements = match self.measurements.first() {
            Some((_, first)) => {
                let ref_epoch = first.epoch();
                self.measurements
                    .iter()
                    .filter(|(_, msr)| bound.contains(&(msr.epoch() - ref_epoch)))
                    .cloned()
                    .collect()
            }
            None => Vec::new(),
        };

        Self {
            measurements,
            device_cfg: self.device_cfg.clone(),
        }
    }

    /// Replaces `device_cfg` with the YAML serialization of the subset of `devices` that are
    /// properly configured.
    ///
    /// A device is kept when `configs` has an entry under its name and that entry passes
    /// `sanity_check`. Other devices are skipped with a warning.
    ///
    /// # Errors
    /// - Returns `ConfigError::InvalidConfig` if no device passes; `device_cfg` is then untouched.
    /// - Returns the serialization error if YAML encoding fails; this used to panic.
    pub fn set_devices<MsrIn, D>(
        &mut self,
        devices: Vec<D>,
        configs: BTreeMap<String, TrkConfig>,
    ) -> Result<(), Box<dyn Error>>
    where
        MsrIn: Interpolatable,
        D: TrackingDeviceSim<MsrIn, Msr>,
        DefaultAllocator: Allocator<<MsrIn as State>::Size>
            + Allocator<<MsrIn as State>::Size, <MsrIn as State>::Size>
            + Allocator<<MsrIn as State>::VecLength>,
    {
        let mut devices_map = BTreeMap::new();

        for device in devices {
            let name = device.name();
            match configs.get(&name) {
                Some(cfg) => {
                    if let Err(e) = cfg.sanity_check() {
                        warn!("Ignoring device {name}: {e}");
                        continue;
                    }
                }
                None => {
                    warn!("Ignoring device {name}: no associated tracking configuration");
                    continue;
                }
            }
            devices_map.insert(name, device);
        }

        if devices_map.is_empty() {
            return Err(Box::new(ConfigError::InvalidConfig {
                msg: "None of the devices are properly configured".to_string(),
            }));
        }

        self.device_cfg = serde_yaml::to_string(&devices_map)?;
        Ok(())
    }
}