1 package de.dlr.shepard.data.timeseries.migration.influxtimeseries;
2
import de.dlr.shepard.common.util.Constants;
import io.quarkus.logging.Log;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.BoundParameterQuery;
import org.influxdb.dto.BoundParameterQuery.QueryBuilder;
import org.influxdb.dto.Point;
import org.influxdb.dto.Point.Builder;
import org.influxdb.dto.QueryResult;
17
18 public final class InfluxUtil {
19
20 private InfluxUtil() {
21
22 }
23
24 private static final long MULTIPLIER_NANO = 1000000000L;
25 private static final String FIELD_FLOAT = "float";
26 private static final String FIELD_INT = "integer";
27 private static final String FIELD_STRING = "string";
28 private static final String FIELD_BOOL = "boolean";
29
30
31
32
33
34
35
36
37
38
39
40
41
42 public static BoundParameterQuery buildQuery(
43 long startTimeStamp,
44 long endTimeStamp,
45 String database,
46 InfluxTimeseries timeseries,
47 InfluxSingleValuedUnaryFunction function,
48 Long groupBy,
49 InfluxFillOption fillOption
50 ) {
51 var selectPart = (function != null)
52 ? String.format("SELECT %s(\"%s\")", function.toString(), timeseries.getField())
53 : String.format("SELECT \"%s\"", timeseries.getField());
54 var fromPart = String.format("FROM \"%s\"", timeseries.getMeasurement());
55 var wherePart = String.format(
56 "WHERE time >= %dns AND time <= %dns " +
57 "AND \"device\" = $device AND \"location\" = $location AND \"symbolic_name\" = $symbolic_name",
58 startTimeStamp,
59 endTimeStamp
60 );
61 var query = String.join(" ", selectPart, fromPart, wherePart);
62
63 if (groupBy != null) {
64 query += String.format(" GROUP BY time(%dns)", groupBy);
65 }
66 if (fillOption != null) {
67 query += String.format(" fill(%s)", fillOption.toString().toLowerCase());
68 }
69 var parameterizedQuery = QueryBuilder.newQuery(query)
70 .forDatabase(database)
71 .bind("device", timeseries.getDevice())
72 .bind("location", timeseries.getLocation())
73 .bind("symbolic_name", timeseries.getSymbolicName())
74 .create();
75 Log.debugf(
76 "Query influxdb %s: %s with params %s",
77 database,
78 parameterizedQuery.getCommand(),
79 URLDecoder.decode(parameterizedQuery.getParameterJsonWithUrlEncoded(), StandardCharsets.UTF_8)
80 );
81 return parameterizedQuery;
82 }
83
84
85
86
87
88
89
90
91 public static InfluxTimeseriesPayload extractPayload(QueryResult queryResult, InfluxTimeseries timeseries) {
92 var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
93 var influxPoints = new ArrayList<InfluxPoint>(values.size());
94 for (var value : values) {
95 var time = Instant.parse((String) value.get(0));
96 var nanoseconds = time.getEpochSecond() * MULTIPLIER_NANO + time.getNano();
97 influxPoints.add(new InfluxPoint(nanoseconds, value.get(1)));
98 }
99 return new InfluxTimeseriesPayload(timeseries, influxPoints);
100 }
101
102
103
104
105
106
107
108
109
110 public static BatchPoints createBatch(
111 String database,
112 InfluxTimeseriesPayload timeseriesPayload,
113 String expectedType
114 ) {
115 String error = "";
116 BatchPoints batchPoints = BatchPoints.database(database).build();
117 var influxPoints = timeseriesPayload.getPoints();
118 var timeseries = timeseriesPayload.getTimeseries();
119
120 for (var influxPoint : influxPoints) {
121 Builder pointBuilder = Point.measurement(timeseries.getMeasurement())
122 .tag(Constants.LOCATION, timeseries.getLocation())
123 .tag(Constants.DEVICE, timeseries.getDevice())
124 .tag(Constants.SYMBOLICNAME, timeseries.getSymbolicName())
125 .time(influxPoint.getTimeInNanoseconds(), TimeUnit.NANOSECONDS);
126 Object value = influxPoint.getValue();
127
128 if (value != null && expectedType.equals(FIELD_STRING)) {
129
130 pointBuilder.addField(timeseries.getField(), value.toString());
131 } else if (value instanceof Number numberValue && (expectedType.equals(FIELD_FLOAT) || expectedType.isBlank())) {
132
133 pointBuilder.addField(timeseries.getField(), numberValue.doubleValue());
134 } else if (value instanceof Number numberValue && expectedType.equals(FIELD_INT)) {
135
136 pointBuilder.addField(timeseries.getField(), numberValue.longValue());
137 } else if (value instanceof Boolean booleanValue && (expectedType.equals(FIELD_BOOL) || expectedType.isBlank())) {
138
139 pointBuilder.addField(timeseries.getField(), booleanValue);
140 } else if (value != null) {
141
142 var stringValue = value.toString();
143 try {
144 switch (expectedType) {
145 case FIELD_FLOAT -> pointBuilder.addField(timeseries.getField(), Double.parseDouble(stringValue));
146 case FIELD_INT -> pointBuilder.addField(timeseries.getField(), Long.parseLong(stringValue));
147 case FIELD_BOOL -> pointBuilder.addField(timeseries.getField(), Boolean.parseBoolean(stringValue));
148 default -> pointBuilder.addField(timeseries.getField(), stringValue);
149 }
150 } catch (NumberFormatException e) {
151 if (error.isBlank()) error = String.format(
152
153 "Invalid influx point detected, cannot cast type %s into type %s",
154 stringValue,
155 expectedType
156 );
157 }
158 }
159 if (pointBuilder.hasFields()) batchPoints.point(pointBuilder.build());
160 }
161 if (!error.isBlank()) Log.error(error);
162
163 return batchPoints;
164 }
165
166
167
168
169
170
171
172
173
174
175
176 public static boolean isQueryResultValid(QueryResult queryResult) {
177 if (queryResult == null) {
178 Log.warn("Query Result is null");
179 return false;
180 }
181 if (queryResult.getError() != null) {
182 Log.warnf("There was an error while querying the Influxdb: %s", queryResult.getError());
183 return false;
184 }
185
186 var resultList = queryResult.getResults();
187 if (resultList == null || resultList.isEmpty()) return false;
188
189 var result = resultList.get(0);
190 if (result.hasError()) {
191 Log.warnf("There was an error while querying the Influxdb: %s", result.getError());
192 return false;
193 }
194
195 var seriesList = result.getSeries();
196 if (seriesList == null || seriesList.isEmpty()) return false;
197
198 var valueList = seriesList.get(0).getValues();
199 if (valueList == null) return false;
200
201 return true;
202 }
203
204
205
206
207
208
209
210
211
212 public static String sanitize(InfluxTimeseries timeseries) {
213 List<String> errors = new ArrayList<>();
214
215 String measurementString = sanitizeString(timeseries.getMeasurement());
216 String locationString = sanitizeString(timeseries.getLocation());
217 String deviceString = sanitizeString(timeseries.getDevice());
218 String symbolicNameString = sanitizeString(timeseries.getSymbolicName());
219 String fieldString = sanitizeString(timeseries.getField());
220
221 if (!measurementString.isEmpty()) {
222 errors.add("measurement " + measurementString);
223 }
224 if (!locationString.isEmpty()) {
225 errors.add("location " + locationString);
226 }
227 if (!deviceString.isEmpty()) {
228 errors.add("device " + deviceString);
229 }
230 if (!symbolicNameString.isEmpty()) {
231 errors.add("symbolicName " + symbolicNameString);
232 }
233 if (!fieldString.isEmpty()) {
234 errors.add("field " + fieldString);
235 }
236 return String.join("\n", errors);
237 }
238
239 private static String sanitizeString(String s) {
240 String[] forbiddenChars = { " ", ".", "/", "," };
241 if (s == null || s.isBlank()) {
242 return "should not be blank";
243 }
244
245 for (String forbiddenChar : forbiddenChars) {
246 int pos = s.indexOf(forbiddenChar);
247 if (pos != -1) {
248 return (
249 "should not contain whitespaces or dots or slashes or commas: " + s.substring(0, pos + forbiddenChar.length())
250 );
251 }
252 }
253
254 return "";
255 }
256 }