View Javadoc
1   package de.dlr.shepard.data.timeseries.repositories;
2   
3   import de.dlr.shepard.data.timeseries.model.Timeseries;
4   import de.dlr.shepard.data.timeseries.model.TimeseriesEntity;
5   import io.micrometer.core.annotation.Timed;
6   import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
7   import io.quarkus.logging.Log;
8   import jakarta.enterprise.context.RequestScoped;
9   import jakarta.persistence.EntityManager;
10  import jakarta.persistence.PersistenceContext;
11  import java.util.List;
12  import java.util.Optional;
13  import java.util.stream.Collectors;
14  
15  @RequestScoped
16  public class TimeseriesRepository implements PanacheRepositoryBase<TimeseriesEntity, Integer> {
17  
18    @PersistenceContext
19    EntityManager entityManager;
20  
21    public Optional<TimeseriesEntity> findTimeseries(long containerId, Timeseries timeseries) {
22      List<TimeseriesEntity> timeseriesList =
23        this.find(
24            "containerId = ?1 and measurement = ?2 and field = ?3 and symbolicName = ?4 and device = ?5 and location = ?6",
25            containerId,
26            timeseries.getMeasurement(),
27            timeseries.getField(),
28            timeseries.getSymbolicName(),
29            timeseries.getDevice(),
30            timeseries.getLocation()
31          ).list();
32      if (timeseriesList.isEmpty()) return Optional.empty();
33      if (timeseriesList.size() > 1) {
34        var errorMessage = String.format(
35          "Multiple Timeseries exist with the same properties for container id: %d. Timeseries Ids: %s",
36          containerId,
37          timeseriesList.stream().map(ts -> ts.getId() + "").collect(Collectors.joining(", "))
38        );
39        throw new RuntimeException(errorMessage);
40      }
41      return Optional.of(timeseriesList.get(0));
42    }
43  
44    public void upsert(long containerId, TimeseriesEntity entity) {
45      var rowCount = entityManager
46        .createQuery(
47          "insert into TimeseriesEntity (containerId, measurement, field, symbolicName, device, location, valueType)" +
48          " values (:containerId, :measurement, :field, :symbolicName, :device, :location, :valueType)" +
49          " on conflict(containerId, measurement, field, symbolicName, device, location) do nothing"
50        )
51        .setParameter("containerId", entity.getContainerId())
52        .setParameter("measurement", entity.getMeasurement())
53        .setParameter("field", entity.getField())
54        .setParameter("symbolicName", entity.getSymbolicName())
55        .setParameter("device", entity.getDevice())
56        .setParameter("location", entity.getLocation())
57        .setParameter("valueType", entity.getValueType())
58        .executeUpdate();
59  
60      if (rowCount == 0) Log.warn("Upsert did not insert a timeseries record.");
61      if (rowCount == 1) Log.info("Upsert has created a new timeseries record.");
62      if (rowCount > 1) throw new RuntimeException("Upsert has changed multiple rows.");
63    }
64  
65    @Timed(value = "shepard.timeseries.delete")
66    public void deleteByContainerId(long containerId) {
67      var rowCount = entityManager
68        .createQuery("delete from TimeseriesEntity where containerId = :containerId")
69        .setParameter("containerId", containerId)
70        .executeUpdate();
71  
72      if (rowCount == 0) Log.warn("deleteByContainerId did not delete any timeseries record.");
73      if (rowCount > 0) Log.infof("deleteByContainerId has deleted %s timeseries records.", rowCount);
74    }
75  }