1 package de.dlr.shepard.influxDB;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.io.InputStreamReader;
6 import java.io.OutputStreamWriter;
7 import java.nio.charset.StandardCharsets;
8 import java.nio.file.Files;
9 import java.util.ArrayList;
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Locale;
13 import java.util.Objects;
14
15 import org.apache.commons.lang3.math.NumberUtils;
16
17 import com.opencsv.bean.CsvToBeanBuilder;
18 import com.opencsv.bean.StatefulBeanToCsvBuilder;
19 import com.opencsv.exceptions.CsvException;
20
21 import de.dlr.shepard.exceptions.InvalidBodyException;
22 import lombok.extern.slf4j.Slf4j;
23
24 @Slf4j
25 public class CsvConverter {
26
27 public InputStream convertToCsv(List<TimeseriesPayload> payloads) throws IOException {
28
29
30 var tmpfile = Files.createTempFile("shepard", ".csv");
31 var stream = Files.newOutputStream(tmpfile);
32 var streamWriter = new OutputStreamWriter(stream);
33 var writer = new StatefulBeanToCsvBuilder<TimeseriesCsv>(streamWriter).withApplyQuotesToAll(false).build();
34 log.debug("Write temp file to: {}", tmpfile.toAbsolutePath().toString());
35
36 for (var payload : payloads) {
37 try {
38 writer.write(convertPayloadToCsv(payload));
39 } catch (CsvException e) {
40 log.error("CsvException while writing stream");
41 }
42 }
43
44 streamWriter.close();
45 var result = Files.newInputStream(tmpfile);
46 return result;
47 }
48
49 public List<TimeseriesPayload> convertToPayload(InputStream stream) throws IOException {
50 var reader = new InputStreamReader(stream);
51 var cb = new CsvToBeanBuilder<TimeseriesCsv>(reader).withType(TimeseriesCsv.class)
52 .withErrorLocale(Locale.forLanguageTag("en")).withExceptionHandler(e -> {
53 var encoder = StandardCharsets.ISO_8859_1.newEncoder();
54 var message = encoder.canEncode(e.getMessage()) ? e.getMessage() : "Invalid CSV";
55 log.error("CsvException while reading stream: {}", message);
56 throw new InvalidBodyException(message);
57 }).build();
58
59 List<TimeseriesCsv> result = cb.parse();
60 reader.close();
61 return convertCsvToPayload(result);
62 }
63
64 private List<TimeseriesCsv> convertPayloadToCsv(TimeseriesPayload payload) {
65 var ts = payload.getTimeseries();
66 var result = new ArrayList<TimeseriesCsv>(payload.getPoints().size());
67 for (var p : payload.getPoints()) {
68 var tsc = new TimeseriesCsv(p.getTimeInNanoseconds(), ts.getMeasurement(), ts.getDevice(), ts.getLocation(),
69 ts.getSymbolicName(), ts.getField(), p.getValue());
70 result.add(tsc);
71 }
72 return result;
73 }
74
75 private List<TimeseriesPayload> convertCsvToPayload(List<TimeseriesCsv> inputList) {
76 var result = new HashMap<Integer, TimeseriesPayload>();
77 for (var input : inputList) {
78 var key = Objects.hash(input.getMeasurement(), input.getDevice(), input.getLocation(),
79 input.getSymbolicName(), input.getField());
80 var point = new InfluxPoint(input.getTimestamp(), parseValue(input.getValue()));
81 if (result.containsKey(key)) {
82 result.get(key).getPoints().add(point);
83 } else {
84 var points = new ArrayList<InfluxPoint>();
85 points.add(point);
86 var payload = new TimeseriesPayload(new Timeseries(input.getMeasurement(), input.getDevice(),
87 input.getLocation(), input.getSymbolicName(), input.getField()), points);
88 result.put(key, payload);
89 }
90 }
91 return new ArrayList<>(result.values());
92 }
93
94 private Object parseValue(Object input) {
95 List<String> boolString = List.of("true", "false");
96
97 if (input instanceof String sInput) {
98 if (NumberUtils.isCreatable(sInput)) {
99 return NumberUtils.createNumber(sInput);
100 } else if (boolString.contains(sInput.toLowerCase())) {
101 return sInput.equalsIgnoreCase("true");
102 }
103 }
104 return input;
105 }
106
107 }