1 package de.dlr.shepard.data.timeseries.services;
2
import de.dlr.shepard.common.exceptions.InvalidAuthException;
import de.dlr.shepard.common.exceptions.InvalidBodyException;
import de.dlr.shepard.common.exceptions.InvalidPathException;
import de.dlr.shepard.data.timeseries.io.TimeseriesWithDataPoints;
import de.dlr.shepard.data.timeseries.model.Timeseries;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
import de.dlr.shepard.data.timeseries.repositories.TimeseriesRepository;
import de.dlr.shepard.data.timeseries.utilities.ObjectTypeEvaluator;
import de.dlr.shepard.data.timeseries.utilities.TimeseriesValidator;
import io.quarkus.logging.Log;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
import jakarta.enterprise.context.RequestScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import jakarta.ws.rs.NotFoundException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.ConfigProvider;
31
32 @RequestScoped
33 public class TimeseriesService {
34
35 @Inject
36 TimeseriesRepository timeseriesRepository;
37
38 @Inject
39 TimeseriesDataPointRepository timeseriesDataPointRepository;
40
41 @Inject
42 TimeseriesContainerService timeseriesContainerService;
43
44
45
46
47
48
49
50 Boolean autoConvertIntToDouble = ConfigProvider.getConfig()
51 .getOptionalValue("shepard.autoconvert-int", Boolean.class)
52 .orElse(false);
53
54
55
56
57
58
59
60
61
62
63 public List<TimeseriesEntity> getTimeseriesAvailable(long containerId) {
64 timeseriesContainerService.getContainer(containerId);
65
66 return timeseriesRepository.list("containerId", containerId);
67 }
68
69
70
71
72
73
74
75
76
77
78
79
80 public TimeseriesEntity getTimeseriesById(long containerId, int id) {
81 timeseriesContainerService.getContainer(containerId);
82
83 var timeseries = timeseriesRepository.findById(id);
84 if (timeseries == null) {
85 String errorMsg = String.format(
86 "ID ERROR - Timeseries with id %s in container %s is null or deleted",
87 id,
88 containerId
89 );
90 Log.error(errorMsg);
91 throw new InvalidPathException(errorMsg);
92 }
93 return timeseries;
94 }
95
96
97
98
99
100
101
102
103
104
105
106 public TimeseriesEntity getTimeseries(long containerId, Timeseries timeseries) {
107 timeseriesContainerService.getContainer(containerId);
108
109 var timeseriesEntity = timeseriesRepository.findTimeseries(containerId, timeseries);
110 if (timeseriesEntity.isEmpty()) {
111 String errorMsg = String.format(
112 "Timeseries (%s, %s, %s, %s, %s) in container %s is null or deleted",
113 timeseries.getMeasurement(),
114 timeseries.getDevice(),
115 timeseries.getLocation(),
116 timeseries.getSymbolicName(),
117 timeseries.getField(),
118 containerId
119 );
120 Log.error(errorMsg);
121 throw new NotFoundException(errorMsg);
122 }
123 return timeseriesEntity.get();
124 }
125
126
127
128
129
130
131
132
133 @Transactional
134 public void deleteTimeseriesByContainerId(long containerId) {
135 timeseriesContainerService.getContainer(containerId);
136 timeseriesContainerService.assertIsAllowedToDeleteContainer(containerId);
137 this.timeseriesRepository.deleteByContainerId(containerId);
138 }
139
140
141
142
143
144
145
146
147
148 public List<TimeseriesDataPoint> getDataPointsByTimeseries(
149 long containerId,
150 Timeseries timeseries,
151 TimeseriesDataPointsQueryParams queryParams
152 ) {
153 timeseriesContainerService.getContainer(containerId);
154
155 return getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams);
156 }
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175 @ActivateRequestContext
176 public List<TimeseriesDataPoint> getDataPointsByTimeseriesActivatedRequestContext(
177 long containerId,
178 Timeseries timeseries,
179 TimeseriesDataPointsQueryParams queryParams
180 ) {
181 Optional<TimeseriesEntity> timeseriesEntity = this.timeseriesRepository.findTimeseries(containerId, timeseries);
182
183 if (timeseriesEntity.isEmpty()) return Collections.emptyList();
184
185 int timeseriesId = timeseriesEntity.get().getId();
186 DataPointValueType valueType = timeseriesEntity.get().getValueType();
187
188 return this.timeseriesDataPointRepository.queryDataPoints(timeseriesId, valueType, queryParams);
189 }
190
191 public List<TimeseriesWithDataPoints> getManyTimeseriesWithDataPoints(
192 Long containerId,
193 List<Timeseries> timeseriesList,
194 TimeseriesDataPointsQueryParams queryParams
195 ) {
196 timeseriesContainerService.getContainer(containerId);
197
198 ConcurrentLinkedQueue<TimeseriesWithDataPoints> timeseriesWithDataPointsQueue = new ConcurrentLinkedQueue<
199 TimeseriesWithDataPoints
200 >();
201 timeseriesList
202 .parallelStream()
203 .forEach(timeseries -> {
204 timeseriesWithDataPointsQueue.add(
205 new TimeseriesWithDataPoints(
206 timeseries,
207 getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams)
208 )
209 );
210 });
211 return new ArrayList<TimeseriesWithDataPoints>(timeseriesWithDataPointsQueue);
212 }
213
214
215
216
217
218
219
220
221
222
223
224 public TimeseriesEntity saveDataPoints(
225 long timeseriesContainerId,
226 Timeseries timeseries,
227 List<TimeseriesDataPoint> dataPoints
228 ) {
229 timeseriesContainerService.getContainer(timeseriesContainerId);
230 timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
231
232 DataPointValueType incomingValueType = ObjectTypeEvaluator.determineType(dataPoints.get(0).getValue()).orElseThrow(
233 () -> new InvalidBodyException()
234 );
235
236 return saveDataPoints(timeseriesContainerId, timeseries, dataPoints, incomingValueType);
237 }
238
239
240
241
242
243
244
245
246
247
248
249
250
251 @Transactional(Transactional.TxType.REQUIRES_NEW)
252 @TransactionConfiguration(timeout = 6000)
253 public TimeseriesEntity saveDataPoints(
254 long timeseriesContainerId,
255 Timeseries timeseries,
256 List<TimeseriesDataPoint> dataPoints,
257 DataPointValueType dataType
258 ) {
259 timeseriesContainerService.getContainer(timeseriesContainerId);
260 timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
261
262 TimeseriesEntity timeseriesEntity = getOrCreateTimeseries(timeseriesContainerId, timeseries, dataType);
263
264 assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
265
266 timeseriesDataPointRepository.insertManyDataPoints(dataPoints, timeseriesEntity);
267
268 return timeseriesEntity;
269 }
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288 @Deprecated
289 @Transactional(Transactional.TxType.REQUIRES_NEW)
290 @TransactionConfiguration(timeout = 6000)
291 public TimeseriesEntity saveDataPointsNoChecks(
292 long timeseriesContainerId,
293 Timeseries timeseries,
294 List<TimeseriesDataPoint> dataPoints,
295 DataPointValueType dataType
296 ) {
297 TimeseriesEntity timeseriesEntity = getOrCreateTimeseriesNoChecks(timeseriesContainerId, timeseries, dataType);
298 assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
299 try {
300 timeseriesDataPointRepository.insertManyDataPointsWithCopyCommand(dataPoints, timeseriesEntity);
301 } catch (SQLException e) {
302
303
304 Log.warnf("SQLException during copy insert (expected): %s ", e.getMessage());
305 Log.warn("We are going to repeat the operation with batch insert.");
306 return repeatSaveDataPointsWithBatchInsert(dataPoints, timeseriesEntity);
307 }
308 return timeseriesEntity;
309 }
310
311 @Deprecated
312 @Transactional(Transactional.TxType.REQUIRES_NEW)
313 @TransactionConfiguration(timeout = 6000)
314 public TimeseriesEntity repeatSaveDataPointsWithBatchInsert(
315 List<TimeseriesDataPoint> entities,
316 TimeseriesEntity timeseriesEntity
317 ) {
318 timeseriesDataPointRepository.insertManyDataPoints(entities, timeseriesEntity);
319 return timeseriesEntity;
320 }
321
322
323
324
325
326
327
328 @Deprecated
329 private TimeseriesEntity getOrCreateTimeseriesNoChecks(
330 long containerId,
331 Timeseries timeseries,
332 DataPointValueType incomingValueType
333 ) {
334
335 Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
336 if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
337 TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
338
339
340 TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
341 QuarkusTransaction.requiringNew()
342 .run(() -> {
343 this.timeseriesRepository.upsert(containerId, timeseriesEntity);
344 });
345 var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
346 return found.get();
347 }
348
349 private TimeseriesEntity getOrCreateTimeseries(
350 long containerId,
351 Timeseries timeseries,
352 DataPointValueType incomingValueType
353 ) {
354 timeseriesContainerService.getContainer(containerId);
355 timeseriesContainerService.assertIsAllowedToEditContainer(containerId);
356
357
358 Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
359
360 if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
361
362 TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
363
364
365 TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
366 QuarkusTransaction.requiringNew()
367 .run(() -> {
368 this.timeseriesRepository.upsert(containerId, timeseriesEntity);
369 });
370
371 var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
372 return found.get();
373 }
374
375 private void assertDataPointsMatchTimeseriesValueType(
376 TimeseriesEntity timeseriesEntity,
377 List<TimeseriesDataPoint> dataPoints
378 ) {
379 for (TimeseriesDataPoint dataPoint : dataPoints) {
380 DataPointValueType expectedType = ObjectTypeEvaluator.determineType(dataPoint.getValue()).orElseThrow(() ->
381 new InvalidBodyException()
382 );
383 assertValueTypeMatchesTimeseries(timeseriesEntity, expectedType);
384 }
385 }
386
387 private void assertValueTypeMatchesTimeseries(TimeseriesEntity timeseries, DataPointValueType incomingValueType) {
388
389 if (
390 autoConvertIntToDouble &&
391 incomingValueType == DataPointValueType.Integer &&
392 timeseries.getValueType() == DataPointValueType.Double
393 ) return;
394
395 if (timeseries.getValueType() != incomingValueType) throw new InvalidBodyException(
396 "Timeseries already exists for data type %s but new data points are of type %s",
397 timeseries.getValueType(),
398 incomingValueType
399 );
400 }
401 }