View Javadoc
1   package de.dlr.shepard.data.timeseries.services;
2   
3   import de.dlr.shepard.common.exceptions.InvalidAuthException;
4   import de.dlr.shepard.common.exceptions.InvalidBodyException;
5   import de.dlr.shepard.common.exceptions.InvalidPathException;
6   import de.dlr.shepard.data.timeseries.io.TimeseriesWithDataPoints;
7   import de.dlr.shepard.data.timeseries.model.Timeseries;
8   import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
9   import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
10  import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
11  import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
12  import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
13  import de.dlr.shepard.data.timeseries.repositories.TimeseriesRepository;
14  import de.dlr.shepard.data.timeseries.utilities.ObjectTypeEvaluator;
15  import de.dlr.shepard.data.timeseries.utilities.TimeseriesValidator;
16  import io.quarkus.logging.Log;
17  import io.quarkus.narayana.jta.QuarkusTransaction;
18  import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
19  import jakarta.enterprise.context.RequestScoped;
20  import jakarta.enterprise.context.control.ActivateRequestContext;
21  import jakarta.inject.Inject;
22  import jakarta.transaction.Transactional;
23  import jakarta.ws.rs.NotFoundException;
24  import java.sql.SQLException;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.List;
28  import java.util.Optional;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import org.eclipse.microprofile.config.ConfigProvider;
31  
32  @RequestScoped
33  public class TimeseriesService {
34  
35    @Inject
36    TimeseriesRepository timeseriesRepository;
37  
38    @Inject
39    TimeseriesDataPointRepository timeseriesDataPointRepository;
40  
41    @Inject
42    TimeseriesContainerService timeseriesContainerService;
43  
44    /**
45     * Flag to determine whether integer values received should be automatically converted to double if the
46     * timeseries they are supposed to be inserted into is of type double.
47     * This flag is not injected using @ConfigProperty because that would make testing much more complicated.
48     * These properties are set upon startup and cannot be changed within a single test.
49     */
50    Boolean autoConvertIntToDouble = ConfigProvider.getConfig()
51      .getOptionalValue("shepard.autoconvert-int", Boolean.class)
52      .orElse(false);
53  
54    /**
55     * Returns a list of timeseries objects that are in the given database.
56     *
57     * Returns an empty list if the timeseries container is not accessible (cannot
58     * be found or wrong permissions).
59     *
60     * @param containerId of the given timeseries container
61     * @return a list of timeseries entities
62     */
63    public List<TimeseriesEntity> getTimeseriesAvailable(long containerId) {
64      timeseriesContainerService.getContainer(containerId);
65  
66      return timeseriesRepository.list("containerId", containerId);
67    }
68  
69    /**
70     * Returns a timeseries entity by id
71     *
72     * @param containerId timeseries container id
73     * @param id
74     * @return TimeseriesEntity
75     * @throws InvalidPathException if container with containerId or the timeseries
76     *                              are not accessible
77     * @throws InvalidAuthException if user has no read permissions on the
78     *                              timeseries container
79     */
80    public TimeseriesEntity getTimeseriesById(long containerId, int id) {
81      timeseriesContainerService.getContainer(containerId);
82  
83      var timeseries = timeseriesRepository.findById(id);
84      if (timeseries == null) {
85        String errorMsg = String.format(
86          "ID ERROR - Timeseries with id %s in container %s is null or deleted",
87          id,
88          containerId
89        );
90        Log.error(errorMsg);
91        throw new InvalidPathException(errorMsg);
92      }
93      return timeseries;
94    }
95  
96    /**
97     * Returns a timeseries entity
98     *
99     * @param containerId timeseries container id
100    * @param timeseries
101    * @return TimeseriesEntity
102    * @throws NotFoundException if the timeseries is not found
103    * @throws InvalidPathException if container with containerId is not accessible
104    * @throws InvalidAuthException if user has no read permissions on the timeseries container
105    */
106   public TimeseriesEntity getTimeseries(long containerId, Timeseries timeseries) {
107     timeseriesContainerService.getContainer(containerId);
108 
109     var timeseriesEntity = timeseriesRepository.findTimeseries(containerId, timeseries);
110     if (timeseriesEntity.isEmpty()) {
111       String errorMsg = String.format(
112         "Timeseries (%s, %s, %s, %s, %s) in container %s is null or deleted",
113         timeseries.getMeasurement(),
114         timeseries.getDevice(),
115         timeseries.getLocation(),
116         timeseries.getSymbolicName(),
117         timeseries.getField(),
118         containerId
119       );
120       Log.error(errorMsg);
121       throw new NotFoundException(errorMsg);
122     }
123     return timeseriesEntity.get();
124   }
125 
126   /**
127    * Deletes timeseries container by id
128    *
129    * @param containerId
130    * @throws InvalidPathException if container could not be found
131    * @throws InvalidAuthException if user has no edit permissions on container
132    */
133   @Transactional
134   public void deleteTimeseriesByContainerId(long containerId) {
135     timeseriesContainerService.getContainer(containerId);
136     timeseriesContainerService.assertIsAllowedToDeleteContainer(containerId);
137     this.timeseriesRepository.deleteByContainerId(containerId);
138   }
139 
140   /**
141    * Retrieve a list of DataPoints for a time-interval with options to grouping/
142    * time slicing, filling and aggregating.
143    *
144    * @return List of TimeseriesDataPoint
145    * @throws InvalidPathException if container is null or deleted
146    * @throws InvalidAuthException if user has no read permissions on container
147    */
148   public List<TimeseriesDataPoint> getDataPointsByTimeseries(
149     long containerId,
150     Timeseries timeseries,
151     TimeseriesDataPointsQueryParams queryParams
152   ) {
153     timeseriesContainerService.getContainer(containerId);
154 
155     return getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams);
156   }
157 
158   /**
159    * Retrieve a list of DataPoints for a time-interval with options to grouping/
160    * time slicing, filling and aggregating.
161    *
162    * This function does not check if the container specified by containerId is
163    * accessible.
164    * We add <code>@ActivateRequestContext</code> in order to call this method in a
165    * parallel stream.
166    * The container check relies on an active request context.
167    * However, the 'ActivateRequestContext' annotation does not allow for a
168    * container check.
169    *
170    * @param containerId
171    * @param timeseries
172    * @param queryParams
173    * @return List<TimeseriesDataPoint>
174    */
175   @ActivateRequestContext
176   public List<TimeseriesDataPoint> getDataPointsByTimeseriesActivatedRequestContext(
177     long containerId,
178     Timeseries timeseries,
179     TimeseriesDataPointsQueryParams queryParams
180   ) {
181     Optional<TimeseriesEntity> timeseriesEntity = this.timeseriesRepository.findTimeseries(containerId, timeseries);
182 
183     if (timeseriesEntity.isEmpty()) return Collections.emptyList();
184 
185     int timeseriesId = timeseriesEntity.get().getId();
186     DataPointValueType valueType = timeseriesEntity.get().getValueType();
187 
188     return this.timeseriesDataPointRepository.queryDataPoints(timeseriesId, valueType, queryParams);
189   }
190 
191   public List<TimeseriesWithDataPoints> getManyTimeseriesWithDataPoints(
192     Long containerId,
193     List<Timeseries> timeseriesList,
194     TimeseriesDataPointsQueryParams queryParams
195   ) {
196     timeseriesContainerService.getContainer(containerId);
197 
198     ConcurrentLinkedQueue<TimeseriesWithDataPoints> timeseriesWithDataPointsQueue = new ConcurrentLinkedQueue<
199       TimeseriesWithDataPoints
200     >();
201     timeseriesList
202       .parallelStream()
203       .forEach(timeseries -> {
204         timeseriesWithDataPointsQueue.add(
205           new TimeseriesWithDataPoints(
206             timeseries,
207             getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams)
208           )
209         );
210       });
211     return new ArrayList<TimeseriesWithDataPoints>(timeseriesWithDataPointsQueue);
212   }
213 
214   /**
215    * Saves data points in the database.
216    * If the corresponding timeseries did not exist before, it will be persisted in
217    * the database.
218    *
219    * @param timeseriesContainerId Identifies the TimeseriesContainer
220    * @param timeseries            The timeseries identifiers
221    * @param dataPoints            Data points to be added to the timeseries
222    * @return created timeseries
223    */
224   public TimeseriesEntity saveDataPoints(
225     long timeseriesContainerId,
226     Timeseries timeseries,
227     List<TimeseriesDataPoint> dataPoints
228   ) {
229     timeseriesContainerService.getContainer(timeseriesContainerId);
230     timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
231 
232     DataPointValueType incomingValueType = ObjectTypeEvaluator.determineType(dataPoints.get(0).getValue()).orElseThrow(
233       () -> new InvalidBodyException()
234     );
235 
236     return saveDataPoints(timeseriesContainerId, timeseries, dataPoints, incomingValueType);
237   }
238 
239   /**
240    * Saves data points in the database.
241    * If the corresponding timeseries did not exist before, it will be persisted in
242    * the database.
243    *
244    * @param timeseriesContainerId Identifies the TimeseriesContainer
245    * @param timeseries            The timeseries identifiers
246    * @param dataPoints            Data points to be added to the timeseries
247    * @param dataType              The data type that values in this timeseries
248    *                              will have
249    * @return created timeseries
250    */
251   @Transactional(Transactional.TxType.REQUIRES_NEW)
252   @TransactionConfiguration(timeout = 6000)
253   public TimeseriesEntity saveDataPoints(
254     long timeseriesContainerId,
255     Timeseries timeseries,
256     List<TimeseriesDataPoint> dataPoints,
257     DataPointValueType dataType
258   ) {
259     timeseriesContainerService.getContainer(timeseriesContainerId);
260     timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
261 
262     TimeseriesEntity timeseriesEntity = getOrCreateTimeseries(timeseriesContainerId, timeseries, dataType);
263 
264     assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
265 
266     timeseriesDataPointRepository.insertManyDataPoints(dataPoints, timeseriesEntity);
267 
268     return timeseriesEntity;
269   }
270 
271   @Deprecated
272   @Transactional(Transactional.TxType.REQUIRES_NEW)
273   @TransactionConfiguration(timeout = 6000)
274   public TimeseriesEntity repeatSaveDataPointsWithBatchInsert(
275     List<TimeseriesDataPoint> entities,
276     TimeseriesEntity timeseriesEntity
277   ) {
278     timeseriesDataPointRepository.insertManyDataPoints(entities, timeseriesEntity);
279     return timeseriesEntity;
280   }
281 
282   private TimeseriesEntity getOrCreateTimeseries(
283     long containerId,
284     Timeseries timeseries,
285     DataPointValueType incomingValueType
286   ) {
287     timeseriesContainerService.getContainer(containerId);
288     timeseriesContainerService.assertIsAllowedToEditContainer(containerId);
289 
290     // try to find timeseries in db
291     Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
292 
293     if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
294 
295     TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
296 
297     // create new timeseries because it does not exist
298     TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
299     QuarkusTransaction.requiringNew()
300       .run(() -> {
301         this.timeseriesRepository.upsert(containerId, timeseriesEntity);
302       });
303 
304     var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
305     return found.get();
306   }
307 
308   private void assertDataPointsMatchTimeseriesValueType(
309     TimeseriesEntity timeseriesEntity,
310     List<TimeseriesDataPoint> dataPoints
311   ) {
312     for (TimeseriesDataPoint dataPoint : dataPoints) {
313       DataPointValueType expectedType = ObjectTypeEvaluator.determineType(dataPoint.getValue()).orElseThrow(() ->
314         new InvalidBodyException()
315       );
316       assertValueTypeMatchesTimeseries(timeseriesEntity, expectedType);
317     }
318   }
319 
320   private void assertValueTypeMatchesTimeseries(TimeseriesEntity timeseries, DataPointValueType incomingValueType) {
321     // If auto-conversion is enabled, allow transformation from Integer to Double
322     if (
323       autoConvertIntToDouble &&
324       incomingValueType == DataPointValueType.Integer &&
325       timeseries.getValueType() == DataPointValueType.Double
326     ) return;
327 
328     if (timeseries.getValueType() != incomingValueType) throw new InvalidBodyException(
329       "Timeseries already exists for data type %s but new data points are of type %s",
330       timeseries.getValueType(),
331       incomingValueType
332     );
333   }
334 }