1 package de.dlr.shepard.influxDB;
2
3 import com.opencsv.bean.CsvToBeanBuilder;
4 import com.opencsv.bean.StatefulBeanToCsvBuilder;
5 import com.opencsv.exceptions.CsvException;
6 import de.dlr.shepard.exceptions.InvalidBodyException;
7 import io.quarkus.logging.Log;
8 import jakarta.enterprise.context.RequestScoped;
9 import java.io.IOException;
10 import java.io.InputStream;
11 import java.io.InputStreamReader;
12 import java.io.OutputStreamWriter;
13 import java.nio.charset.StandardCharsets;
14 import java.nio.file.Files;
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Locale;
19 import java.util.Objects;
20 import org.apache.commons.lang3.math.NumberUtils;
21
22 @RequestScoped
23 public class CsvConverter {
24
25 public InputStream convertToCsv(List<TimeseriesPayload> payloads) throws IOException {
26
27
28 var tmpfile = Files.createTempFile("shepard", ".csv");
29 var stream = Files.newOutputStream(tmpfile);
30 var streamWriter = new OutputStreamWriter(stream);
31 var writer = new StatefulBeanToCsvBuilder<TimeseriesCsv>(streamWriter).withApplyQuotesToAll(false).build();
32 Log.debugf("Write temp file to: %s", tmpfile.toAbsolutePath().toString());
33
34 for (var payload : payloads) {
35 try {
36 writer.write(convertPayloadToCsv(payload));
37 } catch (CsvException e) {
38 Log.error("CsvException while writing stream");
39 }
40 }
41
42 streamWriter.close();
43 var result = Files.newInputStream(tmpfile);
44 return result;
45 }
46
47 public List<TimeseriesPayload> convertToPayload(InputStream stream) throws IOException {
48 var reader = new InputStreamReader(stream);
49 var cb = new CsvToBeanBuilder<TimeseriesCsv>(reader)
50 .withType(TimeseriesCsv.class)
51 .withErrorLocale(Locale.forLanguageTag("en"))
52 .withExceptionHandler(e -> {
53 var encoder = StandardCharsets.ISO_8859_1.newEncoder();
54 var message = encoder.canEncode(e.getMessage()) ? e.getMessage() : "Invalid CSV";
55 Log.errorf("CsvException while reading stream: %s", message);
56 throw new InvalidBodyException(message);
57 })
58 .build();
59
60 List<TimeseriesCsv> result = cb.parse();
61 reader.close();
62 return convertCsvToPayload(result);
63 }
64
65 private List<TimeseriesCsv> convertPayloadToCsv(TimeseriesPayload payload) {
66 var ts = payload.getTimeseries();
67 var result = new ArrayList<TimeseriesCsv>(payload.getPoints().size());
68 for (var p : payload.getPoints()) {
69 var tsc = new TimeseriesCsv(
70 p.getTimeInNanoseconds(),
71 ts.getMeasurement(),
72 ts.getDevice(),
73 ts.getLocation(),
74 ts.getSymbolicName(),
75 ts.getField(),
76 p.getValue()
77 );
78 result.add(tsc);
79 }
80 return result;
81 }
82
83 private List<TimeseriesPayload> convertCsvToPayload(List<TimeseriesCsv> inputList) {
84 var result = new HashMap<Integer, TimeseriesPayload>();
85 for (var input : inputList) {
86 var key = Objects.hash(
87 input.getMeasurement(),
88 input.getDevice(),
89 input.getLocation(),
90 input.getSymbolicName(),
91 input.getField()
92 );
93 var point = new InfluxPoint(input.getTimestamp(), parseValue(input.getValue()));
94 if (result.containsKey(key)) {
95 result.get(key).getPoints().add(point);
96 } else {
97 var points = new ArrayList<InfluxPoint>();
98 points.add(point);
99 var payload = new TimeseriesPayload(
100 new Timeseries(
101 input.getMeasurement(),
102 input.getDevice(),
103 input.getLocation(),
104 input.getSymbolicName(),
105 input.getField()
106 ),
107 points
108 );
109 result.put(key, payload);
110 }
111 }
112 return new ArrayList<>(result.values());
113 }
114
115 private Object parseValue(Object input) {
116 List<String> boolString = List.of("true", "false");
117
118 if (input instanceof String sInput) {
119 if (NumberUtils.isCreatable(sInput)) {
120 return NumberUtils.createNumber(sInput);
121 } else if (boolString.contains(sInput.toLowerCase())) {
122 return sInput.equalsIgnoreCase("true");
123 }
124 }
125 return input;
126 }
127 }