package de.dlr.shepard.neo4j.migrations;

import ac.simons.neo4j.migrations.core.JavaBasedMigration;
import ac.simons.neo4j.migrations.core.MigrationContext;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.StdDateFormat;
import io.quarkus.logging.Log;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import lombok.AllArgsConstructor;
import org.neo4j.driver.Session;

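/**
 * Migrates JSON payloads that were stored as string-array properties on container and reference
 * nodes into dedicated {@code ShepardFile}, {@code StructuredData} and {@code Timeseries} nodes,
 * and removes the legacy JSON properties afterwards.
 */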
public class V2__Extract_json implements JavaBasedMigration {

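  /** Minimal DTO for a file entry extracted from the legacy {@code filesJson} property. */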
  @AllArgsConstructor
  static class ShepardFile {

    public final String oid;
    public final String filename;
    public final long createdAt;
    public final String md5;
  }

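  /** Minimal DTO for an entry extracted from the legacy {@code structuredDatasJson} property. */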
  @AllArgsConstructor
  static class StructuredData {

    public final String oid;
    public final String name;
    public final long createdAt;
  }

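  /** Minimal DTO for an entry extracted from the legacy {@code timeseriesJson} property. */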
  @AllArgsConstructor
  static class Timeseries {

    public final String measurement;
    public final String device;
    public final String location;
    public final String symbolicName;
    public final String field;
  }

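  // Legacy property keys whose embedded JSON payloads are extracted by this migration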
  private static final String FILES_JSON = "filesJson";
  private static final String STRUCTURED_DATAS_JSON = "structuredDatasJson";
  private static final String TIMESERIES_JSON = "timeseriesJson";
  private final ObjectMapper mapper = new ObjectMapper();

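  /**
   * Runs all five migration steps in order. Exceptions are caught and logged rather than
   * rethrown, so a failure is not propagated to the migration framework.
   */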
  @Override
  public void apply(MigrationContext context) {
    try (Session session = context.getSession()) {
      Log.info("Running migration (1/5)");
      migrateFileContainer(session);
      Log.info("Running migration (2/5)");
      migrateFileReferences(session);
      Log.info("Running migration (3/5)");
      migrateStructuredDataContainer(session);
      Log.info("Running migration (4/5)");
      migrateStructuredDataReferences(session);
      Log.info("Running migration (5/5)");
      migrateTimeseriesReferences(session);
    } catch (Exception e) {
      Log.error("Error while running migration: ", e);
    }
  }

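  /**
   * Turns each JSON string in {@code filesJson} on a {@code FileContainer} into a
   * {@code ShepardFile} node attached via {@code file_in_container}, then drops the property.
   */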
  private void migrateFileContainer(Session session) {
    var cResults = session.executeRead(tx ->
      tx.run("MATCH (c:FileContainer) WHERE c.filesJson IS NOT NULL RETURN c").list()
    );
    for (int i = 0; i < cResults.size(); i++) {
      logPercent(i, cResults.size());
      var c = cResults.get(i).get("c").asNode();
      var cId = c.elementId();

      if (!c.containsKey(FILES_JSON)) continue;

      var tx = session.beginTransaction();
      for (var fileObj : c.get(FILES_JSON).asList()) {
        if (fileObj instanceof String fileStr) {
          var fileNode = parseJson(fileStr);
          if (fileNode.isEmpty()) {
            Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", cId, fileStr);
            continue;
          }
          var file = parseShepardFile(fileNode.get());
          Map<String, Object> params = new HashMap<>();
          params.put("elementId", cId);
          params.put(
            "props",
            Map.of("oid", file.oid, "createdAt", file.createdAt, "filename", file.filename, "md5", file.md5)
          );
          // elementId() returns a string, so it is matched via elementId(c) and passed as a
          // parameter instead of being spliced unquoted into an ID() comparison
          var query =
            """
            MATCH (c:FileContainer) WHERE elementId(c) = $elementId
            CREATE (c)-[:file_in_container]->(sf:ShepardFile $props)
            """;
          tx.run(query, params);
        }
      }
      tx.run("MATCH (c:FileContainer) WHERE elementId(c) = $elementId REMOVE c.filesJson", Map.of("elementId", cId));
      tx.commit();
    }
  }

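  /**
   * Attaches each file in {@code filesJson} on a {@code FileReference} to its container's
   * {@code ShepardFile} node (merged by {@code oid}) via {@code has_payload}, then drops the
   * property.
   */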
  private void migrateFileReferences(Session session) {
    var rResults = session.executeRead(tx ->
      tx.run("MATCH (r:FileReference) WHERE r.filesJson IS NOT NULL RETURN r").list()
    );
    for (int i = 0; i < rResults.size(); i++) {
      logPercent(i, rResults.size());
      var r = rResults.get(i).get("r").asNode();
      var rId = r.elementId();

      if (!r.containsKey(FILES_JSON)) continue;

      var tx = session.beginTransaction();
      for (var fileObj : r.get(FILES_JSON).asList()) {
        if (fileObj instanceof String fileStr) {
          var fileNode = parseJson(fileStr);
          if (fileNode.isEmpty()) {
            Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", rId, fileStr);
            continue;
          }
          var file = parseShepardFile(fileNode.get());
          Map<String, Object> params = new HashMap<>();
          params.put("elementId", rId);
          params.put("oid", file.oid);
          params.put("props", Map.of("createdAt", file.createdAt, "filename", file.filename, "md5", file.md5));
          var query =
            """
            MATCH (r:FileReference)-[:is_in_container]->(c:FileContainer) WHERE elementId(r) = $elementId
            MERGE (c)-[:file_in_container]->(sf:ShepardFile { oid: $oid })
            SET sf += $props
            CREATE (r)-[hp:has_payload]->(sf)
            """;
          tx.run(query, params);
        }
      }
      tx.run("MATCH (r:FileReference) WHERE elementId(r) = $elementId REMOVE r.filesJson", Map.of("elementId", rId));
      tx.commit();
    }
  }

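  /**
   * Turns each JSON string in {@code structuredDatasJson} on a {@code StructuredDataContainer}
   * into a {@code StructuredData} node attached via {@code structureddata_in_container}, then
   * drops the property.
   */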
  private void migrateStructuredDataContainer(Session session) {
    var cResults = session.executeRead(tx ->
      tx.run("MATCH (c:StructuredDataContainer) WHERE c.structuredDatasJson IS NOT NULL RETURN c").list()
    );
    for (int i = 0; i < cResults.size(); i++) {
      logPercent(i, cResults.size());
      var c = cResults.get(i).get("c").asNode();
      var cId = c.elementId();

      if (!c.containsKey(STRUCTURED_DATAS_JSON)) continue;

      var tx = session.beginTransaction();
      for (var structuredDataObj : c.get(STRUCTURED_DATAS_JSON).asList()) {
        if (structuredDataObj instanceof String structuredDataStr) {
          var structuredDataNode = parseJson(structuredDataStr);
          if (structuredDataNode.isEmpty()) {
            Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", cId, structuredDataStr);
            continue;
          }
          var sd = parseStructuredData(structuredDataNode.get());
          Map<String, Object> params = new HashMap<>();
          params.put("elementId", cId);
          params.put("props", Map.of("oid", sd.oid, "createdAt", sd.createdAt, "name", sd.name));
          var query =
            """
            MATCH (c:StructuredDataContainer) WHERE elementId(c) = $elementId
            CREATE (c)-[:structureddata_in_container]->(sd:StructuredData $props)
            """;
          tx.run(query, params);
        }
      }
      tx.run(
        "MATCH (c:StructuredDataContainer) WHERE elementId(c) = $elementId REMOVE c.structuredDatasJson",
        Map.of("elementId", cId)
      );
      tx.commit();
    }
  }

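  /**
   * Attaches each entry in {@code structuredDatasJson} on a {@code StructuredDataReference} to
   * its container's {@code StructuredData} node (merged by {@code oid}) via {@code has_payload},
   * then drops the property.
   */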
  private void migrateStructuredDataReferences(Session session) {
    var rResults = session.executeRead(tx ->
      tx.run("MATCH (r:StructuredDataReference) WHERE r.structuredDatasJson IS NOT NULL RETURN r").list()
    );
    for (int i = 0; i < rResults.size(); i++) {
      logPercent(i, rResults.size());
      var r = rResults.get(i).get("r").asNode();
      var rId = r.elementId();

      if (!r.containsKey(STRUCTURED_DATAS_JSON)) continue;

      var tx = session.beginTransaction();
      for (var structuredDataObj : r.get(STRUCTURED_DATAS_JSON).asList()) {
        if (structuredDataObj instanceof String structuredDataStr) {
          var structuredDataNode = parseJson(structuredDataStr);
          if (structuredDataNode.isEmpty()) {
            Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", rId, structuredDataStr);
            continue;
          }
          var sd = parseStructuredData(structuredDataNode.get());
          Map<String, Object> params = new HashMap<>();
          params.put("elementId", rId);
          params.put("oid", sd.oid);
          params.put("props", Map.of("createdAt", sd.createdAt, "name", sd.name));
          var query =
            """
            MATCH (r:StructuredDataReference)-[:is_in_container]->(c:StructuredDataContainer) WHERE elementId(r) = $elementId
            MERGE (c)-[:structureddata_in_container]->(sd:StructuredData { oid: $oid })
            SET sd += $props
            CREATE (r)-[hp:has_payload]->(sd)
            """;
          tx.run(query, params);
        }
      }
      tx.run(
        "MATCH (r:StructuredDataReference) WHERE elementId(r) = $elementId REMOVE r.structuredDatasJson",
        Map.of("elementId", rId)
      );
      tx.commit();
    }
  }

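  /**
   * Merges a {@code Timeseries} node per unique five-tuple referenced in {@code timeseriesJson}
   * on a {@code TimeseriesReference}, links it via {@code has_payload}, then drops the property.
   */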
  private void migrateTimeseriesReferences(Session session) {
    var rResults = session.executeRead(tx ->
      tx.run("MATCH (r:TimeseriesReference) WHERE r.timeseriesJson IS NOT NULL RETURN r").list()
    );
    for (int i = 0; i < rResults.size(); i++) {
      logPercent(i, rResults.size());
      var r = rResults.get(i).get("r").asNode();
      var rId = r.elementId();

      if (!r.containsKey(TIMESERIES_JSON)) continue;

      var tx = session.beginTransaction();
      for (var timeseriesObj : r.get(TIMESERIES_JSON).asList()) {
        if (timeseriesObj instanceof String timeseriesStr) {
          var timeseriesNode = parseJson(timeseriesStr);
          if (timeseriesNode.isEmpty()) {
            Log.errorf("NodeID %s: Timeseries cannot be parsed and will be skipped: %s", rId, timeseriesStr);
            continue;
          }
          var ts = parseTimeseries(timeseriesNode.get());
          Map<String, Object> params = Map.of(
            "elementId",
            rId,
            "measurement",
            ts.measurement,
            "device",
            ts.device,
            "location",
            ts.location,
            "symbolicName",
            ts.symbolicName,
            "field",
            ts.field
          );
          var query =
            """
            MATCH (r:TimeseriesReference) WHERE elementId(r) = $elementId
            MERGE (ts:Timeseries { measurement: $measurement, device: $device, location: $location, symbolicName: $symbolicName, field: $field })
            CREATE (r)-[hp:has_payload]->(ts)
            """;

          tx.run(query, params);
        }
      }
      tx.run("MATCH (r:TimeseriesReference) WHERE elementId(r) = $elementId REMOVE r.timeseriesJson", Map.of("elementId", rId));
      tx.commit();
    }
  }

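  /** Parses a JSON string, logging and returning {@link Optional#empty()} on failure. */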
  private Optional<JsonNode> parseJson(String str) {
    JsonNode node;
    try {
      node = mapper.readTree(str);
    } catch (JsonProcessingException e) {
      Log.error(e.toString());
      return Optional.empty();
    }
    return Optional.of(node);
  }

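  /** Parses an ISO-8601 date string to epoch milliseconds, falling back to 0 on failure. */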
  private long parseDate(String date) {
    if (date.isEmpty()) return 0L;
    Date parsed;
    try {
      parsed = new StdDateFormat().parse(date);
    } catch (ParseException e) {
      Log.warnf("%s, using 0 instead", e.getMessage());
      return 0L;
    }
    return parsed.getTime();
  }

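  /** Maps a JSON node to a {@link ShepardFile}, defaulting missing fields to empty strings. */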
  private ShepardFile parseShepardFile(JsonNode node) {
    var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
    var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
    var filename = Optional.ofNullable(node.get("filename")).map(JsonNode::asText).orElse("");
    var md5 = Optional.ofNullable(node.get("md5")).map(JsonNode::asText).orElse("");
    return new ShepardFile(oid, filename, parseDate(createdAt), md5);
  }

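  /** Maps a JSON node to a {@link StructuredData}, defaulting missing fields to empty strings. */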
  private StructuredData parseStructuredData(JsonNode node) {
    var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
    var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
    var name = Optional.ofNullable(node.get("name")).map(JsonNode::asText).orElse("");
    return new StructuredData(oid, name, parseDate(createdAt));
  }

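  /** Maps a JSON node to a {@link Timeseries}, defaulting missing fields to empty strings. */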
  private Timeseries parseTimeseries(JsonNode node) {
    var measurement = Optional.ofNullable(node.get("measurement")).map(JsonNode::asText).orElse("");
    var device = Optional.ofNullable(node.get("device")).map(JsonNode::asText).orElse("");
    var location = Optional.ofNullable(node.get("location")).map(JsonNode::asText).orElse("");
    var symbolicName = Optional.ofNullable(node.get("symbolicName")).map(JsonNode::asText).orElse("");
    var field = Optional.ofNullable(node.get("field")).map(JsonNode::asText).orElse("");
    return new Timeseries(measurement, device, location, symbolicName, field);
  }

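  /** Logs progress whenever the integer percentage of processed items increases. */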
  private void logPercent(int i, int size) {
    int curPercent = (int) ((100f / size) * i);
    int prePercent = (int) ((100f / size) * (i - 1));
    if (prePercent < curPercent) {
      // "%%" escapes the literal percent sign in the format string
      Log.infof("... %d %%", curPercent);
    }
  }
}