import os import re import uuid import gdown import whisper from concurrent.futures import ThreadPoolExecutor from fastapi import FastAPI, HTTPException, BackgroundTasks from fastapi.responses import JSONResponse from dotenv import load_dotenv from postmarker.core import PostmarkClient # 구글 드라이브 링크, 어떤 언어, 받을 이메일 주소 # .env 파일에서 중요한 환경 변수 로드 load_dotenv() app = FastAPI( version="0.0.1", servers=[ { "url": "https://leekwoon-whisper-api.hf.space", "description": "video/audio transcription API", } ], ) # Whisper 모델 로드 model = whisper.load_model("large-v2") postmark = PostmarkClient(server_token=os.getenv("POSTMARK_API_KEY")) executor = ThreadPoolExecutor(max_workers=3) # 최대 3개의 스레드로 비동기 작업 처리 def extract_file_id(drive_url: str) -> str: """ Google Drive URL에서 파일 ID를 추출합니다. """ match = re.search(r'/d/([a-zA-Z0-9_-]+)', drive_url) if match: return match.group(1) match = re.search(r'file/d/([a-zA-Z0-9_-]+)', drive_url) if match: return match.group(1) match = re.search(r'([a-zA-Z0-9_-]{33,})', drive_url) if match: return match.group(1) raise ValueError("Invalid Google Drive URL") def send_email(to_email: str, srt_file_path: str, transcription_time: float): subject = "[kyobody - 자막생성] 작업이 완료되었습니다." body = f"[kyobody - 자막생성] 작업이 완료되었습니다. 총 소요 시간: {transcription_time:.2f} 초. SRT 파일을 첨부하여 전달드립니다." email = postmark.emails.Email( From=os.getenv("FROM_EMAIL"), To=to_email, Subject=subject, # HtmlBody='
Hello dear Postmark user.' HtmlBody=body ) email['X-Accept-Language'] = 'ko' email.attach(srt_file_path) email.send() def transcribe_and_send_email(temp_input_file: str, srt_file_path: str, email: str, language: str): try: # Transcribe the video/audio file import time start_time = time.time() result = model.transcribe(temp_input_file, language=language) transcription_time = time.time() - start_time # Save the transcription to an SRT file with open(srt_file_path, "w") as srt_file: for i, segment in enumerate(result["segments"]): start = segment['start'] end = segment['end'] text = segment['text'][1:] start_time = f"{int(start // 3600):02}:{int((start % 3600) // 60):02}:{int(start % 60):02},{int((start * 1000) % 1000):03}" end_time = f"{int(end // 3600):02}:{int((end % 3600) // 60):02}:{int(end % 60):02},{int((end * 1000) % 1000):03}" srt_file.write(f"{i + 1}\n") srt_file.write(f"{start_time} --> {end_time}\n") srt_file.write(f"{text}\n\n") # Send the result via email send_email(email, srt_file_path, transcription_time) # Clean up the temporary files os.remove(temp_input_file) os.remove(srt_file_path) except Exception as e: raise e @app.post("/transcribe/") def transcribe_video(url: str, email: str, background_tasks: BackgroundTasks, language: str = "ko"): try: # Extract file ID and download the file file_id = extract_file_id(url) download_url = f"https://drive.google.com/uc?id={file_id}" temp_input_file = f'/tmp/{uuid.uuid4()}.mp4' gdown.download(download_url, temp_input_file, quiet=False) # Define SRT file path srt_file_path = f'/tmp/{uuid.uuid4()}.srt' # Schedule the transcription and email sending in the background background_tasks.add_task(executor.submit, transcribe_and_send_email, temp_input_file, srt_file_path, email, language) # Respond to the client immediately return JSONResponse(status_code=202, content={"message": "Transcription started, you will receive an email when it's done."}) except Exception as e: raise HTTPException(status_code=500, detail=str(e))