package de.dlr.shepard.data.timeseries.migration.services;

import de.dlr.shepard.common.util.JsonConverter;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxDBConnector;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxSingleValuedUnaryFunction;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseries;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesDataType;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesService;
import de.dlr.shepard.data.timeseries.migration.model.MigrationState;
import de.dlr.shepard.data.timeseries.migration.model.MigrationTaskEntity;
import de.dlr.shepard.data.timeseries.migration.model.MigrationTaskState;
import de.dlr.shepard.data.timeseries.migration.repositories.MigrationTaskRepository;
import de.dlr.shepard.data.timeseries.model.TimeseriesContainer;
import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
import de.dlr.shepard.data.timeseries.services.TimeseriesContainerService;
import io.quarkus.logging.Log;
import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import jakarta.transaction.Transactional.TxType;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.context.ManagedExecutor;

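/**
 * Migrates timeseries payloads from InfluxDB into TimescaleDB.
 *
 * The work is organized as a producer/consumer pipeline: reader threads take
 * time slices from the read queue and fetch the corresponding InfluxDB
 * payloads, writer threads persist those payloads, and a single compression
 * runner compresses the TimescaleDB chunks. Shutdown is coordinated with
 * poison-pill tasks.
 *
 * A minimal usage sketch (endpoint wiring is assumed and omitted here):
 *
 * <pre>
 * if (migrationService.getMigrationState() != MigrationState.NotNeeded) {
 *   migrationService.runMigrations();
 * }
 * </pre>
 */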
@RequestScoped
public class TimeseriesMigrationService {

  private TimeseriesContainerService timeseriesContainerService;
  private MigrationTaskRepository migrationTaskRepository;
  private InfluxDBConnector influxConnector;
  private InfluxTimeseriesService influxTimeseriesService;
  private TimeseriesDataPointRepository timeseriesDataPointRepository;

  private PayloadReader payloadReader;
  private PayloadWriter payloadWriter;
  private CompressionRunner compressionRunner;

  @Inject
  ManagedExecutor executor;

  int numberOfReaderThreads;

  int numberOfWriterThreads;

  @Getter
  private int numberOfPointsBeforeCompression;

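  // Work queues for the migration pipeline: readers drain payloadReadQueue and
  // are expected to fill payloadWriteQueue, writers drain payloadWriteQueue, and
  // the compression runner listens on compressionTasksQueue. The read queue is a
  // plain queue; callers synchronize on it where concurrent access is possible
  // (see getPayloadReadQueueSize), while the other two queues are blocking.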
  private final BlockingQueue<PayloadWriteTask> payloadWriteQueue;
  private final BlockingQueue<CompressionTask> compressionTasksQueue;
  private final Queue<PayloadReadTask> payloadReadQueue;
  private final ReentrantReadWriteLock readWriteLock;

  @Getter
  private final AtomicInteger insertionCount = new AtomicInteger();

  public Queue<PayloadReadTask> getPayloadReadQueue() {
    return payloadReadQueue;
  }

  public int getPayloadReadQueueSize() {
    synchronized (payloadReadQueue) {
      return payloadReadQueue.size();
    }
  }

  public BlockingQueue<CompressionTask> getCompressionTasksQueue() {
    return compressionTasksQueue;
  }

  public ReentrantReadWriteLock getReadWriteLock() {
    return readWriteLock;
  }

  public BlockingQueue<PayloadWriteTask> getPayloadWriteQueue() {
    return payloadWriteQueue;
  }

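  // Length of one migration slice. Timestamps in this service are
  // nanosecond-based, so the default of 600000000000 ns corresponds to
  // 10 minutes per slice.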
  @ConfigProperty(name = "shepard.migration-mode.timeseries-slice-duration", defaultValue = "600000000000")
  long sliceDuration;

  @Inject
  TimeseriesMigrationService(
    TimeseriesContainerService timeseriesContainerService,
    MigrationTaskRepository migrationTaskRepository,
    InfluxDBConnector influxConnector,
    InfluxTimeseriesService influxTimeseriesService,
    TimeseriesDataPointRepository timeseriesDataPointRepository,
    PayloadReader payloadReader,
    PayloadWriter payloadWriter,
    CompressionRunner compressionRunner,
    @ConfigProperty(
      name = "shepard.migration-mode.number-of-writer-threads",
      defaultValue = "6"
    ) int numberOfWriterThreads,
    @ConfigProperty(
      name = "shepard.migration-mode.number-of-reader-threads",
      defaultValue = "4"
    ) int numberOfReaderThreads,
    @ConfigProperty(
      name = "shepard.migration-mode.number-of-points-before-compression",
      defaultValue = "20000000"
    ) int numberOfPointsBeforeCompression
  ) {
    this.timeseriesContainerService = timeseriesContainerService;
    this.migrationTaskRepository = migrationTaskRepository;
    this.influxConnector = influxConnector;
    this.influxTimeseriesService = influxTimeseriesService;
    this.timeseriesDataPointRepository = timeseriesDataPointRepository;
    this.payloadReader = payloadReader;
    this.payloadWriter = payloadWriter;
    this.compressionRunner = compressionRunner;
    this.numberOfWriterThreads = numberOfWriterThreads;
    this.numberOfReaderThreads = numberOfReaderThreads;
    this.numberOfPointsBeforeCompression = numberOfPointsBeforeCompression;

    payloadWriteQueue = new LinkedBlockingQueue<>(numberOfWriterThreads + 1);
    compressionTasksQueue = new LinkedBlockingQueue<>();
    payloadReadQueue = new LinkedList<>();
    readWriteLock = new ReentrantReadWriteLock();
  }

  public List<MigrationTaskEntity> getMigrationTasks(boolean onlyShowErrors) {
    var tasks = migrationTaskRepository.findAll().list();
    if (onlyShowErrors) tasks = tasks.stream().filter(t -> !t.getErrors().isEmpty()).toList();
    return tasks;
  }

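  /**
   * Determines whether a migration is needed: {@code Needed} if any container
   * lacks a MigrationTask, {@code HasErrors} if tasks are unfinished or
   * finished with recorded errors, and {@code NotNeeded} otherwise.
   */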
  public MigrationState getMigrationState() {
    var containerIds = getExistingContainerIds();
    var containerIdsToMigrate = getContainerIdsThatDoNotHaveMigrationTaskYet(containerIds);
    if (!containerIdsToMigrate.isEmpty()) {
      Log.infof(
        "Migration is necessary because there are %s containers that do not have a MigrationTask yet.",
        containerIdsToMigrate.size()
      );
      return MigrationState.Needed;
    }

    var notFinishedState = migrationTaskRepository.find("state <> 'Finished'").count();
    if (notFinishedState > 0) {
      Log.infof(
        "Migration is necessary because there are %s MigrationTasks that do not have state 'Finished'.",
        notFinishedState
      );
      return MigrationState.HasErrors;
    }

    var finishedState = migrationTaskRepository.find("state = 'Finished'").list();
    var finishedWithErrors = finishedState.stream().filter(t -> !t.getErrors().isEmpty()).toList();
    if (!finishedWithErrors.isEmpty()) {
      Log.infof(
        "Migration is necessary because there are %s MigrationTasks that have an error.",
        finishedWithErrors.size()
      );
      return MigrationState.HasErrors;
    }

    return MigrationState.NotNeeded;
  }

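  /**
   * Creates missing migration tasks and processes unfinished tasks in rounds
   * until none are left. Each task is migrated and the data point table is
   * compressed afterwards; a failure is recorded on the task instead of
   * aborting the whole run.
   */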
  public void runMigrations() {
    createMigrationTaskForEachContainer();

    Log.info("Starting migrations...");
    Log.info("To check the current migration state call the REST endpoint [/temp/migrations/state].");

    while (true) {
      var tasks = getUnfinishedMigrationTasks();
      if (tasks.isEmpty()) {
        Log.info("No migration tasks left. Migration finished.");
        break;
      }
      int cnt = 0;
      var size = tasks.size();

      for (var task : tasks) {
        cnt++;
        try {
          Log.infof("Starting migration task %s of %s in this round", cnt, size);
          migrateTask(task);
          compressAllDataPoints();
        } catch (Exception ex) {
          Log.errorf("Exception occurred during migration of container %s: %s", task.getContainerId(), ex.getMessage());
          persistError(task, ex.getMessage());
        }
      }

      try {
        // Wait some time before checking for new migration tasks to give the system
        // some time to recover
        Thread.sleep(10000);
      } catch (InterruptedException e) {
        Log.errorf("Thread interrupted: %s", e.getMessage());
        Thread.currentThread().interrupt();
      }
    }
  }

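  /**
   * Compresses all chunks of the timeseries data point table in a fresh
   * transaction and resets the insertion counter.
   */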
  @Transactional(value = TxType.REQUIRES_NEW)
  @TransactionConfiguration(
    timeoutFromConfigProperty = "shepard.migration-mode.compression.transaction-timeout",
    timeout = 6000
  )
  public void compressAllDataPoints() {
    Log.info("Starting compression of timeseries data point table...");
    timeseriesDataPointRepository.compressAllChunks();
    Log.info("Finished compression of timeseries data point table.");
    insertionCount.set(0);
  }

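  /**
   * Migrates a single task: marks it as running, resolves its container and
   * InfluxDB database, copies the timeseries payloads if both exist, and
   * finally marks the task as finished. A missing database is not treated as
   * an error.
   */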
  protected void migrateTask(MigrationTaskEntity task) throws Exception {
    setStateToRunning(task);
    Log.infof("Starting migration of container %s now.", task.getContainerId());

    var container = timeseriesContainerService.getContainerNoChecks(task.getContainerId());

    var databaseName = task.getDatabaseName();
    if (!doesDatabaseExist(databaseName)) {
      setStateToFinishedAndRemoveErrors(task);
      return;
    }

    InfluxTimeseries influxTimeseries = getTimeseries(task);
    if (influxTimeseries != null) {
      Log.infof(
        "Starting migration of timeseries %s for container %s",
        influxTimeseries.getUniqueId(),
        task.getContainerId()
      );
      var influxTimeseriesDataType = influxConnector.getTimeseriesDataType(
        databaseName,
        influxTimeseries.getMeasurement(),
        influxTimeseries.getField()
      );

      migratePayloads(container, databaseName, influxTimeseries, influxTimeseriesDataType);
    }

    setStateToFinishedAndRemoveErrors(task);
    Log.infof("Finished migration of container %s", task.getContainerId());
  }

  private InfluxTimeseries getTimeseries(MigrationTaskEntity migrationTaskEntity) {
    String timeseriesJson = migrationTaskEntity.getTimeseries();
    return JsonConverter.convertToObject(timeseriesJson, InfluxTimeseries.class);
  }

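  /**
   * Returns tasks that still need work: those not in state 'Finished' as well
   * as finished tasks that recorded errors.
   */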
  private List<MigrationTaskEntity> getUnfinishedMigrationTasks() {
    return migrationTaskRepository.find("state <> 'Finished' OR errors <> ''").list();
  }

  /**
   * Get all containers in neo4j that are not deleted.
   * Create a MigrationTask for each container that does not already have one.
   */
  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void createMigrationTaskForEachContainer() {
    var containerIds = getExistingContainerIds();
    var containerIdsToMigrate = getContainerIdsThatDoNotHaveMigrationTaskYet(containerIds);
    var tasks = createMigrationTasksForContainers(containerIdsToMigrate);
    storeTasksInDatabase(tasks);
  }

  /**
   * Get the ids of all containers from neo4j that are not deleted and have a
   * non-empty database name.
   * Hint: Containers with a database name are timeseries containers whose data
   * is still stored in InfluxDB. If the database prop is empty, the container's
   * data already lives in TimescaleDB and needs no migration.
   *
   * @return the ids of all containers whose data still resides in InfluxDB
   */
  private List<Long> getExistingContainerIds() {
    var existingContainerIds = timeseriesContainerService
      .getContainers()
      .stream()
      .filter(c -> StringUtils.isNotEmpty(c.getDatabase()))
      .map(c -> c.getId())
      .toList();

    Log.infof(
      "We found %s containers in neo4j that are not deleted and have a database prop that is not empty.",
      existingContainerIds.size()
    );
    return existingContainerIds;
  }

  private List<MigrationTaskEntity> createMigrationTasksForContainers(List<Long> containerIds) {
    return containerIds
      .stream()
      .flatMap(containerId -> {
        var container = timeseriesContainerService.getContainerNoChecks(containerId);

        List<MigrationTaskEntity> migrationTasks = new ArrayList<>();
        var databaseName = container.getDatabase();
        if (!doesDatabaseExist(databaseName)) {
          Log.warnf("influxdb %s does not exist.", databaseName);
          var task = new MigrationTaskEntity(containerId);
          migrationTasks.add(task);
          return migrationTasks.stream();
        }

        var timeseriesAvailable = influxConnector.getTimeseriesAvailable(databaseName);
        if (timeseriesAvailable.isEmpty()) {
          Log.warnf("No timeseries available for container %s", containerId);
          var task = new MigrationTaskEntity(containerId);
          task.setDatabaseName(databaseName);
          migrationTasks.add(task);
        }

        for (InfluxTimeseries influxTimeseries : timeseriesAvailable) {
          MigrationTaskEntity migrationTaskEntity = new MigrationTaskEntity(containerId);
          migrationTaskEntity.setTimeseries(JsonConverter.convertToString(influxTimeseries));
          migrationTaskEntity.setDatabaseName(databaseName);
          migrationTasks.add(migrationTaskEntity);
        }
        return migrationTasks.stream();
      })
      .toList();
  }

  private void storeTasksInDatabase(List<MigrationTaskEntity> tasks) {
    migrationTaskRepository.persist(tasks);
    Log.infof("We created %s migration tasks.", tasks.size());
  }

  private List<Long> getContainerIdsThatDoNotHaveMigrationTaskYet(List<Long> allContainerIds) {
    var alreadyHandledContainerIds = migrationTaskRepository
      .findAll()
      .stream()
      .map(t -> t.getContainerId())
      .distinct()
      .toList();
    var containerIdsLeft = allContainerIds.stream().filter(t -> !alreadyHandledContainerIds.contains(t)).toList();

    Log.infof(
      "We found %s containers that are already handled and %s containers that are not handled yet.",
      alreadyHandledContainerIds.size(),
      containerIdsLeft.size()
    );

    Log.info(String.join(", ", containerIdsLeft.stream().map(String::valueOf).toList()));
    return containerIdsLeft;
  }

  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void setStateToRunning(MigrationTaskEntity task) {
    task.setStartedAt(new Date());
    task.setState(MigrationTaskState.Running);
    this.migrationTaskRepository.getEntityManager().merge(task);
  }

  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void setStateToFinishedAndRemoveErrors(MigrationTaskEntity task) {
    task.setFinishedAt(new Date());
    task.setState(MigrationTaskState.Finished);
    task.setErrors(new ArrayList<>());
    this.migrationTaskRepository.getEntityManager().merge(task);
  }

  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void persistError(MigrationTaskEntity task, String errorMessage) {
    task.addError(errorMessage);
    task.setState(MigrationTaskState.Finished);
    task.setFinishedAt(new Date());
    this.migrationTaskRepository.getEntityManager().merge(task);
  }

  /**
   * Checks whether the InfluxDB database with the given name exists. A missing
   * database is logged as a warning and reported as {@code false}; it is not
   * treated as an error.
   */
  private boolean doesDatabaseExist(String databaseName) {
    if (!influxConnector.databaseExist(databaseName)) {
      Log.warnf(
        "InfluxDB with name %s does not exist. Migration not possible. We do not treat that as an error.",
        databaseName
      );
      return false;
    }
    return true;
  }

  /**
   * Identifies the first record of the timeseries and returns that timestamp.
   */
  private long getFirstTimestampOfPayload(String database, InfluxTimeseries influxTimeseries) {
    return getTimestampOfPayload(database, influxTimeseries, InfluxSingleValuedUnaryFunction.FIRST);
  }

  /**
   * Identifies the last record of the timeseries and returns that timestamp.
   */
  private long getLastTimestampOfPayload(String database, InfluxTimeseries influxTimeseries) {
    return getTimestampOfPayload(database, influxTimeseries, InfluxSingleValuedUnaryFunction.LAST);
  }

  private long getTimestampOfPayload(
    String database,
    InfluxTimeseries influxTimeseries,
    InfluxSingleValuedUnaryFunction function
  ) {
    var payload = this.influxTimeseriesService.getTimeseriesPayload(
      0,
      Instant.now().getEpochSecond() * 1_000_000_000,
      database,
      influxTimeseries,
      function,
      null,
      null
    );
    return payload.getPoints().isEmpty() ? 0 : payload.getPoints().get(0).getTimeInNanoseconds();
  }

  /**
   * Copy all payloads from an InfluxTimeseries to the TimeseriesContainer.
   * The copy is done in batches based on time slices. The slice duration is
   * configured via [shepard.migration-mode.timeseries-slice-duration].
   */
  private void migratePayloads(
    TimeseriesContainer container,
    String databaseName,
    InfluxTimeseries influxTimeseries,
    InfluxTimeseriesDataType influxTimeseriesDataType
  ) throws Exception {
    var firstTimestamp = getFirstTimestampOfPayload(databaseName, influxTimeseries);
    var lastTimestamp = getLastTimestampOfPayload(databaseName, influxTimeseries);
    Log.infof(
      "Doing migration from timestamp %s to %s of container %s",
      firstTimestamp,
      lastTimestamp,
      container.getId()
    );

    long currentStartTimestamp = firstTimestamp - 1;

    payloadReadQueue.clear();
    int runningNumber = 1;
    while (currentStartTimestamp < lastTimestamp) {
      long currentEndTimestamp = Math.min(currentStartTimestamp + sliceDuration, lastTimestamp);

      payloadReadQueue.add(
        new PayloadReadTask(
          runningNumber++,
          currentStartTimestamp,
          currentEndTimestamp,
          influxTimeseries,
          container,
          databaseName,
          influxTimeseriesDataType,
          false
        )
      );
      currentStartTimestamp = currentEndTimestamp;
    }

    for (int i = 0; i < numberOfReaderThreads; i++) {
      payloadReadQueue.add(PayloadReadTask.poisonPill);
    }
    Log.infof("Finished preparing read queue of %s read tasks.", payloadReadQueue.size());
    // Ensure the downstream queues are empty
    payloadWriteQueue.clear();
    compressionTasksQueue.clear();

    List<Callable<Object>> tasks = new ArrayList<>();

    Log.debug("Creating writers...");
    for (int i = 0; i < numberOfWriterThreads; i++) {
      tasks.add(payloadWriter);
    }

    Log.debug("Creating readers...");
    for (int i = 0; i < numberOfReaderThreads; i++) {
      tasks.add(payloadReader);
    }

    tasks.add(compressionRunner);

    Log.infof(
      "Starting migration with %s reader threads and %s writer threads, compressing every %s points ...",
      numberOfReaderThreads,
      numberOfWriterThreads,
      numberOfPointsBeforeCompression
    );
    CompletionService<Object> completionService = new ExecutorCompletionService<>(executor);
    var plannedFutures = tasks.stream().map(completionService::submit).collect(Collectors.toCollection(ArrayList::new));
    try {
      for (int i = 0; i < tasks.size(); i++) {
        Future<Object> future = completionService.take();
        plannedFutures.remove(future);
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      // Cancel remaining futures
      for (var future : plannedFutures) {
        future.cancel(true);
      }

      Log.info("Waiting for executor task to terminate ...");
      if (!executor.awaitTermination(300, TimeUnit.SECONDS)) {
        Log.error("The started executor threads did not terminate within 300s. This can cause undefined behaviour.");
      }

      Log.errorf("Error while executing tasks in parallel: %s", e.getMessage());
      throw new Exception("Error while executing tasks in parallel: " + e.getMessage(), e);
    } finally {
      payloadWriteQueue.clear();
    }
  }

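  /**
   * Puts one poison pill per writer thread on the write queue so that every
   * writer terminates after the remaining work has been drained.
   */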
  void addWriterPoisonPills() {
    try {
      for (int i = 0; i < numberOfWriterThreads; i++) {
        getPayloadWriteQueue().put(PayloadWriteTask.poisonPill);
      }
    } catch (InterruptedException e) {
      Log.errorf("Payload write queue interrupted: %s", e.getMessage());
      Thread.currentThread().interrupt();
    }
  }

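  /**
   * Clears any pending compression tasks and enqueues a final task that tells
   * the compression runner to shut down.
   */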
  void addCompressionPoisonPills() {
    try {
      // Clear the compression queue before sending the last task
      compressionTasksQueue.clear();
      compressionTasksQueue.put(new CompressionTask(true));
    } catch (InterruptedException e) {
      Log.errorf("Compression task queue interrupted: %s", e.getMessage());
      Thread.currentThread().interrupt();
    }
  }

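  /**
   * Enqueues a compression task unless one is already pending, so at most one
   * compression request is queued at a time.
   */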
  public void addCompressionTask() {
    try {
      // Do not add a task if the queue is not empty
      if (!compressionTasksQueue.isEmpty()) {
        Log.info("Did not add a duplicate compression task");
        return;
      }
      compressionTasksQueue.put(new CompressionTask(false));
    } catch (InterruptedException e) {
      Log.errorf("Compression task queue interrupted: %s", e.getMessage());
      Thread.currentThread().interrupt();
    }
  }
}