| """ | |
| Community Highlights extraction service for reviews. | |
| Uses n-grams (2-5 tokens) + TF-IDF to identify the most frequently used phrases. | |
| """ | |
| import math | |
| import re | |
| import time | |
| from collections import Counter, defaultdict | |
| from difflib import SequenceMatcher | |
| from typing import Any | |
| import jieba | |
| from zhconv import convert | |
| from app.core.config import settings | |
| from app.core.worker_logging import StageTimingAccumulator | |
| from app.core.stopwords_zh import is_stopword | |
| from app.models.schemas import SentimentType | |
| from app.services.global_pros_cons import ( | |
| GLOBAL_PROS_CONS_CONTEXTS, | |
| build_contextual_global_pros_cons, | |
| ) | |
| _SNIPPET_TRIM_CHARS = " \t\n\r.,!?;:,。!?;:、\"'“”‘’[]()" | |
| _ELLIPSIS = "..." | |
| _EXPLICIT_STEAM_DECK_PATTERN = re.compile( | |
| r"(?:\bsteam\s*deck\b|\bsteamdeck\b)", | |
| re.IGNORECASE, | |
| ) | |
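| # Matches e.g. "Steam Deck", "steamdeck", or "steam   deck" (case-insensitive, any | |
| # whitespace between the words); such sentences are routed away from public grouped | |
| # evidence by _is_steam_deck_specific_sentence below. | |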
| _ASCII_SNIPPET_EVIDENCE_TOKENS = frozenset( | |
| { | |
| "amazing", | |
| "awkward", | |
| "bad", | |
| "better", | |
| "beautiful", | |
| "boring", | |
| "broken", | |
| "buggy", | |
| "cheap", | |
| "clunky", | |
| "crashes", | |
| "crash", | |
| "dragging", | |
| "drags", | |
| "expensive", | |
| "faster", | |
| "feels", | |
| "felt", | |
| "fun", | |
| "good", | |
| "great", | |
| "hard", | |
| "hate", | |
| "hated", | |
| "interesting", | |
| "janky", | |
| "laggy", | |
| "love", | |
| "loved", | |
| "okay", | |
| "polished", | |
| "repetitive", | |
| "responsive", | |
| "rough", | |
| "runs", | |
| "running", | |
| "satisfying", | |
| "slower", | |
| "slow", | |
| "smooth", | |
| "smoothly", | |
| "stable", | |
| "stiff", | |
| "terrible", | |
| "thin", | |
| "unplayable", | |
| "worth", | |
| "worse", | |
| } | |
| ) | |
| _ZH_SNIPPET_EVIDENCE_MARKERS = frozenset( | |
| { | |
| "不", | |
| "不错", | |
| "一般", | |
| "卡", | |
| "卡顿", | |
| "喜欢", | |
| "失望", | |
| "好", | |
| "好玩", | |
| "崩溃", | |
| "差", | |
| "推荐", | |
| "无聊", | |
| "流畅", | |
| "爽", | |
| "稳定", | |
| "糟糕", | |
| "舒服", | |
| "贵", | |
| "难", | |
| "顺滑", | |
| "闪退", | |
| } | |
| ) | |
| _GLOBAL_SIGNAL_WEAK_TOKENS = frozenset( | |
| { | |
| "after", | |
| "appear", | |
| "appears", | |
| "become", | |
| "becomes", | |
| "during", | |
| "every", | |
| "feel", | |
| "feels", | |
| "felt", | |
| "overall", | |
| "really", | |
| "seem", | |
| "seems", | |
| "stay", | |
| "stays", | |
| "still", | |
| "through", | |
| "very", | |
| "之后", | |
| "以前", | |
| "之后", | |
| "还是", | |
| "依然", | |
| "感觉", | |
| "觉得", | |
| "开始", | |
| "出现", | |
| } | |
| ) | |
| _GLOBAL_SIGNAL_GENERIC_TOPIC_LABEL_TOKENS = frozenset( | |
| { | |
| "audio", | |
| "balance", | |
| "bug", | |
| "bugs", | |
| "combat", | |
| "community", | |
| "content", | |
| "control", | |
| "controls", | |
| "difficulty", | |
| "gameplay", | |
| "graphic", | |
| "graphics", | |
| "hud", | |
| "interface", | |
| "localization", | |
| "menu", | |
| "music", | |
| "narrative", | |
| "optimization", | |
| "patch", | |
| "performance", | |
| "polish", | |
| "progression", | |
| "sound", | |
| "soundtrack", | |
| "story", | |
| "support", | |
| "ui", | |
| "内容", | |
| "剧情", | |
| "平衡", | |
| "性能", | |
| "战斗", | |
| "故事", | |
| "操作", | |
| "玩法", | |
| "界面", | |
| "画面", | |
| "画质", | |
| "配乐", | |
| "配音", | |
| "音效", | |
| "音乐", | |
| "优化", | |
| "手感", | |
| } | |
| ) | |
| _GLOBAL_SIGNAL_STANDALONE_ALLOWED_TOKENS = frozenset( | |
| { | |
| "crash", | |
| "crashes", | |
| "grind", | |
| "lag", | |
| "lags", | |
| "stutter", | |
| "stutters", | |
| } | |
| ) | |
| _TOPIC_CONTEXT_MEANINGFUL_TOKENS = frozenset( | |
| { | |
| "area", | |
| "areas", | |
| "boss", | |
| "bosses", | |
| "build", | |
| "builds", | |
| "chapter", | |
| "chapters", | |
| "class", | |
| "classes", | |
| "enemy", | |
| "enemies", | |
| "fight", | |
| "fights", | |
| "late", | |
| "map", | |
| "maps", | |
| "mission", | |
| "missions", | |
| "quest", | |
| "quests", | |
| "side", | |
| "skill", | |
| "skills", | |
| "town", | |
| "towns", | |
| "weapon", | |
| "weapons", | |
| "主线", | |
| "任务", | |
| "关卡", | |
| "城镇", | |
| "地图", | |
| "支线", | |
| "敌人", | |
| "武器", | |
| "章节", | |
| } | |
| ) | |
| _SNIPPET_WEAK_CONTEXT_TOKENS = frozenset( | |
| { | |
| "after", | |
| "before", | |
| "during", | |
| "in", | |
| "on", | |
| "some", | |
| "certain", | |
| "various", | |
| "several", | |
| "part", | |
| "parts", | |
| "area", | |
| "areas", | |
| "patch", | |
| "update", | |
| "updates", | |
| "version", | |
| "versions", | |
| "之后", | |
| "之前", | |
| "期间", | |
| "有些", | |
| "一些", | |
| "部分", | |
| "地方", | |
| "区域", | |
| "补丁", | |
| "更新", | |
| "版本", | |
| } | |
| ) | |
| _ZH_SNIPPET_FRAGMENT_PREFIXES = ( | |
| "和", | |
| "但", | |
| "但是", | |
| "不过", | |
| "而", | |
| ) | |
| _ZH_SNIPPET_FRAGMENT_SUFFIXES = ( | |
| "的太", | |
| "根本", | |
| "每一个", | |
| "每一条", | |
| ) | |
| _ZH_SNIPPET_GENERIC_EXPERIENCE_TOKENS = frozenset( | |
| { | |
| "互通", | |
| "体验", | |
| "更佳", | |
| "一玩", | |
| } | |
| ) | |
| class HighlightsCollector: | |
| """ | |
| Stateful collector: accumulates data incrementally across the whole analysis cycle | |
| to save RAM. Computes highlights once at the end. | |
| """ | |
| def __init__( | |
| self, | |
| *, | |
| stage_timings: StageTimingAccumulator | None = None, | |
| ) -> None: | |
| self._stage_timings = stage_timings | |
| self._topic_ngrams: dict[str, Counter] = defaultdict(Counter) | |
| self._topic_snippets: dict[str, dict[str, dict[str, Any]]] = defaultdict(dict) | |
| self._context_topic_snippets: dict[str, dict[str, dict[str, dict[str, Any]]]] = { | |
| "recent": defaultdict(dict), | |
| "current_patch": defaultdict(dict), | |
| } | |
| self._category_ngrams: dict[str, Counter] = defaultdict(Counter) | |
| self._global_signal_doc_freq: dict[str, Counter] = { | |
| context: Counter() for context in GLOBAL_PROS_CONS_CONTEXTS | |
| } | |
| self._global_signal_sentiment_sum: dict[str, dict[tuple[str, str], float]] = { | |
| context: defaultdict(float) for context in GLOBAL_PROS_CONS_CONTEXTS | |
| } | |
| self._global_signal_sentiment_count: dict[str, Counter] = { | |
| context: Counter() for context in GLOBAL_PROS_CONS_CONTEXTS | |
| } | |
| self._global_signal_metadata: dict[str, dict[tuple[str, str], dict[str, Any]]] = { | |
| context: {} for context in GLOBAL_PROS_CONS_CONTEXTS | |
| } | |
| self._global_counts: Counter = Counter() | |
| self._ngram_doc_freq: Counter = Counter() | |
| self._ngram_sentiment_sum: dict[str, float] = defaultdict(float) | |
| self._ngram_sentiment_count: Counter = Counter() | |
| self._review_count = 0 | |
| self._current_review_seen_ngrams: set[str] = set() | |
| self._current_review_seen_snippets: set[tuple[str, str, str]] = set() | |
| self._current_review_seen_global_signals: dict[str, set[tuple[str, str]]] = { | |
| context: set() for context in GLOBAL_PROS_CONS_CONTEXTS | |
| } | |
| # Source drill-down: current review identity for snippet attribution | |
| self._current_review_steamid: str | None = None | |
| self._current_review_recommendation_id: str | None = None | |
| # Compact source metadata (#49) | |
| self._current_review_timestamp_created: int | None = None | |
| self._current_review_playtime_at_review: int | None = None | |
| self._current_review_voted_up: bool | None = None | |
| self._current_review_language: str | None = None | |
| self._current_review_steam_purchase: bool | None = None | |
| self._current_review_received_for_free: bool | None = None | |
| self._current_review_written_during_early_access: bool | None = None | |
| def set_review_source( | |
| self, | |
| *, | |
| author_steamid: str | None = None, | |
| recommendation_id: str | None = None, | |
| timestamp_created: int | None = None, | |
| playtime_at_review: int | None = None, | |
| voted_up: bool | None = None, | |
| language: str | None = None, | |
| steam_purchase: bool | None = None, | |
| received_for_free: bool | None = None, | |
| written_during_early_access: bool | None = None, | |
| ) -> None: | |
| """Set review-level identity and metadata for the next review's snippet attribution.""" | |
| self._current_review_steamid = author_steamid | |
| self._current_review_recommendation_id = recommendation_id | |
| self._current_review_timestamp_created = timestamp_created | |
| self._current_review_playtime_at_review = playtime_at_review | |
| self._current_review_voted_up = voted_up | |
| self._current_review_language = language | |
| self._current_review_steam_purchase = steam_purchase | |
| self._current_review_received_for_free = received_for_free | |
| self._current_review_written_during_early_access = written_during_early_access | |
| def start_review(self) -> None: | |
| """Sygnalizuje poczatek nowej recenzji (do obliczania Document Frequency).""" | |
| self._review_count += 1 | |
| self._current_review_seen_ngrams = set() | |
| self._current_review_seen_snippets = set() | |
| self._current_review_seen_global_signals = { | |
| context: set() for context in GLOBAL_PROS_CONS_CONTEXTS | |
| } | |
| def add_sentence( | |
| self, | |
| review_idx: int, # Kept for backward compatibility; use start_review() to separate reviews | |
| sentence: str, | |
| topics: list[str], | |
| sentiment_score: float, | |
| categories: list[str] | None = None, | |
| topic_match_texts: dict[str, str] | None = None, | |
| ) -> None: | |
| """Wywolywane per zdanie podczas analyze_batch().""" | |
| stage_start = time.monotonic() | |
| words = self._tokenize_sentence(sentence) | |
| allow_public_evidence = not self._is_steam_deck_specific_sentence(sentence) | |
| if not words: | |
| if self._stage_timings is not None: | |
| self._stage_timings.add( | |
| "highlights_discussion_patterns_s", | |
| time.monotonic() - stage_start, | |
| ) | |
| return | |
| if allow_public_evidence: | |
| for topic in topics: | |
| self._add_global_signal_candidates( | |
| sentence=sentence, | |
| topic=topic, | |
| sentiment_score=sentiment_score, | |
| categories=categories, | |
| topic_match_texts=topic_match_texts, | |
| ) | |
| if len(words) < 2: | |
| if self._stage_timings is not None: | |
| self._stage_timings.add( | |
| "highlights_discussion_patterns_s", | |
| time.monotonic() - stage_start, | |
| ) | |
| return | |
| for n in range(settings.highlights_ngram_min, settings.highlights_ngram_max + 1): | |
| for i in range(len(words) - n + 1): | |
| ngram = " ".join(words[i : i + n]) | |
| normalized_ngram = self._normalize_for_topic_match(ngram) | |
| # 1. Global counters | |
| self._global_counts[ngram] += 1 | |
| self._ngram_sentiment_sum[ngram] += sentiment_score | |
| self._ngram_sentiment_count[ngram] += 1 | |
| # 2. Incremental Document Frequency (counted once per review) | |
| if ngram not in self._current_review_seen_ngrams: | |
| self._ngram_doc_freq[ngram] += 1 | |
| self._current_review_seen_ngrams.add(ngram) | |
| # 3. Per-topic and per-category counters | |
| for topic in topics: | |
| if self._topic_ngram_is_relevant( | |
| ngram=normalized_ngram, | |
| topic=topic, | |
| topic_match_texts=topic_match_texts, | |
| ): | |
| self._topic_ngrams[topic][ngram] += 1 | |
| if categories: | |
| for category in categories: | |
| self._category_ngrams[category][ngram] += 1 | |
| if allow_public_evidence: | |
| for topic in topics: | |
| self._add_topic_snippet( | |
| topic=topic, | |
| sentence=sentence, | |
| sentiment_score=sentiment_score, | |
| categories=categories, | |
| topic_match_texts=topic_match_texts, | |
| ) | |
| if self._review_count % 500 == 0: | |
| self._prune_singletons() | |
| if self._stage_timings is not None: | |
| self._stage_timings.add( | |
| "highlights_discussion_patterns_s", | |
| time.monotonic() - stage_start, | |
| ) | |
| @staticmethod | |
| def _is_steam_deck_specific_sentence(sentence: str) -> bool: | |
| """Deck-specific text feeds the dedicated widget, not public grouped evidence.""" | |
| return bool(_EXPLICIT_STEAM_DECK_PATTERN.search(sentence)) | |
| def _prune_singletons(self) -> None: | |
| """Glebokie czyszczenie n-gramow z count=1 (oszczednosc pamieci).""" | |
| singletons = [k for k, v in self._global_counts.items() if v <= 1] | |
| for k in singletons: | |
| del self._global_counts[k] | |
| if k in self._ngram_sentiment_sum: | |
| del self._ngram_sentiment_sum[k] | |
| del self._ngram_sentiment_count[k] | |
| del self._ngram_doc_freq[k] | |
| # Clean up per-topic counters | |
| for topic in self._topic_ngrams: | |
| if k in self._topic_ngrams[topic]: | |
| del self._topic_ngrams[topic][k] | |
| # Clean up per-category counters | |
| for cat in self._category_ngrams: | |
| if k in self._category_ngrams[cat]: | |
| del self._category_ngrams[cat][k] | |
| def compute_highlights(self) -> dict[str, Any]: | |
| """ | |
| Computes highlights once analysis is complete. | |
| """ | |
| highlights_start = time.monotonic() | |
| if self._review_count == 0: | |
| empty_results: dict[str, Any] = { | |
| "general": [], | |
| "recent": [], | |
| "current_patch": [], | |
| "topics": {}, | |
| } | |
| if self._stage_timings is not None: | |
| self._stage_timings.add( | |
| "highlights_discussion_patterns_s", | |
| time.monotonic() - highlights_start, | |
| ) | |
| global_stage_start = time.monotonic() | |
| empty_results["global_pros_cons"] = build_contextual_global_pros_cons({}).model_dump(mode="json") | |
| if self._stage_timings is not None: | |
| self._stage_timings.add( | |
| "global_pros_cons_s", | |
| time.monotonic() - global_stage_start, | |
| ) | |
| return empty_results | |
| results: dict[str, Any] = { | |
| "general": self._compute_tfidf_highlights( | |
| self._global_counts, | |
| top_n=settings.highlights_top_n_general, | |
| ), | |
| "recent": self._compute_tfidf_highlights( | |
| self._category_ngrams.get("recent", Counter()), | |
| top_n=settings.highlights_top_n_general, | |
| ), | |
| "current_patch": self._compute_tfidf_highlights( | |
| self._category_ngrams.get("current_patch", Counter()), | |
| top_n=settings.highlights_top_n_general, | |
| ), | |
| "topics": {}, | |
| "recent_topic_highlights": {}, | |
| "current_patch_topic_highlights": {}, | |
| } | |
| for topic, counter in self._topic_ngrams.items(): | |
| h = self._compute_topic_snippet_highlights( | |
| topic, | |
| top_n=settings.highlights_top_n_per_topic, | |
| ) | |
| if h: | |
| results["topics"][topic] = h | |
| for context in ("recent", "current_patch"): | |
| topic_buckets = self._context_topic_snippets[context] | |
| for topic in topic_buckets: | |
| h = self._compute_topic_snippet_highlights( | |
| topic, | |
| top_n=settings.highlights_top_n_per_topic, | |
| topic_snippets=topic_buckets, | |
| ) | |
| if h: | |
| results[f"{context}_topic_highlights"][topic] = h | |
| if self._stage_timings is not None: | |
| self._stage_timings.add( | |
| "highlights_discussion_patterns_s", | |
| time.monotonic() - highlights_start, | |
| ) | |
| global_stage_start = time.monotonic() | |
| results["global_pros_cons"] = self._compute_global_pros_cons() | |
| if self._stage_timings is not None: | |
| self._stage_timings.add( | |
| "global_pros_cons_s", | |
| time.monotonic() - global_stage_start, | |
| ) | |
| return results | |
| @staticmethod | |
| def _is_valid_global_signal_token(token: str) -> bool: | |
| compact = token.strip().lower() | |
| if not compact: | |
| return False | |
| if compact.isascii(): | |
| return len(compact) >= 2 and any(char.isalpha() for char in compact) | |
| return True | |
| @staticmethod | |
| def _normalize_signal_token(token: str) -> str: | |
| normalized = convert(token, "zh-cn").lower() | |
| return re.sub(r"[^\w\u4e00-\u9fff]+", "", normalized) | |
| @classmethod | |
| def _find_signal_anchor_span( | |
| cls, | |
| tokens: list[str], | |
| matched_text: str, | |
| ) -> tuple[int, int] | None: | |
| if not tokens or not matched_text: | |
| return None | |
| normalized_tokens = [cls._normalize_signal_token(token) for token in tokens] | |
| anchor_tokens = [ | |
| cls._normalize_signal_token(token) | |
| for token in cls._tokenize_sentence(matched_text) | |
| if cls._normalize_signal_token(token) | |
| ] | |
| if not anchor_tokens: | |
| normalized_match = cls._normalize_signal_token(matched_text) | |
| if not normalized_match: | |
| return None | |
| anchor_tokens = [normalized_match] | |
| for start in range(0, len(tokens) - len(anchor_tokens) + 1): | |
| if normalized_tokens[start:start + len(anchor_tokens)] == anchor_tokens: | |
| return start, start + len(anchor_tokens) | |
| compact_anchor = "".join(anchor_tokens) | |
| for idx, token in enumerate(normalized_tokens): | |
| if compact_anchor and compact_anchor in token: | |
| return idx, idx + 1 | |
| return None | |
| @classmethod | |
| def _global_signal_quality( | |
| cls, | |
| tokens: list[str], | |
| *, | |
| anchor_start: int, | |
| anchor_end: int, | |
| ) -> float: | |
| anchor_tokens = tokens[anchor_start:anchor_end] | |
| modifier_tokens = tokens[:anchor_start] + tokens[anchor_end:] | |
| weak_token_count = sum( | |
| 1 for token in modifier_tokens | |
| if cls._normalize_signal_token(token) in _GLOBAL_SIGNAL_WEAK_TOKENS | |
| ) | |
| sentiment_modifier_count = sum( | |
| 1 for token in modifier_tokens if cls._is_sentiment_bearing_signal_token(token) | |
| ) | |
| semantic_modifier_count = sum( | |
| 1 | |
| for token in modifier_tokens | |
| if cls._normalize_signal_token(token) | |
| and cls._normalize_signal_token(token) not in _GLOBAL_SIGNAL_WEAK_TOKENS | |
| ) | |
| quality = 0.85 | |
| if not modifier_tokens: | |
| if cls._is_generic_topic_anchor_candidate(anchor_tokens): | |
| quality -= 0.7 | |
| elif cls._is_self_contained_anchor_signal(anchor_tokens): | |
| quality += 0.35 | |
| elif len(anchor_tokens) > 1: | |
| quality += 0.2 | |
| else: | |
| quality += 0.0 | |
| else: | |
| quality += min(0.25, semantic_modifier_count * 0.2) | |
| quality += min(0.25, sentiment_modifier_count * 0.2) | |
| quality -= weak_token_count * 0.2 | |
| if len(modifier_tokens) > 2: | |
| quality -= min(0.25, (len(modifier_tokens) - 2) * 0.1) | |
| if ( | |
| anchor_start > 0 | |
| and sentiment_modifier_count == 0 | |
| and all( | |
| token.isascii() and len(token) > 5 | |
| for token in tokens[:anchor_start] | |
| ) | |
| ): | |
| # Title/DLC-like prefixes should not outrank cleaner lexical signals. | |
| quality -= 0.35 | |
| if any(char.isdigit() for token in modifier_tokens for char in token) and sentiment_modifier_count == 0: | |
| quality -= 0.15 | |
| return round(quality, 3) | |
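| # Worked example (hypothetical phrase): tokens ["combat", "feels", "clunky"] with the | |
| # anchor "combat" at position 0 leave modifiers ["feels", "clunky"]; that gives one | |
| # semantic modifier (+0.2), sentiment-bearing modifiers (+0.25, capped) and one weak | |
| # token "feels" (-0.2), so quality = 0.85 + 0.2 + 0.25 - 0.2 = 1.1, which clears the | |
| # >= 1.0 gate applied later in _build_global_signal_sources(). | |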
| @classmethod | |
| def _is_sentiment_bearing_signal_token(cls, token: str) -> bool: | |
| normalized = cls._normalize_signal_token(token) | |
| if not normalized: | |
| return False | |
| if normalized.isascii(): | |
| return ( | |
| normalized in _ASCII_SNIPPET_EVIDENCE_TOKENS | |
| or normalized.endswith(("ed", "ing", "ly", "ive", "ful", "less", "ous", "able", "ible")) | |
| ) | |
| return any(marker in normalized for marker in _ZH_SNIPPET_EVIDENCE_MARKERS) | |
| @classmethod | |
| def _is_generic_topic_anchor_candidate(cls, anchor_tokens: list[str]) -> bool: | |
| normalized_tokens = [ | |
| cls._normalize_signal_token(token) | |
| for token in anchor_tokens | |
| if cls._normalize_signal_token(token) | |
| ] | |
| return bool(normalized_tokens) and all( | |
| token in _GLOBAL_SIGNAL_GENERIC_TOPIC_LABEL_TOKENS | |
| for token in normalized_tokens | |
| ) | |
| @classmethod | |
| def _is_self_contained_anchor_signal(cls, anchor_tokens: list[str]) -> bool: | |
| normalized_tokens = [ | |
| cls._normalize_signal_token(token) | |
| for token in anchor_tokens | |
| if cls._normalize_signal_token(token) | |
| ] | |
| if not normalized_tokens: | |
| return False | |
| if all(cls._is_sentiment_bearing_signal_token(token) for token in anchor_tokens): | |
| return True | |
| return len(normalized_tokens) == 1 and normalized_tokens[0] in _GLOBAL_SIGNAL_STANDALONE_ALLOWED_TOKENS | |
| def _add_global_signal_candidates( | |
| self, | |
| *, | |
| sentence: str, | |
| topic: str, | |
| sentiment_score: float, | |
| categories: list[str] | None, | |
| topic_match_texts: dict[str, str] | None, | |
| ) -> None: | |
| matched_text = topic_match_texts.get(topic) if topic_match_texts else None | |
| if matched_text: | |
| candidate_text = self._extract_topic_snippet( | |
| sentence, | |
| matched_text, | |
| other_matched_texts=[ | |
| other_text | |
| for other_topic, other_text in (topic_match_texts or {}).items() | |
| if other_topic != topic | |
| ], | |
| ) | |
| else: | |
| candidate_text = sentence | |
| tokens = [ | |
| word for word in self._tokenize_sentence(candidate_text) | |
| if self._is_valid_global_signal_token(word) | |
| ] | |
| if not tokens: | |
| return | |
| anchor_span = self._find_signal_anchor_span(tokens, matched_text or candidate_text) | |
| if anchor_span is None: | |
| return | |
| anchor_start, anchor_end = anchor_span | |
| best_candidate: tuple[str, float, int] | None = None | |
| max_n = min(3, len(tokens)) | |
| for n in range(1, max_n + 1): | |
| for start in range(0, len(tokens) - n + 1): | |
| end = start + n | |
| if end <= anchor_start or start >= anchor_end: | |
| continue | |
| phrase_tokens = tokens[start:end] | |
| phrase = " ".join(phrase_tokens).strip() | |
| if not phrase: | |
| continue | |
| local_anchor_start = max(0, anchor_start - start) | |
| local_anchor_end = min(len(phrase_tokens), anchor_end - start) | |
| quality = self._global_signal_quality( | |
| phrase_tokens, | |
| anchor_start=local_anchor_start, | |
| anchor_end=local_anchor_end, | |
| ) | |
| first_position = local_anchor_start | |
| candidate = (phrase, quality, first_position) | |
| if best_candidate is None or (quality, -len(phrase), -first_position) > ( | |
| best_candidate[1], | |
| -len(best_candidate[0]), | |
| -best_candidate[2], | |
| ): | |
| best_candidate = candidate | |
| if best_candidate is None: | |
| return | |
| phrase, quality, first_position = best_candidate | |
| sentiment = self._classify_global_signal_sentiment(sentiment_score) | |
| if sentiment is None: | |
| return | |
| for context in self._global_signal_contexts(categories): | |
| self._record_global_signal( | |
| context=context, | |
| phrase=phrase, | |
| sentiment=sentiment, | |
| sentiment_score=sentiment_score, | |
| quality=quality, | |
| first_position=first_position, | |
| ) | |
| @staticmethod | |
| def _global_signal_contexts(categories: list[str] | None) -> list[str]: | |
| contexts = ["general"] | |
| for category in categories or []: | |
| if category in GLOBAL_PROS_CONS_CONTEXTS and category not in contexts: | |
| contexts.append(category) | |
| return contexts | |
| @staticmethod | |
| def _classify_global_signal_sentiment(sentiment_score: float) -> str | None: | |
| if sentiment_score > settings.sentiment_positive_threshold: | |
| return SentimentType.POSITIVE.value | |
| if sentiment_score < settings.sentiment_negative_threshold: | |
| return SentimentType.NEGATIVE.value | |
| return None | |
| def _record_global_signal( | |
| self, | |
| *, | |
| context: str, | |
| phrase: str, | |
| sentiment: str, | |
| sentiment_score: float, | |
| quality: float, | |
| first_position: int, | |
| ) -> None: | |
| key = (phrase, sentiment) | |
| self._global_signal_sentiment_sum[context][key] += sentiment_score | |
| self._global_signal_sentiment_count[context][key] += 1 | |
| if key not in self._current_review_seen_global_signals[context]: | |
| self._global_signal_doc_freq[context][key] += 1 | |
| self._current_review_seen_global_signals[context].add(key) | |
| metadata = self._global_signal_metadata[context].get(key) | |
| if metadata is None: | |
| self._global_signal_metadata[context][key] = { | |
| "quality": quality, | |
| "first_position": first_position, | |
| } | |
| else: | |
| metadata["quality"] = max(metadata["quality"], quality) | |
| metadata["first_position"] = min(metadata["first_position"], first_position) | |
| def _build_global_signal_sources(self, context: str) -> list[dict[str, Any]]: | |
| sources: list[dict[str, Any]] = [] | |
| for (phrase, sentiment), doc_freq in self._global_signal_doc_freq[context].items(): | |
| sentiment_count = self._global_signal_sentiment_count[context].get((phrase, sentiment), 0) | |
| if sentiment_count <= 0: | |
| continue | |
| metadata = self._global_signal_metadata[context].get((phrase, sentiment), {}) | |
| quality = metadata.get("quality", 0.0) | |
| if quality < 1.0: | |
| continue | |
| avg_score = self._global_signal_sentiment_sum[context][(phrase, sentiment)] / sentiment_count | |
| sources.append( | |
| { | |
| "phrase": phrase, | |
| "mention_count": doc_freq, | |
| "sentiment": sentiment, | |
| "score": round(avg_score, 3), | |
| "quality": quality, | |
| "first_position": metadata.get("first_position", 0), | |
| } | |
| ) | |
| return sources | |
| def compute_global_pros_cons_signals(self) -> dict[str, list[dict[str, Any]]]: | |
| stage_start = time.monotonic() | |
| results = { | |
| context: self._build_global_signal_sources(context) | |
| for context in GLOBAL_PROS_CONS_CONTEXTS | |
| } | |
| if self._stage_timings is not None: | |
| self._stage_timings.add( | |
| "global_pros_cons_s", | |
| time.monotonic() - stage_start, | |
| ) | |
| return results | |
| def _compute_global_pros_cons(self) -> dict[str, dict[str, list[dict[str, Any]]]]: | |
| return build_contextual_global_pros_cons( | |
| self.compute_global_pros_cons_signals() | |
| ).model_dump(mode="json") | |
| @staticmethod | |
| def _tokenize_sentence(sentence: str) -> list[str]: | |
| # Simple ASCII check for English phrases (avoids mis-segmentation by jieba) | |
| is_ascii = all(ord(c) < 128 for c in sentence) | |
| if is_ascii: | |
| return [w for w in sentence.split() if not is_stopword(w) and len(w.strip()) > 0] | |
| return [w for w in jieba.lcut(sentence) if not is_stopword(w) and len(w.strip()) > 0] | |
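| # Example (illustrative; exact output depends on the stopword lists and jieba's | |
| # dictionary): "runs great on deck" splits on whitespace into tokens like | |
| # ["runs", "great", "deck"], while "画面很流畅" is segmented by jieba and filtered | |
| # to tokens such as ["画面", "流畅"]. | |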
| @staticmethod | |
| def _normalize_for_topic_match(text: str) -> str: | |
| normalized = convert(text, "zh-cn").lower() | |
| return "".join(normalized.split()) | |
| @classmethod | |
| def _normalize_snippet(cls, text: str) -> str: | |
| normalized = convert(text, "zh-cn").lower().replace(_ELLIPSIS, " ") | |
| normalized = re.sub(r"[^\w\u4e00-\u9fff]+", " ", normalized) | |
| normalized = re.sub(r"\s+", " ", normalized).strip() | |
| return normalized | |
| @staticmethod | |
| def _normalize_snippet_token(text: str) -> str: | |
| normalized = convert(text, "zh-cn").lower() | |
| normalized = re.sub(r"[^\w\u4e00-\u9fff]+", "", normalized) | |
| return normalized | |
| @classmethod | |
| def _extract_space_delimited_snippet( | |
| cls, | |
| sentence: str, | |
| matched_text: str, | |
| other_matched_texts: list[str] | None = None, | |
| ) -> str: | |
| tokens = sentence.split() | |
| if not tokens: | |
| return sentence.strip(_SNIPPET_TRIM_CHARS) | |
| normalized_tokens = [cls._normalize_snippet_token(token) for token in tokens] | |
| anchor_tokens = [ | |
| cls._normalize_snippet_token(token) | |
| for token in matched_text.split() | |
| if cls._normalize_snippet_token(token) | |
| ] | |
| if not anchor_tokens: | |
| return sentence.strip(_SNIPPET_TRIM_CHARS) | |
| anchor_start = None | |
| anchor_end = None | |
| for start in range(0, len(tokens) - len(anchor_tokens) + 1): | |
| if normalized_tokens[start:start + len(anchor_tokens)] == anchor_tokens: | |
| anchor_start = start | |
| anchor_end = start + len(anchor_tokens) | |
| break | |
| if anchor_start is None: | |
| compact_anchor = "".join(anchor_tokens) | |
| for idx, token in enumerate(normalized_tokens): | |
| if compact_anchor and compact_anchor in token: | |
| anchor_start = idx | |
| anchor_end = idx + 1 | |
| break | |
| if anchor_start is None or anchor_end is None: | |
| return sentence.strip(_SNIPPET_TRIM_CHARS) | |
| window_start = max(0, anchor_start - 3) | |
| window_end = min(len(tokens), anchor_end + 4) | |
| for other_text in other_matched_texts or []: | |
| other_tokens = [ | |
| cls._normalize_snippet_token(token) | |
| for token in other_text.split() | |
| if cls._normalize_snippet_token(token) | |
| ] | |
| if not other_tokens: | |
| continue | |
| for start in range(0, len(tokens) - len(other_tokens) + 1): | |
| if normalized_tokens[start:start + len(other_tokens)] != other_tokens: | |
| continue | |
| if start > anchor_start and start < window_end: | |
| window_end = start | |
| if start < anchor_start and (start + len(other_tokens)) > window_start: | |
| window_start = start + len(other_tokens) | |
| break | |
| snippet = " ".join(tokens[window_start:window_end]).strip(_SNIPPET_TRIM_CHARS) | |
| if window_start > 0: | |
| snippet = f"{_ELLIPSIS}{snippet}" | |
| if window_end < len(tokens): | |
| snippet = f"{snippet}{_ELLIPSIS}" | |
| return snippet | |
| @classmethod | |
| def _extract_char_window_snippet( | |
| cls, | |
| sentence: str, | |
| matched_text: str, | |
| other_matched_texts: list[str] | None = None, | |
| ) -> str: | |
| display_sentence = convert(sentence, "zh-cn").strip() | |
| search_sentence = display_sentence.lower() | |
| anchor = convert(matched_text, "zh-cn").lower().strip() | |
| if not display_sentence or not anchor: | |
| return display_sentence.strip(_SNIPPET_TRIM_CHARS) | |
| anchor_pos = search_sentence.find(anchor) | |
| if anchor_pos == -1: | |
| compact_anchor = anchor.replace(" ", "") | |
| anchor_pos = search_sentence.find(compact_anchor) | |
| if anchor_pos == -1: | |
| return display_sentence.strip(_SNIPPET_TRIM_CHARS) | |
| anchor = compact_anchor | |
| window_start = max(0, anchor_pos - 10) | |
| window_end = min(len(display_sentence), anchor_pos + len(anchor) + 12) | |
| for other_text in other_matched_texts or []: | |
| normalized_other = convert(other_text, "zh-cn").lower().strip() | |
| if not normalized_other: | |
| continue | |
| other_pos = search_sentence.find(normalized_other) | |
| if other_pos == -1: | |
| continue | |
| if other_pos > anchor_pos and other_pos < window_end: | |
| window_end = other_pos | |
| if other_pos < anchor_pos and other_pos >= window_start: | |
| window_start = max(window_start, other_pos + len(normalized_other)) | |
| snippet = display_sentence[window_start:window_end].strip(_SNIPPET_TRIM_CHARS) | |
| if window_start > 0: | |
| snippet = f"{_ELLIPSIS}{snippet}" | |
| if window_end < len(display_sentence): | |
| snippet = f"{snippet}{_ELLIPSIS}" | |
| return snippet | |
| @classmethod | |
| def _extract_topic_snippet( | |
| cls, | |
| sentence: str, | |
| matched_text: str, | |
| other_matched_texts: list[str] | None = None, | |
| ) -> str: | |
| compact_sentence = re.sub(r"\s+", " ", sentence).strip() | |
| if not compact_sentence: | |
| return "" | |
| if " " in compact_sentence and re.search(r"[a-zA-Z]", matched_text): | |
| snippet = cls._extract_space_delimited_snippet( | |
| compact_sentence, | |
| matched_text, | |
| other_matched_texts=other_matched_texts, | |
| ) | |
| else: | |
| snippet = cls._extract_char_window_snippet( | |
| compact_sentence, | |
| matched_text, | |
| other_matched_texts=other_matched_texts, | |
| ) | |
| return snippet.strip(_SNIPPET_TRIM_CHARS) | |
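| # Example (illustrative): for "the combat is fun but the boss fights drag on forever" | |
| # with matched text "boss fights", the space-delimited path keeps a window of roughly | |
| # three tokens before and four after the anchor ("fun but the boss fights drag on | |
| # forever"); sentences without spaces (e.g. Chinese) fall back to a character window | |
| # around the anchor instead. | |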
| @staticmethod | |
| def _snippet_sentiment(avg_score: float) -> str: | |
| return ( | |
| "positive" if avg_score > settings.sentiment_positive_threshold | |
| else "negative" if avg_score < settings.sentiment_negative_threshold | |
| else "neutral" | |
| ) | |
| @classmethod | |
| def _snippet_readability(cls, text: str) -> float: | |
| stripped = text.replace(_ELLIPSIS, "").strip(_SNIPPET_TRIM_CHARS) | |
| if not stripped: | |
| return 0.0 | |
| if " " in stripped: | |
| units = len([token for token in stripped.split() if token]) | |
| if units <= 1: | |
| length_score = 0.1 | |
| elif units == 2: | |
| length_score = 0.45 | |
| elif units == 3: | |
| length_score = 0.62 | |
| elif units <= 8: | |
| length_score = 0.95 | |
| elif units <= 12: | |
| length_score = 0.78 | |
| else: | |
| length_score = max(0.35, 0.78 - ((units - 12) * 0.06)) | |
| else: | |
| units = len(stripped) | |
| if units <= 2: | |
| length_score = 0.1 | |
| elif units <= 4: | |
| length_score = 0.58 | |
| elif units <= 12: | |
| length_score = 0.92 | |
| elif units <= 18: | |
| length_score = 0.76 | |
| else: | |
| length_score = max(0.35, 0.76 - ((units - 18) * 0.04)) | |
| boundary_penalty = 0.0 | |
| lowered = stripped.lower() | |
| if lowered.startswith(("and ", "but ", "or ", "但是", "不过", "而且")): | |
| boundary_penalty += 0.15 | |
| if lowered.endswith((" and", " but", " or", "但是", "不过", "而且")): | |
| boundary_penalty += 0.1 | |
| punctuation_penalty = 0.1 if stripped.count(",") + stripped.count(",") >= 2 else 0.0 | |
| return round(max(0.0, length_score - boundary_penalty - punctuation_penalty), 3) | |
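| # e.g. an 8-token English snippet scores 0.95 before boundary/punctuation penalties, | |
| # while a 2-character Chinese fragment bottoms out at 0.1. | |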
| @classmethod | |
| def _snippet_evidence_quality(cls, text: str, matched_text: str | None) -> float: | |
| quality = cls._snippet_readability(text) | |
| stripped = text.replace(_ELLIPSIS, "").strip(_SNIPPET_TRIM_CHARS) | |
| if not stripped: | |
| return 0.0 | |
| if not matched_text: | |
| return quality | |
| normalized_snippet = cls._normalize_snippet(stripped) | |
| normalized_anchor = cls._normalize_snippet(matched_text) | |
| if normalized_anchor and normalized_snippet == normalized_anchor: | |
| return 0.0 | |
| if " " in stripped: | |
| tokens = [ | |
| cls._normalize_snippet_token(token) | |
| for token in stripped.split() | |
| if cls._normalize_snippet_token(token) | |
| ] | |
| token_count = len(tokens) | |
| has_context_marker = any( | |
| token in _ASCII_SNIPPET_EVIDENCE_TOKENS | |
| or token.endswith(("ed", "ing", "ly", "ive", "ful", "less", "ous", "able", "ible")) | |
| for token in tokens | |
| ) | |
| anchor_span = cls._find_signal_anchor_span(stripped.split(), matched_text) | |
| modifier_tokens = [] | |
| if anchor_span is not None: | |
| anchor_start, anchor_end = anchor_span | |
| modifier_tokens = [ | |
| cls._normalize_snippet_token(token) | |
| for idx, token in enumerate(stripped.split()) | |
| if idx < anchor_start or idx >= anchor_end | |
| ] | |
| modifier_tokens = [token for token in modifier_tokens if token] | |
| if token_count <= 2 and not has_context_marker: | |
| quality -= 0.35 | |
| elif token_count == 3 and not has_context_marker: | |
| quality -= 0.2 | |
| if any(char.isdigit() for token in tokens for char in token) and not has_context_marker: | |
| quality -= 0.15 | |
| if modifier_tokens and not has_context_marker: | |
| weak_modifier_count = sum( | |
| 1 | |
| for token in modifier_tokens | |
| if token in _GLOBAL_SIGNAL_WEAK_TOKENS | |
| or token in _SNIPPET_WEAK_CONTEXT_TOKENS | |
| or any(char.isdigit() for char in token) | |
| ) | |
| if weak_modifier_count == len(modifier_tokens): | |
| quality -= 0.45 | |
| elif weak_modifier_count >= len(modifier_tokens) - 1 and token_count <= 5: | |
| quality -= 0.25 | |
| if cls._is_bare_topic_anchor_fragment(stripped, matched_text): | |
| # Suppress one-sided brand/title prefixes when a better topic-local snippet exists. | |
| quality -= 0.5 | |
| if has_context_marker: | |
| quality += 0.12 | |
| else: | |
| compact = cls._normalize_snippet_token(stripped) | |
| has_context_marker = any(marker in stripped or marker in compact for marker in _ZH_SNIPPET_EVIDENCE_MARKERS) | |
| tokens = cls._tokenize_sentence(stripped) | |
| normalized_tokens = [ | |
| cls._normalize_snippet_token(token) | |
| for token in tokens | |
| if cls._normalize_snippet_token(token) | |
| ] | |
| if len(compact) <= 2: | |
| quality -= 0.4 | |
| elif len(compact) <= 4 and not has_context_marker: | |
| quality -= 0.2 | |
| if stripped.startswith(_ZH_SNIPPET_FRAGMENT_PREFIXES): | |
| quality -= 0.3 | |
| if stripped.endswith(_ZH_SNIPPET_FRAGMENT_SUFFIXES): | |
| quality -= 0.4 | |
| anchor_span = cls._find_signal_anchor_span(tokens, matched_text) | |
| if anchor_span is not None: | |
| anchor_start, anchor_end = anchor_span | |
| anchor_at_edge = anchor_start == 0 or anchor_end == len(tokens) | |
| modifier_tokens = [ | |
| normalized_tokens[idx] | |
| for idx in range(len(normalized_tokens)) | |
| if idx < anchor_start or idx >= anchor_end | |
| ] | |
| if anchor_at_edge and len(normalized_tokens) <= 2: | |
| quality -= 0.22 | |
| if anchor_at_edge and len(normalized_tokens) <= 4 and ("," in stripped or "," in stripped): | |
| quality -= 0.32 | |
| if ( | |
| anchor_end == len(tokens) | |
| and len(normalized_tokens) <= 3 | |
| and ("," in stripped or "," in stripped) | |
| ): | |
| quality -= 0.22 | |
| if ( | |
| anchor_start == 0 | |
| and len(normalized_tokens) <= 4 | |
| and modifier_tokens | |
| and all( | |
| token in _ZH_SNIPPET_GENERIC_EXPERIENCE_TOKENS | |
| or token in _SNIPPET_WEAK_CONTEXT_TOKENS | |
| for token in modifier_tokens | |
| ) | |
| ): | |
| quality -= 0.24 | |
| if has_context_marker: | |
| quality += 0.1 | |
| return round(max(0.0, min(1.0, quality)), 3) | |
| @classmethod | |
| def _is_meaningful_topic_context_token(cls, token: str) -> bool: | |
| normalized = cls._normalize_signal_token(token) | |
| if not normalized: | |
| return False | |
| return ( | |
| cls._is_sentiment_bearing_signal_token(token) | |
| or normalized in _GLOBAL_SIGNAL_GENERIC_TOPIC_LABEL_TOKENS | |
| or normalized in _GLOBAL_SIGNAL_STANDALONE_ALLOWED_TOKENS | |
| or normalized in _TOPIC_CONTEXT_MEANINGFUL_TOKENS | |
| ) | |
| @classmethod | |
| def _is_title_like_topic_modifier_token(cls, token: str) -> bool: | |
| normalized = cls._normalize_signal_token(token) | |
| if not normalized or normalized in _GLOBAL_SIGNAL_WEAK_TOKENS: | |
| return False | |
| if cls._is_meaningful_topic_context_token(token): | |
| return False | |
| if any(char.isdigit() for char in normalized): | |
| return True | |
| if normalized.isascii(): | |
| return len(normalized) >= 2 and any(char.isalpha() for char in normalized) | |
| return len(normalized) <= 4 | |
| @classmethod | |
| def _is_bare_topic_anchor_fragment(cls, text: str, matched_text: str | None) -> bool: | |
| if not matched_text: | |
| return False | |
| tokens = cls._tokenize_sentence(text) | |
| if len(tokens) < 3: | |
| return False | |
| anchor_span = cls._find_signal_anchor_span(tokens, matched_text) | |
| if anchor_span is None: | |
| return False | |
| anchor_start, anchor_end = anchor_span | |
| anchor_tokens = tokens[anchor_start:anchor_end] | |
| modifier_tokens = tokens[:anchor_start] + tokens[anchor_end:] | |
| if len(anchor_tokens) != 1 or len(modifier_tokens) < 2: | |
| return False | |
| # Pure title/DLC fragments usually sit on one side of a single topic anchor. | |
| if anchor_start > 0 and anchor_end < len(tokens): | |
| return False | |
| if any(cls._is_meaningful_topic_context_token(token) for token in modifier_tokens): | |
| return False | |
| return all(cls._is_title_like_topic_modifier_token(token) for token in modifier_tokens) | |
| def _add_topic_snippet( | |
| self, | |
| *, | |
| topic: str, | |
| sentence: str, | |
| sentiment_score: float, | |
| categories: list[str] | None, | |
| topic_match_texts: dict[str, str] | None, | |
| ) -> None: | |
| matched_text = topic_match_texts.get(topic) if topic_match_texts else None | |
| if matched_text: | |
| snippet = self._extract_topic_snippet( | |
| sentence, | |
| matched_text, | |
| other_matched_texts=[ | |
| other_text | |
| for other_topic, other_text in (topic_match_texts or {}).items() | |
| if other_topic != topic | |
| ], | |
| ) | |
| else: | |
| snippet = re.sub(r"\s+", " ", sentence).strip(_SNIPPET_TRIM_CHARS) | |
| normalized = self._normalize_snippet(snippet) | |
| if not snippet or not normalized: | |
| return | |
| self._record_topic_snippet( | |
| topic_snippets=self._topic_snippets, | |
| context_key="general", | |
| topic=topic, | |
| normalized=normalized, | |
| snippet=snippet, | |
| sentiment_score=sentiment_score, | |
| matched_text=matched_text, | |
| ) | |
| for context in categories or []: | |
| if context not in self._context_topic_snippets: | |
| continue | |
| self._record_topic_snippet( | |
| topic_snippets=self._context_topic_snippets[context], | |
| context_key=context, | |
| topic=topic, | |
| normalized=normalized, | |
| snippet=snippet, | |
| sentiment_score=sentiment_score, | |
| matched_text=matched_text, | |
| ) | |
| def _record_topic_snippet( | |
| self, | |
| *, | |
| topic_snippets: dict[str, dict[str, dict[str, Any]]], | |
| context_key: str, | |
| topic: str, | |
| normalized: str, | |
| snippet: str, | |
| sentiment_score: float, | |
| matched_text: str | None, | |
| ) -> None: | |
| readability = self._snippet_readability(snippet) | |
| evidence_quality = self._snippet_evidence_quality(snippet, matched_text) | |
| topic_bucket = topic_snippets[topic] | |
| stats = topic_bucket.get(normalized) | |
| if stats is None: | |
| stats = { | |
| "text": snippet, | |
| "mention_count": 0, | |
| "score_sum": 0.0, | |
| "score_count": 0, | |
| "readability": readability, | |
| "quality": evidence_quality, | |
| "has_anchor": matched_text is not None, | |
| # Source drill-down: first reviewer is the representative source | |
| "source_steamid": self._current_review_steamid, | |
| "source_recommendation_id": self._current_review_recommendation_id, | |
| # Compact source metadata (#49) | |
| "source_timestamp_created": self._current_review_timestamp_created, | |
| "source_playtime_at_review": self._current_review_playtime_at_review, | |
| "source_voted_up": self._current_review_voted_up, | |
| "source_language": self._current_review_language, | |
| "source_steam_purchase": self._current_review_steam_purchase, | |
| "source_received_for_free": self._current_review_received_for_free, | |
| "source_written_during_early_access": self._current_review_written_during_early_access, | |
| } | |
| topic_bucket[normalized] = stats | |
| elif readability > stats["readability"] or ( | |
| readability == stats["readability"] and len(snippet) < len(stats["text"]) | |
| ): | |
| stats["text"] = snippet | |
| stats["readability"] = readability | |
| stats["quality"] = max(stats["quality"], evidence_quality) | |
| stats["has_anchor"] = stats["has_anchor"] or (matched_text is not None) | |
| stats["score_sum"] += sentiment_score | |
| stats["score_count"] += 1 | |
| review_seen_key = (context_key, topic, normalized) | |
| if review_seen_key not in self._current_review_seen_snippets: | |
| stats["mention_count"] += 1 | |
| self._current_review_seen_snippets.add(review_seen_key) | |
| @staticmethod | |
| def _snippets_are_near_duplicates(first: str, second: str) -> bool: | |
| if first == second or first in second or second in first: | |
| return True | |
| return SequenceMatcher(None, first, second).ratio() >= 0.88 | |
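| # Example: "runs smooth on deck" vs "runs smoothly on deck" is neither an exact match | |
| # nor a substring pair, but its SequenceMatcher ratio (about 0.95) clears the 0.88 | |
| # threshold, so the two snippets collapse into a single candidate. | |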
| def _compute_topic_snippet_highlights( | |
| self, | |
| topic: str, | |
| top_n: int, | |
| *, | |
| topic_snippets: dict[str, dict[str, dict[str, Any]]] | None = None, | |
| ) -> list[dict[str, Any]]: | |
| snippet_source = topic_snippets if topic_snippets is not None else self._topic_snippets | |
| snippet_stats = snippet_source.get(topic, {}) | |
| if not snippet_stats: | |
| return [] | |
| candidates: list[dict[str, Any]] = [] | |
| for normalized, stats in snippet_stats.items(): | |
| mention_count = stats["mention_count"] | |
| if mention_count <= 0: | |
| continue | |
| avg_score = stats["score_sum"] / max(1, stats["score_count"]) | |
| candidate: dict[str, Any] = { | |
| "text": stats["text"], | |
| "mention_count": mention_count, | |
| "score": round(avg_score, 3), | |
| "sentiment": self._snippet_sentiment(avg_score), | |
| "_readability": stats["readability"], | |
| "_quality": stats.get("quality", stats["readability"]), | |
| "_canonical": normalized, | |
| "_has_anchor": stats.get("has_anchor", False), | |
| } | |
| # Source drill-down: propagate representative review identity and metadata | |
| src_steamid = stats.get("source_steamid") | |
| src_rec_id = stats.get("source_recommendation_id") | |
| if src_steamid is not None: | |
| candidate["source_steamid"] = src_steamid | |
| if src_rec_id is not None: | |
| candidate["source_recommendation_id"] = src_rec_id | |
| # Compact source metadata (#49) | |
| for meta_key in ( | |
| "source_timestamp_created", | |
| "source_playtime_at_review", | |
| "source_voted_up", | |
| "source_language", | |
| "source_steam_purchase", | |
| "source_received_for_free", | |
| "source_written_during_early_access", | |
| ): | |
| meta_val = stats.get(meta_key) | |
| if meta_val is not None: | |
| candidate[meta_key] = meta_val | |
| candidates.append(candidate) | |
| candidates.sort( | |
| key=lambda item: ( | |
| -item["_quality"], | |
| -item["mention_count"], | |
| -abs(item["score"]), | |
| len(item["text"]), | |
| item["text"], | |
| ) | |
| ) | |
| deduped: list[dict[str, Any]] = [] | |
| for candidate in candidates: | |
| if any( | |
| self._snippets_are_near_duplicates(candidate["_canonical"], existing["_canonical"]) | |
| for existing in deduped | |
| ): | |
| continue | |
| deduped.append(candidate) | |
| qualified = [ | |
| candidate | |
| for candidate in deduped | |
| if ( | |
| candidate["_has_anchor"] | |
| and ( | |
| candidate["_quality"] >= 0.65 | |
| or (candidate["_quality"] >= 0.55 and candidate["mention_count"] >= 2) | |
| ) | |
| ) or ( | |
| not candidate["_has_anchor"] | |
| and ( | |
| candidate["_quality"] >= 0.55 | |
| or (candidate["_quality"] >= 0.45 and candidate["mention_count"] >= 2) | |
| ) | |
| ) | |
| ] | |
| selected = qualified[: min(top_n, 3)] | |
| for candidate in selected: | |
| candidate.pop("_readability", None) | |
| candidate.pop("_quality", None) | |
| candidate.pop("_canonical", None) | |
| candidate.pop("_has_anchor", None) | |
| return selected | |
| def _topic_ngram_is_relevant( | |
| self, | |
| *, | |
| ngram: str, | |
| topic: str, | |
| topic_match_texts: dict[str, str] | None, | |
| ) -> bool: | |
| if not topic_match_texts or topic not in topic_match_texts: | |
| return True | |
| matched_text = self._normalize_for_topic_match(topic_match_texts[topic]) | |
| if not matched_text: | |
| return True | |
| if matched_text not in ngram: | |
| return False | |
| for other_topic, other_matched_text in topic_match_texts.items(): | |
| if other_topic == topic: | |
| continue | |
| normalized_other = self._normalize_for_topic_match(other_matched_text) | |
| if normalized_other and normalized_other in ngram: | |
| return False | |
| return True | |
| def _compute_tfidf_highlights( | |
| self, | |
| counter: Counter, | |
| top_n: int, | |
| *, | |
| prefer_longer_phrases: bool = False, | |
| ) -> list[dict]: | |
| """TF-IDF scoring + filtering + dedup.""" | |
| candidates = [] | |
| n = self._review_count | |
| total_count = sum(counter.values()) if counter.values() else 1 | |
| for ngram, count in counter.items(): | |
| df = self._ngram_doc_freq.get(ngram, 0) | |
| if df < settings.highlights_min_mentions: | |
| continue | |
| if df / n > settings.highlights_max_doc_freq_ratio: | |
| continue | |
| idf = math.log(n / df) if df > 0 else 0 | |
| tf = count / total_count | |
| tfidf = tf * idf | |
| rank_score = count * tfidf | |
| # Compute the average sentiment from the running sum and count | |
| s_sum = self._ngram_sentiment_sum.get(ngram, 0.0) | |
| s_count = self._ngram_sentiment_count.get(ngram, 0) | |
| avg_score = s_sum / s_count if s_count > 0 else 0.0 | |
| candidates.append({ | |
| "phrase": ngram, | |
| "mention_count": df, | |
| "score": round(avg_score, 3), | |
| "sentiment": ( | |
| "positive" if avg_score > settings.sentiment_positive_threshold | |
| else "negative" if avg_score < settings.sentiment_negative_threshold | |
| else "neutral" | |
| ), | |
| "ngram_size": len(ngram.split()), | |
| "_rank": rank_score, | |
| }) | |
| if prefer_longer_phrases: | |
| candidates.sort( | |
| key=lambda x: (-x["_rank"], -x["ngram_size"], -x["mention_count"], -x["score"], x["phrase"]) | |
| ) | |
| else: | |
| candidates.sort(key=lambda x: x["_rank"], reverse=True) | |
| # Substring absorption | |
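| # e.g. a lower-ranked "优化" is absorbed by a higher-ranked "优化 很好" (same negation | |
| # status), but it is kept alongside "优化 不行" because only the parent contains a | |
| # negation marker ("不" / "没" / "无"). | |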
| absorbed: set[int] = set() | |
| for i, c in enumerate(candidates): | |
| if i in absorbed: | |
| continue | |
| for j in range(i + 1, len(candidates)): | |
| if j in absorbed: | |
| continue | |
| if candidates[j]["phrase"] in c["phrase"]: | |
| parent_has_neg = any(neg in c["phrase"] for neg in ["不", "没", "无"]) | |
| child_has_neg = any(neg in candidates[j]["phrase"] for neg in ["不", "没", "无"]) | |
| if parent_has_neg == child_has_neg: | |
| absorbed.add(j) | |
| results = [c for i, c in enumerate(candidates) if i not in absorbed] | |
| # Re-sort by mention_count descending for display order. | |
| # TF-IDF sort above selected the top candidates; this ensures the final | |
| # list the UI receives is ordered from most-mentioned to least-mentioned, | |
| # with score and phrase as stable tie-breakers. | |
| if prefer_longer_phrases: | |
| results.sort(key=lambda x: (-x["mention_count"], -x["ngram_size"], -x["score"], x["phrase"])) | |
| else: | |
| results.sort(key=lambda x: (-x["mention_count"], -x["score"], x["phrase"])) | |
| for r in results[:top_n]: | |
| r.pop("_rank", None) | |
| return results[:top_n] | |