import os
import shutil
import tempfile
import threading
from typing import List

import ffmpeg
import torch
import whisper
import yt_dlp
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from pydantic import BaseModel
|
| |
|
# ASGI application object; the route decorators in this file register on it.
app = FastAPI()

# Plain-text greeting served by the root endpoint. The Arabic part tells the
# caller to POST a JSON body containing a "url" key to /transcribe.
WELCOME_MSG = 'Whisper Transcriber Free API — أرسل طلب POST إلى /transcribe مع {"url": "..."} لتحليل الفيديو.'
|
| |
|
class TranscribeRequest(BaseModel):
    """JSON body accepted by ``POST /transcribe``."""

    # Address of the media to download and transcribe.
    url: str
|
| |
|
class TranscribedSegment(BaseModel):
    """One entry of the list returned by ``POST /transcribe``."""

    # Segment start as zero-padded "MM:SS"; error entries use "00:00".
    timestamp: str
    # Transcribed text of the segment, or an error message on failure.
    text: str
|
| |
|
@app.get("/")
def root():
    """Serve the welcome/usage message as a plain-text response."""
    greeting = PlainTextResponse(WELCOME_MSG)
    return greeting
|
| |
|
# Whisper models already loaded in this process, keyed by (model name, device).
_WHISPER_MODELS = {}

# Serialises model loading and inference: the cached model object is shared
# between requests, and sync FastAPI handlers run on a thread pool, so without
# this lock two requests could load or drive the same model at the same time.
_WHISPER_LOCK = threading.Lock()


def _format_timestamp(start_seconds):
    """Format a segment start offset in seconds as zero-padded ``MM:SS``."""
    minutes, seconds = divmod(int(start_seconds), 60)
    return f"{minutes:02d}:{seconds:02d}"


def _run_whisper(audio_path):
    """Transcribe *audio_path* with a cached Whisper model and return the raw result.

    The model name comes from the ``WHISPER_MODEL`` environment variable
    (default ``"base"``); CUDA is used when ``torch`` reports it available.
    The model is loaded once per (name, device) pair instead of on every call.
    """
    model_size = os.environ.get("WHISPER_MODEL", "base")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    cache_key = (model_size, device)
    with _WHISPER_LOCK:
        model = _WHISPER_MODELS.get(cache_key)
        if model is None:
            model = whisper.load_model(model_size, device=device)
            _WHISPER_MODELS[cache_key] = model
        return model.transcribe(audio_path, word_timestamps=False, verbose=False)


@app.post("/transcribe", response_model=List[TranscribedSegment])
def transcribe_audio(req: TranscribeRequest):
    """Download the media at ``req.url`` and return its transcript segments.

    Steps: download the best audio stream with yt-dlp into a temporary
    directory, convert it to a mono 16 kHz WAV with ffmpeg, then transcribe
    the WAV with Whisper.

    Args:
        req: Request body; ``req.url`` must start with ``http://`` or
            ``https://``.

    Returns:
        A list of ``{"timestamp": "MM:SS", "text": ...}`` dicts, one per
        Whisper segment. Failures are not raised: an invalid URL or any
        exception during processing yields a single-element list whose
        timestamp is ``"00:00"`` and whose text is the error message.
    """
    url = req.url
    # Require an explicit scheme; a bare "http" prefix would also let through
    # strings such as "httpfoo" that are not URLs at all.
    if not url or not url.startswith(("http://", "https://")):
        return [{"timestamp": "00:00", "text": "خطأ: الرابط غير صالح."}]

    tmpdir = tempfile.mkdtemp()
    audio_path = os.path.join(tmpdir, "audio.wav")
    video_path = os.path.join(tmpdir, "video")
    try:
        ydl_opts = {
            'outtmpl': video_path,
            'format': 'bestaudio/best',
            'quiet': True,
            'noplaylist': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Locate whatever file the download left in the directory rather than
        # assuming its exact name; "audio.wav" is reserved for the ffmpeg output.
        video_file = next(
            (name for name in os.listdir(tmpdir) if name != "audio.wav"),
            None,
        )
        if not video_file:
            raise RuntimeError("فشل التنزيل.")
        video_file_path = os.path.join(tmpdir, video_file)

        # ac=1 / ar=16000: single channel, 16 kHz sample rate.
        ffmpeg.input(video_file_path).output(
            audio_path, ac=1, ar=16000, format='wav'
        ).run(overwrite_output=True, quiet=True)

        result = _run_whisper(audio_path)
        return [
            {
                "timestamp": _format_timestamp(seg["start"]),
                "text": seg["text"].strip(),
            }
            for seg in result.get("segments", [])
        ]
    except Exception as e:
        # Deliberate best-effort boundary: existing clients expect errors as a
        # normal response containing one pseudo-segment, not an HTTP error.
        return [{"timestamp": "00:00", "text": f"خطأ: {str(e)}"}]
    finally:
        # Always remove the downloaded media and the converted WAV.
        shutil.rmtree(tmpdir, ignore_errors=True)
|
| |
|