# NOTE: extraction artifact — the original upstream page carried a
# "Spaces: Sleeping" Hugging Face status banner here; the application
# source begins below.
# Standard library imports
import io
import logging
import math
import os
import re
import tempfile
import time
from functools import lru_cache

# Third-party imports
import numpy as np
import whisper
from dotenv import load_dotenv
from flask import Flask, request, jsonify, render_template
from pydub import AudioSegment
from transformers import PegasusForConditionalGeneration, PegasusTokenizer
from yt_dlp import YoutubeDL

# Load variables from a local .env file into the process environment.
load_dotenv()
def setup_environment():
    """Prepare writable cache directories and environment for Hugging Face Spaces.

    Creates a set of /tmp-based cache directories (made world-writable, since
    the Spaces container user may vary), then points the relevant tooling
    (transformers, yt-dlp, SSL) at them through environment variables.
    """
    writable_dirs = (
        "/tmp/transformers_cache",
        "/tmp/hf_home",
        "/tmp/cache",
        "/tmp/yt-dlp",
        "/tmp/certs",
    )
    for path in writable_dirs:
        os.makedirs(path, exist_ok=True)
        try:
            # Ensure the directory is writable by whatever user the container runs as.
            os.chmod(path, 0o777)
        except Exception as e:
            logging.warning(f"Could not set permissions on {path}: {e}")

    # Point the model caches at the writable /tmp locations.
    os.environ["TRANSFORMERS_CACHE"] = "/tmp/transformers_cache"
    os.environ["HF_HOME"] = "/tmp/hf_home"
    os.environ["XDG_CACHE_HOME"] = "/tmp/cache"

    # Certificate handling. NOTE(review): PYTHONHTTPSVERIFY=0 disables HTTPS
    # certificate verification process-wide — a security trade-off presumably
    # made for the sandboxed Spaces environment; confirm before reusing elsewhere.
    os.environ["PYTHONHTTPSVERIFY"] = "0"
    os.environ["REQUESTS_CA_BUNDLE"] = "/etc/ssl/certs/ca-certificates.crt"
    os.environ["SSL_CERT_DIR"] = "/etc/ssl/certs"

    # Redirect HOME to a writable location to avoid permission issues.
    os.environ["HOME"] = "/tmp"

    # For yt-dlp: bypass any proxy configuration.
    os.environ["no_proxy"] = "*"

    # Keep the logs readable by silencing user warnings.
    import warnings

    warnings.filterwarnings("ignore", category=UserWarning)
# Environment setup must run before any model library touches its cache dirs.
setup_environment()

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

# Hugging Face model id used for summarization.
MODEL_NAME = "google/pegasus-xsum"
def convert_audio_to_mp3(audio_bytes, original_format=None):
    """Convert raw audio bytes of *original_format* into an in-memory MP3.

    Returns a BytesIO positioned at the start of the MP3 data.
    Raises ValueError if decoding or encoding fails.
    """
    try:
        logging.info(f"Converting audio from {original_format} to MP3 in memory...")
        segment = AudioSegment.from_file(io.BytesIO(audio_bytes), format=original_format)
        mp3_buffer = io.BytesIO()
        segment.export(mp3_buffer, format="mp3")
        mp3_buffer.seek(0)
        logging.info("Conversion successful")
        return mp3_buffer
    except Exception as e:
        logging.error(f"Error converting audio to MP3: {e}")
        raise ValueError(f"Error converting audio to MP3: {e}")
@lru_cache(maxsize=1)
def load_whisper_model():
    """Load (and cache) the Whisper "base" model.

    The model is expensive to load and the original code rebuilt it on every
    call; lru_cache (already imported at module level) makes it a one-time
    cost per process while keeping the call signature unchanged.
    """
    return whisper.load_model("base")
@lru_cache(maxsize=1)
def load_pegasus_model():
    """Load (and cache) the Pegasus tokenizer/model pair for MODEL_NAME.

    Cached with lru_cache so repeated requests do not rebuild the model each
    time. Returns a (tokenizer, model) tuple, same as before.
    """
    tokenizer = PegasusTokenizer.from_pretrained(MODEL_NAME)
    model = PegasusForConditionalGeneration.from_pretrained(MODEL_NAME)
    return tokenizer, model
def transcribe_audio_with_whisper(audio_data, timeout=300):  # 5 minute timeout
    """Transcribe MP3 audio (bytes or BytesIO) with Whisper, under a timeout.

    The transcription runs in a child process so it can be killed if it runs
    longer than *timeout* seconds. Returns the transcribed text.
    Raises ValueError on failure or timeout.
    """
    try:
        logging.info("Transcribing audio data")
        start_time = time.time()

        # Whisper's transcribe() wants a file path, so spill to a temp file.
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
            if isinstance(audio_data, io.BytesIO):
                temp_file.write(audio_data.getvalue())
            else:
                temp_file.write(audio_data)
            temp_file.flush()
            temp_file_path = temp_file.name

        try:
            # Run in a separate process to enforce a hard timeout (Whisper
            # offers no cooperative cancellation).
            from multiprocessing import Process, Queue

            def transcribe_process(file_path, result_queue):
                try:
                    # The model must be loaded in the child process; the
                    # original also loaded it in the parent, which was pure
                    # wasted work and has been removed.
                    model = load_whisper_model()
                    result_queue.put(model.transcribe(file_path))
                except Exception as e:
                    # Ship the exception back so the parent can re-raise it.
                    result_queue.put(e)

            result_queue = Queue()
            process = Process(
                target=transcribe_process, args=(temp_file_path, result_queue)
            )
            process.start()
            process.join(timeout)

            # Still alive after the deadline: kill it and report a timeout.
            if process.is_alive():
                process.terminate()
                process.join()
                raise TimeoutError(f"Transcription timed out after {timeout} seconds")

            if result_queue.empty():
                raise ValueError("Transcription process failed")

            result_or_error = result_queue.get()
            if isinstance(result_or_error, Exception):
                raise result_or_error
            result = result_or_error
        finally:
            # Single cleanup point for the temp file (replaces the scattered
            # per-branch unlink calls of the original).
            try:
                os.unlink(temp_file_path)
            except OSError:
                pass

        elapsed = time.time() - start_time
        logging.info(f"Transcription completed in {elapsed:.2f} seconds")
        return result["text"]

    except TimeoutError as e:
        logging.error(f"Transcription timeout: {e}")
        raise ValueError(
            "Audio transcription took too long. Please try a shorter audio file."
        )
    except Exception as e:
        logging.error(f"Error in audio transcription: {e}")
        raise ValueError(f"Error in audio transcription: {e}")
def summarize_text_with_pegasus(text, tokenizer, model):
    """Summarize *text* with a Pegasus tokenizer/model pair.

    The summary length is bounded relative to the input token count (roughly
    one quarter to one third of it, with floors of 75 and 200 tokens), and
    duplicate sentences are removed from the output.
    Raises ValueError on failure.
    """
    try:
        encoded = tokenizer(
            text, truncation=True, padding="longest", return_tensors="pt"
        )
        token_count = len(encoded["input_ids"][0])

        # Target a summary between ~1/4 and ~1/3 of the input length.
        lower_bound = max(math.ceil(token_count / 4), 75)
        upper_bound = max(math.ceil(token_count / 3), 200)
        if lower_bound >= upper_bound:
            lower_bound = upper_bound - 1

        generated = model.generate(
            encoded.input_ids,
            num_beams=5,
            min_length=lower_bound,
            max_length=upper_bound,
            early_stopping=True,
        )
        decoded = tokenizer.decode(generated[0], skip_special_tokens=True)
        return remove_repeated_sentences(decoded)
    except Exception as e:
        logging.error(f"Error in text summarization: {e}")
        raise ValueError(f"Error in text summarization: {e}")
def download_youtube_with_cookies(url):
    """Download a YouTube video's audio as MP3 using the project's cookies file.

    Creates an empty Netscape-format cookies file if cookies.txt is missing.
    Returns a BytesIO containing the MP3 data.
    Raises ValueError on any failure.
    """
    try:
        logging.info(f"Downloading YouTube with cookies: {url}")

        # Use the cookies.txt from the project directory.
        cookies_path = os.path.join(os.getcwd(), "cookies.txt")
        if not os.path.exists(cookies_path):
            logging.warning("cookies.txt not found in project directory")
            # Create an empty cookies file so yt-dlp can still load it.
            with open(cookies_path, "w") as f:
                f.write("# Netscape HTTP Cookie File\n")
        logging.info(f"Using cookies from: {cookies_path}")

        output_dir = "/tmp/yt_downloads"
        os.makedirs(output_dir, exist_ok=True)
        os.chmod(output_dir, 0o777)
        output_path = os.path.join(output_dir, f"download_{int(time.time())}.%(ext)s")

        ydl_opts = {
            "format": "bestaudio/best",
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": "192",
                }
            ],
            "outtmpl": output_path,
            # BUG FIX: the yt-dlp Python API option is "cookiefile"; the old
            # "cookies" key was not a recognized option, so the cookies file
            # was never actually used.
            "cookiefile": cookies_path,
            "nocheckcertificate": True,
            "ignoreerrors": True,
            "geo_bypass": True,
            "logtostderr": True,
            "quiet": False,
            "no_warnings": False,
            "socket_timeout": 30,
            "retries": 5,
        }

        with YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            if not info:
                raise ValueError("Could not fetch video information")
            filename = ydl.prepare_filename(info)

            # The FFmpeg postprocessor renames the download to .mp3.
            if not filename.endswith(".mp3"):
                filename = filename.rsplit(".", 1)[0] + ".mp3"
            if not os.path.exists(filename):
                # Try alternative extensions the postprocessor may have left.
                for ext in [".mp3", ".webm.mp3", ".m4a.mp3"]:
                    alt_filename = filename.rsplit(".", 1)[0] + ext
                    if os.path.exists(alt_filename):
                        filename = alt_filename
                        break

            # BUG FIX: these two messages contained garbled "(unknown)"
            # placeholders instead of the actual file name.
            logging.info(f"Downloaded file: {filename}")
            if not os.path.exists(filename):
                raise FileNotFoundError(f"Could not find downloaded file: {filename}")

            with open(filename, "rb") as f:
                buffer = io.BytesIO(f.read())
            buffer.seek(0)

            # Clean up the on-disk copy; the data now lives in memory.
            try:
                os.unlink(filename)
            except Exception as e:
                logging.warning(f"Could not remove temp file: {e}")

            return buffer
    except Exception as e:
        logging.error(f"Error downloading with cookies: {e}", exc_info=True)
        raise ValueError(f"Error downloading with cookies: {e}")
def download_youtube_direct(url):
    """Direct YouTube audio download without cookies, with minimal options.

    The result is converted to MP3 in memory if yt-dlp produced another
    container format. Returns a BytesIO containing the MP3 data.
    Raises ValueError on any failure.
    """
    try:
        logging.info(f"Attempting direct YouTube download: {url}")
        output_dir = "/tmp/yt_direct"
        os.makedirs(output_dir, exist_ok=True)
        os.chmod(output_dir, 0o777)
        output_path = os.path.join(output_dir, f"direct_{int(time.time())}.%(ext)s")

        ydl_opts = {
            "format": "bestaudio",
            "outtmpl": output_path,
            "nocheckcertificate": True,
            "ignoreerrors": False,
            "geo_bypass": True,
            "no_warnings": True,
            "quiet": True,
            "skip_download": False,
            "noprogress": True,
            "nooverwrites": False,
            "socket_timeout": 30,
        }

        with YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            if not info:
                raise ValueError("Could not fetch video information")
            filename = ydl.prepare_filename(info)
            if not os.path.exists(filename):
                # BUG FIX: the message contained a garbled "(unknown)"
                # placeholder instead of the actual file name.
                raise FileNotFoundError(f"Could not find downloaded file: {filename}")

            with open(filename, "rb") as f:
                data = f.read()

            # Convert to MP3 if yt-dlp delivered another container format.
            if not filename.endswith(".mp3"):
                buffer = convert_audio_to_mp3(
                    data, original_format=filename.split(".")[-1]
                )
            else:
                buffer = io.BytesIO(data)
            buffer.seek(0)

            # Clean up the on-disk copy.
            try:
                os.unlink(filename)
            except Exception as e:
                logging.warning(f"Could not remove temp file: {e}")

            return buffer
    except Exception as e:
        logging.error(f"Error in direct download: {e}", exc_info=True)
        raise ValueError(f"Error in direct download: {e}")
def download_audio_from_youtube(url):
    """Fetch YouTube audio as an in-memory MP3, trying several strategies.

    Order of attempts: cookie-based yt-dlp, plain yt-dlp, then pytube.
    Raises ValueError if every method fails.
    """
    logging.info(f"Starting YouTube download process for: {url}")
    errors = []

    # Method 1: Try with project cookies.
    try:
        return download_youtube_with_cookies(url)
    except Exception as e:
        logging.warning(f"Cookie download failed: {e}")
        errors.append(f"Cookie method: {str(e)}")

    # Method 2: Try direct download.
    try:
        return download_youtube_direct(url)
    except Exception as e:
        logging.warning(f"Direct download failed: {e}")
        errors.append(f"Direct method: {str(e)}")

    # Method 3: pytube as a last resort.
    try:
        logging.info("Attempting download with pytube")
        from pytube import YouTube

        video = YouTube(url)
        stream = video.streams.filter(only_audio=True).first()
        if not stream:
            raise ValueError("No audio stream found")

        output_dir = "/tmp/pytube_downloads"
        os.makedirs(output_dir, exist_ok=True)
        output_path = stream.download(output_path=output_dir)
        logging.info(f"Downloaded to: {output_path}")

        with open(output_path, "rb") as f:
            data = f.read()
        # Normalize whatever container pytube delivered to MP3.
        buffer = convert_audio_to_mp3(data, original_format=output_path.split(".")[-1])

        # Best-effort cleanup of the on-disk copy.
        try:
            os.unlink(output_path)
        except Exception as e:
            logging.warning(f"Could not remove pytube temp file: {e}")
        return buffer
    except Exception as e:
        logging.error(f"Pytube download failed: {e}")
        errors.append(f"Pytube method: {str(e)}")

    # Every strategy failed: log the details, surface a friendly message.
    error_message = "All download methods failed:\n" + "\n".join(errors)
    logging.error(error_message)
    raise ValueError(
        "Could not download YouTube audio. Please try uploading an audio file directly or use a different URL."
    )
def allowed_file(filename):
    """Return True if *filename* has an accepted audio file extension."""
    permitted = {"mp3", "aac", "flac", "m4a"}
    if "." not in filename:
        return False
    extension = filename.rsplit(".", 1)[1].lower()
    return extension in permitted
def remove_repeated_sentences(text):
    """Collapse duplicate sentences in *text*, keeping first occurrences.

    Sentences are split on whitespace following '.', '!' or '?'. Duplicates
    are detected case-insensitively (and ignoring surrounding whitespace),
    but each kept sentence retains its original casing. The survivors are
    re-joined with single spaces.
    """
    sentences = re.split(r"(?<=[.!?]) +", text)
    # Insertion-ordered dict: first occurrence of each normalized key wins.
    kept = {}
    for sentence in sentences:
        key = sentence.lower().strip()
        if key not in kept:
            kept[key] = sentence
    return " ".join(kept.values())
@app.route("/")
def index():
    """Serve the single-page UI.

    NOTE(review): without a route decorator this handler is never registered
    with Flask, so it appears the decorator was lost in the file's table
    formatting; "/" is the conventional path for an index handler — confirm
    against the original source.
    """
    return render_template("index.html")
@app.route("/transcribe", methods=["POST"])
def transcribe():
    """Handle a transcription request from a YouTube URL or uploaded file.

    Expects either a form field "url" (YouTube link) or an uploaded "file"
    (mp3/aac/flac/m4a). On success returns JSON with "transcription" and
    "summary"; on failure returns JSON with an "error" message and a
    4xx/5xx status code.

    NOTE(review): without a route decorator this handler is never registered
    with Flask, so the decorator appears to have been lost in the file's
    table formatting; confirm the path against the front-end's request URL.
    """
    # Record the start time for per-stage timing logs.
    start_time = time.time()
    logging.info("Starting new transcription request")
    try:
        audio_data = None

        if "url" in request.form and request.form["url"]:
            youtube_url = request.form["url"].strip()
            logging.info(f"Processing YouTube URL: {youtube_url}")
            if not youtube_url.startswith(("http://", "https://")):
                return (
                    jsonify(
                        {"error": "Invalid URL format. Please provide a complete URL."}
                    ),
                    400,
                )
            try:
                audio_data = download_audio_from_youtube(youtube_url)
                logging.info(
                    f"YouTube download completed in {time.time() - start_time:.2f} seconds"
                )
            except Exception as e:
                # Translate common access-related failures into a friendly
                # 400 rather than a generic 500.
                error_msg = str(e).lower()
                if any(
                    term in error_msg
                    for term in [
                        "bot",
                        "sign in",
                        "cookie",
                        "certificate",
                        "permission",
                    ]
                ):
                    return (
                        jsonify(
                            {
                                "error": "YouTube access issue. Please try uploading an audio file directly or use a different YouTube URL."
                            }
                        ),
                        400,
                    )
                else:
                    raise e

        elif "file" in request.files:
            audio_file = request.files["file"]
            if not audio_file.filename:
                return jsonify({"error": "No file selected."}), 400
            if not allowed_file(audio_file.filename):
                return (
                    jsonify(
                        {
                            "error": "Invalid file type. Please upload an audio file (mp3, aac, flac, or m4a)."
                        }
                    ),
                    400,
                )
            audio_bytes = audio_file.read()
            file_format = audio_file.filename.rsplit(".", 1)[1].lower()
            logging.info(
                f"Processing uploaded file: {audio_file.filename}, format: {file_format}, size: {len(audio_bytes)} bytes"
            )
            # Normalize every upload to MP3 for the transcription step.
            audio_data = convert_audio_to_mp3(audio_bytes, original_format=file_format)
            logging.info(
                f"File conversion completed in {time.time() - start_time:.2f} seconds"
            )
        else:
            return jsonify({"error": "No audio file or URL provided."}), 400

        # Transcribe the audio.
        transcribe_start = time.time()
        transcription = transcribe_audio_with_whisper(audio_data)
        transcribe_time = time.time() - transcribe_start
        logging.info(
            f"Transcription completed in {transcribe_time:.2f} seconds. Text length: {len(transcription)}"
        )

        if transcription:
            # Summarize the transcription.
            summary_start = time.time()
            tokenizer, model = load_pegasus_model()
            summary = summarize_text_with_pegasus(transcription, tokenizer, model)
            summary_time = time.time() - summary_start
            logging.info(
                f"Summarization completed in {summary_time:.2f} seconds. Summary length: {len(summary)}"
            )
            total_time = time.time() - start_time
            logging.info(f"Total request completed in {total_time:.2f} seconds")
            return jsonify({"transcription": transcription, "summary": summary})
        else:
            return jsonify({"error": "Transcription failed to produce any text."}), 500

    except ValueError as e:
        # ValueError carries user-presentable messages from the helpers.
        logging.error(f"ValueError: {str(e)}")
        return jsonify({"error": str(e)}), 400
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}", exc_info=True)
        return (
            jsonify(
                {"error": "An unexpected error occurred while processing your request."}
            ),
            500,
        )
if __name__ == "__main__":
    # Bind to all interfaces: on Hugging Face Spaces the container's port
    # 7860 must be reachable from outside, and Flask's default of 127.0.0.1
    # would make the app unreachable.
    app.run(host="0.0.0.0", debug=False, port=7860)