# -*- coding: utf-8 -*-
"""
Glossary Compressor Module

Filters glossary entries based on source text to reduce token usage.
"""
import os
import re
import json
import csv
from io import StringIO


def compress_glossary(glossary_content, source_text, glossary_format='auto'):
    """
    Compress glossary by excluding entries that don't appear in the source text.

    Args:
        glossary_content: Raw glossary content (CSV string or JSON dict/list)
        source_text: The source text to check against
        glossary_format: 'csv', 'json', or 'auto' (detect from content)

    Returns:
        Compressed glossary in the same format as the input
    """
    if not glossary_content or not source_text:
        return glossary_content

    # Auto-detect format
    if glossary_format == 'auto':
        if isinstance(glossary_content, str):
            # Check if it looks like JSON
            stripped = glossary_content.strip()
            if stripped.startswith(('{', '[')) and stripped.endswith(('}', ']')):
                glossary_format = 'json'
            else:
                glossary_format = 'csv'
        elif isinstance(glossary_content, (dict, list)):
            glossary_format = 'json'
        else:
            return glossary_content

    if glossary_format == 'csv':
        return _compress_csv_glossary(glossary_content, source_text)
    elif glossary_format == 'json':
        return _compress_json_glossary(glossary_content, source_text)
    else:
        return glossary_content
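
# Illustrative note on auto-detection (hypothetical values): a plain string such as
# "type,raw_name,translated_name\n..." is routed to the CSV path, while a dict/list
# payload or a string bracketed by {...} / [...] is routed to the JSON path, e.g.:
#
#   compress_glossary({"김철수": "Kim Cheol-su"}, "김철수가 등장했다")
#   # -> {"김철수": "Kim Cheol-su"}  (key found in the source text, so it is kept)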


def _compress_csv_glossary(csv_content, source_text):
    """
    Compress a CSV glossary by excluding entries not found in the source text.

    Handles both the legacy CSV format and the token-efficient format.
    """
    if not isinstance(csv_content, str):
        return csv_content

    lines = csv_content.strip().split('\n')
    if not lines:
        return csv_content

    # Check if this is the token-efficient format (has section headers like "=== CHARACTERS ===")
    is_token_efficient = any(line.strip().startswith('===') for line in lines)

    if is_token_efficient:
        return _compress_token_efficient_format(lines, source_text)
    else:
        return _compress_legacy_csv_format(lines, source_text)


def _compress_token_efficient_format(lines, source_text):
    """Compress the token-efficient glossary format with section headers."""
    filtered_lines = []
    current_section = None

    for line in lines:
        stripped = line.strip()

        # Keep the glossary header
        if stripped.lower().startswith('glossary:'):
            filtered_lines.append(line)
            continue

        # Track section headers
        if stripped.startswith('==='):
            current_section = line
            continue

        # Process entry lines (start with "* ")
        # Format: * TranslatedName (RawName) [Gender]
        if stripped.startswith('* '):
            # Extract the raw name from the entry
            match = re.search(r'\(([^)]+)\)', stripped)
            if match:
                raw_name = match.group(1).strip()
                # Check if the raw name appears in the source text
                if _text_contains_term(source_text, raw_name):
                    # Add the section header if this is the first surviving entry in the section
                    if current_section:
                        filtered_lines.append(current_section)
                        current_section = None
                    filtered_lines.append(line)
        elif not stripped:
            # Keep blank lines
            filtered_lines.append(line)

    return '\n'.join(filtered_lines)
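
# Sketch of the token-efficient layout this function expects (sample entries are
# hypothetical):
#
#   Glossary:
#   === CHARACTERS ===
#   * Kim Cheol-su (김철수) [Male]
#   * Lee Young-hee (이영희) [Female]
#
# Only entries whose parenthesized raw name occurs in source_text are kept, and a
# section header is re-emitted just before its first surviving entry.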


def _compress_legacy_csv_format(lines, source_text):
    """Compress the legacy CSV format with type,raw_name,translated_name columns."""
    if not lines:
        return ''

    # Check if the first line is a header
    first_line = lines[0].strip().lower()
    has_header = first_line.startswith('type,') or 'raw_name' in first_line

    filtered_lines = []

    # Keep the header if present
    if has_header:
        filtered_lines.append(lines[0])
        data_lines = lines[1:]
    else:
        data_lines = lines

    # Process each CSV row
    for line in data_lines:
        if not line.strip():
            continue
        try:
            # Parse the CSV line; columns are: type, raw_name, translated_name
            parts = list(csv.reader(StringIO(line)))[0]
            if len(parts) >= 3:
                raw_name = parts[1].strip()
                # Check if the raw name appears in the source text
                if _text_contains_term(source_text, raw_name):
                    filtered_lines.append(line)
        except Exception:
            # If parsing fails, keep the line to be safe
            filtered_lines.append(line)

    return '\n'.join(filtered_lines)
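
# Sketch of the legacy CSV layout (hypothetical rows):
#
#   type,raw_name,translated_name
#   character,김철수,Kim Cheol-su
#   location,서울,Seoul
#
# The header row is preserved; a data row survives only if its raw_name (second
# column) appears in source_text.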


def _compress_json_glossary(json_data, source_text):
    """
    Compress a JSON glossary by excluding entries not found in the source text.

    Handles both dict format and list format.
    """
    if isinstance(json_data, str):
        try:
            json_data = json.loads(json_data)
        except json.JSONDecodeError:
            return json_data

    if isinstance(json_data, dict):
        # Handle dict with an 'entries' key
        if 'entries' in json_data:
            filtered_entries = {}
            for key, value in json_data['entries'].items():
                if _text_contains_term(source_text, key):
                    filtered_entries[key] = value
            result = json_data.copy()
            result['entries'] = filtered_entries
            return result
        else:
            # Simple dict format keyed by raw term; always preserve any 'metadata' entry
            filtered_dict = {}
            for key, value in json_data.items():
                if key == 'metadata':
                    filtered_dict[key] = value
                elif _text_contains_term(source_text, key):
                    filtered_dict[key] = value
            return filtered_dict

    elif isinstance(json_data, list):
        # List of entry objects
        filtered_list = []
        for entry in json_data:
            if isinstance(entry, dict):
                # Check the various possible keys for the raw term
                raw_term = entry.get('raw_name') or entry.get('original_name') or entry.get('original') or ''
                if raw_term and _text_contains_term(source_text, raw_term):
                    filtered_list.append(entry)
        return filtered_list

    return json_data
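
# The three JSON shapes handled above, with hypothetical values:
#
#   {"entries": {"김철수": "Kim Cheol-su"}}           # dict with an 'entries' key
#   {"김철수": "Kim Cheol-su", "metadata": {...}}     # simple dict ('metadata' preserved)
#   [{"raw_name": "김철수", "translated_name": "Kim Cheol-su"}]   # list of entry objects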


def _text_contains_term(text, term):
    """
    Check if a term appears in the text using simple substring matching.

    Works well for Korean/CJK text, where word boundaries are not clearly marked.
    """
    if not term or not text:
        return False
    # For CJK languages (Korean, Chinese, Japanese), simple substring matching works best;
    # word boundaries don't apply the same way they do in English.
    return term in text


def compress_glossary_file(glossary_path, source_text):
    """
    Load, compress, and return a glossary from a file path.

    Args:
        glossary_path: Path to the glossary file (.csv or .json)
        source_text: The source text to check against

    Returns:
        Compressed glossary content in the appropriate format, or None on failure
    """
    if not glossary_path or not os.path.exists(glossary_path):
        return None

    try:
        with open(glossary_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Determine the format from the file extension
        if glossary_path.lower().endswith('.csv'):
            return compress_glossary(content, source_text, glossary_format='csv')
        elif glossary_path.lower().endswith('.json'):
            json_data = json.loads(content)
            compressed_data = compress_glossary(json_data, source_text, glossary_format='json')
            # Return as a JSON string
            return json.dumps(compressed_data, ensure_ascii=False, indent=2)
        else:
            return content
    except Exception as e:
        print(f"⚠️ Failed to compress glossary: {e}")
        return None
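

# Minimal smoke-test sketch (hypothetical data; run this module directly to try it):
if __name__ == '__main__':
    demo_glossary = (
        "type,raw_name,translated_name\n"
        "character,김철수,Kim Cheol-su\n"
        "character,이영희,Lee Young-hee"
    )
    demo_source = "김철수가 조용히 문을 열었다."
    # Only the 김철수 row should survive, since 이영희 never appears in the source.
    print(compress_glossary(demo_glossary, demo_source))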