File size: 11,585 Bytes
ae4e2a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
"""
Analysis Service
================
Main analysis orchestrator (Dependency Inversion + Open/Closed)
"""

import numpy as np
from typing import List, Dict
from collections import defaultdict

from app.models.model_loader import model_loader
from app.services.text_processor import TextProcessor
from app.services.gradient_service import GradientService
from app.services.html_generator import HTMLGenerator
from app.schemas.requests import AnalysisRequest
from app.schemas.responses import (
    AnalysisResponse, SentenceResult, WordScore,
    ToxicWordSummary, Statistics, SentimentLabel
)
from app.core.config import settings
from app.core.exceptions import AnalysisException


class AnalysisService:
    """
    Main analysis service
    
    Responsibilities:
    - Orchestrate analysis pipeline
    - Coordinate between services
    - Build response
    
    Dependencies:
    - TextProcessor: Text processing
    - GradientService: Gradient computation
    - HTMLGenerator: HTML generation
    """
    
    def __init__(self):
        self.text_processor = TextProcessor()
        self.gradient_service = GradientService()
        self.html_generator = HTMLGenerator()
    
    def analyze(self, request: AnalysisRequest) -> AnalysisResponse:
        """
        Analyze text for toxicity
        
        Args:
            request: Analysis request
            
        Returns:
            Analysis response
            
        Raises:
            AnalysisException: If analysis fails
        """
        try:
            # 1. Split into sentences
            sentences = self.text_processor.split_into_sentences(request.text)
            
            # 2. Analyze each sentence
            sentence_results = []
            for i, sent_info in enumerate(sentences, 1):
                sent_result = self._analyze_sentence(
                    sent_info,
                    i,
                    request.include_word_scores
                )
                sentence_results.append(sent_result)
            
            # 3. Generate statistics
            statistics = self._compute_statistics(sentence_results)
            
            # 4. Extract toxic words summary
            toxic_words_summary = self._extract_toxic_words_summary(sentence_results)
            
            # 5. Generate HTML if requested
            html_highlighted = None
            if request.include_html:
                html_highlighted = self.html_generator.generate_highlighted_html(
                    request.text,
                    [self._convert_to_dict(r) for r in sentence_results]
                )
            
            # 6. Determine overall label
            toxic_count = sum(1 for r in sentence_results if r.label == SentimentLabel.TOXIC)
            overall_label = SentimentLabel.TOXIC if toxic_count > 0 else SentimentLabel.CLEAN
            
            # 7. Build response
            return AnalysisResponse(
                success=True,
                text=request.text,
                overall_label=overall_label,
                toxic_sentence_count=toxic_count,
                clean_sentence_count=len(sentences) - toxic_count,
                total_sentences=len(sentences),
                sentences=sentence_results,
                toxic_words_summary=toxic_words_summary,
                statistics=statistics,
                html_highlighted=html_highlighted
            )
            
        except Exception as e:
            raise AnalysisException(detail=str(e))
    
    def _analyze_sentence(
        self,
        sent_info: Dict[str, any],
        sent_number: int,
        include_word_scores: bool
    ) -> SentenceResult:
        """Analyze single sentence"""
        sent_text = sent_info['text']
        
        # Extract words
        words = self.text_processor.extract_words(sent_text)
        
        if len(words) == 0:
            return SentenceResult(
                sentence_number=sent_number,
                text=sent_text,
                label=SentimentLabel.CLEAN,
                confidence=0.0,
                threshold=0.6,
                word_count=0,
                word_scores=[] if include_word_scores else None
            )
        
        # Tokenize
        encoding = model_loader.tokenizer(
            sent_text.lower().strip(),
            add_special_tokens=True,
            max_length=settings.MAX_LENGTH,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        
        # Compute gradients
        gradient_scores, predicted_class, confidence = self.gradient_service.compute_integrated_gradients(
            model=model_loader.model,
            input_ids=encoding['input_ids'],
            attention_mask=encoding['attention_mask'],
            device=model_loader.device
        )
        
        # Get tokens
        tokens = model_loader.tokenizer.convert_ids_to_tokens(
            encoding['input_ids'][0].cpu().numpy()
        )
        valid_length = encoding['attention_mask'][0].sum().item()
        tokens = tokens[:valid_length]
        
        # Normalize gradients
        gradient_scores_norm = self.gradient_service.normalize_scores(gradient_scores)
        
        # Map to words
        word_scores = self._map_tokens_to_words(tokens, gradient_scores_norm, words)
        
        # Determine toxicity
        is_toxic = (predicted_class == 1)
        label = SentimentLabel.TOXIC if is_toxic else SentimentLabel.CLEAN
        
        # Compute threshold
        threshold = self.gradient_service.compute_threshold(word_scores, is_toxic)
        
        # Build word scores
        word_score_objects = None
        if include_word_scores:
            word_score_objects = []
            for word_info, score in zip(words, word_scores):
                word_score_objects.append(WordScore(
                    word=word_info['word'],
                    score=float(score),
                    position={'start': word_info['start'], 'end': word_info['end']},
                    is_toxic=score > threshold and not self.text_processor.is_stop_word(word_info['word']),
                    is_stop_word=self.text_processor.is_stop_word(word_info['word'])
                ))
        
        return SentenceResult(
            sentence_number=sent_number,
            text=sent_text,
            label=label,
            confidence=float(confidence),
            threshold=float(threshold),
            word_count=len(words),
            word_scores=word_score_objects
        )
    
    def _map_tokens_to_words(
        self,
        tokens: List[str],
        token_scores: np.ndarray,
        original_words: List[Dict[str, any]]
    ) -> np.ndarray:
        """Map token scores to words"""
        clean_tokens = []
        clean_scores = []
        
        for token, score in zip(tokens, token_scores):
            if token not in ['<s>', '</s>', '<pad>', '<unk>']:
                clean_token = token.replace('_', '').replace('@@', '').strip()
                if clean_token and not self.text_processor.is_punctuation(clean_token):
                    clean_tokens.append(clean_token)
                    clean_scores.append(score)
        
        word_scores = []
        token_idx = 0
        
        for word_info in original_words:
            word = word_info['word'].lower()
            
            matching_scores = []
            temp_idx = token_idx
            accumulated = ""
            
            while temp_idx < len(clean_tokens):
                accumulated += clean_tokens[temp_idx]
                matching_scores.append(clean_scores[temp_idx])
                
                if accumulated == word:
                    token_idx = temp_idx + 1
                    break
                elif len(accumulated) >= len(word):
                    break
                
                temp_idx += 1
            
            word_scores.append(max(matching_scores) if matching_scores else 0.0)
        
        return np.array(word_scores)
    
    def _compute_statistics(self, sentence_results: List[SentenceResult]) -> Statistics:
        """Compute overall statistics"""
        all_scores = []
        toxic_words_count = 0
        
        for sent_result in sentence_results:
            if sent_result.word_scores:
                for ws in sent_result.word_scores:
                    all_scores.append(ws.score)
                    if ws.is_toxic:
                        toxic_words_count += 1
        
        if len(all_scores) == 0:
            return Statistics(
                total_words=0,
                toxic_words=0,
                mean_score=0.0,
                median_score=0.0,
                max_score=0.0,
                min_score=0.0
            )
        
        all_scores = np.array(all_scores)
        
        return Statistics(
            total_words=len(all_scores),
            toxic_words=toxic_words_count,
            mean_score=float(np.mean(all_scores)),
            median_score=float(np.median(all_scores)),
            max_score=float(np.max(all_scores)),
            min_score=float(np.min(all_scores))
        )
    
    def _extract_toxic_words_summary(
        self,
        sentence_results: List[SentenceResult]
    ) -> List[ToxicWordSummary]:
        """Extract summary of toxic words"""
        toxic_words_dict = defaultdict(lambda: {
            'max_score': 0.0,
            'occurrences': 0,
            'sentences': []
        })
        
        for sent_result in sentence_results:
            if sent_result.word_scores:
                for ws in sent_result.word_scores:
                    if ws.is_toxic:
                        word = ws.word
                        toxic_words_dict[word]['max_score'] = max(
                            toxic_words_dict[word]['max_score'],
                            ws.score
                        )
                        toxic_words_dict[word]['occurrences'] += 1
                        if sent_result.sentence_number not in toxic_words_dict[word]['sentences']:
                            toxic_words_dict[word]['sentences'].append(sent_result.sentence_number)
        
        # Convert to list and sort by score
        toxic_words_summary = [
            ToxicWordSummary(
                word=word,
                score=data['max_score'],
                occurrences=data['occurrences'],
                sentences=sorted(data['sentences'])
            )
            for word, data in toxic_words_dict.items()
        ]
        
        toxic_words_summary.sort(key=lambda x: x.score, reverse=True)
        
        return toxic_words_summary
    
    def _convert_to_dict(self, sent_result: SentenceResult) -> Dict[str, any]:
        """Convert SentenceResult to dict for HTML generator"""
        return {
            'sent_start': sent_result.word_scores[0].position['start'] if sent_result.word_scores and len(sent_result.word_scores) > 0 else 0,
            'sent_end': sent_result.word_scores[-1].position['end'] if sent_result.word_scores and len(sent_result.word_scores) > 0 else len(sent_result.text),
            'is_toxic': sent_result.label == SentimentLabel.TOXIC,
            'words': [{'word': ws.word, 'start': ws.position['start'], 'end': ws.position['end']} for ws in sent_result.word_scores] if sent_result.word_scores else [],
            'scores': [ws.score for ws in sent_result.word_scores] if sent_result.word_scores else [],
            'threshold': sent_result.threshold
        }


# Singleton instance
analysis_service = AnalysisService()