V1_7_0__int_to_bigint.java

package de.dlr.shepard.data.timeseries.migrations;

import io.quarkus.logging.Log;
import jakarta.resource.cci.ResultSet;
import java.sql.Connection;
import java.time.Instant;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;

/**
 * Flyway Java migration that converts {@code timeseries_data_points.int_value} to {@code BIGINT}.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>pause the compression policy job of the hypertable (if one exists),</li>
 *   <li>add a temporary {@code bigint_value} column,</li>
 *   <li>copy {@code int_value} into {@code bigint_value} chunk by chunk for all timeseries whose
 *       {@code value_type} is {@code 'Integer'}, decompressing and recompressing chunks that are
 *       compressed,</li>
 *   <li>drop the old {@code int_value} column and rename {@code bigint_value} to
 *       {@code int_value},</li>
 *   <li>resume the compression policy job.</li>
 * </ol>
 */
public class V1_7_0__int_to_bigint extends BaseJavaMigration {

  /**
   * Divisor applied to chunk range bounds to obtain epoch milliseconds for log output, i.e. the
   * bounds are treated as nanoseconds since the epoch.
   */
  private static final long RANGE_UNITS_PER_MILLI = 1_000_000L;

  /** Sub-select identifying all timeseries that store integer values. */
  private static final String INTEGER_TIMESERIES_IDS = "SELECT id FROM timeseries WHERE value_type = 'Integer'";

  /**
   * Runs the migration on the connection supplied by Flyway.
   *
   * @param context Flyway migration context providing the JDBC connection
   * @throws Exception if any SQL statement fails
   */
  @Override
  public void migrate(Context context) throws Exception {
    var connection = context.getConnection();

    Log.info("Start migration, determining earliest time, may take a while ...");

    // Keep the background compression policy from touching chunks while they are rewritten.
    alterCompressionJob(connection, false);

    executeUpdate(connection, "ALTER TABLE timeseries_data_points ADD COLUMN bigint_value BIGINT;");

    if (countIntegerTimeseries(connection) > 0) {
      migrateIntegerData(connection);
    } else {
      Log.info("There are no integer timeseries, therefore no data migration necessary.");
    }

    Log.info("Finished migration of data, starting table changes");
    executeUpdate(connection, "ALTER TABLE timeseries_data_points DROP COLUMN int_value;");
    executeUpdate(connection, "ALTER TABLE timeseries_data_points RENAME COLUMN bigint_value TO int_value;");

    alterCompressionJob(connection, true);

    Log.info("Finished migration");
  }

  /** Executes a parameterless DDL/DML statement and closes it afterwards. */
  private void executeUpdate(Connection connection, String sql) throws Exception {
    try (var stmt = connection.prepareStatement(sql)) {
      stmt.executeUpdate();
    }
  }

  /** Returns the number of timeseries whose {@code value_type} is {@code 'Integer'}. */
  private long countIntegerTimeseries(Connection connection) throws Exception {
    try (
      var stmt = connection.prepareStatement("SELECT COUNT(*) FROM timeseries WHERE value_type = 'Integer';");
      var rs = stmt.executeQuery()
    ) {
      rs.next();
      return rs.getLong(1);
    }
  }

  /**
   * Copies {@code int_value} to {@code bigint_value} for every chunk that can contain integer data
   * points, starting at the chunk containing the earliest such point.
   */
  private void migrateIntegerData(Connection connection) throws Exception {
    long minTime;
    try (
      var stmt = connection.prepareStatement(
        "SELECT MIN(time) FROM timeseries_data_points WHERE timeseries_id IN (" + INTEGER_TIMESERIES_IDS + ");"
      );
      var rs = stmt.executeQuery()
    ) {
      rs.next();
      minTime = rs.getLong(1);
      if (rs.wasNull()) {
        // MIN() over zero rows is NULL: integer timeseries exist but hold no data points, so the
        // per-chunk UPDATE could not match anything. Skip the costly decompress/compress cycle.
        Log.info("Integer timeseries contain no data points, therefore no data migration necessary.");
        return;
      }
    }

    Log.infov("Earliest time: {0}", minTime);

    // A scrollable result set is requested so the number of chunks can be determined up front
    // (last()/getRow()) for progress logging. The constants are referenced via java.sql.ResultSet
    // explicitly, since they are plain JDBC constants.
    try (
      var chunkStmt = connection.prepareStatement(
        "SELECT * FROM timescaledb_information.chunks WHERE (range_start_integer >= ? OR (range_start_integer < ? AND range_end_integer > ?)) AND hypertable_name = 'timeseries_data_points' ORDER BY range_start_integer;",
        java.sql.ResultSet.TYPE_SCROLL_INSENSITIVE,
        java.sql.ResultSet.CONCUR_READ_ONLY
      )
    ) {
      chunkStmt.setLong(1, minTime);
      chunkStmt.setLong(2, minTime);
      chunkStmt.setLong(3, minTime);

      try (var chunksRS = chunkStmt.executeQuery()) {
        chunksRS.last();
        int chunkCount = chunksRS.getRow();
        chunksRS.beforeFirst();

        Log.infov("There are {0} chunks to migrate", chunkCount);

        int currentChunk = 0;
        while (chunksRS.next()) {
          currentChunk++;
          var start = chunksRS.getLong("range_start_integer");
          var end = chunksRS.getLong("range_end_integer");
          var compressed = chunksRS.getBoolean("is_compressed");
          var chunkName = chunksRS.getString("chunk_schema") + "." + chunksRS.getString("chunk_name");

          Log.infov(
            "Chunk from {0} to {1}, number {2}/{3}",
            Instant.ofEpochMilli(start / RANGE_UNITS_PER_MILLI),
            Instant.ofEpochMilli(end / RANGE_UNITS_PER_MILLI),
            currentChunk,
            chunkCount
          );

          // Compressed chunks are decompressed before the UPDATE and recompressed afterwards.
          if (compressed) {
            callChunkFunction(connection, "SELECT decompress_chunk(?);", chunkName);
          }

          var rowsUpdated = copyIntValues(connection, start, end);

          if (compressed) {
            callChunkFunction(connection, "SELECT compress_chunk(?);", chunkName);
          }

          Log.infov("Updated chunk, changed {0} rows", rowsUpdated);
        }
      }
    }
  }

  /**
   * Copies {@code int_value} into {@code bigint_value} for all integer data points with
   * {@code start <= time < end}.
   *
   * @return number of rows updated
   */
  private int copyIntValues(Connection connection, long start, long end) throws Exception {
    try (
      var updateStmt = connection.prepareStatement(
        "UPDATE timeseries_data_points SET bigint_value = int_value WHERE time >= ? AND time < ? AND timeseries_id IN (" +
        INTEGER_TIMESERIES_IDS +
        ");"
      )
    ) {
      updateStmt.setLong(1, start);
      updateStmt.setLong(2, end);
      return updateStmt.executeUpdate();
    }
  }

  /** Invokes a single-argument chunk function ({@code sql} has one {@code ?}) for the given chunk. */
  private void callChunkFunction(Connection connection, String sql, String chunkName) throws Exception {
    try (var stmt = connection.prepareStatement(sql)) {
      stmt.setString(1, chunkName);
      stmt.execute();
    }
  }

  /**
   * Enables or disables scheduling of the compression policy job of the
   * {@code timeseries_data_points} hypertable. Does nothing (besides logging) when no such job
   * exists.
   *
   * @param connection JDBC connection to use
   * @param enable {@code true} to schedule the job, {@code false} to pause it
   * @throws Exception if a SQL statement fails
   */
  private void alterCompressionJob(Connection connection, boolean enable) throws Exception {
    int jobID;
    try (
      var jobQuery = connection.prepareStatement(
        "SELECT job_id from timescaledb_information.jobs where hypertable_name = 'timeseries_data_points' and proc_name = 'policy_compression';"
      );
      var jobIDRS = jobQuery.executeQuery()
    ) {
      if (!jobIDRS.next()) {
        Log.info("Did not find a job id - not altering job");
        // Without this return the getInt() below would be called on an exhausted result set and
        // fail the whole migration on databases that have no compression policy.
        return;
      }
      jobID = jobIDRS.getInt("job_id");
    }

    try (var jobStmt = connection.prepareStatement("SELECT alter_job(?, scheduled => ?);")) {
      jobStmt.setInt(1, jobID);
      jobStmt.setBoolean(2, enable);
      jobStmt.execute();
    }
  }
}