1 package de.dlr.shepard.data.timeseries.repositories;
2
3 import de.dlr.shepard.data.timeseries.model.Timeseries;
4 import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
5 import io.micrometer.core.annotation.Timed;
6 import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
7 import io.quarkus.logging.Log;
8 import jakarta.enterprise.context.RequestScoped;
9 import jakarta.persistence.EntityManager;
10 import jakarta.persistence.PersistenceContext;
11 import java.util.List;
12 import java.util.Optional;
13 import java.util.stream.Collectors;
14
15 @RequestScoped
16 public class TimeseriesRepository implements PanacheRepositoryBase<TimeseriesEntity, Integer> {
17
18 @PersistenceContext
19 EntityManager entityManager;
20
21 public Optional<TimeseriesEntity> findTimeseries(long containerId, Timeseries timeseries) {
22 List<TimeseriesEntity> timeseriesList =
23 this.find(
24 "containerId = ?1 and measurement = ?2 and field = ?3 and symbolicName = ?4 and device = ?5 and location = ?6",
25 containerId,
26 timeseries.getMeasurement(),
27 timeseries.getField(),
28 timeseries.getSymbolicName(),
29 timeseries.getDevice(),
30 timeseries.getLocation()
31 ).list();
32 if (timeseriesList.isEmpty()) return Optional.empty();
33 if (timeseriesList.size() > 1) {
34 var errorMessage = String.format(
35 "Multiple Timeseries exist with the same properties for container id: %d. Timeseries Ids: %s",
36 containerId,
37 timeseriesList.stream().map(ts -> ts.getId() + "").collect(Collectors.joining(", "))
38 );
39 throw new RuntimeException(errorMessage);
40 }
41 return Optional.of(timeseriesList.get(0));
42 }
43
44 public void upsert(long containerId, TimeseriesEntity entity) {
45 var rowCount = entityManager
46 .createQuery(
47 "insert into TimeseriesEntity (containerId, measurement, field, symbolicName, device, location, valueType)" +
48 " values (:containerId, :measurement, :field, :symbolicName, :device, :location, :valueType)" +
49 " on conflict(containerId, measurement, field, symbolicName, device, location) do nothing"
50 )
51 .setParameter("containerId", entity.getContainerId())
52 .setParameter("measurement", entity.getMeasurement())
53 .setParameter("field", entity.getField())
54 .setParameter("symbolicName", entity.getSymbolicName())
55 .setParameter("device", entity.getDevice())
56 .setParameter("location", entity.getLocation())
57 .setParameter("valueType", entity.getValueType())
58 .executeUpdate();
59
60 if (rowCount == 0) Log.warn("Upsert did not insert a timeseries record.");
61 if (rowCount == 1) Log.info("Upsert has created a new timeseries record.");
62 if (rowCount > 1) throw new RuntimeException("Upsert has changed multiple rows.");
63 }
64
65 @Timed(value = "shepard.timeseries.delete")
66 public void deleteByContainerId(long containerId) {
67 var rowCount = entityManager
68 .createQuery("delete from TimeseriesEntity where containerId = :containerId")
69 .setParameter("containerId", containerId)
70 .executeUpdate();
71
72 if (rowCount == 0) Log.warn("deleteByContainerId did not delete any timeseries record.");
73 if (rowCount > 0) Log.infof("deleteByContainerId has deleted %s timeseries records.", rowCount);
74 }
75 }