# toxic-api/app/services/analysis_service.py
"""
Analysis Service
================
Main analysis orchestrator (Dependency Inversion + Open/Closed)
"""
import numpy as np
from typing import Any, Dict, List
from collections import defaultdict
from app.models.model_loader import model_loader
from app.services.text_processor import TextProcessor
from app.services.gradient_service import GradientService
from app.services.html_generator import HTMLGenerator
from app.schemas.requests import AnalysisRequest
from app.schemas.responses import (
AnalysisResponse, SentenceResult, WordScore,
ToxicWordSummary, Statistics, SentimentLabel
)
from app.core.config import settings
from app.core.exceptions import AnalysisException
class AnalysisService:
"""
Main analysis service
Responsibilities:
- Orchestrate analysis pipeline
- Coordinate between services
- Build response
Dependencies:
- TextProcessor: Text processing
- GradientService: Gradient computation
- HTMLGenerator: HTML generation
"""
def __init__(self):
self.text_processor = TextProcessor()
self.gradient_service = GradientService()
self.html_generator = HTMLGenerator()
def analyze(self, request: AnalysisRequest) -> AnalysisResponse:
"""
Analyze text for toxicity
Args:
request: Analysis request
Returns:
Analysis response
Raises:
AnalysisException: If analysis fails
"""
try:
# 1. Split into sentences
sentences = self.text_processor.split_into_sentences(request.text)
# 2. Analyze each sentence
sentence_results = []
for i, sent_info in enumerate(sentences, 1):
sent_result = self._analyze_sentence(
sent_info,
i,
request.include_word_scores
)
sentence_results.append(sent_result)
# 3. Generate statistics
statistics = self._compute_statistics(sentence_results)
# 4. Extract toxic words summary
toxic_words_summary = self._extract_toxic_words_summary(sentence_results)
# 5. Generate HTML if requested
html_highlighted = None
if request.include_html:
html_highlighted = self.html_generator.generate_highlighted_html(
request.text,
[self._convert_to_dict(r) for r in sentence_results]
)
# 6. Determine overall label
toxic_count = sum(1 for r in sentence_results if r.label == SentimentLabel.TOXIC)
overall_label = SentimentLabel.TOXIC if toxic_count > 0 else SentimentLabel.CLEAN
# 7. Build response
return AnalysisResponse(
success=True,
text=request.text,
overall_label=overall_label,
toxic_sentence_count=toxic_count,
clean_sentence_count=len(sentences) - toxic_count,
total_sentences=len(sentences),
sentences=sentence_results,
toxic_words_summary=toxic_words_summary,
statistics=statistics,
html_highlighted=html_highlighted
)
        except AnalysisException:
            raise
        except Exception as e:
            raise AnalysisException(detail=str(e)) from e
def _analyze_sentence(
self,
        sent_info: Dict[str, Any],
sent_number: int,
include_word_scores: bool
) -> SentenceResult:
"""Analyze single sentence"""
sent_text = sent_info['text']
# Extract words
words = self.text_processor.extract_words(sent_text)
        if not words:
            return SentenceResult(
                sentence_number=sent_number,
                text=sent_text,
                label=SentimentLabel.CLEAN,
                confidence=0.0,
                threshold=0.6,  # default threshold; no words to score
                word_count=0,
                word_scores=[] if include_word_scores else None
            )
# Tokenize
encoding = model_loader.tokenizer(
sent_text.lower().strip(),
add_special_tokens=True,
max_length=settings.MAX_LENGTH,
padding='max_length',
truncation=True,
return_tensors='pt'
)
# Compute gradients
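        # The gradient service attributes the prediction to each input token
        # via integrated gradients, returning per-token scores together with
        # the predicted class and its confidence.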
gradient_scores, predicted_class, confidence = self.gradient_service.compute_integrated_gradients(
model=model_loader.model,
input_ids=encoding['input_ids'],
attention_mask=encoding['attention_mask'],
device=model_loader.device
)
# Get tokens
        tokens = model_loader.tokenizer.convert_ids_to_tokens(
            encoding['input_ids'][0].cpu().tolist()  # plain ints for id-to-token lookup
        )
valid_length = encoding['attention_mask'][0].sum().item()
tokens = tokens[:valid_length]
# Normalize gradients
gradient_scores_norm = self.gradient_service.normalize_scores(gradient_scores)
# Map to words
word_scores = self._map_tokens_to_words(tokens, gradient_scores_norm, words)
# Determine toxicity
is_toxic = (predicted_class == 1)
label = SentimentLabel.TOXIC if is_toxic else SentimentLabel.CLEAN
# Compute threshold
threshold = self.gradient_service.compute_threshold(word_scores, is_toxic)
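        # Words scoring above this per-sentence threshold (and that are not
        # stop words) are flagged as toxic below.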
# Build word scores
word_score_objects = None
if include_word_scores:
word_score_objects = []
            for word_info, score in zip(words, word_scores):
                is_stop = self.text_processor.is_stop_word(word_info['word'])
                word_score_objects.append(WordScore(
                    word=word_info['word'],
                    score=float(score),
                    position={'start': word_info['start'], 'end': word_info['end']},
                    is_toxic=score > threshold and not is_stop,
                    is_stop_word=is_stop
                ))
return SentenceResult(
sentence_number=sent_number,
text=sent_text,
label=label,
confidence=float(confidence),
threshold=float(threshold),
word_count=len(words),
word_scores=word_score_objects
)
def _map_tokens_to_words(
self,
tokens: List[str],
token_scores: np.ndarray,
        original_words: List[Dict[str, Any]]
) -> np.ndarray:
"""Map token scores to words"""
clean_tokens = []
clean_scores = []
for token, score in zip(tokens, token_scores):
if token not in ['<s>', '</s>', '<pad>', '<unk>']:
clean_token = token.replace('_', '').replace('@@', '').strip()
if clean_token and not self.text_processor.is_punctuation(clean_token):
clean_tokens.append(clean_token)
clean_scores.append(score)
word_scores = []
token_idx = 0
for word_info in original_words:
word = word_info['word'].lower()
matching_scores = []
temp_idx = token_idx
accumulated = ""
while temp_idx < len(clean_tokens):
accumulated += clean_tokens[temp_idx]
matching_scores.append(clean_scores[temp_idx])
if accumulated == word:
token_idx = temp_idx + 1
break
elif len(accumulated) >= len(word):
break
temp_idx += 1
word_scores.append(max(matching_scores) if matching_scores else 0.0)
return np.array(word_scores)
def _compute_statistics(self, sentence_results: List[SentenceResult]) -> Statistics:
"""Compute overall statistics"""
all_scores = []
toxic_words_count = 0
for sent_result in sentence_results:
if sent_result.word_scores:
for ws in sent_result.word_scores:
all_scores.append(ws.score)
if ws.is_toxic:
toxic_words_count += 1
        if not all_scores:
return Statistics(
total_words=0,
toxic_words=0,
mean_score=0.0,
median_score=0.0,
max_score=0.0,
min_score=0.0
)
all_scores = np.array(all_scores)
return Statistics(
total_words=len(all_scores),
toxic_words=toxic_words_count,
mean_score=float(np.mean(all_scores)),
median_score=float(np.median(all_scores)),
max_score=float(np.max(all_scores)),
min_score=float(np.min(all_scores))
)
def _extract_toxic_words_summary(
self,
sentence_results: List[SentenceResult]
) -> List[ToxicWordSummary]:
"""Extract summary of toxic words"""
toxic_words_dict = defaultdict(lambda: {
'max_score': 0.0,
'occurrences': 0,
'sentences': []
})
for sent_result in sentence_results:
if sent_result.word_scores:
for ws in sent_result.word_scores:
if ws.is_toxic:
word = ws.word
toxic_words_dict[word]['max_score'] = max(
toxic_words_dict[word]['max_score'],
ws.score
)
toxic_words_dict[word]['occurrences'] += 1
if sent_result.sentence_number not in toxic_words_dict[word]['sentences']:
toxic_words_dict[word]['sentences'].append(sent_result.sentence_number)
# Convert to list and sort by score
toxic_words_summary = [
ToxicWordSummary(
word=word,
score=data['max_score'],
occurrences=data['occurrences'],
sentences=sorted(data['sentences'])
)
for word, data in toxic_words_dict.items()
]
toxic_words_summary.sort(key=lambda x: x.score, reverse=True)
return toxic_words_summary
    def _convert_to_dict(self, sent_result: SentenceResult) -> Dict[str, Any]:
"""Convert SentenceResult to dict for HTML generator"""
        word_scores = sent_result.word_scores or []
        return {
            'sent_start': word_scores[0].position['start'] if word_scores else 0,
            'sent_end': word_scores[-1].position['end'] if word_scores else len(sent_result.text),
            'is_toxic': sent_result.label == SentimentLabel.TOXIC,
            'words': [
                {'word': ws.word, 'start': ws.position['start'], 'end': ws.position['end']}
                for ws in word_scores
            ],
            'scores': [ws.score for ws in word_scores],
            'threshold': sent_result.threshold
        }
# Singleton instance
analysis_service = AnalysisService()
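
# Minimal usage sketch (illustrative, not part of the service): assumes the
# model was loaded by model_loader at import time and that AnalysisRequest
# accepts the fields referenced above; the sample text is an assumption.
if __name__ == "__main__":
    demo_request = AnalysisRequest(
        text="You are brilliant. You are an idiot.",
        include_word_scores=True,
        include_html=False,
    )
    demo_response = analysis_service.analyze(demo_request)
    print(demo_response.overall_label, demo_response.toxic_sentence_count)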