View Javadoc
1   package de.dlr.shepard.data.timeseries.migration.services;
2   
3   import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesPayload;
4   import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesService;
5   import io.quarkus.logging.Log;
6   import jakarta.enterprise.context.RequestScoped;
7   import jakarta.inject.Inject;
8   import java.util.concurrent.Callable;
9   
10  @RequestScoped
11  class PayloadReader implements Callable<Object> {
12  
13    @Inject
14    TimeseriesMigrationService migrationService;
15  
16    @Inject
17    InfluxTimeseriesService influxTimeseriesService;
18  
19    @Override
20    public Object call() {
21      boolean overallLastTask = false;
22      try {
23        while (true) {
24          var queue = migrationService.getPayloadReadQueue();
25          PayloadReadTask payloadReadTask;
26          synchronized (queue) { // Taking the element and looking up if there is no further element must be done
27            // atomically
28            payloadReadTask = queue.poll();
29            overallLastTask = queue.isEmpty();
30            if (payloadReadTask.isLastTask) {
31              Log.infof("ReaderThread was poisoned, remaining queue: %s, last: %s", queue.size(), overallLastTask);
32              break;
33            }
34          }
35  
36          Log.infof(
37            "started PayloadReadTask: %s, from %s to %s",
38            payloadReadTask.runningNumber,
39            payloadReadTask.startTimestamp,
40            payloadReadTask.endTimestamp
41          );
42          InfluxTimeseriesPayload payload =
43            this.influxTimeseriesService.getTimeseriesPayload(
44                payloadReadTask.startTimestamp,
45                payloadReadTask.endTimestamp,
46                payloadReadTask.databaseName,
47                payloadReadTask.influxTimeseries,
48                null,
49                null,
50                null
51              );
52  
53          PayloadWriteTask payloadTask = new PayloadWriteTask(
54            payloadReadTask.runningNumber,
55            payloadReadTask.startTimestamp,
56            payloadReadTask.endTimestamp,
57            payload,
58            payloadReadTask.influxTimeseriesDataType,
59            payloadReadTask.influxTimeseries,
60            payloadReadTask.container,
61            false
62          );
63          migrationService.getPayloadWriteQueue().put(payloadTask);
64        }
65      } catch (InterruptedException e) {
66        // Cancel the task
67        Log.errorf("Exception from reader task: " + e.getMessage());
68        Thread.currentThread().interrupt();
69      } finally {
70        // To ensure adding them once!
71        if (overallLastTask) migrationService.addWriterPoisonPills();
72        migrationService.addCompressionPoisonPills();
73      }
74      return "PayloadReader Done!";
75    }
76  }