TimeseriesMigrationTestDataIngestionService.java

package de.dlr.shepard.data.timeseries.migration.services;

import de.dlr.shepard.common.exceptions.ShepardProcessingException;
import de.dlr.shepard.data.timeseries.daos.TimeseriesContainerDAO;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxDBConnector;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxPoint;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseries;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesPayload;
import de.dlr.shepard.data.timeseries.model.TimeseriesContainer;
import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Creates a new timeseries container together with a new InfluxDB database and fills that
 * database with generated data points. Intended for producing test data for the timeseries
 * migration.
 */
@RequestScoped
public class TimeseriesMigrationTestDataIngestionService {

  /** Maximum number of points written to InfluxDB with a single payload. */
  private static final int PAYLOAD_MAX_SIZE = 200000;

  /**
   * Distance between the timestamps of two consecutive generated points.
   * NOTE(review): presumably nanoseconds, i.e. one data point per 100 ms - confirm against
   * the precision expected by {@link InfluxPoint}.
   */
  private static final long TIMESTAMP_STEP = 100_000_000L;

  /** Shared generator; avoids allocating a new {@link Random} for every single point. */
  private static final Random RANDOM = new Random();

  @Inject
  InfluxDBConnector influxConnector;

  @Inject
  TimeseriesContainerDAO timeseriesContainerDao;

  /**
   * Creates a timeseries container entity and an InfluxDB database named {@code databaseName},
   * then writes {@code datasetSize} generated points for each of
   * {@code numberOfDifferentTimeseries} timeseries. The timeseries only differ in their symbolic
   * name ({@code symbolicName-1}, {@code symbolicName-2}, ...). Points are written in chunks of
   * at most {@link #PAYLOAD_MAX_SIZE} points.
   *
   * @param databaseName name of the InfluxDB database to create; must not exist yet
   * @param datasetSize number of points to generate per timeseries; values &lt;= 0 write nothing
   * @param userName currently not used by this method
   * @param dataPointValueType type of the generated values; must not be {@code null}
   * @param numberOfDifferentTimeseries number of timeseries to generate; values &lt;= 0 write nothing
   * @return Newly created Influx Timeseries Container
   * @throws ShepardProcessingException if the database already exists or
   *         {@code dataPointValueType} is {@code null} or not supported
   */
  public TimeseriesContainer ingestTestData(
    String databaseName,
    int datasetSize,
    String userName,
    DataPointValueType dataPointValueType,
    int numberOfDifferentTimeseries
  ) {
    // Reject unusable input before anything is persisted, so that a bad request does not leave
    // an empty container and database behind.
    if (dataPointValueType == null) {
      throw new ShepardProcessingException("Data point value type must not be null");
    }
    if (influxConnector.databaseExist(databaseName)) {
      throw new ShepardProcessingException("Database already exists: " + databaseName);
    }
    Log.infof("ingestTestData databaseName: %s, size: %s, starting...", databaseName, datasetSize);
    var containerName = String.format("Container-%d", System.currentTimeMillis());
    TimeseriesContainer entity = new TimeseriesContainer();
    entity.setDatabase(databaseName);
    entity.setName(containerName);

    timeseriesContainerDao.createOrUpdate(entity);
    influxConnector.createDatabase(databaseName);

    for (int i = 1; i <= numberOfDifferentTimeseries; i++) {
      String symbolicName = String.format("symbolicName-%d", i);
      int remainingDataSize = datasetSize;
      // Index of the first point of the next chunk; keeps timestamps increasing across chunks.
      long timeOffset = 0;
      while (remainingDataSize > 0) {
        Log.infof(
          "ingestTestData databaseName: %s, symbolicName: %s, remaining: %s",
          databaseName,
          symbolicName,
          remainingDataSize
        );
        int payloadSize = Math.min(PAYLOAD_MAX_SIZE, remainingDataSize);
        InfluxTimeseriesPayload payload = getRandomPayload(dataPointValueType, symbolicName, payloadSize, timeOffset);
        influxConnector.saveTimeseriesPayload(databaseName, payload);
        remainingDataSize -= payloadSize;
        timeOffset += payloadSize;
      }
      Log.infof("ingestTestData databaseName: %s, symbolicName: %s, finished!", databaseName, symbolicName);
    }
    Log.infof("ingestTestData databaseName: %s, finished!", databaseName);
    return entity;
  }

  /**
   * Builds one payload of {@code payloadSize} generated points for the timeseries identified by
   * {@code symbolicName}. All other timeseries attributes are fixed placeholder values.
   *
   * @param dataPointValueType type of the generated values
   * @param symbolicName symbolic name of the timeseries the points belong to
   * @param payloadSize number of points to generate
   * @param timeOffset index of the first point; point {@code k} of this payload gets the
   *        timestamp {@code (timeOffset + k) * TIMESTAMP_STEP}
   * @return payload holding the timeseries and its generated points
   */
  private InfluxTimeseriesPayload getRandomPayload(
    DataPointValueType dataPointValueType,
    String symbolicName,
    int payloadSize,
    long timeOffset
  ) {
    InfluxTimeseries timeseries = new InfluxTimeseries("measurement", "device", "location", symbolicName, "field");
    // Size is known up front; Math.max guards the ArrayList constructor against negative sizes.
    List<InfluxPoint> points = new ArrayList<>(Math.max(payloadSize, 0));

    for (int i = 0; i < payloadSize; i++) {
      // long arithmetic: timeOffset is a long, so the product cannot overflow an int
      long timestamp = (timeOffset + i) * TIMESTAMP_STEP;
      points.add(getRandomInfluxPoint(dataPointValueType, timestamp));
    }
    return new InfluxTimeseriesPayload(timeseries, points);
  }

  /**
   * Creates a single point with the given timestamp and a generated value of the requested type.
   * Boolean, Integer and Double values are random; String values are derived from the current
   * system time.
   *
   * @param dataPointValueType type of the value to generate
   * @param timestamp timestamp of the point
   * @return the generated point, never {@code null}
   * @throws ShepardProcessingException if the value type is not supported
   */
  private InfluxPoint getRandomInfluxPoint(DataPointValueType dataPointValueType, long timestamp) {
    switch (dataPointValueType) {
      case Boolean:
        return new InfluxPoint(timestamp, RANDOM.nextBoolean());
      case String:
        return new InfluxPoint(timestamp, String.format("String-%d", System.currentTimeMillis() * 1_000_000));
      case Integer:
        return new InfluxPoint(timestamp, RANDOM.nextInt());
      case Double:
        return new InfluxPoint(timestamp, RANDOM.nextDouble());
      default:
        // Fail loudly instead of returning null, which would end up as a null entry in the payload.
        throw new ShepardProcessingException("Unsupported data point value type: " + dataPointValueType);
    }
  }
}