// cadmus_core/db/mod.rs

1pub mod backup;
2pub mod migrations;
3pub mod runtime;
4pub mod types;
5pub mod version;
6
7use anyhow::{Context, Error};
8use log::LevelFilter;
9use runtime::RUNTIME;
10use sqlx::ConnectOptions;
11use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
12use std::path::{Path, PathBuf};
13use std::str::FromStr;
14use std::time::Duration;
15
16use crate::version::get_current_version;
17
18/// The filename of the SQLite database used by Cadmus.
19pub const DB_FILENAME: &str = "cadmus.sqlite";
20
/// Database handle providing synchronous API over async SQLx operations.
/// Uses a bridge pattern with `RUNTIME.block_on()` to maintain synchronous interface
/// for compatibility with existing single-threaded event loop.
#[derive(Clone)]
pub struct Database {
    /// Connection pool used for all queries; swapped for a freshly opened
    /// pool when a backup restore replaces the database file.
    pool: SqlitePool,
    /// The path to the database file.
    db_path: PathBuf,
    /// The directory containing the database file.
    ///
    /// `None` when the database path has no non-empty parent directory, which
    /// includes the in-memory path `:memory:`.
    db_dir: Option<PathBuf>,
}
34
35impl std::fmt::Debug for Database {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("Database").finish()
38    }
39}
40
41impl Database {
42    /// Create a new database connection pool.
43    ///
44    /// Does not run any migrations — call [`Database::init`] after construction.
45    ///
46    /// # Arguments
47    /// * `path` - Path to the SQLite database file (will be created if it doesn't exist)
48    ///
49    /// # Returns
50    /// * `Ok(Database)` - Successfully connected database
51    /// * `Err(Error)` - Connection failure
52    #[cfg_attr(feature = "tracing", tracing::instrument(fields(db_path = %path.as_ref().display())))]
53    pub fn new<P: AsRef<Path> + std::fmt::Debug>(path: P) -> Result<Self, Error> {
54        let path = path.as_ref();
55        let db_dir = path
56            .parent()
57            .filter(|p| !p.as_os_str().is_empty())
58            .map(Path::to_path_buf);
59
60        if let Some(dir) = &db_dir {
61            std::fs::create_dir_all(dir)?;
62        }
63
64        let path_str = path.display().to_string();
65
66        tracing::info!(db_path = %path_str, "connecting to database");
67
68        RUNTIME.block_on(async {
69            let pool = open_pool(path).await?;
70            tracing::info!(db_path = %path_str, "database connected");
71            Ok(Database {
72                pool,
73                db_path: path.to_path_buf(),
74                db_dir,
75            })
76        })
77    }
78
79    /// Close all connections in the pool, checkpointing WAL and releasing file handles.
80    ///
81    /// After calling this, no further database operations should be performed.
82    /// This must be called before unmounting the filesystem that contains the database file,
83    /// to ensure SQLite releases all file descriptors and flushes any pending WAL data.
84    pub fn close(&self) {
85        tracing::info!("closing database connection pool");
86        RUNTIME.block_on(async {
87            self.pool.close().await;
88        });
89        tracing::info!("database connection pool closed");
90    }
91
92    /// Returns a reference to the SQLite connection pool.
93    pub fn pool(&self) -> &SqlitePool {
94        &self.pool
95    }
96
97    /// Returns a `MigrationRunner` bound to this database's pool.
98    ///
99    /// Use this to execute all registered runtime migrations after the
100    /// database is initialized.
101    pub fn migration_runner(&self) -> migrations::MigrationRunner {
102        migrations::MigrationRunner::new(self.pool.clone())
103    }
104
105    /// Initialises the database for use by the application.
106    ///
107    /// Performs, in order:
108    /// 1. Integrity check (`PRAGMA quick_check`).
109    /// 2. Version gate — detects upgrades, downgrades, and fresh installs.
110    /// 3. Restore from backup if corruption or downgrade is detected.
111    /// 4. Schema and runtime migrations.
112    /// 5. Version stamp update.
113    /// 6. Post-migration backup (when the version changes).
114    ///
115    /// Must be called once after [`Database::new`] before the database is used.
116    /// Intended for use in the synchronous startup path.
117    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
118    pub fn init(&mut self, backup_retention: usize) -> Result<(), Error> {
119        let app_version = get_current_version();
120
121        RUNTIME.block_on(async {
122            self.restore_if_needed(&app_version).await?;
123            self.migrate().await?;
124            version::stamp_db_version(&self.pool, &app_version, &version::current_migration_hash())
125                .await?;
126            tracing::info!(app_version = %app_version, "database version stamped");
127            self.create_version_backup(&app_version, backup_retention)
128                .await?;
129            Ok(())
130        })
131    }
132
133    /// Checks integrity and the version gate, restoring a backup when either fails.
134    ///
135    /// Corruption and downgrade are treated identically: close the pool, restore
136    /// the best available backup for `app_version`, and reopen the pool so the
137    /// caller can continue with migrations against a known-good database.
138    ///
139    /// Returns an error if a restore is needed but no backup directory or
140    /// compatible backup exists.
141    ///
142    /// # Invariant
143    ///
144    /// `db_path` is always a real file path at the point where the pool is
145    /// reopened. This is guaranteed because `db_dir` is `None` only for
146    /// `:memory:` databases, which return early before reaching that point.
147    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
148    async fn restore_if_needed(
149        &mut self,
150        app_version: &crate::version::GitVersion,
151    ) -> Result<(), Error> {
152        tracing::info!("checking database integrity");
153        let integrity = self.check_integrity().await;
154
155        let gate = if integrity.is_ok() {
156            version::check_version_gate(&self.pool, app_version).await?
157        } else {
158            version::VersionGateResult::Unknown
159        };
160
161        let needs_restore = integrity.is_err() || gate == version::VersionGateResult::Downgrade;
162
163        if !needs_restore {
164            log_version_gate(gate);
165            return Ok(());
166        }
167
168        let Some(ref db_dir) = self.db_dir.clone() else {
169            if let Err(e) = integrity {
170                tracing::error!(
171                    app_version = %app_version,
172                    "database corruption detected but no database directory available for backup restore"
173                );
174                return Err(e);
175            }
176            tracing::error!(
177                app_version = %app_version,
178                "downgrade detected but no database directory available for backup restore"
179            );
180            return Err(Error::msg(
181                "downgrade detected but no database directory available for backup restore",
182            ));
183        };
184
185        let db_version = if integrity.is_ok() {
186            version::read_db_version(&self.pool)
187                .await?
188                .unwrap_or_else(|| app_version.clone())
189        } else {
190            app_version.clone()
191        };
192
193        if integrity.is_err() {
194            tracing::warn!(
195                app_version = %app_version,
196                "database corruption detected; attempting restore from backup"
197            );
198        } else {
199            tracing::warn!(
200                app_version = %app_version,
201                db_version = %db_version,
202                "database was touched by a newer Cadmus version; restoring backup"
203            );
204        }
205
206        let backup_manager = backup::DbBackupManager::new(db_dir.clone(), app_version.clone());
207        self.pool.close().await;
208
209        let restore_context = if integrity.is_err() {
210            "corruption detected but no compatible backup found to restore"
211        } else {
212            "downgrade detected but no compatible backup found to restore"
213        };
214
215        let backup_path = backup_manager
216            .restore_best_backup(&self.db_path, &db_version)
217            .await
218            .map_err(|e| {
219                tracing::error!(app_version = %app_version, error = %e, restore_context, "restore failed");
220                Error::from(e).context(restore_context)
221            })?;
222
223        self.pool = open_pool(&self.db_path).await?;
224
225        tracing::info!(backup_path = %backup_path.display(), "database restored from backup");
226
227        Ok(())
228    }
229
230    /// Creates a versioned backup after migrations on every startup.
231    ///
232    /// Skipped in test builds, when the database is in-memory (no `db_dir`),
233    /// or when `retention` is zero.
234    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
235    async fn create_version_backup(
236        &self,
237        app_version: &crate::version::GitVersion,
238        retention: usize,
239    ) -> Result<(), Error> {
240        let Some(ref db_dir) = self.db_dir else {
241            return Ok(());
242        };
243
244        if cfg!(test) {
245            return Ok(());
246        }
247
248        if retention == 0 {
249            tracing::debug!("database backups disabled (db_backup_retention = 0)");
250            return Ok(());
251        }
252
253        let backup_manager = backup::DbBackupManager::new(db_dir.clone(), app_version.clone());
254        backup_manager.create_backup(&self.pool, retention).await?;
255
256        Ok(())
257    }
258
259    /// Runs schema migrations (sqlx) followed by runtime migrations.
260    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
261    async fn migrate(&mut self) -> Result<(), Error> {
262        tracing::info!("running schema migrations");
263        #[cfg(feature = "tracing")]
264        let span = tracing::info_span!("sqlx_migrations").entered();
265        sqlx::migrate!("./migrations").run(&self.pool).await?;
266        #[cfg(feature = "tracing")]
267        span.exit();
268
269        tracing::info!("running runtime migrations");
270        self.migration_runner().run_all().await?;
271
272        Ok(())
273    }
274
275    /// Runs a lightweight SQLite integrity check.
276    ///
277    /// Checkpoints the WAL first so that any WAL corruption is caught and all
278    /// WAL pages are flushed into the main file before `PRAGMA quick_check`
279    /// runs. Without this, a corrupt WAL would be invisible to `quick_check`.
280    ///
281    /// `PRAGMA wal_checkpoint` returns nullable integer columns (`busy`, `log`,
282    /// `checkpointed`) that sqlx typed macros cannot map, so an untyped
283    /// [`sqlx::query()`] with `.execute()` is used — only the success or failure
284    /// of the checkpoint matters here.
285    ///
286    /// Returns `Ok(())` if both the checkpoint and `PRAGMA quick_check` succeed.
287    /// On failure, logs the error and returns it so the caller can decide
288    /// whether to restore.
289    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
290    async fn check_integrity(&self) -> Result<(), Error> {
291        sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
292            .execute(&self.pool)
293            .await
294            .context("failed to run PRAGMA wal_checkpoint")?;
295
296        let result: Option<String> = sqlx::query_scalar!("PRAGMA quick_check")
297            .fetch_one(&self.pool)
298            .await
299            .context("failed to run PRAGMA quick_check")?;
300
301        if result == Some("ok".to_string()) {
302            tracing::info!("database integrity check passed");
303            Ok(())
304        } else {
305            tracing::error!(result = ?result, "database integrity check failed");
306            Err(Error::msg(format!(
307                "database integrity check failed: {:?}",
308                result
309            )))
310        }
311    }
312}
313
314/// Logs the outcome of a version gate check that does not require a restore.
315#[cfg_attr(feature = "tracing", tracing::instrument(skip(gate)))]
316fn log_version_gate(gate: version::VersionGateResult) {
317    match gate {
318        version::VersionGateResult::Upgrade => {
319            tracing::info!("database is from an older Cadmus version; upgrading");
320        }
321        version::VersionGateResult::Unknown => {
322            tracing::info!("no version stamp found in database; treating as fresh install");
323        }
324        version::VersionGateResult::Current => {
325            tracing::info!("database version matches current app version");
326        }
327        version::VersionGateResult::CompatibleDowngrade => {
328            tracing::info!(
329                "database was written by a newer Cadmus version with matching migrations"
330            );
331        }
332        version::VersionGateResult::Downgrade => unreachable!(),
333    }
334}
335
336/// Opens a connection pool for the given SQLite database path.
337#[cfg_attr(feature = "tracing", tracing::instrument(skip(path)))]
338async fn open_pool(path: &Path) -> Result<SqlitePool, Error> {
339    let path_str = path.display().to_string();
340    let options = SqliteConnectOptions::from_str(&format!("sqlite://{}", path_str))?
341        .create_if_missing(true)
342        .foreign_keys(true)
343        .log_slow_statements(LevelFilter::Warn, Duration::from_secs(2));
344
345    SqlitePoolOptions::new()
346        .max_connections(5)
347        .connect_with(options)
348        .await
349        .context("failed to open database pool")
350}
351
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens an in-memory database without initialising it.
    fn open_in_memory() -> Database {
        Database::new(":memory:").expect("failed to create in-memory database")
    }

    /// Creates a temporary directory whose name starts with `prefix`.
    fn temp_dir(prefix: &str) -> tempfile::TempDir {
        tempfile::Builder::new()
            .prefix(prefix)
            .tempdir()
            .expect("failed to create temp dir")
    }

    /// Reads the version stamp currently stored in `db`.
    fn stored_version(db: &Database) -> Option<crate::version::GitVersion> {
        RUNTIME.block_on(async { version::read_db_version(&db.pool).await.unwrap() })
    }

    /// Stamps `db` with version `v99.99.99` and a random migration hash, and
    /// returns the stamped version.
    fn stamp_incompatible_newer_version(db: &Database) -> crate::version::GitVersion {
        let newer = crate::version::GitVersion::parse("v99.99.99").unwrap();
        let migration_hash = incompatible_migration_hash();
        RUNTIME.block_on(async {
            version::stamp_db_version(&db.pool, &newer, &migration_hash)
                .await
                .unwrap();
        });
        newer
    }

    /// Creates a backup of `db` under `db_dir` for the current app version,
    /// using a retention of 2.
    fn create_backup(db: &Database, db_dir: &Path) {
        let app_version = get_current_version();
        RUNTIME.block_on(async {
            let backup_manager = backup::DbBackupManager::new(db_dir.to_path_buf(), app_version);
            backup_manager.create_backup(&db.pool, 2).await.unwrap();
        });
    }

    /// Corrupts the database file at `path` by overwriting every byte after
    /// the first 100 with `0xFF`.
    fn corrupt_database_file(path: &Path) {
        let mut bytes = std::fs::read(path).expect("failed to read db file");
        assert!(
            bytes.len() > 100,
            "database file is too small to corrupt ({} bytes)",
            bytes.len()
        );
        bytes[100..].fill(0xFF);
        std::fs::write(path, &bytes).expect("failed to write corrupted db");
    }

    /// Produces a migration hash derived from a fresh UUID, so it does not
    /// match the hash of the real migrations.
    fn incompatible_migration_hash() -> version::MigrationHash {
        blake3::hash(uuid::Uuid::now_v7().as_bytes())
            .to_hex()
            .to_string()
            .parse()
            .unwrap()
    }

    #[test]
    fn test_database_creation() {
        let mut db = open_in_memory();
        db.init(0).expect("failed to run migrations");

        RUNTIME.block_on(async {
            let result: (i64,) = sqlx::query_as(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='books'",
            )
            .fetch_one(&db.pool)
            .await
            .expect("failed to query sqlite_master");

            assert_eq!(result.0, 1, "books table should exist after migrations");
        });
    }

    #[test]
    fn test_migrate_stamps_version_on_first_run() {
        let mut db = open_in_memory();
        db.init(0).expect("failed to run migrations");

        assert_eq!(
            stored_version(&db),
            Some(get_current_version()),
            "first migrate should stamp the database with the current app version"
        );
    }

    #[test]
    fn test_migrate_current_version_is_idempotent() {
        let mut db = open_in_memory();
        db.init(0).expect("first migrate");
        db.init(0)
            .expect("second migrate should succeed (Current path)");

        assert_eq!(stored_version(&db), Some(get_current_version()));
    }

    #[test]
    fn test_migrate_upgrade_from_older_version() {
        let mut db = open_in_memory();
        db.init(0).expect("initial migrate");

        let older = crate::version::GitVersion::parse("v0.0.1").unwrap();
        let migration_hash = version::current_migration_hash();
        RUNTIME.block_on(async {
            version::stamp_db_version(&db.pool, &older, &migration_hash)
                .await
                .unwrap();
        });

        db.init(0).expect("migrate should succeed (Upgrade path)");

        assert_eq!(
            stored_version(&db),
            Some(get_current_version()),
            "migrate should re-stamp with current version after upgrade"
        );
    }

    #[test]
    fn test_migrate_downgrade_without_db_dir_errors() {
        let mut db = open_in_memory();
        db.init(0).expect("initial migrate");

        stamp_incompatible_newer_version(&db);

        let err = db
            .init(0)
            .expect_err("init should fail on downgrade without db_dir");
        assert!(
            err.to_string().contains("no database directory available"),
            "unexpected error: {}",
            err
        );
    }

    #[test]
    fn test_migrate_downgrade_with_db_dir_errors_without_backup() {
        let dir = temp_dir("cadmus-downgrade-no-backup-");
        let db_path = dir.path().join("test.sqlite");

        let mut db = Database::new(&db_path).expect("failed to create database");
        db.init(0).expect("initial migrate");

        stamp_incompatible_newer_version(&db);

        let err = db
            .init(0)
            .expect_err("init should fail when no backup is available");
        assert!(
            err.to_string().contains("no compatible backup found"),
            "unexpected error: {}",
            err
        );
    }

    #[test]
    fn test_migrate_downgrade_with_db_dir_restores_backup() {
        let dir = temp_dir("cadmus-downgrade-restore-");
        let db_path = dir.path().join("test.sqlite");

        let mut db = Database::new(&db_path).expect("failed to create database");
        db.init(0).expect("initial migrate");

        create_backup(&db, dir.path());
        let newer = stamp_incompatible_newer_version(&db);

        db.init(0)
            .expect("migrate should succeed on downgrade with db_dir (restore path)");

        assert_eq!(
            stored_version(&db),
            Some(get_current_version()),
            "migrate should re-stamp with current version after downgrade restore"
        );

        let demoted = dir
            .path()
            .join("backups")
            .join(format!("cadmus-{}-demoted.sqlite", newer));
        assert!(
            demoted.exists(),
            "demoted file should be named after the DB version ({}), not the app version",
            newer
        );
    }

    #[test]
    fn test_check_integrity_passes_on_valid_database() {
        let db = open_in_memory();
        RUNTIME.block_on(async {
            db.check_integrity()
                .await
                .expect("integrity check should pass on a fresh database");
        });
    }

    #[test]
    fn test_init_restores_backup_on_corruption() {
        let dir = temp_dir("cadmus-corruption-restore-");
        let db_path = dir.path().join("cadmus.sqlite");

        let mut db = Database::new(&db_path).expect("failed to create database");
        db.init(0).expect("failed to run initial migrations");

        create_backup(&db, dir.path());

        // Close the pool before the database file is overwritten on disk.
        RUNTIME.block_on(async { db.pool.close().await });
        corrupt_database_file(&db_path);

        let mut db = Database::new(&db_path).expect("failed to reopen database");
        db.init(0)
            .expect("init should restore from backup on corruption");

        assert_eq!(
            stored_version(&db),
            Some(get_current_version()),
            "restored database should be stamped with current version"
        );
    }

    #[test]
    fn test_check_integrity_fails_on_corrupted_database() {
        let dir = temp_dir("cadmus-integrity-test-");
        let db_path = dir.path().join("corrupt.sqlite");

        let mut db = Database::new(&db_path).expect("failed to create database");
        db.init(0).expect("failed to run migrations");

        // Close the pool before the database file is overwritten on disk.
        RUNTIME.block_on(async { db.pool.close().await });
        corrupt_database_file(&db_path);

        let db = Database::new(&db_path).expect("failed to reopen database");
        let result = RUNTIME.block_on(async { db.check_integrity().await });
        let err = result.expect_err("integrity check should fail on corrupted database");
        let err_msg = err.to_string();
        assert!(
            err_msg.contains("integrity check failed")
                || err_msg.contains("PRAGMA quick_check")
                || err_msg.contains("wal_checkpoint"),
            "expected integrity-related failure, got: {err_msg}"
        );
    }
}