1 package de.dlr.shepard.data.timeseries.services;
2
3 import de.dlr.shepard.common.exceptions.InvalidAuthException;
4 import de.dlr.shepard.common.exceptions.InvalidBodyException;
5 import de.dlr.shepard.common.exceptions.InvalidPathException;
6 import de.dlr.shepard.context.references.timeseriesreference.daos.TimeseriesTupleDAO;
7 import de.dlr.shepard.data.timeseries.daos.TimeseriesDAO;
8 import de.dlr.shepard.data.timeseries.io.TimeseriesWithDataPoints;
9 import de.dlr.shepard.data.timeseries.model.Timeseries;
10 import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
11 import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
12 import de.dlr.shepard.data.timeseries.model.TimeseriesTuple;
13 import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
14 import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
15 import de.dlr.shepard.data.timeseries.utilities.ObjectTypeEvaluator;
16 import de.dlr.shepard.data.timeseries.utilities.TimeseriesValidator;
17 import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
18 import jakarta.enterprise.context.RequestScoped;
19 import jakarta.enterprise.context.control.ActivateRequestContext;
20 import jakarta.inject.Inject;
21 import jakarta.transaction.Transactional;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.NoSuchElementException;
25 import java.util.Optional;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27 import java.util.stream.Stream;
28 import org.eclipse.microprofile.config.ConfigProvider;
29
30 @RequestScoped
31 public class TimeseriesService {
32
// DAO for Timeseries entities; used here to find, list, create and delete timeseries.
@Inject
TimeseriesDAO timeseriesDAO;

// DAO used to look up an already persisted TimeseriesTuple before a new timeseries is created.
@Inject
TimeseriesTupleDAO timeseriesTupleDAO;

// Repository used to query and insert the data points of a timeseries.
@Inject
TimeseriesDataPointRepository timeseriesDataPointRepository;

// Container service; used here to fetch containers and to assert edit/delete permissions.
@Inject
TimeseriesContainerService timeseriesContainerService;
44
45
46
47
48
49
50
/**
 * When true, data points of type Integer pass the value type check for a timeseries whose
 * value type is Double. Read from the config property "shepard.autoconvert-int"; falls back
 * to false when the property is not set.
 */
Boolean autoConvertIntToDouble = ConfigProvider.getConfig()
  .getOptionalValue("shepard.autoconvert-int", Boolean.class)
  .orElse(false);
54
55
56
57
58
59
60
61
62
63
64 public Stream<Timeseries> getTimeseriesAvailable(long containerId) {
65 timeseriesContainerService.getContainer(containerId);
66 return timeseriesDAO.getAllTimeseriesInContainer(containerId);
67 }
68
69
70
71
72
73
74
75
76
77
78 public Timeseries getTimeseriesById(Long id)
79 throws NoSuchElementException, InvalidAuthException, InvalidPathException {
80 var timeseries = timeseriesDAO.findByTimeseriesId(id).orElseThrow();
81 timeseriesContainerService.getContainer(timeseries.getContainer().getId());
82 return timeseries;
83 }
84
85
86
87
88
89
90
91
92 @Transactional
93 public void deleteTimeseriesByContainerId(long containerId) {
94 timeseriesContainerService.getContainer(containerId);
95 timeseriesContainerService.assertIsAllowedToDeleteContainer(containerId);
96 timeseriesDAO.deleteAllTimeseriesInContainer(containerId);
97 }
98
99
100
101
102
103
104
105
106
107 public List<TimeseriesDataPoint> getDataPointsByTimeseries(
108 long containerId,
109 TimeseriesTuple timeseries,
110 TimeseriesDataPointsQueryParams queryParams
111 ) {
112 timeseriesContainerService.getContainer(containerId);
113 var ts = timeseriesDAO.findTimeseries(containerId, timeseries).orElseThrow();
114
115 return timeseriesDataPointRepository.queryDataPoints(ts.getTimeseriesId(), ts.getValueType(), queryParams);
116 }
117
118 @ActivateRequestContext
119 public List<TimeseriesDataPoint> getDatapointsParallelizable(
120 Timeseries timeseries,
121 TimeseriesDataPointsQueryParams queryParams
122 ) {
123 return timeseriesDataPointRepository.queryDataPoints(
124 timeseries.getTimeseriesId(),
125 timeseries.getValueType(),
126 queryParams
127 );
128 }
129
130 public List<TimeseriesWithDataPoints> getManyTimeseriesWithDataPoints(
131 Long containerId,
132 List<TimeseriesTuple> timeseriesTupleList,
133 TimeseriesDataPointsQueryParams queryParams
134 ) {
135 timeseriesContainerService.getContainer(containerId);
136
137 var timeseriesList = timeseriesTupleList
138 .stream()
139 .map(tsTuple -> timeseriesDAO.findTimeseries(containerId, tsTuple).orElseThrow())
140 .toList();
141
142 ConcurrentLinkedQueue<TimeseriesWithDataPoints> timeseriesWithDataPointsQueue = new ConcurrentLinkedQueue<>();
143 timeseriesList
144 .parallelStream()
145 .forEach(timeseries ->
146 timeseriesWithDataPointsQueue.add(
147 new TimeseriesWithDataPoints(
148 timeseries.getTimeseriesTuple(),
149 getDatapointsParallelizable(timeseries, queryParams)
150 )
151 )
152 );
153 return new ArrayList<>(timeseriesWithDataPointsQueue);
154 }
155
156
157
158
159
160
161
162
163
164
165
166 public Timeseries saveDataPoints(
167 long timeseriesContainerId,
168 TimeseriesTuple timeseries,
169 List<TimeseriesDataPoint> dataPoints
170 ) {
171 timeseriesContainerService.getContainer(timeseriesContainerId);
172 timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
173
174 DataPointValueType incomingValueType = ObjectTypeEvaluator.determineType(
175 dataPoints.getFirst().getValue()
176 ).orElseThrow(InvalidBodyException::new);
177
178 return saveDataPoints(timeseriesContainerId, timeseries, dataPoints, incomingValueType);
179 }
180
181
182
183
184
185
186
187
188
189
190
191
192
193 @Transactional(Transactional.TxType.REQUIRES_NEW)
194 @TransactionConfiguration(timeout = 6000)
195 public Timeseries saveDataPoints(
196 long timeseriesContainerId,
197 TimeseriesTuple timeseriesTuple,
198 List<TimeseriesDataPoint> dataPoints,
199 DataPointValueType dataType
200 ) {
201 var ts = getTimeseries(timeseriesContainerId, timeseriesTuple).orElseGet(() ->
202 createTimeseries(timeseriesContainerId, timeseriesTuple, dataType)
203 );
204 assertDataPointsMatchTimeseriesValueType(ts.getValueType(), dataPoints);
205 timeseriesDataPointRepository.insertManyDataPoints(dataPoints, ts.getTimeseriesId(), ts.getValueType());
206 return ts;
207 }
208
209 public Optional<Timeseries> getTimeseries(long containerId, TimeseriesTuple timeseries) {
210 return timeseriesDAO.findTimeseries(containerId, timeseries);
211 }
212
213
214
215
216
217
218
219
220
221
222
223
224
225 private synchronized Timeseries createTimeseries(
226 long containerId,
227 TimeseriesTuple timeseriesTuple,
228 DataPointValueType incomingValueType
229 ) {
230 timeseriesContainerService.assertIsAllowedToEditContainer(containerId);
231 TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseriesTuple);
232 var container = timeseriesContainerService.getContainer(containerId);
233
234 timeseriesTuple = timeseriesTupleDAO.find(timeseriesTuple).orElse(timeseriesTuple);
235 var tsToCreate = new Timeseries(
236 container,
237 timeseriesTuple,
238 incomingValueType,
239 timeseriesDAO.getCurrentMaximumTimeseriesId() + 1
240 );
241 return timeseriesDAO.createOrUpdate(tsToCreate);
242 }
243
244 private void assertDataPointsMatchTimeseriesValueType(
245 DataPointValueType valueType,
246 List<TimeseriesDataPoint> dataPoints
247 ) {
248 for (TimeseriesDataPoint dataPoint : dataPoints) {
249 DataPointValueType expectedType = ObjectTypeEvaluator.determineType(dataPoint.getValue()).orElseThrow(
250 InvalidBodyException::new
251 );
252 assertValueTypeMatchesTimeseries(valueType, expectedType);
253 }
254 }
255
256 private void assertValueTypeMatchesTimeseries(DataPointValueType tsValueType, DataPointValueType incomingValueType) {
257
258 if (
259 autoConvertIntToDouble &&
260 incomingValueType == DataPointValueType.Integer &&
261 tsValueType == DataPointValueType.Double
262 ) return;
263
264 if (tsValueType != incomingValueType) throw new InvalidBodyException(
265 "Timeseries already exists for data type %s but new data points are of type %s",
266 tsValueType,
267 incomingValueType
268 );
269 }
270 }