1 package de.dlr.shepard.data.timeseries.repositories;
2
import de.dlr.shepard.common.exceptions.InvalidBodyException;
import de.dlr.shepard.common.exceptions.InvalidRequestException;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
import de.dlr.shepard.data.timeseries.model.enums.AggregateFunction;
import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
import de.dlr.shepard.data.timeseries.model.enums.FillOption;
import io.agroal.api.AgroalDataSource;
import io.micrometer.core.annotation.Timed;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.Query;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.hibernate.exception.DataException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
31
32 @RequestScoped
33 public class TimeseriesDataPointRepository {
34
35 private final int INSERT_BATCH_SIZE = 20000;
36
37 @PersistenceContext
38 EntityManager entityManager;
39
40 @Inject
41 AgroalDataSource defaultDataSource;
42
43
44
45
46
47
48
49
50
51
52
53
54 @Timed(value = "shepard.timeseries-data-point.batch-insert")
55 public void insertManyDataPoints(List<TimeseriesDataPoint> entities, TimeseriesEntity timeseriesEntity) {
56 for (int i = 0; i < entities.size(); i += INSERT_BATCH_SIZE) {
57 int currentLimit = Math.min(i + INSERT_BATCH_SIZE, entities.size());
58 Query query = buildInsertQueryObject(entities, i, currentLimit, timeseriesEntity);
59
60 try {
61 query.executeUpdate();
62 } catch (DataException ex) {
63 if (ex.getCause().toString().contains("ON CONFLICT DO UPDATE command cannot affect row a second time")) {
64 throw new InvalidBodyException(
65 "You provided the same timestamp value multiple times. Please make sure that there are only unique timestamps in a timeseries payload request!"
66 );
67 }
68 throw ex;
69 }
70 }
71 }
72
73
74
75
76
77
78
79
80 @Timed(value = "shepard.timeseries-data-point.copy-insert")
81 public void insertManyDataPointsWithCopyCommand(
82 List<TimeseriesDataPoint> entities,
83 TimeseriesEntity timeseriesEntity
84 ) throws SQLException {
85 try (Connection conn = defaultDataSource.getConnection()) {
86 PGConnection pgConn = (PGConnection) conn.unwrap(PGConnection.class);
87 CopyManager copyManager = pgConn.getCopyAPI();
88
89 var columnName = getColumnName(timeseriesEntity.getValueType());
90 var sb = new StringBuilder();
91
92 timeseriesEntity.getId();
93
94
95 if (timeseriesEntity.getValueType() == DataPointValueType.String) {
96 for (int i = 0; i < entities.size(); i++) {
97 TimeseriesDataPoint entity = entities.get(i);
98 sb
99 .append(timeseriesEntity.getId())
100 .append(",")
101 .append(entity.getTimestamp())
102 .append(",\"")
103 .append(entity.getValue())
104 .append("\"\n");
105 }
106 } else {
107 for (int i = 0; i < entities.size(); i++) {
108 TimeseriesDataPoint entity = entities.get(i);
109 sb
110 .append(timeseriesEntity.getId())
111 .append(",")
112 .append(entity.getTimestamp())
113 .append(",")
114 .append(entity.getValue())
115 .append("\n");
116 }
117 }
118
119 InputStream input = new ByteArrayInputStream(sb.toString().getBytes());
120 String sql =
121 "COPY timeseries_data_points (timeseries_id, time, %s) FROM STDIN WITH (FORMAT csv);".formatted(columnName);
122
123 copyManager.copyIn(sql, input);
124 } catch (IOException ex) {
125 Log.errorf("IOException during copy insert: %s", ex.getMessage());
126 throw new RuntimeException("IO Error while inserting data points", ex);
127 }
128 }
129
130 @Timed(value = "shepard.timeseries-data-point.compression")
131 public void compressAllChunks() {
132 var sqlString = "SELECT compress_chunk(c) FROM show_chunks('timeseries_data_points') c;";
133 Query query = entityManager.createNativeQuery(sqlString);
134 query.getResultList();
135 }
136
137 @Timed(value = "shepard.timeseries-data-point.query")
138 public List<TimeseriesDataPoint> queryDataPoints(
139 int timeseriesId,
140 DataPointValueType valueType,
141 TimeseriesDataPointsQueryParams queryParams
142 ) {
143 assertNotIntegral(queryParams.getFunction());
144 assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
145 assertCorrectValueTypesForFillOption(queryParams.getFillOption(), valueType);
146 assertTimeIntervalForFillOption(queryParams.getTimeSliceNanoseconds(), queryParams.getFillOption());
147 assertAggregationSetForFillOrGrouping(
148 queryParams.getFunction(),
149 queryParams.getTimeSliceNanoseconds(),
150 queryParams.getFillOption()
151 );
152
153 var query = buildSelectQueryObject(timeseriesId, valueType, queryParams);
154
155 @SuppressWarnings("unchecked")
156 List<TimeseriesDataPoint> dataPoints = query.getResultList();
157 return dataPoints;
158 }
159
160 @Timed(value = "shepard.timeseries-data-point-aggregate.query")
161 public List<TimeseriesDataPoint> queryAggregationFunction(
162 int timeseriesId,
163 DataPointValueType valueType,
164 TimeseriesDataPointsQueryParams queryParams
165 ) {
166 assertNotIntegral(queryParams.getFunction());
167 assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
168
169 var query = buildSelectAggregationFunctionQueryObject(timeseriesId, valueType, queryParams);
170
171 @SuppressWarnings("unchecked")
172 List<TimeseriesDataPoint> dataPoints = query.getResultList();
173 return dataPoints;
174 }
175
176 private Query buildInsertQueryObject(
177 List<TimeseriesDataPoint> entities,
178 int startInclusive,
179 int endExclusive,
180 TimeseriesEntity timeseriesEntity
181 ) {
182 StringBuilder queryString = new StringBuilder();
183 queryString.append(
184 "INSERT INTO timeseries_data_points (timeseries_id, time, " +
185 getColumnName(timeseriesEntity.getValueType()) +
186 ") values "
187 );
188 queryString.append(
189 IntStream.range(startInclusive, endExclusive)
190 .mapToObj(index -> "(:timeseriesid" + ",:time" + index + ",:value" + index + ")")
191 .collect(Collectors.joining(","))
192 );
193 queryString.append(
194 " ON CONFLICT (timeseries_id, time) DO UPDATE SET time = EXCLUDED.time, timeseries_id = EXCLUDED.timeseries_id, " +
195 getColumnName(timeseriesEntity.getValueType()) +
196 " = " +
197 "EXCLUDED." +
198 getColumnName(timeseriesEntity.getValueType()) +
199 ";"
200 );
201
202 Query query = entityManager.createNativeQuery(queryString.toString());
203
204 query.setParameter("timeseriesid", timeseriesEntity.getId());
205
206 IntStream.range(startInclusive, endExclusive).forEach(index -> {
207 query.setParameter("time" + index, entities.get(index).getTimestamp());
208 query.setParameter("value" + index, entities.get(index).getValue());
209 });
210
211 return query;
212 }
213
214 private Query buildSelectQueryObject(
215 int timeseriesId,
216 DataPointValueType valueType,
217 TimeseriesDataPointsQueryParams queryParams
218 ) {
219 String columnName = getColumnName(valueType);
220
221 FillOption fillOption = queryParams.getFillOption().orElse(FillOption.NONE);
222 var timeSliceNanoseconds = queryParams.getTimeSliceNanoseconds().orElse(null);
223
224 String queryString = "";
225 if (queryParams.getFunction().isPresent()) {
226 AggregateFunction function = queryParams.getFunction().get();
227 if (timeSliceNanoseconds == null) {
228 timeSliceNanoseconds = queryParams.getEndTime() - queryParams.getStartTime();
229 }
230
231 queryString = "SELECT ";
232
233 queryString += switch (fillOption) {
234 case NONE -> "time_bucket(:timeInNanoseconds, time) as timestamp, ";
235 case NULL, LINEAR, PREVIOUS -> "time_bucket_gapfill(:timeInNanoseconds, time) as timestamp, ";
236 };
237
238 String aggregationString = "";
239 switch (function) {
240 case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = "%s(%s)".formatted(function.name(), columnName);
241 case MEAN -> aggregationString = "AVG(%s)".formatted(columnName);
242 case LAST, FIRST -> aggregationString = "%s(%s, time)".formatted(function.name(), columnName);
243 case SPREAD -> aggregationString = "MAX(%s) - MIN(%s)".formatted(columnName, columnName);
244 case MEDIAN -> aggregationString = "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)".formatted(columnName);
245 case MODE -> aggregationString = "mode() WITHIN GROUP (ORDER BY %s)".formatted(columnName);
246 case INTEGRAL -> {}
247 }
248
249
250 if (fillOption == FillOption.LINEAR) {
251 aggregationString = "interpolate(%s) as value ".formatted(aggregationString);
252 } else if (fillOption == FillOption.PREVIOUS) {
253 aggregationString = "locf(%s) as value ".formatted(aggregationString);
254 } else {
255 aggregationString += " as value ";
256 }
257
258 queryString += aggregationString;
259 } else {
260 queryString = "SELECT time, %s ".formatted(columnName);
261 }
262
263 queryString += """
264 FROM timeseries_data_points
265 WHERE timeseries_id = :timeseriesId
266 AND time >= :startTimeNano
267 AND time <= :endTimeNano
268 """;
269
270 if (queryParams.getFunction().isPresent()) {
271 queryString += " GROUP BY timestamp ORDER BY timestamp";
272 } else {
273 queryString += " ORDER BY time";
274 }
275
276 Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
277
278 if (timeSliceNanoseconds != null) {
279 query.setParameter("timeInNanoseconds", timeSliceNanoseconds);
280 }
281 query.setParameter("timeseriesId", timeseriesId);
282 query.setParameter("startTimeNano", queryParams.getStartTime());
283 query.setParameter("endTimeNano", queryParams.getEndTime());
284
285 return query;
286 }
287
288 private Query buildSelectAggregationFunctionQueryObject(
289 int timeseriesId,
290 DataPointValueType valueType,
291 TimeseriesDataPointsQueryParams queryParams
292 ) {
293 String columnName = getColumnName(valueType);
294
295 String queryString = "";
296 if (queryParams.getFunction().isPresent()) {
297 AggregateFunction function = queryParams.getFunction().get();
298
299 queryString = "SELECT 1 as timestamp, ";
300
301 String aggregationString = "";
302 switch (function) {
303 case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = "%s(%s)".formatted(function.name(), columnName);
304 case MEAN -> aggregationString = "AVG(%s)".formatted(columnName);
305 case LAST, FIRST -> aggregationString = "%s(%s, time)".formatted(function.name(), columnName);
306 case SPREAD -> aggregationString = "MAX(%s) - MIN(%s)".formatted(columnName, columnName);
307 case MEDIAN -> aggregationString = "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)".formatted(columnName);
308 case MODE -> aggregationString = "mode() WITHIN GROUP (ORDER BY %s)".formatted(columnName);
309 case INTEGRAL -> {}
310 }
311
312 aggregationString += " as value ";
313
314 queryString += aggregationString;
315 } else {
316 queryString = "SELECT time, %s ".formatted(columnName);
317 }
318
319 queryString += """
320 FROM timeseries_data_points
321 WHERE timeseries_id = :timeseriesId
322 AND time >= :startTimeNano
323 AND time <= :endTimeNano
324 """;
325
326 Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
327
328 query.setParameter("timeseriesId", timeseriesId);
329 query.setParameter("startTimeNano", queryParams.getStartTime());
330 query.setParameter("endTimeNano", queryParams.getEndTime());
331
332 return query;
333 }
334
335 private String getColumnName(DataPointValueType valueType) {
336 return switch (valueType) {
337 case Double -> "double_value";
338 case Integer -> "int_value";
339 case String -> "string_value";
340 case Boolean -> "boolean_value";
341 };
342 }
343
344
345
346
347 private void assertNotIntegral(Optional<AggregateFunction> function) {
348 if (function.isPresent() && function.get() == AggregateFunction.INTEGRAL) {
349 throw new InvalidRequestException("Aggregation function 'integral' is currently not implemented.");
350 }
351 }
352
353
354
355
356
357
358 private void assertCorrectValueTypesForAggregation(
359 Optional<AggregateFunction> function,
360 DataPointValueType valueType
361 ) {
362 if (
363 (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) &&
364 (function.isPresent() &&
365 function.get() != AggregateFunction.COUNT &&
366 function.get() != AggregateFunction.FIRST &&
367 function.get() != AggregateFunction.LAST)
368 ) {
369 throw new InvalidRequestException(
370 "Cannot execute aggregation functions on data points of type boolean or string."
371 );
372 }
373 }
374
375
376
377
378
379 private void assertCorrectValueTypesForFillOption(Optional<FillOption> fillOption, DataPointValueType valueType) {
380 if (
381 (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) && (fillOption.isPresent())
382 ) {
383 throw new InvalidRequestException("Cannot use gap filling options on data points of type boolean or string.");
384 }
385 }
386
387
388
389
390 private void assertTimeIntervalForFillOption(Optional<Long> timeSliceNanoseconds, Optional<FillOption> fillOption) {
391 if (timeSliceNanoseconds.isEmpty() && fillOption.isPresent()) {
392 throw new InvalidRequestException("Cannot use gap filling option when no grouping interval is specified.");
393 }
394 }
395
396
397
398
399
400 private void assertAggregationSetForFillOrGrouping(
401 Optional<AggregateFunction> function,
402 Optional<Long> timeSliceNanoseconds,
403 Optional<FillOption> fillOption
404 ) {
405 if (function.isEmpty() && (fillOption.isPresent() || timeSliceNanoseconds.isPresent())) {
406 throw new InvalidRequestException(
407 "Cannot use gap filling option or grouping of data when no aggregation function is specified."
408 );
409 }
410 }
411 }