""" Post-processing of Anthropic responses and confidence scoring. This module handles the post-processing of raw Anthropic API responses, including duplicate detection, confidence scoring, and result validation. """ import json import re from typing import List, Dict, Any, Optional, Tuple, Set from dataclasses import dataclass from datetime import datetime import difflib from models.input import TranscriptSentence from models.output import TopicDetail, TopicCategory from config.logging import get_logger logger = get_logger(__name__) @dataclass class ProcessingStats: """Statistics from response processing.""" raw_topics_count: int processed_topics_count: int duplicates_removed: int invalid_topics_filtered: int confidence_adjustments: int processing_time: float @dataclass class DuplicateGroup: """Group of duplicate or similar topics.""" primary_topic: Dict[str, Any] duplicate_topics: List[Dict[str, Any]] similarity_scores: List[float] merge_reason: str class ResponseProcessor: """ Post-processor for Anthropic API responses. Handles validation, duplicate detection, confidence scoring, and result optimization for topic extraction responses. """ # Confidence scoring weights CONFIDENCE_FACTORS = { "text_clarity": 0.25, # How clear and specific the topic text is "speaker_consistency": 0.20, # Consistency of speaker attribution "time_coherence": 0.20, # Logical time boundaries "category_fit": 0.15, # How well topic fits its category "evidence_strength": 0.20 # Strength of supporting evidence } # Similarity thresholds for duplicate detection SIMILARITY_THRESHOLDS = { "name_similarity": 0.75, # Topic name similarity "content_similarity": 0.70, # Content similarity "time_overlap": 0.60, # Time overlap ratio "speaker_overlap": 0.80 # Speaker overlap ratio } # Category-specific keywords for validation CATEGORY_KEYWORDS = { TopicCategory.CLIENT_NEEDS_B2B: [ "business", "enterprise", "organization", "company", "workflow", "process", "efficiency", "productivity", "requirements", "needs" ], TopicCategory.CLIENT_NEEDS_B2C: [ "customer", "user", "personal", "individual", "consumer", "experience", "satisfaction", "preferences", "lifestyle" ], TopicCategory.CUSTOMER_FEEDBACK: [ "feedback", "opinion", "review", "rating", "satisfaction", "experience", "suggestion", "complaint", "praise" ], TopicCategory.EMPLOYEE_FEEDBACK: [ "employee", "staff", "team", "internal", "workplace", "culture", "management", "training", "development" ], TopicCategory.SOLUTION_BARRIERS: [ "barrier", "obstacle", "challenge", "difficulty", "problem", "issue", "limitation", "constraint", "blocker" ], TopicCategory.SOLUTION_BENEFITS: [ "benefit", "advantage", "value", "improvement", "gain", "positive", "success", "outcome", "ROI" ], TopicCategory.AHA_MOMENTS: [ "insight", "realization", "breakthrough", "understanding", "clarity", "discovery", "revelation", "lightbulb" ], TopicCategory.COMPANY_INFO: [ "company", "organization", "revenue", "size", "industry", "market", "growth", "strategy", "business model" ], TopicCategory.TECHNICAL_REQUIREMENTS: [ "technical", "technology", "system", "integration", "API", "platform", "infrastructure", "security", "performance" ], TopicCategory.ADDITIONAL_COMMENTS: [ "additional", "other", "miscellaneous", "general", "comment", "note", "observation", "remark" ] } def __init__(self): """Initialize the response processor.""" self.logger = get_logger(f"{__name__}.{self.__class__.__name__}") def process_response( self, raw_response: str, chunk_sentences: List[TranscriptSentence], context: Optional[Dict[str, Any]] = None ) -> Tuple[List[Dict[str, Any]], ProcessingStats]: """ Process raw Anthropic response into validated topics. Args: raw_response: Raw response from Anthropic API chunk_sentences: Original transcript sentences for validation context: Additional context for processing Returns: Tuple of (processed topics, processing statistics) """ start_time = datetime.now() try: # Step 1: Parse JSON response raw_topics = self._parse_json_response(raw_response) # Step 2: Validate and clean topics valid_topics = self._validate_topics(raw_topics, chunk_sentences) # Step 3: Detect and remove duplicates deduplicated_topics = self._remove_duplicates(valid_topics) # Step 4: Adjust confidence scores scored_topics = self._adjust_confidence_scores( deduplicated_topics, chunk_sentences, context ) # Step 5: Final validation and sorting final_topics = self._finalize_topics(scored_topics) # Create processing statistics processing_time = (datetime.now() - start_time).total_seconds() stats = ProcessingStats( raw_topics_count=len(raw_topics), processed_topics_count=len(final_topics), duplicates_removed=len(valid_topics) - len(deduplicated_topics), invalid_topics_filtered=len(raw_topics) - len(valid_topics), confidence_adjustments=len([t for t in scored_topics if t.get("_confidence_adjusted")]), processing_time=processing_time ) self.logger.debug( f"Response processed: {len(raw_topics)} -> {len(final_topics)} topics " f"({stats.duplicates_removed} duplicates, {stats.invalid_topics_filtered} invalid)" ) return final_topics, stats except Exception as e: self.logger.error(f"Response processing failed: {str(e)}") processing_time = (datetime.now() - start_time).total_seconds() # Return empty result with error stats error_stats = ProcessingStats( raw_topics_count=0, processed_topics_count=0, duplicates_removed=0, invalid_topics_filtered=0, confidence_adjustments=0, processing_time=processing_time ) return [], error_stats def _parse_json_response(self, raw_response: str) -> List[Dict[str, Any]]: """Parse JSON from raw response.""" try: # Try direct JSON parsing if raw_response.strip().startswith('['): return json.loads(raw_response.strip()) # Extract JSON array from text json_match = re.search(r'\[.*\]', raw_response, re.DOTALL) if json_match: json_str = json_match.group(0) return json.loads(json_str) # Try to find JSON objects and create array json_objects = re.findall(r'\{[^{}]*\}', raw_response, re.DOTALL) if json_objects: topics = [] for obj_str in json_objects: try: topics.append(json.loads(obj_str)) except json.JSONDecodeError: continue return topics self.logger.warning("No valid JSON found in response") return [] except json.JSONDecodeError as e: self.logger.warning(f"JSON parsing failed: {str(e)}") return [] def _validate_topics( self, raw_topics: List[Dict[str, Any]], chunk_sentences: List[TranscriptSentence] ) -> List[Dict[str, Any]]: """Validate and clean topic data.""" valid_topics = [] for i, topic in enumerate(raw_topics): try: # Check required fields required_fields = [ "topic_name", "topic_type", "topic_detail", "start_sentence_index", "end_sentence_index", "primary_speaker", "confidence_score" ] missing_fields = [field for field in required_fields if field not in topic] if missing_fields: self.logger.warning(f"Topic {i} missing fields: {missing_fields}") continue # Validate sentence indices start_idx = topic["start_sentence_index"] end_idx = topic["end_sentence_index"] if not isinstance(start_idx, int) or not isinstance(end_idx, int): self.logger.warning(f"Topic {i} has invalid sentence indices") continue if start_idx < 1 or end_idx < start_idx: self.logger.warning(f"Topic {i} has invalid sentence range: {start_idx}-{end_idx}") continue # Find matching sentences matching_sentences = [ s for s in chunk_sentences if start_idx <= s.sentence_index <= end_idx ] if not matching_sentences: self.logger.warning(f"Topic {i} has no matching sentences") continue # Validate topic category try: TopicCategory(topic["topic_type"]) except ValueError: self.logger.warning(f"Topic {i} has invalid category: {topic['topic_type']}") topic["topic_type"] = TopicCategory.GENERAL.value # Validate confidence score confidence = topic["confidence_score"] if not isinstance(confidence, (int, float)) or not (0.0 <= confidence <= 1.0): self.logger.warning(f"Topic {i} has invalid confidence: {confidence}") topic["confidence_score"] = 0.5 # Add timing information topic["start_time"] = matching_sentences[0].start_time topic["end_time"] = matching_sentences[-1].end_time # Ensure required list fields if "all_speakers" not in topic or not isinstance(topic["all_speakers"], list): topic["all_speakers"] = [topic["primary_speaker"]] if "key_phrases" not in topic or not isinstance(topic["key_phrases"], list): topic["key_phrases"] = [] if "actionable_insights" not in topic or not isinstance(topic["actionable_insights"], list): topic["actionable_insights"] = [] # Validate text lengths if len(topic["topic_name"]) > 200: topic["topic_name"] = topic["topic_name"][:197] + "..." if len(topic["topic_detail"]) > 1000: topic["topic_detail"] = topic["topic_detail"][:997] + "..." valid_topics.append(topic) except Exception as e: self.logger.warning(f"Error validating topic {i}: {str(e)}") continue return valid_topics def _remove_duplicates(self, topics: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Remove duplicate and highly similar topics.""" if len(topics) <= 1: return topics # Find duplicate groups duplicate_groups = self._find_duplicate_groups(topics) # Keep only the best topic from each group deduplicated_topics = [] used_indices = set() for group in duplicate_groups: # Choose the best topic (highest confidence, most complete) best_topic = self._select_best_topic(group) # Mark all topics in group as used for topic in group.duplicate_topics + [group.primary_topic]: for i, original_topic in enumerate(topics): if self._topics_equal(topic, original_topic): used_indices.add(i) break deduplicated_topics.append(best_topic) # Add non-duplicate topics for i, topic in enumerate(topics): if i not in used_indices: deduplicated_topics.append(topic) return deduplicated_topics def _find_duplicate_groups(self, topics: List[Dict[str, Any]]) -> List[DuplicateGroup]: """Find groups of duplicate or similar topics.""" duplicate_groups = [] processed_indices = set() for i, topic in enumerate(topics): if i in processed_indices: continue # Find similar topics similar_topics = [] similarity_scores = [] for j, other_topic in enumerate(topics[i + 1:], i + 1): if j in processed_indices: continue similarity_score = self._calculate_topic_similarity(topic, other_topic) if similarity_score > 0.7: # High similarity threshold similar_topics.append(other_topic) similarity_scores.append(similarity_score) processed_indices.add(j) if similar_topics: group = DuplicateGroup( primary_topic=topic, duplicate_topics=similar_topics, similarity_scores=similarity_scores, merge_reason=f"High similarity (avg: {sum(similarity_scores)/len(similarity_scores):.2f})" ) duplicate_groups.append(group) processed_indices.add(i) return duplicate_groups def _calculate_topic_similarity(self, topic1: Dict[str, Any], topic2: Dict[str, Any]) -> float: """Calculate similarity score between two topics.""" similarity_scores = [] # Name similarity name_sim = self._text_similarity( topic1["topic_name"].lower(), topic2["topic_name"].lower() ) similarity_scores.append(name_sim * 0.4) # Content similarity content_sim = self._text_similarity( topic1["topic_detail"].lower(), topic2["topic_detail"].lower() ) similarity_scores.append(content_sim * 0.3) # Time overlap time_overlap = self._calculate_time_overlap(topic1, topic2) similarity_scores.append(time_overlap * 0.2) # Speaker overlap speaker_overlap = self._calculate_speaker_overlap(topic1, topic2) similarity_scores.append(speaker_overlap * 0.1) return sum(similarity_scores) def _text_similarity(self, text1: str, text2: str) -> float: """Calculate text similarity using sequence matching.""" if not text1 or not text2: return 0.0 # Use difflib for sequence similarity similarity = difflib.SequenceMatcher(None, text1, text2).ratio() # Also check word overlap words1 = set(text1.split()) words2 = set(text2.split()) if words1 and words2: word_overlap = len(words1.intersection(words2)) / len(words1.union(words2)) similarity = max(similarity, word_overlap) return similarity def _calculate_time_overlap(self, topic1: Dict[str, Any], topic2: Dict[str, Any]) -> float: """Calculate time overlap between two topics.""" if "start_time" not in topic1 or "start_time" not in topic2: return 0.0 start1, end1 = topic1["start_time"], topic1["end_time"] start2, end2 = topic2["start_time"], topic2["end_time"] # Calculate overlap overlap_start = max(start1, start2) overlap_end = min(end1, end2) overlap_duration = max(0, overlap_end - overlap_start) # Calculate total duration total_duration = max(end1, end2) - min(start1, start2) return overlap_duration / total_duration if total_duration > 0 else 0.0 def _calculate_speaker_overlap(self, topic1: Dict[str, Any], topic2: Dict[str, Any]) -> float: """Calculate speaker overlap between two topics.""" speakers1 = set(topic1.get("all_speakers", [topic1["primary_speaker"]])) speakers2 = set(topic2.get("all_speakers", [topic2["primary_speaker"]])) if not speakers1 or not speakers2: return 0.0 intersection = speakers1.intersection(speakers2) union = speakers1.union(speakers2) return len(intersection) / len(union) def _select_best_topic(self, group: DuplicateGroup) -> Dict[str, Any]: """Select the best topic from a duplicate group.""" all_topics = [group.primary_topic] + group.duplicate_topics # Score topics based on multiple factors topic_scores = [] for topic in all_topics: score = 0.0 # Confidence score weight score += topic["confidence_score"] * 0.4 # Completeness weight (more fields filled) completeness = sum([ 1 if topic.get("key_phrases") else 0, 1 if topic.get("actionable_insights") else 0, 1 if len(topic.get("topic_detail", "")) > 50 else 0, 1 if len(topic.get("all_speakers", [])) > 1 else 0 ]) / 4 score += completeness * 0.3 # Length and detail weight detail_quality = min(len(topic.get("topic_detail", "")) / 200, 1.0) score += detail_quality * 0.2 # Time span weight (longer topics might be more comprehensive) if "start_time" in topic and "end_time" in topic: duration = topic["end_time"] - topic["start_time"] duration_score = min(duration / 60, 1.0) # Normalize to 1 minute score += duration_score * 0.1 topic_scores.append(score) # Return topic with highest score best_index = topic_scores.index(max(topic_scores)) return all_topics[best_index] def _topics_equal(self, topic1: Dict[str, Any], topic2: Dict[str, Any]) -> bool: """Check if two topics are exactly equal.""" key_fields = ["topic_name", "start_sentence_index", "end_sentence_index", "primary_speaker"] return all(topic1.get(field) == topic2.get(field) for field in key_fields) def _adjust_confidence_scores( self, topics: List[Dict[str, Any]], chunk_sentences: List[TranscriptSentence], context: Optional[Dict[str, Any]] ) -> List[Dict[str, Any]]: """Adjust confidence scores based on various factors.""" adjusted_topics = [] for topic in topics: original_confidence = topic["confidence_score"] # Calculate adjustment factors factors = self._calculate_confidence_factors(topic, chunk_sentences, context) # Apply weighted adjustments adjustment = sum( factors[factor] * weight for factor, weight in self.CONFIDENCE_FACTORS.items() if factor in factors ) # Adjust confidence (keep within bounds) new_confidence = max(0.0, min(1.0, original_confidence + adjustment)) # Mark if significantly adjusted if abs(new_confidence - original_confidence) > 0.1: topic["_confidence_adjusted"] = True topic["_original_confidence"] = original_confidence topic["confidence_score"] = new_confidence adjusted_topics.append(topic) return adjusted_topics def _calculate_confidence_factors( self, topic: Dict[str, Any], chunk_sentences: List[TranscriptSentence], context: Optional[Dict[str, Any]] ) -> Dict[str, float]: """Calculate confidence adjustment factors.""" factors = {} # Text clarity factor factors["text_clarity"] = self._assess_text_clarity(topic) # Speaker consistency factor factors["speaker_consistency"] = self._assess_speaker_consistency(topic, chunk_sentences) # Time coherence factor factors["time_coherence"] = self._assess_time_coherence(topic, chunk_sentences) # Category fit factor factors["category_fit"] = self._assess_category_fit(topic) # Evidence strength factor factors["evidence_strength"] = self._assess_evidence_strength(topic) return factors def _assess_text_clarity(self, topic: Dict[str, Any]) -> float: """Assess text clarity and specificity.""" score = 0.0 # Check topic name clarity name = topic["topic_name"] if len(name) > 10 and not any(word in name.lower() for word in ["general", "misc", "other"]): score += 0.3 # Check detail specificity detail = topic["topic_detail"] if len(detail) > 50: score += 0.3 # Check for specific keywords if topic.get("key_phrases"): score += 0.2 # Check for actionable insights if topic.get("actionable_insights"): score += 0.2 return min(score - 0.5, 0.5) # Adjustment range: -0.5 to +0.5 def _assess_speaker_consistency( self, topic: Dict[str, Any], chunk_sentences: List[TranscriptSentence] ) -> float: """Assess speaker attribution consistency.""" try: start_idx = topic["start_sentence_index"] end_idx = topic["end_sentence_index"] # Find sentences in topic range topic_sentences = [ s for s in chunk_sentences if start_idx <= s.sentence_index <= end_idx ] if not topic_sentences: return -0.3 # Check if primary speaker is actually present primary_speaker = topic["primary_speaker"] primary_sentences = [s for s in topic_sentences if s.speaker == primary_speaker] if not primary_sentences: return -0.4 # Check speaker distribution primary_ratio = len(primary_sentences) / len(topic_sentences) if primary_ratio > 0.6: return 0.2 elif primary_ratio > 0.3: return 0.0 else: return -0.2 except Exception: return -0.3 def _assess_time_coherence( self, topic: Dict[str, Any], chunk_sentences: List[TranscriptSentence] ) -> float: """Assess time boundary coherence.""" try: if "start_time" not in topic or "end_time" not in topic: return -0.2 duration = topic["end_time"] - topic["start_time"] # Very short topics might be incomplete if duration < 5: return -0.2 # Very long topics might be too broad if duration > 300: # 5 minutes return -0.1 # Reasonable duration return 0.1 except Exception: return -0.2 def _assess_category_fit(self, topic: Dict[str, Any]) -> float: """Assess how well topic fits its assigned category.""" try: category = TopicCategory(topic["topic_type"]) keywords = self.CATEGORY_KEYWORDS.get(category, []) if not keywords: return 0.0 # Check if topic text contains category-relevant keywords text_to_check = (topic["topic_name"] + " " + topic["topic_detail"]).lower() matching_keywords = sum(1 for keyword in keywords if keyword in text_to_check) keyword_ratio = matching_keywords / len(keywords) if keyword_ratio > 0.3: return 0.2 elif keyword_ratio > 0.1: return 0.1 else: return -0.1 except Exception: return -0.1 def _assess_evidence_strength(self, topic: Dict[str, Any]) -> float: """Assess strength of supporting evidence.""" score = 0.0 # Check for specific phrases if topic.get("key_phrases") and len(topic["key_phrases"]) > 2: score += 0.2 # Check for actionable insights if topic.get("actionable_insights") and len(topic["actionable_insights"]) > 1: score += 0.2 # Check detail length and specificity detail = topic.get("topic_detail", "") if len(detail) > 100 and any(word in detail.lower() for word in ["specific", "exactly", "precisely", "clearly"]): score += 0.1 return min(score - 0.25, 0.25) # Adjustment range: -0.25 to +0.25 def _finalize_topics(self, topics: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Final validation and sorting of topics.""" # Remove any remaining invalid topics valid_topics = [ topic for topic in topics if topic.get("topic_name") and topic.get("confidence_score", 0) > 0.1 ] # Sort by start time valid_topics.sort(key=lambda t: t.get("start_time", 0)) # Clean up temporary fields for topic in valid_topics: topic.pop("_confidence_adjusted", None) topic.pop("_original_confidence", None) return valid_topics