1 package de.dlr.shepard.influxDB;
2
3 import de.dlr.shepard.exceptions.InvalidBodyException;
4 import io.micrometer.core.annotation.Timed;
5 import jakarta.enterprise.context.RequestScoped;
6 import jakarta.inject.Inject;
7 import java.io.IOException;
8 import java.io.InputStream;
9 import java.util.ArrayList;
10 import java.util.List;
11 import java.util.Set;
12 import java.util.UUID;
13 import java.util.concurrent.ConcurrentLinkedQueue;
14
15 @RequestScoped
16 public class TimeseriesService {
17
18 private InfluxDBConnector influxConnector;
19 private CsvConverter csvConverter;
20
21 TimeseriesService() {}
22
23 @Inject
24 public TimeseriesService(InfluxDBConnector influxConnector, CsvConverter csvConverter) {
25 this.influxConnector = influxConnector;
26 this.csvConverter = csvConverter;
27 }
28
29
30
31
32
33
34
35
36 @Timed(value = "shepard.timeseries.service")
37 public String createTimeseries(String database, TimeseriesPayload payload) {
38 String sanityCheck = InfluxUtil.sanitize(payload.getTimeseries());
39 if (!sanityCheck.isBlank()) throw new InvalidBodyException(sanityCheck);
40 if (!influxConnector.databaseExist(database)) {
41 return String.format("The database %s does not exist", database);
42 }
43 return influxConnector.saveTimeseriesPayload(database, payload);
44 }
45
46
47
48
49
50
51
52
53
54
55
56
57
58 @Timed(value = "shepard.timeseries.service")
59 public TimeseriesPayload getTimeseriesPayload(
60 long startTimeStamp,
61 long endTimeStamp,
62 String database,
63 Timeseries timeseries,
64 SingleValuedUnaryFunction function,
65 Long groupBy,
66 FillOption fillOption
67 ) {
68 TimeseriesPayload payload = influxConnector.getTimeseriesPayload(
69 startTimeStamp,
70 endTimeStamp,
71 database,
72 timeseries,
73 function,
74 groupBy,
75 fillOption
76 );
77 return payload;
78 }
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 @Timed(value = "shepard.timeseries.service")
97 public List<TimeseriesPayload> getTimeseriesPayloadList(
98 long start,
99 long end,
100 String database,
101 List<Timeseries> timeseriesList,
102 SingleValuedUnaryFunction function,
103 Long groupBy,
104 FillOption fillOption,
105 Set<String> devicesFilterSet,
106 Set<String> locationsFilterSet,
107 Set<String> symbolicNameFilterSet
108 ) {
109 var timeseriesPayloadQueue = new ConcurrentLinkedQueue<TimeseriesPayload>();
110 timeseriesList
111 .parallelStream()
112 .forEach(timeseries -> {
113 TimeseriesPayload payload = null;
114 if (matchFilter(timeseries, devicesFilterSet, locationsFilterSet, symbolicNameFilterSet)) {
115 payload = getTimeseriesPayload(start, end, database, timeseries, function, groupBy, fillOption);
116 }
117 if (payload != null) {
118 timeseriesPayloadQueue.add(payload);
119 }
120 });
121 return new ArrayList<>(timeseriesPayloadQueue);
122 }
123
124
125
126
127
128
129
130 @Timed(value = "shepard.timeseries.service")
131 public List<Timeseries> getTimeseriesAvailable(String database) {
132 return influxConnector.getTimeseriesAvailable(database);
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 @Timed(value = "shepard.timeseries.service")
153 public InputStream exportTimeseriesPayload(
154 long start,
155 long end,
156 String database,
157 List<Timeseries> timeseriesList,
158 SingleValuedUnaryFunction function,
159 Long groupBy,
160 FillOption fillOption,
161 Set<String> devicesFilterSet,
162 Set<String> locationsFilterSet,
163 Set<String> symbolicNameFilterSet
164 ) throws IOException {
165 var payload = getTimeseriesPayloadList(
166 start,
167 end,
168 database,
169 timeseriesList,
170 function,
171 groupBy,
172 fillOption,
173 devicesFilterSet,
174 locationsFilterSet,
175 symbolicNameFilterSet
176 );
177 var stream = csvConverter.convertToCsv(payload);
178 return stream;
179 }
180
181
182
183
184
185
186
187
188
189 @Timed(value = "shepard.timeseries.service")
190 public String importTimeseries(String database, InputStream stream) throws IOException {
191 List<String> errors = new ArrayList<>();
192 var timeseriesList = csvConverter.convertToPayload(stream);
193 for (var timeseries : timeseriesList) {
194 var error = createTimeseries(database, timeseries);
195 if (!error.isBlank()) {
196 errors.add(error);
197 }
198 }
199 return String.join(", ", errors);
200 }
201
202
203
204
205
206
207 @Timed(value = "shepard.timeseries.service")
208 public String createDatabase() {
209 String name = UUID.randomUUID().toString();
210 influxConnector.createDatabase(name);
211 return name;
212 }
213
214 @Timed(value = "shepard.timeseries.service")
215 public void deleteDatabase(String database) {
216 influxConnector.deleteDatabase(database);
217 }
218
219 private boolean matchFilter(Timeseries timeseries, Set<String> device, Set<String> location, Set<String> symName) {
220 var deviceMatches = true;
221 var locatioMatches = true;
222 var symbolicNameMatches = true;
223 if (!device.isEmpty()) {
224 deviceMatches = device.contains(timeseries.getDevice());
225 }
226 if (!location.isEmpty()) {
227 locatioMatches = location.contains(timeseries.getLocation());
228 }
229 if (!symName.isEmpty()) {
230 symbolicNameMatches = symName.contains(timeseries.getSymbolicName());
231 }
232 return deviceMatches && locatioMatches && symbolicNameMatches;
233 }
234 }