# HARISH20205's picture
# cookies
# 016fc9a
from flask import Flask, request, jsonify, render_template
import whisper
from pydub import AudioSegment
import os
import io
import numpy as np
from transformers import PegasusForConditionalGeneration, PegasusTokenizer
import math
from yt_dlp import YoutubeDL
import logging
from functools import lru_cache
from dotenv import load_dotenv
import time
import re
import tempfile
load_dotenv()
def setup_environment():
    """Configure environment for Hugging Face Spaces.

    Creates a fixed set of scratch directories under ``/tmp`` (trying to make
    each one world-writable), exports cache, certificate, home and proxy
    environment variables, and installs a filter that ignores ``UserWarning``.
    A failed ``chmod`` is logged as a warning and does not abort the setup.
    """
    import warnings

    scratch_dirs = (
        "/tmp/transformers_cache",
        "/tmp/hf_home",
        "/tmp/cache",
        "/tmp/yt-dlp",
        "/tmp/certs",
    )
    for path in scratch_dirs:
        os.makedirs(path, exist_ok=True)
        try:
            # Best effort: the directory is still usable if we already own it.
            os.chmod(path, 0o777)
        except Exception as e:
            logging.warning(f"Could not set permissions on {path}: {e}")

    os.environ.update(
        {
            # Library caches
            "TRANSFORMERS_CACHE": "/tmp/transformers_cache",
            "HF_HOME": "/tmp/hf_home",
            "XDG_CACHE_HOME": "/tmp/cache",
            # Certificate handling
            # NOTE(review): PYTHONHTTPSVERIFY=0 asks interpreters that honour
            # it to skip HTTPS verification - confirm this is intended
            # alongside the CA bundle settings below.
            "PYTHONHTTPSVERIFY": "0",
            "REQUESTS_CA_BUNDLE": "/etc/ssl/certs/ca-certificates.crt",
            "SSL_CERT_DIR": "/etc/ssl/certs",
            # Point HOME at the temp directory to avoid permission issues
            "HOME": "/tmp",
            # For yt-dlp
            "no_proxy": "*",
        }
    )

    # Disable warnings that might flood logs
    warnings.filterwarnings("ignore", category=UserWarning)
# Prepare scratch directories and environment variables before the app object
# is created.
# NOTE(review): the whisper/transformers imports at the top of the file have
# already executed by this point - confirm the cache variables exported here
# are still picked up by those libraries.
setup_environment()
# Flask application object that the route decorators below attach to.
app = Flask(__name__)
# NOTE(review): setup_environment() may already have called logging.warning()
# before this line runs - confirm the INFO level still takes effect then.
logging.basicConfig(level=logging.INFO)
# Hugging Face model id that load_pegasus_model() passes to from_pretrained().
MODEL_NAME = "google/pegasus-xsum"
def convert_audio_to_mp3(audio_bytes, original_format=None):
    """Re-encode raw audio bytes as MP3 entirely in memory.

    Args:
        audio_bytes: Encoded audio content as a bytes-like object.
        original_format: Format hint forwarded to ``AudioSegment.from_file``
            as ``format``; ``None`` forwards no hint.

    Returns:
        io.BytesIO: Buffer holding the exported MP3 data, rewound to offset 0.

    Raises:
        ValueError: If the payload is empty or decoding/encoding fails. The
            underlying exception is attached as ``__cause__``.
    """
    try:
        logging.info(f"Converting audio from {original_format} to MP3 in memory...")
        if not audio_bytes:
            # Fail with a readable message instead of a decoder error.
            raise ValueError("no audio data received")
        audio = AudioSegment.from_file(io.BytesIO(audio_bytes), format=original_format)
        buffer = io.BytesIO()
        audio.export(buffer, format="mp3")
        buffer.seek(0)
        logging.info("Conversion successful")
        return buffer
    except Exception as e:
        logging.error(f"Error converting audio to MP3: {e}")
        # Chain the cause so the original traceback is kept.
        raise ValueError(f"Error converting audio to MP3: {e}") from e
@lru_cache(maxsize=1)
def load_whisper_model():
    """Return the Whisper ``"base"`` model.

    The single-slot ``lru_cache`` means ``whisper.load_model`` runs on the
    first call only; later calls in the same process get the cached object.
    """
    whisper_model = whisper.load_model("base")
    return whisper_model
@lru_cache(maxsize=1)
def load_pegasus_model():
    """Return the ``(tokenizer, model)`` pair for ``MODEL_NAME``.

    Both objects come from ``from_pretrained(MODEL_NAME)``; the single-slot
    ``lru_cache`` keeps the pair so it is built once per process.
    """
    return (
        PegasusTokenizer.from_pretrained(MODEL_NAME),
        PegasusForConditionalGeneration.from_pretrained(MODEL_NAME),
    )
def _whisper_transcribe_worker(file_path, result_queue):
    """Child-process entry point: transcribe *file_path* and report back.

    Puts exactly one item on *result_queue*: the value returned by
    ``model.transcribe`` on success, or the exception that was raised.
    Kept at module level so ``multiprocessing`` can reference it by name
    regardless of the process start method.
    """
    try:
        model = load_whisper_model()
        result_queue.put(model.transcribe(file_path))
    except Exception as e:
        result_queue.put(e)


def _await_transcription(process, result_queue, timeout, poll_interval=0.5):
    """Return the worker's single queue item, enforcing *timeout* seconds.

    The queue is read *before* the process is joined: a child that has put a
    large item on a ``multiprocessing.Queue`` does not exit until that item
    has been consumed, so joining first can stall until the deadline.

    Raises:
        TimeoutError: The deadline passed; the worker has been terminated.
        ValueError: The worker exited without reporting anything.
    """
    from queue import Empty

    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if deadline is None:
            wait = poll_interval
        else:
            wait = min(poll_interval, max(deadline - time.monotonic(), 0.0))
        try:
            return result_queue.get(timeout=wait)
        except Empty:
            pass

        if not process.is_alive():
            # The worker is gone; give data already in the pipe one last chance.
            try:
                return result_queue.get(timeout=poll_interval)
            except Empty:
                raise ValueError("Transcription process failed") from None

        if deadline is not None and time.monotonic() >= deadline:
            process.terminate()
            process.join()
            raise TimeoutError(f"Transcription timed out after {timeout} seconds")


def transcribe_audio_with_whisper(audio_data, timeout=300):  # 5 minute timeout
    """Transcribe MP3 audio with Whisper in a separate, time-limited process.

    Args:
        audio_data: MP3 content, either an ``io.BytesIO`` or raw bytes.
        timeout: Seconds to wait for the worker before it is terminated;
            ``None`` waits indefinitely.

    Returns:
        The ``"text"`` entry of the result reported by the worker.

    Raises:
        ValueError: On timeout (with a user-facing message) or on any other
            failure; the underlying exception is attached as ``__cause__``.
    """
    from multiprocessing import Process, Queue

    try:
        logging.info("Transcribing audio data")
        start_time = time.time()
        # Populate the model cache in this process first, as before, so a
        # fork-started worker can reuse it instead of loading again.
        load_whisper_model()

        if isinstance(audio_data, io.BytesIO):
            payload = audio_data.getvalue()
        else:
            payload = audio_data

        temp_file_path = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
                # Record the path first so cleanup still runs if the write fails.
                temp_file_path = temp_file.name
                temp_file.write(payload)

            result_queue = Queue()
            process = Process(
                target=_whisper_transcribe_worker,
                args=(temp_file_path, result_queue),
            )
            process.start()
            try:
                outcome = _await_transcription(process, result_queue, timeout)
            finally:
                # Reap the worker; force it down if it lingers after reporting.
                process.join(timeout=5)
                if process.is_alive():
                    process.terminate()
                    process.join()

            if isinstance(outcome, Exception):
                raise outcome
            result = outcome
        finally:
            # Single cleanup point for the temp file on every path.
            if temp_file_path is not None:
                try:
                    os.unlink(temp_file_path)
                except OSError as e:
                    logging.warning(f"Could not remove temp file: {e}")

        elapsed = time.time() - start_time
        logging.info(f"Transcription completed in {elapsed:.2f} seconds")
        return result["text"]
    except TimeoutError as e:
        logging.error(f"Transcription timeout: {e}")
        raise ValueError(
            "Audio transcription took too long. Please try a shorter audio file."
        ) from e
    except Exception as e:
        logging.error(f"Error in audio transcription: {e}")
        raise ValueError(f"Error in audio transcription: {e}") from e
def summarize_text_with_pegasus(
    text,
    tokenizer,
    model,
    *,
    num_beams=5,
    min_length_floor=75,
    max_length_floor=200,
):
    """Summarise *text* with a seq2seq model and drop repeated sentences.

    The summary length bounds scale with the tokenised input: the minimum is
    a quarter of the input token count and the maximum a third, each raised
    to its floor when the input is short.

    Args:
        text: Text to summarise.
        tokenizer: Callable tokenizer that also provides ``decode``.
        model: Model object that provides ``generate``.
        num_beams: Beam width passed to ``model.generate``.
        min_length_floor: Lower limit for ``min_length``.
        max_length_floor: Lower limit for ``max_length``.

    Returns:
        str: Decoded summary after ``remove_repeated_sentences``.

    Raises:
        ValueError: If tokenisation, generation or decoding fails; the
            underlying exception is attached as ``__cause__``.
    """
    try:
        inputs = tokenizer(
            text, truncation=True, padding="longest", return_tensors="pt"
        )
        input_ids = inputs["input_ids"]
        total_tokens = len(input_ids[0])

        min_summary_length = max(math.ceil(total_tokens / 4), min_length_floor)
        max_summary_length = max(math.ceil(total_tokens / 3), max_length_floor)
        # Cannot trigger with the default floors; guards caller-supplied
        # floors where the minimum would meet or exceed the maximum.
        if min_summary_length >= max_summary_length:
            min_summary_length = max_summary_length - 1

        summary_ids = model.generate(
            input_ids,
            num_beams=num_beams,
            min_length=min_summary_length,
            max_length=max_summary_length,
            early_stopping=True,
        )
        summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
        return remove_repeated_sentences(summary)
    except Exception as e:
        logging.error(f"Error in text summarization: {e}")
        raise ValueError(f"Error in text summarization: {e}") from e
def download_youtube_with_cookies(url):
    """Download YouTube audio as MP3 using the project's cookies file.

    ``cookies.txt`` is looked up in the current working directory. When it
    exists, a scratch copy is handed to yt-dlp through the ``cookiefile``
    option (the Python API key; ``"cookies"`` is only the CLI flag name).
    A copy is used because yt-dlp documents that file as both read from and
    dumped to. When it is missing the download proceeds without cookies.

    Each call works in its own temporary directory under
    ``/tmp/yt_downloads``, so concurrent requests cannot collide and partial
    files are removed on every path.

    Args:
        url: Media URL passed to ``YoutubeDL.extract_info``.

    Returns:
        io.BytesIO: The extracted MP3 data, rewound to offset 0.

    Raises:
        ValueError: If anything fails; the underlying exception is attached
            as ``__cause__``.
    """
    import shutil

    try:
        logging.info(f"Downloading YouTube with cookies: {url}")
        # Use the cookies.txt from the project directory
        cookies_path = os.path.join(os.getcwd(), "cookies.txt")

        output_dir = "/tmp/yt_downloads"
        os.makedirs(output_dir, exist_ok=True)
        try:
            os.chmod(output_dir, 0o777)
        except OSError as e:
            # Best effort, matching setup_environment().
            logging.warning(f"Could not set permissions on {output_dir}: {e}")

        with tempfile.TemporaryDirectory(prefix="download_", dir=output_dir) as work_dir:
            ydl_opts = {
                "format": "bestaudio/best",
                "postprocessors": [
                    {
                        "key": "FFmpegExtractAudio",
                        "preferredcodec": "mp3",
                        "preferredquality": "192",
                    }
                ],
                "outtmpl": os.path.join(work_dir, "audio.%(ext)s"),
                "nocheckcertificate": True,
                "ignoreerrors": True,
                "geo_bypass": True,
                "logtostderr": True,
                "quiet": False,
                "no_warnings": False,
                "socket_timeout": 30,
                "retries": 5,
            }

            if os.path.exists(cookies_path):
                scratch_cookies = os.path.join(work_dir, "cookies.txt")
                shutil.copyfile(cookies_path, scratch_cookies)
                ydl_opts["cookiefile"] = scratch_cookies
                logging.info(f"Using cookies from: {cookies_path}")
            else:
                logging.warning("cookies.txt not found in project directory")

            with YoutubeDL(ydl_opts) as ydl:
                # With ignoreerrors=True a failed extraction yields a falsy result.
                info = ydl.extract_info(url, download=True)
            if not info:
                raise ValueError("Could not fetch video information")

            # The post-processor leaves an .mp3 in the private work directory;
            # look for it directly instead of guessing the name.
            mp3_names = sorted(
                name for name in os.listdir(work_dir) if name.lower().endswith(".mp3")
            )
            if not mp3_names:
                raise FileNotFoundError(
                    f"Could not find downloaded file: {os.path.join(work_dir, 'audio.mp3')}"
                )
            filename = os.path.join(work_dir, mp3_names[0])
            logging.info(f"Downloaded file: {filename}")

            with open(filename, "rb") as f:
                buffer = io.BytesIO(f.read())

        buffer.seek(0)
        return buffer
    except Exception as e:
        logging.error(f"Error downloading with cookies: {e}", exc_info=True)
        raise ValueError(f"Error downloading with cookies: {e}") from e
def download_youtube_direct(url):
    """Direct YouTube download without cookies, simplified options.

    The best audio stream is fetched into a per-call temporary directory
    under ``/tmp/yt_direct``, so concurrent requests cannot collide and the
    file is removed on every path. Anything that is not already MP3 is
    re-encoded with ``convert_audio_to_mp3``.

    Args:
        url: Media URL passed to ``YoutubeDL.extract_info``.

    Returns:
        io.BytesIO: MP3 data, rewound to offset 0.

    Raises:
        ValueError: If anything fails; the underlying exception is attached
            as ``__cause__``.
    """
    try:
        logging.info(f"Attempting direct YouTube download: {url}")

        output_dir = "/tmp/yt_direct"
        os.makedirs(output_dir, exist_ok=True)
        try:
            os.chmod(output_dir, 0o777)
        except OSError as e:
            # Best effort, matching setup_environment().
            logging.warning(f"Could not set permissions on {output_dir}: {e}")

        with tempfile.TemporaryDirectory(prefix="direct_", dir=output_dir) as work_dir:
            ydl_opts = {
                "format": "bestaudio",
                "outtmpl": os.path.join(work_dir, "audio.%(ext)s"),
                "nocheckcertificate": True,
                "ignoreerrors": False,
                "geo_bypass": True,
                "no_warnings": True,
                "quiet": True,
                "skip_download": False,
                "noprogress": True,
                "nooverwrites": False,
                "socket_timeout": 30,
            }

            with YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                if not info:
                    raise ValueError("Could not fetch video information")
                filename = ydl.prepare_filename(info)

            if not os.path.exists(filename):
                raise FileNotFoundError(f"Could not find downloaded file: {filename}")

            with open(filename, "rb") as f:
                data = f.read()

        # The bytes are in memory now; the work directory is already gone.
        extension = os.path.splitext(filename)[1].lstrip(".").lower()
        if extension == "mp3":
            buffer = io.BytesIO(data)
        else:
            # Convert to mp3; with no extension the decoder gets no format hint.
            buffer = convert_audio_to_mp3(data, original_format=extension or None)

        buffer.seek(0)
        return buffer
    except Exception as e:
        logging.error(f"Error in direct download: {e}", exc_info=True)
        raise ValueError(f"Error in direct download: {e}") from e
def _download_with_pytube(url):
    """Fetch the first audio-only stream with pytube and return it as MP3.

    The stream is saved into a per-call temporary directory under
    ``/tmp/pytube_downloads`` so the file is removed even when conversion
    fails and concurrent requests cannot overwrite each other.
    """
    logging.info("Attempting download with pytube")
    from pytube import YouTube

    yt = YouTube(url)
    stream = yt.streams.filter(only_audio=True).first()
    if not stream:
        raise ValueError("No audio stream found")

    output_dir = "/tmp/pytube_downloads"
    os.makedirs(output_dir, exist_ok=True)
    with tempfile.TemporaryDirectory(dir=output_dir) as work_dir:
        output_path = stream.download(output_path=work_dir)
        logging.info(f"Downloaded to: {output_path}")
        with open(output_path, "rb") as f:
            data = f.read()

    # Convert to mp3; with no extension the decoder gets no format hint.
    extension = os.path.splitext(output_path)[1].lstrip(".")
    return convert_audio_to_mp3(data, original_format=extension or None)


def download_audio_from_youtube(url):
    """Main YouTube download function with multiple fallback methods.

    Tries, in order: the cookie-based yt-dlp download, the direct yt-dlp
    download, and pytube. The first method that succeeds supplies the result.

    Args:
        url: Media URL handed to each download method.

    Returns:
        The MP3 buffer returned by the first successful method.

    Raises:
        ValueError: With a user-facing message when every method fails; the
            individual failures are written to the log.
    """
    logging.info(f"Starting YouTube download process for: {url}")

    # (label, logger for a failure, attempt). The first two failures are
    # logged as warnings and the last as an error, as before.
    attempts = (
        ("Cookie", logging.warning, lambda: download_youtube_with_cookies(url)),
        ("Direct", logging.warning, lambda: download_youtube_direct(url)),
        ("Pytube", logging.error, lambda: _download_with_pytube(url)),
    )

    errors = []
    for label, log_failure, attempt in attempts:
        try:
            return attempt()
        except Exception as e:
            log_failure(f"{label} download failed: {e}")
            errors.append(f"{label} method: {e}")

    # All methods failed
    error_message = "All download methods failed:\n" + "\n".join(errors)
    logging.error(error_message)
    raise ValueError(
        "Could not download YouTube audio. Please try uploading an audio file directly or use a different URL."
    )
def allowed_file(filename):
    """Tell whether *filename* carries an accepted audio extension.

    The text after the last ``"."`` is compared case-insensitively against
    ``mp3``, ``aac``, ``flac`` and ``m4a``. Names without a dot are rejected.
    """
    if "." not in filename:
        return False
    extension = filename.rsplit(".", 1)[-1].lower()
    return extension in ("mp3", "aac", "flac", "m4a")
def remove_repeated_sentences(text):
    """Drop sentences that repeat an earlier one, keeping first occurrences.

    Sentences are split after ``.``, ``!`` or ``?`` followed by one or more
    spaces. Two sentences count as the same when they match after
    lower-casing and stripping surrounding whitespace. Survivors are
    re-joined with single spaces in their original order.
    """
    first_occurrence = {}
    for piece in re.split(r"(?<=[.!?]) +", text):
        # setdefault keeps the earliest spelling; dicts preserve insert order.
        first_occurrence.setdefault(piece.lower().strip(), piece)
    return " ".join(first_occurrence.values())
@app.route("/")
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template("index.html")
    return page
def _error_response(message, status):
    """Build the ``{"error": message}`` JSON body paired with *status*."""
    return jsonify({"error": message}), status


def _audio_from_youtube_url(raw_url, started_at):
    """Download audio for a submitted URL.

    Returns ``(audio_data, None)`` on success or ``(None, error_response)``
    when the request should be rejected. Download errors that do not look
    like an access problem propagate to the caller unchanged.
    """
    youtube_url = raw_url.strip()
    logging.info(f"Processing YouTube URL: {youtube_url}")

    # NOTE(review): only the scheme is checked, so any http(s) URL reaches
    # the downloader - consider restricting the accepted hosts.
    if not youtube_url.startswith(("http://", "https://")):
        return None, _error_response(
            "Invalid URL format. Please provide a complete URL.", 400
        )

    try:
        audio_data = download_audio_from_youtube(youtube_url)
    except Exception as e:
        error_msg = str(e).lower()
        access_terms = ("bot", "sign in", "cookie", "certificate", "permission")
        if any(term in error_msg for term in access_terms):
            return None, _error_response(
                "YouTube access issue. Please try uploading an audio file directly or use a different YouTube URL.",
                400,
            )
        # Bare raise keeps the original traceback intact.
        raise

    logging.info(
        f"YouTube download completed in {time.time() - started_at:.2f} seconds"
    )
    return audio_data, None


def _audio_from_upload(audio_file, started_at):
    """Validate an uploaded file and convert it to MP3.

    Returns ``(audio_data, None)`` on success or ``(None, error_response)``
    when the upload is missing a name or has a disallowed extension.
    """
    if not audio_file.filename:
        return None, _error_response("No file selected.", 400)

    if not allowed_file(audio_file.filename):
        return None, _error_response(
            "Invalid file type. Please upload an audio file (mp3, aac, flac, or m4a).",
            400,
        )

    audio_bytes = audio_file.read()
    file_format = audio_file.filename.rsplit(".", 1)[1].lower()
    logging.info(
        f"Processing uploaded file: {audio_file.filename}, format: {file_format}, size: {len(audio_bytes)} bytes"
    )
    audio_data = convert_audio_to_mp3(audio_bytes, original_format=file_format)
    logging.info(
        f"File conversion completed in {time.time() - started_at:.2f} seconds"
    )
    return audio_data, None


@app.route("/transcribe", methods=["POST"])
def transcribe():
    """Handle ``POST /transcribe``: fetch audio, transcribe it, summarise it.

    The audio comes from the ``url`` form field when it is non-empty,
    otherwise from the uploaded ``file``.

    Returns:
        A JSON response with ``transcription`` and ``summary`` on success.
        On failure a JSON ``{"error": ...}`` body with status 400 (bad
        input or a ``ValueError`` raised while processing) or 500 (empty
        transcription or any unexpected exception).
    """
    # Record the start time
    start_time = time.time()
    logging.info("Starting new transcription request")

    try:
        submitted_url = request.form.get("url")
        if submitted_url:
            audio_data, rejection = _audio_from_youtube_url(submitted_url, start_time)
        elif "file" in request.files:
            audio_data, rejection = _audio_from_upload(
                request.files["file"], start_time
            )
        else:
            return _error_response("No audio file or URL provided.", 400)

        if rejection is not None:
            return rejection

        # Transcribe the audio
        transcribe_start = time.time()
        transcription = transcribe_audio_with_whisper(audio_data)
        transcribe_time = time.time() - transcribe_start
        logging.info(
            f"Transcription completed in {transcribe_time:.2f} seconds. Text length: {len(transcription)}"
        )

        if not transcription:
            return _error_response("Transcription failed to produce any text.", 500)

        # Summarize the transcription
        summary_start = time.time()
        tokenizer, model = load_pegasus_model()
        summary = summarize_text_with_pegasus(transcription, tokenizer, model)
        summary_time = time.time() - summary_start
        logging.info(
            f"Summarization completed in {summary_time:.2f} seconds. Summary length: {len(summary)}"
        )

        total_time = time.time() - start_time
        logging.info(f"Total request completed in {total_time:.2f} seconds")
        return jsonify({"transcription": transcription, "summary": summary})

    except ValueError as e:
        # The processing helpers raise ValueError with user-facing messages.
        logging.error(f"ValueError: {str(e)}")
        return _error_response(str(e), 400)
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}", exc_info=True)
        return _error_response(
            "An unexpected error occurred while processing your request.", 500
        )
# Start Flask's built-in server on port 7860 with debug mode off, but only
# when this file is executed as a script (not when it is imported).
if __name__ == "__main__":
    app.run(debug=False, port=7860)