View Javadoc
1   package de.dlr.shepard.data.timeseries.services;
2   
3   import de.dlr.shepard.common.exceptions.InvalidAuthException;
4   import de.dlr.shepard.common.exceptions.InvalidBodyException;
5   import de.dlr.shepard.common.exceptions.InvalidPathException;
6   import de.dlr.shepard.data.timeseries.io.TimeseriesWithDataPoints;
7   import de.dlr.shepard.data.timeseries.model.Timeseries;
8   import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
9   import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
10  import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
11  import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
12  import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
13  import de.dlr.shepard.data.timeseries.repositories.TimeseriesRepository;
14  import de.dlr.shepard.data.timeseries.utilities.ObjectTypeEvaluator;
15  import de.dlr.shepard.data.timeseries.utilities.TimeseriesValidator;
16  import io.quarkus.logging.Log;
17  import io.quarkus.narayana.jta.QuarkusTransaction;
18  import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
19  import jakarta.enterprise.context.RequestScoped;
20  import jakarta.enterprise.context.control.ActivateRequestContext;
21  import jakarta.inject.Inject;
22  import jakarta.transaction.Transactional;
23  import jakarta.ws.rs.NotFoundException;
24  import java.sql.SQLException;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.List;
28  import java.util.Optional;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import org.eclipse.microprofile.config.ConfigProvider;
31  
32  @RequestScoped
33  public class TimeseriesService {
34  
35    @Inject
36    TimeseriesRepository timeseriesRepository;
37  
38    @Inject
39    TimeseriesDataPointRepository timeseriesDataPointRepository;
40  
41    @Inject
42    TimeseriesContainerService timeseriesContainerService;
43  
44    /**
45     * Flag to determine whether integer values received should be automatically converted to double if the
46     * timeseries they are supposed to be inserted is of type double.
47     * This flag is not injected using @ConfigProperty because that would make testing much more complicated.
48     * These properties are set upon startup and cannot be changed within a single test.
49     */
50    Boolean autoConvertIntToDouble = ConfigProvider.getConfig()
51      .getOptionalValue("shepard.autoconvert-int", Boolean.class)
52      .orElse(false);
53  
54    /**
55     * Returns a list of timeseries objects that are in the given database.
56     *
57     * Returns an empty list if the timeseries container is not accessible (cannot
58     * be found or wrong permissions).
59     *
60     * @param containerId of the given timeseries container
61     * @return a list of timeseries entities
62     */
63    public List<TimeseriesEntity> getTimeseriesAvailable(long containerId) {
64      timeseriesContainerService.getContainer(containerId);
65  
66      return timeseriesRepository.list("containerId", containerId);
67    }
68  
69    /**
70     * Returns a timeseries entity by id
71     *
72     * @param containerId timeseries container id
73     * @param id
74     * @return TimeseriesEntity
75     * @throws InvalidPathException if container with containerId or the timeseries
76     *                              are not accessible
77     * @throws InvalidAuthException if user has no read permissions on the
78     *                              timeseries container
79     */
80    public TimeseriesEntity getTimeseriesById(long containerId, int id) {
81      timeseriesContainerService.getContainer(containerId);
82  
83      var timeseries = timeseriesRepository.findById(id);
84      if (timeseries == null) {
85        String errorMsg =
86          "ID ERROR - Timeseries with id %s in container %s is null or deleted".formatted(id, containerId);
87        Log.error(errorMsg);
88        throw new InvalidPathException(errorMsg);
89      }
90      return timeseries;
91    }
92  
93    /**
94     * Returns a timeseries entity
95     *
96     * @param containerId timeseries container id
97     * @param timeseries
98     * @return TimeseriesEntity
99     * @throws NotFoundException if the timeseries is not found
100    * @throws InvalidPathException if container with containerId is not accessible
101    * @throws InvalidAuthException if user has no read permissions on the timeseries container
102    */
103   public TimeseriesEntity getTimeseries(long containerId, Timeseries timeseries) {
104     timeseriesContainerService.getContainer(containerId);
105 
106     var timeseriesEntity = timeseriesRepository.findTimeseries(containerId, timeseries);
107     if (timeseriesEntity.isEmpty()) {
108       String errorMsg =
109         "Timeseries (%s, %s, %s, %s, %s) in container %s is null or deleted".formatted(
110             timeseries.getMeasurement(),
111             timeseries.getDevice(),
112             timeseries.getLocation(),
113             timeseries.getSymbolicName(),
114             timeseries.getField(),
115             containerId
116           );
117       Log.error(errorMsg);
118       throw new NotFoundException(errorMsg);
119     }
120     return timeseriesEntity.get();
121   }
122 
123   /**
124    * Deletes timeseries container by id
125    *
126    * @param containerId
127    * @throws InvalidPathException if container could not be found
128    * @throws InvalidAuthException if user has no edit permissions on container
129    */
130   @Transactional
131   public void deleteTimeseriesByContainerId(long containerId) {
132     timeseriesContainerService.getContainer(containerId);
133     timeseriesContainerService.assertIsAllowedToDeleteContainer(containerId);
134     this.timeseriesRepository.deleteByContainerId(containerId);
135   }
136 
137   /**
138    * Retrieve a list of DataPoints for a time-interval with options to grouping/
139    * time slicing, filling and aggregating.
140    *
141    * @return List of TimeseriesDataPoint
142    * @throws InvalidPathException if container is null or deleted
143    * @throws InvalidAuthException if user has no read permissions on container
144    */
145   public List<TimeseriesDataPoint> getDataPointsByTimeseries(
146     long containerId,
147     Timeseries timeseries,
148     TimeseriesDataPointsQueryParams queryParams
149   ) {
150     timeseriesContainerService.getContainer(containerId);
151 
152     return getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams);
153   }
154 
155   /**
156    * Retrieve a list of DataPoints for a time-interval with options to grouping/
157    * time slicing, filling and aggregating.
158    *
159    * This function does not check if the container specified by containerId is
160    * accessible.
161    * We add <code>@ActivateRequestContext</code> in order to call this method in a
162    * parallel stream.
163    * The container check relies on an active request context.
164    * However, the 'ActivateRequestContext' annotation does not allow for a
165    * container check.
166    *
167    * @param containerId
168    * @param timeseries
169    * @param queryParams
170    * @return List<TimeseriesDataPoint>
171    */
172   @ActivateRequestContext
173   public List<TimeseriesDataPoint> getDataPointsByTimeseriesActivatedRequestContext(
174     long containerId,
175     Timeseries timeseries,
176     TimeseriesDataPointsQueryParams queryParams
177   ) {
178     Optional<TimeseriesEntity> timeseriesEntity = this.timeseriesRepository.findTimeseries(containerId, timeseries);
179 
180     if (timeseriesEntity.isEmpty()) return Collections.emptyList();
181 
182     int timeseriesId = timeseriesEntity.get().getId();
183     DataPointValueType valueType = timeseriesEntity.get().getValueType();
184 
185     return this.timeseriesDataPointRepository.queryDataPoints(timeseriesId, valueType, queryParams);
186   }
187 
188   public List<TimeseriesWithDataPoints> getManyTimeseriesWithDataPoints(
189     Long containerId,
190     List<Timeseries> timeseriesList,
191     TimeseriesDataPointsQueryParams queryParams
192   ) {
193     timeseriesContainerService.getContainer(containerId);
194 
195     ConcurrentLinkedQueue<TimeseriesWithDataPoints> timeseriesWithDataPointsQueue = new ConcurrentLinkedQueue<
196       TimeseriesWithDataPoints
197     >();
198     timeseriesList
199       .parallelStream()
200       .forEach(timeseries -> {
201         timeseriesWithDataPointsQueue.add(
202           new TimeseriesWithDataPoints(
203             timeseries,
204             getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams)
205           )
206         );
207       });
208     return new ArrayList<TimeseriesWithDataPoints>(timeseriesWithDataPointsQueue);
209   }
210 
211   /**
212    * Saves data points in the database.
213    * If the corresponding timeseries did not exist before, it will be persisted in
214    * the database.
215    *
216    * @param timeseriesContainerId Identifies the TimeseriesContainer
217    * @param timeseries            The timeseries identifiers
218    * @param dataPoints            Data points to be added to the timeseries
219    * @return created timeseries
220    */
221   public TimeseriesEntity saveDataPoints(
222     long timeseriesContainerId,
223     Timeseries timeseries,
224     List<TimeseriesDataPoint> dataPoints
225   ) {
226     timeseriesContainerService.getContainer(timeseriesContainerId);
227     timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
228 
229     DataPointValueType incomingValueType = ObjectTypeEvaluator.determineType(
230       dataPoints.getFirst().getValue()
231     ).orElseThrow(() -> new InvalidBodyException());
232 
233     return saveDataPoints(timeseriesContainerId, timeseries, dataPoints, incomingValueType);
234   }
235 
236   /**
237    * Saves data points in the database.
238    * If the corresponding timeseries did not exist before, it will be persisted in
239    * the database.
240    *
241    * @param timeseriesContainerId Identifies the TimeseriesContainer
242    * @param timeseries            The timeseries identifiers
243    * @param dataPoints            Data points to be added to the timeseries
244    * @param dataType              The data type that values in this timeseries
245    *                              will have
246    * @return created timeseries
247    */
248   @Transactional(Transactional.TxType.REQUIRES_NEW)
249   @TransactionConfiguration(timeout = 6000)
250   public TimeseriesEntity saveDataPoints(
251     long timeseriesContainerId,
252     Timeseries timeseries,
253     List<TimeseriesDataPoint> dataPoints,
254     DataPointValueType dataType
255   ) {
256     timeseriesContainerService.getContainer(timeseriesContainerId);
257     timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
258 
259     TimeseriesEntity timeseriesEntity = getOrCreateTimeseries(timeseriesContainerId, timeseries, dataType);
260 
261     assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
262 
263     timeseriesDataPointRepository.insertManyDataPoints(dataPoints, timeseriesEntity);
264 
265     return timeseriesEntity;
266   }
267 
268   @Deprecated
269   @Transactional(Transactional.TxType.REQUIRES_NEW)
270   @TransactionConfiguration(timeout = 6000)
271   public TimeseriesEntity repeatSaveDataPointsWithBatchInsert(
272     List<TimeseriesDataPoint> entities,
273     TimeseriesEntity timeseriesEntity
274   ) {
275     timeseriesDataPointRepository.insertManyDataPoints(entities, timeseriesEntity);
276     return timeseriesEntity;
277   }
278 
279   private TimeseriesEntity getOrCreateTimeseries(
280     long containerId,
281     Timeseries timeseries,
282     DataPointValueType incomingValueType
283   ) {
284     timeseriesContainerService.getContainer(containerId);
285     timeseriesContainerService.assertIsAllowedToEditContainer(containerId);
286 
287     // try to find timeseries in db
288     Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
289 
290     if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
291 
292     TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
293 
294     // create new timeseries because it does not exist
295     TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
296     QuarkusTransaction.requiringNew()
297       .run(() -> {
298         this.timeseriesRepository.upsert(containerId, timeseriesEntity);
299       });
300 
301     var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
302     return found.get();
303   }
304 
305   private void assertDataPointsMatchTimeseriesValueType(
306     TimeseriesEntity timeseriesEntity,
307     List<TimeseriesDataPoint> dataPoints
308   ) {
309     for (TimeseriesDataPoint dataPoint : dataPoints) {
310       DataPointValueType expectedType = ObjectTypeEvaluator.determineType(dataPoint.getValue()).orElseThrow(() ->
311         new InvalidBodyException()
312       );
313       assertValueTypeMatchesTimeseries(timeseriesEntity, expectedType);
314     }
315   }
316 
317   private void assertValueTypeMatchesTimeseries(TimeseriesEntity timeseries, DataPointValueType incomingValueType) {
318     // If auto-conversion is enabled, allow transformation from Integer to Double
319     if (
320       autoConvertIntToDouble &&
321       incomingValueType == DataPointValueType.Integer &&
322       timeseries.getValueType() == DataPointValueType.Double
323     ) return;
324 
325     if (timeseries.getValueType() != incomingValueType) throw new InvalidBodyException(
326       "Timeseries already exists for data type %s but new data points are of type %s",
327       timeseries.getValueType(),
328       incomingValueType
329     );
330   }
331 }