package de.dlr.shepard.data.timeseries.repositories;

import de.dlr.shepard.data.timeseries.model.Timeseries;
import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
import io.micrometer.core.annotation.Timed;
import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.RequestScoped;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Request-scoped Panache repository for {@link TimeseriesEntity} records.
 *
 * <p>Lookups go through the Panache {@code find} API; the insert and the bulk delete are issued
 * as queries on the injected {@link EntityManager}.
 */
@RequestScoped
public class TimeseriesRepository implements PanacheRepositoryBase<TimeseriesEntity, Integer> {

  /** Positional filter over the container id plus the five identifying timeseries properties. */
  private static final String IDENTITY_FILTER =
    "containerId = ?1 and measurement = ?2 and field = ?3 and symbolicName = ?4 and device = ?5 and location = ?6";

  /** Insert statement that leaves an already existing row with the same identity untouched. */
  private static final String INSERT_IF_ABSENT =
    "insert into TimeseriesEntity (containerId, measurement, field, symbolicName, device, location, valueType)" +
    " values (:containerId, :measurement, :field, :symbolicName, :device, :location, :valueType)" +
    " on conflict(containerId, measurement, field, symbolicName, device, location) do nothing";

  /** Bulk delete of every timeseries record that belongs to one container. */
  private static final String DELETE_BY_CONTAINER = "delete from TimeseriesEntity where containerId = :containerId";

  @PersistenceContext
  EntityManager entityManager;

  /**
   * Looks up the entity whose container id, measurement, field, symbolic name, device and
   * location all equal the given values.
   *
   * @param containerId id of the container to search in
   * @param timeseries supplies measurement, field, symbolic name, device and location
   * @return the single matching entity, or an empty {@link Optional} when nothing matches
   * @throws RuntimeException when more than one entity matches; the message lists their ids
   */
  public Optional<TimeseriesEntity> findTimeseries(long containerId, Timeseries timeseries) {
    List<TimeseriesEntity> matches = this.find(
        IDENTITY_FILTER,
        containerId,
        timeseries.getMeasurement(),
        timeseries.getField(),
        timeseries.getSymbolicName(),
        timeseries.getDevice(),
        timeseries.getLocation()
      ).list();

    if (matches.size() > 1) {
      String joinedIds = matches.stream().map(ts -> String.valueOf(ts.getId())).collect(Collectors.joining(", "));
      throw new RuntimeException(
        String.format(
          "Multiple Timeseries exist with the same properties for container id: %d. \nTimeseries Ids: %s",
          containerId,
          joinedIds
        )
      );
    }

    // Zero or one element left at this point: empty list -> empty Optional, otherwise the sole match.
    return matches.stream().findFirst();
  }

  /**
   * Inserts the given entity unless a row with the same container id, measurement, field,
   * symbolic name, device and location already exists, in which case the statement does nothing.
   *
   * <p>Logs a warning when no row was inserted and an info message when one row was inserted.
   *
   * <p>NOTE(review): the {@code containerId} parameter is not read here; the container id that is
   * bound comes from {@code entity.getContainerId()} - confirm this is intended.
   *
   * @param containerId container id passed by the caller (currently unused, see note above)
   * @param entity source of all values written by the insert
   * @throws RuntimeException when the statement reports more than one affected row
   */
  public void upsert(long containerId, TimeseriesEntity entity) {
    var insert = entityManager.createQuery(INSERT_IF_ABSENT);
    insert.setParameter("containerId", entity.getContainerId());
    insert.setParameter("measurement", entity.getMeasurement());
    insert.setParameter("field", entity.getField());
    insert.setParameter("symbolicName", entity.getSymbolicName());
    insert.setParameter("device", entity.getDevice());
    insert.setParameter("location", entity.getLocation());
    insert.setParameter("valueType", entity.getValueType());

    int affectedRows = insert.executeUpdate();

    if (affectedRows == 0) {
      Log.warn("Upsert did not insert a timeseries record.");
    } else if (affectedRows == 1) {
      Log.info("Upsert has created a new timeseries record.");
    } else if (affectedRows > 1) {
      throw new RuntimeException("Upsert has changed multiple rows.");
    }
  }

  /**
   * Deletes every timeseries record whose container id equals the given one and logs how many
   * rows were removed (a warning when none were).
   *
   * @param containerId id of the container whose timeseries records are removed
   */
  @Timed(value = "shepard.timeseries.delete")
  public void deleteByContainerId(long containerId) {
    var delete = entityManager.createQuery(DELETE_BY_CONTAINER);
    delete.setParameter("containerId", containerId);

    int deletedRows = delete.executeUpdate();

    if (deletedRows == 0) {
      Log.warn("deleteByContainerId did not delete any timeseries record.");
    } else if (deletedRows > 0) {
      Log.infof("deleteByContainerId has deleted %s timeseries records.", deletedRows);
    }
  }
}