View Javadoc
1   package de.dlr.shepard.data.spatialdata.repositories;
2   
3   import de.dlr.shepard.common.util.JsonConverter;
4   import de.dlr.shepard.data.spatialdata.io.FilterCondition;
5   import de.dlr.shepard.data.spatialdata.model.SpatialDataPoint;
6   import io.micrometer.core.annotation.Timed;
7   import io.quarkus.hibernate.orm.PersistenceUnit;
8   import jakarta.enterprise.context.RequestScoped;
9   import jakarta.persistence.EntityManager;
10  import java.util.List;
11  import java.util.Locale;
12  import java.util.Map;
13  import org.locationtech.jts.geom.Coordinate;
14  
15  @RequestScoped
16  public class SpatialDataPointRepository {
17  
18    private final int INSERT_BATCH_SIZE = 20000;
19    public static final String SPATIAL_TABLE_NAME = "spatial_data_points";
20    public static final String SPATIAL_COLUMN_CONTAINER_ID = "container_id";
21    public static final String SPATIAL_COLUMN_TIME = "time";
22    public static final String SPATIAL_COLUMN_POSITION = "position";
23    public static final String SPATIAL_COLUMN_METADATA = "metadata";
24    public static final String SPATIAL_COLUMN_MEASUREMENTS = "measurements";
25  
26    private static final String[] ALL_COLUMNS_STRING = new String[] { "*" };
27  
28    @PersistenceUnit("spatial")
29    EntityManager entityManager;
30  
31    @Timed(value = "shepard.spatial-data.insert")
32    public int insert(long containerId, SpatialDataPoint data) {
33      var sql = new NativeInsertStatementBuilder()
34        .insert(
35          SPATIAL_TABLE_NAME,
36          new String[] {
37            SPATIAL_COLUMN_CONTAINER_ID,
38            SPATIAL_COLUMN_TIME,
39            SPATIAL_COLUMN_POSITION,
40            SPATIAL_COLUMN_METADATA,
41            SPATIAL_COLUMN_MEASUREMENTS,
42          }
43        )
44        .addValues(
45          String.format(
46            Locale.US,
47            "%d, '%s', ST_MakePoint(%f, %f, %f), CAST('%s' AS JSONB), CAST('%s' AS JSONB)",
48            containerId,
49            data.getTime(),
50            data.getPosition().getCoordinate().x,
51            data.getPosition().getCoordinate().y,
52            data.getPosition().getCoordinate().z,
53            JsonConverter.convertToString(data.getMetadata()),
54            JsonConverter.convertToString(data.getMeasurements())
55          )
56        )
57        .build();
58  
59      var query = entityManager.createNativeQuery(sql);
60      var resultCount = query.executeUpdate();
61      if (resultCount <= 0) throw new RuntimeException("SpatialData was not stored in database.");
62      return resultCount;
63    }
64  
65    @Timed(value = "shepard.spatial-data.insert-many")
66    public int insert(long containerId, SpatialDataPoint[] data) {
67      var allResultCount = 0;
68      var sql = new NativeInsertStatementBuilder()
69        .insert(
70          SPATIAL_TABLE_NAME,
71          new String[] {
72            SPATIAL_COLUMN_CONTAINER_ID,
73            SPATIAL_COLUMN_TIME,
74            SPATIAL_COLUMN_POSITION,
75            SPATIAL_COLUMN_METADATA,
76            SPATIAL_COLUMN_MEASUREMENTS,
77          }
78        );
79  
80      for (int i = 0; i < data.length; i += INSERT_BATCH_SIZE) {
81        int currentLimit = Math.min(i + INSERT_BATCH_SIZE, data.length);
82        for (int j = i; j < currentLimit; j++) {
83          sql.addValues(
84            String.format(
85              Locale.US,
86              "%d, '%s', ST_MakePoint(%f, %f, %f), CAST('%s' AS JSONB), CAST('%s' AS JSONB)",
87              containerId,
88              data[j].getTime(),
89              data[j].getPosition().getCoordinate().x,
90              data[j].getPosition().getCoordinate().y,
91              data[j].getPosition().getCoordinate().z,
92              JsonConverter.convertToString(data[j].getMetadata()),
93              JsonConverter.convertToString(data[j].getMeasurements())
94            )
95          );
96        }
97        var query = entityManager.createNativeQuery(sql.build());
98        allResultCount += query.executeUpdate();
99      }
100     return allResultCount;
101   }
102 
103   /**
104    *
105    * @param containerId to be used for deletion
106    * @return the number of rows deleted
107    */
108   @Timed(value = "shepard.spatial-data.delete-by-container")
109   public int deleteByContainerId(long containerId) {
110     return entityManager
111       .createNativeQuery(
112         "DELETE FROM %s WHERE %s=:containerId;".formatted(SPATIAL_TABLE_NAME, SPATIAL_COLUMN_CONTAINER_ID)
113       )
114       .setParameter("containerId", containerId)
115       .executeUpdate();
116   }
117 
118   @Timed(value = "shepard.spatial-data.get-by-container")
119   @SuppressWarnings("unchecked")
120   public List<SpatialDataPoint> getByContainerId(long containerId) {
121     var query = new NativeQueryStringBuilder()
122       .select(SPATIAL_TABLE_NAME, ALL_COLUMNS_STRING)
123       .addWhereCondition(SPATIAL_COLUMN_CONTAINER_ID, containerId)
124       .build();
125 
126     return entityManager.createNativeQuery(query, SpatialDataPoint.class).getResultList();
127   }
128 
129   @Timed(value = "shepard.spatial-data.query-by-bounding-box")
130   @SuppressWarnings("unchecked")
131   public List<SpatialDataPoint> get(
132     long containerId,
133     Long timestampStart,
134     Long timestampEnd,
135     Map<String, Object> metadataFilter,
136     List<FilterCondition> measurementsFilter,
137     Integer limit,
138     Integer skip
139   ) {
140     var queryBuilder = new NativeQueryStringBuilder()
141       .select(SPATIAL_TABLE_NAME, ALL_COLUMNS_STRING)
142       .addWhereCondition(SPATIAL_COLUMN_CONTAINER_ID, containerId)
143       .addTimeCondition(SPATIAL_COLUMN_TIME, timestampStart, timestampEnd)
144       .addJsonContainsCondition(SPATIAL_COLUMN_METADATA, metadataFilter)
145       .addJsonFilterConditions(SPATIAL_COLUMN_MEASUREMENTS, measurementsFilter)
146       .addSkipClause(skip)
147       .addLimitClause(limit);
148 
149     var query = entityManager.createNativeQuery(queryBuilder.build(), SpatialDataPoint.class);
150     queryBuilder.getQueryParameters().forEach(query::setParameter);
151     return query.getResultList();
152   }
153 
154   /**
155    * Create an (axis-aligned) bounding box query request for spatial data.
156    * The request uses the '&&&'' indexed operator, that acts similar to the
157    * ST_Intersects function.
158    * If the point is part of the bounding box (i.e., on a bounding box corner) the
159    * bounding box check returns true
160    */
161   @Timed(value = "shepard.spatial-data.query-by-bounding-box")
162   @SuppressWarnings("unchecked")
163   public List<SpatialDataPoint> getByBoundingBox(
164     long containerId,
165     Coordinate bottomLeft,
166     Coordinate topRight,
167     Long timestampStart,
168     Long timestampEnd,
169     Map<String, Object> metadataFilter,
170     List<FilterCondition> measurementsFilter,
171     Integer limit,
172     Integer skip
173   ) {
174     var queryBuilder = new NativeQueryStringBuilder()
175       .select(SPATIAL_TABLE_NAME, ALL_COLUMNS_STRING)
176       .addWhereCondition(SPATIAL_COLUMN_CONTAINER_ID, containerId)
177       .addTimeCondition(SPATIAL_COLUMN_TIME, timestampStart, timestampEnd)
178       .addJsonContainsCondition(SPATIAL_COLUMN_METADATA, metadataFilter)
179       .addJsonFilterConditions(SPATIAL_COLUMN_MEASUREMENTS, measurementsFilter)
180       .addAABBGeometryCondition(bottomLeft.x, bottomLeft.y, bottomLeft.z, topRight.x, topRight.y, topRight.z)
181       .addSkipClause(skip)
182       .addLimitClause(limit);
183 
184     var query = entityManager.createNativeQuery(queryBuilder.build(), SpatialDataPoint.class);
185     queryBuilder.getQueryParameters().forEach(query::setParameter);
186     return query.getResultList();
187   }
188 
189   @SuppressWarnings("unchecked")
190   @Timed(value = "shepard.spatial-data.query-by-bounding-sphere")
191   public List<SpatialDataPoint> getByBoundingSphere(
192     long containerId,
193     Coordinate coordinate,
194     double radius,
195     Long timestampStart,
196     Long timestampEnd,
197     Map<String, Object> metadataFilter,
198     List<FilterCondition> measurementsFilter,
199     Integer limit,
200     Integer skip
201   ) {
202     var queryBuilder = new NativeQueryStringBuilder()
203       .select(SPATIAL_TABLE_NAME, ALL_COLUMNS_STRING)
204       .addWhereCondition(SPATIAL_COLUMN_CONTAINER_ID, containerId)
205       .addTimeCondition(SPATIAL_COLUMN_TIME, timestampStart, timestampEnd)
206       .addJsonContainsCondition(SPATIAL_COLUMN_METADATA, metadataFilter)
207       .addJsonFilterConditions(SPATIAL_COLUMN_MEASUREMENTS, measurementsFilter)
208       .addBSGeometryCondition(coordinate.x, coordinate.y, coordinate.z, radius)
209       .addSkipClause(skip)
210       .addLimitClause(limit);
211 
212     var query = entityManager.createNativeQuery(queryBuilder.build(), SpatialDataPoint.class);
213     queryBuilder.getQueryParameters().forEach(query::setParameter);
214     return query.getResultList();
215   }
216 
217   /**
218    * Runs a k-nearest-neighbor search on the spatial data.
219    *
220    * @param coordinate - Starting point for the KNN search
221    * @param k          - number of returned points
222    */
223   @SuppressWarnings("unchecked")
224   @Timed(value = "shepard.spatial-data.query-by-knn")
225   public List<SpatialDataPoint> getByKNN(
226     long containerId,
227     Coordinate coordinate,
228     int k,
229     Long timestampStart,
230     Long timestampEnd,
231     Map<String, Object> metadataFilter,
232     List<FilterCondition> measurementsFilter
233   ) {
234     var queryBuilder = new NativeQueryStringBuilder()
235       .select(SPATIAL_TABLE_NAME, ALL_COLUMNS_STRING)
236       .addWhereCondition(SPATIAL_COLUMN_CONTAINER_ID, containerId)
237       .addTimeCondition(SPATIAL_COLUMN_TIME, timestampStart, timestampEnd)
238       .addJsonContainsCondition(SPATIAL_COLUMN_METADATA, metadataFilter)
239       .addJsonFilterConditions(SPATIAL_COLUMN_MEASUREMENTS, measurementsFilter)
240       .addKNNGeometryCondition(coordinate.x, coordinate.y, coordinate.z, k);
241 
242     var query = entityManager.createNativeQuery(queryBuilder.build(), SpatialDataPoint.class);
243     queryBuilder.getQueryParameters().forEach(query::setParameter);
244     return query.getResultList();
245   }
246 }