Spaces:
Running
Running
| """ | |
Keyword dictionary expansion with exclusive category assignment.

Key principle: each word can belong to only ONE category.
This prevents cross-contamination, where a word like "unplayable"
might otherwise be counted in both the Bugs and Performance categories.

Algorithm:
1. For each category, find candidate words similar to its seed keywords.
2. Collect ALL candidates in a global pool.
3. Assign each word to the category in which it has the highest score.
4. Filter by similarity threshold and frequency (applied while the
   candidates are being collected in step 1).
| """ | |
| import json | |
| import logging | |
| import math | |
| from collections import defaultdict | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from pathlib import Path | |
| from gensim.models import FastText | |
| from .config import OUTPUT_DIR, SETTINGS | |
| logger = logging.getLogger(__name__) | |
@dataclass
class Candidate:
    """A candidate word for dictionary expansion.

    Attributes:
        word: Token as it appears in the model vocabulary; multi-word
            phrases are joined with underscores (e.g. "frame_rate").
        similarity: Best similarity observed between the word and any seed.
            Mutable on purpose: the expander raises it when a later seed
            matches the word more closely.
        frequency: Corpus frequency of the word.
        source_seeds: Seed keywords that surfaced this word.
    """

    word: str
    similarity: float
    frequency: int
    # default_factory gives every instance its own list instead of a shared one.
    source_seeds: list[str] = field(default_factory=list)

    @property
    def score(self) -> float:
        """
        Combined score from similarity and frequency.

        Formula: 0.7 * similarity + 0.3 * normalized_log_frequency

        The frequency factor is log10(frequency + 1) / 5, i.e. roughly 0-1
        for frequencies up to ~100k (it is not clamped above that).
        Frequencies below 1 are treated as 1.
        """
        freq_factor = math.log10(max(self.frequency, 1) + 1) / 5
        return self.similarity * 0.7 + freq_factor * 0.3

    def to_dict(self) -> dict:
        """Return a JSON-friendly view with underscores shown as spaces."""
        return {
            "word": self.word.replace("_", " "),
            # float() keeps the export serializable even if the model hands
            # back a non-builtin float type.
            "similarity": round(float(self.similarity), 3),
            "frequency": self.frequency,
            "score": round(float(self.score), 3),
            "source_seeds": self.source_seeds,
        }
class KeywordExpander:
    """
    Expands keyword dictionary using trained FastText model.

    Uses exclusive category assignment to prevent words
    appearing in multiple categories.
    """

    def __init__(
        self,
        model: FastText,
        existing_keywords: dict[str, list[str]],
        word_frequencies: dict[str, int],
        similarity_threshold: float | None = None,
        max_suggestions_per_seed: int | None = None,
        min_frequency: int | None = None,
    ):
        """
        Initialize expander.

        Args:
            model: Trained FastText model
            existing_keywords: Current TOPIC_KEYWORDS dictionary
            word_frequencies: Word frequency counts from corpus
            similarity_threshold: Minimum similarity for candidates
            max_suggestions_per_seed: Max similar words per seed
            min_frequency: Minimum corpus frequency

        Any of the three tuning arguments left as None falls back to the
        matching SETTINGS entry. Explicit falsy values (e.g. 0 or 0.0) are
        honoured rather than replaced by the configured default.
        """
        self.model = model
        self.existing = existing_keywords
        self.word_freq = word_frequencies
        self.similarity_threshold = self._setting_or(
            similarity_threshold, "similarity_threshold"
        )
        self.max_suggestions = self._setting_or(
            max_suggestions_per_seed, "max_suggestions_per_seed"
        )
        self.min_frequency = self._setting_or(min_frequency, "min_frequency")

        # Build set of all existing words (normalized) so neighbours that are
        # already in the dictionary can be skipped cheaply.
        self.existing_words: set[str] = {
            self._normalize(keyword)
            for keywords in existing_keywords.values()
            for keyword in keywords
        }
        logger.info(
            "Expander initialized with %d existing keywords",
            len(self.existing_words),
        )

    @staticmethod
    def _setting_or(value, key: str):
        """Return *value*, or SETTINGS[key] when *value* is None."""
        return SETTINGS[key] if value is None else value

    @staticmethod
    def _normalize(text: str) -> str:
        """Lower-case *text* and join words with underscores ("frame rate" -> "frame_rate")."""
        return text.lower().replace(" ", "_")

    @staticmethod
    def _quote(text: str) -> str:
        """Render *text* as a double-quoted literal with quotes/backslashes escaped."""
        return json.dumps(text, ensure_ascii=False)

    def _find_candidates_for_category(
        self,
        category: str,
        seeds: list[str],
    ) -> dict[str, Candidate]:
        """
        Find candidate words for a single category.

        Args:
            category: Category name (currently unused; kept for the interface).
            seeds: Seed keywords of that category.

        Returns dict[word -> Candidate] with best similarity per word.
        Words that are already keywords, fall below the similarity threshold,
        or are rarer than the minimum frequency are dropped.
        """
        found: dict[str, Candidate] = {}
        vectors = self.model.wv

        for seed in seeds:
            # Normalize seed (e.g., "frame rate" -> "frame_rate")
            seed_key = self._normalize(seed)

            # Skip if seed not in vocabulary
            if seed_key not in vectors:
                continue

            try:
                neighbours = vectors.most_similar(seed_key, topn=self.max_suggestions)
            except KeyError:
                # The model could not resolve the seed after all; skip it.
                continue

            for word, similarity in neighbours:
                if word in self.existing_words:
                    continue
                if similarity < self.similarity_threshold:
                    continue

                freq = self.word_freq.get(word, 0)
                if freq < self.min_frequency:
                    continue

                known = found.get(word)
                if known is None:
                    found[word] = Candidate(
                        word=word,
                        similarity=similarity,
                        frequency=freq,
                        source_seeds=[seed],
                    )
                    continue

                # Seen via an earlier seed: keep the higher similarity and
                # record this seed as an additional source.
                if similarity > known.similarity:
                    known.similarity = similarity
                known.source_seeds.append(seed)

        return found

    def expand_all_exclusive(self) -> dict[str, list[Candidate]]:
        """
        Expand all categories with exclusive assignment.

        Each word is assigned only to the category where it has
        the highest score. When several categories tie, the one that
        comes first in the existing-keywords dictionary wins.

        Returns:
            Dict mapping category -> list of Candidates (sorted by score,
            best first). Categories that received no candidates are absent.
        """
        logger.info("Starting exclusive expansion...")

        # Step 1: Collect candidates from all categories
        # Format: word -> [(category, Candidate), ...]
        pool: dict[str, list[tuple[str, Candidate]]] = defaultdict(list)
        for category, seeds in self.existing.items():
            per_category = self._find_candidates_for_category(category, seeds)
            for word, candidate in per_category.items():
                pool[word].append((category, candidate))
            logger.info("[%s] Found %d raw candidates", category, len(per_category))

        # Step 2: Assign each word to category with highest score
        assignments: dict[str, list[Candidate]] = defaultdict(list)
        for claims in pool.values():
            best_category, best_candidate = max(
                claims,
                key=lambda claim: claim[1].score,
            )
            assignments[best_category].append(best_candidate)

        # Step 3: Sort candidates in each category by score
        for ranked in assignments.values():
            ranked.sort(key=lambda c: c.score, reverse=True)

        total = sum(len(ranked) for ranked in assignments.values())
        logger.info("Exclusive assignment complete: %d total candidates", total)
        for category, ranked in sorted(assignments.items()):
            logger.info(" %s: %d candidates", category, len(ranked))

        return dict(assignments)

    def export_candidates(
        self,
        path: Path | str | None = None,
        include_threshold_in_name: bool = False,
    ) -> Path:
        """
        Export candidates to JSON for manual review.

        Args:
            path: Output path (default: output/candidates.json)
            include_threshold_in_name: Add threshold to filename for comparison
                (only used when no explicit path is given)

        Returns:
            Path to exported file
        """
        if path:
            path = Path(path)
        elif include_threshold_in_name:
            path = OUTPUT_DIR / f"candidates_t{self.similarity_threshold:.2f}.json"
        else:
            path = OUTPUT_DIR / "candidates.json"

        results = self.expand_all_exclusive()

        export_data = {
            "metadata": {
                "generated_at": datetime.now().isoformat(),
                "similarity_threshold": self.similarity_threshold,
                "min_frequency": self.min_frequency,
                "total_candidates": sum(len(c) for c in results.values()),
            },
            "categories": {
                category: [c.to_dict() for c in candidates]
                for category, candidates in sorted(results.items())
            },
        }

        # Make sure the target directory exists before opening the file.
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(export_data, f, indent=2, ensure_ascii=False)

        logger.info("Exported candidates to %s", path)
        return path

    def generate_keywords_py(
        self,
        output_path: Path | str | None = None,
        auto_approve_threshold: float | None = None,
    ) -> Path:
        """
        Generate new keywords.py with expanded dictionary.

        Words with score >= auto_approve_threshold are added directly.
        Words below threshold are added as comments for manual review.

        Args:
            output_path: Output path (default: output/keywords_expanded.py)
            auto_approve_threshold: Score threshold for auto-approval
                (None uses SETTINGS["auto_approve_threshold"]; an explicit
                0.0 is honoured and approves every candidate)

        Returns:
            Path to generated file
        """
        output_path = Path(output_path) if output_path else OUTPUT_DIR / "keywords_expanded.py"
        auto_approve = self._setting_or(auto_approve_threshold, "auto_approve_threshold")
        results = self.expand_all_exclusive()

        lines = [
            '"""',
            "Expanded keyword dictionary for game review topic detection.",
            f"Generated: {datetime.now().isoformat()}",
            f"Auto-approve threshold: {auto_approve}",
            '"""',
            "",
            "TOPIC_KEYWORDS = {",
        ]

        for category, seeds in self.existing.items():
            # _quote() escapes quotes/backslashes so the emitted file always
            # parses, whatever characters a keyword contains.
            lines.append(f" {self._quote(category)}: [")

            # Existing keywords
            lines.append(" # Existing")
            lines.extend(f" {self._quote(seed)}," for seed in seeds)

            # New candidates, split by the auto-approve threshold
            candidates = results.get(category, [])
            auto_approved = [c for c in candidates if c.score >= auto_approve]
            review_needed = [c for c in candidates if c.score < auto_approve]

            if auto_approved:
                lines.append(f" # NEW (auto-approved, score >= {auto_approve})")
                for c in auto_approved:
                    word_display = self._quote(c.word.replace("_", " "))
                    lines.append(f" {word_display}, # score={c.score:.2f}")

            if review_needed:
                lines.append(f" # CANDIDATES (score < {auto_approve}, require review)")
                for c in review_needed:
                    word_display = self._quote(c.word.replace("_", " "))
                    # Emitted commented-out: a reviewer uncomments to accept.
                    lines.append(f" # {word_display}, # score={c.score:.2f}")

            lines.append(" ],")
            lines.append("")

        lines.append("}")
        lines.append("")

        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            f.write("\n".join(lines))

        logger.info("Generated keywords file at %s", output_path)
        return output_path

    def get_expansion_stats(self, auto_approve_threshold: float | None = None) -> dict:
        """
        Get statistics about the expansion.

        Args:
            auto_approve_threshold: Score threshold separating auto-approved
                candidates from those needing review
                (default: SETTINGS["auto_approve_threshold"])

        Returns:
            Dict with overall "total_candidates", "auto_approved" and
            "needs_review" counts plus the same breakdown per category
            under "by_category".
        """
        results = self.expand_all_exclusive()
        auto_threshold = self._setting_or(auto_approve_threshold, "auto_approve_threshold")

        stats = {
            "total_candidates": 0,
            "auto_approved": 0,
            "needs_review": 0,
            "by_category": {},
        }

        for category, candidates in results.items():
            auto = sum(1 for c in candidates if c.score >= auto_threshold)
            review = len(candidates) - auto

            stats["by_category"][category] = {
                "total": len(candidates),
                "auto_approved": auto,
                "needs_review": review,
            }
            stats["total_candidates"] += len(candidates)
            stats["auto_approved"] += auto
            stats["needs_review"] += review

        return stats