Spaces:
Build error
Build error
| from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, session, send_from_directory | |
| import os | |
| import re | |
| import json | |
| import tempfile | |
| import time | |
| import threading | |
| import yt_dlp | |
| import spacy | |
| import google.generativeai as genai | |
| from werkzeug.utils import secure_filename | |
app = Flask(__name__)
app.secret_key = os.urandom(24)  # Required for flash and session

# Configuration
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads')
RESULTS_FOLDER = os.path.join(os.getcwd(), 'results')
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['RESULTS_FOLDER'] = RESULTS_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size

# Create required directories if they don't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(RESULTS_FOLDER, exist_ok=True)

# SECURITY: a Google Gemini API key was hard-coded here. A key committed to
# source is leaked and must be rotated. Read it from the environment instead;
# it can still be overridden per-request in the UI.
DEFAULT_API_KEY = os.environ.get('GEMINI_API_KEY', '')

# Global processing status, mutated by update_status() and rebound wholesale
# by process_youtube_url() at the start of each run.
processing_status = {
    'is_processing': False,
    'current_step': '',
    'progress': 0,
    'log': []
}

# Initialize the spaCy NLP pipeline, downloading the small English model on
# first run if it is not installed.
try:
    nlp = spacy.load('en_core_web_sm')
except OSError:
    import subprocess
    # list-form argv (shell=False); retry the load after the download
    subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"])
    nlp = spacy.load('en_core_web_sm')

# yt_dlp options: fetch English (auto-)subtitles only, never the media itself.
YDL_OPTS = {
    'skip_download': True,
    'writesubtitles': True,
    'writeautomaticsub': True,
    'subtitleslangs': ['en'],
    'outtmpl': '%(id)s.%(ext)s',
}
def update_status(step, progress, message):
    """Record the current pipeline step and progress, append a timestamped
    entry to the shared log, and echo the same line to stdout."""
    entry = {'time': time.strftime('%H:%M:%S'), 'message': message}
    processing_status.update(current_step=step, progress=progress)
    processing_status['log'].append(entry)
    print(f"Status: {step} - {progress}% - {message}")
def download_subtitles(video_url):
    """
    Fetch the English (auto-)subtitles for a YouTube URL via yt_dlp.

    Returns:
        (filename, video_title) — the .srt/.vtt file written to the current
        working directory and the video's title.

    Raises:
        FileNotFoundError: if no subtitle file for the video id can be found.
    """
    update_status('download_subtitles', 10, f"Downloading subtitles for {video_url}...")
    with yt_dlp.YoutubeDL(YDL_OPTS) as ydl:
        info = ydl.extract_info(video_url, download=True)
    video_id = info.get('id')
    video_title = info.get('title', 'Unknown Title')
    update_status('download_subtitles', 20, f"Video title: {video_title}")

    # Probe the usual yt_dlp output names first, in the original priority order.
    candidates = []
    for ext in ('.en.vtt', '.en.srt', '.vtt', '.srt'):
        candidates.append(f"{video_id}{ext}")
        candidates.append(f"{video_id}.en{ext}")
    for fname in candidates:
        if os.path.exists(fname):
            update_status('download_subtitles', 30, f"Found subtitle file: {fname}")
            return fname, video_title

    # Fallback: accept any subtitle file whose name starts with the video id.
    for fname in os.listdir('.'):
        if fname.startswith(video_id) and fname.lower().endswith(('.srt', '.vtt')):
            update_status('download_subtitles', 30, f"Found subtitle file: {fname}")
            return fname, video_title

    raise FileNotFoundError(f"Subtitle file for {video_id} not found.")
def extract_dialogue_from_srt(path):
    """
    Read a subtitle file (.srt or .vtt) and return its dialogue as one string.

    Strips cue indices, timestamp lines, WEBVTT metadata headers and inline
    markup tags, then collapses all runs of whitespace to single spaces.

    Args:
        path: path to the .srt/.vtt subtitle file.

    Returns:
        The cleaned dialogue text as a single space-joined string.
    """
    update_status('extract_dialogue', 40, f"Extracting dialogue from {path}...")
    # WebVTT allows the hour component to be omitted (MM:SS.mmm), so the
    # leading "HH:" is optional here; SRT uses a comma decimal separator,
    # VTT a period — accept both.
    pattern_timestamp = re.compile(r"^(?:\d{2}:)?\d{2}:\d{2}[\.,]\d+ -->")
    cleaned_lines = []
    with open(path, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            line = line.strip()
            # Skip empty, cue-index, timestamp, or metadata/header lines
            if not line or re.match(r"^\d+$", line) or pattern_timestamp.match(line) or line.startswith(
                    ('WEBVTT', 'Kind:', 'Language:')):
                continue
            # Remove inline tags such as <c> styling and <00:00:01> karaoke marks
            cleaned_lines.append(re.sub(r"<[^>]+>", "", line))
    # Join cue lines, then collapse whitespace left over from wrapping
    dialogue = " ".join(cleaned_lines)
    return re.sub(r'\s+', ' ', dialogue)
def process_text_with_spacy(text):
    """
    Segment *text* into sentences with spaCy and bold-mark named entities.

    Every entity mention is rewritten as **entity (LABEL)**; surviving
    sentences are joined with blank lines.
    """
    update_status('process_text_with_spacy', 50, "Processing text with spaCy...")
    doc = nlp(text)
    formatted = []
    for sent in doc.sents:
        rendered = sent.text.strip()
        # Drop empty sentences and lone punctuation marks
        if len(rendered) <= 1:
            continue
        # De-duplicate mentions by surface text, keeping the last label seen
        labels = {ent.text: ent.label_ for ent in sent.ents}
        for mention, label in labels.items():
            rendered = rendered.replace(mention, f"**{mention} ({label})**")
        formatted.append(rendered)
    return "\n\n".join(formatted)
def process_with_gemini(api_key, text, video_title):
    """
    Send the transcript to Gemini and return its analysis as a dict.

    The model is asked for JSON with "summary", "topics",
    "formatted_transcript" and "notable_quotes". If the reply cannot be
    parsed as JSON, a stub structure wrapping the raw reply is returned
    instead of raising.
    """
    update_status('process_with_gemini', 60, "Sending to Gemini for final processing...")
    # Configure the Gemini API
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel('gemini-2.0-flash')
    prompt = f"""
I'm providing a transcript from the YouTube video titled: "{video_title}"
Please analyze this transcript and return a JSON object with the following fields:
1. "summary": An array of bullet points summarizing key points (5-7 items)
2. "topics": An array of main topics discussed (3-5 items)
3. "formatted_transcript": A well-formatted version of the transcript
4. "notable_quotes": An array of 3-5 notable quotes from the transcript
Here's the raw transcript:
{text}
Return your analysis as a valid JSON object containing all requested fields.
"""
    response = model.generate_content(prompt)
    raw = response.text
    # Models frequently wrap JSON in markdown code fences; unwrap before parsing.
    if "```json" in raw:
        payload = raw.split("```json")[1].split("```")[0].strip()
    elif "```" in raw:
        payload = raw.split("```")[1].strip()
    else:
        payload = raw
    try:
        parsed = json.loads(payload)
    except json.JSONDecodeError:
        # Parsing failed — return a structured fallback carrying the raw text
        update_status('process_with_gemini', 70, "Warning: Could not parse Gemini response as JSON")
        return {
            "summary": ["Unable to parse Gemini response as JSON"],
            "topics": ["Error in processing"],
            "formatted_transcript": response.text,
            "notable_quotes": []
        }
    update_status('process_with_gemini', 70, "Gemini processing complete")
    return parsed
def _extract_json_payload(response_text):
    """Strip optional markdown code fences from a model reply, returning the JSON body."""
    if "```json" in response_text:
        return response_text.split("```json")[1].split("```")[0].strip()
    if "```" in response_text:
        return response_text.split("```")[1].strip()
    return response_text


def _translate_items(model, items, prompt, label, progress):
    """Translate a list of strings to Hindi in one JSON-array request.

    Falls back to translating item-by-item if the batch request fails or its
    reply is not valid JSON. *label* is used in status messages ("Summary",
    "Topics", "Quotes"); *progress* is the percentage reported either way.
    """
    response = model.generate_content(prompt)
    try:
        translated = json.loads(_extract_json_payload(response.text))
        update_status('translate_to_hindi', progress, f"{label} translation complete.")
        return translated
    except Exception as e:
        update_status('translate_to_hindi', progress, f"Error in {label.lower()} translation: {e}")
        # Fallback: process items individually
        fallback = []
        for item in items:
            resp = model.generate_content(f"Translate this to Hindi: {item}")
            fallback.append(resp.text.strip())
        return fallback


def _translate_transcript(model, transcript):
    """Translate the formatted transcript to Hindi in paragraph batches.

    On a batch failure, retries paragraph-by-paragraph; a paragraph that still
    fails is replaced with a "[Translation error: ...]" marker rather than
    aborting the run.
    """
    paragraphs = transcript.split("\n\n")
    translated_paragraphs = []
    batch_size = 5  # Adjust based on average paragraph length
    total_paragraphs = len(paragraphs)
    for i in range(0, total_paragraphs, batch_size):
        batch = paragraphs[i:i + batch_size]
        batch_text = "\n\n".join(batch)
        progress = 88 + (i / total_paragraphs * 10)  # Scale from 88% to 98%
        update_status('translate_to_hindi', int(progress),
                      f"Translating transcript paragraphs {i + 1} to {min(i + batch_size, total_paragraphs)} of {total_paragraphs}")
        translate_prompt = f"""
Translate the following English text to Hindi.
Preserve paragraph breaks and formatting:
{batch_text}
Return ONLY the translated Hindi text.
"""
        try:
            response = model.generate_content(translate_prompt)
            translated_paragraphs.append(response.text.strip())
        except Exception as e:
            update_status('translate_to_hindi', int(progress), f"Error in batch translation: {e}")
            # Fallback: translate paragraph by paragraph
            for para in batch:
                try:
                    response = model.generate_content(f"Translate this to Hindi: {para}")
                    translated_paragraphs.append(response.text.strip())
                except Exception:  # was a bare except: — narrowed so KeyboardInterrupt etc. propagate
                    translated_paragraphs.append(f"[Translation error: {para[:50]}...]")
    return "\n\n".join(translated_paragraphs)


def translate_to_hindi(api_key, results):
    """
    Translate the processed results (summary, topics, quotes, transcript)
    to Hindi using Gemini.

    Args:
        api_key: Gemini API key.
        results: dict as produced by process_with_gemini.

    Returns:
        A dict with the same keys as *results*, values translated to Hindi.
    """
    update_status('translate_to_hindi', 80, "Translating results to Hindi using Gemini...")
    # Configure the Gemini API
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel('gemini-2.0-flash')  # Using flash model for faster response

    hindi_results = {
        "summary": [],
        "topics": [],
        "formatted_transcript": "",
        "notable_quotes": []
    }

    # Translate summary points
    summary_prompt = f"""
Translate the following English bullet points to Hindi.
Keep formatting and meaning intact:
{json.dumps(results["summary"], indent=2)}
Return the result as a JSON array.
"""
    hindi_results["summary"] = _translate_items(model, results["summary"], summary_prompt, "Summary", 82)

    # Translate topics
    topics_prompt = f"""
Translate the following English topics to Hindi.
Keep formatting and meaning intact:
{json.dumps(results["topics"], indent=2)}
Return the result as a JSON array.
"""
    hindi_results["topics"] = _translate_items(model, results["topics"], topics_prompt, "Topics", 85)

    # Translate notable quotes (note: this prompt's closing instruction differs
    # from the two above, preserved from the original)
    quotes_prompt = f"""
Translate the following English quotes to Hindi.
Keep formatting and meaning intact:
{json.dumps(results["notable_quotes"], indent=2)}
Return ONLY the translated Hindi text in JSON array format.
"""
    hindi_results["notable_quotes"] = _translate_items(model, results["notable_quotes"], quotes_prompt, "Quotes", 88)

    # Translate the formatted transcript (chunked, as it may be long)
    hindi_results["formatted_transcript"] = _translate_transcript(model, results["formatted_transcript"])
    update_status('translate_to_hindi', 98, "Transcript translation complete.")
    return hindi_results
def save_results(results, output_file):
    """
    Write the English analysis to *output_file* as a markdown report
    (summary bullets, topic bullets, block-quoted notable quotes, the
    formatted transcript) followed by the raw JSON in a fenced block.
    """
    parts = ["# Transcript Analysis\n\n", "## Summary\n"]
    parts += [f"- {point}\n" for point in results["summary"]]
    parts.append("\n")
    parts.append("## Topics\n")
    parts += [f"- {topic}\n" for topic in results["topics"]]
    parts.append("\n")
    parts.append("## Notable Quotes\n")
    parts += [f"> {quote}\n\n" for quote in results["notable_quotes"]]
    parts.append("\n")
    parts.append("## Formatted Transcript\n\n")
    parts.append(results["formatted_transcript"])
    parts.append("\n\n")
    # Raw JSON appendix for machine consumption
    parts.append("---\n\n")
    parts.append("```json\n")
    parts.append(json.dumps(results, indent=2))
    parts.append("\n```\n")
    with open(output_file, 'w', encoding='utf-8') as f:
        f.writelines(parts)
    update_status('save_results', 99, f"Results saved to {output_file}")
def save_hindi_results(hindi_results, output_file):
    """
    Write the Hindi translation to *output_file* in the same markdown layout
    as save_results, with Devanagari section headers and a non-ASCII-escaped
    JSON appendix.
    """
    parts = ["# प्रतिलेख विश्लेषण\n\n", "## सारांश\n"]
    parts += [f"- {point}\n" for point in hindi_results["summary"]]
    parts.append("\n")
    parts.append("## विषय\n")
    parts += [f"- {topic}\n" for topic in hindi_results["topics"]]
    parts.append("\n")
    parts.append("## उल्लेखनीय उद्धरण\n")
    parts += [f"> {quote}\n\n" for quote in hindi_results["notable_quotes"]]
    parts.append("\n")
    parts.append("## स्वरूपित प्रतिलेख\n\n")
    parts.append(hindi_results["formatted_transcript"])
    parts.append("\n\n")
    # Raw JSON appendix; ensure_ascii=False keeps Devanagari readable
    parts.append("---\n\n")
    parts.append("```json\n")
    parts.append(json.dumps(hindi_results, indent=2, ensure_ascii=False))
    parts.append("\n```\n")
    with open(output_file, 'w', encoding='utf-8') as f:
        f.writelines(parts)
    update_status('save_hindi_results', 100, f"Hindi results saved to {output_file}")
def process_youtube_url(youtube_url, api_key):
    """Process a YouTube URL end-to-end and return the analysis results.

    Runs the full pipeline (subtitle download -> dialogue cleanup -> spaCy ->
    Gemini -> Hindi translation), writing English and Hindi markdown reports
    into RESULTS_FOLDER. Intended to run on a background thread; progress is
    published through the module-level ``processing_status`` dict.

    Returns a dict: on success {'success': True, 'video_title',
    'english_file', 'hindi_file', 'english_results', 'hindi_results'};
    on any failure {'success': False, 'error': str}.
    """
    global processing_status
    try:
        # Reset the shared status for this run. NOTE: this rebinds the global
        # to a new dict, so any code holding a reference to the old dict will
        # not observe further updates.
        processing_status = {
            'is_processing': True,
            'current_step': 'Starting',
            'progress': 0,
            'log': []
        }
        # Generate unique filenames for this run
        timestamp = int(time.time())
        eng_output_file = os.path.join(app.config['RESULTS_FOLDER'], f"transcript_analysis_{timestamp}.md")
        hindi_output_file = os.path.join(app.config['RESULTS_FOLDER'], f"transcript_analysis_hindi_{timestamp}.md")
        # Step 1: Download subtitles
        subtitle_path, video_title = download_subtitles(youtube_url)
        # Step 2: Extract and clean dialogue
        raw_dialogue = extract_dialogue_from_srt(subtitle_path)
        # Step 3: Process with spaCy
        # NOTE(review): nlp_processed is never used below — Gemini receives
        # raw_dialogue instead. Confirm whether this is intentional.
        nlp_processed = process_text_with_spacy(raw_dialogue)
        # Step 4: Process with Gemini (on the raw dialogue)
        final_results = process_with_gemini(api_key, raw_dialogue, video_title)
        # Step 5: Save English results
        save_results(final_results, eng_output_file)
        # Step 6: Translate to Hindi
        hindi_results = translate_to_hindi(api_key, final_results)
        # Step 7: Save Hindi results
        save_hindi_results(hindi_results, hindi_output_file)
        # Clean up the downloaded subtitle file
        if os.path.exists(subtitle_path):
            os.remove(subtitle_path)
            update_status('cleanup', 100, f"Cleaned up temporary file: {subtitle_path}")
        processing_status['is_processing'] = False
        return {
            'success': True,
            'video_title': video_title,
            'english_file': os.path.basename(eng_output_file),
            'hindi_file': os.path.basename(hindi_output_file),
            'english_results': final_results,
            'hindi_results': hindi_results
        }
    except Exception as e:
        # Mark the run finished and surface the error in the status log so
        # the polling UI can display it; the caller gets a failure dict.
        processing_status['is_processing'] = False
        processing_status['log'].append({'time': time.strftime('%H:%M:%S'), 'message': f"Error: {str(e)}"})
        return {
            'success': False,
            'error': str(e)
        }
@app.route('/')
def index():
    """Home page with the YouTube-URL form; pre-fills the stored API key.

    The route decorator was missing — without it Flask never registers the
    view and the app serves no pages.
    """
    api_key = session.get('api_key', DEFAULT_API_KEY)
    return render_template('index.html', api_key=api_key)
@app.route('/process', methods=['POST'])
def process():
    """Start processing a YouTube URL on a background daemon thread.

    Rejects the request while another run is in progress; otherwise spawns
    process_youtube_url and returns immediately so the UI can poll /status.
    The route decorator was missing — without it the view is never registered.
    """
    if processing_status['is_processing']:
        return jsonify({'success': False, 'error': 'Another process is already running'})
    youtube_url = request.form.get('youtube_url', '').strip()
    api_key = request.form.get('api_key', DEFAULT_API_KEY).strip()
    if not youtube_url:
        return jsonify({'success': False, 'error': 'Please enter a valid YouTube URL'})
    # Remember the key so index() can pre-fill it on the next page load
    # (index reads session['api_key'], which was never written anywhere).
    session['api_key'] = api_key
    # Start processing in a background thread
    thread = threading.Thread(
        target=process_youtube_url,
        args=(youtube_url, api_key)
    )
    thread.daemon = True
    thread.start()
    return jsonify({'success': True, 'message': 'Processing started'})
@app.route('/status')
def status():
    """Return the current processing status as JSON (polled by the UI).

    The route decorator was missing — without it the view is never registered.
    """
    return jsonify(processing_status)
@app.route('/results/<filename>')
def results(filename):
    """Serve a generated result file from RESULTS_FOLDER.

    send_from_directory rejects path-traversal attempts in *filename*.
    The route decorator was missing — without it the view is never registered.
    """
    return send_from_directory(app.config['RESULTS_FOLDER'], filename)
@app.route('/list_results')
def list_results():
    """List all available .md result files as JSON, newest first.

    Each entry carries filename, size, creation time and whether the file is
    a Hindi report (name contains "hindi"). The route decorator was missing —
    without it the view is never registered.
    """
    files = []
    for filename in os.listdir(app.config['RESULTS_FOLDER']):
        if filename.endswith('.md'):
            filepath = os.path.join(app.config['RESULTS_FOLDER'], filename)
            files.append({
                'filename': filename,
                'size': os.path.getsize(filepath),
                'created': os.path.getctime(filepath),
                'is_hindi': 'hindi' in filename.lower()
            })
    # Sort by creation time (newest first)
    files.sort(key=lambda x: x['created'], reverse=True)
    return jsonify(files)
if __name__ == '__main__':
    # SECURITY NOTE(review): debug=True together with host='0.0.0.0' exposes
    # the Werkzeug interactive debugger (arbitrary code execution) to the
    # whole network — keep this combination strictly to local development.
    app.run(debug=True, host='0.0.0.0', port=5000)