1 package de.dlr.shepard.influxDB;
2
3 import de.dlr.shepard.util.Constants;
4 import io.quarkus.logging.Log;
5 import java.net.URLDecoder;
6 import java.nio.charset.StandardCharsets;
7 import java.time.Instant;
8 import java.util.ArrayList;
9 import java.util.List;
10 import java.util.concurrent.TimeUnit;
11 import org.influxdb.dto.BatchPoints;
12 import org.influxdb.dto.BoundParameterQuery;
13 import org.influxdb.dto.BoundParameterQuery.QueryBuilder;
14 import org.influxdb.dto.Point;
15 import org.influxdb.dto.Point.Builder;
16 import org.influxdb.dto.QueryResult;
17
18 public final class InfluxUtil {
19
20 private InfluxUtil() {
21
22 }
23
24 private static final long MULTIPLIER_NANO = 1000000000L;
25 private static final String FIELD_FLOAT = "float";
26 private static final String FIELD_INT = "integer";
27 private static final String FIELD_STRING = "string";
28 private static final String FIELD_BOOL = "boolean";
29
30
31
32
33
34
35
36
37
38
39
40
41
42 public static BoundParameterQuery buildQuery(
43 long startTimeStamp,
44 long endTimeStamp,
45 String database,
46 Timeseries timeseries,
47 SingleValuedUnaryFunction function,
48 Long groupBy,
49 FillOption fillOption
50 ) {
51 var selectPart = (function != null)
52 ? String.format("SELECT %s(\"%s\")", function.toString(), timeseries.getField())
53 : String.format("SELECT \"%s\"", timeseries.getField());
54 var fromPart = String.format("FROM \"%s\"", timeseries.getMeasurement());
55 var wherePart = String.format(
56 "WHERE time >= %dns AND time <= %dns " +
57 "AND \"device\" = $device AND \"location\" = $location AND \"symbolic_name\" = $symbolic_name",
58 startTimeStamp,
59 endTimeStamp
60 );
61 var query = String.join(" ", selectPart, fromPart, wherePart);
62
63 if (groupBy != null) {
64 query += String.format(" GROUP BY time(%dns)", groupBy);
65 }
66 if (fillOption != null) {
67 query += String.format(" fill(%s)", fillOption.toString().toLowerCase());
68 }
69 var parameterizedQuery = QueryBuilder.newQuery(query)
70 .forDatabase(database)
71 .bind("device", timeseries.getDevice())
72 .bind("location", timeseries.getLocation())
73 .bind("symbolic_name", timeseries.getSymbolicName())
74 .create();
75 Log.debugf(
76 "Query influxdb %s: %s with params %s",
77 database,
78 parameterizedQuery.getCommand(),
79 URLDecoder.decode(parameterizedQuery.getParameterJsonWithUrlEncoded(), StandardCharsets.UTF_8)
80 );
81 return parameterizedQuery;
82 }
83
84
85
86
87
88
89
90
91 public static TimeseriesPayload extractPayload(QueryResult queryResult, Timeseries timeseries) {
92 var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
93 var influxPoints = new ArrayList<InfluxPoint>(values.size());
94 for (var value : values) {
95 var time = Instant.parse((String) value.get(0));
96 var nanoseconds = time.getEpochSecond() * MULTIPLIER_NANO + time.getNano();
97 influxPoints.add(new InfluxPoint(nanoseconds, value.get(1)));
98 }
99 return new TimeseriesPayload(timeseries, influxPoints);
100 }
101
102
103
104
105
106
107
108
109
110 public static BatchPoints createBatch(String database, TimeseriesPayload timeseriesPayload, String expectedType) {
111 String error = "";
112 BatchPoints batchPoints = BatchPoints.database(database).build();
113 var influxPoints = timeseriesPayload.getPoints();
114 var timeseries = timeseriesPayload.getTimeseries();
115
116 for (var influxPoint : influxPoints) {
117 Builder pointBuilder = Point.measurement(timeseries.getMeasurement())
118 .tag(Constants.LOCATION, timeseries.getLocation())
119 .tag(Constants.DEVICE, timeseries.getDevice())
120 .tag(Constants.SYMBOLICNAME, timeseries.getSymbolicName())
121 .time(influxPoint.getTimeInNanoseconds(), TimeUnit.NANOSECONDS);
122 Object value = influxPoint.getValue();
123
124 if (value != null && expectedType.equals(FIELD_STRING)) {
125
126 pointBuilder.addField(timeseries.getField(), value.toString());
127 } else if (value instanceof Number numberValue && (expectedType.equals(FIELD_FLOAT) || expectedType.isBlank())) {
128
129 pointBuilder.addField(timeseries.getField(), numberValue.doubleValue());
130 } else if (value instanceof Number numberValue && expectedType.equals(FIELD_INT)) {
131
132 pointBuilder.addField(timeseries.getField(), numberValue.longValue());
133 } else if (value instanceof Boolean booleanValue && (expectedType.equals(FIELD_BOOL) || expectedType.isBlank())) {
134
135 pointBuilder.addField(timeseries.getField(), booleanValue);
136 } else if (value != null) {
137
138 var stringValue = value.toString();
139 try {
140 switch (expectedType) {
141 case FIELD_FLOAT -> pointBuilder.addField(timeseries.getField(), Double.parseDouble(stringValue));
142 case FIELD_INT -> pointBuilder.addField(timeseries.getField(), Long.parseLong(stringValue));
143 case FIELD_BOOL -> pointBuilder.addField(timeseries.getField(), Boolean.parseBoolean(stringValue));
144 default -> pointBuilder.addField(timeseries.getField(), stringValue);
145 }
146 } catch (NumberFormatException e) {
147 if (error.isBlank()) error = String.format(
148
149 "Invalid influx point detected, cannot cast type %s into type %s",
150 stringValue,
151 expectedType
152 );
153 }
154 }
155 if (pointBuilder.hasFields()) batchPoints.point(pointBuilder.build());
156 }
157 if (!error.isBlank()) Log.error(error);
158
159 return batchPoints;
160 }
161
162
163
164
165
166
167
168
169
170
171
172 public static boolean isQueryResultValid(QueryResult queryResult) {
173 if (queryResult == null) {
174 Log.warn("Query Result is null");
175 return false;
176 }
177 if (queryResult.getError() != null) {
178 Log.warnf("There was an error while querying the Influxdb: %s", queryResult.getError());
179 return false;
180 }
181
182 var resultList = queryResult.getResults();
183 if (resultList == null || resultList.isEmpty()) return false;
184
185 var result = resultList.get(0);
186 if (result.hasError()) {
187 Log.warnf("There was an error while querying the Influxdb: %s", result.getError());
188 return false;
189 }
190
191 var seriesList = result.getSeries();
192 if (seriesList == null || seriesList.isEmpty()) return false;
193
194 var valueList = seriesList.get(0).getValues();
195 if (valueList == null) return false;
196
197 return true;
198 }
199
200
201
202
203
204
205
206
207
208 public static String sanitize(Timeseries timeseries) {
209 List<String> errors = new ArrayList<>();
210
211 String measurementString = sanitizeString(timeseries.getMeasurement());
212 String locationString = sanitizeString(timeseries.getLocation());
213 String deviceString = sanitizeString(timeseries.getDevice());
214 String symbolicNameString = sanitizeString(timeseries.getSymbolicName());
215 String fieldString = sanitizeString(timeseries.getField());
216
217 if (!measurementString.isEmpty()) {
218 errors.add("measurement " + measurementString);
219 }
220 if (!locationString.isEmpty()) {
221 errors.add("location " + locationString);
222 }
223 if (!deviceString.isEmpty()) {
224 errors.add("device " + deviceString);
225 }
226 if (!symbolicNameString.isEmpty()) {
227 errors.add("symbolicName " + symbolicNameString);
228 }
229 if (!fieldString.isEmpty()) {
230 errors.add("field " + fieldString);
231 }
232 return String.join("\n", errors);
233 }
234
235 private static String sanitizeString(String s) {
236 String[] forbiddenChars = { " ", ".", "/", "," };
237 if (s == null || s.isBlank()) {
238 return "should not be blank";
239 }
240
241 for (String forbiddenChar : forbiddenChars) {
242 int pos = s.indexOf(forbiddenChar);
243 if (pos != -1) {
244 return (
245 "should not contain whitespaces or dots or slashes or commas: " + s.substring(0, pos + forbiddenChar.length())
246 );
247 }
248 }
249
250 return "";
251 }
252 }