PayloadWriter.java

package de.dlr.shepard.data.timeseries.migration.services;

import static de.dlr.shepard.common.util.InfluxDataMapper.mapToTimeseries;
import static de.dlr.shepard.common.util.InfluxDataMapper.mapToTimeseriesDataPoints;
import static de.dlr.shepard.common.util.InfluxDataMapper.mapToValueType;

import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseries;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesDataType;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesPayload;
import de.dlr.shepard.data.timeseries.model.TimeseriesContainer;
import de.dlr.shepard.data.timeseries.services.TimeseriesService;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@RequestScoped
class PayloadWriter implements Callable<Object> {

  @Inject
  TimeseriesMigrationService migrationService;

  @Inject
  TimeseriesService timeseriesService;

  @Override
  public Object call() {
    try {
      var initialQueueSize = migrationService.getPayloadReadQueueSize();
      BlockingQueue<PayloadWriteTask> queue = migrationService.getPayloadWriteQueue();
      ReentrantReadWriteLock lock = migrationService.getReadWriteLock();
      while (true) {
        PayloadWriteTask task = queue.take();
        if (task.isLastTask) break;
        try {
          lock.readLock().lock();
          Log.infof(
            "started PayloadWriteTask: %s of %s for container %s with %s points.",
            task.runningNumber,
            initialQueueSize,
            task.container.getId(),
            task.payload.getPoints().size()
          );

          saveDataPoints(task.container, task.influxTimeseries, task.dataType, task.payload);

          migrationService.getInsertionCount().getAndAdd(task.payload.getPoints().size());

          int oldval = migrationService
            .getInsertionCount()
            .getAndUpdate(x -> x > migrationService.getNumberOfPointsBeforeCompression() ? 0 : x);

          Log.debugf(
            "Current write counter: %s, max number: %s",
            oldval,
            migrationService.getNumberOfPointsBeforeCompression()
          );

          // Add compression if the task occurs in between two days
          if (oldval > migrationService.getNumberOfPointsBeforeCompression()) {
            Log.infof(
              "Adding compression task for after inserting data for container %s, number of points written: %s",
              task.container.getId(),
              oldval
            );
            migrationService.addCompressionTask();
          }
        } finally {
          lock.readLock().unlock();
        }
      }
    } catch (InterruptedException e) {
      Log.error(e);
      Thread.currentThread().interrupt();
    }
    return "PayloadWriter Done!";
  }

  protected void saveDataPoints(
    TimeseriesContainer container,
    InfluxTimeseries influxTimeseries,
    InfluxTimeseriesDataType influxTimeseriesDataType,
    InfluxTimeseriesPayload payload
  ) {
    timeseriesService.saveDataPointsNoChecks(
      container.getId(),
      mapToTimeseries(influxTimeseries),
      mapToTimeseriesDataPoints(payload.getPoints()),
      mapToValueType(influxTimeseriesDataType)
    );
  }
}