# Avatarsokhango / app.py
# Ezmary's picture
# Update app.py
# c9e7bc1 verified
import os
import uuid
import asyncio
import logging
import shutil
import glob
import subprocess
import math
import random
from datetime import datetime, timedelta
from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import JSONResponse
from sqlalchemy import create_engine, Column, String, Integer, PickleType, DateTime, Boolean, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import httpx
from pydub import AudioSegment
from pydub.silence import detect_silence
from PIL import Image, ImageDraw, ImageFont
# --- Settings ---
# Full list of the 16 workers (ezmary-avatarworker1 .. ezmary-avatarworker16).
WORKER_URLS = [
    f"https://ezmary-avatarworker{worker_no}.hf.space"
    for worker_no in range(1, 17)
]
MANAGER_BASE_URL = "https://ezmary-avatarsokhango.hf.space"

# --- Smart-cut settings (12 to 19 seconds) ---
MIN_CUT_TIME = 12 * 1000  # 12 seconds
MAX_CUT_TIME = 19 * 1000  # 19 seconds
SILENCE_THRESH = -35  # lower sensitivity so that silence is found more easily
SILENCE_MIN_LEN = 250  # reduced minimum silence length, to find more cut points

# --- Watchdog settings ---
WATCHDOG_CHECK_INTERVAL = 60  # check once every minute
WATCHDOG_TIMEOUT_THRESHOLD = 2 * 60 * 60  # 2 hours

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()

# Database lock: an asyncio lock shared by the request handlers.
callback_lock = asyncio.Lock()

# Create the working folders (must exist before "static" is mounted below).
for path in (
    "static/uploads",
    "static/temp",
    "static/videos",
    "data",
    "static/assets",
):
    os.makedirs(path, exist_ok=True)

app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# --- Database ---
SQLALCHEMY_DATABASE_URL = "sqlite:///./data/jobs_final.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
)
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)
Base = declarative_base()
class Job(Base):
    """One video-generation request, tracked part by part in the ``jobs`` table."""

    __tablename__ = "jobs"

    id = Column(String, primary_key=True, index=True)
    # Values written by this file: "PROCESSING", "COMPLETED", "FAILED".
    status = Column(String, default="PROCESSING")
    resolution = Column(String)
    is_premium = Column(Boolean, default=False)
    original_image_path = Column(String)
    original_audio_path = Column(String)
    total_parts = Column(Integer, default=1)
    # Part index (as a string) -> "PENDING" / "DONE" / "FAILED".
    # The default is a callable so every row gets its own fresh dict instead of
    # sharing one module-level object.
    parts_status = Column(JSON, default=dict)
    # Ordered list of audio segment paths, one per part.
    # NOTE(review): PickleType unpickles whatever is stored in this column; that
    # is only safe while nothing untrusted can write to the database file.
    audio_parts = Column(PickleType, default=list)
    # Part index (as a string) -> local path of the downloaded part video.
    video_parts = Column(JSON, default=dict)
    final_video_url = Column(String, nullable=True)
    message = Column(String, nullable=True)
    # Naive UTC timestamps (datetime.utcnow).
    created_at = Column(DateTime, default=datetime.utcnow)
    last_activity = Column(DateTime, default=datetime.utcnow)


Base.metadata.create_all(bind=engine)
# --- Watermark helpers ---
def _load_watermark_font(size=20):
    """Return the first loadable candidate TrueType font, else Pillow's default font."""
    candidates = (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
        "/usr/share/fonts/truetype/noto/NotoSansArabic-Bold.ttf",
    )
    for font_path in candidates:
        try:
            return ImageFont.truetype(font_path, size)
        except (OSError, ImportError):
            # OSError: font file missing or unreadable.
            # ImportError: presumably raised when Pillow lacks TrueType support.
            continue
    return ImageFont.load_default()


def create_watermark_image(text, output_path):
    """Render *text* centred on a transparent 250x60 PNG and save it to *output_path*.

    The text is drawn in semi-transparent white with a 1-pixel semi-transparent
    black stroke.

    Returns:
        True when the image was written; False if anything failed (the error is
        logged).
    """
    try:
        width, height = 250, 60
        img = Image.new('RGBA', (width, height), (255, 255, 255, 0))
        draw = ImageDraw.Draw(img)
        font = _load_watermark_font(20)

        text_color = (255, 255, 255, 70)
        stroke_color = (0, 0, 0, 80)

        # Centre the text using its rendered bounding box.
        text_bbox = draw.textbbox((0, 0), text, font=font)
        text_width = text_bbox[2] - text_bbox[0]
        text_height = text_bbox[3] - text_bbox[1]
        position = ((width - text_width) / 2, (height - text_height) / 2)

        draw.text(position, text, font=font, fill=text_color, stroke_width=1, stroke_fill=stroke_color)
        img.save(output_path, "PNG")
        return True
    except Exception as e:
        logger.error(f"Watermark creation failed: {e}")
        return False
def apply_watermark(input_video, watermark_img, output_video):
    """Overlay *watermark_img* on *input_video* with ffmpeg, writing *output_video*.

    The overlay position is an ffmpeg expression on a 15-second cycle
    (``mod(t,15)``): four 3-second phases in which x and y ramp away from a
    10-pixel margin, then a last phase that oscillates around the frame centre
    with sin/cos. Audio is stream-copied; video is re-encoded with libx264.

    Returns:
        True when ffmpeg succeeds; False if anything raised (the error is logged).
    """
    rate = 8  # multiplier applied to mod(t,3) in every ramp phase

    x_expr = "".join([
        f"if(lt(mod(t,15),3), 10 + {rate}*mod(t,3), ",
        f"if(lt(mod(t,15),6), (W-w-10) - {rate}*mod(t,3), ",
        f"if(lt(mod(t,15),9), (W-w-10) - {rate}*mod(t,3), ",
        f"if(lt(mod(t,15),12), 10 + {rate}*mod(t,3), ",
        "(W-w)/2 + 40*sin(t)))))",
    ])
    y_expr = "".join([
        f"if(lt(mod(t,15),3), 10 + {rate}*mod(t,3), ",
        f"if(lt(mod(t,15),6), (H-h-10) - {rate}*mod(t,3), ",
        f"if(lt(mod(t,15),9), 10 + {rate}*mod(t,3), ",
        f"if(lt(mod(t,15),12), (H-h-10) - {rate}*mod(t,3), ",
        "(H-h)/2 + 20*cos(t)))))",
    ])

    ffmpeg_cmd = ["ffmpeg", "-y", "-i", input_video, "-i", watermark_img]
    ffmpeg_cmd += ["-filter_complex", f"[0:v][1:v]overlay=x='{x_expr}':y='{y_expr}'"]
    ffmpeg_cmd += ["-c:a", "copy"]
    ffmpeg_cmd += ["-c:v", "libx264", "-preset", "ultrafast", "-crf", "24"]
    ffmpeg_cmd.append(output_video)

    try:
        subprocess.run(
            ffmpeg_cmd,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception as e:
        logger.error(f"Apply watermark failed: {e}")
        return False
    return True
# --- Smart audio processing (precise cutting) ---
def smart_split_audio(file_path, job_id):
    """Split an audio file into segments, cutting in the middle of a silence when possible.

    For each segment the cut point is chosen, in order of preference:
      1. the middle of the first silence found between 14000 and MAX_CUT_TIME
         after the segment start (minimum silence length SILENCE_MIN_LEN);
      2. the middle of the first silence found between 9000 and 14000 after the
         segment start (minimum silence length 200);
      3. a hard cut at MAX_CUT_TIME.
    Whatever remains once it is no longer than MAX_CUT_TIME becomes the last
    segment. Segments are exported as ``static/uploads/{job_id}_seg{N}.wav``.

    Returns:
        The list of exported segment paths, in order. If anything raises, the
        error is logged and the result of fallback_split() is returned instead.
    """

    def first_silence_midpoint(window, min_len):
        # Offset (within *window*) of the middle of its first silence, or None.
        gaps = detect_silence(window, min_silence_len=min_len, silence_thresh=SILENCE_THRESH)
        if not gaps:
            return None
        gap_start, gap_end = gaps[0][0], gaps[0][1]
        return gap_start + (gap_end - gap_start) // 2

    try:
        audio = AudioSegment.from_file(file_path)
        total_len = len(audio)
        parts = []
        start = 0
        part_idx = 0

        while start < total_len:
            is_last = total_len - start <= MAX_CUT_TIME

            if is_last:
                cut_point = None
                segment = audio[start:]
            else:
                offset = None

                # Preferred window: 14000 .. MAX_CUT_TIME after the segment start.
                window_start = start + 14000
                window_end = min(start + MAX_CUT_TIME, total_len)
                if window_end > window_start:
                    offset = first_silence_midpoint(audio[window_start:window_end], SILENCE_MIN_LEN)

                # Second choice: 9000 .. 14000 after the segment start.
                if offset is None:
                    window_start = start + 9000
                    offset = first_silence_midpoint(audio[window_start:start + 14000], 200)

                # Last resort: hard cut at the maximum segment length.
                if offset is None:
                    cut_point = start + MAX_CUT_TIME
                else:
                    cut_point = window_start + offset
                segment = audio[start:cut_point]

            out_name = f"static/uploads/{job_id}_seg{part_idx}.wav"
            segment.export(out_name, format="wav")
            parts.append(out_name)

            if is_last:
                break
            start = cut_point
            part_idx += 1

        return parts
    except Exception as e:
        logger.error(f"Smart Split Error: {e}")
        return fallback_split(file_path, job_id)
def fallback_split(input_path, job_id):
    """Split *input_path* into fixed 14-second WAV segments with ffmpeg's segment muxer.

    Segments are written as ``static/uploads/{job_id}_seg{N}.wav``.

    Returns:
        The segment paths ordered by their numeric index N, or an empty list if
        ffmpeg could not be run or exited with an error (the error is logged).
    """
    cmd = [
        "ffmpeg", "-i", input_path,
        "-f", "segment", "-segment_time", "14",
        "-c", "copy",
        f"static/uploads/{job_id}_seg%d.wav",
    ]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, OSError) as e:
        logger.error(f"Fallback Split Error: {e}")
        return []

    def segment_order(path):
        # Sort by the numeric segment index: a plain string sort would put
        # "seg10" before "seg2" and scramble the part order.
        number = os.path.basename(path)[:-len(".wav")].rpartition("_seg")[2]
        if number.isdecimal():
            return (0, int(number), path)
        return (1, 0, path)

    return sorted(glob.glob(f"static/uploads/{job_id}_seg*.wav"), key=segment_order)
def convert_audio_to_wav(input_path, output_path):
    """Convert *input_path* to 16-bit PCM, 44.1 kHz, stereo WAV at *output_path* using ffmpeg.

    Returns:
        True on success; False if ffmpeg could not be started or exited with an
        error (the error is logged).
    """
    cmd = [
        "ffmpeg", "-y", "-i", input_path,
        "-acodec", "pcm_s16le", "-ar", "44100", "-ac", "2",
        output_path,
    ]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, OSError) as e:
        # CalledProcessError: ffmpeg rejected the input; OSError: ffmpeg missing.
        logger.error(f"Audio conversion failed: {e}")
        return False
    return True
def merge_videos_smart(video_map, total_parts, output_path):
    """Concatenate the part videos of a job, in part order, into *output_path*.

    Args:
        video_map: mapping of part index (as a string, "0".."total_parts-1") to
            the path of that part's video.
        total_parts: number of parts the job consists of.
        output_path: where the merged video is written.

    Returns:
        True on success. False if any part is missing from *video_map*, if there
        are no parts at all, or if copying/ffmpeg fails (the error is logged).
    """
    import tempfile  # local import: only needed for the concat list file

    list_path = None
    try:
        videos = []
        for i in range(total_parts):
            key = str(i)
            if key not in video_map:
                return False
            videos.append(video_map[key])

        if not videos:
            return False

        # A single part needs no re-encode.
        if len(videos) == 1:
            shutil.copy(videos[0], output_path)
            return True

        # Write the concat list to a unique temporary file so nothing is left
        # behind in the working directory and merges cannot clobber each other.
        with tempfile.NamedTemporaryFile(
            "w", suffix=".txt", prefix="concat_", delete=False, encoding="utf-8"
        ) as list_file:
            list_path = list_file.name
            for v in videos:
                # Inside a single-quoted concat entry a quote is written as '\''.
                escaped = os.path.abspath(v).replace("'", "'\\''")
                list_file.write(f"file '{escaped}'\n")

        subprocess.run([
            "ffmpeg", "-y",
            "-f", "concat",
            "-safe", "0",
            "-i", list_path,
            "-c:v", "libx264", "-preset", "ultrafast", "-crf", "24",
            "-c:a", "aac",
            output_path
        ], check=True)
        return True
    except Exception as e:
        logger.error(f"Merge Error: {e}")
        return False
    finally:
        if list_path is not None:
            try:
                os.remove(list_path)
            except OSError:
                pass
# --- Work distribution ---
async def dispatch_all_parallel(job_id, img_path, audio_files, resolution):
    """Send every audio part of a job to a worker, all requests in parallel.

    Part *i* goes to ``WORKER_URLS[i % len(WORKER_URLS)]`` and is identified by
    the combined id ``"{job_id}___{i}"``, which the worker is expected to echo
    back to ``/api/callback``.

    Dispatch failures do not raise; they are logged. The part then stays
    "PENDING" until the watchdog retries it.
    """
    num_workers = len(WORKER_URLS)
    if num_workers == 0:
        logger.error(f"No workers configured; job {job_id} cannot be dispatched")
        return

    full_img = f"{MANAGER_BASE_URL}/{img_path}"
    callback = f"{MANAGER_BASE_URL}/api/callback"

    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = []
        for i, audio_file in enumerate(audio_files):
            worker = WORKER_URLS[i % num_workers]
            payload = {
                "job_id": f"{job_id}___{i}",
                "image_url": full_img,
                "audio_url": f"{MANAGER_BASE_URL}/{audio_file}",
                "resolution": resolution,
                "callback_url": callback
            }
            logger.info(f"🚀 Dispatching Part {i+1} to {worker}")
            tasks.append(client.post(f"{worker}/process", json=payload))

        results = await asyncio.gather(*tasks, return_exceptions=True)

    # gather(return_exceptions=True) hands failures back as values; report them
    # instead of dropping them silently.
    for i, result in enumerate(results):
        worker = WORKER_URLS[i % num_workers]
        if isinstance(result, BaseException):
            logger.error(f"Dispatch of Part {i+1} to {worker} failed: {result!r}")
        elif result.status_code >= 400:
            logger.error(f"Dispatch of Part {i+1} to {worker} returned HTTP {result.status_code}")
# --- Watchdog ---
async def watchdog_loop():
    """Periodically re-dispatch the pending parts of jobs that have gone quiet.

    Every WATCHDOG_CHECK_INTERVAL seconds, each "PROCESSING" job whose
    ``last_activity`` is older than WATCHDOG_TIMEOUT_THRESHOLD seconds has all
    of its "PENDING" parts re-sent to a randomly chosen worker, and its
    ``last_activity`` is reset. The loop never exits on its own; errors within
    one pass are logged and the next pass runs as usual.
    """
    while True:
        try:
            await asyncio.sleep(WATCHDOG_CHECK_INTERVAL)
            db = SessionLocal()
            try:
                processing_jobs = db.query(Job).filter(Job.status == "PROCESSING").all()

                for job in processing_jobs:
                    idle_seconds = (datetime.utcnow() - job.last_activity).total_seconds()
                    if idle_seconds <= WATCHDOG_TIMEOUT_THRESHOLD:
                        continue

                    pending = [
                        idx_str
                        for idx_str, status in dict(job.parts_status).items()
                        if status == "PENDING"
                    ]
                    if not pending:
                        continue

                    full_img = f"{MANAGER_BASE_URL}/{job.original_image_path}"
                    callback = f"{MANAGER_BASE_URL}/api/callback"

                    async with httpx.AsyncClient(timeout=5.0) as client:
                        for idx_str in pending:
                            logger.warning(f"⚠️ Watchdog: Retrying Part {idx_str} (> 2 hours)...")
                            try:
                                current_audio = job.audio_parts[int(idx_str)]
                            except (IndexError, ValueError, TypeError) as e:
                                # A bad index must not abort the retries of the other parts.
                                logger.error(f"Watchdog: no audio for Part {idx_str} of job {job.id}: {e}")
                                continue

                            worker = random.choice(WORKER_URLS)
                            payload = {
                                "job_id": f"{job.id}___{idx_str}",
                                "image_url": full_img,
                                "audio_url": f"{MANAGER_BASE_URL}/{current_audio}",
                                "resolution": job.resolution,
                                "callback_url": callback
                            }
                            try:
                                await client.post(f"{worker}/process", json=payload)
                            except Exception as e:
                                # Best effort: the part stays PENDING and is retried later.
                                logger.warning(f"Watchdog: retry of Part {idx_str} via {worker} failed: {e!r}")

                    # Restart the idle clock so the job is not retried on every pass.
                    job.last_activity = datetime.utcnow()
                    db.commit()
            finally:
                # Always release the session, including when a pass fails midway.
                db.close()
        except Exception as e:
            logger.error(f"Watchdog Error: {e}")
@app.on_event("startup")
async def startup_event():
    """Start the watchdog loop as a background task when the application starts."""
    # The event loop keeps only a weak reference to tasks, so hold a strong one
    # on the app; otherwise the watchdog could be garbage-collected mid-run.
    app.state.watchdog_task = asyncio.create_task(watchdog_loop())
# --- Routes ---
@app.get("/")
async def home(request: Request):
    """Render the ``index.html`` template for the landing page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
def _safe_image_extension(filename):
    """Return a filesystem-safe extension (without the dot) for an uploaded file name."""
    ext = os.path.splitext(os.path.basename(filename or ""))[1].lstrip(".")
    # Anything that is not purely alphanumeric (empty, contains a slash, ...)
    # is replaced by a neutral extension.
    return ext if ext.isalnum() else "img"


def _remove_quietly(path):
    """Delete *path*, ignoring the case where it cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        pass


@app.post("/api/generate")
async def create_job(
    background_tasks: BackgroundTasks,
    image: UploadFile = File(...),
    audio: UploadFile = File(...),
    resolution: str = Form(...),
    subscriptionStatus: str = Form(default="free"),
):
    """Accept an image and an audio file, split the audio and dispatch the parts to workers.

    Free (non-"paid") requests are forced to "480P" and limited to the first
    audio part. Returns the new job id and number of parts, or a JSON error
    response when the audio cannot be converted or split.
    """
    job_id = str(uuid.uuid4())
    # NOTE(review): premium status comes straight from a client-supplied form
    # field; confirm it is verified somewhere before this is relied on.
    is_premium = (subscriptionStatus == 'paid')
    final_res = resolution if is_premium else "480P"

    # The client's file name is untrusted: keep only a sanitised extension.
    img_path = f"static/uploads/{job_id}.{_safe_image_extension(image.filename)}"
    temp_audio = f"static/uploads/{job_id}_temp"
    final_audio = f"static/uploads/{job_id}_original.wav"

    with open(img_path, "wb") as f:
        f.write(await image.read())
    with open(temp_audio, "wb") as f:
        f.write(await audio.read())

    # NOTE(review): conversion and splitting below are blocking calls made from
    # an async handler.
    converted = convert_audio_to_wav(temp_audio, final_audio)
    # The raw upload is only needed for the conversion.
    _remove_quietly(temp_audio)
    if not converted:
        _remove_quietly(img_path)
        return JSONResponse(status_code=400, content={"error": "Audio conversion failed"})

    # Cut the audio into parts
    audio_parts = smart_split_audio(final_audio, job_id)
    if not audio_parts:
        # Without parts nothing would ever call back, leaving the job in
        # "PROCESSING" forever; fail the request instead.
        _remove_quietly(img_path)
        _remove_quietly(final_audio)
        return JSONResponse(status_code=500, content={"error": "Audio splitting failed"})

    # --- Free-tier rule ---
    # A free user with more than one part keeps only the first part.
    if not is_premium and len(audio_parts) > 1:
        # Remove the extra segment files from disk
        for part in audio_parts[1:]:
            _remove_quietly(part)

        # Keep only the first part (which is under 19 seconds)
        audio_parts = [audio_parts[0]]
        initial_msg = "شروع پردازش (محدود به ۱۹ ثانیه نسخه رایگان)..."
    else:
        initial_msg = f"شروع پردازش همزمان {len(audio_parts)} قسمت..."

    total_parts = len(audio_parts)
    parts_status_init = {str(i): "PENDING" for i in range(total_parts)}

    db = SessionLocal()
    try:
        new_job = Job(
            id=job_id, status="PROCESSING", resolution=final_res, is_premium=is_premium,
            original_image_path=img_path, original_audio_path=final_audio,
            total_parts=total_parts, parts_status=parts_status_init,
            audio_parts=audio_parts, video_parts={},
            message=initial_msg,
            last_activity=datetime.utcnow()
        )
        db.add(new_job)
        db.commit()
    finally:
        db.close()

    background_tasks.add_task(dispatch_all_parallel, job_id, img_path, audio_parts, final_res)
    return {"status": "success", "job_id": job_id, "total_parts": total_parts}
def _finalize_job(db, job, video_parts):
    """Merge all part videos, watermark non-premium output, and record the result on *job*.

    Sets ``job.status`` to "COMPLETED" or "FAILED"; the caller commits.
    """
    final_output = f"static/videos/{job.id}_final.mp4"
    temp_merged = f"static/temp/{job.id}_merged.mp4"

    if not merge_videos_smart(video_parts, job.total_parts, temp_merged):
        job.status = "FAILED"
        job.message = "خطا در ادغام ویدیوها"
        return

    watermarked = False
    if not job.is_premium:
        job.message = "در حال افزودن واترمارک..."
        db.commit()

        watermark_path = "static/assets/watermark.png"
        if not os.path.exists(watermark_path):
            create_watermark_image("هوش مصنوعی آلفا", watermark_path)
        watermarked = apply_watermark(temp_merged, watermark_path, final_output)

    # Premium jobs, and free jobs whose watermarking failed, get the plain merge.
    if not watermarked:
        shutil.copy(temp_merged, final_output)

    job.status = "COMPLETED"
    job.message = "تکمیل شد!"
    job.final_video_url = f"/static/videos/{job.id}_final.mp4"

    # Best-effort cleanup of intermediate files; one failure must not stop the rest.
    leftovers = glob.glob(f"static/temp/{job.id}_*") + glob.glob(f"static/uploads/{job.id}_seg*")
    for leftover in leftovers:
        try:
            os.remove(leftover)
        except OSError:
            pass


@app.post("/api/callback")
async def callback(data: dict, background_tasks: BackgroundTasks):
    """Handle a worker's result for one part of a job.

    Expects ``job_id`` of the form ``"<job id>___<part index>"``, a ``status``
    and, on success, a ``video_url`` to download. When the last part arrives the
    parts are merged (and watermarked for non-premium jobs).

    Returns ``{"ack": True}`` when the callback was processed, ``{"ack": False}``
    when it was malformed or the video could not be downloaded, and
    ``{"error": "Job not found"}`` for an unknown job.
    """
    combined_id = data.get("job_id")
    status = data.get("status")
    video_url = data.get("video_url")

    # Reject missing or malformed ids instead of crashing on them.
    if not isinstance(combined_id, str) or "___" not in combined_id:
        return {"ack": False}
    real_job_id, _, part_index_str = combined_id.rpartition("___")
    try:
        part_index = int(part_index_str)
    except ValueError:
        return {"ack": False}
    part_key = str(part_index)

    # NOTE(review): the lock is held across the (untimed) video download and the
    # blocking ffmpeg merge, so one slow callback delays all the others.
    async with callback_lock:
        db = SessionLocal()
        try:
            job = db.query(Job).filter(Job.id == real_job_id).first()
            if not job:
                return {"error": "Job not found"}

            # A late duplicate (e.g. after a watchdog retry) must not re-run the
            # merge: the part files are already deleted, so it would fail and
            # flip a finished job to FAILED.
            if job.status == "COMPLETED":
                return {"ack": True}

            # Ignore part indexes this job does not have; an extra "DONE" entry
            # would otherwise corrupt the completion count.
            if part_key not in (job.parts_status or {}):
                return {"ack": False}

            job.last_activity = datetime.utcnow()

            if status == "FAILED":
                p_status = dict(job.parts_status)
                # Do not downgrade a part that has already been delivered.
                if p_status.get(part_key) != "DONE":
                    p_status[part_key] = "FAILED"
                    job.parts_status = p_status
                db.commit()
                return {"ack": True}

            current_vid_path = f"static/temp/{real_job_id}_part_{part_index}.mp4"
            try:
                # NOTE(review): video_url comes from the request body and is
                # fetched as-is; consider restricting it to known worker hosts.
                async with httpx.AsyncClient() as client:
                    resp = await client.get(video_url, follow_redirects=True, timeout=None)
                if resp.status_code != 200:
                    raise Exception("Download failed")
                with open(current_vid_path, "wb") as f:
                    f.write(resp.content)
            except Exception as e:
                logger.error(f"Callback: download of Part {part_index} for job {real_job_id} failed: {e!r}")
                return {"ack": False}

            v_parts = dict(job.video_parts)
            v_parts[part_key] = current_vid_path
            job.video_parts = v_parts

            p_status = dict(job.parts_status)
            p_status[part_key] = "DONE"
            job.parts_status = p_status

            done_count = sum(1 for s in p_status.values() if s == "DONE")

            if done_count < job.total_parts:
                job.message = f"تکمیل {done_count} از {job.total_parts} قسمت..."
                db.commit()
            else:
                job.message = "در حال میکس نهایی ویدیوها..."
                db.commit()
                _finalize_job(db, job, v_parts)
                db.commit()

            return {"ack": True}
        finally:
            # Single exit point for the session, whichever branch returned or raised.
            db.close()
@app.get("/api/status/{job_id}")
def get_status(job_id: str):
    """Return the current state of a job, or a 404 JSON response if it does not exist.

    ``current_part`` is the number of parts whose status is "DONE";
    ``parts_map`` is the per-part status mapping.
    """
    db = SessionLocal()
    try:
        job = db.query(Job).filter(Job.id == job_id).first()
        if not job:
            return JSONResponse(status_code=404, content={"status": "not_found"})

        # Read everything while the session is still open.
        p_status = job.parts_status or {}
        done_count = sum(1 for s in p_status.values() if s == "DONE")

        return {
            "status": job.status,
            "message": job.message,
            "video_url": job.final_video_url,
            "is_premium": job.is_premium,
            "current_part": done_count,
            "total_parts": job.total_parts,
            "parts_map": p_status
        }
    finally:
        # Release the session even when the query raises.
        db.close()