| """ | |
| Serwis NLP do analizy sentymentu i modelowania tematów. | |
| Architektura: Local Inference (CPU). | |
| Wykorzystuje model Transformer (DistilBERT) uruchamiany bezpośrednio w aplikacji, | |
| co eliminuje opóźnienia sieciowe i zapewnia deterministyczny czas wykonania. | |
| Optymalizacje: | |
| 1. Pre-kompilacja wzorców Regex (O(1) matching). | |
| 2. Wykonywanie inferencji w Executorze (nie blokuje Event Loop). | |
| 3. Batching zapytań do modelu (wykorzystanie instrukcji wektorowych CPU). | |
| """ | |
from __future__ import annotations

import asyncio
import logging
import re
import time
from collections import OrderedDict, defaultdict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING

import jieba
from transformers import AutoTokenizer, pipeline
from optimum.onnxruntime import ORTModelForSequenceClassification
from zhconv import convert

from app.core.config import settings
from app.core.keywords import is_runtime_emittable_topic, load_structured_lexicon
from app.core.worker_logging import StageTimingAccumulator
from app.models.schemas import SentimentType, StructuredLexiconTerm, TopicSentiment

if TYPE_CHECKING:
    from app.services.highlights_service import HighlightsCollector

logger = logging.getLogger(__name__)

CARD_LAG_PREFIXES = frozenset({"不", "很", "好", "太", "真", "挺", "老", "总"})
CARD_STANDALONE_PREVIOUS_TOKENS = frozenset({"有点", "一直", "偶尔"})

# Unicode ranges for emoji and pictographic symbols.
# NOTE: the previous pattern "\U000024C2-\U0001F251" was too broad and stripped
# Chinese characters! The ranges below target emoji only.
EMOJI_PATTERN = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # Emoticons
    "\U0001F300-\U0001F5FF"  # Misc Symbols and Pictographs
    "\U0001F680-\U0001F6FF"  # Transport and Map
    "\U0001F1E0-\U0001F1FF"  # Flags (iOS)
    "\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
    "\U0001FA00-\U0001FA6F"  # Chess Symbols
    "\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
    "\U00002702-\U000027B0"  # Dingbats
    "\U0000FE00-\U0000FE0F"  # Variation Selectors
    "]+",
    flags=re.UNICODE,
)
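
# Illustrative behavior (hypothetical input):
#     EMOJI_PATTERN.sub("", "好玩 😀🚀")  ->  "好玩 "
# CJK text is left intact; consecutive emoji are removed as a single run.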

# Punctuation stripped when building deduplication keys (EN + ZH)
DEDUP_PUNCTUATION = re.compile(
    r'[!"#$%&\'()*+,\-./:;<=>?@\[\\\]^_`{|}~。!?,、;:“”‘’【】()《》~…·]'
)

# Smart sentence splitting (supports English and Chinese)
# Chinese terminators: 。!?;   English terminators: .!?
SENTENCE_SPLIT_PATTERN = re.compile(r"""
    (?<=[.!?。!?;])\s*                         # End of sentence (EN + ZH punctuation)
    |                                           # OR
    (?<=[a-z]),\s+                              # Comma after a letter + space...
    (?=but\b|however\b|although\b|though\b)     # ...before an adversative conjunction (EN)
    |
    \s+(?=but\b|however\b|although\b|though\b)  # Conjunction without a comma (EN)
    |
    (?<=。|!|?|;)                              # After Chinese punctuation (no space)
    |
    (?=但是|然而|虽然|不过|可是)                 # Before a Chinese adversative conjunction
""", re.VERBOSE | re.IGNORECASE)

_MULTIPLAYER_ABSENCE_PATTERNS: tuple[re.Pattern[str], ...] = (
    re.compile(
        r"(?:没有|没|无|别指望|不存在|缺少|不带|不含)"
        r"(?:.{0,6})?(?:联机|多人|匹配|合作|组队)",
        re.IGNORECASE,
    ),
    re.compile(
        r"(?:不支持|不支援|不能|无法)"
        r"(?:.{0,6})?(?:联机|多人|匹配|合作|组队)",
        re.IGNORECASE,
    ),
    re.compile(
        r"\b(?:no|without)\s+(?:multiplayer|co-?op|coop|pvp|pve)\b",
        re.IGNORECASE,
    ),
)
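
# Illustrative matches (hypothetical inputs): "没有联机" and "不支持多人" hit the
# Chinese patterns; "no multiplayer" hits the English one. Topic detection uses
# these to suppress the Multiplayer topic when a review says the feature is absent.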


@dataclass
class BatchAnalysisResult:
    """Per-batch NLP output with backward-compatible tuple unpacking."""

    topics: list[TopicSentiment]
    skipped_sentences: int
    total_sentences: int | None
    topic_bearing_review_count: int | None

    def __iter__(self):
        yield self.topics
        yield self.skipped_sentences
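
# Illustrative compatibility note: older call sites can still unpack the first two
# fields of BatchAnalysisResult as a 2-tuple, e.g.
#     topics, skipped = batch_result   # served by __iter__ above
# while newer code reads .total_sentences and .topic_bearing_review_count directly.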


class NLPService:
    """
    NLP service implementing a hybrid analysis:

    1. Keywords (regex)         -> topic detection.
    2. DistilBERT (local model) -> sentiment analysis.
    """

    def __init__(self) -> None:
        """
        Initializes the ML pipeline and compiles the text patterns.

        The model is loaded once at application start-up (singleton pattern).
        """
        logger.info("Initializing NLP service (ONNX optimized)...")

        # 0. Jieba user dict: gaming terms
        userdict_path = Path(__file__).parent.parent / "core" / "jieba_userdict.txt"
        if userdict_path.exists():
            jieba.load_userdict(str(userdict_path))
            logger.info(f"Loaded jieba user dict: {userdict_path}")

        # 1. Regex compilation.
        # The keywords are merged into a single efficient "automaton" (regex).
        # NOTE: \b does not work with Chinese characters, so different patterns
        # are used for ASCII keywords (with \b) and Chinese ones (without \b).
        self.topic_patterns: dict[str, re.Pattern[str]] = {}
        self.single_char_topic_keywords: dict[str, list[str]] = {}
        self.exclusion_patterns: dict[str, re.Pattern[str]] = {}
        exclusion_contexts_by_keyword: dict[str, list[str]] = {}

        lexicon = load_structured_lexicon()
        self._topic_ids = [topic.id for topic in lexicon.topics]
        self._topic_terms = {topic.id: list(topic.terms) for topic in lexicon.topics}
        self._emitted_topic_ids = {
            topic.id for topic in lexicon.topics if is_runtime_emittable_topic(topic)
        }

        for topic in lexicon.topics:
            ascii_keywords: list[str] = []
            chinese_keywords: list[str] = []
            chinese_single_char_keywords: list[str] = []
            for term in topic.terms:
                keyword = term.text
                if keyword.isascii():
                    ascii_keywords.append(keyword)
                elif term.match_bucket.value == "single_char" and len(keyword) == 1:
                    chinese_single_char_keywords.append(keyword)
                else:
                    chinese_keywords.append(keyword)
                if term.exclusion_contexts:
                    term_exclusions = exclusion_contexts_by_keyword.setdefault(keyword, [])
                    for exclusion in term.exclusion_contexts:
                        if exclusion not in term_exclusions:
                            term_exclusions.append(exclusion)
            self.single_char_topic_keywords[topic.id] = chinese_single_char_keywords

            patterns = []
            if ascii_keywords:
                # Use word boundaries for ASCII keywords.
                sorted_ascii = sorted(ascii_keywords, key=len, reverse=True)
                patterns.append(r'\b(' + '|'.join(re.escape(k) for k in sorted_ascii) + r')\b')
            if chinese_keywords:
                # No word boundaries for Chinese (no spaces between words),
                # but prefer longer keywords so compounds win over partial overlaps.
                sorted_chinese = sorted(chinese_keywords, key=len, reverse=True)
                patterns.append('(' + '|'.join(re.escape(k) for k in sorted_chinese) + ')')
            if patterns:
                combined_pattern = '|'.join(patterns)
                self.topic_patterns[topic.id] = re.compile(combined_pattern, re.IGNORECASE)

        for keyword, exclusions in exclusion_contexts_by_keyword.items():
            if exclusions:
                pattern_str = '|'.join(re.escape(e) for e in exclusions)
                self.exclusion_patterns[keyword] = re.compile(pattern_str, re.IGNORECASE)

        # 2. Loading the ONNX model.
        logger.info(f"Loading ONNX model {settings.hf_sentiment_model}...")
        try:
            from onnxruntime import GraphOptimizationLevel, SessionOptions

            # OPTIMIZATION FOR HF SPACES (shared CPU):
            # the free tier provides 2 vCPUs. Capping thread counts prevents
            # context switching and contention for cores.
            session_options = SessionOptions()
            session_options.intra_op_num_threads = settings.nlp_onnx_intra_threads
            session_options.inter_op_num_threads = settings.nlp_onnx_inter_threads
            session_options.graph_optimization_level = GraphOptimizationLevel.ORT_ENABLE_ALL

            # Load pre-built quantized INT8 ONNX model (no PyTorch needed at runtime).
            quantized_path = Path(__file__).resolve().parent.parent.parent / "models" / "quantized"
            model_file = quantized_path / "model_quantized.onnx"
            if not model_file.exists():
                raise FileNotFoundError(
                    f"Quantized ONNX model not found at {model_file}. "
                    "Run 'python scripts/quantize_model.py' to generate it."
                )
            logger.info(f"Loading quantized INT8 model from {quantized_path}")
            model = ORTModelForSequenceClassification.from_pretrained(
                str(quantized_path),
                file_name="model_quantized.onnx",
                session_options=session_options,
            )
            tokenizer = AutoTokenizer.from_pretrained(str(quantized_path))
            self.classifier = pipeline(
                "sentiment-analysis",
                model=model,
                tokenizer=tokenizer,
                device="cpu",
            )
            logger.info("NLP ONNX model ready: INT8 quantized, graph_optimization=ALL")
        except Exception as e:
            # Deliberate broad catch: model loading can fail with OSError, RuntimeError,
            # ONNX errors, HF Hub errors, etc. Always fatal, always re-raised.
            logger.error(f"Critical error while loading the ONNX model: {e}")
            raise

        # Thread pool so heavy AI computation does not block the server (event loop).
        self.executor = ThreadPoolExecutor(max_workers=1)

        # Sentiment cache: normalized_text -> (label_str, score)
        self._sentiment_cache: OrderedDict[str, tuple[str, float]] = OrderedDict()
        self._cache_maxsize = settings.dedup_cache_maxsize

    def clean_text(self, text: str) -> str:
        """Removes noise (emoji, excess whitespace) and normalizes the text."""
        text = EMOJI_PATTERN.sub("", text)
        text = text.lower()
        text = re.sub(r"\s+", " ", text).strip()
        max_len = settings.text_max_length
        return text[:max_len] if len(text) > max_len else text

    def _normalize_for_dedup(self, text: str) -> str:
        """Normalizes a sentence into a deduplication key (preserves word order)."""
        text = DEDUP_PUNCTUATION.sub("", text).lower()
        text = re.sub(r"\s+", " ", text).strip()
        return convert(text, 'zh-cn')
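
    # Illustrative dedup keys (hypothetical inputs): "Great Game!!!" and "great game"
    # both normalize to "great game"; traditional "畫面很好!" becomes simplified
    # "画面很好", so both scripts share one cache entry.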

    def _split_into_sentences(self, text: str) -> list[str]:
        """Splits a review into logical units (sentences/clauses)."""
        parts = SENTENCE_SPLIT_PATTERN.split(text)
        return [p.strip() for p in parts if p and p.strip()]

    def split_review_into_sentences(self, review: str) -> list[str]:
        """Public helper for review-level sentence/clause segmentation."""
        cleaned = self.clean_text(review)
        if not cleaned or len(cleaned) < 5:
            return []
        return self._split_into_sentences(cleaned)

    def _has_negation(self, text: str, position: int) -> bool:
        """
        Detects a negation before a keyword (within the window defined in config).

        Useful for more precise aspect-level analysis of Chinese text.
        """
        window = settings.nlp_negation_window
        left_context = text[max(0, position - window):position]
        return any(neg in left_context for neg in ["不", "没", "别", "无"])
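
    # Illustrative call (hypothetical window of 3): in "我不喜欢画面" the keyword
    # "画面" starts at position 4; the left context "不喜欢" contains "不",
    # so _has_negation(...) returns True.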

    def _is_valid_single_char_token(self, keyword: str, token: str, previous_token: str | None) -> bool:
        """Validates a single-character Chinese keyword in the context of the whole token."""
        if keyword != "卡":
            return True
        if token == keyword:
            return previous_token is None or previous_token in CARD_STANDALONE_PREVIOUS_TOKENS
        return token.endswith(keyword) and token[:-1] in CARD_LAG_PREFIXES
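
    # Illustrative decisions for the ambiguous "卡" (lag vs. card):
    #     token "很卡"           -> True  ("很" is in CARD_LAG_PREFIXES: reads as "laggy")
    #     token "卡牌"           -> False (compound meaning "card", not a lag complaint)
    #     token "卡" after "有点" -> True  (standalone "卡" preceded by a hedging word)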

    def _find_single_char_keyword_match(self, sentence: str, keywords: list[str]) -> tuple[int, str] | None:
        """Returns the first valid match for a single-character Chinese keyword."""
        if not keywords:
            return None
        keyword_set = set(keywords)
        tokenized_sentence = list(jieba.tokenize(sentence))
        for index, (token, start, _) in enumerate(tokenized_sentence):
            previous_token = tokenized_sentence[index - 1][0] if index > 0 else None
            for offset, char in enumerate(token):
                if char not in keyword_set:
                    continue
                if self._is_valid_single_char_token(char, token, previous_token):
                    return start + offset, char
        return None

    def _resolve_matched_term(
        self,
        topic: str,
        matched_word: str,
    ) -> StructuredLexiconTerm | None:
        normalized_word = matched_word.lower() if matched_word.isascii() else matched_word
        for term in self._topic_terms.get(topic, []):
            candidate = term.text.lower() if term.text.isascii() else term.text
            if candidate == normalized_word:
                return term
        return None

    def _topic_has_disambiguating_context(
        self,
        topic: str,
        sentence_simp: str,
        matched_word: str,
    ) -> bool:
        normalized_word = matched_word.lower() if matched_word.isascii() else matched_word
        for term in self._topic_terms.get(topic, []):
            candidate = term.text.lower() if term.text.isascii() else term.text
            if candidate == normalized_word:
                continue
            if term.needs_extra_context and not term.public_ui_safe:
                continue
            if term.text.isascii():
                if re.search(rf"\b{re.escape(term.text.lower())}\b", sentence_simp, re.IGNORECASE):
                    return True
            elif term.match_bucket.value == "single_char" and len(term.text) == 1:
                if self._find_single_char_keyword_match(sentence_simp, [term.text]) is not None:
                    return True
            elif term.text in sentence_simp:
                return True
        return False

    def _detect_topic_matches_regex(self, sentence: str) -> dict[str, tuple[bool, str]]:
        """
        Fast topic detection using the pre-compiled regexes.

        Complexity: O(N) in sentence length, independent of the number of keywords.
        """
        detected: dict[str, tuple[bool, str]] = {}
        # TEMPORARY conversion to simplified Chinese, for matching only.
        # This keeps the original text (traditional/simplified) in the database,
        # while the keywords.py lexicon can stay in zh-cn.
        sentence_simp = convert(sentence, 'zh-cn')
        for topic in self._topic_ids:
            regex_match = None
            if topic in self.topic_patterns:
                regex_match = self.topic_patterns[topic].search(sentence_simp)
            single_char_match = self._find_single_char_keyword_match(
                sentence_simp,
                self.single_char_topic_keywords.get(topic, []),
            )
            matched_word: str | None = None
            match_start: int | None = None
            if regex_match and single_char_match:
                if single_char_match[0] < regex_match.start():
                    match_start, matched_word = single_char_match
                else:
                    match_start = regex_match.start()
                    matched_word = regex_match.group(0).lower()
            elif regex_match:
                match_start = regex_match.start()
                matched_word = regex_match.group(0).lower()
            elif single_char_match:
                match_start, matched_word = single_char_match
            if matched_word is not None and match_start is not None:
                is_excluded = False
                matched_term = self._resolve_matched_term(topic, matched_word)
                if matched_word in self.exclusion_patterns:
                    if self.exclusion_patterns[matched_word].search(sentence_simp):
                        is_excluded = True
                if not is_excluded:
                    if (
                        matched_term is not None
                        and matched_term.needs_extra_context
                        and not matched_term.public_ui_safe
                        and matched_term.match_bucket.value != "single_char"
                        and not self._topic_has_disambiguating_context(topic, sentence_simp, matched_word)
                    ):
                        continue
                    if (
                        topic == "Multiplayer"
                        and any(pat.search(sentence_simp) for pat in _MULTIPLAYER_ABSENCE_PATTERNS)
                    ):
                        continue
                    negated = self._has_negation(sentence_simp, match_start)
                    detected[topic] = (negated, matched_word)
        return detected

    def _detect_topics_regex(self, sentence: str) -> dict[str, bool]:
        """Backward-compatible wrapper returning only topic -> negated."""
        return {
            topic: is_negated
            for topic, (is_negated, _) in self._detect_topic_matches_regex(sentence).items()
        }

    def _filter_runtime_emittable_matches(
        self,
        detected_topics: dict[str, tuple[bool, str]],
    ) -> dict[str, tuple[bool, str]]:
        """Drop helper-only topics from public aggregates while keeping behavioral layers."""
        return {
            topic: match
            for topic, match in detected_topics.items()
            if topic in self._emitted_topic_ids
        }

    def _run_inference(self, texts: list[str]) -> list[dict]:
        """Wrapper around the Hugging Face pipeline, executed in a worker thread."""
        # batch_size=16 optimizes matrix operations on CPU (AVX).
        # truncation=True, max_length=512 keeps inputs within the ONNX position limit
        # (max_position_embeddings=512); the pipeline accounts for special tokens automatically.
        return self.classifier(texts, batch_size=16, truncation=True, max_length=512)

    def _map_label(self, label_str: str, score: float) -> tuple[SentimentType, float]:
        """Maps a raw model label to (SentimentType, signed score)."""
        label_lower = label_str.lower()
        if 'positive' in label_lower or 'label_1' in label_lower:
            return (SentimentType.POSITIVE, score)
        elif 'negative' in label_lower or 'label_0' in label_lower:
            return (SentimentType.NEGATIVE, -score)
        return (SentimentType.NEUTRAL, 0.0)
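
    # Illustrative mappings (confidence values are hypothetical):
    #     ("POSITIVE", 0.97) -> (SentimentType.POSITIVE,  0.97)
    #     ("LABEL_0",  0.90) -> (SentimentType.NEGATIVE, -0.90)
    # The label's direction is folded into the sign of the returned score.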

    def _cache_put(self, key: str, value: tuple[str, float]) -> None:
        """Inserts a result into the LRU cache, evicting the oldest entries over the limit."""
        self._sentiment_cache[key] = value
        self._sentiment_cache.move_to_end(key)
        while len(self._sentiment_cache) > self._cache_maxsize:
            self._sentiment_cache.popitem(last=False)

    async def analyze_sentiment_batch(
        self, texts: list[str]
    ) -> list[tuple[SentimentType, float]]:
        """
        Asynchronous interface for sentiment analysis.

        Offloads the computation to a separate thread so the API is not blocked,
        and uses the LRU cache to skip repeated sentences.
        """
        cleaned_texts = [self.clean_text(t) for t in texts]
        norm_keys = [self._normalize_for_dedup(t) for t in cleaned_texts]

        # Separate cache hits from misses.
        final_sentiments: list[tuple[SentimentType, float]] = [(SentimentType.NEUTRAL, 0.0)] * len(texts)
        miss_indices: list[int] = []  # indices into cleaned_texts that must go to the model
        miss_texts: list[str] = []
        for i, (cleaned, key) in enumerate(zip(cleaned_texts, norm_keys)):
            if not cleaned:
                continue
            cached = self._sentiment_cache.get(key)
            if cached is not None:
                self._sentiment_cache.move_to_end(key)
                final_sentiments[i] = self._map_label(cached[0], cached[1])
            else:
                miss_indices.append(i)
                miss_texts.append(cleaned)

        cache_hits = len(texts) - len(miss_texts)
        logger.debug(f"Cache: {cache_hits} hits, {len(miss_texts)} misses (cache size: {len(self._sentiment_cache)})")
        if not miss_texts:
            return final_sentiments

        # Run the model ONLY on cache misses.
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(self.executor, self._run_inference, miss_texts)
        for j, res in enumerate(results):
            original_idx = miss_indices[j]
            label_str = res['label']
            score = res['score']
            # Store the raw result in the cache.
            self._cache_put(norm_keys[original_idx], (label_str, score))
            final_sentiments[original_idx] = self._map_label(label_str, score)
        return final_sentiments

    async def analyze_batch(
        self,
        reviews: list[str],
        highlights_collector: HighlightsCollector | None = None,
        categories: list[str] | None = None,
        extra_highlights_collectors: list[tuple[HighlightsCollector, list[str] | None]] | None = None,
        stage_timings: StageTimingAccumulator | None = None,
    ) -> BatchAnalysisResult:
        """
        Main entry point for processing a batch of reviews.

        Combines segmentation, topic detection and sentiment analysis.
        """
        if not reviews:
            return BatchAnalysisResult(
                topics=[],
                skipped_sentences=0,
                total_sentences=0,
                topic_bearing_review_count=0,
            )

        # Step 1: pre-processing and identifying the sentences to analyze.
        sentiment_tasks = []
        skipped_sentences = 0
        total_sentences = 0
        topic_bearing_review_count = 0
        for review_idx, review in enumerate(reviews):
            if highlights_collector:
                highlights_collector.start_review()
            for extra_collector, _ in extra_highlights_collectors or []:
                extra_collector.start_review()
            split_start = time.monotonic()
            sentences = self.split_review_into_sentences(review)
            total_sentences += len(sentences)
            if stage_timings is not None:
                stage_timings.add(
                    "sentence_split_preprocess_s",
                    time.monotonic() - split_start,
                )
            detect_start = time.monotonic()
            review_has_detected_topic = False
            for sentence in sentences:
                topics_map = self._filter_runtime_emittable_matches(
                    self._detect_topic_matches_regex(sentence)
                )
                if topics_map:
                    review_has_detected_topic = True
                    sentence_topic_matches = {
                        topic: matched_word
                        for topic, (_, matched_word) in topics_map.items()
                    }
                    for topic, (is_negated, matched_word) in topics_map.items():
                        sentiment_tasks.append(
                            (review_idx, topic, sentence, is_negated, matched_word, sentence_topic_matches)
                        )
                else:
                    skipped_sentences += 1
            if stage_timings is not None:
                stage_timings.add(
                    "topic_detection_aggregation_s",
                    time.monotonic() - detect_start,
                )
            if review_has_detected_topic:
                topic_bearing_review_count += 1

        if not sentiment_tasks:
            return BatchAnalysisResult(
                topics=[],
                skipped_sentences=skipped_sentences,
                total_sentences=total_sentences,
                topic_bearing_review_count=topic_bearing_review_count,
            )

        # Step 2: deduplication + sentiment analysis.
        preprocess_start = time.monotonic()
        all_sentences = [task[2] for task in sentiment_tasks]
        # Deduplication: normalize -> collect uniques -> run inference only on uniques.
        norm_keys = [self._normalize_for_dedup(s) for s in all_sentences]
        unique_map: dict[str, int] = {}  # normalized_key -> index in unique_texts
        unique_texts: list[str] = []
        for i, key in enumerate(norm_keys):
            if key not in unique_map:
                unique_map[key] = len(unique_texts)
                unique_texts.append(all_sentences[i])
        dedup_total = len(all_sentences)
        dedup_unique = len(unique_texts)
        dedup_pct = round((1 - dedup_unique / dedup_total) * 100) if dedup_total else 0
        logger.debug(f"Dedup: {dedup_total} -> {dedup_unique} sentences ({dedup_pct}% reduced)")
        if stage_timings is not None:
            stage_timings.add(
                "sentence_split_preprocess_s",
                time.monotonic() - preprocess_start,
            )
        sentiment_start = time.monotonic()
        unique_results = await self.analyze_sentiment_batch(unique_texts)
        if stage_timings is not None:
            stage_timings.add(
                "sentiment_model_s",
                time.monotonic() - sentiment_start,
            )
        # Map the unique results back onto all sentences.
        sentiment_results = [unique_results[unique_map[key]] for key in norm_keys]

        # Step 3: aggregation of the results.
        # review_id -> topic -> list of scores
        review_topic_scores: dict[int, dict[str, list[float]]] = defaultdict(lambda: defaultdict(list))
        # topic -> (sentence, score): online selection of the best example
        topic_best_example: dict[str, tuple[str, float]] = {}
        aggregation_start = time.monotonic()
        highlights_elapsed = 0.0
        for i, (review_idx, topic, sentence, is_negated, matched_word, sentence_topic_matches) in enumerate(sentiment_tasks):
            _, score = sentiment_results[i]
            # BULLETPROOF PIPELINE: if a negation was detected (e.g. "I don't like
            # the gameplay") yet the model still returned a positive sentiment, flip it.
            if is_negated and score > 0:
                score = -score
            review_topic_scores[review_idx][topic].append(score)
            collector_start = time.monotonic()
            if highlights_collector:
                highlights_collector.add_sentence(
                    review_idx=review_idx,
                    sentence=sentence,
                    topics=[topic],
                    sentiment_score=score,
                    categories=categories,
                    topic_match_texts=sentence_topic_matches,
                )
            for extra_collector, extra_categories in extra_highlights_collectors or []:
                extra_collector.add_sentence(
                    review_idx=review_idx,
                    sentence=sentence,
                    topics=[topic],
                    sentiment_score=score,
                    categories=extra_categories,
                    topic_match_texts=sentence_topic_matches,
                )
            highlights_elapsed += time.monotonic() - collector_start
            # Online selection: update when a better candidate arrives (higher |score|).
            if len(sentence) > 20:
                current = topic_best_example.get(topic)
                if current is None or abs(score) > abs(current[1]):
                    topic_best_example[topic] = (sentence, score)

        # Global aggregation: per-review mean -> global sum.
        global_topic_stats: dict[str, dict[str, float]] = defaultdict(lambda: {"sum_score": 0.0, "count": 0.0})
        for review_idx, topics_data in review_topic_scores.items():
            for topic, scores in topics_data.items():
                avg_review_score = sum(scores) / len(scores)
                global_topic_stats[topic]["sum_score"] += avg_review_score
                global_topic_stats[topic]["count"] += 1.0

        # Step 4: final formatting.
        final_results: list[TopicSentiment] = []
        for topic_name, stats in global_topic_stats.items():
            count = int(stats["count"])
            if count == 0:
                continue
            avg_global_score = stats["sum_score"] / stats["count"]
            normalized_score = max(-1.0, min(1.0, avg_global_score))
            if normalized_score > settings.sentiment_positive_threshold:
                sentiment = SentimentType.POSITIVE
            elif normalized_score < settings.sentiment_negative_threshold:
                sentiment = SentimentType.NEGATIVE
            else:
                sentiment = SentimentType.NEUTRAL
            # Fetch the best example and validate that its direction agrees.
            best_example = None
            example_score = None
            candidate = topic_best_example.get(topic_name)
            if candidate:
                ex_sentence, ex_score = candidate
                # Validation: the example must match the direction of the sentiment.
                if sentiment == SentimentType.NEUTRAL or \
                   (sentiment == SentimentType.POSITIVE and ex_score > 0) or \
                   (sentiment == SentimentType.NEGATIVE and ex_score < 0):
                    best_example = ex_sentence
                    example_score = ex_score
            final_results.append(
                TopicSentiment(
                    topic=topic_name,
                    sentiment=sentiment,
                    score=round(normalized_score, 3),
                    mention_count=count,
                    example=best_example,
                    example_score=example_score,
                )
            )
        final_results.sort(key=lambda x: x.mention_count, reverse=True)
        if stage_timings is not None:
            stage_timings.add(
                "topic_detection_aggregation_s",
                max(0.0, time.monotonic() - aggregation_start - highlights_elapsed),
            )
        return BatchAnalysisResult(
            topics=final_results,
            skipped_sentences=skipped_sentences,
            total_sentences=total_sentences,
            topic_bearing_review_count=topic_bearing_review_count,
        )


_nlp_service: "NLPService | None" = None


def get_nlp_service() -> "NLPService":
    global _nlp_service
    if _nlp_service is None:
        _nlp_service = NLPService()
    return _nlp_service
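

if __name__ == "__main__":
    # Minimal smoke-test sketch, assuming the quantized model was generated with
    # `python scripts/quantize_model.py` and that this module is run from the repo
    # root as a package (e.g. `python -m app.services.nlp_service`), so the
    # `app.*` imports resolve. Which topics are reported depends on the lexicon.
    async def _demo() -> None:
        service = get_nlp_service()
        result = await service.analyze_batch(
            ["The gameplay is great, but the optimization is terrible."]
        )
        for topic in result.topics:
            print(topic.topic, topic.sentiment, topic.score, topic.mention_count)

    asyncio.run(_demo())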