InfluxUtil.java

package de.dlr.shepard.influxDB;

import de.dlr.shepard.util.Constants;
import io.quarkus.logging.Log;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.BoundParameterQuery;
import org.influxdb.dto.BoundParameterQuery.QueryBuilder;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.QueryResult;

public final class InfluxUtil {

  private InfluxUtil() {
    // Static class needs no constructor
  }

  /** Number of nanoseconds in one second. */
  private static final long MULTIPLIER_NANO = 1000000000L;
  private static final String FIELD_FLOAT = "float";
  private static final String FIELD_INT = "integer";
  private static final String FIELD_STRING = "string";
  private static final String FIELD_BOOL = "boolean";

  /** Substrings that are rejected by {@link #sanitize(Timeseries)}, checked in this order. */
  private static final List<String> FORBIDDEN_CHARS = List.of(" ", ".", "/", ",");

  /**
   * Build an influx query with the given parameters.
   *
   * <p>Device, location and symbolic name are passed as bound parameters. Measurement and field
   * are inlined into the query text as double-quoted identifiers and are escaped so that quotes or
   * backslashes inside them cannot terminate the identifier early.
   *
   * @param startTimeStamp The beginning of the timeseries (nanoseconds, inclusive)
   * @param endTimeStamp   The end of the timeseries (nanoseconds, inclusive)
   * @param database       The database to be queried
   * @param timeseries     The timeseries whose points are queried
   * @param function       The aggregate function, or {@code null} for raw values
   * @param groupBy        The time interval (nanoseconds) measurements get grouped by, or {@code null}
   * @param fillOption     The fill option for missing values, or {@code null}
   * @return an influx query
   */
  public static BoundParameterQuery buildQuery(
    long startTimeStamp,
    long endTimeStamp,
    String database,
    Timeseries timeseries,
    SingleValuedUnaryFunction function,
    Long groupBy,
    FillOption fillOption
  ) {
    var quotedField = quoteIdentifier(timeseries.getField());
    var selectPart = (function != null)
      ? String.format("SELECT %s(%s)", function.toString(), quotedField)
      : String.format("SELECT %s", quotedField);
    var fromPart = String.format("FROM %s", quoteIdentifier(timeseries.getMeasurement()));
    var wherePart = String.format(
      "WHERE time >= %dns AND time <= %dns " +
      "AND \"device\" = $device AND \"location\" = $location AND \"symbolic_name\" = $symbolic_name",
      startTimeStamp,
      endTimeStamp
    );
    var query = String.join(" ", selectPart, fromPart, wherePart);

    if (groupBy != null) {
      query += String.format(" GROUP BY time(%dns)", groupBy);
    }
    if (fillOption != null) {
      // Locale.ROOT: locale-sensitive lowercasing (e.g. Turkish dotless i) would corrupt the keyword.
      query += String.format(" fill(%s)", fillOption.toString().toLowerCase(Locale.ROOT));
    }
    var parameterizedQuery = QueryBuilder.newQuery(query)
      .forDatabase(database)
      .bind("device", timeseries.getDevice())
      .bind("location", timeseries.getLocation())
      .bind("symbolic_name", timeseries.getSymbolicName())
      .create();
    Log.debugf(
      "Query influxdb %s: %s with params %s",
      database,
      parameterizedQuery.getCommand(),
      URLDecoder.decode(parameterizedQuery.getParameterJsonWithUrlEncoded(), StandardCharsets.UTF_8)
    );
    return parameterizedQuery;
  }

  /**
   * Wraps an identifier in double quotes, escaping backslashes and double quotes so the value cannot
   * break out of the quoted identifier. A {@code null} identifier is rendered as {@code "null"}, as
   * {@code String.format("%s", null)} would.
   */
  private static String quoteIdentifier(String identifier) {
    var escaped = String.valueOf(identifier).replace("\\", "\\\\").replace("\"", "\\\"");
    return "\"" + escaped + "\"";
  }

  /**
   * Extract TimeseriesPayload from influx query result.
   *
   * <p>Only the first series of the first result is read; callers should check
   * {@link #isQueryResultValid(QueryResult)} first. Column 0 is parsed as an ISO-8601 instant and
   * converted to epoch nanoseconds; column 1 is used as the point value.
   *
   * @param queryResult Influx query result
   * @param timeseries  the timeseries to extract
   * @return TimeseriesPayload
   */
  public static TimeseriesPayload extractPayload(QueryResult queryResult, Timeseries timeseries) {
    var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
    var influxPoints = new ArrayList<InfluxPoint>(values.size());
    for (var value : values) {
      var time = Instant.parse((String) value.get(0));
      var nanoseconds = time.getEpochSecond() * MULTIPLIER_NANO + time.getNano();
      influxPoints.add(new InfluxPoint(nanoseconds, value.get(1)));
    }
    return new TimeseriesPayload(timeseries, influxPoints);
  }

  /**
   * Create a batch out of a given list of influx points.
   *
   * <p>Each value is converted to {@code expectedType} where possible. Points with a {@code null}
   * value, or whose value cannot be converted, are skipped; only the first conversion error is
   * logged.
   *
   * @param database          The database where the batch is to be stored
   * @param timeseriesPayload TimeseriesPayload to be stored
   * @param expectedType      The expected datatype as string ("float", "integer", "string",
   *                          "boolean"); blank or {@code null} means no expectation
   * @return influx batch points
   */
  public static BatchPoints createBatch(String database, TimeseriesPayload timeseriesPayload, String expectedType) {
    // A missing expected type behaves like a blank one instead of throwing a NullPointerException.
    String type = (expectedType == null) ? "" : expectedType;
    String error = "";
    BatchPoints batchPoints = BatchPoints.database(database).build();
    var influxPoints = timeseriesPayload.getPoints();
    var timeseries = timeseriesPayload.getTimeseries();
    var field = timeseries.getField();

    for (var influxPoint : influxPoints) {
      Builder pointBuilder = Point.measurement(timeseries.getMeasurement())
        .tag(Constants.LOCATION, timeseries.getLocation())
        .tag(Constants.DEVICE, timeseries.getDevice())
        .tag(Constants.SYMBOLICNAME, timeseries.getSymbolicName())
        .time(influxPoint.getTimeInNanoseconds(), TimeUnit.NANOSECONDS);
      Object value = influxPoint.getValue();

      if (value != null && type.equals(FIELD_STRING)) {
        // Expected type is string, we use value.toString()
        pointBuilder.addField(field, value.toString());
      } else if (value instanceof Number numberValue && (type.equals(FIELD_FLOAT) || type.isBlank())) {
        // value is a number and float or nothing is expected
        pointBuilder.addField(field, numberValue.doubleValue());
      } else if (value instanceof Number numberValue && type.equals(FIELD_INT)) {
        // value is a number and integer is expected; any fractional part is truncated
        pointBuilder.addField(field, numberValue.longValue());
      } else if (value instanceof Boolean booleanValue && (type.equals(FIELD_BOOL) || type.isBlank())) {
        // value is a boolean and boolean or nothing is expected
        pointBuilder.addField(field, booleanValue);
      } else if (value != null) {
        // value has to be converted from its string representation
        var stringValue = value.toString();
        try {
          switch (type) {
            case FIELD_FLOAT -> pointBuilder.addField(field, Double.parseDouble(stringValue));
            case FIELD_INT -> pointBuilder.addField(field, Long.parseLong(stringValue));
            case FIELD_BOOL -> pointBuilder.addField(field, parseBooleanStrict(stringValue));
            default -> pointBuilder.addField(field, stringValue);
          }
        } catch (IllegalArgumentException e) {
          // NumberFormatException is a subclass; the point is skipped because it has no field
          if (error.isBlank()) error = String.format(
            // log the first error
            "Invalid influx point detected, cannot cast type %s into type %s",
            stringValue,
            type
          );
        }
      }
      if (pointBuilder.hasFields()) batchPoints.point(pointBuilder.build());
    }
    if (!error.isBlank()) Log.error(error);

    return batchPoints;
  }

  /**
   * Parses "true" or "false", ignoring case. Unlike {@link Boolean#parseBoolean(String)}, any other
   * input is rejected instead of silently becoming {@code false}.
   *
   * @throws IllegalArgumentException if the string is neither "true" nor "false"
   */
  private static boolean parseBooleanStrict(String s) {
    if ("true".equalsIgnoreCase(s)) return true;
    if ("false".equalsIgnoreCase(s)) return false;
    throw new IllegalArgumentException("Not a boolean: " + s);
  }

  /**
   * Checks whether a QueryResult is valid, meaning that it has no errors and the
   * results as well as the series-lists are not empty. If this returns true it is
   * safe to run
   * {@code queryResult.getResults().get(0).getSeries().get(0).getValues()}
   *
   * @param queryResult The QueryResult to be checked.
   * @return False if QueryResult has errors or results or series are empty, true
   *         otherwise.
   */
  public static boolean isQueryResultValid(QueryResult queryResult) {
    if (queryResult == null) {
      Log.warn("Query Result is null");
      return false;
    }
    if (queryResult.getError() != null) {
      Log.warnf("There was an error while querying the Influxdb: %s", queryResult.getError());
      return false;
    }

    var resultList = queryResult.getResults();
    if (resultList == null || resultList.isEmpty()) return false;

    var result = resultList.get(0);
    if (result.hasError()) {
      Log.warnf("There was an error while querying the Influxdb: %s", result.getError());
      return false;
    }

    var seriesList = result.getSeries();
    if (seriesList == null || seriesList.isEmpty()) return false;

    return seriesList.get(0).getValues() != null;
  }

  /**
   * Is the timeseries object valid so that it can be stored in influxdb? This
   * function returns errors if the time series attributes contain illegal
   * characters.
   *
   * @param timeseries The timeseries object to be sanitized
   * @return errors (one per line) or empty string
   */
  public static String sanitize(Timeseries timeseries) {
    List<String> errors = new ArrayList<>();

    String measurementString = sanitizeString(timeseries.getMeasurement());
    String locationString = sanitizeString(timeseries.getLocation());
    String deviceString = sanitizeString(timeseries.getDevice());
    String symbolicNameString = sanitizeString(timeseries.getSymbolicName());
    String fieldString = sanitizeString(timeseries.getField());

    if (!measurementString.isEmpty()) {
      errors.add("measurement " + measurementString);
    }
    if (!locationString.isEmpty()) {
      errors.add("location " + locationString);
    }
    if (!deviceString.isEmpty()) {
      errors.add("device " + deviceString);
    }
    if (!symbolicNameString.isEmpty()) {
      errors.add("symbolicName " + symbolicNameString);
    }
    if (!fieldString.isEmpty()) {
      errors.add("field " + fieldString);
    }
    return String.join("\n", errors);
  }

  /**
   * Returns an error description for a single attribute, or an empty string if it is acceptable.
   * Forbidden characters are checked in the order of {@link #FORBIDDEN_CHARS}; the message includes
   * the prefix of {@code s} up to and including the first occurrence of the first one found.
   */
  private static String sanitizeString(String s) {
    if (s == null || s.isBlank()) {
      return "should not be blank";
    }

    for (String forbiddenChar : FORBIDDEN_CHARS) {
      int pos = s.indexOf(forbiddenChar);
      if (pos != -1) {
        return (
          "should not contain whitespaces or dots or slashes or commas: " + s.substring(0, pos + forbiddenChar.length())
        );
      }
    }

    return "";
  }
}