PayloadReader.java
package de.dlr.shepard.data.timeseries.migration.services;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesPayload;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesService;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import java.util.concurrent.Callable;
@RequestScoped
class PayloadReader implements Callable<Object> {
@Inject
TimeseriesMigrationService migrationService;
@Inject
InfluxTimeseriesService influxTimeseriesService;
@Override
public Object call() {
boolean overallLastTask = false;
try {
while (true) {
var queue = migrationService.getPayloadReadQueue();
PayloadReadTask payloadReadTask;
synchronized (queue) { // Taking the element and looking up if there is no further element must be done
// atomically
payloadReadTask = queue.poll();
overallLastTask = queue.isEmpty();
if (payloadReadTask.isLastTask) {
Log.infof("ReaderThread was poisoned, remaining queue: %s, last: %s", queue.size(), overallLastTask);
break;
}
}
Log.infof(
"started PayloadReadTask: %s, from %s to %s",
payloadReadTask.runningNumber,
payloadReadTask.startTimestamp,
payloadReadTask.endTimestamp
);
InfluxTimeseriesPayload payload =
this.influxTimeseriesService.getTimeseriesPayload(
payloadReadTask.startTimestamp,
payloadReadTask.endTimestamp,
payloadReadTask.databaseName,
payloadReadTask.influxTimeseries,
null,
null,
null
);
PayloadWriteTask payloadTask = new PayloadWriteTask(
payloadReadTask.runningNumber,
payloadReadTask.startTimestamp,
payloadReadTask.endTimestamp,
payload,
payloadReadTask.influxTimeseriesDataType,
payloadReadTask.influxTimeseries,
payloadReadTask.container,
false
);
migrationService.getPayloadWriteQueue().put(payloadTask);
}
} catch (InterruptedException e) {
// Cancel the task
Log.errorf("Exception from reader task: " + e.getMessage());
Thread.currentThread().interrupt();
} finally {
// To ensure adding them once!
if (overallLastTask) migrationService.addWriterPoisonPills();
migrationService.addCompressionPoisonPills();
}
return "PayloadReader Done!";
}
}