View Javadoc
1   package de.dlr.shepard.neo4j.migrations;
2   
3   import ac.simons.neo4j.migrations.core.JavaBasedMigration;
4   import ac.simons.neo4j.migrations.core.MigrationContext;
5   import com.fasterxml.jackson.core.JsonProcessingException;
6   import com.fasterxml.jackson.databind.JsonNode;
7   import com.fasterxml.jackson.databind.ObjectMapper;
8   import com.fasterxml.jackson.databind.util.StdDateFormat;
9   import io.quarkus.logging.Log;
10  import java.text.ParseException;
11  import java.util.Date;
12  import java.util.HashMap;
13  import java.util.Map;
14  import java.util.Optional;
15  import lombok.AllArgsConstructor;
16  import org.neo4j.driver.Session;
17  
18  public class V2__Extract_json implements JavaBasedMigration {
19  
20    @AllArgsConstructor
21    class ShepardFile {
22  
23      public final String oid;
24      public final String filename;
25      public final long createdAt;
26      public final String md5;
27    }
28  
29    @AllArgsConstructor
30    class StructuredData {
31  
32      public final String oid;
33      public final String name;
34      public final long createdAt;
35    }
36  
37    @AllArgsConstructor
38    class Timeseries {
39  
40      public final String measurement;
41      public final String device;
42      public final String location;
43      public final String symbolicName;
44      public final String field;
45    }
46  
47    private static final String FILES_JSON = "filesJson";
48    private static final String STRUCTURED_DATAS_JSON = "structuredDatasJson";
49    private static final String TIMESERIES_JSON = "timeseriesJson";
50    private ObjectMapper mapper = new ObjectMapper();
51  
52    @Override
53    public void apply(MigrationContext context) {
54      try (Session session = context.getSession()) {
55        Log.info("Running migration (1/5)");
56        migrateFileContainer(session);
57        Log.info("Running migration (2/5)");
58        migrateFileReferences(session);
59        Log.info("Running migration (3/5)");
60        migrateStructuredDataContainer(session);
61        Log.info("Running migration (4/5)");
62        migrateStructuredDataReferences(session);
63        Log.info("Running migration (5/5)");
64        migrateTimeseriesReferences(session);
65      } catch (Exception e) {
66        Log.error("Error while running migration: ", e);
67      }
68    }
69  
70    private void migrateFileContainer(Session session) {
71      var cResults = session.executeRead(tx ->
72        tx.run("MATCH (c:FileContainer) WHERE c.filesJson IS NOT NULL RETURN c").list()
73      );
74      for (int i = 0; i < cResults.size(); i++) {
75        logPercent(i, cResults.size());
76        var c = cResults.get(i).get("c").asNode();
77        var cId = c.elementId();
78  
79        if (!c.containsKey(FILES_JSON)) continue;
80  
81        var tx = session.beginTransaction();
82        for (var fileObj : c.get(FILES_JSON).asList()) {
83          if (fileObj instanceof String fileStr) {
84            var fileNode = parseJson(fileStr);
85            if (fileNode.isEmpty()) {
86              Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", cId, fileStr);
87              continue;
88            }
89            var file = parseShepardFile(fileNode.get());
90            Map<String, Object> params = new HashMap<>();
91            params.put(
92              "props",
93              Map.of("oid", file.oid, "createdAt", file.createdAt, "filename", file.filename, "md5", file.md5)
94            );
95            var query =
96              """
97              MATCH (c:FileContainer) WHERE ID(c) = %s
98              CREATE (c)-[:file_in_container]->(sf:ShepardFile $props)
99              """;
100           tx.run(String.format(query, cId), params);
101         }
102       }
103       tx.run("MATCH (c:FileContainer) WHERE ID(c) = " + cId + " REMOVE c.filesJson");
104       tx.commit();
105     }
106   }
107 
108   private void migrateFileReferences(Session session) {
109     var rResults = session.executeRead(tx ->
110       tx.run("MATCH (r:FileReference) WHERE r.filesJson IS NOT NULL RETURN r").list()
111     );
112     for (int i = 0; i < rResults.size(); i++) {
113       logPercent(i, rResults.size());
114       var r = rResults.get(i).get("r").asNode();
115       var rId = r.elementId();
116 
117       if (!r.containsKey(FILES_JSON)) continue;
118 
119       var tx = session.beginTransaction();
120       for (var fileObj : r.get(FILES_JSON).asList()) {
121         if (fileObj instanceof String fileStr) {
122           var fileNode = parseJson(fileStr);
123           if (fileNode.isEmpty()) {
124             Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", rId, fileStr);
125             continue;
126           }
127           var file = parseShepardFile(fileNode.get());
128           Map<String, Object> params = new HashMap<>();
129           params.put("oid", file.oid);
130           params.put("props", Map.of("createdAt", file.createdAt, "filename", file.filename, "md5", file.md5));
131           var query =
132             """
133             MATCH (r:FileReference)-[:is_in_container]->(c:FileContainer) WHERE ID(r) = %s
134             MERGE (c)-[:file_in_container]->(sf:ShepardFile { oid: $oid })
135             SET sf += $props
136             CREATE (r)-[hp:has_payload]->(sf)
137             """;
138           tx.run(String.format(query, rId), params);
139         }
140       }
141       tx.run("MATCH (r:FileReference) WHERE ID(r) = " + rId + " REMOVE r.filesJson");
142       tx.commit();
143     }
144   }
145 
146   private void migrateStructuredDataContainer(Session session) {
147     var cResults = session.executeRead(tx ->
148       tx.run("MATCH (c:StructuredDataContainer) WHERE c.structuredDatasJson IS NOT NULL RETURN c").list()
149     );
150     for (int i = 0; i < cResults.size(); i++) {
151       logPercent(i, cResults.size());
152       var c = cResults.get(i).get("c").asNode();
153       var cId = c.elementId();
154 
155       if (!c.containsKey(STRUCTURED_DATAS_JSON)) continue;
156 
157       var tx = session.beginTransaction();
158       for (var structuredDataObj : c.get(STRUCTURED_DATAS_JSON).asList()) {
159         if (structuredDataObj instanceof String structuredDataStr) {
160           var structuredDataNode = parseJson(structuredDataStr);
161           if (structuredDataNode.isEmpty()) {
162             Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", cId, structuredDataStr);
163             continue;
164           }
165           var sd = parseStructuredData(structuredDataNode.get());
166           Map<String, Object> params = new HashMap<>();
167           params.put("props", Map.of("oid", sd.oid, "createdAt", sd.createdAt, "name", sd.name));
168           var query =
169             """
170             MATCH (c:StructuredDataContainer) WHERE ID(c) = %s
171             CREATE (c)-[:structureddata_in_container]->(sd:StructuredData $props)
172             """;
173           tx.run(String.format(query, cId), params);
174         }
175       }
176       tx.run("MATCH (c:StructuredDataContainer) WHERE ID(c) = " + cId + " REMOVE c.structuredDatasJson");
177       tx.commit();
178     }
179   }
180 
181   private void migrateStructuredDataReferences(Session session) {
182     var rResults = session.executeRead(tx ->
183       tx.run("MATCH (r:StructuredDataReference) WHERE r.structuredDatasJson IS NOT NULL RETURN r").list()
184     );
185     for (int i = 0; i < rResults.size(); i++) {
186       logPercent(i, rResults.size());
187       var r = rResults.get(i).get("r").asNode();
188       var rId = r.elementId();
189 
190       if (!r.containsKey(STRUCTURED_DATAS_JSON)) continue;
191 
192       var tx = session.beginTransaction();
193       for (var structuredDataObj : r.get(STRUCTURED_DATAS_JSON).asList()) {
194         if (structuredDataObj instanceof String structuredDataStr) {
195           var structuredDataNode = parseJson(structuredDataStr);
196           if (structuredDataNode.isEmpty()) {
197             Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", rId, structuredDataStr);
198             continue;
199           }
200           var sd = parseStructuredData(structuredDataNode.get());
201           Map<String, Object> params = new HashMap<>();
202           params.put("oid", sd.oid);
203           params.put("props", Map.of("createdAt", sd.createdAt, "name", sd.name));
204           var query =
205             """
206             MATCH (r:StructuredDataReference)-[:is_in_container]->(c:StructuredDataContainer) WHERE ID(r) = %s
207             MERGE (c)-[:structureddata_in_container]->(sd:StructuredData { oid: $oid })
208             SET sd += $props
209             CREATE (r)-[hp:has_payload]->(sd)
210             """;
211           tx.run(String.format(query, rId), params);
212         }
213       }
214       tx.run("MATCH (r:StructuredDataReference) WHERE ID(r) = " + rId + " REMOVE r.structuredDatasJson");
215       tx.commit();
216     }
217   }
218 
219   private void migrateTimeseriesReferences(Session session) {
220     var rResults = session.executeRead(tx ->
221       tx.run("MATCH (r:TimeseriesReference) WHERE r.timeseriesJson IS NOT NULL RETURN r").list()
222     );
223     for (int i = 0; i < rResults.size(); i++) {
224       logPercent(i, rResults.size());
225       var r = rResults.get(i).get("r").asNode();
226       var rId = r.elementId();
227 
228       if (!r.containsKey(TIMESERIES_JSON)) continue;
229 
230       var tx = session.beginTransaction();
231       for (var timeseriesObj : r.get(TIMESERIES_JSON).asList()) {
232         if (timeseriesObj instanceof String timeseriesStr) {
233           var timeseriesNode = parseJson(timeseriesStr);
234           if (timeseriesNode.isEmpty()) {
235             Log.errorf("NodeID %s: Timeseries cannot be parsed and will be skipped: %s", rId, timeseriesStr);
236             continue;
237           }
238           var ts = parseTimeseries(timeseriesNode.get());
239           Map<String, Object> params = Map.of(
240             "measurement",
241             ts.measurement,
242             "device",
243             ts.device,
244             "location",
245             ts.location,
246             "symbolicName",
247             ts.symbolicName,
248             "field",
249             ts.field
250           );
251           var query =
252             """
253             MATCH (r:TimeseriesReference) WHERE ID(r) = %s
254             MERGE (ts:Timeseries { measurement: $measurement, device: $device, location: $location, symbolicName: $symbolicName, field: $field })
255             CREATE (r)-[hp:has_payload]->(ts)
256             """;
257 
258           tx.run(String.format(query, rId), params);
259         }
260       }
261       tx.run("MATCH (r:TimeseriesReference) WHERE ID(r) = " + rId + " REMOVE r.timeseriesJson");
262       tx.commit();
263     }
264   }
265 
266   private Optional<JsonNode> parseJson(String str) {
267     JsonNode node;
268     try {
269       node = mapper.readTree(str);
270     } catch (JsonProcessingException e) {
271       // This should not be possible
272       Log.error(e.toString());
273       return Optional.empty();
274     }
275     return Optional.of(node);
276   }
277 
278   private long parseDate(String date) {
279     if (date.length() == 0) return 0L;
280     Date parsed;
281     try {
282       parsed = new StdDateFormat().parse(date);
283     } catch (ParseException e) {
284       // This should not be possible
285       Log.warnf("%s, using 0 instead", e.getMessage());
286       return 0L;
287     }
288     return parsed.getTime();
289   }
290 
291   private ShepardFile parseShepardFile(JsonNode node) {
292     var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
293     var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
294     var filename = Optional.ofNullable(node.get("filename")).map(JsonNode::asText).orElse("");
295     var md5 = Optional.ofNullable(node.get("md5")).map(JsonNode::asText).orElse("");
296     return new ShepardFile(oid, filename, parseDate(createdAt), md5);
297   }
298 
299   private StructuredData parseStructuredData(JsonNode node) {
300     var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
301     var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
302     var name = Optional.ofNullable(node.get("name")).map(JsonNode::asText).orElse("");
303     return new StructuredData(oid, name, parseDate(createdAt));
304   }
305 
306   private Timeseries parseTimeseries(JsonNode node) {
307     var measurement = Optional.ofNullable(node.get("measurement")).map(JsonNode::asText).orElse("");
308     var device = Optional.ofNullable(node.get("device")).map(JsonNode::asText).orElse("");
309     var location = Optional.ofNullable(node.get("location")).map(JsonNode::asText).orElse("");
310     var symbolicName = Optional.ofNullable(node.get("symbolicName")).map(JsonNode::asText).orElse("");
311     var field = Optional.ofNullable(node.get("field")).map(JsonNode::asText).orElse("");
312     return new Timeseries(measurement, device, location, symbolicName, field);
313   }
314 
315   private void logPercent(int i, int size) {
316     int curPercent = (int) ((100f / size) * i);
317     int prePercent = (int) ((100f / size) * (i - 1));
318     if (prePercent < curPercent) {
319       Log.infof("... %d %", curPercent);
320     }
321   }
322 }