| """ | |
| Post-processing of Anthropic responses and confidence scoring. | |
| This module handles the post-processing of raw Anthropic API responses, | |
| including duplicate detection, confidence scoring, and result validation. | |
| """ | |
| import json | |
| import re | |
| from typing import List, Dict, Any, Optional, Tuple, Set | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| import difflib | |
| from models.input import TranscriptSentence | |
| from models.output import TopicDetail, TopicCategory | |
| from config.logging import get_logger | |
| logger = get_logger(__name__) | |


@dataclass
class ProcessingStats:
    """Statistics from response processing."""
    raw_topics_count: int
    processed_topics_count: int
    duplicates_removed: int
    invalid_topics_filtered: int
    confidence_adjustments: int
    processing_time: float


@dataclass
class DuplicateGroup:
    """Group of duplicate or similar topics."""
    primary_topic: Dict[str, Any]
    duplicate_topics: List[Dict[str, Any]]
    similarity_scores: List[float]
    merge_reason: str


class ResponseProcessor:
    """
    Post-processor for Anthropic API responses.

    Handles validation, duplicate detection, confidence scoring,
    and result optimization for topic extraction responses.
    """

    # Confidence scoring weights
    CONFIDENCE_FACTORS = {
        "text_clarity": 0.25,         # How clear and specific the topic text is
        "speaker_consistency": 0.20,  # Consistency of speaker attribution
        "time_coherence": 0.20,       # Logical time boundaries
        "category_fit": 0.15,         # How well topic fits its category
        "evidence_strength": 0.20     # Strength of supporting evidence
    }

    # Similarity thresholds for duplicate detection
    SIMILARITY_THRESHOLDS = {
        "name_similarity": 0.75,     # Topic name similarity
        "content_similarity": 0.70,  # Content similarity
        "time_overlap": 0.60,        # Time overlap ratio
        "speaker_overlap": 0.80      # Speaker overlap ratio
    }
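
    # Note: these named thresholds document intended cut-offs but do not
    # appear to be referenced within this module; _calculate_topic_similarity
    # uses fixed component weights (0.4/0.3/0.2/0.1) and
    # _find_duplicate_groups applies a single combined threshold of 0.7.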

    # Category-specific keywords for validation
    CATEGORY_KEYWORDS = {
        TopicCategory.CLIENT_NEEDS_B2B: [
            "business", "enterprise", "organization", "company", "workflow",
            "process", "efficiency", "productivity", "requirements", "needs"
        ],
        TopicCategory.CLIENT_NEEDS_B2C: [
            "customer", "user", "personal", "individual", "consumer",
            "experience", "satisfaction", "preferences", "lifestyle"
        ],
        TopicCategory.CUSTOMER_FEEDBACK: [
            "feedback", "opinion", "review", "rating", "satisfaction",
            "experience", "suggestion", "complaint", "praise"
        ],
        TopicCategory.EMPLOYEE_FEEDBACK: [
            "employee", "staff", "team", "internal", "workplace",
            "culture", "management", "training", "development"
        ],
        TopicCategory.SOLUTION_BARRIERS: [
            "barrier", "obstacle", "challenge", "difficulty", "problem",
            "issue", "limitation", "constraint", "blocker"
        ],
        TopicCategory.SOLUTION_BENEFITS: [
            "benefit", "advantage", "value", "improvement", "gain",
            "positive", "success", "outcome", "ROI"
        ],
        TopicCategory.AHA_MOMENTS: [
            "insight", "realization", "breakthrough", "understanding",
            "clarity", "discovery", "revelation", "lightbulb"
        ],
        TopicCategory.COMPANY_INFO: [
            "company", "organization", "revenue", "size", "industry",
            "market", "growth", "strategy", "business model"
        ],
        TopicCategory.TECHNICAL_REQUIREMENTS: [
            "technical", "technology", "system", "integration", "API",
            "platform", "infrastructure", "security", "performance"
        ],
        TopicCategory.ADDITIONAL_COMMENTS: [
            "additional", "other", "miscellaneous", "general", "comment",
            "note", "observation", "remark"
        ]
    }

    def __init__(self):
        """Initialize the response processor."""
        self.logger = get_logger(f"{__name__}.{self.__class__.__name__}")

    def process_response(
        self,
        raw_response: str,
        chunk_sentences: List[TranscriptSentence],
        context: Optional[Dict[str, Any]] = None
    ) -> Tuple[List[Dict[str, Any]], ProcessingStats]:
        """
        Process raw Anthropic response into validated topics.

        Args:
            raw_response: Raw response from Anthropic API
            chunk_sentences: Original transcript sentences for validation
            context: Additional context for processing

        Returns:
            Tuple of (processed topics, processing statistics)
        """
        start_time = datetime.now()

        try:
            # Step 1: Parse JSON response
            raw_topics = self._parse_json_response(raw_response)

            # Step 2: Validate and clean topics
            valid_topics = self._validate_topics(raw_topics, chunk_sentences)

            # Step 3: Detect and remove duplicates
            deduplicated_topics = self._remove_duplicates(valid_topics)

            # Step 4: Adjust confidence scores
            scored_topics = self._adjust_confidence_scores(
                deduplicated_topics, chunk_sentences, context
            )

            # Step 5: Final validation and sorting
            final_topics = self._finalize_topics(scored_topics)

            # Create processing statistics
            processing_time = (datetime.now() - start_time).total_seconds()
            stats = ProcessingStats(
                raw_topics_count=len(raw_topics),
                processed_topics_count=len(final_topics),
                duplicates_removed=len(valid_topics) - len(deduplicated_topics),
                invalid_topics_filtered=len(raw_topics) - len(valid_topics),
                confidence_adjustments=len([t for t in scored_topics if t.get("_confidence_adjusted")]),
                processing_time=processing_time
            )

            self.logger.debug(
                f"Response processed: {len(raw_topics)} -> {len(final_topics)} topics "
                f"({stats.duplicates_removed} duplicates, {stats.invalid_topics_filtered} invalid)"
            )
            return final_topics, stats

        except Exception as e:
            self.logger.error(f"Response processing failed: {str(e)}")
            processing_time = (datetime.now() - start_time).total_seconds()

            # Return empty result with error stats
            error_stats = ProcessingStats(
                raw_topics_count=0,
                processed_topics_count=0,
                duplicates_removed=0,
                invalid_topics_filtered=0,
                confidence_adjustments=0,
                processing_time=processing_time
            )
            return [], error_stats

    def _parse_json_response(self, raw_response: str) -> List[Dict[str, Any]]:
        """Parse JSON from raw response."""
        try:
            # Try direct JSON parsing
            if raw_response.strip().startswith('['):
                return json.loads(raw_response.strip())

            # Extract JSON array from text
            json_match = re.search(r'\[.*\]', raw_response, re.DOTALL)
            if json_match:
                json_str = json_match.group(0)
                return json.loads(json_str)

            # Try to find JSON objects and create array
            json_objects = re.findall(r'\{[^{}]*\}', raw_response, re.DOTALL)
            if json_objects:
                topics = []
                for obj_str in json_objects:
                    try:
                        topics.append(json.loads(obj_str))
                    except json.JSONDecodeError:
                        continue
                return topics

            self.logger.warning("No valid JSON found in response")
            return []

        except json.JSONDecodeError as e:
            self.logger.warning(f"JSON parsing failed: {str(e)}")
            return []
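
    # Illustrative inputs for the three fallback strategies above (examples,
    # not from the source):
    #   '[{"topic_name": "..."}]'                    -> parsed directly
    #   'Here are the topics: [{...}, {...}]'        -> array extracted by regex
    #   '{"topic_name": "..."} {"topic_name": "..."}'-> objects collected one at a time
    # Note that the object-level pattern r'\{[^{}]*\}' cannot match nested
    # braces, so any object whose fields contain another object is skipped.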

    def _validate_topics(
        self,
        raw_topics: List[Dict[str, Any]],
        chunk_sentences: List[TranscriptSentence]
    ) -> List[Dict[str, Any]]:
        """Validate and clean topic data."""
        valid_topics = []

        for i, topic in enumerate(raw_topics):
            try:
                # Check required fields
                required_fields = [
                    "topic_name", "topic_type", "topic_detail",
                    "start_sentence_index", "end_sentence_index",
                    "primary_speaker", "confidence_score"
                ]
                missing_fields = [field for field in required_fields if field not in topic]
                if missing_fields:
                    self.logger.warning(f"Topic {i} missing fields: {missing_fields}")
                    continue

                # Validate sentence indices
                start_idx = topic["start_sentence_index"]
                end_idx = topic["end_sentence_index"]
                if not isinstance(start_idx, int) or not isinstance(end_idx, int):
                    self.logger.warning(f"Topic {i} has invalid sentence indices")
                    continue
                if start_idx < 1 or end_idx < start_idx:
                    self.logger.warning(f"Topic {i} has invalid sentence range: {start_idx}-{end_idx}")
                    continue

                # Find matching sentences
                matching_sentences = [
                    s for s in chunk_sentences
                    if start_idx <= s.sentence_index <= end_idx
                ]
                if not matching_sentences:
                    self.logger.warning(f"Topic {i} has no matching sentences")
                    continue

                # Validate topic category
                try:
                    TopicCategory(topic["topic_type"])
                except ValueError:
                    self.logger.warning(f"Topic {i} has invalid category: {topic['topic_type']}")
                    topic["topic_type"] = TopicCategory.GENERAL.value

                # Validate confidence score
                confidence = topic["confidence_score"]
                if not isinstance(confidence, (int, float)) or not (0.0 <= confidence <= 1.0):
                    self.logger.warning(f"Topic {i} has invalid confidence: {confidence}")
                    topic["confidence_score"] = 0.5

                # Add timing information
                topic["start_time"] = matching_sentences[0].start_time
                topic["end_time"] = matching_sentences[-1].end_time

                # Ensure required list fields
                if "all_speakers" not in topic or not isinstance(topic["all_speakers"], list):
                    topic["all_speakers"] = [topic["primary_speaker"]]
                if "key_phrases" not in topic or not isinstance(topic["key_phrases"], list):
                    topic["key_phrases"] = []
                if "actionable_insights" not in topic or not isinstance(topic["actionable_insights"], list):
                    topic["actionable_insights"] = []

                # Validate text lengths
                if len(topic["topic_name"]) > 200:
                    topic["topic_name"] = topic["topic_name"][:197] + "..."
                if len(topic["topic_detail"]) > 1000:
                    topic["topic_detail"] = topic["topic_detail"][:997] + "..."

                valid_topics.append(topic)

            except Exception as e:
                self.logger.warning(f"Error validating topic {i}: {str(e)}")
                continue

        return valid_topics

    def _remove_duplicates(self, topics: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Remove duplicate and highly similar topics."""
        if len(topics) <= 1:
            return topics

        # Find duplicate groups
        duplicate_groups = self._find_duplicate_groups(topics)

        # Keep only the best topic from each group
        deduplicated_topics = []
        used_indices = set()
        for group in duplicate_groups:
            # Choose the best topic (highest confidence, most complete)
            best_topic = self._select_best_topic(group)

            # Mark all topics in group as used
            for topic in group.duplicate_topics + [group.primary_topic]:
                for i, original_topic in enumerate(topics):
                    if self._topics_equal(topic, original_topic):
                        used_indices.add(i)
                        break
            deduplicated_topics.append(best_topic)

        # Add non-duplicate topics
        for i, topic in enumerate(topics):
            if i not in used_indices:
                deduplicated_topics.append(topic)

        return deduplicated_topics

    def _find_duplicate_groups(self, topics: List[Dict[str, Any]]) -> List[DuplicateGroup]:
        """Find groups of duplicate or similar topics."""
        duplicate_groups = []
        processed_indices = set()

        for i, topic in enumerate(topics):
            if i in processed_indices:
                continue

            # Find similar topics
            similar_topics = []
            similarity_scores = []
            for j, other_topic in enumerate(topics[i + 1:], i + 1):
                if j in processed_indices:
                    continue
                similarity_score = self._calculate_topic_similarity(topic, other_topic)
                if similarity_score > 0.7:  # High similarity threshold
                    similar_topics.append(other_topic)
                    similarity_scores.append(similarity_score)
                    processed_indices.add(j)

            if similar_topics:
                group = DuplicateGroup(
                    primary_topic=topic,
                    duplicate_topics=similar_topics,
                    similarity_scores=similarity_scores,
                    merge_reason=f"High similarity (avg: {sum(similarity_scores)/len(similarity_scores):.2f})"
                )
                duplicate_groups.append(group)
            processed_indices.add(i)

        return duplicate_groups
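
    # Note: grouping is greedy and single-pass. Each topic is compared only
    # against later, not-yet-grouped topics, so if A matches B and B matches C
    # but A does not match C, B joins A's group and the B-C pair is never
    # evaluated; C is kept as a separate topic.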

    def _calculate_topic_similarity(self, topic1: Dict[str, Any], topic2: Dict[str, Any]) -> float:
        """Calculate similarity score between two topics."""
        similarity_scores = []

        # Name similarity
        name_sim = self._text_similarity(
            topic1["topic_name"].lower(),
            topic2["topic_name"].lower()
        )
        similarity_scores.append(name_sim * 0.4)

        # Content similarity
        content_sim = self._text_similarity(
            topic1["topic_detail"].lower(),
            topic2["topic_detail"].lower()
        )
        similarity_scores.append(content_sim * 0.3)

        # Time overlap
        time_overlap = self._calculate_time_overlap(topic1, topic2)
        similarity_scores.append(time_overlap * 0.2)

        # Speaker overlap
        speaker_overlap = self._calculate_speaker_overlap(topic1, topic2)
        similarity_scores.append(speaker_overlap * 0.1)

        return sum(similarity_scores)
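
    # Worked example (illustrative component values): name_sim 0.9,
    # content_sim 0.6, time_overlap 0.5, speaker_overlap 1.0 gives
    #   0.9*0.4 + 0.6*0.3 + 0.5*0.2 + 1.0*0.1 = 0.36 + 0.18 + 0.10 + 0.10 = 0.74,
    # which clears the 0.7 grouping threshold used in _find_duplicate_groups.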

    def _text_similarity(self, text1: str, text2: str) -> float:
        """Calculate text similarity using sequence matching."""
        if not text1 or not text2:
            return 0.0

        # Use difflib for sequence similarity
        similarity = difflib.SequenceMatcher(None, text1, text2).ratio()

        # Also check word overlap
        words1 = set(text1.split())
        words2 = set(text2.split())
        if words1 and words2:
            word_overlap = len(words1.intersection(words2)) / len(words1.union(words2))
            similarity = max(similarity, word_overlap)

        return similarity
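
    # Illustrative case: for "pricing concerns" vs "concerns about pricing",
    # SequenceMatcher scores the pair lower because the word order differs,
    # but the Jaccard word overlap is
    # |{pricing, concerns}| / |{pricing, concerns, about}| = 2/3,
    # so the max() lets the word-level signal win.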

    def _calculate_time_overlap(self, topic1: Dict[str, Any], topic2: Dict[str, Any]) -> float:
        """Calculate time overlap between two topics."""
        if "start_time" not in topic1 or "start_time" not in topic2:
            return 0.0

        start1, end1 = topic1["start_time"], topic1["end_time"]
        start2, end2 = topic2["start_time"], topic2["end_time"]

        # Calculate overlap
        overlap_start = max(start1, start2)
        overlap_end = min(end1, end2)
        overlap_duration = max(0, overlap_end - overlap_start)

        # Calculate total duration
        total_duration = max(end1, end2) - min(start1, start2)

        return overlap_duration / total_duration if total_duration > 0 else 0.0
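
    # This is intersection-over-union on time intervals. Worked example:
    # spans [10, 40] and [30, 60] overlap for min(40, 60) - max(10, 30) = 10 s
    # out of a combined span of max(40, 60) - min(10, 30) = 50 s, giving 0.2.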

    def _calculate_speaker_overlap(self, topic1: Dict[str, Any], topic2: Dict[str, Any]) -> float:
        """Calculate speaker overlap between two topics."""
        speakers1 = set(topic1.get("all_speakers", [topic1["primary_speaker"]]))
        speakers2 = set(topic2.get("all_speakers", [topic2["primary_speaker"]]))
        if not speakers1 or not speakers2:
            return 0.0

        intersection = speakers1.intersection(speakers2)
        union = speakers1.union(speakers2)
        return len(intersection) / len(union)
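
    # Jaccard similarity over speaker sets; with hypothetical speakers,
    # {"Alice", "Bob"} vs {"Bob", "Carol"} ->
    # |{"Bob"}| / |{"Alice", "Bob", "Carol"}| = 1/3.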

    def _select_best_topic(self, group: DuplicateGroup) -> Dict[str, Any]:
        """Select the best topic from a duplicate group."""
        all_topics = [group.primary_topic] + group.duplicate_topics

        # Score topics based on multiple factors
        topic_scores = []
        for topic in all_topics:
            score = 0.0

            # Confidence score weight
            score += topic["confidence_score"] * 0.4

            # Completeness weight (more fields filled)
            completeness = sum([
                1 if topic.get("key_phrases") else 0,
                1 if topic.get("actionable_insights") else 0,
                1 if len(topic.get("topic_detail", "")) > 50 else 0,
                1 if len(topic.get("all_speakers", [])) > 1 else 0
            ]) / 4
            score += completeness * 0.3

            # Length and detail weight
            detail_quality = min(len(topic.get("topic_detail", "")) / 200, 1.0)
            score += detail_quality * 0.2

            # Time span weight (longer topics might be more comprehensive)
            if "start_time" in topic and "end_time" in topic:
                duration = topic["end_time"] - topic["start_time"]
                duration_score = min(duration / 60, 1.0)  # Normalize to 1 minute
                score += duration_score * 0.1

            topic_scores.append(score)

        # Return topic with highest score
        best_index = topic_scores.index(max(topic_scores))
        return all_topics[best_index]
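
    # Worked example (illustrative): a topic with confidence 0.8, key phrases
    # and insights present, a 120-character detail, one speaker, and a 30 s
    # span scores
    #   0.8*0.4 + (3/4)*0.3 + (120/200)*0.2 + (30/60)*0.1
    #   = 0.32 + 0.225 + 0.12 + 0.05 = 0.715.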

    def _topics_equal(self, topic1: Dict[str, Any], topic2: Dict[str, Any]) -> bool:
        """Check if two topics match on their identifying fields."""
        key_fields = ["topic_name", "start_sentence_index", "end_sentence_index", "primary_speaker"]
        return all(topic1.get(field) == topic2.get(field) for field in key_fields)

    def _adjust_confidence_scores(
        self,
        topics: List[Dict[str, Any]],
        chunk_sentences: List[TranscriptSentence],
        context: Optional[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Adjust confidence scores based on various factors."""
        adjusted_topics = []
        for topic in topics:
            original_confidence = topic["confidence_score"]

            # Calculate adjustment factors
            factors = self._calculate_confidence_factors(topic, chunk_sentences, context)

            # Apply weighted adjustments
            adjustment = sum(
                factors[factor] * weight
                for factor, weight in self.CONFIDENCE_FACTORS.items()
                if factor in factors
            )

            # Adjust confidence (keep within bounds)
            new_confidence = max(0.0, min(1.0, original_confidence + adjustment))

            # Mark if significantly adjusted
            if abs(new_confidence - original_confidence) > 0.1:
                topic["_confidence_adjusted"] = True
                topic["_original_confidence"] = original_confidence

            topic["confidence_score"] = new_confidence
            adjusted_topics.append(topic)

        return adjusted_topics
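
    # Worked example (illustrative factor values): with factors
    # {"text_clarity": 0.2, "speaker_consistency": 0.2, "time_coherence": 0.1,
    #  "category_fit": -0.1, "evidence_strength": 0.0}, the adjustment is
    #   0.2*0.25 + 0.2*0.20 + 0.1*0.20 + (-0.1)*0.15 + 0.0*0.20 = 0.095,
    # so an original confidence of 0.70 is rescored to 0.795 (and clamped
    # to [0.0, 1.0]).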

    def _calculate_confidence_factors(
        self,
        topic: Dict[str, Any],
        chunk_sentences: List[TranscriptSentence],
        context: Optional[Dict[str, Any]]
    ) -> Dict[str, float]:
        """Calculate confidence adjustment factors."""
        factors = {}

        # Text clarity factor
        factors["text_clarity"] = self._assess_text_clarity(topic)

        # Speaker consistency factor
        factors["speaker_consistency"] = self._assess_speaker_consistency(topic, chunk_sentences)

        # Time coherence factor
        factors["time_coherence"] = self._assess_time_coherence(topic, chunk_sentences)

        # Category fit factor
        factors["category_fit"] = self._assess_category_fit(topic)

        # Evidence strength factor
        factors["evidence_strength"] = self._assess_evidence_strength(topic)

        return factors

    def _assess_text_clarity(self, topic: Dict[str, Any]) -> float:
        """Assess text clarity and specificity."""
        score = 0.0

        # Check topic name clarity
        name = topic["topic_name"]
        if len(name) > 10 and not any(word in name.lower() for word in ["general", "misc", "other"]):
            score += 0.3

        # Check detail specificity
        detail = topic["topic_detail"]
        if len(detail) > 50:
            score += 0.3

        # Check for specific keywords
        if topic.get("key_phrases"):
            score += 0.2

        # Check for actionable insights
        if topic.get("actionable_insights"):
            score += 0.2

        return min(score - 0.5, 0.5)  # Adjustment range: -0.5 to +0.5

    def _assess_speaker_consistency(
        self,
        topic: Dict[str, Any],
        chunk_sentences: List[TranscriptSentence]
    ) -> float:
        """Assess speaker attribution consistency."""
        try:
            start_idx = topic["start_sentence_index"]
            end_idx = topic["end_sentence_index"]

            # Find sentences in topic range
            topic_sentences = [
                s for s in chunk_sentences
                if start_idx <= s.sentence_index <= end_idx
            ]
            if not topic_sentences:
                return -0.3

            # Check if primary speaker is actually present
            primary_speaker = topic["primary_speaker"]
            primary_sentences = [s for s in topic_sentences if s.speaker == primary_speaker]
            if not primary_sentences:
                return -0.4

            # Check speaker distribution
            primary_ratio = len(primary_sentences) / len(topic_sentences)
            if primary_ratio > 0.6:
                return 0.2
            elif primary_ratio > 0.3:
                return 0.0
            else:
                return -0.2

        except Exception:
            return -0.3

    def _assess_time_coherence(
        self,
        topic: Dict[str, Any],
        chunk_sentences: List[TranscriptSentence]
    ) -> float:
        """Assess time boundary coherence."""
        try:
            if "start_time" not in topic or "end_time" not in topic:
                return -0.2

            duration = topic["end_time"] - topic["start_time"]

            # Very short topics might be incomplete
            if duration < 5:
                return -0.2

            # Very long topics might be too broad
            if duration > 300:  # 5 minutes
                return -0.1

            # Reasonable duration
            return 0.1

        except Exception:
            return -0.2

    def _assess_category_fit(self, topic: Dict[str, Any]) -> float:
        """Assess how well topic fits its assigned category."""
        try:
            category = TopicCategory(topic["topic_type"])
            keywords = self.CATEGORY_KEYWORDS.get(category, [])
            if not keywords:
                return 0.0

            # Check if topic text contains category-relevant keywords.
            # The text is lowercased, so keywords must be compared
            # case-insensitively (otherwise "API" and "ROI" never match).
            text_to_check = (topic["topic_name"] + " " + topic["topic_detail"]).lower()
            matching_keywords = sum(1 for keyword in keywords if keyword.lower() in text_to_check)
            keyword_ratio = matching_keywords / len(keywords)

            if keyword_ratio > 0.3:
                return 0.2
            elif keyword_ratio > 0.1:
                return 0.1
            else:
                return -0.1

        except Exception:
            return -0.1
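
    # Example: TECHNICAL_REQUIREMENTS has 9 keywords; a topic whose text
    # mentions "API", "integration", and "security" matches 3/9 (about 0.33)
    # of them, which clears the 0.3 cutoff and earns the +0.2 adjustment.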

    def _assess_evidence_strength(self, topic: Dict[str, Any]) -> float:
        """Assess strength of supporting evidence."""
        score = 0.0

        # Check for specific phrases
        if topic.get("key_phrases") and len(topic["key_phrases"]) > 2:
            score += 0.2

        # Check for actionable insights
        if topic.get("actionable_insights") and len(topic["actionable_insights"]) > 1:
            score += 0.2

        # Check detail length and specificity
        detail = topic.get("topic_detail", "")
        if len(detail) > 100 and any(word in detail.lower() for word in ["specific", "exactly", "precisely", "clearly"]):
            score += 0.1

        return min(score - 0.25, 0.25)  # Adjustment range: -0.25 to +0.25

    def _finalize_topics(self, topics: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Final validation and sorting of topics."""
        # Remove any remaining invalid topics
        valid_topics = [
            topic for topic in topics
            if topic.get("topic_name") and topic.get("confidence_score", 0) > 0.1
        ]

        # Sort by start time
        valid_topics.sort(key=lambda t: t.get("start_time", 0))

        # Clean up temporary fields
        for topic in valid_topics:
            topic.pop("_confidence_adjusted", None)
            topic.pop("_original_confidence", None)

        return valid_topics
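

# ---------------------------------------------------------------------------
# Minimal smoke test (illustrative sketch, not part of the module's API).
# It exercises only the dict-based similarity helpers, since the constructor
# of TranscriptSentence is defined elsewhere and its signature is not assumed
# here. The topic dicts below are hypothetical examples.
if __name__ == "__main__":
    processor = ResponseProcessor()

    # Word overlap rescues reordered phrases (see _text_similarity).
    print(processor._text_similarity("pricing concerns", "concerns about pricing"))

    t1 = {
        "topic_name": "Pricing concerns",
        "topic_detail": "Client raised concerns about enterprise pricing tiers.",
        "primary_speaker": "Alice",
        "all_speakers": ["Alice"],
        "start_time": 10.0,
        "end_time": 40.0,
    }
    t2 = {
        "topic_name": "Concerns about pricing",
        "topic_detail": "Discussion of pricing tiers for the enterprise plan.",
        "primary_speaker": "Alice",
        "all_speakers": ["Alice", "Bob"],
        "start_time": 30.0,
        "end_time": 60.0,
    }
    # Combined weighted similarity; values above 0.7 would be grouped as
    # duplicates by _find_duplicate_groups.
    print(processor._calculate_topic_similarity(t1, t2))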