InfluxUtil.java

package de.dlr.shepard.influxDB;

import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.BoundParameterQuery;
import org.influxdb.dto.BoundParameterQuery.QueryBuilder;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.QueryResult;

import de.dlr.shepard.util.Constants;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public final class InfluxUtil {

	private InfluxUtil() {
		// Static class needs no constructor
	}

	private static final long MULTIPLIER_NANO = 1000000000L;
	private static final String FIELD_FLOAT = "float";
	private static final String FIELD_INT = "integer";
	private static final String FIELD_STRING = "string";
	private static final String FIELD_BOOL = "boolean";

	/**
	 * Build an influx query with the given parameters.
	 *
	 * @param startTimeStamp The beginning of the timeseries
	 * @param endTimeStamp   The end of the timeseries
	 * @param database       The database to be queried
	 * @param timeseries     The timeseries whose points are queried
	 * @param function       The aggregate function
	 * @param groupBy        The time interval measurements get grouped by
	 * @param fillOption     The fill option for missing values
	 * @return an influx query
	 */
	public static BoundParameterQuery buildQuery(long startTimeStamp, long endTimeStamp, String database,
			Timeseries timeseries, SingleValuedUnaryFunction function, Long groupBy, FillOption fillOption) {
		var selectPart = (function != null)
				? String.format("SELECT %s(\"%s\")", function.toString(), timeseries.getField())
				: String.format("SELECT \"%s\"", timeseries.getField());
		var fromPart = String.format("FROM \"%s\"", timeseries.getMeasurement());
		var wherePart = String.format("WHERE time >= %dns AND time <= %dns "
				+ "AND \"device\" = $device AND \"location\" = $location AND \"symbolic_name\" = $symbolic_name",
				startTimeStamp, endTimeStamp);
		var query = String.join(" ", selectPart, fromPart, wherePart);

		if (groupBy != null) {
			query += String.format(" GROUP BY time(%dns)", groupBy);
		}
		if (fillOption != null) {
			query += String.format(" fill(%s)", fillOption.toString().toLowerCase());
		}
		var parameterizedQuery = QueryBuilder.newQuery(query).forDatabase(database)
				.bind("device", timeseries.getDevice()).bind("location", timeseries.getLocation())
				.bind("symbolic_name", timeseries.getSymbolicName()).create();
		log.debug("Query influxdb {}: {} with params {}", database, parameterizedQuery.getCommand(),
				URLDecoder.decode(parameterizedQuery.getParameterJsonWithUrlEncoded(), StandardCharsets.UTF_8));
		return parameterizedQuery;
	}

	/**
	 * Extract TimeseriesPayload from influx query result.
	 *
	 * @param queryResult Influx query result
	 * @param timeseries  the timeseries to extract
	 * @return TimeseriesPayload
	 */
	public static TimeseriesPayload extractPayload(QueryResult queryResult, Timeseries timeseries) {
		var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
		var influxPoints = new ArrayList<InfluxPoint>(values.size());
		for (var value : values) {
			var time = Instant.parse((String) value.get(0));
			var nanoseconds = time.getEpochSecond() * MULTIPLIER_NANO + time.getNano();
			influxPoints.add(new InfluxPoint(nanoseconds, value.get(1)));
		}
		return new TimeseriesPayload(timeseries, influxPoints);
	}

	/**
	 * Create a batch out of a given list of influx points.
	 *
	 * @param database          The database where the batch is to be stored
	 * @param timeseriesPayload TimeseriesPayload to be stored
	 * @param expectedType      The expected datatype as string
	 * @return influx batch points
	 */
	public static BatchPoints createBatch(String database, TimeseriesPayload timeseriesPayload, String expectedType) {
		String error = "";
		BatchPoints batchPoints = BatchPoints.database(database).build();
		var influxPoints = timeseriesPayload.getPoints();
		var timeseries = timeseriesPayload.getTimeseries();

		for (var influxPoint : influxPoints) {
			Builder pointBuilder = Point.measurement(timeseries.getMeasurement())
					.tag(Constants.LOCATION, timeseries.getLocation()).tag(Constants.DEVICE, timeseries.getDevice())
					.tag(Constants.SYMBOLICNAME, timeseries.getSymbolicName())
					.time(influxPoint.getTimeInNanoseconds(), TimeUnit.NANOSECONDS);
			Object value = influxPoint.getValue();

			if (value != null && expectedType.equals(FIELD_STRING)) {
				// Expected type is string, we use value.toString()
				pointBuilder.addField(timeseries.getField(), value.toString());
			} else if (value instanceof Number numberValue
					&& (expectedType.equals(FIELD_FLOAT) || expectedType.isBlank())) {
				// value is a number and float or nothing is expected
				pointBuilder.addField(timeseries.getField(), numberValue.doubleValue());
			} else if (value instanceof Number numberValue && expectedType.equals(FIELD_INT)) {
				// value is a number and int or nothing is expected
				pointBuilder.addField(timeseries.getField(), numberValue.longValue());
			} else if (value instanceof Boolean booleanValue
					&& (expectedType.equals(FIELD_BOOL) || expectedType.isBlank())) {
				// value is a boolean and boolean or nothing is expected
				pointBuilder.addField(timeseries.getField(), booleanValue);
			} else if (value != null) {
				// value has to be casted
				var stringValue = value.toString();
				try {
					switch (expectedType) {
					case FIELD_FLOAT -> pointBuilder.addField(timeseries.getField(), Double.parseDouble(stringValue));
					case FIELD_INT -> pointBuilder.addField(timeseries.getField(), Long.parseLong(stringValue));
					case FIELD_BOOL -> pointBuilder.addField(timeseries.getField(), Boolean.parseBoolean(stringValue));
					default -> pointBuilder.addField(timeseries.getField(), stringValue);
					}
				} catch (NumberFormatException e) {
					if (error.isBlank())
						// log the first error
						error = String.format("Invalid influx point detected, cannot cast type %s into type %s",
								stringValue, expectedType);
				}
			}
			if (pointBuilder.hasFields())
				batchPoints.point(pointBuilder.build());
		}
		if (!error.isBlank())
			log.error(error);

		return batchPoints;

	}

	/**
	 * Checks whether a QueryResult is valid, meaning that it has no errors and the
	 * results as well as the series-lists are not empty. If this returns true it is
	 * safe to run
	 * {@code queryResult.getResults().get(0).getSeries().get(0).getValues()}
	 *
	 * @param queryResult The QueryResult to be checked.
	 * @return False if QueryResult has errors or results or series are empty, true
	 *         otherwise.
	 */
	public static boolean isQueryResultValid(QueryResult queryResult) {
		if (queryResult == null) {
			log.warn("Query Result is null");
			return false;
		}
		if (queryResult.getError() != null) {
			log.warn("There was an error while querying the Influxdb: {}", queryResult.getError());
			return false;
		}

		var resultList = queryResult.getResults();
		if (resultList == null || resultList.isEmpty())
			return false;

		var result = resultList.get(0);
		if (result.hasError()) {
			log.warn("There was an error while querying the Influxdb: {}", result.getError());
			return false;
		}

		var seriesList = result.getSeries();
		if (seriesList == null || seriesList.isEmpty())
			return false;

		var valueList = seriesList.get(0).getValues();
		if (valueList == null)
			return false;

		return true;
	}

	/**
	 * Is the timeseries object valid so that it can be stored in influxdb? This
	 * function returns errors if the time series attributes contain illegal
	 * characters.
	 *
	 * @param timeseries The timeseries object to be sanitized
	 * @return errors or empty string
	 */
	public static String sanitize(Timeseries timeseries) {
		List<String> errors = new ArrayList<>();

		String measurementString = sanitizeString(timeseries.getMeasurement());
		String locationString = sanitizeString(timeseries.getLocation());
		String deviceString = sanitizeString(timeseries.getDevice());
		String symbolicNameString = sanitizeString(timeseries.getSymbolicName());
		String fieldString = sanitizeString(timeseries.getField());

		if (!measurementString.isEmpty()) {
			errors.add("measurement " + measurementString);
		}
		if (!locationString.isEmpty()) {
			errors.add("location " + locationString);
		}
		if (!deviceString.isEmpty()) {
			errors.add("device " + deviceString);
		}
		if (!symbolicNameString.isEmpty()) {
			errors.add("symbolicName " + symbolicNameString);
		}
		if (!fieldString.isEmpty()) {
			errors.add("field " + fieldString);
		}
		return String.join("\n", errors);
	}

	private static String sanitizeString(String s) {
		String[] forbiddenChars = { " ", ".", "/", "," };
		if (s == null || s.isBlank()) {
			return "should not be blank";
		}

		for (String forbiddenChar : forbiddenChars) {
			int pos = s.indexOf(forbiddenChar);
			if (pos != -1) {
				return "should not contain whitespaces or dots or slashes or commas: "
						+ s.substring(0, pos + forbiddenChar.length());
			}
		}

		return "";
	}
}