1 package de.dlr.shepard.data.timeseries.services;
2
3 import de.dlr.shepard.common.exceptions.InvalidAuthException;
4 import de.dlr.shepard.common.exceptions.InvalidBodyException;
5 import de.dlr.shepard.common.exceptions.InvalidPathException;
6 import de.dlr.shepard.data.timeseries.io.TimeseriesWithDataPoints;
7 import de.dlr.shepard.data.timeseries.model.Timeseries;
8 import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
9 import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
10 import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
11 import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
12 import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
13 import de.dlr.shepard.data.timeseries.repositories.TimeseriesRepository;
14 import de.dlr.shepard.data.timeseries.utilities.ObjectTypeEvaluator;
15 import de.dlr.shepard.data.timeseries.utilities.TimeseriesValidator;
16 import io.quarkus.logging.Log;
17 import io.quarkus.narayana.jta.QuarkusTransaction;
18 import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
19 import jakarta.enterprise.context.RequestScoped;
20 import jakarta.enterprise.context.control.ActivateRequestContext;
21 import jakarta.inject.Inject;
22 import jakarta.transaction.Transactional;
23 import jakarta.ws.rs.NotFoundException;
24 import java.sql.SQLException;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30
/**
 * Service for querying, storing and deleting timeseries and their data points inside a
 * timeseries container. The bean is request scoped; container lookups and permission
 * assertions are delegated to {@link TimeseriesContainerService}.
 */
@RequestScoped
public class TimeseriesService {

// Repository for the timeseries entities themselves (lookup, upsert, delete by container).
@Inject
TimeseriesRepository timeseriesRepository;

// Repository used to query and insert the data points of a timeseries.
@Inject
TimeseriesDataPointRepository timeseriesDataPointRepository;

// Container lookup and edit/delete permission assertions.
@Inject
TimeseriesContainerService timeseriesContainerService;
42
43
44
45
46
47
48
49
50
51
52 public List<TimeseriesEntity> getTimeseriesAvailable(long containerId) {
53 timeseriesContainerService.getContainer(containerId);
54
55 return timeseriesRepository.list("containerId", containerId);
56 }
57
58
59
60
61
62
63
64
65
66
67
68
69 public TimeseriesEntity getTimeseriesById(long containerId, int id) {
70 timeseriesContainerService.getContainer(containerId);
71
72 var timeseries = timeseriesRepository.findById(id);
73 if (timeseries == null) {
74 String errorMsg = String.format(
75 "ID ERROR - Timeseries with id %s in container %s is null or deleted",
76 id,
77 containerId
78 );
79 Log.error(errorMsg);
80 throw new InvalidPathException(errorMsg);
81 }
82 return timeseries;
83 }
84
85
86
87
88
89
90
91
92
93
94
95 public TimeseriesEntity getTimeseries(long containerId, Timeseries timeseries) {
96 timeseriesContainerService.getContainer(containerId);
97
98 var timeseriesEntity = timeseriesRepository.findTimeseries(containerId, timeseries);
99 if (timeseriesEntity.isEmpty()) {
100 String errorMsg = String.format(
101 "Timeseries (%s, %s, %s, %s, %s) in container %s is null or deleted",
102 timeseries.getMeasurement(),
103 timeseries.getDevice(),
104 timeseries.getLocation(),
105 timeseries.getSymbolicName(),
106 timeseries.getField(),
107 containerId
108 );
109 Log.error(errorMsg);
110 throw new NotFoundException(errorMsg);
111 }
112 return timeseriesEntity.get();
113 }
114
115
116
117
118
119
120
121
122 @Transactional
123 public void deleteTimeseriesByContainerId(long containerId) {
124 timeseriesContainerService.getContainer(containerId);
125 timeseriesContainerService.assertIsAllowedToDeleteContainer(containerId);
126 this.timeseriesRepository.deleteByContainerId(containerId);
127 }
128
129
130
131
132
133
134
135
136
137 public List<TimeseriesDataPoint> getDataPointsByTimeseries(
138 long containerId,
139 Timeseries timeseries,
140 TimeseriesDataPointsQueryParams queryParams
141 ) {
142 timeseriesContainerService.getContainer(containerId);
143
144 return getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams);
145 }
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164 @ActivateRequestContext
165 public List<TimeseriesDataPoint> getDataPointsByTimeseriesActivatedRequestContext(
166 long containerId,
167 Timeseries timeseries,
168 TimeseriesDataPointsQueryParams queryParams
169 ) {
170 Optional<TimeseriesEntity> timeseriesEntity = this.timeseriesRepository.findTimeseries(containerId, timeseries);
171
172 if (timeseriesEntity.isEmpty()) return Collections.emptyList();
173
174 int timeseriesId = timeseriesEntity.get().getId();
175 DataPointValueType valueType = timeseriesEntity.get().getValueType();
176
177 return this.timeseriesDataPointRepository.queryDataPoints(timeseriesId, valueType, queryParams);
178 }
179
180 public List<TimeseriesWithDataPoints> getManyTimeseriesWithDataPoints(
181 Long containerId,
182 List<Timeseries> timeseriesList,
183 TimeseriesDataPointsQueryParams queryParams
184 ) {
185 timeseriesContainerService.getContainer(containerId);
186
187 ConcurrentLinkedQueue<TimeseriesWithDataPoints> timeseriesWithDataPointsQueue = new ConcurrentLinkedQueue<
188 TimeseriesWithDataPoints
189 >();
190 timeseriesList
191 .parallelStream()
192 .forEach(timeseries -> {
193 timeseriesWithDataPointsQueue.add(
194 new TimeseriesWithDataPoints(
195 timeseries,
196 getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams)
197 )
198 );
199 });
200 return new ArrayList<TimeseriesWithDataPoints>(timeseriesWithDataPointsQueue);
201 }
202
203
204
205
206
207
208
209
210
211
212
213 public TimeseriesEntity saveDataPoints(
214 long timeseriesContainerId,
215 Timeseries timeseries,
216 List<TimeseriesDataPoint> dataPoints
217 ) {
218 timeseriesContainerService.getContainer(timeseriesContainerId);
219 timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
220
221 DataPointValueType incomingValueType = ObjectTypeEvaluator.determineType(dataPoints.get(0).getValue()).orElseThrow(
222 () -> new InvalidBodyException()
223 );
224
225 return saveDataPoints(timeseriesContainerId, timeseries, dataPoints, incomingValueType);
226 }
227
228
229
230
231
232
233
234
235
236
237
238
239
240 @Transactional(Transactional.TxType.REQUIRES_NEW)
241 @TransactionConfiguration(timeout = 6000)
242 public TimeseriesEntity saveDataPoints(
243 long timeseriesContainerId,
244 Timeseries timeseries,
245 List<TimeseriesDataPoint> dataPoints,
246 DataPointValueType dataType
247 ) {
248 timeseriesContainerService.getContainer(timeseriesContainerId);
249 timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
250
251 TimeseriesEntity timeseriesEntity = getOrCreateTimeseries(timeseriesContainerId, timeseries, dataType);
252
253 assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
254
255 timeseriesDataPointRepository.insertManyDataPoints(dataPoints, timeseriesEntity);
256
257 return timeseriesEntity;
258 }
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277 @Deprecated
278 @Transactional(Transactional.TxType.REQUIRES_NEW)
279 @TransactionConfiguration(timeout = 6000)
280 public TimeseriesEntity saveDataPointsNoChecks(
281 long timeseriesContainerId,
282 Timeseries timeseries,
283 List<TimeseriesDataPoint> dataPoints,
284 DataPointValueType dataType
285 ) {
286 TimeseriesEntity timeseriesEntity = getOrCreateTimeseriesNoChecks(timeseriesContainerId, timeseries, dataType);
287 assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
288 try {
289 timeseriesDataPointRepository.insertManyDataPointsWithCopyCommand(dataPoints, timeseriesEntity);
290 } catch (SQLException e) {
291
292
293 Log.warnf("SQLException during copy insert (expected): %s ", e.getMessage());
294 Log.warn("We are going to repeat the operation with batch insert.");
295 return repeatSaveDataPointsWithBatchInsert(dataPoints, timeseriesEntity);
296 }
297 return timeseriesEntity;
298 }
299
300 @Deprecated
301 @Transactional(Transactional.TxType.REQUIRES_NEW)
302 @TransactionConfiguration(timeout = 6000)
303 public TimeseriesEntity repeatSaveDataPointsWithBatchInsert(
304 List<TimeseriesDataPoint> entities,
305 TimeseriesEntity timeseriesEntity
306 ) {
307 timeseriesDataPointRepository.insertManyDataPoints(entities, timeseriesEntity);
308 return timeseriesEntity;
309 }
310
311
312
313
314
315
316
317 @Deprecated
318 private TimeseriesEntity getOrCreateTimeseriesNoChecks(
319 long containerId,
320 Timeseries timeseries,
321 DataPointValueType incomingValueType
322 ) {
323
324 Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
325 if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
326 TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
327
328
329 TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
330 QuarkusTransaction.requiringNew()
331 .run(() -> {
332 this.timeseriesRepository.upsert(containerId, timeseriesEntity);
333 });
334 var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
335 return found.get();
336 }
337
338 private TimeseriesEntity getOrCreateTimeseries(
339 long containerId,
340 Timeseries timeseries,
341 DataPointValueType incomingValueType
342 ) {
343 timeseriesContainerService.getContainer(containerId);
344 timeseriesContainerService.assertIsAllowedToEditContainer(containerId);
345
346
347 Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
348
349 if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
350
351 TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
352
353
354 TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
355 QuarkusTransaction.requiringNew()
356 .run(() -> {
357 this.timeseriesRepository.upsert(containerId, timeseriesEntity);
358 });
359
360 var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
361 return found.get();
362 }
363
364 private static void assertDataPointsMatchTimeseriesValueType(
365 TimeseriesEntity timeseriesEntity,
366 List<TimeseriesDataPoint> dataPoints
367 ) {
368 for (TimeseriesDataPoint dataPoint : dataPoints) {
369 DataPointValueType expectedType = ObjectTypeEvaluator.determineType(dataPoint.getValue()).orElseThrow(() ->
370 new InvalidBodyException()
371 );
372 assertValueTypeMatchesTimeseries(timeseriesEntity, expectedType);
373 }
374 }
375
376 private static void assertValueTypeMatchesTimeseries(
377 TimeseriesEntity timeseries,
378 DataPointValueType incomingValueType
379 ) {
380 if (timeseries.getValueType() != incomingValueType) throw new InvalidBodyException(
381 "Timeseries already exists for data type %s but new data points are of type %s",
382 timeseries.getValueType(),
383 incomingValueType
384 );
385 }
386 }