View Javadoc
1   package de.dlr.shepard.influxDB;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.io.InputStreamReader;
6   import java.io.OutputStreamWriter;
7   import java.nio.charset.StandardCharsets;
8   import java.nio.file.Files;
9   import java.util.ArrayList;
10  import java.util.HashMap;
11  import java.util.List;
12  import java.util.Locale;
13  import java.util.Objects;
14  
15  import org.apache.commons.lang3.math.NumberUtils;
16  
17  import com.opencsv.bean.CsvToBeanBuilder;
18  import com.opencsv.bean.StatefulBeanToCsvBuilder;
19  import com.opencsv.exceptions.CsvException;
20  
21  import de.dlr.shepard.exceptions.InvalidBodyException;
22  import lombok.extern.slf4j.Slf4j;
23  
24  @Slf4j
25  public class CsvConverter {
26  
27  	public InputStream convertToCsv(List<TimeseriesPayload> payloads) throws IOException {
28  		// TODO: Avoid writing files
29  
30  		var tmpfile = Files.createTempFile("shepard", ".csv");
31  		var stream = Files.newOutputStream(tmpfile);
32  		var streamWriter = new OutputStreamWriter(stream);
33  		var writer = new StatefulBeanToCsvBuilder<TimeseriesCsv>(streamWriter).withApplyQuotesToAll(false).build();
34  		log.debug("Write temp file to: {}", tmpfile.toAbsolutePath().toString());
35  
36  		for (var payload : payloads) {
37  			try {
38  				writer.write(convertPayloadToCsv(payload));
39  			} catch (CsvException e) {
40  				log.error("CsvException while writing stream");
41  			}
42  		}
43  
44  		streamWriter.close();
45  		var result = Files.newInputStream(tmpfile);
46  		return result;
47  	}
48  
49  	public List<TimeseriesPayload> convertToPayload(InputStream stream) throws IOException {
50  		var reader = new InputStreamReader(stream);
51  		var cb = new CsvToBeanBuilder<TimeseriesCsv>(reader).withType(TimeseriesCsv.class)
52  				.withErrorLocale(Locale.forLanguageTag("en")).withExceptionHandler(e -> {
53  					var encoder = StandardCharsets.ISO_8859_1.newEncoder();
54  					var message = encoder.canEncode(e.getMessage()) ? e.getMessage() : "Invalid CSV";
55  					log.error("CsvException while reading stream: {}", message);
56  					throw new InvalidBodyException(message);
57  				}).build();
58  
59  		List<TimeseriesCsv> result = cb.parse();
60  		reader.close();
61  		return convertCsvToPayload(result);
62  	}
63  
64  	private List<TimeseriesCsv> convertPayloadToCsv(TimeseriesPayload payload) {
65  		var ts = payload.getTimeseries();
66  		var result = new ArrayList<TimeseriesCsv>(payload.getPoints().size());
67  		for (var p : payload.getPoints()) {
68  			var tsc = new TimeseriesCsv(p.getTimeInNanoseconds(), ts.getMeasurement(), ts.getDevice(), ts.getLocation(),
69  					ts.getSymbolicName(), ts.getField(), p.getValue());
70  			result.add(tsc);
71  		}
72  		return result;
73  	}
74  
75  	private List<TimeseriesPayload> convertCsvToPayload(List<TimeseriesCsv> inputList) {
76  		var result = new HashMap<Integer, TimeseriesPayload>();
77  		for (var input : inputList) {
78  			var key = Objects.hash(input.getMeasurement(), input.getDevice(), input.getLocation(),
79  					input.getSymbolicName(), input.getField());
80  			var point = new InfluxPoint(input.getTimestamp(), parseValue(input.getValue()));
81  			if (result.containsKey(key)) {
82  				result.get(key).getPoints().add(point);
83  			} else {
84  				var points = new ArrayList<InfluxPoint>();
85  				points.add(point);
86  				var payload = new TimeseriesPayload(new Timeseries(input.getMeasurement(), input.getDevice(),
87  						input.getLocation(), input.getSymbolicName(), input.getField()), points);
88  				result.put(key, payload);
89  			}
90  		}
91  		return new ArrayList<>(result.values());
92  	}
93  
94  	private Object parseValue(Object input) {
95  		List<String> boolString = List.of("true", "false");
96  
97  		if (input instanceof String sInput) {
98  			if (NumberUtils.isCreatable(sInput)) {
99  				return NumberUtils.createNumber(sInput);
100 			} else if (boolString.contains(sInput.toLowerCase())) {
101 				return sInput.equalsIgnoreCase("true");
102 			}
103 		}
104 		return input;
105 	}
106 
107 }