import gradio as gr
import torch
import yt_dlp
import os
import subprocess
import json
from threading import Thread
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
import spaces
import time
import langdetect
import uuid
# Hugging Face token
HF_TOKEN = os.environ.get("HF_TOKEN")

print("Starting the program...")
# Load the Qwen model on CPU
model_path = "Qwen/Qwen2.5-7B-Instruct"
print(f"Loading model {model_path} on CPU...")

tokenizer = AutoTokenizer.from_pretrained(model_path, token=HF_TOKEN, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    token=HF_TOKEN,
    torch_dtype=torch.bfloat16,  # uses half the memory of float32
    trust_remote_code=True,
    low_cpu_mem_usage=True,      # stream weights in gradually to keep peak RAM low
)
model = model.eval()
print("Model successfully loaded.")
# Generate unique filenames
def generate_unique_filename(extension):
    return f"{uuid.uuid4()}{extension}"
# Clean up temporary files
def cleanup_files(*files):
    for file in files:
        if file and os.path.exists(file):
            os.remove(file)
            print(f"Removed file: {file}")
# Extract the audio track using FFmpeg
def extract_audio_ffmpeg(video_path):
    print("Extracting audio using ffmpeg...")
    audio_path = generate_unique_filename(".wav")
    command = ["ffmpeg", "-i", video_path, "-q:a", "0", "-map", "a", audio_path, "-y"]
    subprocess.run(command, check=True)  # raise instead of continuing silently if ffmpeg fails
    return audio_path
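# The command expands to e.g. `ffmpeg -i input.mp4 -q:a 0 -map a <uuid>.wav -y`:
# -map a keeps only the audio streams, and -y overwrites any existing output file.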
# Transcribe audio with the insanely-fast-whisper CLI
def transcribe_audio(file_path):
    print(f"Starting transcription of file: {file_path}")
    temp_audio = None
    if file_path.endswith(('.mp4', '.avi', '.mov', '.flv')):
        print("Video file detected. Extracting audio...")
        temp_audio = extract_audio_ffmpeg(file_path)
        file_path = temp_audio

    output_file = generate_unique_filename(".json")
    command = [
        "insanely-fast-whisper", "--file-name", file_path,
        "--device-id", "cpu", "--model-name", "openai/whisper-large-v3",
        "--task", "transcribe", "--timestamp", "chunk",
        "--transcript-path", output_file,
    ]
    subprocess.run(command, check=True)

    with open(output_file, "r") as f:
        transcription = json.load(f)
    result = transcription.get("text", " ".join(chunk["text"] for chunk in transcription.get("chunks", [])))

    cleanup_files(output_file)
    if temp_audio:
        cleanup_files(temp_audio)
    return result
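# The transcript JSON is expected to hold a top-level "text" field plus a
# "chunks" list (each chunk carrying its own "text"); joining the chunk texts
# is the fallback when "text" is absent.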
# Generate a summary with the Qwen model, streaming tokens as they are produced
def generate_summary_stream(transcription):
    detected_language = langdetect.detect(transcription)
    prompt = f"""Summarize the following video transcription in 150-300 words.
The summary should be in the same language as the transcription, which is detected as {detected_language}.
{transcription[:100000]}..."""  # limit the input size to avoid memory overflow
    messages = [{"role": "user", "content": prompt}]
    input_ids = tokenizer.apply_chat_template(messages, add_generation_prompt=True, return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    Thread(target=model.generate, kwargs={"input_ids": input_ids, "max_new_tokens": 512, "streamer": streamer}).start()
    summary = ""
    for new_text in streamer:
        summary += new_text
        yield summary
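# Gradio treats generator callbacks as streaming outputs, so the summary
# textbox fills in incrementally as tokens arrive.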
# Handle an uploaded video: transcribe it and clear any previous summary
def process_uploaded_video(video_path):
    try:
        if hasattr(video_path, "name"):  # gr.File may hand over a tempfile wrapper
            video_path = video_path.name
        transcription = transcribe_audio(video_path)
        return transcription, ""
    except Exception as e:
        return f"Processing error: {str(e)}", ""
# Gradio UI
demo = gr.Blocks()
with demo:
    gr.Markdown("""
    # 🎥 AI Video Transcription & Summary
    Upload a video to get a transcription and an AI-generated summary.
    """)
    with gr.Tabs():
        with gr.TabItem("🎤 Video Upload"):
            video_input = gr.File(label="Upload a video file")
            video_button = gr.Button("🚀 Process Video", variant="primary")
            transcription_output = gr.Textbox(label="📝 Transcription", lines=10, show_copy_button=True)
            summary_output = gr.Textbox(label="📊 Summary", lines=10, show_copy_button=True)
            summary_button = gr.Button("✨ Generate Summary", variant="secondary")

    video_button.click(process_uploaded_video, inputs=[video_input], outputs=[transcription_output, summary_output])
    summary_button.click(generate_summary_stream, inputs=[transcription_output], outputs=[summary_output])
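# Enable the request queue; generator callbacks need it for incremental updates
# in older Gradio releases (newer ones enable it by default)
demo.queue()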
demo.launch()