View Javadoc
1   package de.dlr.shepard.data.timeseries.repositories;
2   
import de.dlr.shepard.common.exceptions.InvalidBodyException;
import de.dlr.shepard.common.exceptions.InvalidRequestException;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
import de.dlr.shepard.data.timeseries.model.enums.AggregateFunction;
import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
import de.dlr.shepard.data.timeseries.model.enums.FillOption;
import io.agroal.api.AgroalDataSource;
import io.micrometer.core.annotation.Timed;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.Query;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.hibernate.exception.DataException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
31  
32  @RequestScoped
33  public class TimeseriesDataPointRepository {
34  
35    private final int INSERT_BATCH_SIZE = 20000;
36  
37    @PersistenceContext
38    EntityManager entityManager;
39  
40    @Inject
41    AgroalDataSource defaultDataSource;
42  
43    /**
44     * Insert a list of timeseries data points into the database.
45     *
46     * @param entities         list of timeseries data points
47     * @param timeseriesEntity
48     * @throws InvalidBodyException can be thrown when 'entities' contains the same
49     *                              timestamp more than once (read more in
50     *                              architectural documentation: 'Building Block
51     *                              View' -> 'Timeseries: Multiple Values for One
52     *                              Timestamp')
53     */
54    @Timed(value = "shepard.timeseries-data-point.batch-insert")
55    public void insertManyDataPoints(List<TimeseriesDataPoint> entities, TimeseriesEntity timeseriesEntity) {
56      for (int i = 0; i < entities.size(); i += INSERT_BATCH_SIZE) {
57        int currentLimit = Math.min(i + INSERT_BATCH_SIZE, entities.size());
58        Query query = buildInsertQueryObject(entities, i, currentLimit, timeseriesEntity);
59  
60        try {
61          query.executeUpdate();
62        } catch (DataException ex) {
63          if (ex.getCause().toString().contains("ON CONFLICT DO UPDATE command cannot affect row a second time")) {
64            throw new InvalidBodyException(
65              "You provided the same timestamp value multiple times. Please make sure that there are only unique timestamps in a timeseries payload request!"
66            );
67          }
68          throw ex;
69        }
70      }
71    }
72  
73    /**
74     * Insert a list of timeseries data points into the database using the COPY command.
75     * This is used by the influxdb migration but can also be used for csv import or
76     * similar scenarios.
77     * @param entities
78     * @param timeseriesEntity
79     */
80    @Timed(value = "shepard.timeseries-data-point.copy-insert")
81    public void insertManyDataPointsWithCopyCommand(
82      List<TimeseriesDataPoint> entities,
83      TimeseriesEntity timeseriesEntity
84    ) throws SQLException {
85      try (Connection conn = defaultDataSource.getConnection()) {
86        PGConnection pgConn = (PGConnection) conn.unwrap(PGConnection.class);
87        CopyManager copyManager = pgConn.getCopyAPI();
88  
89        var columnName = getColumnName(timeseriesEntity.getValueType());
90        var sb = new StringBuilder();
91  
92        timeseriesEntity.getId();
93  
94        // Strings must be quoted in double quotes in case they contain a comma which is also the delimiter
95        if (timeseriesEntity.getValueType() == DataPointValueType.String) {
96          for (int i = 0; i < entities.size(); i++) {
97            TimeseriesDataPoint entity = entities.get(i);
98            sb
99              .append(timeseriesEntity.getId())
100             .append(",")
101             .append(entity.getTimestamp())
102             .append(",\"")
103             .append(entity.getValue())
104             .append("\"\n");
105         }
106       } else {
107         for (int i = 0; i < entities.size(); i++) {
108           TimeseriesDataPoint entity = entities.get(i);
109           sb
110             .append(timeseriesEntity.getId())
111             .append(",")
112             .append(entity.getTimestamp())
113             .append(",")
114             .append(entity.getValue())
115             .append("\n");
116         }
117       }
118 
119       InputStream input = new ByteArrayInputStream(sb.toString().getBytes());
120       String sql =
121         "COPY timeseries_data_points (timeseries_id, time, %s) FROM STDIN WITH (FORMAT csv);".formatted(columnName);
122 
123       copyManager.copyIn(sql, input);
124     } catch (IOException ex) {
125       Log.errorf("IOException during copy insert: %s", ex.getMessage());
126       throw new RuntimeException("IO Error while inserting data points", ex);
127     }
128   }
129 
130   @Timed(value = "shepard.timeseries-data-point.compression")
131   public void compressAllChunks() {
132     var sqlString = "SELECT compress_chunk(c) FROM show_chunks('timeseries_data_points') c;";
133     Query query = entityManager.createNativeQuery(sqlString);
134     query.getResultList();
135   }
136 
137   @Timed(value = "shepard.timeseries-data-point.query")
138   public List<TimeseriesDataPoint> queryDataPoints(
139     int timeseriesId,
140     DataPointValueType valueType,
141     TimeseriesDataPointsQueryParams queryParams
142   ) {
143     assertNotIntegral(queryParams.getFunction());
144     assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
145     assertCorrectValueTypesForFillOption(queryParams.getFillOption(), valueType);
146     assertTimeIntervalForFillOption(queryParams.getTimeSliceNanoseconds(), queryParams.getFillOption());
147     assertAggregationSetForFillOrGrouping(
148       queryParams.getFunction(),
149       queryParams.getTimeSliceNanoseconds(),
150       queryParams.getFillOption()
151     );
152 
153     var query = buildSelectQueryObject(timeseriesId, valueType, queryParams);
154 
155     @SuppressWarnings("unchecked")
156     List<TimeseriesDataPoint> dataPoints = query.getResultList();
157     return dataPoints;
158   }
159 
160   @Timed(value = "shepard.timeseries-data-point-aggregate.query")
161   public List<TimeseriesDataPoint> queryAggregationFunction(
162     int timeseriesId,
163     DataPointValueType valueType,
164     TimeseriesDataPointsQueryParams queryParams
165   ) {
166     assertNotIntegral(queryParams.getFunction());
167     assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
168 
169     var query = buildSelectAggregationFunctionQueryObject(timeseriesId, valueType, queryParams);
170 
171     @SuppressWarnings("unchecked")
172     List<TimeseriesDataPoint> dataPoints = query.getResultList();
173     return dataPoints;
174   }
175 
176   private Query buildInsertQueryObject(
177     List<TimeseriesDataPoint> entities,
178     int startInclusive,
179     int endExclusive,
180     TimeseriesEntity timeseriesEntity
181   ) {
182     StringBuilder queryString = new StringBuilder();
183     queryString.append(
184       "INSERT INTO timeseries_data_points (timeseries_id, time, " +
185       getColumnName(timeseriesEntity.getValueType()) +
186       ") values "
187     );
188     queryString.append(
189       IntStream.range(startInclusive, endExclusive)
190         .mapToObj(index -> "(:timeseriesid" + ",:time" + index + ",:value" + index + ")")
191         .collect(Collectors.joining(","))
192     );
193     queryString.append(
194       " ON CONFLICT (timeseries_id, time) DO UPDATE SET time = EXCLUDED.time, timeseries_id = EXCLUDED.timeseries_id, " +
195       getColumnName(timeseriesEntity.getValueType()) +
196       " = " +
197       "EXCLUDED." +
198       getColumnName(timeseriesEntity.getValueType()) +
199       ";"
200     );
201 
202     Query query = entityManager.createNativeQuery(queryString.toString());
203 
204     query.setParameter("timeseriesid", timeseriesEntity.getId());
205 
206     IntStream.range(startInclusive, endExclusive).forEach(index -> {
207       query.setParameter("time" + index, entities.get(index).getTimestamp());
208       query.setParameter("value" + index, entities.get(index).getValue());
209     });
210 
211     return query;
212   }
213 
214   private Query buildSelectQueryObject(
215     int timeseriesId,
216     DataPointValueType valueType,
217     TimeseriesDataPointsQueryParams queryParams
218   ) {
219     String columnName = getColumnName(valueType);
220 
221     FillOption fillOption = queryParams.getFillOption().orElse(FillOption.NONE);
222     var timeSliceNanoseconds = queryParams.getTimeSliceNanoseconds().orElse(null);
223 
224     String queryString = "";
225     if (queryParams.getFunction().isPresent()) {
226       AggregateFunction function = queryParams.getFunction().get();
227       if (timeSliceNanoseconds == null) {
228         timeSliceNanoseconds = queryParams.getEndTime() - queryParams.getStartTime();
229       }
230 
231       queryString = "SELECT ";
232 
233       queryString += switch (fillOption) {
234         case NONE -> "time_bucket(:timeInNanoseconds, time) as timestamp, ";
235         case NULL, LINEAR, PREVIOUS -> "time_bucket_gapfill(:timeInNanoseconds, time) as timestamp, ";
236       };
237 
238       String aggregationString = "";
239       switch (function) {
240         case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = "%s(%s)".formatted(function.name(), columnName);
241         case MEAN -> aggregationString = "AVG(%s)".formatted(columnName);
242         case LAST, FIRST -> aggregationString = "%s(%s, time)".formatted(function.name(), columnName);
243         case SPREAD -> aggregationString = "MAX(%s) - MIN(%s)".formatted(columnName, columnName);
244         case MEDIAN -> aggregationString = "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)".formatted(columnName);
245         case MODE -> aggregationString = "mode() WITHIN GROUP (ORDER BY %s)".formatted(columnName);
246         case INTEGRAL -> {}
247       }
248 
249       // handle filling - by default bucket_gapfill uses NULL filloption
250       if (fillOption == FillOption.LINEAR) {
251         aggregationString = "interpolate(%s) as value ".formatted(aggregationString);
252       } else if (fillOption == FillOption.PREVIOUS) {
253         aggregationString = "locf(%s) as value ".formatted(aggregationString);
254       } else {
255         aggregationString += " as value ";
256       }
257 
258       queryString += aggregationString;
259     } else {
260       queryString = "SELECT time, %s ".formatted(columnName);
261     }
262 
263     queryString += """
264     FROM timeseries_data_points
265     WHERE timeseries_id = :timeseriesId
266       AND time >= :startTimeNano
267       AND time <= :endTimeNano
268     """;
269 
270     if (queryParams.getFunction().isPresent()) {
271       queryString += " GROUP BY timestamp ORDER BY timestamp";
272     } else {
273       queryString += " ORDER BY time";
274     }
275 
276     Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
277 
278     if (timeSliceNanoseconds != null) {
279       query.setParameter("timeInNanoseconds", timeSliceNanoseconds);
280     }
281     query.setParameter("timeseriesId", timeseriesId);
282     query.setParameter("startTimeNano", queryParams.getStartTime());
283     query.setParameter("endTimeNano", queryParams.getEndTime());
284 
285     return query;
286   }
287 
288   private Query buildSelectAggregationFunctionQueryObject(
289     int timeseriesId,
290     DataPointValueType valueType,
291     TimeseriesDataPointsQueryParams queryParams
292   ) {
293     String columnName = getColumnName(valueType);
294 
295     String queryString = "";
296     if (queryParams.getFunction().isPresent()) {
297       AggregateFunction function = queryParams.getFunction().get();
298 
299       queryString = "SELECT 1 as timestamp, ";
300 
301       String aggregationString = "";
302       switch (function) {
303         case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = "%s(%s)".formatted(function.name(), columnName);
304         case MEAN -> aggregationString = "AVG(%s)".formatted(columnName);
305         case LAST, FIRST -> aggregationString = "%s(%s, time)".formatted(function.name(), columnName);
306         case SPREAD -> aggregationString = "MAX(%s) - MIN(%s)".formatted(columnName, columnName);
307         case MEDIAN -> aggregationString = "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)".formatted(columnName);
308         case MODE -> aggregationString = "mode() WITHIN GROUP (ORDER BY %s)".formatted(columnName);
309         case INTEGRAL -> {}
310       }
311 
312       aggregationString += " as value ";
313 
314       queryString += aggregationString;
315     } else {
316       queryString = "SELECT time, %s ".formatted(columnName);
317     }
318 
319     queryString += """
320     FROM timeseries_data_points
321     WHERE timeseries_id = :timeseriesId
322       AND time >= :startTimeNano
323       AND time <= :endTimeNano
324     """;
325 
326     Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
327 
328     query.setParameter("timeseriesId", timeseriesId);
329     query.setParameter("startTimeNano", queryParams.getStartTime());
330     query.setParameter("endTimeNano", queryParams.getEndTime());
331 
332     return query;
333   }
334 
335   private String getColumnName(DataPointValueType valueType) {
336     return switch (valueType) {
337       case Double -> "double_value";
338       case Integer -> "int_value";
339       case String -> "string_value";
340       case Boolean -> "boolean_value";
341     };
342   }
343 
344   /**
345    * Throw when trying to access unsupported aggregation function.
346    */
347   private void assertNotIntegral(Optional<AggregateFunction> function) {
348     if (function.isPresent() && function.get() == AggregateFunction.INTEGRAL) {
349       throw new InvalidRequestException("Aggregation function 'integral' is currently not implemented.");
350     }
351   }
352 
353   /**
354    * Throw when trying to use aggregation functions with boolean or string value
355    * types.
356    * COUNT, FIRST and LAST can be allowed for all data types.
357    */
358   private void assertCorrectValueTypesForAggregation(
359     Optional<AggregateFunction> function,
360     DataPointValueType valueType
361   ) {
362     if (
363       (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) &&
364       (function.isPresent() &&
365         function.get() != AggregateFunction.COUNT &&
366         function.get() != AggregateFunction.FIRST &&
367         function.get() != AggregateFunction.LAST)
368     ) {
369       throw new InvalidRequestException(
370         "Cannot execute aggregation functions on data points of type boolean or string."
371       );
372     }
373   }
374 
375   /**
376    * Throw when trying to use gap filling with unsupported value types boolean or
377    * string.
378    */
379   private void assertCorrectValueTypesForFillOption(Optional<FillOption> fillOption, DataPointValueType valueType) {
380     if (
381       (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) && (fillOption.isPresent())
382     ) {
383       throw new InvalidRequestException("Cannot use gap filling options on data points of type boolean or string.");
384     }
385   }
386 
387   /**
388    * Throw when trying to use fill option without specifying the timeSlice value
389    */
390   private void assertTimeIntervalForFillOption(Optional<Long> timeSliceNanoseconds, Optional<FillOption> fillOption) {
391     if (timeSliceNanoseconds.isEmpty() && fillOption.isPresent()) {
392       throw new InvalidRequestException("Cannot use gap filling option when no grouping interval is specified.");
393     }
394   }
395 
396   /**
397    * Throw when trying to use fill option or grouping when no aggregation function
398    * is set.
399    */
400   private void assertAggregationSetForFillOrGrouping(
401     Optional<AggregateFunction> function,
402     Optional<Long> timeSliceNanoseconds,
403     Optional<FillOption> fillOption
404   ) {
405     if (function.isEmpty() && (fillOption.isPresent() || timeSliceNanoseconds.isPresent())) {
406       throw new InvalidRequestException(
407         "Cannot use gap filling option or grouping of data when no aggregation function is specified."
408       );
409     }
410   }
411 }