1#[cfg(any(feature = "kobo", doc))]
40pub mod auto_frontlight;
41#[cfg(any(all(feature = "test", feature = "kobo"), doc))]
42mod dbus_monitor;
43pub mod dictionary_index;
44#[cfg(any(feature = "test", doc))]
45mod hello_world;
46pub mod import;
47pub mod thumbnail;
48#[cfg(any(feature = "kobo", doc))]
49pub mod time_sync;
50#[cfg(any(feature = "kobo", doc))]
51mod wifi_status_monitor;
52
53use std::collections::{HashMap, VecDeque};
54use std::sync::atomic::{AtomicBool, Ordering};
55use std::sync::mpsc::{self, Receiver, Sender};
56use std::thread::{self, JoinHandle};
57use std::time::Duration;
58
59use thiserror::Error;
60
61use crate::context::Context;
62use crate::db::Database;
63use crate::fl;
64use crate::input::DeviceEvent;
65use crate::settings::Settings;
66use crate::view::{EntryId, Event, NotificationEvent};
67
68#[derive(Error, Debug)]
70pub enum TaskError {
71 #[error("task '{0}' is already running")]
73 AlreadyRunning(TaskId),
74
75 #[error("task '{0}' is not running")]
77 NotRunning(TaskId),
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Hash)]
82pub enum TaskId {
83 Placeholder,
85 Import,
87 ThumbnailExtraction,
89 DictionaryIndex,
91 #[cfg(any(feature = "test", doc))]
93 HelloWorld,
94 #[cfg(any(all(feature = "test", feature = "kobo"), doc))]
96 DbusMonitor,
97 #[cfg(any(feature = "kobo", doc))]
99 WifiStatusMonitor,
100 #[cfg(any(feature = "kobo", doc))]
102 TimeSync,
103 #[cfg(any(feature = "kobo", doc))]
105 AutoFrontlight,
106 #[cfg(test)]
108 TestTask,
109 #[cfg(test)]
111 TestTask2,
112}
113
114impl std::fmt::Display for TaskId {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 match self {
117 TaskId::Placeholder => write!(f, "placeholder"),
118 TaskId::Import => write!(f, "import"),
119 TaskId::ThumbnailExtraction => write!(f, "thumbnail_extraction"),
120 TaskId::DictionaryIndex => write!(f, "dictionary_index"),
121 #[cfg(feature = "test")]
122 TaskId::HelloWorld => write!(f, "hello_world"),
123 #[cfg(all(feature = "test", feature = "kobo"))]
124 TaskId::DbusMonitor => write!(f, "dbus_monitor"),
125 #[cfg(feature = "kobo")]
126 TaskId::WifiStatusMonitor => write!(f, "wifi_status_monitor"),
127 #[cfg(feature = "kobo")]
128 TaskId::TimeSync => write!(f, "time_sync"),
129 #[cfg(feature = "kobo")]
130 TaskId::AutoFrontlight => write!(f, "auto_frontlight"),
131 #[cfg(test)]
132 TaskId::TestTask => write!(f, "test_task"),
133 #[cfg(test)]
134 TaskId::TestTask2 => write!(f, "test_task_2"),
135 }
136 }
137}
138
139pub struct ShutdownSignal {
144 receiver: Receiver<()>,
145 _sender_anchor: Option<Sender<()>>,
148 stopped: AtomicBool,
149}
150
151impl ShutdownSignal {
152 fn new(receiver: Receiver<()>) -> Self {
153 Self {
154 receiver,
155 _sender_anchor: None,
156 stopped: AtomicBool::new(false),
157 }
158 }
159
160 pub fn never() -> Self {
165 let (tx, rx) = mpsc::channel();
166 Self {
167 receiver: rx,
168 _sender_anchor: Some(tx),
169 stopped: AtomicBool::new(false),
170 }
171 }
172
173 #[cfg(test)]
179 pub fn new_for_test(receiver: Receiver<()>) -> Self {
180 Self::new(receiver)
181 }
182
183 pub fn should_stop(&self) -> bool {
189 if self.stopped.load(Ordering::Acquire) {
190 return true;
191 }
192 if self.receiver.try_recv().is_ok() {
193 self.stopped.store(true, Ordering::Release);
194 return true;
195 }
196 false
197 }
198
199 pub fn wait(&self, duration: Duration) -> bool {
205 if self.stopped.load(Ordering::Acquire) {
206 return true;
207 }
208 match self.receiver.recv_timeout(duration) {
209 Ok(()) | Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
210 self.stopped.store(true, Ordering::Release);
211 true
212 }
213 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => false,
214 }
215 }
216}
217
218pub trait BackgroundTask: Send {
224 fn id(&self) -> TaskId;
226
227 fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal);
232
233 fn stop(&mut self) {}
237
238 fn finished_event(&self) -> Option<Event> {
243 None
244 }
245}
246
247struct RunningTask {
248 handle: JoinHandle<()>,
249 shutdown: Sender<()>,
250 finished_event: Option<Event>,
252}
253
254pub struct TaskManager {
259 tasks: HashMap<TaskId, RunningTask>,
260 pending_import_indices: VecDeque<(Option<usize>, bool)>,
262 pending_thumbnail_indices: VecDeque<Option<usize>>,
264 buffered_events: Vec<Event>,
266}
267
268impl TaskManager {
269 pub fn new() -> Self {
271 Self {
272 tasks: HashMap::new(),
273 pending_import_indices: VecDeque::new(),
274 pending_thumbnail_indices: VecDeque::new(),
275 buffered_events: Vec::new(),
276 }
277 }
278
279 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, task, hub), fields(task_id = tracing::field::Empty
286 ), ret))]
287 pub fn start(
288 &mut self,
289 task: Box<dyn BackgroundTask>,
290 hub: Sender<Event>,
291 ) -> Result<TaskId, TaskError> {
292 let id = task.id();
293
294 #[cfg(feature = "tracing")]
295 tracing::Span::current().record("task_id", tracing::field::display(&id));
296
297 if self.is_running(&id) {
298 return Err(TaskError::AlreadyRunning(id));
299 }
300
301 let (shutdown_tx, shutdown_rx) = mpsc::channel();
302 let shutdown_signal = ShutdownSignal::new(shutdown_rx);
303
304 let finished_event = task.finished_event();
305
306 let handle = thread::spawn(move || {
307 let mut task = task;
308 tracing::info!("task started");
309 task.run(&hub, &shutdown_signal);
310 task.stop();
311 tracing::info!("task stopped");
312 });
313
314 self.tasks.insert(
315 id.clone(),
316 RunningTask {
317 handle,
318 shutdown: shutdown_tx,
319 finished_event,
320 },
321 );
322
323 tracing::info!("task registered");
324 Ok(id)
325 }
326
327 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(task_id = %id), ret))]
332 pub fn stop(&mut self, id: &TaskId) -> Result<(), TaskError> {
333 self.cleanup_finished();
334 if let Some(task) = self.tasks.remove(id) {
335 tracing::info!("sending shutdown signal");
336 if let Err(e) = task.shutdown.send(()) {
337 tracing::error!(error = %e, "failed to send shutdown signal");
338 }
339 if task.handle.join().is_err() {
340 tracing::error!("task thread panicked");
341 }
342 Ok(())
343 } else {
344 Err(TaskError::NotRunning(id.clone()))
345 }
346 }
347
348 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(task_count = tracing::field::Empty
352 )))]
353 pub fn stop_all(&mut self) {
354 let tasks: Vec<_> = self.tasks.drain().collect();
355
356 #[cfg(feature = "tracing")]
357 tracing::Span::current().record("task_count", tasks.len());
358
359 if !tasks.is_empty() {
360 tracing::info!("stopping all tasks");
361 }
362 for (_, task) in &tasks {
363 if let Err(e) = task.shutdown.send(()) {
364 tracing::error!(error = %e, "failed to send shutdown signal");
365 }
366 }
367 for (_, task) in tasks {
368 if task.handle.join().is_err() {
369 tracing::error!("task thread panicked");
370 }
371 }
372 }
373
374 fn cleanup_finished(&mut self) {
377 let finished: Vec<TaskId> = self
378 .tasks
379 .iter()
380 .filter(|(_, task)| task.handle.is_finished())
381 .map(|(id, _)| id.clone())
382 .collect();
383
384 for id in finished {
385 if let Some(task) = self.tasks.remove(&id) {
386 if task.handle.join().is_ok() {
387 if let Some(evt) = task.finished_event {
388 self.buffered_events.push(evt);
389 }
390 } else {
391 tracing::error!(task_id = %id, "task thread panicked");
392 }
393 }
394 }
395 }
396
397 fn flush_buffered_events(&mut self, hub: &Sender<Event>) {
399 for evt in self.buffered_events.drain(..) {
400 hub.send(evt).ok();
401 }
402 }
403
404 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, hub, context)))]
409 pub fn handle_event(&mut self, evt: &Event, hub: &Sender<Event>, context: &Context) -> bool {
410 self.cleanup_finished();
411 self.flush_buffered_events(hub);
412
413 match evt {
414 Event::ImportLibrary {
415 library_index,
416 force,
417 } => {
418 self.schedule_import(
419 *library_index,
420 *force,
421 hub,
422 &context.database,
423 &context.settings,
424 );
425 }
426 Event::ImportFinished { library_index } => {
427 self.drain_pending_imports(hub, &context.database, &context.settings);
428 self.schedule_thumbnail_extraction(
429 *library_index,
430 hub,
431 &context.database,
432 &context.settings,
433 );
434 }
435 Event::ThumbnailExtractionFinished { .. } => {
436 self.drain_pending_thumbnails(hub, &context.database, &context.settings);
437 }
438 Event::ReindexDictionaries => {
439 self.schedule_dictionary_index(hub, &context.database);
440 }
441 Event::Device(DeviceEvent::NetUp) => {
442 #[cfg(feature = "kobo")]
443 {
444 if context.settings.auto_time {
445 self.schedule_time_sync(false, hub);
446 }
447 }
448 }
449 #[cfg(feature = "kobo")]
450 Event::AutoFrontlightConfigChanged => {
451 self.sync_auto_frontlight(hub, &context.settings);
452 }
453 Event::Select(EntryId::SyncTime) => {
454 #[cfg(feature = "kobo")]
455 {
456 if !context.online {
457 hub.send(Event::Notification(NotificationEvent::Show(fl!(
458 "notification-not-online"
459 ))))
460 .ok();
461 } else {
462 self.schedule_time_sync(true, hub);
463 }
464 }
465 }
466 _ => {}
467 }
468 false
469 }
470
471 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
473 fn schedule_import(
474 &mut self,
475 library_index: Option<usize>,
476 force: bool,
477 hub: &Sender<Event>,
478 database: &Database,
479 settings: &Settings,
480 ) {
481 if self.is_running(&TaskId::Import) {
482 tracing::info!(library_index = ?library_index, force, "import already running, queueing");
483 self.pending_import_indices
484 .push_back((library_index, force));
485 return;
486 }
487
488 self.flush_buffered_events(hub);
489
490 let task = Box::new(import::ImportTask::new(
491 database.clone(),
492 settings.clone(),
493 library_index,
494 force,
495 ));
496
497 if let Err(e) = self.start(task, hub.clone()) {
498 tracing::warn!(error = %e, "failed to start import task");
499 }
500 }
501
502 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
504 fn drain_pending_imports(
505 &mut self,
506 hub: &Sender<Event>,
507 database: &Database,
508 settings: &Settings,
509 ) {
510 if self.is_running(&TaskId::Import) || self.pending_import_indices.is_empty() {
511 return;
512 }
513
514 let Some((next, force)) = self.pending_import_indices.pop_front() else {
515 return;
516 };
517 self.schedule_import(next, force, hub, database, settings);
518 }
519
520 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
522 fn schedule_dictionary_index(&mut self, hub: &Sender<Event>, database: &Database) {
523 if self.is_running(&TaskId::DictionaryIndex) {
524 tracing::debug!("stopping running dictionary index task for restart");
525 if let Err(e) = self.stop(&TaskId::DictionaryIndex) {
526 tracing::warn!(error = %e, "failed to stop dictionary_index task for restart");
527 }
528 }
529
530 self.flush_buffered_events(hub);
531
532 let task = Box::new(dictionary_index::DictionaryIndexTask::new(database.clone()));
533
534 if let Err(e) = self.start(task, hub.clone()) {
535 tracing::warn!(error = %e, "failed to start dictionary_index task");
536 }
537 }
538
539 #[cfg(feature = "kobo")]
540 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
541 fn sync_auto_frontlight(&mut self, hub: &Sender<Event>, settings: &Settings) {
542 if self.is_running(&TaskId::AutoFrontlight) {
543 tracing::debug!("stopping running auto_frontlight task for restart");
544 if let Err(e) = self.stop(&TaskId::AutoFrontlight) {
545 tracing::warn!(error = %e, "failed to stop auto_frontlight task for restart");
546 }
547 }
548
549 if !settings.auto_frontlight {
550 return;
551 }
552
553 self.flush_buffered_events(hub);
554
555 let task = Box::new(auto_frontlight::AutoFrontlightTask);
556 if let Err(e) = self.start(task, hub.clone()) {
557 tracing::warn!(error = %e, "failed to start auto_frontlight task");
558 }
559 }
560
561 #[cfg(feature = "kobo")]
562 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
563 fn schedule_time_sync(&mut self, manual: bool, hub: &Sender<Event>) {
564 if self.is_running(&TaskId::TimeSync) {
565 tracing::warn!("Time sync task already running, not scheduling");
566
567 return;
568 }
569
570 match crate::device::CURRENT_DEVICE.time_manager() {
571 Ok(time_manager) => {
572 let task = Box::new(time_sync::TimeSyncTask::new(time_manager, manual));
573 if let Err(e) = self.start(task, hub.clone()) {
574 tracing::warn!(error = %e, "failed to start time sync task");
575 }
576 }
577 Err(e) => {
578 tracing::warn!(error = %e, "time manager unavailable, cannot sync time");
579 }
580 }
581 }
582
583 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
585 pub fn schedule_thumbnail_extraction(
586 &mut self,
587 library_index: Option<usize>,
588 hub: &Sender<Event>,
589 database: &Database,
590 settings: &Settings,
591 ) {
592 if self.is_running(&TaskId::ThumbnailExtraction) {
593 tracing::info!(library_index = ?library_index, "thumbnail extraction already running, queueing");
594 self.pending_thumbnail_indices.push_back(library_index);
595 return;
596 }
597
598 self.flush_buffered_events(hub);
599
600 let task = Box::new(thumbnail::ThumbnailExtractionTask::new(
601 database.clone(),
602 settings.clone(),
603 library_index,
604 ));
605
606 if let Err(e) = self.start(task, hub.clone()) {
607 tracing::warn!(error = %e, "failed to start thumbnail extraction task");
608 }
609 }
610
611 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
613 fn drain_pending_thumbnails(
614 &mut self,
615 hub: &Sender<Event>,
616 database: &Database,
617 settings: &Settings,
618 ) {
619 if self.is_running(&TaskId::ThumbnailExtraction)
620 || self.pending_thumbnail_indices.is_empty()
621 {
622 return;
623 }
624
625 let Some(next) = self.pending_thumbnail_indices.pop_front() else {
626 return;
627 };
628 self.schedule_thumbnail_extraction(next, hub, database, settings);
629 }
630
631 pub fn is_running(&mut self, id: &TaskId) -> bool {
633 self.cleanup_finished();
634 self.tasks.contains_key(id)
635 }
636
637 pub fn running_tasks(&mut self) -> Vec<TaskId> {
639 self.cleanup_finished();
640 self.tasks.keys().cloned().collect()
641 }
642}
643
644impl Default for TaskManager {
645 fn default() -> Self {
646 Self::new()
647 }
648}
649
650impl Drop for TaskManager {
651 fn drop(&mut self) {
652 self.stop_all();
653 }
654}
655
656pub fn register_startup_tasks(
666 manager: &mut TaskManager,
667 hub: Sender<Event>,
668 settings: &Settings,
669 database: &Database,
670) {
671 #[cfg(feature = "kobo")]
672 {
673 {
674 let task = Box::new(wifi_status_monitor::WifiStatusMonitorTask);
675 if let Err(e) = manager.start(task, hub.clone()) {
676 tracing::warn!(error = %e, "failed to start wifi_status_monitor task");
677 }
678 }
679 if settings.auto_frontlight {
680 let task = Box::new(auto_frontlight::AutoFrontlightTask);
681 if let Err(e) = manager.start(task, hub.clone()) {
682 tracing::warn!(error = %e, "failed to start auto_frontlight task");
683 }
684 }
685 }
686
687 #[cfg(feature = "test")]
688 {
689 let task = Box::new(hello_world::HelloWorldTask);
690 if let Err(e) = manager.start(task, hub.clone()) {
691 tracing::warn!(error = %e, "failed to start hello_world task");
692 }
693
694 #[cfg(feature = "kobo")]
695 if settings.logging.enable_dbus_log {
696 let task = Box::new(dbus_monitor::DbusMonitorTask);
697 if let Err(e) = manager.start(task, hub.clone()) {
698 tracing::warn!(error = %e, "failed to start dbus_monitor task");
699 }
700 }
701 }
702
703 manager.schedule_import(None, false, &hub, database, settings);
704
705 let task = Box::new(dictionary_index::DictionaryIndexTask::new(database.clone()));
706 if let Err(e) = manager.start(task, hub.clone()) {
707 tracing::warn!(error = %e, "failed to start dictionary_index task");
708 }
709}
710
711#[cfg(test)]
712mod tests {
713 use super::*;
714 use crate::context::test_helpers::create_test_context;
715 use std::sync::mpsc;
716 use std::time::{Duration, Instant};
717
718 fn wait_until_not_running(manager: &mut TaskManager, id: &TaskId) {
719 let deadline = Instant::now() + Duration::from_secs(5);
720 while Instant::now() < deadline {
721 if !manager.is_running(id) {
722 return;
723 }
724 std::thread::sleep(Duration::from_millis(1));
725 }
726 panic!("task '{id}' did not finish within timeout");
727 }
728
729 struct InstantTask;
730
731 impl BackgroundTask for InstantTask {
732 fn id(&self) -> TaskId {
733 TaskId::TestTask2
734 }
735
736 fn run(&mut self, _hub: &Sender<Event>, _shutdown: &ShutdownSignal) {}
737 }
738
739 struct WaitingTask;
740
741 impl BackgroundTask for WaitingTask {
742 fn id(&self) -> TaskId {
743 TaskId::TestTask
744 }
745
746 fn run(&mut self, _hub: &Sender<Event>, shutdown: &ShutdownSignal) {
747 shutdown.wait(Duration::from_secs(60));
748 }
749 }
750
751 #[test]
752 fn start_and_stop() {
753 let mut manager = TaskManager::new();
754 let (hub, _rx) = mpsc::channel();
755
756 let id = manager.start(Box::new(WaitingTask), hub).unwrap();
757 assert!(manager.is_running(&id));
758
759 manager.stop(&id).unwrap();
760 assert!(!manager.is_running(&id));
761 }
762
763 #[test]
764 fn duplicate_start_returns_error() {
765 let mut manager = TaskManager::new();
766 let (hub, _rx) = mpsc::channel();
767
768 manager.start(Box::new(WaitingTask), hub.clone()).unwrap();
769 let err = manager.start(Box::new(WaitingTask), hub).unwrap_err();
770
771 assert!(matches!(err, TaskError::AlreadyRunning(TaskId::TestTask)));
772 }
773
774 #[test]
775 fn finished_task_is_cleaned_up() {
776 let mut manager = TaskManager::new();
777 let (hub, _rx) = mpsc::channel();
778
779 let id = manager.start(Box::new(InstantTask), hub).unwrap();
780
781 wait_until_not_running(&mut manager, &id);
782 assert!(!manager.is_running(&id));
783 }
784
785 #[test]
786 fn stop_finished_task_returns_not_running() {
787 let mut manager = TaskManager::new();
788 let (hub, _rx) = mpsc::channel();
789
790 let id = manager.start(Box::new(InstantTask), hub).unwrap();
791
792 wait_until_not_running(&mut manager, &id);
793 let err = manager.stop(&id).unwrap_err();
794
795 assert!(matches!(err, TaskError::NotRunning(TaskId::TestTask2)));
796 }
797
798 #[test]
799 fn running_tasks_excludes_finished() {
800 let mut manager = TaskManager::new();
801 let (hub, _rx) = mpsc::channel();
802
803 manager.start(Box::new(WaitingTask), hub.clone()).unwrap();
804 let instant_id = manager.start(Box::new(InstantTask), hub).unwrap();
805
806 wait_until_not_running(&mut manager, &instant_id);
807 let running = manager.running_tasks();
808
809 assert_eq!(running.len(), 1);
810 assert_eq!(running[0], TaskId::TestTask);
811
812 manager.stop_all();
813 }
814
815 #[test]
816 fn stop_all_stops_everything() {
817 let mut manager = TaskManager::new();
818 let (hub, _rx) = mpsc::channel();
819
820 manager.start(Box::new(WaitingTask), hub).unwrap();
821 manager.stop_all();
822
823 assert!(!manager.is_running(&TaskId::TestTask));
824 }
825
826 #[test]
827 fn test_thumbnail_extraction_task_lifecycle() {
828 let mut manager = TaskManager::new();
829 let (hub, _rx) = mpsc::channel();
830 let mut database = Database::new(":memory:").unwrap();
831 database.init(0).unwrap();
832 let settings = Settings::default();
833
834 manager.schedule_thumbnail_extraction(None, &hub, &database, &settings);
835
836 wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);
839 assert!(!manager.is_running(&TaskId::ThumbnailExtraction));
840
841 let err = manager.stop(&TaskId::ThumbnailExtraction).unwrap_err();
842 assert!(matches!(
843 err,
844 TaskError::NotRunning(TaskId::ThumbnailExtraction)
845 ));
846 }
847
848 #[test]
849 fn thumbnail_extraction_queues_when_running() {
850 let mut manager = TaskManager::new();
851 let (hub, _rx) = mpsc::channel();
852
853 let (shutdown_tx, shutdown_rx) = mpsc::channel();
855 let blocking_handle = thread::spawn(move || {
856 let _ = shutdown_rx.recv();
857 });
858 manager.tasks.insert(
859 TaskId::ThumbnailExtraction,
860 RunningTask {
861 handle: blocking_handle,
862 shutdown: shutdown_tx,
863 finished_event: None,
864 },
865 );
866
867 let mut database = Database::new(":memory:").unwrap();
868 database.init(0).unwrap();
869 let settings = Settings::default();
870
871 manager.schedule_thumbnail_extraction(Some(0), &hub, &database, &settings);
872 manager.schedule_thumbnail_extraction(Some(1), &hub, &database, &settings);
873
874 assert_eq!(manager.pending_thumbnail_indices.len(), 2);
875
876 manager.stop(&TaskId::ThumbnailExtraction).unwrap();
877
878 manager.drain_pending_thumbnails(&hub, &database, &settings);
879 assert_eq!(manager.pending_thumbnail_indices.len(), 1);
880
881 wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);
882
883 manager.drain_pending_thumbnails(&hub, &database, &settings);
884 assert!(manager.pending_thumbnail_indices.is_empty());
885
886 wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);
887 }
888
889 #[test]
890 fn import_queue_preserves_force_flag() {
891 let mut manager = TaskManager::new();
892 let (hub, _rx) = mpsc::channel();
893 let context = create_test_context();
894
895 let (shutdown_tx, shutdown_rx) = mpsc::channel();
897 let blocking_handle = thread::spawn(move || {
898 let _ = shutdown_rx.recv();
899 });
900 manager.tasks.insert(
901 TaskId::Import,
902 RunningTask {
903 handle: blocking_handle,
904 shutdown: shutdown_tx,
905 finished_event: None,
906 },
907 );
908
909 manager.handle_event(
910 &Event::ImportLibrary {
911 library_index: Some(0),
912 force: true,
913 },
914 &hub,
915 &context,
916 );
917
918 assert_eq!(
919 manager.pending_import_indices.front(),
920 Some(&(Some(0), true))
921 );
922
923 manager.stop(&TaskId::Import).unwrap();
924 }
925
926 #[test]
927 fn import_queue_preserves_force_false_flag() {
928 let mut manager = TaskManager::new();
929 let (hub, _rx) = mpsc::channel();
930 let context = create_test_context();
931
932 let (shutdown_tx, shutdown_rx) = mpsc::channel();
933 let blocking_handle = thread::spawn(move || {
934 let _ = shutdown_rx.recv();
935 });
936 manager.tasks.insert(
937 TaskId::Import,
938 RunningTask {
939 handle: blocking_handle,
940 shutdown: shutdown_tx,
941 finished_event: None,
942 },
943 );
944
945 manager.handle_event(
946 &Event::ImportLibrary {
947 library_index: None,
948 force: false,
949 },
950 &hub,
951 &context,
952 );
953
954 assert_eq!(manager.pending_import_indices.front(), Some(&(None, false)));
955
956 manager.stop(&TaskId::Import).unwrap();
957 }
958}