View Javadoc
1   package de.dlr.shepard.data.timeseries.services;
2   
3   import de.dlr.shepard.common.exceptions.InvalidAuthException;
4   import de.dlr.shepard.common.exceptions.InvalidBodyException;
5   import de.dlr.shepard.common.exceptions.InvalidPathException;
6   import de.dlr.shepard.context.references.timeseriesreference.daos.TimeseriesTupleDAO;
7   import de.dlr.shepard.data.timeseries.daos.TimeseriesDAO;
8   import de.dlr.shepard.data.timeseries.io.TimeseriesWithDataPoints;
9   import de.dlr.shepard.data.timeseries.model.Timeseries;
10  import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
11  import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
12  import de.dlr.shepard.data.timeseries.model.TimeseriesTuple;
13  import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
14  import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
15  import de.dlr.shepard.data.timeseries.utilities.ObjectTypeEvaluator;
16  import de.dlr.shepard.data.timeseries.utilities.TimeseriesValidator;
17  import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
18  import jakarta.enterprise.context.RequestScoped;
19  import jakarta.enterprise.context.control.ActivateRequestContext;
20  import jakarta.inject.Inject;
21  import jakarta.transaction.Transactional;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.NoSuchElementException;
25  import java.util.Optional;
26  import java.util.concurrent.ConcurrentLinkedQueue;
27  import java.util.stream.Stream;
28  import org.eclipse.microprofile.config.ConfigProvider;
29  
30  @RequestScoped
31  public class TimeseriesService {
32  
33    @Inject
34    TimeseriesDAO timeseriesDAO;
35  
36    @Inject
37    TimeseriesTupleDAO timeseriesTupleDAO;
38  
39    @Inject
40    TimeseriesDataPointRepository timeseriesDataPointRepository;
41  
42    @Inject
43    TimeseriesContainerService timeseriesContainerService;
44  
45    /**
46     * Flag to determine whether integer values received should be automatically converted to double if the
47     * timeseries they are supposed to be inserted is of type double.
48     * This flag is not injected using @ConfigProperty because that would make testing much more complicated.
49     * These properties are set upon startup and cannot be changed within a single test.
50     */
51    Boolean autoConvertIntToDouble = ConfigProvider.getConfig()
52      .getOptionalValue("shepard.autoconvert-int", Boolean.class)
53      .orElse(false);
54  
55    /**
56     * Returns a list of timeseries objects that are in the given database.
57     * <p/>
58     * Returns an empty list if the timeseries container is not accessible (cannot
59     * be found or wrong permissions).
60     *
61     * @param containerId of the given timeseries container
62     * @return a list of timeseries entities
63     */
64    public Stream<Timeseries> getTimeseriesAvailable(long containerId) {
65      timeseriesContainerService.getContainer(containerId);
66      return timeseriesDAO.getAllTimeseriesInContainer(containerId);
67    }
68  
69    /**
70     * Returns a timeseries entity by its timeseries id.
71     *
72     * @param id timeseries id
73     * @return timeseries
74     * @throws NoSuchElementException if the timeseries does not exist
75     * @throws InvalidAuthException   if user has no read permissions on the timeseries container
76     * @throws InvalidPathException   if container with containerId or the timeseries are not accessible
77     */
78    public Timeseries getTimeseriesById(Long id)
79      throws NoSuchElementException, InvalidAuthException, InvalidPathException {
80      var timeseries = timeseriesDAO.findByTimeseriesId(id).orElseThrow();
81      timeseriesContainerService.getContainer(timeseries.getContainer().getId());
82      return timeseries;
83    }
84  
85    /**
86     * Deletes timeseries container by id
87     *
88     * @param containerId timeseries container id
89     * @throws InvalidPathException if container could not be found
90     * @throws InvalidAuthException if user has no edit permissions on container
91     */
92    @Transactional
93    public void deleteTimeseriesByContainerId(long containerId) {
94      timeseriesContainerService.getContainer(containerId);
95      timeseriesContainerService.assertIsAllowedToDeleteContainer(containerId);
96      timeseriesDAO.deleteAllTimeseriesInContainer(containerId);
97    }
98  
99    /**
100    * Retrieve a list of DataPoints for a time-interval with options to grouping/
101    * time slicing, filling and aggregating.
102    *
103    * @return List of TimeseriesDataPoint
104    * @throws InvalidPathException if container is null or deleted
105    * @throws InvalidAuthException if user has no read permissions on container
106    */
107   public List<TimeseriesDataPoint> getDataPointsByTimeseries(
108     long containerId,
109     TimeseriesTuple timeseries,
110     TimeseriesDataPointsQueryParams queryParams
111   ) {
112     timeseriesContainerService.getContainer(containerId);
113     var ts = timeseriesDAO.findTimeseries(containerId, timeseries).orElseThrow();
114 
115     return timeseriesDataPointRepository.queryDataPoints(ts.getTimeseriesId(), ts.getValueType(), queryParams);
116   }
117 
118   @ActivateRequestContext
119   public List<TimeseriesDataPoint> getDatapointsParallelizable(
120     Timeseries timeseries,
121     TimeseriesDataPointsQueryParams queryParams
122   ) {
123     return timeseriesDataPointRepository.queryDataPoints(
124       timeseries.getTimeseriesId(),
125       timeseries.getValueType(),
126       queryParams
127     );
128   }
129 
130   public List<TimeseriesWithDataPoints> getManyTimeseriesWithDataPoints(
131     Long containerId,
132     List<TimeseriesTuple> timeseriesTupleList,
133     TimeseriesDataPointsQueryParams queryParams
134   ) {
135     timeseriesContainerService.getContainer(containerId);
136 
137     var timeseriesList = timeseriesTupleList
138       .stream()
139       .map(tsTuple -> timeseriesDAO.findTimeseries(containerId, tsTuple).orElseThrow())
140       .toList();
141 
142     ConcurrentLinkedQueue<TimeseriesWithDataPoints> timeseriesWithDataPointsQueue = new ConcurrentLinkedQueue<>();
143     timeseriesList
144       .parallelStream()
145       .forEach(timeseries ->
146         timeseriesWithDataPointsQueue.add(
147           new TimeseriesWithDataPoints(
148             timeseries.getTimeseriesTuple(),
149             getDatapointsParallelizable(timeseries, queryParams)
150           )
151         )
152       );
153     return new ArrayList<>(timeseriesWithDataPointsQueue);
154   }
155 
156   /**
157    * Saves data points in the database.
158    * If the corresponding timeseries did not exist before, it will be persisted in
159    * the database.
160    *
161    * @param timeseriesContainerId Identifies the TimeseriesContainer
162    * @param timeseries            The timeseries identifiers
163    * @param dataPoints            Data points to be added to the timeseries
164    * @return created timeseries
165    */
166   public Timeseries saveDataPoints(
167     long timeseriesContainerId,
168     TimeseriesTuple timeseries,
169     List<TimeseriesDataPoint> dataPoints
170   ) {
171     timeseriesContainerService.getContainer(timeseriesContainerId);
172     timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
173 
174     DataPointValueType incomingValueType = ObjectTypeEvaluator.determineType(
175       dataPoints.getFirst().getValue()
176     ).orElseThrow(InvalidBodyException::new);
177 
178     return saveDataPoints(timeseriesContainerId, timeseries, dataPoints, incomingValueType);
179   }
180 
181   /**
182    * Saves data points in the database.
183    * If the corresponding timeseries did not exist before, it will be persisted in
184    * the database.
185    *
186    * @param timeseriesContainerId Identifies the TimeseriesContainer
187    * @param timeseriesTuple       The timeseries identifiers
188    * @param dataPoints            Data points to be added to the timeseries
189    * @param dataType              The data type that values in this timeseries
190    *                              will have
191    * @return created timeseries
192    */
193   @Transactional(Transactional.TxType.REQUIRES_NEW)
194   @TransactionConfiguration(timeout = 6000)
195   public Timeseries saveDataPoints(
196     long timeseriesContainerId,
197     TimeseriesTuple timeseriesTuple,
198     List<TimeseriesDataPoint> dataPoints,
199     DataPointValueType dataType
200   ) {
201     var ts = getTimeseries(timeseriesContainerId, timeseriesTuple).orElseGet(() ->
202       createTimeseries(timeseriesContainerId, timeseriesTuple, dataType)
203     );
204     assertDataPointsMatchTimeseriesValueType(ts.getValueType(), dataPoints);
205     timeseriesDataPointRepository.insertManyDataPoints(dataPoints, ts.getTimeseriesId(), ts.getValueType());
206     return ts;
207   }
208 
209   public Optional<Timeseries> getTimeseries(long containerId, TimeseriesTuple timeseries) {
210     return timeseriesDAO.findTimeseries(containerId, timeseries);
211   }
212 
213   /**
214    * Persist a {@link Timeseries} in Neo4j.
215    * If its referenced {@link TimeseriesTuple} does not exist yet create it.
216    *
217    *
218    * @param containerId The ID of the container that the timeseries is associated with
219    * @param timeseriesTuple The {@link TimeseriesTuple} referenced by the timeseries.
220    *                        If it exists the timeseries will reference the existing tuple.
221    *                        If it does not exist yet, it is newly created.
222    * @param incomingValueType The value type the timeseries should have
223    * @return The created {@link Timeseries} as found in the database.
224    */
225   private synchronized Timeseries createTimeseries(
226     long containerId,
227     TimeseriesTuple timeseriesTuple,
228     DataPointValueType incomingValueType
229   ) {
230     timeseriesContainerService.assertIsAllowedToEditContainer(containerId);
231     TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseriesTuple);
232     var container = timeseriesContainerService.getContainer(containerId);
233     // If TimeseriesTuple already exists update it, mainly with the id so the framework can handle the update correctly
234     timeseriesTuple = timeseriesTupleDAO.find(timeseriesTuple).orElse(timeseriesTuple);
235     var tsToCreate = new Timeseries(
236       container,
237       timeseriesTuple,
238       incomingValueType,
239       timeseriesDAO.getCurrentMaximumTimeseriesId() + 1
240     );
241     return timeseriesDAO.createOrUpdate(tsToCreate);
242   }
243 
244   private void assertDataPointsMatchTimeseriesValueType(
245     DataPointValueType valueType,
246     List<TimeseriesDataPoint> dataPoints
247   ) {
248     for (TimeseriesDataPoint dataPoint : dataPoints) {
249       DataPointValueType expectedType = ObjectTypeEvaluator.determineType(dataPoint.getValue()).orElseThrow(
250         InvalidBodyException::new
251       );
252       assertValueTypeMatchesTimeseries(valueType, expectedType);
253     }
254   }
255 
256   private void assertValueTypeMatchesTimeseries(DataPointValueType tsValueType, DataPointValueType incomingValueType) {
257     // If auto-conversion is enabled, allow transformation from Integer to Double
258     if (
259       autoConvertIntToDouble &&
260       incomingValueType == DataPointValueType.Integer &&
261       tsValueType == DataPointValueType.Double
262     ) return;
263 
264     if (tsValueType != incomingValueType) throw new InvalidBodyException(
265       "Timeseries already exists for data type %s but new data points are of type %s",
266       tsValueType,
267       incomingValueType
268     );
269   }
270 }