package de.dlr.shepard.data.timeseries.migration.services;

import de.dlr.shepard.common.util.JsonConverter;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxDBConnector;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxSingleValuedUnaryFunction;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseries;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesDataType;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesService;
import de.dlr.shepard.data.timeseries.migration.model.MigrationState;
import de.dlr.shepard.data.timeseries.migration.model.MigrationTaskEntity;
import de.dlr.shepard.data.timeseries.migration.model.MigrationTaskState;
import de.dlr.shepard.data.timeseries.migration.repositories.MigrationTaskRepository;
import de.dlr.shepard.data.timeseries.model.TimeseriesContainer;
import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
import de.dlr.shepard.data.timeseries.services.TimeseriesContainerService;
import io.quarkus.logging.Log;
import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import jakarta.transaction.Transactional.TxType;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.context.ManagedExecutor;

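/**
 * Coordinates the migration of timeseries payloads from the legacy InfluxDB
 * storage into the new timeseries data point store. Reader threads pull
 * payload slices from InfluxDB, writer threads persist them, and a
 * compression runner compresses the written data in between.
 */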
@RequestScoped
public class TimeseriesMigrationService {

  private TimeseriesContainerService timeseriesContainerService;
  private MigrationTaskRepository migrationTaskRepository;
  private InfluxDBConnector influxConnector;
  private InfluxTimeseriesService influxTimeseriesService;
  private TimeseriesDataPointRepository timeseriesDataPointRepository;

  private PayloadReader payloadReader;
  private PayloadWriter payloadWriter;
  private CompressionRunner compressionRunner;

  @Inject
  ManagedExecutor executor;

  int numberOfReaderThreads;

  int numberOfWriterThreads;

  @Getter
  private int numberOfPointsBeforeCompression;

  private final BlockingQueue<PayloadWriteTask> payloadWriteQueue;
  private final BlockingQueue<CompressionTask> compressionTasksQueue;
  private final Queue<PayloadReadTask> payloadReadQueue;
  private final ReentrantReadWriteLock readWriteLock;

  @Getter
  private final AtomicInteger insertionCount = new AtomicInteger();

  public Queue<PayloadReadTask> getPayloadReadQueue() {
    return payloadReadQueue;
  }

  public int getPayloadReadQueueSize() {
    synchronized (payloadReadQueue) {
      return payloadReadQueue.size();
    }
  }

  public BlockingQueue<CompressionTask> getCompressionTasksQueue() {
    return compressionTasksQueue;
  }

  public ReentrantReadWriteLock getReadWriteLock() {
    return readWriteLock;
  }

  public BlockingQueue<PayloadWriteTask> getPayloadWriteQueue() {
    return payloadWriteQueue;
  }

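  // Length of one read slice in nanoseconds; the default of 600000000000 ns
  // corresponds to 10 minutes of data per slice.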
  @ConfigProperty(name = "shepard.migration-mode.timeseries-slice-duration", defaultValue = "600000000000")
  long sliceDuration;

  @Inject
  TimeseriesMigrationService(
    TimeseriesContainerService timeseriesContainerService,
    MigrationTaskRepository migrationTaskRepository,
    InfluxDBConnector influxConnector,
    InfluxTimeseriesService influxTimeseriesService,
    TimeseriesDataPointRepository timeseriesDataPointRepository,
    PayloadReader payloadReader,
    PayloadWriter payloadWriter,
    CompressionRunner compressionRunner,
    @ConfigProperty(
      name = "shepard.migration-mode.number-of-writer-threads",
      defaultValue = "6"
    ) int numberOfWriterThreads,
    @ConfigProperty(
      name = "shepard.migration-mode.number-of-reader-threads",
      defaultValue = "4"
    ) int numberOfReaderThreads,
    @ConfigProperty(
      name = "shepard.migration-mode.number-of-points-before-compression",
      defaultValue = "20000000"
    ) int numberOfPointsBeforeCompression
  ) {
    this.timeseriesContainerService = timeseriesContainerService;
    this.migrationTaskRepository = migrationTaskRepository;
    this.influxConnector = influxConnector;
    this.influxTimeseriesService = influxTimeseriesService;
    this.timeseriesDataPointRepository = timeseriesDataPointRepository;
    this.payloadReader = payloadReader;
    this.payloadWriter = payloadWriter;
    this.compressionRunner = compressionRunner;
    this.numberOfWriterThreads = numberOfWriterThreads;
    this.numberOfReaderThreads = numberOfReaderThreads;
    this.numberOfPointsBeforeCompression = numberOfPointsBeforeCompression;

    payloadWriteQueue = new LinkedBlockingQueue<>(numberOfWriterThreads + 1);
    compressionTasksQueue = new LinkedBlockingQueue<>();
    payloadReadQueue = new LinkedList<>();
    readWriteLock = new ReentrantReadWriteLock();
  }

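  /** Lists all migration tasks, optionally filtered to those that recorded errors. */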
  public List<MigrationTaskEntity> getMigrationTasks(boolean onlyShowErrors) {
    var tasks = migrationTaskRepository.findAll().list();
    if (onlyShowErrors) tasks = tasks.stream().filter(t -> !t.getErrors().isEmpty()).toList();
    return tasks;
  }

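  /**
   * Determines whether a migration run is required: needed if any container
   * has no MigrationTask yet, erroneous if tasks are unfinished or finished
   * with errors, otherwise not needed.
   */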
  public MigrationState getMigrationState() {
    var containerIds = getExistingContainerIds();
    var containerIdsToMigrate = getContainerIdsThatDoNotHaveMigrationTaskYet(containerIds);
    if (!containerIdsToMigrate.isEmpty()) {
      Log.infof(
        "Migration is necessary because there are %s containers that do not have a MigrationTask yet.",
        containerIdsToMigrate.size()
      );
      return MigrationState.Needed;
    }

    var notFinishedCount = migrationTaskRepository.find("state <> 'Finished'").count();
    if (notFinishedCount > 0) {
      Log.infof(
        "Migration is necessary because there are %s MigrationTasks that do not have state 'Finished'.",
        notFinishedCount
      );
      return MigrationState.HasErrors;
    }

    var finishedTasks = migrationTaskRepository.find("state = 'Finished'").list();
    var finishedWithErrors = finishedTasks.stream().filter(t -> !t.getErrors().isEmpty()).toList();
    if (!finishedWithErrors.isEmpty()) {
      Log.infof(
        "Migration is necessary because there are %s MigrationTasks that have an error.",
        finishedWithErrors.size()
      );
      return MigrationState.HasErrors;
    }

    return MigrationState.NotNeeded;
  }

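  /**
   * Entry point of a migration run: creates a MigrationTask for every
   * container and then processes unfinished tasks in rounds until none are
   * left, compressing all data points after each task and sleeping briefly
   * between rounds.
   */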
  public void runMigrations() {
    createMigrationTaskForEachContainer();

    Log.info("Starting migrations...");
    Log.info("To check the current migration state call the REST endpoint [/temp/migrations/state].");

    while (true) {
      var tasks = getUnfinishedMigrationTasks();
      if (tasks.isEmpty()) {
        Log.info("No migration tasks left. Migration finished.");
        break;
      }
      int cnt = 0;
      var size = tasks.size();

      for (var task : tasks) {
        cnt++;
        try {
          Log.infof("Starting migration task %s of %s (in this round)", cnt, size);
          migrateTask(task);
          compressAllDataPoints();
        } catch (Exception ex) {
          Log.errorf("Exception occurred during migration of container %s: %s", task.getContainerId(), ex.getMessage());
          persistError(task, ex.getMessage());
        }
      }

      try {
        Thread.sleep(10000);
      } catch (InterruptedException e) {
        Log.errorf("Thread interrupted: %s", e.getMessage());
        // Restore the interrupt flag and stop looping instead of spinning.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

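  /**
   * Compresses all chunks of the timeseries data point table in a fresh
   * transaction and resets the insertion counter.
   */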
  @Transactional(value = TxType.REQUIRES_NEW)
  @TransactionConfiguration(
    timeoutFromConfigProperty = "shepard.migration-mode.compression.transaction-timeout",
    timeout = 6000
  )
  public void compressAllDataPoints() {
    Log.info("Starting compression of timeseries data point table...");
    timeseriesDataPointRepository.compressAllChunks();
    Log.info("Finished compression of timeseries data point table.");
    insertionCount.set(0);
  }

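  /**
   * Migrates a single task: marks it as running, migrates the payloads of its
   * timeseries (if the InfluxDB database and timeseries exist), and finally
   * marks the task as finished and clears its errors.
   */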
  protected void migrateTask(MigrationTaskEntity task) throws Exception {
    setStateToRunning(task);
    Log.infof("Start with migration of container %s now.", task.getContainerId());

    var container = timeseriesContainerService.getContainerNoChecks(task.getContainerId());

    var databaseName = task.getDatabaseName();
    if (!doesDatabaseExist(databaseName)) {
      setStateToFinishedAndRemoveErrors(task);
      return;
    }

    InfluxTimeseries influxTimeseries = getTimeseries(task);
    if (influxTimeseries != null) {
      Log.infof(
        "Starting migration of timeseries %s for container %s",
        influxTimeseries.getUniqueId(),
        task.getContainerId()
      );
      var influxTimeseriesDataType = influxConnector.getTimeseriesDataType(
        databaseName,
        influxTimeseries.getMeasurement(),
        influxTimeseries.getField()
      );

      migratePayloads(container, databaseName, influxTimeseries, influxTimeseriesDataType);
    }

    setStateToFinishedAndRemoveErrors(task);
    Log.infof("Finished migration of container %s", task.getContainerId());
  }

  private InfluxTimeseries getTimeseries(MigrationTaskEntity migrationTaskEntity) {
    String timeseriesJson = migrationTaskEntity.getTimeseries();
    return JsonConverter.convertToObject(timeseriesJson, InfluxTimeseries.class);
  }

  private List<MigrationTaskEntity> getUnfinishedMigrationTasks() {
    return migrationTaskRepository.find("state <> 'Finished' OR errors <> ''").list();
  }

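  /**
   * Creates and persists a MigrationTask for every container that does not
   * have one yet.
   */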
  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void createMigrationTaskForEachContainer() {
    var containerIds = getExistingContainerIds();
    var containerIdsToMigrate = getContainerIdsThatDoNotHaveMigrationTaskYet(containerIds);
    var tasks = createMigrationTasksForContainers(containerIdsToMigrate);
    storeTasksInDatabase(tasks);
  }

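  /**
   * Returns the ids of all containers in neo4j that are not deleted and have
   * a non-empty database property.
   */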
  private List<Long> getExistingContainerIds() {
    var existingContainerIds = timeseriesContainerService
      .getContainers()
      .stream()
      .filter(c -> StringUtils.isNotEmpty(c.getDatabase()))
      .map(TimeseriesContainer::getId)
      .toList();

    Log.infof(
      "We found %s containers in neo4j that are not deleted and have a non-empty database property.",
      existingContainerIds.size()
    );
    return existingContainerIds;
  }

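  /**
   * Creates one MigrationTask per timeseries of each given container. If the
   * container's InfluxDB database is missing or contains no timeseries, a
   * single task without timeseries is created so the container still counts
   * as handled.
   */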
  private List<MigrationTaskEntity> createMigrationTasksForContainers(List<Long> containerIds) {
    return containerIds
      .stream()
      .flatMap(containerId -> {
        var container = timeseriesContainerService.getContainerNoChecks(containerId);

        List<MigrationTaskEntity> migrationTasks = new ArrayList<>();
        var databaseName = container.getDatabase();
        if (!doesDatabaseExist(databaseName)) {
          Log.warnf("InfluxDB database %s does not exist.", databaseName);
          var task = new MigrationTaskEntity(containerId);
          migrationTasks.add(task);
          return migrationTasks.stream();
        }

        var timeseriesAvailable = influxConnector.getTimeseriesAvailable(databaseName);
        if (timeseriesAvailable.isEmpty()) {
          Log.warnf("No timeseries available for container %s", containerId);
          var task = new MigrationTaskEntity(containerId);
          task.setDatabaseName(databaseName);
          migrationTasks.add(task);
        }

        for (InfluxTimeseries influxTimeseries : timeseriesAvailable) {
          MigrationTaskEntity migrationTaskEntity = new MigrationTaskEntity(containerId);
          migrationTaskEntity.setTimeseries(JsonConverter.convertToString(influxTimeseries));
          migrationTaskEntity.setDatabaseName(databaseName);
          migrationTasks.add(migrationTaskEntity);
        }
        return migrationTasks.stream();
      })
      .toList();
  }

  private void storeTasksInDatabase(List<MigrationTaskEntity> tasks) {
    migrationTaskRepository.persist(tasks);
    Log.infof("We created %s migration tasks.", tasks.size());
  }

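  /**
   * Filters the given container ids down to those for which no MigrationTask
   * exists yet.
   */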
  private List<Long> getContainerIdsThatDoNotHaveMigrationTaskYet(List<Long> allContainerIds) {
    var alreadyHandledContainerIds = migrationTaskRepository
      .findAll()
      .stream()
      .map(MigrationTaskEntity::getContainerId)
      .distinct()
      .toList();
    var containerIdsLeft = allContainerIds.stream().filter(t -> !alreadyHandledContainerIds.contains(t)).toList();

    Log.infof(
      "We found %s containers that are already handled and %s containers that are not handled yet.",
      alreadyHandledContainerIds.size(),
      containerIdsLeft.size()
    );

    Log.info(String.join(", ", containerIdsLeft.stream().map(String::valueOf).toList()));
    return containerIdsLeft;
  }

  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void setStateToRunning(MigrationTaskEntity task) {
    task.setStartedAt(new Date());
    task.setState(MigrationTaskState.Running);
    this.migrationTaskRepository.getEntityManager().merge(task);
  }

  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void setStateToFinishedAndRemoveErrors(MigrationTaskEntity task) {
    task.setFinishedAt(new Date());
    task.setState(MigrationTaskState.Finished);
    task.setErrors(new ArrayList<>());
    this.migrationTaskRepository.getEntityManager().merge(task);
  }

  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void persistError(MigrationTaskEntity task, String errorMessage) {
    task.addError(errorMessage);
    task.setState(MigrationTaskState.Finished);
    task.setFinishedAt(new Date());
    this.migrationTaskRepository.getEntityManager().merge(task);
  }

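  /**
   * Checks whether the given InfluxDB database exists; a missing database is
   * logged as a warning but not treated as an error.
   */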
  private boolean doesDatabaseExist(String databaseName) {
    if (!influxConnector.databaseExist(databaseName)) {
      Log.warnf(
        "InfluxDB with name %s does not exist. Migration not possible. We do not treat that as an error.",
        databaseName
      );
      return false;
    }
    return true;
  }

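  /**
   * Queries InfluxDB for the timestamp of the first data point of the given
   * timeseries (0 if the timeseries is empty).
   */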
  private long getFirstTimestampOfPayload(String database, InfluxTimeseries influxTimeseries) {
    return getTimestampOfPayload(database, influxTimeseries, InfluxSingleValuedUnaryFunction.FIRST);
  }

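  /**
   * Queries InfluxDB for the timestamp of the last data point of the given
   * timeseries (0 if the timeseries is empty).
   */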
  private long getLastTimestampOfPayload(String database, InfluxTimeseries influxTimeseries) {
    return getTimestampOfPayload(database, influxTimeseries, InfluxSingleValuedUnaryFunction.LAST);
  }

  private long getTimestampOfPayload(
    String database,
    InfluxTimeseries influxTimeseries,
    InfluxSingleValuedUnaryFunction function
  ) {
    var payload = this.influxTimeseriesService.getTimeseriesPayload(
      0,
      Instant.now().getEpochSecond() * 1_000_000_000,
      database,
      influxTimeseries,
      function,
      null,
      null
    );
    return payload.getPoints().isEmpty() ? 0 : payload.getPoints().get(0).getTimeInNanoseconds();
  }

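  /**
   * Splits the time range of the given timeseries into slices, enqueues one
   * read task per slice (plus poison pills for the readers), and runs reader,
   * writer, and compression workers in parallel until all of them complete.
   * On failure the remaining workers are cancelled and the write queue is
   * cleared.
   */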
  private void migratePayloads(
    TimeseriesContainer container,
    String databaseName,
    InfluxTimeseries influxTimeseries,
    InfluxTimeseriesDataType influxTimeseriesDataType
  ) throws Exception {
    var firstTimestamp = getFirstTimestampOfPayload(databaseName, influxTimeseries);
    var lastTimestamp = getLastTimestampOfPayload(databaseName, influxTimeseries);
    Log.infof(
      "Doing migration from timestamp %s to %s of container %s",
      firstTimestamp,
      lastTimestamp,
      container.getId()
    );

    // Start one nanosecond early so the very first data point falls into the first slice.
    long currentStartTimestamp = firstTimestamp - 1;

    payloadReadQueue.clear();
    int runningNumber = 1;
    while (currentStartTimestamp < lastTimestamp) {
      long currentEndTimestamp = Math.min(currentStartTimestamp + sliceDuration, lastTimestamp);

      payloadReadQueue.add(
        new PayloadReadTask(
          runningNumber++,
          currentStartTimestamp,
          currentEndTimestamp,
          influxTimeseries,
          container,
          databaseName,
          influxTimeseriesDataType,
          false
        )
      );
      currentStartTimestamp = currentEndTimestamp;
    }

    // One poison pill per reader thread so every reader terminates.
    for (int i = 0; i < numberOfReaderThreads; i++) {
      payloadReadQueue.add(PayloadReadTask.poisonPill);
    }
    Log.infof("Finished preparing read queue of %s read tasks.", payloadReadQueue.size());

    payloadWriteQueue.clear();
    compressionTasksQueue.clear();

    List<Callable<Object>> tasks = new ArrayList<>();

    Log.debug("Creating writers...");
    for (int i = 0; i < numberOfWriterThreads; i++) {
      tasks.add(payloadWriter);
    }

    Log.debug("Creating readers...");
    for (int i = 0; i < numberOfReaderThreads; i++) {
      tasks.add(payloadReader);
    }

    tasks.add(compressionRunner);

    Log.infof(
      "Starting migration with %s reader threads and %s writer threads, compressing every %s points ...",
      numberOfReaderThreads,
      numberOfWriterThreads,
      numberOfPointsBeforeCompression
    );
    CompletionService<Object> completionService = new ExecutorCompletionService<>(executor);
    var plannedFutures = tasks
      .stream()
      .map(completionService::submit)
      .collect(Collectors.toCollection(ArrayList::new));
    try {
      for (int i = 0; i < tasks.size(); i++) {
        Future<Object> future = completionService.take();
        plannedFutures.remove(future);
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      // Cancel all workers that have not completed yet.
      for (var future : plannedFutures) {
        future.cancel(true);
      }

      Log.info("Waiting for executor task to terminate ...");
      if (!executor.awaitTermination(300, TimeUnit.SECONDS)) {
        Log.error("The started executor threads did not terminate within 300s. This can cause undefined behaviour.");
      }

      Log.errorf("Error while executing tasks in parallel: %s", e.getMessage());
      throw new Exception(e.getMessage(), e);
    } finally {
      payloadWriteQueue.clear();
    }
  }

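  /**
   * Enqueues one poison pill per writer thread so that all writers shut down
   * once the readers are done.
   */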
  void addWriterPoisonPills() {
    try {
      for (int i = 0; i < numberOfWriterThreads; i++) {
        getPayloadWriteQueue().put(PayloadWriteTask.poisonPill);
      }
    } catch (InterruptedException e) {
      Log.errorf("Payload write queue interrupted: %s", e.getMessage());
      Thread.currentThread().interrupt();
    }
  }

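  /**
   * Replaces any pending compression tasks with a single final task that
   * signals the compression runner to stop.
   */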
  void addCompressionPoisonPills() {
    try {
      compressionTasksQueue.clear();
      compressionTasksQueue.put(new CompressionTask(true));
    } catch (InterruptedException e) {
      Log.errorf("Compression task queue interrupted: %s", e.getMessage());
      Thread.currentThread().interrupt();
    }
  }

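  /**
   * Enqueues a regular compression task unless one is already pending; the
   * emptiness check is a best-effort deduplication, not an atomic guarantee.
   */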
  public void addCompressionTask() {
    try {
      if (!compressionTasksQueue.isEmpty()) {
        Log.info("Did not add a duplicate compression task");
        return;
      }
      compressionTasksQueue.put(new CompressionTask(false));
    } catch (InterruptedException e) {
      Log.errorf("Compression task queue interrupted: %s", e.getMessage());
      Thread.currentThread().interrupt();
    }
  }
}