
nyx_space/od/msr/trackingdata/io_parquet.rs

/*
    Nyx, blazing fast astrodynamics
    Copyright (C) 2018-onwards Christopher Rabotin <christopher.rabotin@gmail.com>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published
    by the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
*/
use crate::io::watermark::pq_writer;
use crate::io::{ArrowSnafu, InputOutputError, MissingDataSnafu, ParquetSnafu, StdIOSnafu};
use crate::io::{EmptyDatasetSnafu, ExportCfg};
use crate::od::msr::{Measurement, MeasurementType};
use arrow::array::{Array, Float64Builder, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::{
    array::{Float64Array, PrimitiveArray, StringArray},
    datatypes,
    record_batch::RecordBatchReader,
};
use hifitime::prelude::Epoch;
use hifitime::TimeScale;
use indexmap::IndexMap;
use log::{info, warn};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use snafu::{ensure, ResultExt};
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use super::TrackingDataArc;

impl TrackingDataArc {
    /// Loads a tracking arc from its serialization in parquet.
    ///
    /// Warning: no metadata is read back from the parquet file, not even metadata written by Nyx
    /// itself (e.g. the measurement moduli), so the loaded arc always has `moduli` set to `None`.
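    ///
    /// A minimal usage sketch (the file path and import path below are illustrative):
    ///
    /// ```ignore
    /// use nyx_space::od::msr::TrackingDataArc;
    ///
    /// let arc = TrackingDataArc::from_parquet("./data/tracking_arc.parquet")?;
    /// assert!(!arc.is_empty());
    /// ```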
    pub fn from_parquet<P: AsRef<Path>>(path: P) -> Result<Self, InputOutputError> {
        let file = File::open(&path).context(StdIOSnafu {
            action: "opening file for tracking arc",
        })?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).context(ParquetSnafu {
            action: "building parquet reader for tracking arc",
        })?;

        let reader = builder.build().context(ParquetSnafu {
            action: "reading tracking arc",
        })?;

        // Check the schema for the required columns and which measurement types are available.
        let mut has_epoch = false;
        let mut has_tracking_dev = false;
        let mut range_avail = false;
        let mut doppler_avail = false;
        let mut az_avail = false;
        let mut el_avail = false;
        for field in &reader.schema().fields {
            match field.name().as_str() {
                "Epoch (UTC)" => has_epoch = true,
                "Tracking device" => has_tracking_dev = true,
                "Range (km)" => range_avail = true,
                "Doppler (km/s)" => doppler_avail = true,
                "Azimuth (deg)" => az_avail = true,
                "Elevation (deg)" => el_avail = true,
                _ => {}
            }
        }

        ensure!(
            has_epoch,
            MissingDataSnafu {
                which: "Epoch (UTC)"
            }
        );

        ensure!(
            has_tracking_dev,
            MissingDataSnafu {
                which: "Tracking device"
            }
        );

        ensure!(
            range_avail || doppler_avail || az_avail || el_avail,
            MissingDataSnafu {
                which: "`Range (km)` or `Doppler (km/s)` or `Azimuth (deg)` or `Elevation (deg)`"
            }
        );

        let mut measurements = BTreeMap::new();

        // We can safely unwrap the columns since we checked for their existence just above.
        for maybe_batch in reader {
            let batch = maybe_batch.context(ArrowSnafu {
                action: "reading batch of tracking data",
            })?;

            let tracking_device = batch
                .column_by_name("Tracking device")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();

            let epochs = batch
                .column_by_name("Epoch (UTC)")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();

            let range_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if range_avail {
                Some(
                    batch
                        .column_by_name("Range (km)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let doppler_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if doppler_avail {
                Some(
                    batch
                        .column_by_name("Doppler (km/s)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let azimuth_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if az_avail {
                Some(
                    batch
                        .column_by_name("Azimuth (deg)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            let elevation_data: Option<&PrimitiveArray<datatypes::Float64Type>> = if el_avail {
                Some(
                    batch
                        .column_by_name("Elevation (deg)")
                        .unwrap()
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap(),
                )
            } else {
                None
            };

            // Set the measurements in the tracking arc
            for i in 0..batch.num_rows() {
                let epoch = Epoch::from_gregorian_str(epochs.value(i)).map_err(|e| {
                    InputOutputError::Inconsistency {
                        msg: format!("{e} when parsing epoch"),
                    }
                })?;

                let mut measurement = Measurement {
                    epoch,
                    tracker: tracking_device.value(i).to_string(),
                    data: IndexMap::new(),
                    rejected: false,
                };

                if range_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Range, range_data.unwrap().value(i));
                }

                if doppler_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Doppler, doppler_data.unwrap().value(i));
                }

                if az_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Azimuth, azimuth_data.unwrap().value(i));
                }

                if el_avail {
                    measurement
                        .data
                        .insert(MeasurementType::Elevation, elevation_data.unwrap().value(i));
                }

                measurements.insert(epoch, measurement);
            }
        }

        Ok(Self {
            measurements,
            moduli: None,
            source: Some(path.as_ref().display().to_string()),
            force_reject: false,
        })
    }

    /// Stores this tracking arc to a parquet file using the default export configuration.
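    ///
    /// A minimal sketch (the output path below is illustrative):
    ///
    /// ```ignore
    /// let written = arc.to_parquet_simple("./output/tracking_arc.parquet")?;
    /// println!("wrote {}", written.display());
    /// ```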
    pub fn to_parquet_simple<P: AsRef<Path>>(&self, path: P) -> Result<PathBuf, Box<dyn Error>> {
        self.to_parquet(path, ExportCfg::default())
    }

    /// Stores this tracking arc to a parquet file, with optional metadata and an optional
    /// timestamp appended to the file name, per the export configuration.
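    ///
    /// A sketch of a configured export, assuming `ExportCfg` exposes `start_epoch` and `metadata`
    /// as public fields (paths and values below are illustrative):
    ///
    /// ```ignore
    /// use hifitime::prelude::Epoch;
    /// use std::collections::HashMap;
    ///
    /// let mut cfg = ExportCfg::default();
    /// // Only export measurements at or after this epoch.
    /// cfg.start_epoch = Some(Epoch::from_gregorian_utc_at_midnight(2023, 1, 2));
    /// // Extra key/value pairs stored in the parquet file-level metadata.
    /// cfg.metadata = Some(HashMap::from([("Mission".to_string(), "Demo".to_string())]));
    ///
    /// let written = arc.to_parquet("./output/tracking_arc.parquet", cfg)?;
    /// ```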
    pub fn to_parquet<P: AsRef<Path>>(
        &self,
        path: P,
        cfg: ExportCfg,
    ) -> Result<PathBuf, Box<dyn Error>> {
        ensure!(
            !self.is_empty(),
            EmptyDatasetSnafu {
                action: "tracking data arc to parquet"
            }
        );

        let path_buf = cfg.actual_path(path);

        if cfg.step.is_some() {
            warn!("The `step` parameter in the export is not supported for tracking arcs.");
        }

        if cfg.fields.is_some() {
            warn!("The `fields` parameter in the export is not supported for tracking arcs.");
        }

        // Build the schema: the epoch and tracking device columns are always present,
        // followed by one column per measurement type found in this arc.
        let mut hdrs = vec![
            Field::new("Epoch (UTC)", DataType::Utf8, false),
            Field::new("Tracking device", DataType::Utf8, false),
        ];

        let msr_types = self.unique_types();
        let mut msr_fields = msr_types
            .iter()
            .map(|msr_type| msr_type.to_field())
            .collect::<Vec<Field>>();

        hdrs.append(&mut msr_fields);

        let schema = Arc::new(Schema::new(hdrs));
        let mut record: Vec<Arc<dyn Array>> = Vec::new();

        // Restrict the exported measurements to the requested time range, if any.
        let measurements =
            if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
                let start = cfg
                    .start_epoch
                    .unwrap_or_else(|| self.start_epoch().unwrap());
                let end = cfg.end_epoch.unwrap_or_else(|| self.end_epoch().unwrap());

                info!("Exporting measurements from {start} to {end}.");

                // Inclusive of the end epoch so the final measurement is not dropped.
                self.measurements
                    .range(start..=end)
                    .map(|(k, v)| (*k, v.clone()))
                    .collect()
            } else {
                self.measurements.clone()
            };

        // Build all of the records

        // Epochs, serialized as ISO-8601 UTC strings
        let mut utc_epoch = StringBuilder::new();
        for epoch in measurements.keys() {
            utc_epoch.append_value(epoch.to_time_scale(TimeScale::UTC).to_isoformat());
        }
        record.push(Arc::new(utc_epoch.finish()));

        // Device names
        let mut device_names = StringBuilder::new();
        for m in measurements.values() {
            device_names.append_value(m.tracker.clone());
        }
        record.push(Arc::new(device_names.finish()));

        // Measurement data, column by column; null where a measurement lacks that type
        for msr_type in msr_types {
            let mut data_builder = Float64Builder::new();

            for m in measurements.values() {
                match m.data.get(&msr_type) {
                    Some(value) => data_builder.append_value(*value),
                    None => data_builder.append_null(),
                };
            }
            record.push(Arc::new(data_builder.finish()));
        }

        // Build the file-level metadata, merging in any user-provided entries and the
        // measurement moduli (stored under `MODULUS:<type>` keys).
        let mut metadata = HashMap::new();
        metadata.insert("Purpose".to_string(), "Tracking Arc Data".to_string());
        if let Some(add_meta) = cfg.metadata {
            for (k, v) in add_meta {
                metadata.insert(k, v);
            }
        }

        if let Some(moduli) = &self.moduli {
            for (msr_type, v) in moduli {
                metadata.insert(format!("MODULUS:{msr_type:?}"), v.to_string());
            }
        }

        let props = pq_writer(Some(metadata));

        let file = File::create(&path_buf)?;

        let mut writer = ArrowWriter::try_new(file, schema.clone(), props)?;

        let batch = RecordBatch::try_new(schema, record)?;
        writer.write(&batch)?;
        writer.close()?;

        info!("Serialized {self} to {}", path_buf.display());

        // Return the path this was written to
        Ok(path_buf)
    }
}