InfluxUtil.java
package de.dlr.shepard.influxDB;
import de.dlr.shepard.util.Constants;
import io.quarkus.logging.Log;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.BoundParameterQuery;
import org.influxdb.dto.BoundParameterQuery.QueryBuilder;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.QueryResult;
public final class InfluxUtil {
private InfluxUtil() {
// Static class needs no constructor
}
private static final long MULTIPLIER_NANO = 1000000000L;
private static final String FIELD_FLOAT = "float";
private static final String FIELD_INT = "integer";
private static final String FIELD_STRING = "string";
private static final String FIELD_BOOL = "boolean";
/**
* Build an influx query with the given parameters.
*
* @param startTimeStamp The beginning of the timeseries
* @param endTimeStamp The end of the timeseries
* @param database The database to be queried
* @param timeseries The timeseries whose points are queried
* @param function The aggregate function
* @param groupBy The time interval measurements get grouped by
* @param fillOption The fill option for missing values
* @return an influx query
*/
public static BoundParameterQuery buildQuery(
long startTimeStamp,
long endTimeStamp,
String database,
Timeseries timeseries,
SingleValuedUnaryFunction function,
Long groupBy,
FillOption fillOption
) {
var selectPart = (function != null)
? String.format("SELECT %s(\"%s\")", function.toString(), timeseries.getField())
: String.format("SELECT \"%s\"", timeseries.getField());
var fromPart = String.format("FROM \"%s\"", timeseries.getMeasurement());
var wherePart = String.format(
"WHERE time >= %dns AND time <= %dns " +
"AND \"device\" = $device AND \"location\" = $location AND \"symbolic_name\" = $symbolic_name",
startTimeStamp,
endTimeStamp
);
var query = String.join(" ", selectPart, fromPart, wherePart);
if (groupBy != null) {
query += String.format(" GROUP BY time(%dns)", groupBy);
}
if (fillOption != null) {
query += String.format(" fill(%s)", fillOption.toString().toLowerCase());
}
var parameterizedQuery = QueryBuilder.newQuery(query)
.forDatabase(database)
.bind("device", timeseries.getDevice())
.bind("location", timeseries.getLocation())
.bind("symbolic_name", timeseries.getSymbolicName())
.create();
Log.debugf(
"Query influxdb %s: %s with params %s",
database,
parameterizedQuery.getCommand(),
URLDecoder.decode(parameterizedQuery.getParameterJsonWithUrlEncoded(), StandardCharsets.UTF_8)
);
return parameterizedQuery;
}
/**
* Extract TimeseriesPayload from influx query result.
*
* @param queryResult Influx query result
* @param timeseries the timeseries to extract
* @return TimeseriesPayload
*/
public static TimeseriesPayload extractPayload(QueryResult queryResult, Timeseries timeseries) {
var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
var influxPoints = new ArrayList<InfluxPoint>(values.size());
for (var value : values) {
var time = Instant.parse((String) value.get(0));
var nanoseconds = time.getEpochSecond() * MULTIPLIER_NANO + time.getNano();
influxPoints.add(new InfluxPoint(nanoseconds, value.get(1)));
}
return new TimeseriesPayload(timeseries, influxPoints);
}
/**
* Create a batch out of a given list of influx points.
*
* @param database The database where the batch is to be stored
* @param timeseriesPayload TimeseriesPayload to be stored
* @param expectedType The expected datatype as string
* @return influx batch points
*/
public static BatchPoints createBatch(String database, TimeseriesPayload timeseriesPayload, String expectedType) {
String error = "";
BatchPoints batchPoints = BatchPoints.database(database).build();
var influxPoints = timeseriesPayload.getPoints();
var timeseries = timeseriesPayload.getTimeseries();
for (var influxPoint : influxPoints) {
Builder pointBuilder = Point.measurement(timeseries.getMeasurement())
.tag(Constants.LOCATION, timeseries.getLocation())
.tag(Constants.DEVICE, timeseries.getDevice())
.tag(Constants.SYMBOLICNAME, timeseries.getSymbolicName())
.time(influxPoint.getTimeInNanoseconds(), TimeUnit.NANOSECONDS);
Object value = influxPoint.getValue();
if (value != null && expectedType.equals(FIELD_STRING)) {
// Expected type is string, we use value.toString()
pointBuilder.addField(timeseries.getField(), value.toString());
} else if (value instanceof Number numberValue && (expectedType.equals(FIELD_FLOAT) || expectedType.isBlank())) {
// value is a number and float or nothing is expected
pointBuilder.addField(timeseries.getField(), numberValue.doubleValue());
} else if (value instanceof Number numberValue && expectedType.equals(FIELD_INT)) {
// value is a number and int or nothing is expected
pointBuilder.addField(timeseries.getField(), numberValue.longValue());
} else if (value instanceof Boolean booleanValue && (expectedType.equals(FIELD_BOOL) || expectedType.isBlank())) {
// value is a boolean and boolean or nothing is expected
pointBuilder.addField(timeseries.getField(), booleanValue);
} else if (value != null) {
// value has to be casted
var stringValue = value.toString();
try {
switch (expectedType) {
case FIELD_FLOAT -> pointBuilder.addField(timeseries.getField(), Double.parseDouble(stringValue));
case FIELD_INT -> pointBuilder.addField(timeseries.getField(), Long.parseLong(stringValue));
case FIELD_BOOL -> pointBuilder.addField(timeseries.getField(), Boolean.parseBoolean(stringValue));
default -> pointBuilder.addField(timeseries.getField(), stringValue);
}
} catch (NumberFormatException e) {
if (error.isBlank()) error = String.format(
// log the first error
"Invalid influx point detected, cannot cast type %s into type %s",
stringValue,
expectedType
);
}
}
if (pointBuilder.hasFields()) batchPoints.point(pointBuilder.build());
}
if (!error.isBlank()) Log.error(error);
return batchPoints;
}
/**
* Checks whether a QueryResult is valid, meaning that it has no errors and the
* results as well as the series-lists are not empty. If this returns true it is
* safe to run
* {@code queryResult.getResults().get(0).getSeries().get(0).getValues()}
*
* @param queryResult The QueryResult to be checked.
* @return False if QueryResult has errors or results or series are empty, true
* otherwise.
*/
public static boolean isQueryResultValid(QueryResult queryResult) {
if (queryResult == null) {
Log.warn("Query Result is null");
return false;
}
if (queryResult.getError() != null) {
Log.warnf("There was an error while querying the Influxdb: %s", queryResult.getError());
return false;
}
var resultList = queryResult.getResults();
if (resultList == null || resultList.isEmpty()) return false;
var result = resultList.get(0);
if (result.hasError()) {
Log.warnf("There was an error while querying the Influxdb: %s", result.getError());
return false;
}
var seriesList = result.getSeries();
if (seriesList == null || seriesList.isEmpty()) return false;
var valueList = seriesList.get(0).getValues();
if (valueList == null) return false;
return true;
}
/**
* Is the timeseries object valid so that it can be stored in influxdb? This
* function returns errors if the time series attributes contain illegal
* characters.
*
* @param timeseries The timeseries object to be sanitized
* @return errors or empty string
*/
public static String sanitize(Timeseries timeseries) {
List<String> errors = new ArrayList<>();
String measurementString = sanitizeString(timeseries.getMeasurement());
String locationString = sanitizeString(timeseries.getLocation());
String deviceString = sanitizeString(timeseries.getDevice());
String symbolicNameString = sanitizeString(timeseries.getSymbolicName());
String fieldString = sanitizeString(timeseries.getField());
if (!measurementString.isEmpty()) {
errors.add("measurement " + measurementString);
}
if (!locationString.isEmpty()) {
errors.add("location " + locationString);
}
if (!deviceString.isEmpty()) {
errors.add("device " + deviceString);
}
if (!symbolicNameString.isEmpty()) {
errors.add("symbolicName " + symbolicNameString);
}
if (!fieldString.isEmpty()) {
errors.add("field " + fieldString);
}
return String.join("\n", errors);
}
private static String sanitizeString(String s) {
String[] forbiddenChars = { " ", ".", "/", "," };
if (s == null || s.isBlank()) {
return "should not be blank";
}
for (String forbiddenChar : forbiddenChars) {
int pos = s.indexOf(forbiddenChar);
if (pos != -1) {
return (
"should not contain whitespaces or dots or slashes or commas: " + s.substring(0, pos + forbiddenChar.length())
);
}
}
return "";
}
}