1 package de.dlr.shepard.data.timeseries.migration.services; 2 3 import io.quarkus.logging.Log; 4 import jakarta.enterprise.context.RequestScoped; 5 import jakarta.inject.Inject; 6 import java.util.concurrent.BlockingQueue; 7 import java.util.concurrent.Callable; 8 import java.util.concurrent.locks.ReentrantReadWriteLock; 9 10 @RequestScoped 11 public class CompressionRunner implements Callable<Object> { 12 13 @Inject 14 TimeseriesMigrationService migrationService; 15 16 @Override 17 public Object call() { 18 try { 19 BlockingQueue<CompressionTask> queue = migrationService.getCompressionTasksQueue(); 20 ReentrantReadWriteLock lock = migrationService.getReadWriteLock(); 21 while (true) { 22 CompressionTask task = queue.take(); 23 try { 24 Log.info("Compression task waiting for write lock"); 25 lock.writeLock().lock(); 26 if (task.isLastTask) break; 27 migrationService.compressAllDataPoints(); 28 } finally { 29 lock.writeLock().unlock(); 30 } 31 } 32 } catch (InterruptedException e) { 33 Log.error(e); 34 Thread.currentThread().interrupt(); 35 } 36 return "Compression runner Done!"; 37 } 38 }