View Javadoc
1   package de.dlr.shepard.data.timeseries.repositories;
2   
3   import de.dlr.shepard.common.exceptions.InvalidBodyException;
4   import de.dlr.shepard.common.exceptions.InvalidRequestException;
5   import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
6   import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
7   import de.dlr.shepard.data.timeseries.model.enums.AggregateFunction;
8   import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
9   import de.dlr.shepard.data.timeseries.model.enums.FillOption;
10  import io.micrometer.core.annotation.Timed;
11  import jakarta.enterprise.context.RequestScoped;
12  import jakarta.persistence.EntityManager;
13  import jakarta.persistence.PersistenceContext;
14  import jakarta.persistence.Query;
15  import java.util.List;
16  import java.util.Optional;
17  import java.util.stream.Collectors;
18  import java.util.stream.IntStream;
19  import org.hibernate.exception.DataException;
20  
21  @RequestScoped
22  public class TimeseriesDataPointRepository {
23  
24    private static final int INSERT_BATCH_SIZE = 20000;
25  
26    @PersistenceContext
27    EntityManager entityManager;
28  
29    /**
30     * Insert a list of timeseries data points into the database.
31     *
32     * @param entities         list of timeseries data points
33     * @param timeseriesId The ID of the timeseries
34     * @throws InvalidBodyException can be thrown when 'entities' contains the same
35     *                              timestamp more than once (read more in
36     *                              architectural documentation: 'Building Block
37     *                              View' -> 'Timeseries: Multiple Values for One
38     *                              Timestamp')
39     */
40    @Timed(value = "shepard.timeseries-data-point.batch-insert")
41    public void insertManyDataPoints(
42      List<TimeseriesDataPoint> entities,
43      long timeseriesId,
44      DataPointValueType valueType
45    ) {
46      for (int i = 0; i < entities.size(); i += INSERT_BATCH_SIZE) {
47        int currentLimit = Math.min(i + INSERT_BATCH_SIZE, entities.size());
48        Query query = buildInsertQueryObject(entities, i, currentLimit, timeseriesId, valueType);
49  
50        try {
51          query.executeUpdate();
52        } catch (DataException ex) {
53          if (ex.getCause().toString().contains("ON CONFLICT DO UPDATE command cannot affect row a second time")) {
54            throw new InvalidBodyException(
55              "You provided the same timestamp value multiple times. Please make sure that there are only unique timestamps in a timeseries payload request!"
56            );
57          }
58          throw ex;
59        }
60      }
61    }
62  
63    @Timed(value = "shepard.timeseries-data-point.compression")
64    public void compressAllChunks() {
65      var sqlString = "SELECT compress_chunk(c) FROM show_chunks('timeseries_data_points') c;";
66      Query query = entityManager.createNativeQuery(sqlString);
67      query.getResultList();
68    }
69  
70    /**
71     * Retrieve a list of DataPoints for a time-interval with options to grouping/
72     * time slicing, filling and aggregating.
73     * <p />
74     * This function does not check if the container specified by containerId is
75     * accessible.
76     * We add <code>@ActivateRequestContext</code> in order to call this method in a
77     * parallel stream.
78     *
79     * @param timeseriesId timeseriesId identifying a timeseries
80     * @param valueType type of the timeseries values for column lookup
81     * @param queryParams additional query parameters
82     * @return List<TimeseriesDataPoint>
83     */
84    @Timed(value = "shepard.timeseries-data-point.query")
85    public List<TimeseriesDataPoint> queryDataPoints(
86      long timeseriesId,
87      DataPointValueType valueType,
88      TimeseriesDataPointsQueryParams queryParams
89    ) {
90      assertNotIntegral(queryParams.getFunction());
91      assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
92      assertCorrectValueTypesForFillOption(queryParams.getFillOption(), valueType);
93      assertTimeIntervalForFillOption(queryParams.getTimeSliceNanoseconds(), queryParams.getFillOption());
94      assertAggregationSetForFillOrGrouping(
95        queryParams.getFunction(),
96        queryParams.getTimeSliceNanoseconds(),
97        queryParams.getFillOption()
98      );
99  
100     var query = buildSelectQueryObject(timeseriesId, valueType, queryParams);
101 
102     @SuppressWarnings("unchecked")
103     List<TimeseriesDataPoint> dataPoints = query.getResultList();
104     return dataPoints;
105   }
106 
107   @Timed(value = "shepard.timeseries-data-point-aggregate.query")
108   public List<TimeseriesDataPoint> queryAggregationFunction(
109     long timeseriesId,
110     DataPointValueType valueType,
111     TimeseriesDataPointsQueryParams queryParams
112   ) {
113     assertNotIntegral(queryParams.getFunction());
114     assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
115 
116     var query = buildSelectAggregationFunctionQueryObject(timeseriesId, valueType, queryParams);
117 
118     @SuppressWarnings("unchecked")
119     List<TimeseriesDataPoint> dataPoints = query.getResultList();
120     return dataPoints;
121   }
122 
123   private Query buildInsertQueryObject(
124     List<TimeseriesDataPoint> entities,
125     int startInclusive,
126     int endExclusive,
127     long timeseriesId,
128     DataPointValueType valueType
129   ) {
130     StringBuilder queryString = new StringBuilder();
131     queryString.append(
132       "INSERT INTO timeseries_data_points (timeseries_id, time, " + getColumnName(valueType) + ") values "
133     );
134     queryString.append(
135       IntStream.range(startInclusive, endExclusive)
136         .mapToObj(index -> "(:timeseriesid" + ",:time" + index + ",:value" + index + ")")
137         .collect(Collectors.joining(","))
138     );
139     queryString.append(
140       " ON CONFLICT (timeseries_id, time) DO UPDATE SET time = EXCLUDED.time, timeseries_id = EXCLUDED.timeseries_id, " +
141       getColumnName(valueType) +
142       " = " +
143       "EXCLUDED." +
144       getColumnName(valueType) +
145       ";"
146     );
147 
148     Query query = entityManager.createNativeQuery(queryString.toString());
149 
150     query.setParameter("timeseriesid", timeseriesId);
151 
152     IntStream.range(startInclusive, endExclusive).forEach(index -> {
153       query.setParameter("time" + index, entities.get(index).getTimestamp());
154       query.setParameter("value" + index, entities.get(index).getValue());
155     });
156 
157     return query;
158   }
159 
160   private Query buildSelectQueryObject(
161     long timeseriesId,
162     DataPointValueType valueType,
163     TimeseriesDataPointsQueryParams queryParams
164   ) {
165     String columnName = getColumnName(valueType);
166 
167     FillOption fillOption = queryParams.getFillOption().orElse(FillOption.NONE);
168     var timeSliceNanoseconds = queryParams.getTimeSliceNanoseconds().orElse(null);
169 
170     String queryString = "";
171     if (queryParams.getFunction().isPresent()) {
172       AggregateFunction function = queryParams.getFunction().get();
173       if (timeSliceNanoseconds == null) {
174         timeSliceNanoseconds = queryParams.getEndTime() - queryParams.getStartTime();
175       }
176 
177       queryString = "SELECT ";
178 
179       queryString += switch (fillOption) {
180         case NONE -> "time_bucket(:timeInNanoseconds, time) as timestamp, ";
181         case NULL, LINEAR, PREVIOUS -> "time_bucket_gapfill(:timeInNanoseconds, time) as timestamp, ";
182       };
183 
184       String aggregationString = "";
185       switch (function) {
186         case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = "%s(%s)".formatted(function.name(), columnName);
187         case MEAN -> aggregationString = "AVG(%s)".formatted(columnName);
188         case LAST, FIRST -> aggregationString = "%s(%s, time)".formatted(function.name(), columnName);
189         case SPREAD -> aggregationString = "MAX(%s) - MIN(%s)".formatted(columnName, columnName);
190         case MEDIAN -> aggregationString = "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)".formatted(columnName);
191         case MODE -> aggregationString = "mode() WITHIN GROUP (ORDER BY %s)".formatted(columnName);
192         case INTEGRAL -> {}
193       }
194 
195       // handle filling - by default bucket_gapfill uses NULL filloption
196       if (fillOption == FillOption.LINEAR) {
197         aggregationString = "interpolate(%s) as value ".formatted(aggregationString);
198       } else if (fillOption == FillOption.PREVIOUS) {
199         aggregationString = "locf(%s) as value ".formatted(aggregationString);
200       } else {
201         aggregationString += " as value ";
202       }
203 
204       queryString += aggregationString;
205     } else {
206       queryString = "SELECT time, %s ".formatted(columnName);
207     }
208 
209     queryString += """
210     FROM timeseries_data_points
211     WHERE timeseries_id = :timeseriesId
212       AND time >= :startTimeNano
213       AND time <= :endTimeNano
214     """;
215 
216     if (queryParams.getFunction().isPresent()) {
217       queryString += " GROUP BY timestamp ORDER BY timestamp";
218     } else {
219       queryString += " ORDER BY time";
220     }
221 
222     Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
223 
224     if (timeSliceNanoseconds != null) {
225       query.setParameter("timeInNanoseconds", timeSliceNanoseconds);
226     }
227     query.setParameter("timeseriesId", timeseriesId);
228     query.setParameter("startTimeNano", queryParams.getStartTime());
229     query.setParameter("endTimeNano", queryParams.getEndTime());
230 
231     return query;
232   }
233 
234   private Query buildSelectAggregationFunctionQueryObject(
235     long timeseriesId,
236     DataPointValueType valueType,
237     TimeseriesDataPointsQueryParams queryParams
238   ) {
239     String columnName = getColumnName(valueType);
240 
241     String queryString = "";
242     if (queryParams.getFunction().isPresent()) {
243       AggregateFunction function = queryParams.getFunction().get();
244 
245       queryString = "SELECT 1 as timestamp, ";
246 
247       String aggregationString = "";
248       switch (function) {
249         case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = "%s(%s)".formatted(function.name(), columnName);
250         case MEAN -> aggregationString = "AVG(%s)".formatted(columnName);
251         case LAST, FIRST -> aggregationString = "%s(%s, time)".formatted(function.name(), columnName);
252         case SPREAD -> aggregationString = "MAX(%s) - MIN(%s)".formatted(columnName, columnName);
253         case MEDIAN -> aggregationString = "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)".formatted(columnName);
254         case MODE -> aggregationString = "mode() WITHIN GROUP (ORDER BY %s)".formatted(columnName);
255         case INTEGRAL -> {}
256       }
257 
258       aggregationString += " as value ";
259 
260       queryString += aggregationString;
261     } else {
262       queryString = "SELECT time, %s ".formatted(columnName);
263     }
264 
265     queryString += """
266     FROM timeseries_data_points
267     WHERE timeseries_id = :timeseriesId
268       AND time >= :startTimeNano
269       AND time <= :endTimeNano
270     """;
271 
272     Query query = entityManager.createNativeQuery(queryString, TimeseriesDataPoint.class);
273 
274     query.setParameter("timeseriesId", timeseriesId);
275     query.setParameter("startTimeNano", queryParams.getStartTime());
276     query.setParameter("endTimeNano", queryParams.getEndTime());
277 
278     return query;
279   }
280 
281   private String getColumnName(DataPointValueType valueType) {
282     return switch (valueType) {
283       case Double -> "double_value";
284       case Integer -> "int_value";
285       case String -> "string_value";
286       case Boolean -> "boolean_value";
287     };
288   }
289 
290   /**
291    * Throw when trying to access unsupported aggregation function.
292    */
293   private void assertNotIntegral(Optional<AggregateFunction> function) {
294     if (function.isPresent() && function.get() == AggregateFunction.INTEGRAL) {
295       throw new InvalidRequestException("Aggregation function 'integral' is currently not implemented.");
296     }
297   }
298 
299   /**
300    * Throw when trying to use aggregation functions with boolean or string value
301    * types.
302    * COUNT, FIRST and LAST can be allowed for all data types.
303    */
304   private void assertCorrectValueTypesForAggregation(
305     Optional<AggregateFunction> function,
306     DataPointValueType valueType
307   ) {
308     if (
309       (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) &&
310       (function.isPresent() &&
311         function.get() != AggregateFunction.COUNT &&
312         function.get() != AggregateFunction.FIRST &&
313         function.get() != AggregateFunction.LAST)
314     ) {
315       throw new InvalidRequestException(
316         "Cannot execute aggregation functions on data points of type boolean or string."
317       );
318     }
319   }
320 
321   /**
322    * Throw when trying to use gap filling with unsupported value types boolean or
323    * string.
324    */
325   private void assertCorrectValueTypesForFillOption(Optional<FillOption> fillOption, DataPointValueType valueType) {
326     if (
327       (valueType == DataPointValueType.Boolean || valueType == DataPointValueType.String) && (fillOption.isPresent())
328     ) {
329       throw new InvalidRequestException("Cannot use gap filling options on data points of type boolean or string.");
330     }
331   }
332 
333   /**
334    * Throw when trying to use fill option without specifying the timeSlice value
335    */
336   private void assertTimeIntervalForFillOption(Optional<Long> timeSliceNanoseconds, Optional<FillOption> fillOption) {
337     if (timeSliceNanoseconds.isEmpty() && fillOption.isPresent()) {
338       throw new InvalidRequestException("Cannot use gap filling option when no grouping interval is specified.");
339     }
340   }
341 
342   /**
343    * Throw when trying to use fill option or grouping when no aggregation function
344    * is set.
345    */
346   private void assertAggregationSetForFillOrGrouping(
347     Optional<AggregateFunction> function,
348     Optional<Long> timeSliceNanoseconds,
349     Optional<FillOption> fillOption
350   ) {
351     if (function.isEmpty() && (fillOption.isPresent() || timeSliceNanoseconds.isPresent())) {
352       throw new InvalidRequestException(
353         "Cannot use gap filling option or grouping of data when no aggregation function is specified."
354       );
355     }
356   }
357 }