"""
Analysis Service
================
Main analysis orchestrator (Dependency Inversion + Open/Closed)
"""
import numpy as np
from typing import Any, Dict, List, Optional
from collections import defaultdict

from app.models.model_loader import model_loader
from app.services.text_processor import TextProcessor
from app.services.gradient_service import GradientService
from app.services.html_generator import HTMLGenerator
from app.schemas.requests import AnalysisRequest
from app.schemas.responses import (
    AnalysisResponse,
    SentenceResult,
    WordScore,
    ToxicWordSummary,
    Statistics,
    SentimentLabel
)
from app.core.config import settings
from app.core.exceptions import AnalysisException

# Threshold reported for a sentence that contains no words to score.
_EMPTY_SENTENCE_THRESHOLD = 0.6

# Tokens that never belong to a word and must be dropped before alignment.
# NOTE(review): the literal in the original source was four empty strings,
# which looks like angle-bracket tokens stripped by markup sanitising. They
# are restored here as the usual RoBERTa/PhoBERT-style specials -- confirm
# against the tokenizer actually loaded by `model_loader`.
_SPECIAL_TOKENS = frozenset({'', '<s>', '</s>', '<pad>', '<unk>'})


class AnalysisService:
    """
    Main analysis service

    Responsibilities:
    - Orchestrate analysis pipeline
    - Coordinate between services
    - Build response

    Dependencies:
    - TextProcessor: Text processing
    - GradientService: Gradient computation
    - HTMLGenerator: HTML generation
    """

    def __init__(
        self,
        text_processor: Optional[TextProcessor] = None,
        gradient_service: Optional[GradientService] = None,
        html_generator: Optional[HTMLGenerator] = None
    ):
        """
        Create the service.

        Args:
            text_processor: Collaborator used for sentence/word splitting and
                stop-word/punctuation checks. A default instance is built
                when omitted.
            gradient_service: Collaborator used for attribution scores and
                thresholds. A default instance is built when omitted.
            html_generator: Collaborator used to render highlighted HTML.
                A default instance is built when omitted.
        """
        # `is None` rather than truthiness: an injected collaborator may
        # legitimately define __bool__/__len__.
        self.text_processor = TextProcessor() if text_processor is None else text_processor
        self.gradient_service = GradientService() if gradient_service is None else gradient_service
        self.html_generator = HTMLGenerator() if html_generator is None else html_generator

    def analyze(self, request: AnalysisRequest) -> AnalysisResponse:
        """
        Analyze text for toxicity

        The statistics and the toxic-words summary are derived from the
        per-word scores, so both are empty when
        ``request.include_word_scores`` is false.

        Args:
            request: Analysis request

        Returns:
            Analysis response

        Raises:
            AnalysisException: If analysis fails
        """
        try:
            # 1. Split into sentences
            sentences = self.text_processor.split_into_sentences(request.text)

            # 2. Analyze each sentence (sentence numbers are 1-based)
            sentence_results = [
                self._analyze_sentence(sent_info, number, request.include_word_scores)
                for number, sent_info in enumerate(sentences, 1)
            ]

            # 3. Generate statistics
            statistics = self._compute_statistics(sentence_results)

            # 4. Extract toxic words summary
            toxic_words_summary = self._extract_toxic_words_summary(sentence_results)

            # 5. Generate HTML if requested
            html_highlighted = None
            if request.include_html:
                html_highlighted = self.html_generator.generate_highlighted_html(
                    request.text,
                    [self._convert_to_dict(r) for r in sentence_results]
                )

            # 6. Determine overall label: one toxic sentence taints the text
            toxic_count = sum(
                1 for r in sentence_results if r.label == SentimentLabel.TOXIC
            )
            overall_label = SentimentLabel.TOXIC if toxic_count > 0 else SentimentLabel.CLEAN

            # 7. Build response
            return AnalysisResponse(
                success=True,
                text=request.text,
                overall_label=overall_label,
                toxic_sentence_count=toxic_count,
                clean_sentence_count=len(sentences) - toxic_count,
                total_sentences=len(sentences),
                sentences=sentence_results,
                toxic_words_summary=toxic_words_summary,
                statistics=statistics,
                html_highlighted=html_highlighted
            )
        except AnalysisException:
            # Already the domain error callers expect; do not re-wrap it.
            raise
        except Exception as e:
            # Boundary of the pipeline: translate anything else into the
            # domain error while keeping the original traceback chained.
            raise AnalysisException(detail=str(e)) from e

    def _analyze_sentence(
        self,
        sent_info: Dict[str, Any],
        sent_number: int,
        include_word_scores: bool
    ) -> SentenceResult:
        """
        Analyze single sentence

        Args:
            sent_info: Sentence record; only its ``'text'`` entry is read here.
            sent_number: Sentence number stored on the result.
            include_word_scores: When false, ``word_scores`` on the result
                is ``None``.

        Returns:
            The classification result for the sentence.
        """
        sent_text = sent_info['text']

        # Extract words
        words = self.text_processor.extract_words(sent_text)

        # Nothing to score: report the sentence as clean without running
        # the model.
        if len(words) == 0:
            return SentenceResult(
                sentence_number=sent_number,
                text=sent_text,
                label=SentimentLabel.CLEAN,
                confidence=0.0,
                threshold=_EMPTY_SENTENCE_THRESHOLD,
                word_count=0,
                word_scores=[] if include_word_scores else None
            )

        # Tokenize
        encoding = model_loader.tokenizer(
            sent_text.lower().strip(),
            add_special_tokens=True,
            max_length=settings.MAX_LENGTH,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        # Compute gradients
        gradient_scores, predicted_class, confidence = (
            self.gradient_service.compute_integrated_gradients(
                model=model_loader.model,
                input_ids=encoding['input_ids'],
                attention_mask=encoding['attention_mask'],
                device=model_loader.device
            )
        )

        # Get tokens, keeping only the positions the attention mask marks
        # as real input (the encoding is padded to max_length).
        tokens = model_loader.tokenizer.convert_ids_to_tokens(
            encoding['input_ids'][0].cpu().numpy()
        )
        valid_length = encoding['attention_mask'][0].sum().item()
        tokens = tokens[:valid_length]

        # Normalize gradients
        gradient_scores_norm = self.gradient_service.normalize_scores(gradient_scores)

        # Map to words
        word_scores = self._map_tokens_to_words(tokens, gradient_scores_norm, words)

        # Determine toxicity
        is_toxic = (predicted_class == 1)
        label = SentimentLabel.TOXIC if is_toxic else SentimentLabel.CLEAN

        # Compute threshold
        threshold = self.gradient_service.compute_threshold(word_scores, is_toxic)

        # Build word scores
        word_score_objects = None
        if include_word_scores:
            # Compare plain floats so the flag is a Python bool (not a
            # numpy.bool_) and agrees with the values stored on the result.
            threshold_value = float(threshold)
            word_score_objects = []
            for word_info, raw_score in zip(words, word_scores):
                word = word_info['word']
                score = float(raw_score)
                is_stop_word = self.text_processor.is_stop_word(word)
                word_score_objects.append(WordScore(
                    word=word,
                    score=score,
                    position={'start': word_info['start'], 'end': word_info['end']},
                    is_toxic=score > threshold_value and not is_stop_word,
                    is_stop_word=is_stop_word
                ))

        return SentenceResult(
            sentence_number=sent_number,
            text=sent_text,
            label=label,
            confidence=float(confidence),
            threshold=float(threshold),
            word_count=len(words),
            word_scores=word_score_objects
        )

    def _map_tokens_to_words(
        self,
        tokens: List[str],
        token_scores: np.ndarray,
        original_words: List[Dict[str, Any]]
    ) -> np.ndarray:
        """
        Map token scores to words

        Sub-word tokens are concatenated, in order, until they spell the
        lower-cased word; the word receives the maximum score among the
        tokens consumed in that attempt.

        Args:
            tokens: Tokens of the sentence.
            token_scores: One score per token, paired with ``tokens`` by
                position (extra entries on either side are ignored).
            original_words: Word records; only ``'word'`` is read here.

        Returns:
            Array with one score per entry of ``original_words``.
        """
        clean_tokens: List[str] = []
        clean_scores: List[float] = []
        for token, score in zip(tokens, token_scores):
            if token in _SPECIAL_TOKENS:
                continue
            # Strip the sub-word/segmentation markers before comparing
            # against the word text.
            clean_token = token.replace('_', '').replace('@@', '').strip()
            if clean_token and not self.text_processor.is_punctuation(clean_token):
                clean_tokens.append(clean_token)
                clean_scores.append(score)

        word_scores = []
        token_idx = 0
        for word_info in original_words:
            word = word_info['word'].lower()
            matching_scores = []
            temp_idx = token_idx
            accumulated = ""

            while temp_idx < len(clean_tokens):
                accumulated += clean_tokens[temp_idx]
                matching_scores.append(clean_scores[temp_idx])

                if accumulated == word:
                    # Exact match: the next word starts after these tokens.
                    token_idx = temp_idx + 1
                    break
                elif len(accumulated) >= len(word):
                    # Overshot without matching: give up on this word and
                    # leave the cursor where it was for the next one.
                    break
                temp_idx += 1

            word_scores.append(max(matching_scores) if matching_scores else 0.0)

        return np.array(word_scores)

    def _compute_statistics(self, sentence_results: List[SentenceResult]) -> Statistics:
        """
        Compute overall statistics

        Args:
            sentence_results: Per-sentence results; sentences whose
                ``word_scores`` is ``None`` or empty contribute nothing.

        Returns:
            Word counts plus mean/median/max/min of all word scores, or
            all zeros when there are no word scores.
        """
        scored_words = [
            ws
            for sent_result in sentence_results
            for ws in (sent_result.word_scores or [])
        ]

        if not scored_words:
            return Statistics(
                total_words=0,
                toxic_words=0,
                mean_score=0.0,
                median_score=0.0,
                max_score=0.0,
                min_score=0.0
            )

        all_scores = np.array([ws.score for ws in scored_words])
        toxic_words_count = sum(1 for ws in scored_words if ws.is_toxic)

        return Statistics(
            total_words=len(all_scores),
            toxic_words=toxic_words_count,
            mean_score=float(np.mean(all_scores)),
            median_score=float(np.median(all_scores)),
            max_score=float(np.max(all_scores)),
            min_score=float(np.min(all_scores))
        )

    def _extract_toxic_words_summary(
        self,
        sentence_results: List[SentenceResult]
    ) -> List[ToxicWordSummary]:
        """
        Extract summary of toxic words

        Args:
            sentence_results: Per-sentence results; only words flagged
                ``is_toxic`` are summarised.

        Returns:
            One entry per distinct toxic word with its highest score, number
            of occurrences and the sorted sentence numbers it appears in,
            ordered by score, highest first.
        """
        toxic_words_dict = defaultdict(lambda: {
            'max_score': 0.0,
            'occurrences': 0,
            'sentences': set()
        })

        for sent_result in sentence_results:
            for ws in sent_result.word_scores or []:
                if not ws.is_toxic:
                    continue
                entry = toxic_words_dict[ws.word]
                entry['max_score'] = max(entry['max_score'], ws.score)
                entry['occurrences'] += 1
                entry['sentences'].add(sent_result.sentence_number)

        # Convert to list and sort by score
        toxic_words_summary = [
            ToxicWordSummary(
                word=word,
                score=data['max_score'],
                occurrences=data['occurrences'],
                sentences=sorted(data['sentences'])
            )
            for word, data in toxic_words_dict.items()
        ]
        toxic_words_summary.sort(key=lambda x: x.score, reverse=True)

        return toxic_words_summary

    def _convert_to_dict(self, sent_result: SentenceResult) -> Dict[str, Any]:
        """
        Convert SentenceResult to dict for HTML generator

        When the sentence has no word scores, the span falls back to
        ``0 .. len(text)`` and the word/score lists are empty.
        """
        word_scores = sent_result.word_scores or []

        return {
            'sent_start': word_scores[0].position['start'] if word_scores else 0,
            'sent_end': (
                word_scores[-1].position['end'] if word_scores else len(sent_result.text)
            ),
            'is_toxic': sent_result.label == SentimentLabel.TOXIC,
            'words': [
                {'word': ws.word, 'start': ws.position['start'], 'end': ws.position['end']}
                for ws in word_scores
            ],
            'scores': [ws.score for ws in word_scores],
            'threshold': sent_result.threshold
        }


# Singleton instance
analysis_service = AnalysisService()