View Javadoc
1   package de.dlr.shepard.timeseries.repositories;
2   
3   import de.dlr.shepard.exceptions.InvalidRequestException;
4   import de.dlr.shepard.timeseries.model.ExperimentalTimeseriesDataPoint;
5   import de.dlr.shepard.timeseries.model.ExperimentalTimeseriesDataPointsQueryParams;
6   import de.dlr.shepard.timeseries.model.ExperimentalTimeseriesEntity;
7   import de.dlr.shepard.timeseries.model.enums.AggregateFunction;
8   import de.dlr.shepard.timeseries.model.enums.ExperimentalDataPointValueType;
9   import de.dlr.shepard.timeseries.model.enums.FillOption;
10  import jakarta.enterprise.context.ApplicationScoped;
11  import jakarta.inject.Inject;
12  import jakarta.persistence.EntityManager;
13  import jakarta.persistence.Query;
14  import java.util.List;
15  import java.util.Optional;
16  import java.util.stream.Collectors;
17  import java.util.stream.IntStream;
18  
19  @ApplicationScoped
20  public class ExperimentalTimeseriesDataPointRepository {
21  
22    private final int INSERT_BATCH_SIZE = 1000;
23  
24    @Inject
25    EntityManager entityManager;
26  
27    public void insertManyDataPoints(
28      List<ExperimentalTimeseriesDataPoint> entities,
29      ExperimentalTimeseriesEntity timeseriesEntity
30    ) {
31      for (int i = 0; i < entities.size(); i += INSERT_BATCH_SIZE) {
32        int currentLimit = Math.min(i + INSERT_BATCH_SIZE, entities.size());
33        Query query = buildInsertQueryObject(entities, i, currentLimit, timeseriesEntity);
34        query.executeUpdate();
35      }
36    }
37  
38    public List<ExperimentalTimeseriesDataPoint> queryDataPoints(
39      int timeseriesId,
40      ExperimentalDataPointValueType valueType,
41      ExperimentalTimeseriesDataPointsQueryParams queryParams
42    ) {
43      assertNotIntegral(queryParams.getFunction());
44      assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
45      assertCorrectValueTypesForFillOption(queryParams.getFillOption(), valueType);
46      assertTimeIntervalForFillOption(queryParams.getTimeSliceNanoseconds(), queryParams.getFillOption());
47      assertAggregationSetForFillOrGrouping(
48        queryParams.getFunction(),
49        queryParams.getTimeSliceNanoseconds(),
50        queryParams.getFillOption()
51      );
52  
53      var query = buildSelectQueryObject(timeseriesId, valueType, queryParams);
54  
55      @SuppressWarnings("unchecked")
56      List<ExperimentalTimeseriesDataPoint> dataPoints = query.getResultList();
57      return dataPoints;
58    }
59  
60    private Query buildInsertQueryObject(
61      List<ExperimentalTimeseriesDataPoint> entities,
62      int startInclusive,
63      int endExclusive,
64      ExperimentalTimeseriesEntity timeseriesEntity
65    ) {
66      StringBuilder queryString = new StringBuilder();
67      queryString.append(
68        "INSERT INTO timeseries_data_points (timeseries_id, time, " +
69        getColumnName(timeseriesEntity.getValueType()) +
70        ") values "
71      );
72      queryString.append(
73        IntStream.range(startInclusive, endExclusive)
74          .mapToObj(index -> "(:timeseriesid" + ",:time" + index + ",:value" + index + ")")
75          .collect(Collectors.joining(","))
76      );
77      queryString.append(";");
78  
79      Query query = entityManager.createNativeQuery(queryString.toString());
80  
81      query.setParameter("timeseriesid", timeseriesEntity.getId());
82  
83      IntStream.range(startInclusive, endExclusive).forEach(index -> {
84        query.setParameter("time" + index, entities.get(index).getTimestamp());
85        query.setParameter("value" + index, entities.get(index).getValue());
86      });
87  
88      return query;
89    }
90  
91    private Query buildSelectQueryObject(
92      int timeseriesId,
93      ExperimentalDataPointValueType valueType,
94      ExperimentalTimeseriesDataPointsQueryParams queryParams
95    ) {
96      String columnName = getColumnName(valueType);
97  
98      FillOption fillOption = queryParams.getFillOption().orElse(FillOption.NONE);
99      var timeSliceNanoseconds = queryParams.getTimeSliceNanoseconds().orElse(null);
100 
101     String queryString = "";
102     if (queryParams.getFunction().isPresent()) {
103       AggregateFunction function = queryParams.getFunction().get();
104       if (timeSliceNanoseconds == null) {
105         timeSliceNanoseconds = queryParams.getEndTime() - queryParams.getStartTime();
106       }
107 
108       queryString = "SELECT ";
109 
110       queryString += switch (fillOption) {
111         case NONE -> "time_bucket(:timeInNanoseconds, time) as timestamp, ";
112         case NULL, LINEAR, PREVIOUS -> "time_bucket_gapfill(:timeInNanoseconds, time) as timestamp, ";
113       };
114 
115       String aggregationString = "";
116       switch (function) {
117         case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = String.format("%s(%s)", function.name(), columnName);
118         case MEAN -> aggregationString = String.format("AVG(%s)", columnName);
119         case LAST, FIRST -> aggregationString = String.format("%s(%s, time)", function.name(), columnName);
120         case SPREAD -> aggregationString = String.format("MAX(%s) - MIN(%s)", columnName, columnName);
121         case MEDIAN -> aggregationString = String.format("percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)", columnName);
122         case MODE -> aggregationString = String.format("mode() WITHIN GROUP (ORDER BY %s)", columnName);
123         case INTEGRAL -> {}
124       }
125 
126       // handle filling - by default bucket_gapfill uses NULL filloption
127       if (fillOption == FillOption.LINEAR) {
128         aggregationString = String.format("interpolate(%s) as value ", aggregationString);
129       } else if (fillOption == FillOption.PREVIOUS) {
130         aggregationString = String.format("locf(%s) as value ", aggregationString);
131       } else {
132         aggregationString += " as value ";
133       }
134 
135       queryString += aggregationString;
136     } else {
137       queryString = String.format("SELECT time, %s ", columnName);
138     }
139 
140     queryString += """
141     FROM timeseries_data_points
142     WHERE timeseries_id = :timeseriesId
143       AND time >= :startTimeNano
144       AND time <= :endTimeNano
145     """;
146 
147     if (queryParams.getFunction().isPresent()) {
148       queryString += " GROUP BY timestamp";
149     }
150 
151     Query query = entityManager.createNativeQuery(queryString, ExperimentalTimeseriesDataPoint.class);
152 
153     if (timeSliceNanoseconds != null) {
154       query.setParameter("timeInNanoseconds", timeSliceNanoseconds);
155     }
156     query.setParameter("timeseriesId", timeseriesId);
157     query.setParameter("startTimeNano", queryParams.getStartTime());
158     query.setParameter("endTimeNano", queryParams.getEndTime());
159 
160     return query;
161   }
162 
163   private String getColumnName(ExperimentalDataPointValueType valueType) {
164     return switch (valueType) {
165       case Double -> "double_value";
166       case Integer -> "int_value";
167       case String -> "string_value";
168       case Boolean -> "boolean_value";
169     };
170   }
171 
172   /**
173    * Throw when trying to access unsupported aggregation function.
174    */
175   private void assertNotIntegral(Optional<AggregateFunction> function) {
176     if (function.isPresent() && function.get() == AggregateFunction.INTEGRAL) {
177       throw new InvalidRequestException("Aggregation function 'integral' is currently not implemented.");
178     }
179   }
180 
181   /**
182    * Throw when trying to use aggregation functions with boolean or string value types.
183    */
184   private void assertCorrectValueTypesForAggregation(
185     Optional<AggregateFunction> function,
186     ExperimentalDataPointValueType valueType
187   ) {
188     if (
189       (valueType == ExperimentalDataPointValueType.Boolean || valueType == ExperimentalDataPointValueType.String) &&
190       (function.isPresent())
191     ) {
192       throw new InvalidRequestException(
193         "Cannot execute aggregation functions on data points of type boolean or string."
194       );
195     }
196   }
197 
198   /**
199    * Throw when trying to use gap filling with unsupported value types boolean or string.
200    */
201   private void assertCorrectValueTypesForFillOption(
202     Optional<FillOption> fillOption,
203     ExperimentalDataPointValueType valueType
204   ) {
205     if (
206       (valueType == ExperimentalDataPointValueType.Boolean || valueType == ExperimentalDataPointValueType.String) &&
207       (fillOption.isPresent())
208     ) {
209       throw new InvalidRequestException("Cannot use gap filling options on data points of type boolean or string.");
210     }
211   }
212 
213   /**
214    * Throw when trying to use fill option without specifying the timeSlice value
215    */
216   private void assertTimeIntervalForFillOption(Optional<Long> timeSliceNanoseconds, Optional<FillOption> fillOption) {
217     if (timeSliceNanoseconds.isEmpty() && fillOption.isPresent()) {
218       throw new InvalidRequestException("Cannot use gap filling option when no grouping interval is specified.");
219     }
220   }
221 
222   /**
223    * Throw when trying to use fill option or grouping when no aggregation function is set.
224    */
225   private void assertAggregationSetForFillOrGrouping(
226     Optional<AggregateFunction> function,
227     Optional<Long> timeSliceNanoseconds,
228     Optional<FillOption> fillOption
229   ) {
230     if (function.isEmpty() && (fillOption.isPresent() || timeSliceNanoseconds.isPresent())) {
231       throw new InvalidRequestException(
232         "Cannot use gap filling option or grouping of data when no aggregation function is specified."
233       );
234     }
235   }
236 }