1 package de.dlr.shepard.timeseries.repositories;
2
3 import de.dlr.shepard.exceptions.InvalidRequestException;
4 import de.dlr.shepard.timeseries.model.ExperimentalTimeseriesDataPoint;
5 import de.dlr.shepard.timeseries.model.ExperimentalTimeseriesDataPointsQueryParams;
6 import de.dlr.shepard.timeseries.model.ExperimentalTimeseriesEntity;
7 import de.dlr.shepard.timeseries.model.enums.AggregateFunction;
8 import de.dlr.shepard.timeseries.model.enums.ExperimentalDataPointValueType;
9 import de.dlr.shepard.timeseries.model.enums.FillOption;
10 import jakarta.enterprise.context.ApplicationScoped;
11 import jakarta.inject.Inject;
12 import jakarta.persistence.EntityManager;
13 import jakarta.persistence.Query;
14 import java.util.List;
15 import java.util.Optional;
16 import java.util.stream.Collectors;
17 import java.util.stream.IntStream;
18
19 @ApplicationScoped
20 public class ExperimentalTimeseriesDataPointRepository {
21
22 private final int INSERT_BATCH_SIZE = 1000;
23
24 @Inject
25 EntityManager entityManager;
26
27 public void insertManyDataPoints(
28 List<ExperimentalTimeseriesDataPoint> entities,
29 ExperimentalTimeseriesEntity timeseriesEntity
30 ) {
31 for (int i = 0; i < entities.size(); i += INSERT_BATCH_SIZE) {
32 int currentLimit = Math.min(i + INSERT_BATCH_SIZE, entities.size());
33 Query query = buildInsertQueryObject(entities, i, currentLimit, timeseriesEntity);
34 query.executeUpdate();
35 }
36 }
37
38 public List<ExperimentalTimeseriesDataPoint> queryDataPoints(
39 int timeseriesId,
40 ExperimentalDataPointValueType valueType,
41 ExperimentalTimeseriesDataPointsQueryParams queryParams
42 ) {
43 assertNotIntegral(queryParams.getFunction());
44 assertCorrectValueTypesForAggregation(queryParams.getFunction(), valueType);
45 assertCorrectValueTypesForFillOption(queryParams.getFillOption(), valueType);
46 assertTimeIntervalForFillOption(queryParams.getTimeSliceNanoseconds(), queryParams.getFillOption());
47 assertAggregationSetForFillOrGrouping(
48 queryParams.getFunction(),
49 queryParams.getTimeSliceNanoseconds(),
50 queryParams.getFillOption()
51 );
52
53 var query = buildSelectQueryObject(timeseriesId, valueType, queryParams);
54
55 @SuppressWarnings("unchecked")
56 List<ExperimentalTimeseriesDataPoint> dataPoints = query.getResultList();
57 return dataPoints;
58 }
59
60 private Query buildInsertQueryObject(
61 List<ExperimentalTimeseriesDataPoint> entities,
62 int startInclusive,
63 int endExclusive,
64 ExperimentalTimeseriesEntity timeseriesEntity
65 ) {
66 StringBuilder queryString = new StringBuilder();
67 queryString.append(
68 "INSERT INTO timeseries_data_points (timeseries_id, time, " +
69 getColumnName(timeseriesEntity.getValueType()) +
70 ") values "
71 );
72 queryString.append(
73 IntStream.range(startInclusive, endExclusive)
74 .mapToObj(index -> "(:timeseriesid" + ",:time" + index + ",:value" + index + ")")
75 .collect(Collectors.joining(","))
76 );
77 queryString.append(";");
78
79 Query query = entityManager.createNativeQuery(queryString.toString());
80
81 query.setParameter("timeseriesid", timeseriesEntity.getId());
82
83 IntStream.range(startInclusive, endExclusive).forEach(index -> {
84 query.setParameter("time" + index, entities.get(index).getTimestamp());
85 query.setParameter("value" + index, entities.get(index).getValue());
86 });
87
88 return query;
89 }
90
91 private Query buildSelectQueryObject(
92 int timeseriesId,
93 ExperimentalDataPointValueType valueType,
94 ExperimentalTimeseriesDataPointsQueryParams queryParams
95 ) {
96 String columnName = getColumnName(valueType);
97
98 FillOption fillOption = queryParams.getFillOption().orElse(FillOption.NONE);
99 var timeSliceNanoseconds = queryParams.getTimeSliceNanoseconds().orElse(null);
100
101 String queryString = "";
102 if (queryParams.getFunction().isPresent()) {
103 AggregateFunction function = queryParams.getFunction().get();
104 if (timeSliceNanoseconds == null) {
105 timeSliceNanoseconds = queryParams.getEndTime() - queryParams.getStartTime();
106 }
107
108 queryString = "SELECT ";
109
110 queryString += switch (fillOption) {
111 case NONE -> "time_bucket(:timeInNanoseconds, time) as timestamp, ";
112 case NULL, LINEAR, PREVIOUS -> "time_bucket_gapfill(:timeInNanoseconds, time) as timestamp, ";
113 };
114
115 String aggregationString = "";
116 switch (function) {
117 case MAX, MIN, COUNT, SUM, STDDEV -> aggregationString = String.format("%s(%s)", function.name(), columnName);
118 case MEAN -> aggregationString = String.format("AVG(%s)", columnName);
119 case LAST, FIRST -> aggregationString = String.format("%s(%s, time)", function.name(), columnName);
120 case SPREAD -> aggregationString = String.format("MAX(%s) - MIN(%s)", columnName, columnName);
121 case MEDIAN -> aggregationString = String.format("percentile_cont(0.5) WITHIN GROUP (ORDER BY %s)", columnName);
122 case MODE -> aggregationString = String.format("mode() WITHIN GROUP (ORDER BY %s)", columnName);
123 case INTEGRAL -> {}
124 }
125
126
127 if (fillOption == FillOption.LINEAR) {
128 aggregationString = String.format("interpolate(%s) as value ", aggregationString);
129 } else if (fillOption == FillOption.PREVIOUS) {
130 aggregationString = String.format("locf(%s) as value ", aggregationString);
131 } else {
132 aggregationString += " as value ";
133 }
134
135 queryString += aggregationString;
136 } else {
137 queryString = String.format("SELECT time, %s ", columnName);
138 }
139
140 queryString += """
141 FROM timeseries_data_points
142 WHERE timeseries_id = :timeseriesId
143 AND time >= :startTimeNano
144 AND time <= :endTimeNano
145 """;
146
147 if (queryParams.getFunction().isPresent()) {
148 queryString += " GROUP BY timestamp";
149 }
150
151 Query query = entityManager.createNativeQuery(queryString, ExperimentalTimeseriesDataPoint.class);
152
153 if (timeSliceNanoseconds != null) {
154 query.setParameter("timeInNanoseconds", timeSliceNanoseconds);
155 }
156 query.setParameter("timeseriesId", timeseriesId);
157 query.setParameter("startTimeNano", queryParams.getStartTime());
158 query.setParameter("endTimeNano", queryParams.getEndTime());
159
160 return query;
161 }
162
163 private String getColumnName(ExperimentalDataPointValueType valueType) {
164 return switch (valueType) {
165 case Double -> "double_value";
166 case Integer -> "int_value";
167 case String -> "string_value";
168 case Boolean -> "boolean_value";
169 };
170 }
171
172
173
174
175 private void assertNotIntegral(Optional<AggregateFunction> function) {
176 if (function.isPresent() && function.get() == AggregateFunction.INTEGRAL) {
177 throw new InvalidRequestException("Aggregation function 'integral' is currently not implemented.");
178 }
179 }
180
181
182
183
184 private void assertCorrectValueTypesForAggregation(
185 Optional<AggregateFunction> function,
186 ExperimentalDataPointValueType valueType
187 ) {
188 if (
189 (valueType == ExperimentalDataPointValueType.Boolean || valueType == ExperimentalDataPointValueType.String) &&
190 (function.isPresent())
191 ) {
192 throw new InvalidRequestException(
193 "Cannot execute aggregation functions on data points of type boolean or string."
194 );
195 }
196 }
197
198
199
200
201 private void assertCorrectValueTypesForFillOption(
202 Optional<FillOption> fillOption,
203 ExperimentalDataPointValueType valueType
204 ) {
205 if (
206 (valueType == ExperimentalDataPointValueType.Boolean || valueType == ExperimentalDataPointValueType.String) &&
207 (fillOption.isPresent())
208 ) {
209 throw new InvalidRequestException("Cannot use gap filling options on data points of type boolean or string.");
210 }
211 }
212
213
214
215
216 private void assertTimeIntervalForFillOption(Optional<Long> timeSliceNanoseconds, Optional<FillOption> fillOption) {
217 if (timeSliceNanoseconds.isEmpty() && fillOption.isPresent()) {
218 throw new InvalidRequestException("Cannot use gap filling option when no grouping interval is specified.");
219 }
220 }
221
222
223
224
225 private void assertAggregationSetForFillOrGrouping(
226 Optional<AggregateFunction> function,
227 Optional<Long> timeSliceNanoseconds,
228 Optional<FillOption> fillOption
229 ) {
230 if (function.isEmpty() && (fillOption.isPresent() || timeSliceNanoseconds.isPresent())) {
231 throw new InvalidRequestException(
232 "Cannot use gap filling option or grouping of data when no aggregation function is specified."
233 );
234 }
235 }
236 }