From 4d779cfc69fea5bb8ae7d0eec24eeab07ade6950 Mon Sep 17 00:00:00 2001
From: Louis Lam
Date: Sat, 26 Oct 2024 20:50:29 +0800
Subject: [PATCH] Data migration and history retention for 2.0.0 (#5075)

---
 extra/reset-migrate-aggregate-table-state.js |  24 +++
 package.json                                 |   3 +-
 server/database.js                           | 163 +++++++++++++++++++
 server/jobs/clear-old-data.js                |  34 ++--
 server/server.js                             |  16 +-
 server/uptime-calculator.js                  | 109 ++++++++-----
 6 files changed, 292 insertions(+), 57 deletions(-)
 create mode 100644 extra/reset-migrate-aggregate-table-state.js

diff --git a/extra/reset-migrate-aggregate-table-state.js b/extra/reset-migrate-aggregate-table-state.js
new file mode 100644
index 00000000..e6c51fbd
--- /dev/null
+++ b/extra/reset-migrate-aggregate-table-state.js
@@ -0,0 +1,24 @@
+const { R } = require("redbean-node");
+const Database = require("../server/database");
+const args = require("args-parser")(process.argv);
+const { Settings } = require("../server/settings");
+
+const main = async () => {
+    console.log("Connecting the database");
+    Database.initDataDir(args);
+    await Database.connect(false, false, true);
+
+    console.log("Deleting all data from aggregate tables");
+    await R.exec("DELETE FROM stat_minutely");
+    await R.exec("DELETE FROM stat_hourly");
+    await R.exec("DELETE FROM stat_daily");
+
+    console.log("Resetting the aggregate table state");
+    await Settings.set("migrateAggregateTableState", "");
+
+    await Database.close();
+    console.log("Done");
+};
+
+main();
+
diff --git a/package.json b/package.json
index 5186cafc..2a28b8d4 100644
--- a/package.json
+++ b/package.json
@@ -68,7 +68,8 @@
         "sort-contributors": "node extra/sort-contributors.js",
         "quick-run-nightly": "docker run --rm --env NODE_ENV=development -p 3001:3001 louislam/uptime-kuma:nightly2",
         "start-dev-container": "cd docker && docker-compose -f docker-compose-dev.yml up --force-recreate",
-        "rebase-pr-to-1.23.X": "node extra/rebase-pr.js 1.23.X"
+        "rebase-pr-to-1.23.X": "node extra/rebase-pr.js 1.23.X",
+        "reset-migrate-aggregate-table-state": "node extra/reset-migrate-aggregate-table-state.js"
     },
     "dependencies": {
         "@grpc/grpc-js": "~1.8.22",
diff --git a/server/database.js b/server/database.js
index 3374aff9..714c51ba 100644
--- a/server/database.js
+++ b/server/database.js
@@ -6,6 +6,9 @@ const knex = require("knex");
 const path = require("path");
 const { EmbeddedMariaDB } = require("./embedded-mariadb");
 const mysql = require("mysql2/promise");
+const { Settings } = require("./settings");
+const { UptimeCalculator } = require("./uptime-calculator");
+const dayjs = require("dayjs");
 
 /**
  * Database & App Data Folder
@@ -391,9 +394,23 @@ class Database {
         // https://knexjs.org/guide/migrations.html
         // https://gist.github.com/NigelEarle/70db130cc040cc2868555b29a0278261
         try {
+            // Disable foreign key check for SQLite
+            // Known issue of knex: https://github.com/drizzle-team/drizzle-orm/issues/1813
+            if (Database.dbConfig.type === "sqlite") {
+                await R.exec("PRAGMA foreign_keys = OFF");
+            }
+
             await R.knex.migrate.latest({
                 directory: Database.knexMigrationsPath,
             });
+
+            // Enable foreign key check for SQLite
+            if (Database.dbConfig.type === "sqlite") {
+                await R.exec("PRAGMA foreign_keys = ON");
+            }
+
+            await this.migrateAggregateTable();
+
         } catch (e) {
             // Allow missing patch files for downgrade or testing pr.
             if (e.message.includes("the following files are missing:")) {
@@ -711,6 +728,152 @@ class Database {
         }
     }
 
+    /**
+     * Migrate the old data in the heartbeat table to the new format (stat_daily, stat_hourly, stat_minutely)
+     * It should be run once while upgrading V1 to V2
+     *
+     * Normally, it should be in transaction, but UptimeCalculator wasn't designed to be in transaction before that.
+     * I don't want to heavily modify the UptimeCalculator, so it is not in transaction.
+     * Run `npm run reset-migrate-aggregate-table-state` to reset, in case the migration is interrupted.
+     * @returns {Promise<void>}
+     */
+    static async migrateAggregateTable() {
+        log.debug("db", "Enter Migrate Aggregate Table function");
+
+        // Add a setting for 2.0.0-dev users to skip this migration
+        if (process.env.SET_MIGRATE_AGGREGATE_TABLE_TO_TRUE === "1") {
+            log.warn("db", "SET_MIGRATE_AGGREGATE_TABLE_TO_TRUE is set to 1, skipping aggregate table migration forever (for 2.0.0-dev users)");
+            await Settings.set("migrateAggregateTableState", "migrated");
+        }
+
+        let migrateState = await Settings.get("migrateAggregateTableState");
+
+        // Skip if already migrated
+        // If it is migrating, it possibly means the migration was interrupted, or the migration is in progress
+        if (migrateState === "migrated") {
+            log.debug("db", "Migrated aggregate table already, skip");
+            return;
+        } else if (migrateState === "migrating") {
+            log.warn("db", "Aggregate table migration is already in progress, or it was interrupted");
+            throw new Error("Aggregate table migration is already in progress");
+        }
+
+        await Settings.set("migrateAggregateTableState", "migrating");
+
+        log.info("db", "Migrating Aggregate Table");
+
+        log.info("db", "Getting list of unique monitors");
+
+        // Get a list of unique monitors from the heartbeat table, using raw sql
+        let monitors = await R.getAll(`
+            SELECT DISTINCT monitor_id
+            FROM heartbeat
+            ORDER BY monitor_id ASC
+        `);
+
+        // Stop if stat_* tables are not empty
+        for (let table of [ "stat_minutely", "stat_hourly", "stat_daily" ]) {
+            let countResult = await R.getRow(`SELECT COUNT(*) AS count FROM ${table}`);
+            let count = countResult.count;
+            if (count > 0) {
+                log.warn("db", `Aggregate table ${table} is not empty, migration will not be started (Maybe you were using 2.0.0-dev?)`);
+                return;
+            }
+        }
+
+        let progressPercent = 0;
+        let part = 100 / monitors.length;
+        let i = 1;
+        for (let monitor of monitors) {
+            // Get a list of unique dates from the heartbeat table, using raw sql
+            let dates = await R.getAll(`
+                SELECT DISTINCT DATE(time) AS date
+                FROM heartbeat
+                WHERE monitor_id = ?
+                ORDER BY date ASC
+            `, [
+                monitor.monitor_id
+            ]);
+
+            for (let date of dates) {
+                // New Uptime Calculator
+                let calculator = new UptimeCalculator();
+                calculator.monitorID = monitor.monitor_id;
+                calculator.setMigrationMode(true);
+
+                // Get all the heartbeats for this monitor and date
+                let heartbeats = await R.getAll(`
+                    SELECT status, ping, time
+                    FROM heartbeat
+                    WHERE monitor_id = ?
+                    AND DATE(time) = ?
+                    ORDER BY time ASC
+                `, [ monitor.monitor_id, date.date ]);
+
+                if (heartbeats.length > 0) {
+                    log.info("db", `[DON'T STOP] Migrating monitor data ${monitor.monitor_id} - ${date.date} [${progressPercent.toFixed(2)}%][${i}/${monitors.length}]`);
+                }
+
+                for (let heartbeat of heartbeats) {
+                    await calculator.update(heartbeat.status, parseFloat(heartbeat.ping), dayjs(heartbeat.time));
+                }
+
+                progressPercent += (Math.round(part / dates.length * 100) / 100);
+
+                // Lazy to fix the floating point issue, it is acceptable since it is just a progress bar
+                if (progressPercent > 100) {
+                    progressPercent = 100;
+                }
+            }
+
+            i++;
+        }
+
+        await Database.clearHeartbeatData(true);
+
+        await Settings.set("migrateAggregateTableState", "migrated");
+
+        if (monitors.length > 0) {
+            log.info("db", "Aggregate Table Migration Completed");
+        } else {
+            log.info("db", "No data to migrate");
+        }
+    }
+
+    /**
+     * Remove all non-important heartbeats from heartbeat table, keep last 24-hour or {KEEP_LAST_ROWS} rows for each monitor
+     * @param {boolean} detailedLog Log detailed information
+     * @returns {Promise<void>}
+     */
+    static async clearHeartbeatData(detailedLog = false) {
+        let monitors = await R.getAll("SELECT id FROM monitor");
+        const sqlHourOffset = Database.sqlHourOffset();
+
+        for (let monitor of monitors) {
+            if (detailedLog) {
+                log.info("db", "Deleting non-important heartbeats for monitor " + monitor.id);
+            }
+            await R.exec(`
+                DELETE FROM heartbeat
+                WHERE monitor_id = ?
+                AND important = 0
+                AND time < ${sqlHourOffset}
+                AND id NOT IN (
+                    SELECT id
+                    FROM heartbeat
+                    WHERE monitor_id = ?
+                    ORDER BY time DESC
+                    LIMIT ?
+                )
+            `, [
+                monitor.id,
+                -24,
+                monitor.id,
+                100,
+            ]);
+        }
+    }
+
 }
 
 module.exports = Database;
diff --git a/server/jobs/clear-old-data.js b/server/jobs/clear-old-data.js
index 248a4d40..cfd65a8a 100644
--- a/server/jobs/clear-old-data.js
+++ b/server/jobs/clear-old-data.js
@@ -1,21 +1,22 @@
 const { R } = require("redbean-node");
 const { log } = require("../../src/util");
-const { setSetting, setting } = require("../util-server");
 const Database = require("../database");
+const { Settings } = require("../settings");
+const dayjs = require("dayjs");
 
-const DEFAULT_KEEP_PERIOD = 180;
+const DEFAULT_KEEP_PERIOD = 365;
 
 /**
- * Clears old data from the heartbeat table of the database.
+ * Clears old data from the heartbeat table and the stat_daily of the database.
  * @returns {Promise<void>} A promise that resolves when the data has been cleared.
  */
-
 const clearOldData = async () => {
-    let period = await setting("keepDataPeriodDays");
+    await Database.clearHeartbeatData();
+    let period = await Settings.get("keepDataPeriodDays");
 
     // Set Default Period
     if (period == null) {
-        await setSetting("keepDataPeriodDays", DEFAULT_KEEP_PERIOD, "general");
+        await Settings.set("keepDataPeriodDays", DEFAULT_KEEP_PERIOD, "general");
         period = DEFAULT_KEEP_PERIOD;
     }
 
@@ -25,23 +26,28 @@ const clearOldData = async () => {
         parsedPeriod = parseInt(period);
     } catch (_) {
         log.warn("clearOldData", "Failed to parse setting, resetting to default..");
-        await setSetting("keepDataPeriodDays", DEFAULT_KEEP_PERIOD, "general");
+        await Settings.set("keepDataPeriodDays", DEFAULT_KEEP_PERIOD, "general");
         parsedPeriod = DEFAULT_KEEP_PERIOD;
     }
 
     if (parsedPeriod < 1) {
         log.info("clearOldData", `Data deletion has been disabled as period is less than 1.
 Period is ${parsedPeriod} days.`);
     } else {
-        log.debug("clearOldData", `Clearing Data older than ${parsedPeriod} days...`);
-
         const sqlHourOffset = Database.sqlHourOffset();
 
         try {
-            await R.exec(
-                "DELETE FROM heartbeat WHERE time < " + sqlHourOffset,
-                [ parsedPeriod * -24 ]
-            );
+            // Heartbeat
+            await R.exec("DELETE FROM heartbeat WHERE time < " + sqlHourOffset, [
+                parsedPeriod * -24,
+            ]);
+
+            let timestamp = dayjs().subtract(parsedPeriod, "day").utc().startOf("day").unix();
+
+            // stat_daily
+            await R.exec("DELETE FROM stat_daily WHERE timestamp < ? ", [
+                timestamp,
+            ]);
 
             if (Database.dbConfig.type === "sqlite") {
                 await R.exec("PRAGMA optimize;");
@@ -50,6 +56,8 @@ const clearOldData = async () => {
             log.error("clearOldData", `Failed to clear old data: ${e.message}`);
         }
     }
+
+    log.debug("clearOldData", "Data cleared.");
 };
 
 module.exports = {
diff --git a/server/server.js b/server/server.js
index c88daca8..7c46fa89 100644
--- a/server/server.js
+++ b/server/server.js
@@ -1604,18 +1604,20 @@ let needSetup = false;
 
     await server.start();
 
-    server.httpServer.listen(port, hostname, () => {
+    server.httpServer.listen(port, hostname, async () => {
         if (hostname) {
             log.info("server", `Listening on ${hostname}:${port}`);
         } else {
             log.info("server", `Listening on ${port}`);
         }
-        startMonitors();
+        await startMonitors();
+
+        // Put this here. Start background jobs after the db and server is ready to prevent clear up during db migration.
+        await initBackgroundJobs();
+
         checkVersion.startInterval();
     });
 
-    await initBackgroundJobs();
-
     // Start cloudflared at the end if configured
     await cloudflaredAutoStart(cloudflaredToken);
 
@@ -1809,7 +1811,11 @@ async function startMonitors() {
     }
 
     for (let monitor of list) {
-        await monitor.start(io);
+        try {
+            await monitor.start(io);
+        } catch (e) {
+            log.error("monitor", e);
+        }
         // Give some delays, so all monitors won't make request at the same moment when just start the server.
         await sleep(getRandomInt(300, 1000));
     }
diff --git a/server/uptime-calculator.js b/server/uptime-calculator.js
index f2738b96..71d1d458 100644
--- a/server/uptime-calculator.js
+++ b/server/uptime-calculator.js
@@ -12,7 +12,6 @@ class UptimeCalculator {
      * @private
      * @type {{string:UptimeCalculator}}
      */
-
     static list = {};
 
     /**
@@ -55,6 +54,15 @@ class UptimeCalculator {
     lastHourlyStatBean = null;
     lastMinutelyStatBean = null;
 
+    /**
+     * For migration purposes.
+     * @type {boolean}
+     */
+    migrationMode = false;
+
+    statMinutelyKeepHour = 24;
+    statHourlyKeepDay = 30;
+
     /**
      * Get the uptime calculator for a monitor
      * Initializes and returns the monitor if it does not exist
@@ -189,16 +197,19 @@ class UptimeCalculator {
     /**
      * @param {number} status status
      * @param {number} ping Ping
+     * @param {dayjs.Dayjs} date Date (Only for migration)
      * @returns {dayjs.Dayjs} date
     * @throws {Error} Invalid status
     */
-    async update(status, ping = 0) {
-        let date = this.getCurrentDate();
+    async update(status, ping = 0, date) {
+        if (!date) {
+            date = this.getCurrentDate();
+        }
 
         let flatStatus = this.flatStatus(status);
 
         if (flatStatus === DOWN && ping > 0) {
-            log.warn("uptime-calc", "The ping is not effective when the status is DOWN");
+            log.debug("uptime-calc", "The ping is not effective when the status is DOWN");
         }
 
         let divisionKey = this.getMinutelyKey(date);
@@ -297,47 +308,61 @@
         }
         await R.store(dailyStatBean);
 
-        let hourlyStatBean = await this.getHourlyStatBean(hourlyKey);
-        hourlyStatBean.up = hourlyData.up;
-        hourlyStatBean.down = hourlyData.down;
-        hourlyStatBean.ping = hourlyData.avgPing;
-        hourlyStatBean.pingMin = hourlyData.minPing;
-        hourlyStatBean.pingMax = hourlyData.maxPing;
-        {
-            // eslint-disable-next-line no-unused-vars
-            const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = hourlyData;
-            if (Object.keys(extras).length > 0) {
-                hourlyStatBean.extras = JSON.stringify(extras);
+        let currentDate = this.getCurrentDate();
+
+        // For migration mode, we don't need to store old hourly and minutely data, but we need 30-day's hourly data
+        // Run anyway for non-migration mode
+        if (!this.migrationMode || date.isAfter(currentDate.subtract(this.statHourlyKeepDay, "day"))) {
+            let hourlyStatBean = await this.getHourlyStatBean(hourlyKey);
+            hourlyStatBean.up = hourlyData.up;
+            hourlyStatBean.down = hourlyData.down;
+            hourlyStatBean.ping = hourlyData.avgPing;
+            hourlyStatBean.pingMin = hourlyData.minPing;
+            hourlyStatBean.pingMax = hourlyData.maxPing;
+            {
+                // eslint-disable-next-line no-unused-vars
+                const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = hourlyData;
+                if (Object.keys(extras).length > 0) {
+                    hourlyStatBean.extras = JSON.stringify(extras);
+                }
             }
+            await R.store(hourlyStatBean);
         }
-        await R.store(hourlyStatBean);
 
-        let minutelyStatBean = await this.getMinutelyStatBean(divisionKey);
-        minutelyStatBean.up = minutelyData.up;
-        minutelyStatBean.down = minutelyData.down;
-        minutelyStatBean.ping = minutelyData.avgPing;
-        minutelyStatBean.pingMin = minutelyData.minPing;
-        minutelyStatBean.pingMax = minutelyData.maxPing;
-        {
-            // eslint-disable-next-line no-unused-vars
-            const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = minutelyData;
-            if (Object.keys(extras).length > 0) {
-                minutelyStatBean.extras = JSON.stringify(extras);
+        // For migration mode, we don't need to store old hourly and minutely data, but we need 24-hour's minutely data
+        // Run anyway for non-migration mode
+        if (!this.migrationMode || date.isAfter(currentDate.subtract(this.statMinutelyKeepHour, "hour"))) {
+            let minutelyStatBean = await this.getMinutelyStatBean(divisionKey);
+            minutelyStatBean.up = minutelyData.up;
+            minutelyStatBean.down = minutelyData.down;
+            minutelyStatBean.ping = minutelyData.avgPing;
+            minutelyStatBean.pingMin = minutelyData.minPing;
+            minutelyStatBean.pingMax = minutelyData.maxPing;
+            {
+                // eslint-disable-next-line no-unused-vars
+                const { up, down, avgPing, minPing, maxPing, timestamp, ...extras } = minutelyData;
+                if (Object.keys(extras).length > 0) {
+                    minutelyStatBean.extras = JSON.stringify(extras);
+                }
             }
+            await R.store(minutelyStatBean);
         }
-        await R.store(minutelyStatBean);
 
-        // Remove the old data
-        log.debug("uptime-calc", "Remove old data");
-        await R.exec("DELETE FROM stat_minutely WHERE monitor_id = ? AND timestamp < ?", [
-            this.monitorID,
-            this.getMinutelyKey(date.subtract(24, "hour")),
-        ]);
+        // No need to remove old data in migration mode
+        if (!this.migrationMode) {
+            // Remove the old data
+            // TODO: Improvement: Convert it to a job?
+            log.debug("uptime-calc", "Remove old data");
+            await R.exec("DELETE FROM stat_minutely WHERE monitor_id = ? AND timestamp < ?", [
+                this.monitorID,
+                this.getMinutelyKey(currentDate.subtract(this.statMinutelyKeepHour, "hour")),
+            ]);
 
-        await R.exec("DELETE FROM stat_hourly WHERE monitor_id = ? AND timestamp < ?", [
-            this.monitorID,
-            this.getHourlyKey(date.subtract(30, "day")),
-        ]);
+            await R.exec("DELETE FROM stat_hourly WHERE monitor_id = ? AND timestamp < ?", [
+                this.monitorID,
+                this.getHourlyKey(currentDate.subtract(this.statHourlyKeepDay, "day")),
+            ]);
+        }
 
         return date;
     }
@@ -812,6 +837,14 @@ class UptimeCalculator {
         return dayjs.utc();
     }
 
+    /**
+     * For migration purposes.
+     * @param {boolean} value Migration mode on/off
+     * @returns {void}
+     */
+    setMigrationMode(value) {
+        this.migrationMode = value;
+    }
 }
 
 class UptimeDataResult {
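
Recovery note (added for readers of this patch; not part of the upstream commit): the one-time migration added in server/database.js above is guarded by the `migrateAggregateTableState` setting, which moves from empty to "migrating" to "migrated". If the process is stopped mid-migration, the state stays at "migrating" and the next startup throws "Aggregate table migration is already in progress". A minimal recovery sketch using the npm script this patch adds to package.json (assumes it is run from the repository root with the server stopped; the last command is an assumption about how the server is normally started, and any usual start command works, since the migration re-runs right after the knex migrations during startup):

    # wipe the partially filled stat_* tables and reset the state flag
    npm run reset-migrate-aggregate-table-state
    # start the server again so migrateAggregateTable() runs from a clean state
    node server/server.js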