// V2__Extract_json.java

  1. package de.dlr.shepard.neo4j.migrations;

  2. import ac.simons.neo4j.migrations.core.JavaBasedMigration;
  3. import ac.simons.neo4j.migrations.core.MigrationContext;
  4. import com.fasterxml.jackson.core.JsonProcessingException;
  5. import com.fasterxml.jackson.databind.JsonNode;
  6. import com.fasterxml.jackson.databind.ObjectMapper;
  7. import com.fasterxml.jackson.databind.util.StdDateFormat;
  8. import io.quarkus.logging.Log;
  9. import java.text.ParseException;
  10. import java.util.Date;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. import java.util.Optional;
  14. import lombok.AllArgsConstructor;
  15. import org.neo4j.driver.Session;

  16. public class V2__Extract_json implements JavaBasedMigration {

  17.   @AllArgsConstructor
  18.   class ShepardFile {

  19.     public final String oid;
  20.     public final String filename;
  21.     public final long createdAt;
  22.     public final String md5;
  23.   }

  24.   @AllArgsConstructor
  25.   class StructuredData {

  26.     public final String oid;
  27.     public final String name;
  28.     public final long createdAt;
  29.   }

  30.   @AllArgsConstructor
  31.   class Timeseries {

  32.     public final String measurement;
  33.     public final String device;
  34.     public final String location;
  35.     public final String symbolicName;
  36.     public final String field;
  37.   }

  38.   private static final String FILES_JSON = "filesJson";
  39.   private static final String STRUCTURED_DATAS_JSON = "structuredDatasJson";
  40.   private static final String TIMESERIES_JSON = "timeseriesJson";
  41.   private ObjectMapper mapper = new ObjectMapper();

  42.   @Override
  43.   public void apply(MigrationContext context) {
  44.     try (Session session = context.getSession()) {
  45.       Log.info("Running migration (1/5)");
  46.       migrateFileContainer(session);
  47.       Log.info("Running migration (2/5)");
  48.       migrateFileReferences(session);
  49.       Log.info("Running migration (3/5)");
  50.       migrateStructuredDataContainer(session);
  51.       Log.info("Running migration (4/5)");
  52.       migrateStructuredDataReferences(session);
  53.       Log.info("Running migration (5/5)");
  54.       migrateTimeseriesReferences(session);
  55.     } catch (Exception e) {
  56.       Log.error("Error while running migration: ", e);
  57.     }
  58.   }

  59.   private void migrateFileContainer(Session session) {
  60.     var cResults = session.executeRead(tx ->
  61.       tx.run("MATCH (c:FileContainer) WHERE c.filesJson IS NOT NULL RETURN c").list()
  62.     );
  63.     for (int i = 0; i < cResults.size(); i++) {
  64.       logPercent(i, cResults.size());
  65.       var c = cResults.get(i).get("c").asNode();
  66.       var cId = c.elementId();

  67.       if (!c.containsKey(FILES_JSON)) continue;

  68.       var tx = session.beginTransaction();
  69.       for (var fileObj : c.get(FILES_JSON).asList()) {
  70.         if (fileObj instanceof String fileStr) {
  71.           var fileNode = parseJson(fileStr);
  72.           if (fileNode.isEmpty()) {
  73.             Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", cId, fileStr);
  74.             continue;
  75.           }
  76.           var file = parseShepardFile(fileNode.get());
  77.           Map<String, Object> params = new HashMap<>();
  78.           params.put(
  79.             "props",
  80.             Map.of("oid", file.oid, "createdAt", file.createdAt, "filename", file.filename, "md5", file.md5)
  81.           );
  82.           var query =
  83.             """
  84.             MATCH (c:FileContainer) WHERE ID(c) = %s
  85.             CREATE (c)-[:file_in_container]->(sf:ShepardFile $props)
  86.             """;
  87.           tx.run(String.format(query, cId), params);
  88.         }
  89.       }
  90.       tx.run("MATCH (c:FileContainer) WHERE ID(c) = " + cId + " REMOVE c.filesJson");
  91.       tx.commit();
  92.     }
  93.   }

  94.   private void migrateFileReferences(Session session) {
  95.     var rResults = session.executeRead(tx ->
  96.       tx.run("MATCH (r:FileReference) WHERE r.filesJson IS NOT NULL RETURN r").list()
  97.     );
  98.     for (int i = 0; i < rResults.size(); i++) {
  99.       logPercent(i, rResults.size());
  100.       var r = rResults.get(i).get("r").asNode();
  101.       var rId = r.elementId();

  102.       if (!r.containsKey(FILES_JSON)) continue;

  103.       var tx = session.beginTransaction();
  104.       for (var fileObj : r.get(FILES_JSON).asList()) {
  105.         if (fileObj instanceof String fileStr) {
  106.           var fileNode = parseJson(fileStr);
  107.           if (fileNode.isEmpty()) {
  108.             Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", rId, fileStr);
  109.             continue;
  110.           }
  111.           var file = parseShepardFile(fileNode.get());
  112.           Map<String, Object> params = new HashMap<>();
  113.           params.put("oid", file.oid);
  114.           params.put("props", Map.of("createdAt", file.createdAt, "filename", file.filename, "md5", file.md5));
  115.           var query =
  116.             """
  117.             MATCH (r:FileReference)-[:is_in_container]->(c:FileContainer) WHERE ID(r) = %s
  118.             MERGE (c)-[:file_in_container]->(sf:ShepardFile { oid: $oid })
  119.             SET sf += $props
  120.             CREATE (r)-[hp:has_payload]->(sf)
  121.             """;
  122.           tx.run(String.format(query, rId), params);
  123.         }
  124.       }
  125.       tx.run("MATCH (r:FileReference) WHERE ID(r) = " + rId + " REMOVE r.filesJson");
  126.       tx.commit();
  127.     }
  128.   }

  129.   private void migrateStructuredDataContainer(Session session) {
  130.     var cResults = session.executeRead(tx ->
  131.       tx.run("MATCH (c:StructuredDataContainer) WHERE c.structuredDatasJson IS NOT NULL RETURN c").list()
  132.     );
  133.     for (int i = 0; i < cResults.size(); i++) {
  134.       logPercent(i, cResults.size());
  135.       var c = cResults.get(i).get("c").asNode();
  136.       var cId = c.elementId();

  137.       if (!c.containsKey(STRUCTURED_DATAS_JSON)) continue;

  138.       var tx = session.beginTransaction();
  139.       for (var structuredDataObj : c.get(STRUCTURED_DATAS_JSON).asList()) {
  140.         if (structuredDataObj instanceof String structuredDataStr) {
  141.           var structuredDataNode = parseJson(structuredDataStr);
  142.           if (structuredDataNode.isEmpty()) {
  143.             Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", cId, structuredDataStr);
  144.             continue;
  145.           }
  146.           var sd = parseStructuredData(structuredDataNode.get());
  147.           Map<String, Object> params = new HashMap<>();
  148.           params.put("props", Map.of("oid", sd.oid, "createdAt", sd.createdAt, "name", sd.name));
  149.           var query =
  150.             """
  151.             MATCH (c:StructuredDataContainer) WHERE ID(c) = %s
  152.             CREATE (c)-[:structureddata_in_container]->(sd:StructuredData $props)
  153.             """;
  154.           tx.run(String.format(query, cId), params);
  155.         }
  156.       }
  157.       tx.run("MATCH (c:StructuredDataContainer) WHERE ID(c) = " + cId + " REMOVE c.structuredDatasJson");
  158.       tx.commit();
  159.     }
  160.   }

  161.   private void migrateStructuredDataReferences(Session session) {
  162.     var rResults = session.executeRead(tx ->
  163.       tx.run("MATCH (r:StructuredDataReference) WHERE r.structuredDatasJson IS NOT NULL RETURN r").list()
  164.     );
  165.     for (int i = 0; i < rResults.size(); i++) {
  166.       logPercent(i, rResults.size());
  167.       var r = rResults.get(i).get("r").asNode();
  168.       var rId = r.elementId();

  169.       if (!r.containsKey(STRUCTURED_DATAS_JSON)) continue;

  170.       var tx = session.beginTransaction();
  171.       for (var structuredDataObj : r.get(STRUCTURED_DATAS_JSON).asList()) {
  172.         if (structuredDataObj instanceof String structuredDataStr) {
  173.           var structuredDataNode = parseJson(structuredDataStr);
  174.           if (structuredDataNode.isEmpty()) {
  175.             Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", rId, structuredDataStr);
  176.             continue;
  177.           }
  178.           var sd = parseStructuredData(structuredDataNode.get());
  179.           Map<String, Object> params = new HashMap<>();
  180.           params.put("oid", sd.oid);
  181.           params.put("props", Map.of("createdAt", sd.createdAt, "name", sd.name));
  182.           var query =
  183.             """
  184.             MATCH (r:StructuredDataReference)-[:is_in_container]->(c:StructuredDataContainer) WHERE ID(r) = %s
  185.             MERGE (c)-[:structureddata_in_container]->(sd:StructuredData { oid: $oid })
  186.             SET sd += $props
  187.             CREATE (r)-[hp:has_payload]->(sd)
  188.             """;
  189.           tx.run(String.format(query, rId), params);
  190.         }
  191.       }
  192.       tx.run("MATCH (r:StructuredDataReference) WHERE ID(r) = " + rId + " REMOVE r.structuredDatasJson");
  193.       tx.commit();
  194.     }
  195.   }

  196.   private void migrateTimeseriesReferences(Session session) {
  197.     var rResults = session.executeRead(tx ->
  198.       tx.run("MATCH (r:TimeseriesReference) WHERE r.timeseriesJson IS NOT NULL RETURN r").list()
  199.     );
  200.     for (int i = 0; i < rResults.size(); i++) {
  201.       logPercent(i, rResults.size());
  202.       var r = rResults.get(i).get("r").asNode();
  203.       var rId = r.elementId();

  204.       if (!r.containsKey(TIMESERIES_JSON)) continue;

  205.       var tx = session.beginTransaction();
  206.       for (var timeseriesObj : r.get(TIMESERIES_JSON).asList()) {
  207.         if (timeseriesObj instanceof String timeseriesStr) {
  208.           var timeseriesNode = parseJson(timeseriesStr);
  209.           if (timeseriesNode.isEmpty()) {
  210.             Log.errorf("NodeID %s: Timeseries cannot be parsed and will be skipped: %s", rId, timeseriesStr);
  211.             continue;
  212.           }
  213.           var ts = parseTimeseries(timeseriesNode.get());
  214.           Map<String, Object> params = Map.of(
  215.             "measurement",
  216.             ts.measurement,
  217.             "device",
  218.             ts.device,
  219.             "location",
  220.             ts.location,
  221.             "symbolicName",
  222.             ts.symbolicName,
  223.             "field",
  224.             ts.field
  225.           );
  226.           var query =
  227.             """
  228.             MATCH (r:TimeseriesReference) WHERE ID(r) = %s
  229.             MERGE (ts:Timeseries { measurement: $measurement, device: $device, location: $location, symbolicName: $symbolicName, field: $field })
  230.             CREATE (r)-[hp:has_payload]->(ts)
  231.             """;

  232.           tx.run(String.format(query, rId), params);
  233.         }
  234.       }
  235.       tx.run("MATCH (r:TimeseriesReference) WHERE ID(r) = " + rId + " REMOVE r.timeseriesJson");
  236.       tx.commit();
  237.     }
  238.   }

  239.   private Optional<JsonNode> parseJson(String str) {
  240.     JsonNode node;
  241.     try {
  242.       node = mapper.readTree(str);
  243.     } catch (JsonProcessingException e) {
  244.       // This should not be possible
  245.       Log.error(e.toString());
  246.       return Optional.empty();
  247.     }
  248.     return Optional.of(node);
  249.   }

  250.   private long parseDate(String date) {
  251.     if (date.length() == 0) return 0L;
  252.     Date parsed;
  253.     try {
  254.       parsed = new StdDateFormat().parse(date);
  255.     } catch (ParseException e) {
  256.       // This should not be possible
  257.       Log.warnf("%s, using 0 instead", e.getMessage());
  258.       return 0L;
  259.     }
  260.     return parsed.getTime();
  261.   }

  262.   private ShepardFile parseShepardFile(JsonNode node) {
  263.     var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
  264.     var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
  265.     var filename = Optional.ofNullable(node.get("filename")).map(JsonNode::asText).orElse("");
  266.     var md5 = Optional.ofNullable(node.get("md5")).map(JsonNode::asText).orElse("");
  267.     return new ShepardFile(oid, filename, parseDate(createdAt), md5);
  268.   }

  269.   private StructuredData parseStructuredData(JsonNode node) {
  270.     var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
  271.     var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
  272.     var name = Optional.ofNullable(node.get("name")).map(JsonNode::asText).orElse("");
  273.     return new StructuredData(oid, name, parseDate(createdAt));
  274.   }

  275.   private Timeseries parseTimeseries(JsonNode node) {
  276.     var measurement = Optional.ofNullable(node.get("measurement")).map(JsonNode::asText).orElse("");
  277.     var device = Optional.ofNullable(node.get("device")).map(JsonNode::asText).orElse("");
  278.     var location = Optional.ofNullable(node.get("location")).map(JsonNode::asText).orElse("");
  279.     var symbolicName = Optional.ofNullable(node.get("symbolicName")).map(JsonNode::asText).orElse("");
  280.     var field = Optional.ofNullable(node.get("field")).map(JsonNode::asText).orElse("");
  281.     return new Timeseries(measurement, device, location, symbolicName, field);
  282.   }

  283.   private void logPercent(int i, int size) {
  284.     int curPercent = (int) ((100f / size) * i);
  285.     int prePercent = (int) ((100f / size) * (i - 1));
  286.     if (prePercent < curPercent) {
  287.       Log.infof("... %d %", curPercent);
  288.     }
  289.   }
  290. }