View Javadoc
1   package de.dlr.shepard.influxDB;
2   
3   import com.opencsv.bean.CsvToBeanBuilder;
4   import com.opencsv.bean.StatefulBeanToCsvBuilder;
5   import com.opencsv.exceptions.CsvException;
6   import de.dlr.shepard.exceptions.InvalidBodyException;
7   import io.quarkus.logging.Log;
8   import jakarta.enterprise.context.RequestScoped;
9   import java.io.IOException;
10  import java.io.InputStream;
11  import java.io.InputStreamReader;
12  import java.io.OutputStreamWriter;
13  import java.nio.charset.StandardCharsets;
14  import java.nio.file.Files;
15  import java.util.ArrayList;
16  import java.util.HashMap;
17  import java.util.List;
18  import java.util.Locale;
19  import java.util.Objects;
20  import org.apache.commons.lang3.math.NumberUtils;
21  
22  @RequestScoped
23  public class CsvConverter {
24  
25    public InputStream convertToCsv(List<TimeseriesPayload> payloads) throws IOException {
26      // TODO: Avoid writing files
27  
28      var tmpfile = Files.createTempFile("shepard", ".csv");
29      var stream = Files.newOutputStream(tmpfile);
30      var streamWriter = new OutputStreamWriter(stream);
31      var writer = new StatefulBeanToCsvBuilder<TimeseriesCsv>(streamWriter).withApplyQuotesToAll(false).build();
32      Log.debugf("Write temp file to: %s", tmpfile.toAbsolutePath().toString());
33  
34      for (var payload : payloads) {
35        try {
36          writer.write(convertPayloadToCsv(payload));
37        } catch (CsvException e) {
38          Log.error("CsvException while writing stream");
39        }
40      }
41  
42      streamWriter.close();
43      var result = Files.newInputStream(tmpfile);
44      return result;
45    }
46  
47    public List<TimeseriesPayload> convertToPayload(InputStream stream) throws IOException {
48      var reader = new InputStreamReader(stream);
49      var cb = new CsvToBeanBuilder<TimeseriesCsv>(reader)
50        .withType(TimeseriesCsv.class)
51        .withErrorLocale(Locale.forLanguageTag("en"))
52        .withExceptionHandler(e -> {
53          var encoder = StandardCharsets.ISO_8859_1.newEncoder();
54          var message = encoder.canEncode(e.getMessage()) ? e.getMessage() : "Invalid CSV";
55          Log.errorf("CsvException while reading stream: %s", message);
56          throw new InvalidBodyException(message);
57        })
58        .build();
59  
60      List<TimeseriesCsv> result = cb.parse();
61      reader.close();
62      return convertCsvToPayload(result);
63    }
64  
65    private List<TimeseriesCsv> convertPayloadToCsv(TimeseriesPayload payload) {
66      var ts = payload.getTimeseries();
67      var result = new ArrayList<TimeseriesCsv>(payload.getPoints().size());
68      for (var p : payload.getPoints()) {
69        var tsc = new TimeseriesCsv(
70          p.getTimeInNanoseconds(),
71          ts.getMeasurement(),
72          ts.getDevice(),
73          ts.getLocation(),
74          ts.getSymbolicName(),
75          ts.getField(),
76          p.getValue()
77        );
78        result.add(tsc);
79      }
80      return result;
81    }
82  
83    private List<TimeseriesPayload> convertCsvToPayload(List<TimeseriesCsv> inputList) {
84      var result = new HashMap<Integer, TimeseriesPayload>();
85      for (var input : inputList) {
86        var key = Objects.hash(
87          input.getMeasurement(),
88          input.getDevice(),
89          input.getLocation(),
90          input.getSymbolicName(),
91          input.getField()
92        );
93        var point = new InfluxPoint(input.getTimestamp(), parseValue(input.getValue()));
94        if (result.containsKey(key)) {
95          result.get(key).getPoints().add(point);
96        } else {
97          var points = new ArrayList<InfluxPoint>();
98          points.add(point);
99          var payload = new TimeseriesPayload(
100           new Timeseries(
101             input.getMeasurement(),
102             input.getDevice(),
103             input.getLocation(),
104             input.getSymbolicName(),
105             input.getField()
106           ),
107           points
108         );
109         result.put(key, payload);
110       }
111     }
112     return new ArrayList<>(result.values());
113   }
114 
115   private Object parseValue(Object input) {
116     List<String> boolString = List.of("true", "false");
117 
118     if (input instanceof String sInput) {
119       if (NumberUtils.isCreatable(sInput)) {
120         return NumberUtils.createNumber(sInput);
121       } else if (boolString.contains(sInput.toLowerCase())) {
122         return sInput.equalsIgnoreCase("true");
123       }
124     }
125     return input;
126   }
127 }