Spaces:
Running
Running
File size: 11,585 Bytes
ae4e2a6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 |
"""
Analysis Service
================
Main analysis orchestrator (Dependency Inversion + Open/Closed)
"""
import numpy as np
from typing import List, Dict
from collections import defaultdict
from app.models.model_loader import model_loader
from app.services.text_processor import TextProcessor
from app.services.gradient_service import GradientService
from app.services.html_generator import HTMLGenerator
from app.schemas.requests import AnalysisRequest
from app.schemas.responses import (
AnalysisResponse, SentenceResult, WordScore,
ToxicWordSummary, Statistics, SentimentLabel
)
from app.core.config import settings
from app.core.exceptions import AnalysisException
class AnalysisService:
"""
Main analysis service
Responsibilities:
- Orchestrate analysis pipeline
- Coordinate between services
- Build response
Dependencies:
- TextProcessor: Text processing
- GradientService: Gradient computation
- HTMLGenerator: HTML generation
"""
def __init__(self):
self.text_processor = TextProcessor()
self.gradient_service = GradientService()
self.html_generator = HTMLGenerator()
def analyze(self, request: AnalysisRequest) -> AnalysisResponse:
"""
Analyze text for toxicity
Args:
request: Analysis request
Returns:
Analysis response
Raises:
AnalysisException: If analysis fails
"""
try:
# 1. Split into sentences
sentences = self.text_processor.split_into_sentences(request.text)
# 2. Analyze each sentence
sentence_results = []
for i, sent_info in enumerate(sentences, 1):
sent_result = self._analyze_sentence(
sent_info,
i,
request.include_word_scores
)
sentence_results.append(sent_result)
# 3. Generate statistics
statistics = self._compute_statistics(sentence_results)
# 4. Extract toxic words summary
toxic_words_summary = self._extract_toxic_words_summary(sentence_results)
# 5. Generate HTML if requested
html_highlighted = None
if request.include_html:
html_highlighted = self.html_generator.generate_highlighted_html(
request.text,
[self._convert_to_dict(r) for r in sentence_results]
)
# 6. Determine overall label
toxic_count = sum(1 for r in sentence_results if r.label == SentimentLabel.TOXIC)
overall_label = SentimentLabel.TOXIC if toxic_count > 0 else SentimentLabel.CLEAN
# 7. Build response
return AnalysisResponse(
success=True,
text=request.text,
overall_label=overall_label,
toxic_sentence_count=toxic_count,
clean_sentence_count=len(sentences) - toxic_count,
total_sentences=len(sentences),
sentences=sentence_results,
toxic_words_summary=toxic_words_summary,
statistics=statistics,
html_highlighted=html_highlighted
)
except Exception as e:
raise AnalysisException(detail=str(e))
def _analyze_sentence(
self,
sent_info: Dict[str, any],
sent_number: int,
include_word_scores: bool
) -> SentenceResult:
"""Analyze single sentence"""
sent_text = sent_info['text']
# Extract words
words = self.text_processor.extract_words(sent_text)
if len(words) == 0:
return SentenceResult(
sentence_number=sent_number,
text=sent_text,
label=SentimentLabel.CLEAN,
confidence=0.0,
threshold=0.6,
word_count=0,
word_scores=[] if include_word_scores else None
)
# Tokenize
encoding = model_loader.tokenizer(
sent_text.lower().strip(),
add_special_tokens=True,
max_length=settings.MAX_LENGTH,
padding='max_length',
truncation=True,
return_tensors='pt'
)
# Compute gradients
gradient_scores, predicted_class, confidence = self.gradient_service.compute_integrated_gradients(
model=model_loader.model,
input_ids=encoding['input_ids'],
attention_mask=encoding['attention_mask'],
device=model_loader.device
)
# Get tokens
tokens = model_loader.tokenizer.convert_ids_to_tokens(
encoding['input_ids'][0].cpu().numpy()
)
valid_length = encoding['attention_mask'][0].sum().item()
tokens = tokens[:valid_length]
# Normalize gradients
gradient_scores_norm = self.gradient_service.normalize_scores(gradient_scores)
# Map to words
word_scores = self._map_tokens_to_words(tokens, gradient_scores_norm, words)
# Determine toxicity
is_toxic = (predicted_class == 1)
label = SentimentLabel.TOXIC if is_toxic else SentimentLabel.CLEAN
# Compute threshold
threshold = self.gradient_service.compute_threshold(word_scores, is_toxic)
# Build word scores
word_score_objects = None
if include_word_scores:
word_score_objects = []
for word_info, score in zip(words, word_scores):
word_score_objects.append(WordScore(
word=word_info['word'],
score=float(score),
position={'start': word_info['start'], 'end': word_info['end']},
is_toxic=score > threshold and not self.text_processor.is_stop_word(word_info['word']),
is_stop_word=self.text_processor.is_stop_word(word_info['word'])
))
return SentenceResult(
sentence_number=sent_number,
text=sent_text,
label=label,
confidence=float(confidence),
threshold=float(threshold),
word_count=len(words),
word_scores=word_score_objects
)
def _map_tokens_to_words(
self,
tokens: List[str],
token_scores: np.ndarray,
original_words: List[Dict[str, any]]
) -> np.ndarray:
"""Map token scores to words"""
clean_tokens = []
clean_scores = []
for token, score in zip(tokens, token_scores):
if token not in ['<s>', '</s>', '<pad>', '<unk>']:
clean_token = token.replace('_', '').replace('@@', '').strip()
if clean_token and not self.text_processor.is_punctuation(clean_token):
clean_tokens.append(clean_token)
clean_scores.append(score)
word_scores = []
token_idx = 0
for word_info in original_words:
word = word_info['word'].lower()
matching_scores = []
temp_idx = token_idx
accumulated = ""
while temp_idx < len(clean_tokens):
accumulated += clean_tokens[temp_idx]
matching_scores.append(clean_scores[temp_idx])
if accumulated == word:
token_idx = temp_idx + 1
break
elif len(accumulated) >= len(word):
break
temp_idx += 1
word_scores.append(max(matching_scores) if matching_scores else 0.0)
return np.array(word_scores)
def _compute_statistics(self, sentence_results: List[SentenceResult]) -> Statistics:
"""Compute overall statistics"""
all_scores = []
toxic_words_count = 0
for sent_result in sentence_results:
if sent_result.word_scores:
for ws in sent_result.word_scores:
all_scores.append(ws.score)
if ws.is_toxic:
toxic_words_count += 1
if len(all_scores) == 0:
return Statistics(
total_words=0,
toxic_words=0,
mean_score=0.0,
median_score=0.0,
max_score=0.0,
min_score=0.0
)
all_scores = np.array(all_scores)
return Statistics(
total_words=len(all_scores),
toxic_words=toxic_words_count,
mean_score=float(np.mean(all_scores)),
median_score=float(np.median(all_scores)),
max_score=float(np.max(all_scores)),
min_score=float(np.min(all_scores))
)
def _extract_toxic_words_summary(
self,
sentence_results: List[SentenceResult]
) -> List[ToxicWordSummary]:
"""Extract summary of toxic words"""
toxic_words_dict = defaultdict(lambda: {
'max_score': 0.0,
'occurrences': 0,
'sentences': []
})
for sent_result in sentence_results:
if sent_result.word_scores:
for ws in sent_result.word_scores:
if ws.is_toxic:
word = ws.word
toxic_words_dict[word]['max_score'] = max(
toxic_words_dict[word]['max_score'],
ws.score
)
toxic_words_dict[word]['occurrences'] += 1
if sent_result.sentence_number not in toxic_words_dict[word]['sentences']:
toxic_words_dict[word]['sentences'].append(sent_result.sentence_number)
# Convert to list and sort by score
toxic_words_summary = [
ToxicWordSummary(
word=word,
score=data['max_score'],
occurrences=data['occurrences'],
sentences=sorted(data['sentences'])
)
for word, data in toxic_words_dict.items()
]
toxic_words_summary.sort(key=lambda x: x.score, reverse=True)
return toxic_words_summary
def _convert_to_dict(self, sent_result: SentenceResult) -> Dict[str, any]:
"""Convert SentenceResult to dict for HTML generator"""
return {
'sent_start': sent_result.word_scores[0].position['start'] if sent_result.word_scores and len(sent_result.word_scores) > 0 else 0,
'sent_end': sent_result.word_scores[-1].position['end'] if sent_result.word_scores and len(sent_result.word_scores) > 0 else len(sent_result.text),
'is_toxic': sent_result.label == SentimentLabel.TOXIC,
'words': [{'word': ws.word, 'start': ws.position['start'], 'end': ws.position['end']} for ws in sent_result.word_scores] if sent_result.word_scores else [],
'scores': [ws.score for ws in sent_result.word_scores] if sent_result.word_scores else [],
'threshold': sent_result.threshold
}
# Singleton instance
analysis_service = AnalysisService()
|