nyx_space/od/msr/trackingdata/io_parquet.rs

use crate::io::watermark::pq_writer;
use crate::io::{ArrowSnafu, InputOutputError, MissingDataSnafu, ParquetSnafu, StdIOSnafu};
use crate::io::{EmptyDatasetSnafu, ExportCfg};
use crate::od::msr::{Measurement, MeasurementType};
use arrow::array::{Array, Float64Builder, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::{
    array::{Float64Array, PrimitiveArray, StringArray},
    datatypes,
    record_batch::RecordBatchReader,
};
use hifitime::prelude::Epoch;
use hifitime::TimeScale;
use indexmap::IndexMap;
use log::{info, warn};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use snafu::{ensure, ResultExt};
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use super::TrackingDataArc;

impl TrackingDataArc {
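    /// Loads a tracking data arc from a Parquet file.
    ///
    /// The file must provide an `Epoch (UTC)` column, a `Tracking device` column, and at
    /// least one measurement column among `Range (km)`, `Doppler (km/s)`, `Azimuth (deg)`,
    /// and `Elevation (deg)`.
    ///
    /// Usage sketch, assuming `TrackingDataArc` is re-exported at `nyx_space::od::msr` and
    /// that `arc.parquet` was previously written by [`Self::to_parquet`]:
    ///
    /// ```no_run
    /// use nyx_space::od::msr::TrackingDataArc;
    /// let arc = TrackingDataArc::from_parquet("arc.parquet").unwrap();
    /// ```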
    pub fn from_parquet<P: AsRef<Path>>(path: P) -> Result<Self, InputOutputError> {
        let file = File::open(&path).context(StdIOSnafu {
            action: "opening file for tracking arc",
        })?;

        // Propagate the Parquet error instead of panicking on a malformed file.
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).context(ParquetSnafu {
            action: "building reader for tracking arc",
        })?;

        let reader = builder.build().context(ParquetSnafu {
            action: "reading tracking arc",
        })?;

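        // Scan the schema to find the required columns and the optional measurement columns.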
        let mut has_epoch = false;
        let mut has_tracking_dev = false;
        let mut range_avail = false;
        let mut doppler_avail = false;
        let mut az_avail = false;
        let mut el_avail = false;
        for field in &reader.schema().fields {
            match field.name().as_str() {
                "Epoch (UTC)" => has_epoch = true,
                "Tracking device" => has_tracking_dev = true,
                "Range (km)" => range_avail = true,
                "Doppler (km/s)" => doppler_avail = true,
                "Azimuth (deg)" => az_avail = true,
                "Elevation (deg)" => el_avail = true,
                _ => {}
            }
        }

        ensure!(
            has_epoch,
            MissingDataSnafu {
                which: "Epoch (UTC)"
            }
        );

        ensure!(
            has_tracking_dev,
            MissingDataSnafu {
                which: "Tracking device"
            }
        );

        ensure!(
            range_avail || doppler_avail || az_avail || el_avail,
            MissingDataSnafu {
                which: "`Range (km)` or `Doppler (km/s)` or `Azimuth (deg)` or `Elevation (deg)`"
            }
        );

        let mut measurements = BTreeMap::new();

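        // Iterate over every record batch in the file.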
        for maybe_batch in reader {
            let batch = maybe_batch.context(ArrowSnafu {
                action: "reading batch of tracking data",
            })?;

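            // The schema checks above guarantee that these two columns exist.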
            let tracking_device = batch
                .column_by_name("Tracking device")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();

            let epochs = batch
                .column_by_name("Epoch (UTC)")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();

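            // Measurement columns are downcast only if the schema scan found them.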
            let range_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if range_avail {
                Some(
                    batch
                        .column_by_name("Range (km)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let doppler_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if doppler_avail {
                Some(
                    batch
                        .column_by_name("Doppler (km/s)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let azimuth_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if az_avail {
                Some(
                    batch
                        .column_by_name("Azimuth (deg)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let elevation_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if el_avail {
                Some(
                    batch
                        .column_by_name("Elevation (deg)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

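            // Build one measurement per row; a later row with the same epoch overwrites an earlier one.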
            for i in 0..batch.num_rows() {
                let epoch = Epoch::from_gregorian_str(epochs.value(i)).map_err(|e| {
                    InputOutputError::Inconsistency {
                        msg: format!("{e} when parsing epoch"),
                    }
                })?;

                let mut measurement = Measurement {
                    epoch,
                    tracker: tracking_device.value(i).to_string(),
                    data: IndexMap::new(),
                };

                if range_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Range, range_data.unwrap().value(i));
                }

                if doppler_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Doppler, doppler_data.unwrap().value(i));
                }

                if az_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Azimuth, azimuth_data.unwrap().value(i));
                }

                if el_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Elevation, elevation_data.unwrap().value(i));
                }

                measurements.insert(epoch, measurement);
            }
        }

        Ok(Self {
            measurements,
            moduli: None,
            source: Some(path.as_ref().display().to_string()),
            force_reject: false,
        })
    }
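
    /// Exports this tracking arc to a Parquet file using the default export configuration.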
    pub fn to_parquet_simple<P: AsRef<Path>>(&self, path: P) -> Result<PathBuf, Box<dyn Error>> {
        self.to_parquet(path, ExportCfg::default())
    }

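    /// Exports this tracking arc to a Parquet file using the provided export configuration.
    ///
    /// The `step` and `fields` options of the configuration are not supported for tracking
    /// arcs; setting either one only triggers a warning.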
    pub fn to_parquet<P: AsRef<Path>>(
        &self,
        path: P,
        cfg: ExportCfg,
    ) -> Result<PathBuf, Box<dyn Error>> {
        ensure!(
            !self.is_empty(),
            EmptyDatasetSnafu {
                action: "tracking data arc to parquet"
            }
        );

        let path_buf = cfg.actual_path(path);

        if cfg.step.is_some() {
            warn!("The `step` parameter in the export is not supported for tracking arcs.");
        }

        if cfg.fields.is_some() {
            warn!("The `fields` parameter in the export is not supported for tracking arcs.");
        }

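        // The epoch and tracking device columns are always exported; one float column is
        // appended for each measurement type present in this arc.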
        let mut hdrs = vec![
            Field::new("Epoch (UTC)", DataType::Utf8, false),
            Field::new("Tracking device", DataType::Utf8, false),
        ];

        let msr_types = self.unique_types();
        let mut msr_fields = msr_types
            .iter()
            .map(|msr_type| msr_type.to_field())
            .collect::<Vec<Field>>();

        hdrs.append(&mut msr_fields);

        let schema = Arc::new(Schema::new(hdrs));
        let mut record: Vec<Arc<dyn Array>> = Vec::new();

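        // Restrict the export to the requested epoch range, if any (the end epoch is exclusive).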
        let measurements =
            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
                let start = cfg
                    .start_epoch
                    .unwrap_or_else(|| self.start_epoch().unwrap());
                let end = cfg.end_epoch.unwrap_or_else(|| self.end_epoch().unwrap());

                info!("Exporting measurements from {start} to {end}.");

                self.measurements
                    .range(start..end)
                    .map(|(k, v)| (*k, v.clone()))
                    .collect()
            } else {
                self.measurements.clone()
            };

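        // Serialize all epochs in UTC, in ISO 8601 format.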
        let mut utc_epoch = StringBuilder::new();
        for epoch in measurements.keys() {
            utc_epoch.append_value(epoch.to_time_scale(TimeScale::UTC).to_isoformat());
        }
        record.push(Arc::new(utc_epoch.finish()));

        let mut device_names = StringBuilder::new();
        for m in measurements.values() {
            device_names.append_value(m.tracker.clone());
        }
        record.push(Arc::new(device_names.finish()));

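        // One column per measurement type; a null marks a measurement without that type.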
        for msr_type in msr_types {
            let mut data_builder = Float64Builder::new();

            for m in measurements.values() {
                match m.data.get(&msr_type) {
                    Some(value) => data_builder.append_value(*value),
                    None => data_builder.append_null(),
                };
            }
            record.push(Arc::new(data_builder.finish()));
        }

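        // Build the file-level metadata; measurement moduli, if defined, are stored as
        // `MODULUS:<type>` entries.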
        let mut metadata = HashMap::new();
        metadata.insert("Purpose".to_string(), "Tracking Arc Data".to_string());
        if let Some(add_meta) = cfg.metadata {
            for (k, v) in add_meta {
                metadata.insert(k, v);
            }
        }

        if let Some(moduli) = &self.moduli {
            for (msr_type, v) in moduli {
                metadata.insert(format!("MODULUS:{msr_type:?}"), v.to_string());
            }
        }

        let props = pq_writer(Some(metadata));

        let file = File::create(&path_buf)?;

        // Propagate Arrow errors instead of panicking if the writer cannot be created.
        let mut writer = ArrowWriter::try_new(file, schema.clone(), props)?;

        let batch = RecordBatch::try_new(schema, record)?;
        writer.write(&batch)?;
        writer.close()?;

        info!("Serialized {self} to {}", path_buf.display());

        Ok(path_buf)
    }
}