View Javadoc
1   package de.dlr.shepard.influxDB;
2   
3   import de.dlr.shepard.exceptions.InvalidBodyException;
4   import io.micrometer.core.annotation.Timed;
5   import jakarta.enterprise.context.RequestScoped;
6   import jakarta.inject.Inject;
7   import java.io.IOException;
8   import java.io.InputStream;
9   import java.util.ArrayList;
10  import java.util.List;
11  import java.util.Set;
12  import java.util.UUID;
13  import java.util.concurrent.ConcurrentLinkedQueue;
14  
15  @RequestScoped
16  public class TimeseriesService {
17  
18    private InfluxDBConnector influxConnector;
19    private CsvConverter csvConverter;
20  
21    TimeseriesService() {}
22  
23    @Inject
24    public TimeseriesService(InfluxDBConnector influxConnector, CsvConverter csvConverter) {
25      this.influxConnector = influxConnector;
26      this.csvConverter = csvConverter;
27    }
28  
29    /**
30     * Creates timeseries and writes them to influxDB
31     *
32     * @param database The database to be queried
33     * @param payload  the Timeseries with InfluxPoints to be created
34     * @return An error if there was a problem, empty string if all went well
35     */
36    @Timed(value = "shepard.timeseries.service")
37    public String createTimeseries(String database, TimeseriesPayload payload) {
38      String sanityCheck = InfluxUtil.sanitize(payload.getTimeseries());
39      if (!sanityCheck.isBlank()) throw new InvalidBodyException(sanityCheck);
40      if (!influxConnector.databaseExist(database)) {
41        return String.format("The database %s does not exist", database);
42      }
43      return influxConnector.saveTimeseriesPayload(database, payload);
44    }
45  
46    /**
47     * Queries the database for timeseries including aggregate functions.
48     *
49     * @param startTimeStamp The beginning of the timeseries
50     * @param endTimeStamp   The end of the timeseries
51     * @param database       The database to be queried
52     * @param timeseries     The timeseries whose points are queried
53     * @param function       The aggregate function
54     * @param groupBy        The time interval measurements get grouped by
55     * @param fillOption     The fill option for missing values
56     * @return timeseries with influx points
57     */
58    @Timed(value = "shepard.timeseries.service")
59    public TimeseriesPayload getTimeseriesPayload(
60      long startTimeStamp,
61      long endTimeStamp,
62      String database,
63      Timeseries timeseries,
64      SingleValuedUnaryFunction function,
65      Long groupBy,
66      FillOption fillOption
67    ) {
68      TimeseriesPayload payload = influxConnector.getTimeseriesPayload(
69        startTimeStamp,
70        endTimeStamp,
71        database,
72        timeseries,
73        function,
74        groupBy,
75        fillOption
76      );
77      return payload;
78    }
79  
80    /**
81     * Queries the database for many timeseries in parallel. Returns a list of
82     * timeseries. If the filter sets are empty, no filtering takes place.
83     *
84     * @param start                 The beginning of the timeseries
85     * @param end                   The end of the timeseries
86     * @param database              The database to be queried
87     * @param timeseriesList        The list of timeseries whose points are queried
88     * @param function              The aggregate function
89     * @param groupBy               The time interval measurements get grouped by
90     * @param fillOption            The fill option for missing values
91     * @param devicesFilterSet      A set of allowed devices or an empty set
92     * @param locationsFilterSet    A set of allowed locations or an empty set
93     * @param symbolicNameFilterSet A set of allowed symbolic names or an empty set
94     * @return a list of timeseries with influx points
95     */
96    @Timed(value = "shepard.timeseries.service")
97    public List<TimeseriesPayload> getTimeseriesPayloadList(
98      long start,
99      long end,
100     String database,
101     List<Timeseries> timeseriesList,
102     SingleValuedUnaryFunction function,
103     Long groupBy,
104     FillOption fillOption,
105     Set<String> devicesFilterSet,
106     Set<String> locationsFilterSet,
107     Set<String> symbolicNameFilterSet
108   ) {
109     var timeseriesPayloadQueue = new ConcurrentLinkedQueue<TimeseriesPayload>();
110     timeseriesList
111       .parallelStream()
112       .forEach(timeseries -> {
113         TimeseriesPayload payload = null;
114         if (matchFilter(timeseries, devicesFilterSet, locationsFilterSet, symbolicNameFilterSet)) {
115           payload = getTimeseriesPayload(start, end, database, timeseries, function, groupBy, fillOption);
116         }
117         if (payload != null) {
118           timeseriesPayloadQueue.add(payload);
119         }
120       });
121     return new ArrayList<>(timeseriesPayloadQueue);
122   }
123 
124   /**
125    * Returns a list of timeseries objects that are in the given database.
126    *
127    * @param database the given database
128    * @return a list of timeseries objects
129    */
130   @Timed(value = "shepard.timeseries.service")
131   public List<Timeseries> getTimeseriesAvailable(String database) {
132     return influxConnector.getTimeseriesAvailable(database);
133   }
134 
135   /**
136    * Export timeseries as CSV File. If the filter sets are empty, no filtering
137    * takes place.
138    *
139    * @param start                 The beginning of the timeseries
140    * @param end                   The end of the timeseries
141    * @param database              The database to be queried
142    * @param timeseriesList        The list of timeseries whose points are queried
143    * @param function              The aggregate function
144    * @param groupBy               The time interval measurements get grouped by
145    * @param fillOption            The fill option for missing values
146    * @param devicesFilterSet      A set of allowed devices or an empty set
147    * @param locationsFilterSet    A set of allowed locations or an empty set
148    * @param symbolicNameFilterSet A set of allowed symbolic names or an empty set
149    * @return InputStream containing the CSV file
150    * @throws IOException When the CSV file could not be written
151    */
152   @Timed(value = "shepard.timeseries.service")
153   public InputStream exportTimeseriesPayload(
154     long start,
155     long end,
156     String database,
157     List<Timeseries> timeseriesList,
158     SingleValuedUnaryFunction function,
159     Long groupBy,
160     FillOption fillOption,
161     Set<String> devicesFilterSet,
162     Set<String> locationsFilterSet,
163     Set<String> symbolicNameFilterSet
164   ) throws IOException {
165     var payload = getTimeseriesPayloadList(
166       start,
167       end,
168       database,
169       timeseriesList,
170       function,
171       groupBy,
172       fillOption,
173       devicesFilterSet,
174       locationsFilterSet,
175       symbolicNameFilterSet
176     );
177     var stream = csvConverter.convertToCsv(payload);
178     return stream;
179   }
180 
181   /**
182    * Import multiple timeseries from a CSV file
183    *
184    * @param database The database to write to
185    * @param stream   The InputStream containing the CSV file
186    * @return An error if there was a problem, empty string if all went well
187    * @throws IOException If the CSV file could not be read
188    */
189   @Timed(value = "shepard.timeseries.service")
190   public String importTimeseries(String database, InputStream stream) throws IOException {
191     List<String> errors = new ArrayList<>();
192     var timeseriesList = csvConverter.convertToPayload(stream);
193     for (var timeseries : timeseriesList) {
194       var error = createTimeseries(database, timeseries);
195       if (!error.isBlank()) {
196         errors.add(error);
197       }
198     }
199     return String.join(", ", errors);
200   }
201 
202   /**
203    * Creates a new database called by a random string
204    *
205    * @return String the new database
206    */
207   @Timed(value = "shepard.timeseries.service")
208   public String createDatabase() {
209     String name = UUID.randomUUID().toString();
210     influxConnector.createDatabase(name);
211     return name;
212   }
213 
214   @Timed(value = "shepard.timeseries.service")
215   public void deleteDatabase(String database) {
216     influxConnector.deleteDatabase(database);
217   }
218 
219   private boolean matchFilter(Timeseries timeseries, Set<String> device, Set<String> location, Set<String> symName) {
220     var deviceMatches = true;
221     var locatioMatches = true;
222     var symbolicNameMatches = true;
223     if (!device.isEmpty()) {
224       deviceMatches = device.contains(timeseries.getDevice());
225     }
226     if (!location.isEmpty()) {
227       locatioMatches = location.contains(timeseries.getLocation());
228     }
229     if (!symName.isEmpty()) {
230       symbolicNameMatches = symName.contains(timeseries.getSymbolicName());
231     }
232     return deviceMatches && locatioMatches && symbolicNameMatches;
233   }
234 }