Skip to main content

cadmus_core/task/
mod.rs

1//! Long-running background task infrastructure.
2//!
3//! This module provides a trait-based system for defining and managing
4//! background tasks that run alongside the main application loop.
5//!
6//! # Architecture
7//!
8//! - [`BackgroundTask`] trait defines the interface for long-running tasks
9//! - [`TaskManager`] spawns and manages task lifecycles
10//! - [`ShutdownSignal`] provides graceful shutdown coordination
11//!
12//! # Example
13//!
14//! ```no_run
15//! use std::sync::mpsc::Sender;
16//! use std::time::Duration;
17//!
18//! use cadmus_core::task::{BackgroundTask, ShutdownSignal, TaskId};
19//! use cadmus_core::view::Event;
20//!
21//! struct MyTask;
22//!
23//! impl BackgroundTask for MyTask {
24//!     fn id(&self) -> TaskId {
25//!         TaskId::Placeholder
26//!     }
27//!
28//!     fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal) {
29//!         while !shutdown.should_stop() {
30//!             // Do work...
31//!             if shutdown.wait(Duration::from_secs(60)) {
32//!                 break;
33//!             }
34//!         }
35//!     }
36//! }
37//! ```
38
39#[cfg(any(feature = "kobo", doc))]
40pub mod auto_frontlight;
41#[cfg(any(all(feature = "test", feature = "kobo"), doc))]
42mod dbus_monitor;
43pub mod dictionary_index;
44#[cfg(any(feature = "test", doc))]
45mod hello_world;
46pub mod import;
47pub mod thumbnail;
48#[cfg(any(feature = "kobo", doc))]
49pub mod time_sync;
50#[cfg(any(feature = "kobo", doc))]
51mod wifi_status_monitor;
52
53use std::collections::{HashMap, VecDeque};
54use std::sync::atomic::{AtomicBool, Ordering};
55use std::sync::mpsc::{self, Receiver, Sender};
56use std::thread::{self, JoinHandle};
57use std::time::Duration;
58
59use thiserror::Error;
60
61use crate::context::Context;
62use crate::db::Database;
63use crate::fl;
64use crate::input::DeviceEvent;
65use crate::settings::Settings;
66use crate::view::{EntryId, Event, NotificationEvent};
67
68/// Errors that can occur during task management operations.
69#[derive(Error, Debug)]
70pub enum TaskError {
71    /// A task with the given ID is already running.
72    #[error("task '{0}' is already running")]
73    AlreadyRunning(TaskId),
74
75    /// A task with the given ID is not running.
76    #[error("task '{0}' is not running")]
77    NotRunning(TaskId),
78}
79
/// Identifies a background task.
///
/// [`TaskManager`] keys its running tasks by this ID, so at most one task per
/// variant can be running at a time. Several variants only exist in specific
/// build configurations (see the `cfg` attribute on each).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum TaskId {
    /// Temporary placeholder ID, to be removed once a task that is always
    /// available exists.
    Placeholder,
    /// Library import.
    Import,
    /// Background thumbnail extraction.
    ThumbnailExtraction,
    /// Background dictionary indexing.
    DictionaryIndex,
    /// Example task that prints periodically (test builds only).
    #[cfg(any(feature = "test", doc))]
    HelloWorld,
    /// D-Bus system bus monitor (test + kobo builds only).
    #[cfg(any(all(feature = "test", feature = "kobo"), doc))]
    DbusMonitor,
    /// WiFi status monitor using dhcpcd-dbus (kobo builds only).
    #[cfg(any(feature = "kobo", doc))]
    WifiStatusMonitor,
    /// Time synchronization via NTP (kobo builds only).
    #[cfg(any(feature = "kobo", doc))]
    TimeSync,
    /// Automatic frontlight adjustment (kobo builds only).
    #[cfg(any(feature = "kobo", doc))]
    AutoFrontlight,
    /// First task ID reserved for unit tests.
    #[cfg(test)]
    TestTask,
    /// Second task ID reserved for unit tests.
    #[cfg(test)]
    TestTask2,
}
113
114impl std::fmt::Display for TaskId {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        match self {
117            TaskId::Placeholder => write!(f, "placeholder"),
118            TaskId::Import => write!(f, "import"),
119            TaskId::ThumbnailExtraction => write!(f, "thumbnail_extraction"),
120            TaskId::DictionaryIndex => write!(f, "dictionary_index"),
121            #[cfg(feature = "test")]
122            TaskId::HelloWorld => write!(f, "hello_world"),
123            #[cfg(all(feature = "test", feature = "kobo"))]
124            TaskId::DbusMonitor => write!(f, "dbus_monitor"),
125            #[cfg(feature = "kobo")]
126            TaskId::WifiStatusMonitor => write!(f, "wifi_status_monitor"),
127            #[cfg(feature = "kobo")]
128            TaskId::TimeSync => write!(f, "time_sync"),
129            #[cfg(feature = "kobo")]
130            TaskId::AutoFrontlight => write!(f, "auto_frontlight"),
131            #[cfg(test)]
132            TaskId::TestTask => write!(f, "test_task"),
133            #[cfg(test)]
134            TaskId::TestTask2 => write!(f, "test_task_2"),
135        }
136    }
137}
138
/// Signal for coordinating graceful shutdown of background tasks.
///
/// Tasks should periodically check [`should_stop`](Self::should_stop) or use
/// [`wait`](Self::wait) to interrupt sleep when shutdown is requested.
///
/// Shutdown is considered requested when either a `()` message arrives on the
/// underlying channel or every sender of that channel has been dropped. Once
/// observed, the state is latched: both methods keep returning `true`.
pub struct ShutdownSignal {
    /// Receiving end of the shutdown channel.
    receiver: Receiver<()>,
    /// Keeps the sender alive when no external owner exists, preventing
    /// spurious disconnection from being reported as a shutdown request.
    _sender_anchor: Option<Sender<()>>,
    /// Latched "shutdown observed" flag.
    stopped: AtomicBool,
}

impl ShutdownSignal {
    /// Wraps `receiver`; the caller owns the matching sender.
    fn new(receiver: Receiver<()>) -> Self {
        Self {
            receiver,
            _sender_anchor: None,
            stopped: AtomicBool::new(false),
        }
    }

    /// Creates a shutdown signal that never fires.
    ///
    /// Intended for use in tests and one-shot contexts where graceful shutdown
    /// is not needed. The signal holds its own sender, so the channel can
    /// neither deliver a message nor become disconnected.
    pub fn never() -> Self {
        let (tx, rx) = mpsc::channel();
        Self {
            receiver: rx,
            _sender_anchor: Some(tx),
            stopped: AtomicBool::new(false),
        }
    }

    /// Creates a shutdown signal from a raw receiver, for use in tests.
    ///
    /// Prefer [`never`](Self::never) when no shutdown is needed. Use this
    /// when the test needs to trigger shutdown explicitly by sending `()` on
    /// the corresponding `Sender`. Dropping that `Sender` also counts as a
    /// shutdown request.
    #[cfg(test)]
    pub fn new_for_test(receiver: Receiver<()>) -> Self {
        Self::new(receiver)
    }

    /// Returns `true` if shutdown has been requested.
    ///
    /// Once `true` is returned, all subsequent calls also return `true`
    /// (the shutdown state is latched). This is non-blocking and suitable
    /// for polling in tight loops.
    pub fn should_stop(&self) -> bool {
        if self.stopped.load(Ordering::Acquire) {
            return true;
        }
        match self.receiver.try_recv() {
            // A disconnected channel means nobody can ever signal us again.
            // Treat it as shutdown, exactly like `wait` does, so a task that
            // only polls cannot spin forever after its controller is gone.
            Ok(()) | Err(mpsc::TryRecvError::Disconnected) => {
                self.stopped.store(true, Ordering::Release);
                true
            }
            Err(mpsc::TryRecvError::Empty) => false,
        }
    }

    /// Waits for the given duration or until shutdown is requested.
    ///
    /// Returns `true` if shutdown was requested, `false` if the duration elapsed.
    ///
    /// This is the preferred method for tasks that sleep between work cycles.
    pub fn wait(&self, duration: Duration) -> bool {
        if self.stopped.load(Ordering::Acquire) {
            return true;
        }
        match self.receiver.recv_timeout(duration) {
            Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => {
                self.stopped.store(true, Ordering::Release);
                true
            }
            Err(mpsc::RecvTimeoutError::Timeout) => false,
        }
    }
}
217
218/// A long-running background task.
219///
220/// Implement this trait to define tasks that run in dedicated threads
221/// alongside the main application loop. Tasks receive the event hub
222/// to dispatch events and a shutdown signal for graceful termination.
223pub trait BackgroundTask: Send {
224    /// Returns the unique identifier for this task.
225    fn id(&self) -> TaskId;
226
227    /// Runs the task until shutdown is requested.
228    ///
229    /// This method is called in a dedicated thread. Use `hub` to send
230    /// events to the main loop and `shutdown` to check for termination.
231    fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal);
232
233    /// Called when the task is being stopped.
234    ///
235    /// Override this to perform cleanup. The default implementation does nothing.
236    fn stop(&mut self) {}
237
238    /// Returns a "finished" event to send after the task thread exits.
239    ///
240    /// The [`TaskManager`] sends this event after
241    /// observing the task's thread as finished. The default returns `None`.
242    fn finished_event(&self) -> Option<Event> {
243        None
244    }
245}
246
247struct RunningTask {
248    handle: JoinHandle<()>,
249    shutdown: Sender<()>,
250    /// Event to emit when the task is observed as naturally finished.
251    finished_event: Option<Event>,
252}
253
254/// Manages the lifecycle of background tasks.
255///
256/// The task manager spawns tasks in dedicated threads and provides
257/// methods to stop individual tasks or all tasks at once.
258pub struct TaskManager {
259    tasks: HashMap<TaskId, RunningTask>,
260    /// Library indices awaiting import while one is already running. The bool is the `force` flag.
261    pending_import_indices: VecDeque<(Option<usize>, bool)>,
262    /// Library indices awaiting thumbnail extraction while a run is in progress.
263    pending_thumbnail_indices: VecDeque<Option<usize>>,
264    /// Events from naturally finished tasks, waiting to be sent.
265    buffered_events: Vec<Event>,
266}
267
268impl TaskManager {
269    /// Creates a new empty task manager.
270    pub fn new() -> Self {
271        Self {
272            tasks: HashMap::new(),
273            pending_import_indices: VecDeque::new(),
274            pending_thumbnail_indices: VecDeque::new(),
275            buffered_events: Vec::new(),
276        }
277    }
278
279    /// Starts a background task in a new thread.
280    ///
281    /// The task receives a clone of `hub` for sending events and a
282    /// [`ShutdownSignal`] for graceful termination.
283    ///
284    /// Returns an error if a task with the same ID is already running.
285    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, task, hub), fields(task_id = tracing::field::Empty
286    ), ret))]
287    pub fn start(
288        &mut self,
289        task: Box<dyn BackgroundTask>,
290        hub: Sender<Event>,
291    ) -> Result<TaskId, TaskError> {
292        let id = task.id();
293
294        #[cfg(feature = "tracing")]
295        tracing::Span::current().record("task_id", tracing::field::display(&id));
296
297        if self.is_running(&id) {
298            return Err(TaskError::AlreadyRunning(id));
299        }
300
301        let (shutdown_tx, shutdown_rx) = mpsc::channel();
302        let shutdown_signal = ShutdownSignal::new(shutdown_rx);
303
304        let finished_event = task.finished_event();
305
306        let handle = thread::spawn(move || {
307            let mut task = task;
308            tracing::info!("task started");
309            task.run(&hub, &shutdown_signal);
310            task.stop();
311            tracing::info!("task stopped");
312        });
313
314        self.tasks.insert(
315            id.clone(),
316            RunningTask {
317                handle,
318                shutdown: shutdown_tx,
319                finished_event,
320            },
321        );
322
323        tracing::info!("task registered");
324        Ok(id)
325    }
326
327    /// Stops a running task by ID.
328    ///
329    /// Sends the shutdown signal and waits for the task thread to finish.
330    /// Returns an error if the task is not running.
331    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(task_id = %id), ret))]
332    pub fn stop(&mut self, id: &TaskId) -> Result<(), TaskError> {
333        self.cleanup_finished();
334        if let Some(task) = self.tasks.remove(id) {
335            tracing::info!("sending shutdown signal");
336            if let Err(e) = task.shutdown.send(()) {
337                tracing::error!(error = %e, "failed to send shutdown signal");
338            }
339            if task.handle.join().is_err() {
340                tracing::error!("task thread panicked");
341            }
342            Ok(())
343        } else {
344            Err(TaskError::NotRunning(id.clone()))
345        }
346    }
347
348    /// Stops all running tasks.
349    ///
350    /// Sends shutdown signals to all tasks and waits for them to finish.
351    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(task_count = tracing::field::Empty
352    )))]
353    pub fn stop_all(&mut self) {
354        let tasks: Vec<_> = self.tasks.drain().collect();
355
356        #[cfg(feature = "tracing")]
357        tracing::Span::current().record("task_count", tasks.len());
358
359        if !tasks.is_empty() {
360            tracing::info!("stopping all tasks");
361        }
362        for (_, task) in &tasks {
363            if let Err(e) = task.shutdown.send(()) {
364                tracing::error!(error = %e, "failed to send shutdown signal");
365            }
366        }
367        for (_, task) in tasks {
368            if task.handle.join().is_err() {
369                tracing::error!("task thread panicked");
370            }
371        }
372    }
373
374    /// Removes entries for tasks whose threads have finished, buffering
375    /// their completion events only if the thread exited successfully.
376    fn cleanup_finished(&mut self) {
377        let finished: Vec<TaskId> = self
378            .tasks
379            .iter()
380            .filter(|(_, task)| task.handle.is_finished())
381            .map(|(id, _)| id.clone())
382            .collect();
383
384        for id in finished {
385            if let Some(task) = self.tasks.remove(&id) {
386                if task.handle.join().is_ok() {
387                    if let Some(evt) = task.finished_event {
388                        self.buffered_events.push(evt);
389                    }
390                } else {
391                    tracing::error!(task_id = %id, "task thread panicked");
392                }
393            }
394        }
395    }
396
397    /// Sends any buffered completion events from naturally finished tasks.
398    fn flush_buffered_events(&mut self, hub: &Sender<Event>) {
399        for evt in self.buffered_events.drain(..) {
400            hub.send(evt).ok();
401        }
402    }
403
404    /// Observes an event without consuming it.
405    ///
406    /// Must be called for every event before passing it to the view tree.
407    /// Always returns `false` — it never consumes events.
408    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, hub, context)))]
409    pub fn handle_event(&mut self, evt: &Event, hub: &Sender<Event>, context: &Context) -> bool {
410        self.cleanup_finished();
411        self.flush_buffered_events(hub);
412
413        match evt {
414            Event::ImportLibrary {
415                library_index,
416                force,
417            } => {
418                self.schedule_import(
419                    *library_index,
420                    *force,
421                    hub,
422                    &context.database,
423                    &context.settings,
424                );
425            }
426            Event::ImportFinished { library_index } => {
427                self.drain_pending_imports(hub, &context.database, &context.settings);
428                self.schedule_thumbnail_extraction(
429                    *library_index,
430                    hub,
431                    &context.database,
432                    &context.settings,
433                );
434            }
435            Event::ThumbnailExtractionFinished { .. } => {
436                self.drain_pending_thumbnails(hub, &context.database, &context.settings);
437            }
438            Event::ReindexDictionaries => {
439                self.schedule_dictionary_index(hub, &context.database);
440            }
441            Event::Device(DeviceEvent::NetUp) => {
442                #[cfg(feature = "kobo")]
443                {
444                    if context.settings.auto_time {
445                        self.schedule_time_sync(false, hub);
446                    }
447                }
448            }
449            #[cfg(feature = "kobo")]
450            Event::AutoFrontlightConfigChanged => {
451                self.sync_auto_frontlight(hub, &context.settings);
452            }
453            Event::Select(EntryId::SyncTime) => {
454                #[cfg(feature = "kobo")]
455                {
456                    if !context.online {
457                        hub.send(Event::Notification(NotificationEvent::Show(fl!(
458                            "notification-not-online"
459                        ))))
460                        .ok();
461                    } else {
462                        self.schedule_time_sync(true, hub);
463                    }
464                }
465            }
466            _ => {}
467        }
468        false
469    }
470
471    /// Schedules an import task, queuing the index if one is already running.
472    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
473    fn schedule_import(
474        &mut self,
475        library_index: Option<usize>,
476        force: bool,
477        hub: &Sender<Event>,
478        database: &Database,
479        settings: &Settings,
480    ) {
481        if self.is_running(&TaskId::Import) {
482            tracing::info!(library_index = ?library_index, force, "import already running, queueing");
483            self.pending_import_indices
484                .push_back((library_index, force));
485            return;
486        }
487
488        self.flush_buffered_events(hub);
489
490        let task = Box::new(import::ImportTask::new(
491            database.clone(),
492            settings.clone(),
493            library_index,
494            force,
495        ));
496
497        if let Err(e) = self.start(task, hub.clone()) {
498            tracing::warn!(error = %e, "failed to start import task");
499        }
500    }
501
502    /// Starts the next pending import when the current one finishes.
503    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
504    fn drain_pending_imports(
505        &mut self,
506        hub: &Sender<Event>,
507        database: &Database,
508        settings: &Settings,
509    ) {
510        if self.is_running(&TaskId::Import) || self.pending_import_indices.is_empty() {
511            return;
512        }
513
514        let Some((next, force)) = self.pending_import_indices.pop_front() else {
515            return;
516        };
517        self.schedule_import(next, force, hub, database, settings);
518    }
519
520    /// Schedules a dictionary index scan, stopping any running instance first.
521    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
522    fn schedule_dictionary_index(&mut self, hub: &Sender<Event>, database: &Database) {
523        if self.is_running(&TaskId::DictionaryIndex) {
524            tracing::debug!("stopping running dictionary index task for restart");
525            if let Err(e) = self.stop(&TaskId::DictionaryIndex) {
526                tracing::warn!(error = %e, "failed to stop dictionary_index task for restart");
527            }
528        }
529
530        self.flush_buffered_events(hub);
531
532        let task = Box::new(dictionary_index::DictionaryIndexTask::new(database.clone()));
533
534        if let Err(e) = self.start(task, hub.clone()) {
535            tracing::warn!(error = %e, "failed to start dictionary_index task");
536        }
537    }
538
539    #[cfg(feature = "kobo")]
540    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
541    fn sync_auto_frontlight(&mut self, hub: &Sender<Event>, settings: &Settings) {
542        if self.is_running(&TaskId::AutoFrontlight) {
543            tracing::debug!("stopping running auto_frontlight task for restart");
544            if let Err(e) = self.stop(&TaskId::AutoFrontlight) {
545                tracing::warn!(error = %e, "failed to stop auto_frontlight task for restart");
546            }
547        }
548
549        if !settings.auto_frontlight {
550            return;
551        }
552
553        self.flush_buffered_events(hub);
554
555        let task = Box::new(auto_frontlight::AutoFrontlightTask);
556        if let Err(e) = self.start(task, hub.clone()) {
557            tracing::warn!(error = %e, "failed to start auto_frontlight task");
558        }
559    }
560
561    #[cfg(feature = "kobo")]
562    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
563    fn schedule_time_sync(&mut self, manual: bool, hub: &Sender<Event>) {
564        if self.is_running(&TaskId::TimeSync) {
565            tracing::warn!("Time sync task already running, not scheduling");
566
567            return;
568        }
569
570        match crate::device::CURRENT_DEVICE.time_manager() {
571            Ok(time_manager) => {
572                let task = Box::new(time_sync::TimeSyncTask::new(time_manager, manual));
573                if let Err(e) = self.start(task, hub.clone()) {
574                    tracing::warn!(error = %e, "failed to start time sync task");
575                }
576            }
577            Err(e) => {
578                tracing::warn!(error = %e, "time manager unavailable, cannot sync time");
579            }
580        }
581    }
582
583    /// Schedules a thumbnail extraction task, queuing the index if one is already running.
584    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
585    pub fn schedule_thumbnail_extraction(
586        &mut self,
587        library_index: Option<usize>,
588        hub: &Sender<Event>,
589        database: &Database,
590        settings: &Settings,
591    ) {
592        if self.is_running(&TaskId::ThumbnailExtraction) {
593            tracing::info!(library_index = ?library_index, "thumbnail extraction already running, queueing");
594            self.pending_thumbnail_indices.push_back(library_index);
595            return;
596        }
597
598        self.flush_buffered_events(hub);
599
600        let task = Box::new(thumbnail::ThumbnailExtractionTask::new(
601            database.clone(),
602            settings.clone(),
603            library_index,
604        ));
605
606        if let Err(e) = self.start(task, hub.clone()) {
607            tracing::warn!(error = %e, "failed to start thumbnail extraction task");
608        }
609    }
610
611    /// Starts the next pending thumbnail extraction when the current one finishes.
612    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
613    fn drain_pending_thumbnails(
614        &mut self,
615        hub: &Sender<Event>,
616        database: &Database,
617        settings: &Settings,
618    ) {
619        if self.is_running(&TaskId::ThumbnailExtraction)
620            || self.pending_thumbnail_indices.is_empty()
621        {
622            return;
623        }
624
625        let Some(next) = self.pending_thumbnail_indices.pop_front() else {
626            return;
627        };
628        self.schedule_thumbnail_extraction(next, hub, database, settings);
629    }
630
631    /// Returns `true` if a task with the given ID is running.
632    pub fn is_running(&mut self, id: &TaskId) -> bool {
633        self.cleanup_finished();
634        self.tasks.contains_key(id)
635    }
636
637    /// Returns the IDs of all running tasks.
638    pub fn running_tasks(&mut self) -> Vec<TaskId> {
639        self.cleanup_finished();
640        self.tasks.keys().cloned().collect()
641    }
642}
643
644impl Default for TaskManager {
645    fn default() -> Self {
646        Self::new()
647    }
648}
649
650impl Drop for TaskManager {
651    fn drop(&mut self) {
652        self.stop_all();
653    }
654}
655
656/// Registers background tasks that run at startup.
657///
658/// Call this during startup to add background tasks.
659/// Currently registers:
660/// - [`wifi_status_monitor::WifiStatusMonitorTask`] - monitors WiFi status via dhcpcd-dbus (kobo only)
661/// - [`hello_world::HelloWorldTask`] - prints "Hello world!" every minute (test only)
662/// - [`dbus_monitor::DbusMonitorTask`] - monitors D-Bus signals (test + kobo only, when `settings.logging.enable_dbus_log` is true)
663/// - [`import::ImportTask`] - runs an incremental import of all libraries on startup
664/// - [`dictionary_index::DictionaryIndexTask`] - indexes `.index` dictionary files into SQLite
665pub fn register_startup_tasks(
666    manager: &mut TaskManager,
667    hub: Sender<Event>,
668    settings: &Settings,
669    database: &Database,
670) {
671    #[cfg(feature = "kobo")]
672    {
673        {
674            let task = Box::new(wifi_status_monitor::WifiStatusMonitorTask);
675            if let Err(e) = manager.start(task, hub.clone()) {
676                tracing::warn!(error = %e, "failed to start wifi_status_monitor task");
677            }
678        }
679        if settings.auto_frontlight {
680            let task = Box::new(auto_frontlight::AutoFrontlightTask);
681            if let Err(e) = manager.start(task, hub.clone()) {
682                tracing::warn!(error = %e, "failed to start auto_frontlight task");
683            }
684        }
685    }
686
687    #[cfg(feature = "test")]
688    {
689        let task = Box::new(hello_world::HelloWorldTask);
690        if let Err(e) = manager.start(task, hub.clone()) {
691            tracing::warn!(error = %e, "failed to start hello_world task");
692        }
693
694        #[cfg(feature = "kobo")]
695        if settings.logging.enable_dbus_log {
696            let task = Box::new(dbus_monitor::DbusMonitorTask);
697            if let Err(e) = manager.start(task, hub.clone()) {
698                tracing::warn!(error = %e, "failed to start dbus_monitor task");
699            }
700        }
701    }
702
703    manager.schedule_import(None, false, &hub, database, settings);
704
705    let task = Box::new(dictionary_index::DictionaryIndexTask::new(database.clone()));
706    if let Err(e) = manager.start(task, hub.clone()) {
707        tracing::warn!(error = %e, "failed to start dictionary_index task");
708    }
709}
710
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::test_helpers::create_test_context;
    use std::sync::mpsc;
    use std::time::{Duration, Instant};

    /// Polls `manager` until the task `id` is no longer running; panics if
    /// that takes five seconds or longer.
    fn wait_until_not_running(manager: &mut TaskManager, id: &TaskId) {
        let started = Instant::now();
        let timeout = Duration::from_secs(5);
        while started.elapsed() < timeout {
            if !manager.is_running(id) {
                return;
            }
            std::thread::sleep(Duration::from_millis(1));
        }
        panic!("task '{id}' did not finish within timeout");
    }

    /// Registers a fake running task under `id` whose thread blocks until
    /// the shutdown channel receives a message or is disconnected.
    fn register_blocking_task(manager: &mut TaskManager, id: TaskId) {
        let (shutdown_tx, shutdown_rx) = mpsc::channel();
        let handle = thread::spawn(move || {
            let _ = shutdown_rx.recv();
        });
        manager.tasks.insert(
            id,
            RunningTask {
                handle,
                shutdown: shutdown_tx,
                finished_event: None,
            },
        );
    }

    /// Creates an initialized in-memory database.
    fn memory_database() -> Database {
        let mut database = Database::new(":memory:").unwrap();
        database.init(0).unwrap();
        database
    }

    /// Sends an `ImportLibrary` event while an import is (fake) running and
    /// checks that the request is queued with its arguments intact.
    fn assert_import_request_is_queued(library_index: Option<usize>, force: bool) {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();
        let context = create_test_context();

        register_blocking_task(&mut manager, TaskId::Import);

        manager.handle_event(
            &Event::ImportLibrary {
                library_index,
                force,
            },
            &hub,
            &context,
        );

        assert_eq!(
            manager.pending_import_indices.front(),
            Some(&(library_index, force))
        );

        manager.stop(&TaskId::Import).unwrap();
    }

    /// Task that returns from `run` immediately.
    struct InstantTask;

    impl BackgroundTask for InstantTask {
        fn id(&self) -> TaskId {
            TaskId::TestTask2
        }

        fn run(&mut self, _hub: &Sender<Event>, _shutdown: &ShutdownSignal) {}
    }

    /// Task that waits for shutdown (for at most 60 seconds).
    struct WaitingTask;

    impl BackgroundTask for WaitingTask {
        fn id(&self) -> TaskId {
            TaskId::TestTask
        }

        fn run(&mut self, _hub: &Sender<Event>, shutdown: &ShutdownSignal) {
            shutdown.wait(Duration::from_secs(60));
        }
    }

    #[test]
    fn start_and_stop() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        let task_id = manager.start(Box::new(WaitingTask), hub).unwrap();
        assert!(manager.is_running(&task_id));

        manager.stop(&task_id).unwrap();
        assert!(!manager.is_running(&task_id));
    }

    #[test]
    fn duplicate_start_returns_error() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        manager.start(Box::new(WaitingTask), hub.clone()).unwrap();
        let second_attempt = manager.start(Box::new(WaitingTask), hub);

        assert!(matches!(
            second_attempt.unwrap_err(),
            TaskError::AlreadyRunning(TaskId::TestTask)
        ));
    }

    #[test]
    fn finished_task_is_cleaned_up() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        let task_id = manager.start(Box::new(InstantTask), hub).unwrap();

        wait_until_not_running(&mut manager, &task_id);
        assert!(!manager.is_running(&task_id));
    }

    #[test]
    fn stop_finished_task_returns_not_running() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        let task_id = manager.start(Box::new(InstantTask), hub).unwrap();

        wait_until_not_running(&mut manager, &task_id);
        let outcome = manager.stop(&task_id);

        assert!(matches!(
            outcome.unwrap_err(),
            TaskError::NotRunning(TaskId::TestTask2)
        ));
    }

    #[test]
    fn running_tasks_excludes_finished() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        manager.start(Box::new(WaitingTask), hub.clone()).unwrap();
        let instant_id = manager.start(Box::new(InstantTask), hub).unwrap();

        wait_until_not_running(&mut manager, &instant_id);
        let still_running = manager.running_tasks();

        assert_eq!(still_running.len(), 1);
        assert_eq!(still_running[0], TaskId::TestTask);

        manager.stop_all();
    }

    #[test]
    fn stop_all_stops_everything() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        manager.start(Box::new(WaitingTask), hub).unwrap();
        manager.stop_all();

        assert!(!manager.is_running(&TaskId::TestTask));
    }

    #[test]
    fn test_thumbnail_extraction_task_lifecycle() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();
        let database = memory_database();
        let settings = Settings::default();

        manager.schedule_thumbnail_extraction(None, &hub, &database, &settings);

        // Task exits quickly on an unseeded database, so wait for
        // completion rather than asserting the transient running state.
        wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);
        assert!(!manager.is_running(&TaskId::ThumbnailExtraction));

        let outcome = manager.stop(&TaskId::ThumbnailExtraction);
        assert!(matches!(
            outcome.unwrap_err(),
            TaskError::NotRunning(TaskId::ThumbnailExtraction)
        ));
    }

    #[test]
    fn thumbnail_extraction_queues_when_running() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        // Simulate a running ThumbnailExtraction task with a blocking thread.
        register_blocking_task(&mut manager, TaskId::ThumbnailExtraction);

        let database = memory_database();
        let settings = Settings::default();

        // Both requests arrive while the fake task runs, so both are queued.
        for index in [0, 1] {
            manager.schedule_thumbnail_extraction(Some(index), &hub, &database, &settings);
        }
        assert_eq!(manager.pending_thumbnail_indices.len(), 2);

        manager.stop(&TaskId::ThumbnailExtraction).unwrap();

        // First drain starts one queued request, leaving one behind.
        manager.drain_pending_thumbnails(&hub, &database, &settings);
        assert_eq!(manager.pending_thumbnail_indices.len(), 1);

        wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);

        // Second drain empties the queue.
        manager.drain_pending_thumbnails(&hub, &database, &settings);
        assert!(manager.pending_thumbnail_indices.is_empty());

        wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);
    }

    #[test]
    fn import_queue_preserves_force_flag() {
        assert_import_request_is_queued(Some(0), true);
    }

    #[test]
    fn import_queue_preserves_force_false_flag() {
        assert_import_request_is_queued(None, false);
    }
}
958}