# sentimentstream-worker/backend/app/services/highlights_service.py
"""
Serwis ekstrakcji Community Highlights z recenzji.
Uzywa n-gramow (2-5 tokenow) + TF-IDF do identyfikacji najczesciej uzywanych fraz.
"""
import math
import re
import time
from collections import Counter, defaultdict
from difflib import SequenceMatcher
from typing import Any
import jieba
from zhconv import convert
from app.core.config import settings
from app.core.worker_logging import StageTimingAccumulator
from app.core.stopwords_zh import is_stopword
from app.models.schemas import SentimentType
from app.services.global_pros_cons import (
GLOBAL_PROS_CONS_CONTEXTS,
build_contextual_global_pros_cons,
)
_SNIPPET_TRIM_CHARS = " \t\n\r.,!?;:，。！？；：、\"'“”‘’[]()"
_ELLIPSIS = "..."
_EXPLICIT_STEAM_DECK_PATTERN = re.compile(
r"(?:\bsteam\s*deck\b|\bsteamdeck\b)",
re.IGNORECASE,
)
_ASCII_SNIPPET_EVIDENCE_TOKENS = frozenset(
{
"amazing",
"awkward",
"bad",
"better",
"beautiful",
"boring",
"broken",
"buggy",
"cheap",
"clunky",
"crashes",
"crash",
"dragging",
"drags",
"expensive",
"faster",
"feels",
"felt",
"fun",
"good",
"great",
"hard",
"hate",
"hated",
"interesting",
"janky",
"laggy",
"love",
"loved",
"okay",
"polished",
"repetitive",
"responsive",
"rough",
"runs",
"running",
"satisfying",
"slower",
"slow",
"smooth",
"smoothly",
"stable",
"stiff",
"terrible",
"thin",
"unplayable",
"worth",
"worse",
}
)
_ZH_SNIPPET_EVIDENCE_MARKERS = frozenset(
{
"不",
"不错",
"一般",
"卡",
"卡顿",
"喜欢",
"失望",
"好",
"好玩",
"崩溃",
"差",
"推荐",
"无聊",
"流畅",
"爽",
"稳定",
"糟糕",
"舒服",
"贵",
"难",
"顺滑",
"闪退",
}
)
_GLOBAL_SIGNAL_WEAK_TOKENS = frozenset(
{
"after",
"appear",
"appears",
"become",
"becomes",
"during",
"every",
"feel",
"feels",
"felt",
"overall",
"really",
"seem",
"seems",
"stay",
"stays",
"still",
"through",
"very",
"之后",
"以前",
"之后",
"还是",
"依然",
"感觉",
"觉得",
"开始",
"出现",
}
)
_GLOBAL_SIGNAL_GENERIC_TOPIC_LABEL_TOKENS = frozenset(
{
"audio",
"balance",
"bug",
"bugs",
"combat",
"community",
"content",
"control",
"controls",
"difficulty",
"gameplay",
"graphic",
"graphics",
"hud",
"interface",
"localization",
"menu",
"music",
"narrative",
"optimization",
"patch",
"performance",
"polish",
"progression",
"sound",
"soundtrack",
"story",
"support",
"ui",
"内容",
"剧情",
"平衡",
"性能",
"战斗",
"故事",
"操作",
"玩法",
"界面",
"画面",
"画质",
"配乐",
"配音",
"音效",
"音乐",
"优化",
"手感",
}
)
_GLOBAL_SIGNAL_STANDALONE_ALLOWED_TOKENS = frozenset(
{
"crash",
"crashes",
"grind",
"lag",
"lags",
"stutter",
"stutters",
}
)
_TOPIC_CONTEXT_MEANINGFUL_TOKENS = frozenset(
{
"area",
"areas",
"boss",
"bosses",
"build",
"builds",
"chapter",
"chapters",
"class",
"classes",
"enemy",
"enemies",
"fight",
"fights",
"late",
"map",
"maps",
"mission",
"missions",
"quest",
"quests",
"side",
"skill",
"skills",
"town",
"towns",
"weapon",
"weapons",
"主线",
"任务",
"关卡",
"城镇",
"地图",
"支线",
"敌人",
"武器",
"章节",
}
)
_SNIPPET_WEAK_CONTEXT_TOKENS = frozenset(
{
"after",
"before",
"during",
"in",
"on",
"some",
"certain",
"various",
"several",
"part",
"parts",
"area",
"areas",
"patch",
"update",
"updates",
"version",
"versions",
"之后",
"之前",
"期间",
"有些",
"一些",
"部分",
"地方",
"区域",
"补丁",
"更新",
"版本",
}
)
_ZH_SNIPPET_FRAGMENT_PREFIXES = (
"和",
"但",
"但是",
"不过",
"而",
)
_ZH_SNIPPET_FRAGMENT_SUFFIXES = (
"的太",
"根本",
"每一个",
"每一条",
)
_ZH_SNIPPET_GENERIC_EXPERIENCE_TOKENS = frozenset(
{
"互通",
"体验",
"更佳",
"一玩",
}
)
class HighlightsCollector:
"""
Stateful collector — akumuluje dane przez caly cykl analizy w sposob przyrostowy,
aby oszczedzac pamiec RAM. Oblicza highlights raz na koncu.
"""
def __init__(
self,
*,
stage_timings: StageTimingAccumulator | None = None,
) -> None:
self._stage_timings = stage_timings
self._topic_ngrams: dict[str, Counter] = defaultdict(Counter)
self._topic_snippets: dict[str, dict[str, dict[str, Any]]] = defaultdict(dict)
self._context_topic_snippets: dict[str, dict[str, dict[str, dict[str, Any]]]] = {
"recent": defaultdict(dict),
"current_patch": defaultdict(dict),
}
self._category_ngrams: dict[str, Counter] = defaultdict(Counter)
self._global_signal_doc_freq: dict[str, Counter] = {
context: Counter() for context in GLOBAL_PROS_CONS_CONTEXTS
}
self._global_signal_sentiment_sum: dict[str, dict[tuple[str, str], float]] = {
context: defaultdict(float) for context in GLOBAL_PROS_CONS_CONTEXTS
}
self._global_signal_sentiment_count: dict[str, Counter] = {
context: Counter() for context in GLOBAL_PROS_CONS_CONTEXTS
}
self._global_signal_metadata: dict[str, dict[tuple[str, str], dict[str, Any]]] = {
context: {} for context in GLOBAL_PROS_CONS_CONTEXTS
}
self._global_counts: Counter = Counter()
self._ngram_doc_freq: Counter = Counter()
self._ngram_sentiment_sum: dict[str, float] = defaultdict(float)
self._ngram_sentiment_count: Counter = Counter()
self._review_count = 0
self._current_review_seen_ngrams: set[str] = set()
self._current_review_seen_snippets: set[tuple[str, str, str]] = set()
self._current_review_seen_global_signals: dict[str, set[tuple[str, str]]] = {
context: set() for context in GLOBAL_PROS_CONS_CONTEXTS
}
# Source drill-down: current review identity for snippet attribution
self._current_review_steamid: str | None = None
self._current_review_recommendation_id: str | None = None
# Compact source metadata (#49)
self._current_review_timestamp_created: int | None = None
self._current_review_playtime_at_review: int | None = None
self._current_review_voted_up: bool | None = None
self._current_review_language: str | None = None
self._current_review_steam_purchase: bool | None = None
self._current_review_received_for_free: bool | None = None
self._current_review_written_during_early_access: bool | None = None
def set_review_source(
self,
*,
author_steamid: str | None = None,
recommendation_id: str | None = None,
timestamp_created: int | None = None,
playtime_at_review: int | None = None,
voted_up: bool | None = None,
language: str | None = None,
steam_purchase: bool | None = None,
received_for_free: bool | None = None,
written_during_early_access: bool | None = None,
) -> None:
"""Set review-level identity and metadata for the next review's snippet attribution."""
self._current_review_steamid = author_steamid
self._current_review_recommendation_id = recommendation_id
self._current_review_timestamp_created = timestamp_created
self._current_review_playtime_at_review = playtime_at_review
self._current_review_voted_up = voted_up
self._current_review_language = language
self._current_review_steam_purchase = steam_purchase
self._current_review_received_for_free = received_for_free
self._current_review_written_during_early_access = written_during_early_access
def start_review(self) -> None:
"""Sygnalizuje poczatek nowej recenzji (do obliczania Document Frequency)."""
self._review_count += 1
self._current_review_seen_ngrams = set()
self._current_review_seen_snippets = set()
self._current_review_seen_global_signals = {
context: set() for context in GLOBAL_PROS_CONS_CONTEXTS
}
def add_sentence(
self,
        review_idx: int,  # Retained for compatibility; use start_review() to separate reviews
sentence: str,
topics: list[str],
sentiment_score: float,
categories: list[str] | None = None,
topic_match_texts: dict[str, str] | None = None,
) -> None:
"""Wywolywane per zdanie podczas analyze_batch()."""
stage_start = time.monotonic()
words = self._tokenize_sentence(sentence)
allow_public_evidence = not self._is_steam_deck_specific_sentence(sentence)
if not words:
if self._stage_timings is not None:
self._stage_timings.add(
"highlights_discussion_patterns_s",
time.monotonic() - stage_start,
)
return
if allow_public_evidence:
for topic in topics:
self._add_global_signal_candidates(
sentence=sentence,
topic=topic,
sentiment_score=sentiment_score,
categories=categories,
topic_match_texts=topic_match_texts,
)
if len(words) < 2:
if self._stage_timings is not None:
self._stage_timings.add(
"highlights_discussion_patterns_s",
time.monotonic() - stage_start,
)
return
for n in range(settings.highlights_ngram_min, settings.highlights_ngram_max + 1):
for i in range(len(words) - n + 1):
ngram = " ".join(words[i : i + n])
normalized_ngram = self._normalize_for_topic_match(ngram)
                # 1. Global counters
self._global_counts[ngram] += 1
self._ngram_sentiment_sum[ngram] += sentiment_score
self._ngram_sentiment_count[ngram] += 1
                # 2. Incremental Document Frequency (counted once per review)
if ngram not in self._current_review_seen_ngrams:
self._ngram_doc_freq[ngram] += 1
self._current_review_seen_ngrams.add(ngram)
                # 3. Per-topic and per-category counters
for topic in topics:
if self._topic_ngram_is_relevant(
ngram=normalized_ngram,
topic=topic,
topic_match_texts=topic_match_texts,
):
self._topic_ngrams[topic][ngram] += 1
if categories:
for category in categories:
self._category_ngrams[category][ngram] += 1
if allow_public_evidence:
for topic in topics:
self._add_topic_snippet(
topic=topic,
sentence=sentence,
sentiment_score=sentiment_score,
categories=categories,
topic_match_texts=topic_match_texts,
)
        # Periodic deep prune. Note: this fires for every sentence of each
        # 500th review, since the counter only advances in start_review().
        if self._review_count % 500 == 0:
            self._prune_singletons()
if self._stage_timings is not None:
self._stage_timings.add(
"highlights_discussion_patterns_s",
time.monotonic() - stage_start,
)
@staticmethod
def _is_steam_deck_specific_sentence(sentence: str) -> bool:
"""Deck-specific text feeds the dedicated widget, not public grouped evidence."""
return bool(_EXPLICIT_STEAM_DECK_PATTERN.search(sentence))
def _prune_singletons(self) -> None:
"""Glebokie czyszczenie n-gramow z count=1 (oszczednosc pamieci)."""
singletons = [k for k, v in self._global_counts.items() if v <= 1]
for k in singletons:
del self._global_counts[k]
if k in self._ngram_sentiment_sum:
del self._ngram_sentiment_sum[k]
del self._ngram_sentiment_count[k]
del self._ngram_doc_freq[k]
            # Prune from per-topic counters
for topic in self._topic_ngrams:
if k in self._topic_ngrams[topic]:
del self._topic_ngrams[topic][k]
            # Prune from per-category counters
for cat in self._category_ngrams:
if k in self._category_ngrams[cat]:
del self._category_ngrams[cat][k]
def compute_highlights(self) -> dict[str, Any]:
"""
Oblicza highlights po zakonczeniu analizy.
"""
highlights_start = time.monotonic()
if self._review_count == 0:
empty_results: dict[str, Any] = {
"general": [],
"recent": [],
"current_patch": [],
"topics": {},
}
if self._stage_timings is not None:
self._stage_timings.add(
"highlights_discussion_patterns_s",
time.monotonic() - highlights_start,
)
global_stage_start = time.monotonic()
empty_results["global_pros_cons"] = build_contextual_global_pros_cons({}).model_dump(mode="json")
if self._stage_timings is not None:
self._stage_timings.add(
"global_pros_cons_s",
time.monotonic() - global_stage_start,
)
return empty_results
results: dict[str, Any] = {
"general": self._compute_tfidf_highlights(
self._global_counts,
top_n=settings.highlights_top_n_general,
),
"recent": self._compute_tfidf_highlights(
self._category_ngrams.get("recent", Counter()),
top_n=settings.highlights_top_n_general,
),
"current_patch": self._compute_tfidf_highlights(
self._category_ngrams.get("current_patch", Counter()),
top_n=settings.highlights_top_n_general,
),
"topics": {},
"recent_topic_highlights": {},
"current_patch_topic_highlights": {},
}
for topic, counter in self._topic_ngrams.items():
h = self._compute_topic_snippet_highlights(
topic,
top_n=settings.highlights_top_n_per_topic,
)
if h:
results["topics"][topic] = h
for context in ("recent", "current_patch"):
topic_buckets = self._context_topic_snippets[context]
for topic in topic_buckets:
h = self._compute_topic_snippet_highlights(
topic,
top_n=settings.highlights_top_n_per_topic,
topic_snippets=topic_buckets,
)
if h:
results[f"{context}_topic_highlights"][topic] = h
if self._stage_timings is not None:
self._stage_timings.add(
"highlights_discussion_patterns_s",
time.monotonic() - highlights_start,
)
global_stage_start = time.monotonic()
results["global_pros_cons"] = self._compute_global_pros_cons()
if self._stage_timings is not None:
self._stage_timings.add(
"global_pros_cons_s",
time.monotonic() - global_stage_start,
)
return results
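    # Illustrative shape of the compute_highlights() payload:
    # {
    #   "general": [{"phrase", "mention_count", "score", "sentiment", "ngram_size"}, ...],
    #   "recent": [...], "current_patch": [...],
    #   "topics": {topic: [{"text", "mention_count", "score", "sentiment", ...}, ...]},
    #   "recent_topic_highlights": {...}, "current_patch_topic_highlights": {...},
    #   "global_pros_cons": <serialized build_contextual_global_pros_cons output>,
    # }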
@staticmethod
def _is_valid_global_signal_token(token: str) -> bool:
compact = token.strip().lower()
if not compact:
return False
if compact.isascii():
return len(compact) >= 2 and any(char.isalpha() for char in compact)
return True
@staticmethod
def _normalize_signal_token(token: str) -> str:
normalized = convert(token, "zh-cn").lower()
return re.sub(r"[^\w\u4e00-\u9fff]+", "", normalized)
@classmethod
def _find_signal_anchor_span(
cls,
tokens: list[str],
matched_text: str,
) -> tuple[int, int] | None:
if not tokens or not matched_text:
return None
normalized_tokens = [cls._normalize_signal_token(token) for token in tokens]
anchor_tokens = [
cls._normalize_signal_token(token)
for token in cls._tokenize_sentence(matched_text)
if cls._normalize_signal_token(token)
]
if not anchor_tokens:
normalized_match = cls._normalize_signal_token(matched_text)
if not normalized_match:
return None
anchor_tokens = [normalized_match]
for start in range(0, len(tokens) - len(anchor_tokens) + 1):
if normalized_tokens[start:start + len(anchor_tokens)] == anchor_tokens:
return start, start + len(anchor_tokens)
compact_anchor = "".join(anchor_tokens)
for idx, token in enumerate(normalized_tokens):
if compact_anchor and compact_anchor in token:
return idx, idx + 1
return None
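    # Example: tokens=["combat", "feels", "clunky"], matched_text="combat"
    # (assuming none are stopwords) → the normalized anchor ["combat"] matches at
    # index 0, so the returned span is (0, 1). If no contiguous token match is
    # found, the compacted anchor is searched as a substring of single tokens.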
@classmethod
def _global_signal_quality(
cls,
tokens: list[str],
*,
anchor_start: int,
anchor_end: int,
) -> float:
anchor_tokens = tokens[anchor_start:anchor_end]
modifier_tokens = tokens[:anchor_start] + tokens[anchor_end:]
weak_token_count = sum(
1 for token in modifier_tokens
if cls._normalize_signal_token(token) in _GLOBAL_SIGNAL_WEAK_TOKENS
)
sentiment_modifier_count = sum(
1 for token in modifier_tokens if cls._is_sentiment_bearing_signal_token(token)
)
semantic_modifier_count = sum(
1
for token in modifier_tokens
if cls._normalize_signal_token(token)
and cls._normalize_signal_token(token) not in _GLOBAL_SIGNAL_WEAK_TOKENS
)
quality = 0.85
if not modifier_tokens:
if cls._is_generic_topic_anchor_candidate(anchor_tokens):
quality -= 0.7
elif cls._is_self_contained_anchor_signal(anchor_tokens):
quality += 0.35
elif len(anchor_tokens) > 1:
quality += 0.2
else:
quality += 0.0
else:
quality += min(0.25, semantic_modifier_count * 0.2)
quality += min(0.25, sentiment_modifier_count * 0.2)
quality -= weak_token_count * 0.2
if len(modifier_tokens) > 2:
quality -= min(0.25, (len(modifier_tokens) - 2) * 0.1)
if (
anchor_start > 0
and sentiment_modifier_count == 0
and all(
token.isascii() and len(token) > 5
for token in tokens[:anchor_start]
)
):
# Title/DLC-like prefixes should not outrank cleaner lexical signals.
quality -= 0.35
if any(char.isdigit() for token in modifier_tokens for char in token) and sentiment_modifier_count == 0:
quality -= 0.15
return round(quality, 3)
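    # Worked example: tokens=["combat", "feels", "clunky"] with anchor span (0, 1):
    # modifiers are ["feels", "clunky"]; "feels" is a weak token, both are
    # sentiment-bearing, and "clunky" is the only non-weak semantic modifier, so
    # quality = 0.85 + min(0.25, 1*0.2) + min(0.25, 2*0.2) - 1*0.2 = 1.10,
    # which clears the >= 1.0 floor applied in _build_global_signal_sources.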
@classmethod
def _is_sentiment_bearing_signal_token(cls, token: str) -> bool:
normalized = cls._normalize_signal_token(token)
if not normalized:
return False
if normalized.isascii():
return (
normalized in _ASCII_SNIPPET_EVIDENCE_TOKENS
or normalized.endswith(("ed", "ing", "ly", "ive", "ful", "less", "ous", "able", "ible"))
)
return any(marker in normalized for marker in _ZH_SNIPPET_EVIDENCE_MARKERS)
@classmethod
def _is_generic_topic_anchor_candidate(cls, anchor_tokens: list[str]) -> bool:
normalized_tokens = [
cls._normalize_signal_token(token)
for token in anchor_tokens
if cls._normalize_signal_token(token)
]
return bool(normalized_tokens) and all(
token in _GLOBAL_SIGNAL_GENERIC_TOPIC_LABEL_TOKENS
for token in normalized_tokens
)
@classmethod
def _is_self_contained_anchor_signal(cls, anchor_tokens: list[str]) -> bool:
normalized_tokens = [
cls._normalize_signal_token(token)
for token in anchor_tokens
if cls._normalize_signal_token(token)
]
if not normalized_tokens:
return False
if all(cls._is_sentiment_bearing_signal_token(token) for token in anchor_tokens):
return True
return len(normalized_tokens) == 1 and normalized_tokens[0] in _GLOBAL_SIGNAL_STANDALONE_ALLOWED_TOKENS
def _add_global_signal_candidates(
self,
*,
sentence: str,
topic: str,
sentiment_score: float,
categories: list[str] | None,
topic_match_texts: dict[str, str] | None,
) -> None:
matched_text = topic_match_texts.get(topic) if topic_match_texts else None
if matched_text:
candidate_text = self._extract_topic_snippet(
sentence,
matched_text,
other_matched_texts=[
other_text
for other_topic, other_text in (topic_match_texts or {}).items()
if other_topic != topic
],
)
else:
candidate_text = sentence
tokens = [
word for word in self._tokenize_sentence(candidate_text)
if self._is_valid_global_signal_token(word)
]
if not tokens:
return
anchor_span = self._find_signal_anchor_span(tokens, matched_text or candidate_text)
if anchor_span is None:
return
anchor_start, anchor_end = anchor_span
best_candidate: tuple[str, float, int] | None = None
max_n = min(3, len(tokens))
for n in range(1, max_n + 1):
for start in range(0, len(tokens) - n + 1):
end = start + n
if end <= anchor_start or start >= anchor_end:
continue
phrase_tokens = tokens[start:end]
phrase = " ".join(phrase_tokens).strip()
if not phrase:
continue
local_anchor_start = max(0, anchor_start - start)
local_anchor_end = min(len(phrase_tokens), anchor_end - start)
quality = self._global_signal_quality(
phrase_tokens,
anchor_start=local_anchor_start,
anchor_end=local_anchor_end,
)
first_position = local_anchor_start
candidate = (phrase, quality, first_position)
if best_candidate is None or (quality, -len(phrase), -first_position) > (
best_candidate[1],
-len(best_candidate[0]),
-best_candidate[2],
):
best_candidate = candidate
if best_candidate is None:
return
phrase, quality, first_position = best_candidate
sentiment = self._classify_global_signal_sentiment(sentiment_score)
if sentiment is None:
return
for context in self._global_signal_contexts(categories):
self._record_global_signal(
context=context,
phrase=phrase,
sentiment=sentiment,
sentiment_score=sentiment_score,
quality=quality,
first_position=first_position,
)
@staticmethod
def _global_signal_contexts(categories: list[str] | None) -> list[str]:
contexts = ["general"]
for category in categories or []:
if category in GLOBAL_PROS_CONS_CONTEXTS and category not in contexts:
contexts.append(category)
return contexts
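    # e.g. categories=["recent", "other"] → ["general", "recent"], assuming
    # "recent" is in GLOBAL_PROS_CONS_CONTEXTS and "other" is not.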
@staticmethod
def _classify_global_signal_sentiment(sentiment_score: float) -> str | None:
if sentiment_score > settings.sentiment_positive_threshold:
return SentimentType.POSITIVE.value
if sentiment_score < settings.sentiment_negative_threshold:
return SentimentType.NEGATIVE.value
return None
def _record_global_signal(
self,
*,
context: str,
phrase: str,
sentiment: str,
sentiment_score: float,
quality: float,
first_position: int,
) -> None:
key = (phrase, sentiment)
self._global_signal_sentiment_sum[context][key] += sentiment_score
self._global_signal_sentiment_count[context][key] += 1
if key not in self._current_review_seen_global_signals[context]:
self._global_signal_doc_freq[context][key] += 1
self._current_review_seen_global_signals[context].add(key)
metadata = self._global_signal_metadata[context].get(key)
if metadata is None:
self._global_signal_metadata[context][key] = {
"quality": quality,
"first_position": first_position,
}
else:
metadata["quality"] = max(metadata["quality"], quality)
metadata["first_position"] = min(metadata["first_position"], first_position)
def _build_global_signal_sources(self, context: str) -> list[dict[str, Any]]:
sources: list[dict[str, Any]] = []
for (phrase, sentiment), doc_freq in self._global_signal_doc_freq[context].items():
sentiment_count = self._global_signal_sentiment_count[context].get((phrase, sentiment), 0)
if sentiment_count <= 0:
continue
metadata = self._global_signal_metadata[context].get((phrase, sentiment), {})
quality = metadata.get("quality", 0.0)
if quality < 1.0:
continue
avg_score = self._global_signal_sentiment_sum[context][(phrase, sentiment)] / sentiment_count
sources.append(
{
"phrase": phrase,
"mention_count": doc_freq,
"sentiment": sentiment,
"score": round(avg_score, 3),
"quality": quality,
"first_position": metadata.get("first_position", 0),
}
)
return sources
def compute_global_pros_cons_signals(self) -> dict[str, list[dict[str, Any]]]:
stage_start = time.monotonic()
results = {
context: self._build_global_signal_sources(context)
for context in GLOBAL_PROS_CONS_CONTEXTS
}
if self._stage_timings is not None:
self._stage_timings.add(
"global_pros_cons_s",
time.monotonic() - stage_start,
)
return results
def _compute_global_pros_cons(self) -> dict[str, dict[str, list[dict[str, Any]]]]:
return build_contextual_global_pros_cons(
self.compute_global_pros_cons_signals()
).model_dump(mode="json")
@staticmethod
def _tokenize_sentence(sentence: str) -> list[str]:
        # Simple ASCII check for English text (avoids mis-segmentation by jieba)
is_ascii = all(ord(c) < 128 for c in sentence)
if is_ascii:
return [w for w in sentence.split() if not is_stopword(w) and len(w.strip()) > 0]
return [w for w in jieba.lcut(sentence) if not is_stopword(w) and len(w.strip()) > 0]
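    # Example: pure-ASCII sentences are whitespace-split, so "runs great on deck"
    # keeps its non-stopword tokens verbatim; Chinese text such as "画面很棒" is
    # segmented by jieba (roughly ["画面", "很", "棒"]) before the same stopword
    # and empty-token filtering.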
@staticmethod
def _normalize_for_topic_match(text: str) -> str:
normalized = convert(text, "zh-cn").lower()
return "".join(normalized.split())
@classmethod
def _normalize_snippet(cls, text: str) -> str:
normalized = convert(text, "zh-cn").lower().replace(_ELLIPSIS, " ")
normalized = re.sub(r"[^\w\u4e00-\u9fff]+", " ", normalized)
normalized = re.sub(r"\s+", " ", normalized).strip()
return normalized
@staticmethod
def _normalize_snippet_token(text: str) -> str:
normalized = convert(text, "zh-cn").lower()
normalized = re.sub(r"[^\w\u4e00-\u9fff]+", "", normalized)
return normalized
@classmethod
def _extract_space_delimited_snippet(
cls,
sentence: str,
matched_text: str,
other_matched_texts: list[str] | None = None,
) -> str:
tokens = sentence.split()
if not tokens:
return sentence.strip(_SNIPPET_TRIM_CHARS)
normalized_tokens = [cls._normalize_snippet_token(token) for token in tokens]
anchor_tokens = [
cls._normalize_snippet_token(token)
for token in matched_text.split()
if cls._normalize_snippet_token(token)
]
if not anchor_tokens:
return sentence.strip(_SNIPPET_TRIM_CHARS)
anchor_start = None
anchor_end = None
for start in range(0, len(tokens) - len(anchor_tokens) + 1):
if normalized_tokens[start:start + len(anchor_tokens)] == anchor_tokens:
anchor_start = start
anchor_end = start + len(anchor_tokens)
break
if anchor_start is None:
compact_anchor = "".join(anchor_tokens)
for idx, token in enumerate(normalized_tokens):
if compact_anchor and compact_anchor in token:
anchor_start = idx
anchor_end = idx + 1
break
if anchor_start is None or anchor_end is None:
return sentence.strip(_SNIPPET_TRIM_CHARS)
window_start = max(0, anchor_start - 3)
window_end = min(len(tokens), anchor_end + 4)
for other_text in other_matched_texts or []:
other_tokens = [
cls._normalize_snippet_token(token)
for token in other_text.split()
if cls._normalize_snippet_token(token)
]
if not other_tokens:
continue
for start in range(0, len(tokens) - len(other_tokens) + 1):
if normalized_tokens[start:start + len(other_tokens)] != other_tokens:
continue
if start > anchor_start and start < window_end:
window_end = start
if start < anchor_start and (start + len(other_tokens)) > window_start:
window_start = start + len(other_tokens)
break
snippet = " ".join(tokens[window_start:window_end]).strip(_SNIPPET_TRIM_CHARS)
if window_start > 0:
snippet = f"{_ELLIPSIS}{snippet}"
if window_end < len(tokens):
snippet = f"{snippet}{_ELLIPSIS}"
return snippet
@classmethod
def _extract_char_window_snippet(
cls,
sentence: str,
matched_text: str,
other_matched_texts: list[str] | None = None,
) -> str:
display_sentence = convert(sentence, "zh-cn").strip()
search_sentence = display_sentence.lower()
anchor = convert(matched_text, "zh-cn").lower().strip()
if not display_sentence or not anchor:
return display_sentence.strip(_SNIPPET_TRIM_CHARS)
anchor_pos = search_sentence.find(anchor)
if anchor_pos == -1:
compact_anchor = anchor.replace(" ", "")
anchor_pos = search_sentence.find(compact_anchor)
if anchor_pos == -1:
return display_sentence.strip(_SNIPPET_TRIM_CHARS)
anchor = compact_anchor
window_start = max(0, anchor_pos - 10)
window_end = min(len(display_sentence), anchor_pos + len(anchor) + 12)
for other_text in other_matched_texts or []:
normalized_other = convert(other_text, "zh-cn").lower().strip()
if not normalized_other:
continue
other_pos = search_sentence.find(normalized_other)
if other_pos == -1:
continue
if other_pos > anchor_pos and other_pos < window_end:
window_end = other_pos
if other_pos < anchor_pos and other_pos >= window_start:
window_start = max(window_start, other_pos + len(normalized_other))
snippet = display_sentence[window_start:window_end].strip(_SNIPPET_TRIM_CHARS)
if window_start > 0:
snippet = f"{_ELLIPSIS}{snippet}"
if window_end < len(display_sentence):
snippet = f"{snippet}{_ELLIPSIS}"
return snippet
@classmethod
def _extract_topic_snippet(
cls,
sentence: str,
matched_text: str,
other_matched_texts: list[str] | None = None,
) -> str:
compact_sentence = re.sub(r"\s+", " ", sentence).strip()
if not compact_sentence:
return ""
if " " in compact_sentence and re.search(r"[a-zA-Z]", matched_text):
snippet = cls._extract_space_delimited_snippet(
compact_sentence,
matched_text,
other_matched_texts=other_matched_texts,
)
else:
snippet = cls._extract_char_window_snippet(
compact_sentence,
matched_text,
other_matched_texts=other_matched_texts,
)
        # Plain whitespace strip here: the helpers already trimmed punctuation,
        # and a full _SNIPPET_TRIM_CHARS strip would erase the ellipsis markers.
        return snippet.strip()
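    # Worked example: sentence="honestly the combat feels really clunky after the patch",
    # matched_text="combat" → the space-delimited path is taken; the anchor sits
    # at token 2, the window spans tokens[0:7], and the two trailing tokens are
    # cut, yielding "honestly the combat feels really clunky after...".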
@staticmethod
def _snippet_sentiment(avg_score: float) -> str:
return (
"positive" if avg_score > settings.sentiment_positive_threshold
else "negative" if avg_score < settings.sentiment_negative_threshold
else "neutral"
)
@classmethod
def _snippet_readability(cls, text: str) -> float:
stripped = text.replace(_ELLIPSIS, "").strip(_SNIPPET_TRIM_CHARS)
if not stripped:
return 0.0
if " " in stripped:
units = len([token for token in stripped.split() if token])
if units <= 1:
length_score = 0.1
elif units == 2:
length_score = 0.45
elif units == 3:
length_score = 0.62
elif units <= 8:
length_score = 0.95
elif units <= 12:
length_score = 0.78
else:
length_score = max(0.35, 0.78 - ((units - 12) * 0.06))
else:
units = len(stripped)
if units <= 2:
length_score = 0.1
elif units <= 4:
length_score = 0.58
elif units <= 12:
length_score = 0.92
elif units <= 18:
length_score = 0.76
else:
length_score = max(0.35, 0.76 - ((units - 18) * 0.04))
boundary_penalty = 0.0
lowered = stripped.lower()
if lowered.startswith(("and ", "but ", "or ", "但是", "不过", "而且")):
boundary_penalty += 0.15
if lowered.endswith((" and", " but", " or", "但是", "不过", "而且")):
boundary_penalty += 0.1
        punctuation_penalty = 0.1 if stripped.count(",") + stripped.count("，") >= 2 else 0.0
return round(max(0.0, length_score - boundary_penalty - punctuation_penalty), 3)
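    # Worked example: a 6-token English snippet scores 0.95; the same snippet
    # starting with "but " drops to 0.80, and two or more commas cost another 0.10.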
@classmethod
def _snippet_evidence_quality(cls, text: str, matched_text: str | None) -> float:
quality = cls._snippet_readability(text)
stripped = text.replace(_ELLIPSIS, "").strip(_SNIPPET_TRIM_CHARS)
if not stripped:
return 0.0
if not matched_text:
return quality
normalized_snippet = cls._normalize_snippet(stripped)
normalized_anchor = cls._normalize_snippet(matched_text)
if normalized_anchor and normalized_snippet == normalized_anchor:
return 0.0
if " " in stripped:
tokens = [
cls._normalize_snippet_token(token)
for token in stripped.split()
if cls._normalize_snippet_token(token)
]
token_count = len(tokens)
has_context_marker = any(
token in _ASCII_SNIPPET_EVIDENCE_TOKENS
or token.endswith(("ed", "ing", "ly", "ive", "ful", "less", "ous", "able", "ible"))
for token in tokens
)
anchor_span = cls._find_signal_anchor_span(stripped.split(), matched_text)
modifier_tokens = []
if anchor_span is not None:
anchor_start, anchor_end = anchor_span
modifier_tokens = [
cls._normalize_snippet_token(token)
for idx, token in enumerate(stripped.split())
if idx < anchor_start or idx >= anchor_end
]
modifier_tokens = [token for token in modifier_tokens if token]
if token_count <= 2 and not has_context_marker:
quality -= 0.35
elif token_count == 3 and not has_context_marker:
quality -= 0.2
if any(char.isdigit() for token in tokens for char in token) and not has_context_marker:
quality -= 0.15
if modifier_tokens and not has_context_marker:
weak_modifier_count = sum(
1
for token in modifier_tokens
if token in _GLOBAL_SIGNAL_WEAK_TOKENS
or token in _SNIPPET_WEAK_CONTEXT_TOKENS
or any(char.isdigit() for char in token)
)
if weak_modifier_count == len(modifier_tokens):
quality -= 0.45
elif weak_modifier_count >= len(modifier_tokens) - 1 and token_count <= 5:
quality -= 0.25
if cls._is_bare_topic_anchor_fragment(stripped, matched_text):
# Suppress one-sided brand/title prefixes when a better topic-local snippet exists.
quality -= 0.5
if has_context_marker:
quality += 0.12
else:
compact = cls._normalize_snippet_token(stripped)
has_context_marker = any(marker in stripped or marker in compact for marker in _ZH_SNIPPET_EVIDENCE_MARKERS)
tokens = cls._tokenize_sentence(stripped)
normalized_tokens = [
cls._normalize_snippet_token(token)
for token in tokens
if cls._normalize_snippet_token(token)
]
if len(compact) <= 2:
quality -= 0.4
elif len(compact) <= 4 and not has_context_marker:
quality -= 0.2
if stripped.startswith(_ZH_SNIPPET_FRAGMENT_PREFIXES):
quality -= 0.3
if stripped.endswith(_ZH_SNIPPET_FRAGMENT_SUFFIXES):
quality -= 0.4
anchor_span = cls._find_signal_anchor_span(tokens, matched_text)
if anchor_span is not None:
anchor_start, anchor_end = anchor_span
anchor_at_edge = anchor_start == 0 or anchor_end == len(tokens)
modifier_tokens = [
normalized_tokens[idx]
for idx in range(len(normalized_tokens))
if idx < anchor_start or idx >= anchor_end
]
if anchor_at_edge and len(normalized_tokens) <= 2:
quality -= 0.22
                if anchor_at_edge and len(normalized_tokens) <= 4 and ("," in stripped or "，" in stripped):
quality -= 0.32
if (
anchor_end == len(tokens)
and len(normalized_tokens) <= 3
and ("," in stripped or "," in stripped)
):
quality -= 0.22
if (
anchor_start == 0
and len(normalized_tokens) <= 4
and modifier_tokens
and all(
token in _ZH_SNIPPET_GENERIC_EXPERIENCE_TOKENS
or token in _SNIPPET_WEAK_CONTEXT_TOKENS
for token in modifier_tokens
)
):
quality -= 0.24
if has_context_marker:
quality += 0.1
return round(max(0.0, min(1.0, quality)), 3)
@classmethod
def _is_meaningful_topic_context_token(cls, token: str) -> bool:
normalized = cls._normalize_signal_token(token)
if not normalized:
return False
return (
cls._is_sentiment_bearing_signal_token(token)
or normalized in _GLOBAL_SIGNAL_GENERIC_TOPIC_LABEL_TOKENS
or normalized in _GLOBAL_SIGNAL_STANDALONE_ALLOWED_TOKENS
or normalized in _TOPIC_CONTEXT_MEANINGFUL_TOKENS
)
@classmethod
def _is_title_like_topic_modifier_token(cls, token: str) -> bool:
normalized = cls._normalize_signal_token(token)
if not normalized or normalized in _GLOBAL_SIGNAL_WEAK_TOKENS:
return False
if cls._is_meaningful_topic_context_token(token):
return False
if any(char.isdigit() for char in normalized):
return True
if normalized.isascii():
return len(normalized) >= 2 and any(char.isalpha() for char in normalized)
return len(normalized) <= 4
@classmethod
def _is_bare_topic_anchor_fragment(cls, text: str, matched_text: str | None) -> bool:
if not matched_text:
return False
tokens = cls._tokenize_sentence(text)
if len(tokens) < 3:
return False
anchor_span = cls._find_signal_anchor_span(tokens, matched_text)
if anchor_span is None:
return False
anchor_start, anchor_end = anchor_span
anchor_tokens = tokens[anchor_start:anchor_end]
modifier_tokens = tokens[:anchor_start] + tokens[anchor_end:]
if len(anchor_tokens) != 1 or len(modifier_tokens) < 2:
return False
# Pure title/DLC fragments usually sit on one side of a single topic anchor.
if anchor_start > 0 and anchor_end < len(tokens):
return False
if any(cls._is_meaningful_topic_context_token(token) for token in modifier_tokens):
return False
return all(cls._is_title_like_topic_modifier_token(token) for token in modifier_tokens)
def _add_topic_snippet(
self,
*,
topic: str,
sentence: str,
sentiment_score: float,
categories: list[str] | None,
topic_match_texts: dict[str, str] | None,
) -> None:
matched_text = topic_match_texts.get(topic) if topic_match_texts else None
if matched_text:
snippet = self._extract_topic_snippet(
sentence,
matched_text,
other_matched_texts=[
other_text
for other_topic, other_text in (topic_match_texts or {}).items()
if other_topic != topic
],
)
else:
snippet = re.sub(r"\s+", " ", sentence).strip(_SNIPPET_TRIM_CHARS)
normalized = self._normalize_snippet(snippet)
if not snippet or not normalized:
return
self._record_topic_snippet(
topic_snippets=self._topic_snippets,
context_key="general",
topic=topic,
normalized=normalized,
snippet=snippet,
sentiment_score=sentiment_score,
matched_text=matched_text,
)
for context in categories or []:
if context not in self._context_topic_snippets:
continue
self._record_topic_snippet(
topic_snippets=self._context_topic_snippets[context],
context_key=context,
topic=topic,
normalized=normalized,
snippet=snippet,
sentiment_score=sentiment_score,
matched_text=matched_text,
)
def _record_topic_snippet(
self,
*,
topic_snippets: dict[str, dict[str, dict[str, Any]]],
context_key: str,
topic: str,
normalized: str,
snippet: str,
sentiment_score: float,
matched_text: str | None,
) -> None:
readability = self._snippet_readability(snippet)
evidence_quality = self._snippet_evidence_quality(snippet, matched_text)
topic_bucket = topic_snippets[topic]
stats = topic_bucket.get(normalized)
if stats is None:
stats = {
"text": snippet,
"mention_count": 0,
"score_sum": 0.0,
"score_count": 0,
"readability": readability,
"quality": evidence_quality,
"has_anchor": matched_text is not None,
# Source drill-down: first reviewer is the representative source
"source_steamid": self._current_review_steamid,
"source_recommendation_id": self._current_review_recommendation_id,
# Compact source metadata (#49)
"source_timestamp_created": self._current_review_timestamp_created,
"source_playtime_at_review": self._current_review_playtime_at_review,
"source_voted_up": self._current_review_voted_up,
"source_language": self._current_review_language,
"source_steam_purchase": self._current_review_steam_purchase,
"source_received_for_free": self._current_review_received_for_free,
"source_written_during_early_access": self._current_review_written_during_early_access,
}
topic_bucket[normalized] = stats
elif readability > stats["readability"] or (
readability == stats["readability"] and len(snippet) < len(stats["text"])
):
stats["text"] = snippet
stats["readability"] = readability
stats["quality"] = max(stats["quality"], evidence_quality)
stats["has_anchor"] = stats["has_anchor"] or (matched_text is not None)
stats["score_sum"] += sentiment_score
stats["score_count"] += 1
review_seen_key = (context_key, topic, normalized)
if review_seen_key not in self._current_review_seen_snippets:
stats["mention_count"] += 1
self._current_review_seen_snippets.add(review_seen_key)
@staticmethod
def _snippets_are_near_duplicates(first: str, second: str) -> bool:
if first == second or first in second or second in first:
return True
return SequenceMatcher(None, first, second).ratio() >= 0.88
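    # e.g. "combat feels clunky" vs. "combat feels so clunky": neither contains
    # the other, but the SequenceMatcher ratio ≈ 0.93 >= 0.88, so they deduplicate.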
def _compute_topic_snippet_highlights(
self,
topic: str,
top_n: int,
*,
topic_snippets: dict[str, dict[str, dict[str, Any]]] | None = None,
) -> list[dict[str, Any]]:
snippet_source = topic_snippets if topic_snippets is not None else self._topic_snippets
snippet_stats = snippet_source.get(topic, {})
if not snippet_stats:
return []
candidates: list[dict[str, Any]] = []
for normalized, stats in snippet_stats.items():
mention_count = stats["mention_count"]
if mention_count <= 0:
continue
avg_score = stats["score_sum"] / max(1, stats["score_count"])
candidate: dict[str, Any] = {
"text": stats["text"],
"mention_count": mention_count,
"score": round(avg_score, 3),
"sentiment": self._snippet_sentiment(avg_score),
"_readability": stats["readability"],
"_quality": stats.get("quality", stats["readability"]),
"_canonical": normalized,
"_has_anchor": stats.get("has_anchor", False),
}
# Source drill-down: propagate representative review identity and metadata
src_steamid = stats.get("source_steamid")
src_rec_id = stats.get("source_recommendation_id")
if src_steamid is not None:
candidate["source_steamid"] = src_steamid
if src_rec_id is not None:
candidate["source_recommendation_id"] = src_rec_id
# Compact source metadata (#49)
for meta_key in (
"source_timestamp_created",
"source_playtime_at_review",
"source_voted_up",
"source_language",
"source_steam_purchase",
"source_received_for_free",
"source_written_during_early_access",
):
meta_val = stats.get(meta_key)
if meta_val is not None:
candidate[meta_key] = meta_val
candidates.append(candidate)
candidates.sort(
key=lambda item: (
-item["_quality"],
-item["mention_count"],
-abs(item["score"]),
len(item["text"]),
item["text"],
)
)
deduped: list[dict[str, Any]] = []
for candidate in candidates:
if any(
self._snippets_are_near_duplicates(candidate["_canonical"], existing["_canonical"])
for existing in deduped
):
continue
deduped.append(candidate)
qualified = [
candidate
for candidate in deduped
if (
candidate["_has_anchor"]
and (
candidate["_quality"] >= 0.65
or (candidate["_quality"] >= 0.55 and candidate["mention_count"] >= 2)
)
) or (
not candidate["_has_anchor"]
and (
candidate["_quality"] >= 0.55
or (candidate["_quality"] >= 0.45 and candidate["mention_count"] >= 2)
)
)
]
selected = qualified[: min(top_n, 3)]
for candidate in selected:
candidate.pop("_readability", None)
candidate.pop("_quality", None)
candidate.pop("_canonical", None)
candidate.pop("_has_anchor", None)
return selected
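    # Selection sketch: anchored snippets qualify at quality >= 0.65 (or >= 0.55
    # with two or more mentions); unanchored ones at >= 0.55 (or >= 0.45 with two
    # or more mentions); at most min(top_n, 3) survivors are returned.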
def _topic_ngram_is_relevant(
self,
*,
ngram: str,
topic: str,
topic_match_texts: dict[str, str] | None,
) -> bool:
if not topic_match_texts or topic not in topic_match_texts:
return True
matched_text = self._normalize_for_topic_match(topic_match_texts[topic])
if not matched_text:
return True
if matched_text not in ngram:
return False
for other_topic, other_matched_text in topic_match_texts.items():
if other_topic == topic:
continue
normalized_other = self._normalize_for_topic_match(other_matched_text)
if normalized_other and normalized_other in ngram:
return False
return True
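    # Example: topic_match_texts={"combat": "combat", "story": "story"} and the
    # normalized ngram "combatfeelsclunky" → relevant for "combat" (contains its
    # match but not the other topic's); "combatandstory" is rejected for both
    # topics because each contains the other topic's match text.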
def _compute_tfidf_highlights(
self,
counter: Counter,
top_n: int,
*,
prefer_longer_phrases: bool = False,
) -> list[dict]:
"""TF-IDF scoring + filtering + dedup."""
candidates = []
n = self._review_count
total_count = sum(counter.values()) if counter.values() else 1
for ngram, count in counter.items():
df = self._ngram_doc_freq.get(ngram, 0)
if df < settings.highlights_min_mentions:
continue
if df / n > settings.highlights_max_doc_freq_ratio:
continue
idf = math.log(n / df) if df > 0 else 0
tf = count / total_count
tfidf = tf * idf
rank_score = count * tfidf
            # Average sentiment from the running sum and count
s_sum = self._ngram_sentiment_sum.get(ngram, 0.0)
s_count = self._ngram_sentiment_count.get(ngram, 0)
avg_score = s_sum / s_count if s_count > 0 else 0.0
candidates.append({
"phrase": ngram,
"mention_count": df,
"score": round(avg_score, 3),
"sentiment": (
"positive" if avg_score > settings.sentiment_positive_threshold
else "negative" if avg_score < settings.sentiment_negative_threshold
else "neutral"
),
"ngram_size": len(ngram.split()),
"_rank": rank_score,
})
if prefer_longer_phrases:
candidates.sort(
key=lambda x: (-x["_rank"], -x["ngram_size"], -x["mention_count"], -x["score"], x["phrase"])
)
else:
candidates.sort(key=lambda x: x["_rank"], reverse=True)
# Substring absorption
absorbed: set[int] = set()
for i, c in enumerate(candidates):
if i in absorbed:
continue
for j in range(i + 1, len(candidates)):
if j in absorbed:
continue
if candidates[j]["phrase"] in c["phrase"]:
parent_has_neg = any(neg in c["phrase"] for neg in ["不", "没", "无"])
child_has_neg = any(neg in candidates[j]["phrase"] for neg in ["不", "没", "无"])
if parent_has_neg == child_has_neg:
absorbed.add(j)
results = [c for i, c in enumerate(candidates) if i not in absorbed]
# Re-sort by mention_count descending for display order.
# TF-IDF sort above selected the top candidates; this ensures the final
# list the UI receives is ordered from most-mentioned to least-mentioned,
# with score and phrase as stable tie-breakers.
if prefer_longer_phrases:
results.sort(key=lambda x: (-x["mention_count"], -x["ngram_size"], -x["score"], x["phrase"]))
else:
results.sort(key=lambda x: (-x["mention_count"], -x["score"], x["phrase"]))
for r in results[:top_n]:
r.pop("_rank", None)
return results[:top_n]