1 package de.dlr.shepard.data.timeseries.migrations;
2
3 import io.quarkus.logging.Log;
4 import jakarta.resource.cci.ResultSet;
5 import java.sql.Connection;
6 import java.time.Instant;
7 import org.flywaydb.core.api.MigrationVersion;
8 import org.flywaydb.core.api.migration.BaseJavaMigration;
9 import org.flywaydb.core.api.migration.Context;
10
11 public class V1_7_0__int_to_bigint extends BaseJavaMigration {
12
13 @Override
14 public void migrate(Context context) throws Exception {
15 var connection = context.getConnection();
16
17 Log.info("Start migration, determining earliest time, may take a while ...");
18
19 alterCompressionJob(connection, false);
20
21 connection.prepareStatement("ALTER TABLE timeseries_data_points ADD COLUMN bigint_value BIGINT;").executeUpdate();
22
23 var numberOfIntTSRS = connection
24 .prepareStatement("SELECT COUNT(*) FROM timeseries WHERE value_type = 'Integer';")
25 .executeQuery();
26
27 numberOfIntTSRS.next();
28 var numberOfIntTS = numberOfIntTSRS.getLong(1);
29
30 if (numberOfIntTS > 0) {
31 var minTimeRS = connection
32 .prepareStatement(
33 "SELECT MIN(time) FROM timeseries_data_points WHERE timeseries_id IN (SELECT id FROM timeseries WHERE value_type = 'Integer');"
34 )
35 .executeQuery();
36
37 minTimeRS.next();
38 var minTime = minTimeRS.getLong(1);
39
40 Log.infov("Earliest time: {0}", minTime);
41 var chunkStmt = connection.prepareStatement(
42 "SELECT * FROM timescaledb_information.chunks WHERE (range_start_integer >= ? OR (range_start_integer < ? AND range_end_integer > ?)) AND hypertable_name = 'timeseries_data_points' ORDER BY range_start_integer;",
43 ResultSet.TYPE_SCROLL_INSENSITIVE,
44 ResultSet.CONCUR_READ_ONLY
45 );
46 chunkStmt.setLong(1, minTime);
47 chunkStmt.setLong(2, minTime);
48 chunkStmt.setLong(3, minTime);
49 var chunksRS = chunkStmt.executeQuery();
50
51 chunksRS.last();
52 int chunkCount = chunksRS.getRow();
53 chunksRS.beforeFirst();
54
55 Log.infov("There are {0} chunks to migrate", chunkCount);
56
57 int currentChunk = 0;
58
59 while (chunksRS.next()) {
60 currentChunk++;
61 var start = chunksRS.getLong("range_start_integer");
62 var end = chunksRS.getLong("range_end_integer");
63
64 var compressed = chunksRS.getBoolean("is_compressed");
65
66 Log.infov(
67 "Chunk from {0} to {1}, number {2}/{3}",
68 Instant.ofEpochMilli(start / 1000000),
69 Instant.ofEpochMilli(end / 1000000),
70 currentChunk,
71 chunkCount
72 );
73
74
75
76 if (compressed) {
77 var decompressStmt = connection.prepareStatement("SELECT decompress_chunk(?);");
78 decompressStmt.setString(1, chunksRS.getString("chunk_schema") + "." + chunksRS.getString("chunk_name"));
79 decompressStmt.execute();
80 }
81
82
83 var updateStmt = connection.prepareStatement(
84 "UPDATE timeseries_data_points SET bigint_value = int_value WHERE time >= ? AND time < ? AND timeseries_id IN (SELECT id FROM timeseries WHERE value_type = 'Integer');"
85 );
86 updateStmt.setLong(1, start);
87 updateStmt.setLong(2, end);
88 var rowsUpdated = updateStmt.executeUpdate();
89
90
91 if (compressed) {
92 var compressStmt = connection.prepareStatement("SELECT compress_chunk(?);");
93 compressStmt.setString(1, chunksRS.getString("chunk_schema") + "." + chunksRS.getString("chunk_name"));
94 compressStmt.execute();
95 }
96
97 Log.infov("Updated chunk, changed {0} rows", rowsUpdated);
98 }
99 } else {
100 Log.info("There are no integer timeseries, therefore no data migration necessary.");
101 }
102
103 Log.info("Finished migration of data, starting table changes");
104 connection.prepareStatement("ALTER TABLE timeseries_data_points DROP COLUMN int_value;").executeUpdate();
105 connection
106 .prepareStatement("ALTER TABLE timeseries_data_points RENAME COLUMN bigint_value TO int_value;")
107 .executeUpdate();
108
109 alterCompressionJob(connection, true);
110
111 Log.info("Finished migration");
112 }
113
114 private void alterCompressionJob(Connection connection, boolean enable) throws Exception {
115 var jobIDRS = connection
116 .prepareStatement(
117 "SELECT job_id from timescaledb_information.jobs where hypertable_name = 'timeseries_data_points' and proc_name = 'policy_compression';"
118 )
119 .executeQuery();
120
121 if (!jobIDRS.next()) {
122 Log.info("Did not find a job id - not altering job");
123 }
124
125 var jobID = jobIDRS.getInt("job_id");
126
127 var jobStmt = connection.prepareStatement("SELECT alter_job(?, scheduled => ?);");
128 jobStmt.setInt(1, jobID);
129 jobStmt.setBoolean(2, enable);
130 jobStmt.execute();
131 }
132 }