CompressionRunner.java
package de.dlr.shepard.data.timeseries.migration.services;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@RequestScoped
public class CompressionRunner implements Callable<Object> {
@Inject
TimeseriesMigrationService migrationService;
@Override
public Object call() {
try {
BlockingQueue<CompressionTask> queue = migrationService.getCompressionTasksQueue();
ReentrantReadWriteLock lock = migrationService.getReadWriteLock();
while (true) {
CompressionTask task = queue.take();
try {
Log.info("Compression task waiting for write lock");
lock.writeLock().lock();
if (task.isLastTask) break;
migrationService.compressAllDataPoints();
} finally {
lock.writeLock().unlock();
}
}
} catch (InterruptedException e) {
Log.error(e);
Thread.currentThread().interrupt();
}
return "Compression runner Done!";
}
}