nyx_space/od/msr/trackingdata/io_parquet.rs

/*
    Nyx, blazing fast astrodynamics
    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published
    by the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
*/
use crate::io::watermark::pq_writer;
use crate::io::{ArrowSnafu, InputOutputError, MissingDataSnafu, ParquetSnafu, StdIOSnafu};
use crate::io::{EmptyDatasetSnafu, ExportCfg};
use crate::od::msr::{Measurement, MeasurementType};
use arrow::array::{Array, Float64Builder, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::{
    array::{Float64Array, PrimitiveArray, StringArray},
    datatypes,
    record_batch::RecordBatchReader,
};
use hifitime::prelude::Epoch;
use hifitime::TimeScale;
use indexmap::IndexMap;
use log::{info, warn};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use snafu::{ensure, ResultExt};
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use super::TrackingDataArc;

impl TrackingDataArc {
    /// Loads a tracking arc from its serialization in parquet.
    ///
    /// Warning: no metadata is read back from the parquet file, not even the metadata that Nyx itself writes (e.g. the measurement moduli).
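    ///
    /// A minimal usage sketch (not compiled as a doc test); it assumes
    /// `TrackingDataArc` is re-exported at `nyx_space::od::msr` and that the
    /// input file exists:
    ///
    /// ```ignore
    /// use nyx_space::od::msr::TrackingDataArc;
    ///
    /// let arc = TrackingDataArc::from_parquet("data/tracking.parquet")?;
    /// println!("{arc}");
    /// ```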
    pub fn from_parquet<P: AsRef<Path>>(path: P) -> Result<Self, InputOutputError> {
        let file = File::open(&path).context(StdIOSnafu {
            action: "opening file for tracking arc",
        })?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).context(ParquetSnafu {
            action: "building parquet reader for tracking arc",
        })?;

        let reader = builder.build().context(ParquetSnafu {
            action: "reading tracking arc",
        })?;

        // Check the schema
        let mut has_epoch = false;
        let mut has_tracking_dev = false;
        let mut range_avail = false;
        let mut doppler_avail = false;
        let mut az_avail = false;
        let mut el_avail = false;
        for field in &reader.schema().fields {
            match field.name().as_str() {
                "Epoch (UTC)" => has_epoch = true,
                "Tracking device" => has_tracking_dev = true,
                "Range (km)" => range_avail = true,
                "Doppler (km/s)" => doppler_avail = true,
                "Azimuth (deg)" => az_avail = true,
                "Elevation (deg)" => el_avail = true,
                _ => {}
            }
        }

        ensure!(
            has_epoch,
            MissingDataSnafu {
                which: "Epoch (UTC)"
            }
        );

        ensure!(
            has_tracking_dev,
            MissingDataSnafu {
                which: "Tracking device"
            }
        );

        ensure!(
            range_avail || doppler_avail || az_avail || el_avail,
            MissingDataSnafu {
                which: "`Range (km)` or `Doppler (km/s)` or `Azimuth (deg)` or `Elevation (deg)`"
            }
        );

        let mut measurements = BTreeMap::new();
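        // The BTreeMap keys on epoch, keeping the measurements chronologically sorted.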

        // We can safely unwrap the columns since we've checked for their existence just before.
        for maybe_batch in reader {
            let batch = maybe_batch.context(ArrowSnafu {
                action: "reading batch of tracking data",
            })?;

            let tracking_device = batch
                .column_by_name("Tracking device")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();

            let epochs = batch
                .column_by_name("Epoch (UTC)")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();

            let range_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if range_avail {
                Some(
                    batch
                        .column_by_name("Range (km)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let doppler_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if doppler_avail {
                Some(
                    batch
                        .column_by_name("Doppler (km/s)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let azimuth_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if az_avail {
                Some(
                    batch
                        .column_by_name("Azimuth (deg)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let elevation_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if el_avail {
                Some(
                    batch
                        .column_by_name("Elevation (deg)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };
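
            // The measurement columns are nullable (each row only carries the types
            // its device produced), so null slots must be skipped when reading.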

            // Set the measurements in the tracking arc
            for i in 0..batch.num_rows() {
                let epoch = Epoch::from_gregorian_str(epochs.value(i)).map_err(|e| {
                    InputOutputError::Inconsistency {
                        msg: format!("{e} when parsing epoch"),
                    }
                })?;

                let mut measurement = Measurement {
                    epoch,
                    tracker: tracking_device.value(i).to_string(),
                    data: IndexMap::new(),
                };

                // Only insert values that are actually set: a null slot means this
                // row does not carry that measurement type.
                if range_avail && range_data.unwrap().is_valid(i) {
                    measurement
                        .data
                        .insert(MeasurementType::Range, range_data.unwrap().value(i));
                }

                if doppler_avail && doppler_data.unwrap().is_valid(i) {
                    measurement
                        .data
                        .insert(MeasurementType::Doppler, doppler_data.unwrap().value(i));
                }

                if az_avail && azimuth_data.unwrap().is_valid(i) {
                    measurement
                        .data
                        .insert(MeasurementType::Azimuth, azimuth_data.unwrap().value(i));
                }

                if el_avail && elevation_data.unwrap().is_valid(i) {
                    measurement
                        .data
                        .insert(MeasurementType::Elevation, elevation_data.unwrap().value(i));
                }

                measurements.insert(epoch, measurement);
            }
        }

        Ok(Self {
            measurements,
            moduli: None,
            source: Some(path.as_ref().display().to_string()),
            force_reject: false,
        })
    }

    /// Store this tracking arc to a parquet file.
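    ///
    /// A minimal sketch; the output path is an illustrative placeholder:
    ///
    /// ```ignore
    /// let written = arc.to_parquet_simple("tracking.parquet")?;
    /// println!("Wrote {}", written.display());
    /// ```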
    pub fn to_parquet_simple<P: AsRef<Path>>(&self, path: P) -> Result<PathBuf, Box<dyn Error>> {
        self.to_parquet(path, ExportCfg::default())
    }

    /// Store this tracking arc to a parquet file, with optional metadata and a timestamp appended to the filename.
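    ///
    /// Sketch of an export restricted to a time window, assuming the `ExportCfg`
    /// fields are publicly constructible; `start`, `end`, and the output path
    /// are illustrative placeholders:
    ///
    /// ```ignore
    /// use nyx_space::io::ExportCfg;
    ///
    /// let cfg = ExportCfg {
    ///     start_epoch: Some(start),
    ///     end_epoch: Some(end),
    ///     ..Default::default()
    /// };
    /// let path = arc.to_parquet("tracking.parquet", cfg)?;
    /// ```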
    pub fn to_parquet<P: AsRef<Path>>(
        &self,
        path: P,
        cfg: ExportCfg,
    ) -> Result<PathBuf, Box<dyn Error>> {
        ensure!(
            !self.is_empty(),
            EmptyDatasetSnafu {
                action: "tracking data arc to parquet"
            }
        );

        let path_buf = cfg.actual_path(path);

        if cfg.step.is_some() {
            warn!("The `step` parameter in the export is not supported for tracking arcs.");
        }

        if cfg.fields.is_some() {
            warn!("The `fields` parameter in the export is not supported for tracking arcs.");
        }

        // Build the headers: fixed columns first, then one column per measurement type
        let mut hdrs = vec![
            Field::new("Epoch (UTC)", DataType::Utf8, false),
            Field::new("Tracking device", DataType::Utf8, false),
        ];

        let msr_types = self.unique_types();
        let mut msr_fields = msr_types
            .iter()
            .map(|msr_type| msr_type.to_field())
            .collect::<Vec<Field>>();

        hdrs.append(&mut msr_fields);

        // Build the schema
        let schema = Arc::new(Schema::new(hdrs));
        let mut record: Vec<Arc<dyn Array>> = Vec::new();
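        // Column arrays must be pushed in the same order as the schema fields,
        // since `RecordBatch::try_new` pairs them positionally.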

        // Build the measurement iterator

        let measurements =
            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
                let start = cfg
                    .start_epoch
                    .unwrap_or_else(|| self.start_epoch().unwrap());
                let end = cfg.end_epoch.unwrap_or_else(|| self.end_epoch().unwrap());

                info!("Exporting measurements from {start} to {end}.");

                self.measurements
                    .range(start..end)
                    .map(|(k, v)| (*k, v.clone()))
                    .collect()
            } else {
                self.measurements.clone()
            };

        // Build all of the records

        // Epochs
        let mut utc_epoch = StringBuilder::new();
        for epoch in measurements.keys() {
            utc_epoch.append_value(epoch.to_time_scale(TimeScale::UTC).to_isoformat());
        }
        record.push(Arc::new(utc_epoch.finish()));

        // Device names
        let mut device_names = StringBuilder::new();
        for m in measurements.values() {
            device_names.append_value(m.tracker.clone());
        }
        record.push(Arc::new(device_names.finish()));

        // Measurement data, column by column
        for msr_type in msr_types {
            let mut data_builder = Float64Builder::new();

            for m in measurements.values() {
                match m.data.get(&msr_type) {
                    Some(value) => data_builder.append_value(*value),
                    None => data_builder.append_null(),
                };
            }
            record.push(Arc::new(data_builder.finish()));
        }

        // Build the file-level metadata, including any user-provided entries.
        let mut metadata = HashMap::new();
        metadata.insert("Purpose".to_string(), "Tracking Arc Data".to_string());
        if let Some(add_meta) = cfg.metadata {
            for (k, v) in add_meta {
                metadata.insert(k, v);
            }
        }
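
        // Record any measurement moduli in the metadata; note that `from_parquet`
        // does not read these back.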
        if let Some(moduli) = &self.moduli {
            for (msr_type, v) in moduli {
                metadata.insert(format!("MODULUS:{msr_type:?}"), v.to_string());
            }
        }

        let props = pq_writer(Some(metadata));

        let file = File::create(&path_buf)?;

        let mut writer = ArrowWriter::try_new(file, schema.clone(), props)?;

        let batch = RecordBatch::try_new(schema, record)?;
        writer.write(&batch)?;
        writer.close()?;

        info!("Serialized {self} to {}", path_buf.display());

        // Return the path this was written to
        Ok(path_buf)
    }
}