View Javadoc
1   package de.dlr.shepard.data.timeseries.migrations;
2   
3   import io.quarkus.logging.Log;
4   import jakarta.resource.cci.ResultSet;
5   import java.sql.Connection;
6   import java.time.Instant;
7   import org.flywaydb.core.api.MigrationVersion;
8   import org.flywaydb.core.api.migration.BaseJavaMigration;
9   import org.flywaydb.core.api.migration.Context;
10  
11  public class V1_7_0__int_to_bigint extends BaseJavaMigration {
12  
13    @Override
14    public void migrate(Context context) throws Exception {
15      var connection = context.getConnection();
16  
17      Log.info("Start migration, determining earliest time, may take a while ...");
18  
19      alterCompressionJob(connection, false);
20  
21      connection.prepareStatement("ALTER TABLE timeseries_data_points ADD COLUMN bigint_value BIGINT;").executeUpdate();
22  
23      var numberOfIntTSRS = connection
24        .prepareStatement("SELECT COUNT(*) FROM timeseries WHERE value_type = 'Integer';")
25        .executeQuery();
26  
27      numberOfIntTSRS.next();
28      var numberOfIntTS = numberOfIntTSRS.getLong(1);
29  
30      if (numberOfIntTS > 0) {
31        var minTimeRS = connection
32          .prepareStatement(
33            "SELECT MIN(time) FROM timeseries_data_points WHERE timeseries_id IN (SELECT id FROM timeseries WHERE value_type = 'Integer');"
34          )
35          .executeQuery();
36  
37        minTimeRS.next();
38        var minTime = minTimeRS.getLong(1);
39  
40        Log.infov("Earliest time: {0}", minTime);
41        var chunkStmt = connection.prepareStatement(
42          "SELECT * FROM timescaledb_information.chunks WHERE (range_start_integer >= ? OR (range_start_integer < ? AND range_end_integer > ?)) AND hypertable_name = 'timeseries_data_points' ORDER BY range_start_integer;",
43          ResultSet.TYPE_SCROLL_INSENSITIVE,
44          ResultSet.CONCUR_READ_ONLY
45        );
46        chunkStmt.setLong(1, minTime);
47        chunkStmt.setLong(2, minTime);
48        chunkStmt.setLong(3, minTime);
49        var chunksRS = chunkStmt.executeQuery();
50  
51        chunksRS.last();
52        int chunkCount = chunksRS.getRow();
53        chunksRS.beforeFirst();
54  
55        Log.infov("There are {0} chunks to migrate", chunkCount);
56  
57        int currentChunk = 0;
58  
59        while (chunksRS.next()) {
60          currentChunk++;
61          var start = chunksRS.getLong("range_start_integer");
62          var end = chunksRS.getLong("range_end_integer");
63  
64          var compressed = chunksRS.getBoolean("is_compressed");
65  
66          Log.infov(
67            "Chunk from {0} to {1}, number {2}/{3}",
68            Instant.ofEpochMilli(start / 1000000),
69            Instant.ofEpochMilli(end / 1000000),
70            currentChunk,
71            chunkCount
72          );
73  
74          // Uncompress
75  
76          if (compressed) {
77            var decompressStmt = connection.prepareStatement("SELECT decompress_chunk(?);");
78            decompressStmt.setString(1, chunksRS.getString("chunk_schema") + "." + chunksRS.getString("chunk_name"));
79            decompressStmt.execute();
80          }
81  
82          // Update
83          var updateStmt = connection.prepareStatement(
84            "UPDATE timeseries_data_points SET bigint_value = int_value WHERE time >= ? AND time < ? AND timeseries_id IN (SELECT id FROM timeseries WHERE value_type = 'Integer');"
85          );
86          updateStmt.setLong(1, start);
87          updateStmt.setLong(2, end);
88          var rowsUpdated = updateStmt.executeUpdate();
89  
90          // Compress
91          if (compressed) {
92            var compressStmt = connection.prepareStatement("SELECT compress_chunk(?);");
93            compressStmt.setString(1, chunksRS.getString("chunk_schema") + "." + chunksRS.getString("chunk_name"));
94            compressStmt.execute();
95          }
96  
97          Log.infov("Updated chunk, changed {0} rows", rowsUpdated);
98        }
99      } else {
100       Log.info("There are no integer timeseries, therefore no data migration necessary.");
101     }
102 
103     Log.info("Finished migration of data, starting table changes");
104     connection.prepareStatement("ALTER TABLE timeseries_data_points DROP COLUMN int_value;").executeUpdate();
105     connection
106       .prepareStatement("ALTER TABLE timeseries_data_points RENAME COLUMN bigint_value TO int_value;")
107       .executeUpdate();
108 
109     alterCompressionJob(connection, true);
110 
111     Log.info("Finished migration");
112   }
113 
114   private void alterCompressionJob(Connection connection, boolean enable) throws Exception {
115     var jobIDRS = connection
116       .prepareStatement(
117         "SELECT job_id from timescaledb_information.jobs where hypertable_name = 'timeseries_data_points' and proc_name = 'policy_compression';"
118       )
119       .executeQuery();
120 
121     if (!jobIDRS.next()) {
122       Log.info("Did not find a job id - not altering job");
123     }
124 
125     var jobID = jobIDRS.getInt("job_id");
126 
127     var jobStmt = connection.prepareStatement("SELECT alter_job(?, scheduled => ?);");
128     jobStmt.setInt(1, jobID);
129     jobStmt.setBoolean(2, enable);
130     jobStmt.execute();
131   }
132 }