TimeseriesMigrationRest.java
package de.dlr.shepard.data.timeseries.migration.endpoints;
import de.dlr.shepard.data.timeseries.migration.model.MigrationTaskEntity;
import de.dlr.shepard.data.timeseries.migration.services.TimeseriesMigrationService;
import de.dlr.shepard.data.timeseries.migration.services.TimeseriesMigrationTestDataIngestionService;
import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
import java.util.List;
import org.eclipse.microprofile.openapi.annotations.Operation;
import org.eclipse.microprofile.openapi.annotations.parameters.Parameter;
import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
import org.eclipse.microprofile.openapi.annotations.tags.Tag;
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("temp/")
@RequestScoped
public class TimeseriesMigrationRest {
@Context
private SecurityContext securityContext;
@Inject
TimeseriesMigrationService migrationService;
@Inject
TimeseriesMigrationTestDataIngestionService testDataIngestionService;
@GET
@Path("/migrations/state")
@Tag(
name = "timeseries migration",
description = "This endpoint is only temporarily available. It is related to the migration of timeseries data from InfluxDB to TimescaleDB. The total amount of points added is datasetSize * numberOfDifferentTimeseries."
)
@Operation(
description = "This endpoint is used to retrieve the current state of all data migrations that are currently running or already finished."
)
@APIResponse(
responseCode = "200",
description = "Returns information about the current state of all data migrations."
)
@APIResponse(responseCode = "401", description = "Unauthorized")
@APIResponse(responseCode = "500", description = "Internal Server Error.")
@Parameter(description = "show only tasks with errors", required = false)
public Response getStateOfAllMigrations(@QueryParam("onlyShowErrors") boolean onlyShowErrors) {
List<MigrationTaskEntity> tasks = migrationService.getMigrationTasks(onlyShowErrors);
return Response.ok(tasks).build();
}
@POST
@Path("/migrations/ingest")
@Tag(name = "timeseries migration")
@Parameter(name = "databaseName", required = true)
@Parameter(name = "datasetSize", required = true)
@Parameter(name = "dataPointValueType", required = true)
@Parameter(name = "numberOfDifferentTimeseries")
@Operation(
description = "Creates new database in influxdb, generate and insert random data to it to be use later in testing the migration."
)
@APIResponse(responseCode = "200", description = "Data inserted successfully.")
@APIResponse(responseCode = "401", description = "Unauthorized")
@APIResponse(responseCode = "500", description = "If database already exists.")
public Response ingestData(
@QueryParam("databaseName") String databaseName,
@QueryParam("datasetSize") int datasetSize,
@QueryParam("dataPointValueType") DataPointValueType dataPointValueType,
@QueryParam("numberOfDifferentTimeseries") int numberOfDifferentTimeseries
) {
if (numberOfDifferentTimeseries <= 0) {
numberOfDifferentTimeseries = 1;
}
testDataIngestionService.ingestTestData(
databaseName,
datasetSize,
securityContext.getUserPrincipal().getName(),
dataPointValueType,
numberOfDifferentTimeseries
);
return Response.ok().build();
}
}