File size: 4,941 Bytes
e84d389 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | from fastapi import APIRouter, Depends, Form, File, UploadFile, BackgroundTasks, HTTPException
from fastapi.responses import StreamingResponse
from datetime import datetime
from typing import Optional, List
import os
from ..models.transcription import TranscriptionRequest
from ..dependencies import get_current_user
from ..services.transcription import process_transcription, save_video_file
from ..services.llm import init_google_client
from ..config import settings
from ..db.mongodb import mongodb
from google.genai import types
router = APIRouter()
@router.post("/transcribe")
async def transcribe(
request: TranscriptionRequest,
current_user = Depends(get_current_user)
):
"""
Transcribe a YouTube video via Google GenAI and prepare the RAG system
"""
try:
client = init_google_client()
content = types.Content(
parts=[
types.Part(text="Transcribe the Video. Write all the things described in the video"),
types.Part(file_data=types.FileData(file_uri=request.youtube_url))
]
)
response = client.models.generate_content(
model='models/gemini-2.0-flash',
contents=content
)
transcription = response.candidates[0].content.parts[0].text
title = f"YouTube Video - {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}"
session_id = process_transcription(
transcription,
current_user.username,
title,
source_type="youtube",
source_url=request.youtube_url
)
return {"session_id": session_id, "message": "YouTube video transcribed and RAG system prepared"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error transcribing video: {str(e)}")
@router.post("/upload")
async def upload_video(
background_tasks: BackgroundTasks,
title: str = Form(...),
file: UploadFile = File(...),
prompt: str = Form("Transcribe the Video. Write all the things described in the video"),
current_user = Depends(get_current_user)
):
"""
Upload a video file (max 20MB), transcribe via GenAI, and prepare the RAG system
"""
try:
contents = await file.read()
file_size = len(contents)
if file_size > 20 * 1024 * 1024:
raise HTTPException(status_code=400, detail="File size exceeds 20MB limit")
if not file.content_type.startswith('video/'):
raise HTTPException(status_code=400, detail="File must be a video")
client = init_google_client()
content = types.Content(
parts=[
types.Part(text=prompt),
types.Part(inline_data=types.Blob(data=contents, mime_type=file.content_type))
]
)
response = client.models.generate_content(
model='models/gemini-2.0-flash',
contents=content
)
transcription = response.candidates[0].content.parts[0].text
session_id = process_transcription(
transcription,
current_user.username,
title,
source_type="upload",
file_size=file_size
)
ext = os.path.splitext(file.filename)[1]
file_path = os.path.join(settings.VIDEOS_DIR, f"{session_id}{ext}")
background_tasks.add_task(save_video_file, session_id, file_path, contents)
return {"session_id": session_id, "message": "Uploaded video transcribed and RAG system prepared"}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing uploaded video: {str(e)}")
@router.get("/download/{video_id}")
async def download_video(
video_id: str,
current_user = Depends(get_current_user)
):
"""
Download a previously uploaded video by streaming the saved file
"""
video_data = mongodb.videos.find_one({"video_id": video_id})
if not video_data:
raise HTTPException(status_code=404, detail="Video not found")
if video_data["user_id"] != current_user.username:
raise HTTPException(status_code=403, detail="Not authorized to access this video")
if video_data["source_type"] == "youtube":
return {"message": "This is a YouTube video. Access via:", "url": video_data["source_url"]}
files = [f for f in os.listdir(settings.VIDEOS_DIR) if f.startswith(video_id)]
if not files:
raise HTTPException(status_code=404, detail="Video file not found")
path = os.path.join(settings.VIDEOS_DIR, files[0])
def iterfile():
with open(path, 'rb') as f:
yield from f
mime_type = f"video/{os.path.splitext(files[0])[1][1:]}"
return StreamingResponse(
iterfile(),
media_type=mime_type,
headers={"Content-Disposition": f"attachment; filename={video_data['title']}{os.path.splitext(files[0])[1]}"}
) |