# NOTE(review): removed stray export artifacts that preceded the module
# ("Spaces:" header and duplicated "Runtime error" lines — not source code).
import json
import logging
import re
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional

try:
    from elevenlabs import VoiceSettings
    from elevenlabs.client import ElevenLabs
    ELEVENLABS_AVAILABLE = True
except ImportError:
    ELEVENLABS_AVAILABLE = False

import config
from services.llamaindex_service import LlamaIndexService
from services.llm_service import LLMService
from services.document_store_service import DocumentStoreService

logger = logging.getLogger(__name__)
@dataclass
class DocumentAnalysis:
    """Analysis results from document(s).

    Produced by PodcastGeneratorService.analyze_documents and consumed by
    generate_script; the @dataclass decorator was missing, so the keyword
    construction in analyze_documents would have raised TypeError.
    """
    key_insights: List[str]       # 5-7 most important points extracted by the LLM
    topics: List[str]             # main themes, used for the podcast title/metadata
    complexity_level: str         # "beginner" / "intermediate" / "advanced"
    estimated_words: int          # word count of the combined source content
    source_documents: List[str]   # filenames of the analyzed documents
    summary: str                  # comprehensive summary fed into the script prompt
@dataclass
class DialogueLine:
    """Single line of podcast dialogue.

    The @dataclass decorator was missing; _parse_script constructs instances
    with keyword arguments, which requires the generated __init__.
    """
    speaker: str          # "HOST1" or "HOST2"
    text: str             # what the speaker says
    pause_after: float = 0.5  # seconds of silence after the line (not yet used by synthesis)
@dataclass
class PodcastScript:
    """Complete podcast script.

    The @dataclass decorator was missing; generate_script constructs this
    with keyword arguments. DialogueLine is quoted as a forward reference so
    the class is self-contained at definition time.
    """
    dialogue: List["DialogueLine"]   # ordered speaker turns
    total_duration_estimate: float   # estimated length in seconds
    word_count: int                  # total words across all dialogue lines
    style: str                       # one of the SCRIPT_PROMPTS keys

    def to_text(self) -> str:
        """Render the script as a plain-text transcript.

        Each turn becomes "SPEAKER: text"; turns are separated by a blank line.
        """
        lines = [f"{line.speaker}: {line.text}" for line in self.dialogue]
        return "\n\n".join(lines)
@dataclass
class PodcastMetadata:
    """Metadata for a generated podcast.

    Persisted to metadata_db.json via asdict() and rehydrated with
    PodcastMetadata(**item) — both require the previously missing
    @dataclass decorator.
    """
    podcast_id: str                    # UUID assigned at generation time
    title: str                         # derived from the first analyzed topic
    description: str                   # style + source-document summary line
    source_documents: List[str]        # filenames of the source documents
    style: str                         # script style used ("conversational", ...)
    duration_seconds: float            # estimated duration of the audio
    file_size_mb: float                # size of the generated MP3
    voices: Dict[str, str]             # {"host1": name, "host2": name}
    generated_at: str                  # ISO-8601 timestamp
    generation_cost: Dict[str, float]  # {"llm_cost", "tts_cost", "total"} estimates
    key_topics: List[str]              # topics extracted during analysis
@dataclass
class PodcastResult:
    """Complete podcast generation result.

    The @dataclass decorator was missing; generate_podcast constructs this
    with keyword arguments. `metadata` is Optional because the failure path
    returns it as None.
    """
    podcast_id: str                          # UUID of the (attempted) podcast
    audio_file_path: str                     # path to the MP3, "" on failure
    transcript: str                          # plain-text transcript, "" on failure
    metadata: Optional["PodcastMetadata"]    # None when generation failed
    generation_time: float                   # wall-clock seconds for the whole pipeline
    success: bool                            # True when every step completed
    error: Optional[str] = None              # error message when success is False
class PodcastGeneratorService:
    """
    Service for generating conversational podcasts from documents.

    Pipeline: retrieve documents from the document store, analyze them with
    the LLM, generate a two-host script, synthesize audio with ElevenLabs,
    then persist the audio file, transcript and metadata under
    ./data/podcasts.
    """

    # Average speaking rate used to turn a target duration into a word budget.
    WORDS_PER_MINUTE = 150

    # Prompt templates per style; all share the same format keys:
    # {document_content}, {key_insights}, {duration_minutes}, {word_count}.
    SCRIPT_PROMPTS = {
        "conversational": """You are an expert podcast script writer. Create an engaging 2-host podcast discussing the provided documents.
DOCUMENT CONTENT:
{document_content}
KEY INSIGHTS:
{key_insights}
REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Conversational, friendly, and accessible
- Format: Alternating dialogue between HOST1 and HOST2
- Make the content engaging and easy to understand
- Include natural transitions and enthusiasm
DIALOGUE FORMAT (strictly follow):
HOST1: [What they say]
HOST2: [What they say]
STRUCTURE:
1. Opening Hook (30 seconds): Grab attention
2. Introduction (1 minute): Set context
3. Main Discussion (70% of time): Deep dive into insights
4. Wrap-up (1 minute): Summarize key takeaways
Generate the complete podcast script now:""",
        "educational": """Create an educational podcast discussing the provided documents.
DOCUMENT CONTENT:
{document_content}
KEY INSIGHTS:
{key_insights}
REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Clear, methodical, educational
- HOST1 acts as teacher, HOST2 as curious learner
DIALOGUE FORMAT:
HOST1: [Expert explanation]
HOST2: [Clarifying question]
Generate the educational podcast script now:""",
        "technical": """Create a technical podcast for an informed audience.
DOCUMENT CONTENT:
{document_content}
KEY INSIGHTS:
{key_insights}
REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Professional, detailed, technically accurate
- HOST1 is expert, HOST2 is informed interviewer
DIALOGUE FORMAT:
HOST1: [Technical insight]
HOST2: [Probing question]
Generate the technical podcast script now:""",
        "casual": """Create a fun, casual podcast discussing the documents.
DOCUMENT CONTENT:
{document_content}
KEY INSIGHTS:
{key_insights}
REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Relaxed, humorous, energetic
- Make it entertaining while informative
DIALOGUE FORMAT:
HOST1: [Casual commentary]
HOST2: [Enthusiastic response]
Generate the casual podcast script now:"""
    }

    def __init__(
        self,
        llamaindex_service: LlamaIndexService,
        llm_service: LLMService,
        elevenlabs_api_key: Optional[str] = None
    ):
        """Initialize the service.

        Args:
            llamaindex_service: provides access to the document store.
            llm_service: used for analysis and script generation.
            elevenlabs_api_key: optional override; falls back to config.
        """
        self.config = config.config
        self.llamaindex_service = llamaindex_service
        self.llm_service = llm_service
        # Get document store from llamaindex service
        self.document_store = llamaindex_service.document_store

        # Initialize ElevenLabs client (left as None when the package or the
        # API key is unavailable; audio synthesis will then raise explicitly).
        self.elevenlabs_client = None
        if ELEVENLABS_AVAILABLE:
            api_key = elevenlabs_api_key or self.config.ELEVENLABS_API_KEY
            if api_key:
                try:
                    self.elevenlabs_client = ElevenLabs(api_key=api_key)
                    logger.info("ElevenLabs client initialized for podcast generation")
                except Exception as e:
                    logger.error(f"Failed to initialize ElevenLabs client: {e}")

        # Create podcast storage directory
        self.podcast_dir = Path("./data/podcasts")
        self.podcast_dir.mkdir(parents=True, exist_ok=True)

        # Metadata database file (a JSON list of PodcastMetadata dicts)
        self.metadata_file = self.podcast_dir / "metadata_db.json"
        self._ensure_metadata_db()

        # Voice cache: lowercase voice name -> ElevenLabs voice_id
        self._voice_cache = {}

    def _ensure_metadata_db(self):
        """Create an empty metadata database file if it does not exist."""
        if not self.metadata_file.exists():
            self.metadata_file.write_text(json.dumps([], indent=2))

    async def generate_podcast(
        self,
        document_ids: List[str],
        style: str = "conversational",
        duration_minutes: int = 10,
        host1_voice: str = "Rachel",
        host2_voice: str = "Adam"
    ) -> PodcastResult:
        """Generate a complete podcast from documents.

        Args:
            document_ids: IDs of documents in the document store.
            style: one of the SCRIPT_PROMPTS keys (unknown styles fall back
                to "conversational" during script generation).
            duration_minutes: target podcast length.
            host1_voice / host2_voice: ElevenLabs voice names.

        Returns:
            PodcastResult; on any failure success is False and `error` holds
            the message (no exception escapes this method).
        """
        start_time = datetime.now()
        podcast_id = str(uuid.uuid4())
        try:
            logger.info(f"Starting podcast generation {podcast_id}")
            logger.info(f"Documents: {document_ids}, Style: {style}, Duration: {duration_minutes}min")

            # Step 1: Retrieve and analyze documents
            logger.info("Step 1: Retrieving and analyzing documents...")
            analysis = await self.analyze_documents(document_ids)

            # Step 2: Generate script
            logger.info("Step 2: Generating podcast script...")
            script = await self.generate_script(analysis, style, duration_minutes)

            # Step 3: Synthesize audio
            logger.info("Step 3: Synthesizing audio with voices...")
            audio_file_path = await self.synthesize_audio(
                podcast_id,
                script,
                host1_voice,
                host2_voice
            )

            # Calculate generation time
            generation_time = (datetime.now() - start_time).total_seconds()

            # Step 4: Create metadata
            logger.info("Step 4: Creating metadata...")
            # BUGFIX: previously a set literal {host1_voice, host2_voice} was
            # passed here — sets are unordered and collapse duplicate voices,
            # so the host1/host2 labels in the metadata were unreliable.
            # A tuple preserves both order and duplicates.
            metadata = self._create_metadata(
                podcast_id,
                analysis,
                script,
                audio_file_path,
                (host1_voice, host2_voice),
                document_ids,
                style
            )

            # Save metadata
            self._save_metadata(metadata)

            # Save transcript
            transcript_path = self.podcast_dir / f"{podcast_id}_transcript.txt"
            transcript_path.write_text(script.to_text(), encoding="utf-8")

            logger.info(f"Podcast generated successfully: {podcast_id}")
            return PodcastResult(
                podcast_id=podcast_id,
                audio_file_path=str(audio_file_path),
                transcript=script.to_text(),
                metadata=metadata,
                generation_time=generation_time,
                success=True
            )
        except Exception as e:
            logger.error(f"Podcast generation failed: {str(e)}", exc_info=True)
            return PodcastResult(
                podcast_id=podcast_id,
                audio_file_path="",
                transcript="",
                metadata=None,
                generation_time=(datetime.now() - start_time).total_seconds(),
                success=False,
                error=str(e)
            )

    async def analyze_documents(self, document_ids: List[str]) -> DocumentAnalysis:
        """
        Retrieve documents and extract key insights for the podcast.

        Raises:
            RuntimeError: if no documents are found or the analysis fails.
        """
        try:
            # Step 1: Retrieve actual documents from document store
            logger.info(f"Retrieving {len(document_ids)} documents from store...")
            documents = []
            document_contents = []
            for doc_id in document_ids:
                doc = await self.document_store.get_document(doc_id)
                if doc:
                    documents.append(doc)
                    document_contents.append(doc.content)
                    logger.info(f"Retrieved document: {doc.filename} ({len(doc.content)} chars)")
                else:
                    logger.warning(f"Document {doc_id} not found in store")
            if not documents:
                raise ValueError(f"No documents found for IDs: {document_ids}")

            # Step 2: Combine document content
            combined_content = "\n\n---DOCUMENT SEPARATOR---\n\n".join(document_contents)

            # Truncate if too long (keep first portion for context)
            max_content_length = 15000  # Adjust based on your LLM context window
            if len(combined_content) > max_content_length:
                logger.warning(f"Content too long ({len(combined_content)} chars), truncating to {max_content_length}")
                combined_content = combined_content[:max_content_length] + "\n\n[Content truncated...]"

            # Step 3: Use LLM to analyze the content
            analysis_prompt = f"""Analyze the following document(s) and provide:
1. The 5-7 most important insights or key points (be specific and detailed)
2. Main themes and topics covered
3. The overall complexity level (beginner/intermediate/advanced)
4. A comprehensive summary suitable for podcast discussion
DOCUMENTS:
{combined_content}
Provide a structured analysis optimized for creating an engaging podcast discussion.
Format your response as:
KEY INSIGHTS:
1. [First key insight]
2. [Second key insight]
...
TOPICS:
- [Topic 1]
- [Topic 2]
...
COMPLEXITY: [beginner/intermediate/advanced]
SUMMARY:
[Your comprehensive summary here]
"""
            logger.info("Analyzing content with LLM...")
            result = await self.llm_service.generate_text(
                analysis_prompt,
                max_tokens=2000,
                temperature=0.7
            )

            # Step 4: Parse the structured response
            insights = self._extract_insights(result)
            topics = self._extract_topics(result)
            complexity = self._determine_complexity(result)
            summary = self._extract_summary(result)
            logger.info(f"Analysis complete: {len(insights)} insights, {len(topics)} topics")
            return DocumentAnalysis(
                key_insights=insights[:7],
                topics=topics,
                complexity_level=complexity,
                estimated_words=len(combined_content.split()),
                source_documents=[doc.filename for doc in documents],
                summary=summary or result[:500]
            )
        except Exception as e:
            logger.error(f"Document analysis failed: {str(e)}", exc_info=True)
            raise RuntimeError(f"Failed to analyze documents: {str(e)}")

    def _extract_summary(self, text: str) -> str:
        """Extract the SUMMARY section from the analysis text.

        Falls back to the first three sentences if no section is found.
        """
        try:
            if "SUMMARY:" in text:
                parts = text.split("SUMMARY:")
                if len(parts) > 1:
                    summary = parts[1].strip()
                    # Cap at 500 chars so the script prompt stays compact.
                    return summary[:500] if len(summary) > 500 else summary
        except Exception:  # BUGFIX: was a bare except, which also swallows SystemExit/KeyboardInterrupt
            pass
        # Fallback: take first few sentences
        sentences = text.split('.')
        return '. '.join(sentences[:3]) + '.'

    def _extract_insights(self, text: str) -> List[str]:
        """Extract the KEY INSIGHTS bullet list from the analysis text.

        Only the *leading* list marker ("1.", "-", "*", "•") is stripped.
        BUGFIX: the old pattern r'^\\d+\\.|\\-|\\*|•' anchored only the first
        alternative, so re.sub also removed hyphens/asterisks/bullets that
        appeared inside the insight text itself.
        """
        insights = []
        lines = text.split('\n')
        in_insights_section = False
        for line in lines:
            line = line.strip()
            if "KEY INSIGHTS:" in line.upper():
                in_insights_section = True
                continue
            elif line.upper().startswith(("TOPICS:", "COMPLEXITY:", "SUMMARY:")):
                in_insights_section = False
            if in_insights_section and line:
                # Strip only a leading "1." / "-" / "*" / "•" list marker.
                insight = re.sub(r'^(?:\d+\.|[-*•])\s*', '', line).strip()
                if len(insight) > 20:
                    insights.append(insight)
        # Fallback if no insights found
        if not insights:
            sentences = text.split('.')
            insights = [s.strip() + '.' for s in sentences[:7] if len(s.strip()) > 20]
        return insights

    def _extract_topics(self, text: str) -> List[str]:
        """Extract the TOPICS list from the analysis text (max 5).

        BUGFIX: as in _extract_insights, the old pattern r'^\\-|\\*|•' only
        anchored the first alternative and stripped markers mid-line.
        """
        topics = []
        lines = text.split('\n')
        in_topics_section = False
        for line in lines:
            line = line.strip()
            if "TOPICS:" in line.upper():
                in_topics_section = True
                continue
            elif line.upper().startswith(("KEY INSIGHTS:", "COMPLEXITY:", "SUMMARY:")):
                in_topics_section = False
            if in_topics_section and line:
                topic = re.sub(r'^[-*•]\s*', '', line).strip()
                if len(topic) > 2:
                    topics.append(topic)
        # Fallback: simple keyword extraction by word frequency
        if not topics:
            common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
            words = text.lower().split()
            word_freq = {}
            for word in words:
                word = re.sub(r'[^\w\s]', '', word)
                if len(word) > 4 and word not in common_words:
                    word_freq[word] = word_freq.get(word, 0) + 1
            top_topics = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:5]
            topics = [topic[0].title() for topic in top_topics]
        return topics[:5]

    def _determine_complexity(self, text: str) -> str:
        """Determine content complexity level from the COMPLEXITY marker,
        falling back to a keyword heuristic ("intermediate" by default)."""
        text_lower = text.lower()
        if "complexity:" in text_lower:
            for level in ["beginner", "intermediate", "advanced"]:
                # Only look just after the marker to avoid false matches.
                if level in text_lower.split("complexity:")[1][:100]:
                    return level
        # Heuristic based on keywords
        if any(word in text_lower for word in ['basic', 'introduction', 'beginner', 'simple']):
            return "beginner"
        elif any(word in text_lower for word in ['advanced', 'complex', 'sophisticated', 'expert']):
            return "advanced"
        else:
            return "intermediate"

    async def generate_script(
        self,
        analysis: DocumentAnalysis,
        style: str,
        duration_minutes: int
    ) -> PodcastScript:
        """Generate a podcast script from the analysis.

        Raises:
            ValueError: if the LLM output contains no parseable dialogue.
        """
        target_words = duration_minutes * self.WORDS_PER_MINUTE

        # Prepare context with numbered insights
        insights_text = "\n".join(f"{i+1}. {insight}" for i, insight in enumerate(analysis.key_insights))

        # Get prompt template (unknown styles fall back to conversational)
        prompt_template = self.SCRIPT_PROMPTS.get(style, self.SCRIPT_PROMPTS["conversational"])

        # Fill template
        prompt = prompt_template.format(
            document_content=analysis.summary,
            key_insights=insights_text,
            duration_minutes=duration_minutes,
            word_count=target_words
        )

        # Generate script; 2x word budget leaves headroom for tokens != words.
        script_text = await self.llm_service.generate_text(
            prompt,
            max_tokens=target_words * 2,
            temperature=0.8
        )

        # Parse into dialogue
        dialogue = self._parse_script(script_text)
        if not dialogue:
            raise ValueError("Failed to parse script into dialogue lines")

        word_count = sum(len(line.text.split()) for line in dialogue)
        duration_estimate = word_count / self.WORDS_PER_MINUTE
        return PodcastScript(
            dialogue=dialogue,
            total_duration_estimate=duration_estimate * 60,
            word_count=word_count,
            style=style
        )

    def _parse_script(self, script_text: str) -> List[DialogueLine]:
        """Parse a generated script into DialogueLine objects.

        Only lines starting exactly with "HOST1:" or "HOST2:" are kept;
        everything else (stage directions, blank lines) is dropped.
        """
        dialogue = []
        for line in script_text.split('\n'):
            line = line.strip()
            if not line:
                continue
            if line.startswith('HOST1:'):
                text = line[6:].strip()
                if text:
                    dialogue.append(DialogueLine(speaker="HOST1", text=text))
            elif line.startswith('HOST2:'):
                text = line[6:].strip()
                if text:
                    dialogue.append(DialogueLine(speaker="HOST2", text=text))
        return dialogue

    def _get_voice_id(self, voice_name: str) -> str:
        """Resolve an ElevenLabs voice name to a voice ID.

        Tries exact match, then partial match, then falls back to the first
        available voice. The voice list is fetched once and cached.

        Raises:
            RuntimeError: if the client is missing or voices cannot be fetched.
        """
        # BUGFIX: guard against a missing client — previously this raised an
        # opaque AttributeError that was re-wrapped below.
        if not self.elevenlabs_client:
            raise RuntimeError("ElevenLabs client not initialized")
        try:
            # Populate the cache on first use
            if not self._voice_cache:
                voices = self.elevenlabs_client.voices.get_all()
                if not voices or not voices.voices:
                    raise RuntimeError("No voices available")
                for voice in voices.voices:
                    self._voice_cache[voice.name.lower()] = voice.voice_id
            # Exact match
            if voice_name.lower() in self._voice_cache:
                return self._voice_cache[voice_name.lower()]
            # Partial match
            for name, voice_id in self._voice_cache.items():
                if voice_name.lower() in name:
                    logger.info(f"Partial match for '{voice_name}': {name}")
                    return voice_id
            # Fallback: first cached voice
            first_voice_id = list(self._voice_cache.values())[0]
            logger.warning(f"Voice '{voice_name}' not found, using default")
            return first_voice_id
        except Exception as e:
            logger.error(f"Could not fetch voices: {e}")
            raise RuntimeError(f"Failed to get voice ID: {str(e)}")

    async def synthesize_audio(
        self,
        podcast_id: str,
        script: PodcastScript,
        host1_voice: str,
        host2_voice: str
    ) -> Path:
        """Synthesize the script to an MP3, alternating between two voices.

        Returns:
            Path to the written audio file.

        Raises:
            RuntimeError: if the client is missing or synthesis produces no
            usable audio.
        """
        if not self.elevenlabs_client:
            raise RuntimeError("ElevenLabs client not initialized")
        audio_file = self.podcast_dir / f"{podcast_id}.mp3"
        try:
            # Get voice IDs
            host1_voice_id = self._get_voice_id(host1_voice)
            host2_voice_id = self._get_voice_id(host2_voice)
            logger.info(f"HOST1: {host1_voice}, HOST2: {host2_voice}")
            voice_map = {
                "HOST1": host1_voice_id,
                "HOST2": host2_voice_id
            }
            audio_chunks = []
            # Process each line with its speaker's voice
            for i, line in enumerate(script.dialogue):
                logger.info(f"Line {i+1}/{len(script.dialogue)}: {line.speaker}")
                voice_id = voice_map.get(line.speaker, host1_voice_id)
                audio_generator = self.elevenlabs_client.text_to_speech.convert(
                    voice_id=voice_id,
                    text=line.text,
                    model_id="eleven_multilingual_v2"
                )
                # The SDK streams the audio; drain the generator per line.
                line_chunks = [chunk for chunk in audio_generator if chunk]
                if line_chunks:
                    audio_chunks.append(b''.join(line_chunks))
            if not audio_chunks:
                raise RuntimeError("No audio chunks generated")
            full_audio = b''.join(audio_chunks)
            with open(audio_file, 'wb') as f:
                f.write(full_audio)
            # Sanity check: reject empty / near-empty output files.
            if audio_file.exists() and audio_file.stat().st_size > 1000:
                logger.info(f"Audio created: {audio_file} ({audio_file.stat().st_size} bytes)")
                return audio_file
            else:
                raise RuntimeError("Audio file too small or empty")
        except Exception as e:
            logger.error(f"Audio synthesis failed: {e}", exc_info=True)
            raise RuntimeError(f"Failed to generate audio: {str(e)}")

    def _create_metadata(
        self,
        podcast_id: str,
        analysis: DocumentAnalysis,
        script: PodcastScript,
        audio_path: Path,
        voices,
        document_ids: List[str],
        style: str
    ) -> PodcastMetadata:
        """Create podcast metadata.

        Args:
            voices: ordered pair of voice names (host1 first, host2 second).
                Previously a set was passed, which lost order and duplicates.
        """
        title = f"Podcast: {analysis.topics[0] if analysis.topics else 'Document Discussion'}"
        description = f"A {style} podcast discussing: {', '.join(analysis.source_documents)}"
        file_size_mb = audio_path.stat().st_size / (1024 * 1024) if audio_path.exists() else 0
        # Rough cost estimates: LLM per-1k-words, TTS per-character
        # (assumes ~5 chars/word). NOTE(review): rates are hard-coded
        # heuristics — confirm against current provider pricing.
        llm_cost = (script.word_count / 1000) * 0.01
        tts_cost = (script.word_count * 5 / 1000) * 0.30
        voice_list = list(voices)
        return PodcastMetadata(
            podcast_id=podcast_id,
            title=title,
            description=description,
            source_documents=analysis.source_documents,
            style=style,
            duration_seconds=script.total_duration_estimate,
            file_size_mb=file_size_mb,
            voices={"host1": voice_list[0] if len(voice_list) > 0 else "Rachel",
                    "host2": voice_list[1] if len(voice_list) > 1 else "Adam"},
            generated_at=datetime.now().isoformat(),
            generation_cost={"llm_cost": llm_cost, "tts_cost": tts_cost, "total": llm_cost + tts_cost},
            key_topics=analysis.topics
        )

    def _save_metadata(self, metadata: PodcastMetadata):
        """Append one podcast's metadata to the JSON database (best-effort)."""
        try:
            existing = json.loads(self.metadata_file.read_text())
            existing.append(asdict(metadata))
            self.metadata_file.write_text(json.dumps(existing, indent=2))
            logger.info(f"Metadata saved: {metadata.podcast_id}")
        except Exception as e:
            logger.error(f"Failed to save metadata: {e}")

    def list_podcasts(self, limit: int = 10) -> List[PodcastMetadata]:
        """List the most recently generated podcasts, newest first."""
        try:
            data = json.loads(self.metadata_file.read_text())
            podcasts = [PodcastMetadata(**item) for item in data[-limit:]]
            return list(reversed(podcasts))
        except Exception as e:
            logger.error(f"Failed to list podcasts: {e}")
            return []

    def get_podcast(self, podcast_id: str) -> Optional[PodcastMetadata]:
        """Get metadata for a specific podcast, or None if not found."""
        try:
            data = json.loads(self.metadata_file.read_text())
            for item in data:
                if item.get('podcast_id') == podcast_id:
                    return PodcastMetadata(**item)
            return None
        except Exception as e:
            logger.error(f"Failed to get podcast: {e}")
            return None