1 package de.dlr.shepard.data.timeseries.services;
2
3 import de.dlr.shepard.common.exceptions.InvalidAuthException;
4 import de.dlr.shepard.common.exceptions.InvalidBodyException;
5 import de.dlr.shepard.common.exceptions.InvalidPathException;
6 import de.dlr.shepard.data.timeseries.io.TimeseriesWithDataPoints;
7 import de.dlr.shepard.data.timeseries.model.Timeseries;
8 import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
9 import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
10 import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
11 import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
12 import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
13 import de.dlr.shepard.data.timeseries.repositories.TimeseriesRepository;
14 import de.dlr.shepard.data.timeseries.utilities.ObjectTypeEvaluator;
15 import de.dlr.shepard.data.timeseries.utilities.TimeseriesValidator;
16 import io.quarkus.logging.Log;
17 import io.quarkus.narayana.jta.QuarkusTransaction;
18 import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
19 import jakarta.enterprise.context.RequestScoped;
20 import jakarta.enterprise.context.control.ActivateRequestContext;
21 import jakarta.inject.Inject;
22 import jakarta.transaction.Transactional;
23 import jakarta.ws.rs.NotFoundException;
24 import java.sql.SQLException;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import org.eclipse.microprofile.config.ConfigProvider;
31
32 @RequestScoped
33 public class TimeseriesService {
34
35 @Inject
36 TimeseriesRepository timeseriesRepository;
37
38 @Inject
39 TimeseriesDataPointRepository timeseriesDataPointRepository;
40
41 @Inject
42 TimeseriesContainerService timeseriesContainerService;
43
44
45
46
47
48
49
50 Boolean autoConvertIntToDouble = ConfigProvider.getConfig()
51 .getOptionalValue("shepard.autoconvert-int", Boolean.class)
52 .orElse(false);
53
54
55
56
57
58
59
60
61
62
63 public List<TimeseriesEntity> getTimeseriesAvailable(long containerId) {
64 timeseriesContainerService.getContainer(containerId);
65
66 return timeseriesRepository.list("containerId", containerId);
67 }
68
69
70
71
72
73
74
75
76
77
78
79
80 public TimeseriesEntity getTimeseriesById(long containerId, int id) {
81 timeseriesContainerService.getContainer(containerId);
82
83 var timeseries = timeseriesRepository.findById(id);
84 if (timeseries == null) {
85 String errorMsg = String.format(
86 "ID ERROR - Timeseries with id %s in container %s is null or deleted",
87 id,
88 containerId
89 );
90 Log.error(errorMsg);
91 throw new InvalidPathException(errorMsg);
92 }
93 return timeseries;
94 }
95
96
97
98
99
100
101
102
103
104
105
106 public TimeseriesEntity getTimeseries(long containerId, Timeseries timeseries) {
107 timeseriesContainerService.getContainer(containerId);
108
109 var timeseriesEntity = timeseriesRepository.findTimeseries(containerId, timeseries);
110 if (timeseriesEntity.isEmpty()) {
111 String errorMsg = String.format(
112 "Timeseries (%s, %s, %s, %s, %s) in container %s is null or deleted",
113 timeseries.getMeasurement(),
114 timeseries.getDevice(),
115 timeseries.getLocation(),
116 timeseries.getSymbolicName(),
117 timeseries.getField(),
118 containerId
119 );
120 Log.error(errorMsg);
121 throw new NotFoundException(errorMsg);
122 }
123 return timeseriesEntity.get();
124 }
125
126
127
128
129
130
131
132
133 @Transactional
134 public void deleteTimeseriesByContainerId(long containerId) {
135 timeseriesContainerService.getContainer(containerId);
136 timeseriesContainerService.assertIsAllowedToDeleteContainer(containerId);
137 this.timeseriesRepository.deleteByContainerId(containerId);
138 }
139
140
141
142
143
144
145
146
147
148 public List<TimeseriesDataPoint> getDataPointsByTimeseries(
149 long containerId,
150 Timeseries timeseries,
151 TimeseriesDataPointsQueryParams queryParams
152 ) {
153 timeseriesContainerService.getContainer(containerId);
154
155 return getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams);
156 }
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175 @ActivateRequestContext
176 public List<TimeseriesDataPoint> getDataPointsByTimeseriesActivatedRequestContext(
177 long containerId,
178 Timeseries timeseries,
179 TimeseriesDataPointsQueryParams queryParams
180 ) {
181 Optional<TimeseriesEntity> timeseriesEntity = this.timeseriesRepository.findTimeseries(containerId, timeseries);
182
183 if (timeseriesEntity.isEmpty()) return Collections.emptyList();
184
185 int timeseriesId = timeseriesEntity.get().getId();
186 DataPointValueType valueType = timeseriesEntity.get().getValueType();
187
188 return this.timeseriesDataPointRepository.queryDataPoints(timeseriesId, valueType, queryParams);
189 }
190
191 public List<TimeseriesWithDataPoints> getManyTimeseriesWithDataPoints(
192 Long containerId,
193 List<Timeseries> timeseriesList,
194 TimeseriesDataPointsQueryParams queryParams
195 ) {
196 timeseriesContainerService.getContainer(containerId);
197
198 ConcurrentLinkedQueue<TimeseriesWithDataPoints> timeseriesWithDataPointsQueue = new ConcurrentLinkedQueue<
199 TimeseriesWithDataPoints
200 >();
201 timeseriesList
202 .parallelStream()
203 .forEach(timeseries -> {
204 timeseriesWithDataPointsQueue.add(
205 new TimeseriesWithDataPoints(
206 timeseries,
207 getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams)
208 )
209 );
210 });
211 return new ArrayList<TimeseriesWithDataPoints>(timeseriesWithDataPointsQueue);
212 }
213
214
215
216
217
218
219
220
221
222
223
224 public TimeseriesEntity saveDataPoints(
225 long timeseriesContainerId,
226 Timeseries timeseries,
227 List<TimeseriesDataPoint> dataPoints
228 ) {
229 timeseriesContainerService.getContainer(timeseriesContainerId);
230 timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
231
232 DataPointValueType incomingValueType = ObjectTypeEvaluator.determineType(dataPoints.get(0).getValue()).orElseThrow(
233 () -> new InvalidBodyException()
234 );
235
236 return saveDataPoints(timeseriesContainerId, timeseries, dataPoints, incomingValueType);
237 }
238
239
240
241
242
243
244
245
246
247
248
249
250
251 @Transactional(Transactional.TxType.REQUIRES_NEW)
252 @TransactionConfiguration(timeout = 6000)
253 public TimeseriesEntity saveDataPoints(
254 long timeseriesContainerId,
255 Timeseries timeseries,
256 List<TimeseriesDataPoint> dataPoints,
257 DataPointValueType dataType
258 ) {
259 timeseriesContainerService.getContainer(timeseriesContainerId);
260 timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
261
262 TimeseriesEntity timeseriesEntity = getOrCreateTimeseries(timeseriesContainerId, timeseries, dataType);
263
264 assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
265
266 timeseriesDataPointRepository.insertManyDataPoints(dataPoints, timeseriesEntity);
267
268 return timeseriesEntity;
269 }
270
271 @Deprecated
272 @Transactional(Transactional.TxType.REQUIRES_NEW)
273 @TransactionConfiguration(timeout = 6000)
274 public TimeseriesEntity repeatSaveDataPointsWithBatchInsert(
275 List<TimeseriesDataPoint> entities,
276 TimeseriesEntity timeseriesEntity
277 ) {
278 timeseriesDataPointRepository.insertManyDataPoints(entities, timeseriesEntity);
279 return timeseriesEntity;
280 }
281
282 private TimeseriesEntity getOrCreateTimeseries(
283 long containerId,
284 Timeseries timeseries,
285 DataPointValueType incomingValueType
286 ) {
287 timeseriesContainerService.getContainer(containerId);
288 timeseriesContainerService.assertIsAllowedToEditContainer(containerId);
289
290
291 Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
292
293 if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
294
295 TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
296
297
298 TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
299 QuarkusTransaction.requiringNew()
300 .run(() -> {
301 this.timeseriesRepository.upsert(containerId, timeseriesEntity);
302 });
303
304 var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
305 return found.get();
306 }
307
308 private void assertDataPointsMatchTimeseriesValueType(
309 TimeseriesEntity timeseriesEntity,
310 List<TimeseriesDataPoint> dataPoints
311 ) {
312 for (TimeseriesDataPoint dataPoint : dataPoints) {
313 DataPointValueType expectedType = ObjectTypeEvaluator.determineType(dataPoint.getValue()).orElseThrow(() ->
314 new InvalidBodyException()
315 );
316 assertValueTypeMatchesTimeseries(timeseriesEntity, expectedType);
317 }
318 }
319
320 private void assertValueTypeMatchesTimeseries(TimeseriesEntity timeseries, DataPointValueType incomingValueType) {
321
322 if (
323 autoConvertIntToDouble &&
324 incomingValueType == DataPointValueType.Integer &&
325 timeseries.getValueType() == DataPointValueType.Double
326 ) return;
327
328 if (timeseries.getValueType() != incomingValueType) throw new InvalidBodyException(
329 "Timeseries already exists for data type %s but new data points are of type %s",
330 timeseries.getValueType(),
331 incomingValueType
332 );
333 }
334 }