"""HTTP service that extracts a video's audio with yt-dlp and transcribes it via AssemblyAI."""

import os
import subprocess
import tempfile
import time

import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

ASSEMBLYAI_API_KEY = os.getenv("ASSEMBLYAI_API_KEY")

_ASSEMBLYAI_BASE_URL = "https://api.assemblyai.com/v2"
# Optional cookie jar handed to yt-dlp when present in the working directory.
_COOKIES_FILE = "cookies.txt"
# Generous upper bounds so a stuck download or transcription job cannot pin a
# worker forever.
_DOWNLOAD_TIMEOUT_SECONDS = 15 * 60
_UPLOAD_TIMEOUT_SECONDS = (10, 300)  # (connect, read)
_API_TIMEOUT_SECONDS = (10, 60)  # (connect, read)
_POLL_INTERVAL_SECONDS = 3
_POLL_TIMEOUT_SECONDS = 30 * 60


class VideoURL(BaseModel):
    """Request body for ``POST /analyze``: the URL of the video to transcribe."""

    url: str


def _download_audio(url, workdir):
    """Download *url* with yt-dlp into *workdir* and return the mp3 path.

    Raises:
        RuntimeError: if yt-dlp exits non-zero or produces no mp3 file.
        subprocess.TimeoutExpired: if the download exceeds the time limit.
    """
    cmd = [
        "yt-dlp",
        "-x",
        "--audio-format", "mp3",
        "--no-cache-dir",  # Disable cache to avoid permission issues
        "--force-ipv4",  # Force IPv4 to avoid network issues
        # Let yt-dlp pick the intermediate extension; after extraction the
        # final file is audio.mp3.
        "-o", os.path.join(workdir, "audio.%(ext)s"),
    ]
    if os.path.exists(_COOKIES_FILE):
        cmd.extend(["--cookies", _COOKIES_FILE])
    # "--" ends option parsing, so a user-supplied URL that starts with "-"
    # cannot be interpreted as a yt-dlp option.
    cmd.extend(["--", url])

    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=_DOWNLOAD_TIMEOUT_SECONDS,
    )
    if result.returncode != 0:
        raise RuntimeError(f"Download error: {result.stderr}")

    audio_path = os.path.join(workdir, "audio.mp3")
    if not os.path.exists(audio_path):
        raise RuntimeError("Download error: yt-dlp did not produce an audio file")
    return audio_path


def _json_or_raise(response, action):
    """Return the decoded JSON body of *response*, or raise if it is an HTTP error."""
    if not response.ok:
        raise RuntimeError(
            f"{action} failed with HTTP {response.status_code}: {response.text}"
        )
    return response.json()


def _required(payload, key, action):
    """Return ``payload[key]``, raising a descriptive error when it is absent."""
    if key not in payload:
        raise RuntimeError(f"{action} response did not include '{key}'")
    return payload[key]


def _upload_audio(audio_path, headers):
    """Upload the file at *audio_path* to AssemblyAI and return its upload URL."""
    with open(audio_path, "rb") as audio_file:
        # The file is streamed as the raw request body rather than as a
        # multipart form, matching AssemblyAI's documented upload example.
        response = requests.post(
            f"{_ASSEMBLYAI_BASE_URL}/upload",
            headers=headers,
            data=audio_file,
            timeout=_UPLOAD_TIMEOUT_SECONDS,
        )
    return _required(_json_or_raise(response, "Upload"), "upload_url", "Upload")


def _request_transcript(audio_url, headers):
    """Submit *audio_url* for transcription and return the transcript id."""
    response = requests.post(
        f"{_ASSEMBLYAI_BASE_URL}/transcript",
        json={"audio_url": audio_url},
        headers=headers,
        timeout=_API_TIMEOUT_SECONDS,
    )
    payload = _json_or_raise(response, "Transcription request")
    return _required(payload, "id", "Transcription request")


def _wait_for_transcript(transcript_id, headers):
    """Poll AssemblyAI until the transcript completes and return its text.

    Raises:
        RuntimeError: if the job reports an error or the deadline passes.
    """
    deadline = time.monotonic() + _POLL_TIMEOUT_SECONDS
    while True:
        response = requests.get(
            f"{_ASSEMBLYAI_BASE_URL}/transcript/{transcript_id}",
            headers=headers,
            timeout=_API_TIMEOUT_SECONDS,
        )
        payload = _json_or_raise(response, "Transcription poll")
        status = _required(payload, "status", "Transcription poll")
        if status == "completed":
            return payload["text"]
        if status == "error":
            detail = payload.get("error")
            if detail:
                raise RuntimeError(f"Transcription error: {detail}")
            raise RuntimeError("Transcription error")
        if time.monotonic() >= deadline:
            raise RuntimeError("Transcription error: timed out waiting for completion")
        time.sleep(_POLL_INTERVAL_SECONDS)


@app.post("/analyze")
def analyze_video(video: VideoURL):
    """Download the audio of ``video.url`` and return its transcription.

    Returns:
        ``{"transcription": <text>}`` once AssemblyAI reports completion.

    Raises:
        HTTPException: status 500 with the failure message as ``detail`` when
            the download, upload, or transcription step fails.
    """
    try:
        if not ASSEMBLYAI_API_KEY:
            raise RuntimeError("ASSEMBLYAI_API_KEY is not set")
        headers = {"authorization": ASSEMBLYAI_API_KEY}

        # A per-request scratch directory keeps concurrent requests from
        # overwriting each other's audio and removes the file afterwards.
        with tempfile.TemporaryDirectory() as workdir:
            audio_path = _download_audio(video.url, workdir)
            audio_url = _upload_audio(audio_path, headers)

        transcript_id = _request_transcript(audio_url, headers)
        return {"transcription": _wait_for_transcript(transcript_id, headers)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e