1 package de.dlr.shepard.data.timeseries.migration.influxtimeseries;
2
3 import de.dlr.shepard.common.util.Constants;
4 import de.dlr.shepard.common.util.IConnector;
5 import io.quarkus.logging.Log;
6 import jakarta.enterprise.context.ApplicationScoped;
7 import jakarta.inject.Inject;
8 import java.util.ArrayList;
9 import java.util.Collections;
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.concurrent.TimeUnit;
14 import okhttp3.OkHttpClient;
15 import org.eclipse.microprofile.config.ConfigProvider;
16 import org.influxdb.BatchOptions;
17 import org.influxdb.InfluxDB;
18 import org.influxdb.InfluxDBException;
19 import org.influxdb.InfluxDBFactory;
20 import org.influxdb.dto.Pong;
21 import org.influxdb.dto.Query;
22 import org.influxdb.dto.QueryResult;
23
24
25
26
27
28
29 @ApplicationScoped
30 public class InfluxDBConnector implements IConnector {
31
32 private InfluxDB influxDB;
33
34
35
36 InfluxDBConnector(InfluxDB influxDb) {
37 this.influxDB = influxDb;
38 }
39
40 @Inject
41 InfluxDBConnector() {}
42
43
44
45
46
47
48
49 @Override
50 public boolean connect() {
51 if (this.influxDB == null) {
52 String host = ConfigProvider.getConfig().getValue("influx.host", String.class);
53 String username = ConfigProvider.getConfig().getValue("influx.username", String.class);
54 String password = ConfigProvider.getConfig().getValue("influx.password", String.class);
55
56 OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
57 .connectTimeout(60, TimeUnit.SECONDS)
58 .readTimeout(600, TimeUnit.SECONDS);
59
60 influxDB = InfluxDBFactory.connect(String.format("http://%s", host), username, password, okHttpClientBuilder);
61 }
62
63 influxDB.enableBatch(
64 BatchOptions.DEFAULTS.exceptionHandler((failedPoints, throwable) ->
65 Log.errorf("Exception while writing the following points: %s, Exception: %s", failedPoints, throwable)
66 )
67 );
68 return true;
69 }
70
71 @Override
72 public boolean disconnect() {
73 if (influxDB != null) influxDB.close();
74 return true;
75 }
76
77
78
79
80
81
82 @Override
83 public boolean alive() {
84 Pong response;
85 try {
86 response = influxDB.ping();
87 } catch (InfluxDBException ex) {
88 return false;
89 }
90 return response != null && !response.getVersion().equalsIgnoreCase("unknown");
91 }
92
93
94
95
96
97
98 public void createDatabase(String databaseName) {
99 String query = String.format("CREATE DATABASE \"%s\"", databaseName);
100 influxDB.query(new Query(query));
101 }
102
103
104
105
106
107
108 public void deleteDatabase(String databaseName) {
109 String query = String.format("DROP DATABASE \"%s\"", databaseName);
110 influxDB.query(new Query(query));
111 }
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 public String saveTimeseriesPayload(String database, InfluxTimeseriesPayload payload) {
128 var timeseries = payload.getTimeseries();
129 var expectedType = getExpectedDatatype(database, timeseries.getMeasurement(), timeseries.getField());
130 var batchPoints = InfluxUtil.createBatch(database, payload, expectedType);
131 try {
132 influxDB.write(batchPoints);
133 } catch (InfluxDBException e) {
134 Log.errorf("InfluxdbException while writing payload %s: %s", payload.getTimeseries(), e.getMessage());
135 return e.getMessage();
136 }
137 return "";
138 }
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158 public InfluxTimeseriesPayload getTimeseriesPayload(
159 long startTimeStamp,
160 long endTimeStamp,
161 String database,
162 InfluxTimeseries timeseries,
163 InfluxSingleValuedUnaryFunction function,
164 Long groupBy,
165 InfluxFillOption fillOption
166 ) {
167 Query query = InfluxUtil.buildQuery(
168 startTimeStamp,
169 endTimeStamp,
170 database,
171 timeseries,
172 function,
173 groupBy,
174 fillOption
175 );
176 Log.debugf("Influx Query: %s", query.getCommand());
177 QueryResult queryResult;
178
179 queryResult = influxDB.query(query);
180 if (InfluxUtil.isQueryResultValid(queryResult)) {
181 return InfluxUtil.extractPayload(queryResult, timeseries);
182 }
183 return new InfluxTimeseriesPayload(timeseries, Collections.emptyList());
184 }
185
186
187
188
189
190
191
192 public List<InfluxTimeseries> getTimeseriesAvailable(String database) {
193 Query query = new Query(String.format("SHOW SERIES ON \"%s\"", database));
194 QueryResult queryResult = influxDB.query(query);
195 if (!InfluxUtil.isQueryResultValid(queryResult)) {
196 Log.warn("There was an error while querying the Influxdb for available timeseries");
197 return Collections.emptyList();
198 }
199 var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
200 var fields = getFields(database);
201 var result = new ArrayList<InfluxTimeseries>(values.size());
202 for (var value : values) {
203 var series = ((String) value.get(0)).split(",");
204
205 var meas = series[0];
206 var tags = extractTags(series);
207 var dev = tags.getOrDefault(Constants.DEVICE, "");
208 var loc = tags.getOrDefault(Constants.LOCATION, "");
209 var symName = tags.getOrDefault(Constants.SYMBOLICNAME, "");
210 for (var field : fields.getOrDefault(meas, Collections.emptyList())) {
211 result.add(new InfluxTimeseries(meas, dev, loc, symName, field));
212 }
213 }
214 return result;
215 }
216
217 private Map<String, String> extractTags(String[] series) {
218 var result = new HashMap<String, String>();
219 for (var tagString : series) {
220 var tags = tagString.split("=", 2);
221
222
223 if (tags.length < 2) continue;
224
225 result.put(tags[0], tags[1]);
226 }
227 return result;
228 }
229
230 private Map<String, List<String>> getFields(String database) {
231 Query query = new Query(String.format("SHOW FIELD KEYS ON \"%s\"", database));
232 QueryResult queryResult = influxDB.query(query);
233 if (!InfluxUtil.isQueryResultValid(queryResult)) {
234 Log.warn("There was an error while querying the Influxdb for available fields");
235 return Collections.emptyMap();
236 }
237 var series = queryResult.getResults().get(0).getSeries();
238 var result = new HashMap<String, List<String>>();
239 for (var s : series) {
240 var fields = new ArrayList<String>();
241 for (var value : s.getValues()) {
242 fields.add((String) value.get(0));
243 }
244 result.put(s.getName(), fields);
245 }
246 return result;
247 }
248
249 public boolean databaseExist(String database) {
250 QueryResult queryResult = influxDB.query(new Query("SHOW DATABASES"));
251 if (!InfluxUtil.isQueryResultValid(queryResult)) {
252 Log.warn("There was an error while querying the Influxdb for databases");
253 return false;
254 }
255
256 var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
257 for (var databaseName : values) {
258 if (databaseName.get(0).toString().trim().equals(database)) {
259 return true;
260 }
261 }
262
263 return false;
264 }
265
266 public InfluxTimeseriesDataType getTimeseriesDataType(String database, String measurement, String field)
267 throws Exception {
268 var dataTypeAsString = getExpectedDatatype(database, measurement, field);
269 try {
270 return InfluxTimeseriesDataType.valueOf(dataTypeAsString.toUpperCase());
271 } catch (IllegalArgumentException ex) {
272 var message = String.format("Timeseries in influxDB has unknown data type: %s", dataTypeAsString);
273 throw new Exception(message, ex);
274 }
275 }
276
277
278
279
280
281 private String getExpectedDatatype(String database, String measurement, String field) {
282 String queryString = String.format("SHOW FIELD KEYS ON \"%s\" FROM \"%s\"", database, measurement);
283 QueryResult result = influxDB.query(new Query(queryString));
284 if (!InfluxUtil.isQueryResultValid(result)) {
285 Log.infof("Could not get expected datatype query string \"%s\"", queryString);
286 return "";
287 }
288
289 var values = result.getResults().get(0).getSeries().get(0).getValues();
290 for (var value : values) {
291 if (value.get(0).equals(field)) return (String) value.get(1);
292 }
293
294 return "";
295 }
296 }