1 package de.dlr.shepard.data.timeseries.repositories;
2
3 import de.dlr.shepard.common.exceptions.InvalidBodyException;
4 import de.dlr.shepard.common.exceptions.InvalidRequestException;
5 import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
6 import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
7 import de.dlr.shepard.data.timeseries.model.enums.AggregateFunction;
8 import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
9 import de.dlr.shepard.data.timeseries.model.enums.FillOption;
10 import io.micrometer.core.annotation.Timed;
11 import jakarta.enterprise.context.RequestScoped;
12 import jakarta.persistence.EntityManager;
13 import jakarta.persistence.PersistenceContext;
14 import jakarta.persistence.Query;
15 import java.util.List;
16 import java.util.Optional;
17 import java.util.stream.Collectors;
18 import java.util.stream.IntStream;
19 import org.hibernate.exception.DataException;
20
21 @RequestScoped
22 public class TimeseriesDataPointRepository {
23
/** Maximum number of data points that {@code insertManyDataPoints} puts into one INSERT statement. */
private static final int INSERT_BATCH_SIZE = 20000;

/** Entity manager used to create the native SQL queries of this repository. */
@PersistenceContext
EntityManager entityManager;
28
29
30
31
32
33
34
35
36
37
38
39
40 @Timed(value = "shepard.timeseries-data-point.batch-insert")
41 public void insertManyDataPoints(
42 List<TimeseriesDataPoint> entities,
43 long timeseriesId,
44 DataPointValueType valueType
45 ) {
46 for (int i = 0; i < entities.size(); i += INSERT_BATCH_SIZE) {
47 int currentLimit = Math.min(i + INSERT_BATCH_SIZE, entities.size());
48 Query query = buildInsertQueryObject(entities, i, currentLimit, timeseriesId, valueType);
49
50 try {
51 query.executeUpdate();
52 } catch (DataException ex) {
53 if (ex.getCause().toString().contains("ON CONFLICT DO UPDATE command cannot affect row a second time")) {
54 throw new InvalidBodyException(
55 "You provided the same timestamp value multiple times. Please make sure that there are only unique timestamps in a timeseries payload request!"
56 );
57 }
58 throw ex;
59 }
60 }
61 }
62
63 @Timed(value = "shepard.timeseries-data-point.compression")
64 public void compressAllChunks() {
65 var sqlString = "SELECT compress_chunk(c) FROM show_chunks('timeseries_data_points') c;";
66 Query query = entityManager.createNativeQuery(sqlString);
67 query.getResultList();
68 }
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 @Timed(value = "shepard.timeseries-data-point.query")
85 public List<TimeseriesDataPoint> queryDataPoints(
86 long timeseriesId,
87 DataPointValueType valueType,
88 TimeseriesDataPointsQueryParams queryParams
89 ) {
90 assertNotIntegral(queryParams.getFunction());
91 assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
92 assertCorrectValueTypesForFillOption(queryParams.getFillOption(), valueType);
93 assertTimeIntervalForFillOption(queryParams.getTimeSliceNanoseconds(), queryParams.getFillOption());
94 assertAggregationSetForFillOrGrouping(
95 queryParams.getFunction(),
96 queryParams.getTimeSliceNanoseconds(),
97 queryParams.getFillOption()
98 );
99
100 var query = buildSelectQueryObject(timeseriesId, valueType, queryParams);
101
102 @SuppressWarnings("unchecked")
103 List<TimeseriesDataPoint> dataPoints = query.getResultList();
104 return dataPoints;
105 }
106
107 @Timed(value = "shepard.timeseries-data-point-aggregate.query")
108 public List<TimeseriesDataPoint> queryAggregationFunction(
109 long timeseriesId,
110 DataPointValueType valueType,
111 TimeseriesDataPointsQueryParams queryParams
112 ) {
113 assertNotIntegral(queryParams.getFunction());
114 assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
115
116 var query = buildSelectAggregationFunctionQueryObject(timeseriesId, valueType, queryParams);
117
118 @SuppressWarnings("unchecked")
119 List<TimeseriesDataPoint> dataPoints = query.getResultList();
120 return dataPoints;
121 }
122
123 private Query buildInsertQueryObject(
124 List<TimeseriesDataPoint> entities,
125 int startInclusive,
126 int endExclusive,
127 long timeseriesId,
128 DataPointValueType valueType
129 ) {
130 StringBuilder queryString = new StringBuilder();
131 queryString.append(
132 "INSERT INTO timeseries_data_points (timeseries_id, time, " + getColumnName(valueType) + ") values "
133 );
134 queryString.append(
135 IntStream.range(startInclusive, endExclusive)
136 .mapToObj(index -> "(:timeseriesid" + ",:time" + index + ",:value" + index + ")")
137 .collect(Collectors.joining(","))
138 );
139 queryString.append(
140 " ON CONFLICT (timeseries_id, time) DO UPDATE SET time = EXCLUDED.time, timeseries_id = EXCLUDED.timeseries_id, " +
141 getColumnName(valueType) +
142 " = " +
143 "EXCLUDED." +
144 getColumnName(valueType) +
145 ";"
146 );
147
148 Query query = entityManager.createNativeQuery(queryString.toString());
149
150 query.setParameter("timeseriesid", timeseriesId);
151
152 IntStream.range(startInclusive, endExclusive).forEach(index -> {
153 query.setParameter("time" + index, entities.get(index).getTimestamp());
154 query.setParameter("value" + index, entities.get(index).getValue());
155 });
156
157 return query;
158 }
159
160 private Query buildSelectQueryObject(
161 long timeseriesId,
162 DataPointValueType valueType,
163 TimeseriesDataPointsQueryParams queryParams
164 ) {
165 String columnName = getColumnName(valueType);
166
167 FillOption fillOption = queryParams.getFillOption().orElse(FillOption.NONE);
168 var timeSliceNanoseconds = queryParams.getTimeSliceNanoseconds().orElse(null);
169
170 String queryString = "";
171 if (queryParams.getFunction().isPresent()) {
172 AggregateFunction function = queryParams.getFunction().get();
173 if (timeSliceNanoseconds == null) {
174 timeSliceNanoseconds = queryParams.getEndTime() - queryParams.getStartTime();
175 }
176
177 queryString = "SELECT ";
178
179 queryString += switch (fillOption) {
180 case NONE -> "time_bucket(:timeInNanoseconds, time) as timestamp, ";
181 case NULL, LINEAR, PREVIOUS -> "time_bucket_gapfill(:timeInNanoseconds, time) as timestamp, ";
182 };
183
184 String aggregationString = "";
185 switch (function) {
186 case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = "%s(%s)".formatted(function.name(), columnName);
187 case MEAN -> aggregationString = "AVG(%s)".formatted(columnName);
188 case LAST, FIRST -> aggregationString = "%s(%s, time)".formatted(function.name(), columnName);
189 case SPREAD -> aggregationString = "MAX(%s) - MIN(%s)".formatted(columnName, columnName);
190 case MEDIAN -> aggregationString = "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)".formatted(columnName);
191 case MODE -> aggregationString = "mode() WITHIN GROUP (ORDER BY %s)".formatted(columnName);
192 case INTEGRAL -> {}
193 }
194
195
196 if (fillOption == FillOption.LINEAR) {
197 aggregationString = "interpolate(%s) as value ".formatted(aggregationString);
198 } else if (fillOption == FillOption.PREVIOUS) {
199 aggregationString = "locf(%s) as value ".formatted(aggregationString);
200 } else {
201 aggregationString += " as value ";
202 }
203
204 queryString += aggregationString;
205 } else {
206 queryString = "SELECT time, %s ".formatted(columnName);
207 }
208
209 queryString += """
210 FROM timeseries_data_points
211 WHERE timeseries_id = :timeseriesId
212 AND time >= :startTimeNano
213 AND time <= :endTimeNano
214 """;
215
216 if (queryParams.getFunction().isPresent()) {
217 queryString += " GROUP BY timestamp ORDER BY timestamp";
218 } else {
219 queryString += " ORDER BY time";
220 }
221
222 Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
223
224 if (timeSliceNanoseconds != null) {
225 query.setParameter("timeInNanoseconds", timeSliceNanoseconds);
226 }
227 query.setParameter("timeseriesId", timeseriesId);
228 query.setParameter("startTimeNano", queryParams.getStartTime());
229 query.setParameter("endTimeNano", queryParams.getEndTime());
230
231 return query;
232 }
233
234 private Query buildSelectAggregationFunctionQueryObject(
235 long timeseriesId,
236 DataPointValueType valueType,
237 TimeseriesDataPointsQueryParams queryParams
238 ) {
239 String columnName = getColumnName(valueType);
240
241 String queryString = "";
242 if (queryParams.getFunction().isPresent()) {
243 AggregateFunction function = queryParams.getFunction().get();
244
245 queryString = "SELECT 1 as timestamp, ";
246
247 String aggregationString = "";
248 switch (function) {
249 case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = "%s(%s)".formatted(function.name(), columnName);
250 case MEAN -> aggregationString = "AVG(%s)".formatted(columnName);
251 case LAST, FIRST -> aggregationString = "%s(%s, time)".formatted(function.name(), columnName);
252 case SPREAD -> aggregationString = "MAX(%s) - MIN(%s)".formatted(columnName, columnName);
253 case MEDIAN -> aggregationString = "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)".formatted(columnName);
254 case MODE -> aggregationString = "mode() WITHIN GROUP (ORDER BY %s)".formatted(columnName);
255 case INTEGRAL -> {}
256 }
257
258 aggregationString += " as value ";
259
260 queryString += aggregationString;
261 } else {
262 queryString = "SELECT time, %s ".formatted(columnName);
263 }
264
265 queryString += """
266 FROM timeseries_data_points
267 WHERE timeseries_id = :timeseriesId
268 AND time >= :startTimeNano
269 AND time <= :endTimeNano
270 """;
271
272 Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
273
274 query.setParameter("timeseriesId", timeseriesId);
275 query.setParameter("startTimeNano", queryParams.getStartTime());
276 query.setParameter("endTimeNano", queryParams.getEndTime());
277
278 return query;
279 }
280
281 private String getColumnName(DataPointValueType valueType) {
282 return switch (valueType) {
283 case Double -> "double_value";
284 case Integer -> "int_value";
285 case String -> "string_value";
286 case Boolean -> "boolean_value";
287 };
288 }
289
290
291
292
293 private void assertNotIntegral(Optional<AggregateFunction> function) {
294 if (function.isPresent() && function.get() == AggregateFunction.INTEGRAL) {
295 throw new InvalidRequestException("Aggregation function 'integral' is currently not implemented.");
296 }
297 }
298
299
300
301
302
303
304 private void assertCorrectValueTypesForAggregation(
305 Optional<AggregateFunction> function,
306 DataPointValueType valueType
307 ) {
308 if (
309 (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) &&
310 (function.isPresent() &&
311 function.get() != AggregateFunction.COUNT &&
312 function.get() != AggregateFunction.FIRST &&
313 function.get() != AggregateFunction.LAST)
314 ) {
315 throw new InvalidRequestException(
316 "Cannot execute aggregation functions on data points of type boolean or string."
317 );
318 }
319 }
320
321
322
323
324
325 private void assertCorrectValueTypesForFillOption(Optional<FillOption> fillOption, DataPointValueType valueType) {
326 if (
327 (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) && (fillOption.isPresent())
328 ) {
329 throw new InvalidRequestException("Cannot use gap filling options on data points of type boolean or string.");
330 }
331 }
332
333
334
335
336 private void assertTimeIntervalForFillOption(Optional<Long> timeSliceNanoseconds, Optional<FillOption> fillOption) {
337 if (timeSliceNanoseconds.isEmpty() && fillOption.isPresent()) {
338 throw new InvalidRequestException("Cannot use gap filling option when no grouping interval is specified.");
339 }
340 }
341
342
343
344
345
346 private void assertAggregationSetForFillOrGrouping(
347 Optional<AggregateFunction> function,
348 Optional<Long> timeSliceNanoseconds,
349 Optional<FillOption> fillOption
350 ) {
351 if (function.isEmpty() && (fillOption.isPresent() || timeSliceNanoseconds.isPresent())) {
352 throw new InvalidRequestException(
353 "Cannot use gap filling option or grouping of data when no aggregation function is specified."
354 );
355 }
356 }
357 }