View Javadoc
1   package de.dlr.shepard.common.neo4j.migrations;
2   
3   import ac.simons.neo4j.migrations.core.JavaBasedMigration;
4   import ac.simons.neo4j.migrations.core.MigrationContext;
5   import com.fasterxml.jackson.core.JsonProcessingException;
6   import com.fasterxml.jackson.databind.JsonNode;
7   import com.fasterxml.jackson.databind.ObjectMapper;
8   import com.fasterxml.jackson.databind.util.StdDateFormat;
9   import io.quarkus.logging.Log;
10  import java.text.ParseException;
11  import java.util.Date;
12  import java.util.HashMap;
13  import java.util.Map;
14  import java.util.Optional;
15  import org.neo4j.driver.Session;
16  
17  public class V2__Extract_json implements JavaBasedMigration {
18  
19    record ShepardFile(String oid, String filename, long createdAt, String md5) {}
20  
21    record StructuredData(String oid, String name, long createdAt) {}
22  
23    record Timeseries(String measurement, String device, String location, String symbolicName, String field) {}
24  
25    private static final String FILES_JSON = "filesJson";
26    private static final String STRUCTURED_DATAS_JSON = "structuredDatasJson";
27    private static final String TIMESERIES_JSON = "timeseriesJson";
28    private final ObjectMapper mapper = new ObjectMapper();
29  
30    @Override
31    public void apply(MigrationContext context) {
32      try (Session session = context.getSession()) {
33        Log.info("Running V2 migration (1/5)");
34        migrateFileContainer(session);
35        Log.info("Running V2 migration (2/5)");
36        migrateFileReferences(session);
37        Log.info("Running V2 migration (3/5)");
38        migrateStructuredDataContainer(session);
39        Log.info("Running V2 migration (4/5)");
40        migrateStructuredDataReferences(session);
41        Log.info("Running V2 migration (5/5)");
42        migrateTimeseriesReferences(session);
43      } catch (Exception e) {
44        Log.error("Error while running migration: ", e);
45      }
46    }
47  
48    private void migrateFileContainer(Session session) {
49      var cResults = session.executeRead(tx ->
50        tx.run("MATCH (c:FileContainer) WHERE c.filesJson IS NOT NULL RETURN c").list()
51      );
52      for (int i = 0; i < cResults.size(); i++) {
53        logPercent(i, cResults.size());
54        var c = cResults.get(i).get("c").asNode();
55        var cId = c.elementId();
56  
57        if (!c.containsKey(FILES_JSON)) continue;
58  
59        var tx = session.beginTransaction();
60        for (var fileObj : c.get(FILES_JSON).asList()) {
61          if (fileObj instanceof String fileStr) {
62            var fileNode = parseJson(fileStr);
63            if (fileNode.isEmpty()) {
64              Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", cId, fileStr);
65              continue;
66            }
67            var file = parseShepardFile(fileNode.get());
68            Map<String, Object> params = new HashMap<>();
69            params.put(
70              "props",
71              Map.of("oid", file.oid, "createdAt", file.createdAt, "filename", file.filename, "md5", file.md5)
72            );
73            var query =
74              """
75              MATCH (c:FileContainer) WHERE ID(c) = %s
76              CREATE (c)-[:file_in_container]->(sf:ShepardFile $props)
77              """;
78            tx.run(query.formatted(cId), params);
79          }
80        }
81        tx.run("MATCH (c:FileContainer) WHERE ID(c) = " + cId + " REMOVE c.filesJson");
82        tx.commit();
83      }
84    }
85  
86    private void migrateFileReferences(Session session) {
87      var rResults = session.executeRead(tx ->
88        tx.run("MATCH (r:FileReference) WHERE r.filesJson IS NOT NULL RETURN r").list()
89      );
90      for (int i = 0; i < rResults.size(); i++) {
91        logPercent(i, rResults.size());
92        var r = rResults.get(i).get("r").asNode();
93        var rId = r.elementId();
94  
95        if (!r.containsKey(FILES_JSON)) continue;
96  
97        var tx = session.beginTransaction();
98        for (var fileObj : r.get(FILES_JSON).asList()) {
99          if (fileObj instanceof String fileStr) {
100           var fileNode = parseJson(fileStr);
101           if (fileNode.isEmpty()) {
102             Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", rId, fileStr);
103             continue;
104           }
105           var file = parseShepardFile(fileNode.get());
106           Map<String, Object> params = new HashMap<>();
107           params.put("oid", file.oid);
108           params.put("props", Map.of("createdAt", file.createdAt, "filename", file.filename, "md5", file.md5));
109           var query =
110             """
111             MATCH (r:FileReference)-[:is_in_container]->(c:FileContainer) WHERE ID(r) = %s
112             MERGE (c)-[:file_in_container]->(sf:ShepardFile { oid: $oid })
113             SET sf += $props
114             CREATE (r)-[hp:has_payload]->(sf)
115             """;
116           tx.run(query.formatted(rId), params);
117         }
118       }
119       tx.run("MATCH (r:FileReference) WHERE ID(r) = " + rId + " REMOVE r.filesJson");
120       tx.commit();
121     }
122   }
123 
124   private void migrateStructuredDataContainer(Session session) {
125     var cResults = session.executeRead(tx ->
126       tx.run("MATCH (c:StructuredDataContainer) WHERE c.structuredDatasJson IS NOT NULL RETURN c").list()
127     );
128     for (int i = 0; i < cResults.size(); i++) {
129       logPercent(i, cResults.size());
130       var c = cResults.get(i).get("c").asNode();
131       var cId = c.elementId();
132 
133       if (!c.containsKey(STRUCTURED_DATAS_JSON)) continue;
134 
135       var tx = session.beginTransaction();
136       for (var structuredDataObj : c.get(STRUCTURED_DATAS_JSON).asList()) {
137         if (structuredDataObj instanceof String structuredDataStr) {
138           var structuredDataNode = parseJson(structuredDataStr);
139           if (structuredDataNode.isEmpty()) {
140             Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", cId, structuredDataStr);
141             continue;
142           }
143           var sd = parseStructuredData(structuredDataNode.get());
144           Map<String, Object> params = new HashMap<>();
145           params.put("props", Map.of("oid", sd.oid, "createdAt", sd.createdAt, "name", sd.name));
146           var query =
147             """
148             MATCH (c:StructuredDataContainer) WHERE ID(c) = %s
149             CREATE (c)-[:structureddata_in_container]->(sd:StructuredData $props)
150             """;
151           tx.run(query.formatted(cId), params);
152         }
153       }
154       tx.run("MATCH (c:StructuredDataContainer) WHERE ID(c) = " + cId + " REMOVE c.structuredDatasJson");
155       tx.commit();
156     }
157   }
158 
159   private void migrateStructuredDataReferences(Session session) {
160     var rResults = session.executeRead(tx ->
161       tx.run("MATCH (r:StructuredDataReference) WHERE r.structuredDatasJson IS NOT NULL RETURN r").list()
162     );
163     for (int i = 0; i < rResults.size(); i++) {
164       logPercent(i, rResults.size());
165       var r = rResults.get(i).get("r").asNode();
166       var rId = r.elementId();
167 
168       if (!r.containsKey(STRUCTURED_DATAS_JSON)) continue;
169 
170       var tx = session.beginTransaction();
171       for (var structuredDataObj : r.get(STRUCTURED_DATAS_JSON).asList()) {
172         if (structuredDataObj instanceof String structuredDataStr) {
173           var structuredDataNode = parseJson(structuredDataStr);
174           if (structuredDataNode.isEmpty()) {
175             Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", rId, structuredDataStr);
176             continue;
177           }
178           var sd = parseStructuredData(structuredDataNode.get());
179           Map<String, Object> params = new HashMap<>();
180           params.put("oid", sd.oid);
181           params.put("props", Map.of("createdAt", sd.createdAt, "name", sd.name));
182           var query =
183             """
184             MATCH (r:StructuredDataReference)-[:is_in_container]->(c:StructuredDataContainer) WHERE ID(r) = %s
185             MERGE (c)-[:structureddata_in_container]->(sd:StructuredData { oid: $oid })
186             SET sd += $props
187             CREATE (r)-[hp:has_payload]->(sd)
188             """;
189           tx.run(query.formatted(rId), params);
190         }
191       }
192       tx.run("MATCH (r:StructuredDataReference) WHERE ID(r) = " + rId + " REMOVE r.structuredDatasJson");
193       tx.commit();
194     }
195   }
196 
197   private void migrateTimeseriesReferences(Session session) {
198     var rResults = session.executeRead(tx ->
199       tx.run("MATCH (r:TimeseriesReference) WHERE r.timeseriesJson IS NOT NULL RETURN r").list()
200     );
201     for (int i = 0; i < rResults.size(); i++) {
202       logPercent(i, rResults.size());
203       var r = rResults.get(i).get("r").asNode();
204       var rId = r.elementId();
205 
206       if (!r.containsKey(TIMESERIES_JSON)) continue;
207 
208       var tx = session.beginTransaction();
209       for (var timeseriesObj : r.get(TIMESERIES_JSON).asList()) {
210         if (timeseriesObj instanceof String timeseriesStr) {
211           var timeseriesNode = parseJson(timeseriesStr);
212           if (timeseriesNode.isEmpty()) {
213             Log.errorf("NodeID %s: Timeseries cannot be parsed and will be skipped: %s", rId, timeseriesStr);
214             continue;
215           }
216           var ts = parseTimeseries(timeseriesNode.get());
217           Map<String, Object> params = Map.of(
218             "measurement",
219             ts.measurement,
220             "device",
221             ts.device,
222             "location",
223             ts.location,
224             "symbolicName",
225             ts.symbolicName,
226             "field",
227             ts.field
228           );
229           var query =
230             """
231             MATCH (r:TimeseriesReference) WHERE ID(r) = %s
232             MERGE (ts:Timeseries { measurement: $measurement, device: $device, location: $location, symbolicName: $symbolicName, field: $field })
233             CREATE (r)-[hp:has_payload]->(ts)
234             """;
235 
236           tx.run(query.formatted(rId), params);
237         }
238       }
239       tx.run("MATCH (r:TimeseriesReference) WHERE ID(r) = " + rId + " REMOVE r.timeseriesJson");
240       tx.commit();
241     }
242   }
243 
244   private Optional<JsonNode> parseJson(String str) {
245     JsonNode node;
246     try {
247       node = mapper.readTree(str);
248     } catch (JsonProcessingException e) {
249       // This should not be possible
250       Log.error(e.toString());
251       return Optional.empty();
252     }
253     return Optional.of(node);
254   }
255 
256   private long parseDate(String date) {
257     if (date.isEmpty()) return 0L;
258     Date parsed;
259     try {
260       parsed = new StdDateFormat().parse(date);
261     } catch (ParseException e) {
262       // This should not be possible
263       Log.warnf("%s, using 0 instead", e.getMessage());
264       return 0L;
265     }
266     return parsed.getTime();
267   }
268 
269   private ShepardFile parseShepardFile(JsonNode node) {
270     var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
271     var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
272     var filename = Optional.ofNullable(node.get("filename")).map(JsonNode::asText).orElse("");
273     var md5 = Optional.ofNullable(node.get("md5")).map(JsonNode::asText).orElse("");
274     return new ShepardFile(oid, filename, parseDate(createdAt), md5);
275   }
276 
277   private StructuredData parseStructuredData(JsonNode node) {
278     var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
279     var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
280     var name = Optional.ofNullable(node.get("name")).map(JsonNode::asText).orElse("");
281     return new StructuredData(oid, name, parseDate(createdAt));
282   }
283 
284   private Timeseries parseTimeseries(JsonNode node) {
285     var measurement = Optional.ofNullable(node.get("measurement")).map(JsonNode::asText).orElse("");
286     var device = Optional.ofNullable(node.get("device")).map(JsonNode::asText).orElse("");
287     var location = Optional.ofNullable(node.get("location")).map(JsonNode::asText).orElse("");
288     var symbolicName = Optional.ofNullable(node.get("symbolicName")).map(JsonNode::asText).orElse("");
289     var field = Optional.ofNullable(node.get("field")).map(JsonNode::asText).orElse("");
290     return new Timeseries(measurement, device, location, symbolicName, field);
291   }
292 
293   private void logPercent(int i, int size) {
294     int curPercent = (int) ((100f / size) * i);
295     int prePercent = (int) ((100f / size) * (i - 1));
296     if (prePercent < curPercent) {
297       Log.infof("... %d %", curPercent);
298     }
299   }
300 }