View Javadoc
1   package de.dlr.shepard.data.timeseries.repositories;
2   
import de.dlr.shepard.common.exceptions.InvalidBodyException;
import de.dlr.shepard.common.exceptions.InvalidRequestException;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
import de.dlr.shepard.data.timeseries.model.enums.AggregateFunction;
import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
import de.dlr.shepard.data.timeseries.model.enums.FillOption;
import io.agroal.api.AgroalDataSource;
import io.micrometer.core.annotation.Timed;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.Query;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.hibernate.exception.DataException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
31  
32  @RequestScoped
33  public class TimeseriesDataPointRepository {
34  
35    private final int INSERT_BATCH_SIZE = 20000;
36  
37    @PersistenceContext
38    EntityManager entityManager;
39  
40    @Inject
41    AgroalDataSource defaultDataSource;
42  
43    /**
44     * Insert a list of timeseries data points into the database.
45     *
46     * @param entities         list of timeseries data points
47     * @param timeseriesEntity
48     * @throws InvalidBodyException can be thrown when 'entities' contains the same
49     *                              timestamp more than once (read more in
50     *                              architectural documentation: 'Building Block
51     *                              View' -> 'Timeseries: Multiple Values for One
52     *                              Timestamp')
53     */
54    @Timed(value = "shepard.timeseries-data-point.batch-insert")
55    public void insertManyDataPoints(List<TimeseriesDataPoint> entities, TimeseriesEntity timeseriesEntity) {
56      for (int i = 0; i < entities.size(); i += INSERT_BATCH_SIZE) {
57        int currentLimit = Math.min(i + INSERT_BATCH_SIZE, entities.size());
58        Query query = buildInsertQueryObject(entities, i, currentLimit, timeseriesEntity);
59  
60        try {
61          query.executeUpdate();
62        } catch (DataException ex) {
63          if (ex.getCause().toString().contains("ON CONFLICT DO UPDATE command cannot affect row a second time")) {
64            throw new InvalidBodyException(
65              "You provided the same timestamp value multiple times. Please make sure that there are only unique timestamps in a timeseries payload request!"
66            );
67          }
68          throw ex;
69        }
70      }
71    }
72  
73    /**
74     * Insert a list of timeseries data points into the database using the COPY command.
75     * This is used by the influxdb migration but can also be used for csv import or
76     * similar scenarios.
77     * @param entities
78     * @param timeseriesEntity
79     */
80    @Timed(value = "shepard.timeseries-data-point.copy-insert")
81    public void insertManyDataPointsWithCopyCommand(
82      List<TimeseriesDataPoint> entities,
83      TimeseriesEntity timeseriesEntity
84    ) throws SQLException {
85      try (Connection conn = defaultDataSource.getConnection()) {
86        PGConnection pgConn = (PGConnection) conn.unwrap(PGConnection.class);
87        CopyManager copyManager = pgConn.getCopyAPI();
88  
89        var columnName = getColumnName(timeseriesEntity.getValueType());
90        var sb = new StringBuilder();
91  
92        timeseriesEntity.getId();
93  
94        // Strings must be quoted in double quotes in case they contain a comma which is also the delimiter
95        if (timeseriesEntity.getValueType() == DataPointValueType.String) {
96          for (int i = 0; i < entities.size(); i++) {
97            TimeseriesDataPoint entity = entities.get(i);
98            sb
99              .append(timeseriesEntity.getId())
100             .append(",")
101             .append(entity.getTimestamp())
102             .append(",\"")
103             .append(entity.getValue())
104             .append("\"\n");
105         }
106       } else {
107         for (int i = 0; i < entities.size(); i++) {
108           TimeseriesDataPoint entity = entities.get(i);
109           sb
110             .append(timeseriesEntity.getId())
111             .append(",")
112             .append(entity.getTimestamp())
113             .append(",")
114             .append(entity.getValue())
115             .append("\n");
116         }
117       }
118 
119       InputStream input = new ByteArrayInputStream(sb.toString().getBytes());
120       String sql = String.format(
121         "COPY timeseries_data_points (timeseries_id, time, %s) FROM STDIN WITH (FORMAT csv);",
122         columnName
123       );
124 
125       copyManager.copyIn(sql, input);
126     } catch (IOException ex) {
127       Log.errorf("IOException during copy insert: %s", ex.getMessage());
128       throw new RuntimeException("IO Error while inserting data points", ex);
129     }
130   }
131 
132   @Timed(value = "shepard.timeseries-data-point.compression")
133   public void compressAllChunks() {
134     var sqlString = "SELECT compress_chunk(c) FROM show_chunks('timeseries_data_points') c;";
135     Query query = entityManager.createNativeQuery(sqlString);
136     query.getResultList();
137   }
138 
139   @Timed(value = "shepard.timeseries-data-point.query")
140   public List<TimeseriesDataPoint> queryDataPoints(
141     int timeseriesId,
142     DataPointValueType valueType,
143     TimeseriesDataPointsQueryParams queryParams
144   ) {
145     assertNotIntegral(queryParams.getFunction());
146     assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
147     assertCorrectValueTypesForFillOption(queryParams.getFillOption(), valueType);
148     assertTimeIntervalForFillOption(queryParams.getTimeSliceNanoseconds(), queryParams.getFillOption());
149     assertAggregationSetForFillOrGrouping(
150       queryParams.getFunction(),
151       queryParams.getTimeSliceNanoseconds(),
152       queryParams.getFillOption()
153     );
154 
155     var query = buildSelectQueryObject(timeseriesId, valueType, queryParams);
156 
157     @SuppressWarnings("unchecked")
158     List<TimeseriesDataPoint> dataPoints = query.getResultList();
159     return dataPoints;
160   }
161 
162   @Timed(value = "shepard.timeseries-data-point-aggregate.query")
163   public List<TimeseriesDataPoint> queryAggregationFunction(
164     int timeseriesId,
165     DataPointValueType valueType,
166     TimeseriesDataPointsQueryParams queryParams
167   ) {
168     assertNotIntegral(queryParams.getFunction());
169     assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
170 
171     var query = buildSelectAggregationFunctionQueryObject(timeseriesId, valueType, queryParams);
172 
173     @SuppressWarnings("unchecked")
174     List<TimeseriesDataPoint> dataPoints = query.getResultList();
175     return dataPoints;
176   }
177 
178   private Query buildInsertQueryObject(
179     List<TimeseriesDataPoint> entities,
180     int startInclusive,
181     int endExclusive,
182     TimeseriesEntity timeseriesEntity
183   ) {
184     StringBuilder queryString = new StringBuilder();
185     queryString.append(
186       "INSERT INTO timeseries_data_points (timeseries_id, time, " +
187       getColumnName(timeseriesEntity.getValueType()) +
188       ") values "
189     );
190     queryString.append(
191       IntStream.range(startInclusive, endExclusive)
192         .mapToObj(index -> "(:timeseriesid" + ",:time" + index + ",:value" + index + ")")
193         .collect(Collectors.joining(","))
194     );
195     queryString.append(
196       " ON CONFLICT (timeseries_id, time) DO UPDATE SET time = EXCLUDED.time, timeseries_id = EXCLUDED.timeseries_id, " +
197       getColumnName(timeseriesEntity.getValueType()) +
198       " = " +
199       "EXCLUDED." +
200       getColumnName(timeseriesEntity.getValueType()) +
201       ";"
202     );
203 
204     Query query = entityManager.createNativeQuery(queryString.toString());
205 
206     query.setParameter("timeseriesid", timeseriesEntity.getId());
207 
208     IntStream.range(startInclusive, endExclusive).forEach(index -> {
209       query.setParameter("time" + index, entities.get(index).getTimestamp());
210       query.setParameter("value" + index, entities.get(index).getValue());
211     });
212 
213     return query;
214   }
215 
216   private Query buildSelectQueryObject(
217     int timeseriesId,
218     DataPointValueType valueType,
219     TimeseriesDataPointsQueryParams queryParams
220   ) {
221     String columnName = getColumnName(valueType);
222 
223     FillOption fillOption = queryParams.getFillOption().orElse(FillOption.NONE);
224     var timeSliceNanoseconds = queryParams.getTimeSliceNanoseconds().orElse(null);
225 
226     String queryString = "";
227     if (queryParams.getFunction().isPresent()) {
228       AggregateFunction function = queryParams.getFunction().get();
229       if (timeSliceNanoseconds == null) {
230         timeSliceNanoseconds = queryParams.getEndTime() - queryParams.getStartTime();
231       }
232 
233       queryString = "SELECT ";
234 
235       queryString += switch (fillOption) {
236         case NONE -> "time_bucket(:timeInNanoseconds, time) as timestamp, ";
237         case NULL, LINEAR, PREVIOUS -> "time_bucket_gapfill(:timeInNanoseconds, time) as timestamp, ";
238       };
239 
240       String aggregationString = "";
241       switch (function) {
242         case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = String.format("%s(%s)", function.name(), columnName);
243         case MEAN -> aggregationString = String.format("AVG(%s)", columnName);
244         case LAST, FIRST -> aggregationString = String.format("%s(%s, time)", function.name(), columnName);
245         case SPREAD -> aggregationString = String.format("MAX(%s) - MIN(%s)", columnName, columnName);
246         case MEDIAN -> aggregationString = String.format("percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)", columnName);
247         case MODE -> aggregationString = String.format("mode() WITHIN GROUP (ORDER BY %s)", columnName);
248         case INTEGRAL -> {}
249       }
250 
251       // handle filling - by default bucket_gapfill uses NULL filloption
252       if (fillOption == FillOption.LINEAR) {
253         aggregationString = String.format("interpolate(%s) as value ", aggregationString);
254       } else if (fillOption == FillOption.PREVIOUS) {
255         aggregationString = String.format("locf(%s) as value ", aggregationString);
256       } else {
257         aggregationString += " as value ";
258       }
259 
260       queryString += aggregationString;
261     } else {
262       queryString = String.format("SELECT time, %s ", columnName);
263     }
264 
265     queryString += """
266     FROM timeseries_data_points
267     WHERE timeseries_id = :timeseriesId
268       AND time >= :startTimeNano
269       AND time <= :endTimeNano
270     """;
271 
272     if (queryParams.getFunction().isPresent()) {
273       queryString += " GROUP BY timestamp ORDER BY timestamp";
274     } else {
275       queryString += " ORDER BY time";
276     }
277 
278     Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
279 
280     if (timeSliceNanoseconds != null) {
281       query.setParameter("timeInNanoseconds", timeSliceNanoseconds);
282     }
283     query.setParameter("timeseriesId", timeseriesId);
284     query.setParameter("startTimeNano", queryParams.getStartTime());
285     query.setParameter("endTimeNano", queryParams.getEndTime());
286 
287     return query;
288   }
289 
290   private Query buildSelectAggregationFunctionQueryObject(
291     int timeseriesId,
292     DataPointValueType valueType,
293     TimeseriesDataPointsQueryParams queryParams
294   ) {
295     String columnName = getColumnName(valueType);
296 
297     String queryString = "";
298     if (queryParams.getFunction().isPresent()) {
299       AggregateFunction function = queryParams.getFunction().get();
300 
301       queryString = "SELECT 1 as timestamp, ";
302 
303       String aggregationString = "";
304       switch (function) {
305         case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = String.format("%s(%s)", function.name(), columnName);
306         case MEAN -> aggregationString = String.format("AVG(%s)", columnName);
307         case LAST, FIRST -> aggregationString = String.format("%s(%s, time)", function.name(), columnName);
308         case SPREAD -> aggregationString = String.format("MAX(%s) - MIN(%s)", columnName, columnName);
309         case MEDIAN -> aggregationString = String.format("percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)", columnName);
310         case MODE -> aggregationString = String.format("mode() WITHIN GROUP (ORDER BY %s)", columnName);
311         case INTEGRAL -> {}
312       }
313 
314       aggregationString += " as value ";
315 
316       queryString += aggregationString;
317     } else {
318       queryString = String.format("SELECT time, %s ", columnName);
319     }
320 
321     queryString += """
322     FROM timeseries_data_points
323     WHERE timeseries_id = :timeseriesId
324       AND time >= :startTimeNano
325       AND time <= :endTimeNano
326     """;
327 
328     Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
329 
330     query.setParameter("timeseriesId", timeseriesId);
331     query.setParameter("startTimeNano", queryParams.getStartTime());
332     query.setParameter("endTimeNano", queryParams.getEndTime());
333 
334     return query;
335   }
336 
337   private String getColumnName(DataPointValueType valueType) {
338     return switch (valueType) {
339       case Double -> "double_value";
340       case Integer -> "int_value";
341       case String -> "string_value";
342       case Boolean -> "boolean_value";
343     };
344   }
345 
346   /**
347    * Throw when trying to access unsupported aggregation function.
348    */
349   private void assertNotIntegral(Optional<AggregateFunction> function) {
350     if (function.isPresent() && function.get() == AggregateFunction.INTEGRAL) {
351       throw new InvalidRequestException("Aggregation function 'integral' is currently not implemented.");
352     }
353   }
354 
355   /**
356    * Throw when trying to use aggregation functions with boolean or string value
357    * types.
358    * COUNT, FIRST and LAST can be allowed for all data types.
359    */
360   private void assertCorrectValueTypesForAggregation(
361     Optional<AggregateFunction> function,
362     DataPointValueType valueType
363   ) {
364     if (
365       (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) &&
366       (function.isPresent() &&
367         function.get() != AggregateFunction.COUNT &&
368         function.get() != AggregateFunction.FIRST &&
369         function.get() != AggregateFunction.LAST)
370     ) {
371       throw new InvalidRequestException(
372         "Cannot execute aggregation functions on data points of type boolean or string."
373       );
374     }
375   }
376 
377   /**
378    * Throw when trying to use gap filling with unsupported value types boolean or
379    * string.
380    */
381   private void assertCorrectValueTypesForFillOption(Optional<FillOption> fillOption, DataPointValueType valueType) {
382     if (
383       (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) && (fillOption.isPresent())
384     ) {
385       throw new InvalidRequestException("Cannot use gap filling options on data points of type boolean or string.");
386     }
387   }
388 
389   /**
390    * Throw when trying to use fill option without specifying the timeSlice value
391    */
392   private void assertTimeIntervalForFillOption(Optional<Long> timeSliceNanoseconds, Optional<FillOption> fillOption) {
393     if (timeSliceNanoseconds.isEmpty() && fillOption.isPresent()) {
394       throw new InvalidRequestException("Cannot use gap filling option when no grouping interval is specified.");
395     }
396   }
397 
398   /**
399    * Throw when trying to use fill option or grouping when no aggregation function
400    * is set.
401    */
402   private void assertAggregationSetForFillOrGrouping(
403     Optional<AggregateFunction> function,
404     Optional<Long> timeSliceNanoseconds,
405     Optional<FillOption> fillOption
406   ) {
407     if (function.isEmpty() && (fillOption.isPresent() || timeSliceNanoseconds.isPresent())) {
408       throw new InvalidRequestException(
409         "Cannot use gap filling option or grouping of data when no aggregation function is specified."
410       );
411     }
412   }
413 }