TimeseriesService.java
package de.dlr.shepard.influxDB;
import de.dlr.shepard.exceptions.InvalidBodyException;
import io.micrometer.core.annotation.Timed;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
/**
 * Request-scoped service for timeseries operations. Every operation is
 * delegated to the injected {@link InfluxDBConnector}; CSV import and export
 * additionally go through the injected {@link CsvConverter}.
 */
@RequestScoped
public class TimeseriesService {

  private InfluxDBConnector influxConnector;
  private CsvConverter csvConverter;

  /**
   * No-argument constructor that leaves both collaborators unset.
   * NOTE(review): presumably only needed so the container can proxy this
   * bean - confirm before removing.
   */
  TimeseriesService() {}

  /**
   * Creates the service with its collaborators.
   *
   * @param influxConnector connector used for all database access
   * @param csvConverter    converter used for CSV import and export
   */
  @Inject
  public TimeseriesService(InfluxDBConnector influxConnector, CsvConverter csvConverter) {
    this.influxConnector = influxConnector;
    this.csvConverter = csvConverter;
  }

  /**
   * Creates timeseries and writes them to influxDB
   *
   * @param database The database to be queried
   * @param payload  the Timeseries with InfluxPoints to be created
   * @return An error if there was a problem, empty string if all went well
   * @throws InvalidBodyException if the sanity check of the payload's
   *                              timeseries yields a non-blank message
   */
  @Timed(value = "shepard.timeseries.service")
  public String createTimeseries(String database, TimeseriesPayload payload) {
    String sanityCheck = InfluxUtil.sanitize(payload.getTimeseries());
    if (!sanityCheck.isBlank()) {
      throw new InvalidBodyException(sanityCheck);
    }
    if (!influxConnector.databaseExist(database)) {
      return String.format("The database %s does not exist", database);
    }
    return influxConnector.saveTimeseriesPayload(database, payload);
  }

  /**
   * Queries the database for timeseries including aggregate functions.
   *
   * @param startTimeStamp The beginning of the timeseries
   * @param endTimeStamp   The end of the timeseries
   * @param database       The database to be queried
   * @param timeseries     The timeseries whose points are queried
   * @param function       The aggregate function
   * @param groupBy        The time interval measurements get grouped by
   * @param fillOption     The fill option for missing values
   * @return timeseries with influx points, exactly as returned by the connector
   */
  @Timed(value = "shepard.timeseries.service")
  public TimeseriesPayload getTimeseriesPayload(
    long startTimeStamp,
    long endTimeStamp,
    String database,
    Timeseries timeseries,
    SingleValuedUnaryFunction function,
    Long groupBy,
    FillOption fillOption
  ) {
    return influxConnector.getTimeseriesPayload(
      startTimeStamp,
      endTimeStamp,
      database,
      timeseries,
      function,
      groupBy,
      fillOption
    );
  }

  /**
   * Queries the database for many timeseries in parallel. Returns a list of
   * timeseries. If the filter sets are empty (or {@code null}), no filtering
   * takes place.
   *
   * <p>The returned list follows the order of {@code timeseriesList}; entries
   * that do not match the filters or for which the query returns {@code null}
   * are left out.
   *
   * @param start                 The beginning of the timeseries
   * @param end                   The end of the timeseries
   * @param database              The database to be queried
   * @param timeseriesList        The list of timeseries whose points are queried
   * @param function              The aggregate function
   * @param groupBy               The time interval measurements get grouped by
   * @param fillOption            The fill option for missing values
   * @param devicesFilterSet      A set of allowed devices or an empty set
   * @param locationsFilterSet    A set of allowed locations or an empty set
   * @param symbolicNameFilterSet A set of allowed symbolic names or an empty set
   * @return a new, mutable list of timeseries with influx points
   */
  @Timed(value = "shepard.timeseries.service")
  public List<TimeseriesPayload> getTimeseriesPayloadList(
    long start,
    long end,
    String database,
    List<Timeseries> timeseriesList,
    SingleValuedUnaryFunction function,
    Long groupBy,
    FillOption fillOption,
    Set<String> devicesFilterSet,
    Set<String> locationsFilterSet,
    Set<String> symbolicNameFilterSet
  ) {
    // Collecting through the stream (instead of adding to a shared queue from
    // forEach) keeps the queries parallel while preserving the input order.
    return timeseriesList
      .parallelStream()
      .filter(timeseries -> matchFilter(timeseries, devicesFilterSet, locationsFilterSet, symbolicNameFilterSet))
      .map(timeseries -> getTimeseriesPayload(start, end, database, timeseries, function, groupBy, fillOption))
      .filter(payload -> payload != null)
      .collect(Collectors.toCollection(ArrayList::new));
  }

  /**
   * Returns a list of timeseries objects that are in the given database.
   *
   * @param database the given database
   * @return a list of timeseries objects
   */
  @Timed(value = "shepard.timeseries.service")
  public List<Timeseries> getTimeseriesAvailable(String database) {
    return influxConnector.getTimeseriesAvailable(database);
  }

  /**
   * Export timeseries as CSV File. If the filter sets are empty, no filtering
   * takes place.
   *
   * @param start                 The beginning of the timeseries
   * @param end                   The end of the timeseries
   * @param database              The database to be queried
   * @param timeseriesList        The list of timeseries whose points are queried
   * @param function              The aggregate function
   * @param groupBy               The time interval measurements get grouped by
   * @param fillOption            The fill option for missing values
   * @param devicesFilterSet      A set of allowed devices or an empty set
   * @param locationsFilterSet    A set of allowed locations or an empty set
   * @param symbolicNameFilterSet A set of allowed symbolic names or an empty set
   * @return InputStream containing the CSV file
   * @throws IOException When the CSV file could not be written
   */
  @Timed(value = "shepard.timeseries.service")
  public InputStream exportTimeseriesPayload(
    long start,
    long end,
    String database,
    List<Timeseries> timeseriesList,
    SingleValuedUnaryFunction function,
    Long groupBy,
    FillOption fillOption,
    Set<String> devicesFilterSet,
    Set<String> locationsFilterSet,
    Set<String> symbolicNameFilterSet
  ) throws IOException {
    List<TimeseriesPayload> payloads = getTimeseriesPayloadList(
      start,
      end,
      database,
      timeseriesList,
      function,
      groupBy,
      fillOption,
      devicesFilterSet,
      locationsFilterSet,
      symbolicNameFilterSet
    );
    return csvConverter.convertToCsv(payloads);
  }

  /**
   * Import multiple timeseries from a CSV file
   *
   * <p>Each converted payload is written via
   * {@link #createTimeseries(String, TimeseriesPayload)} in turn. An
   * {@link InvalidBodyException} raised for one payload aborts the loop;
   * payloads handled before it have already been passed to the connector.
   *
   * @param database The database to write to
   * @param stream   The InputStream containing the CSV file
   * @return All non-blank errors joined by ", ", empty string if all went well
   * @throws IOException If the CSV file could not be read
   */
  @Timed(value = "shepard.timeseries.service")
  public String importTimeseries(String database, InputStream stream) throws IOException {
    List<String> errors = new ArrayList<>();
    var timeseriesList = csvConverter.convertToPayload(stream);
    for (var timeseries : timeseriesList) {
      var error = createTimeseries(database, timeseries);
      if (!error.isBlank()) {
        errors.add(error);
      }
    }
    return String.join(", ", errors);
  }

  /**
   * Creates a new database called by a random string
   *
   * @return String the new database
   */
  @Timed(value = "shepard.timeseries.service")
  public String createDatabase() {
    String name = UUID.randomUUID().toString();
    influxConnector.createDatabase(name);
    return name;
  }

  /**
   * Asks the connector to delete the given database.
   *
   * @param database name of the database to delete
   */
  @Timed(value = "shepard.timeseries.service")
  public void deleteDatabase(String database) {
    influxConnector.deleteDatabase(database);
  }

  /**
   * Checks a timeseries against the three optional filters. A filter that is
   * empty (or {@code null}) accepts every value.
   *
   * @param timeseries the timeseries to check
   * @param device     allowed devices
   * @param location   allowed locations
   * @param symName    allowed symbolic names
   * @return {@code true} if the timeseries passes all three filters
   */
  private boolean matchFilter(Timeseries timeseries, Set<String> device, Set<String> location, Set<String> symName) {
    return (
      isAllowed(device, timeseries.getDevice()) &&
      isAllowed(location, timeseries.getLocation()) &&
      isAllowed(symName, timeseries.getSymbolicName())
    );
  }

  /**
   * Evaluates a single filter: no restriction when the set is {@code null} or
   * empty, otherwise the value has to be contained in the set.
   */
  private static boolean isAllowed(Set<String> allowed, String value) {
    return allowed == null || allowed.isEmpty() || allowed.contains(value);
  }
}