V2__Extract_json.java

package de.dlr.shepard.neo4j.migrations;

import ac.simons.neo4j.migrations.core.JavaBasedMigration;
import ac.simons.neo4j.migrations.core.MigrationContext;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.StdDateFormat;
import io.quarkus.logging.Log;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import lombok.AllArgsConstructor;
import org.neo4j.driver.Session;

public class V2__Extract_json implements JavaBasedMigration {

  @AllArgsConstructor
  class ShepardFile {

    public final String oid;
    public final String filename;
    public final long createdAt;
    public final String md5;
  }

  @AllArgsConstructor
  class StructuredData {

    public final String oid;
    public final String name;
    public final long createdAt;
  }

  @AllArgsConstructor
  class Timeseries {

    public final String measurement;
    public final String device;
    public final String location;
    public final String symbolicName;
    public final String field;
  }

  private static final String FILES_JSON = "filesJson";
  private static final String STRUCTURED_DATAS_JSON = "structuredDatasJson";
  private static final String TIMESERIES_JSON = "timeseriesJson";
  private ObjectMapper mapper = new ObjectMapper();

  @Override
  public void apply(MigrationContext context) {
    try (Session session = context.getSession()) {
      Log.info("Running migration (1/5)");
      migrateFileContainer(session);
      Log.info("Running migration (2/5)");
      migrateFileReferences(session);
      Log.info("Running migration (3/5)");
      migrateStructuredDataContainer(session);
      Log.info("Running migration (4/5)");
      migrateStructuredDataReferences(session);
      Log.info("Running migration (5/5)");
      migrateTimeseriesReferences(session);
    } catch (Exception e) {
      Log.error("Error while running migration: ", e);
    }
  }

  private void migrateFileContainer(Session session) {
    var cResults = session.executeRead(tx ->
      tx.run("MATCH (c:FileContainer) WHERE c.filesJson IS NOT NULL RETURN c").list()
    );
    for (int i = 0; i < cResults.size(); i++) {
      logPercent(i, cResults.size());
      var c = cResults.get(i).get("c").asNode();
      var cId = c.elementId();

      if (!c.containsKey(FILES_JSON)) continue;

      var tx = session.beginTransaction();
      for (var fileObj : c.get(FILES_JSON).asList()) {
        if (fileObj instanceof String fileStr) {
          var fileNode = parseJson(fileStr);
          if (fileNode.isEmpty()) {
            Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", cId, fileStr);
            continue;
          }
          var file = parseShepardFile(fileNode.get());
          Map<String, Object> params = new HashMap<>();
          params.put(
            "props",
            Map.of("oid", file.oid, "createdAt", file.createdAt, "filename", file.filename, "md5", file.md5)
          );
          var query =
            """
            MATCH (c:FileContainer) WHERE ID(c) = %s
            CREATE (c)-[:file_in_container]->(sf:ShepardFile $props)
            """;
          tx.run(String.format(query, cId), params);
        }
      }
      tx.run("MATCH (c:FileContainer) WHERE ID(c) = " + cId + " REMOVE c.filesJson");
      tx.commit();
    }
  }

  private void migrateFileReferences(Session session) {
    var rResults = session.executeRead(tx ->
      tx.run("MATCH (r:FileReference) WHERE r.filesJson IS NOT NULL RETURN r").list()
    );
    for (int i = 0; i < rResults.size(); i++) {
      logPercent(i, rResults.size());
      var r = rResults.get(i).get("r").asNode();
      var rId = r.elementId();

      if (!r.containsKey(FILES_JSON)) continue;

      var tx = session.beginTransaction();
      for (var fileObj : r.get(FILES_JSON).asList()) {
        if (fileObj instanceof String fileStr) {
          var fileNode = parseJson(fileStr);
          if (fileNode.isEmpty()) {
            Log.errorf("NodeID %s: File cannot be parsed and will be skipped: %s", rId, fileStr);
            continue;
          }
          var file = parseShepardFile(fileNode.get());
          Map<String, Object> params = new HashMap<>();
          params.put("oid", file.oid);
          params.put("props", Map.of("createdAt", file.createdAt, "filename", file.filename, "md5", file.md5));
          var query =
            """
            MATCH (r:FileReference)-[:is_in_container]->(c:FileContainer) WHERE ID(r) = %s
            MERGE (c)-[:file_in_container]->(sf:ShepardFile { oid: $oid })
            SET sf += $props
            CREATE (r)-[hp:has_payload]->(sf)
            """;
          tx.run(String.format(query, rId), params);
        }
      }
      tx.run("MATCH (r:FileReference) WHERE ID(r) = " + rId + " REMOVE r.filesJson");
      tx.commit();
    }
  }

  private void migrateStructuredDataContainer(Session session) {
    var cResults = session.executeRead(tx ->
      tx.run("MATCH (c:StructuredDataContainer) WHERE c.structuredDatasJson IS NOT NULL RETURN c").list()
    );
    for (int i = 0; i < cResults.size(); i++) {
      logPercent(i, cResults.size());
      var c = cResults.get(i).get("c").asNode();
      var cId = c.elementId();

      if (!c.containsKey(STRUCTURED_DATAS_JSON)) continue;

      var tx = session.beginTransaction();
      for (var structuredDataObj : c.get(STRUCTURED_DATAS_JSON).asList()) {
        if (structuredDataObj instanceof String structuredDataStr) {
          var structuredDataNode = parseJson(structuredDataStr);
          if (structuredDataNode.isEmpty()) {
            Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", cId, structuredDataStr);
            continue;
          }
          var sd = parseStructuredData(structuredDataNode.get());
          Map<String, Object> params = new HashMap<>();
          params.put("props", Map.of("oid", sd.oid, "createdAt", sd.createdAt, "name", sd.name));
          var query =
            """
            MATCH (c:StructuredDataContainer) WHERE ID(c) = %s
            CREATE (c)-[:structureddata_in_container]->(sd:StructuredData $props)
            """;
          tx.run(String.format(query, cId), params);
        }
      }
      tx.run("MATCH (c:StructuredDataContainer) WHERE ID(c) = " + cId + " REMOVE c.structuredDatasJson");
      tx.commit();
    }
  }

  private void migrateStructuredDataReferences(Session session) {
    var rResults = session.executeRead(tx ->
      tx.run("MATCH (r:StructuredDataReference) WHERE r.structuredDatasJson IS NOT NULL RETURN r").list()
    );
    for (int i = 0; i < rResults.size(); i++) {
      logPercent(i, rResults.size());
      var r = rResults.get(i).get("r").asNode();
      var rId = r.elementId();

      if (!r.containsKey(STRUCTURED_DATAS_JSON)) continue;

      var tx = session.beginTransaction();
      for (var structuredDataObj : r.get(STRUCTURED_DATAS_JSON).asList()) {
        if (structuredDataObj instanceof String structuredDataStr) {
          var structuredDataNode = parseJson(structuredDataStr);
          if (structuredDataNode.isEmpty()) {
            Log.errorf("NodeID %s: StructuredData cannot be parsed and will be skipped: %s", rId, structuredDataStr);
            continue;
          }
          var sd = parseStructuredData(structuredDataNode.get());
          Map<String, Object> params = new HashMap<>();
          params.put("oid", sd.oid);
          params.put("props", Map.of("createdAt", sd.createdAt, "name", sd.name));
          var query =
            """
            MATCH (r:StructuredDataReference)-[:is_in_container]->(c:StructuredDataContainer) WHERE ID(r) = %s
            MERGE (c)-[:structureddata_in_container]->(sd:StructuredData { oid: $oid })
            SET sd += $props
            CREATE (r)-[hp:has_payload]->(sd)
            """;
          tx.run(String.format(query, rId), params);
        }
      }
      tx.run("MATCH (r:StructuredDataReference) WHERE ID(r) = " + rId + " REMOVE r.structuredDatasJson");
      tx.commit();
    }
  }

  private void migrateTimeseriesReferences(Session session) {
    var rResults = session.executeRead(tx ->
      tx.run("MATCH (r:TimeseriesReference) WHERE r.timeseriesJson IS NOT NULL RETURN r").list()
    );
    for (int i = 0; i < rResults.size(); i++) {
      logPercent(i, rResults.size());
      var r = rResults.get(i).get("r").asNode();
      var rId = r.elementId();

      if (!r.containsKey(TIMESERIES_JSON)) continue;

      var tx = session.beginTransaction();
      for (var timeseriesObj : r.get(TIMESERIES_JSON).asList()) {
        if (timeseriesObj instanceof String timeseriesStr) {
          var timeseriesNode = parseJson(timeseriesStr);
          if (timeseriesNode.isEmpty()) {
            Log.errorf("NodeID %s: Timeseries cannot be parsed and will be skipped: %s", rId, timeseriesStr);
            continue;
          }
          var ts = parseTimeseries(timeseriesNode.get());
          Map<String, Object> params = Map.of(
            "measurement",
            ts.measurement,
            "device",
            ts.device,
            "location",
            ts.location,
            "symbolicName",
            ts.symbolicName,
            "field",
            ts.field
          );
          var query =
            """
            MATCH (r:TimeseriesReference) WHERE ID(r) = %s
            MERGE (ts:Timeseries { measurement: $measurement, device: $device, location: $location, symbolicName: $symbolicName, field: $field })
            CREATE (r)-[hp:has_payload]->(ts)
            """;

          tx.run(String.format(query, rId), params);
        }
      }
      tx.run("MATCH (r:TimeseriesReference) WHERE ID(r) = " + rId + " REMOVE r.timeseriesJson");
      tx.commit();
    }
  }

  private Optional<JsonNode> parseJson(String str) {
    JsonNode node;
    try {
      node = mapper.readTree(str);
    } catch (JsonProcessingException e) {
      // This should not be possible
      Log.error(e.toString());
      return Optional.empty();
    }
    return Optional.of(node);
  }

  private long parseDate(String date) {
    if (date.length() == 0) return 0L;
    Date parsed;
    try {
      parsed = new StdDateFormat().parse(date);
    } catch (ParseException e) {
      // This should not be possible
      Log.warnf("%s, using 0 instead", e.getMessage());
      return 0L;
    }
    return parsed.getTime();
  }

  private ShepardFile parseShepardFile(JsonNode node) {
    var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
    var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
    var filename = Optional.ofNullable(node.get("filename")).map(JsonNode::asText).orElse("");
    var md5 = Optional.ofNullable(node.get("md5")).map(JsonNode::asText).orElse("");
    return new ShepardFile(oid, filename, parseDate(createdAt), md5);
  }

  private StructuredData parseStructuredData(JsonNode node) {
    var oid = Optional.ofNullable(node.get("oid")).map(JsonNode::asText).orElse("");
    var createdAt = Optional.ofNullable(node.get("createdAt")).map(JsonNode::asText).orElse("");
    var name = Optional.ofNullable(node.get("name")).map(JsonNode::asText).orElse("");
    return new StructuredData(oid, name, parseDate(createdAt));
  }

  private Timeseries parseTimeseries(JsonNode node) {
    var measurement = Optional.ofNullable(node.get("measurement")).map(JsonNode::asText).orElse("");
    var device = Optional.ofNullable(node.get("device")).map(JsonNode::asText).orElse("");
    var location = Optional.ofNullable(node.get("location")).map(JsonNode::asText).orElse("");
    var symbolicName = Optional.ofNullable(node.get("symbolicName")).map(JsonNode::asText).orElse("");
    var field = Optional.ofNullable(node.get("field")).map(JsonNode::asText).orElse("");
    return new Timeseries(measurement, device, location, symbolicName, field);
  }

  private void logPercent(int i, int size) {
    int curPercent = (int) ((100f / size) * i);
    int prePercent = (int) ((100f / size) * (i - 1));
    if (prePercent < curPercent) {
      Log.infof("... %d %", curPercent);
    }
  }
}