View Javadoc
1   package de.dlr.shepard.data.timeseries.migration.services;
2   
3   import io.quarkus.logging.Log;
4   import jakarta.enterprise.context.RequestScoped;
5   import jakarta.inject.Inject;
6   import java.util.concurrent.BlockingQueue;
7   import java.util.concurrent.Callable;
8   import java.util.concurrent.locks.ReentrantReadWriteLock;
9   
10  @RequestScoped
11  public class CompressionRunner implements Callable<Object> {
12  
13    @Inject
14    TimeseriesMigrationService migrationService;
15  
16    @Override
17    public Object call() {
18      try {
19        BlockingQueue<CompressionTask> queue = migrationService.getCompressionTasksQueue();
20        ReentrantReadWriteLock lock = migrationService.getReadWriteLock();
21        while (true) {
22          CompressionTask task = queue.take();
23          try {
24            Log.info("Compression task waiting for write lock");
25            lock.writeLock().lock();
26            if (task.isLastTask) break;
27            migrationService.compressAllDataPoints();
28          } finally {
29            lock.writeLock().unlock();
30          }
31        }
32      } catch (InterruptedException e) {
33        Log.error(e);
34        Thread.currentThread().interrupt();
35      }
36      return "Compression runner Done!";
37    }
38  }