// TimeseriesMigrationService.java

package de.dlr.shepard.data.timeseries.migration.services;

import de.dlr.shepard.common.util.JsonConverter;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxDBConnector;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxSingleValuedUnaryFunction;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseries;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesDataType;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesService;
import de.dlr.shepard.data.timeseries.migration.model.MigrationState;
import de.dlr.shepard.data.timeseries.migration.model.MigrationTaskEntity;
import de.dlr.shepard.data.timeseries.migration.model.MigrationTaskState;
import de.dlr.shepard.data.timeseries.migration.repositories.MigrationTaskRepository;
import de.dlr.shepard.data.timeseries.model.TimeseriesContainer;
import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
import de.dlr.shepard.data.timeseries.services.TimeseriesContainerService;
import io.quarkus.logging.Log;
import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import jakarta.transaction.Transactional.TxType;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.context.ManagedExecutor;

@RequestScoped
public class TimeseriesMigrationService {

  private TimeseriesContainerService timeseriesContainerService;
  private MigrationTaskRepository migrationTaskRepository;
  private InfluxDBConnector influxConnector;
  private InfluxTimeseriesService influxTimeseriesService;
  private TimeseriesDataPointRepository timeseriesDataPointRepository;

  private PayloadReader payloadReader;
  private PayloadWriter payloadWriter;
  private CompressionRunner compressionRunner;

  @Inject
  ManagedExecutor executor;

  int numberOfReaderThreads;

  int numberOfWriterThreads;

  @Getter
  private int numberOfPointsBeforeCompression;

  // Bounded handoff queue between reader and writer threads (capacity: writers + 1).
  private final BlockingQueue<PayloadWriteTask> payloadWriteQueue;
  private final BlockingQueue<CompressionTask> compressionTasksQueue;
  private final Queue<PayloadReadTask> payloadReadQueue;
  private final ReentrantReadWriteLock readWriteLock;

  /** Number of points inserted since the last compression run. */
  @Getter
  private final AtomicInteger insertionCount = new AtomicInteger();

  public Queue<PayloadReadTask> getPayloadReadQueue() {
    return payloadReadQueue;
  }

  public int getPayloadReadQueueSize() {
    // NOTE(review): only this accessor synchronizes on the queue; producers in
    // migratePayloads() touch it unsynchronized — presumably they run before the
    // worker threads start. TODO confirm before relying on this size concurrently.
    synchronized (payloadReadQueue) {
      return payloadReadQueue.size();
    }
  }

  public BlockingQueue<CompressionTask> getCompressionTasksQueue() {
    return compressionTasksQueue;
  }

  public ReentrantReadWriteLock getReadWriteLock() {
    return readWriteLock;
  }

  public BlockingQueue<PayloadWriteTask> getPayloadWriteQueue() {
    return payloadWriteQueue;
  }

  /** Length of one migration time slice in nanoseconds (default: 600s). */
  @ConfigProperty(name = "shepard.migration-mode.timeseries-slice-duration", defaultValue = "600000000000")
  long sliceDuration;

  @Inject
  TimeseriesMigrationService(
    TimeseriesContainerService timeseriesContainerService,
    MigrationTaskRepository migrationTaskRepository,
    InfluxDBConnector influxConnector,
    InfluxTimeseriesService influxTimeseriesService,
    TimeseriesDataPointRepository timeseriesDataPointRepository,
    PayloadReader payloadReader,
    PayloadWriter payloadWriter,
    CompressionRunner compressionRunner,
    @ConfigProperty(
      name = "shepard.migration-mode.number-of-writer-threads",
      defaultValue = "6"
    ) int numberOfWriterThreads,
    @ConfigProperty(
      name = "shepard.migration-mode.number-of-reader-threads",
      defaultValue = "4"
    ) int numberOfReaderThreads,
    @ConfigProperty(
      name = "shepard.migration-mode.number-of-points-before-compression",
      defaultValue = "20000000"
    ) int numberOfPointsBeforeCompression
  ) {
    this.timeseriesContainerService = timeseriesContainerService;
    this.migrationTaskRepository = migrationTaskRepository;
    this.influxConnector = influxConnector;
    this.influxTimeseriesService = influxTimeseriesService;
    this.timeseriesDataPointRepository = timeseriesDataPointRepository;
    this.payloadReader = payloadReader;
    this.payloadWriter = payloadWriter;
    this.compressionRunner = compressionRunner;
    this.numberOfWriterThreads = numberOfWriterThreads;
    this.numberOfReaderThreads = numberOfReaderThreads;
    this.numberOfPointsBeforeCompression = numberOfPointsBeforeCompression;

    payloadWriteQueue = new LinkedBlockingQueue<>(numberOfWriterThreads + 1);
    compressionTasksQueue = new LinkedBlockingQueue<>();
    payloadReadQueue = new LinkedList<>();
    readWriteLock = new ReentrantReadWriteLock();
  }

  /**
   * Returns all migration tasks, optionally restricted to tasks that have at
   * least one recorded error.
   */
  public List<MigrationTaskEntity> getMigrationTasks(boolean onlyShowErrors) {
    var tasks = migrationTaskRepository.findAll().list();
    if (onlyShowErrors) tasks = tasks.stream().filter(t -> !t.getErrors().isEmpty()).toList();
    return tasks;
  }

  /**
   * Determines the overall migration state: Needed if containers without a
   * MigrationTask exist, HasErrors if unfinished or error-carrying tasks exist,
   * otherwise NotNeeded.
   */
  public MigrationState getMigrationState() {
    var containerIds = getExistingContainerIds();
    var containerIdsToMigrate = getContainerIdsThatDoNotHaveMigrationTaskYet(containerIds);
    if (!containerIdsToMigrate.isEmpty()) {
      Log.infof(
        "Migration is necessary because there are %s containers that do not have a MigrationTask yet.",
        containerIdsToMigrate.size()
      );
      return MigrationState.Needed;
    }

    var notFinishedState = migrationTaskRepository.find("state <> 'Finished'").count();
    if (notFinishedState > 0) {
      Log.infof(
        "Migration is necessary because there are %s MigrationTasks that do not have state 'Finished'.",
        notFinishedState
      );
      return MigrationState.HasErrors;
    }

    var finishedState = migrationTaskRepository.find("state = 'Finished'").list();
    var finishedWithErrors = finishedState.stream().filter(t -> !t.getErrors().isEmpty()).toList();
    if (!finishedWithErrors.isEmpty()) {
      Log.infof(
        "Migration necessary because there are %s MigrationTasks that have an error.",
        finishedWithErrors.size()
      );
      return MigrationState.HasErrors;
    }

    return MigrationState.NotNeeded;
  }

  /**
   * Main migration loop: creates missing tasks, then repeatedly processes all
   * unfinished tasks (compressing after each) until none are left. Failures are
   * persisted on the task instead of aborting the loop.
   */
  public void runMigrations() {
    createMigrationTaskForEachContainer();

    Log.info("Starting migrations...");
    Log.info("To check the current migration state call the REST endpoint [/temp/migrations/state].");

    while (true) {
      var tasks = getUnfinishedMigrationTasks();
      if (tasks.isEmpty()) {
        Log.info("No migration tasks left. Migration finished.");
        break;
      }
      int cnt = 0;
      var size = tasks.size();

      for (var task : tasks) {
        cnt++;
        try {
          Log.infof("Starting migration task %s of %s (so far)", cnt, size);
          migrateTask(task);
          compressAllDataPoints();
        } catch (Exception ex) {
          Log.errorf("Exception occurred during migration of container %s: %s", task.getContainerId(), ex.getMessage());
          persistError(task, ex.getMessage());
        }
      }

      try {
        // Wait some time before checking for new migration tasks to give the system
        // some time to recover
        Thread.sleep(10000);
      } catch (InterruptedException e) {
        Log.errorf("Thread interrupted: %s", e.getMessage());
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Compresses all chunks of the timeseries data point table in a fresh
   * transaction and resets the insertion counter.
   */
  @Transactional(value = TxType.REQUIRES_NEW)
  @TransactionConfiguration(
    timeoutFromConfigProperty = "shepard.migration-mode.compression.transaction-timeout",
    timeout = 6000
  )
  public void compressAllDataPoints() {
    Log.info("Starting compression of timeseries data point table...");
    timeseriesDataPointRepository.compressAllChunks();
    Log.info("Finished compression of timeseries data point table.");
    insertionCount.set(0);
  }

  /**
   * Migrates a single task: marks it Running, copies the payloads of its Influx
   * timeseries (if the source database and timeseries exist), then marks it
   * Finished and clears previous errors.
   *
   * @throws Exception if the payload migration fails
   */
  protected void migrateTask(MigrationTaskEntity task) throws Exception {
    setStateToRunning(task);
    Log.infof("Start with migration of container %s now.", task.getContainerId());

    var container = timeseriesContainerService.getContainerNoChecks(task.getContainerId());

    var databaseName = task.getDatabaseName();
    if (!doesDatabaseExist(databaseName)) {
      // Missing source database is intentionally not treated as an error.
      setStateToFinishedAndRemoveErrors(task);
      return;
    }

    InfluxTimeseries influxTimeseries = getTimeseries(task);
    if (influxTimeseries != null) {
      Log.infof(
        "Starting migration of timeseries %s for container %s",
        influxTimeseries.getUniqueId(),
        task.getContainerId()
      );
      var influxTimeseriesDataType = influxConnector.getTimeseriesDataType(
        databaseName,
        influxTimeseries.getMeasurement(),
        influxTimeseries.getField()
      );

      migratePayloads(container, databaseName, influxTimeseries, influxTimeseriesDataType);
    }

    setStateToFinishedAndRemoveErrors(task);
    Log.infof("Finished migration of container %s", task.getContainerId());
  }

  /** Deserializes the task's stored timeseries JSON; may return null. */
  private InfluxTimeseries getTimeseries(MigrationTaskEntity migrationTaskEntity) {
    String timeseriesJson = migrationTaskEntity.getTimeseries();
    return JsonConverter.convertToObject(timeseriesJson, InfluxTimeseries.class);
  }

  /** Returns tasks that are not finished or still carry an error string. */
  private List<MigrationTaskEntity> getUnfinishedMigrationTasks() {
    return migrationTaskRepository.find("state <> 'Finished' OR errors <> ''").list();
  }

  /**
   * Get all containers in neo4j that are not deleted.
   * Create a MigrationTask for each container that does not already have one.
   */
  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void createMigrationTaskForEachContainer() {
    var containerIds = getExistingContainerIds();
    var containerIdsToMigrate = getContainerIdsThatDoNotHaveMigrationTaskYet(containerIds);
    var tasks = createMigrationTasksForContainers(containerIdsToMigrate);
    storeTasksInDatabase(tasks);
  }

  /**
   * Get ids of all containers from neo4j that are not deleted and do have a
   * database name.
   * Hint: Containers that have a database name are Timeseries containers stored
   * in influxdb.
   * If the database prop is empty, it is a Timeseries container stored in
   * TimescaleDB.
   *
   * @return ids of containers whose database property is non-empty
   */
  private List<Long> getExistingContainerIds() {
    var existingContainerIds = timeseriesContainerService
      .getContainers()
      .stream()
      .filter(c -> StringUtils.isNotEmpty(c.getDatabase()))
      .map(TimeseriesContainer::getId)
      .toList();

    Log.infof(
      "We found %s containers in neo4j that are not deleted and have a database prop that is not empty.",
      existingContainerIds.size()
    );
    return existingContainerIds;
  }

  /**
   * Builds MigrationTask entities for the given containers: one task per
   * available Influx timeseries, or a single marker task if the database or
   * its timeseries are missing.
   */
  private List<MigrationTaskEntity> createMigrationTasksForContainers(List<Long> containerIds) {
    return containerIds
      .stream()
      .flatMap(containerId -> {
        var container = timeseriesContainerService.getContainerNoChecks(containerId);

        List<MigrationTaskEntity> migrationTasks = new ArrayList<MigrationTaskEntity>();
        var databaseName = container.getDatabase();
        if (!doesDatabaseExist(databaseName)) {
          Log.warnf("influxdb %s does not exist.", databaseName);
          // Create a task without database name so the container counts as handled.
          var task = new MigrationTaskEntity(containerId);
          migrationTasks.add(task);
          return migrationTasks.stream();
        }

        var timeseriesAvailable = influxConnector.getTimeseriesAvailable(databaseName);
        if (timeseriesAvailable.isEmpty()) {
          Log.warnf("No timeseries available for container %s", containerId);
          var task = new MigrationTaskEntity(containerId);
          task.setDatabaseName(databaseName);
          migrationTasks.add(task);
        }

        for (InfluxTimeseries influxTimeseries : timeseriesAvailable) {
          MigrationTaskEntity migrationTaskEntity = new MigrationTaskEntity(containerId);
          migrationTaskEntity.setTimeseries(JsonConverter.convertToString(influxTimeseries));
          migrationTaskEntity.setDatabaseName(databaseName);
          migrationTasks.add(migrationTaskEntity);
        }
        return migrationTasks.stream();
      })
      .toList();
  }

  private void storeTasksInDatabase(List<MigrationTaskEntity> tasks) {
    migrationTaskRepository.persist(tasks);
    Log.infof("We created %s migration tasks.", tasks.size());
  }

  /**
   * Filters the given container ids down to those that have no MigrationTask
   * yet. Uses a set for the membership test to avoid quadratic lookups.
   */
  private List<Long> getContainerIdsThatDoNotHaveMigrationTaskYet(List<Long> allContainerIds) {
    var alreadyHandledContainerIds = migrationTaskRepository
      .findAll()
      .stream()
      .map(MigrationTaskEntity::getContainerId)
      .collect(Collectors.toSet());
    var containerIdsLeft = allContainerIds.stream().filter(t -> !alreadyHandledContainerIds.contains(t)).toList();

    Log.infof(
      "We found %s containers that are already handled and %s containers that are not handled yet.",
      alreadyHandledContainerIds.size(),
      containerIdsLeft.size()
    );

    Log.info(containerIdsLeft.stream().map(String::valueOf).collect(Collectors.joining(", ")));
    return containerIdsLeft;
  }

  /** Marks the task as Running with the current timestamp in a new transaction. */
  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void setStateToRunning(MigrationTaskEntity task) {
    task.setStartedAt(new Date());
    task.setState(MigrationTaskState.Running);
    this.migrationTaskRepository.getEntityManager().merge(task);
  }

  /** Marks the task as Finished, clearing any previously recorded errors. */
  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void setStateToFinishedAndRemoveErrors(MigrationTaskEntity task) {
    task.setFinishedAt(new Date());
    task.setState(MigrationTaskState.Finished);
    task.setErrors(new ArrayList<>());
    this.migrationTaskRepository.getEntityManager().merge(task);
  }

  /** Records an error on the task and marks it Finished in a new transaction. */
  @Transactional(Transactional.TxType.REQUIRES_NEW)
  protected void persistError(MigrationTaskEntity task, String errorMessage) {
    task.addError(errorMessage);
    task.setState(MigrationTaskState.Finished);
    task.setFinishedAt(new Date());
    this.migrationTaskRepository.getEntityManager().merge(task);
  }

  /**
   * Checks whether the InfluxDB database with the given name exists. Logs a
   * warning and returns false if it does not; this is not treated as an error.
   */
  private boolean doesDatabaseExist(String databaseName) {
    if (!influxConnector.databaseExist(databaseName)) {
      Log.warnf(
        "InfluxDB with name %s does not exist. Migration not possible. We do not treat that as an error.",
        databaseName
      );
      return false;
    }
    return true;
  }

  /**
   * Identifies the first record of the timeseries and returns that timestamp.
   */
  private long getFirstTimestampOfPayload(String database, InfluxTimeseries influxTimeseries) {
    return getTimestampOfPayload(database, influxTimeseries, InfluxSingleValuedUnaryFunction.FIRST);
  }

  /**
   * Identifies the last record of the timeseries and returns that timestamp.
   */
  private long getLastTimestampOfPayload(String database, InfluxTimeseries influxTimeseries) {
    return getTimestampOfPayload(database, influxTimeseries, InfluxSingleValuedUnaryFunction.LAST);
  }

  /**
   * Queries a single aggregated point (FIRST or LAST) over the full time range
   * and returns its timestamp in nanoseconds, or 0 if the timeseries is empty.
   */
  private long getTimestampOfPayload(
    String database,
    InfluxTimeseries influxTimeseries,
    InfluxSingleValuedUnaryFunction function
  ) {
    var payload =
      this.influxTimeseriesService.getTimeseriesPayload(
          0,
          Instant.now().getEpochSecond() * 1_000_000_000,
          database,
          influxTimeseries,
          function,
          null,
          null
        );
    return (!payload.getPoints().isEmpty()) ? payload.getPoints().get(0).getTimeInNanoseconds() : 0;
  }

  /**
   * Copy all payloads from a InfluxTimeseries to the TimeseriesContainer.
   * This method will try to do the copy in batches based on time based slices.
   * The slice duration is defined in env.
   * [shepard.migration-mode.timeseries-slice-duration]
   *
   * @throws Exception if one of the reader/writer/compression tasks fails; the
   *                   original failure is attached as the cause
   */
  private void migratePayloads(
    TimeseriesContainer container,
    String databaseName,
    InfluxTimeseries influxTimeseries,
    InfluxTimeseriesDataType influxTimeseriesDataType
  ) throws Exception {
    var firstTimestamp = getFirstTimestampOfPayload(databaseName, influxTimeseries);
    var lastTimestamp = getLastTimestampOfPayload(databaseName, influxTimeseries);
    Log.infof(
      "Doing migration from timestamp %s to %s of container %s",
      firstTimestamp,
      lastTimestamp,
      container.getId()
    );

    // Start one nanosecond early so the slice queries include the first point.
    long currentStartTimestamp = firstTimestamp - 1;

    payloadReadQueue.clear();
    int runningNumber = 1;
    while (currentStartTimestamp < lastTimestamp) {
      long currentEndTimestamp = Math.min(currentStartTimestamp + sliceDuration, lastTimestamp);

      payloadReadQueue.add(
        new PayloadReadTask(
          runningNumber++,
          currentStartTimestamp,
          currentEndTimestamp,
          influxTimeseries,
          container,
          databaseName,
          influxTimeseriesDataType,
          false
        )
      );
      currentStartTimestamp = currentEndTimestamp;
    }

    // One poison pill per reader thread so each reader terminates cleanly.
    for (int i = 0; i < numberOfReaderThreads; i++) {
      payloadReadQueue.add(PayloadReadTask.poisonPill);
    }
    Log.infof("Finished preparing read queue of %s read tasks.", payloadReadQueue.size());
    // Ensure tasks queue is empty
    payloadWriteQueue.clear();
    compressionTasksQueue.clear();

    List<Callable<Object>> tasks = new ArrayList<>();

    Log.debug("Creating writers...");
    for (int i = 0; i < numberOfWriterThreads; i++) {
      tasks.add(payloadWriter);
    }

    Log.debug("Creating readers...");
    for (int i = 0; i < numberOfReaderThreads; i++) {
      tasks.add(payloadReader);
    }

    tasks.add(compressionRunner);

    Log.infof(
      "Starting migration with %s reader threads and %s writer threads, compressing each %s points ...",
      numberOfReaderThreads,
      numberOfWriterThreads,
      numberOfPointsBeforeCompression
    );
    CompletionService<Object> completionService = new ExecutorCompletionService<>(executor);
    var plannedFutures = tasks.stream().map(completionService::submit).collect(Collectors.toCollection(ArrayList::new));
    try {
      for (int i = 0; i < tasks.size(); i++) {
        Future<Object> future = completionService.take();
        plannedFutures.remove(future);
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      // Cancel remaining futures
      for (var future : plannedFutures) {
        future.cancel(true);
      }

      Log.info("Waiting for executor task to terminate ...");
      if (!executor.awaitTermination(300, TimeUnit.SECONDS)) {
        Log.error("The started executor threads did not terminate within 300s. This can cause undefined behaviour.");
      }

      if (e instanceof InterruptedException) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
      }

      Log.errorf("Error while executing tasks in parallel: %s", e.getMessage());
      // Keep the original exception as the cause instead of flattening it away.
      throw new Exception(e.getMessage(), e);
    } finally {
      payloadWriteQueue.clear();
    }
  }

  /** Enqueues one poison pill per writer thread so all writers shut down. */
  void addWriterPoisonPills() {
    try {
      for (int i = 0; i < numberOfWriterThreads; i++) {
        getPayloadWriteQueue().put(PayloadWriteTask.poisonPill);
      }
    } catch (InterruptedException e) {
      Log.errorf("Payload write queue interrupted: %s", e.getMessage());
      Thread.currentThread().interrupt();
    }
  }

  /** Replaces any pending compression tasks with a final (terminating) one. */
  void addCompressionPoisonPills() {
    try {
      // clear compression queue before sending the last task
      compressionTasksQueue.clear();
      compressionTasksQueue.put(new CompressionTask(true));
    } catch (InterruptedException e) {
      Log.errorf("Compression task queue interrupted: %s", e.getMessage());
      Thread.currentThread().interrupt();
    }
  }

  /** Enqueues a compression task unless one is already pending. */
  public void addCompressionTask() {
    try {
      // Do not add a task if the queue is not empty
      if (!compressionTasksQueue.isEmpty()) {
        Log.info("Did not add a duplicate compression task");
        return;
      }
      compressionTasksQueue.put(new CompressionTask(false));
    } catch (InterruptedException e) {
      Log.errorf("Compression task queue interrupted: %s", e.getMessage());
      Thread.currentThread().interrupt();
    }
  }
}