nyx_space/od/msr/trackingdata/io_parquet.rs

use crate::io::watermark::pq_writer;
use crate::io::{ArrowSnafu, InputOutputError, MissingDataSnafu, ParquetSnafu, StdIOSnafu};
use crate::io::{EmptyDatasetSnafu, ExportCfg};
use crate::od::msr::{Measurement, MeasurementType};
use arrow::array::{Array, Float64Builder, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::{
    array::{Float64Array, PrimitiveArray, StringArray},
    datatypes,
    record_batch::RecordBatchReader,
};
use hifitime::prelude::Epoch;
use hifitime::TimeScale;
use indexmap::IndexMap;
use log::{info, warn};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use snafu::{ensure, ResultExt};
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use super::TrackingDataArc;

impl TrackingDataArc {
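    /// Loads a tracking data arc from a Parquet file.
    ///
    /// The file must provide the `Epoch (UTC)` and `Tracking device` columns, plus at
    /// least one of `Range (km)`, `Doppler (km/s)`, `Azimuth (deg)`, or `Elevation (deg)`.
    ///
    /// A minimal usage sketch (the crate path and file name are illustrative):
    ///
    /// ```no_run
    /// use nyx_space::od::msr::TrackingDataArc;
    ///
    /// let arc = TrackingDataArc::from_parquet("tracking_arc.parquet").unwrap();
    /// ```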
    pub fn from_parquet<P: AsRef<Path>>(path: P) -> Result<Self, InputOutputError> {
        let file = File::open(&path).context(StdIOSnafu {
            action: "opening file for tracking arc",
        })?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).context(ParquetSnafu {
            action: "building tracking arc reader",
        })?;

        let reader = builder.build().context(ParquetSnafu {
            action: "reading tracking arc",
        })?;

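        // Scan the schema to find which of the expected columns are present.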
        let mut has_epoch = false;
        let mut has_tracking_dev = false;
        let mut range_avail = false;
        let mut doppler_avail = false;
        let mut az_avail = false;
        let mut el_avail = false;
        for field in &reader.schema().fields {
            match field.name().as_str() {
                "Epoch (UTC)" => has_epoch = true,
                "Tracking device" => has_tracking_dev = true,
                "Range (km)" => range_avail = true,
                "Doppler (km/s)" => doppler_avail = true,
                "Azimuth (deg)" => az_avail = true,
                "Elevation (deg)" => el_avail = true,
                _ => {}
            }
        }

        ensure!(
            has_epoch,
            MissingDataSnafu {
                which: "Epoch (UTC)"
            }
        );

        ensure!(
            has_tracking_dev,
            MissingDataSnafu {
                which: "Tracking device"
            }
        );

        ensure!(
            range_avail || doppler_avail || az_avail || el_avail,
            MissingDataSnafu {
                which: "`Range (km)` or `Doppler (km/s)` or `Azimuth (deg)` or `Elevation (deg)`"
            }
        );

        let mut measurements = BTreeMap::new();

        for maybe_batch in reader {
            let batch = maybe_batch.context(ArrowSnafu {
                action: "reading batch of tracking data",
            })?;

            let tracking_device = batch
                .column_by_name("Tracking device")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();

            let epochs = batch
                .column_by_name("Epoch (UTC)")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();

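            // Measurement columns are optional: downcast only those present in the schema.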
            let range_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if range_avail {
                Some(
                    batch
                        .column_by_name("Range (km)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let doppler_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if doppler_avail {
                Some(
                    batch
                        .column_by_name("Doppler (km/s)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let azimuth_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if az_avail {
                Some(
                    batch
                        .column_by_name("Azimuth (deg)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let elevation_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if el_avail {
                Some(
                    batch
                        .column_by_name("Elevation (deg)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

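            // Build one Measurement per row; entries are keyed by epoch, so a duplicate
            // epoch overwrites the previous measurement.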
            for i in 0..batch.num_rows() {
                let epoch = Epoch::from_gregorian_str(epochs.value(i)).map_err(|e| {
                    InputOutputError::Inconsistency {
                        msg: format!("{e} when parsing epoch"),
                    }
                })?;

                let mut measurement = Measurement {
                    epoch,
                    tracker: tracking_device.value(i).to_string(),
                    data: IndexMap::new(),
                };

                if range_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Range, range_data.unwrap().value(i));
                }

                if doppler_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Doppler, doppler_data.unwrap().value(i));
                }

                if az_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Azimuth, azimuth_data.unwrap().value(i));
                }

                if el_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Elevation, elevation_data.unwrap().value(i));
                }

                measurements.insert(epoch, measurement);
            }
        }

        Ok(Self {
            measurements,
            moduli: None,
            source: Some(path.as_ref().display().to_string()),
            force_reject: false,
        })
    }
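
    /// Serializes this tracking data arc to a Parquet file using the default export
    /// configuration, returning the path that was actually written.
    ///
    /// A minimal usage sketch (crate path and file names are illustrative):
    ///
    /// ```no_run
    /// # use nyx_space::od::msr::TrackingDataArc;
    /// let arc = TrackingDataArc::from_parquet("tracking_arc.parquet").unwrap();
    /// let written = arc.to_parquet_simple("tracking_arc_copy.parquet").unwrap();
    /// println!("wrote {}", written.display());
    /// ```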
    pub fn to_parquet_simple<P: AsRef<Path>>(&self, path: P) -> Result<PathBuf, Box<dyn Error>> {
        self.to_parquet(path, ExportCfg::default())
    }

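    /// Serializes this tracking data arc to a Parquet file with a custom export
    /// configuration, e.g. to restrict the exported time span or to attach metadata.
    ///
    /// A sketch restricting the export window, assuming `ExportCfg` supports
    /// struct-update construction (epochs and file names are illustrative):
    ///
    /// ```no_run
    /// # use hifitime::Epoch;
    /// # use nyx_space::io::ExportCfg;
    /// # use nyx_space::od::msr::TrackingDataArc;
    /// # let arc = TrackingDataArc::from_parquet("tracking_arc.parquet").unwrap();
    /// let cfg = ExportCfg {
    ///     start_epoch: Some(Epoch::from_gregorian_utc_at_midnight(2024, 1, 2)),
    ///     ..Default::default()
    /// };
    /// arc.to_parquet("tracking_arc_subset.parquet", cfg).unwrap();
    /// ```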
    pub fn to_parquet<P: AsRef<Path>>(
        &self,
        path: P,
        cfg: ExportCfg,
    ) -> Result<PathBuf, Box<dyn Error>> {
        ensure!(
            !self.is_empty(),
            EmptyDatasetSnafu {
                action: "tracking data arc to parquet"
            }
        );

        let path_buf = cfg.actual_path(path);

        if cfg.step.is_some() {
            warn!("The `step` parameter in the export is not supported for tracking arcs.");
        }

        if cfg.fields.is_some() {
            warn!("The `fields` parameter in the export is not supported for tracking arcs.");
        }

        let mut hdrs = vec![
            Field::new("Epoch (UTC)", DataType::Utf8, false),
            Field::new("Tracking device", DataType::Utf8, false),
        ];

        let msr_types = self.unique_types();
        let mut msr_fields = msr_types
            .iter()
            .map(|msr_type| msr_type.to_field())
            .collect::<Vec<Field>>();

        hdrs.append(&mut msr_fields);

        let schema = Arc::new(Schema::new(hdrs));
        let mut record: Vec<Arc<dyn Array>> = Vec::new();

        let measurements =
            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
                let start = cfg
                    .start_epoch
                    .unwrap_or_else(|| self.start_epoch().unwrap());
                let end = cfg.end_epoch.unwrap_or_else(|| self.end_epoch().unwrap());

                info!("Exporting measurements from {start} to {end}.");

                // Use an inclusive range so a measurement exactly at the end epoch is
                // not silently dropped from the export.
                self.measurements
                    .range(start..=end)
                    .map(|(k, v)| (*k, v.clone()))
                    .collect()
            } else {
                self.measurements.clone()
            };

        let mut utc_epoch = StringBuilder::new();
        for epoch in measurements.keys() {
            utc_epoch.append_value(epoch.to_time_scale(TimeScale::UTC).to_isoformat());
        }
        record.push(Arc::new(utc_epoch.finish()));

        let mut device_names = StringBuilder::new();
        for m in measurements.values() {
            device_names.append_value(m.tracker.clone());
        }
        record.push(Arc::new(device_names.finish()));

        for msr_type in msr_types {
            let mut data_builder = Float64Builder::new();

            for m in measurements.values() {
                match m.data.get(&msr_type) {
                    Some(value) => data_builder.append_value(*value),
                    None => data_builder.append_null(),
                };
            }
            record.push(Arc::new(data_builder.finish()));
        }

        let mut metadata = HashMap::new();
        metadata.insert("Purpose".to_string(), "Tracking Arc Data".to_string());
        if let Some(add_meta) = cfg.metadata {
            for (k, v) in add_meta {
                metadata.insert(k, v);
            }
        }

        if let Some(moduli) = &self.moduli {
            for (msr_type, v) in moduli {
                metadata.insert(format!("MODULUS:{msr_type:?}"), v.to_string());
            }
        }

        let props = pq_writer(Some(metadata));

        let file = File::create(&path_buf)?;

        let mut writer = ArrowWriter::try_new(file, schema.clone(), props)?;

        let batch = RecordBatch::try_new(schema, record)?;
        writer.write(&batch)?;
        writer.close()?;

        info!("Serialized {self} to {}", path_buf.display());

        Ok(path_buf)
    }
}
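
// A round-trip sketch, not part of the original file: write the arc out and read it
// back, checking that the measurement count survives. The fixture path is illustrative
// and the assertion assumes the `measurements` field is accessible from this module.
#[cfg(test)]
mod io_parquet_roundtrip {
    use super::TrackingDataArc;

    #[test]
    #[ignore = "requires a tracking arc fixture on disk"]
    fn roundtrip() {
        let arc = TrackingDataArc::from_parquet("data/tracking_arc.parquet").unwrap();
        let path = arc
            .to_parquet_simple("data/tracking_arc_roundtrip.parquet")
            .unwrap();
        let reloaded = TrackingDataArc::from_parquet(path).unwrap();
        assert_eq!(arc.measurements.len(), reloaded.measurements.len());
    }
}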