import os import sys import json import pickle import csv import io import uuid import asyncio import time import logging from collections import OrderedDict, defaultdict, Counter from functools import wraps from typing import Dict, List, Any, Optional, Tuple import torch import torch.nn as nn import numpy as np import re import networkx as nx import pymorphy3 import requests import psutil from fastapi import FastAPI, Request, Form, HTTPException, File, UploadFile, BackgroundTasks from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from fastapi.templating import Jinja2Templates from pydantic import BaseModel import uvicorn from transformers import BertTokenizer, BertModel from sklearn.preprocessing import LabelEncoder import warnings warnings.filterwarnings('ignore') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(f"Используется устройство: {device}") SMILEY_MAPPING = { ':)': 'смайлик_радость', ')': 'смайлик_радость', '🥰': 'смайлик_радость', '😀': 'смайлик_радость', '😃': 'смайлик_радость', '😄': 'смайлик_радость', '😁': 'смайлик_радость', '😘': 'смайлик_радость', '😍': 'смайлик_радость', '😇': 'смайлик_радость', '😊': 'смайлик_радость', ':D': 'смайлик_смех', ';)': 'смайлик_подмигивание', '❤️': 'смайлик_радость_любовь', '🩷': 'смайлик_радость_любовь', '🧡': 'смайлик_радость_любовь', '💛': 'смайлик_радость_любовь', '💚': 'смайлик_радость_любовь', '💙': 'смайлик_радость_любовь', '🩵': 'смайлик_радость_любовь', '💜': 'смайлик_радость_любовь', '👍': 'смайлик_радость_класс', '👌': 'смайлик_радость_ок', ':(': 'смайлик_грусть', '(': 'смайлик_грусть', '👎': 'смайлик_грусть', '🥺': 'смайлик_грусть', '😞': 'смайлик_грусть', '🙁': 'смайлик_грусть', '😭': 'смайлик_грусть_слезы', '🥲': 'смайлик_грусть', '☹️': 'смайлик_грусть', '😔': 'смайлик_грусть_слезы', '😓': 'смайлик_грусть', '😢': 'смайлик_грусть_слезы', '😡': 'смайлик_злость', '👿': 'смайлик_злость', '🤬': 'смайлик_злость', '😈': 'смайлик_злость', '😠': 'смайлик_злость', } def clean_russian_text(text): if not isinstance(text, str): return "" text = text.lower() text = re.sub(r'http\S+|www\S+|https\S+', '', text) text = re.sub(r'\S+@\S+', '', text) # Замена смайликов с использованием глобального словаря for smiley, replacement in SMILEY_MAPPING.items(): text = text.replace(smiley, f' {replacement} ') # Удаление прочих символов, кроме букв, цифр, знаков препинания text = re.sub(r'[^\w\sа-яё.,!?;:)(-]', ' ', text) text = re.sub(r'\s+', ' ', text).strip() return text def contains_russian(text: str) -> bool: """Проверяет, есть ли в тексте хотя бы одна русская буква.""" return bool(re.search(r'[а-яё]', text.lower())) def is_valid_for_analysis(text: str, min_length: int = 3) -> Tuple[bool, str]: """ Проверяет, подходит ли текст для анализа. Возвращает (валидность, причина невалидности). """ if not isinstance(text, str) or len(text.strip()) == 0: return False, "пустой текст" cleaned = clean_russian_text(text) if len(cleaned) < min_length: return False, f"текст слишком короткий (менее {min_length} значимых символов)" if not contains_russian(cleaned): return False, "текст не содержит русских букв" return True, "" class LRUCache: def __init__(self, maxsize=10000): self.cache = OrderedDict() self.maxsize = maxsize def get(self, key): if key not in self.cache: return None self.cache.move_to_end(key) return self.cache[key] def put(self, key, value): if key in self.cache: self.cache.move_to_end(key) self.cache[key] = value if len(self.cache) > self.maxsize: self.cache.popitem(last=False) def size(self): return len(self.cache) class OntologyEmotionModel: def __init__(self, emotions: List[str], train_texts: List[str] = None, train_labels: List[int] = None): self.emotions = emotions self.morph = pymorphy3.MorphAnalyzer() self.ontology_graph = nx.DiGraph() self.empirical_base = defaultdict(list) self.hypotheses_db = {} self.verified_hypotheses = defaultdict(list) self.sentiment_lexicon = {} self.rule_stats = {} if train_texts is not None and train_labels is not None: self._build_sentiment_lexicon(train_texts, train_labels) self._load_rusentilex() print(f"📊 Всего слов в лексиконе: {len(self.sentiment_lexicon)}") fear_words = ['бояться', 'страшно', 'опасно', 'угроза', 'тревога', 'паника'] for w in fear_words: lemma = self.morph.parse(w)[0].normal_form self.sentiment_lexicon[lemma] = 'страх' print(f"📊 Добавлено {len(fear_words)} слов для эмоции 'страх'") explicit_sadness = ['грустно', 'печально', 'тоскливо', 'уныло', 'горестно', 'жалко', 'сожаление', 'обидно', 'печалька'] added_sad = 0 for w in explicit_sadness: try: lemma = self.morph.parse(w)[0].normal_form self.sentiment_lexicon[lemma] = 'грусть' added_sad += 1 except: pass print(f"📊 Добавлено {added_sad} явных слов для эмоции 'грусть'") disappointment_words = [ 'разочарован', 'ожидал', 'надеялся', 'обманулся', 'не оправдал', 'не впечатлило', 'слабо', 'посредственно', 'неудовлетворен' ] added_dis = 0 for w in disappointment_words: try: lemma = self.morph.parse(w)[0].normal_form self.sentiment_lexicon[lemma] = 'грусть' added_dis += 1 except: pass print(f"📊 Добавлено {added_dis} слов для эмоции 'грусть' (разочарование)") positive_colloquial = [ 'спасибо', 'благодарю', 'благодарить', 'норм', 'нормально', 'ок', 'окей', 'класс', 'супер', 'здорово', 'прекрасно', 'отлично', 'ого', 'вау', 'круто', 'зачет', 'лады', 'добре', 'хорошо', 'неплохо', 'приемлемо', 'удовлетворительно', 'пойдет', 'в порядке', 'без проблем', 'лепота', 'зашибись', 'ого-го' ] negative_colloquial = [ 'плохо', 'ужасно', 'кошмар', 'отвратительно', 'неуд', 'брак', 'сломалось', 'не работает', 'фигня', 'ерунда', 'разочарование', 'недоволен', 'не доволен', 'жаль', 'обидно', 'печаль', 'тоска', 'неудачно', 'провал', 'не годен', 'неприемлемо', 'неудовлетворительно', 'нехорошо', 'не ок', 'не окей', 'не норм' ] for w in positive_colloquial: try: lemma = self.morph.parse(w)[0].normal_form if lemma not in self.sentiment_lexicon: self.sentiment_lexicon[lemma] = 'радость' except: pass for w in negative_colloquial: try: lemma = self.morph.parse(w)[0].normal_form if lemma not in self.sentiment_lexicon: self.sentiment_lexicon[lemma] = 'грусть' except: pass smiley_texts = set(SMILEY_MAPPING.values()) for text in smiley_texts: if 'радость' in text or 'смех' in text or 'подмигивание' in text or 'класс' in text or 'ок' in text or 'любовь' in text: emotion = 'радость' elif 'грусть' in text or 'слезы' in text: emotion = 'грусть' elif 'злость' in text: emotion = 'злость' else: emotion = 'радость' self.sentiment_lexicon[text] = emotion print(f"📊 Добавлено {len(smiley_texts)} текстовых меток для распознавания смайликов") self.init_ontology_level1() self.init_ontology_level2() def _build_sentiment_lexicon(self, texts: List[str], labels: List[int]): word_class_counts = defaultdict(lambda: np.zeros(len(self.emotions))) for text, label in zip(texts, labels): words = set(clean_russian_text(text).split()) for word in words: lemma = self.morph.parse(word)[0].normal_form word_class_counts[lemma][label] += 1 for lemma, counts in word_class_counts.items(): prob = counts / (counts.sum() + 1e-10) if prob.max() > 0.6 and counts.sum() > 5: dominant_class = self.emotions[np.argmax(prob)] self.sentiment_lexicon[lemma] = dominant_class def _parse_rusentilex(self, content): lines = content.splitlines() added = 0 sample_line = None for line in lines: line = line.strip() if line: sample_line = line break if not sample_line: return parts = sample_line.split(',') is_two_column = len(parts) == 2 for line in lines[1:]: line = line.strip() if not line: continue try: if is_two_column: term, tone_str = line.split(',') term = term.strip().lower() tone = int(tone_str) lemma = self.morph.parse(term)[0].normal_form if tone == 1: self.sentiment_lexicon[lemma] = 'радость' added += 1 elif tone == -1: self.sentiment_lexicon[lemma] = 'грусть' added += 1 else: parts = line.split(',') if len(parts) >= 3: term = parts[0].strip().lower() sentiment = parts[2].strip().lower() lemma = self.morph.parse(term)[0].normal_form if sentiment == 'positive': self.sentiment_lexicon[lemma] = 'радость' added += 1 elif sentiment == 'negative': self.sentiment_lexicon[lemma] = 'грусть' added += 1 except Exception: continue print(f" Добавлено слов из RuSentiLex: {added}") def _load_rusentilex(self): possible_paths = [ 'model/rusentilex.csv', 'rusentilex.csv', '/app/model/rusentilex.csv', os.path.join(os.path.dirname(__file__), 'model', 'rusentilex.csv') ] loaded = False print("📂 Поиск RuSentiLex...") for path in possible_paths: if os.path.exists(path): try: with open(path, 'r', encoding='utf-8') as f: content = f.read() self._parse_rusentilex(content) print(f"✅ RuSentiLex загружен из файла: {path}") loaded = True break except Exception as e: print(f"⚠️ Ошибка при загрузке {path}: {e}") if not loaded: print("⚠️ RuSentiLex не загружен. Используется только статистический лексикон.") def init_ontology_level1(self): self.emotion_definitions = { 'радость': { 'valence': 'positive', 'arousal': 'high', 'definition': 'Позитивное эмоциональное состояние', 'opposite': ['грусть', 'злость'] }, 'грусть': { 'valence': 'negative', 'arousal': 'low', 'definition': 'Негативное эмоциональное состояние', 'opposite': ['радость'] }, 'злость': { 'valence': 'negative', 'arousal': 'high', 'definition': 'Негативное эмоциональное состояние', 'opposite': ['радость'] }, 'страх': { 'valence': 'negative', 'arousal': 'high', 'definition': 'Эмоциональная реакция на угрозу', 'opposite': ['уверенность', 'спокойствие'] }, 'сарказм': { 'valence': 'negative', 'arousal': 'high', 'definition': 'Язвительная насмешка', 'opposite': ['радость'] } } for emotion in self.emotions: if emotion in self.emotion_definitions: self.ontology_graph.add_node(emotion, **self.emotion_definitions[emotion]) else: self.ontology_graph.add_node(emotion, valence='neutral', arousal='neutral') for emotion, data in self.emotion_definitions.items(): if 'opposite' in data: for opposite in data['opposite']: if opposite in self.emotions: self.ontology_graph.add_edge(emotion, opposite, relation='opposite') def init_ontology_level2(self): self.linguistic_rules = { 'усилители': { 'words': ['очень', 'сильно', 'крайне', 'чрезвычайно', 'невероятно', 'абсолютно'], 'effect': 'increase_arousal', 'weight': 0.3, 'learnable': True }, 'ослабители': { 'words': ['слегка', 'немного', 'чуть-чуть', 'отчасти', 'несколько'], 'effect': 'decrease_arousal', 'weight': -0.2, 'learnable': True }, 'отрицания': { 'words': ['не', 'ни', 'нет', 'нельзя', 'невозможно'], 'effect': 'negation', 'weight': -0.5, 'learnable': True }, 'восклицания': { 'patterns': [r'!+', r'\?+'], 'effect': 'increase_arousal', 'weight': 0.4, 'learnable': True }, 'вопросительные': { 'patterns': [r'\?+'], 'effect': 'uncertainty', 'weight': 0.2, 'learnable': True }, 'сарказм_маркеры': { 'words': ['ага', 'да что вы', 'ну да', 'кто бы мог подумать'], 'effect': 'sarcasm', 'weight': 0.6, 'learnable': True } } def add_empirical_knowledge(self, text: str, emotion: str, confidence: float): self.empirical_base[emotion].append({'text': text, 'confidence': confidence}) if len(self.empirical_base[emotion]) > 1000: self.empirical_base[emotion] = self.empirical_base[emotion][-1000:] def formulate_hypothesis(self, text: str, model_prediction: Dict, rule_based_prediction: Dict) -> Dict: hypothesis_id = f"hyp_{len(self.hypotheses_db) + 1:06d}" hypothesis = { 'id': hypothesis_id, 'text': text, 'model_prediction': model_prediction, 'rule_based_prediction': rule_based_prediction, 'disagreement': self.calculate_disagreement(model_prediction, rule_based_prediction), 'status': 'pending' } self.hypotheses_db[hypothesis_id] = hypothesis return hypothesis def verify_hypothesis(self, hypothesis_id: str, actual_emotion: str = None) -> Dict: if hypothesis_id not in self.hypotheses_db: return None hypothesis = self.hypotheses_db[hypothesis_id] if actual_emotion: model_correct = hypothesis['model_prediction']['emotion'] == actual_emotion rule_correct = hypothesis['rule_based_prediction']['emotion'] == actual_emotion if model_correct and not rule_correct: hypothesis['status'] = 'model_superior' elif rule_correct and not model_correct: hypothesis['status'] = 'rule_superior' elif model_correct and rule_correct: hypothesis['status'] = 'both_correct' else: hypothesis['status'] = 'both_incorrect' return hypothesis def apply_linguistic_rules(self, text: str) -> Dict: rules_applied = [] adjustments = {'valence': 0, 'arousal': 0, 'uncertainty': 0, 'sarcasm': 0} words = text.lower().split() parsed = [self.morph.parse(w)[0] for w in words] lemmas = [p.normal_form for p in parsed] pos_tags = [p.tag.POS for p in parsed] for lemma in lemmas: sentiment = self.sentiment_lexicon.get(lemma, 'neutral') if sentiment == 'радость': rules_applied.append(f"позитивное слово: {lemma}") adjustments['valence'] += 0.2 elif sentiment in ('грусть', 'злость', 'страх'): rules_applied.append(f"негативное слово: {lemma}") adjustments['valence'] -= 0.2 for category, rule in self.linguistic_rules.items(): if 'words' in rule: for word in rule['words']: if word in lemmas: rules_applied.append(f"{category}: {word}") effect = rule['effect'] weight = rule['weight'] if effect == 'increase_arousal': adjustments['arousal'] += weight elif effect == 'decrease_arousal': adjustments['arousal'] += weight elif effect == 'negation': adjustments['valence'] += weight elif effect == 'sarcasm': adjustments['sarcasm'] += weight if 'patterns' in rule: for pattern in rule['patterns']: if re.search(pattern, text): rules_applied.append(f"{category}: {pattern}") weight = rule['weight'] if rule['effect'] == 'increase_arousal': adjustments['arousal'] += weight elif rule['effect'] == 'uncertainty': adjustments['uncertainty'] += weight if 'не' in lemmas: idx = lemmas.index('не') if idx + 1 < len(lemmas) and lemmas[idx+1] == 'очень': adjustments['arousal'] -= 0.2 adjustments['valence'] -= 0.3 rules_applied.append("сочетание: не очень") else: for j in range(idx+1, min(idx+4, len(lemmas))): if pos_tags[j] in ('ADJF', 'ADJS', 'ADVB'): target_word = lemmas[j] sentiment = self.sentiment_lexicon.get(target_word, 'neutral') if sentiment in ('грусть', 'злость', 'страх'): adjustments['valence'] += 1.0 rules_applied.append(f"инверсия негатива: не {target_word}") elif sentiment == 'радость': adjustments['valence'] -= 1.0 rules_applied.append(f"инверсия позитива: не {target_word}") break pos_words = [w for w in lemmas if self.sentiment_lexicon.get(w) == 'радость'] neg_words = [w for w in lemmas if self.sentiment_lexicon.get(w) in ('грусть', 'злость', 'страх')] if pos_words and neg_words: adjustments['sarcasm'] += 0.5 rules_applied.append(f"контраст тональности: позитив {pos_words[:2]} vs негатив {neg_words[:2]}") sarcasm_phrases = ['ага', 'ну да', 'да что вы', 'кто бы мог подумать'] for phrase in sarcasm_phrases: if phrase in text.lower(): adjustments['sarcasm'] += 0.6 rules_applied.append(f"саркастическая фраза: {phrase}") if adjustments['sarcasm'] > 0.5: rules_applied.append("обнаружен сарказм") disappointment_verbs = ['ожидать', 'надеяться', 'думать'] disappointment_adjs = ['большой', 'лучший', 'хороший', 'много', 'высокий'] found_disappointment = False for v in disappointment_verbs: if v in lemmas: for a in disappointment_adjs: if a in lemmas: adjustments['valence'] -= 0.8 rules_applied.append(f"разочарование: {v} {a}") found_disappointment = True break if found_disappointment: break if 'оправдать' in lemmas and 'ожидание' in lemmas: adjustments['valence'] -= 0.7 rules_applied.append("разочарование: не оправдал ожиданий") return {'rules_applied': rules_applied, 'adjustments': adjustments, 'lemmas': lemmas} def calculate_disagreement(self, pred1: Dict, pred2: Dict) -> float: if pred1['emotion'] == pred2['emotion']: return 0.0 emotions = list(self.emotion_definitions.keys()) idx1 = emotions.index(pred1['emotion']) if pred1['emotion'] in emotions else -1 idx2 = emotions.index(pred2['emotion']) if pred2['emotion'] in emotions else -1 if idx1 == -1 or idx2 == -1: return 0.5 distance = abs(idx1 - idx2) / len(emotions) return 0.7 * distance def explain_transition(self, from_emotion: str, to_emotion: str) -> List[str]: try: return nx.shortest_path(self.ontology_graph, source=from_emotion, target=to_emotion) except: return [] def adjust_prediction_with_rules(self, prediction: Dict, rule_analysis: Dict) -> Dict: original_emotion = prediction['emotion'] original_confidence = prediction['confidence'] adj = rule_analysis['adjustments'] rules = rule_analysis['rules_applied'] original_confidence_value = original_confidence was_corrected = len(rules) > 0 conf_mult = 1.0 + adj['arousal'] * 0.2 + adj['uncertainty'] * 0.1 - abs(adj['valence']) * 0.1 conf_mult = np.clip(conf_mult, 0.5, 1.5) new_confidence = original_confidence * conf_mult new_emotion = original_emotion has_negative = (any('негативное слово' in r for r in rules) or any('разочарование' in r for r in rules) or any('инверсия негатива' in r for r in rules) or any('сочетание: не очень' in r for r in rules)) has_positive = any('позитивное слово' in r for r in rules) or any('радость' in r for r in rules) fear_keywords = ['страшно', 'бояться', 'опасно', 'угроза', 'тревога', 'паника'] has_fear_word = any(any(kw in r for kw in fear_keywords) for r in rules) if has_fear_word and original_emotion in ('грусть', 'злость', 'радость'): new_emotion = 'страх' new_confidence *= 0.9 rules.append("коррекция: обнаружены слова страха → страх") # Если нет эмоциональных слов → не определено if not has_positive and not has_negative: new_emotion = 'не определено' new_confidence = 1.0 rules.append("нет эмоциональных слов → не определено") else: if has_negative and not has_positive: if original_emotion == 'радость': new_emotion = 'грусть' new_confidence *= 0.8 rules.append("коррекция: негативные слова без позитивных") elif original_emotion == 'сарказм': new_emotion = 'грусть' new_confidence *= 0.9 elif has_positive and not has_negative and original_emotion in ('грусть', 'злость', 'страх'): new_emotion = 'радость' rules.append("коррекция: позитивные слова") for rule in rules: if rule.startswith("инверсия негатива:"): new_emotion = 'радость' break elif rule.startswith("инверсия позитива:"): if adj['arousal'] > 0.3: new_emotion = 'злость' else: new_emotion = 'грусть' break sarcasm_flag = adj['sarcasm'] > 0.5 if sarcasm_flag: new_emotion = 'сарказм' new_confidence = min(new_confidence * 0.8, 0.9) if "саркастическая фраза" in str(rules): new_confidence = min(new_confidence * 1.1, 0.95) if any('восклицание' in r for r in rules): new_confidence = min(new_confidence * 1.2, 1.0) if not was_corrected and original_confidence_value < 0.9: new_confidence = min(new_confidence * 1.10, 1.0) new_confidence = min(new_confidence, 1.0) return { 'emotion': new_emotion, 'confidence': new_confidence, 'rules_applied': rules } def get_ontology_analysis(self, text: str, model_prediction: Dict) -> Dict: rule_analysis = self.apply_linguistic_rules(text) adjusted = self.adjust_prediction_with_rules(model_prediction, rule_analysis) disagreement = self.calculate_disagreement(model_prediction, adjusted) hypothesis = self.formulate_hypothesis(text, model_prediction, adjusted) if disagreement > 0.2 else None return { 'rule_analysis': rule_analysis, 'adjusted_prediction': adjusted, 'disagreement': disagreement, 'hypothesis': hypothesis } def get_statistics(self) -> Dict: return { 'ontology_nodes': len(self.ontology_graph.nodes), 'ontology_edges': len(self.ontology_graph.edges), 'linguistic_rules': len(self.linguistic_rules), 'emotions_covered': len(self.emotions), 'pending_hypotheses': len([h for h in self.hypotheses_db.values() if h['status'] == 'pending']) } class EmotionLSTM(nn.Module): def __init__(self, vocab_size, embed_dim=128, hidden_dim=256, num_classes=3, dropout=0.3, num_layers=2): super().__init__() self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0) self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True, bidirectional=True, dropout=dropout) self.dropout = nn.Dropout(dropout) self.classifier = nn.Sequential( nn.Linear(hidden_dim * 2, 128), nn.ReLU(), nn.Dropout(dropout), nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, num_classes) ) def forward(self, x, return_confidence=False): embedded = self.embedding(x) lstm_out, (hidden, cell) = self.lstm(embedded) lstm_last = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1) features = self.dropout(lstm_last) logits = self.classifier(features) if return_confidence: probs = torch.softmax(logits, dim=1) conf, _ = torch.max(probs, dim=1) return logits, conf return logits class EmotionBERT(nn.Module): def __init__(self, bert_model_name, num_classes, dropout=0.3): super().__init__() self.bert = BertModel.from_pretrained(bert_model_name) hidden = self.bert.config.hidden_size self.classifier = nn.Sequential( nn.Dropout(dropout), nn.Linear(hidden, 256), nn.ReLU(), nn.Dropout(dropout), nn.Linear(256, 128), nn.ReLU(), nn.Linear(128, num_classes) ) def forward(self, input_ids, attention_mask, return_confidence=False): out = self.bert(input_ids, attention_mask, return_dict=True) cls = out.last_hidden_state[:, 0, :] logits = self.classifier(cls) if return_confidence: probs = torch.softmax(logits, dim=1) conf, _ = torch.max(probs, dim=1) return logits, conf return logits class CascadeEmotionClassifier: def __init__(self, lstm_model, bert_model, vocab, tokenizer, label_encoder, ontology_model, threshold=0.95, device='cpu', max_length_lstm=100, max_length_bert=128, enable_cache=True, cache_maxsize=10000): self.lstm_model = lstm_model self.bert_model = bert_model self.vocab = vocab self.tokenizer = tokenizer self.label_encoder = label_encoder self.ontology_model = ontology_model self.threshold = threshold self.device = device self.max_length_lstm = max_length_lstm self.max_length_bert = max_length_bert self.lstm_model.eval() self.bert_model.eval() self.lstm_model.to(device) self.bert_model.to(device) self.stats = {'total': 0, 'lstm': 0, 'bert': 0, 'corrections': 0} self.enable_cache = enable_cache self.cache = LRUCache(maxsize=cache_maxsize) if enable_cache else None self.cache_hits = 0 self.cache_misses = 0 def text_to_sequence(self, text): words = str(text).split()[:self.max_length_lstm] sequence = [self.vocab.get(word, self.vocab.get('', 1)) for word in words] if len(sequence) < self.max_length_lstm: sequence += [self.vocab.get('', 0)] * (self.max_length_lstm - len(sequence)) return sequence[:self.max_length_lstm] def predict(self, text): if self.enable_cache: text_clean = clean_russian_text(text) cached = self.cache.get(text_clean) if cached is not None: self.cache_hits += 1 return cached self.cache_misses += 1 result = self.predict_batch([text])[0] if self.enable_cache: self.cache.put(text_clean, result) return result def predict_batch(self, texts: List[str]) -> List[Dict]: self.stats['total'] += len(texts) MIN_VALID_LENGTH = 3 # минимальная длина очищенного текста для анализа # Очищаем тексты и проверяем валидность texts_clean = [clean_russian_text(t) for t in texts] valid_mask = [] invalid_reasons = [] for t_clean in texts_clean: if len(t_clean) < MIN_VALID_LENGTH: valid_mask.append(False) invalid_reasons.append(f"текст слишком короткий (менее {MIN_VALID_LENGTH} символов)") elif not contains_russian(t_clean): valid_mask.append(False) invalid_reasons.append("текст не содержит русских букв") else: valid_mask.append(True) invalid_reasons.append("") valid_indices = [i for i, v in enumerate(valid_mask) if v] results = [None] * len(texts) # Заполняем результаты для невалидных текстов for i in range(len(texts)): if not valid_mask[i]: results[i] = { 'text': texts[i], 'predicted_emotion': 'не определено', 'confidence': 1.0, 'used_model': '', 'rules_applied': [invalid_reasons[i]] } if not valid_indices: return results # Обработка только валидных текстов lstm_inputs = [self.text_to_sequence(texts_clean[i]) for i in valid_indices] lstm_inputs_tensor = torch.LongTensor(lstm_inputs).to(self.device) with torch.no_grad(): lstm_logits, lstm_conf = self.lstm_model(lstm_inputs_tensor, return_confidence=True) lstm_probs = torch.softmax(lstm_logits, dim=1) lstm_preds = lstm_probs.argmax(dim=1).cpu().numpy() lstm_confs = lstm_conf.cpu().numpy() lstm_group = [] bert_group = [] for idx, conf in enumerate(lstm_confs): if conf >= self.threshold: lstm_group.append(idx) else: bert_group.append(idx) for idx_in_batch in lstm_group: orig_idx = valid_indices[idx_in_batch] text_clean = texts_clean[orig_idx] lstm_emo = self.label_encoder.inverse_transform([lstm_preds[idx_in_batch]])[0] lstm_pred_dict = { 'emotion': lstm_emo, 'confidence': lstm_confs[idx_in_batch], 'probabilities': lstm_probs[idx_in_batch].cpu().numpy().tolist() } lstm_onto = self.ontology_model.get_ontology_analysis(text_clean, lstm_pred_dict) final = lstm_onto['adjusted_prediction'] results[orig_idx] = { 'text': texts[orig_idx], 'predicted_emotion': final['emotion'], 'confidence': final['confidence'], 'used_model': "LSTM + онтология", 'rules_applied': lstm_onto['rule_analysis']['rules_applied'] } self.stats['lstm'] += 1 if bert_group: bert_indices_orig = [valid_indices[idx] for idx in bert_group] bert_texts_clean = [texts_clean[i] for i in bert_indices_orig] enc = self.tokenizer(bert_texts_clean, truncation=True, padding=True, max_length=self.max_length_bert, return_tensors='pt').to(self.device) with torch.no_grad(): bert_logits, bert_conf = self.bert_model(enc['input_ids'], enc['attention_mask'], return_confidence=True) bert_probs = torch.softmax(bert_logits, dim=1) bert_preds = bert_probs.argmax(dim=1).cpu().numpy() bert_confs = bert_conf.cpu().numpy() for j, orig_idx in enumerate(bert_indices_orig): text_clean = bert_texts_clean[j] bert_emo = self.label_encoder.inverse_transform([bert_preds[j]])[0] bert_pred_dict = { 'emotion': bert_emo, 'confidence': bert_confs[j], 'probabilities': bert_probs[j].cpu().numpy().tolist() } bert_onto = self.ontology_model.get_ontology_analysis(text_clean, bert_pred_dict) final = bert_onto['adjusted_prediction'] results[orig_idx] = { 'text': texts[orig_idx], 'predicted_emotion': final['emotion'], 'confidence': final['confidence'], 'used_model': "BERT + онтология", 'rules_applied': bert_onto['rule_analysis']['rules_applied'] } self.stats['bert'] += 1 # Страховка: если какой-то результат остался None (маловероятно) for i in range(len(texts)): if results[i] is None: results[i] = { 'text': texts[i], 'predicted_emotion': 'не определено', 'confidence': 1.0, 'used_model': '', 'rules_applied': ['внутренняя ошибка обработки'] } return results def predict_batch_with_cache(self, texts): if not self.enable_cache: return self.predict_batch(texts) results = [None] * len(texts) texts_to_predict = [] indices_to_predict = [] for i, text in enumerate(texts): text_clean = clean_russian_text(text) cached = self.cache.get(text_clean) if cached is not None: results[i] = cached self.cache_hits += 1 else: texts_to_predict.append(text) indices_to_predict.append(i) if texts_to_predict: batch_results = self.predict_batch(texts_to_predict) for idx, orig_idx in enumerate(indices_to_predict): result = batch_results[idx] results[orig_idx] = result self.cache.put(clean_russian_text(texts[orig_idx]), result) self.cache_misses += 1 return results def load_model(): print("Загрузка модели...") model_dir = 'model' with open(f'{model_dir}/model_info.json', 'r', encoding='utf-8') as f: model_info = json.load(f) with open(f'{model_dir}/vocab.json', 'r', encoding='utf-8') as f: vocab = json.load(f) print("📂 Создание label_encoder...") label_encoder = LabelEncoder() label_encoder.classes_ = np.array(model_info['classes']) print(f"✅ label_encoder создан, классы: {list(label_encoder.classes_)}") print("📂 Создание онтологии...") ontology_model = OntologyEmotionModel( emotions=list(label_encoder.classes_), train_texts=None, train_labels=None ) print("✅ Онтология создана") print("📂 Загрузка LSTM...") lstm_model = EmotionLSTM( vocab_size=len(vocab), embed_dim=model_info.get('embed_dim', 300), hidden_dim=256, num_classes=model_info['num_classes'], dropout=0.3, num_layers=2 ) lstm_state = torch.load(f'{model_dir}/lstm_model.pth', map_location=device, weights_only=False) lstm_model.load_state_dict(lstm_state) print("✅ LSTM загружена") print("📂 Загрузка BERT...") bert_model = EmotionBERT( bert_model_name=model_info['bert_model_name'], num_classes=model_info['num_classes'], dropout=0.3 ) bert_state = torch.load(f'{model_dir}/bert_model.pth', map_location=device, weights_only=False) bert_model.load_state_dict(bert_state) print("✅ BERT загружена") print("📂 Загрузка токенизатора...") try: tokenizer = BertTokenizer.from_pretrained(model_dir) print("✅ Токенизатор загружен из model_dir") except Exception as e: print(f"⚠️ Ошибка: {e}") print("🔄 Загружаем токенизатор из Hugging Face...") tokenizer = BertTokenizer.from_pretrained('DeepPavlov/rubert-base-cased') print("✅ Токенизатор загружен из Hugging Face") print("📂 Создание каскадного классификатора с кэшем...") cascade = CascadeEmotionClassifier( lstm_model=lstm_model, bert_model=bert_model, vocab=vocab, tokenizer=tokenizer, label_encoder=label_encoder, ontology_model=ontology_model, threshold=model_info.get('threshold', 0.95), device=device, max_length_lstm=model_info.get('max_length_lstm', 100), max_length_bert=model_info.get('max_length_bert', 128), enable_cache=True, cache_maxsize=10000 ) print("✅ Модель успешно загружена!") return cascade, model_info class ModelLoader: def __init__(self): self.classifier = None self.model_info = None self.loading = True self.error = None self._task = None async def load_async(self): self.loading = True self.error = None try: loop = asyncio.get_event_loop() self.classifier, self.model_info = await loop.run_in_executor(None, load_model) logger.info("Модели успешно загружены!") except Exception as e: logger.exception("Ошибка загрузки модели") self.error = str(e) finally: self.loading = False def is_ready(self): return self.classifier is not None and not self.loading class Monitor: def __init__(self): self.request_count = 0 self.total_prediction_time = 0.0 self.emotion_counter = defaultdict(int) self.error_count = 0 self.start_time = time.time() def record_request(self, duration_ms: float, emotion: str = None): self.request_count += 1 self.total_prediction_time += duration_ms if emotion: self.emotion_counter[emotion] += 1 def record_error(self): self.error_count += 1 def get_metrics(self): avg_time_ms = (self.total_prediction_time / self.request_count) if self.request_count > 0 else 0 uptime = time.time() - self.start_time cpu_percent = psutil.cpu_percent(interval=0.1) memory = psutil.virtual_memory() cache_stats = {} if model_loader.classifier and model_loader.classifier.cache: cache_stats = { "hits": model_loader.classifier.cache_hits, "misses": model_loader.classifier.cache_misses, "size": model_loader.classifier.cache.size() } return { "requests": { "total": self.request_count, "errors": self.error_count, "avg_prediction_time_ms": round(avg_time_ms, 2) }, "emotions": dict(self.emotion_counter), "system": { "cpu_percent": cpu_percent, "memory_percent": memory.percent, "memory_used_mb": memory.used // (1024*1024), "uptime_seconds": round(uptime, 1) }, "cache": cache_stats } class TaskStatus(BaseModel): task_id: str status: str # pending, processing, completed, failed progress: int total: int processed: int result: Optional[Dict] = None error: Optional[str] = None created_at: float updated_at: float task_storage = {} async def process_file_async(task_id: str, file_content: bytes, filename: str, classifier): task = task_storage[task_id] task.status = "processing" task.updated_at = time.time() try: try: text_content = file_content.decode('utf-8') except UnicodeDecodeError: text_content = file_content.decode('cp1251') reader = csv.reader(text_content.splitlines(), delimiter='|') rows = list(reader) if not rows: raise ValueError("Файл пуст") header = rows[0] text_col_idx = None for i, col in enumerate(header): if col.strip().lower() == 'текст': text_col_idx = i break if text_col_idx is None: text_col_idx = 0 texts = [] for row in rows[1:]: if len(row) > text_col_idx: texts.append(row[text_col_idx]) task.total = len(texts) batch_size = 100 all_results = [] for i in range(0, len(texts), batch_size): batch = texts[i:i+batch_size] batch_results = classifier.predict_batch_with_cache(batch) all_results.extend(batch_results) task.processed = min(i + batch_size, len(texts)) task.progress = int(task.processed / task.total * 100) task.updated_at = time.time() await asyncio.sleep(0.01) emotion_counts = defaultdict(int) examples = defaultdict(list) for r in all_results: emotion = r['predicted_emotion'] if emotion == 'не определено': continue emotion_counts[emotion] += 1 examples[emotion].append((r['confidence'], r['text'])) examples_top = {} for emotion, lst in examples.items(): lst.sort(key=lambda x: x[0], reverse=True) examples_top[emotion] = [{'text': t, 'confidence': f"{c*100:.1f}%"} for c, t in lst[:5]] task.result = { "emotion_counts": dict(emotion_counts), "examples": examples_top, "results": all_results } task.status = "completed" task.progress = 100 logger.info(f"Task {task_id} completed") except Exception as e: logger.exception(f"Task {task_id} failed") task.status = "failed" task.error = str(e) finally: task.updated_at = time.time() app = FastAPI(title="Emotion Analysis with BERT, Ontology, Cache and Async") templates = Jinja2Templates(directory="templates") model_loader = ModelLoader() monitor = Monitor() @app.on_event("startup") async def startup_event(): asyncio.create_task(model_loader.load_async()) @app.middleware("http") async def add_monitoring(request: Request, call_next): start_time = time.time() try: response = await call_next(request) duration_ms = (time.time() - start_time) * 1000 if request.url.path not in ("/metrics", "/health"): monitor.record_request(duration_ms) return response except Exception as e: monitor.record_error() raise @app.get("/", response_class=HTMLResponse) async def home(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.post("/predict") async def predict(text: str = Form(...)): if not model_loader.is_ready(): raise HTTPException(status_code=503, detail="Модель ещё загружается") if not text or len(text.strip()) < 3: return JSONResponse({"error": "Слишком короткий текст. Пожалуйста, введите не менее 3 символов."}, status_code=400) # Проверка, что текст содержит русские буквы if not contains_russian(text): return JSONResponse({"error": "Анализ возможен только для русскоязычных текстов."}, status_code=400) try: start = time.time() result = model_loader.classifier.predict(text) duration = (time.time() - start) * 1000 monitor.record_request(duration, result['predicted_emotion']) rules_display = [] for rule in result['rules_applied'][:10]: if ':' in rule: cat, val = rule.split(':', 1) rules_display.append(f"{cat}: {val}") else: rules_display.append(f"{rule}") return JSONResponse({ "success": True, "emotion": result['predicted_emotion'], "confidence": f"{result['confidence']*100:.1f}%", "used_model": result['used_model'], "rules": "".join(rules_display) if rules_display else "Нет правил" }) except Exception as e: monitor.record_error() return JSONResponse({"error": str(e)}, status_code=500) @app.post("/upload") async def upload_file(file: UploadFile = File(...)): if not model_loader.is_ready(): raise HTTPException(status_code=503, detail="Модель ещё загружается") if not file.filename.endswith('.csv'): return JSONResponse({"error": "Поддерживаются только CSV-файлы (колонка «текст»). Попробуйте ещё раз с файлом правильного формата."}, status_code=400) try: content = await file.read() try: text_content = content.decode('utf-8') except UnicodeDecodeError: text_content = content.decode('cp1251') reader = csv.reader(text_content.splitlines(), delimiter='|') rows = list(reader) if not rows: return JSONResponse({"error": "Файл пуст"}, status_code=400) header = rows[0] text_col_idx = None for i, col in enumerate(header): if col.strip().lower() == 'текст': text_col_idx = i break if text_col_idx is None: text_col_idx = 0 texts = [] for row in rows[1:]: if len(row) > text_col_idx: texts.append(row[text_col_idx]) if not texts: return JSONResponse({"error": "В файле нет данных для анализа"}, status_code=400) results = model_loader.classifier.predict_batch_with_cache(texts) emotion_counts = defaultdict(int) examples = defaultdict(list) for r in results: emotion = r['predicted_emotion'] if emotion == 'не определено': continue emotion_counts[emotion] += 1 examples[emotion].append((r['confidence'], r['text'])) examples_top = {} for emotion, lst in examples.items(): lst.sort(key=lambda x: x[0], reverse=True) examples_top[emotion] = [{'text': t, 'confidence': f"{c*100:.1f}%"} for c, t in lst[:5]] session_id = str(uuid.uuid4()) task_storage[session_id] = TaskStatus( task_id=session_id, status="completed", progress=100, total=len(texts), processed=len(texts), result={"results": results, "emotion_counts": emotion_counts, "examples": examples_top}, created_at=time.time(), updated_at=time.time() ) return JSONResponse({ "success": True, "emotion_counts": dict(emotion_counts), "examples": examples_top, "session_id": session_id }) except Exception as e: return JSONResponse({"error": f"Ошибка обработки файла: {str(e)}"}, status_code=500) @app.post("/upload_async") async def upload_file_async(file: UploadFile = File(...)): if not model_loader.is_ready(): raise HTTPException(status_code=503, detail="Модель ещё загружается") if not file.filename.endswith('.csv'): raise HTTPException(status_code=400, detail="Файл должен быть в формате CSV") content = await file.read() task_id = str(uuid.uuid4()) task_status = TaskStatus( task_id=task_id, status="pending", progress=0, total=0, processed=0, created_at=time.time(), updated_at=time.time() ) task_storage[task_id] = task_status asyncio.create_task(process_file_async(task_id, content, file.filename, model_loader.classifier)) return JSONResponse({ "task_id": task_id, "status": "pending", "message": "Файл принят в обработку. Используйте /status/{task_id} для отслеживания прогресса." }) @app.get("/status/{task_id}") async def get_task_status(task_id: str): task = task_storage.get(task_id) if not task: raise HTTPException(status_code=404, detail="Задача не найдена") return JSONResponse(task.dict()) @app.get("/result/{task_id}") async def get_task_result(task_id: str): task = task_storage.get(task_id) if not task: raise HTTPException(status_code=404, detail="Задача не найдена") if task.status != "completed": raise HTTPException(status_code=400, detail=f"Задача ещё не завершена (статус: {task.status})") return JSONResponse({ "emotion_counts": task.result["emotion_counts"], "examples": task.result["examples"], "task_id": task_id }) @app.get("/download/{session_id}") async def download_results(session_id: str): task = task_storage.get(session_id) if not task or task.status != "completed": raise HTTPException(status_code=404, detail="Результаты не найдены или задача не завершена") results = task.result.get("results", []) output = io.StringIO() writer = csv.writer(output) writer.writerow(["текст", "эмоция", "уверенность", "модель", "правила"]) for r in results: writer.writerow([ r['text'], r['predicted_emotion'], f"{r['confidence']:.4f}", r['used_model'], "|".join(r['rules_applied']) ]) output.seek(0) return StreamingResponse( iter([output.getvalue().encode('utf-8-sig')]), media_type="text/csv", headers={"Content-Disposition": f"attachment; filename=emotion_results_{session_id}.csv"} ) @app.get("/download_async/{task_id}") async def download_async_results(task_id: str): task = task_storage.get(task_id) if not task or task.status != "completed": raise HTTPException(status_code=404, detail="Результаты не найдены или задача не завершена") results = task.result.get("results", []) output = io.StringIO() writer = csv.writer(output) writer.writerow(["текст", "эмоция", "уверенность", "модель", "правила"]) for r in results: writer.writerow([ r['text'], r['predicted_emotion'], f"{r['confidence']:.4f}", r['used_model'], "|".join(r['rules_applied']) ]) output.seek(0) return StreamingResponse( iter([output.getvalue().encode('utf-8-sig')]), media_type="text/csv", headers={"Content-Disposition": f"attachment; filename=emotion_results_{task_id}.csv"} ) @app.get("/health") async def health_check(): return { "status": "healthy" if model_loader.is_ready() else "loading", "model_loaded": model_loader.is_ready(), "loading": model_loader.loading, "error": model_loader.error } @app.get("/metrics") async def get_metrics(): return JSONResponse(monitor.get_metrics()) if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) uvicorn.run(app, host="0.0.0.0", port=port)