View Javadoc
1   package de.dlr.shepard.influxDB;
2   
3   import de.dlr.shepard.util.Constants;
4   import de.dlr.shepard.util.IConnector;
5   import io.quarkus.logging.Log;
6   import jakarta.enterprise.context.ApplicationScoped;
7   import jakarta.inject.Inject;
8   import java.util.ArrayList;
9   import java.util.Collections;
10  import java.util.HashMap;
11  import java.util.List;
12  import java.util.Map;
13  import org.eclipse.microprofile.config.ConfigProvider;
14  import org.influxdb.BatchOptions;
15  import org.influxdb.InfluxDB;
16  import org.influxdb.InfluxDBException;
17  import org.influxdb.InfluxDBFactory;
18  import org.influxdb.dto.Pong;
19  import org.influxdb.dto.Query;
20  import org.influxdb.dto.QueryResult;
21  
22  /**
23   * Connector for read and write access to the Influx timeseries database. The
24   * class represents the lowest level of data access to the Influx database. The
25   * Influx database is accessed directly by using query strings.
26   */
27  @ApplicationScoped
28  public class InfluxDBConnector implements IConnector {
29  
30    private InfluxDB influxDB;
31  
32    /* Use this constructor for testing purpose only.
33     *  As soon as influxDB is provided via dependency injection, this constructor can be used for that purpose. */
34    InfluxDBConnector(InfluxDB influxDb) {
35      this.influxDB = influxDb;
36    }
37  
38    @Inject
39    InfluxDBConnector() {}
40  
41    /**
42     * Establishes a connection to the Influx server by using the URL saved in the
43     * config.properties file returned by the DatabaseHelper. In addition the
44     * logging is being configured.
45     *
46     */
47    @Override
48    public boolean connect() {
49      if (this.influxDB == null) {
50        String host = ConfigProvider.getConfig().getValue("influx.host", String.class);
51        String username = ConfigProvider.getConfig().getValue("influx.username", String.class);
52        String password = ConfigProvider.getConfig().getValue("influx.password", String.class);
53  
54        influxDB = InfluxDBFactory.connect(String.format("http://%s", host), username, password);
55      }
56  
57      influxDB.enableBatch(
58        BatchOptions.DEFAULTS.exceptionHandler((failedPoints, throwable) ->
59          Log.errorf("Exception while writing the following points: %s, Exception: %s", failedPoints, throwable)
60        )
61      );
62      return true;
63    }
64  
65    @Override
66    public boolean disconnect() {
67      if (influxDB != null) influxDB.close();
68      return true;
69    }
70  
71    /**
72     * Tests whether a connection exists.
73     *
74     * @return True if connection exists, otherwise false.
75     */
76    @Override
77    public boolean alive() {
78      Pong response;
79      try {
80        response = influxDB.ping();
81      } catch (InfluxDBException ex) {
82        return false;
83      }
84      return response != null && !response.getVersion().equalsIgnoreCase("unknown");
85    }
86  
87    /**
88     * Creates a new database.
89     *
90     * @param databaseName Name of the new database to be created.
91     */
92    public void createDatabase(String databaseName) {
93      String query = String.format("CREATE DATABASE \"%s\"", databaseName);
94      influxDB.query(new Query(query));
95    }
96  
97    /**
98     * Deletes a database.
99     *
100    * @param databaseName Name of the database to be deleted.
101    */
102   public void deleteDatabase(String databaseName) {
103     String query = String.format("DROP DATABASE \"%s\"", databaseName);
104     influxDB.query(new Query(query));
105   }
106 
107   /**
108    * Writes all InfluxPoint objects, saved in the TimeseriesPayload object, into
109    * the influx database. Therefore the method uses the name of the database, the
110    * name of the measurement, the location, the device, the symbolic name and the
111    * name of the field provided by the given TimeseriesPayload object. The actual
112    * write operation is done by using the unix timestamp in nanoseconds and the
113    * value of every InfluxPoint object in the TimeseriesPayload object. All these
114    * variables have to be defined in the given Timeseries object for a successful
115    * write operation.
116    *
117    * @param database The database to store the payload in
118    * @param payload  Combines the required attributes in a structured way.
119    * @return An error if there was a problem, empty string if all went well
120    */
121   public String saveTimeseriesPayload(String database, TimeseriesPayload payload) {
122     var timeseries = payload.getTimeseries();
123     var expectedType = getExpectedDatatype(database, timeseries.getMeasurement(), timeseries.getField());
124     var batchPoints = InfluxUtil.createBatch(database, payload, expectedType);
125     try {
126       influxDB.write(batchPoints);
127     } catch (InfluxDBException e) {
128       Log.errorf("InfluxdbException while writing payload %s: %s", payload.getTimeseries(), e.getMessage());
129       return e.getMessage();
130     }
131     return "";
132   }
133 
134   /**
135    * Returns a Timeseries object containing all the influx data points which match
136    * the given conditions consigned to the method. The returned InfluxPoint
137    * objects are in the given measurement of the specific database between the
138    * start and the end timestamp.
139    *
140    * @param startTimeStamp Start timestamp from which values should be returned
141    * @param endTimeStamp   End timestamp to which values should be returned
142    * @param database       Name of the database
143    * @param timeseries     Specifies the data to load from database
144    * @param function       The aggregate function
145    * @param groupBy        The time interval measurements get grouped by
146    * @param fillOption     The fill option for missing values
147    * @return A Timeseries object containing a list of all InfluxPoints in the
148    *         given measurement of the specific database between the two given
149    *         timestamps matching the "device"-tag, the "location"-tag and the
150    *         "symbolic_name"-tag.
151    */
152   public TimeseriesPayload getTimeseriesPayload(
153     long startTimeStamp,
154     long endTimeStamp,
155     String database,
156     Timeseries timeseries,
157     SingleValuedUnaryFunction function,
158     Long groupBy,
159     FillOption fillOption
160   ) {
161     Query query = InfluxUtil.buildQuery(
162       startTimeStamp,
163       endTimeStamp,
164       database,
165       timeseries,
166       function,
167       groupBy,
168       fillOption
169     );
170     Log.debugf("Influx Query: %s", query.getCommand());
171     QueryResult queryResult;
172 
173     try {
174       queryResult = influxDB.query(query);
175     } catch (InfluxDBException e) {
176       queryResult = null;
177       Log.errorf("Could not parse query: %s", query.getCommand());
178     }
179     if (InfluxUtil.isQueryResultValid(queryResult)) {
180       return InfluxUtil.extractPayload(queryResult, timeseries);
181     }
182     return new TimeseriesPayload(timeseries, Collections.emptyList());
183   }
184 
185   /**
186    * Returns a list of timeseries objects that are in the given database.
187    *
188    * @param database the given database
189    * @return a list of timeseries objects
190    */
191   public List<Timeseries> getTimeseriesAvailable(String database) {
192     Query query = new Query(String.format("SHOW SERIES ON \"%s\"", database));
193     QueryResult queryResult = influxDB.query(query);
194     if (!InfluxUtil.isQueryResultValid(queryResult)) {
195       Log.warn("There was an error while querying the Influxdb for available timeseries");
196       return Collections.emptyList();
197     }
198     var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
199     var fields = getFields(database);
200     var result = new ArrayList<Timeseries>(values.size());
201     for (var value : values) {
202       var series = ((String) value.get(0)).split(",");
203       // The first entry is the measurement
204       var meas = series[0];
205       var tags = extractTags(series);
206       var dev = tags.getOrDefault(Constants.DEVICE, "");
207       var loc = tags.getOrDefault(Constants.LOCATION, "");
208       var symName = tags.getOrDefault(Constants.SYMBOLICNAME, "");
209       for (var field : fields.getOrDefault(meas, Collections.emptyList())) {
210         result.add(new Timeseries(meas, dev, loc, symName, field));
211       }
212     }
213     return result;
214   }
215 
216   private Map<String, String> extractTags(String[] series) {
217     var result = new HashMap<String, String>();
218     for (var tagString : series) {
219       var tags = tagString.split("=", 2);
220 
221       // Skip entries with less than 2 tuples (most likely the measurement)
222       if (tags.length < 2) continue;
223 
224       result.put(tags[0], tags[1]);
225     }
226     return result;
227   }
228 
229   private Map<String, List<String>> getFields(String database) {
230     Query query = new Query(String.format("SHOW FIELD KEYS ON \"%s\"", database));
231     QueryResult queryResult = influxDB.query(query);
232     if (!InfluxUtil.isQueryResultValid(queryResult)) {
233       Log.warn("There was an error while querying the Influxdb for available fields");
234       return Collections.emptyMap();
235     }
236     var series = queryResult.getResults().get(0).getSeries();
237     var result = new HashMap<String, List<String>>();
238     for (var s : series) {
239       var fields = new ArrayList<String>();
240       for (var value : s.getValues()) {
241         fields.add((String) value.get(0));
242       }
243       result.put(s.getName(), fields);
244     }
245     return result;
246   }
247 
248   public boolean databaseExist(String database) {
249     QueryResult queryResult = influxDB.query(new Query("SHOW DATABASES"));
250     if (!InfluxUtil.isQueryResultValid(queryResult)) {
251       Log.warn("There was an error while querying the Influxdb for databases");
252       return false;
253     }
254 
255     var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
256     for (var databaseName : values) {
257       if (databaseName.get(0).toString().trim().equals(database)) {
258         return true;
259       }
260     }
261 
262     return false;
263   }
264 
265   /**
266    * Returns the expected data type of a field or an empty string according to
267    * https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/#data-types
268    */
269   private String getExpectedDatatype(String database, String measurement, String field) {
270     String queryString = String.format("SHOW FIELD KEYS ON \"%s\" FROM %s", database, measurement);
271     QueryResult result = influxDB.query(new Query(queryString));
272     if (!InfluxUtil.isQueryResultValid(result)) {
273       Log.infof("Could not get expected datatype query string \"%s\"", queryString);
274       return "";
275     }
276 
277     var values = result.getResults().get(0).getSeries().get(0).getValues();
278     for (var value : values) {
279       if (value.get(0).equals(field)) return (String) value.get(1);
280     }
281 
282     return "";
283   }
284 }