View Javadoc
1   package de.dlr.shepard.data.timeseries.migration.influxtimeseries;
2   
3   import de.dlr.shepard.common.util.Constants;
4   import de.dlr.shepard.common.util.IConnector;
5   import io.quarkus.logging.Log;
6   import jakarta.enterprise.context.ApplicationScoped;
7   import jakarta.inject.Inject;
8   import java.util.ArrayList;
9   import java.util.Collections;
10  import java.util.HashMap;
11  import java.util.List;
12  import java.util.Map;
13  import java.util.concurrent.TimeUnit;
14  import okhttp3.OkHttpClient;
15  import org.eclipse.microprofile.config.ConfigProvider;
16  import org.influxdb.BatchOptions;
17  import org.influxdb.InfluxDB;
18  import org.influxdb.InfluxDBException;
19  import org.influxdb.InfluxDBFactory;
20  import org.influxdb.dto.Pong;
21  import org.influxdb.dto.Query;
22  import org.influxdb.dto.QueryResult;
23  
24  /**
25   * Connector for read and write access to the Influx timeseries database. The
26   * class represents the lowest level of data access to the Influx database. The
27   * Influx database is accessed directly by using query strings.
28   */
29  @ApplicationScoped
30  public class InfluxDBConnector implements IConnector {
31  
32    private InfluxDB influxDB;
33  
34    /* Use this constructor for testing purpose only.
35     *  As soon as influxDB is provided via dependency injection, this constructor can be used for that purpose. */
36    InfluxDBConnector(InfluxDB influxDb) {
37      this.influxDB = influxDb;
38    }
39  
40    @Inject
41    InfluxDBConnector() {}
42  
43    /**
44     * Establishes a connection to the Influx server by using the URL saved in the
45     * config.properties file returned by the DatabaseHelper. In addition the
46     * logging is being configured.
47     *
48     */
49    @Override
50    public boolean connect() {
51      if (this.influxDB == null) {
52        String host = ConfigProvider.getConfig().getValue("influx.host", String.class);
53        String username = ConfigProvider.getConfig().getValue("influx.username", String.class);
54        String password = ConfigProvider.getConfig().getValue("influx.password", String.class);
55  
56        OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
57          .connectTimeout(60, TimeUnit.SECONDS)
58          .readTimeout(600, TimeUnit.SECONDS);
59  
60        influxDB = InfluxDBFactory.connect(String.format("http://%s", host), username, password, okHttpClientBuilder);
61      }
62  
63      influxDB.enableBatch(
64        BatchOptions.DEFAULTS.exceptionHandler((failedPoints, throwable) ->
65          Log.errorf("Exception while writing the following points: %s, Exception: %s", failedPoints, throwable)
66        )
67      );
68      return true;
69    }
70  
71    @Override
72    public boolean disconnect() {
73      if (influxDB != null) influxDB.close();
74      return true;
75    }
76  
77    /**
78     * Tests whether a connection exists.
79     *
80     * @return True if connection exists, otherwise false.
81     */
82    @Override
83    public boolean alive() {
84      Pong response;
85      try {
86        response = influxDB.ping();
87      } catch (InfluxDBException ex) {
88        return false;
89      }
90      return response != null && !response.getVersion().equalsIgnoreCase("unknown");
91    }
92  
93    /**
94     * Creates a new database.
95     *
96     * @param databaseName Name of the new database to be created.
97     */
98    public void createDatabase(String databaseName) {
99      String query = String.format("CREATE DATABASE \"%s\"", databaseName);
100     influxDB.query(new Query(query));
101   }
102 
103   /**
104    * Deletes a database.
105    *
106    * @param databaseName Name of the database to be deleted.
107    */
108   public void deleteDatabase(String databaseName) {
109     String query = String.format("DROP DATABASE \"%s\"", databaseName);
110     influxDB.query(new Query(query));
111   }
112 
113   /**
114    * Writes all InfluxPoint objects, saved in the TimeseriesPayload object, into
115    * the influx database. Therefore the method uses the name of the database, the
116    * name of the measurement, the location, the device, the symbolic name and the
117    * name of the field provided by the given TimeseriesPayload object. The actual
118    * write operation is done by using the unix timestamp in nanoseconds and the
119    * value of every InfluxPoint object in the TimeseriesPayload object. All these
120    * variables have to be defined in the given Timeseries object for a successful
121    * write operation.
122    *
123    * @param database The database to store the payload in
124    * @param payload  Combines the required attributes in a structured way.
125    * @return An error if there was a problem, empty string if all went well
126    */
127   public String saveTimeseriesPayload(String database, InfluxTimeseriesPayload payload) {
128     var timeseries = payload.getTimeseries();
129     var expectedType = getExpectedDatatype(database, timeseries.getMeasurement(), timeseries.getField());
130     var batchPoints = InfluxUtil.createBatch(database, payload, expectedType);
131     try {
132       influxDB.write(batchPoints);
133     } catch (InfluxDBException e) {
134       Log.errorf("InfluxdbException while writing payload %s: %s", payload.getTimeseries(), e.getMessage());
135       return e.getMessage();
136     }
137     return "";
138   }
139 
140   /**
141    * Returns a Timeseries object containing all the influx data points which match
142    * the given conditions consigned to the method. The returned InfluxPoint
143    * objects are in the given measurement of the specific database between the
144    * start and the end timestamp.
145    *
146    * @param startTimeStamp Start timestamp from which values should be returned
147    * @param endTimeStamp   End timestamp to which values should be returned
148    * @param database       Name of the database
149    * @param timeseries     Specifies the data to load from database
150    * @param function       The aggregate function
151    * @param groupBy        The time interval measurements get grouped by
152    * @param fillOption     The fill option for missing values
153    * @return A Timeseries object containing a list of all InfluxPoints in the
154    *         given measurement of the specific database between the two given
155    *         timestamps matching the "device"-tag, the "location"-tag and the
156    *         "symbolic_name"-tag.
157    */
158   public InfluxTimeseriesPayload getTimeseriesPayload(
159     long startTimeStamp,
160     long endTimeStamp,
161     String database,
162     InfluxTimeseries timeseries,
163     InfluxSingleValuedUnaryFunction function,
164     Long groupBy,
165     InfluxFillOption fillOption
166   ) {
167     Query query = InfluxUtil.buildQuery(
168       startTimeStamp,
169       endTimeStamp,
170       database,
171       timeseries,
172       function,
173       groupBy,
174       fillOption
175     );
176     Log.debugf("Influx Query: %s", query.getCommand());
177     QueryResult queryResult;
178 
179     queryResult = influxDB.query(query);
180     if (InfluxUtil.isQueryResultValid(queryResult)) {
181       return InfluxUtil.extractPayload(queryResult, timeseries);
182     }
183     return new InfluxTimeseriesPayload(timeseries, Collections.emptyList());
184   }
185 
186   /**
187    * Returns a list of timeseries objects that are in the given database.
188    *
189    * @param database the given database
190    * @return a list of timeseries objects
191    */
192   public List<InfluxTimeseries> getTimeseriesAvailable(String database) {
193     Query query = new Query(String.format("SHOW SERIES ON \"%s\"", database));
194     QueryResult queryResult = influxDB.query(query);
195     if (!InfluxUtil.isQueryResultValid(queryResult)) {
196       Log.warn("There was an error while querying the Influxdb for available timeseries");
197       return Collections.emptyList();
198     }
199     var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
200     var fields = getFields(database);
201     var result = new ArrayList<InfluxTimeseries>(values.size());
202     for (var value : values) {
203       var series = ((String) value.get(0)).split(",");
204       // The first entry is the measurement
205       var meas = series[0];
206       var tags = extractTags(series);
207       var dev = tags.getOrDefault(Constants.DEVICE, "");
208       var loc = tags.getOrDefault(Constants.LOCATION, "");
209       var symName = tags.getOrDefault(Constants.SYMBOLICNAME, "");
210       for (var field : fields.getOrDefault(meas, Collections.emptyList())) {
211         result.add(new InfluxTimeseries(meas, dev, loc, symName, field));
212       }
213     }
214     return result;
215   }
216 
217   private Map<String, String> extractTags(String[] series) {
218     var result = new HashMap<String, String>();
219     for (var tagString : series) {
220       var tags = tagString.split("=", 2);
221 
222       // Skip entries with less than 2 tuples (most likely the measurement)
223       if (tags.length < 2) continue;
224 
225       result.put(tags[0], tags[1]);
226     }
227     return result;
228   }
229 
230   private Map<String, List<String>> getFields(String database) {
231     Query query = new Query(String.format("SHOW FIELD KEYS ON \"%s\"", database));
232     QueryResult queryResult = influxDB.query(query);
233     if (!InfluxUtil.isQueryResultValid(queryResult)) {
234       Log.warn("There was an error while querying the Influxdb for available fields");
235       return Collections.emptyMap();
236     }
237     var series = queryResult.getResults().get(0).getSeries();
238     var result = new HashMap<String, List<String>>();
239     for (var s : series) {
240       var fields = new ArrayList<String>();
241       for (var value : s.getValues()) {
242         fields.add((String) value.get(0));
243       }
244       result.put(s.getName(), fields);
245     }
246     return result;
247   }
248 
249   public boolean databaseExist(String database) {
250     QueryResult queryResult = influxDB.query(new Query("SHOW DATABASES"));
251     if (!InfluxUtil.isQueryResultValid(queryResult)) {
252       Log.warn("There was an error while querying the Influxdb for databases");
253       return false;
254     }
255 
256     var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
257     for (var databaseName : values) {
258       if (databaseName.get(0).toString().trim().equals(database)) {
259         return true;
260       }
261     }
262 
263     return false;
264   }
265 
266   public InfluxTimeseriesDataType getTimeseriesDataType(String database, String measurement, String field)
267     throws Exception {
268     var dataTypeAsString = getExpectedDatatype(database, measurement, field);
269     try {
270       return InfluxTimeseriesDataType.valueOf(dataTypeAsString.toUpperCase());
271     } catch (IllegalArgumentException ex) {
272       var message = String.format("Timeseries in influxDB has unknown data type: %s", dataTypeAsString);
273       throw new Exception(message, ex);
274     }
275   }
276 
277   /**
278    * Returns the expected data type of a field or an empty string according to
279    * https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/#data-types
280    */
281   private String getExpectedDatatype(String database, String measurement, String field) {
282     String queryString = String.format("SHOW FIELD KEYS ON \"%s\" FROM \"%s\"", database, measurement);
283     QueryResult result = influxDB.query(new Query(queryString));
284     if (!InfluxUtil.isQueryResultValid(result)) {
285       Log.infof("Could not get expected datatype query string \"%s\"", queryString);
286       return "";
287     }
288 
289     var values = result.getResults().get(0).getSeries().get(0).getValues();
290     for (var value : values) {
291       if (value.get(0).equals(field)) return (String) value.get(1);
292     }
293 
294     return "";
295   }
296 }