# -*- coding: utf-8 -*-
# This is for automatic glossary generation only, unrelated to the more thorough glossary generation you get from clicking the "Extract Glossary" button
import os
import re
import sys
import threading
import tempfile
import queue
import time
import json
from bs4 import BeautifulSoup
import PatternManager as PM
import duplicate_detection_config as ddc
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
# Default unified auto-glossary prompt (used when AUTO_GLOSSARY_PROMPT is unset/empty).
# NOTE: This matches the GUI's default_unified_prompt in GlossaryManager_GUI.py.
DEFAULT_AUTO_GLOSARY_PROMPT3 = """You are a novel glossary extraction assistant.
You must strictly return ONLY CSV format with 3-5 columns in this exact order: type,raw_name,translated_name,gender,description.
For character entries, determine gender from context, leave empty if context is insufficient.
For non-character entries, leave gender empty.
The description column is optional and can contain brief context (role, location, significance).
Critical Requirement: The translated name and description column must be in {language}.
For example:
character,ᫀ이히리ᄐ 나애,Dihirit Ade,female,The enigmatic guild leader of the Shadow Lotus who operates from the concealed backrooms of the capital, manipulating city politics through commerce and wielding dual daggers with lethal precision
character,ᫀ뢔사난,Kim Sang-hyu,male,A master swordsman from the Northern Sect known for his icy demeanor and unparalleled skill with the Frost Blade technique which he uses to defend the border fortress
CRITICAL EXTRACTION RULES:
- Extract All Character names, Terms, Location names, Ability/Skill names, Item names, Organization names, and Titles/Ranks.
- Do NOT extract sentences, dialogue, actions, questions, or statements as glossary entries
- REJECT entries that contain verbs or end with punctuation (?, !, .)
- REJECT entries starting with: "Me", "How", "What", "Why", "I", "He", "She", "They", "That's", "So", "Therefore", "Still", "But", "Protagonist". (The description column is excluded from this restriction)
- Do NOT output any entries that are rejected by the above rules; skip them entirely
- If unsure whether something is a proper noun/name, skip it
- The description column must contain detailed context/explanation
- Create at least one glossary entry for EVERY context marker window (lines ending with "=== CONTEXT N END ==="); treat each marker boundary as a required extraction point.
- You must create {marker} glossary entries (one or more per window; do not invent placeholders).
- You must include absolutely all characters found in the provided text in your glossary generation. Do not skip any character."""
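# Illustrative only: {language} and {marker} are left as str.format placeholders here;
# the assumption is that the extraction code fills them in before the prompt is sent, e.g.
#   prompt = DEFAULT_AUTO_GLOSARY_PROMPT3.format(language="English", marker=12)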
# Class-level shared lock for API submission timing
_api_submission_lock = threading.Lock()
_last_api_submission_time = 0
_results_lock = threading.Lock()
_file_write_lock = threading.Lock()
_stop_requested = False
# Register watchdog cleanup once per process (best-effort)
_watchdog_atexit_registered = False
BOOK_TITLE_RAW = None
BOOK_TITLE_TRANSLATED = None
BOOK_TITLE_VALUE = None  # Legacy value kept for backward compatibility; no longer used.
def _extract_title_from_metadata(meta):
"""Best-effort lookup of a book title inside metadata structures."""
if not isinstance(meta, dict):
return None
title_keys = [
"title",
"book_title",
"bookTitle",
"title_translated",
"translated_title",
"title_en",
]
for key in title_keys:
val = meta.get(key)
if val:
return str(val).strip()
for nested_key in ("metadata", "opf", "info", "data"):
nested = meta.get(nested_key)
if isinstance(nested, dict):
nested_title = _extract_title_from_metadata(nested)
if nested_title:
return nested_title
return None
def _extract_raw_title_from_epub(epub_path):
"""Extract the raw untranslated title from the input EPUB content.opf."""
if not epub_path or not os.path.exists(epub_path):
return None
print(f"[Metadata] Checking input EPUB for raw title: {epub_path}")
# Try manual parsing first (more robust)
try:
import zipfile
with zipfile.ZipFile(epub_path, 'r') as zf:
# Find opf
opf_name = next((n for n in zf.namelist() if n.lower().endswith('.opf')), None)
if opf_name:
content = zf.read(opf_name).decode('utf-8', errors='ignore')
# Use BS4 with xml parser
try:
soup = BeautifulSoup(content, 'xml')
except Exception:
soup = BeautifulSoup(content, 'html.parser')
# Try dc:title
title_tag = soup.find('dc:title')
if not title_tag:
# Fallback to any title tag
title_tag = soup.find('title')
if title_tag:
val = title_tag.get_text(strip=True)
if val:
return val
except Exception as e:
print(f"[Warning] Manual EPUB title extraction failed: {e}")
# Fallback: ebooklib
try:
from ebooklib import epub
book = epub.read_epub(epub_path)
titles = book.get_metadata("DC", "title")
if titles:
val = titles[0][0]
if val:
return str(val).strip()
except Exception as e:
print(f"[Warning] Could not read EPUB metadata via ebooklib: {e}")
return None
def _extract_translated_title_from_metadata(output_dir):
"""Extract translated title from metadata.json in output directory."""
base_dir = os.path.abspath(output_dir or ".")
epub_path = os.getenv("EPUB_PATH", "")
epub_base = os.path.splitext(os.path.basename(epub_path or ""))[0] if epub_path else None
candidates = []
# Only check output directory logic for translated title
if epub_base:
candidates.append(os.path.join(base_dir, epub_base, "metadata.json"))
# Also check direct output dir
candidates.append(os.path.join(base_dir, "metadata.json"))
for meta_path in candidates:
# print(f"[Metadata] Checking for translated book title at: {meta_path}")
if os.path.exists(meta_path):
try:
with open(meta_path, "r", encoding="utf-8") as f:
meta = json.load(f)
meta_title = _extract_title_from_metadata(meta)
if meta_title:
return meta_title.strip()
except Exception as e:
print(f"[Warning] Could not read metadata.json for book title: {e}")
return None
def _derive_book_title(output_dir):
"""Legacy wrapper - logic moved to save_glossary main flow."""
return None
def _ensure_book_title_csv_lines(csv_lines):
"""
Ensure the CSV (header + rows) contains a leading book title entry when enabled.
Uses distinct raw and translated titles.
"""
if not csv_lines:
return csv_lines
include = os.getenv("GLOSSARY_INCLUDE_BOOK_TITLE", "1").lower() not in ("0", "false", "no")
raw_title = BOOK_TITLE_RAW
trans_title = BOOK_TITLE_TRANSLATED
    # Prefer distinct raw and translated titles. If only one of the two is
    # available, the row built below falls back to using that single value for
    # both columns rather than dropping the book entry entirely; we never invent
    # a translation that was not found in the metadata.
if not include:
return csv_lines
if not raw_title and not trans_title:
return csv_lines
# Normalize for dedup check
norm_raw = raw_title.lower() if raw_title else ""
norm_trans = trans_title.lower() if trans_title else ""
# Skip if already present
header = csv_lines[0]
for line in csv_lines[1:]:
parts = [p.strip() for p in line.split(",")]
if len(parts) >= 3:
# Check if this line is already the book title
p_raw = parts[1].lower()
p_trans = parts[2].lower()
# Match if we find our raw title or our translated title in the respective columns
if (raw_title and p_raw == norm_raw) or (trans_title and p_trans == norm_trans):
return csv_lines
fields = [f.strip() for f in header.split(",")]
row = []
for field in fields:
key = field.lower()
if key == "type":
row.append("book")
elif key == "raw_name":
row.append(raw_title if raw_title else (trans_title if trans_title else ""))
elif key == "translated_name":
row.append(trans_title if trans_title else (raw_title if raw_title else ""))
else:
row.append("")
book_line = ",".join(row)
return [header, book_line] + csv_lines[1:]
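# Minimal sketch of the resulting shape (hypothetical titles and rows), assuming
# BOOK_TITLE_RAW = "전생일기", BOOK_TITLE_TRANSLATED = "Reincarnation Diary", and
# GLOSSARY_INCLUDE_BOOK_TITLE left at its default:
#   _ensure_book_title_csv_lines([
#       "type,raw_name,translated_name",
#       "character,김상현,Kim Sang-hyun",
#   ])
#   -> ["type,raw_name,translated_name",
#       "book,전생일기,Reincarnation Diary",
#       "character,김상현,Kim Sang-hyun"]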
def _csv_sort_key(line: str):
"""Sort book first, then characters, then others by raw name."""
try:
parts = line.split(",")
entry_type = parts[0].strip().lower()
name = parts[1].lower() if len(parts) > 1 else line.lower()
except Exception:
entry_type = ""
name = line.lower()
order = {"book": -1, "character": 0, "term": 1}
return (order.get(entry_type, 2), name)
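# Illustrative only: used as a sort key, book rows come first, then characters,
# then terms and anything else, each group ordered by the raw_name column, e.g.
#   sorted(["term,마나,Mana", "character,김상현,Kim Sang-hyun", "book,전생일기,Reincarnation Diary"],
#          key=_csv_sort_key)
#   -> ["book,...", "character,...", "term,..."]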
# Timing variables
_extraction_time = 0
_api_time = 0
_freq_check_time = 0
_dedup_time = 0
_io_time = 0
def _get_stop_file_path():
"""Return the stop-flag file path (shared across processes)."""
return os.environ.get("GLOSSARY_STOP_FILE") or os.path.join(tempfile.gettempdir(), "glossarion_glossary.stop")
def _get_glossary_status_file_path() -> str:
"""File path for cross-process status about chunk submission/completion.
This lets the parent process decide whether it's safe to "wait for chunks" even when
WAIT_FOR_CHUNKS is disabled.
"""
try:
explicit = os.environ.get("GLOSSARY_STATUS_FILE")
if explicit:
return explicit
except Exception:
pass
# Default: colocate next to the stop file so both processes can find it deterministically.
try:
stop_fp = _get_stop_file_path()
if stop_fp:
return f"{stop_fp}.status.json"
except Exception:
pass
return os.path.join(tempfile.gettempdir(), "glossarion_glossary.status.json")
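# Illustrative only: with neither GLOSSARY_STATUS_FILE nor GLOSSARY_STOP_FILE set,
# the status file is colocated with the stop file, i.e. something like
#   <tempdir>/glossarion_glossary.stop.status.json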
def _write_glossary_status(payload: dict) -> None:
"""Best-effort atomic write of glossary chunk status."""
try:
fp = _get_glossary_status_file_path()
os.makedirs(os.path.dirname(fp) or ".", exist_ok=True)
tmp = f"{fp}.tmp"
with open(tmp, "w", encoding="utf-8") as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
os.replace(tmp, fp)
except Exception:
# Status is best-effort only.
pass
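# Minimal usage sketch (the exact payload schema is defined by the callers and is
# an assumption here):
#   _write_glossary_status({"chunks_total": 12, "chunks_submitted": 12, "chunks_completed": 9})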
def _clear_api_watchdog_state(*, remove_watchdog_file: bool = True) -> None:
"""Best-effort reset of unified_api_client watchdog state.
GlossaryManager often runs in a separate process; if it exits mid-stream or is force-stopped,
its watchdog JSON file can keep the GUI progress bar "busy" until manually cleared.
"""
# Reset in-memory counters
try:
import unified_api_client
if hasattr(unified_api_client, '_api_watchdog_reset'):
unified_api_client._api_watchdog_reset()
except Exception:
pass
# Remove the per-process watchdog file (if enabled)
if remove_watchdog_file:
try:
wd_dir = os.environ.get("GLOSSARION_WATCHDOG_DIR")
if wd_dir and os.path.isdir(wd_dir):
fp = os.path.join(wd_dir, f"api_watchdog_{os.getpid()}.json")
tmp = f"{fp}.tmp"
try:
if os.path.exists(tmp):
os.remove(tmp)
except Exception:
pass
try:
if os.path.exists(fp):
os.remove(fp)
except Exception:
pass
except Exception:
pass
def set_stop_flag(value: bool):
"""Set the module-level stop flag and propagate to shared channels."""
global _stop_requested
_stop_requested = bool(value)
# Mirror to environment for other components
os.environ["TRANSLATION_CANCELLED"] = "1" if value else "0"
# If we're stopping, clear watchdog immediately so the GUI bar doesn't stick.
# (If graceful-stop semantics are needed, the caller should avoid setting stop until ready.)
if value:
_clear_api_watchdog_state(remove_watchdog_file=True)
# Touch/remove stop file for cross-process signalling
stop_path = _get_stop_file_path()
try:
if value:
with open(stop_path, "w", encoding="utf-8") as f:
f.write("stop")
else:
if os.path.exists(stop_path):
os.remove(stop_path)
except Exception:
pass
# Notify unified_api_client if present
try:
import unified_api_client
if hasattr(unified_api_client, "UnifiedClient"):
unified_api_client.UnifiedClient._global_cancelled = bool(value)
if hasattr(unified_api_client, "global_stop_flag"):
unified_api_client.global_stop_flag = bool(value)
except Exception:
pass
# Function to check if stop is requested (can be overridden)
def is_stop_requested():
"""Check if stop has been requested from any source.
NOTE: TRANSLATION_CANCELLED is set on BOTH graceful and immediate stop.
During graceful stop we must let in-flight API calls finish, so we only
treat it as a stop signal when GRACEFUL_STOP is not active. When
graceful stop IS active, the orchestrator in TransateKRtoEN handles the
decision of whether to wait or cancel.
"""
if _stop_requested:
return True
# Environment toggle (set by GUI stop button)
# Only treat as immediate stop when GRACEFUL_STOP is not active
if os.environ.get("TRANSLATION_CANCELLED") == "1":
if os.environ.get("GRACEFUL_STOP") != "1":
return True
# File-based stop flag for cross-process cancellation
try:
stop_path = _get_stop_file_path()
if stop_path and os.path.exists(stop_path):
return True
except Exception:
pass
# Unified API client global cancellation
try:
import unified_api_client
if getattr(unified_api_client, "global_stop_flag", False):
return True
if hasattr(unified_api_client, "UnifiedClient") and getattr(unified_api_client.UnifiedClient, "_global_cancelled", False):
return True
except Exception:
pass
return False
def set_output_redirect(log_callback=None):
"""Redirect print statements to a callback function for GUI integration"""
if log_callback:
import threading
class CallbackWriter:
def __init__(self, callback):
self.callback = callback
self.main_thread = threading.main_thread()
def write(self, text):
if text.strip():
# The callback (append_log) is already thread-safe - it handles QTimer internally
# So we can call it directly from any thread
self.callback(text.strip())
def flush(self):
pass
sys.stdout = CallbackWriter(log_callback)
def is_traditional_translation_api(model: str) -> bool:
"""Check if the model is a traditional translation API"""
return model in ['deepl', 'google-translate', 'google-translate-free'] or model.startswith('deepl/') or model.startswith('google-translate/')
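# Illustrative only:
#   is_traditional_translation_api("deepl")                  -> True
#   is_traditional_translation_api("google-translate/basic") -> True
#   is_traditional_translation_api("gemini-2.0-flash")       -> False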
def _model_uses_own_auth(model: str) -> bool:
"""Check if the model uses its own authentication (no API key needed).
authgpt/ uses OAuth tokens, vertex/ uses Google service account credentials."""
if not model:
return False
m = model.lower()
return m.startswith('authgpt/') or m.startswith('vertex/')
def _ensure_multi_key_config_loaded():
"""Best-effort load of multi-key config when running in subprocesses.
In subprocesses, in-memory key lists are not inherited. If multi-key mode is
enabled via env but no keys are present, load them from config.json and
initialize UnifiedClient's in-memory pool.
"""
try:
if os.getenv('USE_MULTI_API_KEYS', '0') != '1':
return
except Exception:
return
# If keys are already present in env or in-memory, nothing to do.
try:
mk_env = os.getenv('MULTI_API_KEYS', '')
if mk_env and str(mk_env).strip() not in ('', '[]', 'null', 'None'):
return
except Exception:
pass
try:
import unified_api_client as _uac
with _uac.UnifiedClient._in_memory_multi_keys_lock:
if _uac.UnifiedClient._in_memory_multi_keys:
return
except Exception:
pass
# Try to load from config.json in common locations.
cfg_paths = []
try:
cfg_env = os.getenv('CONFIG_FILE')
if cfg_env:
cfg_paths.append(cfg_env)
except Exception:
pass
try:
cfg_paths.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json"))
except Exception:
pass
try:
cfg_paths.append(os.path.join(os.getcwd(), "config.json"))
except Exception:
pass
# Deduplicate while preserving order
seen = set()
candidates = []
for p in cfg_paths:
if not p:
continue
p_norm = os.path.abspath(p)
if p_norm in seen:
continue
seen.add(p_norm)
candidates.append(p_norm)
cfg = None
cfg_path = None
for p in candidates:
if os.path.exists(p):
try:
with open(p, 'r', encoding='utf-8') as f:
cfg = json.load(f)
cfg_path = p
break
except Exception:
continue
if not isinstance(cfg, dict):
return
keys = cfg.get('multi_api_keys') or []
if not keys:
return
force_rotation = bool(cfg.get('force_key_rotation', True))
rotation_frequency = int(cfg.get('rotation_frequency', 1))
try:
os.environ.setdefault('FORCE_KEY_ROTATION', '1' if force_rotation else '0')
os.environ.setdefault('ROTATION_FREQUENCY', str(rotation_frequency))
os.environ.setdefault('USE_MULTI_KEYS', '1') # backward-compat
except Exception:
pass
try:
import unified_api_client as _uac
_uac.UnifiedClient.set_in_memory_multi_keys(
keys,
force_rotation=force_rotation,
rotation_frequency=rotation_frequency,
)
if cfg_path:
print(f"[DEBUG] Loaded multi-key config from {os.path.basename(cfg_path)} ({len(keys)} keys)")
else:
print(f"[DEBUG] Loaded multi-key config ({len(keys)} keys)")
except Exception as e:
print(f"[DEBUG] Failed to initialize multi-key config from file: {e}")
def send_with_interrupt(*args, **kwargs):
"""Lazy wrapper to avoid circular import"""
from TransateKRtoEN import send_with_interrupt as _send_with_interrupt
return _send_with_interrupt(*args, **kwargs)
def _atomic_write_file(filepath, content, encoding='utf-8'):
"""Atomically write to a file to prevent corruption from concurrent writes"""
# Create temp file in same directory to ensure same filesystem
dir_path = os.path.dirname(filepath)
with _file_write_lock:
try:
# Write to temporary file first
with tempfile.NamedTemporaryFile(mode='w', encoding=encoding,
dir=dir_path, delete=False) as tmp_file:
tmp_file.write(content)
tmp_path = tmp_file.name
            # Atomic replace (works on the same filesystem on both Windows and POSIX;
            # unlike remove+rename, os.replace does not leave a window with no file)
            os.replace(tmp_path, filepath)
return True
except Exception as e:
print(f"⚠️ Atomic write failed: {e}")
# Cleanup temp file if it exists
if 'tmp_path' in locals() and os.path.exists(tmp_path):
try:
os.remove(tmp_path)
except:
pass
# Fallback to direct write with lock
try:
with open(filepath, 'w', encoding=encoding) as f:
f.write(content)
return True
except Exception as e2:
print(f"⚠️ Fallback write also failed: {e2}")
return False
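# Minimal usage sketch, mirroring how the final glossary is written further below:
#   _atomic_write_file(os.path.join(output_dir, "glossary.csv"), csv_content)
# Returns True on success (atomic replace or locked fallback write), False otherwise.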
def save_glossary(output_dir, chapters, instructions, language="korean", log_callback=None):
"""Targeted glossary generator with true CSV format output and parallel processing"""
# If the user stops translation while glossary runs in a subprocess, we must ensure the
# per-process watchdog file doesn't stick around and keep the GUI progress bar "busy".
# We only clear on stop (not on normal completion).
global _watchdog_atexit_registered
if not _watchdog_atexit_registered:
try:
import atexit
def _cleanup_watchdog_on_exit():
try:
if is_stop_requested():
_clear_api_watchdog_state(remove_watchdog_file=True)
except Exception:
pass
atexit.register(_cleanup_watchdog_on_exit)
_watchdog_atexit_registered = True
except Exception:
pass
# Note: Don't redirect stdout here if log_callback is provided by subprocess worker
# The worker already captures stdout and sends to queue
# Only redirect if we're NOT in a subprocess (i.e., log_callback is a real GUI callback)
import sys
in_subprocess = hasattr(sys.stdout, 'queue') # Worker's LogCapture has a queue attribute
if log_callback and not in_subprocess:
set_output_redirect(log_callback)
# Clear any stale stop flags before starting a new glossary run
try:
set_stop_flag(False)
except Exception:
try:
os.environ["TRANSLATION_CANCELLED"] = "0"
except Exception:
pass
try:
stop_path = _get_stop_file_path()
if stop_path and os.path.exists(stop_path):
os.remove(stop_path)
except Exception:
pass
try:
import unified_api_client
if hasattr(unified_api_client, "UnifiedClient"):
unified_api_client.UnifiedClient._global_cancelled = False
if hasattr(unified_api_client, "global_stop_flag"):
unified_api_client.global_stop_flag = False
except Exception:
pass
print("📱 Targeted Glossary Generator v6.0 (CSV Format + Parallel)")
# CRITICAL: Reload ALL glossary settings from environment variables at the START
# This ensures child processes spawned by ProcessPoolExecutor get the latest values
# Force fresh read of all environment variables (they were set by save_config)
print("🔄 Reloading glossary settings from environment variables...")
# Honor output directory override (same behavior as translation pipeline)
try:
override_dir = os.getenv("OUTPUT_DIRECTORY")
if override_dir:
override_dir = os.path.abspath(override_dir)
leaf = os.path.basename(os.path.abspath(output_dir)) or "output"
# Always place under the override root (handles different drives safely)
output_dir = os.path.join(override_dir, leaf)
except Exception as e:
print(f"⚠️ OUTPUT_DIRECTORY override failed: {e}")
print(f"📁 Glossary output directory: {os.path.abspath(output_dir)}")
# Check stop flag at start
# Ensure output directory exists
try:
os.makedirs(output_dir, exist_ok=True)
except Exception as _e:
print(f"⚠️ Could not ensure output directory exists: {output_dir} ({_e})")
if is_stop_requested():
print("📁 ❌ Glossary generation stopped by user")
_clear_api_watchdog_state(remove_watchdog_file=True)
return {}
# CLEAR incremental history UNCONDITIONALLY at the start of any run
# This prevents stale chunks from polluting the aggregation, regardless of whether chunking is used
incremental_dir = os.path.join(output_dir, "incremental_glossary")
if os.path.exists(incremental_dir):
print(f"📑 Cleaning incremental glossary folder: {incremental_dir}")
try:
import shutil
# Safely clear the entire incremental folder
for filename in os.listdir(incremental_dir):
file_path = os.path.join(incremental_dir, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print(f"⚠️ Failed to delete {file_path}: {e}")
except Exception as e:
print(f"⚠️ Failed to clear incremental history: {e}")
# Ensure directory exists for potential use
os.makedirs(incremental_dir, exist_ok=True)
# Check if glossary already exists; if so, we'll MERGE it later (do not return early)
glossary_path = os.path.join(output_dir, "glossary.csv")
existing_glossary_content = None
if os.path.exists(glossary_path):
print(f"📁 Existing glossary detected (will merge): {glossary_path}")
try:
with open(glossary_path, 'r', encoding='utf-8') as f:
existing_glossary_content = f.read()
except Exception as e:
print(f"⚠️ Could not read existing glossary: {e}")
# Rest of the method continues as before...
print("📁 Extracting names and terms with configurable options")
global BOOK_TITLE_RAW, BOOK_TITLE_TRANSLATED
# 1. Get raw title from input EPUB (input path)
epub_path = os.getenv("EPUB_PATH", "")
BOOK_TITLE_RAW = _extract_raw_title_from_epub(epub_path)
# 2. Get translated title from output metadata (output path)
BOOK_TITLE_TRANSLATED = _extract_translated_title_from_metadata(output_dir)
# Debug info
if BOOK_TITLE_RAW:
print(f"📚 Raw book title: {BOOK_TITLE_RAW}")
if BOOK_TITLE_TRANSLATED:
print(f"📚 Translated book title: {BOOK_TITLE_TRANSLATED}")
# Check stop flag before processing
if is_stop_requested():
print("📁 ❌ Glossary generation stopped by user")
_clear_api_watchdog_state(remove_watchdog_file=True)
return {}
# Check if automatic glossary generation is enabled
enable_auto_glossary = os.getenv("ENABLE_AUTO_GLOSSARY", "1") == "1"
# Check for manual glossary first (CSV only)
manual_glossary_path = os.getenv("MANUAL_GLOSSARY")
existing_glossary = None
if manual_glossary_path and os.path.exists(manual_glossary_path):
print(f"📁 Manual glossary detected: {os.path.basename(manual_glossary_path)}")
try:
with open(manual_glossary_path, 'r', encoding='utf-8') as f:
content = f.read()
# Treat as CSV text and stage it for merge; also copy to output for visibility
target_path = os.path.join(output_dir, "glossary.csv")
with open(target_path, 'w', encoding='utf-8') as f:
f.write(content)
print(f"📁 ✅ Manual CSV glossary copied to: {target_path}")
existing_glossary = content
# Skip automatic generation when manual glossary is loaded
if not enable_auto_glossary:
print(f"ℹ️ Automatic glossary generation disabled, using manual glossary only")
return {}
else:
print(f"ℹ️ Skipping automatic glossary generation (manual glossary already loaded)")
return {}
except Exception as e:
print(f"⚠️ Could not copy manual glossary: {e}")
print(f"📁 Proceeding with automatic generation...")
# Check if auto-glossary is disabled without a manual glossary
if not enable_auto_glossary:
print(f"ℹ️ Automatic glossary generation is disabled and no manual glossary provided")
return {}
# Check for existing glossary from manual extraction
# Avoid double-nesting when output_dir already ends with "Glossary"
if os.path.basename(os.path.abspath(output_dir)).lower() == "glossary":
glossary_folder_path = output_dir
else:
glossary_folder_path = os.path.join(output_dir, "Glossary")
# existing_glossary may already be set by MANUAL_GLOSSARY above
if os.path.exists(glossary_folder_path):
for file in os.listdir(glossary_folder_path):
if file.endswith("_glossary.json"):
existing_path = os.path.join(glossary_folder_path, file)
try:
with open(existing_path, 'r', encoding='utf-8') as f:
existing_content = f.read()
existing_glossary = existing_content
print(f"📁 Found existing glossary from manual extraction: {file}")
break
except Exception as e:
print(f"⚠️ Could not load existing glossary: {e}")
# Get configuration from environment variables (FRESH READ)
min_frequency = int(os.getenv("GLOSSARY_MIN_FREQUENCY", "2"))
max_names = int(os.getenv("GLOSSARY_MAX_NAMES", "50"))
max_titles = int(os.getenv("GLOSSARY_MAX_TITLES", "30"))
# Batch sizing:
# - GUI uses BATCH_SIZE for concurrency/batching.
# - Keep GLOSSARY_BATCH_SIZE for backward compatibility, but default to GUI's value.
batch_size = int(os.getenv("GLOSSARY_BATCH_SIZE", os.getenv("BATCH_SIZE", "50")))
strip_honorifics = os.getenv("GLOSSARY_STRIP_HONORIFICS", "1") == "1"
fuzzy_threshold = float(os.getenv("GLOSSARY_FUZZY_THRESHOLD", "0.90"))
max_text_size = int(os.getenv("GLOSSARY_MAX_TEXT_SIZE", "0"))
# DEBUG: Show what we're reading from environment
max_sentences_env = os.getenv("GLOSSARY_MAX_SENTENCES", "200")
print(f"🔍 [DEBUG] Reading GLOSSARY_MAX_SENTENCES from environment: '{max_sentences_env}'")
max_sentences = int(max_sentences_env)
print(f"🔍 [DEBUG] Converted to integer: {max_sentences}")
include_all_characters_env = os.getenv("GLOSSARY_INCLUDE_ALL_CHARACTERS", "0")
include_all_characters = include_all_characters_env == "1"
include_gender_context_flag = os.getenv("GLOSSARY_INCLUDE_GENDER_CONTEXT", "0") == "1"
print(f"📑 DEBUG: Include all characters (dynamic limit expansion) = '{include_all_characters_env}'")
print(f"📑 Settings: Min frequency: {min_frequency}, Max names: {max_names}, Max titles: {max_titles}")
print(f"📑 Strip honorifics: {'✅ Yes' if strip_honorifics else '❌ No'}")
print(f"📑 Fuzzy matching threshold: {fuzzy_threshold}")
print(f"📑 Max sentences for filtering: {max_sentences}")
# Get custom prompt from environment
custom_prompt = os.getenv("AUTO_GLOSSARY_PROMPT", "").strip()
# Initialize to the default unified prompt when unset/empty.
# Pattern-based extraction remains disabled elsewhere.
if not custom_prompt:
custom_prompt = DEFAULT_AUTO_GLOSARY_PROMPT3.strip()
os.environ["AUTO_GLOSSARY_PROMPT"] = custom_prompt
print("📑 AUTO_GLOSSARY_PROMPT not set - initialized to default unified prompt")
def clean_html(html_text):
"""Remove HTML tags to get clean text"""
soup = BeautifulSoup(html_text, 'html.parser')
return soup.get_text()
# Check stop before processing chapters
if is_stop_requested():
print("📑 ❌ Glossary generation stopped by user")
_clear_api_watchdog_state(remove_watchdog_file=True)
return {}
# Get chapter split threshold, toggle, and filter mode
chapter_split_threshold = int(os.getenv("GLOSSARY_CHAPTER_SPLIT_THRESHOLD", "100000"))
chapter_split_enabled = os.getenv("GLOSSARY_ENABLE_CHAPTER_SPLIT", "1") == "1"
filter_mode = os.getenv("GLOSSARY_FILTER_MODE", "all") # all, only_with_honorifics, only_without_honorifics
# Check if parallel extraction is enabled for automatic glossary
extraction_workers = int(os.getenv("EXTRACTION_WORKERS", "1"))
batch_translation = os.getenv("BATCH_TRANSLATION", "0") == "1"
# Prefer GUI's batch size; fall back to glossary batch size if needed.
api_batch_size = int(os.getenv("BATCH_SIZE", os.getenv("GLOSSARY_BATCH_SIZE", "5")))
batching_mode = os.getenv("BATCHING_MODE", "direct")
batch_group_size = int(os.getenv("BATCH_GROUP_SIZE", "3"))
# Backward compatibility
if os.getenv("CONSERVATIVE_BATCHING", "0") == "1":
batching_mode = "conservative"
# Log the settings
print(f"📑 Filter mode: {filter_mode}")
if extraction_workers > 1:
print(f"📑 Parallel extraction enabled: {extraction_workers} workers")
if batch_translation:
print(f"📑 Batch API calls enabled: {api_batch_size} chunks per batch")
print(f"📑 Batching mode: {batching_mode}")
if batching_mode == "conservative":
print(f"📑 Conservative group size: {batch_group_size}")
all_text = ' '.join(clean_html(chapter["body"]) for chapter in chapters)
print(f"📑 Processing {len(all_text):,} characters of text")
# Apply smart filtering FIRST to check actual size needed
use_smart_filter = os.getenv("GLOSSARY_USE_SMART_FILTER", "1") == "1"
effective_text_size = len(all_text)
filtered_text_cache = None
if use_smart_filter and custom_prompt: # Only apply for AI extraction
print(f"📁 Smart filtering enabled - checking effective text size after filtering...")
# Perform filtering ONCE and reuse for chunking
filtered_sample, _ = _filter_text_for_glossary(all_text, min_frequency, max_sentences)
filtered_text_cache = filtered_sample
effective_text_size = len(filtered_sample)
# Calculate token count using tiktoken
try:
import tiktoken
enc = tiktoken.get_encoding("cl100k_base")
token_count = len(enc.encode(filtered_sample))
print(f"📁 Text reduction: {len(all_text):,}{effective_text_size:,} chars ({100*(1-effective_text_size/len(all_text)):.1f}% reduction) | {token_count:,} tokens")
except:
print(f"📁 Text reduction: {len(all_text):,}{effective_text_size:,} chars ({100*(1-effective_text_size/len(all_text)):.1f}% reduction)")
# Safety check: Calculate actual token count for chunking decision
estimated_tokens = None
try:
import tiktoken
enc = tiktoken.get_encoding("cl100k_base")
estimated_tokens = len(enc.encode(filtered_text_cache if filtered_text_cache else all_text))
except:
# Fallback estimate: 1 token ≈ 3-4 characters for Asian languages
estimated_tokens = effective_text_size // 3
# Get output token limit (glossary-specific with fallback to global)
max_output_tokens = int(os.getenv("GLOSSARY_MAX_OUTPUT_TOKENS", os.getenv("MAX_OUTPUT_TOKENS", "65536")))
# Use compression factor to determine safe input limit (from CJK→English compression ratio)
# Use glossary-specific compression factor with fallback to global
compression_factor = float(os.getenv("GLOSSARY_COMPRESSION_FACTOR", os.getenv("COMPRESSION_FACTOR", "1.0")))
# Safe input limit is max_output divided by compression factor
# (e.g., if compression is 0.7, output will be 70% of input, so we can use 1/0.7 = 1.43x for safety)
safe_input_limit = int(max_output_tokens / max(compression_factor, 0.1)) if compression_factor > 0 else int(max_output_tokens * 0.8)
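    # Worked example with illustrative numbers: max_output_tokens=65536 and
    # compression_factor=0.7 give safe_input_limit = int(65536 / 0.7) = 93622 tokens;
    # with the default compression_factor=1.0 the limit is simply 65536.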
if estimated_tokens > safe_input_limit:
# Only show detailed token logs if using token-based chunking (threshold == 0)
if chapter_split_threshold == 0:
print(f"⚠️ Text too large for single API call!")
print(f" Estimated tokens: {estimated_tokens:,}")
print(f" Safe input limit: {safe_input_limit:,} (based on {compression_factor:.2f}x compression factor and {max_output_tokens:,} max output tokens)")
print(f" Will use ChapterSplitter for token-based chunking...")
else:
# Character-based threshold already set, just use it silently
pass
# Check if we need to split into chunks based on EFFECTIVE size after filtering
needs_chunking = chapter_split_enabled and (
(chapter_split_threshold == 0 and estimated_tokens > safe_input_limit) or
(chapter_split_threshold > 0 and effective_text_size > chapter_split_threshold)
)
if not chapter_split_enabled:
print("📑 Chapter splitting disabled (GLOSSARY_ENABLE_CHAPTER_SPLIT=0) - processing without pre-splitting")
if needs_chunking:
# Prepare chunk processing
incremental_dir = os.path.join(output_dir, "incremental_glossary")
agg_path = os.path.join(incremental_dir, "glossary.incremental.all.csv")
# CLEAR incremental history if it exists to ensure 'all' file only contains current run data
# This prevents it from growing indefinitely across multiple runs
if os.path.exists(incremental_dir):
try:
import shutil
# Safely clear the entire incremental folder
for filename in os.listdir(incremental_dir):
file_path = os.path.join(incremental_dir, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print(f"⚠️ Failed to delete {file_path}: {e}")
print(f"📑 Cleared incremental glossary folder: {incremental_dir}")
except Exception as e:
print(f"⚠️ Failed to clear incremental history: {e}")
# Ensure directory exists (if it was fully removed or didn't exist)
os.makedirs(incremental_dir, exist_ok=True)
if chapter_split_threshold == 0:
# Use ChapterSplitter for token-based intelligent chunking
print(f"📑 Text exceeds safe token limit, using ChapterSplitter for token-based chunking...")
from chapter_splitter import ChapterSplitter
# Get the model name for the tokenizer
model = os.getenv("MODEL", "gemini-2.0-flash")
splitter = ChapterSplitter(model_name=model, target_tokens=safe_input_limit)
# Get the text to split (filtered or raw)
text_to_split = filtered_text_cache if (use_smart_filter and custom_prompt and filtered_text_cache) else all_text
# Use ChapterSplitter to intelligently split based on tokens
split_results = splitter.split_chapter(text_to_split, max_tokens=safe_input_limit)
chunks_to_process = [(i, chunk) for i, (chunk, _, _) in enumerate(split_results, 1)]
print(f"📑 ChapterSplitter created {len(chunks_to_process)} token-balanced chunks")
all_glossary_entries = []
else:
# Use character-based splitting with fixed threshold
print(f"📑 Effective text exceeds {chapter_split_threshold:,} chars, will process in chunks...")
# If using smart filter, we need to split the FILTERED text, not raw text
if use_smart_filter and custom_prompt:
# Split the filtered text into chunks (reuse cached filtered text)
filtered_text = filtered_text_cache if filtered_text_cache is not None else _filter_text_for_glossary(all_text, min_frequency, max_sentences)[0]
chunks_to_process = []
# Split filtered text into chunks of appropriate size
chunk_size = chapter_split_threshold
for i in range(0, len(filtered_text), chunk_size):
chunk_text = filtered_text[i:i + chunk_size]
chunks_to_process.append((len(chunks_to_process) + 1, chunk_text))
print(f"📑 Split filtered text into {len(chunks_to_process)} chunks")
all_glossary_entries = []
else:
# Original logic for unfiltered text
all_glossary_entries = []
chunk_size = 0
chunk_chapters = []
chunks_to_process = []
for idx, chapter in enumerate(chapters):
if is_stop_requested():
print("📑 ❌ Glossary generation stopped by user")
return all_glossary_entries
chapter_text = clean_html(chapter["body"])
chunk_size += len(chapter_text)
chunk_chapters.append(chapter)
# Process chunk when it reaches threshold or last chapter
if chunk_size >= chapter_split_threshold or idx == len(chapters) - 1:
chunk_text = ' '.join(clean_html(ch["body"]) for ch in chunk_chapters)
chunks_to_process.append((len(chunks_to_process) + 1, chunk_text))
# Reset for next chunk
chunk_size = 0
chunk_chapters = []
print(f"📑 Split into {len(chunks_to_process)} chunks for processing")
# Batch toggle decides concurrency: ON => parallel API calls; OFF => strict sequential
if batch_translation and custom_prompt and len(chunks_to_process) > 1:
print(f"📑 Processing chunks in batch mode with {api_batch_size} chunks per batch...")
# Set fast mode for batch processing
os.environ["GLOSSARY_SKIP_ALL_VALIDATION"] = "1"
# Use batch API calls for AI extraction
all_csv_lines = _process_chunks_batch_api(
chunks_to_process, custom_prompt, language,
min_frequency, max_names, max_titles,
output_dir, strip_honorifics, fuzzy_threshold,
filter_mode, api_batch_size, extraction_workers, max_sentences
)
# Reset validation mode
os.environ["GLOSSARY_SKIP_ALL_VALIDATION"] = "0"
print(f"📑 All chunks completed. Aggregated raw lines: {len(all_csv_lines)}")
# Process all collected entries at once (even if empty)
# Add header so downstream steps can work uniformly
include_gender_context = os.getenv("GLOSSARY_INCLUDE_GENDER_CONTEXT", "0") == "1"
include_description = os.getenv("GLOSSARY_INCLUDE_DESCRIPTION", "0") == "1"
if include_description:
all_csv_lines.insert(0, "type,raw_name,translated_name,gender,description")
elif include_gender_context:
all_csv_lines.insert(0, "type,raw_name,translated_name,gender")
else:
all_csv_lines.insert(0, "type,raw_name,translated_name")
# Merge with any on-disk glossary first (to avoid overwriting user edits)
on_disk_path = os.path.join(output_dir, "glossary.csv")
if os.path.exists(on_disk_path):
try:
with open(on_disk_path, 'r', encoding='utf-8') as f:
on_disk_content = f.read()
all_csv_lines = _merge_csv_entries(all_csv_lines, on_disk_content, strip_honorifics, language)
print("📑 Merged with existing on-disk glossary")
except Exception as e:
print(f"⚠️ Failed to merge with existing on-disk glossary: {e}")
# Apply filter mode if needed
if filter_mode == "only_with_honorifics":
filtered = [all_csv_lines[0]] # Keep header
for line in all_csv_lines[1:]:
parts = line.split(',', 2)
if len(parts) >= 3 and parts[0] == "character":
filtered.append(line)
all_csv_lines = filtered
print(f"📑 Filter applied: {len(all_csv_lines)-1} character entries with honorifics kept")
# Ensure book title header is present before dedup/sort when requested
if os.getenv("GLOSSARY_INCLUDE_BOOK_TITLE", "0") == "1":
all_csv_lines = _ensure_book_title_csv_lines(all_csv_lines)
# Apply fuzzy deduplication (deferred until after all chunks)
try:
print(f"📑 Applying fuzzy deduplication (threshold: {fuzzy_threshold})...")
all_csv_lines = _deduplicate_glossary_with_fuzzy(all_csv_lines, fuzzy_threshold)
except Exception as e:
print(f"⚠️ Deduplication error: {e} — continuing without dedup")
# Sort by type and name
print(f"📑 Sorting glossary by type and name...")
header = all_csv_lines[0]
entries = all_csv_lines[1:]
if entries:
entries.sort(key=_csv_sort_key)
all_csv_lines = [header] + entries
# Save
# Check format preference
use_legacy_format = os.getenv('GLOSSARY_USE_LEGACY_CSV', '0') == '1'
if not use_legacy_format:
# Convert to token-efficient format
all_csv_lines = _convert_to_token_efficient_format(all_csv_lines)
# Final sanitize to prevent stray headers
all_csv_lines = _sanitize_final_glossary_lines(all_csv_lines, use_legacy_format)
            # If user stopped and we have no entries, keep existing file to avoid wiping it
            if is_stop_requested() and len(all_csv_lines) <= 1:
                print("🛑 Stop requested with no new entries — preserving existing glossary.csv")
                return _parse_csv_to_dict(existing_glossary_content) if existing_glossary_content else {}
            # If user requested stop, avoid writing the new glossary to disk
            if is_stop_requested():
                print("🛑 Stop requested — skipping final glossary write (batch mode)")
                return _parse_csv_to_dict(existing_glossary_content) if existing_glossary_content else {}
# Save
csv_content = '\n'.join(all_csv_lines)
glossary_path = os.path.join(output_dir, "glossary.csv")
_atomic_write_file(glossary_path, csv_content)
# Verify file exists; fallback direct write if needed
if not os.path.exists(glossary_path):
try:
with open(glossary_path, 'w', encoding='utf-8') as f:
f.write(csv_content)
print("📑 Fallback write succeeded for glossary.csv")
except Exception as e:
print(f"❌ Failed to write glossary.csv: {e}")
print(f"\n📑 ✅ GLOSSARY SAVED!")
print(f"📑 ✅ AI GLOSSARY SAVED!")
c_count, t_count, total = _count_glossary_entries(all_csv_lines, use_legacy_format)
print(f"📑 Character entries: {c_count}")
# print(f"📑 Term entries: {t_count}")
print(f"📑 Total entries: {total}")
return _parse_csv_to_dict(csv_content)
else:
# Strict sequential processing (one API call at a time)
_prev_defer = os.getenv("GLOSSARY_DEFER_SAVE")
_prev_filtered = os.getenv("_CHUNK_ALREADY_FILTERED")
_prev_force_disable = os.getenv("GLOSSARY_FORCE_DISABLE_SMART_FILTER")
os.environ["GLOSSARY_DEFER_SAVE"] = "1"
# Tell the extractor each chunk is already filtered to avoid re-running smart filter per chunk
os.environ["_CHUNK_ALREADY_FILTERED"] = "1"
os.environ["GLOSSARY_FORCE_DISABLE_SMART_FILTER"] = "1"
try:
for pos, (chunk_idx, chunk_text) in enumerate(chunks_to_process, start=1):
if is_stop_requested():
break
print(f"📑 Processing chunk {chunk_idx}/{len(chunks_to_process)} ({len(chunk_text):,} chars)...")
if custom_prompt:
chunk_glossary = _extract_with_custom_prompt(
custom_prompt, chunk_text, language,
min_frequency, max_names, max_titles,
None, output_dir, # Don't pass existing glossary to chunks
strip_honorifics, fuzzy_threshold, filter_mode, max_sentences, log_callback,
chunk_pos=pos,
total_chunks=len(chunks_to_process),
)
else:
# Pattern fallback disabled
print("📑 AUTO_GLOSSARY_PROMPT is empty - skipping chunk glossary extraction (pattern fallback disabled)")
chunk_glossary = {}
# Normalize to CSV lines and aggregate
chunk_lines = []
if isinstance(chunk_glossary, list):
for line in chunk_glossary:
if line and not line.startswith('type,'):
all_glossary_entries.append(line)
chunk_lines.append(line)
else:
for raw_name, translated_name in chunk_glossary.items():
entry_type = "character" if _has_honorific(raw_name) else "term"
line = f"{entry_type},{raw_name},{translated_name}"
all_glossary_entries.append(line)
chunk_lines.append(line)
# Incremental update (per chunk file inside incremental_glossary folder)
try:
_incremental_update_glossary(output_dir, chunk_idx, chunk_lines, strip_honorifics, language, filter_mode)
print(f"📑 Incremental write: chunk {chunk_idx} (+{len(chunk_lines)} entries)")
except Exception as e2:
print(f"⚠️ Incremental write failed for chunk {chunk_idx}: {e2}")
finally:
if _prev_defer is None:
if "GLOSSARY_DEFER_SAVE" in os.environ:
del os.environ["GLOSSARY_DEFER_SAVE"]
else:
os.environ["GLOSSARY_DEFER_SAVE"] = _prev_defer
if _prev_filtered is None:
os.environ.pop("_CHUNK_ALREADY_FILTERED", None)
else:
os.environ["_CHUNK_ALREADY_FILTERED"] = _prev_filtered
if _prev_force_disable is None:
os.environ.pop("GLOSSARY_FORCE_DISABLE_SMART_FILTER", None)
else:
os.environ["GLOSSARY_FORCE_DISABLE_SMART_FILTER"] = _prev_force_disable
# Build CSV from aggregated entries
print(f"📑 DEBUG: all_glossary_entries count before merge: {len(all_glossary_entries)}")
# START WITH INCREMENTAL GLOSSARY AS BASE IF IT EXISTS AND IS LARGER
# This ensures that if memory was lost (e.g. during a long sequential run), we rely on the disk backup
incremental_dir = os.path.join(output_dir, "incremental_glossary")
incremental_path = os.path.join(incremental_dir, "glossary.incremental.all.csv")
base_entries = list(all_glossary_entries)
using_incremental_as_base = False
if os.path.exists(incremental_path):
try:
with open(incremental_path, 'r', encoding='utf-8') as f:
inc_content = f.read()
# Simple parse to count lines/entries
inc_lines = [line for line in inc_content.split('\n') if line.strip() and not line.startswith('type,')]
print(f"📑 Found incremental glossary: {len(inc_lines)} entries (Memory: {len(all_glossary_entries)} entries)")
if len(inc_lines) > len(all_glossary_entries):
print("📑 🔄 Incremental glossary is larger than memory - using it as primary source")
# We need to ensure it has the header for csv_lines logic below
# But csv_lines construction adds header anyway.
# So we just REPLACE base_entries with inc_lines
base_entries = inc_lines
using_incremental_as_base = True
except Exception as e:
print(f"⚠️ Failed to check incremental glossary: {e}")
include_gender_context = os.getenv("GLOSSARY_INCLUDE_GENDER_CONTEXT", "0") == "1"
include_description = os.getenv("GLOSSARY_INCLUDE_DESCRIPTION", "0") == "1"
if include_description:
csv_lines = ["type,raw_name,translated_name,gender,description"] + base_entries
elif include_gender_context:
csv_lines = ["type,raw_name,translated_name,gender"] + base_entries
else:
csv_lines = ["type,raw_name,translated_name"] + base_entries
# If we used incremental as base, we must merge MEMORY into it (to capture the last chunk if it wasn't in incremental yet)
if using_incremental_as_base and all_glossary_entries:
print("📑 Merging memory entries into incremental base...")
# Create a mini-CSV for memory entries
mem_csv = ["type,raw_name,translated_name"] + all_glossary_entries
csv_lines = _merge_csv_entries(csv_lines, '\n'.join(mem_csv), strip_honorifics, language)
# Merge with any provided existing glossary AND on-disk glossary to avoid overwriting
on_disk_path = os.path.join(output_dir, "glossary.csv")
merge_sources = []
if existing_glossary:
merge_sources.append(existing_glossary)
# We already handled incremental above as the base, so we don't add it to merge_sources here
if os.path.exists(on_disk_path):
try:
with open(on_disk_path, 'r', encoding='utf-8') as f:
merge_sources.append(f.read())
print("📑 Found existing on-disk glossary to merge")
except Exception as e:
print(f"⚠️ Failed to read on-disk glossary for merging: {e}")
# Also merge the main on-disk glossary if it was present at start
if existing_glossary_content:
csv_lines = _merge_csv_entries(csv_lines, existing_glossary_content, strip_honorifics, language)
for src in merge_sources:
before_merge_count = len(csv_lines)
csv_lines = _merge_csv_entries(csv_lines, src, strip_honorifics, language)
print(f"📑 DEBUG: Merged source. Count: {before_merge_count} -> {len(csv_lines)}")
# Apply filter mode to final results
csv_lines = _filter_csv_by_mode(csv_lines, filter_mode)
# Ensure book title entry before dedup/sort when requested
if os.getenv("GLOSSARY_INCLUDE_BOOK_TITLE", "0") == "1":
csv_lines = _ensure_book_title_csv_lines(csv_lines)
# Apply fuzzy deduplication (deferred until after all chunks)
print(f"📑 Applying fuzzy deduplication (threshold: {fuzzy_threshold})...")
original_count = len(csv_lines) - 1
csv_lines = _deduplicate_glossary_with_fuzzy(csv_lines, fuzzy_threshold)
deduped_count = len(csv_lines) - 1
if original_count > deduped_count:
print(f"📑 Removed {original_count - deduped_count} duplicate entries")
# Sort by type and name
print(f"📑 Sorting glossary by type and name...")
header = csv_lines[0]
entries = csv_lines[1:]
entries.sort(key=_csv_sort_key)
csv_lines = [header] + entries
# Token-efficient format if enabled
use_legacy_format = os.getenv('GLOSSARY_USE_LEGACY_CSV', '0') == '1'
if not use_legacy_format:
csv_lines = _convert_to_token_efficient_format(csv_lines)
# Final sanitize to prevent stray headers and section titles at end
csv_lines = _sanitize_final_glossary_lines(csv_lines, use_legacy_format)
# If user requested stop, avoid overwriting files; preserve existing when possible
if is_stop_requested():
if len(csv_lines) <= 1 and os.path.exists(on_disk_path):
print("🛑 Stop requested with no new entries — preserving existing glossary.csv")
return _parse_csv_to_dict(existing_glossary_content) if existing_glossary_content else {}
print("🛑 Stop requested — skipping final glossary write (chunked mode)")
return _parse_csv_to_dict(existing_glossary_content) if existing_glossary_content else {}
            # Copy glossary extension file if configured
add_additional_glossary = os.getenv('ADD_ADDITIONAL_GLOSSARY', '0') == '1'
additional_glossary_path = os.getenv('ADDITIONAL_GLOSSARY_PATH', '')
if add_additional_glossary and additional_glossary_path and os.path.exists(additional_glossary_path):
print(f"📜 Processing glossary extension: {os.path.basename(additional_glossary_path)}")
try:
import shutil
file_ext = os.path.splitext(additional_glossary_path)[1].lower()
# Target path in output directory
target_path = os.path.join(output_dir, "glossary_extension.csv")
if file_ext == '.csv':
# Copy CSV directly
shutil.copy2(additional_glossary_path, target_path)
print(f"📜 Copied glossary extension to {os.path.basename(target_path)}")
elif file_ext in ['.txt', '.json', '.pdf']:
# Convert non-CSV formats to CSV
converted_lines = []
if file_ext == '.txt':
with open(additional_glossary_path, 'r', encoding='utf-8') as f:
content = f.read()
# Try to parse as CSV-like format
for line in content.strip().split('\n'):
if line.strip():
converted_lines.append(line.strip())
elif file_ext == '.json':
import json
with open(additional_glossary_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Add CSV header
converted_lines.append("type,raw_name,translated_name")
# Convert JSON to CSV format
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, dict):
raw = value.get('raw', key)
translated = value.get('translated', value.get('translation', key))
entry_type = value.get('type', 'term')
converted_lines.append(f"{entry_type},{raw},{translated}")
else:
converted_lines.append(f"term,{key},{value}")
elif isinstance(data, list):
for entry in data:
if isinstance(entry, dict):
entry_type = entry.get('type', 'term')
raw = entry.get('raw_name', entry.get('raw', ''))
translated = entry.get('translated_name', entry.get('translated', ''))
if raw and translated:
converted_lines.append(f"{entry_type},{raw},{translated}")
elif file_ext == '.pdf':
# Try to extract text from PDF and save as CSV
try:
import PyPDF2
with open(additional_glossary_path, 'rb') as f:
pdf_reader = PyPDF2.PdfReader(f)
pdf_text = []
for page in pdf_reader.pages:
pdf_text.append(page.extract_text())
text_content = '\n'.join(pdf_text)
# Try to parse as CSV
for line in text_content.strip().split('\n'):
if line.strip():
converted_lines.append(line.strip())
except ImportError:
print("⚠️ PyPDF2 not available, cannot read PDF. Install with: pip install PyPDF2")
except Exception as pdf_error:
print(f"⚠️ Could not read PDF: {pdf_error}")
# Write converted content to CSV
if converted_lines:
with open(target_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(converted_lines))
print(f"📜 Converted and saved glossary extension to {os.path.basename(target_path)}")
except Exception as e:
print(f"⚠️ Failed to copy glossary extension: {e}")
import traceback
traceback.print_exc()
try:
# Save
csv_content = '\n'.join(csv_lines)
glossary_path = os.path.join(output_dir, "glossary.csv")
_atomic_write_file(glossary_path, csv_content)
# Verify file exists; fallback direct write if needed
if not os.path.exists(glossary_path):
try:
with open(glossary_path, 'w', encoding='utf-8') as f:
f.write(csv_content)
print("📑 Fallback write succeeded for glossary.csv")
except Exception as e:
print(f"❌ Failed to write glossary.csv: {e}")
finally:
print(f"\n📑 ✅ CHUNKED GLOSSARY SAVED!")
print(f"📑 ✅ AI GLOSSARY SAVED!")
print(f"📑 File: {glossary_path}")
c_count, t_count, total = _count_glossary_entries(csv_lines, use_legacy_format)
print(f"📑 Character entries: {c_count}")
# print(f"📑 Term entries: {t_count}")
print(f"📑 Total entries: {total}")
return _parse_csv_to_dict(csv_content)
# Original single-text processing
if custom_prompt:
# Pass cached filtered text if available to avoid re-filtering
text_to_process = filtered_text_cache if filtered_text_cache is not None else all_text
already_filtered = filtered_text_cache is not None
# Set environment flag to indicate text is already filtered
if already_filtered:
os.environ["_TEXT_ALREADY_FILTERED"] = "1"
try:
return _extract_with_custom_prompt(custom_prompt, text_to_process, language,
min_frequency, max_names, max_titles,
existing_glossary, output_dir,
strip_honorifics, fuzzy_threshold, filter_mode, max_sentences, log_callback)
finally:
if already_filtered:
os.environ.pop("_TEXT_ALREADY_FILTERED", None)
else:
# Pattern fallback disabled
print("📑 AUTO_GLOSSARY_PROMPT is empty - skipping automatic glossary generation (pattern fallback disabled)")
return {}
def _convert_to_token_efficient_format(csv_lines):
"""Convert CSV lines to token-efficient format with sections and asterisks"""
if len(csv_lines) <= 1:
return csv_lines
header = csv_lines[0]
entries = csv_lines[1:]
# Group by type (only from valid CSV lines)
import re as _re
import csv as _csv
grouped = {}
for line in entries:
if not line.strip():
continue
# Only accept proper CSV rows: at least 3 fields and a sane type token
parts_full = [p.strip() for p in line.split(',')]
if len(parts_full) < 3:
continue
entry_type = parts_full[0].lower()
if not _re.match(r'^[a-z_]+$', entry_type):
continue
if entry_type not in grouped:
grouped[entry_type] = []
grouped[entry_type].append(line)
# Rebuild with token-efficient format
result = []
# Extract column headers from CSV to show in dynamic header
columns = ['translated_name', 'raw_name']
# Check for gender and description columns
try:
header_parts = [p.strip() for p in next(_csv.reader([header]))] if header else []
except Exception:
header_parts = [p.strip() for p in header.split(',')] if header else []
if 'gender' in header_parts:
columns.append('gender')
if 'description' in header_parts:
columns.append('description')
# Add any other custom fields (exclude type, raw_name, translated_name, gender, description)
standard_cols = {'type', 'raw_name', 'translated_name', 'gender', 'description'}
for col in header_parts:
if col.lower() not in standard_cols and col:
columns.append(col)
result.append(f"Glossary Columns: {', '.join(columns)}\n")
# Process in order: character first, then term, then others
type_order = ['book', 'character', 'term'] + [t for t in grouped.keys() if t not in ['book', 'character', 'term']]
# Precompute column indices for richer rendering
lower_header = [h.lower() for h in header_parts]
def _idx(name):
return lower_header.index(name) if name in lower_header else -1
type_idx = _idx('type')
raw_idx = _idx('raw_name')
trans_idx = _idx('translated_name')
gender_idx = _idx('gender')
desc_idx = _idx('description')
for entry_type in type_order:
if entry_type not in grouped:
continue
entries = grouped[entry_type]
# Add section header
section_name = entry_type.upper() + 'S' if not entry_type.upper().endswith('S') else entry_type.upper()
result.append(f"=== {section_name} ===")
# Add entries in new format
for line in entries:
try:
parts = next(_csv.reader([line]))
except Exception:
parts = [p.strip() for p in line.split(',')]
if header_parts and len(parts) < len(header_parts):
parts += [''] * (len(header_parts) - len(parts))
elif header_parts and len(parts) > len(header_parts):
# If unquoted commas split the description, merge overflow into the description column
if desc_idx != -1 and desc_idx < len(header_parts):
parts = parts[:desc_idx] + [",".join(parts[desc_idx:])]
else:
parts = parts[:len(header_parts)]
# Extract core fields using header positions when available
entry_type_val = (parts[type_idx] if type_idx != -1 and len(parts) > type_idx else entry_type).lower()
raw_name = parts[raw_idx] if raw_idx != -1 and len(parts) > raw_idx else (parts[1] if len(parts) > 1 else '')
translated_name = parts[trans_idx] if trans_idx != -1 and len(parts) > trans_idx else (parts[2] if len(parts) > 2 else '')
if not raw_name or not translated_name:
continue
entry_line = f"* {translated_name} ({raw_name})"
# Gender support (any type that supplies it)
if gender_idx != -1 and len(parts) > gender_idx:
gender_val = parts[gender_idx].strip()
if gender_val and gender_val != 'Unknown':
entry_line += f" [{gender_val}]"
# Description + extra fields
desc_val = parts[desc_idx].strip() if desc_idx != -1 and len(parts) > desc_idx else ''
# Fallback: if no description column exists in header but there are trailing columns,
# join everything after the last known core column as description.
if desc_idx == -1:
core_max = max(idx for idx in [type_idx, raw_idx, trans_idx, gender_idx] if idx != -1) if any(idx != -1 for idx in [type_idx, raw_idx, trans_idx, gender_idx]) else 2
if len(parts) > core_max + 1:
desc_tail = ",".join(parts[core_max + 1:]).strip()
if desc_tail and not desc_val:
desc_val = desc_tail
extra_segments = []
for idx, col in enumerate(header_parts):
col_lower = col.lower()
if col_lower in ['type', 'raw_name', 'translated_name', 'gender', 'description']:
continue
if idx < len(parts):
val = parts[idx].strip()
if val:
extra_segments.append(f"{col}: {val}")
base_desc = desc_val
if not base_desc and extra_segments:
base_desc = extra_segments[0]
extra_segments = extra_segments[1:]
if base_desc:
entry_line += f": {base_desc}"
for seg in extra_segments:
entry_line += f" | {seg}"
result.append(entry_line)
result.append("") # Blank line between sections
return result
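# Illustrative rendered output of the token-efficient format above (hypothetical entries),
# assuming a header of "type,raw_name,translated_name,gender,description":
#   Glossary Columns: type, raw_name, translated_name, gender, description
#   === CHARACTERS ===
#   * Mir (미르) [female]: Captain of the royal guard
#   === TERMS ===
#   * moon blade (달검): A ceremonial sword
# Any non-standard extra columns are appended to the entry line as " | column: value".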
def _count_glossary_entries(lines, use_legacy_format=False):
"""Return (char_count, term_count, total_count) for either format."""
if not lines:
return 0, 0, 0
if use_legacy_format:
data = lines[1:] if lines and lines[0].lower().startswith('type,raw_name') else lines
char_count = sum(1 for ln in data if ln.startswith('character,'))
term_count = sum(1 for ln in data if ln.startswith('term,'))
total = sum(1 for ln in data if ln and ',' in ln)
return char_count, term_count, total
# token-efficient
current = None
char_count = term_count = total = 0
for ln in lines:
s = ln.strip()
if s.startswith('=== ') and 'CHARACTER' in s.upper():
current = 'character'
continue
if s.startswith('=== ') and 'TERM' in s.upper():
current = 'term'
continue
if s.startswith('* '):
total += 1
if current == 'character':
char_count += 1
elif current == 'term':
term_count += 1
return char_count, term_count, total
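# Counting rules used above, for reference:
#   legacy CSV format      -> every non-empty data line containing a comma counts toward the total;
#                             lines starting with "character," / "term," feed the per-type counts
#   token-efficient format -> only "* " entry lines are counted, attributed to the most recent
#                             "=== CHARACTERS ===" / "=== TERMS ===" section header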
def _sanitize_final_glossary_lines(lines, use_legacy_format=False):
"""Remove stray CSV headers and normalize header placement before saving.
- In legacy CSV mode, ensure exactly one header at the very top.
- In token-efficient mode, remove any CSV header lines entirely.
"""
header_norm = "type,raw_name,translated_name"
if not lines:
return lines
if use_legacy_format:
sanitized = []
header_seen = False
for ln in lines:
txt = ln.strip()
if txt.lower().startswith("type,raw_name"):
if not header_seen:
sanitized.append(header_norm)
header_seen = True
# skip duplicates
else:
sanitized.append(ln)
# ensure header at top
if sanitized and not sanitized[0].strip().lower().startswith("type,raw_name"):
sanitized.insert(0, header_norm)
return sanitized
else:
# remove any CSV header lines anywhere and duplicate top headers/sections
cleaned = []
glossary_header_seen = False
for i, ln in enumerate(lines):
txt = ln.strip()
low = txt.lower()
# Drop CSV headers
if low.startswith("type,raw_name"):
continue
# Keep only the first main glossary header
if low.startswith("glossary:"):
if glossary_header_seen:
continue
glossary_header_seen = True
cleaned.append(ln)
continue
# Remove bogus section like '=== GLOSSARY: ... ==='
if low.startswith("=== glossary:"):
continue
cleaned.append(ln)
return cleaned
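# Illustrative legacy-mode sanitization (hypothetical lines):
#   ["type,raw_name,translated_name,gender", "character,미르,Mir",
#    "type,raw_name,translated_name", "term,달,moon"]
#   -> ["type,raw_name,translated_name", "character,미르,Mir", "term,달,moon"]
# In token-efficient mode, stray "type,raw_name..." header lines and "=== GLOSSARY: ... ===" pseudo-sections
# are dropped, and only the first "Glossary: ..." main header is kept.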
def _process_chunks_batch_api(chunks_to_process, custom_prompt, language,
min_frequency, max_names, max_titles,
output_dir, strip_honorifics, fuzzy_threshold,
filter_mode, api_batch_size, extraction_workers, max_sentences=200):
"""Process chunks using batch API calls for AI extraction with thread delay.
IMPORTANT: when a stop is requested, we must stop *submitting* new API work immediately.
Any already in-flight requests may finish (graceful stop) or be aborted by unified_api_client
cancellation (immediate stop).
"""
print(f"📑 Using batch API mode with {api_batch_size} chunks per batch")
# Graceful stop semantics:
# - If GRACEFUL_STOP=1 and WAIT_FOR_CHUNKS=1: stop submitting *new* work, but do NOT cancel in-flight.
# - If WAIT_FOR_CHUNKS=0: we will only "wait for in-flight" if ALL chunks were already submitted.
# If any chunk is still pending/not-submitted when stop is raised, escalate to full-stop.
graceful_stop = (os.getenv('GRACEFUL_STOP') == '1')
wait_for_chunks = (os.getenv('WAIT_FOR_CHUNKS') == '1')
# Ensure we defer saving and heavy merging when processing chunks
_prev_defer = os.getenv("GLOSSARY_DEFER_SAVE")
os.environ["GLOSSARY_DEFER_SAVE"] = "1"
# Get thread submission delay
thread_delay = float(os.getenv("THREAD_SUBMISSION_DELAY_SECONDS", "0.5"))
if thread_delay > 0:
print(f"📑 Thread submission delay: {thread_delay}s between parallel calls")
# CHANGE: Collect raw CSV lines instead of dictionary
all_csv_lines = [] # Collect all entries as CSV lines
total_chunks = len(chunks_to_process)
completed_chunks = 0
# Ensure per-chunk smart filtering is disabled globally during batch processing
_prev_filtered = os.getenv("_CHUNK_ALREADY_FILTERED")
_prev_force_disable = os.getenv("GLOSSARY_FORCE_DISABLE_SMART_FILTER")
os.environ["_CHUNK_ALREADY_FILTERED"] = "1"
os.environ["GLOSSARY_FORCE_DISABLE_SMART_FILTER"] = "1"
# Concurrency: follow GUI batch size (BATCH_SIZE).
# NOTE: EXTRACTION_WORKERS is used for *chapter extraction*/CPU work; it should not cap API concurrency.
# If you want to throttle API concurrency, use BATCH_SIZE (and/or SEND_INTERVAL_SECONDS).
try:
api_batch_size = int(api_batch_size)
except Exception:
api_batch_size = 1
api_batch_size = max(1, api_batch_size)
max_workers = min(api_batch_size, len(chunks_to_process))
max_workers = max(1, max_workers)
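# e.g. BATCH_SIZE=4 with 10 pending chunks -> up to 4 concurrent API calls; with only 2 chunks -> 2 workers.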
# Helpful debug output for cases where users suspect batching is not being applied
try:
send_interval = os.getenv("SEND_INTERVAL_SECONDS", "")
thread_delay_env = os.getenv("THREAD_SUBMISSION_DELAY_SECONDS", "")
print(f"📑 DEBUG: BATCH_SIZE={api_batch_size}, EXTRACTION_WORKERS={extraction_workers}, SEND_INTERVAL_SECONDS={send_interval}, THREAD_SUBMISSION_DELAY_SECONDS={thread_delay_env}")
except Exception:
pass
print(f"📑 Processing {len(chunks_to_process)} chunks with up to {max_workers} concurrent API calls...")
# Submit incrementally so Stop can prevent queued work from ever starting.
from concurrent.futures import wait, FIRST_COMPLETED
pending = list(chunks_to_process)
next_pos = 1
# Track work in three stages:
# - executor_submitted: submitted to our ThreadPoolExecutor (NOT what the user means by "sent")
# - sent_chunks: requests that actually transitioned to in-flight (i.e., after api stagger/delay)
# - completed_chunks_local: futures that completed (success or failure)
executor_submitted = 0
completed_chunks_local = 0
sent_chunks = set() # set[int] of chunk_pos that have actually been sent (in-flight)
def _status_snapshot(*, in_flight_count: int) -> dict:
total = int(total_chunks or 0)
pend = int(len(pending))
# "all_sent" means every chunk call has actually begun sending (post-delay) at least once.
all_sent = (total > 0 and len(sent_chunks) >= total)
# Keep legacy fields for compatibility/debugging, but note "submitted" here is executor-submitted.
all_submitted = (executor_submitted >= total and pend == 0)
return {
"pid": os.getpid(),
"ts": time.time(),
"total_chunks": total,
"executor_submitted": int(executor_submitted),
"submitted_chunks": int(executor_submitted),
"sent_chunks": int(len(sent_chunks)),
"all_sent": bool(all_sent),
"completed_chunks": int(completed_chunks_local),
"in_flight": int(in_flight_count),
"pending": pend,
"all_submitted": bool(all_submitted),
"graceful_stop": bool(graceful_stop),
"wait_for_chunks": bool(wait_for_chunks),
"stop_requested": bool(is_stop_requested()),
}
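# Illustrative snapshot returned above and written to the status file (hypothetical values):
#   {"pid": 12345, "ts": 1700000000.0, "total_chunks": 12, "executor_submitted": 4,
#    "submitted_chunks": 4, "sent_chunks": 3, "all_sent": False, "completed_chunks": 1,
#    "in_flight": 3, "pending": 8, "all_submitted": False, "graceful_stop": True,
#    "wait_for_chunks": False, "stop_requested": False}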
# Monitor watchdog entries to detect when requests actually transition to "in_flight" (sent).
# This matches the user's definition of "submitted" (after API delay/stagger).
_sent_monitor_stop = threading.Event()
def _sent_monitor():
try:
import unified_api_client as _uac
except Exception:
return
# Regex for the context we set in _extract_with_custom_prompt: "auto glossary (i/N)"
rx = re.compile(r"auto\s+glossary\s*\(\s*(\d+)\s*/\s*(\d+)\s*\)", re.IGNORECASE)
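# e.g. a watchdog context string of "auto glossary (3/12)" yields pos=3 and tot=12 in the loop below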
while not _sent_monitor_stop.is_set():
try:
st = _uac.get_api_watchdog_state() if hasattr(_uac, 'get_api_watchdog_state') else {}
entries = st.get('in_flight_entries', []) if isinstance(st, dict) else []
if not isinstance(entries, list):
entries = []
for e in entries:
if not isinstance(e, dict):
continue
if e.get('status') != 'in_flight':
continue
ctx = e.get('context') or e.get('label') or ''
m = rx.search(str(ctx))
if not m:
continue
pos = int(m.group(1))
tot = int(m.group(2))
if tot == int(total_chunks or 0) and 1 <= pos <= tot:
if pos not in sent_chunks:
sent_chunks.add(pos)
# Update status file periodically
_write_glossary_status(_status_snapshot(in_flight_count=int(st.get('in_flight', 0) or 0) if isinstance(st, dict) else 0))
except Exception:
pass
time.sleep(0.1)
try:
t_mon = threading.Thread(target=_sent_monitor, name="GlossarySentMonitor", daemon=True)
t_mon.start()
except Exception:
t_mon = None
# Initialize status file early
_write_glossary_status(_status_snapshot(in_flight_count=0))
def _submit_one(executor, pos, chunk_idx, chunk_text, *, last_submission_time: float):
if is_stop_requested():
return None
# Apply thread submission delay
if thread_delay > 0 and last_submission_time > 0:
time_since_last = time.time() - last_submission_time
if time_since_last < thread_delay:
sleep_time = thread_delay - time_since_last
print(f"🧵 Thread delay: {sleep_time:.1f}s for chunk {chunk_idx}")
time.sleep(sleep_time)
fut = executor.submit(
_extract_with_custom_prompt,
custom_prompt, chunk_text, language,
min_frequency, max_names, max_titles,
None, output_dir, strip_honorifics,
fuzzy_threshold, filter_mode, max_sentences,
log_callback=None,
chunk_pos=pos,
total_chunks=total_chunks,
)
return fut
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {} # future -> chunk_idx
last_submission_time = 0.0
# Prime the worker pool
while pending and len(futures) < max_workers and not is_stop_requested():
chunk_idx, chunk_text = pending.pop(0)
fut = _submit_one(executor, next_pos, chunk_idx, chunk_text, last_submission_time=last_submission_time)
if fut is False or fut is None:
break
futures[fut] = chunk_idx
executor_submitted += 1
next_pos += 1
last_submission_time = time.time()
_write_glossary_status(_status_snapshot(in_flight_count=len(futures)))
escalated_full_stop = False
def _escalate_to_full_stop(reason: str) -> None:
nonlocal escalated_full_stop
if escalated_full_stop:
return
escalated_full_stop = True
try:
print(f"🛑 Escalating to FULL STOP (glossary batch): {reason}")
except Exception:
pass
# Disable graceful semantics locally so unified_api_client cancels quickly.
try:
os.environ['GRACEFUL_STOP'] = '0'
os.environ['WAIT_FOR_CHUNKS'] = '0'
except Exception:
pass
# Force unified_api_client cancellation if available.
try:
import unified_api_client
if hasattr(unified_api_client, 'set_stop_flag'):
unified_api_client.set_stop_flag(True)
if hasattr(unified_api_client, 'global_stop_flag'):
unified_api_client.global_stop_flag = True
if hasattr(unified_api_client, 'UnifiedClient'):
unified_api_client.UnifiedClient._global_cancelled = True
except Exception:
pass
while futures:
# On stop:
# - If not graceful: immediate stop (cancel queued work).
# - If graceful + WAIT_FOR_CHUNKS=1: stop submitting new but keep waiting for in-flight.
# - If graceful + WAIT_FOR_CHUNKS=0: ONLY keep waiting if all chunks were already submitted;
# otherwise escalate to full stop.
if is_stop_requested():
# IMPORTANT: "all sent" means every chunk call has transitioned to in-flight (post delay/stagger).
all_sent_now = (int(total_chunks or 0) > 0 and len(sent_chunks) >= int(total_chunks or 0))
if graceful_stop and (not wait_for_chunks) and (not all_sent_now):
_escalate_to_full_stop("stop requested before all chunks were sent to API")
if (not graceful_stop) or escalated_full_stop:
try:
for fut in list(futures.keys()):
fut.cancel()
except Exception:
pass
# Do not keep waiting if we're full-stopping.
break
# Graceful stop: keep waiting only if WAIT_FOR_CHUNKS=1 OR all chunks already sent.
if graceful_stop and (wait_for_chunks or all_sent_now):
# no-op: just continue waiting for done futures
pass
else:
# Graceful stop without waiting semantics -> treat as immediate stop.
try:
for fut in list(futures.keys()):
fut.cancel()
except Exception:
pass
break
done, _ = wait(futures.keys(), return_when=FIRST_COMPLETED)
for fut in done:
chunk_idx = futures.pop(fut, None)
if chunk_idx is None:
continue
# Collect result (even if stop was requested; it may have completed before cancellation)
try:
chunk_glossary = fut.result()
print(f"📑 DEBUG: Chunk {chunk_idx} returned type={type(chunk_glossary)}, len={len(chunk_glossary)}")
# Normalize to CSV lines (without header)
chunk_lines = []
if isinstance(chunk_glossary, dict):
for raw_name, translated_name in chunk_glossary.items():
entry_type = "character" if _has_honorific(raw_name) else "term"
chunk_lines.append(f"{entry_type},{raw_name},{translated_name}")
elif isinstance(chunk_glossary, list):
for line in chunk_glossary:
if line and not line.startswith('type,'):
chunk_lines.append(line)
# Aggregate for end-of-run
all_csv_lines.extend(chunk_lines)
# Incremental writes (best-effort)
try:
_incremental_update_glossary(output_dir, chunk_idx, chunk_lines, strip_honorifics, language, filter_mode)
print(f"📑 Incremental write: chunk {chunk_idx} (+{len(chunk_lines)} entries)")
except Exception as e2:
print(f"⚠️ Incremental write failed: {e2}")
completed_chunks += 1
completed_chunks_local += 1
progress_percent = (completed_chunks / total_chunks) * 100 if total_chunks else 100
print(f"📑 Progress: {completed_chunks}/{total_chunks} chunks ({progress_percent:.0f}%)")
print(f"📑 Chunk {chunk_idx} completed and aggregated")
except Exception as e:
print(f"⚠️ API call for chunk {chunk_idx} failed: {e}")
completed_chunks += 1
progress_percent = (completed_chunks / total_chunks) * 100 if total_chunks else 100
print(f"📑 Progress: {completed_chunks}/{total_chunks} chunks ({progress_percent:.0f}%)")
# Submit next work only if not stopping
while pending and len(futures) < max_workers and not is_stop_requested():
next_chunk_idx, next_chunk_text = pending.pop(0)
fut2 = _submit_one(executor, next_pos, next_chunk_idx, next_chunk_text, last_submission_time=last_submission_time)
if fut2 is False or fut2 is None:
pending.clear()
break
futures[fut2] = next_chunk_idx
executor_submitted += 1
next_pos += 1
last_submission_time = time.time()
_write_glossary_status(_status_snapshot(in_flight_count=len(futures)))
# Update status after processing completions
_write_glossary_status(_status_snapshot(in_flight_count=len(futures)))
# CHANGE: Return CSV lines instead of dictionary
# Stop sent-monitor thread
try:
_sent_monitor_stop.set()
except Exception:
pass
# Restore per-chunk filter disabling envs
if _prev_filtered is None:
os.environ.pop("_CHUNK_ALREADY_FILTERED", None)
else:
os.environ["_CHUNK_ALREADY_FILTERED"] = _prev_filtered
if _prev_force_disable is None:
os.environ.pop("GLOSSARY_FORCE_DISABLE_SMART_FILTER", None)
else:
os.environ["GLOSSARY_FORCE_DISABLE_SMART_FILTER"] = _prev_force_disable
# Restore previous defer setting
if _prev_defer is None:
# Default back to not deferring if it wasn't set
if "GLOSSARY_DEFER_SAVE" in os.environ:
del os.environ["GLOSSARY_DEFER_SAVE"]
else:
os.environ["GLOSSARY_DEFER_SAVE"] = _prev_defer
# If we are exiting due to a stop request, clear watchdog state/file so GUI doesn't stay "busy".
if is_stop_requested():
try:
_clear_api_watchdog_state(remove_watchdog_file=True)
except Exception:
pass
return all_csv_lines
def _incremental_update_glossary(output_dir, chunk_idx, chunk_lines, strip_honorifics, language, filter_mode):
"""Incrementally update glossary output.
Creates per-chunk CSV snapshots in an "incremental_glossary" subfolder:
glossary.incremental1.csv, glossary.incremental2.csv, ...
Also maintains a combined aggregator file (glossary.incremental.all.csv)
that save_glossary() can use as a crash-safe backup.
"""
if not chunk_lines:
return
# Respect stop flag to avoid writing partial files after cancellation
if is_stop_requested():
return
# Incremental output directory
incremental_dir = os.path.join(output_dir, "incremental_glossary")
os.makedirs(incremental_dir, exist_ok=True)
# Per-chunk snapshot path (no merging, just this chunk)
chunk_filename = f"glossary.incremental{chunk_idx}.csv"
chunk_path = os.path.join(incremental_dir, chunk_filename)
# Combined aggregator path (append-only) and visible glossary path (merged)
agg_path = os.path.join(incremental_dir, "glossary.incremental.all.csv")
vis_path = os.path.join(output_dir, "glossary.csv")
# Ensure main output dir exists
os.makedirs(output_dir, exist_ok=True)
# Compose CSV lines for this chunk
include_gender_context = os.getenv("GLOSSARY_INCLUDE_GENDER_CONTEXT", "0") == "1"
include_description = os.getenv("GLOSSARY_INCLUDE_DESCRIPTION", "0") == "1"
header = "type,raw_name,translated_name"
if include_description:
header += ",gender,description"
elif include_gender_context:
header += ",gender"
new_csv_lines = [header] + chunk_lines
# Save per-chunk snapshot (no merging)
_atomic_write_file(chunk_path, "\n".join(new_csv_lines))
# Append to aggregator (raw append, no merging/deduping to preserve full history)
# Hold the write lock to serialize concurrent appends; the flush + fsync below ensures the data reaches disk
with _file_write_lock:
try:
# Check whether the aggregator already exists so the header is only written once
file_exists = os.path.exists(agg_path)
with open(agg_path, 'a', encoding='utf-8') as f:
# If new file, write header
if not file_exists:
f.write(header + "\n")
# Append chunks
if chunk_lines:
content_to_write = "\n".join(chunk_lines) + "\n"
f.write(content_to_write)
# Force flush to disk
f.flush()
os.fsync(f.fileno())
except Exception as e:
print(f"⚠️ Failed to append to incremental aggregator: {e}")
# Update visible glossary.csv (merged and deduped)
# DISABLED: Per user request, we only do this at the very end to save performance
# The incremental_glossary folder maintains the safety backup
# existing_csv = None
# if os.path.exists(agg_path):
# try:
# with open(agg_path, 'r', encoding='utf-8') as f:
# existing_csv = f.read()
# except Exception as e:
# print(f"⚠️ Incremental: cannot read aggregator: {e}")
# Merge (exact merge, no fuzzy to keep this fast)
# Note: _merge_csv_entries handles deduplication
# We pass empty string as 'new' content because existing_csv already contains everything (from append above)
# Actually, _merge_csv_entries merges two CSV strings. existing_csv is the full raw history.
# If we pass it as 'base', it will clean it up.
# merged_csv_lines = _merge_csv_entries([], existing_csv or "", strip_honorifics, language)
# Optional filter mode
# merged_csv_lines = _filter_csv_by_mode(merged_csv_lines, filter_mode)
# Convert to token-efficient format for visible glossary.csv
# token_lines = _convert_to_token_efficient_format(merged_csv_lines)
# token_lines = _sanitize_final_glossary_lines(token_lines, use_legacy_format=False)
# _atomic_write_file(vis_path, "\n".join(token_lines))
def _process_single_chunk(chunk_idx, chunk_text, custom_prompt, language,
min_frequency, max_names, max_titles, batch_size,
output_dir, strip_honorifics, fuzzy_threshold, filter_mode,
already_filtered=False, max_sentences=200):
"""Process a single chunk - wrapper for parallel execution"""
print(f"📑 Worker processing chunk {chunk_idx} ({len(chunk_text):,} chars)...")
if custom_prompt:
# Pass flag to indicate if text is already filtered
os.environ["_CHUNK_ALREADY_FILTERED"] = "1" if already_filtered else "0"
_prev_defer = os.getenv("GLOSSARY_DEFER_SAVE")
os.environ["GLOSSARY_DEFER_SAVE"] = "1"
try:
result = _extract_with_custom_prompt(
custom_prompt, chunk_text, language,
min_frequency, max_names, max_titles,
None, output_dir,
strip_honorifics, fuzzy_threshold, filter_mode, max_sentences, log_callback=None
)
finally:
os.environ["_CHUNK_ALREADY_FILTERED"] = "0" # Reset
if _prev_defer is None:
if "GLOSSARY_DEFER_SAVE" in os.environ:
del os.environ["GLOSSARY_DEFER_SAVE"]
else:
os.environ["GLOSSARY_DEFER_SAVE"] = _prev_defer
return result
else:
# Pattern fallback disabled
print("📑 AUTO_GLOSSARY_PROMPT is empty - skipping chunk glossary extraction (pattern fallback disabled)")
return {}
def _apply_final_filter(entries, filter_mode):
"""Apply final filtering based on mode to ensure only requested types are included"""
if filter_mode == "only_with_honorifics":
# Filter to keep only entries that look like they have honorifics
filtered = {}
for key, value in entries.items():
# Check if the key contains known honorific patterns
if _has_honorific(key):
filtered[key] = value
print(f"📑 Final filter: Kept {len(filtered)} entries with honorifics (from {len(entries)} total)")
return filtered
elif filter_mode == "only_without_honorifics":
# Filter to keep only entries without honorifics
filtered = {}
for key, value in entries.items():
if not _has_honorific(key):
filtered[key] = value
print(f"📑 Final filter: Kept {len(filtered)} entries without honorifics (from {len(entries)} total)")
return filtered
else:
return entries
def _looks_like_name(text):
"""Check if text looks like a character name"""
if not text:
return False
# Check for various name patterns
# Korean names (2-4 hangul characters)
if all(0xAC00 <= ord(char) <= 0xD7AF for char in text) and 2 <= len(text) <= 4:
return True
# Japanese names (mix of kanji/kana, 2-6 chars)
has_kanji = any(0x4E00 <= ord(char) <= 0x9FFF for char in text)
has_kana = any((0x3040 <= ord(char) <= 0x309F) or (0x30A0 <= ord(char) <= 0x30FF) for char in text)
if (has_kanji or has_kana) and 2 <= len(text) <= 6:
return True
# Chinese names (EXPANDED: 2-6 Chinese characters for cultivation novels)
if all(0x4E00 <= ord(char) <= 0x9FFF for char in text) and 2 <= len(text) <= 6:
# 1. Check if it matches specific Chinese name patterns (Courtesy Name, Generation Name)
if hasattr(PM, 'CHINESE_NAME_PATTERNS'):
# Courtesy names (e.g. "Lu Bozi")
if 'courtesy_names' in PM.CHINESE_NAME_PATTERNS:
for pattern in PM.CHINESE_NAME_PATTERNS['courtesy_names']:
if re.match(pattern, text):
return True
# Generation names (middle character matches generation list)
if len(text) == 3 and 'generation_names' in PM.CHINESE_NAME_PATTERNS:
if text[1] in PM.CHINESE_NAME_PATTERNS['generation_names']:
return True
# Title prefixes (e.g. "Old Li", "Little Wang")
if 'title_prefixes' in PM.CHINESE_NAME_PATTERNS:
if text[0] in PM.CHINESE_NAME_PATTERNS['title_prefixes']:
return True
# 2. Check if it starts with a known surname (1 or 2 chars)
if len(text) >= 2:
# Check single-char surname
if text[0] in PM.CHINESE_SINGLE_SURNAMES:
return True
# Check two-char compound surname
if len(text) >= 3 and text[:2] in PM.CHINESE_COMPOUND_SURNAMES:
return True
# 3. Even without surname match, if it's 2-6 chars it could be a valid term
return True
# English names (starts with capital, mostly letters)
if text[0].isupper() and sum(1 for c in text if c.isalpha()) >= len(text) * 0.8:
return True
return False
def _has_honorific(term):
"""Check if a term contains an honorific using PatternManager's comprehensive list"""
if not term:
return False
term_lower = term.lower()
# Check all language honorifics from PatternManager
for language, honorifics_list in PM.CJK_HONORIFICS.items():
for honorific in honorifics_list:
# For romanized/English honorifics with spaces or dashes
if honorific.startswith(' ') or honorific.startswith('-'):
if term_lower.endswith(honorific.lower()):
return True
# For CJK honorifics (no separator)
else:
if honorific in term:
return True
return False
def _strip_all_honorifics(term, language='korean'):
"""Strip all honorifics from a term using PatternManager's lists"""
if not term:
return term
result = term
# Get honorifics for the specific language and English romanizations
honorifics_to_strip = []
if language in PM.CJK_HONORIFICS:
honorifics_to_strip.extend(PM.CJK_HONORIFICS[language])
honorifics_to_strip.extend(PM.CJK_HONORIFICS.get('english', []))
# Sort by length (longest first) to avoid partial matches
honorifics_to_strip.sort(key=len, reverse=True)
# Strip honorifics
for honorific in honorifics_to_strip:
if honorific.startswith(' ') or honorific.startswith('-'):
# For romanized honorifics with separators
if result.lower().endswith(honorific.lower()):
result = result[:-len(honorific)]
else:
# For CJK honorifics (no separator)
if result.endswith(honorific):
result = result[:-len(honorific)]
return result.strip()
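# Illustrative behaviour (hypothetical inputs; actual results depend on PM.CJK_HONORIFICS):
#   _strip_all_honorifics("미르님", "korean")   -> "미르"  if "님" is listed for Korean
#   _strip_all_honorifics("Mir-ssi", "korean") -> "Mir"   if "-ssi" is listed under the English romanizations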
def _convert_to_csv_format(data):
"""Convert various glossary formats to CSV string format with enforced 3 columns"""
csv_lines = ["type,raw_name,translated_name"]
if isinstance(data, str):
# Already CSV string
if data.strip().startswith('type,raw_name'):
return data
# Try to parse as JSON
try:
data = json.loads(data)
except:
return data
if isinstance(data, list):
for item in data:
if isinstance(item, dict):
if 'type' in item and 'raw_name' in item:
# Already in correct format
line = f"{item['type']},{item['raw_name']},{item.get('translated_name', item['raw_name'])}"
csv_lines.append(line)
else:
# Old format - default to 'term' type
entry_type = 'term'
raw_name = item.get('original_name', '')
translated_name = item.get('name', raw_name)
if raw_name and translated_name:
csv_lines.append(f"{entry_type},{raw_name},{translated_name}")
elif isinstance(data, dict):
if 'entries' in data:
# Has metadata wrapper, extract entries
for original, translated in data['entries'].items():
csv_lines.append(f"term,{original},{translated}")
else:
# Plain dictionary - default to 'term' type
for original, translated in data.items():
csv_lines.append(f"term,{original},{translated}")
return '\n'.join(csv_lines)
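# Illustrative conversions (hypothetical values):
#   {"미르": "Mir"}                                                        -> "term,미르,Mir"
#   [{"type": "character", "raw_name": "미르", "translated_name": "Mir"}]  -> "character,미르,Mir"
#   {"entries": {"달검": "moon blade"}}                                    -> "term,달검,moon blade"
# Every converted result is prefixed with the "type,raw_name,translated_name" header row.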
def _parse_csv_to_dict(csv_content):
"""Parse CSV content to dictionary for backward compatibility"""
result = {}
lines = csv_content.strip().split('\n')
for line in lines[1:]: # Skip header
if not line.strip():
continue
parts = [p.strip() for p in line.split(',')]
if len(parts) >= 3:
result[parts[1]] = parts[2] # raw_name -> translated_name
return result
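# e.g. "character,미르,Mir" -> {"미르": "Mir"}; note this simple comma split does not honor quoted fields.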
def _fuzzy_match(term1, term2, threshold=0.90):
"""Check if two terms match using fuzzy matching"""
ratio = SequenceMatcher(None, term1.lower(), term2.lower()).ratio()
return ratio >= threshold
def _fuzzy_match_rapidfuzz(term_lower, text_lower, threshold, term_len):
"""Use rapidfuzz library for MUCH faster fuzzy matching"""
from rapidfuzz import fuzz
print(f"📑 Using RapidFuzz (C++ speed)...")
start_time = time.time()
matches_count = 0
threshold_percent = threshold * 100 # rapidfuzz uses 0-100 scale
# Can use smaller step because rapidfuzz is so fast
step = 1 # Check every position - rapidfuzz can handle it
# Process text
for i in range(0, len(text_lower) - term_len + 1, step):
# Check stop flag every 10000 positions
if i > 0 and i % 10000 == 0:
if is_stop_requested():
print(f"📑 RapidFuzz stopped at position {i}")
return matches_count
window = text_lower[i:i + term_len]
# rapidfuzz is fast enough we can check every position
if fuzz.ratio(term_lower, window) >= threshold_percent:
matches_count += 1
elapsed = time.time() - start_time
print(f"📑 RapidFuzz found {matches_count} matches in {elapsed:.2f}s")
return matches_count
def _batch_compute_frequencies(terms, all_text, fuzzy_threshold=0.90, min_frequency=2):
"""Compute frequencies for all terms at once - MUCH faster than individual checking"""
print(f"📑 Computing frequencies for {len(terms)} terms in batch mode...")
start_time = time.time()
# Result dictionary
term_frequencies = {}
# First pass: exact matching (very fast)
print(f"📑 Phase 1: Exact matching...")
text_lower = all_text.lower()
for term in terms:
if is_stop_requested():
return term_frequencies
term_lower = term.lower()
count = text_lower.count(term_lower)
term_frequencies[term] = count
exact_time = time.time() - start_time
high_freq_terms = sum(1 for count in term_frequencies.values() if count >= min_frequency)
print(f"📑 Exact matching complete: {high_freq_terms}/{len(terms)} terms meet threshold ({exact_time:.1f}s)")
# If fuzzy matching is disabled, we're done
if fuzzy_threshold >= 1.0:
return term_frequencies
# Second pass: fuzzy matching ONLY for low-frequency terms
low_freq_terms = [term for term, count in term_frequencies.items() if count < min_frequency]
if low_freq_terms:
print(f"📑 Phase 2: Fuzzy matching for {len(low_freq_terms)} low-frequency terms...")
# Try to use RapidFuzz batch processing
try:
from rapidfuzz import process, fuzz
# For very large texts, sample it for fuzzy matching
if len(text_lower) > 500000:
print(f"📑 Text too large ({len(text_lower):,} chars), sampling for fuzzy matching...")
# Sample every Nth character to reduce size
sample_rate = max(1, len(text_lower) // 100000)
sampled_text = text_lower[::sample_rate]
else:
sampled_text = text_lower
# Create chunks of text for fuzzy matching
chunk_size = 1000 # Process text in chunks
text_chunks = [sampled_text[i:i+chunk_size] for i in range(0, len(sampled_text), chunk_size//2)] # Overlapping chunks
print(f"📑 Processing {len(text_chunks)} text chunks...")
threshold_percent = fuzzy_threshold * 100
# Process in batches to avoid memory issues
batch_size = 100 # Process 100 terms at a time
for batch_start in range(0, len(low_freq_terms), batch_size):
if is_stop_requested():
break
batch_end = min(batch_start + batch_size, len(low_freq_terms))
batch_terms = low_freq_terms[batch_start:batch_end]
for term in batch_terms:
if is_stop_requested():
break
# Quick fuzzy search in chunks
fuzzy_count = 0
for chunk in text_chunks[:50]: # Limit to first 50 chunks for speed
if fuzz.partial_ratio(term.lower(), chunk) >= threshold_percent:
fuzzy_count += 1
if fuzzy_count > 0:
# Scale up based on sampling
if len(text_lower) > 500000:
fuzzy_count *= (len(text_lower) // len(sampled_text))
term_frequencies[term] += fuzzy_count
if (batch_end % 500 == 0) or (batch_end == len(low_freq_terms)):
elapsed = time.time() - start_time
print(f"📑 Processed {batch_end}/{len(low_freq_terms)} terms ({elapsed:.1f}s)")
except ImportError:
print("📑 RapidFuzz not available, skipping fuzzy matching")
total_time = time.time() - start_time
final_high_freq = sum(1 for count in term_frequencies.values() if count >= min_frequency)
print(f"📑 Batch frequency computation complete: {final_high_freq}/{len(terms)} terms accepted ({total_time:.1f}s)")
return term_frequencies
def _find_fuzzy_matches(term, text, threshold=0.90):
"""Find fuzzy matches of a term in text using efficient method with parallel processing"""
start_time = time.time()
term_lower = term.lower()
text_lower = text.lower()
term_len = len(term)
# Only log for debugging if explicitly enabled
debug_search = os.getenv("GLOSSARY_DEBUG_SEARCH", "0") == "1"
if debug_search and len(text) > 100000:
print(f"📑 Searching for '{term}' in {len(text):,} chars (threshold: {threshold})")
# Strategy 1: Use exact matching first for efficiency
exact_start = time.time()
matches_count = text_lower.count(term_lower)
exact_time = time.time() - exact_start
if matches_count > 0:
if debug_search and len(text) > 100000:
print(f"📑 Found {matches_count} exact matches in {exact_time:.3f}s")
return matches_count
# Strategy 2: Try rapidfuzz if available (much faster)
if matches_count == 0 and threshold < 1.0:
try:
from rapidfuzz import fuzz
return _fuzzy_match_rapidfuzz(term_lower, text_lower, threshold, term_len)
except ImportError:
pass # Fall back to parallel/sequential
# Strategy 3: Fall back to parallel/sequential if rapidfuzz not available
# Check if parallel processing is enabled
extraction_workers = int(os.getenv("EXTRACTION_WORKERS", "1"))
if extraction_workers > 1 and len(text) > 50000: # Use parallel for large texts
return _parallel_fuzzy_search(term_lower, text_lower, threshold, term_len, extraction_workers)
else:
return _sequential_fuzzy_search(term_lower, text_lower, threshold, term_len)
# Check if parallel processing is enabled
extraction_workers = int(os.getenv("EXTRACTION_WORKERS", "1"))
if extraction_workers > 1 and len(text) > 50000: # Use parallel for large texts
return _parallel_fuzzy_search(term_lower, text_lower, threshold, term_len, extraction_workers)
else:
return _sequential_fuzzy_search(term_lower, text_lower, threshold, term_len)
return matches_count
def _parallel_fuzzy_search(term_lower, text_lower, threshold, term_len, num_workers):
"""Parallel fuzzy search using ThreadPoolExecutor"""
print(f"📑 Starting parallel fuzzy search with {num_workers} workers...")
text_len = len(text_lower)
matches_count = 0
# Split text into overlapping chunks for parallel processing
chunk_size = max(text_len // num_workers, term_len * 100)
chunks = []
for i in range(0, text_len, chunk_size):
# Add overlap to avoid missing matches at boundaries
end = min(i + chunk_size + term_len - 1, text_len)
chunks.append((i, text_lower[i:end]))
print(f"📑 Split into {len(chunks)} chunks of ~{chunk_size:,} chars each")
# Process chunks in parallel
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = []
for chunk_idx, (start_pos, chunk_text) in enumerate(chunks):
if is_stop_requested():
return matches_count
future = executor.submit(
_fuzzy_search_chunk,
term_lower, chunk_text, threshold, term_len, chunk_idx, len(chunks)
)
futures.append(future)
# Collect results
for future in as_completed(futures):
if is_stop_requested():
executor.shutdown(wait=False)
return matches_count
try:
chunk_matches = future.result()
matches_count += chunk_matches
except Exception as e:
print(f"📑 ⚠️ Chunk processing error: {e}")
print(f"📑 Parallel fuzzy search found {matches_count} matches")
return matches_count
def _fuzzy_search_chunk(term_lower, chunk_text, threshold, term_len, chunk_idx, total_chunks):
"""Process a single chunk for fuzzy matches"""
chunk_matches = 0
# Use a more efficient step size - no need to check every position
step = max(1, term_len // 3) # Check every third of term length
for i in range(0, len(chunk_text) - term_len + 1, step):
# Check stop flag periodically
if i > 0 and i % 1000 == 0:
if is_stop_requested():
return chunk_matches
window = chunk_text[i:i + term_len]
# Use SequenceMatcher for fuzzy matching
if SequenceMatcher(None, term_lower, window).ratio() >= threshold:
chunk_matches += 1
# Log progress for this chunk
if total_chunks > 1:
print(f"📑 Chunk {chunk_idx + 1}/{total_chunks} completed: {chunk_matches} matches")
return chunk_matches
def _sequential_fuzzy_search(term_lower, text_lower, threshold, term_len):
"""Sequential fuzzy search (fallback for small texts or single worker)"""
print(f"📑 Starting sequential fuzzy search...")
fuzzy_start = time.time()
matches_count = 0
# More efficient step size
step = max(1, term_len // 3)
total_windows = (len(text_lower) - term_len + 1) // step
print(f"📑 Checking ~{total_windows:,} windows with step size {step}")
windows_checked = 0
for i in range(0, len(text_lower) - term_len + 1, step):
# Check stop flag frequently
if i > 0 and i % (step * 100) == 0:
if is_stop_requested():
return matches_count
# Progress log for very long operations
if windows_checked % 1000 == 0 and windows_checked > 0:
elapsed = time.time() - fuzzy_start
rate = windows_checked / elapsed if elapsed > 0 else 0
eta = (total_windows - windows_checked) / rate if rate > 0 else 0
print(f"📑 Progress: {windows_checked}/{total_windows} windows, {rate:.0f} w/s, ETA: {eta:.1f}s")
window = text_lower[i:i + term_len]
if SequenceMatcher(None, term_lower, window).ratio() >= threshold:
matches_count += 1
windows_checked += 1
fuzzy_time = time.time() - fuzzy_start
print(f"📑 Sequential fuzzy search completed in {fuzzy_time:.2f}s, found {matches_count} matches")
return matches_count
def _strip_honorific(term, language_hint='unknown'):
"""Strip honorific from a term if present"""
if not term:
return term
# Get honorifics for the detected language
honorifics_to_check = []
if language_hint in PM.CJK_HONORIFICS:
honorifics_to_check.extend(PM.CJK_HONORIFICS[language_hint])
honorifics_to_check.extend(PM.CJK_HONORIFICS.get('english', []))
# Check and remove honorifics
for honorific in honorifics_to_check:
if honorific.startswith('-') or honorific.startswith(' '):
# English-style suffix
if term.endswith(honorific):
return term[:-len(honorific)].strip()
else:
# CJK-style suffix (no separator)
if term.endswith(honorific):
return term[:-len(honorific)]
return term
def _filter_text_for_glossary(text, min_frequency=2, max_sentences=None):
"""Filter text to extract only meaningful content for glossary extraction
Args:
text: Input text to filter
min_frequency: Minimum frequency threshold for terms
max_sentences: Maximum number of sentences to return (reads from env if None)
"""
import re
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
filter_start_time = time.time()
print(f"📑 Starting smart text filtering...")
print(f"📑 Input text size: {len(text):,} characters")
# Dynamic character coverage flag (must be defined before any early checks)
include_all_characters_env = os.getenv("GLOSSARY_INCLUDE_ALL_CHARACTERS", "0")
include_all_characters = include_all_characters_env == "1"
force_skip_smart_selection = False
honorific_first_indices = {}
# Clean HTML if present
print(f"📑 Step 1/7: Cleaning HTML tags...")
from bs4 import BeautifulSoup
soup = BeautifulSoup(text, 'html.parser')
clean_text = soup.get_text()
print(f"📑 Clean text size: {len(clean_text):,} characters")
# Detect primary language for better filtering
print(f"📑 Step 2/7: Detecting primary language...")
def detect_primary_language(text_sample):
sample = text_sample[:1000]
korean_chars = sum(1 for char in sample if 0xAC00 <= ord(char) <= 0xD7AF)
japanese_kana = sum(1 for char in sample if (0x3040 <= ord(char) <= 0x309F) or (0x30A0 <= ord(char) <= 0x30FF))
chinese_chars = sum(1 for char in sample if 0x4E00 <= ord(char) <= 0x9FFF)
# Check gender pronouns as secondary indicator if character counts are ambiguous
if korean_chars == 0 and japanese_kana == 0 and chinese_chars > 0:
# Distinguish Chinese vs Kanji-heavy Japanese using pronouns
if hasattr(PM, 'GENDER_PRONOUNS'):
# Check Chinese pronouns
chinese_pronouns = PM.GENDER_PRONOUNS.get('chinese', {}).get('male', []) + \
PM.GENDER_PRONOUNS.get('chinese', {}).get('female', [])
for p in chinese_pronouns:
if p in sample:
return 'chinese'
# Check Japanese pronouns
japanese_pronouns = PM.GENDER_PRONOUNS.get('japanese', {}).get('male', []) + \
PM.GENDER_PRONOUNS.get('japanese', {}).get('female', [])
for p in japanese_pronouns:
if p in sample:
return 'japanese'
if korean_chars > 50:
return 'korean'
elif japanese_kana > 20:
return 'japanese'
elif chinese_chars > 50 and japanese_kana < 10:
return 'chinese'
else:
return 'english'
primary_lang = detect_primary_language(clean_text)
print(f"📑 Detected primary language: {primary_lang}")
# Safety guard: ensure flag exists even if subprocess reload missed earlier assignment
try:
include_gender_context_flag
except NameError:
include_gender_context_flag = os.getenv("GLOSSARY_INCLUDE_GENDER_CONTEXT", "0") == "1"
# Gender pronouns for optional gender-context filtering in early captures
gender_pronouns = []
if include_gender_context_flag and hasattr(PM, "GENDER_PRONOUNS"):
lang_key = "english"
if primary_lang == "korean":
lang_key = "korean"
elif primary_lang == "chinese":
lang_key = "chinese"
elif primary_lang == "japanese":
lang_key = "japanese"
gp = PM.GENDER_PRONOUNS.get(lang_key, {})
gender_pronouns = gp.get("male", []) + gp.get("female", [])
# Split into sentences for better context
print(f"📁 Step 3/7: Splitting text into sentences...")
# Use language-specific sentence splitting for better accuracy
if primary_lang == 'chinese':
# Split on major punctuation, but keep 、 and , within sentences
# This preserves more context for Chinese cultivation/wuxia terms
sentences = re.split(r'[。!?;:]+', clean_text)
else:
sentences = re.split(r'[.!?。!?]+', clean_text)
print(f"📁 Found {len(sentences):,} sentences")
# Extract potential terms (words/phrases that appear multiple times)
print(f"📑 Step 4/7: Setting up extraction patterns and exclusion rules...")
word_freq = Counter()
# Pattern for detecting potential names/terms based on capitalization or special characters
# Korean names: 2-4 hangul characters WITHOUT honorifics
korean_pattern = r'[가-힣]{2,4}'
# Japanese names: kanji/hiragana/katakana combinations
japanese_pattern = r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]{2,6}'
# Chinese names: EXPANDED to 2-8 characters for cultivation/wuxia novels
# This captures longer compound names, titles, and cultivation terms
chinese_pattern = r'[\u4e00-\u9fff]{2,8}'
# English proper nouns: Capitalized words
english_pattern = r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b'
# Combine patterns
combined_pattern = f'({korean_pattern}|{japanese_pattern}|{chinese_pattern}|{english_pattern})'
print(f"📑 Using combined regex pattern for {primary_lang} text")
# Get honorifics and title patterns for the detected language
honorifics_to_exclude = set()
if primary_lang in PM.CJK_HONORIFICS:
honorifics_to_exclude.update(PM.CJK_HONORIFICS[primary_lang])
# Also add English romanizations
honorifics_to_exclude.update(PM.CJK_HONORIFICS.get('english', []))
# Compile title patterns for the language
title_patterns = []
if primary_lang in PM.TITLE_PATTERNS:
for pattern in PM.TITLE_PATTERNS[primary_lang]:
title_patterns.append(re.compile(pattern))
# Function to check if a term should be excluded
def should_exclude_term(term):
term_lower = term.lower()
# Check if it's a common word
if term in PM.COMMON_WORDS or term_lower in PM.COMMON_WORDS:
return True
# Check if it contains honorifics
for honorific in honorifics_to_exclude:
if honorific in term or (honorific.startswith('-') and term.endswith(honorific[1:])):
return True
# Check if it matches title patterns
for pattern in title_patterns:
if pattern.search(term):
return True
# Check if it's a number (including Chinese numbers)
if term in PM.CHINESE_NUMS:
return True
# Check if it's just digits
if term.isdigit():
return True
# For Chinese text, INCLUDE domain-specific terms (don't exclude them)
if primary_lang == 'chinese' and len(term) >= 2:
# Check if it's a cultivation term - these should NOT be excluded
for category in PM.CHINESE_CULTIVATION_TERMS.values():
if term in category:
return False # Keep cultivation terms!
# Check if it's a wuxia term - these should NOT be excluded
for category in PM.CHINESE_WUXIA_TERMS.values():
if term in category:
return False # Keep wuxia terms!
# Check relationship terms (important character relationships)
for category in PM.CHINESE_RELATIONSHIP_TERMS.values():
if term in category:
return False # Keep relationship terms!
# Check mythological terms (creatures, artifacts, legendary beings)
for category in PM.CHINESE_MYTHOLOGICAL_TERMS.values():
if term in category:
return False # Keep mythological terms!
# Check elemental/natural force terms
for category in PM.CHINESE_ELEMENTAL_TERMS.values():
if term in category:
return False # Keep elemental terms!
# Check physique/spiritual root terms
for category in PM.CHINESE_PHYSIQUE_TERMS.values():
if term in category:
return False # Keep physique terms!
# Check treasure grades
for category in PM.CHINESE_TREASURE_GRADES.values():
if term in category:
return False # Keep treasure grade terms!
# Check power system terms (levels, stars, etc.)
for category in PM.CHINESE_POWER_SYSTEMS.values():
if term in category:
return False # Keep power system terms!
# Check location types
for category in PM.CHINESE_LOCATION_TYPES.values():
if term in category:
return False # Keep location terms!
# Check battle terms
for category in PM.CHINESE_BATTLE_TERMS.values():
if term in category:
return False # Keep battle terms!
# Check novel terms (common raw Chinese terms)
if hasattr(PM, 'CHINESE_NOVEL_TERMS'):
for category in PM.CHINESE_NOVEL_TERMS.values():
if term in category:
return False
return False
# Extract potential terms from each sentence
print(f"📑 Step 5/7: Extracting and filtering terms from sentences...")
# Check if we should use parallel processing
extraction_workers = int(os.getenv("EXTRACTION_WORKERS", "1"))
# Auto-detect optimal workers if not set
if extraction_workers == 1 and len(sentences) > 1000:
# Use more cores for better parallelization
cpu_count = os.cpu_count() or 4
extraction_workers = min(cpu_count, 12) # Use up to 12 cores
print(f"📑 Auto-detected {cpu_count} CPU cores, using {extraction_workers} workers")
use_parallel = extraction_workers > 1 and len(sentences) > 100
if use_parallel:
print(f"📑 Using parallel processing with {extraction_workers} workers")
print(f"📑 Estimated speedup: {extraction_workers}x faster")
important_sentences = []
seen_contexts = set()
processed_count = 0
total_sentences = len(sentences)
last_progress_time = time.time()
# Prepare gender context check
include_gender_context = os.getenv("GLOSSARY_INCLUDE_GENDER_CONTEXT", "0") == "1"
gender_nuance_enabled = include_gender_context and os.getenv("GLOSSARY_ENABLE_GENDER_NUANCE", "1") == "1"
gender_pronouns = []
if gender_nuance_enabled and hasattr(PM, 'GENDER_PRONOUNS'):
# Get pronouns for the detected language
lang_key = 'english'
if primary_lang == 'korean': lang_key = 'korean'
elif primary_lang == 'chinese': lang_key = 'chinese'
elif primary_lang == 'japanese': lang_key = 'japanese'
gender_pronouns.extend(PM.GENDER_PRONOUNS.get(lang_key, {}).get('male', []))
gender_pronouns.extend(PM.GENDER_PRONOUNS.get(lang_key, {}).get('female', []))
if gender_pronouns:
print(f"📑 Gender context enabled: scanning for pronouns in {lang_key}")
def process_sentence_batch(batch_sentences, batch_idx):
"""Process a batch of sentences"""
local_word_freq = Counter()
local_important = []
local_seen = set()
for sentence in batch_sentences:
sentence = sentence.strip()
if len(sentence) < 10 or len(sentence) > 500:
continue
# Check for gender pronouns if enabled - include sentence if pronoun found
has_pronoun = False
if gender_nuance_enabled and gender_pronouns:
for pronoun in gender_pronouns:
if pronoun in sentence:
has_pronoun = True
break
# Find all potential terms in this sentence
matches = re.findall(combined_pattern, sentence)
valid_term_found = False
if matches:
# Filter out excluded terms
for match in matches:
if not should_exclude_term(match):
local_word_freq[match] += 1
valid_term_found = True
# Keep sentence if it has valid terms OR contains a gender pronoun (for context)
# If include_gender_context is True, we include sentences with pronouns even if they don't have new terms,
# but ONLY if the pronouns match known characters. However, we don't know the characters yet.
# So, we include pronoun sentences to provide context for the LLM to infer gender.
if valid_term_found or (gender_nuance_enabled and has_pronoun):
sentence_key = sentence[:50] # Use prefix as key to avoid duplicates
if sentence_key not in local_seen:
local_important.append(sentence)
local_seen.add(sentence_key)
return local_word_freq, local_important, local_seen, batch_idx
if use_parallel:
# Choose a batch size that spreads work across all workers without excessive per-batch overhead
# Calculate based on total sentences
total_sentences = len(sentences)
# CRITICAL: Batch size must balance two factors:
# 1. Small batches = more parallelism but higher overhead
# 2. Large batches = less overhead but limits parallelism
#
# For Windows ProcessPoolExecutor, overhead is HIGH, so we prefer LARGE batches
# Target: Each worker should get 3-10 batches (not 100+ tiny batches)
# Calculate batch size based on workers to minimize overhead
target_batches_per_worker = 5 # Sweet spot: enough work distribution, minimal overhead
ideal_batch_size = max(500, total_sentences // (extraction_workers * target_batches_per_worker))
# Apply sensible limits
if total_sentences < 1000:
optimal_batch_size = 100 # Small dataset: normal batching
elif total_sentences < 10000:
optimal_batch_size = min(500, ideal_batch_size)
elif total_sentences < 50000:
optimal_batch_size = min(2000, ideal_batch_size)
elif total_sentences < 200000:
optimal_batch_size = min(5000, ideal_batch_size)
else:
# For 754K sentences with 12 workers:
# target_batches = 12 * 5 = 60 batches
# batch_size = 754K / 60 = ~12,500 sentences/batch
# This is MUCH better than 1887 batches of 400!
optimal_batch_size = min(20000, ideal_batch_size)
# Ensure we have enough batches for all workers
min_batches = extraction_workers * 3 # At least 3 batches per worker
max_batch_size = max(50, total_sentences // min_batches)
optimal_batch_size = min(optimal_batch_size, max_batch_size)
print(f"📑 Total sentences: {total_sentences:,}")
print(f"📑 Target batch size: {optimal_batch_size} sentences")
# Calculate expected number of batches
expected_batches = (total_sentences + optimal_batch_size - 1) // optimal_batch_size
print(f"📑 Expected batches: {expected_batches} (for {extraction_workers} workers)")
print(f"📑 Batches per worker: ~{expected_batches // extraction_workers} batches")
batches = [sentences[i:i + optimal_batch_size] for i in range(0, len(sentences), optimal_batch_size)]
print(f"📑 Processing {len(batches)} batches of ~{optimal_batch_size} sentences each")
print(f"📑 Expected speedup: {min(extraction_workers, len(batches))}x (using {extraction_workers} workers)")
# Decide between ThreadPoolExecutor and ProcessPoolExecutor
import multiprocessing
in_subprocess = multiprocessing.current_process().name != 'MainProcess'
# Use ProcessPoolExecutor for better parallelism on larger datasets
# On Windows, we CAN use ProcessPoolExecutor in subprocess with spawn context
use_process_pool = len(sentences) > 5000 # Remove subprocess check!
if use_process_pool:
# Check if we're in a daemonic process (can't spawn children)
is_daemon = multiprocessing.current_process().daemon if hasattr(multiprocessing.current_process(), 'daemon') else False
if in_subprocess and is_daemon:
# Daemonic processes can't spawn children - fall back to ThreadPoolExecutor
print(f"⚠️ Running in daemonic subprocess - cannot use ProcessPoolExecutor")
print(f"📁 Falling back to ThreadPoolExecutor (limited parallelism due to GIL)")
use_process_pool = False
executor_class = ThreadPoolExecutor
executor_kwargs = {'max_workers': extraction_workers}
use_mp_pool = False
else:
# We can use ProcessPoolExecutor
if in_subprocess:
print(f"📁 Using ProcessPoolExecutor in non-daemonic subprocess")
print(f"📁 This enables TRUE parallelism even from within a subprocess!")
else:
print(f"📁 Using ProcessPoolExecutor for maximum performance (true parallelism)")
mp_context = multiprocessing.get_context('spawn')
executor_class = mp_context.Pool
# Capture CURRENT environment variable values from parent process
current_env_vars = {
'GLOSSARY_MAX_SENTENCES': os.getenv('GLOSSARY_MAX_SENTENCES', '200'),
'GLOSSARY_MIN_FREQUENCY': os.getenv('GLOSSARY_MIN_FREQUENCY', '2'),
'GLOSSARY_MAX_NAMES': os.getenv('GLOSSARY_MAX_NAMES', '50'),
'GLOSSARY_MAX_TITLES': os.getenv('GLOSSARY_MAX_TITLES', '30'),
'GLOSSARY_BATCH_SIZE': os.getenv('GLOSSARY_BATCH_SIZE', '50'),
'GLOSSARY_STRIP_HONORIFICS': os.getenv('GLOSSARY_STRIP_HONORIFICS', '1'),
'GLOSSARY_FUZZY_THRESHOLD': os.getenv('GLOSSARY_FUZZY_THRESHOLD', '0.90'),
}
print(f"📁 Passing env vars to child processes: GLOSSARY_MAX_SENTENCES={current_env_vars['GLOSSARY_MAX_SENTENCES']}")
# For multiprocessing.Pool, we use different kwargs
# Use module-level init function (can't use local function due to pickling)
executor_kwargs = {
'processes': extraction_workers,
'initializer': _init_worker_with_env,
'initargs': (current_env_vars,)
}
use_mp_pool = True # Flag to use different API
else:
print(f"📁 Using ThreadPoolExecutor for sentence processing (dataset < 5000 sentences)")
executor_class = ThreadPoolExecutor
executor_kwargs = {'max_workers': extraction_workers}
use_mp_pool = False
# Handle multiprocessing.Pool vs concurrent.futures differently
if use_process_pool and use_mp_pool:
# Use multiprocessing.Pool API (map_async)
with executor_class(**executor_kwargs) as pool:
# Prepare data for process pool
exclude_check_data = (
list(honorifics_to_exclude),
[p.pattern for p in title_patterns],
PM.COMMON_WORDS,
PM.CHINESE_NUMS
)
# Prepare all arguments
all_args = [(batch, idx, combined_pattern, exclude_check_data)
for idx, batch in enumerate(batches)]
print(f"📁 Submitting {len(all_args)} batches to process pool...")
# Use map_async with chunksize for better distribution
# chunksize=1 means each worker gets one batch at a time
result_async = pool.map_async(_process_sentence_batch_for_extraction, all_args, chunksize=1)
# Poll for completion with progress estimates
completed_batches = 0
batch_start_time = time.time()
next_report_ts = batch_start_time + 5.0
print(f"📁 Processing batches with {extraction_workers} parallel workers...")
while not result_async.ready():
time.sleep(2) # Check every 2 seconds
now = time.time()
elapsed = now - batch_start_time
# Emit logs on a fixed 5s cadence (5, 10, 15...) even if our poll loop wakes late.
while now >= next_report_ts:
elapsed_for_log = int(next_report_ts - batch_start_time)
# Estimate progress based on time and worker count
batches_per_second = extraction_workers / 0.3 # rough heuristic
estimated_completed = min(int(elapsed * batches_per_second), len(all_args))
estimated_progress = min(95, (estimated_completed / len(all_args)) * 100)
estimated_sentences = min(estimated_completed * optimal_batch_size, total_sentences)
if estimated_progress < 95:
print(f"📁 Processing... ~{estimated_progress:.0f}% estimated (~{estimated_sentences:,} sentences) | {elapsed_for_log}s elapsed")
else:
print(f"📁 Processing... finalizing last batches | {elapsed_for_log}s elapsed")
next_report_ts += 5.0
# Get all results
total_elapsed = time.time() - batch_start_time
print(f"📁 All batches completed in {total_elapsed:.1f}s! Collecting results...")
all_results = result_async.get()
# Process all results
for local_word_freq, local_important, local_seen, batch_idx in all_results:
# Merge results
word_freq.update(local_word_freq)
for sentence in local_important:
sentence_key = ' '.join(sorted(re.findall(combined_pattern, sentence)))
if sentence_key not in seen_contexts:
important_sentences.append(sentence)
seen_contexts.add(sentence_key)
processed_count += len(batches[batch_idx])
completed_batches += 1
# Show progress
progress_interval = 1 if len(batches) <= 20 else (5 if len(batches) <= 100 else 10)
if completed_batches % progress_interval == 0 or completed_batches == len(batches):
progress = (processed_count / total_sentences) * 100
elapsed = time.time() - batch_start_time
rate = (processed_count / elapsed) if elapsed > 0 else 0
print(f"📑 Progress: {processed_count:,}/{total_sentences:,} sentences ({progress:.1f}%) | Batch {completed_batches}/{len(batches)} | {rate:.0f} sent/sec")
else:
# Use concurrent.futures API (ThreadPoolExecutor or ProcessPoolExecutor)
with executor_class(**executor_kwargs) as executor:
futures = []
# Prepare data for ProcessPoolExecutor if needed
if use_process_pool:
# Serialize exclusion check data for process pool
exclude_check_data = (
list(honorifics_to_exclude),
[p.pattern for p in title_patterns],
PM.COMMON_WORDS,
PM.CHINESE_NUMS
)
for idx, batch in enumerate(batches):
if use_process_pool:
# Use module-level function for ProcessPoolExecutor
future = executor.submit(_process_sentence_batch_for_extraction,
(batch, idx, combined_pattern, exclude_check_data))
else:
# Use local function for ThreadPoolExecutor
future = executor.submit(process_sentence_batch, batch, idx)
futures.append(future)
# Yield to GUI when submitting futures
if idx % 10 == 0:
time.sleep(0.001)
# Collect results with progress
completed_batches = 0
batch_start_time = time.time()
for future in as_completed(futures):
# Get result without timeout - as_completed already handles waiting
local_word_freq, local_important, local_seen, batch_idx = future.result()
# Merge results
word_freq.update(local_word_freq)
for sentence in local_important:
sentence_key = ' '.join(sorted(re.findall(combined_pattern, sentence)))
if sentence_key not in seen_contexts:
important_sentences.append(sentence)
seen_contexts.add(sentence_key)
processed_count += len(batches[batch_idx])
completed_batches += 1
# Show progress more frequently for better user feedback
progress_interval = 1 if len(batches) <= 20 else (5 if len(batches) <= 100 else 10)
if completed_batches % progress_interval == 0 or completed_batches == len(batches):
progress = (processed_count / total_sentences) * 100
elapsed = time.time() - batch_start_time
rate = (processed_count / elapsed) if elapsed > 0 else 0
print(f"📑 Progress: {processed_count:,}/{total_sentences:,} sentences ({progress:.1f}%) | Batch {completed_batches}/{len(batches)} | {rate:.0f} sent/sec")
# Yield to GUI after each batch completes
time.sleep(0.001)
else:
# Sequential processing with progress
for idx, sentence in enumerate(sentences):
sentence = sentence.strip()
if len(sentence) < 10 or len(sentence) > 500:
continue
# Find all potential terms in this sentence
matches = re.findall(combined_pattern, sentence)
if matches:
# Filter out excluded terms
filtered_matches = []
for match in matches:
if not should_exclude_term(match):
word_freq[match] += 1
filtered_matches.append(match)
# Keep sentences with valid potential terms
if filtered_matches:
sentence_key = ' '.join(sorted(filtered_matches))
if sentence_key not in seen_contexts:
important_sentences.append(sentence)
seen_contexts.add(sentence_key)
# Show progress every 1000 sentences or 2 seconds
if idx % 1000 == 0 or (time.time() - last_progress_time > 2):
progress = ((idx + 1) / total_sentences) * 100
print(f"📑 Processing sentences: {idx + 1:,}/{total_sentences:,} ({progress:.1f}%)")
last_progress_time = time.time()
# Yield to GUI thread every 1000 sentences
time.sleep(0.001) # Tiny sleep to let GUI update
print(f"📑 Found {len(important_sentences):,} sentences with potential glossary terms")
# Step 6/7: Deduplicate and normalize terms
# Skip this heavy deduplication if "Dynamic Limit Expansion" (include_all_characters) is disabled
# When disabled, we only care about exact matches of high-frequency terms, which combined_freq already handles
if not include_all_characters:
print(f"📑 Step 6/7: Skipping advanced term deduplication (Dynamic Limit Expansion disabled)...")
print(f"📑 Using simple normalized frequency counts for {len(word_freq):,} terms")
combined_freq = Counter()
term_count = 0
# Simple deduplication by normalized form only
for term, count in word_freq.items():
normalized = term.lower().strip()
if normalized in combined_freq:
if count > combined_freq[normalized]:
del combined_freq[normalized]
combined_freq[term] = count
else:
combined_freq[term] = count
term_count += 1
if term_count % 5000 == 0:
time.sleep(0.001)
else:
print(f"📑 Step 6/7: Normalizing and deduplicating {len(word_freq):,} unique terms...")
combined_freq = Counter()
term_count = 0
# Original logic with potential for future advanced features if enabled
for term, count in word_freq.items():
normalized = term.lower().strip()
if normalized in combined_freq:
if count > combined_freq[normalized]:
del combined_freq[normalized]
combined_freq[term] = count
else:
combined_freq[term] = count
term_count += 1
if term_count % 1000 == 0:
time.sleep(0.001)
print(f"📑 Deduplicated to {len(combined_freq):,} unique terms")
# Filter to keep only terms that appear at least min_frequency times
frequent_terms = {term: count for term, count in combined_freq.items() if count >= min_frequency}
# Build filtered text focusing on sentences containing frequent terms
print(f"📑 Step 7/7: Building filtered text from relevant sentences...")
# OPTIMIZATION: Skip sentences that already passed filtering in step 5
# These sentences already contain glossary terms, no need to check again!
# We just need to limit the sample size
filtered_sentences = important_sentences # Already filtered!
print(f"📑 Using {len(filtered_sentences):,} pre-filtered sentences (already contain glossary terms)")
# EARLY DYNAMIC EXPANSION: collect one sentence index per unique honorific-attached name (first appearance), before scoring/nuance
def _sentence_has_gender_pronoun(sent: str) -> bool:
if not include_gender_context_flag or not gender_pronouns:
return True
return any(p in sent for p in gender_pronouns)
if include_all_characters:
honorific_pattern_str = None
if primary_lang in PM.CJK_HONORIFICS:
h_list = PM.CJK_HONORIFICS[primary_lang] + PM.CJK_HONORIFICS.get('english', [])
h_list.sort(key=len, reverse=True)
if h_list:
honorific_pattern_str = '|'.join(map(re.escape, h_list))
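# Longest-first ordering matters: regex alternation is tried left to right, so
# when one honorific is a prefix of another (e.g. '殿' vs '殿下') the longer
# form must come first or the shorter one would win the match.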
if honorific_pattern_str:
try:
honorifics = PM.CJK_HONORIFICS.get(primary_lang, []) + PM.CJK_HONORIFICS.get('english', [])
honorifics = [h for h in honorifics if h] # drop empties
# Keep only clear suffix/title honorifics; drop verb endings/keigo/politeness particles
if primary_lang == 'korean':
suffix_allow = {'님','씨','군','양','공','옹','낭','랑','생','자','부','모','시','제','족하',
'마마','대감','영감','나리','도령','낭자','아씨','규수','각하','전하','폐하','저하','합하',
'대비','대왕','왕자','공주','도련님','아가씨'}
honorifics = [h for h in honorifics if h in suffix_allow]
elif primary_lang == 'japanese':
suffix_allow = {'さん','ちゃん','君','くん','様','さま','殿','先輩','先生','氏','殿下','閣下','卿'}
honorifics = [h for h in honorifics if h in suffix_allow]
elif primary_lang == 'chinese':
# short person titles only
honorifics = [h for h in honorifics if len(h) <= 3 and h in {'先生','小姐','夫人','公子','姑娘','大人','阁下','将军','公主','少爷','老爷','相公','郎君','少侠','侠士'}]
else:
# romanized suffixes only
honorifics = [h for h in honorifics if h.startswith('-') and len(h) <= 8]
if honorifics:
hon_regex = "|".join(map(re.escape, honorifics))
cjk_name_pat = r"[\\u4e00-\\u9fff\\u3040-\\u30ff\\uac00-\\ud7af·]{2,4}"
latin_name_pat = r"[A-Z][a-z]{1,15}(?:\\s+[A-Z][a-z]{1,15}){0,1}"
punct_opt = r"[,、,.:;!?…\\)\\] \\}】』」]?"
combined_pat = re.compile(
rf"(?P<name>{cjk_name_pat}|{latin_name_pat})\\s*(?P<hon>{hon_regex}){punct_opt}"
)
honor_pat = re.compile(hon_regex)
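# Two extraction passes per sentence: combined_pat captures explicit
# name + honorific pairs directly, while honor_pat drives a fallback scan
# that inspects the characters/token immediately before each bare honorific.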
ordered_names = []
for idx, sent in enumerate(filtered_sentences):
for m in combined_pat.finditer(sent):
name = m.group("name").strip()
if not name or any(ch.isdigit() for ch in name):
continue
# Apply strict filtering to regex matches too
# FILTERING: Skip tokens with common noisy start characters
if any(name.startswith(c) for c in ['[', '(', '{', '<', '-', 'ㄴ', 'ㅇ', 'ㄹ', 'ㅁ', 'ㅂ', 'ㅋ', 'ㅎ']):
continue
# FILTERING: Skip tokens that are just common words/particles
if name in PM.COMMON_WORDS:
continue
# FILTERING: Aggressive Korean Verb/Adjective Ending Check
if len(name) > 2 and any(name.endswith(e) for e in ['겠네', '리라', '니까', '는데', '러나', '다가', '면서', '지만', '도록', '으로', '에서', '에게', '한테', '라고', '이란']):
continue
# Skip if name looks like a title term (PatternManager title patterns)
skip_title = False
for pat in PM.TITLE_PATTERNS.get(primary_lang, []):
if re.search(pat, name):
skip_title = True
break
if skip_title:
continue
if name not in honorific_first_indices:
honorific_first_indices[name] = idx
# Append every time to track frequency
ordered_names.append(name)
# Fallback: token immediately before any honorific
# NOTE: Bidirectional check ('after') was removed due to excessive false positives.
# Strict filtering applied to 'before' token to reduce noise.
for m in honor_pat.finditer(sent):
# 1. Check BEFORE the honorific
if primary_lang == 'chinese':
# Chinese logic: Get previous 2-4 characters without relying on space
start_idx = m.start()
# Try taking 2, 3, 4 characters backwards
# Chinese names are typically 2-3 characters (Surname + Given Name)
# We check if they form a valid name
prefix_str = sent[max(0, start_idx-4):start_idx]
# Scan backwards from the honorific and keep the longest run (2-4 characters)
# of consecutive Chinese characters as the candidate name.
token = ""
# Scan backwards for valid Chinese chars
current_token = ""
for i in range(1, 5): # Look back up to 4 chars
if start_idx - i < 0: break
char = sent[start_idx - i]
# Check if char is valid Chinese character
if '\u4e00' <= char <= '\u9fff':
current_token = char + current_token
else:
break # Stop at non-Chinese char (punctuation, space, etc)
if len(current_token) >= 2:
token = current_token
elif primary_lang == 'japanese':
# Japanese logic: Get previous 2-6 characters
start_idx = m.start()
# Scan backwards for valid Japanese chars (Kanji, Hiragana, Katakana)
token = ""
current_token = ""
for i in range(1, 7): # Look back up to 6 chars
if start_idx - i < 0: break
char = sent[start_idx - i]
# Check if char is valid Japanese character
# Kanji: 4E00-9FFF, Hiragana: 3040-309F, Katakana: 30A0-30FF
# Also include long vowel mark (ー): 30FC
is_valid_jp = ('\u4e00' <= char <= '\u9fff') or \
('\u3040' <= char <= '\u309f') or \
('\u30a0' <= char <= '\u30ff') or \
(char == '\u30fc')
if is_valid_jp:
current_token = char + current_token
else:
break # Stop at non-Japanese char
if len(current_token) >= 2:
token = current_token
else:
# Original logic for space-separated languages (Korean, English)
prefix = sent[:m.start()].strip()
if prefix:
token = prefix.split()[-1]
token = token.strip(".,;:!?\"'()[]{}<>~`@#$%^&*-=_+|\\/")
else:
token = ""
if token:
# Apply all validation logic (common words, fullmatch regex, etc.)
if not any(ch.isdigit() for ch in token):
# ... (Rest of existing validation logic) ...
# FILTERING: Skip tokens with common noisy start characters
if not any(token.startswith(c) for c in ['[', '(', '{', '<', '-', 'ㄴ', 'ㅇ', 'ㄹ', 'ㅁ', 'ㅂ', 'ㅋ', 'ㅎ']):
# FILTERING: Skip tokens that look like file extensions or paths
if not ('.' in token or '/' in token or '\\' in token):
# FILTERING: Skip tokens that are just common words/particles
if token not in PM.COMMON_WORDS:
# FILTERING: Aggressive Korean Verb/Adjective Ending Check
if not (len(token) > 2 and any(token.endswith(e) for e in ['겠네', '리라', '니까', '는데', '러나', '다가', '면서', '지만', '도록', '으로', '에서', '에게', '한테', '라고', '이란'])):
# STRICTER ATTACHMENT CHECK FOR KOREAN SUFFIXES
# (For Chinese, we already extracted attached characters, so this check is implicitly passed or N/A)
is_attached = True
if primary_lang != 'chinese':
is_attached = not sent[:m.start()].endswith(' ')
# Valid token structure check
valid_shape = False
# STRICTER: Use regex to ensure the ENTIRE token matches the valid pattern
if re.fullmatch(r'[\u4e00-\u9fff\u3040-\u30ff\uac00-\ud7af·]{2,4}', token):
valid_shape = True
elif re.fullmatch(r'^[A-Z][a-z]{1,15}(\s+[A-Z][a-z]{1,15})?$', token):
valid_shape = True
if valid_shape:
# Skip if token looks like a title term
skip_title = False
for pat in PM.TITLE_PATTERNS.get(primary_lang, []):
if re.search(pat, token):
skip_title = True
break
if not skip_title:
if token not in honorific_first_indices:
honorific_first_indices[token] = idx
# Append every time to track frequency
ordered_names.append(token)
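# At this point ordered_names holds every honorific-attached occurrence (used
# below for frequency filtering), while honorific_first_indices maps each
# unique name to the index of the first sentence it appeared in.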
# DEDUPLICATE THE REPRESENTATIVE UNIQUE CHARACTERS HERE
if ordered_names:
print(f"📑 Deduplicating {len(ordered_names)} potential character names (honorific-first)...")
try:
import duplicate_detection_config as DDC
# Get configured algorithm and threshold
dd_config = DDC.get_duplicate_detection_config()
algo_desc = dd_config.get('description', 'Unknown')
# Use environment variable directly as fallback
fallback_threshold = float(os.getenv("GLOSSARY_FUZZY_THRESHOLD", "0.90"))
effective_threshold = dd_config.get('threshold', fallback_threshold)
selected_algo = os.getenv('GLOSSARY_DUPLICATE_ALGORITHM', 'auto').upper()
print(f"📑 Duplicate Detection Algorithm: {selected_algo} ({algo_desc})")
print(f"📑 Deduplicating names with threshold: {effective_threshold:.2f}")
# Optimized deduplication using bucketing by first character
# This avoids O(N²) all-to-all comparison while maintaining fuzzy matching quality
deduped_names = []
kept_indices = {} # Rebuild this map
skipped_dupes = 0
# Filter by honorific attachment frequency
# Only keep names that appear with an honorific at least N times
# This filters out one-off noise while keeping legitimate names
name_freq_with_honorific = Counter(ordered_names)
# Use configured minimum frequency (GLOSSARY_MIN_FREQUENCY)
# This allows the user to control the strictness via the GUI/Config
min_hon_freq = min_frequency
print(f"📑 Filtering by honorific attachment frequency (min {min_hon_freq} occurrences)...")
# Get unique candidates that meet frequency threshold
# Use seen set to deduplicate ordered_names while preserving order
filtered_unique = []
seen_candidates = set()
for name in ordered_names:
if name not in seen_candidates and name_freq_with_honorific[name] >= min_hon_freq:
filtered_unique.append(name)
seen_candidates.add(name)
print(f"📑 Reduced candidates from {len(ordered_names)} (total) to {len(filtered_unique)} (unique freq-filtered)")
ordered_names = filtered_unique
# Fast lookup structures
seen_normalized = set()
# Bucket by first character (normalized) to reduce search space
# Key: first_char, Value: list of existing names starting with that char
lookup_buckets = {}
print(f"📑 Processing {len(ordered_names)} names with bucketed optimization...")
for i, name in enumerate(ordered_names):
# Progress logging for large sets
if i > 0 and i % 1000 == 0:
print(f"📑 Dedupe progress: {i}/{len(ordered_names)}...")
norm = name.lower().strip()
if not norm: continue
# 1. Exact normalized check (O(1) - Instant)
if norm in seen_normalized:
skipped_dupes += 1
continue
# 2. Fuzzy Check (Bucketed)
is_dup = False
first_char = norm[0]
# Only compare against names starting with the same character
# This reduces comparisons by ~20-50x (alphabet size)
candidates = lookup_buckets.get(first_char, [])
# If bucket is massive (>1000), limit to most recent 1000 to prevent slowdown
# (Heuristic: duplicates usually appear near each other or we catch them early)
if len(candidates) > 1000:
search_candidates = candidates[-1000:]
else:
search_candidates = candidates
for existing in search_candidates:
score = DDC.calculate_similarity_with_config(name, existing, dd_config)
if score >= effective_threshold:
is_dup = True
skipped_dupes += 1
break
if not is_dup:
deduped_names.append(name)
seen_normalized.add(norm)
# Add to bucket
if first_char not in lookup_buckets:
lookup_buckets[first_char] = []
lookup_buckets[first_char].append(name)
# Keep the original index
if name in honorific_first_indices:
kept_indices[name] = honorific_first_indices[name]
print(f"📑 Advanced deduplication removed {skipped_dupes} duplicate names")
# Update the lists
ordered_names = deduped_names
honorific_first_indices = kept_indices
except ImportError:
print("⚠️ duplicate_detection_config module not found, skipping name deduplication")
except Exception as e:
print(f"⚠️ Name deduplication failed: {e}")
else:
print("📑 Dynamic expansion (honorific-first): no honorifics found in PatternManager for this language")
base_count = len(honorific_first_indices)
if include_gender_context_flag and base_count > 0:
try:
gender_subset = sum(
1 for idx in honorific_first_indices.values()
if 0 <= idx < len(filtered_sentences) and _sentence_has_gender_pronoun(filtered_sentences[idx])
)
print(f"📑 Dynamic expansion (honorific-first): captured {base_count} unique characters before scoring (gender-context subset: {gender_subset})")
except Exception:
print(f"📑 Dynamic expansion (honorific-first): captured {base_count} unique characters before scoring")
else:
print(f"📑 Dynamic expansion (honorific-first): captured {base_count} unique characters before scoring")
# Debug: Write filtered terms to file (User request)
if base_count > 0 and 'ordered_names' in locals():
try:
# Use output_dir if available, otherwise cwd
debug_base = output_dir if 'output_dir' in locals() else os.getcwd()
debug_dir = os.path.join(debug_base, 'debug')
os.makedirs(debug_dir, exist_ok=True)
debug_file_path = os.path.join(debug_dir, 'honorific_debug.txt')
with open(debug_file_path, 'w', encoding='utf-8') as f:
for name in ordered_names:
f.write(f"{name}\n")
print(f"📑 Wrote {len(ordered_names)} terms to {debug_file_path}")
except Exception as e:
print(f"📑 Failed to write debug file: {e}")
except Exception:
print("📑 Dynamic expansion (honorific-first): error parsing honorific names; continuing without early captures")
else:
print("📑 Dynamic expansion (honorific-first): no honorific pattern available for this language")
# For extremely large datasets, we can optionally do additional filtering
# Skip this reduction when include_all_characters is enabled to avoid losing rare characters
if (not include_all_characters) and len(filtered_sentences) > 10000 and len(frequent_terms) > 1000:
print(f"📑 Large dataset detected - applying frequency-based filtering...")
print(f"📑 Filtering {len(filtered_sentences):,} sentences for top frequent terms...")
# Sort terms by frequency to prioritize high-frequency ones
sorted_terms = sorted(frequent_terms.items(), key=lambda x: x[1], reverse=True)
top_terms = dict(sorted_terms[:1000]) # Focus on top 1000 most frequent terms
print(f"📑 Using top {len(top_terms):,} most frequent terms for final filtering")
# Use parallel processing only if really needed
if use_parallel and len(filtered_sentences) > 5000:
import multiprocessing
in_subprocess = multiprocessing.current_process().name != 'MainProcess'
# Create a simple set of terms for fast lookup (no variations needed)
term_set = set(top_terms.keys())
print(f"📑 Using parallel filtering with {extraction_workers} workers...")
# Optimize batch size for ProcessPoolExecutor (reduce overhead)
# Use larger batches since this is a simpler operation than term extraction
check_batch_size = max(1000, len(filtered_sentences) // (extraction_workers * 5))
check_batches = [filtered_sentences[i:i + check_batch_size]
for i in range(0, len(filtered_sentences), check_batch_size)]
print(f"📑 Processing {len(check_batches)} batches of ~{check_batch_size} sentences")
# Use ProcessPoolExecutor for true parallelism (if not already in subprocess)
use_process_pool_filtering = (not in_subprocess and len(check_batches) > 3)
if use_process_pool_filtering:
print(f"📑 Using ProcessPoolExecutor for true parallel filtering")
new_filtered = []
with ProcessPoolExecutor(max_workers=extraction_workers) as executor:
# Use the module-level function _check_sentence_batch_for_terms
futures = [executor.submit(_check_sentence_batch_for_terms, (batch, term_set))
for batch in check_batches]
for future in as_completed(futures):
new_filtered.extend(future.result())
else:
print(f"📑 Using ThreadPoolExecutor for filtering (small dataset or in subprocess)")
# Simple function to check if sentence contains any top term
def check_batch_simple(batch):
result = []
for sentence in batch:
# Simple substring check - much faster than regex
for term in term_set:
if term in sentence:
result.append(sentence)
break
return result
new_filtered = []
with ThreadPoolExecutor(max_workers=extraction_workers) as executor:
futures = [executor.submit(check_batch_simple, batch) for batch in check_batches]
for future in as_completed(futures):
new_filtered.extend(future.result())
filtered_sentences = new_filtered
print(f"📑 Filtered to {len(filtered_sentences):,} sentences containing top terms")
else:
# For smaller datasets, simple sequential filtering
print(f"📑 Using sequential filtering...")
new_filtered = []
for i, sentence in enumerate(filtered_sentences):
for term in top_terms:
if term in sentence:
new_filtered.append(sentence)
break
if i % 1000 == 0:
print(f"📑 Progress: {i:,}/{len(filtered_sentences):,} sentences")
time.sleep(0.001)
filtered_sentences = new_filtered
print(f"📑 Filtered to {len(filtered_sentences):,} sentences containing top terms")
print(f"📑 Selected {len(filtered_sentences):,} sentences containing frequent terms")
# Track character-like term count for final summary
character_term_count = 0
# Limit the number of sentences to reduce token usage
if max_sentences is None:
max_sentences_fallback = os.getenv("GLOSSARY_MAX_SENTENCES", "200")
print(f"🔍 [DEBUG] max_sentences was None, reading from environment: '{max_sentences_fallback}'")
max_sentences = int(max_sentences_fallback)
else:
print(f"🔍 [DEBUG] max_sentences parameter was provided: {max_sentences}")
print(f"🔍 [DEBUG] Final GLOSSARY_MAX_SENTENCES value being used: {max_sentences}")
# Force smart selection path when dynamic expansion is enabled, even if filtered_sentences <= max_sentences
run_smart_selection = (not force_skip_smart_selection) and (include_all_characters or (max_sentences > 0 and len(filtered_sentences) > max_sentences))
if run_smart_selection and max_sentences > 0:
dynamic_bonus = len(honorific_first_indices) if include_all_characters else 0
effective_preview = max_sentences + dynamic_bonus
if dynamic_bonus > 0:
print(f"📁 Limiting to {max_sentences} + {dynamic_bonus} (dynamic expansion) = {effective_preview} representative sentences (from {len(filtered_sentences):,})")
else:
print(f"📁 Limiting to {max_sentences} representative sentences (from {len(filtered_sentences):,})")
# SMART SELECTION: Prioritize sentences with unique terms and gender context
# instead of blind slicing.
# 1. Identify which terms appear in which sentences
# We need to re-scan briefly or pass this info along. Re-scanning is safer/easier here.
if gender_nuance_enabled:
print("📑 analyzing sentences for term coverage and gender nuance...")
else:
print("📑 analyzing sentences for term coverage (gender nuance disabled)...")
term_to_sentences = {} # term -> list of (score, sentence_index)
sentence_scores = {} # index -> score
# Pre-compile regexes
honorific_pattern_str = None
if primary_lang in PM.CJK_HONORIFICS:
h_list = PM.CJK_HONORIFICS[primary_lang] + PM.CJK_HONORIFICS.get('english', [])
h_list.sort(key=len, reverse=True)
if h_list:
honorific_pattern_str = '|'.join(map(re.escape, h_list))
# Get pronouns for scoring
gender_pronouns = []
if gender_nuance_enabled and hasattr(PM, 'GENDER_PRONOUNS'):
lang_key = 'english'
if primary_lang == 'korean': lang_key = 'korean'
elif primary_lang == 'chinese': lang_key = 'chinese'
elif primary_lang == 'japanese': lang_key = 'japanese'
gender_pronouns = PM.GENDER_PRONOUNS.get(lang_key, {}).get('male', []) + \
PM.GENDER_PRONOUNS.get(lang_key, {}).get('female', [])
# If gender context is OFF or nuance scoring is disabled, skip expensive scoring and just build simple coverage map
if not gender_nuance_enabled:
print("📑 Gender context or nuance toggle disabled: using simple term coverage (no pronoun weighting).")
for idx, sent in enumerate(filtered_sentences):
sentence_scores[idx] = 1.0
for term in frequent_terms:
if term in sent:
term_to_sentences.setdefault(term, []).append(idx)
# Parallelize scoring if dataset is large enough and gender context is ON
elif use_parallel and len(filtered_sentences) > 2000:
print(f"📑 Parallelizing sentence scoring with {extraction_workers} workers...")
# Prepare batches
# Aim for ~500 sentences per batch to get updates every ~2-3 seconds (assuming ~150-200 sent/sec)
batch_size = 500
# However, ensure we don't have too few batches for the workers (utilize parallelism)
if len(filtered_sentences) // batch_size < extraction_workers * 4:
batch_size = max(100, len(filtered_sentences) // (extraction_workers * 4))
batches = []
for i in range(0, len(filtered_sentences), batch_size):
end_idx = min(i + batch_size, len(filtered_sentences))
# Pass (start_index, list_of_sentences)
batches.append((i, filtered_sentences[i:end_idx]))
term_list = list(frequent_terms.keys())
# Use ProcessPoolExecutor for heavy CPU work
if use_process_pool:
executor_cls = ProcessPoolExecutor
else:
executor_cls = ThreadPoolExecutor
with executor_cls(max_workers=extraction_workers) as executor:
# Submit all batches
futures = [executor.submit(
_score_sentence_batch,
(batch_data, term_list, honorific_pattern_str, gender_pronouns, include_gender_context)
) for batch_data in batches]
# Collect results with progress logging
completed_batches = 0
processed_count = 0
scoring_start_time = time.time()
last_log_time = scoring_start_time
total_batches = len(batches)
total_to_score = len(filtered_sentences)
# Emit wait logs even before the first batch completes
try:
from concurrent.futures import wait as _wait, FIRST_COMPLETED as _FIRST_COMPLETED
except Exception:
_wait = None
_FIRST_COMPLETED = None
pending = set(futures)
while pending:
done = set()
if _wait is not None and _FIRST_COMPLETED is not None:
done, pending = _wait(pending, timeout=5.0, return_when=_FIRST_COMPLETED)
done = set(done or [])
else:
# Fallback: block until first completion (no wait logs)
for future in as_completed(list(pending)):
done.add(future)
pending.discard(future)
break
if not done:
# No batch completed within timeout
elapsed = time.time() - scoring_start_time
print(f"📑 Scoring... {elapsed:.0f}s elapsed")
continue
for future in done:
try:
batch_scores, batch_term_map = future.result()
sentence_scores.update(batch_scores)
# Merge term mappings
for term, indices in batch_term_map.items():
if term not in term_to_sentences:
term_to_sentences[term] = []
term_to_sentences[term].extend(indices)
# Update progress stats
completed_batches += 1
processed_count += len(batch_scores)
current_time = time.time()
elapsed = current_time - scoring_start_time
# Log periodically (every ~5 seconds or if it's the last batch)
if (current_time - last_log_time >= 5.0) or (completed_batches == total_batches):
display_count = min(processed_count, total_to_score)
progress_pct = min(99.9, (display_count / total_to_score) * 100)
rate = display_count / elapsed if elapsed > 0 else 0
if completed_batches < total_batches:
print(f"📑 Scoring... {display_count:,}/{total_to_score:,} sentences ({progress_pct:.1f}%) | Batch {completed_batches}/{total_batches} | {rate:.0f} sent/sec | {elapsed:.0f}s elapsed")
else:
print(f"📑 Scoring... {total_to_score:,}/{total_to_score:,} sentences (100.0%) | Batch {total_batches}/{total_batches} | {rate:.0f} sent/sec | {elapsed:.0f}s elapsed")
print(f"📑 Scoring... finalizing last batches | {elapsed:.0f}s elapsed")
last_log_time = current_time
except Exception as e:
print(f"⚠️ Scoring batch failed: {e}")
total_elapsed = time.time() - scoring_start_time
print(f"📁 All scoring batches completed in {total_elapsed:.1f}s!")
else:
# Sequential fallback
honorific_pattern = re.compile(honorific_pattern_str) if honorific_pattern_str else None
for idx, sent in enumerate(filtered_sentences):
score = 1.0
if gender_nuance_enabled and gender_pronouns:
for p in gender_pronouns:
if p in sent:
score += 5.0
break
if honorific_pattern and honorific_pattern.search(sent):
score += 2.0
sentence_scores[idx] = score
for term in frequent_terms:
if term in sent:
if term not in term_to_sentences:
term_to_sentences[term] = []
term_to_sentences[term].append(idx)
# 2. Select sentences via Round-Robin to ensure coverage of ALL unique terms
# with PRIORITY for character-like terms (those with honorifics)
selected_indices = set()
# Sort each term's sentences by score descending (higher score first)
for term in term_to_sentences:
term_to_sentences[term].sort(key=lambda idx: sentence_scores[idx], reverse=True)
# If dynamic expansion is on, prefer character terms derived from honorific-attached names
honorific_char_terms = []
if include_all_characters and honorific_pattern_str:
try:
honor_pat = re.compile(honorific_pattern_str)
char_term_map = {}
name_regex = re.compile(r'([\w\-\u4e00-\u9fff\u3040-\u30ff\uac00-\ud7af]+)$')
for idx, sent in enumerate(filtered_sentences):
for m in honor_pat.finditer(sent):
prefix = sent[:m.start()].strip()
nm = name_regex.search(prefix)
if nm:
name = nm.group(1)
char_term_map.setdefault(name, []).append(idx)
if char_term_map:
term_to_sentences = {k: sorted(v, key=lambda i: sentence_scores.get(i, 0), reverse=True)
for k, v in char_term_map.items()}
honorific_char_terms = list(term_to_sentences.keys())
except Exception:
pass
# Split terms into character-like (with honorifics) and others
def _is_character_like(term: str) -> bool:
try:
if _has_honorific(term):
return True
# CJK short names
if primary_lang in ['korean', 'japanese', 'chinese']:
# Count CJK chars
cjk_len = sum(1 for ch in term if 0x4E00 <= ord(ch) <= 0x9FFF or 0x3040 <= ord(ch) <= 0x30FF or 0xAC00 <= ord(ch) <= 0xD7AF)
if 2 <= cjk_len <= 4:
return True
# English-style names: title case with 1-3 words
parts = term.split()
if 1 <= len(parts) <= 3 and all(p[:1].isupper() for p in parts if p):
return True
except Exception:
pass
return False
character_terms = []
non_character_terms = []
source_terms = honorific_char_terms if (include_all_characters and honorific_char_terms) else sorted(term_to_sentences.keys())
for term in source_terms:
if _is_character_like(term):
character_terms.append(term)
else:
non_character_terms.append(term)
character_term_count = len(character_terms)
# If dynamic limit expansion is enabled, prepare to cover every character-like term once
if include_all_characters and character_terms:
# Build characters strictly from honorific-bearing terms first; fallback to detection if none
honorific_chars = []
if honorific_pattern_str:
try:
honor_pat = re.compile(honorific_pattern_str)
honorific_chars = [t for t in character_terms if honor_pat.search(t)]
except Exception:
honorific_chars = []
if honorific_chars:
character_terms = honorific_chars
# Rank character terms by frequency so most frequent get picked first when sentences are missing
character_terms = sorted(character_terms, key=lambda t: frequent_terms.get(t, 0), reverse=True)
def round_robin_terms(term_list, selected_indices, target_limit, min_per_term=None):
"""Round-robin over provided term list, updating selected_indices in-place."""
term_iterators = [iter(term_to_sentences[t]) for t in term_list]
# If min_per_term is set, ensure we get at least that many for each term first
if min_per_term:
for term in term_list:
sentences = term_to_sentences[term]
for i in range(min(min_per_term, len(sentences))):
selected_indices.add(sentences[i])
while len(selected_indices) < target_limit and term_iterators:
active_iterators = []
for it in term_iterators:
if len(selected_indices) >= target_limit:
break
try:
while True:
idx = next(it)
if idx not in selected_indices:
selected_indices.add(idx)
active_iterators.append(it)
break
except StopIteration:
pass
term_iterators = active_iterators
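# round_robin_terms cycles through the term list, taking each term's next
# highest-scored unseen sentence per pass, so every term gets some coverage
# before any single term can dominate the sentence budget.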
# Base limit from user/config
base_limit = max_sentences
requested_bonus = 0
# If we collected honorific-first sentences, seed the selection with them
if include_all_characters and honorific_first_indices:
for idx in honorific_first_indices.values():
if 0 <= idx < len(filtered_sentences):
selected_indices.add(idx)
requested_bonus = len(honorific_first_indices)
# Dynamic expansion should ADD to the base limit, not replace it
honorific_bonus = len(selected_indices) if include_all_characters else 0
effective_limit = base_limit + honorific_bonus
requested_total = base_limit + requested_bonus
print(f"📁 Requested sentence budget: base {base_limit} + bonus {requested_bonus} = {requested_total}")
# Standard Fixed Limit Logic
# First, prioritize character-like terms (honorific-based)
if character_terms:
round_robin_terms(character_terms, selected_indices, effective_limit)
# Then, if we still have room, cover remaining non-character terms
if len(selected_indices) < effective_limit and non_character_terms:
round_robin_terms(non_character_terms, selected_indices, effective_limit)
# If we still have room (rare), fill with highest scored remaining sentences
target_limit = effective_limit
if target_limit and len(selected_indices) < target_limit:
remaining = sorted(
[i for i in range(len(filtered_sentences)) if i not in selected_indices],
key=lambda i: sentence_scores[i],
reverse=True
)
selected_indices.update(remaining[:target_limit - len(selected_indices)])
# Log the actual unique sentence count vs requested (base + bonus)
unique_count = len(selected_indices)
dropped = max(0, requested_total - unique_count)
if include_all_characters:
print(f"📁 Deduped sentence budget: requested {base_limit}+{requested_bonus} -> {unique_count} unique (dropped {dropped})")
else:
print(f"📁 Deduped sentence budget: requested {base_limit} -> {unique_count} unique (dropped {dropped})")
# Sort indices to maintain narrative flow
final_indices = sorted(list(selected_indices))
filtered_sentences = [filtered_sentences[i] for i in final_indices]
dropped_windows = 0
dropped_sentence_indices = set()
if include_all_characters:
# Determine base vs bonus allocation before dedup
pre_dedup_sentences = filtered_sentences # already ordered by final_indices
pre_total = len(pre_dedup_sentences)
pre_base = min(base_limit, pre_total)
pre_bonus = max(0, pre_total - pre_base)
base_idx_set = set(final_indices[:pre_base])
bonus_idx_set = set(final_indices[pre_base:])
# Map sentences to terms (characters and others) for coverage-aware dedup
sentence_terms = {}
if 'term_to_sentences' in locals():
for term, idx_list in term_to_sentences.items():
for idx in idx_list:
if idx in final_indices:
sentence_terms.setdefault(idx, set()).add(term)
character_term_set = set(character_terms) if 'character_terms' in locals() else set()
covered_char_terms = set()
covered_terms_global = set()
# Sentence-level dedup post-selection using duplicate_detection_config + slider threshold
dup_config = ddc.get_duplicate_detection_config()
# Fallback to env slider if save_glossary scope variable isn't in this function
fuzzy_threshold_env = float(os.getenv("GLOSSARY_FUZZY_THRESHOLD", "0.90"))
dup_threshold = dup_config.get('threshold', fuzzy_threshold_env)
algo_list = dup_config.get('algorithms', [])
algo_mode = os.getenv("GLOSSARY_DUPLICATE_ALGORITHM", "auto")
print(f"📋 Sentence dedup config: mode={algo_mode}, algos={algo_list}, slider={fuzzy_threshold_env:.2f}, threshold_used={dup_threshold:.2f}, available={ddc.get_algorithm_display_info()}")
dedup_seen_exact = set()
kept_sentences = []
kept_indices = []
base_kept = bonus_kept = 0
base_dropped = bonus_dropped = 0
for idx, sent in zip(final_indices, pre_dedup_sentences):
key = sent.strip()
if not key:
if idx in base_idx_set:
base_dropped += 1
else:
bonus_dropped += 1
continue
# Exact duplicate quick check
if key in dedup_seen_exact:
if idx in base_idx_set:
base_dropped += 1
else:
bonus_dropped += 1
continue
terms_here = sentence_terms.get(idx, set()) if sentence_terms else set()
# Term-based dedup: drop if this sentence contributes no new terms (all terms already covered)
is_dup = False
if terms_here and terms_here.issubset(covered_terms_global):
is_dup = True
else:
if kept_sentences:
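# Cheap prefilters before the expensive similarity call: skip candidates whose
# length differs by more than ±30% or that share too few characters overall.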
klen = len(key)
min_len = int(klen * 0.7)
max_len = int(klen * 1.3)
for other in kept_sentences:
if not (min_len <= len(other) <= max_len):
continue
if len(set(key) & set(other)) < klen * 0.5:
continue
sim = ddc.calculate_similarity_with_config(key, other, dup_config)
if sim >= dup_threshold:
is_dup = True
break
if is_dup:
# Guard: keep if this sentence is the only coverage for an uncovered character term
keep_for_character = False
if sentence_terms:
for t in sentence_terms.get(idx, set()):
if t in character_term_set and t not in covered_char_terms:
keep_for_character = True
break
if not keep_for_character:
if idx in base_idx_set:
base_dropped += 1
else:
bonus_dropped += 1
continue
# Keep
dedup_seen_exact.add(key)
kept_sentences.append(key)
kept_indices.append(idx)
# Mark covered character terms
if sentence_terms:
for t in terms_here:
if t in character_term_set:
covered_char_terms.add(t)
covered_terms_global.add(t)
if idx in base_idx_set:
base_kept += 1
else:
bonus_kept += 1
# Rebuild filtered_sentences preserving original ordering
kept_index_set = set(kept_indices)
filtered_sentences = [sent for idx, sent in zip(final_indices, pre_dedup_sentences) if idx in kept_index_set]
dropped_sentence_indices = set(final_indices) - kept_index_set
total_kept = base_kept + bonus_kept
total_dropped = base_dropped + bonus_dropped
dropped_windows = total_dropped
print(
f"📁 Deduped sentence budget: base {pre_base}->{base_kept} (dropped {base_dropped}), "
f"bonus {pre_bonus}->{bonus_kept} (dropped {bonus_dropped}), total {total_kept}"
)
# Re-log with dedup-applied cap shrink
print(
f"📁 Smart selection complete: Kept {len(filtered_sentences)} sentences covering "
f"{len(term_to_sentences)} unique terms (cap shrink by {total_dropped})"
)
else:
print(f"📁 Smart selection complete: Kept {len(filtered_sentences)} sentences covering {len(term_to_sentences)} unique terms")
dropped_windows = 0
elif max_sentences == 0:
print(f"📁 Including ALL {len(filtered_sentences):,} sentences (max_sentences=0)")
# Check if gender context expansion is enabled
include_gender_context = os.getenv("GLOSSARY_INCLUDE_GENDER_CONTEXT", "0") == "1"
if include_gender_context:
context_window = int(os.getenv("GLOSSARY_CONTEXT_WINDOW", "2"))
print(f"📑 Gender context enabled: Expanding snippets with {context_window}-sentence windows...")
if 'dropped_windows' in locals() and dropped_windows:
print(f"📑 Context windows skipped due to dedup: {dropped_windows}")
# Split full text into sentences for context extraction
all_sentences_list = re.split(r'[.!?。!?]+', clean_text)
all_sentences_list = [s.strip() for s in all_sentences_list if s.strip()]
# Create index map for fast lookup - OPTIMIZED to O(n) instead of O(n²)
# Build a lookup dict: sentence -> index for fast matching
sentence_to_index = {}
all_sentences_normalized = {s.strip(): idx for idx, s in enumerate(all_sentences_list)}
print(f"📑 Mapping {len(filtered_sentences):,} filtered sentences to context positions...")
kept_windows = 0
for filtered_sent in filtered_sentences:
filtered_normalized = filtered_sent.strip()
# Try exact match first (fastest)
if filtered_normalized in all_sentences_normalized:
sentence_to_index[filtered_sent] = all_sentences_normalized[filtered_normalized]
else:
# Try substring match (slower fallback)
found = False
for sentence, idx in all_sentences_normalized.items():
if filtered_normalized in sentence or sentence in filtered_normalized:
sentence_to_index[filtered_sent] = idx
found = True
break
if not found:
# Last resort: try finding in original list
for idx, sentence in enumerate(all_sentences_list):
if filtered_normalized in sentence or sentence in filtered_normalized:
sentence_to_index[filtered_sent] = idx
break
# Build context windows with explicit boundaries to avoid cross-window leakage
context_groups: list[str] = []
window_seeds: list[int] = []
included_indices = set()
for filtered_sent in filtered_sentences:
# If we can't locate the sentence in the master list, wrap it individually
if filtered_sent not in sentence_to_index:
if 'dropped_sentence_indices' in locals() and filtered_sent in dropped_sentence_indices:
continue # skip entire window if its seed sentence was deduped
window_num = len(context_groups) + 1
context_groups.append(
f"{filtered_sent}\n=== CONTEXT {window_num} END ==="
)
window_seeds.append(-1)
continue
idx = sentence_to_index[filtered_sent]
# Skip if already included in a previous window
if idx in included_indices:
continue
# Skip window if its seed sentence was deduped
if 'dropped_sentence_indices' in locals() and idx in dropped_sentence_indices:
continue
# Get context window: [idx-context_window ... idx ... idx+context_window]
start_idx = max(0, idx - context_window)
end_idx = min(len(all_sentences_list), idx + context_window + 1)
# Mark all sentences in this window as included
for i in range(start_idx, end_idx):
included_indices.add(i)
# Extract the window and wrap with start/end markers for splitter safety
window_sentences = all_sentences_list[start_idx:end_idx]
context_group_body = ' '.join(window_sentences)
window_num = len(context_groups) + 1
context_groups.append(
f"{context_group_body}\n=== CONTEXT {window_num} END ==="
)
window_seeds.append(idx)
kept_windows += 1
skipped_windows = (len(filtered_sentences) - kept_windows) if 'kept_windows' in locals() else 0
print(f"📑 Created {len(context_groups):,} context windows (up to {context_window*2+1} sentences each)")
if skipped_windows:
print(f"📑 Context windows removed after dedup: {skipped_windows}")
# Window-level dedup: drop windows whose term set is already covered, while keeping one per character
window_terms = []
if 'sentence_terms' in locals():
for seed_idx in window_seeds:
if seed_idx == -1:
window_terms.append(set())
else:
window_terms.append(sentence_terms.get(seed_idx, set()))
else:
window_terms = [set() for _ in window_seeds]
covered_terms_global = set()
covered_char_terms = set()
kept_context_groups = []
kept_window_seeds = []
for cg, seed_idx, terms in zip(context_groups, window_seeds, window_terms):
if not terms:
# keep empty-term windows to preserve structure
kept_context_groups.append(cg)
kept_window_seeds.append(seed_idx)
continue
drop = False
# STRICT: one window per character. If any character term here is already covered, drop this window.
char_terms = set(t for t in terms if 'character_term_set' in locals() and t in character_term_set)
if char_terms and char_terms & covered_char_terms:
drop = True
elif not char_terms and terms.issubset(covered_terms_global):
drop = True
# If no character terms yet covered, allow first appearance
if drop:
keep_for_char = any((t in character_term_set and t not in covered_char_terms) for t in terms) if 'character_term_set' in locals() else False
if keep_for_char and not (char_terms & covered_char_terms):
drop = False
if drop:
continue
# keep and mark coverage
kept_context_groups.append(cg)
kept_window_seeds.append(seed_idx)
for t in terms:
covered_terms_global.add(t)
if 'character_term_set' in locals() and t in character_term_set:
covered_char_terms.add(t)
dropped_windows_after_terms = len(context_groups) - len(kept_context_groups)
if dropped_windows_after_terms:
print(f"📑 Context windows removed after term-aware dedup: {dropped_windows_after_terms}")
# Compute true total sentences emitted in kept windows
total_window_sentences = 0
for ctx in kept_context_groups:
# split on end marker to avoid counting it
body = ctx.split('=== CONTEXT ')[0]
# crude split by sentence separators
total_window_sentences += len([s for s in re.split(r'[.!?。!?]+', body) if s.strip()])
print(f"📑 Final kept windows: {len(kept_context_groups)}, final kept sentences (within windows): {total_window_sentences}")
filtered_text = '\n\n'.join(kept_context_groups) # Separate windows with double newline
print(f"📑 Context-expanded text: {len(filtered_text):,} characters")
else:
# Even without gender context, add footer markers to preserve boundaries for chapter splitting
context_groups = []
for idx, sent in enumerate(filtered_sentences, 1):
context_groups.append(f"{sent}\n=== CONTEXT {idx} END ===")
filtered_text = '\n\n'.join(context_groups)
# Determine character count for summary (use dynamic-expansion tally when available)
if include_all_characters and honorific_first_indices:
character_term_count = len(honorific_first_indices)
elif 'character_terms' in locals() and character_terms:
character_term_count = len(set(character_terms))
# Calculate and display filtering statistics
filter_end_time = time.time()
filter_duration = filter_end_time - filter_start_time
original_length = len(clean_text)
filtered_length = len(filtered_text)
size_change_percent = ((original_length - filtered_length) / original_length * 100) if original_length > 0 else 0
filtered_text = _normalize_filtered_text(filtered_text)
filtered_length = len(filtered_text)
size_change_percent = ((original_length - filtered_length) / original_length * 100) if original_length > 0 else 0
print("📑 Applied post-filter text normalization to remove orphaned quotes and extra blank lines")
print(f"\n📑 === FILTERING COMPLETE ===")
print(f"📑 Duration: {filter_duration:.1f} seconds")
if size_change_percent >= 0:
print(f"📑 Text reduction: {original_length:,} -> {filtered_length:,} chars ({size_change_percent:.1f}% reduction)")
else:
print(f"📑 Text expansion: {original_length:,} -> {filtered_length:,} chars ({abs(size_change_percent):.1f}% expansion)")
print(f"📑 Terms found: {len(frequent_terms):,} unique terms (min frequency: {min_frequency})")
print(f"📑 Characters found (character-like terms): {character_term_count:,}")
print(f"📑 Final output: {len(filtered_sentences)} sentences, {filtered_length:,} characters")
print(f"📑 Performance: {(original_length / filter_duration / 1000):.1f}K chars/second")
print(f"📑 ========================\n")
return filtered_text, frequent_terms
def _normalize_filtered_text(text: str) -> str:
"""Normalize filtered text by collapsing stray blank lines and orphaned quote lines."""
if not text:
return text
quote_open = {"“", "「", "『", "\""}
quote_close = {"”", "」", "』", "\""}
lines = text.replace("\r\n", "\n").split("\n")
normalized_lines = []
i = 0
while i < len(lines):
line = lines[i]
stripped = line.strip()
if stripped in quote_close:
# Remove trailing blank lines before attaching closing quote
while normalized_lines and not normalized_lines[-1].strip():
normalized_lines.pop()
if normalized_lines:
normalized_lines[-1] = normalized_lines[-1].rstrip() + stripped
else:
normalized_lines.append(stripped)
elif stripped in quote_open:
j = i + 1
while j < len(lines) and not lines[j].strip():
j += 1
if j < len(lines):
match = re.match(r"^(\s*)(.*)$", lines[j])
if match:
leading, remainder = match.groups()
lines[j] = f"{leading}{stripped}{remainder}"
else:
lines[j] = f"{stripped}{lines[j]}"
else:
normalized_lines.append(stripped)
else:
normalized_lines.append(line)
i += 1
normalized_text = "\n".join(normalized_lines)
normalized_text = re.sub(r"\n{3,}", "\n\n", normalized_text)
normalized_text = re.sub(r"\n{2,}([”」』])", r"\n\1", normalized_text)
normalized_text = re.sub(r"([“「『])\n{2,}", r"\1\n", normalized_text)
normalized_text = re.sub(r"\n{2,}", "\n", normalized_text)
return normalized_text
def _extract_with_custom_prompt(custom_prompt, all_text, language,
min_frequency, max_names, max_titles,
existing_glossary, output_dir,
strip_honorifics=True, fuzzy_threshold=0.90, filter_mode='all', max_sentences=200, log_callback=None,
chunk_pos=None, total_chunks=None):
"""Extract glossary using custom AI prompt with proper filtering"""
# Redirect stdout to GUI log if callback provided (but not in subprocess - worker handles it)
import sys
in_subprocess = hasattr(sys.stdout, 'queue')
if log_callback and not in_subprocess:
set_output_redirect(log_callback)
print("📑 Using custom automatic glossary prompt")
extraction_start = time.time()
# Check stop flag
if is_stop_requested():
print("📑 ❌ Glossary extraction stopped by user")
return {}
# Note: Filter mode can be controlled via the configurable prompt environment variable
# No hardcoded filter instructions are added here
try:
MODEL = os.getenv("MODEL", "gemini-2.0-flash")
API_KEY = (os.getenv("API_KEY") or
os.getenv("OPENAI_API_KEY") or
os.getenv("OPENAI_OR_Gemini_API_KEY") or
os.getenv("GEMINI_API_KEY"))
if is_traditional_translation_api(MODEL):
# Pattern fallback disabled; traditional translation APIs can't run AI extraction.
print("📑 Traditional translation API selected - skipping automatic glossary extraction (pattern fallback disabled)")
return {}
elif not API_KEY and not _model_uses_own_auth(MODEL):
# Pattern fallback disabled; without an API key we can't run AI extraction.
print("📑 No API key found - skipping automatic glossary extraction (pattern fallback disabled)")
return {}
else:
print(f"📑 Using AI-assisted extraction with custom prompt")
# Ensure multi-key config is available in this process if enabled
_ensure_multi_key_config_loaded()
from unified_api_client import UnifiedClient, UnifiedClientError
client = UnifiedClient(model=MODEL, api_key=API_KEY, output_dir=output_dir)
# Log glossary anti-duplicate parameters usage
if os.getenv("GLOSSARY_ENABLE_ANTI_DUPLICATE", "0") == "1":
ad_top_p = os.getenv("GLOSSARY_TOP_P", "1.0")
ad_top_k = os.getenv("GLOSSARY_TOP_K", "0")
ad_freq = os.getenv("GLOSSARY_FREQUENCY_PENALTY", "0.0")
ad_pres = os.getenv("GLOSSARY_PRESENCE_PENALTY", "0.0")
ad_rep = os.getenv("GLOSSARY_REPETITION_PENALTY", "1.0")
print(f"🎯 Anti-duplicate enabled for glossary (top_p={ad_top_p}, top_k={ad_top_k}, freq_penalty={ad_freq}, presence_penalty={ad_pres}, repetition_penalty={ad_rep})")
# Progress-bar labeling: when running chunked auto-glossary, give each in-flight call a unique name.
# This drives the GUI watchdog tooltip "Active calls" list.
progress_context = 'glossary'
try:
if chunk_pos is not None and total_chunks is not None:
progress_context = f"auto glossary ({int(chunk_pos)}/{int(total_chunks)})"
except Exception:
progress_context = 'glossary'
client.context = progress_context
if hasattr(client, 'reset_cleanup_state'):
client.reset_cleanup_state()
# Apply thread submission delay using the client's method
thread_delay = float(os.getenv("THREAD_SUBMISSION_DELAY_SECONDS", "0.5"))
if thread_delay > 0:
client._apply_thread_submission_delay()
# Check if cancelled during delay
if hasattr(client, '_cancelled') and client._cancelled:
print("📑 ❌ Glossary extraction stopped during delay")
return {}
# Check if text is already filtered (from chunking or cache)
already_filtered = (os.getenv("_CHUNK_ALREADY_FILTERED", "0") == "1" or
os.getenv("_TEXT_ALREADY_FILTERED", "0") == "1")
if already_filtered:
# print("📑 Text already filtered, skipping re-filtering")
text_sample = all_text # Use as-is since it's already filtered
detected_terms = {}
else:
# Apply smart filtering to reduce noise and focus on meaningful content
force_disable = os.getenv("GLOSSARY_FORCE_DISABLE_SMART_FILTER", "0") == "1"
use_smart_filter = (os.getenv("GLOSSARY_USE_SMART_FILTER", "1") == "1") and not force_disable
if not use_smart_filter:
# Smart filter disabled - send FULL text without any filtering or truncation
print("📁 Smart filtering DISABLED by user - sending full text to API (this will be expensive!)")
text_sample = all_text
detected_terms = {}
else:
# Smart filter enabled - apply intelligent filtering
print("📁 Applying smart text filtering to reduce noise...")
# Use max_sentences parameter (passed from parent, already read from environment)
print(f"🔍 [DEBUG] In _extract_with_custom_prompt: max_sentences={max_sentences}")
text_sample, detected_terms = _filter_text_for_glossary(all_text, min_frequency, max_sentences)
# If there is no content left, skip API call
if not text_sample or not str(text_sample).strip():
print("📑 No text available after filtering - skipping automatic glossary generation")
return {}
# Replace placeholders in prompt
# Get target language from environment (used in the prompt for translation output)
target_language = os.getenv('GLOSSARY_TARGET_LANGUAGE', 'English')
# Count context marker windows for {marker} placeholder
marker_matches = re.findall(r"===\s*CONTEXT\s+\d+\s+END\s*===", all_text or "")
marker_count = len(marker_matches)
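# marker_count feeds the {marker} placeholder below, letting the prompt state
# how many context windows the model is expected to cover.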
system_prompt = custom_prompt.replace('{language}', target_language)
system_prompt = system_prompt.replace('{min_frequency}', str(min_frequency))
system_prompt = system_prompt.replace('{max_names}', str(max_names))
system_prompt = system_prompt.replace('{max_titles}', str(max_titles))
system_prompt = system_prompt.replace('{marker}', str(marker_count))
# Send system prompt and text as separate messages
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"{text_sample}"}
]
# Check stop before API call
if is_stop_requested():
print("📑 ❌ Glossary extraction stopped before API call")
return {}
try:
# Use glossary-specific temperature with fallback to global
temperature = float(os.getenv("GLOSSARY_TEMPERATURE", os.getenv("TEMPERATURE", "0.3")))
# Use glossary-specific max output tokens with fallback to global
max_tokens = int(os.getenv("GLOSSARY_MAX_OUTPUT_TOKENS", os.getenv("MAX_OUTPUT_TOKENS", "4096")))
# Use send_with_interrupt for interruptible API call
# Respect RETRY_TIMEOUT toggle - if disabled, use None for infinite timeout
retry_env = os.getenv("RETRY_TIMEOUT")
retry_timeout_enabled = retry_env is None or retry_env.strip().lower() not in ("0", "false", "off", "")
chunk_timeout = None
if retry_timeout_enabled:
env_ct = os.getenv("CHUNK_TIMEOUT", "1800")
try:
ct_val = float(env_ct)
chunk_timeout = None if ct_val <= 0 else ct_val
except Exception:
chunk_timeout = None
print(f"📑 Sending AI extraction request (timeout: {chunk_timeout if chunk_timeout is not None else 'disabled'}s, interruptible)...")
else:
print(f"📑 Sending AI extraction request (timeout: disabled, interruptible)...")
# Before API call
api_start = time.time()
print(f"📑 Preparing API request (text size: {len(text_sample):,} chars)...")
print(f"📑 ⏳ Processing {len(text_sample):,} characters... Please wait, this may take 5-10 minutes")
# Timeout retry logic (matches translation behavior)
try:
max_timeout_retries = int(os.getenv("TIMEOUT_RETRY_ATTEMPTS", "2"))
except Exception:
max_timeout_retries = 2
timeout_retry_count = 0
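# Retry loop: timeout/cancellation-style errors are retried up to
# TIMEOUT_RETRY_ATTEMPTS times (default 2), clearing cancellation flags and
# reinitializing the client if needed; an explicit user stop aborts immediately.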
while True:
try:
response, finish_reason, raw_obj = send_with_interrupt(
messages=messages,
client=client,
temperature=temperature,
max_tokens=max_tokens,
stop_check_fn=is_stop_requested,
chunk_timeout=chunk_timeout,
context=progress_context
)
break
except UnifiedClientError as e:
error_msg = str(e)
lower_msg = error_msg.lower()
# Only treat an explicit user stop as an interrupt; timeouts/cancellations should retry
user_stopped = ("stopped by user" in lower_msg) or (
is_stop_requested() and not any(k in lower_msg for k in ("timeout", "timed out", "cancelled"))
)
if user_stopped:
print(f"📑 ❌ AI extraction interrupted by user")
return {}
# Treat cancelled / client init errors as timeout retries
is_timeout = ("timed out" in lower_msg) or ("timeout" in lower_msg) or ("cancelled" in lower_msg) or ("client not initialized" in lower_msg)
if is_timeout and timeout_retry_count < max_timeout_retries:
timeout_retry_count += 1
if chunk_timeout:
print(f"⚠️ AI extraction timed out after {chunk_timeout} seconds, retrying ({timeout_retry_count}/{max_timeout_retries})...")
else:
print(f"⚠️ AI extraction timed out, retrying ({timeout_retry_count}/{max_timeout_retries})...")
# Clear cancellation flags that timeouts may have set
try:
client.reset_cleanup_state()
except Exception:
pass
try:
# Also clear class-level global cancellation for all clients
client.__class__.set_global_cancellation(False)
except Exception:
pass
# Reinitialize client if needed
client_type = getattr(client, 'client_type', 'unknown')
needs_reinit = False
if client_type == 'gemini':
needs_reinit = hasattr(client, 'gemini_client') and client.gemini_client is None
elif client_type == 'openai':
needs_reinit = hasattr(client, 'openai_client') and client.openai_client is None
if needs_reinit:
try:
print(f" 🔄 Reinitializing {client_type} client...")
client._setup_client()
except Exception as reinit_err:
print(f" ⚠️ Failed to reinitialize client: {reinit_err}")
# Stagger retries to avoid simultaneous API calls
try:
import random
base_delay = float(os.getenv("SEND_INTERVAL_SECONDS", "2"))
retry_delay = random.uniform(base_delay / 2, base_delay)
print(f" ⏳ Waiting {retry_delay:.1f}s before retry...")
time.sleep(retry_delay)
except Exception:
time.sleep(1.0)
continue
else:
raise
api_time = time.time() - api_start
print(f"📑 API call completed in {api_time:.1f}s")
# Get the actual text from the response
if hasattr(response, 'content'):
response_text = response.content
else:
response_text = str(response)
# Before processing response
process_start = time.time()
# print(f"📑 Processing AI response...")
# Process response and build CSV
csv_lines = _process_ai_response(response_text, all_text, min_frequency,
strip_honorifics, fuzzy_threshold,
language, filter_mode)
print(f"📑 AI extracted {len(csv_lines) - 1} valid terms (header excluded)")
process_time = time.time() - process_start
# print(f"📑 Response processing took {process_time:.1f}s")
# If we're running per-chunk, defer all heavy work and saving
if os.getenv("GLOSSARY_DEFER_SAVE", "0") == "1":
return csv_lines
# Check stop before merging
if is_stop_requested():
print("📑 ❌ Glossary generation stopped before merging")
return {}
# Merge with existing glossary if present
if existing_glossary:
csv_lines = _merge_csv_entries(csv_lines, existing_glossary, strip_honorifics, language)
# Always inject the book title BEFORE any deduplication or filtering so it
# survives the first run (previously only happened after a second run/merge)
if os.getenv("GLOSSARY_INCLUDE_BOOK_TITLE", "0") == "1":
csv_lines = _ensure_book_title_csv_lines(csv_lines)
print("📚 Book title injected before dedup (single-shot glossary path)")
# Fuzzy matching deduplication
skip_frequency_check = os.getenv("GLOSSARY_SKIP_FREQUENCY_CHECK", "0") == "1"
if not skip_frequency_check: # Only dedupe if we're checking frequencies
# Time the deduplication
dedup_start = time.time()
original_count = len(csv_lines) - 1 # Exclude header
csv_lines = _deduplicate_glossary_with_fuzzy(csv_lines, fuzzy_threshold)
dedup_time = time.time() - dedup_start
final_count = len(csv_lines) - 1 # Exclude header
removed_count = original_count - final_count
print(f"📑 Deduplication completed in {dedup_time:.1f}s")
print(f"📑 - Original entries: {original_count}")
print(f"📑 - Duplicates removed: {removed_count}")
print(f"📑 - Final entries: {final_count}")
# Store for summary statistics
_dedup_time = dedup_time
else:
print(f"📑 Skipping deduplication (frequency check disabled)")
# Apply filter mode to final results
csv_lines = _filter_csv_by_mode(csv_lines, filter_mode)
# Check if we should use token-efficient format
use_legacy_format = os.getenv('GLOSSARY_USE_LEGACY_CSV', '0') == '1'
if not use_legacy_format:
# Convert to token-efficient format
csv_lines = _convert_to_token_efficient_format(csv_lines)
# Final sanitize to prevent stray headers
csv_lines = _sanitize_final_glossary_lines(csv_lines, use_legacy_format)
# Create final CSV content
csv_content = '\n'.join(csv_lines)
# Save glossary as CSV with proper extension
glossary_path = os.path.join(output_dir, "glossary.csv")
_atomic_write_file(glossary_path, csv_content)
print(f"\n📑 ✅ AI-ASSISTED GLOSSARY SAVED!")
print(f"📑 File: {glossary_path}")
c_count, t_count, total = _count_glossary_entries(csv_lines, use_legacy_format)
print(f"📑 Character entries: {c_count}")
# print(f"📑 Term entries: {t_count}")
print(f"📑 Total entries: {total}")
total_time = time.time() - extraction_start
print(f"📑 Total extraction time: {total_time:.1f}s")
return _parse_csv_to_dict(csv_content)
except UnifiedClientError as e:
if "stopped by user" in str(e).lower():
print(f"📑 ❌ AI extraction interrupted by user")
return {}
else:
print(f"⚠️ AI extraction failed: {e}")
print("📑 ❌ Glossary generation failed - returning empty glossary")
return {}
except Exception as e:
print(f"⚠️ AI extraction failed: {e}")
import traceback
traceback.print_exc()
print("📑 ❌ Glossary generation failed - returning empty glossary")
return {}
except Exception as e:
print(f"⚠️ Custom prompt processing failed: {e}")
import traceback
traceback.print_exc()
print("📑 ❌ Glossary generation failed - returning empty glossary")
return {}
def _filter_csv_by_mode(csv_lines, filter_mode):
"""Filter CSV lines based on the filter mode"""
if filter_mode == "all":
return csv_lines
filtered = [csv_lines[0]] # Keep header
for line in csv_lines[1:]:
if not line.strip():
continue
parts = [p.strip() for p in line.split(',')]
if len(parts) < 3:
continue
entry_type = parts[0].lower()
raw_name = parts[1]
if filter_mode == "only_with_honorifics":
# Only keep character entries with honorifics
if entry_type == "character" and _has_honorific(raw_name):
filtered.append(line)
elif filter_mode == "only_without_honorifics":
# Keep terms and characters without honorifics
if entry_type == "term" or (entry_type == "character" and not _has_honorific(raw_name)):
filtered.append(line)
print(f"📑 Filter '{filter_mode}': {len(filtered)-1} entries kept from {len(csv_lines)-1}")
return filtered
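# Illustrative sketch of the filter modes above (comment only, not executed).
# Assumes _has_honorific(), defined elsewhere in this module, recognizes common suffixes
# such as the Korean "님"; the sample rows are made up.
#   rows = ["type,raw_name,translated_name",
#           "character,김철수님,Kim Cheol-su",   # character WITH honorific
#           "character,이영희,Lee Young-hee",    # character without honorific
#           "term,마검,Demon Sword"]             # plain term
#   _filter_csv_by_mode(rows, "all")                      -> all 3 data rows kept
#   _filter_csv_by_mode(rows, "only_with_honorifics")     -> only the "김철수님" row
#   _filter_csv_by_mode(rows, "only_without_honorifics")  -> the "이영희" row and the term row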
def _process_ai_response(response_text, all_text, min_frequency,
strip_honorifics, fuzzy_threshold, language, filter_mode):
"""Process AI response and return CSV lines"""
# Check if gender context and description are enabled (used throughout the function)
include_gender_context = os.getenv("GLOSSARY_INCLUDE_GENDER_CONTEXT", "0") == "1"
include_description = os.getenv("GLOSSARY_INCLUDE_DESCRIPTION", "0") == "1"
# option to completely skip frequency validation for speed
skip_all_validation = os.getenv("GLOSSARY_SKIP_ALL_VALIDATION", "0") == "1"
# if skip_all_validation:
# print("📑 ⚡ FAST MODE: Skipping all frequency validation (accepting all AI results)")
# Clean response text
response_text = response_text.strip()
# Remove string representation artifacts if they wrap the entire response
if response_text.startswith('("') and response_text.endswith('")'):
response_text = response_text[2:-2]
elif response_text.startswith('"') and response_text.endswith('"'):
response_text = response_text[1:-1]
elif response_text.startswith('(') and response_text.endswith(')'):
response_text = response_text[1:-1]
# Unescape the string
response_text = response_text.replace('\\n', '\n')
response_text = response_text.replace('\\r', '')
response_text = response_text.replace('\\t', '\t')
response_text = response_text.replace('\\"', '"')
response_text = response_text.replace("\\'", "'")
response_text = response_text.replace('\\\\', '\\')
# Clean up markdown code blocks if present
if '```' in response_text:
parts = response_text.split('```')
for part in parts:
if 'csv' in part[:10].lower():
response_text = part[part.find('\n')+1:]
break
elif part.strip() and ('type,raw_name' in part or 'character,' in part or 'term,' in part):
response_text = part
break
# Normalize line endings
response_text = response_text.replace('\r\n', '\n').replace('\r', '\n')
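# Illustrative sketch of the cleanup above (comment only, not executed; the sample is made up).
# A model response sometimes arrives as a quoted/escaped string artifact such as
#   '("type,raw_name,translated_name\\ncharacter,김철수,Kim Cheol-su")'
# After stripping the ("...") wrapper and unescaping \\n, the text becomes:
#   type,raw_name,translated_name
#   character,김철수,Kim Cheol-su
# which the line split below turns into one list entry per CSV row.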
lines = [line.strip() for line in response_text.strip().split('\n') if line.strip()]
import csv
# --- Dynamic header capture: accept every column the AI returns ---
dynamic_header = None
dynamic_rows = []
for ln in lines:
low = ln.lower()
if 'type' in low and 'raw_name' in low:
try:
dynamic_header = [c.strip() for c in next(csv.reader([ln])) if c.strip()]
except Exception:
dynamic_header = [c.strip() for c in ln.split(',') if c.strip()]
continue
if dynamic_header:
try:
dynamic_rows.append(next(csv.reader([ln])))
except Exception:
dynamic_rows.append([c.strip() for c in ln.split(',')])
if dynamic_header:
required = {h.lower(): i for i, h in enumerate(dynamic_header)}
if all(k in required for k in ('type', 'raw_name', 'translated_name')):
csv_lines = [','.join(dynamic_header)]
for row in dynamic_rows:
if len(row) < len(dynamic_header):
row += [''] * (len(dynamic_header) - len(row))
elif len(row) > len(dynamic_header):
desc_idx = required.get('description')
if desc_idx is not None and desc_idx < len(dynamic_header):
row = row[:desc_idx] + [','.join(row[desc_idx:])]
else:
row = row[:len(dynamic_header)]
# Clean stop tokens
row = ['' if cell in ("'stop'", "stop") else cell for cell in row]
entry_type = row[required['type']].strip() if len(row) > required['type'] else ''
raw_name = row[required['raw_name']].strip() if len(row) > required['raw_name'] else ''
translated_name = row[required['translated_name']].strip() if len(row) > required['translated_name'] else ''
if not raw_name or not translated_name:
continue
csv_lines.append(','.join(row[:len(dynamic_header)]))
if csv_lines:
print(f"📑 Dynamic header detected from AI: {dynamic_header}")
return csv_lines
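# Illustrative sketch of the fixed-column fallback below (comment only, not executed).
# When no usable header row is detected, lines are parsed positionally; made-up examples:
#   "character,김철수,Kim Cheol-su,male,border guard"  -> 5 columns (type/raw/translated/gender/description)
#   "character,이영희,Lee Young-hee,female"            -> 4 columns (gender, no description)
#   "term,마검,Demon Sword"                            -> 3 columns
#   "서고,Archive"                                     -> 2 columns, type defaults to "term"
# Which columns are kept in the output depends on GLOSSARY_INCLUDE_GENDER_CONTEXT /
# GLOSSARY_INCLUDE_DESCRIPTION, handled case by case below.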
csv_lines = []
header_found = False
# Post-response min_frequency filtering is disabled (accept all AI rows);
# skip_frequency_check forced true to bypass frequency gating.
skip_frequency_check = True
# Add option to completely skip ALL validation for maximum speed
skip_all_validation = os.getenv("GLOSSARY_SKIP_ALL_VALIDATION", "0") == "1"
if skip_all_validation:
# print("📑 ⚡ FAST MODE: Skipping all frequency validation (accepting all AI results)")
# Use appropriate header based on gender and description settings
if include_description:
csv_lines.append("type,raw_name,translated_name,gender,description")
elif include_gender_context:
csv_lines.append("type,raw_name,translated_name,gender")
# print("📑 Fast mode: Using 4-column format with gender")
else:
csv_lines.append("type,raw_name,translated_name")
# Process the AI response
for line in lines:
# Skip header lines
if 'type' in line.lower() and 'raw_name' in line.lower():
continue
# Parse CSV line
parts = [p.strip() for p in line.split(',')]
# Replace invalid 'stop' values with empty string
parts = ['' if p == "'stop'" or p == "stop" else p for p in parts]
if include_description and len(parts) >= 5:
# Has all 5 columns (with gender and description)
entry_type = parts[0]
raw_name = parts[1]
translated_name = parts[2]
gender = parts[3] if len(parts) > 3 else ''
description = parts[4] if len(parts) > 4 else ''
# Validate - reject malformed entries that look like tuples/lists or quoted strings
if (raw_name and translated_name and
not (raw_name.startswith(('[', '(', "'", '"')) or translated_name.startswith(('[', '(', "'", '"'))) and
not (raw_name.endswith(("'", '"')) or translated_name.endswith(("'", '"')))):
csv_lines.append(f"{entry_type},{raw_name},{translated_name},{gender},{description}")
elif include_gender_context and len(parts) >= 4:
# Has all 4 columns (with gender)
entry_type = parts[0]
raw_name = parts[1]
translated_name = parts[2]
gender = parts[3] if len(parts) > 3 else ''
# Validate - reject malformed entries that look like tuples/lists or quoted strings
if (raw_name and translated_name and
not (raw_name.startswith(('[', '(', "'", '"')) or translated_name.startswith(('[', '(', "'", '"'))) and
not (raw_name.endswith(("'", '"')) or translated_name.endswith(("'", '"')))):
csv_lines.append(f"{entry_type},{raw_name},{translated_name},{gender}")
elif len(parts) >= 3:
# Has at least 3 columns
entry_type = parts[0]
raw_name = parts[1]
translated_name = parts[2]
# Validate - reject malformed entries that look like tuples/lists or quoted strings
if (raw_name and translated_name and
not (raw_name.startswith(('[', '(', "'", '"')) or translated_name.startswith(('[', '(', "'", '"'))) and
not (raw_name.endswith(("'", '"')) or translated_name.endswith(("'", '"')))):
if include_description:
# Pad to the 5-column format, filling gender/description from parts when present
gender = parts[3] if len(parts) > 3 else ''
description = parts[4] if len(parts) > 4 else ''
csv_lines.append(f"{entry_type},{raw_name},{translated_name},{gender},{description}")
elif include_gender_context:
# Pad to the 4-column format, filling gender from parts when present
gender = parts[3] if len(parts) > 3 else ''
csv_lines.append(f"{entry_type},{raw_name},{translated_name},{gender}")
else:
csv_lines.append(f"{entry_type},{raw_name},{translated_name}")
elif len(parts) == 2:
# Missing type, default to 'term'
raw_name = parts[0]
translated_name = parts[1]
# Validate - reject malformed entries that look like tuples/lists or quoted strings
if (raw_name and translated_name and
not (raw_name.startswith(('[', '(', "'", '"')) or translated_name.startswith(('[', '(', "'", '"'))) and
not (raw_name.endswith(("'", '"')) or translated_name.endswith(("'", '"')))):
if include_description:
csv_lines.append(f"term,{raw_name},{translated_name},,")
elif include_gender_context:
csv_lines.append(f"term,{raw_name},{translated_name},")
else:
csv_lines.append(f"term,{raw_name},{translated_name}")
# print(f"📑 Fast mode: Accepted {len(csv_lines) - 1} entries without validation")
return csv_lines
# For "only_with_honorifics" mode, ALWAYS skip frequency check
if filter_mode == "only_with_honorifics":
skip_frequency_check = True
print("📑 Filter mode 'only_with_honorifics': Bypassing frequency checks")
print(f'📑 Processing {len(lines)} lines from AI response...')
# print(f'📑 Text corpus size: {len(all_text):,} chars')
# print(f'📑 Frequency checking: DISABLED (post-response min_frequency bypassed)')
# print(f'📑 Fuzzy threshold: {fuzzy_threshold}')
# Collect all terms first for batch processing
all_terms_to_check = []
term_info_map = {} # Map term to its full info
if not skip_frequency_check:
# First pass: collect all terms that need frequency checking
for line in lines:
if 'type' in line.lower() and 'raw_name' in line.lower():
continue # Skip header
parts = [p.strip() for p in line.split(',')]
# Replace invalid 'stop' values with empty string
parts = ['' if p == "'stop'" or p == "stop" else p for p in parts]
# Strip orphaned quotes and filter empty columns
parts = [p.strip('"').strip("'").strip() for p in parts]
parts = [p for p in parts if p] # Remove empty strings
if len(parts) >= 3:
entry_type = parts[0].lower()
raw_name = parts[1]
translated_name = parts[2]
gender = parts[3] if len(parts) > 3 else ''
description = parts[4] if len(parts) > 4 else ''
elif len(parts) == 2:
entry_type = 'term'
raw_name = parts[0]
translated_name = parts[1]
gender = ''
description = ''
else:
continue
# Validate - reject malformed entries that look like tuples/lists or quoted strings
if not raw_name or not translated_name:
continue
if (raw_name.startswith(('[', '(', "'", '"')) or translated_name.startswith(('[', '(', "'", '"')) or
raw_name.endswith(("'", '"')) or translated_name.endswith(("'", '"'))):
continue
if raw_name and translated_name:
# Store for batch processing
original_raw = raw_name
if strip_honorifics:
raw_name = _strip_honorific(raw_name, language)
all_terms_to_check.append(raw_name)
term_info_map[raw_name] = {
'entry_type': entry_type,
'original_raw': original_raw,
'translated_name': translated_name,
'gender': gender,
'description': description,
'line': line
}
# Batch compute all frequencies at once
if all_terms_to_check:
print(f"📑 Computing frequencies for {len(all_terms_to_check)} terms...")
term_frequencies = _batch_compute_frequencies(
all_terms_to_check, all_text, fuzzy_threshold, min_frequency
)
else:
term_frequencies = {}
# Now process the results using pre-computed frequencies
entries_processed = 0
entries_accepted = 0
# Process based on mode
if filter_mode == "only_with_honorifics" or skip_frequency_check:
# For these modes, accept all entries
if include_description:
csv_lines.append("type,raw_name,translated_name,gender,description") # Header with description
elif include_gender_context:
csv_lines.append("type,raw_name,translated_name,gender") # Header with gender
else:
csv_lines.append("type,raw_name,translated_name") # Header
for line in lines:
if 'type' in line.lower() and 'raw_name' in line.lower():
continue # Skip header
parts = [p.strip() for p in line.split(',')]
# Replace invalid 'stop' values with empty string
parts = ['' if p == "'stop'" or p == "stop" else p for p in parts]
# Strip orphaned quotes and filter empty columns
parts = [p.strip('"').strip("'").strip() for p in parts]
parts = [p for p in parts if p] # Remove empty strings
if len(parts) >= 3:
entry_type = parts[0].lower()
raw_name = parts[1]
translated_name = parts[2]
gender = parts[3] if len(parts) > 3 else ''
description = parts[4] if len(parts) > 4 else ''
elif len(parts) == 2:
entry_type = 'term'
raw_name = parts[0]
translated_name = parts[1]
gender = ''
description = ''
else:
continue
# Validate - reject malformed entries that look like tuples/lists or quoted strings
if not raw_name or not translated_name:
continue
if (raw_name.startswith(('[', '(', "'", '"')) or translated_name.startswith(('[', '(', "'", '"')) or
raw_name.endswith(("'", '"')) or translated_name.endswith(("'", '"'))):
continue
if raw_name and translated_name:
if include_description:
csv_line = f"{entry_type},{raw_name},{translated_name},{gender},{description}"
elif include_gender_context:
csv_line = f"{entry_type},{raw_name},{translated_name},{gender}"
else:
csv_line = f"{entry_type},{raw_name},{translated_name}"
csv_lines.append(csv_line)
entries_accepted += 1
print(f"📑 Accepted {entries_accepted} entries (frequency check disabled)")
else:
# Use pre-computed frequencies
if include_description:
csv_lines.append("type,raw_name,translated_name,gender,description") # Header with description
elif include_gender_context:
csv_lines.append("type,raw_name,translated_name,gender") # Header with gender
else:
csv_lines.append("type,raw_name,translated_name") # Header
for term, info in term_info_map.items():
count = term_frequencies.get(term, 0)
# Also check original form if it was stripped
if info['original_raw'] != term:
count += term_frequencies.get(info['original_raw'], 0)
if count >= min_frequency:
if include_description:
csv_line = f"{info['entry_type']},{term},{info['translated_name']},{info['gender']},{info['description']}"
elif include_gender_context:
csv_line = f"{info['entry_type']},{term},{info['translated_name']},{info['gender']}"
else:
csv_line = f"{info['entry_type']},{term},{info['translated_name']}"
csv_lines.append(csv_line)
entries_accepted += 1
# Log first few examples
if entries_accepted <= 5:
print(f"📑 ✓ Example: {term} -> {info['translated_name']} (freq: {count})")
print(f"📑 Frequency filtering complete: {entries_accepted}/{len(term_info_map)} terms accepted")
# Ensure we have at least the header
if len(csv_lines) == 0:
if include_description:
csv_lines.append("type,raw_name,translated_name,gender,description")
elif include_gender_context:
csv_lines.append("type,raw_name,translated_name,gender")
else:
csv_lines.append("type,raw_name,translated_name")
# Print final summary
print(f"📑 Processing complete: {entries_accepted} terms accepted")
return csv_lines
def _deduplicate_glossary_with_fuzzy(csv_lines, fuzzy_threshold):
"""Apply advanced fuzzy matching to remove duplicate entries from the glossary with stop flag checks
Uses a 2-pass approach:
Pass 1: Remove entries with similar raw names (existing logic)
Pass 2: Remove entries with identical translated names (new logic)
"""
from difflib import SequenceMatcher
# Try to import advanced libraries
try:
from rapidfuzz import fuzz as rfuzz
use_rapidfuzz = True
except ImportError:
use_rapidfuzz = False
try:
import jellyfish
use_jellyfish = True
except ImportError:
use_jellyfish = False
algo_info = []
if use_rapidfuzz:
algo_info.append("RapidFuzz")
if use_jellyfish:
algo_info.append("Jaro-Winkler")
if not algo_info:
algo_info.append("difflib")
# Check if translated name deduplication is enabled
# GLOSSARY_DEDUPE_TRANSLATIONS: "1" = enable Pass 2 (remove entries with identical translations)
# : "0" = disable Pass 2 (only remove entries with similar raw names)
dedupe_translations = os.getenv("GLOSSARY_DEDUPE_TRANSLATIONS", "1") == "1"
print(f"📋 Applying 2-pass fuzzy deduplication (threshold: {fuzzy_threshold})...")
print(f"📋 Pass 1: Raw name deduplication (fuzzy matching)")
if dedupe_translations:
print(f"📋 Pass 2: Translated name deduplication (exact matching)")
else:
print(f"📋 Pass 2: DISABLED (GLOSSARY_DEDUPE_TRANSLATIONS=0)")
print(f"📋 Using algorithms: {', '.join(algo_info)}")
# Check stop flag at start
if is_stop_requested():
print(f"📑 ❌ Deduplication stopped by user")
return csv_lines
header_line = csv_lines[0] # Keep header
entry_lines = csv_lines[1:] # Data lines
original_count = len(entry_lines)
print(f"📑 Starting deduplication with {original_count} entries...")
# PASS 1: Raw name deduplication (existing fuzzy matching logic)
print(f"📑 🔄 PASS 1: Raw name deduplication...")
pass1_results = _deduplicate_pass1_raw_names(
entry_lines, fuzzy_threshold, use_rapidfuzz, use_jellyfish
)
pass1_count = len(pass1_results)
pass1_removed = original_count - pass1_count
print(f"📑 ✅ PASS 1 complete: {pass1_removed} duplicates removed ({pass1_count} remaining)")
# PASS 2: Translated name deduplication (if enabled)
if dedupe_translations:
print(f"📑 🔄 PASS 2: Translated name deduplication...")
final_results, replaced_count = _deduplicate_pass2_translated_names(pass1_results)
pass2_removed = pass1_count - len(final_results)
replaced_msg = f" ({replaced_count} replaced with more complete entries)" if replaced_count > 0 else ""
print(f"📑 ✅ PASS 2 complete: {pass2_removed} duplicates removed{replaced_msg} ({len(final_results)} remaining)")
total_removed = pass1_removed + pass2_removed
else:
final_results = pass1_results
total_removed = pass1_removed
print(f"📑 ⏭️ PASS 2 skipped (translation deduplication disabled)")
# Rebuild CSV with header
deduplicated = [header_line] + final_results
print(f"📑 ✅ Total deduplication complete: {total_removed} duplicates removed")
print(f"📑 Final glossary size: {len(final_results)} unique entries")
return deduplicated
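# Illustrative sketch of the two passes above (comment only, not executed; rows are made up).
#   entries = ["character,김철수,Kim Cheol-su",
#              "character,김철수님,Kim Cheol-su",   # near-identical raw name
#              "character,철수,Kim Cheol-su,male"]  # same translation, more fields
# With a 0.90 threshold and RapidFuzz available, Pass 1 drops "김철수님" because
# partial_ratio against "김철수" scores 1.0 (with plain difflib the ratio is only ~0.86,
# so it would survive). Pass 2 then sees two rows translating to "Kim Cheol-su" and keeps
# the 4-field "철수" row as the more complete entry, leaving a single entry.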
def _deduplicate_pass1_raw_names(entry_lines, fuzzy_threshold, use_rapidfuzz, use_jellyfish):
"""Pass 1: Remove entries with similar raw names using fuzzy matching"""
from difflib import SequenceMatcher
if use_rapidfuzz:
from rapidfuzz import fuzz as rfuzz
if use_jellyfish:
import jellyfish
deduplicated = []
seen_entries = {} # raw_name -> (entry_type, translated_name)
seen_names_lower = set() # Quick exact match check
removed_count = 0
total_entries = len(entry_lines)
for idx, line in enumerate(entry_lines):
# Check stop flag every 100 entries
if idx > 0 and idx % 100 == 0:
if is_stop_requested():
print(f"📑 ❌ Pass 1 stopped at entry {idx}/{total_entries}")
break
# Show progress for large glossaries
if total_entries > 500 and idx % 200 == 0:
progress = (idx / total_entries) * 100
print(f"📑 Pass 1 progress: {progress:.1f}% ({idx}/{total_entries})")
if not line.strip():
continue
parts = [p.strip() for p in line.split(',')]
if len(parts) < 3:
continue
entry_type = parts[0]
raw_name = parts[1]
translated_name = parts[2]
raw_name_lower = raw_name.lower()
# Fast exact duplicate check first
if raw_name_lower in seen_names_lower:
removed_count += 1
if removed_count <= 10: # Only log first few
print(f"📋 Pass 1: Removing exact duplicate: '{raw_name}'")
continue
# For fuzzy matching, only check if threshold is less than 1.0
is_duplicate = False
if fuzzy_threshold < 1.0:
# Use a more efficient approach: only check similar length strings
name_len = len(raw_name)
min_len = int(name_len * 0.7)
max_len = int(name_len * 1.3)
# Only compare with entries of similar length
candidates = []
for seen_name, (seen_type, seen_trans) in seen_entries.items():
if min_len <= len(seen_name) <= max_len:
candidates.append(seen_name)
# Check fuzzy similarity with candidates using multiple algorithms
for seen_name in candidates:
# Quick character overlap check before expensive comparison
char_overlap = len(set(raw_name_lower) & set(seen_name.lower()))
if char_overlap < len(raw_name_lower) * 0.5:
continue # Too different, skip
# Try multiple algorithms and take the best score
scores = []
if use_rapidfuzz:
# RapidFuzz basic ratio
scores.append(rfuzz.ratio(raw_name_lower, seen_name.lower()) / 100.0)
# Token sort (handles word order)
try:
scores.append(rfuzz.token_sort_ratio(raw_name_lower, seen_name.lower()) / 100.0)
except:
pass
# Partial ratio (substring)
try:
scores.append(rfuzz.partial_ratio(raw_name_lower, seen_name.lower()) / 100.0)
except:
pass
else:
# Fallback to difflib
scores.append(SequenceMatcher(None, raw_name_lower, seen_name.lower()).ratio())
# Try Jaro-Winkler (better for names)
if use_jellyfish:
try:
jaro = jellyfish.jaro_winkler_similarity(raw_name, seen_name)
scores.append(jaro)
except:
pass
# Take best score
best_similarity = max(scores) if scores else 0.0
if best_similarity >= fuzzy_threshold:
if removed_count < 10: # Only log first few
print(f"📋 Pass 1: Removing fuzzy duplicate: '{raw_name}' ~= '{seen_name}' (score: {best_similarity:.2%})")
removed_count += 1
is_duplicate = True
break
if not is_duplicate:
seen_entries[raw_name] = (entry_type, translated_name)
seen_names_lower.add(raw_name_lower)
deduplicated.append(line)
return deduplicated
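# Worked example of the prefilters above (comment only; the name is made up).
# For raw_name "Dihriel" (7 chars): min_len = int(7 * 0.7) = 4, max_len = int(7 * 1.3) = 9,
# so only previously seen names of length 4-9 become fuzzy candidates. The character-overlap
# gate then requires len(set("dihriel") & set(candidate.lower())) to reach at least
# len("dihriel") * 0.5 = 3.5, i.e. at least 4 shared characters, before the expensive
# ratio / token_sort / partial / Jaro-Winkler scores are computed.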
def _deduplicate_pass2_translated_names(entry_lines):
"""Pass 2: Remove entries with identical translated names"""
deduplicated = []
seen_translations = {} # translated_name.lower() -> (raw_name, line)
removed_count = 0
replaced_count = 0
for line in entry_lines:
if not line.strip():
continue
parts = [p.strip() for p in line.split(',')]
if len(parts) < 3:
continue
entry_type = parts[0]
raw_name = parts[1]
translated_name = parts[2]
translated_lower = translated_name.lower().strip()
# Skip empty translations
if not translated_lower:
deduplicated.append(line)
continue
# Check if we've seen this translation before
if translated_lower in seen_translations:
existing_raw, existing_line = seen_translations[translated_lower]
# Get the existing translated name from the line
existing_parts = existing_line.split(',')
existing_translated = existing_parts[2] if len(existing_parts) >= 3 else translated_name
# Count fields in both entries (more fields = higher priority)
current_field_count = len([f.strip() for f in parts if f.strip()])
existing_field_count = len([f.strip() for f in existing_parts if f.strip()])
# If current entry has more fields, replace the existing one
if current_field_count > existing_field_count:
# Remove existing entry from deduplicated list
deduplicated = [l for l in deduplicated if l != existing_line]
# Replace with current entry
seen_translations[translated_lower] = (raw_name, line)
deduplicated.append(line)
removed_count += 1
replaced_count += 1
if removed_count <= 10: # Only log first few
print(f"📋 Pass 2: Replacing '{existing_raw}' -> '{existing_translated}' ({existing_field_count} fields) with '{raw_name}' -> '{translated_name}' ({current_field_count} fields) - more detailed entry")
else:
# Keep existing entry (has same or more fields)
removed_count += 1
if removed_count <= 10: # Only log first few
extra_info = f" ({current_field_count} vs {existing_field_count} fields)" if current_field_count != existing_field_count else ""
print(f"📋 Pass 2: Removing '{raw_name}' -> '{translated_name}' (duplicate translation of '{existing_raw}' -> '{existing_translated}'){extra_info}")
else:
# New translation, keep it
seen_translations[translated_lower] = (raw_name, line)
deduplicated.append(line)
return deduplicated, replaced_count
def _merge_csv_entries(new_csv_lines, existing_glossary, strip_honorifics, language):
"""Merge CSV entries with existing glossary with stop flag checks"""
# Check stop flag at start
if is_stop_requested():
print(f"📑 ❌ Glossary merge stopped by user")
return new_csv_lines
# Parse existing glossary
existing_lines = []
existing_names = set()
if isinstance(existing_glossary, str):
# Already CSV format
lines = existing_glossary.strip().split('\n')
total_lines = len(lines)
for idx, line in enumerate(lines):
# Check stop flag every 50 lines
if idx > 0 and idx % 50 == 0:
if is_stop_requested():
print(f"📑 ❌ Merge stopped while processing existing glossary at line {idx}/{total_lines}")
return new_csv_lines
if total_lines > 200:
progress = (idx / total_lines) * 100
print(f"📑 Processing existing glossary: {progress:.1f}%")
if 'type,raw_name' in line.lower():
continue # Skip header
line_stripped = line.strip()
# Skip token-efficient lines and section/bullet markers
if not line_stripped or line_stripped.startswith('===') or line_stripped.startswith('*') or line_stripped.lower().startswith('glossary:'):
continue
parts = [p.strip() for p in line.split(',')]
# Require at least 3 fields (type, raw_name, translated_name)
if len(parts) < 3:
continue
entry_type = parts[0].strip().lower()
# Only accept reasonable type tokens (letters/underscores only)
import re as _re
if not _re.match(r'^[a-z_]+$', entry_type):
continue
raw_name = parts[1]
if strip_honorifics:
raw_name = _strip_honorific(raw_name, language)
parts[1] = raw_name
if raw_name not in existing_names:
existing_lines.append(','.join(parts))
existing_names.add(raw_name)
# Check stop flag before processing new names
if is_stop_requested():
print(f"📑 ❌ Merge stopped before processing new entries")
return new_csv_lines
# Get new names
new_names = set()
final_lines = []
for idx, line in enumerate(new_csv_lines):
# Check stop flag every 50 lines
if idx > 0 and idx % 50 == 0:
if is_stop_requested():
print(f"📑 ❌ Merge stopped while processing new entries at line {idx}")
return final_lines if final_lines else new_csv_lines
if 'type,raw_name' in line.lower():
final_lines.append(line) # Keep header
continue
parts = [p.strip() for p in line.split(',')]
if len(parts) >= 2:
new_names.add(parts[1])
final_lines.append(line)
# Check stop flag before adding existing entries
if is_stop_requested():
print(f"📑 ❌ Merge stopped before combining entries")
return final_lines
# Add non-duplicate existing entries
added_count = 0
for idx, line in enumerate(existing_lines):
# Check stop flag every 50 additions
if idx > 0 and idx % 50 == 0:
if is_stop_requested():
print(f"📑 ❌ Merge stopped while adding existing entries ({added_count} added)")
return final_lines
parts = [p.strip() for p in line.split(',')]
if len(parts) >= 2 and parts[1] not in new_names:
final_lines.append(line)
added_count += 1
print(f"📑 Merged {added_count} entries from existing glossary")
return final_lines
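# Illustrative sketch of the merge semantics above (comment only, not executed; rows are made up).
#   new      = ["type,raw_name,translated_name",
#               "character,김철수,Kim Cheol-su"]
#   existing = "type,raw_name,translated_name\ncharacter,김철수,Kim Chulsoo\nterm,마검,Demon Sword"
# The freshly extracted rows are kept as-is, so "김철수" keeps the new "Kim Cheol-su"
# translation, and only existing raw_names absent from the new rows ("마검" here) are
# appended, giving 2 data rows plus the header.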
def _extract_with_patterns(all_text, language, min_frequency,
max_names, max_titles, batch_size,
existing_glossary, output_dir,
strip_honorifics=True, fuzzy_threshold=0.90, filter_mode='all'):
"""Extract glossary using pattern matching with true CSV format output and stop flag checks"""
print("📑 Using pattern-based extraction")
# Check stop flag at start
if is_stop_requested():
print("📑 ❌ Pattern-based extraction stopped by user")
return {}
def is_valid_name(name, language_hint='unknown'):
"""Strict validation for proper names only"""
if not name or len(name.strip()) < 1:
return False
name = name.strip()
if name.lower() in PM.COMMON_WORDS or name in PM.COMMON_WORDS:
return False
if language_hint == 'korean':
if not (2 <= len(name) <= 4):
return False
if not all(0xAC00 <= ord(char) <= 0xD7AF for char in name):
return False
if len(set(name)) == 1:
return False
elif language_hint == 'japanese':
if not (2 <= len(name) <= 6):
return False
has_kanji = any(0x4E00 <= ord(char) <= 0x9FFF for char in name)
has_kana = any((0x3040 <= ord(char) <= 0x309F) or (0x30A0 <= ord(char) <= 0x30FF) for char in name)
if not (has_kanji or has_kana):
return False
elif language_hint == 'chinese':
if not (2 <= len(name) <= 4):
return False
if not all(0x4E00 <= ord(char) <= 0x9FFF for char in name):
return False
elif language_hint == 'english':
if not name[0].isupper():
return False
if sum(1 for c in name if c.isalpha()) < len(name) * 0.8:
return False
if not (2 <= len(name) <= 20):
return False
return True
def detect_language_hint(text_sample):
"""Quick language detection for validation purposes"""
sample = text_sample[:1000]
korean_chars = sum(1 for char in sample if 0xAC00 <= ord(char) <= 0xD7AF)
japanese_kana = sum(1 for char in sample if (0x3040 <= ord(char) <= 0x309F) or (0x30A0 <= ord(char) <= 0x30FF))
chinese_chars = sum(1 for char in sample if 0x4E00 <= ord(char) <= 0x9FFF)
latin_chars = sum(1 for char in sample if 0x0041 <= ord(char) <= 0x007A)
if korean_chars > 50:
return 'korean'
elif japanese_kana > 20:
return 'japanese'
elif chinese_chars > 50 and japanese_kana < 10:
return 'chinese'
elif latin_chars > 100:
return 'english'
else:
return 'unknown'
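# Worked example of the thresholds above (comment only): a 1,000-character sample with
# 120 hangul syllables returns 'korean'; one with 30 kana characters returns 'japanese';
# 60 CJK ideographs with fewer than 10 kana return 'chinese'; a sample with more than
# 100 Latin characters falls through to 'english', and anything else is 'unknown'.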
language_hint = detect_language_hint(all_text)
print(f"📑 Detected primary language: {language_hint}")
# Check stop flag after language detection
if is_stop_requested():
print("📑 ❌ Extraction stopped after language detection")
return {}
honorifics_to_use = []
if language_hint in PM.CJK_HONORIFICS:
honorifics_to_use.extend(PM.CJK_HONORIFICS[language_hint])
honorifics_to_use.extend(PM.CJK_HONORIFICS.get('english', []))
print(f"📑 Using {len(honorifics_to_use)} honorifics for {language_hint}")
names_with_honorifics = {}
standalone_names = {}
# Check if parallel processing is enabled
extraction_workers = int(os.getenv("EXTRACTION_WORKERS", "1"))
# PARALLEL HONORIFIC PROCESSING
if extraction_workers > 1 and len(honorifics_to_use) > 3:
print(f"📑 Scanning for names with honorifics (parallel with {extraction_workers} workers)...")
# Create a wrapper function that can be called in parallel
def process_honorific(args):
"""Process a single honorific in a worker thread"""
honorific, idx, total = args
# Check stop flag
if is_stop_requested():
return None, None
print(f"📑 Worker processing honorific {idx}/{total}: '{honorific}'")
# Local dictionaries for this worker
local_names_with = {}
local_standalone = {}
# Call the extraction method
_extract_names_for_honorific(
honorific, all_text, language_hint,
min_frequency, local_names_with,
local_standalone, is_valid_name, fuzzy_threshold
)
return local_names_with, local_standalone
# Prepare arguments for parallel processing
honorific_args = [
(honorific, idx + 1, len(honorifics_to_use))
for idx, honorific in enumerate(honorifics_to_use)
]
# Process honorifics in parallel
with ThreadPoolExecutor(max_workers=min(extraction_workers, len(honorifics_to_use))) as executor:
futures = []
for args in honorific_args:
if is_stop_requested():
executor.shutdown(wait=False)
return {}
future = executor.submit(process_honorific, args)
futures.append(future)
# Collect results as they complete
completed = 0
for future in as_completed(futures):
if is_stop_requested():
executor.shutdown(wait=False)
return {}
try:
result = future.result()
if result and result[0] is not None:
local_names_with, local_standalone = result
# Merge results (thread-safe since we're in main thread)
for name, count in local_names_with.items():
if name not in names_with_honorifics:
names_with_honorifics[name] = count
else:
names_with_honorifics[name] = max(names_with_honorifics[name], count)
for name, count in local_standalone.items():
if name not in standalone_names:
standalone_names[name] = count
else:
standalone_names[name] = max(standalone_names[name], count)
completed += 1
if completed % 5 == 0 or completed == len(honorifics_to_use):
print(f"📑 Honorific processing: {completed}/{len(honorifics_to_use)} completed")
except Exception as e:
print(f"⚠️ Failed to process honorific: {e}")
completed += 1
print(f"📑 Parallel honorific processing completed: found {len(names_with_honorifics)} names")
else:
# SEQUENTIAL PROCESSING (fallback)
print("📑 Scanning for names with honorifics...")
# Extract names with honorifics
total_honorifics = len(honorifics_to_use)
for idx, honorific in enumerate(honorifics_to_use):
# Check stop flag before each honorific
if is_stop_requested():
print(f"📑 ❌ Extraction stopped at honorific {idx}/{total_honorifics}")
return {}
print(f"📑 Processing honorific {idx + 1}/{total_honorifics}: '{honorific}'")
_extract_names_for_honorific(honorific, all_text, language_hint,
min_frequency, names_with_honorifics,
standalone_names, is_valid_name, fuzzy_threshold)
# Check stop flag before processing terms
if is_stop_requested():
print("📑 ❌ Extraction stopped before processing terms")
return {}
# Apply filter mode
filtered_names = {}
if filter_mode == 'only_with_honorifics':
# Only keep names that have honorifics (no standalone names)
filtered_names = names_with_honorifics.copy()
print(f"📑 Filter: Keeping only names with honorifics ({len(filtered_names)} names)")
elif filter_mode == 'only_without_honorifics':
# Keep standalone names that were NOT found with honorifics
for name, count in standalone_names.items():
# Check if this name also appears with honorifics
appears_with_honorific = False
for honorific_name in names_with_honorifics.keys():
if _strip_honorific(honorific_name, language_hint) == name:
appears_with_honorific = True
break
# Only add if it doesn't appear with honorifics
if not appears_with_honorific:
filtered_names[name] = count
print(f"📑 Filter: Keeping only names without honorifics ({len(filtered_names)} names)")
else: # 'all' mode
# Keep all names (both with and without honorifics)
filtered_names = names_with_honorifics.copy()
# Also add standalone names
for name, count in standalone_names.items():
if name not in filtered_names and not any(
_strip_honorific(n, language_hint) == name for n in filtered_names.keys()
):
filtered_names[name] = count
print(f"📑 Filter: Keeping all names ({len(filtered_names)} names)")
# Process extracted terms
final_terms = {}
term_count = 0
total_terms = len(filtered_names)
for term, count in filtered_names.items():
term_count += 1
# Check stop flag every 20 terms
if term_count % 20 == 0:
if is_stop_requested():
print(f"📑 ❌ Term processing stopped at {term_count}/{total_terms}")
return {}
if strip_honorifics:
clean_term = _strip_honorific(term, language_hint)
if clean_term in final_terms:
final_terms[clean_term] = final_terms[clean_term] + count
else:
final_terms[clean_term] = count
else:
final_terms[term] = count
# Check stop flag before finding titles
if is_stop_requested():
print("📑 ❌ Extraction stopped before finding titles")
return {}
# Find titles (but respect filter mode)
print("📑 Scanning for titles...")
found_titles = {}
# Extract titles for all modes EXCEPT "only_with_honorifics"
# (titles are included in "only_without_honorifics" since titles typically don't have honorifics)
if filter_mode != 'only_with_honorifics':
title_patterns_to_use = []
if language_hint in PM.TITLE_PATTERNS:
title_patterns_to_use.extend(PM.TITLE_PATTERNS[language_hint])
title_patterns_to_use.extend(PM.TITLE_PATTERNS.get('english', []))
total_patterns = len(title_patterns_to_use)
for pattern_idx, pattern in enumerate(title_patterns_to_use):
# Check stop flag before each pattern
if is_stop_requested():
print(f"📑 ❌ Title extraction stopped at pattern {pattern_idx}/{total_patterns}")
return {}
print(f"📑 Processing title pattern {pattern_idx + 1}/{total_patterns}")
matches = list(re.finditer(pattern, all_text, re.IGNORECASE if 'english' in pattern else 0))
for match_idx, match in enumerate(matches):
# Check stop flag every 50 matches
if match_idx > 0 and match_idx % 50 == 0:
if is_stop_requested():
print(f"📑 ❌ Title extraction stopped at match {match_idx}")
return {}
title = match.group(0)
# Skip if this title is already in names
if title in filtered_names or title in names_with_honorifics:
continue
count = _find_fuzzy_matches(title, all_text, fuzzy_threshold)
# Check if stopped during fuzzy matching
if is_stop_requested():
print(f"📑 ❌ Title extraction stopped during fuzzy matching")
return {}
if count >= min_frequency:
if re.match(r'[A-Za-z]', title):
title = title.title()
if strip_honorifics:
title = _strip_honorific(title, language_hint)
if title not in found_titles:
found_titles[title] = count
if filter_mode == 'only_without_honorifics':
print(f"📑 Found {len(found_titles)} titles (included in 'without honorifics' mode)")
else:
print(f"📑 Found {len(found_titles)} unique titles")
else:
print(f"📑 Skipping title extraction (filter mode: only_with_honorifics)")
# Check stop flag before sorting and translation
if is_stop_requested():
print("📑 ❌ Extraction stopped before sorting terms")
return {}
# Combine and sort
sorted_names = sorted(final_terms.items(), key=lambda x: x[1], reverse=True)
sorted_titles = sorted(found_titles.items(), key=lambda x: x[1], reverse=True)
all_terms = []
for name, count in sorted_names:
all_terms.append(name)
for title, count in sorted_titles:
all_terms.append(title)
print(f"📑 Total terms to translate: {len(all_terms)}")
# Check stop flag before translation
if is_stop_requested():
print("📑 ❌ Extraction stopped before translation")
return {}
# Translate terms
if os.getenv("DISABLE_GLOSSARY_TRANSLATION", "0") == "1":
print("📑 Translation disabled - keeping original terms")
translations = {term: term for term in all_terms}
else:
print(f"📑 Translating {len(all_terms)} terms...")
translations = _translate_terms_batch(all_terms, language_hint, batch_size, output_dir)
# Check if translation was stopped
if is_stop_requested():
print("📑 ❌ Extraction stopped after translation")
return translations # Return partial results
# Build CSV lines
csv_lines = ["type,raw_name,translated_name"]
for name, _ in sorted_names:
if name in translations:
csv_lines.append(f"character,{name},{translations[name]}")
for title, _ in sorted_titles:
if title in translations:
csv_lines.append(f"term,{title},{translations[title]}")
# Check stop flag before merging
if is_stop_requested():
print("📑 ❌ Extraction stopped before merging with existing glossary")
# Still save what we have
csv_content = '\n'.join(csv_lines)
glossary_path = os.path.join(output_dir, "glossary.json")
_atomic_write_file(glossary_path, csv_content)
return _parse_csv_to_dict(csv_content)
# Merge with existing glossary
if existing_glossary:
csv_lines = _merge_csv_entries(csv_lines, existing_glossary, strip_honorifics, language_hint)
# Check stop flag before deduplication
if is_stop_requested():
print("📑 ❌ Extraction stopped before deduplication")
csv_content = '\n'.join(csv_lines)
glossary_path = os.path.join(output_dir, "glossary.json")
_atomic_write_file(glossary_path, csv_content)
return _parse_csv_to_dict(csv_content)
# Fuzzy matching deduplication
csv_lines = _deduplicate_glossary_with_fuzzy(csv_lines, fuzzy_threshold)
# Create CSV content
csv_content = '\n'.join(csv_lines)
# Save glossary as CSV
glossary_path = os.path.join(output_dir, "glossary.csv")
_atomic_write_file(glossary_path, csv_content)
print(f"\n📑 ✅ TARGETED GLOSSARY SAVED!")
print(f"📑 File: {glossary_path}")
print(f"📑 Total entries: {len(csv_lines) - 1}") # Exclude header
return _parse_csv_to_dict(csv_content)
def _translate_terms_batch(term_list, profile_name, batch_size=50, output_dir=None, log_callback=None):
"""Use fully configurable prompts for translation with interrupt support"""
# Redirect stdout to GUI log if callback provided
if log_callback:
set_output_redirect(log_callback)
if not term_list or os.getenv("DISABLE_GLOSSARY_TRANSLATION", "0") == "1":
print(f"📑 Glossary translation disabled or no terms to translate")
return {term: term for term in term_list}
# Check stop flag
if is_stop_requested():
print("📑 ❌ Glossary translation stopped by user")
return {term: term for term in term_list}
try:
MODEL = os.getenv("MODEL", "gemini-1.5-flash")
API_KEY = (os.getenv("API_KEY") or
os.getenv("OPENAI_API_KEY") or
os.getenv("OPENAI_OR_Gemini_API_KEY") or
os.getenv("GEMINI_API_KEY"))
if is_traditional_translation_api(MODEL):
# Return an identity mapping so callers always receive a dict
return {term: term for term in term_list}
if not API_KEY and not _model_uses_own_auth(MODEL):
print(f"📑 No API key found, skipping translation")
return {term: term for term in term_list}
print(f"📑 Translating {len(term_list)} {profile_name} terms to English using batch size {batch_size}...")
# Ensure multi-key config is available in this process if enabled
_ensure_multi_key_config_loaded()
from unified_api_client import UnifiedClient, UnifiedClientError
client = UnifiedClient(model=MODEL, api_key=API_KEY, output_dir=output_dir)
if hasattr(client, 'reset_cleanup_state'):
client.reset_cleanup_state()
# Get custom translation prompt from environment
translation_prompt_template = os.getenv("GLOSSARY_TRANSLATION_PROMPT", "")
if not translation_prompt_template:
translation_prompt_template = """You are translating {language} character names and important terms to English.
For character names, provide English transliterations or keep as romanized.
Keep honorifics/suffixes only if they are integral to the name.
Respond with the same numbered format.
Terms to translate:
{terms_list}
Provide translations in the same numbered format."""
all_translations = {}
all_responses = [] # Collect raw responses
# Respect Auto-retry Slow Chunks toggle (RETRY_TIMEOUT env): when off, disable chunk timeouts entirely
retry_env = os.getenv("RETRY_TIMEOUT")
retry_timeout_enabled = retry_env is None or retry_env.strip().lower() not in ("0", "false", "off", "")
if retry_timeout_enabled:
env_ct = os.getenv("CHUNK_TIMEOUT", "1800")
try:
ct_val = float(env_ct)
chunk_timeout = None if ct_val <= 0 else ct_val
except Exception:
chunk_timeout = None
else:
chunk_timeout = None
for i in range(0, len(term_list), batch_size):
# Check stop flag before each batch
if is_stop_requested():
print(f"📑 ❌ Translation stopped at batch {(i // batch_size) + 1}")
# Return partial translations
for term in term_list:
if term not in all_translations:
all_translations[term] = term
return all_translations
batch = term_list[i:i + batch_size]
batch_num = (i // batch_size) + 1
total_batches = (len(term_list) + batch_size - 1) // batch_size
print(f"📑 Processing batch {batch_num}/{total_batches} ({len(batch)} terms)...")
# Format terms list
terms_text = ""
for idx, term in enumerate(batch, 1):
terms_text += f"{idx}. {term}\n"
# Replace placeholders in prompt
prompt = translation_prompt_template.replace('{language}', profile_name)
prompt = prompt.replace('{terms_list}', terms_text.strip())
prompt = prompt.replace('{batch_size}', str(len(batch)))
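# Illustrative sketch of the substitution above (comment only; sample terms are made up).
# For batch = ["김철수", "이영희", "마검"] and profile_name = "korean", terms_text becomes:
#   1. 김철수
#   2. 이영희
#   3. 마검
# and {language}, {terms_list}, {batch_size} in the prompt template are replaced with
# "korean", that numbered list, and "3" respectively, so the model can answer in the
# same numbered format that _parse_translation_response expects.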
messages = [
{"role": "user", "content": prompt}
]
try:
# Use glossary-specific temperature with fallback to global
temperature = float(os.getenv("GLOSSARY_TEMPERATURE", os.getenv("TEMPERATURE", "0.3")))
# Use glossary-specific max output tokens with fallback to global
max_tokens = int(os.getenv("GLOSSARY_MAX_OUTPUT_TOKENS", os.getenv("MAX_OUTPUT_TOKENS", "4096")))
# Use send_with_interrupt for interruptible API call
print(f"📑 Sending translation request for batch {batch_num} (interruptible)...")
# Timeout retry logic (matches translation behavior)
try:
max_timeout_retries = int(os.getenv("TIMEOUT_RETRY_ATTEMPTS", "2"))
except Exception:
max_timeout_retries = 2
timeout_retry_count = 0
while True:
try:
response, finish_reason, raw_obj = send_with_interrupt(
messages=messages,
client=client,
temperature=temperature,
max_tokens=max_tokens,
stop_check_fn=is_stop_requested,
chunk_timeout=chunk_timeout
)
break
except UnifiedClientError as e:
error_msg = str(e)
lower_msg = error_msg.lower()
if "stopped by user" in lower_msg or is_stop_requested():
raise
is_timeout = ("timed out" in lower_msg) or ("timeout" in lower_msg) or ("cancelled" in lower_msg) or ("client not initialized" in lower_msg)
if is_timeout and timeout_retry_count < max_timeout_retries:
timeout_retry_count += 1
if chunk_timeout:
print(f"⚠️ Glossary translation batch {batch_num} timed out after {chunk_timeout} seconds, retrying ({timeout_retry_count}/{max_timeout_retries})...")
else:
print(f"⚠️ Glossary translation batch {batch_num} timed out, retrying ({timeout_retry_count}/{max_timeout_retries})...")
# Reinitialize client if needed
client_type = getattr(client, 'client_type', 'unknown')
needs_reinit = False
if client_type == 'gemini':
needs_reinit = hasattr(client, 'gemini_client') and client.gemini_client is None
elif client_type == 'openai':
needs_reinit = hasattr(client, 'openai_client') and client.openai_client is None
if needs_reinit:
try:
print(f" 🔄 Reinitializing {client_type} client...")
client._setup_client()
except Exception as reinit_err:
print(f" ⚠️ Failed to reinitialize client: {reinit_err}")
# Stagger retries
try:
import random
base_delay = float(os.getenv("SEND_INTERVAL_SECONDS", "2"))
retry_delay = random.uniform(base_delay / 2, base_delay)
print(f" ⏳ Waiting {retry_delay:.1f}s before retry...")
time.sleep(retry_delay)
except Exception:
time.sleep(1.0)
continue
else:
raise
# Handle response properly
if hasattr(response, 'content'):
response_text = response.content
else:
response_text = str(response)
# Store raw response with batch info
all_responses.append((batch, response_text))
print(f"📑 Batch {batch_num} completed - response received")
# Small delay between batches to avoid rate limiting (configurable)
if i + batch_size < len(term_list):
# Check stop before sleep
if is_stop_requested():
print(f"📑 ❌ Translation stopped after batch {batch_num}")
# Fill in missing translations
for term in term_list:
if term not in all_translations:
all_translations[term] = term
return all_translations
# Use configurable batch delay (GLOSSARY_BATCH_DELAY); defaults to 0.001s
batch_delay = float(os.getenv("GLOSSARY_BATCH_DELAY", "0.001"))
if batch_delay > 0:
time.sleep(batch_delay)
except UnifiedClientError as e:
if "stopped by user" in str(e).lower():
print(f"📑 ❌ Translation interrupted by user at batch {batch_num}")
# Fill in remaining terms with originals
for term in term_list:
if term not in all_translations:
all_translations[term] = term
return all_translations
else:
print(f"⚠️ Translation failed for batch {batch_num}: {e}")
for term in batch:
all_translations[term] = term
except Exception as e:
print(f"⚠️ Translation failed for batch {batch_num}: {e}")
for term in batch:
all_translations[term] = term
# Parse all responses at the end
print(f"📑 Parsing {len(all_responses)} batch responses...")
for batch, response_text in all_responses:
batch_translations = _parse_translation_response(response_text, batch)
all_translations.update(batch_translations)
# Ensure all terms have translations
for term in term_list:
if term not in all_translations:
all_translations[term] = term
translated_count = sum(1 for term, translation in all_translations.items()
if translation != term and translation.strip())
print(f"📑 Successfully translated {translated_count}/{len(term_list)} terms")
return all_translations
except Exception as e:
print(f"⚠️ Glossary translation failed: {e}")
return {term: term for term in term_list}
def _extract_names_for_honorific(honorific, all_text, language_hint,
min_frequency, names_with_honorifics,
standalone_names, is_valid_name, fuzzy_threshold=0.90):
"""Extract names for a specific honorific with fuzzy matching and stop flag checks"""
# Check stop flag at start
if is_stop_requested():
print(f"📑 ❌ Name extraction for '{honorific}' stopped by user")
return
if language_hint == 'korean' and not honorific.startswith('-'):
pattern = r'([\uac00-\ud7af]{2,4})(?=' + re.escape(honorific) + r'(?:\s|[,.\!?]|$))'
matches = list(re.finditer(pattern, all_text))
total_matches = len(matches)
for idx, match in enumerate(matches):
# Check stop flag every 50 matches
if idx > 0 and idx % 50 == 0:
if is_stop_requested():
print(f"📑 ❌ Korean name extraction stopped at {idx}/{total_matches}")
return
# Show progress for large sets
if total_matches > 500:
progress = (idx / total_matches) * 100
print(f"📑 Processing Korean names: {progress:.1f}% ({idx}/{total_matches})")
potential_name = match.group(1)
if is_valid_name(potential_name, 'korean'):
full_form = potential_name + honorific
# Use fuzzy matching for counting with stop check
count = _find_fuzzy_matches(full_form, all_text, fuzzy_threshold)
# Check if stopped during fuzzy matching
if is_stop_requested():
print(f"📑 ❌ Name extraction stopped during fuzzy matching")
return
if count >= min_frequency:
context_patterns = [
full_form + r'[은는이가]',
full_form + r'[을를]',
full_form + r'[에게한테]',
r'["]' + full_form,
full_form + r'[,]',
]
context_count = 0
for ctx_pattern in context_patterns:
context_count += len(re.findall(ctx_pattern, all_text))
if context_count > 0:
names_with_honorifics[full_form] = count
standalone_names[potential_name] = count
elif language_hint == 'japanese' and not honorific.startswith('-'):
pattern = r'([\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]{2,5})(?=' + re.escape(honorific) + r'(?:\s|[、。!?]|$))'
matches = list(re.finditer(pattern, all_text))
total_matches = len(matches)
for idx, match in enumerate(matches):
# Check stop flag every 50 matches
if idx > 0 and idx % 50 == 0:
if is_stop_requested():
print(f"📑 ❌ Japanese name extraction stopped at {idx}/{total_matches}")
return
if total_matches > 500:
progress = (idx / total_matches) * 100
print(f"📑 Processing Japanese names: {progress:.1f}% ({idx}/{total_matches})")
potential_name = match.group(1)
if is_valid_name(potential_name, 'japanese'):
full_form = potential_name + honorific
count = _find_fuzzy_matches(full_form, all_text, fuzzy_threshold)
if is_stop_requested():
print(f"📑 ❌ Name extraction stopped during fuzzy matching")
return
if count >= min_frequency:
names_with_honorifics[full_form] = count
standalone_names[potential_name] = count
elif language_hint == 'chinese' and not honorific.startswith('-'):
pattern = r'([\u4e00-\u9fff]{2,4})(?=' + re.escape(honorific) + r'(?:\s|[,。!?]|$))'
matches = list(re.finditer(pattern, all_text))
total_matches = len(matches)
for idx, match in enumerate(matches):
# Check stop flag every 50 matches
if idx > 0 and idx % 50 == 0:
if is_stop_requested():
print(f"📑 ❌ Chinese name extraction stopped at {idx}/{total_matches}")
return
if total_matches > 500:
progress = (idx / total_matches) * 100
print(f"📑 Processing Chinese names: {progress:.1f}% ({idx}/{total_matches})")
potential_name = match.group(1)
if is_valid_name(potential_name, 'chinese'):
full_form = potential_name + honorific
count = _find_fuzzy_matches(full_form, all_text, fuzzy_threshold)
if is_stop_requested():
print(f"📑 ❌ Name extraction stopped during fuzzy matching")
return
if count >= min_frequency:
names_with_honorifics[full_form] = count
standalone_names[potential_name] = count
elif honorific.startswith('-') or honorific.startswith(' '):
is_space_separated = honorific.startswith(' ')
if is_space_separated:
pattern_english = r'\b([A-Z][a-zA-Z]+)' + re.escape(honorific) + r'(?=\s|[,.\!?]|$)'
else:
pattern_english = r'\b([A-Z][a-zA-Z]+)' + re.escape(honorific) + r'\b'
matches = list(re.finditer(pattern_english, all_text))
total_matches = len(matches)
for idx, match in enumerate(matches):
# Check stop flag every 50 matches
if idx > 0 and idx % 50 == 0:
if is_stop_requested():
print(f"📑 ❌ English name extraction stopped at {idx}/{total_matches}")
return
if total_matches > 500:
progress = (idx / total_matches) * 100
print(f"📑 Processing English names: {progress:.1f}% ({idx}/{total_matches})")
potential_name = match.group(1)
if is_valid_name(potential_name, 'english'):
full_form = potential_name + honorific
count = _find_fuzzy_matches(full_form, all_text, fuzzy_threshold)
if is_stop_requested():
print(f"📑 ❌ Name extraction stopped during fuzzy matching")
return
if count >= min_frequency:
names_with_honorifics[full_form] = count
standalone_names[potential_name] = count
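# Illustrative sketch of the Korean branch above (comment only; the snippet is made up).
# With honorific "님", the pattern captures the 2-4 syllable name preceding it when the
# honorific is followed by a space, punctuation, or end of line, so text such as
#   "김철수님, 어서 오세요. 김철수님이 도착했다."
# yields potential_name "김철수" and full_form "김철수님". The first occurrence satisfies the
# capture pattern and the second ("김철수님이") satisfies the subject-particle context check,
# so if the fuzzy count meets min_frequency, "김철수님" is recorded in names_with_honorifics
# and the bare "김철수" in standalone_names.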
def _parse_translation_response(response, original_terms):
"""Extract translations from AI response by matching numbered lines to original terms"""
translations = {}
# Handle UnifiedResponse object
if hasattr(response, 'content'):
response_text = response.content
else:
response_text = str(response)
# Split into lines
lines = response_text.strip().split('\n')
for line in lines:
line = line.strip()
if not line:
continue
# Match numbered format: "1. Translation" or "1) Translation" etc
number_match = re.match(r'^(\d+)[\.):\-\s]+(.+)', line)
if number_match:
idx = int(number_match.group(1)) - 1 # Convert to 0-based
translation = number_match.group(2).strip()
# Remove trailing explanations in parentheses
translation = re.sub(r'\s*\([^)]+\)\s*$', '', translation)
if 0 <= idx < len(original_terms):
translations[original_terms[idx]] = translation
print(f"📑 Extracted {len(translations)}/{len(original_terms)} translations")
return translations
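# Illustrative sketch of the parsing above (comment only; values are made up).
#   response = "1. Kim Cheol-su\n2) Lee Young-hee (female lead)\n3 - Demon Sword"
#   original_terms = ["김철수", "이영희", "마검"]
# Each numbered line maps back to its term by position, and the trailing "(female lead)"
# note is stripped, giving
#   {"김철수": "Kim Cheol-su", "이영희": "Lee Young-hee", "마검": "Demon Sword"}.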
def _init_worker_with_env(env_vars_dict):
"""Initialize worker process with environment variables from parent.
MUST be at module level for pickling by multiprocessing.Pool.
"""
import os
for k, v in env_vars_dict.items():
os.environ[k] = str(v)
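# Minimal usage sketch (comment only, not executed): the initializer hook is how a parent
# process could propagate environment variables into pool workers, e.g.
#   from concurrent.futures import ProcessPoolExecutor
#   env_snapshot = {k: v for k, v in os.environ.items() if k.startswith("GLOSSARY_")}
#   with ProcessPoolExecutor(max_workers=4,
#                            initializer=_init_worker_with_env,
#                            initargs=(env_snapshot,)) as pool:
#       results = list(pool.map(_check_sentence_batch_for_terms, jobs))
# where `jobs` would be the (batch_sentences, terms) tuples built by the caller.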
def _check_sentence_batch_for_terms(args):
"""Check a batch of sentences for term matches - used by ProcessPoolExecutor"""
batch_sentences, terms = args
filtered = []
# Use pre-compiled term list for fast checking
for sentence in batch_sentences:
# Quick check using any() - stops at first match
if any(term in sentence for term in terms):
filtered.append(sentence)
return filtered
def _score_sentence_batch(args):
"""Worker function to score a batch of sentences - Optimized for speed"""
(start_idx, sentences), term_list, honorific_pattern_str, gender_pronouns, include_gender_context = args
import re
local_scores = {}
local_term_map = {}
# Pre-compile regex if needed
honorific_pattern = re.compile(honorific_pattern_str) if honorific_pattern_str else None
# OPTIMIZATION 1: Segregate terms for hybrid strategy
# - Single-token terms: Use O(1) set intersection (FAST)
# - Multi-token terms: Use iteration (SLOWER, but few terms)
# This preserves quality for terms with spaces while keeping speed for CJK/single names
# Simple tokenizer for classification (matches CJK chars or alphanumeric sequences)
tokenizer_pattern = re.compile(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff\uac00-\ud7af]+|[a-zA-Z0-9]+')
single_token_terms = set()
multi_token_terms = []
for t in term_list:
if len(t) < 2: continue
# Check if term splits into multiple tokens
tokens = tokenizer_pattern.findall(t)
if len(tokens) > 1:
multi_token_terms.append(t)
else:
single_token_terms.add(t)
# Pre-compile multi-token terms regex if there are any (faster than loop)
multi_term_regex = None
if multi_token_terms:
# Sort by length desc to match longest first
multi_token_terms.sort(key=len, reverse=True)
# Escape terms
pattern = '|'.join(map(re.escape, multi_token_terms))
try:
multi_term_regex = re.compile(pattern)
except:
# Fallback if pattern is too huge (unlikely for just multi-word subset)
pass
for idx, sentence in enumerate(sentences):
global_idx = start_idx + idx
score = 1.0
# Gender pronoun check (fast)
if include_gender_context and gender_pronouns:
for p in gender_pronouns:
if p in sentence:
score += 5.0
break
# Honorific check (fast regex)
if honorific_pattern and honorific_pattern.search(sentence):
score += 2.0
local_scores[global_idx] = score
# 1. Fast Path: Single-token terms (Set Intersection)
tokens = set(tokenizer_pattern.findall(sentence))
found_terms = tokens.intersection(single_token_terms)
for term in found_terms:
if term not in local_term_map:
local_term_map[term] = []
local_term_map[term].append(global_idx)
# 2. Slow Path: Multi-token terms (Regex or Iteration)
# Only needed if we actually have multi-word terms
if multi_token_terms:
if multi_term_regex:
# Fast regex batch match
for match in multi_term_regex.findall(sentence):
if match not in local_term_map:
local_term_map[match] = []
# Avoid duplicates if regex matches same term multiple times
if global_idx not in local_term_map[match]:
local_term_map[match].append(global_idx)
else:
# Fallback iteration
for term in multi_token_terms:
if term in sentence:
if term not in local_term_map:
local_term_map[term] = []
local_term_map[term].append(global_idx)
return local_scores, local_term_map
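# Illustrative sketch of the hybrid matching above (comment only; data is made up).
# Given term_list = ["김철수", "Demon Sword"], the tokenizer splits "Demon Sword" into two
# tokens, so it takes the multi-token regex path, while "김철수" goes into the single-token
# set. For the sentence "김철수 raised the Demon Sword", the token set
# {"김철수", "raised", "the", "Demon", "Sword"} intersects the single-token set to record
# "김철수", and the compiled regex separately records "Demon Sword"; both land in
# local_term_map under this sentence's global index, while the base score of 1.0 is bumped
# only if gender pronouns or an honorific pattern are present.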
def _process_sentence_batch_for_extraction(args):
"""Process sentences to extract terms - used by ProcessPoolExecutor"""
batch_sentences, batch_idx, combined_pattern, exclude_check_data = args
from collections import Counter
import re
local_word_freq = Counter()
local_important = []
local_seen = set()
# Rebuild the exclusion check function from data
honorifics_to_exclude, title_patterns_str, common_words, chinese_nums = exclude_check_data
title_patterns = [re.compile(p) for p in title_patterns_str]
def should_exclude_term(term):
term_lower = term.lower()
# Check if it's a common word
if term in common_words or term_lower in common_words:
return True
# Check if it contains honorifics
for honorific in honorifics_to_exclude:
if honorific in term or (honorific.startswith('-') and term.endswith(honorific[1:])):
return True
# Check if it matches title patterns
for pattern in title_patterns:
if pattern.search(term):
return True
# Check if it's a number
if term in chinese_nums or term.isdigit():
return True
return False
for sentence in batch_sentences:
sentence = sentence.strip()
if len(sentence) < 10 or len(sentence) > 500:
continue
# Find all potential terms in this sentence
matches = re.findall(combined_pattern, sentence)
if matches:
# Filter out excluded terms
filtered_matches = []
for match in matches:
if not should_exclude_term(match):
local_word_freq[match] += 1
filtered_matches.append(match)
# Keep sentences with valid potential terms
if filtered_matches:
sentence_key = ' '.join(sorted(filtered_matches))
if sentence_key not in local_seen:
local_important.append(sentence)
local_seen.add(sentence_key)
return local_word_freq, local_important, local_seen, batch_idx