1 package de.dlr.shepard.data.timeseries.migration.influxtimeseries;
2
3 import de.dlr.shepard.common.exceptions.InvalidBodyException;
4 import io.micrometer.core.annotation.Timed;
5 import jakarta.enterprise.context.RequestScoped;
6 import jakarta.inject.Inject;
7 import java.io.IOException;
8 import java.io.InputStream;
9 import java.util.ArrayList;
10 import java.util.List;
11 import java.util.Set;
12 import java.util.UUID;
13 import java.util.concurrent.ConcurrentLinkedQueue;
14
15 @RequestScoped
16 public class InfluxTimeseriesService {
17
18 @Inject
19 InfluxDBConnector influxConnector;
20
21 @Inject
22 InfluxCsvConverter csvConverter;
23
24
25
26
27
28
29
30
31 @Timed(value = "shepard.timeseries.service")
32 public String createTimeseries(String database, InfluxTimeseriesPayload payload) {
33 String sanityCheck = InfluxUtil.sanitize(payload.getTimeseries());
34 if (!sanityCheck.isBlank()) throw new InvalidBodyException(sanityCheck);
35 if (!influxConnector.databaseExist(database)) {
36 return String.format("The database %s does not exist", database);
37 }
38 return influxConnector.saveTimeseriesPayload(database, payload);
39 }
40
41
42
43
44
45
46
47
48
49
50
51
52
53 @Timed(value = "shepard.timeseries.service")
54 public InfluxTimeseriesPayload getTimeseriesPayload(
55 long startTimeStamp,
56 long endTimeStamp,
57 String database,
58 InfluxTimeseries timeseries,
59 InfluxSingleValuedUnaryFunction function,
60 Long groupBy,
61 InfluxFillOption fillOption
62 ) {
63 InfluxTimeseriesPayload payload = influxConnector.getTimeseriesPayload(
64 startTimeStamp,
65 endTimeStamp,
66 database,
67 timeseries,
68 function,
69 groupBy,
70 fillOption
71 );
72 return payload;
73 }
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 @Timed(value = "shepard.timeseries.service")
92 public List<InfluxTimeseriesPayload> getTimeseriesPayloadList(
93 long start,
94 long end,
95 String database,
96 List<InfluxTimeseries> timeseriesList,
97 InfluxSingleValuedUnaryFunction function,
98 Long groupBy,
99 InfluxFillOption fillOption,
100 Set<String> devicesFilterSet,
101 Set<String> locationsFilterSet,
102 Set<String> symbolicNameFilterSet
103 ) {
104 var timeseriesPayloadQueue = new ConcurrentLinkedQueue<InfluxTimeseriesPayload>();
105 timeseriesList
106 .parallelStream()
107 .forEach(timeseries -> {
108 InfluxTimeseriesPayload payload = null;
109 if (matchFilter(timeseries, devicesFilterSet, locationsFilterSet, symbolicNameFilterSet)) {
110 payload = getTimeseriesPayload(start, end, database, timeseries, function, groupBy, fillOption);
111 }
112 if (payload != null) {
113 timeseriesPayloadQueue.add(payload);
114 }
115 });
116 return new ArrayList<>(timeseriesPayloadQueue);
117 }
118
119
120
121
122
123
124
125 @Timed(value = "shepard.timeseries.service")
126 public List<InfluxTimeseries> getTimeseriesAvailable(String database) {
127 return influxConnector.getTimeseriesAvailable(database);
128 }
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 @Timed(value = "shepard.timeseries.service")
148 public InputStream exportTimeseriesPayload(
149 long start,
150 long end,
151 String database,
152 List<InfluxTimeseries> timeseriesList,
153 InfluxSingleValuedUnaryFunction function,
154 Long groupBy,
155 InfluxFillOption fillOption,
156 Set<String> devicesFilterSet,
157 Set<String> locationsFilterSet,
158 Set<String> symbolicNameFilterSet
159 ) throws IOException {
160 var payload = getTimeseriesPayloadList(
161 start,
162 end,
163 database,
164 timeseriesList,
165 function,
166 groupBy,
167 fillOption,
168 devicesFilterSet,
169 locationsFilterSet,
170 symbolicNameFilterSet
171 );
172 var stream = csvConverter.convertToCsv(payload);
173 return stream;
174 }
175
176
177
178
179
180
181
182
183
184 @Timed(value = "shepard.timeseries.service")
185 public String importTimeseries(String database, InputStream stream) throws IOException {
186 List<String> errors = new ArrayList<>();
187 var timeseriesList = csvConverter.convertToPayload(stream);
188 for (var timeseries : timeseriesList) {
189 var error = createTimeseries(database, timeseries);
190 if (!error.isBlank()) {
191 errors.add(error);
192 }
193 }
194 return String.join(", ", errors);
195 }
196
197
198
199
200
201
202 @Timed(value = "shepard.timeseries.service")
203 public String createDatabase() {
204 String name = UUID.randomUUID().toString();
205 influxConnector.createDatabase(name);
206 return name;
207 }
208
209 @Timed(value = "shepard.timeseries.service")
210 public void deleteDatabase(String database) {
211 influxConnector.deleteDatabase(database);
212 }
213
214 private boolean matchFilter(
215 InfluxTimeseries timeseries,
216 Set<String> device,
217 Set<String> location,
218 Set<String> symName
219 ) {
220 var deviceMatches = true;
221 var locatioMatches = true;
222 var symbolicNameMatches = true;
223 if (!device.isEmpty()) {
224 deviceMatches = device.contains(timeseries.getDevice());
225 }
226 if (!location.isEmpty()) {
227 locatioMatches = location.contains(timeseries.getLocation());
228 }
229 if (!symName.isEmpty()) {
230 symbolicNameMatches = symName.contains(timeseries.getSymbolicName());
231 }
232 return deviceMatches && locatioMatches && symbolicNameMatches;
233 }
234 }