View Javadoc
1   package de.dlr.shepard.influxDB;
2   
3   import java.util.ArrayList;
4   import java.util.Collections;
5   import java.util.HashMap;
6   import java.util.List;
7   import java.util.Map;
8   
9   import org.influxdb.BatchOptions;
10  import org.influxdb.InfluxDB;
11  import org.influxdb.InfluxDBException;
12  import org.influxdb.InfluxDBFactory;
13  import org.influxdb.dto.Pong;
14  import org.influxdb.dto.Query;
15  import org.influxdb.dto.QueryResult;
16  
17  import de.dlr.shepard.util.Constants;
18  import de.dlr.shepard.util.IConnector;
19  import de.dlr.shepard.util.PropertiesHelper;
20  import lombok.extern.slf4j.Slf4j;
21  
22  /**
23   * Connector for read and write access to the Influx timeseries database. The
24   * class represents the lowest level of data access to the Influx database. The
25   * Influx database is accessed directly by using query strings.
26   */
27  @Slf4j
28  public class InfluxDBConnector implements IConnector {
29  
30  	private InfluxDB influxDB;
31  	private static InfluxDBConnector instance = null;
32  
33  	/**
34  	 * Private constructor
35  	 */
36  	private InfluxDBConnector() {
37  	}
38  
39  	/**
40  	 * For development reasons, there should always be just one InfluxConnector
41  	 * instance.
42  	 *
43  	 * @return The one and only InfluxConnector instance.
44  	 */
45  	public static InfluxDBConnector getInstance() {
46  		if (instance == null) {
47  			instance = new InfluxDBConnector();
48  		}
49  		return instance;
50  	}
51  
52  	/**
53  	 * Establishes a connection to the Influx server by using the URL saved in the
54  	 * config.properties file returned by the DatabaseHelper. In addition the
55  	 * logging is being configured.
56  	 *
57  	 */
58  	@Override
59  	public boolean connect() {
60  		PropertiesHelper helper = new PropertiesHelper();
61  		String host = helper.getProperty("influx.host");
62  		String username = helper.getProperty("influx.username");
63  		String password = helper.getProperty("influx.password");
64  
65  		influxDB = InfluxDBFactory.connect(String.format("http://%s", host), username, password);
66  		influxDB.enableBatch(BatchOptions.DEFAULTS.exceptionHandler((failedPoints, throwable) -> log
67  				.error("Exception while writing the following points: {}, Exception: {}", failedPoints, throwable)));
68  		return true;
69  	}
70  
71  	@Override
72  	public boolean disconnect() {
73  		if (influxDB != null)
74  			influxDB.close();
75  		return true;
76  	}
77  
78  	/**
79  	 * Tests whether a connection exists.
80  	 *
81  	 * @return True if connection exists, otherwise false.
82  	 */
83  	@Override
84  	public boolean alive() {
85  		Pong response;
86  		try {
87  			response = influxDB.ping();
88  		} catch (InfluxDBException ex) {
89  			return false;
90  		}
91  		return response != null && !response.getVersion().equalsIgnoreCase("unknown");
92  	}
93  
94  	/**
95  	 * Creates a new database.
96  	 *
97  	 * @param databaseName Name of the new database to be created.
98  	 */
99  	public void createDatabase(String databaseName) {
100 		String query = String.format("CREATE DATABASE \"%s\"", databaseName);
101 		influxDB.query(new Query(query));
102 	}
103 
104 	/**
105 	 * Deletes a database.
106 	 *
107 	 * @param databaseName Name of the database to be deleted.
108 	 */
109 	public void deleteDatabase(String databaseName) {
110 		String query = String.format("DROP DATABASE \"%s\"", databaseName);
111 		influxDB.query(new Query(query));
112 	}
113 
114 	/**
115 	 * Writes all InfluxPoint objects, saved in the TimeseriesPayload object, into
116 	 * the influx database. Therefore the method uses the name of the database, the
117 	 * name of the measurement, the location, the device, the symbolic name and the
118 	 * name of the field provided by the given TimeseriesPayload object. The actual
119 	 * write operation is done by using the unix timestamp in nanoseconds and the
120 	 * value of every InfluxPoint object in the TimeseriesPayload object. All these
121 	 * variables have to be defined in the given Timeseries object for a successful
122 	 * write operation.
123 	 *
124 	 * @param database The database to store the payload in
125 	 * @param payload  Combines the required attributes in a structured way.
126 	 * @return An error if there was a problem, empty string if all went well
127 	 */
128 	public String saveTimeseriesPayload(String database, TimeseriesPayload payload) {
129 		var timeseries = payload.getTimeseries();
130 		var expectedType = getExpectedDatatype(database, timeseries.getMeasurement(), timeseries.getField());
131 		var batchPoints = InfluxUtil.createBatch(database, payload, expectedType);
132 		try {
133 			influxDB.write(batchPoints);
134 		} catch (InfluxDBException e) {
135 			log.error("InfluxdbException while writing payload {}: {}", payload.getTimeseries(), e.getMessage());
136 			return e.getMessage();
137 		}
138 		return "";
139 
140 	}
141 
142 	/**
143 	 * Returns a Timeseries object containing all the influx data points which match
144 	 * the given conditions consigned to the method. The returned InfluxPoint
145 	 * objects are in the given measurement of the specific database between the
146 	 * start and the end timestamp.
147 	 *
148 	 * @param startTimeStamp Start timestamp from which values should be returned
149 	 * @param endTimeStamp   End timestamp to which values should be returned
150 	 * @param database       Name of the database
151 	 * @param timeseries     Specifies the data to load from database
152 	 * @param function       The aggregate function
153 	 * @param groupBy        The time interval measurements get grouped by
154 	 * @param fillOption     The fill option for missing values
155 	 * @return A Timeseries object containing a list of all InfluxPoints in the
156 	 *         given measurement of the specific database between the two given
157 	 *         timestamps matching the "device"-tag, the "location"-tag and the
158 	 *         "symbolic_name"-tag.
159 	 */
160 	public TimeseriesPayload getTimeseriesPayload(long startTimeStamp, long endTimeStamp, String database,
161 			Timeseries timeseries, SingleValuedUnaryFunction function, Long groupBy, FillOption fillOption) {
162 		Query query = InfluxUtil.buildQuery(startTimeStamp, endTimeStamp, database, timeseries, function, groupBy,
163 				fillOption);
164 		log.debug("Influx Query: {}", query.getCommand());
165 		QueryResult queryResult;
166 
167 		try {
168 			queryResult = influxDB.query(query);
169 		} catch (InfluxDBException e) {
170 			queryResult = null;
171 			log.error("Could not parse query: {}", query.getCommand());
172 		}
173 		if (InfluxUtil.isQueryResultValid(queryResult)) {
174 			return InfluxUtil.extractPayload(queryResult, timeseries);
175 		}
176 		return new TimeseriesPayload(timeseries, Collections.emptyList());
177 	}
178 
179 	/**
180 	 * Returns a list of timeseries objects that are in the given database.
181 	 *
182 	 * @param database the given database
183 	 * @return a list of timeseries objects
184 	 */
185 	public List<Timeseries> getTimeseriesAvailable(String database) {
186 		Query query = new Query(String.format("SHOW SERIES ON \"%s\"", database));
187 		QueryResult queryResult = influxDB.query(query);
188 		if (!InfluxUtil.isQueryResultValid(queryResult)) {
189 			log.warn("There was an error while querying the Influxdb for available timeseries");
190 			return Collections.emptyList();
191 		}
192 		var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
193 		var fields = getFields(database);
194 		var result = new ArrayList<Timeseries>(values.size());
195 		for (var value : values) {
196 			var series = ((String) value.get(0)).split(",");
197 			// The first entry is the measurement
198 			var meas = series[0];
199 			var tags = extractTags(series);
200 			var dev = tags.getOrDefault(Constants.DEVICE, "");
201 			var loc = tags.getOrDefault(Constants.LOCATION, "");
202 			var symName = tags.getOrDefault(Constants.SYMBOLICNAME, "");
203 			for (var field : fields.getOrDefault(meas, Collections.emptyList())) {
204 				result.add(new Timeseries(meas, dev, loc, symName, field));
205 			}
206 		}
207 		return result;
208 	}
209 
210 	private Map<String, String> extractTags(String[] series) {
211 		var result = new HashMap<String, String>();
212 		for (var tagString : series) {
213 			var tags = tagString.split("=", 2);
214 
215 			// Skip entries with less than 2 tuples (most likely the measurement)
216 			if (tags.length < 2)
217 				continue;
218 
219 			result.put(tags[0], tags[1]);
220 		}
221 		return result;
222 	}
223 
224 	private Map<String, List<String>> getFields(String database) {
225 		Query query = new Query(String.format("SHOW FIELD KEYS ON \"%s\"", database));
226 		QueryResult queryResult = influxDB.query(query);
227 		if (!InfluxUtil.isQueryResultValid(queryResult)) {
228 			log.warn("There was an error while querying the Influxdb for available fields");
229 			return Collections.emptyMap();
230 		}
231 		var series = queryResult.getResults().get(0).getSeries();
232 		var result = new HashMap<String, List<String>>();
233 		for (var s : series) {
234 			var fields = new ArrayList<String>();
235 			for (var value : s.getValues()) {
236 				fields.add((String) value.get(0));
237 			}
238 			result.put(s.getName(), fields);
239 		}
240 		return result;
241 	}
242 
243 	public boolean databaseExist(String database) {
244 		QueryResult queryResult = influxDB.query(new Query("SHOW DATABASES"));
245 		if (!InfluxUtil.isQueryResultValid(queryResult)) {
246 			log.warn("There was an error while querying the Influxdb for databases");
247 			return false;
248 		}
249 
250 		var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
251 		for (var databaseName : values) {
252 			if (databaseName.get(0).toString().trim().equals(database)) {
253 				return true;
254 			}
255 		}
256 
257 		return false;
258 	}
259 
260 	/**
261 	 * Returns the expected data type of a field or an empty string according to
262 	 * https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/#data-types
263 	 */
264 	private String getExpectedDatatype(String database, String measurement, String field) {
265 		String queryString = String.format("SHOW FIELD KEYS ON \"%s\" FROM %s", database, measurement);
266 		QueryResult result = influxDB.query(new Query(queryString));
267 		if (!InfluxUtil.isQueryResultValid(result)) {
268 			log.info("Could not get expected datatype query string \"{}\"", queryString);
269 			return "";
270 		}
271 
272 		var values = result.getResults().get(0).getSeries().get(0).getValues();
273 		for (var value : values) {
274 			if (value.get(0).equals(field))
275 				return (String) value.get(1);
276 		}
277 
278 		return "";
279 	}
280 }