# NOTE: removed stray "Spaces: / Build error" banner text that was pasted in
# from the Hugging Face Spaces UI -- it was not part of the source code.
#!/usr/bin/env python3
"""
FastAPI Backend for Audio Transcription - Hugging Face Spaces Deployment
This provides a REST API that can be consumed by any frontend
"""
# Web framework, upload handling, and response helpers.
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# ML pipelines (Whisper transcription, T5/BART summarization).
from transformers import pipeline
# OpenAI-compatible client, pointed at OpenRouter below.
from openai import OpenAI
import tempfile
import os
import time
# Audio loading / resampling / writing.
import librosa
import soundfile as sf
import numpy as np
import subprocess
# Project-local robust YouTube audio downloader.
from youtube_downloader import youtube_downloader
import re
import requests
import json
from typing import Dict
from pydantic import BaseModel
from dotenv import load_dotenv

# Load environment variables from a local .env file (e.g. OPENROUTER_API_KEY).
load_dotenv()
# Initialize FastAPI app with interactive docs at /docs and /redoc.
app = FastAPI(
    title="AudioScribe API",
    description="AI-powered audio transcription and summarization API",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# Configure CORS to allow all origins (important for frontend deployment).
# NOTE(review): per the CORS spec, browsers reject wildcard origins combined
# with allow_credentials=True (Starlette will not echo "*" with credentials) --
# confirm whether credentialed requests are actually required here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for frontend deployment
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
# Global variables for models (populated at startup elsewhere in the app).
transcription_pipeline = None
summarization_pipeline = None
summarization_model_name = None

# OpenRouter API configuration using the OpenAI-compatible client.
# SECURITY FIX: the previous version embedded a hard-coded fallback API key
# directly in source control. A leaked credential must never ship in code;
# the key now comes exclusively from the environment.
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
if OPENROUTER_API_KEY:
    print("β Loaded API key from environment")
    print(f"π API Key (masked): {OPENROUTER_API_KEY[:10]}...{OPENROUTER_API_KEY[-5:]}")
else:
    print("β οΈ OPENROUTER_API_KEY is not set -- OpenRouter calls will fail with 401")

# Initialize OpenAI client for OpenRouter. A placeholder key keeps startup
# from crashing when the env var is missing; API calls then fail loudly with
# an authentication error instead of silently using a leaked credential.
openrouter_client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=OPENROUTER_API_KEY or "missing-api-key",
)
# Request models

class YouTubeRequest(BaseModel):
    """Request body for the YouTube transcription endpoint."""
    # Full YouTube video URL (watch / youtu.be / embed forms accepted).
    url: str
    # Output flavor: "summarize" for a prose summary, "notes" for study notes.
    mode: str = "summarize"  # "summarize" or "notes"
def is_youtube_url(url: str) -> bool:
    """Return True when *url* looks like a valid YouTube video link.

    Accepts an optional scheme and "www.", the youtube/youtu/youtube-nocookie
    hosts, an optional path prefix (watch?v=, embed/, v/), and an
    11-character video id.
    """
    pattern = (
        r'(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/'
        r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'
    )
    return re.match(pattern, url) is not None
def download_youtube_audio(url: str) -> str:
    """Fetch the audio track of a YouTube video and return its local path.

    Delegates to the shared ``youtube_downloader`` helper; on failure, stale
    temporary files are purged before the original exception is re-raised.
    """
    try:
        return youtube_downloader.download_youtube_audio(url)
    except Exception as err:
        # Best-effort tidy-up of partial downloads before propagating.
        youtube_downloader.cleanup_old_files()
        raise err
def preprocess_audio(audio_path: str) -> str:
    """Resample *audio_path* to 16 kHz mono WAV for Whisper.

    Returns the path of a new temporary WAV file, or the original path
    unchanged when preprocessing fails for any reason.
    """
    try:
        # Whisper expects 16 kHz input; librosa resamples on load.
        samples, _rate = librosa.load(audio_path, sr=16000)
        # Peak-normalise into [-1, 1] (skip silent clips to avoid div-by-zero).
        peak = np.max(np.abs(samples))
        if peak > 0:
            samples = samples / peak
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as out:
            sf.write(out.name, samples, 16000)
            return out.name
    except Exception as err:
        print(f"Audio preprocessing error: {err}")
        return audio_path
def generate_summary(text: str, mode: str = "summarize") -> str:
    """Produce a summary (``mode="summarize"``) or study notes (``"notes"``).

    Uses OpenRouter first; for plain summaries, falls back to the local
    summarization pipeline when OpenRouter fails, and finally degrades to a
    fixed "unavailable" message.
    """
    # Very short transcripts are not worth summarizing.
    if len(text.split()) < 30:
        return "Text too short for meaningful summarization."
    try:
        if mode == "notes":
            print("π§ Generating comprehensive study notes with OpenRouter AI...")
            return generate_comprehensive_study_notes(text)
        print("π Generating summary with OpenRouter AI...")
        summary = generate_openrouter_summary(text)
        if summary and summary != "Summary temporarily unavailable.":
            return summary
        # Remote summarization failed -- try the local model instead.
        print("β οΈ OpenRouter failed, falling back to local summarization...")
        if not summarization_pipeline:
            return "Summarization temporarily unavailable."
        # t5-small accepts fewer input tokens than the BART variants.
        limit = 512 if summarization_model_name == "t5-small" else 1024
        words = text.split()
        if len(words) > limit:
            text = " ".join(words[:limit])
        result = summarization_pipeline(
            text,
            max_length=150,
            min_length=30,
            do_sample=False
        )
        return result[0]['summary_text']
    except Exception as e:
        print(f"Summarization error: {e}")
        return "Summarization temporarily unavailable."
def generate_openrouter_summary(text: str) -> str:
    """Ask OpenRouter for a concise 150-200 word summary of *text*.

    Input is clipped to 3000 characters to fit the prompt budget. Returns a
    fixed fallback message when the API call fails or yields nothing.
    """
    try:
        clipped = text if len(text) <= 3000 else text[:3000] + "..."
        prompt = f"""
Please provide a clear, concise summary of this content in 2-3 paragraphs:
{clipped}
Requirements:
- Focus on the main points and key information
- Keep it informative but concise (150-200 words)
- Use clear, simple language
- Don't include unnecessary details or filler words
- Structure it logically with the most important points first
"""
        summary = call_openrouter_api(prompt, max_tokens=300)
        return summary if summary else "Summary temporarily unavailable."
    except Exception as e:
        print(f"OpenRouter summary generation error: {e}")
        return "Summary temporarily unavailable."
def generate_comprehensive_study_notes(text: str) -> str:
    """Generate comprehensive, sectioned study notes via the OpenRouter API.

    Args:
        text: Raw transcription text to turn into study notes.

    Returns:
        A formatted study-notes string, or the fixed fallback message
        "Study notes temporarily unavailable." when the API call fails
        or returns nothing.
    """
    try:
        # Truncate text if too long for API (keeps the prompt within limits).
        if len(text) > 3000:
            text = text[:3000] + "..."
        # Detect content type so the prompt can reference the subject area.
        content_type = detect_content_type(text.lower())
        prompt = f"""
Analyze this {content_type} content and create comprehensive study notes:
{text}
Create structured study notes with these sections:
π STUDY NOTES - {content_type.upper()}
π― KEY CONCEPTS:
[Extract 3-5 main concepts, each in one clear sentence]
π MAIN POINTS:
[List 4-6 important points from the content]
β‘ PRACTICAL INFORMATION:
[Include any commands, steps, examples, or actionable items mentioned]
π SUMMARY:
[Provide a clear 2-3 sentence summary of the entire content]
π‘ STUDY TIPS:
[Give 3-4 specific study recommendations based on the subject]
π NEXT STEPS:
[Suggest 2-3 practical actions someone could take to learn more]
Requirements:
- Keep each point concise and actionable
- Focus on learning and understanding
- Include specific details when available
- Use bullet points for easy reading
- Don't repeat the original transcription verbatim
- Make it genuinely helpful for studying
"""
        notes = call_openrouter_api(prompt, max_tokens=1200)
        # An empty/None response from the API is treated as a failure.
        return notes if notes else "Study notes temporarily unavailable."
    except Exception as e:
        print(f"Comprehensive study notes generation error: {e}")
        return "Study notes temporarily unavailable."
def format_as_study_notes(summary: str, original_text: str) -> str:
    """Format a summary as structured study notes based on content analysis.

    Heuristically extracts concepts, definitions, commands, steps, examples
    and tools from *original_text*, then appends several OpenRouter-generated
    sections. Any failure falls back to a minimal fixed template.

    Args:
        summary: AI-generated summary of the transcription.
        original_text: Full transcription text to analyse.

    Returns:
        A multi-section study-notes string.
    """
    try:
        import re  # NOTE(review): redundant -- `re` is already imported at module level
        # Clean and analyze the text
        text_lower = original_text.lower()
        # Naive sentence split on '.'; fragments of <=10 chars are dropped.
        sentences = [s.strip() for s in original_text.split('.') if len(s.strip()) > 10]
        # Detect content type and subject matter
        content_type = detect_content_type(text_lower)
        subject_keywords = extract_subject_keywords(text_lower)
        # Extract meaningful concepts and key points
        key_concepts = extract_key_concepts(sentences, subject_keywords)
        important_points = extract_important_points(sentences)
        definitions = extract_definitions(sentences)
        # Extract practical content (commands, steps, examples, tools)
        commands = extract_commands_and_code(original_text)
        steps = extract_step_by_step_instructions(sentences)
        examples = extract_practical_examples(sentences, content_type)
        tools_mentioned = extract_tools_and_technologies(original_text)
        # Build dynamic study notes; empty sections are skipped entirely.
        notes = f"π STUDY NOTES - {content_type.upper()}\n\n"
        # Add subject-specific sections
        if key_concepts:
            notes += "π― KEY CONCEPTS:\n"
            for concept in key_concepts[:5]:
                notes += f"β’ {concept}\n"
            notes += "\n"
        if definitions:
            notes += "π DEFINITIONS & EXPLANATIONS:\n"
            for definition in definitions[:3]:
                notes += f"β’ {definition}\n"
            notes += "\n"
        # Add practical sections
        if commands:
            notes += "β‘ COMMANDS & CODE:\n"
            for command in commands[:5]:
                notes += f"β’ `{command}`\n"
            notes += "\n"
        if steps:
            notes += "π STEP-BY-STEP:\n"
            for i, step in enumerate(steps[:6], 1):
                notes += f"{i}. {step}\n"
            notes += "\n"
        if examples:
            notes += "π‘ PRACTICAL EXAMPLES:\n"
            for example in examples[:4]:
                notes += f"β’ {example}\n"
            notes += "\n"
        if tools_mentioned:
            notes += "π οΈ TOOLS & TECHNOLOGIES:\n"
            for tool in tools_mentioned[:5]:
                notes += f"β’ {tool}\n"
            notes += "\n"
        if important_points:
            notes += "β IMPORTANT POINTS:\n"
            for point in important_points[:4]:
                notes += f"β’ {point}\n"
            notes += "\n"
        # Add the AI summary
        notes += f"π AI SUMMARY:\n{summary}\n\n"
        # Generate enhanced notes using OpenRouter API
        print("π§ Generating enhanced notes with OpenRouter AI...")
        enhanced_notes = generate_enhanced_notes_with_openrouter(summary, original_text, content_type)
        if enhanced_notes and enhanced_notes != "Enhanced notes temporarily unavailable.":
            notes += f"π AI-ENHANCED STUDY NOTES:\n{enhanced_notes}\n\n"
        # Generate subject-specific examples using OpenRouter
        print("π― Generating practical examples...")
        examples_content = generate_subject_specific_examples(content_type, key_concepts)
        if examples_content:
            notes += f"{examples_content}\n\n"
        # Generate troubleshooting tips for technical content only
        if any(tech in content_type.lower() for tech in ['programming', 'computer', 'aws', 'cloud', 'technical']):
            print("π§ Generating troubleshooting tips...")
            troubleshooting = generate_troubleshooting_tips(content_type, tools_mentioned + subject_keywords[:3])
            if troubleshooting:
                notes += f"{troubleshooting}\n\n"
        # Generate subject-specific study tips with practical focus
        study_tips = generate_subject_specific_tips(content_type, subject_keywords, commands, tools_mentioned)
        notes += "π‘ STUDY RECOMMENDATIONS:\n"
        for tip in study_tips:
            notes += f"β’ {tip}\n"
        # Add content-specific learning strategies
        learning_strategies = get_learning_strategies(content_type)
        if learning_strategies:
            notes += f"\nπ LEARNING STRATEGIES FOR {content_type.upper()}:\n"
            for strategy in learning_strategies:
                notes += f"β’ {strategy}\n"
        # Add practical next steps
        next_steps = generate_next_steps(content_type, tools_mentioned, commands)
        if next_steps:
            notes += f"\nπ NEXT STEPS TO PRACTICE:\n"
            for step in next_steps:
                notes += f"β’ {step}\n"
        return notes
    except Exception as e:
        print(f"Notes formatting error: {e}")
        # Fallback to basic format when any extraction/generation step fails.
        return f"π STUDY NOTES\n\nπ SUMMARY:\n{summary}\n\nπ‘ STUDY TIPS:\nβ’ Review the main concepts\nβ’ Take notes on key points\nβ’ Practice applying the knowledge"
def extract_commands_and_code(text: str) -> list:
    """Extract shell commands, code snippets, and technical instructions.

    Scans *text* for well-known CLI command prefixes (npm, pip, docker, ...)
    plus fenced/inline code spans, and returns up to five unique hits.

    Args:
        text: Free-form transcription text.

    Returns:
        Up to five unique command/code strings in first-seen order.
    """
    commands = []
    # Each pattern grabs the command word plus the rest of its line.
    command_patterns = [
        r'npm install[^\n]*',
        r'pip install[^\n]*',
        r'docker run[^\n]*',
        r'kubectl[^\n]*',
        r'aws[^\n]*',
        r'git[^\n]*',
        r'terraform[^\n]*',
        r'ansible[^\n]*',
        r'sudo[^\n]*',
        r'chmod[^\n]*',
        r'ssh[^\n]*',
        r'curl[^\n]*',
        r'wget[^\n]*',
        r'python[^\n]*\.py',
        r'node[^\n]*\.js',
        r'java[^\n]*\.java',
        r'gcc[^\n]*',
        r'make[^\n]*',
        r'yarn[^\n]*',
        r'brew install[^\n]*',
        r'apt-get[^\n]*',
        r'yum[^\n]*',
        r'systemctl[^\n]*',
        r'service[^\n]*'
    ]
    # Extract commands (module-level `re` is used; the old local import was
    # redundant).
    for pattern in command_patterns:
        commands.extend(re.findall(pattern, text, re.IGNORECASE))
    # Look for fenced code blocks (```...```) and inline code (`...`).
    code_blocks = re.findall(r'```[^`]*```', text)
    inline_code = re.findall(r'`[^`]+`', text)
    for code in code_blocks + inline_code:
        clean_code = code.strip('`').strip()
        # Keep only snippets short enough to be a single command/example.
        if 5 < len(clean_code) < 100:
            commands.append(clean_code)
    # BUG FIX: list(set(...)) shuffled results between runs; dict.fromkeys
    # de-duplicates while preserving first-seen order, making output
    # deterministic.
    return list(dict.fromkeys(commands))[:5]
def extract_step_by_step_instructions(sentences: list) -> list:
    """Collect up to six sentences that read like ordered instructions."""
    # Substrings that usually signal an instruction sequence.
    markers = (
        'first', 'second', 'third', 'fourth', 'fifth',
        'step 1', 'step 2', 'step 3', 'step 4', 'step 5',
        'next', 'then', 'after that', 'finally',
        'start by', 'begin by', 'to start',
        '1.', '2.', '3.', '4.', '5.',
    )
    steps = []
    for sentence in sentences:
        lowered = sentence.lower()
        if not any(marker in lowered for marker in markers):
            continue
        candidate = sentence.strip()
        # Skip fragments and run-on sentences.
        if 10 < len(candidate) < 200:
            steps.append(candidate)
    return steps[:6]
def extract_practical_examples(sentences: list, content_type: str) -> list:
    """Pull up to four example-style sentences out of *sentences*.

    For programming content, also harvests assignment-like fragments
    ("x = ...") as probable code examples.
    """
    cues = ('for example', 'such as', 'like', 'including',
            'instance', 'case', 'sample', 'demo', 'illustration')
    examples = [s.strip() for s in sentences
                if any(cue in s.lower() for cue in cues)]
    if 'programming' in content_type.lower():
        # Assignments such as "x = ..." often illustrate code usage.
        joined = ' '.join(sentences)
        examples.extend(re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*\s*=\s*[^.]*', joined)[:2])
    return examples[:4]
def extract_tools_and_technologies(text: str) -> list:
    """Find up to five tools/technologies mentioned in *text*.

    Matches a curated keyword list as whole tokens plus any Capitalized
    Terms, returning unique hits in first-seen order.

    Args:
        text: Free-form transcription text.

    Returns:
        Up to five tool/technology names, title-cased for keywords.
    """
    # Common tools and technologies
    tech_keywords = [
        # Cloud Platforms
        'aws', 'azure', 'gcp', 'google cloud', 'amazon web services',
        # DevOps Tools
        'docker', 'kubernetes', 'jenkins', 'gitlab', 'github actions',
        'terraform', 'ansible', 'chef', 'puppet',
        # Programming Languages
        'python', 'javascript', 'java', 'c++', 'go', 'rust', 'typescript',
        # Databases
        'mysql', 'postgresql', 'mongodb', 'redis', 'elasticsearch',
        # Web Technologies
        'react', 'vue', 'angular', 'node.js', 'express', 'django', 'flask',
        # Development Tools
        'vscode', 'intellij', 'eclipse', 'vim', 'emacs',
        'git', 'svn', 'mercurial',
        # Operating Systems
        'linux', 'ubuntu', 'centos', 'windows', 'macos',
        # Others
        'apache', 'nginx', 'tomcat', 'webpack', 'babel'
    ]
    text_lower = text.lower()
    found_tools = []
    for tool in tech_keywords:
        # BUG FIX: plain substring matching fired on embedded words
        # ('go' matched "good", 'aws' matched "flaws"). Lookarounds enforce
        # whole-token matches while still working for punctuated names like
        # 'c++' and 'node.js' where \b would not anchor.
        if re.search(r'(?<!\w)' + re.escape(tool) + r'(?!\w)', text_lower):
            found_tools.append(tool.title())
    # Capitalized phrases are often product or project names. (The previous
    # `any(char.isupper() ...)` filter was always true by construction of the
    # regex, so only the length filter is kept.)
    capitalized_terms = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', text)
    found_tools.extend(term for term in capitalized_terms if len(term) > 2)
    # BUG FIX: list(set(...)) shuffled output between runs; dict.fromkeys
    # de-duplicates deterministically in first-seen order.
    return list(dict.fromkeys(found_tools))[:5]
def detect_content_type(text: str) -> str:
    """Classify *text* (expected lowercase) into a broad subject area.

    The first category whose marker words appear in the text wins; order
    therefore matters and mirrors the original priority.
    """
    categories = [
        ("Computer Science/Programming",
         ('algorithm', 'programming', 'code', 'function', 'software', 'computer')),
        ("Mathematics",
         ('equation', 'formula', 'mathematics', 'calculate', 'theorem', 'proof')),
        ("History",
         ('history', 'historical', 'century', 'ancient', 'civilization', 'war')),
        ("Science",
         ('science', 'experiment', 'hypothesis', 'research', 'theory', 'study')),
        ("Business/Economics",
         ('business', 'marketing', 'strategy', 'management', 'economics', 'finance')),
        ("Language/Literature",
         ('language', 'grammar', 'literature', 'writing', 'linguistic')),
        ("Music",
         ('music', 'song', 'melody', 'rhythm', 'instrument', 'composition')),
        ("Health/Medicine",
         ('health', 'medical', 'disease', 'treatment', 'medicine', 'therapy')),
    ]
    for label, markers in categories:
        if any(marker in text for marker in markers):
            return label
    return "General Knowledge"
def extract_subject_keywords(text: str) -> list:
    """Extract up to ten subject keywords from *text*.

    Collects Capitalized words/phrases plus alphabetic words longer than
    four characters that occur more than twice.

    Args:
        text: Free-form text (typically the lowercased transcription --
            capitalized-term extraction then mostly applies to mixed-case
            callers).

    Returns:
        Up to ten unique keywords in first-seen order.
    """
    # Find capitalized words and multi-word proper nouns (module-level `re`;
    # the old local import was redundant).
    keywords = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', text)
    # Frequency count of plain alphabetic words longer than 4 characters.
    word_freq = {}
    for word in text.split():
        if len(word) > 4 and word.isalpha():
            word_freq[word] = word_freq.get(word, 0) + 1
    frequent_terms = [word for word, freq in word_freq.items() if freq > 2]
    # BUG FIX: list(set(...)) shuffled results between runs; dict.fromkeys
    # keeps output deterministic (keywords first, then frequent terms).
    return list(dict.fromkeys(keywords + frequent_terms))[:10]
def extract_key_concepts(sentences: list, keywords: list) -> list:
    """Return up to five sentences that look like key concepts.

    A sentence qualifies when it mentions at least two of *keywords*
    (case-insensitively) or uses a definition-style phrase.
    """
    definition_cues = ('is defined as', 'refers to', 'means that', 'is the process')
    concepts = []
    for sentence in sentences:
        lowered = sentence.lower()
        keyword_hits = sum(1 for kw in keywords if kw.lower() in lowered)
        if keyword_hits >= 2 or any(cue in lowered for cue in definition_cues):
            concepts.append(sentence.strip())
    return concepts[:5]
def extract_important_points(sentences: list) -> list:
    """Return up to four sentences flagged by emphasis vocabulary."""
    emphasis = (
        'important', 'crucial', 'essential', 'key', 'significant', 'major', 'primary',
        'first', 'second', 'third', 'finally', 'most', 'best', 'main', 'fundamental',
    )
    flagged = [s.strip() for s in sentences
               if any(word in s.lower() for word in emphasis)]
    return flagged[:4]
def extract_definitions(sentences: list) -> list:
    """Return up to three definition-like sentences."""
    cues = ('is defined as', 'is called', 'refers to', 'means', 'is the', 'are the',
            'is a type of', 'is a form of', 'is known as')
    return [s.strip() for s in sentences
            if any(cue in s.lower() for cue in cues)][:3]
def generate_subject_specific_tips(content_type: str, keywords: list, commands: list = None, tools: list = None) -> list:
    """Build study tips tailored to *content_type*.

    ``commands``/``tools`` (when provided) add hands-on suggestions for
    technical material; ``keywords`` is accepted for interface compatibility.
    """
    base_tips = ["Review the main concepts regularly", "Create your own examples"]
    subject = content_type.lower()
    if "programming" in subject or "computer" in subject:
        tips = base_tips + [
            "Practice coding the concepts discussed",
            "Try implementing the algorithms mentioned",
            "Look up documentation for the technologies mentioned",
        ]
        if commands:
            tips.append("Execute the commands in a safe environment to understand them")
        if tools:
            tips.append(f"Set up and experiment with: {', '.join(tools[:3])}")
        return tips
    if "math" in subject:
        return base_tips + [
            "Work through similar problems step by step",
            "Memorize key formulas and theorems",
            "Practice with different types of examples",
        ]
    if "history" in subject:
        return base_tips + [
            "Create a timeline of events mentioned",
            "Connect events to their causes and effects",
            "Research additional context about the time period",
        ]
    if "science" in subject:
        return base_tips + [
            "Understand the scientific method applied",
            "Look up related experiments or studies",
            "Connect concepts to real-world applications",
        ]
    if "business" in subject:
        return base_tips + [
            "Think of real business examples for each concept",
            "Consider how strategies apply to different industries",
            "Analyze case studies related to the topics",
        ]
    # Generic fallback, still enriched by any practical content found.
    tips = base_tips + [
        "Connect new information to what you already know",
        "Look for patterns and relationships between concepts",
        "Think about practical applications of the knowledge",
    ]
    if commands:
        tips.append("Try running the mentioned commands to see how they work")
    if tools:
        tips.append(f"Research and experiment with: {', '.join(tools[:2])}")
    return tips
def generate_next_steps(content_type: str, tools: list = None, commands: list = None) -> list:
    """Suggest up to four practical follow-up actions.

    Branch order matters: the programming/computer check runs first, then
    tool-specific branches keyed off the stringified tools list (mirroring
    the original heuristic), then the remaining subject checks.
    """
    subject = content_type.lower()
    tools_blob = str(tools).lower()
    if "programming" in subject or "computer" in subject:
        next_steps = [
            "Set up a development environment for the discussed technologies",
            "Create a small project using the concepts learned",
            "Join relevant developer communities and forums",
        ]
        if tools:
            next_steps.append(f"Install and configure: {', '.join(tools[:2])}")
        if commands:
            next_steps.append("Practice the commands in a test environment")
    elif "aws" in tools_blob or "cloud" in subject:
        next_steps = [
            "Create a free AWS account to practice",
            "Follow AWS tutorials related to the discussed services",
            "Set up AWS CLI and practice basic commands",
        ]
        if commands:
            aws_commands = [cmd for cmd in commands if 'aws' in cmd.lower()]
            if aws_commands:
                next_steps.append(f"Try these AWS commands: {aws_commands[0]}")
    elif "docker" in tools_blob:
        next_steps = [
            "Install Docker on your local machine",
            "Practice creating and running containers",
            "Build a simple Docker image for a basic application",
        ]
    elif "kubernetes" in tools_blob:
        next_steps = [
            "Set up a local Kubernetes cluster (minikube/kind)",
            "Practice basic kubectl commands",
            "Deploy a simple application to the cluster",
        ]
    elif "math" in subject:
        next_steps = [
            "Solve 5-10 practice problems related to the topic",
            "Create flashcards for important formulas",
            "Find online calculators to verify your work",
        ]
    elif "science" in subject:
        next_steps = [
            "Look up related scientific papers or articles",
            "Find online simulations or visualizations",
            "Connect the concepts to current events or news",
        ]
    else:
        next_steps = [
            "Create a mind map of the key concepts",
            "Find additional resources on the topic",
            "Discuss the concepts with peers or in online forums",
        ]
    return next_steps[:4]
def get_learning_strategies(content_type: str) -> list:
    """Return learning strategies matched to the content type.

    The first marker found in *content_type* wins; order mirrors the
    original if/elif chain.
    """
    subject = content_type.lower()
    strategy_map = [
        ("programming", [
            "Code along with examples",
            "Build small projects using the concepts",
            "Join coding communities and forums",
        ]),
        ("math", [
            "Solve practice problems daily",
            "Explain solutions in your own words",
            "Use visual aids and diagrams",
        ]),
        ("history", [
            "Create mind maps of historical connections",
            "Use mnemonic devices for dates and names",
            "Watch documentaries on related topics",
        ]),
        ("science", [
            "Conduct related experiments if possible",
            "Draw diagrams and flowcharts",
            "Connect theories to observable phenomena",
        ]),
    ]
    for marker, strategies in strategy_map:
        if marker in subject:
            return strategies
    return [
        "Use spaced repetition for memorization",
        "Teach the concepts to someone else",
        "Create visual summaries and diagrams",
    ]
async def root():
    """Root endpoint returning API metadata and the endpoint map.

    NOTE(review): no @app.get("/") decorator is visible in this chunk -- it
    may have been lost in a paste; confirm the route is registered.
    """
    endpoints = {
        "health": "/health",
        "transcribe": "/transcribe",
        "transcribe-youtube": "/transcribe-youtube",
        "models": "/models",
        "docs": "/docs",
    }
    return {
        "message": "π΅ AudioScribe API",
        "description": "AI-powered audio transcription and summarization",
        "version": "1.0.0",
        "status": "healthy",
        "endpoints": endpoints,
    }
async def health_check():
    """Report service status plus which ML models are currently loaded.

    NOTE(review): no @app.get("/health") decorator is visible in this chunk --
    verify route registration was not lost.
    """
    loaded = {
        "transcription": transcription_pipeline is not None,
        "summarization": summarization_pipeline is not None,
    }
    return {
        "status": "healthy",
        "transcription_model": "openai/whisper-base",
        "summarization_model": summarization_model_name or "none",
        "models_loaded": loaded,
    }
async def transcribe_youtube(request: YouTubeRequest):
    """
    Transcribe YouTube video and generate summary.

    Args:
        request: JSON body with the YouTube URL and output mode
            ("summarize" or "notes").

    Returns:
        JSONResponse with transcription, summary/notes, and model info.

    Raises:
        HTTPException: 400 for missing/invalid URL or silent audio,
            503 when the transcription model is not loaded, 500 otherwise.

    NOTE(review): no @app.post("/transcribe-youtube") decorator is visible in
    this chunk -- verify route registration was not lost.
    """
    if not request.url:
        raise HTTPException(status_code=400, detail="No YouTube URL provided")
    if not is_youtube_url(request.url):
        raise HTTPException(status_code=400, detail="Invalid YouTube URL")
    try:
        # Download audio from YouTube
        print(f"π₯ Downloading audio from YouTube: {request.url}")
        audio_path = download_youtube_audio(request.url)
        # Get file info
        file_size = os.path.getsize(audio_path)
        # Extract video title for filename (simplified: timestamp-based name)
        filename = f"youtube_audio_{int(time.time())}.wav"
        # Preprocess audio (16 kHz WAV; falls back to the raw file on error)
        processed_audio = preprocess_audio(audio_path)
        # Transcribe audio
        if not transcription_pipeline:
            raise HTTPException(status_code=503, detail="Transcription model not loaded")
        print("π΅ Transcribing audio...")
        result = transcription_pipeline(processed_audio)
        # Extract text from the result (handle both old and new format)
        if isinstance(result, dict) and "text" in result:
            transcription = result["text"].strip()
        elif isinstance(result, dict) and "chunks" in result:
            # Handle timestamped chunks
            transcription = " ".join([chunk["text"] for chunk in result["chunks"]]).strip()
        else:
            transcription = str(result).strip()
        if not transcription:
            raise HTTPException(status_code=400, detail="No speech detected in YouTube video")
        # Generate summary or notes
        mode_text = "study notes" if request.mode == "notes" else "summary"
        print(f"π Generating {mode_text}...")
        summary = generate_summary(transcription, request.mode)
        # Cleanup temporary files.
        # NOTE(review): cleanup only runs on the success path -- temp files
        # leak whenever an exception is raised above. The bare except keeps
        # cleanup best-effort but also hides unexpected errors.
        try:
            os.unlink(audio_path)
            if processed_audio != audio_path:
                os.unlink(processed_audio)
            # Clean up temp directory
            temp_dir = os.path.dirname(audio_path)
            if os.path.exists(temp_dir):
                import shutil
                shutil.rmtree(temp_dir)
        except:
            pass
        return JSONResponse({
            "success": True,
            "filename": filename,
            "file_size": file_size,
            "transcription": transcription,
            "summary": summary,
            "youtube_url": request.url,
            "model_info": {
                "transcription_model": "openai/whisper-base",
                "summarization_model": summarization_model_name
            }
        })
    except HTTPException:
        # Re-raise FastAPI errors unchanged so status codes are preserved.
        raise
    except Exception as e:
        print(f"YouTube transcription error: {e}")
        raise HTTPException(status_code=500, detail=f"YouTube transcription failed: {str(e)}")
async def transcribe_audio(file: UploadFile = File(...)):
    """
    Transcribe an uploaded audio/video file and generate a summary.

    Args:
        file: Audio file (MP3, WAV, M4A, FLAC, OGG, AAC, MP4, MOV, AVI, MKV),
            at most 25 MB.

    Returns:
        JSONResponse with transcription, summary, and model info.

    Raises:
        HTTPException: 400 for missing file/bad type/silent audio,
            413 for oversized uploads, 503 when the model is not loaded,
            500 on unexpected failure.

    NOTE(review): no @app.post("/transcribe") decorator is visible in this
    chunk -- verify the route registration was not lost.
    """
    # Validate file
    if not file:
        raise HTTPException(status_code=400, detail="No file provided")
    # Check file size (25MB limit)
    content = await file.read()
    file_size = len(content)
    if file_size > 25 * 1024 * 1024:  # 25MB
        raise HTTPException(status_code=413, detail="File too large. Maximum size is 25MB.")
    # Check file type by extension
    file_extension = file.filename.lower().split('.')[-1] if file.filename else ''
    allowed_extensions = ['mp3', 'wav', 'm4a', 'ogg', 'webm', 'flac', 'aac', 'mp4', 'mov', 'avi', 'mkv']
    if file_extension not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type. Supported formats: {', '.join(allowed_extensions)}"
        )
    temp_path = None
    processed_audio = None
    try:
        # Save uploaded file temporarily
        with tempfile.NamedTemporaryFile(delete=False, suffix=f'.{file_extension}') as tmp_file:
            tmp_file.write(content)
            temp_path = tmp_file.name
        # Preprocess audio (16 kHz WAV; falls back to the raw file on error)
        processed_audio = preprocess_audio(temp_path)
        # Transcribe audio
        if not transcription_pipeline:
            raise HTTPException(status_code=503, detail="Transcription model not loaded")
        result = transcription_pipeline(processed_audio)
        # Extract text from the result (handle both old and new format)
        if isinstance(result, dict) and "text" in result:
            transcription = result["text"].strip()
        elif isinstance(result, dict) and "chunks" in result:
            # Handle timestamped chunks
            transcription = " ".join([chunk["text"] for chunk in result["chunks"]]).strip()
        else:
            transcription = str(result).strip()
        if not transcription:
            raise HTTPException(status_code=400, detail="No speech detected in audio file")
        # Generate summary
        summary = generate_summary(transcription)
        return JSONResponse({
            "success": True,
            "filename": file.filename,
            "file_size": file_size,
            "transcription": transcription,
            "summary": summary,
            "model_info": {
                "transcription_model": "openai/whisper-base",
                "summarization_model": summarization_model_name
            }
        })
    except HTTPException:
        raise
    except Exception as e:
        print(f"Transcription error: {e}")
        raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")
    finally:
        # BUG FIX: cleanup previously ran only on the success path, leaking
        # temp files whenever transcription raised. This finally block
        # guarantees both files are removed on every path; OSError (file
        # already gone, permission issues) is deliberately ignored.
        for path in {p for p in (temp_path, processed_audio) if p}:
            try:
                os.unlink(path)
            except OSError:
                pass
async def get_model_info():
    """Describe the transcription and summarization models and their state.

    NOTE(review): no @app.get("/models") decorator is visible in this chunk --
    verify route registration was not lost.
    """
    transcription_info = {
        "model": "openai/whisper-base",
        "loaded": transcription_pipeline is not None,
        "description": "OpenAI Whisper model for speech-to-text",
    }
    summarization_info = {
        "model": summarization_model_name,
        "loaded": summarization_pipeline is not None,
        "description": "T5/BART model for text summarization",
    }
    return {
        "transcription": transcription_info,
        "summarization": summarization_info,
    }
async def test_ytdlp():
    """Test yt-dlp installation and basic network accessibility.

    Returns a status dict (never raises): ``status`` is "success",
    "warning" (installed but network probe failed) or "error"
    (binary missing, timeout, or unexpected failure).
    """
    try:
        # Step 1: verify the yt-dlp binary is installed and runnable.
        result = subprocess.run(["yt-dlp", "--version"], capture_output=True, text=True, timeout=10)
        if result.returncode != 0:
            return {
                "status": "error",
                "message": "yt-dlp not found or not working",
                "error": result.stderr
            }
        version = result.stdout.strip()
        # Step 2: probe a known short public video in simulate mode.
        # --simulate resolves metadata without downloading anything, which
        # tells us whether YouTube is reachable from this host.
        # BUGFIX: the previous command referenced undefined variables
        # (`output_template`, `url`), so this probe always raised NameError.
        test_url = "https://www.youtube.com/watch?v=jNQXAC9IVRw"  # "Me at the zoo", shortest stable public video
        test_cmd = [
            "yt-dlp",
            "--simulate",  # resolve only, never download
            "--no-playlist",
            "--no-warnings",
            "--socket-timeout", "60",
            "--retries", "5",
            "--no-check-certificate",
            "--ignore-errors",
            "--force-ipv4",  # avoid IPv6 routing issues in containers
            "--geo-bypass",
            "--user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            test_url
        ]
        test_result = subprocess.run(test_cmd, capture_output=True, text=True, timeout=30)
        probe_ok = test_result.returncode == 0
        return {
            "status": "success" if probe_ok else "warning",
            "yt_dlp_version": version,
            "test_access": "success" if probe_ok else "failed",
            "test_output": test_result.stdout[:100] if test_result.stdout else None,
            "test_error": test_result.stderr[:200] if test_result.stderr else None,
            "message": "yt-dlp is working properly" if probe_ok else "yt-dlp installed but may have network issues"
        }
    except subprocess.TimeoutExpired:
        return {
            "status": "error",
            "message": "yt-dlp test timeout",
            "error": "Network timeout or yt-dlp hanging"
        }
    except FileNotFoundError:
        return {
            "status": "error",
            "message": "yt-dlp not found",
            "error": "yt-dlp is not installed or not in PATH"
        }
    except Exception as e:
        return {
            "status": "error",
            "message": "yt-dlp test failed",
            "error": str(e)
        }
# Test endpoint for OpenRouter integration
async def test_openrouter_endpoint():
    """Test endpoint to verify OpenRouter integration"""
    try:
        # Ask for a fixed sentence so a working round-trip is obvious.
        reply = call_openrouter_api(
            "Please respond with exactly: 'OpenRouter integration is working perfectly!'",
            max_tokens=50,
        )
    except Exception as e:
        return {
            "status": "error",
            "message": f"OpenRouter API test failed: {str(e)}"
        }
    if not reply:
        return {
            "status": "error",
            "message": "OpenRouter API returned empty response"
        }
    return {
        "status": "success",
        "message": "OpenRouter API is working",
        "response": reply
    }
def call_openrouter_api(prompt: str, max_tokens: int = 1000) -> str:
    """Call OpenRouter's chat-completion API via the OpenAI client.

    Args:
        prompt: User message to send to the model.
        max_tokens: Requested completion budget; capped at 4000.

    Returns:
        The model's reply text, or "" on any failure (network error, bad
        key, malformed response) — callers treat "" as "unavailable".
    """
    try:
        print(f"π§ Making OpenRouter API call with prompt: {prompt[:100]}...")
        # Create client fresh each time to avoid any initialization issues
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            # SECURITY FIX: use the configured module-level key (loaded from
            # the environment at startup) instead of a hard-coded credential.
            api_key=OPENROUTER_API_KEY,
        )
        completion = client.chat.completions.create(
            extra_headers={
                "HTTP-Referer": "http://localhost:8001",
                "X-Title": "AudioScribe",
            },
            model="deepseek/deepseek-r1:free",  # Using free tier model
            messages=[
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            max_tokens=min(max_tokens, 4000),  # Limit to 4000 tokens
            temperature=0.7
        )
        result = completion.choices[0].message.content.strip()
        print(f"β OpenRouter API response: {result[:200]}...")
        return result
    except Exception as e:
        print(f"β OpenRouter API call failed: {e}")
        print(f"β Error type: {type(e)}")
        import traceback
        traceback.print_exc()
        return ""
def generate_enhanced_notes_with_openrouter(summary: str, original_text: str, content_type: str) -> str:
    """Produce rich study notes from a transcript summary via OpenRouter.

    Falls back to a short notice string when the API fails or returns
    nothing.
    """
    fallback = "Enhanced notes temporarily unavailable."
    try:
        # One structured prompt combining the raw transcript excerpt with
        # the machine summary.
        prompt = f"""
Analyze this educational content and create comprehensive study notes. The content type is: {content_type}
Original transcription (first 1000 chars): {original_text[:1000]}
AI Summary: {summary}
Please provide:
1. π ENHANCED STUDY NOTES with subject-specific insights
2. π KEY TAKEAWAYS (3-5 main points)
3. β‘ PRACTICAL APPLICATIONS (real-world uses)
4. π‘ ADDITIONAL INSIGHTS (connections to related topics)
5. π RECOMMENDED ACTIONS (specific next steps)
6. π― QUIZ QUESTIONS (3-4 questions to test understanding)
For technical content, include:
- Relevant commands, code snippets, or formulas
- Best practices and common pitfalls
- Industry standards and conventions
For educational content, include:
- Memory aids and mnemonics
- Related concepts and prerequisites
- Further reading suggestions
Format as clear, structured study notes with emojis and bullet points.
"""
        response = call_openrouter_api(prompt, max_tokens=1500)
        return response if response else fallback
    except Exception as e:
        print(f"Enhanced notes generation error: {e}")
        return fallback
def generate_subject_specific_examples(content_type: str, key_concepts: list) -> str:
    """Ask OpenRouter for worked examples tied to the detected subject.

    Returns "" whenever the API call fails or yields nothing, so callers
    can simply skip the section.
    """
    try:
        # Use a generic phrase when concept extraction produced nothing.
        if key_concepts:
            concepts_text = ", ".join(key_concepts[:5])
        else:
            concepts_text = "the main concepts"
        prompt = f"""
Based on the {content_type} subject and these key concepts: {concepts_text}
Provide 3-4 practical examples, exercises, or scenarios that demonstrate these concepts in action.
For programming topics: Include code examples with explanations
For AWS/Cloud topics: Include CLI commands and use cases
For business topics: Include real company examples
For science topics: Include experiments or real-world phenomena
For math topics: Include step-by-step problem solutions
Format as:
π― PRACTICAL EXAMPLES:
β’ Example 1: [detailed example]
β’ Example 2: [detailed example]
etc.
Keep each example concise but informative (2-3 sentences max).
"""
        reply = call_openrouter_api(prompt, max_tokens=800)
        return reply if reply else ""
    except Exception as e:
        print(f"Examples generation error: {e}")
        return ""
def generate_troubleshooting_tips(content_type: str, topics: list) -> str:
    """Request common-pitfall and troubleshooting advice from OpenRouter.

    Returns "" when the API yields an empty reply, but a visible notice
    string when the call itself raises — matching the original contract.
    """
    try:
        topics_text = ", ".join(topics[:5]) if topics else "the discussed topics"
        prompt = f"""
For {content_type} content covering {topics_text}, provide:
β οΈ COMMON ISSUES & SOLUTIONS:
β’ Issue 1: [problem] β Solution: [fix]
β’ Issue 2: [problem] β Solution: [fix]
β’ Issue 3: [problem] β Solution: [fix]
π§ TROUBLESHOOTING TIPS:
β’ [practical debugging/problem-solving tip]
β’ [prevention strategy]
β’ [verification method]
Focus on practical, actionable advice. For technical topics, include specific error messages or failure modes.
Keep it concise - max 2 lines per point.
"""
        reply = call_openrouter_api(prompt, max_tokens=600)
        if reply:
            return reply
        return ""
    except Exception as e:
        print(f"Troubleshooting tips generation error: {e}")
        return "Troubleshooting tips temporarily unavailable."
async def test_network():
    """Run network diagnostics relevant to YouTube downloads."""
    try:
        from youtube_downloader import youtube_downloader
        connectivity = youtube_downloader.check_network_connectivity()
        # Derive user-facing recommendations from the raw probe results.
        youtube_ok = connectivity.get('dns_youtube', False)
        fallback_ok = connectivity.get('dns_google', False) or connectivity.get('general_internet', False)
        if youtube_ok:
            suggestion = "YouTube should work"
        else:
            suggestion = "Upload audio files directly if YouTube is inaccessible"
        return {
            "status": "Network Diagnostics",
            "timestamp": time.time(),
            "connectivity": connectivity,
            "recommendations": {
                "youtube_accessible": youtube_ok,
                "fallback_available": fallback_ok,
                "suggestion": suggestion
            }
        }
    except Exception as e:
        return {
            "status": "error",
            "message": f"Network test failed: {str(e)}",
            "timestamp": time.time()
        }
if __name__ == "__main__":
    import uvicorn

    # Hugging Face Spaces exposes port 7860 by default; PORT overrides it.
    server_port = int(os.getenv("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=server_port)