View Javadoc
1   package de.dlr.shepard.data.timeseries.services;
2   
3   import de.dlr.shepard.common.exceptions.InvalidAuthException;
4   import de.dlr.shepard.common.exceptions.InvalidBodyException;
5   import de.dlr.shepard.common.exceptions.InvalidPathException;
6   import de.dlr.shepard.data.timeseries.io.TimeseriesWithDataPoints;
7   import de.dlr.shepard.data.timeseries.model.Timeseries;
8   import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
9   import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
10  import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
11  import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
12  import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
13  import de.dlr.shepard.data.timeseries.repositories.TimeseriesRepository;
14  import de.dlr.shepard.data.timeseries.utilities.ObjectTypeEvaluator;
15  import de.dlr.shepard.data.timeseries.utilities.TimeseriesValidator;
16  import io.quarkus.logging.Log;
17  import io.quarkus.narayana.jta.QuarkusTransaction;
18  import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
19  import jakarta.enterprise.context.RequestScoped;
20  import jakarta.enterprise.context.control.ActivateRequestContext;
21  import jakarta.inject.Inject;
22  import jakarta.transaction.Transactional;
23  import jakarta.ws.rs.NotFoundException;
24  import java.sql.SQLException;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.List;
28  import java.util.Optional;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import org.eclipse.microprofile.config.ConfigProvider;
31  
32  @RequestScoped
33  public class TimeseriesService {
34  
35    @Inject
36    TimeseriesRepository timeseriesRepository;
37  
38    @Inject
39    TimeseriesDataPointRepository timeseriesDataPointRepository;
40  
41    @Inject
42    TimeseriesContainerService timeseriesContainerService;
43  
44    /**
45     * Flag to determine whether integer values received should be automatically converted to double if the
46     * timeseries they are supposed to be inserted is of type double.
47     * This flag is not injected using @ConfigProperty because that would make testing much more complicated.
48     * These properties are set upon startup and cannot be changed within a single test.
49     */
50    Boolean autoConvertIntToDouble = ConfigProvider.getConfig()
51      .getOptionalValue("shepard.autoconvert-int", Boolean.class)
52      .orElse(false);
53  
54    /**
55     * Returns a list of timeseries objects that are in the given database.
56     *
57     * Returns an empty list if the timeseries container is not accessible (cannot
58     * be found or wrong permissions).
59     *
60     * @param containerId of the given timeseries container
61     * @return a list of timeseries entities
62     */
63    public List<TimeseriesEntity> getTimeseriesAvailable(long containerId) {
64      timeseriesContainerService.getContainer(containerId);
65  
66      return timeseriesRepository.list("containerId", containerId);
67    }
68  
69    /**
70     * Returns a timeseries entity by id
71     *
72     * @param containerId timeseries container id
73     * @param id
74     * @return TimeseriesEntity
75     * @throws InvalidPathException if container with containerId or the timeseries
76     *                              are not accessible
77     * @throws InvalidAuthException if user has no read permissions on the
78     *                              timeseries container
79     */
80    public TimeseriesEntity getTimeseriesById(long containerId, int id) {
81      timeseriesContainerService.getContainer(containerId);
82  
83      var timeseries = timeseriesRepository.findById(id);
84      if (timeseries == null) {
85        String errorMsg = String.format(
86          "ID ERROR - Timeseries with id %s in container %s is null or deleted",
87          id,
88          containerId
89        );
90        Log.error(errorMsg);
91        throw new InvalidPathException(errorMsg);
92      }
93      return timeseries;
94    }
95  
96    /**
97     * Returns a timeseries entity
98     *
99     * @param containerId timeseries container id
100    * @param timeseries
101    * @return TimeseriesEntity
102    * @throws NotFoundException if the timeseries is not found
103    * @throws InvalidPathException if container with containerId is not accessible
104    * @throws InvalidAuthException if user has no read permissions on the timeseries container
105    */
106   public TimeseriesEntity getTimeseries(long containerId, Timeseries timeseries) {
107     timeseriesContainerService.getContainer(containerId);
108 
109     var timeseriesEntity = timeseriesRepository.findTimeseries(containerId, timeseries);
110     if (timeseriesEntity.isEmpty()) {
111       String errorMsg = String.format(
112         "Timeseries (%s, %s, %s, %s, %s) in container %s is null or deleted",
113         timeseries.getMeasurement(),
114         timeseries.getDevice(),
115         timeseries.getLocation(),
116         timeseries.getSymbolicName(),
117         timeseries.getField(),
118         containerId
119       );
120       Log.error(errorMsg);
121       throw new NotFoundException(errorMsg);
122     }
123     return timeseriesEntity.get();
124   }
125 
126   /**
127    * Deletes timeseries container by id
128    *
129    * @param containerId
130    * @throws InvalidPathException if container could not be found
131    * @throws InvalidAuthException if user has no edit permissions on container
132    */
133   @Transactional
134   public void deleteTimeseriesByContainerId(long containerId) {
135     timeseriesContainerService.getContainer(containerId);
136     timeseriesContainerService.assertIsAllowedToDeleteContainer(containerId);
137     this.timeseriesRepository.deleteByContainerId(containerId);
138   }
139 
140   /**
141    * Retrieve a list of DataPoints for a time-interval with options to grouping/
142    * time slicing, filling and aggregating.
143    *
144    * @return List of TimeseriesDataPoint
145    * @throws InvalidPathException if container is null or deleted
146    * @throws InvalidAuthException if user has no read permissions on container
147    */
148   public List<TimeseriesDataPoint> getDataPointsByTimeseries(
149     long containerId,
150     Timeseries timeseries,
151     TimeseriesDataPointsQueryParams queryParams
152   ) {
153     timeseriesContainerService.getContainer(containerId);
154 
155     return getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams);
156   }
157 
158   /**
159    * Retrieve a list of DataPoints for a time-interval with options to grouping/
160    * time slicing, filling and aggregating.
161    *
162    * This function does not check if the container specified by containerId is
163    * accessible.
164    * We add <code>@ActivateRequestContext</code> in order to call this method in a
165    * parallel stream.
166    * The container check relies on an active request context.
167    * However, the 'ActivateRequestContext' annotation does not allow for a
168    * container check.
169    *
170    * @param containerId
171    * @param timeseries
172    * @param queryParams
173    * @return List<TimeseriesDataPoint>
174    */
175   @ActivateRequestContext
176   public List<TimeseriesDataPoint> getDataPointsByTimeseriesActivatedRequestContext(
177     long containerId,
178     Timeseries timeseries,
179     TimeseriesDataPointsQueryParams queryParams
180   ) {
181     Optional<TimeseriesEntity> timeseriesEntity = this.timeseriesRepository.findTimeseries(containerId, timeseries);
182 
183     if (timeseriesEntity.isEmpty()) return Collections.emptyList();
184 
185     int timeseriesId = timeseriesEntity.get().getId();
186     DataPointValueType valueType = timeseriesEntity.get().getValueType();
187 
188     return this.timeseriesDataPointRepository.queryDataPoints(timeseriesId, valueType, queryParams);
189   }
190 
191   public List<TimeseriesWithDataPoints> getManyTimeseriesWithDataPoints(
192     Long containerId,
193     List<Timeseries> timeseriesList,
194     TimeseriesDataPointsQueryParams queryParams
195   ) {
196     timeseriesContainerService.getContainer(containerId);
197 
198     ConcurrentLinkedQueue<TimeseriesWithDataPoints> timeseriesWithDataPointsQueue = new ConcurrentLinkedQueue<
199       TimeseriesWithDataPoints
200     >();
201     timeseriesList
202       .parallelStream()
203       .forEach(timeseries -> {
204         timeseriesWithDataPointsQueue.add(
205           new TimeseriesWithDataPoints(
206             timeseries,
207             getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams)
208           )
209         );
210       });
211     return new ArrayList<TimeseriesWithDataPoints>(timeseriesWithDataPointsQueue);
212   }
213 
214   /**
215    * Saves data points in the database.
216    * If the corresponding timeseries did not exist before, it will be persisted in
217    * the database.
218    *
219    * @param timeseriesContainerId Identifies the TimeseriesContainer
220    * @param timeseries            The timeseries identifiers
221    * @param dataPoints            Data points to be added to the timeseries
222    * @return created timeseries
223    */
224   public TimeseriesEntity saveDataPoints(
225     long timeseriesContainerId,
226     Timeseries timeseries,
227     List<TimeseriesDataPoint> dataPoints
228   ) {
229     timeseriesContainerService.getContainer(timeseriesContainerId);
230     timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
231 
232     DataPointValueType incomingValueType = ObjectTypeEvaluator.determineType(dataPoints.get(0).getValue()).orElseThrow(
233       () -> new InvalidBodyException()
234     );
235 
236     return saveDataPoints(timeseriesContainerId, timeseries, dataPoints, incomingValueType);
237   }
238 
239   /**
240    * Saves data points in the database.
241    * If the corresponding timeseries did not exist before, it will be persisted in
242    * the database.
243    *
244    * @param timeseriesContainerId Identifies the TimeseriesContainer
245    * @param timeseries            The timeseries identifiers
246    * @param dataPoints            Data points to be added to the timeseries
247    * @param dataType              The data type that values in this timeseries
248    *                              will have
249    * @return created timeseries
250    */
251   @Transactional(Transactional.TxType.REQUIRES_NEW)
252   @TransactionConfiguration(timeout = 6000)
253   public TimeseriesEntity saveDataPoints(
254     long timeseriesContainerId,
255     Timeseries timeseries,
256     List<TimeseriesDataPoint> dataPoints,
257     DataPointValueType dataType
258   ) {
259     timeseriesContainerService.getContainer(timeseriesContainerId);
260     timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
261 
262     TimeseriesEntity timeseriesEntity = getOrCreateTimeseries(timeseriesContainerId, timeseries, dataType);
263 
264     assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
265 
266     timeseriesDataPointRepository.insertManyDataPoints(dataPoints, timeseriesEntity);
267 
268     return timeseriesEntity;
269   }
270 
271   /**
272    * Saves data points in the database.
273    * If the corresponding timeseries did not exist before, it will be persisted in
274    * the database.
275    *
276    * This function is introduced to fix the issue occurring in ticket: #581.
277    *
278    * TODO: This function should only be used for the timeseries migration, and
279    * should be deleted in the future!
280    *
281    * @param timeseriesContainerId Identifies the TimeseriesContainer
282    * @param timeseries            The timeseries identifiers
283    * @param dataPoints            Data points to be added to the timeseries
284    * @param dataType              The data type that values in this timeseries
285    *                              will have
286    * @return created timeseries
287    */
288   @Deprecated
289   @Transactional(Transactional.TxType.REQUIRES_NEW)
290   @TransactionConfiguration(timeout = 6000)
291   public TimeseriesEntity saveDataPointsNoChecks(
292     long timeseriesContainerId,
293     Timeseries timeseries,
294     List<TimeseriesDataPoint> dataPoints,
295     DataPointValueType dataType
296   ) {
297     TimeseriesEntity timeseriesEntity = getOrCreateTimeseriesNoChecks(timeseriesContainerId, timeseries, dataType);
298     assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
299     try {
300       timeseriesDataPointRepository.insertManyDataPointsWithCopyCommand(dataPoints, timeseriesEntity);
301     } catch (SQLException e) {
302       // COPY command cannot handle duplicates (conflict on timeseries_id and time column)
303       // We are going to use the normal batch insertion instead.
304       Log.warnf("SQLException during copy insert (expected): %s ", e.getMessage());
305       Log.warn("We are going to repeat the operation with batch insert.");
306       return repeatSaveDataPointsWithBatchInsert(dataPoints, timeseriesEntity);
307     }
308     return timeseriesEntity;
309   }
310 
311   @Deprecated
312   @Transactional(Transactional.TxType.REQUIRES_NEW)
313   @TransactionConfiguration(timeout = 6000)
314   public TimeseriesEntity repeatSaveDataPointsWithBatchInsert(
315     List<TimeseriesDataPoint> entities,
316     TimeseriesEntity timeseriesEntity
317   ) {
318     timeseriesDataPointRepository.insertManyDataPoints(entities, timeseriesEntity);
319     return timeseriesEntity;
320   }
321 
322   /**
323    * This function is introduced to fix the issue occurring in ticket: #581.
324    *
325    * TODO: This function should only be used for the timeseries migration, and
326    * should be deleted in the future!
327    */
328   @Deprecated
329   private TimeseriesEntity getOrCreateTimeseriesNoChecks(
330     long containerId,
331     Timeseries timeseries,
332     DataPointValueType incomingValueType
333   ) {
334     // try to find timeseries in db
335     Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
336     if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
337     TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
338 
339     // create new timeseries because it does not exist
340     TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
341     QuarkusTransaction.requiringNew()
342       .run(() -> {
343         this.timeseriesRepository.upsert(containerId, timeseriesEntity);
344       });
345     var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
346     return found.get();
347   }
348 
349   private TimeseriesEntity getOrCreateTimeseries(
350     long containerId,
351     Timeseries timeseries,
352     DataPointValueType incomingValueType
353   ) {
354     timeseriesContainerService.getContainer(containerId);
355     timeseriesContainerService.assertIsAllowedToEditContainer(containerId);
356 
357     // try to find timeseries in db
358     Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
359 
360     if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
361 
362     TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
363 
364     // create new timeseries because it does not exist
365     TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
366     QuarkusTransaction.requiringNew()
367       .run(() -> {
368         this.timeseriesRepository.upsert(containerId, timeseriesEntity);
369       });
370 
371     var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
372     return found.get();
373   }
374 
375   private void assertDataPointsMatchTimeseriesValueType(
376     TimeseriesEntity timeseriesEntity,
377     List<TimeseriesDataPoint> dataPoints
378   ) {
379     for (TimeseriesDataPoint dataPoint : dataPoints) {
380       DataPointValueType expectedType = ObjectTypeEvaluator.determineType(dataPoint.getValue()).orElseThrow(() ->
381         new InvalidBodyException()
382       );
383       assertValueTypeMatchesTimeseries(timeseriesEntity, expectedType);
384     }
385   }
386 
387   private void assertValueTypeMatchesTimeseries(TimeseriesEntity timeseries, DataPointValueType incomingValueType) {
388     // If auto-conversion is enabled, allow transformation from Integer to Double
389     if (
390       autoConvertIntToDouble &&
391       incomingValueType == DataPointValueType.Integer &&
392       timeseries.getValueType() == DataPointValueType.Double
393     ) return;
394 
395     if (timeseries.getValueType() != incomingValueType) throw new InvalidBodyException(
396       "Timeseries already exists for data type %s but new data points are of type %s",
397       timeseries.getValueType(),
398       incomingValueType
399     );
400   }
401 }