package de.dlr.shepard.data.timeseries.migration.services;

import static de.dlr.shepard.common.util.InfluxDataMapper.mapToTimeseries;
import static de.dlr.shepard.common.util.InfluxDataMapper.mapToTimeseriesDataPoints;
import static de.dlr.shepard.common.util.InfluxDataMapper.mapToValueType;

import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseries;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesDataType;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesPayload;
import de.dlr.shepard.data.timeseries.model.TimeseriesContainer;
import de.dlr.shepard.data.timeseries.services.TimeseriesService;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantReadWriteLock;

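/**
 * Worker that consumes {@link PayloadWriteTask}s from the migration service's write queue and
 * persists the Influx payloads via the {@link TimeseriesService}. The worker terminates once it
 * takes the last task from the queue and triggers a compression task after a configurable number
 * of points has been written.
 */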
@RequestScoped
class PayloadWriter implements Callable<Object> {

  @Inject
  TimeseriesMigrationService migrationService;

  @Inject
  TimeseriesService timeseriesService;

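  /**
   * Drains the payload write queue until the last task is received. Each payload is written
   * while holding the read lock, and a compression task is scheduled whenever the number of
   * inserted points exceeds the configured threshold.
   */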
  @Override
  public Object call() {
    try {
      var initialQueueSize = migrationService.getPayloadReadQueueSize();
      BlockingQueue<PayloadWriteTask> queue = migrationService.getPayloadWriteQueue();
      ReentrantReadWriteLock lock = migrationService.getReadWriteLock();
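      // Consume tasks until the last task (poison pill) signals that no more payloads will arrive.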
      while (true) {
        PayloadWriteTask task = queue.take();
        if (task.isLastTask) break;
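        // Acquire the migration service's shared read lock for the duration of this write.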
        lock.readLock().lock();
        try {
          Log.infof(
            "Started PayloadWriteTask: %s of %s for container %s with %s points.",
            task.runningNumber,
            initialQueueSize,
            task.container.getId(),
            task.payload.getPoints().size()
          );

          saveDataPoints(task.container, task.influxTimeseries, task.dataType, task.payload);

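          // Count the points just written, then atomically reset the counter once it exceeds the
          // threshold; the value before the reset decides whether a compression task is added.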
          migrationService.getInsertionCount().getAndAdd(task.payload.getPoints().size());

          int oldval = migrationService
            .getInsertionCount()
            .getAndUpdate(x -> x > migrationService.getNumberOfPointsBeforeCompression() ? 0 : x);

          Log.debugf(
            "Current write counter: %s, max number: %s",
            oldval,
            migrationService.getNumberOfPointsBeforeCompression()
          );

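          // The counter crossed the threshold (and was just reset), so schedule a compression task.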
          if (oldval > migrationService.getNumberOfPointsBeforeCompression()) {
            Log.infof(
              "Adding compression task after inserting data for container %s, number of points written: %s",
              task.container.getId(),
              oldval
            );
            migrationService.addCompressionTask();
          }
        } finally {
          lock.readLock().unlock();
        }
      }
    } catch (InterruptedException e) {
      Log.error(e);
      Thread.currentThread().interrupt();
    }
    return "PayloadWriter Done!";
  }

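  /**
   * Maps the Influx timeseries, its data type and the payload points to the new timeseries model
   * and persists them without further checks.
   */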
  protected void saveDataPoints(
    TimeseriesContainer container,
    InfluxTimeseries influxTimeseries,
    InfluxTimeseriesDataType influxTimeseriesDataType,
    InfluxTimeseriesPayload payload
  ) {
    timeseriesService.saveDataPointsNoChecks(
      container.getId(),
      mapToTimeseries(influxTimeseries),
      mapToTimeseriesDataPoints(payload.getPoints()),
      mapToValueType(influxTimeseriesDataType)
    );
  }
}