# ayloll's picture
# Upload 3 files
# c958e14 verified
import functools
import os
import subprocess
import uuid

import requests
import whisper
import yt_dlp
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import JSONResponse
from transformers import pipeline
# FastAPI application instance; the /process_video endpoint below is registered on it.
app = FastAPI()
# Delete old files before starting a new process
def clean_old_files():
    """Remove artifacts left in the working directory by a previous run.

    Deletes ``video.mp4``, ``audio.mp3`` and ``transcription.txt`` when they
    exist. A file that is already absent is silently skipped.
    """
    for fname in ("video.mp4", "audio.mp3", "transcription.txt"):
        # EAFP: delete directly instead of exists()-then-remove(), so a file
        # that disappears between the check and the delete cannot raise.
        try:
            os.remove(fname)
        except FileNotFoundError:
            pass
# Download video from YouTube
def download_video(video_url):
    """Download *video_url* with yt-dlp and return the local file name.

    The video is requested in ``mp4`` format and written to the current
    working directory under a random ``video_<8 hex chars>.mp4`` name, so
    successive downloads do not overwrite each other.
    """
    target_name = f"video_{uuid.uuid4().hex[:8]}.mp4"
    options = dict(format='mp4', outtmpl=target_name)
    with yt_dlp.YoutubeDL(options) as downloader:
        downloader.download([video_url])
    return target_name
# Download video from a direct link
def download_direct_video(video_url, timeout=30):
    """Stream the file at *video_url* into a uniquely named local ``.mp4``.

    Args:
        video_url: Direct link to the video file.
        timeout: Seconds to wait for the connection and for each read
            before giving up (passed to ``requests.get``).

    Returns:
        Name of the file written in the current working directory.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            4xx/5xx response.
    """
    unique_name = f"video_{uuid.uuid4().hex[:8]}.mp4"
    # Using the response as a context manager releases the connection even
    # when the download is aborted part-way.
    with requests.get(video_url, stream=True, timeout=timeout) as response:
        # Fail fast on HTTP errors instead of saving an error page as video.
        response.raise_for_status()
        try:
            with open(unique_name, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
        except Exception:
            # Do not leave a truncated file behind for later stages to trip on.
            if os.path.exists(unique_name):
                os.remove(unique_name)
            raise
    return unique_name
# Extract audio using ffmpeg
def extract_audio(video_path):
    """Extract the audio track of *video_path* to ``audio.mp3`` using ffmpeg.

    Args:
        video_path: Path or URL handed to ffmpeg as its input.

    Returns:
        The output file name, ``"audio.mp3"`` (overwritten if it exists).

    Raises:
        RuntimeError: If ffmpeg cannot be started, exits with a non-zero
            status, or does not produce the output file.
    """
    audio_path = "audio.mp3"
    # Argument list + no shell: video_path may come from a request, so it must
    # never be interpolated into a shell command line.
    command = [
        "ffmpeg",
        "-y",
        "-i", video_path,
        "-vn",
        "-acodec", "libmp3lame",
        "-q:a", "3",
        audio_path,
    ]
    try:
        # stdin is detached so ffmpeg cannot consume the server's input.
        completed = subprocess.run(command, stdin=subprocess.DEVNULL, check=False)
    except OSError as exc:
        # e.g. ffmpeg is not installed; keep the exception type callers expect.
        raise RuntimeError("Error: Failed to extract audio.") from exc
    # Checking the exit status as well prevents a stale audio.mp3 from an
    # earlier run being mistaken for a successful extraction.
    if completed.returncode != 0 or not os.path.exists(audio_path):
        raise RuntimeError("Error: Failed to extract audio.")
    return audio_path
# Transcribe audio using Whisper
@functools.lru_cache(maxsize=1)
def _load_whisper_model(model_name):
    """Load the named Whisper model, keeping the most recent one cached."""
    return whisper.load_model(model_name)


def transcribe_audio(audio_path, model_name="large"):
    """Transcribe *audio_path* with Whisper and return the recognised text.

    Args:
        audio_path: Path of the audio file to transcribe.
        model_name: Whisper model to use; defaults to ``"large"``.

    Returns:
        The ``'text'`` field of Whisper's transcription result.
    """
    # Loading the model is far more expensive than reusing it, so it is
    # loaded once per process rather than once per call.
    model = _load_whisper_model(model_name)
    result = model.transcribe(audio_path)
    return result['text']
# Classify text using zero-shot classification
@functools.lru_cache(maxsize=1)
def _get_zero_shot_classifier():
    """Build the zero-shot classification pipeline once and reuse it."""
    return pipeline("zero-shot-classification", model="facebook/bart-large-mnli")


def classify_content(text):
    """Assign *text* to the best-matching content category.

    Args:
        text: The text to classify (here, a video transcription).

    Returns:
        A dict with ``"category"`` (the top-ranked label) and
        ``"confidence"`` (its score rounded to two decimals).
    """
    # The pipeline is cached: constructing it on every call would reload the
    # model each time.
    classifier = _get_zero_shot_classifier()
    labels = ["educational", "entertainment", "news", "political", "religious", "technical", "advertisement", "social"]
    result = classifier(text, candidate_labels=labels, hypothesis_template="This text is about {}.")
    # The first entry of 'labels' / 'scores' is taken as the top prediction.
    top_label = result['labels'][0]
    confidence = result['scores'][0]
    return {"category": top_label, "confidence": round(confidence, 2)}
@app.post("/process_video")
async def process_video(
    video_url: str = Form(None),
    video_file: UploadFile = File(None)
):
    """Transcribe and classify a video given by URL or as an uploaded file.

    Args:
        video_url: Optional form field. A URL ending in ``.mp4`` is handed to
            ffmpeg directly, a YouTube URL is fetched with yt-dlp, and any
            other URL is downloaded as a direct link.
        video_file: Optional uploaded video, used when no URL is given.

    Returns:
        A dict with ``"transcription"`` and ``"classification"`` on success;
        a 400 JSON response when neither input is supplied; a 500 JSON
        response carrying the error message when any step fails.
    """
    # NOTE(review): every step below blocks the event loop. The fixed
    # audio.mp3 / transcription.txt names are shared between requests, so
    # offloading to threads would need per-request file names first.
    temp_video = None  # local video file created by this request, if any
    try:
        clean_old_files()
        if video_url:
            if video_url.endswith(".mp4"):
                # NOTE(review): the client-supplied value reaches ffmpeg
                # unchanged, so it may also name a server-local path.
                # Consider restricting to http(s) URLs.
                video_path = video_url
            elif "youtube.com" in video_url or "youtu.be" in video_url:
                video_path = temp_video = download_video(video_url)
            else:
                video_path = temp_video = download_direct_video(video_url)
        elif video_file:
            video_path = temp_video = f"video_{uuid.uuid4().hex[:8]}.mp4"
            with open(video_path, "wb") as f:
                # Copy in 1 MiB chunks so a large upload is never held in
                # memory all at once.
                while True:
                    chunk = await video_file.read(1024 * 1024)
                    if not chunk:
                        break
                    f.write(chunk)
        else:
            return JSONResponse(status_code=400, content={"error": "No video input provided."})
        audio_path = extract_audio(video_path)
        transcription = transcribe_audio(audio_path)
        with open("transcription.txt", "w", encoding="utf-8") as f:
            f.write(transcription)
        classification = classify_content(transcription)
        return {
            "transcription": transcription,
            "classification": classification
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
    finally:
        # The uniquely named video file is not covered by clean_old_files(),
        # so remove it here. Best effort: a cleanup failure must not mask
        # the response.
        if temp_video is not None:
            try:
                os.remove(temp_video)
            except OSError:
                pass