// TimeseriesMigrationTestDataIngestionService.java
package de.dlr.shepard.data.timeseries.migration.services;

import de.dlr.shepard.common.exceptions.ShepardProcessingException;
import de.dlr.shepard.data.timeseries.daos.TimeseriesContainerDAO;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxDBConnector;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxPoint;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseries;
import de.dlr.shepard.data.timeseries.migration.influxtimeseries.InfluxTimeseriesPayload;
import de.dlr.shepard.data.timeseries.model.TimeseriesContainer;
import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;

@RequestScoped
public class TimeseriesMigrationTestDataIngestionService {

  @Inject
  InfluxDBConnector influxConnector;

  @Inject
  TimeseriesContainerDAO timeseriesContainerDao;

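  // Upper bound on data points per Influx write request; larger datasets are split into chunks of this size.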
  private static final int PAYLOAD_MAX_SIZE = 200_000;

  // Shared Random instance so that point generation does not allocate a new Random per data point.
  private final Random random = new Random();

  /**
   * Creates a fresh Influx database and fills it with randomly generated timeseries test data.
   *
   * @param databaseName name of the Influx database to create; must not already exist
   * @param datasetSize number of data points to generate per timeseries
   * @param userName requesting user (not used by this method at present)
   * @param dataPointValueType value type of the generated data points
   * @param numberOfDifferentTimeseries number of distinct timeseries to generate
   * @return Newly created Influx Timeseries Container
   */
public TimeseriesContainer ingestTestData(
String databaseName,
int datasetSize,
String userName,
DataPointValueType dataPointValueType,
int numberOfDifferentTimeseries
) {
if (influxConnector.databaseExist(databaseName)) {
throw new ShepardProcessingException("Database already exists: " + databaseName);
}
Log.infof("ingestTestData databaseName: %s, size: %s, starting...", databaseName, datasetSize);
var containerName = String.format("Container-%d", System.currentTimeMillis());
TimeseriesContainer entity = new TimeseriesContainer();
entity.setDatabase(databaseName);
entity.setName(containerName);
timeseriesContainerDao.createOrUpdate(entity);
influxConnector.createDatabase(databaseName);
for (int i = 1; i <= numberOfDifferentTimeseries; i++) {
String symbolicName = String.format("symbolicName-%d", i);
int remainingDataSize = datasetSize;
long timeOffset = 0;
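      // Write the dataset in chunks of at most PAYLOAD_MAX_SIZE points per Influx request.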
while (remainingDataSize > 0) {
Log.infof(
"ingestTestData databaseName: %s, symbolicName: %s, remaining: %s",
databaseName,
symbolicName,
remainingDataSize
);
int payloadSize = Math.min(PAYLOAD_MAX_SIZE, remainingDataSize);
InfluxTimeseriesPayload payload = getRandomPayload(dataPointValueType, symbolicName, payloadSize, timeOffset);
influxConnector.saveTimeseriesPayload(databaseName, payload);
remainingDataSize -= payloadSize;
timeOffset += payloadSize;
}
Log.infof("ingestTestData databaseName: %s, symbolicName: %s, finished!", databaseName, symbolicName);
}
Log.infof("ingestTestData databaseName: %s, finished!", databaseName);
return entity;
}
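
  /**
   * Builds a payload of {@code payloadSize} random points of the given value type.
   * Timestamps continue from {@code timeOffset} so that consecutive chunks do not overlap.
   */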
private InfluxTimeseriesPayload getRandomPayload(
DataPointValueType dataPointValueType,
String symbolicName,
int payloadSize,
long timeOffset
) {
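    // Placeholder measurement, device, location, and field names; only the symbolic name varies per timeseries.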
InfluxTimeseries timeseries = new InfluxTimeseries("measurement", "device", "location", symbolicName, "field");
List<InfluxPoint> points = new ArrayList<>();
    IntStream.range(0, payloadSize).forEach(i -> {
      // Timestamps are in nanoseconds; a spacing of 100_000_000 ns yields one data point every 100 ms.
      points.add(getRandomInfluxPoint(dataPointValueType, (timeOffset + i) * 100_000_000L));
    });
    return new InfluxTimeseriesPayload(timeseries, points);
}
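
  /** Creates a single random data point of the requested value type at the given nanosecond timestamp. */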
  private InfluxPoint getRandomInfluxPoint(DataPointValueType dataPointValueType, long timestamp) {
    switch (dataPointValueType) {
      case Boolean:
        return new InfluxPoint(timestamp, random.nextBoolean());
      case String:
        // Millisecond clock scaled to nanoseconds; points generated within the same
        // millisecond share the same string value, which is acceptable for test data.
        return new InfluxPoint(timestamp, String.format("String-%d", System.currentTimeMillis() * 1_000_000));
      case Integer:
        return new InfluxPoint(timestamp, random.nextInt());
      case Double:
        return new InfluxPoint(timestamp, random.nextDouble());
      default:
        // Fail loudly instead of returning null, which would put a null point into the payload.
        throw new ShepardProcessingException("Unsupported data point value type: " + dataPointValueType);
    }
  }
}
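
/*
 * Usage sketch (illustrative, not part of the original service): how a caller such as a
 * Quarkus test or an admin endpoint might trigger ingestion. The class, database name,
 * and argument values below are hypothetical; only ingestTestData and its parameters
 * come from the service above.
 */
class TimeseriesMigrationTestDataIngestionExample {

  @Inject
  TimeseriesMigrationTestDataIngestionService ingestionService;

  void ingestExample() {
    // Creates the database "migration-test-db" plus a matching container entity, then
    // writes 1,000,000 random Double points for each of 4 timeseries in chunks of 200,000.
    TimeseriesContainer container = ingestionService.ingestTestData(
      "migration-test-db",
      1_000_000,
      "test-user",
      DataPointValueType.Double,
      4
    );
    Log.infof("Created test data container: %s", container);
  }
}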