1 package de.dlr.shepard.data.timeseries.migration.services;
2
3 import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesPayload;
4 import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesService;
5 import io.quarkus.logging.Log;
6 import jakarta.enterprise.context.RequestScoped;
7 import jakarta.inject.Inject;
8 import java.util.concurrent.Callable;
9
10 @RequestScoped
11 class PayloadReader implements Callable<Object> {
12
13 @Inject
14 TimeseriesMigrationService migrationService;
15
16 @Inject
17 InfluxTimeseriesService influxTimeseriesService;
18
19 @Override
20 public Object call() {
21 boolean overallLastTask = false;
22 try {
23 while (true) {
24 var queue = migrationService.getPayloadReadQueue();
25 PayloadReadTask payloadReadTask;
26 synchronized (queue) {
27
28 payloadReadTask = queue.poll();
29 overallLastTask = queue.isEmpty();
30 if (payloadReadTask.isLastTask) {
31 Log.infof("ReaderThread was poisoned, remaining queue: %s, last: %s", queue.size(), overallLastTask);
32 break;
33 }
34 }
35
36 Log.infof(
37 "started PayloadReadTask: %s, from %s to %s",
38 payloadReadTask.runningNumber,
39 payloadReadTask.startTimestamp,
40 payloadReadTask.endTimestamp
41 );
42 InfluxTimeseriesPayload payload =
43 this.influxTimeseriesService.getTimeseriesPayload(
44 payloadReadTask.startTimestamp,
45 payloadReadTask.endTimestamp,
46 payloadReadTask.databaseName,
47 payloadReadTask.influxTimeseries,
48 null,
49 null,
50 null
51 );
52
53 PayloadWriteTask payloadTask = new PayloadWriteTask(
54 payloadReadTask.runningNumber,
55 payloadReadTask.startTimestamp,
56 payloadReadTask.endTimestamp,
57 payload,
58 payloadReadTask.influxTimeseriesDataType,
59 payloadReadTask.influxTimeseries,
60 payloadReadTask.container,
61 false
62 );
63 migrationService.getPayloadWriteQueue().put(payloadTask);
64 }
65 } catch (InterruptedException e) {
66
67 Log.errorf("Exception from reader task: " + e.getMessage());
68 Thread.currentThread().interrupt();
69 } finally {
70
71 if (overallLastTask) migrationService.addWriterPoisonPills();
72 migrationService.addCompressionPoisonPills();
73 }
74 return "PayloadReader Done!";
75 }
76 }