View Javadoc
1   package de.dlr.shepard.data.timeseries.migration.influxtimeseries;
2   
import de.dlr.shepard.common.util.Constants;
import io.quarkus.logging.Log;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.BoundParameterQuery;
import org.influxdb.dto.BoundParameterQuery.QueryBuilder;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.QueryResult;
17  
18  public final class InfluxUtil {
19  
20    private InfluxUtil() {
21      // Static class needs no constructor
22    }
23  
24    private static final long MULTIPLIER_NANO = 1000000000L;
25    private static final String FIELD_FLOAT = "float";
26    private static final String FIELD_INT = "integer";
27    private static final String FIELD_STRING = "string";
28    private static final String FIELD_BOOL = "boolean";
29  
30    /**
31     * Build an influx query with the given parameters.
32     *
33     * @param startTimeStamp The beginning of the timeseries
34     * @param endTimeStamp   The end of the timeseries
35     * @param database       The database to be queried
36     * @param timeseries     The timeseries whose points are queried
37     * @param function       The aggregate function
38     * @param groupBy        The time interval measurements get grouped by
39     * @param fillOption     The fill option for missing values
40     * @return an influx query
41     */
42    public static BoundParameterQuery buildQuery(
43      long startTimeStamp,
44      long endTimeStamp,
45      String database,
46      InfluxTimeseries timeseries,
47      InfluxSingleValuedUnaryFunction function,
48      Long groupBy,
49      InfluxFillOption fillOption
50    ) {
51      var selectPart = (function != null)
52        ? String.format("SELECT %s(\"%s\")", function.toString(), timeseries.getField())
53        : String.format("SELECT \"%s\"", timeseries.getField());
54      var fromPart = String.format("FROM \"%s\"", timeseries.getMeasurement());
55      var wherePart = String.format(
56        "WHERE time >= %dns AND time <= %dns " +
57        "AND \"device\" = $device AND \"location\" = $location AND \"symbolic_name\" = $symbolic_name",
58        startTimeStamp,
59        endTimeStamp
60      );
61      var query = String.join(" ", selectPart, fromPart, wherePart);
62  
63      if (groupBy != null) {
64        query += String.format(" GROUP BY time(%dns)", groupBy);
65      }
66      if (fillOption != null) {
67        query += String.format(" fill(%s)", fillOption.toString().toLowerCase());
68      }
69      var parameterizedQuery = QueryBuilder.newQuery(query)
70        .forDatabase(database)
71        .bind("device", timeseries.getDevice())
72        .bind("location", timeseries.getLocation())
73        .bind("symbolic_name", timeseries.getSymbolicName())
74        .create();
75      Log.debugf(
76        "Query influxdb %s: %s with params %s",
77        database,
78        parameterizedQuery.getCommand(),
79        URLDecoder.decode(parameterizedQuery.getParameterJsonWithUrlEncoded(), StandardCharsets.UTF_8)
80      );
81      return parameterizedQuery;
82    }
83  
84    /**
85     * Extract TimeseriesPayload from influx query result.
86     *
87     * @param queryResult Influx query result
88     * @param timeseries  the timeseries to extract
89     * @return TimeseriesPayload
90     */
91    public static InfluxTimeseriesPayload extractPayload(QueryResult queryResult, InfluxTimeseries timeseries) {
92      var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
93      var influxPoints = new ArrayList<InfluxPoint>(values.size());
94      for (var value : values) {
95        var time = Instant.parse((String) value.get(0));
96        var nanoseconds = time.getEpochSecond() * MULTIPLIER_NANO + time.getNano();
97        influxPoints.add(new InfluxPoint(nanoseconds, value.get(1)));
98      }
99      return new InfluxTimeseriesPayload(timeseries, influxPoints);
100   }
101 
102   /**
103    * Create a batch out of a given list of influx points.
104    *
105    * @param database          The database where the batch is to be stored
106    * @param timeseriesPayload TimeseriesPayload to be stored
107    * @param expectedType      The expected datatype as string
108    * @return influx batch points
109    */
110   public static BatchPoints createBatch(
111     String database,
112     InfluxTimeseriesPayload timeseriesPayload,
113     String expectedType
114   ) {
115     String error = "";
116     BatchPoints batchPoints = BatchPoints.database(database).build();
117     var influxPoints = timeseriesPayload.getPoints();
118     var timeseries = timeseriesPayload.getTimeseries();
119 
120     for (var influxPoint : influxPoints) {
121       Builder pointBuilder = Point.measurement(timeseries.getMeasurement())
122         .tag(Constants.LOCATION, timeseries.getLocation())
123         .tag(Constants.DEVICE, timeseries.getDevice())
124         .tag(Constants.SYMBOLICNAME, timeseries.getSymbolicName())
125         .time(influxPoint.getTimeInNanoseconds(), TimeUnit.NANOSECONDS);
126       Object value = influxPoint.getValue();
127 
128       if (value != null && expectedType.equals(FIELD_STRING)) {
129         // Expected type is string, we use value.toString()
130         pointBuilder.addField(timeseries.getField(), value.toString());
131       } else if (value instanceof Number numberValue && (expectedType.equals(FIELD_FLOAT) || expectedType.isBlank())) {
132         // value is a number and float or nothing is expected
133         pointBuilder.addField(timeseries.getField(), numberValue.doubleValue());
134       } else if (value instanceof Number numberValue && expectedType.equals(FIELD_INT)) {
135         // value is a number and int or nothing is expected
136         pointBuilder.addField(timeseries.getField(), numberValue.longValue());
137       } else if (value instanceof Boolean booleanValue && (expectedType.equals(FIELD_BOOL) || expectedType.isBlank())) {
138         // value is a boolean and boolean or nothing is expected
139         pointBuilder.addField(timeseries.getField(), booleanValue);
140       } else if (value != null) {
141         // value has to be casted
142         var stringValue = value.toString();
143         try {
144           switch (expectedType) {
145             case FIELD_FLOAT -> pointBuilder.addField(timeseries.getField(), Double.parseDouble(stringValue));
146             case FIELD_INT -> pointBuilder.addField(timeseries.getField(), Long.parseLong(stringValue));
147             case FIELD_BOOL -> pointBuilder.addField(timeseries.getField(), Boolean.parseBoolean(stringValue));
148             default -> pointBuilder.addField(timeseries.getField(), stringValue);
149           }
150         } catch (NumberFormatException e) {
151           if (error.isBlank()) error = String.format(
152             // log the first error
153             "Invalid influx point detected, cannot cast type %s into type %s",
154             stringValue,
155             expectedType
156           );
157         }
158       }
159       if (pointBuilder.hasFields()) batchPoints.point(pointBuilder.build());
160     }
161     if (!error.isBlank()) Log.error(error);
162 
163     return batchPoints;
164   }
165 
166   /**
167    * Checks whether a QueryResult is valid, meaning that it has no errors and the
168    * results as well as the series-lists are not empty. If this returns true it is
169    * safe to run
170    * {@code queryResult.getResults().get(0).getSeries().get(0).getValues()}
171    *
172    * @param queryResult The QueryResult to be checked.
173    * @return False if QueryResult has errors or results or series are empty, true
174    *         otherwise.
175    */
176   public static boolean isQueryResultValid(QueryResult queryResult) {
177     if (queryResult == null) {
178       Log.warn("Query Result is null");
179       return false;
180     }
181     if (queryResult.getError() != null) {
182       Log.warnf("There was an error while querying the Influxdb: %s", queryResult.getError());
183       return false;
184     }
185 
186     var resultList = queryResult.getResults();
187     if (resultList == null || resultList.isEmpty()) return false;
188 
189     var result = resultList.get(0);
190     if (result.hasError()) {
191       Log.warnf("There was an error while querying the Influxdb: %s", result.getError());
192       return false;
193     }
194 
195     var seriesList = result.getSeries();
196     if (seriesList == null || seriesList.isEmpty()) return false;
197 
198     var valueList = seriesList.get(0).getValues();
199     if (valueList == null) return false;
200 
201     return true;
202   }
203 
204   /**
205    * Is the timeseries object valid so that it can be stored in influxdb? This
206    * function returns errors if the time series attributes contain illegal
207    * characters.
208    *
209    * @param timeseries The timeseries object to be sanitized
210    * @return errors or empty string
211    */
212   public static String sanitize(InfluxTimeseries timeseries) {
213     List<String> errors = new ArrayList<>();
214 
215     String measurementString = sanitizeString(timeseries.getMeasurement());
216     String locationString = sanitizeString(timeseries.getLocation());
217     String deviceString = sanitizeString(timeseries.getDevice());
218     String symbolicNameString = sanitizeString(timeseries.getSymbolicName());
219     String fieldString = sanitizeString(timeseries.getField());
220 
221     if (!measurementString.isEmpty()) {
222       errors.add("measurement " + measurementString);
223     }
224     if (!locationString.isEmpty()) {
225       errors.add("location " + locationString);
226     }
227     if (!deviceString.isEmpty()) {
228       errors.add("device " + deviceString);
229     }
230     if (!symbolicNameString.isEmpty()) {
231       errors.add("symbolicName " + symbolicNameString);
232     }
233     if (!fieldString.isEmpty()) {
234       errors.add("field " + fieldString);
235     }
236     return String.join("\n", errors);
237   }
238 
239   private static String sanitizeString(String s) {
240     String[] forbiddenChars = { " ", ".", "/", "," };
241     if (s == null || s.isBlank()) {
242       return "should not be blank";
243     }
244 
245     for (String forbiddenChar : forbiddenChars) {
246       int pos = s.indexOf(forbiddenChar);
247       if (pos != -1) {
248         return (
249           "should not contain whitespaces or dots or slashes or commas: " + s.substring(0, pos + forbiddenChar.length())
250         );
251       }
252     }
253 
254     return "";
255   }
256 }