# video-processor / app.py
# pmrony's picture
# Update app.py
# 3ecda7f verified
import asyncio
import hmac
import os
import shutil
import subprocess
import uuid

from fastapi import (
    BackgroundTasks,
    FastAPI,
    File,
    Form,
    Header,
    HTTPException,
    UploadFile,
)
from fastapi.responses import FileResponse, JSONResponse
from telegram import Bot
from telegram.request import HTTPXRequest
app = FastAPI()

# Shared secret that callers must send in the ``api-key`` request header.
# It can be overridden with the SECRET_KEY environment variable so the real
# value does not have to live in source control; the literal below is kept
# only as a backward-compatible default and should be rotated in deployment.
SECRET_KEY = os.environ.get("SECRET_KEY", "MySecretWatermarkKey123")

# Telegram bot token; when it is not set, no Bot is created (see below).
BOT_TOKEN = os.environ.get("BOT_TOKEN")

# HTTP settings for the Telegram client. Only timeouts are configured here
# (in seconds); the long write timeout is presumably meant to accommodate
# large video uploads.
t_request = HTTPXRequest(connect_timeout=30, read_timeout=60, write_timeout=600)

# Single shared Bot instance, or None when BOT_TOKEN is missing.
bot_instance = Bot(token=BOT_TOKEN, request=t_request) if BOT_TOKEN else None
@app.post("/watermark")
async def add_watermark(
    file: UploadFile = File(...),
    watermark_text: str = Form(...),
    chat_id: int = Form(...),
    api_key: str = Header(None),
):
    """Burn a text watermark into an uploaded video and send it to Telegram.

    The upload is written to disk, re-encoded by ffmpeg with
    ``watermark_text`` drawn in the bottom-right corner, and a thumbnail is
    grabbed from the 2-second mark. The result is then sent to ``chat_id``
    through the module-level ``bot_instance``.

    Args:
        file: The uploaded video.
        watermark_text: Text drawn onto the video and used in the caption.
        chat_id: Telegram chat that receives the processed video.
        api_key: Value of the ``api-key`` header; must equal ``SECRET_KEY``.

    Returns:
        ``{"status": "success"}`` when the Telegram upload succeeds; the
        processed video itself (``video/mp4``) when the upload fails; or a
        ``{"status": "error", ...}`` JSON body with status 500 when
        processing fails.

    Raises:
        HTTPException: 403 when ``api_key`` is missing or does not match.
    """
    # Constant-time comparison so the check does not leak how much of the
    # key matched. Both sides are encoded because compare_digest rejects
    # non-ASCII ``str`` arguments.
    if api_key is None or not hmac.compare_digest(
        api_key.encode("utf-8"), SECRET_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Unauthorized")

    unique_id = uuid.uuid4()
    input_path = f"in_{unique_id}.mp4"
    output_path = f"out_{unique_id}.mp4"
    thumb_path = f"thumb_{unique_id}.jpg"
    text_path = f"wm_{unique_id}.txt"
    # Set when the output file is handed to a FileResponse, which still
    # needs it on disk after this function has returned.
    keep_output = False

    try:
        # 1. Save the uploaded file.
        with open(input_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # The watermark text is untrusted. Passing it through a file
        # (instead of interpolating it into the filter string) means quotes,
        # colons and backslashes cannot alter the filter graph, and
        # expansion=none stops ffmpeg from interpreting %{...} sequences.
        with open(text_path, "w", encoding="utf-8") as text_file:
            text_file.write(watermark_text)

        # 2. Speed-optimised ffmpeg run: crf 28 and preset superfast keep
        #    the re-encode quick.
        cmd = [
            "ffmpeg", "-i", input_path,
            "-vf",
            f"drawtext=textfile={text_path}:expansion=none"
            ":x=w-tw-20:y=h-th-20:fontsize=30:fontcolor=white@0.5",
            "-c:v", "libx264", "-preset", "superfast", "-crf", "28",
            "-c:a", "copy",
            "-movflags", "+faststart",
            output_path,
        ]
        process = await asyncio.create_subprocess_exec(*cmd)
        return_code = await process.wait()
        if return_code != 0 or not os.path.exists(output_path):
            # Report the failure instead of falling through and returning
            # an empty 200 response.
            return JSONResponse(
                {
                    "status": "error",
                    "message": f"ffmpeg failed with exit code {return_code}",
                },
                status_code=500,
            )

        # 3. Generate a thumbnail from the 2nd second of the video.
        thumb_cmd = [
            "ffmpeg", "-i", output_path,
            "-ss", "00:00:02", "-vframes", "1", "-y", thumb_path,
        ]
        thumb_proc = await asyncio.create_subprocess_exec(*thumb_cmd)
        await thumb_proc.wait()
        # The thumbnail is optional: clips shorter than two seconds yield
        # no frame, and that must not prevent the upload.
        has_thumb = (
            os.path.exists(thumb_path) and os.path.getsize(thumb_path) > 0
        )

        # 4. Try to upload to Telegram (with the thumbnail when available).
        try:
            thumb = open(thumb_path, "rb") if has_thumb else None
            try:
                with open(output_path, "rb") as video:
                    await bot_instance.send_video(
                        chat_id=chat_id,
                        video=video,
                        thumbnail=thumb,
                        caption=f"✅ {watermark_text}",
                        supports_streaming=True,
                    )
            finally:
                if thumb is not None:
                    thumb.close()
            return JSONResponse({"status": "success"})
        except Exception:
            # Deliberate best-effort fallback (rare case): if the upload
            # fails for any reason, including bot_instance being None, hand
            # the processed video back to the caller instead.
            keep_output = True
            remove_after_send = BackgroundTasks()
            remove_after_send.add_task(os.remove, output_path)
            return FileResponse(
                output_path,
                media_type="video/mp4",
                background=remove_after_send,
            )
    except Exception as e:
        return JSONResponse(
            {"status": "error", "message": str(e)}, status_code=500
        )
    finally:
        # Cleanup. The output file is removed here too, unless a
        # FileResponse still has to stream it; in that case the background
        # task attached to the response deletes it afterwards.
        stale_paths = [input_path, thumb_path, text_path]
        if not keep_output:
            stale_paths.append(output_path)
        for path in stale_paths:
            if os.path.exists(path):
                os.remove(path)
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 7860. uvicorn is imported here so it is only needed when the
    # module is run directly.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)