Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import uuid | |
| import gdown | |
| import whisper | |
| from concurrent.futures import ThreadPoolExecutor | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.responses import JSONResponse | |
| from dotenv import load_dotenv | |
| from postmarker.core import PostmarkClient | |
# Request inputs: a Google Drive link, the language, and the recipient's
# email address.
# Load important environment variables from the .env file.
load_dotenv()

app = FastAPI(
    servers=[
        dict(
            url="https://leekwoon-whisper-api.hf.space",
            description="video/audio transcription API",
        ),
    ],
    version="0.0.1",
)

# Load the Whisper model once, at import time.
model = whisper.load_model("large-v2")

# Postmark client used to send the result emails.
postmark = PostmarkClient(server_token=os.environ.get("POSTMARK_API_KEY"))

# Asynchronous jobs are handled by a pool of at most 3 threads.
executor = ThreadPoolExecutor(max_workers=3)
def extract_file_id(drive_url: str) -> str:
    """
    Extract the file ID from a Google Drive URL.

    The following shapes are recognised, tried in this order:

    1. A ``/d/<id>`` path segment (e.g. ``.../file/d/<id>/view``).
    2. An ``id=<id>`` query parameter (e.g. ``.../open?id=<id>`` or
       ``.../uc?id=<id>``).
    3. As a last resort, any run of 33 or more ID characters, which also
       accepts a bare file ID.

    Args:
        drive_url: The sharing URL (or bare file ID) to parse.

    Returns:
        The extracted file ID.

    Raises:
        ValueError: If none of the patterns match.
    """
    id_patterns = (
        # Covers ".../file/d/<id>" as well, so no separate pattern is needed.
        r'/d/([a-zA-Z0-9_-]+)',
        # Checked before the length-based fallback so that an explicit id
        # wins over any other long token in the query string.
        r'[?&]id=([a-zA-Z0-9_-]+)',
        r'([a-zA-Z0-9_-]{33,})',
    )
    for pattern in id_patterns:
        match = re.search(pattern, drive_url)
        if match:
            return match.group(1)
    raise ValueError("Invalid Google Drive URL")
def send_email(to_email: str, srt_file_path: str, transcription_time: float):
    """
    Email the finished SRT file to the requester through the Postmark client.

    The subject and body are user-facing Korean text and are stored as real
    Korean characters so recipients do not receive garbled text.

    Args:
        to_email: Recipient address.
        srt_file_path: Path of the SRT file to attach.
        transcription_time: Seconds spent transcribing; shown in the message
            body with two decimals.
    """
    # "[kyobody - subtitle generation] The job has been completed."
    subject = "[kyobody - 자막생성] 작업이 완료되었습니다."
    # Same headline, then "Total processing time: <n> s. The SRT file is
    # attached."
    body = (
        "[kyobody - 자막생성] 작업이 완료되었습니다. "
        f"총 작업 시간: {transcription_time:.2f} 초. "
        "SRT 파일을 첨부하여 전달드립니다."
    )
    email = postmark.emails.Email(
        From=os.getenv("FROM_EMAIL"),
        To=to_email,
        Subject=subject,
        HtmlBody=body,
    )
    email['X-Accept-Language'] = 'ko'
    email.attach(srt_file_path)
    email.send()
def _format_srt_timestamp(seconds: float) -> str:
    """Render an offset in seconds as an SRT ``HH:MM:SS,mmm`` timestamp."""
    # Round to whole milliseconds first; truncating the float product would
    # turn e.g. 1.001 s into ",000" because 1.001 * 1000 is 1000.9999...
    total_ms = int(round(seconds * 1000))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    secs, millis = divmod(remainder, 1000)
    return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"


def _write_srt(segments, srt_file_path: str) -> None:
    """Write transcription segments to ``srt_file_path`` as numbered SRT cues."""
    # Explicit UTF-8 so non-ASCII subtitles do not depend on the host locale.
    with open(srt_file_path, "w", encoding="utf-8") as srt_file:
        for index, segment in enumerate(segments, start=1):
            start = _format_srt_timestamp(segment['start'])
            end = _format_srt_timestamp(segment['end'])
            # strip() instead of [1:]: a segment without a leading space
            # would otherwise lose its first real character.
            text = segment['text'].strip()
            srt_file.write(f"{index}\n{start} --> {end}\n{text}\n\n")


def _remove_if_exists(path: str) -> None:
    """Delete ``path``, ignoring the case where it was never created."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def transcribe_and_send_email(temp_input_file: str, srt_file_path: str, email: str, language: str):
    """
    Transcribe a media file, write the result as SRT, and email it.

    Both temporary files are removed when the job ends, whether it succeeded
    or failed.

    Args:
        temp_input_file: Path of the downloaded video/audio file.
        srt_file_path: Path where the SRT output is written.
        email: Recipient address for the finished SRT file.
        language: Language code passed to the model's ``transcribe`` call.

    Raises:
        Exception: Any error from transcription, file writing, or sending is
            logged and re-raised unchanged.
    """
    import logging
    import time

    try:
        started_at = time.time()
        result = model.transcribe(temp_input_file, language=language)
        transcription_time = time.time() - started_at

        _write_srt(result["segments"], srt_file_path)
        send_email(email, srt_file_path, transcription_time)
    except Exception:
        # When this runs inside an executor the exception is only stored on
        # the Future, so log it here or it may never be seen.
        logging.getLogger(__name__).exception(
            "Transcription job failed for %s", temp_input_file
        )
        raise
    finally:
        # Clean up on failure too, so temp files do not pile up.
        _remove_if_exists(temp_input_file)
        _remove_if_exists(srt_file_path)
def transcribe_video(url: str, email: str, background_tasks: BackgroundTasks, language: str = "ko"):
    """
    Download a Google Drive file and schedule its transcription.

    The download happens before this function returns; transcription and
    emailing are handed to the thread-pool executor through
    ``background_tasks``.

    Args:
        url: Google Drive sharing URL of the video/audio file.
        email: Address that receives the finished SRT file.
        background_tasks: FastAPI background-task collector used to submit
            the job.
        language: Language code forwarded to the transcription step.

    Returns:
        A 202 ``JSONResponse`` confirming that transcription has started.

    Raises:
        HTTPException: 400 if ``url`` is not a recognisable Google Drive URL;
            500 if the download or scheduling fails.
    """
    # A malformed URL is the caller's mistake, so report it as 400, not 500.
    try:
        file_id = extract_file_id(url)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    temp_input_file = f'/tmp/{uuid.uuid4()}.mp4'
    srt_file_path = f'/tmp/{uuid.uuid4()}.srt'

    try:
        download_url = f"https://drive.google.com/uc?id={file_id}"
        downloaded = gdown.download(download_url, temp_input_file, quiet=False)
        # Some gdown versions signal failure by returning None instead of
        # raising; do not schedule work on a file that does not exist.
        if downloaded is None or not os.path.exists(temp_input_file):
            raise RuntimeError("Failed to download the file from Google Drive")

        # Schedule the transcription and email sending in the background.
        background_tasks.add_task(
            executor.submit,
            transcribe_and_send_email,
            temp_input_file,
            srt_file_path,
            email,
            language,
        )
    except Exception as e:
        # Do not leave a partial download behind when the request fails.
        if os.path.exists(temp_input_file):
            os.remove(temp_input_file)
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Respond to the client immediately.
    return JSONResponse(status_code=202, content={"message": "Transcription started, you will receive an email when it's done."})