View Javadoc
1   package de.dlr.shepard.influxDB;
2   
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

import de.dlr.shepard.exceptions.InvalidBodyException;
12  
13  public class TimeseriesService {
14  
15  	private InfluxDBConnector influxConnector = InfluxDBConnector.getInstance();
16  	private CsvConverter csvConverter = new CsvConverter();
17  
18  	/**
19  	 * Creates timeseries and writes them to influxDB
20  	 *
21  	 * @param database The database to be queried
22  	 * @param payload  the Timeseries with InfluxPoints to be created
23  	 * @return An error if there was a problem, empty string if all went well
24  	 */
25  	public String createTimeseries(String database, TimeseriesPayload payload) {
26  		String sanityCheck = InfluxUtil.sanitize(payload.getTimeseries());
27  		if (!sanityCheck.isBlank())
28  			throw new InvalidBodyException(sanityCheck);
29  		if (!influxConnector.databaseExist(database)) {
30  			return String.format("The database %s does not exist", database);
31  		}
32  		return influxConnector.saveTimeseriesPayload(database, payload);
33  	}
34  
35  	/**
36  	 * Queries the database for timeseries including aggregate functions.
37  	 *
38  	 * @param startTimeStamp The beginning of the timeseries
39  	 * @param endTimeStamp   The end of the timeseries
40  	 * @param database       The database to be queried
41  	 * @param timeseries     The timeseries whose points are queried
42  	 * @param function       The aggregate function
43  	 * @param groupBy        The time interval measurements get grouped by
44  	 * @param fillOption     The fill option for missing values
45  	 * @return timeseries with influx points
46  	 */
47  	public TimeseriesPayload getTimeseriesPayload(long startTimeStamp, long endTimeStamp, String database,
48  			Timeseries timeseries, SingleValuedUnaryFunction function, Long groupBy, FillOption fillOption) {
49  		TimeseriesPayload payload = influxConnector.getTimeseriesPayload(startTimeStamp, endTimeStamp, database,
50  				timeseries, function, groupBy, fillOption);
51  		return payload;
52  	}
53  
54  	/**
55  	 * Queries the database for many timeseries in parallel. Returns a list of
56  	 * timeseries. If the filter sets are empty, no filtering takes place.
57  	 *
58  	 * @param start                 The beginning of the timeseries
59  	 * @param end                   The end of the timeseries
60  	 * @param database              The database to be queried
61  	 * @param timeseriesList        The list of timeseries whose points are queried
62  	 * @param function              The aggregate function
63  	 * @param groupBy               The time interval measurements get grouped by
64  	 * @param fillOption            The fill option for missing values
65  	 * @param devicesFilterSet      A set of allowed devices or an empty set
66  	 * @param locationsFilterSet    A set of allowed locations or an empty set
67  	 * @param symbolicNameFilterSet A set of allowed symbolic names or an empty set
68  	 * @return a list of timeseries with influx points
69  	 */
70  	public List<TimeseriesPayload> getTimeseriesPayloadList(long start, long end, String database,
71  			List<Timeseries> timeseriesList, SingleValuedUnaryFunction function, Long groupBy, FillOption fillOption,
72  			Set<String> devicesFilterSet, Set<String> locationsFilterSet, Set<String> symbolicNameFilterSet) {
73  		var timeseriesPayloadQueue = new ConcurrentLinkedQueue<TimeseriesPayload>();
74  		timeseriesList.parallelStream().forEach(timeseries -> {
75  			TimeseriesPayload payload = null;
76  			if (matchFilter(timeseries, devicesFilterSet, locationsFilterSet, symbolicNameFilterSet)) {
77  				payload = getTimeseriesPayload(start, end, database, timeseries, function, groupBy, fillOption);
78  			}
79  			if (payload != null) {
80  				timeseriesPayloadQueue.add(payload);
81  			}
82  
83  		});
84  		return new ArrayList<>(timeseriesPayloadQueue);
85  	}
86  
87  	/**
88  	 * Returns a list of timeseries objects that are in the given database.
89  	 *
90  	 * @param database the given database
91  	 * @return a list of timeseries objects
92  	 */
93  	public List<Timeseries> getTimeseriesAvailable(String database) {
94  		return influxConnector.getTimeseriesAvailable(database);
95  	}
96  
97  	/**
98  	 * Export timeseries as CSV File. If the filter sets are empty, no filtering
99  	 * takes place.
100 	 *
101 	 * @param start                 The beginning of the timeseries
102 	 * @param end                   The end of the timeseries
103 	 * @param database              The database to be queried
104 	 * @param timeseriesList        The list of timeseries whose points are queried
105 	 * @param function              The aggregate function
106 	 * @param groupBy               The time interval measurements get grouped by
107 	 * @param fillOption            The fill option for missing values
108 	 * @param devicesFilterSet      A set of allowed devices or an empty set
109 	 * @param locationsFilterSet    A set of allowed locations or an empty set
110 	 * @param symbolicNameFilterSet A set of allowed symbolic names or an empty set
111 	 * @return InputStream containing the CSV file
112 	 * @throws IOException When the CSV file could not be written
113 	 */
114 	public InputStream exportTimeseriesPayload(long start, long end, String database, List<Timeseries> timeseriesList,
115 			SingleValuedUnaryFunction function, Long groupBy, FillOption fillOption, Set<String> devicesFilterSet,
116 			Set<String> locationsFilterSet, Set<String> symbolicNameFilterSet) throws IOException {
117 		var payload = getTimeseriesPayloadList(start, end, database, timeseriesList, function, groupBy, fillOption,
118 				devicesFilterSet, locationsFilterSet, symbolicNameFilterSet);
119 		var stream = csvConverter.convertToCsv(payload);
120 		return stream;
121 	}
122 
123 	/**
124 	 * Import multiple timeseries from a CSV file
125 	 *
126 	 * @param database The database to write to
127 	 * @param stream   The InputStream containing the CSV file
128 	 * @return An error if there was a problem, empty string if all went well
129 	 * @throws IOException If the CSV file could not be read
130 	 */
131 	public String importTimeseries(String database, InputStream stream) throws IOException {
132 		List<String> errors = new ArrayList<>();
133 		var timeseriesList = csvConverter.convertToPayload(stream);
134 		for (var timeseries : timeseriesList) {
135 			var error = createTimeseries(database, timeseries);
136 			if (!error.isBlank()) {
137 				errors.add(error);
138 			}
139 		}
140 		return String.join(", ", errors);
141 	}
142 
143 	/**
144 	 * Creates a new database called by a random string
145 	 *
146 	 * @return String the new database
147 	 */
148 	public String createDatabase() {
149 		String name = UUID.randomUUID().toString();
150 		influxConnector.createDatabase(name);
151 		return name;
152 	}
153 
154 	public void deleteDatabase(String database) {
155 		influxConnector.deleteDatabase(database);
156 	}
157 
158 	private boolean matchFilter(Timeseries timeseries, Set<String> device, Set<String> location, Set<String> symName) {
159 		var deviceMatches = true;
160 		var locatioMatches = true;
161 		var symbolicNameMatches = true;
162 		if (!device.isEmpty()) {
163 			deviceMatches = device.contains(timeseries.getDevice());
164 		}
165 		if (!location.isEmpty()) {
166 			locatioMatches = location.contains(timeseries.getLocation());
167 		}
168 		if (!symName.isEmpty()) {
169 			symbolicNameMatches = symName.contains(timeseries.getSymbolicName());
170 		}
171 		return deviceMatches && locatioMatches && symbolicNameMatches;
172 	}
173 
174 }