1 package de.dlr.shepard.influxDB;
2
3 import de.dlr.shepard.util.Constants;
4 import de.dlr.shepard.util.IConnector;
5 import io.quarkus.logging.Log;
6 import jakarta.enterprise.context.ApplicationScoped;
7 import jakarta.inject.Inject;
8 import java.util.ArrayList;
9 import java.util.Collections;
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13 import org.eclipse.microprofile.config.ConfigProvider;
14 import org.influxdb.BatchOptions;
15 import org.influxdb.InfluxDB;
16 import org.influxdb.InfluxDBException;
17 import org.influxdb.InfluxDBFactory;
18 import org.influxdb.dto.Pong;
19 import org.influxdb.dto.Query;
20 import org.influxdb.dto.QueryResult;
21
22
23
24
25
26
27 @ApplicationScoped
28 public class InfluxDBConnector implements IConnector {
29
30 private InfluxDB influxDB;
31
32
33
34 InfluxDBConnector(InfluxDB influxDb) {
35 this.influxDB = influxDb;
36 }
37
38 @Inject
39 InfluxDBConnector() {}
40
41
42
43
44
45
46
47 @Override
48 public boolean connect() {
49 if (this.influxDB == null) {
50 String host = ConfigProvider.getConfig().getValue("influx.host", String.class);
51 String username = ConfigProvider.getConfig().getValue("influx.username", String.class);
52 String password = ConfigProvider.getConfig().getValue("influx.password", String.class);
53
54 influxDB = InfluxDBFactory.connect(String.format("http://%s", host), username, password);
55 }
56
57 influxDB.enableBatch(
58 BatchOptions.DEFAULTS.exceptionHandler((failedPoints, throwable) ->
59 Log.errorf("Exception while writing the following points: %s, Exception: %s", failedPoints, throwable)
60 )
61 );
62 return true;
63 }
64
65 @Override
66 public boolean disconnect() {
67 if (influxDB != null) influxDB.close();
68 return true;
69 }
70
71
72
73
74
75
76 @Override
77 public boolean alive() {
78 Pong response;
79 try {
80 response = influxDB.ping();
81 } catch (InfluxDBException ex) {
82 return false;
83 }
84 return response != null && !response.getVersion().equalsIgnoreCase("unknown");
85 }
86
87
88
89
90
91
92 public void createDatabase(String databaseName) {
93 String query = String.format("CREATE DATABASE \"%s\"", databaseName);
94 influxDB.query(new Query(query));
95 }
96
97
98
99
100
101
102 public void deleteDatabase(String databaseName) {
103 String query = String.format("DROP DATABASE \"%s\"", databaseName);
104 influxDB.query(new Query(query));
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121 public String saveTimeseriesPayload(String database, TimeseriesPayload payload) {
122 var timeseries = payload.getTimeseries();
123 var expectedType = getExpectedDatatype(database, timeseries.getMeasurement(), timeseries.getField());
124 var batchPoints = InfluxUtil.createBatch(database, payload, expectedType);
125 try {
126 influxDB.write(batchPoints);
127 } catch (InfluxDBException e) {
128 Log.errorf("InfluxdbException while writing payload %s: %s", payload.getTimeseries(), e.getMessage());
129 return e.getMessage();
130 }
131 return "";
132 }
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 public TimeseriesPayload getTimeseriesPayload(
153 long startTimeStamp,
154 long endTimeStamp,
155 String database,
156 Timeseries timeseries,
157 SingleValuedUnaryFunction function,
158 Long groupBy,
159 FillOption fillOption
160 ) {
161 Query query = InfluxUtil.buildQuery(
162 startTimeStamp,
163 endTimeStamp,
164 database,
165 timeseries,
166 function,
167 groupBy,
168 fillOption
169 );
170 Log.debugf("Influx Query: %s", query.getCommand());
171 QueryResult queryResult;
172
173 try {
174 queryResult = influxDB.query(query);
175 } catch (InfluxDBException e) {
176 queryResult = null;
177 Log.errorf("Could not parse query: %s", query.getCommand());
178 }
179 if (InfluxUtil.isQueryResultValid(queryResult)) {
180 return InfluxUtil.extractPayload(queryResult, timeseries);
181 }
182 return new TimeseriesPayload(timeseries, Collections.emptyList());
183 }
184
185
186
187
188
189
190
191 public List<Timeseries> getTimeseriesAvailable(String database) {
192 Query query = new Query(String.format("SHOW SERIES ON \"%s\"", database));
193 QueryResult queryResult = influxDB.query(query);
194 if (!InfluxUtil.isQueryResultValid(queryResult)) {
195 Log.warn("There was an error while querying the Influxdb for available timeseries");
196 return Collections.emptyList();
197 }
198 var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
199 var fields = getFields(database);
200 var result = new ArrayList<Timeseries>(values.size());
201 for (var value : values) {
202 var series = ((String) value.get(0)).split(",");
203
204 var meas = series[0];
205 var tags = extractTags(series);
206 var dev = tags.getOrDefault(Constants.DEVICE, "");
207 var loc = tags.getOrDefault(Constants.LOCATION, "");
208 var symName = tags.getOrDefault(Constants.SYMBOLICNAME, "");
209 for (var field : fields.getOrDefault(meas, Collections.emptyList())) {
210 result.add(new Timeseries(meas, dev, loc, symName, field));
211 }
212 }
213 return result;
214 }
215
216 private Map<String, String> extractTags(String[] series) {
217 var result = new HashMap<String, String>();
218 for (var tagString : series) {
219 var tags = tagString.split("=", 2);
220
221
222 if (tags.length < 2) continue;
223
224 result.put(tags[0], tags[1]);
225 }
226 return result;
227 }
228
229 private Map<String, List<String>> getFields(String database) {
230 Query query = new Query(String.format("SHOW FIELD KEYS ON \"%s\"", database));
231 QueryResult queryResult = influxDB.query(query);
232 if (!InfluxUtil.isQueryResultValid(queryResult)) {
233 Log.warn("There was an error while querying the Influxdb for available fields");
234 return Collections.emptyMap();
235 }
236 var series = queryResult.getResults().get(0).getSeries();
237 var result = new HashMap<String, List<String>>();
238 for (var s : series) {
239 var fields = new ArrayList<String>();
240 for (var value : s.getValues()) {
241 fields.add((String) value.get(0));
242 }
243 result.put(s.getName(), fields);
244 }
245 return result;
246 }
247
248 public boolean databaseExist(String database) {
249 QueryResult queryResult = influxDB.query(new Query("SHOW DATABASES"));
250 if (!InfluxUtil.isQueryResultValid(queryResult)) {
251 Log.warn("There was an error while querying the Influxdb for databases");
252 return false;
253 }
254
255 var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
256 for (var databaseName : values) {
257 if (databaseName.get(0).toString().trim().equals(database)) {
258 return true;
259 }
260 }
261
262 return false;
263 }
264
265
266
267
268
269 private String getExpectedDatatype(String database, String measurement, String field) {
270 String queryString = String.format("SHOW FIELD KEYS ON \"%s\" FROM %s", database, measurement);
271 QueryResult result = influxDB.query(new Query(queryString));
272 if (!InfluxUtil.isQueryResultValid(result)) {
273 Log.infof("Could not get expected datatype query string \"%s\"", queryString);
274 return "";
275 }
276
277 var values = result.getResults().get(0).getSeries().get(0).getValues();
278 for (var value : values) {
279 if (value.get(0).equals(field)) return (String) value.get(1);
280 }
281
282 return "";
283 }
284 }