TimeseriesService.java
package de.dlr.shepard.data.timeseries.services;
import de.dlr.shepard.common.exceptions.InvalidAuthException;
import de.dlr.shepard.common.exceptions.InvalidBodyException;
import de.dlr.shepard.common.exceptions.InvalidPathException;
import de.dlr.shepard.data.timeseries.io.TimeseriesWithDataPoints;
import de.dlr.shepard.data.timeseries.model.Timeseries;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPoint;
import de.dlr.shepard.data.timeseries.model.TimeseriesDataPointsQueryParams;
import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
import de.dlr.shepard.data.timeseries.model.enums.DataPointValueType;
import de.dlr.shepard.data.timeseries.repositories.TimeseriesDataPointRepository;
import de.dlr.shepard.data.timeseries.repositories.TimeseriesRepository;
import de.dlr.shepard.data.timeseries.utilities.ObjectTypeEvaluator;
import de.dlr.shepard.data.timeseries.utilities.TimeseriesValidator;
import io.quarkus.logging.Log;
import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
import jakarta.enterprise.context.RequestScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import jakarta.ws.rs.NotFoundException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
@RequestScoped
public class TimeseriesService {
@Inject
TimeseriesRepository timeseriesRepository;
@Inject
TimeseriesDataPointRepository timeseriesDataPointRepository;
@Inject
TimeseriesContainerService timeseriesContainerService;
/**
* Returns a list of timeseries objects that are in the given database.
*
* Returns an empty list if the timeseries container is not accessible (cannot
* be found or wrong permissions).
*
* @param containerId of the given timeseries container
* @return a list of timeseries entities
*/
public List<TimeseriesEntity> getTimeseriesAvailable(long containerId) {
timeseriesContainerService.getContainer(containerId);
return timeseriesRepository.list("containerId", containerId);
}
/**
* Returns a timeseries entity by id
*
* @param containerId timeseries container id
* @param id
* @return TimeseriesEntity
* @throws InvalidPathException if container with containerId or the timeseries
* are not accessible
* @throws InvalidAuthException if user has no read permissions on the
* timeseries container
*/
public TimeseriesEntity getTimeseriesById(long containerId, int id) {
timeseriesContainerService.getContainer(containerId);
var timeseries = timeseriesRepository.findById(id);
if (timeseries == null) {
String errorMsg = String.format(
"ID ERROR - Timeseries with id %s in container %s is null or deleted",
id,
containerId
);
Log.error(errorMsg);
throw new InvalidPathException(errorMsg);
}
return timeseries;
}
/**
* Returns a timeseries entity
*
* @param containerId timeseries container id
* @param timeseries
* @return TimeseriesEntity
* @throws NotFoundException if the timeseries is not found
* @throws InvalidPathException if container with containerId is not accessible
* @throws InvalidAuthException if user has no read permissions on the timeseries container
*/
public TimeseriesEntity getTimeseries(long containerId, Timeseries timeseries) {
timeseriesContainerService.getContainer(containerId);
var timeseriesEntity = timeseriesRepository.findTimeseries(containerId, timeseries);
if (timeseriesEntity.isEmpty()) {
String errorMsg = String.format(
"Timeseries (%s, %s, %s, %s, %s) in container %s is null or deleted",
timeseries.getMeasurement(),
timeseries.getDevice(),
timeseries.getLocation(),
timeseries.getSymbolicName(),
timeseries.getField(),
containerId
);
Log.error(errorMsg);
throw new NotFoundException(errorMsg);
}
return timeseriesEntity.get();
}
/**
* Deletes timeseries container by id
*
* @param containerId
* @throws InvalidPathException if container could not be found
* @throws InvalidAuthException if user has no edit permissions on container
*/
@Transactional
public void deleteTimeseriesByContainerId(long containerId) {
timeseriesContainerService.getContainer(containerId);
timeseriesContainerService.assertIsAllowedToDeleteContainer(containerId);
this.timeseriesRepository.deleteByContainerId(containerId);
}
/**
* Retrieve a list of DataPoints for a time-interval with options to grouping/
* time slicing, filling and aggregating.
*
* @return List of TimeseriesDataPoint
* @throws InvalidPathException if container is null or deleted
* @throws InvalidAuthException if user has no read permissions on container
*/
public List<TimeseriesDataPoint> getDataPointsByTimeseries(
long containerId,
Timeseries timeseries,
TimeseriesDataPointsQueryParams queryParams
) {
timeseriesContainerService.getContainer(containerId);
return getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams);
}
/**
* Retrieve a list of DataPoints for a time-interval with options to grouping/
* time slicing, filling and aggregating.
*
* This function does not check if the container specified by containerId is
* accessible.
* We add <code>@ActivateRequestContext</code> in order to call this method in a
* parallel stream.
* The container check relies on an active request context.
* However, the 'ActivateRequestContext' annotation does not allow for a
* container check.
*
* @param containerId
* @param timeseries
* @param queryParams
* @return List<TimeseriesDataPoint>
*/
@ActivateRequestContext
public List<TimeseriesDataPoint> getDataPointsByTimeseriesActivatedRequestContext(
long containerId,
Timeseries timeseries,
TimeseriesDataPointsQueryParams queryParams
) {
Optional<TimeseriesEntity> timeseriesEntity = this.timeseriesRepository.findTimeseries(containerId, timeseries);
if (timeseriesEntity.isEmpty()) return Collections.emptyList();
int timeseriesId = timeseriesEntity.get().getId();
DataPointValueType valueType = timeseriesEntity.get().getValueType();
return this.timeseriesDataPointRepository.queryDataPoints(timeseriesId, valueType, queryParams);
}
public List<TimeseriesWithDataPoints> getManyTimeseriesWithDataPoints(
Long containerId,
List<Timeseries> timeseriesList,
TimeseriesDataPointsQueryParams queryParams
) {
timeseriesContainerService.getContainer(containerId);
ConcurrentLinkedQueue<TimeseriesWithDataPoints> timeseriesWithDataPointsQueue = new ConcurrentLinkedQueue<
TimeseriesWithDataPoints
>();
timeseriesList
.parallelStream()
.forEach(timeseries -> {
timeseriesWithDataPointsQueue.add(
new TimeseriesWithDataPoints(
timeseries,
getDataPointsByTimeseriesActivatedRequestContext(containerId, timeseries, queryParams)
)
);
});
return new ArrayList<TimeseriesWithDataPoints>(timeseriesWithDataPointsQueue);
}
/**
* Saves data points in the database.
* If the corresponding timeseries did not exist before, it will be persisted in
* the database.
*
* @param timeseriesContainerId Identifies the TimeseriesContainer
* @param timeseries The timeseries identifiers
* @param dataPoints Data points to be added to the timeseries
* @return created timeseries
*/
public TimeseriesEntity saveDataPoints(
long timeseriesContainerId,
Timeseries timeseries,
List<TimeseriesDataPoint> dataPoints
) {
timeseriesContainerService.getContainer(timeseriesContainerId);
timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
DataPointValueType incomingValueType = ObjectTypeEvaluator.determineType(dataPoints.get(0).getValue()).orElseThrow(
() -> new InvalidBodyException()
);
return saveDataPoints(timeseriesContainerId, timeseries, dataPoints, incomingValueType);
}
/**
* Saves data points in the database.
* If the corresponding timeseries did not exist before, it will be persisted in
* the database.
*
* @param timeseriesContainerId Identifies the TimeseriesContainer
* @param timeseries The timeseries identifiers
* @param dataPoints Data points to be added to the timeseries
* @param dataType The data type that values in this timeseries
* will have
* @return created timeseries
*/
@Transactional(Transactional.TxType.REQUIRES_NEW)
@TransactionConfiguration(timeout = 6000)
public TimeseriesEntity saveDataPoints(
long timeseriesContainerId,
Timeseries timeseries,
List<TimeseriesDataPoint> dataPoints,
DataPointValueType dataType
) {
timeseriesContainerService.getContainer(timeseriesContainerId);
timeseriesContainerService.assertIsAllowedToEditContainer(timeseriesContainerId);
TimeseriesEntity timeseriesEntity = getOrCreateTimeseries(timeseriesContainerId, timeseries, dataType);
assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
timeseriesDataPointRepository.insertManyDataPoints(dataPoints, timeseriesEntity);
return timeseriesEntity;
}
/**
* Saves data points in the database.
* If the corresponding timeseries did not exist before, it will be persisted in
* the database.
*
* This function is introduced to fix the issue occurring in ticket: #581.
*
* TODO: This function should only be used for the timeseries migration, and
* should be deleted in the future!
*
* @param timeseriesContainerId Identifies the TimeseriesContainer
* @param timeseries The timeseries identifiers
* @param dataPoints Data points to be added to the timeseries
* @param dataType The data type that values in this timeseries
* will have
* @return created timeseries
*/
@Deprecated
@Transactional(Transactional.TxType.REQUIRES_NEW)
@TransactionConfiguration(timeout = 6000)
public TimeseriesEntity saveDataPointsNoChecks(
long timeseriesContainerId,
Timeseries timeseries,
List<TimeseriesDataPoint> dataPoints,
DataPointValueType dataType
) {
TimeseriesEntity timeseriesEntity = getOrCreateTimeseriesNoChecks(timeseriesContainerId, timeseries, dataType);
assertDataPointsMatchTimeseriesValueType(timeseriesEntity, dataPoints);
try {
timeseriesDataPointRepository.insertManyDataPointsWithCopyCommand(dataPoints, timeseriesEntity);
} catch (SQLException e) {
// COPY command cannot handle duplicates (conflict on timeseries_id and time column)
// We are going to use the normal batch insertion instead.
Log.warnf("SQLException during copy insert (expected): %s ", e.getMessage());
Log.warn("We are going to repeat the operation with batch insert.");
return repeatSaveDataPointsWithBatchInsert(dataPoints, timeseriesEntity);
}
return timeseriesEntity;
}
@Deprecated
@Transactional(Transactional.TxType.REQUIRES_NEW)
@TransactionConfiguration(timeout = 6000)
public TimeseriesEntity repeatSaveDataPointsWithBatchInsert(
List<TimeseriesDataPoint> entities,
TimeseriesEntity timeseriesEntity
) {
timeseriesDataPointRepository.insertManyDataPoints(entities, timeseriesEntity);
return timeseriesEntity;
}
/**
* This function is introduced to fix the issue occurring in ticket: #581.
*
* TODO: This function should only be used for the timeseries migration, and
* should be deleted in the future!
*/
@Deprecated
private TimeseriesEntity getOrCreateTimeseriesNoChecks(
long containerId,
Timeseries timeseries,
DataPointValueType incomingValueType
) {
// try to find timeseries in db
Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
// create new timeseries because it does not exist
TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
QuarkusTransaction.requiringNew()
.run(() -> {
this.timeseriesRepository.upsert(containerId, timeseriesEntity);
});
var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
return found.get();
}
private TimeseriesEntity getOrCreateTimeseries(
long containerId,
Timeseries timeseries,
DataPointValueType incomingValueType
) {
timeseriesContainerService.getContainer(containerId);
timeseriesContainerService.assertIsAllowedToEditContainer(containerId);
// try to find timeseries in db
Optional<TimeseriesEntity> matchingTimeseries = timeseriesRepository.findTimeseries(containerId, timeseries);
if (matchingTimeseries.isPresent()) return matchingTimeseries.get();
TimeseriesValidator.assertTimeseriesPropertiesAreValid(timeseries);
// create new timeseries because it does not exist
TimeseriesEntity timeseriesEntity = new TimeseriesEntity(containerId, timeseries, incomingValueType);
QuarkusTransaction.requiringNew()
.run(() -> {
this.timeseriesRepository.upsert(containerId, timeseriesEntity);
});
var found = this.timeseriesRepository.findTimeseries(containerId, timeseries);
return found.get();
}
private static void assertDataPointsMatchTimeseriesValueType(
TimeseriesEntity timeseriesEntity,
List<TimeseriesDataPoint> dataPoints
) {
for (TimeseriesDataPoint dataPoint : dataPoints) {
DataPointValueType expectedType = ObjectTypeEvaluator.determineType(dataPoint.getValue()).orElseThrow(() ->
new InvalidBodyException()
);
assertValueTypeMatchesTimeseries(timeseriesEntity, expectedType);
}
}
private static void assertValueTypeMatchesTimeseries(
TimeseriesEntity timeseries,
DataPointValueType incomingValueType
) {
if (timeseries.getValueType() != incomingValueType) throw new InvalidBodyException(
"Timeseries already exists for data type %s but new data points are of type %s",
timeseries.getValueType(),
incomingValueType
);
}
}