1 package de.dlr.shepard.common.neo4j.migrations;
2
import ac.simons.neo4j.migrations.core.JavaBasedMigration;
import ac.simons.neo4j.migrations.core.MigrationContext;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.StdDateFormat;
import io.quarkus.logging.Log;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.neo4j.driver.Session;
16
17 public class V2__Extract_json implements JavaBasedMigration {
18
19 record ShepardFile(String oid, String filename, long createdAt, String md5) {}
20
21 record StructuredData(String oid, String name, long createdAt) {}
22
23 record Timeseries(String measurement, String device, String location, String symbolicName, String field) {}
24
25 private static final String FILES_JSON = "filesJson";
26 private static final String STRUCTURED_DATAS_JSON = "structuredDatasJson";
27 private static final String TIMESERIES_JSON = "timeseriesJson";
28 private final ObjectMapper mapper = new ObjectMapper();
29
30 @Override
31 public void apply(MigrationContext context) {
32 try (Session session = context.getSession()) {
33 Log.info("Running V2 migration (1/5)");
34 migrateFileContainer(session);
35 Log.info("Running V2 migration (2/5)");
36 migrateFileReferences(session);
37 Log.info("Running V2 migration (3/5)");
38 migrateStructuredDataContainer(session);
39 Log.info("Running V2 migration (4/5)");
40 migrateStructuredDataReferences(session);
41 Log.info("Running V2 migration (5/5)");
42 migrateTimeseriesReferences(session);
43 } catch (Exception e) {
44 Log.error("Error while running migration: ", e);
45 }
46 }
47
48 private void migrateFileContainer(Session session) {
49 var cResults = session.executeRead(tx ->
50 tx.run("MATCH (c:FileContainer) WHERE c.filesJson IS NOT NULL RETURN c").list()
51 );
52 for (int i = 0; i < cResults.size(); i++) {
53 logPercent(i, cResults.size());
54 var c = cResults.get(i).get("c").asNode();
55 var cId = c.elementId();
56
57 if (!c.containsKey(FILES_JSON)) continue;
58
59 var tx = session.beginTransaction();
60 for (var fileObj : c.get(FILES_JSON).asList()) {
61 if (fileObj instanceof String fileStr) {
62 var fileNode = parseJson(fileStr);
63 if (fileNode.isEmpty()) {
64 Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", cId, fileStr);
65 continue;
66 }
67 var file = parseShepardFile(fileNode.get());
68 Map<String, Object> params = new HashMap<>();
69 params.put(
70 "props",
71 Map.of("oid", file.oid, "createdAt", file.createdAt, "filename", file.filename, "md5", file.md5)
72 );
73 var query =
74 """
75 MATCH (c:FileContainer) WHERE ID(c) = %s
76 CREATE (c)-[:file_in_container]->(sf:ShepardFile $props)
77 """;
78 tx.run(query.formatted(cId), params);
79 }
80 }
81 tx.run("MATCH (c:FileContainer) WHERE ID(c) = " + cId + " REMOVE c.filesJson");
82 tx.commit();
83 }
84 }
85
86 private void migrateFileReferences(Session session) {
87 var rResults = session.executeRead(tx ->
88 tx.run("MATCH (r:FileReference) WHERE r.filesJson IS NOT NULL RETURN r").list()
89 );
90 for (int i = 0; i < rResults.size(); i++) {
91 logPercent(i, rResults.size());
92 var r = rResults.get(i).get("r").asNode();
93 var rId = r.elementId();
94
95 if (!r.containsKey(FILES_JSON)) continue;
96
97 var tx = session.beginTransaction();
98 for (var fileObj : r.get(FILES_JSON).asList()) {
99 if (fileObj instanceof String fileStr) {
100 var fileNode = parseJson(fileStr);
101 if (fileNode.isEmpty()) {
102 Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", rId, fileStr);
103 continue;
104 }
105 var file = parseShepardFile(fileNode.get());
106 Map<String, Object> params = new HashMap<>();
107 params.put("oid", file.oid);
108 params.put("props", Map.of("createdAt", file.createdAt, "filename", file.filename, "md5", file.md5));
109 var query =
110 """
111 MATCH (r:FileReference)-[:is_in_container]->(c:FileContainer) WHERE ID(r) = %s
112 MERGE (c)-[:file_in_container]->(sf:ShepardFile { oid: $oid })
113 SET sf += $props
114 CREATE (r)-[hp:has_payload]->(sf)
115 """;
116 tx.run(query.formatted(rId), params);
117 }
118 }
119 tx.run("MATCH (r:FileReference) WHERE ID(r) = " + rId + " REMOVE r.filesJson");
120 tx.commit();
121 }
122 }
123
124 private void migrateStructuredDataContainer(Session session) {
125 var cResults = session.executeRead(tx ->
126 tx.run("MATCH (c:StructuredDataContainer) WHERE c.structuredDatasJson IS NOT NULL RETURN c").list()
127 );
128 for (int i = 0; i < cResults.size(); i++) {
129 logPercent(i, cResults.size());
130 var c = cResults.get(i).get("c").asNode();
131 var cId = c.elementId();
132
133 if (!c.containsKey(STRUCTURED_DATAS_JSON)) continue;
134
135 var tx = session.beginTransaction();
136 for (var structuredDataObj : c.get(STRUCTURED_DATAS_JSON).asList()) {
137 if (structuredDataObj instanceof String structuredDataStr) {
138 var structuredDataNode = parseJson(structuredDataStr);
139 if (structuredDataNode.isEmpty()) {
140 Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", cId, structuredDataStr);
141 continue;
142 }
143 var sd = parseStructuredData(structuredDataNode.get());
144 Map<String, Object> params = new HashMap<>();
145 params.put("props", Map.of("oid", sd.oid, "createdAt", sd.createdAt, "name", sd.name));
146 var query =
147 """
148 MATCH (c:StructuredDataContainer) WHERE ID(c) = %s
149 CREATE (c)-[:structureddata_in_container]->(sd:StructuredData $props)
150 """;
151 tx.run(query.formatted(cId), params);
152 }
153 }
154 tx.run("MATCH (c:StructuredDataContainer) WHERE ID(c) = " + cId + " REMOVE c.structuredDatasJson");
155 tx.commit();
156 }
157 }
158
159 private void migrateStructuredDataReferences(Session session) {
160 var rResults = session.executeRead(tx ->
161 tx.run("MATCH (r:StructuredDataReference) WHERE r.structuredDatasJson IS NOT NULL RETURN r").list()
162 );
163 for (int i = 0; i < rResults.size(); i++) {
164 logPercent(i, rResults.size());
165 var r = rResults.get(i).get("r").asNode();
166 var rId = r.elementId();
167
168 if (!r.containsKey(STRUCTURED_DATAS_JSON)) continue;
169
170 var tx = session.beginTransaction();
171 for (var structuredDataObj : r.get(STRUCTURED_DATAS_JSON).asList()) {
172 if (structuredDataObj instanceof String structuredDataStr) {
173 var structuredDataNode = parseJson(structuredDataStr);
174 if (structuredDataNode.isEmpty()) {
175 Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", rId, structuredDataStr);
176 continue;
177 }
178 var sd = parseStructuredData(structuredDataNode.get());
179 Map<String, Object> params = new HashMap<>();
180 params.put("oid", sd.oid);
181 params.put("props", Map.of("createdAt", sd.createdAt, "name", sd.name));
182 var query =
183 """
184 MATCH (r:StructuredDataReference)-[:is_in_container]->(c:StructuredDataContainer) WHERE ID(r) = %s
185 MERGE (c)-[:structureddata_in_container]->(sd:StructuredData { oid: $oid })
186 SET sd += $props
187 CREATE (r)-[hp:has_payload]->(sd)
188 """;
189 tx.run(query.formatted(rId), params);
190 }
191 }
192 tx.run("MATCH (r:StructuredDataReference) WHERE ID(r) = " + rId + " REMOVE r.structuredDatasJson");
193 tx.commit();
194 }
195 }
196
197 private void migrateTimeseriesReferences(Session session) {
198 var rResults = session.executeRead(tx ->
199 tx.run("MATCH (r:TimeseriesReference) WHERE r.timeseriesJson IS NOT NULL RETURN r").list()
200 );
201 for (int i = 0; i < rResults.size(); i++) {
202 logPercent(i, rResults.size());
203 var r = rResults.get(i).get("r").asNode();
204 var rId = r.elementId();
205
206 if (!r.containsKey(TIMESERIES_JSON)) continue;
207
208 var tx = session.beginTransaction();
209 for (var timeseriesObj : r.get(TIMESERIES_JSON).asList()) {
210 if (timeseriesObj instanceof String timeseriesStr) {
211 var timeseriesNode = parseJson(timeseriesStr);
212 if (timeseriesNode.isEmpty()) {
213 Log.errorf("NodeID %s: Timeseries cannot be parsed and will be skipped: %s", rId, timeseriesStr);
214 continue;
215 }
216 var ts = parseTimeseries(timeseriesNode.get());
217 Map<String, Object> params = Map.of(
218 "measurement",
219 ts.measurement,
220 "device",
221 ts.device,
222 "location",
223 ts.location,
224 "symbolicName",
225 ts.symbolicName,
226 "field",
227 ts.field
228 );
229 var query =
230 """
231 MATCH (r:TimeseriesReference) WHERE ID(r) = %s
232 MERGE (ts:Timeseries { measurement: $measurement, device: $device, location: $location, symbolicName: $symbolicName, field: $field })
233 CREATE (r)-[hp:has_payload]->(ts)
234 """;
235
236 tx.run(query.formatted(rId), params);
237 }
238 }
239 tx.run("MATCH (r:TimeseriesReference) WHERE ID(r) = " + rId + " REMOVE r.timeseriesJson");
240 tx.commit();
241 }
242 }
243
244 private Optional<JsonNode> parseJson(String str) {
245 JsonNode node;
246 try {
247 node = mapper.readTree(str);
248 } catch (JsonProcessingException e) {
249
250 Log.error(e.toString());
251 return Optional.empty();
252 }
253 return Optional.of(node);
254 }
255
256 private long parseDate(String date) {
257 if (date.isEmpty()) return 0L;
258 Date parsed;
259 try {
260 parsed = new StdDateFormat().parse(date);
261 } catch (ParseException e) {
262
263 Log.warnf("%s, using 0 instead", e.getMessage());
264 return 0L;
265 }
266 return parsed.getTime();
267 }
268
269 private ShepardFile parseShepardFile(JsonNode node) {
270 var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
271 var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
272 var filename = Optional.ofNullable(node.get("filename")).map(JsonNode::asText).orElse("");
273 var md5 = Optional.ofNullable(node.get("md5")).map(JsonNode::asText).orElse("");
274 return new ShepardFile(oid, filename, parseDate(createdAt), md5);
275 }
276
277 private StructuredData parseStructuredData(JsonNode node) {
278 var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
279 var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
280 var name = Optional.ofNullable(node.get("name")).map(JsonNode::asText).orElse("");
281 return new StructuredData(oid, name, parseDate(createdAt));
282 }
283
284 private Timeseries parseTimeseries(JsonNode node) {
285 var measurement = Optional.ofNullable(node.get("measurement")).map(JsonNode::asText).orElse("");
286 var device = Optional.ofNullable(node.get("device")).map(JsonNode::asText).orElse("");
287 var location = Optional.ofNullable(node.get("location")).map(JsonNode::asText).orElse("");
288 var symbolicName = Optional.ofNullable(node.get("symbolicName")).map(JsonNode::asText).orElse("");
289 var field = Optional.ofNullable(node.get("field")).map(JsonNode::asText).orElse("");
290 return new Timeseries(measurement, device, location, symbolicName, field);
291 }
292
293 private void logPercent(int i, int size) {
294 int curPercent = (int) ((100f / size) * i);
295 int prePercent = (int) ((100f / size) * (i - 1));
296 if (prePercent < curPercent) {
297 Log.infof("... %d %", curPercent);
298 }
299 }
300 }