1pub mod backup;
2pub mod migrations;
3pub mod runtime;
4pub mod types;
5pub mod version;
6
7use anyhow::{Context, Error};
8use log::LevelFilter;
9use runtime::RUNTIME;
10use sqlx::ConnectOptions;
11use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
12use std::path::{Path, PathBuf};
13use std::str::FromStr;
14use std::time::Duration;
15
16use crate::version::get_current_version;
17
18pub const DB_FILENAME: &str = "cadmus.sqlite";
20
21#[derive(Clone)]
25pub struct Database {
26 pool: SqlitePool,
27 db_path: PathBuf,
29 db_dir: Option<PathBuf>,
33}
34
35impl std::fmt::Debug for Database {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("Database").finish()
38 }
39}
40
41impl Database {
42 #[cfg_attr(feature = "tracing", tracing::instrument(fields(db_path = %path.as_ref().display())))]
53 pub fn new<P: AsRef<Path> + std::fmt::Debug>(path: P) -> Result<Self, Error> {
54 let path = path.as_ref();
55 let db_dir = path
56 .parent()
57 .filter(|p| !p.as_os_str().is_empty())
58 .map(Path::to_path_buf);
59
60 if let Some(dir) = &db_dir {
61 std::fs::create_dir_all(dir)?;
62 }
63
64 let path_str = path.display().to_string();
65
66 tracing::info!(db_path = %path_str, "connecting to database");
67
68 RUNTIME.block_on(async {
69 let pool = open_pool(path).await?;
70 tracing::info!(db_path = %path_str, "database connected");
71 Ok(Database {
72 pool,
73 db_path: path.to_path_buf(),
74 db_dir,
75 })
76 })
77 }
78
79 pub fn close(&self) {
85 tracing::info!("closing database connection pool");
86 RUNTIME.block_on(async {
87 self.pool.close().await;
88 });
89 tracing::info!("database connection pool closed");
90 }
91
92 pub fn pool(&self) -> &SqlitePool {
94 &self.pool
95 }
96
97 pub fn migration_runner(&self) -> migrations::MigrationRunner {
102 migrations::MigrationRunner::new(self.pool.clone())
103 }
104
105 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
118 pub fn init(&mut self, backup_retention: usize) -> Result<(), Error> {
119 let app_version = get_current_version();
120
121 RUNTIME.block_on(async {
122 self.restore_if_needed(&app_version).await?;
123 self.migrate().await?;
124 version::stamp_db_version(&self.pool, &app_version, &version::current_migration_hash())
125 .await?;
126 tracing::info!(app_version = %app_version, "database version stamped");
127 self.create_version_backup(&app_version, backup_retention)
128 .await?;
129 Ok(())
130 })
131 }
132
133 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
148 async fn restore_if_needed(
149 &mut self,
150 app_version: &crate::version::GitVersion,
151 ) -> Result<(), Error> {
152 tracing::info!("checking database integrity");
153 let integrity = self.check_integrity().await;
154
155 let gate = if integrity.is_ok() {
156 version::check_version_gate(&self.pool, app_version).await?
157 } else {
158 version::VersionGateResult::Unknown
159 };
160
161 let needs_restore = integrity.is_err() || gate == version::VersionGateResult::Downgrade;
162
163 if !needs_restore {
164 log_version_gate(gate);
165 return Ok(());
166 }
167
168 let Some(ref db_dir) = self.db_dir.clone() else {
169 if let Err(e) = integrity {
170 tracing::error!(
171 app_version = %app_version,
172 "database corruption detected but no database directory available for backup restore"
173 );
174 return Err(e);
175 }
176 tracing::error!(
177 app_version = %app_version,
178 "downgrade detected but no database directory available for backup restore"
179 );
180 return Err(Error::msg(
181 "downgrade detected but no database directory available for backup restore",
182 ));
183 };
184
185 let db_version = if integrity.is_ok() {
186 version::read_db_version(&self.pool)
187 .await?
188 .unwrap_or_else(|| app_version.clone())
189 } else {
190 app_version.clone()
191 };
192
193 if integrity.is_err() {
194 tracing::warn!(
195 app_version = %app_version,
196 "database corruption detected; attempting restore from backup"
197 );
198 } else {
199 tracing::warn!(
200 app_version = %app_version,
201 db_version = %db_version,
202 "database was touched by a newer Cadmus version; restoring backup"
203 );
204 }
205
206 let backup_manager = backup::DbBackupManager::new(db_dir.clone(), app_version.clone());
207 self.pool.close().await;
208
209 let restore_context = if integrity.is_err() {
210 "corruption detected but no compatible backup found to restore"
211 } else {
212 "downgrade detected but no compatible backup found to restore"
213 };
214
215 let backup_path = backup_manager
216 .restore_best_backup(&self.db_path, &db_version)
217 .await
218 .map_err(|e| {
219 tracing::error!(app_version = %app_version, error = %e, restore_context, "restore failed");
220 Error::from(e).context(restore_context)
221 })?;
222
223 self.pool = open_pool(&self.db_path).await?;
224
225 tracing::info!(backup_path = %backup_path.display(), "database restored from backup");
226
227 Ok(())
228 }
229
230 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
235 async fn create_version_backup(
236 &self,
237 app_version: &crate::version::GitVersion,
238 retention: usize,
239 ) -> Result<(), Error> {
240 let Some(ref db_dir) = self.db_dir else {
241 return Ok(());
242 };
243
244 if cfg!(test) {
245 return Ok(());
246 }
247
248 if retention == 0 {
249 tracing::debug!("database backups disabled (db_backup_retention = 0)");
250 return Ok(());
251 }
252
253 let backup_manager = backup::DbBackupManager::new(db_dir.clone(), app_version.clone());
254 backup_manager.create_backup(&self.pool, retention).await?;
255
256 Ok(())
257 }
258
259 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
261 async fn migrate(&mut self) -> Result<(), Error> {
262 tracing::info!("running schema migrations");
263 #[cfg(feature = "tracing")]
264 let span = tracing::info_span!("sqlx_migrations").entered();
265 sqlx::migrate!("./migrations").run(&self.pool).await?;
266 #[cfg(feature = "tracing")]
267 span.exit();
268
269 tracing::info!("running runtime migrations");
270 self.migration_runner().run_all().await?;
271
272 Ok(())
273 }
274
275 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
290 async fn check_integrity(&self) -> Result<(), Error> {
291 sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
292 .execute(&self.pool)
293 .await
294 .context("failed to run PRAGMA wal_checkpoint")?;
295
296 let result: Option<String> = sqlx::query_scalar!("PRAGMA quick_check")
297 .fetch_one(&self.pool)
298 .await
299 .context("failed to run PRAGMA quick_check")?;
300
301 if result == Some("ok".to_string()) {
302 tracing::info!("database integrity check passed");
303 Ok(())
304 } else {
305 tracing::error!(result = ?result, "database integrity check failed");
306 Err(Error::msg(format!(
307 "database integrity check failed: {:?}",
308 result
309 )))
310 }
311 }
312}
313
314#[cfg_attr(feature = "tracing", tracing::instrument(skip(gate)))]
316fn log_version_gate(gate: version::VersionGateResult) {
317 match gate {
318 version::VersionGateResult::Upgrade => {
319 tracing::info!("database is from an older Cadmus version; upgrading");
320 }
321 version::VersionGateResult::Unknown => {
322 tracing::info!("no version stamp found in database; treating as fresh install");
323 }
324 version::VersionGateResult::Current => {
325 tracing::info!("database version matches current app version");
326 }
327 version::VersionGateResult::CompatibleDowngrade => {
328 tracing::info!(
329 "database was written by a newer Cadmus version with matching migrations"
330 );
331 }
332 version::VersionGateResult::Downgrade => unreachable!(),
333 }
334}
335
336#[cfg_attr(feature = "tracing", tracing::instrument(skip(path)))]
338async fn open_pool(path: &Path) -> Result<SqlitePool, Error> {
339 let path_str = path.display().to_string();
340 let options = SqliteConnectOptions::from_str(&format!("sqlite://{}", path_str))?
341 .create_if_missing(true)
342 .foreign_keys(true)
343 .log_slow_statements(LevelFilter::Warn, Duration::from_secs(2));
344
345 SqlitePoolOptions::new()
346 .max_connections(5)
347 .connect_with(options)
348 .await
349 .context("failed to open database pool")
350}
351
352#[cfg(test)]
353mod tests {
354 use super::*;
355
356 #[test]
357 fn test_database_creation() {
358 let mut db = Database::new(":memory:").expect("failed to create in-memory database");
359 db.init(0).expect("failed to run migrations");
360
361 RUNTIME.block_on(async {
362 let result: (i64,) = sqlx::query_as(
363 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='books'",
364 )
365 .fetch_one(&db.pool)
366 .await
367 .expect("failed to query sqlite_master");
368
369 assert_eq!(result.0, 1, "books table should exist after migrations");
370 });
371 }
372
373 #[test]
374 fn test_migrate_stamps_version_on_first_run() {
375 let mut db = Database::new(":memory:").expect("failed to create in-memory database");
376 db.init(0).expect("failed to run migrations");
377
378 let version = RUNTIME.block_on(async { version::read_db_version(&db.pool).await.unwrap() });
379 assert_eq!(
380 version,
381 Some(get_current_version()),
382 "first migrate should stamp the database with the current app version"
383 );
384 }
385
386 #[test]
387 fn test_migrate_current_version_is_idempotent() {
388 let mut db = Database::new(":memory:").expect("failed to create in-memory database");
389 db.init(0).expect("first migrate");
390 db.init(0)
391 .expect("second migrate should succeed (Current path)");
392
393 let version = RUNTIME.block_on(async { version::read_db_version(&db.pool).await.unwrap() });
394 assert_eq!(version, Some(get_current_version()));
395 }
396
397 #[test]
398 fn test_migrate_upgrade_from_older_version() {
399 let mut db = Database::new(":memory:").expect("failed to create in-memory database");
400 db.init(0).expect("initial migrate");
401
402 let older = crate::version::GitVersion::parse("v0.0.1").unwrap();
403 let migration_hash = version::current_migration_hash();
404 RUNTIME.block_on(async {
405 version::stamp_db_version(&db.pool, &older, &migration_hash)
406 .await
407 .unwrap();
408 });
409
410 db.init(0).expect("migrate should succeed (Upgrade path)");
411
412 let version = RUNTIME.block_on(async { version::read_db_version(&db.pool).await.unwrap() });
413 assert_eq!(
414 version,
415 Some(get_current_version()),
416 "migrate should re-stamp with current version after upgrade"
417 );
418 }
419
420 #[test]
421 fn test_migrate_downgrade_without_db_dir_errors() {
422 let mut db = Database::new(":memory:").expect("failed to create in-memory database");
423 db.init(0).expect("initial migrate");
424
425 let newer = crate::version::GitVersion::parse("v99.99.99").unwrap();
426 let migration_hash = incompatible_migration_hash();
427 RUNTIME.block_on(async {
428 version::stamp_db_version(&db.pool, &newer, &migration_hash)
429 .await
430 .unwrap();
431 });
432
433 let err = db
434 .init(0)
435 .expect_err("init should fail on downgrade without db_dir");
436 assert!(
437 err.to_string().contains("no database directory available"),
438 "unexpected error: {}",
439 err
440 );
441 }
442
443 #[test]
444 fn test_migrate_downgrade_with_db_dir_errors_without_backup() {
445 let dir = tempfile::Builder::new()
446 .prefix("cadmus-downgrade-no-backup-")
447 .tempdir()
448 .expect("failed to create temp dir");
449 let db_path = dir.path().join("test.sqlite");
450
451 let mut db = Database::new(db_path.to_str().unwrap()).expect("failed to create database");
452 db.init(0).expect("initial migrate");
453
454 let newer = crate::version::GitVersion::parse("v99.99.99").unwrap();
455 let migration_hash = incompatible_migration_hash();
456 RUNTIME.block_on(async {
457 version::stamp_db_version(&db.pool, &newer, &migration_hash)
458 .await
459 .unwrap();
460 });
461
462 let err = db
463 .init(0)
464 .expect_err("init should fail when no backup is available");
465 assert!(
466 err.to_string().contains("no compatible backup found"),
467 "unexpected error: {}",
468 err
469 );
470 }
471
472 #[test]
473 fn test_migrate_downgrade_with_db_dir_restores_backup() {
474 let dir = tempfile::Builder::new()
475 .prefix("cadmus-downgrade-restore-")
476 .tempdir()
477 .expect("failed to create temp dir");
478 let db_path = dir.path().join("test.sqlite");
479
480 let mut db = Database::new(db_path.to_str().unwrap()).expect("failed to create database");
481 db.init(0).expect("initial migrate");
482
483 let app_version = get_current_version();
484 RUNTIME.block_on(async {
485 let backup_manager =
486 backup::DbBackupManager::new(dir.path().to_path_buf(), app_version.clone());
487 backup_manager.create_backup(&db.pool, 2).await.unwrap();
488 });
489
490 let newer = crate::version::GitVersion::parse("v99.99.99").unwrap();
491 let migration_hash = incompatible_migration_hash();
492 RUNTIME.block_on(async {
493 version::stamp_db_version(&db.pool, &newer, &migration_hash)
494 .await
495 .unwrap();
496 });
497
498 db.init(0)
499 .expect("migrate should succeed on downgrade with db_dir (restore path)");
500
501 let version = RUNTIME.block_on(async { version::read_db_version(&db.pool).await.unwrap() });
502 assert_eq!(
503 version,
504 Some(get_current_version()),
505 "migrate should re-stamp with current version after downgrade restore"
506 );
507
508 let demoted = dir
509 .path()
510 .join("backups")
511 .join(format!("cadmus-{}-demoted.sqlite", newer));
512 assert!(
513 demoted.exists(),
514 "demoted file should be named after the DB version ({}), not the app version",
515 newer
516 );
517 }
518
519 #[test]
520 fn test_check_integrity_passes_on_valid_database() {
521 let db = Database::new(":memory:").expect("failed to create in-memory database");
522 RUNTIME.block_on(async {
523 db.check_integrity()
524 .await
525 .expect("integrity check should pass on a fresh database");
526 });
527 }
528
529 #[test]
530 fn test_init_restores_backup_on_corruption() {
531 let dir = tempfile::Builder::new()
532 .prefix("cadmus-corruption-restore-")
533 .tempdir()
534 .expect("failed to create temp dir");
535 let db_path = dir.path().join("cadmus.sqlite");
536
537 let mut db = Database::new(db_path.to_str().unwrap()).expect("failed to create database");
538 db.init(0).expect("failed to run initial migrations");
539
540 let app_version = get_current_version();
541 RUNTIME.block_on(async {
542 let backup_manager =
543 backup::DbBackupManager::new(dir.path().to_path_buf(), app_version.clone());
544 backup_manager.create_backup(&db.pool, 2).await.unwrap();
545 });
546
547 RUNTIME.block_on(async { db.pool.close().await });
548
549 {
550 let mut bytes = std::fs::read(&db_path).expect("failed to read db file");
551 for chunk in bytes[100..].chunks_mut(512) {
552 chunk.fill(0xFF);
553 }
554 std::fs::write(&db_path, &bytes).expect("failed to write corrupted db");
555 }
556
557 let mut db = Database::new(db_path.to_str().unwrap()).expect("failed to reopen database");
558 db.init(0)
559 .expect("init should restore from backup on corruption");
560
561 let version = RUNTIME.block_on(async { version::read_db_version(&db.pool).await.unwrap() });
562 assert_eq!(
563 version,
564 Some(get_current_version()),
565 "restored database should be stamped with current version"
566 );
567 }
568
569 #[test]
570 fn test_check_integrity_fails_on_corrupted_database() {
571 let dir = tempfile::Builder::new()
572 .prefix("cadmus-integrity-test-")
573 .tempdir()
574 .expect("failed to create temp dir");
575 let db_path = dir.path().join("corrupt.sqlite");
576
577 let mut db = Database::new(db_path.to_str().unwrap()).expect("failed to create database");
578 db.init(0).expect("failed to run migrations");
579
580 RUNTIME.block_on(async { db.pool.close().await });
581
582 {
583 let mut bytes = std::fs::read(&db_path).expect("failed to read db file");
584 for chunk in bytes[100..].chunks_mut(512) {
585 chunk.fill(0xFF);
586 }
587 std::fs::write(&db_path, &bytes).expect("failed to write corrupted db");
588 }
589
590 let db = Database::new(db_path.to_str().unwrap()).expect("failed to reopen database");
591 let result = RUNTIME.block_on(async { db.check_integrity().await });
592 let err = result.expect_err("integrity check should fail on corrupted database");
593 let err_msg = err.to_string();
594 assert!(
595 err_msg.contains("integrity check failed")
596 || err_msg.contains("PRAGMA quick_check")
597 || err_msg.contains("wal_checkpoint"),
598 "expected integrity-related failure, got: {err_msg}"
599 );
600 }
601
602 fn incompatible_migration_hash() -> version::MigrationHash {
603 blake3::hash(uuid::Uuid::now_v7().as_bytes())
604 .to_hex()
605 .to_string()
606 .parse()
607 .unwrap()
608 }
609}