View Javadoc
1   package de.dlr.shepard.data.timeseries.migration.influxtimeseries;
2   
3   import de.dlr.shepard.common.exceptions.InvalidBodyException;
4   import io.micrometer.core.annotation.Timed;
5   import jakarta.enterprise.context.RequestScoped;
6   import jakarta.inject.Inject;
7   import java.io.IOException;
8   import java.io.InputStream;
9   import java.util.ArrayList;
10  import java.util.List;
11  import java.util.Set;
12  import java.util.UUID;
13  import java.util.concurrent.ConcurrentLinkedQueue;
14  
15  @RequestScoped
16  public class InfluxTimeseriesService {
17  
18    @Inject
19    InfluxDBConnector influxConnector;
20  
21    @Inject
22    InfluxCsvConverter csvConverter;
23  
24    /**
25     * Creates timeseries and writes them to influxDB
26     *
27     * @param database The database to be queried
28     * @param payload  the Timeseries with InfluxPoints to be created
29     * @return An error if there was a problem, empty string if all went well
30     */
31    @Timed(value = "shepard.timeseries.service")
32    public String createTimeseries(String database, InfluxTimeseriesPayload payload) {
33      String sanityCheck = InfluxUtil.sanitize(payload.getTimeseries());
34      if (!sanityCheck.isBlank()) throw new InvalidBodyException(sanityCheck);
35      if (!influxConnector.databaseExist(database)) {
36        return String.format("The database %s does not exist", database);
37      }
38      return influxConnector.saveTimeseriesPayload(database, payload);
39    }
40  
41    /**
42     * Queries the database for timeseries including aggregate functions.
43     *
44     * @param startTimeStamp The beginning of the timeseries
45     * @param endTimeStamp   The end of the timeseries
46     * @param database       The database to be queried
47     * @param timeseries     The timeseries whose points are queried
48     * @param function       The aggregate function
49     * @param groupBy        The time interval measurements get grouped by
50     * @param fillOption     The fill option for missing values
51     * @return timeseries with influx points
52     */
53    @Timed(value = "shepard.timeseries.service")
54    public InfluxTimeseriesPayload getTimeseriesPayload(
55      long startTimeStamp,
56      long endTimeStamp,
57      String database,
58      InfluxTimeseries timeseries,
59      InfluxSingleValuedUnaryFunction function,
60      Long groupBy,
61      InfluxFillOption fillOption
62    ) {
63      InfluxTimeseriesPayload payload = influxConnector.getTimeseriesPayload(
64        startTimeStamp,
65        endTimeStamp,
66        database,
67        timeseries,
68        function,
69        groupBy,
70        fillOption
71      );
72      return payload;
73    }
74  
75    /**
76     * Queries the database for many timeseries in parallel. Returns a list of
77     * timeseries. If the filter sets are empty, no filtering takes place.
78     *
79     * @param start                 The beginning of the timeseries
80     * @param end                   The end of the timeseries
81     * @param database              The database to be queried
82     * @param timeseriesList        The list of timeseries whose points are queried
83     * @param function              The aggregate function
84     * @param groupBy               The time interval measurements get grouped by
85     * @param fillOption            The fill option for missing values
86     * @param devicesFilterSet      A set of allowed devices or an empty set
87     * @param locationsFilterSet    A set of allowed locations or an empty set
88     * @param symbolicNameFilterSet A set of allowed symbolic names or an empty set
89     * @return a list of timeseries with influx points
90     */
91    @Timed(value = "shepard.timeseries.service")
92    public List<InfluxTimeseriesPayload> getTimeseriesPayloadList(
93      long start,
94      long end,
95      String database,
96      List<InfluxTimeseries> timeseriesList,
97      InfluxSingleValuedUnaryFunction function,
98      Long groupBy,
99      InfluxFillOption fillOption,
100     Set<String> devicesFilterSet,
101     Set<String> locationsFilterSet,
102     Set<String> symbolicNameFilterSet
103   ) {
104     var timeseriesPayloadQueue = new ConcurrentLinkedQueue<InfluxTimeseriesPayload>();
105     timeseriesList
106       .parallelStream()
107       .forEach(timeseries -> {
108         InfluxTimeseriesPayload payload = null;
109         if (matchFilter(timeseries, devicesFilterSet, locationsFilterSet, symbolicNameFilterSet)) {
110           payload = getTimeseriesPayload(start, end, database, timeseries, function, groupBy, fillOption);
111         }
112         if (payload != null) {
113           timeseriesPayloadQueue.add(payload);
114         }
115       });
116     return new ArrayList<>(timeseriesPayloadQueue);
117   }
118 
119   /**
120    * Returns a list of timeseries objects that are in the given database.
121    *
122    * @param database the given database
123    * @return a list of timeseries objects
124    */
125   @Timed(value = "shepard.timeseries.service")
126   public List<InfluxTimeseries> getTimeseriesAvailable(String database) {
127     return influxConnector.getTimeseriesAvailable(database);
128   }
129 
130   /**
131    * Export timeseries as CSV File. If the filter sets are empty, no filtering
132    * takes place.
133    *
134    * @param start                 The beginning of the timeseries
135    * @param end                   The end of the timeseries
136    * @param database              The database to be queried
137    * @param timeseriesList        The list of timeseries whose points are queried
138    * @param function              The aggregate function
139    * @param groupBy               The time interval measurements get grouped by
140    * @param fillOption            The fill option for missing values
141    * @param devicesFilterSet      A set of allowed devices or an empty set
142    * @param locationsFilterSet    A set of allowed locations or an empty set
143    * @param symbolicNameFilterSet A set of allowed symbolic names or an empty set
144    * @return InputStream containing the CSV file
145    * @throws IOException When the CSV file could not be written
146    */
147   @Timed(value = "shepard.timeseries.service")
148   public InputStream exportTimeseriesPayload(
149     long start,
150     long end,
151     String database,
152     List<InfluxTimeseries> timeseriesList,
153     InfluxSingleValuedUnaryFunction function,
154     Long groupBy,
155     InfluxFillOption fillOption,
156     Set<String> devicesFilterSet,
157     Set<String> locationsFilterSet,
158     Set<String> symbolicNameFilterSet
159   ) throws IOException {
160     var payload = getTimeseriesPayloadList(
161       start,
162       end,
163       database,
164       timeseriesList,
165       function,
166       groupBy,
167       fillOption,
168       devicesFilterSet,
169       locationsFilterSet,
170       symbolicNameFilterSet
171     );
172     var stream = csvConverter.convertToCsv(payload);
173     return stream;
174   }
175 
176   /**
177    * Import multiple timeseries from a CSV file
178    *
179    * @param database The database to write to
180    * @param stream   The InputStream containing the CSV file
181    * @return An error if there was a problem, empty string if all went well
182    * @throws IOException If the CSV file could not be read
183    */
184   @Timed(value = "shepard.timeseries.service")
185   public String importTimeseries(String database, InputStream stream) throws IOException {
186     List<String> errors = new ArrayList<>();
187     var timeseriesList = csvConverter.convertToPayload(stream);
188     for (var timeseries : timeseriesList) {
189       var error = createTimeseries(database, timeseries);
190       if (!error.isBlank()) {
191         errors.add(error);
192       }
193     }
194     return String.join(", ", errors);
195   }
196 
197   /**
198    * Creates a new database called by a random string
199    *
200    * @return String the new database
201    */
202   @Timed(value = "shepard.timeseries.service")
203   public String createDatabase() {
204     String name = UUID.randomUUID().toString();
205     influxConnector.createDatabase(name);
206     return name;
207   }
208 
209   @Timed(value = "shepard.timeseries.service")
210   public void deleteDatabase(String database) {
211     influxConnector.deleteDatabase(database);
212   }
213 
214   private boolean matchFilter(
215     InfluxTimeseries timeseries,
216     Set<String> device,
217     Set<String> location,
218     Set<String> symName
219   ) {
220     var deviceMatches = true;
221     var locatioMatches = true;
222     var symbolicNameMatches = true;
223     if (!device.isEmpty()) {
224       deviceMatches = device.contains(timeseries.getDevice());
225     }
226     if (!location.isEmpty()) {
227       locatioMatches = location.contains(timeseries.getLocation());
228     }
229     if (!symName.isEmpty()) {
230       symbolicNameMatches = symName.contains(timeseries.getSymbolicName());
231     }
232     return deviceMatches && locatioMatches && symbolicNameMatches;
233   }
234 }