"""
Serwis NLP do analizy sentymentu i modelowania tematów.
Architektura: Local Inference (CPU).
Wykorzystuje model Transformer (DistilBERT) uruchamiany bezpośrednio w aplikacji,
co eliminuje opóźnienia sieciowe i zapewnia deterministyczny czas wykonania.
Optymalizacje:
1. Pre-kompilacja wzorców Regex (O(1) matching).
2. Wykonywanie inferencji w Executorze (nie blokuje Event Loop).
3. Batching zapytań do modelu (wykorzystanie instrukcji wektorowych CPU).
"""
from __future__ import annotations
import asyncio
import logging
import re
import time
from collections import OrderedDict, defaultdict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import TYPE_CHECKING
from pathlib import Path
import jieba
from transformers import AutoTokenizer, pipeline
from optimum.onnxruntime import ORTModelForSequenceClassification
from zhconv import convert
from app.core.config import settings
from app.core.keywords import is_runtime_emittable_topic, load_structured_lexicon
from app.core.worker_logging import StageTimingAccumulator
from app.models.schemas import SentimentType, StructuredLexiconTerm, TopicSentiment
if TYPE_CHECKING:
from app.services.highlights_service import HighlightsCollector
logger = logging.getLogger(__name__)
CARD_LAG_PREFIXES = frozenset({"不", "很", "好", "太", "真", "挺", "老", "总"})
CARD_STANDALONE_PREVIOUS_TOKENS = frozenset({"有点", "一直", "偶尔"})
# Unicode ranges for emoji and pictographic symbols
# NOTE: the previous pattern "\U000024C2-\U0001F251" was far too broad and stripped Chinese characters!
# We now use precise ranges that cover emoji only.
EMOJI_PATTERN = re.compile(
"["
"\U0001F600-\U0001F64F" # Emoticons
"\U0001F300-\U0001F5FF" # Misc Symbols and Pictographs
"\U0001F680-\U0001F6FF" # Transport and Map
"\U0001F1E0-\U0001F1FF" # Flags (iOS)
"\U0001F900-\U0001F9FF" # Supplemental Symbols and Pictographs
"\U0001FA00-\U0001FA6F" # Chess Symbols
"\U0001FA70-\U0001FAFF" # Symbols and Pictographs Extended-A
"\U00002702-\U000027B0" # Dingbats
"\U0000FE00-\U0000FE0F" # Variation Selectors
"]+",
flags=re.UNICODE,
)
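# Illustrative effect of EMOJI_PATTERN.sub("", ...): "好玩 😀🎮" -> "好玩 "
# (emoji are stripped, Chinese characters are preserved).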
# Punctuation stripped when building deduplication keys (EN + ZH)
DEDUP_PUNCTUATION = re.compile(r'[!"#$%&\'()*+,\-./:;<=>?@\[\\\]^_`{|}~。!?,、;:“”‘’【】()《》~…·]')
# Sentence/clause segmentation (supports English and Chinese)
# Chinese terminators: 。!?;
# English terminators: .!?
SENTENCE_SPLIT_PATTERN = re.compile(r"""
    (?<=[.!?。!?;])\s*                            # End of sentence (EN + ZH punctuation)
    |                                             # OR
    (?<=[a-z]),\s+                                # Comma after a letter + whitespace...
    (?=but\b|however\b|although\b|though\b)       # ...before an adversative conjunction (EN)
    |
    \s+(?=but\b|however\b|although\b|though\b)    # Conjunction without a comma (EN)
    |
    (?<=。|!|?|;)                                 # After Chinese punctuation (no space needed)
    |
    (?=但是|然而|虽然|不过|可是)                     # Before a Chinese adversative conjunction
    """, re.VERBOSE | re.IGNORECASE)
_MULTIPLAYER_ABSENCE_PATTERNS: tuple[re.Pattern[str], ...] = (
re.compile(
r"(?:没有|没|无|别指望|不存在|缺少|不带|不含)"
r"(?:.{0,6})?(?:联机|多人|匹配|合作|组队)",
re.IGNORECASE,
),
re.compile(
r"(?:不支持|不支援|不能|无法)"
r"(?:.{0,6})?(?:联机|多人|匹配|合作|组队)",
re.IGNORECASE,
),
re.compile(
r"\b(?:no|without)\s+(?:multiplayer|co-?op|coop|pvp|pve)\b",
re.IGNORECASE,
),
)
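# Illustrative matches: "这个游戏没有联机" and "no multiplayer at all" both hit these
# absence patterns, so the Multiplayer topic is suppressed for such sentences.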
@dataclass(frozen=True)
class BatchAnalysisResult:
"""Per-batch NLP output with backward-compatible tuple unpacking."""
topics: list[TopicSentiment]
skipped_sentences: int
total_sentences: int | None
topic_bearing_review_count: int | None
def __iter__(self):
yield self.topics
yield self.skipped_sentences
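        # Legacy call sites may still unpack the result as a pair, e.g.
        #   topics, skipped = await service.analyze_batch(reviews)
        # Newer fields (total_sentences, topic_bearing_review_count) are read as attributes.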
class NLPService:
"""
Serwis NLP realizujący analizę hybrydową:
1. Słowa kluczowe (Regex) -> Wykrywanie tematów.
2. DistilBERT (Local Model) -> Analiza sentymentu.
"""
def __init__(self) -> None:
"""
Inicjalizuje pipeline ML oraz kompiluje wzorce tekstowe.
Model ładowany jest raz przy starcie aplikacji (Singleton pattern).
"""
logger.info("Inicjalizacja serwisu NLP (ONNX Optimized)...")
# 0. Jieba user dict — terminy gamingowe
userdict_path = Path(__file__).parent.parent / "core" / "jieba_userdict.txt"
if userdict_path.exists():
jieba.load_userdict(str(userdict_path))
logger.info(f"Załadowano jieba user dict: {userdict_path}")
        # 1. Regex compilation
        # Keywords are merged into one efficient combined pattern ("automaton") per topic.
        # NOTE: \b does not work next to Chinese characters, so we use different patterns
        # for ASCII keywords (with \b) and Chinese keywords (without \b).
self.topic_patterns: dict[str, re.Pattern[str]] = {}
self.single_char_topic_keywords: dict[str, list[str]] = {}
self.exclusion_patterns: dict[str, re.Pattern[str]] = {}
exclusion_contexts_by_keyword: dict[str, list[str]] = {}
lexicon = load_structured_lexicon()
self._topic_ids = [topic.id for topic in lexicon.topics]
self._topic_terms = {topic.id: list(topic.terms) for topic in lexicon.topics}
self._emitted_topic_ids = {
topic.id for topic in lexicon.topics if is_runtime_emittable_topic(topic)
}
for topic in lexicon.topics:
ascii_keywords: list[str] = []
chinese_keywords: list[str] = []
chinese_single_char_keywords: list[str] = []
for term in topic.terms:
keyword = term.text
if keyword.isascii():
ascii_keywords.append(keyword)
elif term.match_bucket.value == "single_char" and len(keyword) == 1:
chinese_single_char_keywords.append(keyword)
else:
chinese_keywords.append(keyword)
if term.exclusion_contexts:
term_exclusions = exclusion_contexts_by_keyword.setdefault(keyword, [])
for exclusion in term.exclusion_contexts:
if exclusion not in term_exclusions:
term_exclusions.append(exclusion)
self.single_char_topic_keywords[topic.id] = chinese_single_char_keywords
patterns = []
if ascii_keywords:
# Use word boundaries for ASCII keywords
sorted_ascii = sorted(ascii_keywords, key=len, reverse=True)
patterns.append(r'\b(' + '|'.join(re.escape(k) for k in sorted_ascii) + r')\b')
if chinese_keywords:
# No word boundaries for Chinese (they don't have spaces),
# but prefer longer keywords so compounds win over partial overlaps.
sorted_chinese = sorted(chinese_keywords, key=len, reverse=True)
patterns.append('(' + '|'.join(re.escape(k) for k in sorted_chinese) + ')')
if patterns:
combined_pattern = '|'.join(patterns)
self.topic_patterns[topic.id] = re.compile(combined_pattern, re.IGNORECASE)
for keyword, exclusions in exclusion_contexts_by_keyword.items():
if exclusions:
pattern_str = '|'.join(re.escape(e) for e in exclusions)
self.exclusion_patterns[keyword] = re.compile(pattern_str, re.IGNORECASE)
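        # Illustrative compiled pattern for a hypothetical "Performance" topic:
        #   \b(stutter|lag|fps)\b|(卡顿|掉帧|优化)
        # ASCII terms keep word boundaries; Chinese terms rely on longest-first alternation.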
        # 2. Load the ONNX model
        logger.info(f"Loading ONNX model {settings.hf_sentiment_model}...")
try:
from onnxruntime import GraphOptimizationLevel, SessionOptions
            # OPTIMIZATION FOR HF SPACES (shared CPU)
            # The free tier provides 2 vCPUs. Capping the thread counts prevents
            # excessive context switching and contention for cores.
session_options = SessionOptions()
session_options.intra_op_num_threads = settings.nlp_onnx_intra_threads
session_options.inter_op_num_threads = settings.nlp_onnx_inter_threads
session_options.graph_optimization_level = GraphOptimizationLevel.ORT_ENABLE_ALL
# Load pre-built quantized INT8 ONNX model (no PyTorch needed at runtime)
quantized_path = Path(__file__).resolve().parent.parent.parent / "models" / "quantized"
model_file = quantized_path / "model_quantized.onnx"
if not model_file.exists():
raise FileNotFoundError(
f"Quantized ONNX model not found at {model_file}. "
"Run 'python scripts/quantize_model.py' to generate it."
)
logger.info(f"Loading quantized INT8 model from {quantized_path}")
model = ORTModelForSequenceClassification.from_pretrained(
str(quantized_path),
file_name="model_quantized.onnx",
session_options=session_options,
)
tokenizer = AutoTokenizer.from_pretrained(str(quantized_path))
self.classifier = pipeline(
"sentiment-analysis",
model=model,
tokenizer=tokenizer,
device="cpu",
)
logger.info("Model NLP ONNX ready: INT8 quantized, graph_optimization=ALL")
except Exception as e:
# Deliberate broad catch — model loading can fail with OSError, RuntimeError,
# ONNX errors, HF Hub errors, etc. Always fatal, always re-raised.
logger.error(f"Krytyczny błąd ładowania modelu ONNX: {e}")
raise
        # Thread pool so heavy model inference does not block the server (event loop)
self.executor = ThreadPoolExecutor(max_workers=1)
        # Sentiment cache: normalized_text -> (label_str, score)
self._sentiment_cache: OrderedDict[str, tuple[str, float]] = OrderedDict()
self._cache_maxsize = settings.dedup_cache_maxsize
def clean_text(self, text: str) -> str:
"""Usuwa szum (emoji, nadmiarowe spacje) i normalizuje tekst."""
text = EMOJI_PATTERN.sub("", text)
text = text.lower()
text = re.sub(r"\s+", " ", text).strip()
max_len = settings.text_max_length
return text[:max_len] if len(text) > max_len else text
def _normalize_for_dedup(self, text: str) -> str:
"""Normalizuje zdanie do klucza deduplikacji (zachowuje kolejność słów)."""
text = DEDUP_PUNCTUATION.sub("", text).lower()
text = re.sub(r"\s+", " ", text).strip()
return convert(text, 'zh-cn')
def _split_into_sentences(self, text: str) -> list[str]:
"""Rozbija recenzję na logiczne jednostki (zdania/klauzule)."""
parts = SENTENCE_SPLIT_PATTERN.split(text)
return [p.strip() for p in parts if p and p.strip()]
def split_review_into_sentences(self, review: str) -> list[str]:
"""Public helper for review-level sentence/clause segmentation."""
cleaned = self.clean_text(review)
if not cleaned or len(cleaned) < 5:
return []
return self._split_into_sentences(cleaned)
def _has_negation(self, text: str, position: int) -> bool:
"""
Wykrywa negację przed słowem kluczowym (w zasięgu zdefiniowanym w configu).
Przydatne przy precyzyjniejszej analizie aspektowej w języku chińskim.
"""
window = settings.nlp_negation_window
left_context = text[max(0, position-window):position]
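        # e.g. in "画面不卡" the "不" directly before "卡" falls inside the window,
        # so _has_negation returns True for that match position.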
return any(neg in left_context for neg in ["不", "没", "别", "无"])
@staticmethod
def _is_valid_single_char_token(keyword: str, token: str, previous_token: str | None) -> bool:
"""Waliduje pojedynczy chiński keyword w kontekście całego tokenu."""
if keyword != "卡":
return True
if token == keyword:
return previous_token is None or previous_token in CARD_STANDALONE_PREVIOUS_TOKENS
return token.endswith(keyword) and token[:-1] in CARD_LAG_PREFIXES
def _find_single_char_keyword_match(self, sentence: str, keywords: list[str]) -> tuple[int, str] | None:
"""Zwraca pierwszy poprawny match dla chińskiego single-char keywordu."""
if not keywords:
return None
keyword_set = set(keywords)
tokenized_sentence = list(jieba.tokenize(sentence))
for index, (token, start, _) in enumerate(tokenized_sentence):
previous_token = tokenized_sentence[index - 1][0] if index > 0 else None
for offset, char in enumerate(token):
if char not in keyword_set:
continue
if self._is_valid_single_char_token(char, token, previous_token):
return start + offset, char
return None
def _resolve_matched_term(
self,
topic: str,
matched_word: str,
) -> StructuredLexiconTerm | None:
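        """Returns the lexicon term whose text matches the detected word for this topic, or None."""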
normalized_word = matched_word.lower() if matched_word.isascii() else matched_word
for term in self._topic_terms.get(topic, []):
candidate = term.text.lower() if term.text.isascii() else term.text
if candidate == normalized_word:
return term
return None
def _topic_has_disambiguating_context(
self,
topic: str,
sentence_simp: str,
matched_word: str,
) -> bool:
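        """
        Returns True when another term of the same topic also appears in the sentence,
        confirming ambiguous matches that require extra context.
        """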
normalized_word = matched_word.lower() if matched_word.isascii() else matched_word
for term in self._topic_terms.get(topic, []):
candidate = term.text.lower() if term.text.isascii() else term.text
if candidate == normalized_word:
continue
if term.needs_extra_context and not term.public_ui_safe:
continue
if term.text.isascii():
if re.search(rf"\b{re.escape(term.text.lower())}\b", sentence_simp, re.IGNORECASE):
return True
elif term.match_bucket.value == "single_char" and len(term.text) == 1:
if self._find_single_char_keyword_match(sentence_simp, [term.text]) is not None:
return True
elif term.text in sentence_simp:
return True
return False
def _detect_topic_matches_regex(self, sentence: str) -> dict[str, tuple[bool, str]]:
"""
Szybkie wykrywanie tematów przy użyciu prekompilowanych regexów.
Złożoność: O(N) względem długości zdania, niezależnie od liczby słów kluczowych.
"""
detected: dict[str, tuple[bool, str]] = {}
        # TEMPORARY conversion to Simplified Chinese, used for matching only.
        # This keeps the original text (Traditional/Simplified) intact in the database,
        # while the keywords.py lexicon can stay in zh-cn.
sentence_simp = convert(sentence, 'zh-cn')
for topic in self._topic_ids:
regex_match = None
if topic in self.topic_patterns:
regex_match = self.topic_patterns[topic].search(sentence_simp)
single_char_match = self._find_single_char_keyword_match(
sentence_simp,
self.single_char_topic_keywords.get(topic, []),
)
matched_word: str | None = None
match_start: int | None = None
if regex_match and single_char_match:
if single_char_match[0] < regex_match.start():
match_start, matched_word = single_char_match
else:
match_start = regex_match.start()
matched_word = regex_match.group(0).lower()
elif regex_match:
match_start = regex_match.start()
matched_word = regex_match.group(0).lower()
elif single_char_match:
match_start, matched_word = single_char_match
if matched_word is not None and match_start is not None:
is_excluded = False
matched_term = self._resolve_matched_term(topic, matched_word)
if matched_word in self.exclusion_patterns:
if self.exclusion_patterns[matched_word].search(sentence_simp):
is_excluded = True
if not is_excluded:
if (
matched_term is not None
and matched_term.needs_extra_context
and not matched_term.public_ui_safe
and matched_term.match_bucket.value != "single_char"
and not self._topic_has_disambiguating_context(topic, sentence_simp, matched_word)
):
continue
if (
topic == "Multiplayer"
and any(pat.search(sentence_simp) for pat in _MULTIPLAYER_ABSENCE_PATTERNS)
):
continue
negated = self._has_negation(sentence_simp, match_start)
detected[topic] = (negated, matched_word)
return detected
def _detect_topics_regex(self, sentence: str) -> dict[str, bool]:
"""Backward-compatible wrapper returning only topic -> negated."""
return {
topic: is_negated
for topic, (is_negated, _) in self._detect_topic_matches_regex(sentence).items()
}
def _filter_runtime_emittable_matches(
self,
detected_topics: dict[str, tuple[bool, str]],
) -> dict[str, tuple[bool, str]]:
"""Drop helper-only topics from public aggregates while keeping behavioral layers."""
return {
topic: match
for topic, match in detected_topics.items()
if topic in self._emitted_topic_ids
}
def _run_inference(self, texts: list[str]) -> list[dict]:
"""Wrapper dla pipeline'u Hugging Face uruchamiany w wątku."""
# batch_size=16 optymalizuje operacje macierzowe na CPU (AVX)
# truncation=True, max_length=512 zapobiega przekroczeniu limitu pozycji ONNX
# (max_position_embeddings=512); pipeline uwzględnia tokeny specjalne automatycznie
return self.classifier(texts, batch_size=16, truncation=True, max_length=512)
@staticmethod
def _map_label(label_str: str, score: float) -> tuple[SentimentType, float]:
"""Mapuje surowy label modelu na (SentimentType, score)."""
label_lower = label_str.lower()
if 'positive' in label_lower or 'label_1' in label_lower:
return (SentimentType.POSITIVE, score)
elif 'negative' in label_lower or 'label_0' in label_lower:
return (SentimentType.NEGATIVE, -score)
return (SentimentType.NEUTRAL, 0.0)
def _cache_put(self, key: str, value: tuple[str, float]) -> None:
"""Dodaje wynik do cache LRU, usuwa najstarsze jeśli przekroczono limit."""
self._sentiment_cache[key] = value
self._sentiment_cache.move_to_end(key)
while len(self._sentiment_cache) > self._cache_maxsize:
self._sentiment_cache.popitem(last=False)
async def analyze_sentiment_batch(
self, texts: list[str]
) -> list[tuple[SentimentType, float]]:
"""
Asynchroniczny interfejs do analizy sentymentu.
Offloaduje obliczenia do osobnego wątku, nie blokując API.
Wykorzystuje cache LRU do pomijania powtórzonych zdań.
"""
cleaned_texts = [self.clean_text(t) for t in texts]
norm_keys = [self._normalize_for_dedup(t) for t in cleaned_texts]
        # Split into cache hits and misses
final_sentiments: list[tuple[SentimentType, float]] = [(SentimentType.NEUTRAL, 0.0)] * len(texts)
        miss_indices: list[int] = []  # indices in cleaned_texts that still need the model
miss_texts: list[str] = []
for i, (cleaned, key) in enumerate(zip(cleaned_texts, norm_keys)):
if not cleaned:
continue
cached = self._sentiment_cache.get(key)
if cached is not None:
self._sentiment_cache.move_to_end(key)
final_sentiments[i] = self._map_label(cached[0], cached[1])
else:
miss_indices.append(i)
miss_texts.append(cleaned)
cache_hits = len(texts) - len(miss_texts)
logger.debug(f"Cache: {cache_hits} hits, {len(miss_texts)} misses (cache size: {len(self._sentiment_cache)})")
if not miss_texts:
return final_sentiments
        # Run the model ONLY on cache misses
        loop = asyncio.get_running_loop()
results = await loop.run_in_executor(self.executor, self._run_inference, miss_texts)
for j, res in enumerate(results):
original_idx = miss_indices[j]
label_str = res['label']
score = res['score']
            # Store the raw result in the cache
self._cache_put(norm_keys[original_idx], (label_str, score))
final_sentiments[original_idx] = self._map_label(label_str, score)
return final_sentiments
async def analyze_batch(
self,
reviews: list[str],
highlights_collector: HighlightsCollector | None = None,
categories: list[str] | None = None,
extra_highlights_collectors: list[tuple[HighlightsCollector, list[str] | None]] | None = None,
stage_timings: StageTimingAccumulator | None = None,
) -> BatchAnalysisResult:
"""
Główna metoda przetwarzania partii recenzji.
Łączy segmentację, wykrywanie tematów i analizę sentymentu.
"""
if not reviews:
return BatchAnalysisResult(
topics=[],
skipped_sentences=0,
total_sentences=0,
topic_bearing_review_count=0,
)
        # Step 1: Pre-processing and selection of sentences to analyze
sentiment_tasks = []
skipped_sentences = 0
total_sentences = 0
topic_bearing_review_count = 0
for review_idx, review in enumerate(reviews):
if highlights_collector:
highlights_collector.start_review()
for extra_collector, _ in extra_highlights_collectors or []:
extra_collector.start_review()
split_start = time.monotonic()
sentences = self.split_review_into_sentences(review)
total_sentences += len(sentences)
if stage_timings is not None:
stage_timings.add(
"sentence_split_preprocess_s",
time.monotonic() - split_start,
)
detect_start = time.monotonic()
review_has_detected_topic = False
for sentence in sentences:
topics_map = self._filter_runtime_emittable_matches(
self._detect_topic_matches_regex(sentence)
)
if topics_map:
review_has_detected_topic = True
sentence_topic_matches = {
topic: matched_word
for topic, (_, matched_word) in topics_map.items()
}
for topic, (is_negated, matched_word) in topics_map.items():
sentiment_tasks.append(
(review_idx, topic, sentence, is_negated, matched_word, sentence_topic_matches)
)
else:
skipped_sentences += 1
if stage_timings is not None:
stage_timings.add(
"topic_detection_aggregation_s",
time.monotonic() - detect_start,
)
if review_has_detected_topic:
topic_bearing_review_count += 1
if not sentiment_tasks:
return BatchAnalysisResult(
topics=[],
skipped_sentences=skipped_sentences,
total_sentences=total_sentences,
topic_bearing_review_count=topic_bearing_review_count,
)
        # Step 2: Deduplication + sentiment analysis
preprocess_start = time.monotonic()
all_sentences = [task[2] for task in sentiment_tasks]
        # Deduplication: normalize -> find unique sentences -> run inference only on the unique ones
norm_keys = [self._normalize_for_dedup(s) for s in all_sentences]
unique_map: dict[str, int] = {} # normalized_key -> index in unique_texts
unique_texts: list[str] = []
for i, key in enumerate(norm_keys):
if key not in unique_map:
unique_map[key] = len(unique_texts)
unique_texts.append(all_sentences[i])
dedup_total = len(all_sentences)
dedup_unique = len(unique_texts)
dedup_pct = round((1 - dedup_unique / dedup_total) * 100) if dedup_total else 0
logger.debug(f"Dedup: {dedup_total} -> {dedup_unique} sentences ({dedup_pct}% reduced)")
if stage_timings is not None:
stage_timings.add(
"sentence_split_preprocess_s",
time.monotonic() - preprocess_start,
)
sentiment_start = time.monotonic()
unique_results = await self.analyze_sentiment_batch(unique_texts)
if stage_timings is not None:
stage_timings.add(
"sentiment_model_s",
time.monotonic() - sentiment_start,
)
        # Map results from the unique sentences back onto all sentences
sentiment_results = [unique_results[unique_map[key]] for key in norm_keys]
        # Step 3: Aggregate results
# review_id -> topic -> list of scores
review_topic_scores: dict[int, dict[str, list[float]]] = defaultdict(lambda: defaultdict(list))
        # topic -> (sentence, score): online selection of the best example
topic_best_example: dict[str, tuple[str, float]] = {}
aggregation_start = time.monotonic()
highlights_elapsed = 0.0
for i, (review_idx, topic, sentence, is_negated, matched_word, sentence_topic_matches) in enumerate(sentiment_tasks):
_, score = sentiment_results[i]
            # BULLETPROOF PIPELINE: if negation was detected (e.g. "I don't like the gameplay")
            # but the model still returned a positive sentiment, correct it.
if is_negated and score > 0:
score = -score
review_topic_scores[review_idx][topic].append(score)
collector_start = time.monotonic()
if highlights_collector:
highlights_collector.add_sentence(
review_idx=review_idx,
sentence=sentence,
topics=[topic],
sentiment_score=score,
categories=categories,
topic_match_texts=sentence_topic_matches,
)
for extra_collector, extra_categories in extra_highlights_collectors or []:
extra_collector.add_sentence(
review_idx=review_idx,
sentence=sentence,
topics=[topic],
sentiment_score=score,
categories=extra_categories,
topic_match_texts=sentence_topic_matches,
)
highlights_elapsed += time.monotonic() - collector_start
            # Online selection: update when a better candidate (higher |score|) appears
if len(sentence) > 20:
current = topic_best_example.get(topic)
if current is None or abs(score) > abs(current[1]):
topic_best_example[topic] = (sentence, score)
        # Global aggregation: mean per review -> summed across reviews
global_topic_stats: dict[str, dict[str, float]] = defaultdict(lambda: {"sum_score": 0.0, "count": 0.0})
for review_idx, topics_data in review_topic_scores.items():
for topic, scores in topics_data.items():
avg_review_score = sum(scores) / len(scores)
global_topic_stats[topic]["sum_score"] += avg_review_score
global_topic_stats[topic]["count"] += 1.0
        # Step 4: Final formatting
final_results: list[TopicSentiment] = []
for topic_name, stats in global_topic_stats.items():
count = int(stats["count"])
if count == 0:
continue
avg_global_score = stats["sum_score"] / stats["count"]
normalized_score = max(-1.0, min(1.0, avg_global_score))
if normalized_score > settings.sentiment_positive_threshold:
sentiment = SentimentType.POSITIVE
elif normalized_score < settings.sentiment_negative_threshold:
sentiment = SentimentType.NEGATIVE
else:
sentiment = SentimentType.NEUTRAL
            # Fetch the best example and validate that its direction agrees with the sentiment
best_example = None
example_score = None
candidate = topic_best_example.get(topic_name)
if candidate:
ex_sentence, ex_score = candidate
                # Validation: the example must match the sentiment direction
if sentiment == SentimentType.NEUTRAL or \
(sentiment == SentimentType.POSITIVE and ex_score > 0) or \
(sentiment == SentimentType.NEGATIVE and ex_score < 0):
best_example = ex_sentence
example_score = ex_score
final_results.append(
TopicSentiment(
topic=topic_name,
sentiment=sentiment,
score=round(normalized_score, 3),
mention_count=count,
example=best_example,
example_score=example_score,
)
)
final_results.sort(key=lambda x: x.mention_count, reverse=True)
if stage_timings is not None:
stage_timings.add(
"topic_detection_aggregation_s",
max(0.0, time.monotonic() - aggregation_start - highlights_elapsed),
)
return BatchAnalysisResult(
topics=final_results,
skipped_sentences=skipped_sentences,
total_sentences=total_sentences,
topic_bearing_review_count=topic_bearing_review_count,
)
_nlp_service: "NLPService | None" = None
def get_nlp_service() -> "NLPService":
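    """Returns the process-wide NLPService singleton, creating it on first use."""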
global _nlp_service
if _nlp_service is None:
_nlp_service = NLPService()
return _nlp_service
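
# Minimal usage sketch (illustrative; assumes an async caller):
#   service = get_nlp_service()
#   result = await service.analyze_batch(["Great gunplay, but the servers lag."])
#   for topic in result.topics:
#       print(topic.topic, topic.sentiment, topic.score, topic.mention_count)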