Spaces:
Sleeping
Sleeping
| """ | |
| Core topic extraction logic using Anthropic models. | |
| This module implements the main topic extraction functionality, | |
| processing transcript chunks and extracting business-relevant topics | |
| with confidence scoring and categorization. | |
| """ | |
| import json | |
| import asyncio | |
| import uuid | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from datetime import datetime | |
| from dataclasses import dataclass | |
| from models.input import TranscriptSentence, TranscriptRequest, PromptConfiguration, LanguageCode | |
| from models.output import ( | |
| TopicDetail, TopicCategory, SpeakerInsight, ProcessingMetadata, | |
| SegmentationResult, ProcessingStatus | |
| ) | |
| from utils.chunking import TranscriptChunker, ChunkingResult, create_chunker, ChunkingStrategy | |
| from core.model_manager import ModelManager | |
| from core.anthropic_client import AnthropicResponse, AnthropicError | |
| from config.logging import get_logger | |
# Module-level logger. NOTE(review): not referenced by the code visible in this
# file; TopicExtractor logs through its own per-class logger instead.
logger = get_logger(__name__)
@dataclass
class ExtractionContext:
    """Context information for topic extraction.

    Built once per request when the caller does not supply one, then passed
    unchanged to every chunk-level extraction step. The ``@dataclass``
    decorator provides the keyword-argument constructor that callers use.
    """

    request_id: str  # identifier echoed into logs and ProcessingMetadata
    transcript_id: Optional[str]  # caller-supplied transcript id, if any
    language: LanguageCode  # prompt language; AUTO_DETECT when no prompt config is given
    business_domain: Optional[str]  # optional domain hint forwarded to the prompts
    total_sentences: int  # number of sentences in the full transcript
    total_duration: float  # last sentence end_time minus first sentence start_time
    unique_speakers: List[str]  # distinct speaker labels appearing in the transcript
    prompt_config: PromptConfiguration  # prompt settings (a default instance when absent)
@dataclass
class ChunkExtractionResult:
    """Result of topic extraction from a single chunk.

    A failed chunk is represented by an empty ``topics`` list together with at
    least one entry in ``warnings``. The ``@dataclass`` decorator provides the
    keyword-argument constructor used throughout the extractor.
    """

    chunk_id: int  # position of the chunk within the chunking result
    topics: List[Dict[str, Any]]  # validated topic dicts (not yet TopicDetail objects)
    processing_time: float  # wall-clock seconds spent on this chunk
    tokens_used: Dict[str, int]  # usage counters such as "input_tokens"/"output_tokens"
    model_used: str  # model identifier reported by the API, or "unknown" on failure
    confidence_scores: List[float]  # confidence_score of each topic, in topic order
    warnings: List[str]  # non-empty when processing this chunk failed
class TopicExtractor:
    """
    Core topic extraction engine using Anthropic models.

    Handles the complete pipeline from transcript chunking to topic extraction,
    including business categorization, confidence scoring, and result aggregation.

    ``extract_topics`` is the public entry point; every other method is an
    internal pipeline step. Failures are reported as a ``SegmentationResult``
    with ``ProcessingStatus.FAILED`` rather than raised to the caller.
    """

    # Business category mapping for topic classification.
    # NOTE(review): no method of this class reads this table; presumably it is
    # consumed elsewhere - confirm before changing it.
    CATEGORY_KEYWORDS = {
        TopicCategory.CLIENT_NEEDS_B2B: [
            "business needs", "requirements", "challenges", "pain points", "workflow",
            "process", "efficiency", "productivity", "enterprise", "organization"
        ],
        TopicCategory.CLIENT_NEEDS_B2C: [
            "customer needs", "user experience", "personal", "individual", "consumer",
            "lifestyle", "convenience", "satisfaction", "preferences"
        ],
        TopicCategory.CUSTOMER_FEEDBACK: [
            "feedback", "opinion", "review", "rating", "satisfaction", "experience",
            "suggestion", "improvement", "complaint", "praise"
        ],
        TopicCategory.EMPLOYEE_FEEDBACK: [
            "employee", "staff", "team", "worker", "internal", "workplace",
            "culture", "management", "training", "development"
        ],
        TopicCategory.SOLUTION_BARRIERS: [
            "barrier", "obstacle", "challenge", "difficulty", "problem", "issue",
            "limitation", "constraint", "blocker", "resistance"
        ],
        TopicCategory.SOLUTION_BENEFITS: [
            "benefit", "advantage", "value", "improvement", "gain", "positive",
            "success", "achievement", "outcome", "result"
        ],
        TopicCategory.AHA_MOMENTS: [
            "aha", "realization", "insight", "breakthrough", "understanding",
            "clarity", "revelation", "discovery", "lightbulb", "eureka"
        ],
        TopicCategory.COMPANY_INFO: [
            "company", "organization", "business", "revenue", "size", "industry",
            "market", "competition", "growth", "strategy"
        ],
        TopicCategory.TECHNICAL_REQUIREMENTS: [
            "technical", "technology", "system", "integration", "API", "platform",
            "infrastructure", "security", "performance", "scalability"
        ],
        TopicCategory.ADDITIONAL_COMMENTS: [
            "additional", "other", "miscellaneous", "general", "comment",
            "note", "remark", "observation", "thought"
        ]
    }

    # Upper bound on chunk-level model requests that run concurrently.
    MAX_CONCURRENT_CHUNKS = 3

    def __init__(self, model_manager: ModelManager):
        """
        Initialize the topic extractor.

        Args:
            model_manager: Model manager for Anthropic API access; also the
                source of the current model name used for chunking and metadata.
        """
        self.model_manager = model_manager
        self.logger = get_logger(f"{__name__}.{self.__class__.__name__}")

    async def extract_topics(
        self,
        request: TranscriptRequest,
        context: Optional[ExtractionContext] = None
    ) -> SegmentationResult:
        """
        Extract topics from a transcript request.

        Args:
            request: Transcript request with sentences and configuration
            context: Optional extraction context; built from ``request`` when omitted

        Returns:
            Complete segmentation result with topics and metadata. On any failure
            (including every chunk failing) a FAILED result with zeroed metrics
            and an explanatory warning is returned instead of raising.
        """
        start_time = datetime.now()
        request_id = context.request_id if context else str(uuid.uuid4())
        self.logger.info(f"Starting topic extraction for request {request_id}")
        try:
            if not context:
                context = self._create_context(request, request_id)

            # Step 1: Chunk the transcript
            chunking_result = await self._chunk_transcript(request, context)

            # Step 2: Extract topics from each chunk
            chunk_results = await self._extract_from_chunks(chunking_result, context)

            # A chunk counts as successful if it yielded topics or finished without
            # warnings; if none qualify, report the whole request as failed.
            successful_chunks = [r for r in chunk_results if r.topics or not r.warnings]
            if chunk_results and not successful_chunks:
                return self._build_failure_result(
                    request,
                    request_id,
                    start_time,
                    model_used=self.model_manager.current_model.value,
                    warning=f"All {len(chunk_results)} chunks failed to process",
                )

            # Step 3: Aggregate and merge topics
            aggregated_topics = await self._aggregate_topics(chunk_results, context, request)

            # Step 4: Generate speaker insights
            speaker_insights = await self._generate_speaker_insights(
                request.sentences, aggregated_topics, context
            )

            # Step 5: Create processing metadata. Passing the aggregated topics keeps
            # topics_extracted / average_confidence consistent with what we return.
            processing_time = (datetime.now() - start_time).total_seconds()
            metadata = self._create_metadata(
                request, context, chunk_results, processing_time,
                topics=aggregated_topics
            )

            # Step 6: Create final result
            result = SegmentationResult(
                status=ProcessingStatus.SUCCESS,
                topics=aggregated_topics,
                speaker_insights=speaker_insights,
                metadata=metadata,
                executive_summary=self._generate_executive_summary(aggregated_topics),
                key_takeaways=self._extract_key_takeaways(aggregated_topics),
                category_summary=self._create_category_summary(aggregated_topics)
            )
            self.logger.info(
                f"Topic extraction completed for request {request_id}: "
                f"{len(aggregated_topics)} topics in {processing_time:.2f}s"
            )
            return result
        except Exception as e:
            self.logger.error(f"Topic extraction failed for request {request_id}: {str(e)}")
            return self._build_failure_result(
                request,
                request_id,
                start_time,
                model_used="unknown",
                warning=f"Extraction failed: {str(e)}",
            )

    def _build_failure_result(
        self,
        request: TranscriptRequest,
        request_id: str,
        start_time: datetime,
        model_used: str,
        warning: str
    ) -> SegmentationResult:
        """
        Build a FAILED SegmentationResult with zeroed metrics and a single warning.

        Safe for empty transcripts, so it can be used from an ``except`` handler
        without raising a second error.
        """
        sentences = request.sentences or []
        processing_time = (datetime.now() - start_time).total_seconds()
        error_metadata = ProcessingMetadata(
            request_id=request_id,
            timestamp=start_time,
            model_used=model_used,
            processing_time=processing_time,
            total_sentences=len(sentences),
            total_duration=self._transcript_duration(sentences),
            unique_speakers=len({s.speaker for s in sentences}),
            topics_extracted=0,
            average_confidence=0.0,
            coverage_percentage=0.0,
            warnings=[warning]
        )
        return SegmentationResult(
            status=ProcessingStatus.FAILED,
            topics=[],
            speaker_insights=[],
            metadata=error_metadata
        )

    @staticmethod
    def _transcript_duration(sentences: List[TranscriptSentence]) -> float:
        """Return last end_time minus first start_time, or 0.0 for an empty transcript."""
        if not sentences:
            return 0.0
        return sentences[-1].end_time - sentences[0].start_time

    def _create_context(self, request: TranscriptRequest, request_id: str) -> ExtractionContext:
        """Create extraction context from request."""
        prompt_config = request.prompt_config
        return ExtractionContext(
            request_id=request_id,
            transcript_id=request.transcript_id,
            language=prompt_config.language if prompt_config else LanguageCode.AUTO_DETECT,
            business_domain=prompt_config.business_domain if prompt_config else None,
            total_sentences=len(request.sentences),
            total_duration=self._transcript_duration(request.sentences),
            # dict.fromkeys de-duplicates while keeping first-appearance order, so
            # speaker insights come out in a stable order from run to run.
            unique_speakers=list(dict.fromkeys(s.speaker for s in request.sentences)),
            prompt_config=prompt_config or PromptConfiguration()
        )

    async def _chunk_transcript(
        self,
        request: TranscriptRequest,
        context: ExtractionContext
    ) -> ChunkingResult:
        """
        Chunk the transcript for processing.

        Starts from a token-estimate chunker and lets it switch to the strategy it
        reports as optimal for these sentences. For transcripts longer than 100
        sentences, the chunk size is capped to get more granular topics.
        Chunk-validation warnings are logged, not raised.
        """
        chunker = create_chunker(
            strategy="token_estimate",  # Use token-based chunking for Anthropic
            chunk_size=6000,  # Conservative chunk size
            overlap_sentences=5,  # Increased overlap for better context
            model_name=self.model_manager.current_model.value
        )

        # Get optimal strategy based on transcript characteristics
        optimal_strategy = chunker.get_optimal_strategy(request.sentences)
        if optimal_strategy != chunker.strategy:
            self.logger.info(f"Switching to optimal chunking strategy: {optimal_strategy}")
            chunker.strategy = optimal_strategy
            # Update chunk size for the new strategy
            chunker.chunk_size = chunker.DEFAULT_CHUNK_SIZES[optimal_strategy]

        # For large transcripts, force smaller chunks to get more granular topics
        total_sentences = len(request.sentences)
        if total_sentences > 100:
            if chunker.strategy == ChunkingStrategy.SENTENCE_COUNT:
                chunker.chunk_size = min(40, chunker.chunk_size)
                self.logger.info(f"Large transcript detected ({total_sentences} sentences), using smaller chunks: {chunker.chunk_size}")
            elif chunker.strategy == ChunkingStrategy.TOKEN_ESTIMATE:
                chunker.chunk_size = min(4000, chunker.chunk_size)
                self.logger.info(f"Large transcript detected ({total_sentences} sentences), using smaller token chunks: {chunker.chunk_size}")

        result = chunker.chunk_transcript(request.sentences, enable_overlap=True)

        warnings = chunker.validate_chunks(result)
        if warnings:
            self.logger.warning(f"Chunking warnings: {warnings}")

        self.logger.info(
            f"Transcript chunked into {result.total_chunks} chunks "
            f"using {result.strategy_used} strategy (chunk_size: {chunker.chunk_size})"
        )
        return result

    async def _extract_from_chunks(
        self,
        chunking_result: ChunkingResult,
        context: ExtractionContext
    ) -> List[ChunkExtractionResult]:
        """
        Extract topics from each chunk.

        Chunks are processed concurrently, with at most ``MAX_CONCURRENT_CHUNKS``
        model calls in flight. The returned list has one entry per chunk, in
        chunk order; a chunk whose task raised is replaced by an empty result
        carrying a "Processing failed" warning.
        """
        semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_CHUNKS)

        async def process_chunk(chunk_idx: int, chunk_sentences: List[TranscriptSentence]):
            async with semaphore:
                return await self._extract_from_single_chunk(
                    chunk_idx, chunk_sentences, context
                )

        raw_results = await asyncio.gather(
            *(process_chunk(i, chunk) for i, chunk in enumerate(chunking_result.chunks)),
            return_exceptions=True
        )

        valid_results = []
        for i, result in enumerate(raw_results):
            # gather(return_exceptions=True) may also hand back BaseException
            # subclasses such as CancelledError, which are not Exception instances.
            if isinstance(result, BaseException):
                self.logger.error(f"Chunk {i} processing failed: {str(result)}")
                valid_results.append(ChunkExtractionResult(
                    chunk_id=i,
                    topics=[],
                    processing_time=0.0,
                    tokens_used={"input_tokens": 0, "output_tokens": 0},
                    model_used="unknown",
                    confidence_scores=[],
                    warnings=[f"Processing failed: {str(result)}"]
                ))
            else:
                valid_results.append(result)
        return valid_results

    async def _extract_from_single_chunk(
        self,
        chunk_idx: int,
        chunk_sentences: List[TranscriptSentence],
        context: ExtractionContext
    ) -> ChunkExtractionResult:
        """
        Extract topics from a single chunk.

        Any exception raised while building the prompt, calling the model, or
        parsing the response is caught. It is turned into a result with no topics
        and a "Processing error" warning, so one bad chunk does not abort the others.
        """
        start_time = datetime.now()
        try:
            prompt = self._build_extraction_prompt(chunk_sentences, context)
            response = await self.model_manager.generate_completion(
                messages=[{"role": "user", "content": prompt}],
                system_prompt=self._get_system_prompt(context),
                max_tokens=4000,
                temperature=0.0  # Deterministic for consistency
            )
            topics = self._parse_extraction_response(response.content, chunk_sentences)
            confidence_scores = [topic.get("confidence_score", 0.5) for topic in topics]
            processing_time = (datetime.now() - start_time).total_seconds()
            result = ChunkExtractionResult(
                chunk_id=chunk_idx,
                topics=topics,
                processing_time=processing_time,
                tokens_used=response.usage or {"input_tokens": 0, "output_tokens": 0},
                model_used=response.model,
                confidence_scores=confidence_scores,
                warnings=[]
            )
            token_total = sum(response.usage.values()) if response.usage else 0
            self.logger.debug(
                f"Chunk {chunk_idx} processed: {len(topics)} topics, "
                f"{processing_time:.2f}s, {token_total} tokens"
            )
            return result
        except Exception as e:
            self.logger.error(f"Error processing chunk {chunk_idx}: {str(e)}")
            processing_time = (datetime.now() - start_time).total_seconds()
            return ChunkExtractionResult(
                chunk_id=chunk_idx,
                topics=[],
                processing_time=processing_time,
                tokens_used={"input_tokens": 0, "output_tokens": 0},
                model_used="unknown",
                confidence_scores=[],
                warnings=[f"Processing error: {str(e)}"]
            )

    def _build_extraction_prompt(
        self,
        chunk_sentences: List[TranscriptSentence],
        context: ExtractionContext
    ) -> str:
        """
        Build the topic extraction prompt for a chunk using the prompt manager.

        Falls back to ``_build_fallback_prompt`` if the prompt manager raises.
        """
        from core.prompt_manager import get_prompt_manager

        prompt_manager = get_prompt_manager()
        prompt_context = {
            "transcript_text": self._format_transcript_chunk(chunk_sentences),
            "speaker_count": len(context.unique_speakers),
            "sentence_count": len(chunk_sentences),
            "business_domain": context.business_domain
        }
        try:
            processed_prompt = prompt_manager.process_prompt_configuration(
                context.prompt_config,
                prompt_context,
                chunk_sentences
            )
            # validation_result may lack a `warnings` attribute, hence the hasattr check
            if hasattr(processed_prompt.validation_result, 'warnings') and processed_prompt.validation_result.warnings:
                self.logger.warning(
                    f"Prompt validation warnings: {len(processed_prompt.validation_result.warnings)}"
                )
            self.logger.info(f"Generated prompt length: {len(processed_prompt.user_prompt)} chars")
            self.logger.debug(f"Generated prompt (first 300 chars): {processed_prompt.user_prompt[:300]}")
            return processed_prompt.user_prompt
        except Exception as e:
            self.logger.warning(f"Error processing prompt configuration: {str(e)}")
            fallback_prompt = self._build_fallback_prompt(chunk_sentences, context)
            self.logger.info(f"Using fallback prompt, length: {len(fallback_prompt)} chars")
            self.logger.debug(f"Fallback prompt (first 300 chars): {fallback_prompt[:300]}")
            return fallback_prompt

    def _build_fallback_prompt(
        self,
        chunk_sentences: List[TranscriptSentence],
        context: ExtractionContext
    ) -> str:
        """Build a fallback prompt when prompt manager fails."""
        transcript_text = self._format_transcript_chunk(chunk_sentences)
        # Continuation lines sit at column 0 because they are part of the prompt text.
        return f"""Please analyze the following transcript segment and extract meaningful business topics.
TRANSCRIPT SEGMENT:
{transcript_text}
CONTEXT:
- Language: {context.language.value}
- Business Domain: {context.business_domain or "General"}
- Total Speakers: {len(context.unique_speakers)}
Please respond with a JSON array of topics with the following fields:
topic_name, topic_type, topic_detail, start_sentence_index, end_sentence_index,
primary_speaker, all_speakers, confidence_score, key_phrases, actionable_insights"""

    def _format_transcript_chunk(self, chunk_sentences: List[TranscriptSentence]) -> str:
        """Format each sentence as '<index>. [<start>s-<end>s] <speaker>: <text>', one per line."""
        lines = []
        for sentence in chunk_sentences:
            timestamp = f"[{sentence.start_time:.1f}s-{sentence.end_time:.1f}s]"
            lines.append(f"{sentence.sentence_index}. {timestamp} {sentence.speaker}: {sentence.text}")
        return "\n".join(lines)

    def _get_system_prompt(self, context: ExtractionContext) -> str:
        """Get the system prompt for topic extraction, with a built-in fallback on error."""
        from core.prompt_manager import get_prompt_manager
        try:
            prompt_manager = get_prompt_manager()
            processed_prompt = prompt_manager.process_prompt_configuration(
                context.prompt_config,
                {"business_domain": context.business_domain},
                None
            )
            return processed_prompt.system_prompt
        except Exception as e:
            self.logger.warning(f"Error getting system prompt: {str(e)}")
            return f"""You are an expert business analyst specializing in topic extraction from business conversations.
Language: {context.language.value}
Business domain: {context.business_domain or "General business"}
Always respond with valid JSON format and ensure all required fields are included."""

    def _parse_extraction_response(
        self,
        response_content: str,
        chunk_sentences: List[TranscriptSentence]
    ) -> List[Dict[str, Any]]:
        """
        Parse the Anthropic response into validated topic dictionaries.

        Returns an empty list (after logging the response) when no topic list
        can be decoded. Individual invalid topics are skipped.
        """
        try:
            topics_data = self._load_topics_payload(response_content.strip())
            if not isinstance(topics_data, list):
                raise ValueError(f"Expected list of topics, got {type(topics_data)}")

            validated_topics = []
            for topic_data in topics_data:
                validated_topic = self._validate_and_enhance_topic(topic_data, chunk_sentences)
                if validated_topic:
                    validated_topics.append(validated_topic)

            self.logger.info(f"Successfully parsed {len(validated_topics)} topics from response")
            return validated_topics
        except (json.JSONDecodeError, ValueError) as e:
            self.logger.warning(f"Failed to parse extraction response: {str(e)}")
            self.logger.warning(f"Response content (first 500 chars): {response_content[:500]}")
            self.logger.warning(f"Response content (last 200 chars): {response_content[-200:]}")
            return []

    def _load_topics_payload(self, content: str) -> Any:
        """
        Locate and decode the topic payload in a (stripped) model response.

        Strategies, tried in order:
          1. response starts with a JSON array;
          2. response starts with a JSON object holding a "topics" key;
          3. JSON inside a ```json fenced block;
          4. first '['..last ']' span, otherwise first '{'..last '}' span.

        Raises:
            ValueError (incl. json.JSONDecodeError): if no payload can be decoded.
        """
        decoder = json.JSONDecoder()

        # Methods 1 and 2: raw_decode stops at the end of the leading JSON value,
        # so trailing commentary after the JSON does not break parsing.
        if content.startswith('['):
            topics_data, _ = decoder.raw_decode(content)
            return topics_data
        if content.startswith('{'):
            parsed_obj, _ = decoder.raw_decode(content)
            if isinstance(parsed_obj, dict) and "topics" in parsed_obj:
                return parsed_obj["topics"]
            raise ValueError("JSON object does not contain 'topics' key")

        # Method 3: JSON inside a markdown code block
        start_marker = "```json"
        if start_marker in content:
            start_idx = content.find(start_marker) + len(start_marker)
            end_idx = content.find("```", start_idx)
            if end_idx <= start_idx:
                raise ValueError("Could not extract JSON from markdown")
            parsed_obj = json.loads(content[start_idx:end_idx].strip())
            if isinstance(parsed_obj, list):
                return parsed_obj
            if isinstance(parsed_obj, dict) and "topics" in parsed_obj:
                return parsed_obj["topics"]
            raise ValueError("Extracted JSON is not valid format")

        # Method 4: search for a JSON array (preferred) or object in free text
        array_start = content.find('[')
        array_end = content.rfind(']') + 1
        obj_start = content.find('{')
        obj_end = content.rfind('}') + 1
        if array_start >= 0 and array_end > array_start:
            return json.loads(content[array_start:array_end])
        if obj_start >= 0 and obj_end > obj_start:
            parsed_obj = json.loads(content[obj_start:obj_end])
            if isinstance(parsed_obj, dict) and "topics" in parsed_obj:
                return parsed_obj["topics"]
            raise ValueError("JSON object does not contain 'topics' key")
        raise ValueError("No valid JSON found in response")

    @staticmethod
    def _coerce_index(value: Any) -> Optional[int]:
        """Return ``value`` as an int if it is an integer, integral float, or int string; else None."""
        if isinstance(value, bool):
            return None
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            return int(value) if value.is_integer() else None
        if isinstance(value, str):
            try:
                return int(value)
            except ValueError:
                return None
        return None

    def _validate_and_enhance_topic(
        self,
        topic_data: Dict[str, Any],
        chunk_sentences: List[TranscriptSentence]
    ) -> Optional[Dict[str, Any]]:
        """
        Validate a topic dict from the model and enrich it in place.

        Adds start_time/end_time from the matching chunk sentences. Normalizes
        sentence indices to ints, and replaces invalid categories with GENERAL
        and invalid confidences with 0.5. List fields are normalized.

        Returns:
            The (mutated) topic dict, or None if it is unusable.
        """
        try:
            if not isinstance(topic_data, dict):
                self.logger.warning(f"Expected topic object, got {type(topic_data).__name__}")
                return None

            required_fields = [
                "topic_name", "topic_type", "topic_detail",
                "start_sentence_index", "end_sentence_index",
                "primary_speaker", "all_speakers", "confidence_score"
            ]
            for field in required_fields:
                if field not in topic_data:
                    self.logger.warning(f"Missing required field '{field}' in topic")
                    return None

            # Models sometimes emit indices as strings ("12"); coerce before comparing.
            start_idx = self._coerce_index(topic_data["start_sentence_index"])
            end_idx = self._coerce_index(topic_data["end_sentence_index"])
            if start_idx is None or end_idx is None or start_idx < 1 or end_idx < start_idx:
                self.logger.warning(
                    f"Invalid sentence indices: {topic_data['start_sentence_index']}-{topic_data['end_sentence_index']}"
                )
                return None
            topic_data["start_sentence_index"] = start_idx
            topic_data["end_sentence_index"] = end_idx

            matching_sentences = [
                s for s in chunk_sentences
                if start_idx <= s.sentence_index <= end_idx
            ]
            if not matching_sentences:
                self.logger.warning(f"No matching sentences for indices {start_idx}-{end_idx}")
                return None

            # Enhance with timing information
            topic_data["start_time"] = matching_sentences[0].start_time
            topic_data["end_time"] = matching_sentences[-1].end_time

            topic_type = topic_data["topic_type"]
            try:
                TopicCategory(topic_type)
            except ValueError:
                self.logger.warning(f"Invalid topic category: {topic_type}, using 'general'")
                topic_data["topic_type"] = TopicCategory.GENERAL.value

            # bool is an int subclass, so exclude it explicitly.
            confidence = topic_data["confidence_score"]
            if (
                isinstance(confidence, bool)
                or not isinstance(confidence, (int, float))
                or not (0.0 <= confidence <= 1.0)
            ):
                self.logger.warning(f"Invalid confidence score: {confidence}, using 0.5")
                topic_data["confidence_score"] = 0.5

            if not isinstance(topic_data.get("all_speakers", []), list):
                topic_data["all_speakers"] = [topic_data["primary_speaker"]]
            if not isinstance(topic_data.get("key_phrases", []), list):
                topic_data["key_phrases"] = []

            # actionable_insights may arrive as a single string or a list
            insights = topic_data.get("actionable_insights", [])
            if isinstance(insights, str):
                topic_data["actionable_insights"] = [insights] if insights.strip() else []
            elif not isinstance(insights, list):
                topic_data["actionable_insights"] = []
            return topic_data
        except Exception as e:
            self.logger.warning(f"Error validating topic: {str(e)}")
            return None

    async def _aggregate_topics(
        self,
        chunk_results: List[ChunkExtractionResult],
        context: ExtractionContext,
        request: TranscriptRequest
    ) -> List[TopicDetail]:
        """
        Convert chunk topic dicts to TopicDetail, optionally merge similar ones,
        and return them sorted by start time. Dicts that fail conversion are
        logged and skipped.
        """
        all_topics = []
        for chunk_result in chunk_results:
            for topic_data in chunk_result.topics:
                try:
                    all_topics.append(TopicDetail(
                        topic_name=topic_data["topic_name"],
                        topic_type=TopicCategory(topic_data["topic_type"]),
                        topic_detail=topic_data["topic_detail"],
                        start_time=topic_data["start_time"],
                        end_time=topic_data["end_time"],
                        start_sentence_index=topic_data["start_sentence_index"],
                        end_sentence_index=topic_data["end_sentence_index"],
                        primary_speaker=topic_data["primary_speaker"],
                        all_speakers=topic_data["all_speakers"],
                        confidence_score=topic_data["confidence_score"],
                        key_phrases=topic_data.get("key_phrases", []),
                        actionable_insights=topic_data.get("actionable_insights", [])
                    ))
                except Exception as e:
                    self.logger.warning(f"Error creating TopicDetail: {str(e)}")

        # Chunks overlap, so the same topic can appear in neighbouring chunks.
        if request.merge_similar_topics:
            all_topics = self._merge_similar_topics(all_topics)

        all_topics.sort(key=lambda t: t.start_time)
        return all_topics

    def _merge_similar_topics(self, topics: List[TopicDetail]) -> List[TopicDetail]:
        """
        Greedily group each topic with later topics similar to it and merge each group.

        Similarity is checked only against the group's first topic, not transitively.
        """
        if len(topics) <= 1:
            return topics

        merged_topics = []
        used_indices = set()
        for i, topic in enumerate(topics):
            if i in used_indices:
                continue
            similar_topics = [topic]
            for j in range(i + 1, len(topics)):
                if j not in used_indices and self._are_topics_similar(topic, topics[j]):
                    similar_topics.append(topics[j])
                    used_indices.add(j)
            if len(similar_topics) > 1:
                merged_topics.append(self._merge_topic_group(similar_topics))
            else:
                merged_topics.append(topic)
            used_indices.add(i)
        return merged_topics

    def _are_topics_similar(self, topic1: TopicDetail, topic2: TopicDetail) -> bool:
        """
        Check if two topics are similar enough to merge.

        True when they share a category and their names have Jaccard word
        similarity > 0.7, or when their time ranges overlap by more than half of
        the shorter topic's duration.
        """
        if topic1.topic_type == topic2.topic_type:
            name_similarity = self._calculate_text_similarity(
                topic1.topic_name.lower(), topic2.topic_name.lower()
            )
            if name_similarity > 0.7:
                return True

        time_overlap = min(topic1.end_time, topic2.end_time) - max(topic1.start_time, topic2.start_time)
        if time_overlap > 0:
            shorter = min(topic1.duration, topic2.duration)
            # Guard against a zero/negative duration to avoid ZeroDivisionError.
            if shorter > 0 and time_overlap / shorter > 0.5:
                return True
        return False

    def _calculate_text_similarity(self, text1: str, text2: str) -> float:
        """Calculate Jaccard similarity over whitespace-separated words (1.0 for two empty texts)."""
        words1 = set(text1.split())
        words2 = set(text2.split())
        if not words1 and not words2:
            return 1.0
        union = words1 | words2
        return len(words1 & words2) / len(union) if union else 0.0

    def _merge_topic_group(self, topics: List[TopicDetail]) -> TopicDetail:
        """
        Merge a group of similar topics into one.

        Name, category, detail and primary speaker come from the highest-confidence
        topic. The time/sentence span covers the whole group. List fields are
        de-duplicated in first-appearance order, and confidence is the group mean.
        """
        base_topic = max(topics, key=lambda t: t.confidence_score)

        # dict.fromkeys keeps a deterministic order, unlike list(set(...)).
        all_speakers = list(dict.fromkeys(s for t in topics for s in t.all_speakers))
        key_phrases = list(dict.fromkeys(p for t in topics for p in t.key_phrases))
        actionable_insights = list(dict.fromkeys(a for t in topics for a in t.actionable_insights))

        return TopicDetail(
            topic_name=base_topic.topic_name,
            topic_type=base_topic.topic_type,
            topic_detail=f"Merged topic: {base_topic.topic_detail}",
            start_time=min(t.start_time for t in topics),
            end_time=max(t.end_time for t in topics),
            start_sentence_index=min(t.start_sentence_index for t in topics),
            end_sentence_index=max(t.end_sentence_index for t in topics),
            primary_speaker=base_topic.primary_speaker,
            all_speakers=all_speakers,
            confidence_score=sum(t.confidence_score for t in topics) / len(topics),
            key_phrases=key_phrases,
            actionable_insights=actionable_insights
        )

    async def _generate_speaker_insights(
        self,
        sentences: List[TranscriptSentence],
        topics: List[TopicDetail],
        context: ExtractionContext
    ) -> List[SpeakerInsight]:
        """
        Generate per-speaker statistics for each speaker in ``context.unique_speakers``.

        Statistics are sentence count, summed speaking time, names of topics
        listing the speaker, average sentence length and sentences per minute.
        """
        speaker_insights = []
        for speaker in context.unique_speakers:
            speaker_sentences = [s for s in sentences if s.speaker == speaker]
            if not speaker_sentences:
                continue

            total_sentences = len(speaker_sentences)
            total_duration = sum(s.end_time - s.start_time for s in speaker_sentences)
            topics_mentioned = [t.topic_name for t in topics if speaker in t.all_speakers]

            key_insights = []
            if total_sentences > 0:
                avg_sentence_length = sum(len(s.text.split()) for s in speaker_sentences) / total_sentences
                key_insights.append(f"Average sentence length: {avg_sentence_length:.1f} words")
            if total_duration > 0:
                speaking_rate = total_sentences / (total_duration / 60)  # sentences per minute
                key_insights.append(f"Speaking rate: {speaking_rate:.1f} sentences/minute")

            speaker_insights.append(SpeakerInsight(
                speaker=speaker,
                speaker_role=speaker_sentences[0].speaker_role if speaker_sentences else None,
                total_sentences=total_sentences,
                total_duration=total_duration,
                topics_mentioned=topics_mentioned,
                key_insights=key_insights
            ))
        return speaker_insights

    def _create_metadata(
        self,
        request: TranscriptRequest,
        context: ExtractionContext,
        chunk_results: List[ChunkExtractionResult],
        processing_time: float,
        topics: Optional[List[TopicDetail]] = None
    ) -> ProcessingMetadata:
        """
        Create processing metadata.

        Args:
            request: The original request (unused here; kept for interface stability).
            context: Extraction context for ids and transcript totals.
            chunk_results: Per-chunk results; token usage and warnings are summed from these.
            processing_time: Elapsed seconds for the whole extraction.
            topics: Final (aggregated/merged) topics. When given, topics_extracted
                and average_confidence describe these topics. Otherwise they fall
                back to the raw per-chunk topics, which can double-count topics
                seen in overlapping chunks.
        """
        total_tokens = {"input_tokens": 0, "output_tokens": 0}
        for result in chunk_results:
            for key in total_tokens:
                total_tokens[key] += result.tokens_used.get(key, 0)

        warnings = [w for result in chunk_results for w in result.warnings]

        if topics is not None:
            topics_extracted = len(topics)
            confidences = [t.confidence_score for t in topics]
        else:
            topics_extracted = sum(len(result.topics) for result in chunk_results)
            confidences = [c for result in chunk_results for c in result.confidence_scores]
        avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0

        return ProcessingMetadata(
            request_id=context.request_id,
            timestamp=datetime.now(),
            model_used=self.model_manager.current_model.value,
            processing_time=processing_time,
            total_sentences=context.total_sentences,
            total_duration=context.total_duration,
            unique_speakers=len(context.unique_speakers),
            topics_extracted=topics_extracted,
            average_confidence=avg_confidence,
            coverage_percentage=100.0,  # Simplified: not actually measured
            tokens_used=total_tokens,
            detected_language=context.language,
            warnings=warnings
        )

    def _generate_executive_summary(self, topics: List[TopicDetail]) -> str:
        """Summarize topic count, the three most frequent categories and mean confidence."""
        if not topics:
            return "No topics were extracted from the transcript."

        category_counts = self._create_category_summary(topics)
        summary_parts = [
            f"Analysis identified {len(topics)} key topics from the transcript."
        ]
        if category_counts:
            top_categories = sorted(category_counts.items(), key=lambda x: x[1], reverse=True)[:3]
            category_text = ", ".join([f"{count} {cat.value.replace('_', ' ')}" for cat, count in top_categories])
            summary_parts.append(f"Primary focus areas include: {category_text}.")

        avg_confidence = sum(t.confidence_score for t in topics) / len(topics)
        confidence_text = "high" if avg_confidence > 0.8 else "moderate" if avg_confidence > 0.6 else "low"
        summary_parts.append(f"Overall extraction confidence is {confidence_text} ({avg_confidence:.2f}).")
        return " ".join(summary_parts)

    def _extract_key_takeaways(self, topics: List[TopicDetail]) -> List[str]:
        """
        Extract up to 10 key takeaways.

        Takeaways are the de-duplicated actionable insights (in topic order),
        followed by fixed sentences for the B2B/B2C needs, barrier and benefit
        categories when present.
        """
        # De-duplicate while preserving order; the same insight can surface in several topics.
        takeaways = list(dict.fromkeys(
            insight for topic in topics for insight in topic.actionable_insights
        ))

        present = {topic.topic_type for topic in topics}
        if TopicCategory.CLIENT_NEEDS_B2B in present or TopicCategory.CLIENT_NEEDS_B2C in present:
            takeaways.append("Customer needs and requirements were clearly identified")
        if TopicCategory.SOLUTION_BARRIERS in present:
            takeaways.append("Implementation barriers and challenges were discussed")
        if TopicCategory.SOLUTION_BENEFITS in present:
            takeaways.append("Solution benefits and value propositions were highlighted")
        return takeaways[:10]

    def _create_category_summary(self, topics: List[TopicDetail]) -> Dict[TopicCategory, int]:
        """Count topics per category."""
        category_summary = {}
        for topic in topics:
            category_summary[topic.topic_type] = category_summary.get(topic.topic_type, 0) + 1
        return category_summary