|
|
import functools
import os
import subprocess
import uuid

import requests
import whisper
import yt_dlp
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import JSONResponse
from transformers import pipeline
|
|
# FastAPI application object; the route decorators in this file attach to it.
app = FastAPI()
|
|
| |
def clean_old_files():
    """Delete the fixed-name artifacts left in the working directory.

    Removes ``video.mp4``, ``audio.mp3`` and ``transcription.txt`` when they
    are present. Files that do not exist are silently skipped.
    """
    for fname in ("video.mp4", "audio.mp3", "transcription.txt"):
        # Remove directly rather than testing os.path.exists() first: the
        # file can disappear between the check and the removal, which would
        # turn a harmless cleanup into an unexpected FileNotFoundError.
        try:
            os.remove(fname)
        except FileNotFoundError:
            pass
|
|
| |
def download_video(video_url):
    """Download ``video_url`` with yt-dlp and return the local file name.

    The output template is a ``video_<8 hex chars>.mp4`` name built from a
    random UUID, and the ``mp4`` format is requested from yt-dlp.
    """
    random_suffix = uuid.uuid4().hex[:8]
    target_name = f"video_{random_suffix}.mp4"
    options = dict(format="mp4", outtmpl=target_name)
    with yt_dlp.YoutubeDL(options) as downloader:
        downloader.download([video_url])
    return target_name
|
|
| |
def download_direct_video(video_url, timeout=30):
    """Download a video over HTTP(S) and return the local file name.

    Args:
        video_url: URL fetched with a streaming GET request.
        timeout: Value forwarded to ``requests.get`` as ``timeout`` so a
            stalled server cannot block the caller indefinitely.

    Returns:
        The ``video_<8 hex chars>.mp4`` file name the body was written to.

    Raises:
        requests.RequestException: On connection errors, timeouts, or when
            the server answers with an HTTP error status.
    """
    unique_name = f"video_{uuid.uuid4().hex[:8]}.mp4"
    try:
        # The context manager releases the streamed connection even when the
        # body is not read to the end.
        with requests.get(video_url, stream=True, timeout=timeout) as response:
            # Fail here instead of saving an HTML error page as the "video".
            response.raise_for_status()
            with open(unique_name, "wb") as f:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
    except BaseException:
        # Do not leave a truncated download behind; the caller never learns
        # the file name when this function raises.
        try:
            os.remove(unique_name)
        except FileNotFoundError:
            pass
        raise
    return unique_name
|
|
| |
def extract_audio(video_path, audio_path="audio.mp3"):
    """Extract the audio track of ``video_path`` into an MP3 file via ffmpeg.

    Args:
        video_path: Input handed to ffmpeg's ``-i`` option.
        audio_path: Output file name; defaults to ``audio.mp3``.

    Returns:
        ``audio_path`` once ffmpeg has produced the file.

    Raises:
        RuntimeError: If ffmpeg cannot be started, exits with a non-zero
            status, or the output file is missing afterwards.
    """
    # Pass arguments as a list so no shell is involved: video_path may come
    # straight from a request and must never be interpreted as shell syntax.
    command = [
        "ffmpeg",
        "-y",  # overwrite an existing output file without prompting
        "-i", video_path,
        "-vn",
        "-acodec", "libmp3lame",
        "-q:a", "3",
        audio_path,
    ]
    try:
        completed = subprocess.run(
            command,
            stdin=subprocess.DEVNULL,  # keep ffmpeg from reading the server's stdin
            check=False,
        )
    except OSError as exc:
        # e.g. ffmpeg is not installed; keep the exception type callers expect.
        raise RuntimeError("Error: Failed to extract audio.") from exc
    if completed.returncode != 0 or not os.path.exists(audio_path):
        raise RuntimeError("Error: Failed to extract audio.")
    return audio_path
|
|
| |
@functools.lru_cache(maxsize=1)
def _load_whisper_model(model_name):
    """Load a Whisper model and keep the most recently used one cached."""
    return whisper.load_model(model_name)


def transcribe_audio(audio_path, model_name="large"):
    """Transcribe an audio file with Whisper and return the text.

    Args:
        audio_path: Path of the audio file passed to ``model.transcribe``.
        model_name: Whisper model to load; defaults to ``"large"``.

    Returns:
        The ``'text'`` entry of Whisper's transcription result.
    """
    # Loading the model on every call is far more expensive than reusing it,
    # so the loaded model is cached between calls.
    model = _load_whisper_model(model_name)
    result = model.transcribe(audio_path)
    return result["text"]
|
|
| |
@functools.lru_cache(maxsize=1)
def _get_classifier():
    """Build the zero-shot classification pipeline once and reuse it."""
    return pipeline("zero-shot-classification", model="facebook/bart-large-mnli")


def classify_content(text):
    """Assign one of a fixed set of content categories to ``text``.

    Args:
        text: Text to classify (the transcription, in this file).

    Returns:
        A dict with ``"category"`` (the first label in the classifier's
        result) and ``"confidence"`` (its score rounded to two decimals).
        For empty or whitespace-only text, the model is not consulted and
        ``{"category": None, "confidence": 0.0}`` is returned.
    """
    # Nothing to classify: avoid reporting an arbitrary label with a made-up
    # confidence for a silent or empty transcription.
    if not text or not text.strip():
        return {"category": None, "confidence": 0.0}

    labels = [
        "educational",
        "entertainment",
        "news",
        "political",
        "religious",
        "technical",
        "advertisement",
        "social",
    ]
    classifier = _get_classifier()
    result = classifier(
        text,
        candidate_labels=labels,
        hypothesis_template="This text is about {}.",
    )
    # The first label/score pair is treated as the best match.
    top_label = result["labels"][0]
    confidence = result["scores"][0]
    return {"category": top_label, "confidence": round(confidence, 2)}
|
|
@app.post("/process_video")
async def process_video(
    video_url: str = Form(None),
    video_file: UploadFile = File(None)
):
    """Transcribe and classify a video given as a URL or an uploaded file.

    Args:
        video_url: Optional form field. A value ending in ``.mp4`` is passed
            to audio extraction unchanged; YouTube links are fetched with
            ``download_video``; anything else with ``download_direct_video``.
        video_file: Optional uploaded file, used only when ``video_url`` is
            empty.

    Returns:
        A dict with ``"transcription"`` and ``"classification"`` on success,
        a 400 JSON response when neither input is given, or a 500 JSON
        response carrying the error message when any step raises.
    """
    # Local video file created for this request; removed again in ``finally``
    # because clean_old_files() only knows the fixed name "video.mp4".
    temp_video_path = None
    try:
        clean_old_files()

        if video_url:
            if video_url.endswith(".mp4"):
                # Nothing is downloaded here: the value goes to ffmpeg as-is.
                # NOTE(review): this also accepts any local path ending in
                # ".mp4" - confirm that is intended.
                video_path = video_url
            elif "youtube.com" in video_url or "youtu.be" in video_url:
                video_path = temp_video_path = download_video(video_url)
            else:
                video_path = temp_video_path = download_direct_video(video_url)
        elif video_file:
            video_path = temp_video_path = f"video_{uuid.uuid4().hex[:8]}.mp4"
            with open(video_path, "wb") as f:
                # Copy the upload in chunks instead of holding the whole
                # video in memory at once.
                while True:
                    chunk = await video_file.read(1024 * 1024)
                    if not chunk:
                        break
                    f.write(chunk)
        else:
            return JSONResponse(status_code=400, content={"error": "No video input provided."})

        # NOTE(review): the steps below are synchronous calls inside an async
        # handler, so they run on the event loop until they finish.
        audio_path = extract_audio(video_path)
        transcription = transcribe_audio(audio_path)

        with open("transcription.txt", "w", encoding="utf-8") as f:
            f.write(transcription)

        classification = classify_content(transcription)

        return {
            "transcription": transcription,
            "classification": classification
        }

    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
    finally:
        if temp_video_path is not None:
            try:
                os.remove(temp_video_path)
            except FileNotFoundError:
                pass
|
|