File size: 5,037 Bytes
c564fad b6093b0 c564fad b6093b0 2b0ce36 3fc2a44 23e5406 9876d02 c564fad 2b0ce36 78c135b 3fc2a44 9876d02 3fc2a44 ab25f71 2a6a104 ab25f71 d0c0836 ab25f71 2a6a104 ab25f71 d0c0836 ab25f71 b6093b0 1aceee0 44d365d b6093b0 23e5406 b6093b0 c564fad b6093b0 ab25f71 d63a8c1 8cdcb92 b6093b0 3fc2a44 d0c0836 c564fad 8cdcb92 b6093b0 2b0ce36 c564fad b6093b0 c564fad b6093b0 8cdcb92 c564fad b6093b0 3fc2a44 25a7124 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
import shutil, os, logging, uvicorn, tempfile
from typing import Optional
from utils.process_video import process_video
from utils.zip_response import zip_response
from utils.api_configs import api_configs
from utils.read_html import read_html
from utils.logger import setup_logger
from fastapi import FastAPI, UploadFile, HTTPException, Form, Depends
from fastapi.responses import HTMLResponse, Response
from fastapi.security import HTTPBasic
from pydantic import BaseModel, field_validator
app = FastAPI()
security = HTTPBasic()
api_configs_file = os.path.abspath("api_config.yml")
class MP4Video(BaseModel):
video_file: UploadFile
@property
def filename(self):
return self.video_file.filename
@property
def file(self):
return self.video_file.file
@field_validator('video_file')
def validate_video_file(cls, v):
if not v.filename.endswith('.mp4'):
raise HTTPException(status_code=500, detail='Invalid video file type. Please upload an MP4 file.')
return v
class SRTFile(BaseModel):
srt_file: Optional[UploadFile] = None
@property
def filename(self):
return self.srt_file.filename
@property
def file(self):
return self.srt_file.file
@property
def size(self):
return self.srt_file.size
@field_validator('srt_file')
def validate_srt_file(cls, v):
if v.size > 0 and not v.filename.endswith('.srt'):
raise HTTPException(status_code=422, detail='Invalid subtitle file type. Please upload an SRT file.')
return v
@app.get("/")
async def root():
html_content = f"""
{read_html(os.path.join(os.getcwd(),"static/landing_page.html"))}
"""
return HTMLResponse(content=html_content)
@app.get("/submit_video/")
async def get_form():
html_content = f"""
{read_html(os.path.join(os.getcwd(),"static/submit_video.html"))}
"""
return HTMLResponse(content=html_content)
async def get_temp_dir():
dir = tempfile.TemporaryDirectory()
try:
yield dir.name
finally:
del dir
@app.post("/process_video/")
async def process_video_api(video_file: MP4Video = Depends(),
srt_file: SRTFile = Depends(),
task: Optional[str] = Form("transcribe"),
max_words_per_line: Optional[int] = Form(6),
fontsize: Optional[int] = Form(42),
font: Optional[str] = Form("FuturaPTHeavy"),
bg_color: Optional[str] = Form("#070a13b3"),
text_color: Optional[str] = Form("white"),
caption_mode: Optional[str] = Form("desktop"),
temp_dir: str = Depends(get_temp_dir)
):
try:
logging.info("Creating temporary directories")
with open(os.path.join(temp_dir, video_file.filename), 'w+b') as temp_file:
logging.info("Copying video UploadFile to the temporary directory")
try:
shutil.copyfileobj(video_file.file, temp_file)
finally:
video_file.file.close()
logging.info("Copying SRT UploadFile to the temp_input_path")
if srt_file.size > 0:
with open(os.path.join(temp_dir, f"{video_file.filename.split('.')[0]}.srt"), 'w+b') as temp_srt_file:
try:
shutil.copyfileobj(srt_file.file, temp_srt_file)
finally:
srt_file.file.close()
logging.info("Processing the video...")
output_path, _ = process_video(temp_file.name, temp_srt_file.name, task, max_words_per_line, fontsize, font, bg_color, text_color, caption_mode)
logging.info("Zipping response...")
with open(os.path.join(temp_dir, f"{video_file.filename.split('.')[0]}.zip"), 'w+b') as temp_zip_file:
zip_file = zip_response(temp_zip_file.name, [output_path, srt_path])
return Response(content = zip_file)
with open(os.path.join(temp_dir, f"{video_file.filename.split('.')[0]}.srt"), 'w+b') as temp_srt_file:
logging.info("Processing the video...")
output_path, srt_path = process_video(temp_file.name, None, task, max_words_per_line, fontsize, font, bg_color, text_color, caption_mode, api_configs_file)
logging.info("Zipping response...")
with open(os.path.join(temp_dir, f"{video_file.filename.split('.')[0]}.zip"), 'w+b') as temp_zip_file:
zip_file = zip_response(temp_zip_file.name, [output_path, srt_path])
return Response(content = zip_file)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
app_logger = setup_logger('appLogger', 'main.log', level=logging.DEBUG)
uvicorn.run(app, host="0.0.0.0", port=8000) |