// View Javadoc
package de.dlr.shepard.data.timeseries.services;

import de.dlr.shepard.common.exceptions.InvalidAuthException;
import de.dlr.shepard.common.exceptions.InvalidBodyException;
import de.dlr.shepard.common.exceptions.InvalidPathException;
import de.dlr.shepard.data.timeseries.io.TimeseriesWithDataPoints;
import de.dlr.shepard.data.timeseries.model.Timeseries;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
import de.dlr.shepard.data.timeseries.repositories.TimeseriesRepository;
import de.dlr.shepard.data.timeseries.utilities.ObjectTypeEvaluator;
import de.dlr.shepard.data.timeseries.utilities.TimeseriesValidator;
import io.quarkus.logging.Log;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
import jakarta.enterprise.context.RequestScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import jakarta.ws.rs.NotFoundException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

31  @RequestScoped
32  public class TimeseriesService {
33  
34    @Inject
35    TimeseriesRepository timeseriesRepository;
36  
37    @Inject
38    TimeseriesDataPointRepository timeseriesDataPointRepository;
39  
40    @Inject
41    TimeseriesContainerService timeseriesContainerService;
42  
43    /**
44     * Returns a list of timeseries objects that are in the given database.
45     *
46     * Returns an empty list if the timeseries container is not accessible (cannot
47     * be found or wrong permissions).
48     *
49     * @param containerId of the given timeseries container
50     * @return a list of timeseries entities
51     */
52    public List<TimeseriesEntity> getTimeseriesAvailable(long containerId) {
53      timeseriesContainerService.getContainer(containerId);
54  
55      return timeseriesRepository.list("containerId", containerId);
56    }
57  
58    /**
59     * Returns a timeseries entity by id
60     *
61     * @param containerId timeseries container id
62     * @param id
63     * @return TimeseriesEntity
64     * @throws InvalidPathException if container with containerId or the timeseries
65     *                              are not accessible
66     * @throws InvalidAuthException if user has no read permissions on the
67     *                              timeseries container
68     */
69    public TimeseriesEntity getTimeseriesById(long containerId, int id) {
70      timeseriesContainerService.getContainer(containerId);
71  
72      var timeseries = timeseriesRepository.findById(id);
73      if (timeseries == null) {
74        String errorMsg = String.format(
75          "ID ERROR - Timeseries with id %s in container %s is null or deleted",
76          id,
77          containerId
78        );
79        Log.error(errorMsg);
80        throw new InvalidPathException(errorMsg);
81      }
82      return timeseries;
83    }
84  
85    /**
86     * Returns a timeseries entity
87     *
88     * @param containerId timeseries container id
89     * @param timeseries
90     * @return TimeseriesEntity
91     * @throws NotFoundException if the timeseries is not found
92     * @throws InvalidPathException if container with containerId is not accessible
93     * @throws InvalidAuthException if user has no read permissions on the timeseries container
94     */
95    public TimeseriesEntity getTimeseries(long containerId, Timeseries timeseries) {
96      timeseriesContainerService.getContainer(containerId);
97  
98      var timeseriesEntity = timeseriesRepository.findTimeseries(containerId, timeseries);
99      if (timeseriesEntity.isEmpty()) {
100       String errorMsg = String.format(
101         "Timeseries (%s, %s, %s, %s, %s) in container %s is null or deleted",
102         timeseries.getMeasurement(),
103         timeseries.getDevice(),
104         timeseries.getLocation(),
105         timeseries.getSymbolicName(),
106         timeseries.getField(),
107         containerId
108       );
109       Log.error(errorMsg);
110       throw new NotFoundException(errorMsg);
111     }
112     return timeseriesEntity.get();
113   }
114 
115   /**
116    * Deletes timeseries container by id
117    *
118    * @param containerId
119    * @throws InvalidPathException if container could not be found
120    * @throws InvalidAuthException if user has no edit permissions on container
121    */
122   @Transactional
123   public void deleteTimeseriesByContainerId(long containerId) {
124     timeseriesContainerService.getContainer(containerId);
125     timeseriesContainerService.assertIsAllowedToDeleteContainer(containerId);
126     this.timeseriesRepository.deleteByContainerId(containerId);
127   }
128 
129   /**
130    * Retrieve a list of DataPoints for a time-interval with options to grouping/
131    * time slicing, filling and aggregating.
132    *
133    * @return List of TimeseriesDataPoint
134    * @throws InvalidPathException if container is null or deleted
135    * @throws InvalidAuthException if user has no read permissions on container
136    */
137   public List<TimeseriesDataPoint> getDataPointsByTimeseries(
138     long containerId,
139     Timeseries timeseries,
140     TimeseriesDataPointsQueryParams queryParams
141   ) {
142     timeseriesContainerService.getContainer(containerId);
143 
144     return getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams);
145   }
146 
147   /**
148    * Retrieve a list of DataPoints for a time-interval with options to grouping/
149    * time slicing, filling and aggregating.
150    *
151    * This function does not check if the container specified by containerId is
152    * accessible.
153    * We add <code>@ActivateRequestContext</code> in order to call this method in a
154    * parallel stream.
155    * The container check relies on an active request context.
156    * However, the 'ActivateRequestContext' annotation does not allow for a
157    * container check.
158    *
159    * @param containerId
160    * @param timeseries
161    * @param queryParams
162    * @return List<TimeseriesDataPoint>
163    */
164   @ActivateRequestContext
165   public List<TimeseriesDataPoint> getDataPointsByTimeseriesActivatedRequestContext(
166     long containerId,
167     Timeseries timeseries,
168     TimeseriesDataPointsQueryParams queryParams
169   ) {
170     Optional<TimeseriesEntity> timeseriesEntity = this.timeseriesRepository.findTimeseries(containerId, timeseries);
171 
172     if (timeseriesEntity.isEmpty()) return Collections.emptyList();
173 
174     int timeseriesId = timeseriesEntity.get().getId();
175     DataPointValueType valueType = timeseriesEntity.get().getValueType();
176 
177     return this.timeseriesDataPointRepository.queryDataPoints(timeseriesId, valueType, queryParams);
178   }
179 
180   public List<TimeseriesWithDataPoints> getManyTimeseriesWithDataPoints(
181     Long containerId,
182     List<Timeseries> timeseriesList,
183     TimeseriesDataPointsQueryParams queryParams
184   ) {
185     timeseriesContainerService.getContainer(containerId);
186 
187     ConcurrentLinkedQueue<TimeseriesWithDataPoints> timeseriesWithDataPointsQueue = new ConcurrentLinkedQueue<
188       TimeseriesWithDataPoints
189     >();
190     timeseriesList
191       .parallelStream()
192       .forEach(timeseries -> {
193         timeseriesWithDataPointsQueue.add(
194           new TimeseriesWithDataPoints(
195             timeseries,
196             getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams)
197           )
198         );
199       });
200     return new ArrayList<TimeseriesWithDataPoints>(timeseriesWithDataPointsQueue);
201   }
202 
203   /**
204    * Saves data points in the database.
205    * If the corresponding timeseries did not exist before, it will be persisted in
206    * the database.
207    *
208    * @param timeseriesContainerId Identifies the TimeseriesContainer
209    * @param timeseries            The timeseries identifiers
210    * @param dataPoints            Data points to be added to the timeseries
211    * @return created timeseries
212    */
213   public TimeseriesEntity saveDataPoints(
214     long timeseriesContainerId,
215     Timeseries timeseries,
216     List<TimeseriesDataPoint> dataPoints
217   ) {
218     timeseriesContainerService.getContainer(timeseriesContainerId);
219     timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
220 
221     DataPointValueType incomingValueType = ObjectTypeEvaluator.determineType(dataPoints.get(0).getValue()).orElseThrow(
222       () -> new InvalidBodyException()
223     );
224 
225     return saveDataPoints(timeseriesContainerId, timeseries, dataPoints, incomingValueType);
226   }
227 
228   /**
229    * Saves data points in the database.
230    * If the corresponding timeseries did not exist before, it will be persisted in
231    * the database.
232    *
233    * @param timeseriesContainerId Identifies the TimeseriesContainer
234    * @param timeseries            The timeseries identifiers
235    * @param dataPoints            Data points to be added to the timeseries
236    * @param dataType              The data type that values in this timeseries
237    *                              will have
238    * @return created timeseries
239    */
240   @Transactional(Transactional.TxType.REQUIRES_NEW)
241   @TransactionConfiguration(timeout = 6000)
242   public TimeseriesEntity saveDataPoints(
243     long timeseriesContainerId,
244     Timeseries timeseries,
245     List<TimeseriesDataPoint> dataPoints,
246     DataPointValueType dataType
247   ) {
248     timeseriesContainerService.getContainer(timeseriesContainerId);
249     timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
250 
251     TimeseriesEntity timeseriesEntity = getOrCreateTimeseries(timeseriesContainerId, timeseries, dataType);
252 
253     assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
254 
255     timeseriesDataPointRepository.insertManyDataPoints(dataPoints, timeseriesEntity);
256 
257     return timeseriesEntity;
258   }
259 
260   /**
261    * Saves data points in the database.
262    * If the corresponding timeseries did not exist before, it will be persisted in
263    * the database.
264    *
265    * This function is introduced to fix the issue occurring in ticket: #581.
266    *
267    * TODO: This function should only be used for the timeseries migration, and
268    * should be deleted in the future!
269    *
270    * @param timeseriesContainerId Identifies the TimeseriesContainer
271    * @param timeseries            The timeseries identifiers
272    * @param dataPoints            Data points to be added to the timeseries
273    * @param dataType              The data type that values in this timeseries
274    *                              will have
275    * @return created timeseries
276    */
277   @Deprecated
278   @Transactional(Transactional.TxType.REQUIRES_NEW)
279   @TransactionConfiguration(timeout = 6000)
280   public TimeseriesEntity saveDataPointsNoChecks(
281     long timeseriesContainerId,
282     Timeseries timeseries,
283     List<TimeseriesDataPoint> dataPoints,
284     DataPointValueType dataType
285   ) {
286     TimeseriesEntity timeseriesEntity = getOrCreateTimeseriesNoChecks(timeseriesContainerId, timeseries, dataType);
287     assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
288     try {
289       timeseriesDataPointRepository.insertManyDataPointsWithCopyCommand(dataPoints, timeseriesEntity);
290     } catch (SQLException e) {
291       // COPY command cannot handle duplicates (conflict on timeseries_id and time column)
292       // We are going to use the normal batch insertion instead.
293       Log.warnf("SQLException during copy insert (expected): %s ", e.getMessage());
294       Log.warn("We are going to repeat the operation with batch insert.");
295       return repeatSaveDataPointsWithBatchInsert(dataPoints, timeseriesEntity);
296     }
297     return timeseriesEntity;
298   }
299 
300   @Deprecated
301   @Transactional(Transactional.TxType.REQUIRES_NEW)
302   @TransactionConfiguration(timeout = 6000)
303   public TimeseriesEntity repeatSaveDataPointsWithBatchInsert(
304     List<TimeseriesDataPoint> entities,
305     TimeseriesEntity timeseriesEntity
306   ) {
307     timeseriesDataPointRepository.insertManyDataPoints(entities, timeseriesEntity);
308     return timeseriesEntity;
309   }
310 
311   /**
312    * This function is introduced to fix the issue occurring in ticket: #581.
313    *
314    * TODO: This function should only be used for the timeseries migration, and
315    * should be deleted in the future!
316    */
317   @Deprecated
318   private TimeseriesEntity getOrCreateTimeseriesNoChecks(
319     long containerId,
320     Timeseries timeseries,
321     DataPointValueType incomingValueType
322   ) {
323     // try to find timeseries in db
324     Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
325     if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
326     TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
327 
328     // create new timeseries because it does not exist
329     TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
330     QuarkusTransaction.requiringNew()
331       .run(() -> {
332         this.timeseriesRepository.upsert(containerId, timeseriesEntity);
333       });
334     var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
335     return found.get();
336   }
337 
338   private TimeseriesEntity getOrCreateTimeseries(
339     long containerId,
340     Timeseries timeseries,
341     DataPointValueType incomingValueType
342   ) {
343     timeseriesContainerService.getContainer(containerId);
344     timeseriesContainerService.assertIsAllowedToEditContainer(containerId);
345 
346     // try to find timeseries in db
347     Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
348 
349     if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
350 
351     TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
352 
353     // create new timeseries because it does not exist
354     TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
355     QuarkusTransaction.requiringNew()
356       .run(() -> {
357         this.timeseriesRepository.upsert(containerId, timeseriesEntity);
358       });
359 
360     var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
361     return found.get();
362   }
363 
364   private static void assertDataPointsMatchTimeseriesValueType(
365     TimeseriesEntity timeseriesEntity,
366     List<TimeseriesDataPoint> dataPoints
367   ) {
368     for (TimeseriesDataPoint dataPoint : dataPoints) {
369       DataPointValueType expectedType = ObjectTypeEvaluator.determineType(dataPoint.getValue()).orElseThrow(() ->
370         new InvalidBodyException()
371       );
372       assertValueTypeMatchesTimeseries(timeseriesEntity, expectedType);
373     }
374   }
375 
376   private static void assertValueTypeMatchesTimeseries(
377     TimeseriesEntity timeseries,
378     DataPointValueType incomingValueType
379   ) {
380     if (timeseries.getValueType() != incomingValueType) throw new InvalidBodyException(
381       "Timeseries already exists for data type %s but new data points are of type %s",
382       timeseries.getValueType(),
383       incomingValueType
384     );
385   }
386 }