package de.dlr.shepard.data.timeseries.repositories;

import de.dlr.shepard.common.exceptions.InvalidBodyException;
import de.dlr.shepard.common.exceptions.InvalidRequestException;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
import de.dlr.shepard.data.timeseries.model.enums.AggregateFunction;
import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
import de.dlr.shepard.data.timeseries.model.enums.FillOption;
import io.agroal.api.AgroalDataSource;
import io.micrometer.core.annotation.Timed;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.Query;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.hibernate.exception.DataException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
31
32 @RequestScoped
33 public class TimeseriesDataPointRepository {
34
35 private final int INSERT_BATCH_SIZE = 20000;
36
37 @PersistenceContext
38 EntityManager entityManager;
39
40 @Inject
41 AgroalDataSource defaultDataSource;
42
43
44
45
46
47
48
49
50
51
52
53
54 @Timed(value = "shepard.timeseries-data-point.batch-insert")
55 public void insertManyDataPoints(List<TimeseriesDataPoint> entities, TimeseriesEntity timeseriesEntity) {
56 for (int i = 0; i < entities.size(); i += INSERT_BATCH_SIZE) {
57 int currentLimit = Math.min(i + INSERT_BATCH_SIZE, entities.size());
58 Query query = buildInsertQueryObject(entities, i, currentLimit, timeseriesEntity);
59
60 try {
61 query.executeUpdate();
62 } catch (DataException ex) {
63 if (ex.getCause().toString().contains("ON CONFLICT DO UPDATE command cannot affect row a second time")) {
64 throw new InvalidBodyException(
65 "You provided the same timestamp value multiple times. Please make sure that there are only unique timestamps in a timeseries payload request!"
66 );
67 }
68 throw ex;
69 }
70 }
71 }
72
73
74
75
76
77
78
79
80 @Timed(value = "shepard.timeseries-data-point.copy-insert")
81 public void insertManyDataPointsWithCopyCommand(
82 List<TimeseriesDataPoint> entities,
83 TimeseriesEntity timeseriesEntity
84 ) throws SQLException {
85 try (Connection conn = defaultDataSource.getConnection()) {
86 PGConnection pgConn = (PGConnection) conn.unwrap(PGConnection.class);
87 CopyManager copyManager = pgConn.getCopyAPI();
88
89 var columnName = getColumnName(timeseriesEntity.getValueType());
90 var sb = new StringBuilder();
91
92 timeseriesEntity.getId();
93
94
95 if (timeseriesEntity.getValueType() == DataPointValueType.String) {
96 for (int i = 0; i < entities.size(); i++) {
97 TimeseriesDataPoint entity = entities.get(i);
98 sb
99 .append(timeseriesEntity.getId())
100 .append(",")
101 .append(entity.getTimestamp())
102 .append(",\"")
103 .append(entity.getValue())
104 .append("\"\n");
105 }
106 } else {
107 for (int i = 0; i < entities.size(); i++) {
108 TimeseriesDataPoint entity = entities.get(i);
109 sb
110 .append(timeseriesEntity.getId())
111 .append(",")
112 .append(entity.getTimestamp())
113 .append(",")
114 .append(entity.getValue())
115 .append("\n");
116 }
117 }
118
119 InputStream input = new ByteArrayInputStream(sb.toString().getBytes());
120 String sql = String.format(
121 "COPY timeseries_data_points (timeseries_id, time, %s) FROM STDIN WITH (FORMAT csv);",
122 columnName
123 );
124
125 copyManager.copyIn(sql, input);
126 } catch (IOException ex) {
127 Log.errorf("IOException during copy insert: %s", ex.getMessage());
128 throw new RuntimeException("IO Error while inserting data points", ex);
129 }
130 }
131
132 @Timed(value = "shepard.timeseries-data-point.compression")
133 public void compressAllChunks() {
134 var sqlString = "SELECT compress_chunk(c) FROM show_chunks('timeseries_data_points') c;";
135 Query query = entityManager.createNativeQuery(sqlString);
136 query.getResultList();
137 }
138
139 @Timed(value = "shepard.timeseries-data-point.query")
140 public List<TimeseriesDataPoint> queryDataPoints(
141 int timeseriesId,
142 DataPointValueType valueType,
143 TimeseriesDataPointsQueryParams queryParams
144 ) {
145 assertNotIntegral(queryParams.getFunction());
146 assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
147 assertCorrectValueTypesForFillOption(queryParams.getFillOption(), valueType);
148 assertTimeIntervalForFillOption(queryParams.getTimeSliceNanoseconds(), queryParams.getFillOption());
149 assertAggregationSetForFillOrGrouping(
150 queryParams.getFunction(),
151 queryParams.getTimeSliceNanoseconds(),
152 queryParams.getFillOption()
153 );
154
155 var query = buildSelectQueryObject(timeseriesId, valueType, queryParams);
156
157 @SuppressWarnings("unchecked")
158 List<TimeseriesDataPoint> dataPoints = query.getResultList();
159 return dataPoints;
160 }
161
162 @Timed(value = "shepard.timeseries-data-point-aggregate.query")
163 public List<TimeseriesDataPoint> queryAggregationFunction(
164 int timeseriesId,
165 DataPointValueType valueType,
166 TimeseriesDataPointsQueryParams queryParams
167 ) {
168 assertNotIntegral(queryParams.getFunction());
169 assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
170
171 var query = buildSelectAggregationFunctionQueryObject(timeseriesId, valueType, queryParams);
172
173 @SuppressWarnings("unchecked")
174 List<TimeseriesDataPoint> dataPoints = query.getResultList();
175 return dataPoints;
176 }
177
178 private Query buildInsertQueryObject(
179 List<TimeseriesDataPoint> entities,
180 int startInclusive,
181 int endExclusive,
182 TimeseriesEntity timeseriesEntity
183 ) {
184 StringBuilder queryString = new StringBuilder();
185 queryString.append(
186 "INSERT INTO timeseries_data_points (timeseries_id, time, " +
187 getColumnName(timeseriesEntity.getValueType()) +
188 ") values "
189 );
190 queryString.append(
191 IntStream.range(startInclusive, endExclusive)
192 .mapToObj(index -> "(:timeseriesid" + ",:time" + index + ",:value" + index + ")")
193 .collect(Collectors.joining(","))
194 );
195 queryString.append(
196 " ON CONFLICT (timeseries_id, time) DO UPDATE SET time = EXCLUDED.time, timeseries_id = EXCLUDED.timeseries_id, " +
197 getColumnName(timeseriesEntity.getValueType()) +
198 " = " +
199 "EXCLUDED." +
200 getColumnName(timeseriesEntity.getValueType()) +
201 ";"
202 );
203
204 Query query = entityManager.createNativeQuery(queryString.toString());
205
206 query.setParameter("timeseriesid", timeseriesEntity.getId());
207
208 IntStream.range(startInclusive, endExclusive).forEach(index -> {
209 query.setParameter("time" + index, entities.get(index).getTimestamp());
210 query.setParameter("value" + index, entities.get(index).getValue());
211 });
212
213 return query;
214 }
215
216 private Query buildSelectQueryObject(
217 int timeseriesId,
218 DataPointValueType valueType,
219 TimeseriesDataPointsQueryParams queryParams
220 ) {
221 String columnName = getColumnName(valueType);
222
223 FillOption fillOption = queryParams.getFillOption().orElse(FillOption.NONE);
224 var timeSliceNanoseconds = queryParams.getTimeSliceNanoseconds().orElse(null);
225
226 String queryString = "";
227 if (queryParams.getFunction().isPresent()) {
228 AggregateFunction function = queryParams.getFunction().get();
229 if (timeSliceNanoseconds == null) {
230 timeSliceNanoseconds = queryParams.getEndTime() - queryParams.getStartTime();
231 }
232
233 queryString = "SELECT ";
234
235 queryString += switch (fillOption) {
236 case NONE -> "time_bucket(:timeInNanoseconds, time) as timestamp, ";
237 case NULL, LINEAR, PREVIOUS -> "time_bucket_gapfill(:timeInNanoseconds, time) as timestamp, ";
238 };
239
240 String aggregationString = "";
241 switch (function) {
242 case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = String.format("%s(%s)", function.name(), columnName);
243 case MEAN -> aggregationString = String.format("AVG(%s)", columnName);
244 case LAST, FIRST -> aggregationString = String.format("%s(%s, time)", function.name(), columnName);
245 case SPREAD -> aggregationString = String.format("MAX(%s) - MIN(%s)", columnName, columnName);
246 case MEDIAN -> aggregationString = String.format("percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)", columnName);
247 case MODE -> aggregationString = String.format("mode() WITHIN GROUP (ORDER BY %s)", columnName);
248 case INTEGRAL -> {}
249 }
250
251
252 if (fillOption == FillOption.LINEAR) {
253 aggregationString = String.format("interpolate(%s) as value ", aggregationString);
254 } else if (fillOption == FillOption.PREVIOUS) {
255 aggregationString = String.format("locf(%s) as value ", aggregationString);
256 } else {
257 aggregationString += " as value ";
258 }
259
260 queryString += aggregationString;
261 } else {
262 queryString = String.format("SELECT time, %s ", columnName);
263 }
264
265 queryString += """
266 FROM timeseries_data_points
267 WHERE timeseries_id = :timeseriesId
268 AND time >= :startTimeNano
269 AND time <= :endTimeNano
270 """;
271
272 if (queryParams.getFunction().isPresent()) {
273 queryString += " GROUP BY timestamp ORDER BY timestamp";
274 } else {
275 queryString += " ORDER BY time";
276 }
277
278 Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
279
280 if (timeSliceNanoseconds != null) {
281 query.setParameter("timeInNanoseconds", timeSliceNanoseconds);
282 }
283 query.setParameter("timeseriesId", timeseriesId);
284 query.setParameter("startTimeNano", queryParams.getStartTime());
285 query.setParameter("endTimeNano", queryParams.getEndTime());
286
287 return query;
288 }
289
290 private Query buildSelectAggregationFunctionQueryObject(
291 int timeseriesId,
292 DataPointValueType valueType,
293 TimeseriesDataPointsQueryParams queryParams
294 ) {
295 String columnName = getColumnName(valueType);
296
297 String queryString = "";
298 if (queryParams.getFunction().isPresent()) {
299 AggregateFunction function = queryParams.getFunction().get();
300
301 queryString = "SELECT 1 as timestamp, ";
302
303 String aggregationString = "";
304 switch (function) {
305 case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = String.format("%s(%s)", function.name(), columnName);
306 case MEAN -> aggregationString = String.format("AVG(%s)", columnName);
307 case LAST, FIRST -> aggregationString = String.format("%s(%s, time)", function.name(), columnName);
308 case SPREAD -> aggregationString = String.format("MAX(%s) - MIN(%s)", columnName, columnName);
309 case MEDIAN -> aggregationString = String.format("percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)", columnName);
310 case MODE -> aggregationString = String.format("mode() WITHIN GROUP (ORDER BY %s)", columnName);
311 case INTEGRAL -> {}
312 }
313
314 aggregationString += " as value ";
315
316 queryString += aggregationString;
317 } else {
318 queryString = String.format("SELECT time, %s ", columnName);
319 }
320
321 queryString += """
322 FROM timeseries_data_points
323 WHERE timeseries_id = :timeseriesId
324 AND time >= :startTimeNano
325 AND time <= :endTimeNano
326 """;
327
328 Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
329
330 query.setParameter("timeseriesId", timeseriesId);
331 query.setParameter("startTimeNano", queryParams.getStartTime());
332 query.setParameter("endTimeNano", queryParams.getEndTime());
333
334 return query;
335 }
336
337 private String getColumnName(DataPointValueType valueType) {
338 return switch (valueType) {
339 case Double -> "double_value";
340 case Integer -> "int_value";
341 case String -> "string_value";
342 case Boolean -> "boolean_value";
343 };
344 }
345
346
347
348
349 private void assertNotIntegral(Optional<AggregateFunction> function) {
350 if (function.isPresent() && function.get() == AggregateFunction.INTEGRAL) {
351 throw new InvalidRequestException("Aggregation function 'integral' is currently not implemented.");
352 }
353 }
354
355
356
357
358
359
360 private void assertCorrectValueTypesForAggregation(
361 Optional<AggregateFunction> function,
362 DataPointValueType valueType
363 ) {
364 if (
365 (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) &&
366 (function.isPresent() &&
367 function.get() != AggregateFunction.COUNT &&
368 function.get() != AggregateFunction.FIRST &&
369 function.get() != AggregateFunction.LAST)
370 ) {
371 throw new InvalidRequestException(
372 "Cannot execute aggregation functions on data points of type boolean or string."
373 );
374 }
375 }
376
377
378
379
380
381 private void assertCorrectValueTypesForFillOption(Optional<FillOption> fillOption, DataPointValueType valueType) {
382 if (
383 (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) && (fillOption.isPresent())
384 ) {
385 throw new InvalidRequestException("Cannot use gap filling options on data points of type boolean or string.");
386 }
387 }
388
389
390
391
392 private void assertTimeIntervalForFillOption(Optional<Long> timeSliceNanoseconds, Optional<FillOption> fillOption) {
393 if (timeSliceNanoseconds.isEmpty() && fillOption.isPresent()) {
394 throw new InvalidRequestException("Cannot use gap filling option when no grouping interval is specified.");
395 }
396 }
397
398
399
400
401
402 private void assertAggregationSetForFillOrGrouping(
403 Optional<AggregateFunction> function,
404 Optional<Long> timeSliceNanoseconds,
405 Optional<FillOption> fillOption
406 ) {
407 if (function.isEmpty() && (fillOption.isPresent() || timeSliceNanoseconds.isPresent())) {
408 throw new InvalidRequestException(
409 "Cannot use gap filling option or grouping of data when no aggregation function is specified."
410 );
411 }
412 }
413 }