1use std::collections::HashSet;
5use std::fs::File;
6use std::io::{BufRead, BufReader};
7use std::num::NonZeroU64;
8use std::sync::mpsc::Sender;
9
10use globset::Glob;
11use walkdir::WalkDir;
12
13use crate::context::DICTIONARIES_DIRNAME;
14use crate::db::Database;
15use crate::db::runtime::RUNTIME;
16use crate::device::CURRENT_DEVICE;
17use crate::dictionary::{Entry, Metadata, normalize};
18use crate::fl;
19use crate::helpers::{Fingerprint, IsHidden};
20use crate::task::{BackgroundTask, ShutdownSignal, TaskId};
21use crate::view::notification::NotificationEvent;
22use crate::view::{Event, ID_FEEDER, ViewId};
23
/// Number of rows handled per database statement batch: parsed index entries are
/// flushed once this many have accumulated, and stale entries are deleted in
/// chunks of this size.
const BATCH_SIZE: usize = 5000;
25
/// Everything the scanning and flushing steps need to know about one index file.
struct IndexFileJob<'a> {
    /// Path of the index file that is opened for scanning.
    index_path: &'a std::path::Path,
    /// String form of the path, attached to log records.
    path_str: &'a str,
    /// `dict_id` written with every entry row and used to update the meta row.
    dict_id: i64,
    /// Name interpolated into the indexing notification text.
    dict_name: &'a str,
    /// Total number of lines in the file; denominator of the progress percentage.
    total_lines: u64,
    /// Notification whose text and progress are updated after each batch.
    notif_id: ViewId,
    /// Flags handed to `normalize` for every batch.
    metadata: Metadata,
}
35
/// Decodes a number written with the base64 alphabet (`A-Z`, `a-z`, `0-9`, `+`, `/`),
/// most significant digit first.
///
/// Returns `None` when `word` contains a character outside that alphabet or when
/// the decoded value does not fit in a `u64`. An empty `word` decodes to `Some(0)`.
fn decode_number(word: &str) -> Option<u64> {
    word.chars().try_fold(0u64, |acc, ch| {
        let digit = match ch {
            'A'..='Z' => ch as u64 - 'A' as u64,
            'a'..='z' => ch as u64 - 'a' as u64 + 26,
            '0'..='9' => ch as u64 - '0' as u64 + 52,
            '+' => 62,
            '/' => 63,
            _ => return None,
        };
        // Checked arithmetic: an oversized field must be rejected rather than
        // panic (debug builds) or silently wrap (release builds).
        acc.checked_mul(64)?.checked_add(digit)
    })
}
69
/// Background task that indexes dictionary `.index` files into the database.
pub struct DictionaryIndexTask {
    /// Database whose connection pool is used for every query issued by the task.
    database: Database,
}
77
78impl DictionaryIndexTask {
79 pub fn new(database: Database) -> Self {
81 Self { database }
82 }
83
84 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(path = %path_str)))]
89 fn detect_metadata(path_str: &str) -> (bool, bool) {
90 let file = match File::open(path_str) {
91 Ok(f) => f,
92 Err(e) => {
93 tracing::error!(path = %path_str, error = %e, "failed to open index file for metadata detection");
94 return (false, false);
95 }
96 };
97
98 let mut all_chars = false;
99 let mut case_sensitive = false;
100
101 for line in BufReader::new(file).lines() {
102 let line = match line {
103 Ok(l) => l,
104 Err(_) => continue,
105 };
106
107 let word = line.split('\t').next().unwrap_or("");
108
109 if word.is_empty() {
110 continue;
111 } else if word == "00-database-allchars" {
112 all_chars = true;
113 } else if word == "00-database-case-sensitive" || word == "00databasecasesensitive" {
114 case_sensitive = true;
115 } else if !word.starts_with("00-database-") && !word.starts_with("00database") {
116 break;
117 }
118
119 if all_chars && case_sensitive {
120 break;
121 }
122 }
123
124 (case_sensitive, all_chars)
125 }
126
127 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(path = %path_str, fingerprint = %fp_str)))]
133 fn resolve_index_state(
134 &self,
135 index_path: &std::path::Path,
136 path_str: &str,
137 fp_str: &str,
138 ) -> Option<(i64, u64, u64, bool)> {
139 let pool = self.database.pool().clone();
140
141 let meta = RUNTIME.block_on(async {
142 sqlx::query!(
143 r#"SELECT dict_id, total_lines, indexed_lines, completed
144 FROM dictionary_index_meta
145 WHERE fingerprint = ?"#,
146 fp_str,
147 )
148 .fetch_optional(&pool)
149 .await
150 });
151
152 let meta = match meta {
153 Ok(m) => m,
154 Err(e) => {
155 tracing::error!(path = %path_str, fingerprint = %fp_str, error = %e, "failed to query dictionary_index_meta");
156 return None;
157 }
158 };
159
160 if let Some(row) = meta {
161 if row.completed != 0 {
162 tracing::debug!(path = %path_str, fingerprint = %fp_str, "dictionary already indexed, skipping");
163 return None;
164 }
165
166 return Some((
167 row.dict_id?,
168 row.indexed_lines as u64,
169 row.total_lines as u64,
170 false,
171 ));
172 }
173
174 let file = match File::open(index_path) {
175 Ok(f) => f,
176 Err(e) => {
177 tracing::error!(path = %path_str, error = %e, "failed to open index file for line count");
178 return None;
179 }
180 };
181
182 let total = BufReader::new(file).lines().count() as i64;
183
184 let result = RUNTIME.block_on(async {
185 sqlx::query!(
186 r#"INSERT INTO dictionary_index_meta (fingerprint, dict_path, total_lines, indexed_lines, completed)
187 VALUES (?, ?, ?, 0, 0)"#,
188 fp_str,
189 path_str,
190 total,
191 )
192 .execute(&pool)
193 .await
194 });
195
196 if let Err(e) = result {
197 tracing::error!(path = %path_str, error = %e, "failed to insert dictionary_index_meta row");
198 return None;
199 }
200
201 let dict_id: i64 = RUNTIME.block_on(async {
202 sqlx::query_scalar!(
203 "SELECT dict_id FROM dictionary_index_meta WHERE fingerprint = ?",
204 fp_str
205 )
206 .fetch_one(&pool)
207 .await
208 .ok()?
209 })?;
210
211 Some((dict_id, 0u64, total as u64, true))
212 }
213
214 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(path = %path_str, dict_id, indexed = current_line, total = total_lines)))]
216 fn mark_completed(&self, dict_id: i64, path_str: &str, current_line: u64, total_lines: u64) {
217 let pool = self.database.pool().clone();
218
219 let result = RUNTIME.block_on(async {
220 sqlx::query!(
221 "UPDATE dictionary_index_meta SET completed = 1 WHERE dict_id = ?",
222 dict_id,
223 )
224 .execute(&pool)
225 .await
226 });
227
228 if let Err(e) = result {
229 tracing::error!(path = %path_str, error = %e, "failed to mark dictionary as completed");
230 return;
231 }
232
233 tracing::info!(path = %path_str, indexed = current_line, total = total_lines, "dictionary index complete");
234 }
235
236 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(path = %path_str)))]
244 fn parse_index_line<'a>(path_str: &str, line: &'a str) -> Option<(&'a str, i64, i64)> {
245 let trimmed = line.trim_end();
246 let mut cols = trimmed.split('\t');
247
248 let word = cols.next()?;
249
250 let offset_str = cols.next()?;
251 let offset = match decode_number(offset_str) {
252 Some(o) => o as i64,
253 None => {
254 tracing::error!(path = %path_str, word, offset_str, "failed to decode offset");
255 return None;
256 }
257 };
258
259 let size_str = cols.next()?;
260 let size = match decode_number(size_str) {
261 Some(s) => s as i64,
262 None => {
263 tracing::error!(path = %path_str, word, size_str, "failed to decode size");
264 return None;
265 }
266 };
267
268 Some((word, offset, size))
269 }
270
271 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(path = %job.path_str, skip_lines, total_lines = job.total_lines)))]
277 fn scan_and_batch(
278 &self,
279 job: &IndexFileJob<'_>,
280 skip_lines: u64,
281 hub: &Sender<Event>,
282 shutdown: &ShutdownSignal,
283 ) -> Option<u64> {
284 let file = match File::open(job.index_path) {
285 Ok(f) => f,
286 Err(e) => {
287 tracing::error!(path = %job.path_str, error = %e, "failed to open index file");
288 return None;
289 }
290 };
291
292 let reader = BufReader::new(file);
293 let mut lines_iter = reader.lines().enumerate();
294
295 for _ in 0..skip_lines {
296 lines_iter.next();
297 }
298
299 let mut current_line = skip_lines;
300 let mut raw_batch: Vec<Entry> = Vec::with_capacity(BATCH_SIZE);
301
302 for (_, line_result) in &mut lines_iter {
303 let line = match line_result {
304 Ok(l) => l,
305 Err(e) => {
306 tracing::error!(path = %job.path_str, line = current_line, error = %e, "failed to read line");
307 current_line += 1;
308 continue;
309 }
310 };
311
312 current_line += 1;
313
314 if let Some((word, offset, size)) = Self::parse_index_line(job.path_str, &line) {
315 raw_batch.push(Entry {
316 headword: word.to_string(),
317 offset: offset as u64,
318 size: size as u64,
319 original: None,
320 });
321 }
322
323 if raw_batch.len() >= BATCH_SIZE {
324 let normalized = normalize(&raw_batch, &job.metadata);
325 let batch: Vec<(i64, String, i64, i64, Option<String>)> = normalized
326 .into_iter()
327 .map(|e| {
328 (
329 job.dict_id,
330 e.headword,
331 e.offset as i64,
332 e.size as i64,
333 e.original,
334 )
335 })
336 .collect();
337
338 if let Err(e) = self.flush_batch(job, &batch, current_line, hub) {
339 tracing::error!(path = %job.path_str, error = %e, "failed to flush batch");
340 return None;
341 }
342
343 raw_batch.clear();
344
345 if shutdown.should_stop() {
346 return None;
347 }
348 }
349 }
350
351 if !raw_batch.is_empty() {
352 let normalized = normalize(&raw_batch, &job.metadata);
353 let batch: Vec<(i64, String, i64, i64, Option<String>)> = normalized
354 .into_iter()
355 .map(|e| {
356 (
357 job.dict_id,
358 e.headword,
359 e.offset as i64,
360 e.size as i64,
361 e.original,
362 )
363 })
364 .collect();
365
366 if let Err(e) = self.flush_batch(job, &batch, current_line, hub) {
367 tracing::error!(path = %job.path_str, error = %e, "failed to flush final batch");
368 return None;
369 }
370 }
371
372 Some(current_line)
373 }
374
375 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(path = %index_path.display())))]
376 fn index_file(
377 &self,
378 index_path: &std::path::Path,
379 hub: &Sender<Event>,
380 shutdown: &ShutdownSignal,
381 ) {
382 let path_str = index_path.display().to_string();
383
384 let dict_name = index_path
385 .file_stem()
386 .map(|s| s.to_string_lossy().into_owned())
387 .unwrap_or_else(|| path_str.clone());
388
389 let fp = match index_path.fingerprint() {
390 Ok(fp) => fp,
391 Err(e) => {
392 tracing::error!(path = %path_str, error = %e, "failed to fingerprint index file");
393 return;
394 }
395 };
396
397 let fp_str = fp.to_string();
398
399 let (dict_id, skip_lines, total_lines, is_new) =
400 match self.resolve_index_state(index_path, &path_str, &fp_str) {
401 Some(state) => state,
402 None => {
403 return;
404 }
405 };
406
407 if is_new {
408 hub.send(Event::ReloadDictionaries).ok();
409 }
410
411 let (case_sensitive, all_chars) = Self::detect_metadata(&path_str);
412 let metadata = Metadata {
413 case_sensitive,
414 all_chars,
415 };
416
417 let notif_id = ViewId::MessageNotif(ID_FEEDER.next());
418 hub.send(Event::Notification(NotificationEvent::ShowPinned(
419 notif_id,
420 fl!(
421 "notification-dictionary-indexing",
422 name = dict_name.as_str()
423 ),
424 )))
425 .ok();
426
427 let job = IndexFileJob {
428 index_path,
429 path_str: &path_str,
430 dict_id,
431 dict_name: &dict_name,
432 total_lines,
433 notif_id,
434 metadata,
435 };
436
437 tracing::debug!(path = %path_str, dict_id, skip_lines, total_lines, case_sensitive, all_chars, "starting dictionary indexing");
438
439 match self.scan_and_batch(&job, skip_lines, hub, shutdown) {
440 Some(current_line) => {
441 self.mark_completed(dict_id, &path_str, current_line, total_lines);
442 hub.send(Event::ReloadDictionaries).ok();
443 hub.send(Event::Close(notif_id)).ok();
444 }
445 None => {
446 hub.send(Event::Close(notif_id)).ok();
447 }
448 }
449 }
450
451 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(batch_size = batch.len(), current_line, total_lines = job.total_lines)))]
452 fn flush_batch(
453 &self,
454 job: &IndexFileJob<'_>,
455 batch: &[(i64, String, i64, i64, Option<String>)],
456 current_line: u64,
457 hub: &Sender<Event>,
458 ) -> Result<(), anyhow::Error> {
459 let pool = self.database.pool().clone();
460 let indexed_lines = current_line as i64;
461
462 RUNTIME.block_on(async {
463 let mut tx = pool.begin().await?;
464
465 for (dict_id, word, offset, size, original) in batch {
466 sqlx::query!(
467 r#"INSERT OR IGNORE INTO dictionary_index_entry (dict_id, word, offset, size, original)
468 VALUES (?, ?, ?, ?, ?)"#,
469 dict_id,
470 word,
471 offset,
472 size,
473 original,
474 )
475 .execute(&mut *tx)
476 .await?;
477 }
478
479 sqlx::query!(
480 "UPDATE dictionary_index_meta SET indexed_lines = ? WHERE dict_id = ?",
481 indexed_lines,
482 job.dict_id,
483 )
484 .execute(&mut *tx)
485 .await?;
486
487 tx.commit().await?;
488
489 Ok::<_, anyhow::Error>(())
490 })?;
491
492 let progress = NonZeroU64::new(job.total_lines)
493 .and_then(|total_lines| {
494 current_line
495 .checked_mul(100)
496 .map(|value| value / total_lines.get())
497 })
498 .unwrap_or(0)
499 .min(100) as u8;
500 let msg = fl!("notification-dictionary-indexing", name = job.dict_name);
501 hub.send(Event::Notification(NotificationEvent::UpdateText(
502 job.notif_id,
503 msg,
504 )))
505 .ok();
506 hub.send(Event::Notification(NotificationEvent::UpdateProgress(
507 job.notif_id,
508 progress,
509 )))
510 .ok();
511
512 Ok(())
513 }
514
515 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(on_disk_count = on_disk_fingerprints.len())))]
524 fn delete_stale_entries(
525 &self,
526 on_disk_fingerprints: &[String],
527 hub: &Sender<Event>,
528 shutdown: &ShutdownSignal,
529 ) {
530 let pool = self.database.pool().clone();
531
532 let result = RUNTIME.block_on(async {
533 let on_disk_set: HashSet<&str> =
534 on_disk_fingerprints.iter().map(|s| s.as_str()).collect();
535
536 let db_entries = sqlx::query!(
537 "SELECT fingerprint, dict_id FROM dictionary_index_meta"
538 )
539 .fetch_all(&pool)
540 .await?;
541
542 let mut deleted_any = false;
543
544 for row in db_entries {
545 let fp = row.fingerprint;
546
547 if on_disk_set.contains(fp.as_str()) {
548 continue;
549 }
550
551 let dict_id = match row.dict_id {
552 Some(id) => id,
553 None => {
554 tracing::warn!(fingerprint = %fp, "dict_id missing for stale fingerprint, skipping");
555 continue;
556 }
557 };
558
559 tracing::info!(fingerprint = %fp, "removing stale dictionary index");
560
561 sqlx::query!(
562 "UPDATE dictionary_index_meta SET completed = 0, indexed_lines = 0 WHERE dict_id = ?",
563 dict_id,
564 )
565 .execute(&pool)
566 .await?;
567
568 let total_deleted =
569 delete_entries_for_dict(&pool, dict_id, shutdown).await?;
570
571 tracing::info!(fingerprint = %fp, total_deleted, "deleted stale dictionary index entries");
572
573 sqlx::query!(
574 "DELETE FROM dictionary_index_meta WHERE fingerprint = ?",
575 fp
576 )
577 .execute(&pool)
578 .await?;
579
580 deleted_any = true;
581
582 if shutdown.should_stop() {
583 break;
584 }
585 }
586
587 Ok::<_, anyhow::Error>(deleted_any)
588 });
589
590 match result {
591 Ok(true) => {
592 hub.send(Event::ReloadDictionaries).ok();
593 }
594 Ok(false) => {}
595 Err(e) => {
596 tracing::error!(error = %e, "failed to delete stale dictionary index entries");
597 }
598 }
599 }
600}
601
602#[cfg_attr(
611 feature = "tracing",
612 tracing::instrument(skip(pool, shutdown), fields(dict_id))
613)]
614async fn delete_entries_for_dict(
615 pool: &sqlx::SqlitePool,
616 dict_id: i64,
617 shutdown: &ShutdownSignal,
618) -> Result<u64, anyhow::Error> {
619 let batch_size = BATCH_SIZE as i64;
620 let mut total_deleted: u64 = 0;
621
622 loop {
623 let rows_affected = sqlx::query!(
624 "DELETE FROM dictionary_index_entry WHERE dict_id = ? LIMIT ?",
625 dict_id,
626 batch_size,
627 )
628 .execute(pool)
629 .await?
630 .rows_affected();
631
632 if rows_affected == 0 {
633 break;
634 }
635
636 total_deleted += rows_affected;
637
638 if shutdown.should_stop() {
639 tracing::info!(total_deleted, "entry deletion interrupted by shutdown");
640 return Ok(total_deleted);
641 }
642 }
643
644 Ok(total_deleted)
645}
646
647impl BackgroundTask for DictionaryIndexTask {
648 fn id(&self) -> TaskId {
649 TaskId::DictionaryIndex
650 }
651
652 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
653 fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal) {
654 let glob = match Glob::new("**/*.index") {
655 Ok(g) => g.compile_matcher(),
656 Err(e) => {
657 tracing::error!(error = %e, "failed to compile glob pattern for dictionary index task");
658 return;
659 }
660 };
661
662 let path = CURRENT_DEVICE.data_path(DICTIONARIES_DIRNAME);
663
664 let mut on_disk_fingerprints: Vec<String> = Vec::new();
665
666 for entry in WalkDir::new(path)
667 .min_depth(1)
668 .into_iter()
669 .filter_entry(|e| !e.is_hidden())
670 {
671 if shutdown.should_stop() {
672 return;
673 }
674
675 let entry = match entry {
676 Ok(e) => e,
677 Err(e) => {
678 tracing::error!(error = %e, "failed to read directory entry");
679 continue;
680 }
681 };
682
683 if !glob.is_match(entry.path()) {
684 continue;
685 }
686
687 if let Ok(fp) = entry.path().fingerprint() {
688 on_disk_fingerprints.push(fp.to_string());
689 }
690
691 self.index_file(entry.path(), hub, shutdown);
692 }
693
694 if shutdown.should_stop() {
695 return;
696 }
697
698 self.delete_stale_entries(&on_disk_fingerprints, hub, shutdown);
699 }
700}
701
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::{Database, runtime::RUNTIME};

    /// Builds an in-memory database and initializes it with `init(0)`.
    fn setup_db() -> Database {
        let mut database =
            Database::new(":memory:").expect("failed to create in-memory database");
        database.init(0).expect("failed to run migrations");
        database
    }

    /// Inserts a meta row whose path equals its fingerprint and returns its `dict_id`.
    async fn insert_meta(pool: &sqlx::SqlitePool, fingerprint: &str) -> i64 {
        let inserted = sqlx::query_scalar!(
            "INSERT INTO dictionary_index_meta (fingerprint, dict_path, total_lines) VALUES (?, ?, ?) RETURNING dict_id",
            fingerprint,
            fingerprint,
            0_i64,
        )
        .fetch_one(pool)
        .await;

        inserted.expect("failed to insert meta")
    }

    /// Inserts one entry row of size 0 for `dict_id`.
    async fn insert_entry(pool: &sqlx::SqlitePool, dict_id: i64, word: &str, offset: i64) {
        let inserted = sqlx::query!(
            "INSERT INTO dictionary_index_entry (dict_id, word, offset, size) VALUES (?, ?, ?, 0)",
            dict_id,
            word,
            offset,
        )
        .execute(pool)
        .await;

        inserted.expect("failed to insert entry");
    }

    /// Counts the entry rows stored for `dict_id`.
    async fn count_entries(pool: &sqlx::SqlitePool, dict_id: i64) -> i64 {
        let counted = sqlx::query_scalar!(
            "SELECT COUNT(*) FROM dictionary_index_entry WHERE dict_id = ?",
            dict_id,
        )
        .fetch_one(pool)
        .await;

        counted.expect("failed to count entries")
    }

    /// Every entry of the targeted dictionary is removed and counted.
    #[test]
    fn test_delete_entries_for_dict_removes_all_entries() {
        let db = setup_db();
        let pool = db.pool();
        let shutdown = ShutdownSignal::never();

        RUNTIME.block_on(async {
            let dict_id = insert_meta(pool, "all-entries").await;
            for offset in 0_i64..5 {
                insert_entry(pool, dict_id, "word", offset).await;
            }

            let deleted = delete_entries_for_dict(pool, dict_id, &shutdown)
                .await
                .expect("delete should succeed");
            let remaining = count_entries(pool, dict_id).await;

            assert_eq!(deleted, 5);
            assert_eq!(remaining, 0);
        });
    }

    /// Entries that belong to another dictionary are left in place.
    #[test]
    fn test_delete_entries_for_dict_only_removes_target_dict() {
        let db = setup_db();
        let pool = db.pool();
        let shutdown = ShutdownSignal::never();

        RUNTIME.block_on(async {
            let dict_a = insert_meta(pool, "dict-a").await;
            let dict_b = insert_meta(pool, "dict-b").await;

            insert_entry(pool, dict_a, "apple", 0).await;
            insert_entry(pool, dict_b, "banana", 0).await;
            insert_entry(pool, dict_b, "cherry", 0).await;

            let deleted = delete_entries_for_dict(pool, dict_a, &shutdown)
                .await
                .expect("delete should succeed");

            assert_eq!(deleted, 1);
            assert_eq!(count_entries(pool, dict_a).await, 0);
            assert_eq!(count_entries(pool, dict_b).await, 2);
        });
    }
}