InfluxDBConnector.java

package de.dlr.shepard.influxDB;

import de.dlr.shepard.util.Constants;
import de.dlr.shepard.util.IConnector;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.eclipse.microprofile.config.ConfigProvider;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBException;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

/**
 * Connector for read and write access to the Influx timeseries database. The
 * class represents the lowest level of data access to the Influx database. The
 * Influx database is accessed directly by using query strings.
 */
@ApplicationScoped
public class InfluxDBConnector implements IConnector {

  private InfluxDB influxDB;

  /* Use this constructor for testing purpose only.
   *  As soon as influxDB is provided via dependency injection, this constructor can be used for that purpose. */
  InfluxDBConnector(InfluxDB influxDb) {
    this.influxDB = influxDb;
  }

  @Inject
  InfluxDBConnector() {}

  /**
   * Establishes a connection to the Influx server by using the URL saved in the
   * config.properties file returned by the DatabaseHelper. In addition the
   * logging is being configured.
   *
   */
  @Override
  public boolean connect() {
    if (this.influxDB == null) {
      String host = ConfigProvider.getConfig().getValue("influx.host", String.class);
      String username = ConfigProvider.getConfig().getValue("influx.username", String.class);
      String password = ConfigProvider.getConfig().getValue("influx.password", String.class);

      influxDB = InfluxDBFactory.connect(String.format("http://%s", host), username, password);
    }

    influxDB.enableBatch(
      BatchOptions.DEFAULTS.exceptionHandler((failedPoints, throwable) ->
        Log.errorf("Exception while writing the following points: %s, Exception: %s", failedPoints, throwable)
      )
    );
    return true;
  }

  @Override
  public boolean disconnect() {
    if (influxDB != null) influxDB.close();
    return true;
  }

  /**
   * Tests whether a connection exists.
   *
   * @return True if connection exists, otherwise false.
   */
  @Override
  public boolean alive() {
    Pong response;
    try {
      response = influxDB.ping();
    } catch (InfluxDBException ex) {
      return false;
    }
    return response != null && !response.getVersion().equalsIgnoreCase("unknown");
  }

  /**
   * Creates a new database.
   *
   * @param databaseName Name of the new database to be created.
   */
  public void createDatabase(String databaseName) {
    String query = String.format("CREATE DATABASE \"%s\"", databaseName);
    influxDB.query(new Query(query));
  }

  /**
   * Deletes a database.
   *
   * @param databaseName Name of the database to be deleted.
   */
  public void deleteDatabase(String databaseName) {
    String query = String.format("DROP DATABASE \"%s\"", databaseName);
    influxDB.query(new Query(query));
  }

  /**
   * Writes all InfluxPoint objects, saved in the TimeseriesPayload object, into
   * the influx database. Therefore the method uses the name of the database, the
   * name of the measurement, the location, the device, the symbolic name and the
   * name of the field provided by the given TimeseriesPayload object. The actual
   * write operation is done by using the unix timestamp in nanoseconds and the
   * value of every InfluxPoint object in the TimeseriesPayload object. All these
   * variables have to be defined in the given Timeseries object for a successful
   * write operation.
   *
   * @param database The database to store the payload in
   * @param payload  Combines the required attributes in a structured way.
   * @return An error if there was a problem, empty string if all went well
   */
  public String saveTimeseriesPayload(String database, TimeseriesPayload payload) {
    var timeseries = payload.getTimeseries();
    var expectedType = getExpectedDatatype(database, timeseries.getMeasurement(), timeseries.getField());
    var batchPoints = InfluxUtil.createBatch(database, payload, expectedType);
    try {
      influxDB.write(batchPoints);
    } catch (InfluxDBException e) {
      Log.errorf("InfluxdbException while writing payload %s: %s", payload.getTimeseries(), e.getMessage());
      return e.getMessage();
    }
    return "";
  }

  /**
   * Returns a Timeseries object containing all the influx data points which match
   * the given conditions consigned to the method. The returned InfluxPoint
   * objects are in the given measurement of the specific database between the
   * start and the end timestamp.
   *
   * @param startTimeStamp Start timestamp from which values should be returned
   * @param endTimeStamp   End timestamp to which values should be returned
   * @param database       Name of the database
   * @param timeseries     Specifies the data to load from database
   * @param function       The aggregate function
   * @param groupBy        The time interval measurements get grouped by
   * @param fillOption     The fill option for missing values
   * @return A Timeseries object containing a list of all InfluxPoints in the
   *         given measurement of the specific database between the two given
   *         timestamps matching the "device"-tag, the "location"-tag and the
   *         "symbolic_name"-tag.
   */
  public TimeseriesPayload getTimeseriesPayload(
    long startTimeStamp,
    long endTimeStamp,
    String database,
    Timeseries timeseries,
    SingleValuedUnaryFunction function,
    Long groupBy,
    FillOption fillOption
  ) {
    Query query = InfluxUtil.buildQuery(
      startTimeStamp,
      endTimeStamp,
      database,
      timeseries,
      function,
      groupBy,
      fillOption
    );
    Log.debugf("Influx Query: %s", query.getCommand());
    QueryResult queryResult;

    try {
      queryResult = influxDB.query(query);
    } catch (InfluxDBException e) {
      queryResult = null;
      Log.errorf("Could not parse query: %s", query.getCommand());
    }
    if (InfluxUtil.isQueryResultValid(queryResult)) {
      return InfluxUtil.extractPayload(queryResult, timeseries);
    }
    return new TimeseriesPayload(timeseries, Collections.emptyList());
  }

  /**
   * Returns a list of timeseries objects that are in the given database.
   *
   * @param database the given database
   * @return a list of timeseries objects
   */
  public List<Timeseries> getTimeseriesAvailable(String database) {
    Query query = new Query(String.format("SHOW SERIES ON \"%s\"", database));
    QueryResult queryResult = influxDB.query(query);
    if (!InfluxUtil.isQueryResultValid(queryResult)) {
      Log.warn("There was an error while querying the Influxdb for available timeseries");
      return Collections.emptyList();
    }
    var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
    var fields = getFields(database);
    var result = new ArrayList<Timeseries>(values.size());
    for (var value : values) {
      var series = ((String) value.get(0)).split(",");
      // The first entry is the measurement
      var meas = series[0];
      var tags = extractTags(series);
      var dev = tags.getOrDefault(Constants.DEVICE, "");
      var loc = tags.getOrDefault(Constants.LOCATION, "");
      var symName = tags.getOrDefault(Constants.SYMBOLICNAME, "");
      for (var field : fields.getOrDefault(meas, Collections.emptyList())) {
        result.add(new Timeseries(meas, dev, loc, symName, field));
      }
    }
    return result;
  }

  private Map<String, String> extractTags(String[] series) {
    var result = new HashMap<String, String>();
    for (var tagString : series) {
      var tags = tagString.split("=", 2);

      // Skip entries with less than 2 tuples (most likely the measurement)
      if (tags.length < 2) continue;

      result.put(tags[0], tags[1]);
    }
    return result;
  }

  private Map<String, List<String>> getFields(String database) {
    Query query = new Query(String.format("SHOW FIELD KEYS ON \"%s\"", database));
    QueryResult queryResult = influxDB.query(query);
    if (!InfluxUtil.isQueryResultValid(queryResult)) {
      Log.warn("There was an error while querying the Influxdb for available fields");
      return Collections.emptyMap();
    }
    var series = queryResult.getResults().get(0).getSeries();
    var result = new HashMap<String, List<String>>();
    for (var s : series) {
      var fields = new ArrayList<String>();
      for (var value : s.getValues()) {
        fields.add((String) value.get(0));
      }
      result.put(s.getName(), fields);
    }
    return result;
  }

  public boolean databaseExist(String database) {
    QueryResult queryResult = influxDB.query(new Query("SHOW DATABASES"));
    if (!InfluxUtil.isQueryResultValid(queryResult)) {
      Log.warn("There was an error while querying the Influxdb for databases");
      return false;
    }

    var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
    for (var databaseName : values) {
      if (databaseName.get(0).toString().trim().equals(database)) {
        return true;
      }
    }

    return false;
  }

  /**
   * Returns the expected data type of a field or an empty string according to
   * https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/#data-types
   */
  private String getExpectedDatatype(String database, String measurement, String field) {
    String queryString = String.format("SHOW FIELD KEYS ON \"%s\" FROM %s", database, measurement);
    QueryResult result = influxDB.query(new Query(queryString));
    if (!InfluxUtil.isQueryResultValid(result)) {
      Log.infof("Could not get expected datatype query string \"%s\"", queryString);
      return "";
    }

    var values = result.getResults().get(0).getSeries().get(0).getValues();
    for (var value : values) {
      if (value.get(0).equals(field)) return (String) value.get(1);
    }

    return "";
  }
}