Spaces:
Sleeping
Sleeping
| """ | |
| Analysis Service | |
| ================ | |
| Main analysis orchestrator (Dependency Inversion + Open/Closed) | |
| """ | |
| import numpy as np | |
| from typing import List, Dict | |
| from collections import defaultdict | |
| from app.models.model_loader import model_loader | |
| from app.services.text_processor import TextProcessor | |
| from app.services.gradient_service import GradientService | |
| from app.services.html_generator import HTMLGenerator | |
| from app.schemas.requests import AnalysisRequest | |
| from app.schemas.responses import ( | |
| AnalysisResponse, SentenceResult, WordScore, | |
| ToxicWordSummary, Statistics, SentimentLabel | |
| ) | |
| from app.core.config import settings | |
| from app.core.exceptions import AnalysisException | |
| class AnalysisService: | |
| """ | |
| Main analysis service | |
| Responsibilities: | |
| - Orchestrate analysis pipeline | |
| - Coordinate between services | |
| - Build response | |
| Dependencies: | |
| - TextProcessor: Text processing | |
| - GradientService: Gradient computation | |
| - HTMLGenerator: HTML generation | |
| """ | |
| def __init__(self): | |
| self.text_processor = TextProcessor() | |
| self.gradient_service = GradientService() | |
| self.html_generator = HTMLGenerator() | |
| def analyze(self, request: AnalysisRequest) -> AnalysisResponse: | |
| """ | |
| Analyze text for toxicity | |
| Args: | |
| request: Analysis request | |
| Returns: | |
| Analysis response | |
| Raises: | |
| AnalysisException: If analysis fails | |
| """ | |
| try: | |
| # 1. Split into sentences | |
| sentences = self.text_processor.split_into_sentences(request.text) | |
| # 2. Analyze each sentence | |
| sentence_results = [] | |
| for i, sent_info in enumerate(sentences, 1): | |
| sent_result = self._analyze_sentence( | |
| sent_info, | |
| i, | |
| request.include_word_scores | |
| ) | |
| sentence_results.append(sent_result) | |
| # 3. Generate statistics | |
| statistics = self._compute_statistics(sentence_results) | |
| # 4. Extract toxic words summary | |
| toxic_words_summary = self._extract_toxic_words_summary(sentence_results) | |
| # 5. Generate HTML if requested | |
| html_highlighted = None | |
| if request.include_html: | |
| html_highlighted = self.html_generator.generate_highlighted_html( | |
| request.text, | |
| [self._convert_to_dict(r) for r in sentence_results] | |
| ) | |
| # 6. Determine overall label | |
| toxic_count = sum(1 for r in sentence_results if r.label == SentimentLabel.TOXIC) | |
| overall_label = SentimentLabel.TOXIC if toxic_count > 0 else SentimentLabel.CLEAN | |
| # 7. Build response | |
| return AnalysisResponse( | |
| success=True, | |
| text=request.text, | |
| overall_label=overall_label, | |
| toxic_sentence_count=toxic_count, | |
| clean_sentence_count=len(sentences) - toxic_count, | |
| total_sentences=len(sentences), | |
| sentences=sentence_results, | |
| toxic_words_summary=toxic_words_summary, | |
| statistics=statistics, | |
| html_highlighted=html_highlighted | |
| ) | |
| except Exception as e: | |
| raise AnalysisException(detail=str(e)) | |
| def _analyze_sentence( | |
| self, | |
| sent_info: Dict[str, any], | |
| sent_number: int, | |
| include_word_scores: bool | |
| ) -> SentenceResult: | |
| """Analyze single sentence""" | |
| sent_text = sent_info['text'] | |
| # Extract words | |
| words = self.text_processor.extract_words(sent_text) | |
| if len(words) == 0: | |
| return SentenceResult( | |
| sentence_number=sent_number, | |
| text=sent_text, | |
| label=SentimentLabel.CLEAN, | |
| confidence=0.0, | |
| threshold=0.6, | |
| word_count=0, | |
| word_scores=[] if include_word_scores else None | |
| ) | |
| # Tokenize | |
| encoding = model_loader.tokenizer( | |
| sent_text.lower().strip(), | |
| add_special_tokens=True, | |
| max_length=settings.MAX_LENGTH, | |
| padding='max_length', | |
| truncation=True, | |
| return_tensors='pt' | |
| ) | |
| # Compute gradients | |
| gradient_scores, predicted_class, confidence = self.gradient_service.compute_integrated_gradients( | |
| model=model_loader.model, | |
| input_ids=encoding['input_ids'], | |
| attention_mask=encoding['attention_mask'], | |
| device=model_loader.device | |
| ) | |
| # Get tokens | |
| tokens = model_loader.tokenizer.convert_ids_to_tokens( | |
| encoding['input_ids'][0].cpu().numpy() | |
| ) | |
| valid_length = encoding['attention_mask'][0].sum().item() | |
| tokens = tokens[:valid_length] | |
| # Normalize gradients | |
| gradient_scores_norm = self.gradient_service.normalize_scores(gradient_scores) | |
| # Map to words | |
| word_scores = self._map_tokens_to_words(tokens, gradient_scores_norm, words) | |
| # Determine toxicity | |
| is_toxic = (predicted_class == 1) | |
| label = SentimentLabel.TOXIC if is_toxic else SentimentLabel.CLEAN | |
| # Compute threshold | |
| threshold = self.gradient_service.compute_threshold(word_scores, is_toxic) | |
| # Build word scores | |
| word_score_objects = None | |
| if include_word_scores: | |
| word_score_objects = [] | |
| for word_info, score in zip(words, word_scores): | |
| word_score_objects.append(WordScore( | |
| word=word_info['word'], | |
| score=float(score), | |
| position={'start': word_info['start'], 'end': word_info['end']}, | |
| is_toxic=score > threshold and not self.text_processor.is_stop_word(word_info['word']), | |
| is_stop_word=self.text_processor.is_stop_word(word_info['word']) | |
| )) | |
| return SentenceResult( | |
| sentence_number=sent_number, | |
| text=sent_text, | |
| label=label, | |
| confidence=float(confidence), | |
| threshold=float(threshold), | |
| word_count=len(words), | |
| word_scores=word_score_objects | |
| ) | |
| def _map_tokens_to_words( | |
| self, | |
| tokens: List[str], | |
| token_scores: np.ndarray, | |
| original_words: List[Dict[str, any]] | |
| ) -> np.ndarray: | |
| """Map token scores to words""" | |
| clean_tokens = [] | |
| clean_scores = [] | |
| for token, score in zip(tokens, token_scores): | |
| if token not in ['<s>', '</s>', '<pad>', '<unk>']: | |
| clean_token = token.replace('_', '').replace('@@', '').strip() | |
| if clean_token and not self.text_processor.is_punctuation(clean_token): | |
| clean_tokens.append(clean_token) | |
| clean_scores.append(score) | |
| word_scores = [] | |
| token_idx = 0 | |
| for word_info in original_words: | |
| word = word_info['word'].lower() | |
| matching_scores = [] | |
| temp_idx = token_idx | |
| accumulated = "" | |
| while temp_idx < len(clean_tokens): | |
| accumulated += clean_tokens[temp_idx] | |
| matching_scores.append(clean_scores[temp_idx]) | |
| if accumulated == word: | |
| token_idx = temp_idx + 1 | |
| break | |
| elif len(accumulated) >= len(word): | |
| break | |
| temp_idx += 1 | |
| word_scores.append(max(matching_scores) if matching_scores else 0.0) | |
| return np.array(word_scores) | |
| def _compute_statistics(self, sentence_results: List[SentenceResult]) -> Statistics: | |
| """Compute overall statistics""" | |
| all_scores = [] | |
| toxic_words_count = 0 | |
| for sent_result in sentence_results: | |
| if sent_result.word_scores: | |
| for ws in sent_result.word_scores: | |
| all_scores.append(ws.score) | |
| if ws.is_toxic: | |
| toxic_words_count += 1 | |
| if len(all_scores) == 0: | |
| return Statistics( | |
| total_words=0, | |
| toxic_words=0, | |
| mean_score=0.0, | |
| median_score=0.0, | |
| max_score=0.0, | |
| min_score=0.0 | |
| ) | |
| all_scores = np.array(all_scores) | |
| return Statistics( | |
| total_words=len(all_scores), | |
| toxic_words=toxic_words_count, | |
| mean_score=float(np.mean(all_scores)), | |
| median_score=float(np.median(all_scores)), | |
| max_score=float(np.max(all_scores)), | |
| min_score=float(np.min(all_scores)) | |
| ) | |
| def _extract_toxic_words_summary( | |
| self, | |
| sentence_results: List[SentenceResult] | |
| ) -> List[ToxicWordSummary]: | |
| """Extract summary of toxic words""" | |
| toxic_words_dict = defaultdict(lambda: { | |
| 'max_score': 0.0, | |
| 'occurrences': 0, | |
| 'sentences': [] | |
| }) | |
| for sent_result in sentence_results: | |
| if sent_result.word_scores: | |
| for ws in sent_result.word_scores: | |
| if ws.is_toxic: | |
| word = ws.word | |
| toxic_words_dict[word]['max_score'] = max( | |
| toxic_words_dict[word]['max_score'], | |
| ws.score | |
| ) | |
| toxic_words_dict[word]['occurrences'] += 1 | |
| if sent_result.sentence_number not in toxic_words_dict[word]['sentences']: | |
| toxic_words_dict[word]['sentences'].append(sent_result.sentence_number) | |
| # Convert to list and sort by score | |
| toxic_words_summary = [ | |
| ToxicWordSummary( | |
| word=word, | |
| score=data['max_score'], | |
| occurrences=data['occurrences'], | |
| sentences=sorted(data['sentences']) | |
| ) | |
| for word, data in toxic_words_dict.items() | |
| ] | |
| toxic_words_summary.sort(key=lambda x: x.score, reverse=True) | |
| return toxic_words_summary | |
| def _convert_to_dict(self, sent_result: SentenceResult) -> Dict[str, any]: | |
| """Convert SentenceResult to dict for HTML generator""" | |
| return { | |
| 'sent_start': sent_result.word_scores[0].position['start'] if sent_result.word_scores and len(sent_result.word_scores) > 0 else 0, | |
| 'sent_end': sent_result.word_scores[-1].position['end'] if sent_result.word_scores and len(sent_result.word_scores) > 0 else len(sent_result.text), | |
| 'is_toxic': sent_result.label == SentimentLabel.TOXIC, | |
| 'words': [{'word': ws.word, 'start': ws.position['start'], 'end': ws.position['end']} for ws in sent_result.word_scores] if sent_result.word_scores else [], | |
| 'scores': [ws.score for ws in sent_result.word_scores] if sent_result.word_scores else [], | |
| 'threshold': sent_result.threshold | |
| } | |
| # Singleton instance | |
| analysis_service = AnalysisService() | |