View Javadoc
1   package de.dlr.shepard.influxDB;
2   
3   import de.dlr.shepard.util.Constants;
4   import io.quarkus.logging.Log;
5   import java.net.URLDecoder;
6   import java.nio.charset.StandardCharsets;
7   import java.time.Instant;
8   import java.util.ArrayList;
9   import java.util.List;
10  import java.util.concurrent.TimeUnit;
11  import org.influxdb.dto.BatchPoints;
12  import org.influxdb.dto.BoundParameterQuery;
13  import org.influxdb.dto.BoundParameterQuery.QueryBuilder;
14  import org.influxdb.dto.Point;
15  import org.influxdb.dto.Point.Builder;
16  import org.influxdb.dto.QueryResult;
17  
18  public final class InfluxUtil {
19  
20    private InfluxUtil() {
21      // Static class needs no constructor
22    }
23  
24    private static final long MULTIPLIER_NANO = 1000000000L;
25    private static final String FIELD_FLOAT = "float";
26    private static final String FIELD_INT = "integer";
27    private static final String FIELD_STRING = "string";
28    private static final String FIELD_BOOL = "boolean";
29  
30    /**
31     * Build an influx query with the given parameters.
32     *
33     * @param startTimeStamp The beginning of the timeseries
34     * @param endTimeStamp   The end of the timeseries
35     * @param database       The database to be queried
36     * @param timeseries     The timeseries whose points are queried
37     * @param function       The aggregate function
38     * @param groupBy        The time interval measurements get grouped by
39     * @param fillOption     The fill option for missing values
40     * @return an influx query
41     */
42    public static BoundParameterQuery buildQuery(
43      long startTimeStamp,
44      long endTimeStamp,
45      String database,
46      Timeseries timeseries,
47      SingleValuedUnaryFunction function,
48      Long groupBy,
49      FillOption fillOption
50    ) {
51      var selectPart = (function != null)
52        ? String.format("SELECT %s(\"%s\")", function.toString(), timeseries.getField())
53        : String.format("SELECT \"%s\"", timeseries.getField());
54      var fromPart = String.format("FROM \"%s\"", timeseries.getMeasurement());
55      var wherePart = String.format(
56        "WHERE time >= %dns AND time <= %dns " +
57        "AND \"device\" = $device AND \"location\" = $location AND \"symbolic_name\" = $symbolic_name",
58        startTimeStamp,
59        endTimeStamp
60      );
61      var query = String.join(" ", selectPart, fromPart, wherePart);
62  
63      if (groupBy != null) {
64        query += String.format(" GROUP BY time(%dns)", groupBy);
65      }
66      if (fillOption != null) {
67        query += String.format(" fill(%s)", fillOption.toString().toLowerCase());
68      }
69      var parameterizedQuery = QueryBuilder.newQuery(query)
70        .forDatabase(database)
71        .bind("device", timeseries.getDevice())
72        .bind("location", timeseries.getLocation())
73        .bind("symbolic_name", timeseries.getSymbolicName())
74        .create();
75      Log.debugf(
76        "Query influxdb %s: %s with params %s",
77        database,
78        parameterizedQuery.getCommand(),
79        URLDecoder.decode(parameterizedQuery.getParameterJsonWithUrlEncoded(), StandardCharsets.UTF_8)
80      );
81      return parameterizedQuery;
82    }
83  
84    /**
85     * Extract TimeseriesPayload from influx query result.
86     *
87     * @param queryResult Influx query result
88     * @param timeseries  the timeseries to extract
89     * @return TimeseriesPayload
90     */
91    public static TimeseriesPayload extractPayload(QueryResult queryResult, Timeseries timeseries) {
92      var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
93      var influxPoints = new ArrayList<InfluxPoint>(values.size());
94      for (var value : values) {
95        var time = Instant.parse((String) value.get(0));
96        var nanoseconds = time.getEpochSecond() * MULTIPLIER_NANO + time.getNano();
97        influxPoints.add(new InfluxPoint(nanoseconds, value.get(1)));
98      }
99      return new TimeseriesPayload(timeseries, influxPoints);
100   }
101 
102   /**
103    * Create a batch out of a given list of influx points.
104    *
105    * @param database          The database where the batch is to be stored
106    * @param timeseriesPayload TimeseriesPayload to be stored
107    * @param expectedType      The expected datatype as string
108    * @return influx batch points
109    */
110   public static BatchPoints createBatch(String database, TimeseriesPayload timeseriesPayload, String expectedType) {
111     String error = "";
112     BatchPoints batchPoints = BatchPoints.database(database).build();
113     var influxPoints = timeseriesPayload.getPoints();
114     var timeseries = timeseriesPayload.getTimeseries();
115 
116     for (var influxPoint : influxPoints) {
117       Builder pointBuilder = Point.measurement(timeseries.getMeasurement())
118         .tag(Constants.LOCATION, timeseries.getLocation())
119         .tag(Constants.DEVICE, timeseries.getDevice())
120         .tag(Constants.SYMBOLICNAME, timeseries.getSymbolicName())
121         .time(influxPoint.getTimeInNanoseconds(), TimeUnit.NANOSECONDS);
122       Object value = influxPoint.getValue();
123 
124       if (value != null && expectedType.equals(FIELD_STRING)) {
125         // Expected type is string, we use value.toString()
126         pointBuilder.addField(timeseries.getField(), value.toString());
127       } else if (value instanceof Number numberValue && (expectedType.equals(FIELD_FLOAT) || expectedType.isBlank())) {
128         // value is a number and float or nothing is expected
129         pointBuilder.addField(timeseries.getField(), numberValue.doubleValue());
130       } else if (value instanceof Number numberValue && expectedType.equals(FIELD_INT)) {
131         // value is a number and int or nothing is expected
132         pointBuilder.addField(timeseries.getField(), numberValue.longValue());
133       } else if (value instanceof Boolean booleanValue && (expectedType.equals(FIELD_BOOL) || expectedType.isBlank())) {
134         // value is a boolean and boolean or nothing is expected
135         pointBuilder.addField(timeseries.getField(), booleanValue);
136       } else if (value != null) {
137         // value has to be casted
138         var stringValue = value.toString();
139         try {
140           switch (expectedType) {
141             case FIELD_FLOAT -> pointBuilder.addField(timeseries.getField(), Double.parseDouble(stringValue));
142             case FIELD_INT -> pointBuilder.addField(timeseries.getField(), Long.parseLong(stringValue));
143             case FIELD_BOOL -> pointBuilder.addField(timeseries.getField(), Boolean.parseBoolean(stringValue));
144             default -> pointBuilder.addField(timeseries.getField(), stringValue);
145           }
146         } catch (NumberFormatException e) {
147           if (error.isBlank()) error = String.format(
148             // log the first error
149             "Invalid influx point detected, cannot cast type %s into type %s",
150             stringValue,
151             expectedType
152           );
153         }
154       }
155       if (pointBuilder.hasFields()) batchPoints.point(pointBuilder.build());
156     }
157     if (!error.isBlank()) Log.error(error);
158 
159     return batchPoints;
160   }
161 
162   /**
163    * Checks whether a QueryResult is valid, meaning that it has no errors and the
164    * results as well as the series-lists are not empty. If this returns true it is
165    * safe to run
166    * {@code queryResult.getResults().get(0).getSeries().get(0).getValues()}
167    *
168    * @param queryResult The QueryResult to be checked.
169    * @return False if QueryResult has errors or results or series are empty, true
170    *         otherwise.
171    */
172   public static boolean isQueryResultValid(QueryResult queryResult) {
173     if (queryResult == null) {
174       Log.warn("Query Result is null");
175       return false;
176     }
177     if (queryResult.getError() != null) {
178       Log.warnf("There was an error while querying the Influxdb: %s", queryResult.getError());
179       return false;
180     }
181 
182     var resultList = queryResult.getResults();
183     if (resultList == null || resultList.isEmpty()) return false;
184 
185     var result = resultList.get(0);
186     if (result.hasError()) {
187       Log.warnf("There was an error while querying the Influxdb: %s", result.getError());
188       return false;
189     }
190 
191     var seriesList = result.getSeries();
192     if (seriesList == null || seriesList.isEmpty()) return false;
193 
194     var valueList = seriesList.get(0).getValues();
195     if (valueList == null) return false;
196 
197     return true;
198   }
199 
200   /**
201    * Is the timeseries object valid so that it can be stored in influxdb? This
202    * function returns errors if the time series attributes contain illegal
203    * characters.
204    *
205    * @param timeseries The timeseries object to be sanitized
206    * @return errors or empty string
207    */
208   public static String sanitize(Timeseries timeseries) {
209     List<String> errors = new ArrayList<>();
210 
211     String measurementString = sanitizeString(timeseries.getMeasurement());
212     String locationString = sanitizeString(timeseries.getLocation());
213     String deviceString = sanitizeString(timeseries.getDevice());
214     String symbolicNameString = sanitizeString(timeseries.getSymbolicName());
215     String fieldString = sanitizeString(timeseries.getField());
216 
217     if (!measurementString.isEmpty()) {
218       errors.add("measurement " + measurementString);
219     }
220     if (!locationString.isEmpty()) {
221       errors.add("location " + locationString);
222     }
223     if (!deviceString.isEmpty()) {
224       errors.add("device " + deviceString);
225     }
226     if (!symbolicNameString.isEmpty()) {
227       errors.add("symbolicName " + symbolicNameString);
228     }
229     if (!fieldString.isEmpty()) {
230       errors.add("field " + fieldString);
231     }
232     return String.join("\n", errors);
233   }
234 
235   private static String sanitizeString(String s) {
236     String[] forbiddenChars = { " ", ".", "/", "," };
237     if (s == null || s.isBlank()) {
238       return "should not be blank";
239     }
240 
241     for (String forbiddenChar : forbiddenChars) {
242       int pos = s.indexOf(forbiddenChar);
243       if (pos != -1) {
244         return (
245           "should not contain whitespaces or dots or slashes or commas: " + s.substring(0, pos + forbiddenChar.length())
246         );
247       }
248     }
249 
250     return "";
251   }
252 }