V2__Extract_json.java
package de.dlr.shepard.neo4j.migrations;

import ac.simons.neo4j.migrations.core.JavaBasedMigration;
import ac.simons.neo4j.migrations.core.MigrationContext;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.StdDateFormat;
import io.quarkus.logging.Log;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import lombok.AllArgsConstructor;
import org.neo4j.driver.Session;
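
/**
 * Migration V2: extracts the JSON blobs that earlier versions stored as string
 * properties (filesJson, structuredDatasJson, timeseriesJson) on container and
 * reference nodes into dedicated ShepardFile, StructuredData and Timeseries
 * nodes, and removes the obsolete properties afterwards.
 */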
public class V2__Extract_json implements JavaBasedMigration {

  @AllArgsConstructor
  private static class ShepardFile {

    public final String oid;
    public final String filename;
    public final long createdAt;
    public final String md5;
  }

  @AllArgsConstructor
  private static class StructuredData {

    public final String oid;
    public final String name;
    public final long createdAt;
  }

  @AllArgsConstructor
  private static class Timeseries {

    public final String measurement;
    public final String device;
    public final String location;
    public final String symbolicName;
    public final String field;
  }

  private static final String FILES_JSON = "filesJson";
  private static final String STRUCTURED_DATAS_JSON = "structuredDatasJson";
  private static final String TIMESERIES_JSON = "timeseriesJson";

  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public void apply(MigrationContext context) {
    try (Session session = context.getSession()) {
      Log.info("Running migration (1/5)");
      migrateFileContainer(session);
      Log.info("Running migration (2/5)");
      migrateFileReferences(session);
      Log.info("Running migration (3/5)");
      migrateStructuredDataContainer(session);
      Log.info("Running migration (4/5)");
      migrateStructuredDataReferences(session);
      Log.info("Running migration (5/5)");
      migrateTimeseriesReferences(session);
    } catch (Exception e) {
      // Swallowed intentionally: failures are logged but do not abort the migration chain.
      Log.error("Error while running migration: ", e);
    }
  }
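
  /**
   * Turns every JSON entry of FileContainer.filesJson into a ShepardFile node
   * attached via file_in_container, then removes the filesJson property.
   */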
  private void migrateFileContainer(Session session) {
    var cResults = session.executeRead(tx ->
      tx.run("MATCH (c:FileContainer) WHERE c.filesJson IS NOT NULL RETURN c").list()
    );
    for (int i = 0; i < cResults.size(); i++) {
      logPercent(i, cResults.size());
      var c = cResults.get(i).get("c").asNode();
      var cId = c.elementId();
      if (!c.containsKey(FILES_JSON)) continue;
      // elementId() is interpolated into ID(...) below; this assumes the id is
      // the plain numeric node id (as returned by Neo4j 4.x servers).
      // One transaction per container keeps the write transactions small.
      try (var tx = session.beginTransaction()) {
        for (var fileObj : c.get(FILES_JSON).asList()) {
          if (fileObj instanceof String fileStr) {
            var fileNode = parseJson(fileStr);
            if (fileNode.isEmpty()) {
              Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", cId, fileStr);
              continue;
            }
            var file = parseShepardFile(fileNode.get());
            Map<String, Object> params = new HashMap<>();
            params.put(
              "props",
              Map.of("oid", file.oid, "createdAt", file.createdAt, "filename", file.filename, "md5", file.md5)
            );
            var query =
              """
              MATCH (c:FileContainer) WHERE ID(c) = %s
              CREATE (c)-[:file_in_container]->(sf:ShepardFile $props)
              """;
            tx.run(String.format(query, cId), params);
          }
        }
        tx.run("MATCH (c:FileContainer) WHERE ID(c) = " + cId + " REMOVE c.filesJson");
        tx.commit();
      }
    }
  }
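
  /**
   * For every FileReference, merges the referenced ShepardFile into its
   * FileContainer (keyed by oid) and links the reference to it via
   * has_payload, then removes the filesJson property.
   */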
  private void migrateFileReferences(Session session) {
    var rResults = session.executeRead(tx ->
      tx.run("MATCH (r:FileReference) WHERE r.filesJson IS NOT NULL RETURN r").list()
    );
    for (int i = 0; i < rResults.size(); i++) {
      logPercent(i, rResults.size());
      var r = rResults.get(i).get("r").asNode();
      var rId = r.elementId();
      if (!r.containsKey(FILES_JSON)) continue;
      try (var tx = session.beginTransaction()) {
        for (var fileObj : r.get(FILES_JSON).asList()) {
          if (fileObj instanceof String fileStr) {
            var fileNode = parseJson(fileStr);
            if (fileNode.isEmpty()) {
              Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", rId, fileStr);
              continue;
            }
            var file = parseShepardFile(fileNode.get());
            Map<String, Object> params = new HashMap<>();
            params.put("oid", file.oid);
            params.put("props", Map.of("createdAt", file.createdAt, "filename", file.filename, "md5", file.md5));
            var query =
              """
              MATCH (r:FileReference)-[:is_in_container]->(c:FileContainer) WHERE ID(r) = %s
              MERGE (c)-[:file_in_container]->(sf:ShepardFile { oid: $oid })
              SET sf += $props
              CREATE (r)-[hp:has_payload]->(sf)
              """;
            tx.run(String.format(query, rId), params);
          }
        }
        tx.run("MATCH (r:FileReference) WHERE ID(r) = " + rId + " REMOVE r.filesJson");
        tx.commit();
      }
    }
  }
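
  /**
   * Turns every JSON entry of StructuredDataContainer.structuredDatasJson into
   * a StructuredData node attached via structureddata_in_container, then
   * removes the structuredDatasJson property.
   */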
  private void migrateStructuredDataContainer(Session session) {
    var cResults = session.executeRead(tx ->
      tx.run("MATCH (c:StructuredDataContainer) WHERE c.structuredDatasJson IS NOT NULL RETURN c").list()
    );
    for (int i = 0; i < cResults.size(); i++) {
      logPercent(i, cResults.size());
      var c = cResults.get(i).get("c").asNode();
      var cId = c.elementId();
      if (!c.containsKey(STRUCTURED_DATAS_JSON)) continue;
      try (var tx = session.beginTransaction()) {
        for (var structuredDataObj : c.get(STRUCTURED_DATAS_JSON).asList()) {
          if (structuredDataObj instanceof String structuredDataStr) {
            var structuredDataNode = parseJson(structuredDataStr);
            if (structuredDataNode.isEmpty()) {
              Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", cId, structuredDataStr);
              continue;
            }
            var sd = parseStructuredData(structuredDataNode.get());
            Map<String, Object> params = new HashMap<>();
            params.put("props", Map.of("oid", sd.oid, "createdAt", sd.createdAt, "name", sd.name));
            var query =
              """
              MATCH (c:StructuredDataContainer) WHERE ID(c) = %s
              CREATE (c)-[:structureddata_in_container]->(sd:StructuredData $props)
              """;
            tx.run(String.format(query, cId), params);
          }
        }
        tx.run("MATCH (c:StructuredDataContainer) WHERE ID(c) = " + cId + " REMOVE c.structuredDatasJson");
        tx.commit();
      }
    }
  }
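
  /**
   * For every StructuredDataReference, merges the referenced StructuredData
   * into its container (keyed by oid) and links the reference to it via
   * has_payload, then removes the structuredDatasJson property.
   */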
  private void migrateStructuredDataReferences(Session session) {
    var rResults = session.executeRead(tx ->
      tx.run("MATCH (r:StructuredDataReference) WHERE r.structuredDatasJson IS NOT NULL RETURN r").list()
    );
    for (int i = 0; i < rResults.size(); i++) {
      logPercent(i, rResults.size());
      var r = rResults.get(i).get("r").asNode();
      var rId = r.elementId();
      if (!r.containsKey(STRUCTURED_DATAS_JSON)) continue;
      try (var tx = session.beginTransaction()) {
        for (var structuredDataObj : r.get(STRUCTURED_DATAS_JSON).asList()) {
          if (structuredDataObj instanceof String structuredDataStr) {
            var structuredDataNode = parseJson(structuredDataStr);
            if (structuredDataNode.isEmpty()) {
              Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", rId, structuredDataStr);
              continue;
            }
            var sd = parseStructuredData(structuredDataNode.get());
            Map<String, Object> params = new HashMap<>();
            params.put("oid", sd.oid);
            params.put("props", Map.of("createdAt", sd.createdAt, "name", sd.name));
            var query =
              """
              MATCH (r:StructuredDataReference)-[:is_in_container]->(c:StructuredDataContainer) WHERE ID(r) = %s
              MERGE (c)-[:structureddata_in_container]->(sd:StructuredData { oid: $oid })
              SET sd += $props
              CREATE (r)-[hp:has_payload]->(sd)
              """;
            tx.run(String.format(query, rId), params);
          }
        }
        tx.run("MATCH (r:StructuredDataReference) WHERE ID(r) = " + rId + " REMOVE r.structuredDatasJson");
        tx.commit();
      }
    }
  }
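
  /**
   * Links every TimeseriesReference to a Timeseries node identified by
   * measurement, device, location, symbolicName and field; MERGE ensures that
   * identical timeseries are shared between references. Removes the
   * timeseriesJson property afterwards.
   */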
  private void migrateTimeseriesReferences(Session session) {
    var rResults = session.executeRead(tx ->
      tx.run("MATCH (r:TimeseriesReference) WHERE r.timeseriesJson IS NOT NULL RETURN r").list()
    );
    for (int i = 0; i < rResults.size(); i++) {
      logPercent(i, rResults.size());
      var r = rResults.get(i).get("r").asNode();
      var rId = r.elementId();
      if (!r.containsKey(TIMESERIES_JSON)) continue;
      try (var tx = session.beginTransaction()) {
        for (var timeseriesObj : r.get(TIMESERIES_JSON).asList()) {
          if (timeseriesObj instanceof String timeseriesStr) {
            var timeseriesNode = parseJson(timeseriesStr);
            if (timeseriesNode.isEmpty()) {
              Log.errorf("NodeID %s: Timeseries cannot be parsed and will be skipped: %s", rId, timeseriesStr);
              continue;
            }
            var ts = parseTimeseries(timeseriesNode.get());
            Map<String, Object> params = Map.of(
              "measurement",
              ts.measurement,
              "device",
              ts.device,
              "location",
              ts.location,
              "symbolicName",
              ts.symbolicName,
              "field",
              ts.field
            );
            var query =
              """
              MATCH (r:TimeseriesReference) WHERE ID(r) = %s
              MERGE (ts:Timeseries { measurement: $measurement, device: $device, location: $location, symbolicName: $symbolicName, field: $field })
              CREATE (r)-[hp:has_payload]->(ts)
              """;
            tx.run(String.format(query, rId), params);
          }
        }
        tx.run("MATCH (r:TimeseriesReference) WHERE ID(r) = " + rId + " REMOVE r.timeseriesJson");
        tx.commit();
      }
    }
  }
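
  /** Parses a JSON string, returning Optional.empty() if it is malformed. */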
  private Optional<JsonNode> parseJson(String str) {
    JsonNode node;
    try {
      node = mapper.readTree(str);
    } catch (JsonProcessingException e) {
      // This should not be possible
      Log.error(e.toString());
      return Optional.empty();
    }
    return Optional.of(node);
  }
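
  /**
   * Parses a date string (ISO-8601, as understood by Jackson's StdDateFormat)
   * into epoch milliseconds, falling back to 0 on empty or unparsable input.
   */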
  private long parseDate(String date) {
    if (date.isEmpty()) return 0L;
    Date parsed;
    try {
      parsed = new StdDateFormat().parse(date);
    } catch (ParseException e) {
      // This should not be possible
      Log.warnf("%s, using 0 instead", e.getMessage());
      return 0L;
    }
    return parsed.getTime();
  }
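
  // The following helpers map JSON payloads onto the DTOs above. Missing
  // fields default to empty strings so a single bad entry cannot fail the
  // whole migration.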
  private ShepardFile parseShepardFile(JsonNode node) {
    var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
    var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
    var filename = Optional.ofNullable(node.get("filename")).map(JsonNode::asText).orElse("");
    var md5 = Optional.ofNullable(node.get("md5")).map(JsonNode::asText).orElse("");
    return new ShepardFile(oid, filename, parseDate(createdAt), md5);
  }

  private StructuredData parseStructuredData(JsonNode node) {
    var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
    var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
    var name = Optional.ofNullable(node.get("name")).map(JsonNode::asText).orElse("");
    return new StructuredData(oid, name, parseDate(createdAt));
  }

  private Timeseries parseTimeseries(JsonNode node) {
    var measurement = Optional.ofNullable(node.get("measurement")).map(JsonNode::asText).orElse("");
    var device = Optional.ofNullable(node.get("device")).map(JsonNode::asText).orElse("");
    var location = Optional.ofNullable(node.get("location")).map(JsonNode::asText).orElse("");
    var symbolicName = Optional.ofNullable(node.get("symbolicName")).map(JsonNode::asText).orElse("");
    var field = Optional.ofNullable(node.get("field")).map(JsonNode::asText).orElse("");
    return new Timeseries(measurement, device, location, symbolicName, field);
  }
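
  /** Logs progress, but only whenever the integer percentage advances. */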
  private void logPercent(int i, int size) {
    int curPercent = (int) ((100f / size) * i);
    int prePercent = (int) ((100f / size) * (i - 1));
    if (prePercent < curPercent) {
      // "%%" emits a literal percent sign; a lone "%" is an invalid format string
      Log.infof("... %d %%", curPercent);
    }
  }
}