# whisper-api / app.py (dahyedahye, 4d0661a)
import os
import re
import uuid
import gdown
import whisper
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
from postmarker.core import PostmarkClient
# Request inputs: Google Drive link, spoken language, recipient email address.
# Load sensitive environment variables from the .env file.
load_dotenv()
# FastAPI application; `servers` advertises the public base URL in the
# generated OpenAPI schema.
app = FastAPI(
    version="0.0.1",
    servers=[
        {
            "url": "https://leekwoon-whisper-api.hf.space",
            "description": "video/audio transcription API",
        }
    ],
)
# Load the Whisper model once at import time; it is shared by every job.
model = whisper.load_model("large-v2")
# Postmark client used to email results; the token is read from POSTMARK_API_KEY.
postmark = PostmarkClient(server_token=os.getenv("POSTMARK_API_KEY"))
executor = ThreadPoolExecutor(max_workers=3) # Run background jobs on at most 3 threads
# Patterns are tried in order; the first one that matches wins.
_DRIVE_ID_PATTERNS = (
    # .../file/d/<ID>/view, .../document/d/<ID>/edit, etc.
    re.compile(r'/d/([a-zA-Z0-9_-]+)'),
    # .../open?id=<ID>, .../uc?export=download&id=<ID>
    re.compile(r'[?&]id=([a-zA-Z0-9_-]+)'),
    # Fallback: a bare ID, or any other URL containing a long ID-like token.
    re.compile(r'([a-zA-Z0-9_-]{33,})'),
)


def extract_file_id(drive_url: str) -> str:
    """Extract the file ID from a Google Drive URL.

    Recognises path-style links (``/d/<ID>``), query-style links
    (``?id=<ID>`` / ``&id=<ID>``) and, as a last resort, any run of at
    least 33 ID characters anywhere in the string.

    Args:
        drive_url: The sharing link (or bare file ID) supplied by the caller.

    Returns:
        The file ID captured by the first matching pattern.

    Raises:
        ValueError: If no pattern matches ``drive_url``.
    """
    for pattern in _DRIVE_ID_PATTERNS:
        match = pattern.search(drive_url)
        if match:
            return match.group(1)
    raise ValueError("Invalid Google Drive URL")
def send_email(to_email: str, srt_file_path: str, transcription_time: float):
    """Email the finished SRT file to the requester via Postmark.

    Args:
        to_email: Recipient address.
        srt_file_path: Path of the SRT file to attach.
        transcription_time: Elapsed transcription time in seconds; rendered
            with two decimals in the message body.
    """
    message = postmark.emails.Email(
        From=os.getenv("FROM_EMAIL"),
        To=to_email,
        Subject="[kyobody - μžλ§‰μƒμ„±] μž‘μ—…μ΄ μ™„λ£Œλ˜μ—ˆμŠ΅λ‹ˆλ‹€.",
        HtmlBody=f"[kyobody - μžλ§‰μƒμ„±] μž‘μ—…μ΄ μ™„λ£Œλ˜μ—ˆμŠ΅λ‹ˆλ‹€. 총 μ†Œμš” μ‹œκ°„: {transcription_time:.2f} 초. SRT νŒŒμΌμ„ μ²¨λΆ€ν•˜μ—¬ μ „λ‹¬λ“œλ¦½λ‹ˆλ‹€.",
    )
    # Custom header marking the message language as Korean.
    message['X-Accept-Language'] = 'ko'
    message.attach(srt_file_path)
    message.send()
def _format_srt_timestamp(seconds: float) -> str:
    """Render an offset in seconds as an SRT ``HH:MM:SS,mmm`` timestamp."""
    # Round to whole milliseconds first so binary floating-point error cannot
    # truncate the millisecond field (and so carries propagate correctly).
    total_ms = max(0, int(round(float(seconds) * 1000)))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    secs, millis = divmod(remainder, 1000)
    return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"


def transcribe_and_send_email(temp_input_file: str, srt_file_path: str, email: str, language: str):
    """Transcribe a downloaded media file, email the SRT, and clean up.

    Intended to run on a worker thread. Both temporary files are removed
    whether the job succeeds or fails.

    Args:
        temp_input_file: Path of the downloaded audio/video file.
        srt_file_path: Path where the generated SRT file is written.
        email: Recipient address for the finished subtitles.
        language: Language code passed to the Whisper model.

    Raises:
        Exception: Any failure from transcription, file writing or email
            delivery is logged and then re-raised unchanged.
    """
    import logging
    import time

    logger = logging.getLogger(__name__)
    try:
        started_at = time.monotonic()
        result = model.transcribe(temp_input_file, language=language)
        transcription_time = time.monotonic() - started_at

        # Explicit UTF-8: the platform default encoding may be unable to
        # represent non-ASCII subtitle text.
        with open(srt_file_path, "w", encoding="utf-8") as srt_file:
            for index, segment in enumerate(result["segments"], start=1):
                cue_start = _format_srt_timestamp(segment["start"])
                cue_end = _format_srt_timestamp(segment["end"])
                # Strip surrounding whitespace instead of blindly dropping
                # the first character, which may be real text.
                text = segment["text"].strip()
                srt_file.write(f"{index}\n{cue_start} --> {cue_end}\n{text}\n\n")

        send_email(email, srt_file_path, transcription_time)
    except Exception:
        # The caller may never inspect this job's result, so log here or the
        # failure would go unnoticed.
        logger.exception("Transcription job failed for %s", temp_input_file)
        raise
    finally:
        for path in (temp_input_file, srt_file_path):
            try:
                os.remove(path)
            except FileNotFoundError:
                # The SRT file does not exist yet if transcription failed.
                pass
            except OSError:
                logger.warning("Could not remove temporary file %s", path, exc_info=True)
@app.post("/transcribe/")
def transcribe_video(url: str, email: str, background_tasks: BackgroundTasks, language: str = "ko"):
    """Download a Google Drive file and queue it for transcription.

    Args:
        url: Google Drive sharing link of the audio/video file.
        email: Address that receives the generated SRT file.
        background_tasks: FastAPI background-task registry for this request.
        language: Language code handed to the transcription job.

    Returns:
        A 202 JSON response; the subtitles are delivered later by email.

    Raises:
        HTTPException: 400 if ``url`` is not a recognisable Drive link,
            500 if the download or the scheduling step fails.
    """
    # A malformed link is the client's mistake, not a server fault.
    try:
        file_id = extract_file_id(url)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    download_url = f"https://drive.google.com/uc?id={file_id}"
    temp_input_file = f'/tmp/{uuid.uuid4()}.mp4'
    srt_file_path = f'/tmp/{uuid.uuid4()}.srt'

    try:
        downloaded = gdown.download(download_url, temp_input_file, quiet=False)
        # gdown can signal failure by returning None rather than raising.
        if downloaded is None or not os.path.exists(temp_input_file):
            raise RuntimeError("Failed to download the file from Google Drive")

        # Hand the job to the worker pool once the response has been sent.
        background_tasks.add_task(executor.submit, transcribe_and_send_email, temp_input_file, srt_file_path, email, language)
    except Exception as e:
        # Do not leave a partial download behind.
        if os.path.exists(temp_input_file):
            os.remove(temp_input_file)
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Respond to the client immediately.
    return JSONResponse(status_code=202, content={"message": "Transcription started, you will receive an email when it's done."})