View Javadoc
1   package de.dlr.shepard.data.timeseries.migration.services;
2   
3   import static de.dlr.shepard.common.util.InfluxDataMapper.mapToTimeseries;
4   import static de.dlr.shepard.common.util.InfluxDataMapper.mapToTimeseriesDataPoints;
5   import static de.dlr.shepard.common.util.InfluxDataMapper.mapToValueType;
6   
7   import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseries;
8   import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesDataType;
9   import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesPayload;
10  import de.dlr.shepard.data.timeseries.model.TimeseriesContainer;
11  import de.dlr.shepard.data.timeseries.services.TimeseriesService;
12  import io.quarkus.logging.Log;
13  import jakarta.enterprise.context.RequestScoped;
14  import jakarta.inject.Inject;
15  import java.util.concurrent.BlockingQueue;
16  import java.util.concurrent.Callable;
17  import java.util.concurrent.locks.ReentrantReadWriteLock;
18  
19  @RequestScoped
20  class PayloadWriter implements Callable<Object> {
21  
22    @Inject
23    TimeseriesMigrationService migrationService;
24  
25    @Inject
26    TimeseriesService timeseriesService;
27  
28    @Override
29    public Object call() {
30      try {
31        var initialQueueSize = migrationService.getPayloadReadQueueSize();
32        BlockingQueue<PayloadWriteTask> queue = migrationService.getPayloadWriteQueue();
33        ReentrantReadWriteLock lock = migrationService.getReadWriteLock();
34        while (true) {
35          PayloadWriteTask task = queue.take();
36          if (task.isLastTask) break;
37          try {
38            lock.readLock().lock();
39            Log.infof(
40              "started PayloadWriteTask: %s of %s for container %s with %s points.",
41              task.runningNumber,
42              initialQueueSize,
43              task.container.getId(),
44              task.payload.getPoints().size()
45            );
46  
47            saveDataPoints(task.container, task.influxTimeseries, task.dataType, task.payload);
48  
49            migrationService.getInsertionCount().getAndAdd(task.payload.getPoints().size());
50  
51            int oldval = migrationService
52              .getInsertionCount()
53              .getAndUpdate(x -> x > migrationService.getNumberOfPointsBeforeCompression() ? 0 : x);
54  
55            Log.debugf(
56              "Current write counter: %s, max number: %s",
57              oldval,
58              migrationService.getNumberOfPointsBeforeCompression()
59            );
60  
61            // Add compression if the task occurs in between two days
62            if (oldval > migrationService.getNumberOfPointsBeforeCompression()) {
63              Log.infof(
64                "Adding compression task for after inserting data for container %s, number of points written: %s",
65                task.container.getId(),
66                oldval
67              );
68              migrationService.addCompressionTask();
69            }
70          } finally {
71            lock.readLock().unlock();
72          }
73        }
74      } catch (InterruptedException e) {
75        Log.error(e);
76        Thread.currentThread().interrupt();
77      }
78      return "PayloadWriter Done!";
79    }
80  
81    protected void saveDataPoints(
82      TimeseriesContainer container,
83      InfluxTimeseries influxTimeseries,
84      InfluxTimeseriesDataType influxTimeseriesDataType,
85      InfluxTimeseriesPayload payload
86    ) {
87      timeseriesService.saveDataPointsNoChecks(
88        container.getId(),
89        mapToTimeseries(influxTimeseries),
90        mapToTimeseriesDataPoints(payload.getPoints()),
91        mapToValueType(influxTimeseriesDataType)
92      );
93    }
94  }