CsvConverter.java

package de.dlr.shepard.influxDB;

import com.opencsv.bean.CsvToBeanBuilder;
import com.opencsv.bean.StatefulBeanToCsvBuilder;
import com.opencsv.exceptions.CsvException;
import de.dlr.shepard.exceptions.InvalidBodyException;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import org.apache.commons.lang3.math.NumberUtils;

@RequestScoped
public class CsvConverter {

  public InputStream convertToCsv(List<TimeseriesPayload> payloads) throws IOException {
    // TODO: Avoid writing files

    var tmpfile = Files.createTempFile("shepard", ".csv");
    var stream = Files.newOutputStream(tmpfile);
    var streamWriter = new OutputStreamWriter(stream);
    var writer = new StatefulBeanToCsvBuilder<TimeseriesCsv>(streamWriter).withApplyQuotesToAll(false).build();
    Log.debugf("Write temp file to: %s", tmpfile.toAbsolutePath().toString());

    for (var payload : payloads) {
      try {
        writer.write(convertPayloadToCsv(payload));
      } catch (CsvException e) {
        Log.error("CsvException while writing stream");
      }
    }

    streamWriter.close();
    var result = Files.newInputStream(tmpfile);
    return result;
  }

  public List<TimeseriesPayload> convertToPayload(InputStream stream) throws IOException {
    var reader = new InputStreamReader(stream);
    var cb = new CsvToBeanBuilder<TimeseriesCsv>(reader)
      .withType(TimeseriesCsv.class)
      .withErrorLocale(Locale.forLanguageTag("en"))
      .withExceptionHandler(e -> {
        var encoder = StandardCharsets.ISO_8859_1.newEncoder();
        var message = encoder.canEncode(e.getMessage()) ? e.getMessage() : "Invalid CSV";
        Log.errorf("CsvException while reading stream: %s", message);
        throw new InvalidBodyException(message);
      })
      .build();

    List<TimeseriesCsv> result = cb.parse();
    reader.close();
    return convertCsvToPayload(result);
  }

  private List<TimeseriesCsv> convertPayloadToCsv(TimeseriesPayload payload) {
    var ts = payload.getTimeseries();
    var result = new ArrayList<TimeseriesCsv>(payload.getPoints().size());
    for (var p : payload.getPoints()) {
      var tsc = new TimeseriesCsv(
        p.getTimeInNanoseconds(),
        ts.getMeasurement(),
        ts.getDevice(),
        ts.getLocation(),
        ts.getSymbolicName(),
        ts.getField(),
        p.getValue()
      );
      result.add(tsc);
    }
    return result;
  }

  private List<TimeseriesPayload> convertCsvToPayload(List<TimeseriesCsv> inputList) {
    var result = new HashMap<Integer, TimeseriesPayload>();
    for (var input : inputList) {
      var key = Objects.hash(
        input.getMeasurement(),
        input.getDevice(),
        input.getLocation(),
        input.getSymbolicName(),
        input.getField()
      );
      var point = new InfluxPoint(input.getTimestamp(), parseValue(input.getValue()));
      if (result.containsKey(key)) {
        result.get(key).getPoints().add(point);
      } else {
        var points = new ArrayList<InfluxPoint>();
        points.add(point);
        var payload = new TimeseriesPayload(
          new Timeseries(
            input.getMeasurement(),
            input.getDevice(),
            input.getLocation(),
            input.getSymbolicName(),
            input.getField()
          ),
          points
        );
        result.put(key, payload);
      }
    }
    return new ArrayList<>(result.values());
  }

  private Object parseValue(Object input) {
    List<String> boolString = List.of("true", "false");

    if (input instanceof String sInput) {
      if (NumberUtils.isCreatable(sInput)) {
        return NumberUtils.createNumber(sInput);
      } else if (boolString.contains(sInput.toLowerCase())) {
        return sInput.equalsIgnoreCase("true");
      }
    }
    return input;
  }
}