"""Job manager for a talking-avatar video service.

The manager accepts an image plus an audio file, splits the audio into short
segments, dispatches every segment to a worker over HTTP, collects the
rendered video parts through a callback endpoint, merges them and (for
non-premium jobs) overlays a moving watermark.
"""

import os
import re
import uuid
import asyncio
import logging
import shutil
import glob
import subprocess
import tempfile
import math
import random
from datetime import datetime, timedelta

from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import JSONResponse
from sqlalchemy import create_engine, Column, String, Integer, PickleType, DateTime, Boolean, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import httpx
from pydub import AudioSegment
from pydub.silence import detect_silence
from PIL import Image, ImageDraw, ImageFont

# --- Settings ---
# Full list of the 16 workers
WORKER_URLS = [
    "https://ezmary-avatarworker1.hf.space",
    "https://ezmary-avatarworker2.hf.space",
    "https://ezmary-avatarworker3.hf.space",
    "https://ezmary-avatarworker4.hf.space",
    "https://ezmary-avatarworker5.hf.space",
    "https://ezmary-avatarworker6.hf.space",
    "https://ezmary-avatarworker7.hf.space",
    "https://ezmary-avatarworker8.hf.space",
    "https://ezmary-avatarworker9.hf.space",
    "https://ezmary-avatarworker10.hf.space",
    "https://ezmary-avatarworker11.hf.space",
    "https://ezmary-avatarworker12.hf.space",
    "https://ezmary-avatarworker13.hf.space",
    "https://ezmary-avatarworker14.hf.space",
    "https://ezmary-avatarworker15.hf.space",
    "https://ezmary-avatarworker16.hf.space",
]
MANAGER_BASE_URL = "https://ezmary-avatarsokhango.hf.space"

# --- Smart-split settings (12 to 19 seconds); all values in milliseconds ---
MIN_CUT_TIME = 12000  # 12 seconds (not referenced by the code in this file)
MAX_CUT_TIME = 19000  # 19 seconds
SILENCE_THRESH = -35  # less sensitive, so silence is found more easily
SILENCE_MIN_LEN = 250  # lower minimum silence length to find more cut points

# Offsets (from the segment start) of the two windows searched for a silence.
PRIMARY_SEARCH_OFFSET = 14000  # preferred window: 14 s .. MAX_CUT_TIME
FALLBACK_SEARCH_OFFSET = 9000  # second-chance window: 9 s .. 14 s
FALLBACK_SILENCE_MIN_LEN = 200  # shorter silences are accepted in that window

# --- Watchdog settings ---
WATCHDOG_CHECK_INTERVAL = 60  # check once per minute
WATCHDOG_TIMEOUT_THRESHOLD = 7200  # 2 hours

# Per-operation timeout (seconds) when downloading a finished part from a
# worker. The download happens while ``callback_lock`` is held, so it must be
# bounded or a single stalled worker would block every other callback.
DOWNLOAD_TIMEOUT = 300.0

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Database lock: serialises the read-modify-write cycle in the callback route
callback_lock = asyncio.Lock()

# Strong reference to the watchdog task so it is not garbage-collected.
_watchdog_task = None

# Create the working folders
for path in ["static/uploads", "static/temp", "static/videos", "data", "static/assets"]:
    os.makedirs(path, exist_ok=True)

app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# --- Database ---
SQLALCHEMY_DATABASE_URL = "sqlite:///./data/jobs_final.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


class Job(Base):
    """One avatar-generation request and the state of each of its parts."""

    __tablename__ = "jobs"

    id = Column(String, primary_key=True, index=True)
    # "PROCESSING", "COMPLETED" or "FAILED"
    status = Column(String, default="PROCESSING")
    resolution = Column(String)
    is_premium = Column(Boolean, default=False)
    original_image_path = Column(String)
    original_audio_path = Column(String)
    total_parts = Column(Integer, default=1)
    # Maps the part index (as a string) to "PENDING", "DONE" or "FAILED".
    # Defaults are callables so rows never share one mutable object.
    parts_status = Column(JSON, default=lambda: {})
    # Ordered list of audio segment paths, one per part.
    audio_parts = Column(PickleType, default=lambda: [])
    # Maps the part index (as a string) to the downloaded video path.
    video_parts = Column(JSON, default=lambda: {})
    final_video_url = Column(String, nullable=True)
    message = Column(String, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    last_activity = Column(DateTime, default=datetime.utcnow)


Base.metadata.create_all(bind=engine)


# --- Small shared helpers ---
def _remove_quietly(file_path):
    """Delete *file_path*; a failure is logged and never raised."""
    try:
        os.remove(file_path)
    except OSError as e:
        logger.warning(f"Could not remove {file_path}: {e}")


def _safe_extension(filename, default="img"):
    """Return a filesystem-safe extension taken from an uploaded file name.

    The name comes from the client, so anything that is not a short ASCII
    alphanumeric suffix (including a missing name) is replaced by *default*.
    """
    ext = os.path.splitext(filename or "")[1].lstrip(".")
    if ext and len(ext) <= 10 and ext.isascii() and ext.isalnum():
        return ext
    return default


def _build_worker_payload(job_id, part_index, img_path, audio_path, resolution):
    """Build the JSON body posted to a worker's ``/process`` endpoint."""
    return {
        "job_id": f"{job_id}___{part_index}",
        "image_url": f"{MANAGER_BASE_URL}/{img_path}",
        "audio_url": f"{MANAGER_BASE_URL}/{audio_path}",
        "resolution": resolution,
        "callback_url": f"{MANAGER_BASE_URL}/api/callback",
    }


# --- Watermark functions ---
_WATERMARK_FONT_CANDIDATES = (
    "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    "/usr/share/fonts/truetype/noto/NotoSansArabic-Bold.ttf",
)


def _load_watermark_font(size):
    """Return the first loadable candidate font, else Pillow's default font."""
    for font_path in _WATERMARK_FONT_CANDIDATES:
        try:
            return ImageFont.truetype(font_path, size)
        except (OSError, ImportError):
            continue
    return ImageFont.load_default()


def create_watermark_image(text, output_path):
    """Render *text* centred on a transparent 250x60 PNG at *output_path*.

    Returns:
        True when the image was written, False on any failure (logged).
    """
    try:
        width, height = 250, 60
        img = Image.new('RGBA', (width, height), (255, 255, 255, 0))
        draw = ImageDraw.Draw(img)
        font = _load_watermark_font(20)

        text_color = (255, 255, 255, 70)
        stroke_color = (0, 0, 0, 80)
        text_bbox = draw.textbbox((0, 0), text, font=font)
        text_width = text_bbox[2] - text_bbox[0]
        text_height = text_bbox[3] - text_bbox[1]
        position = ((width - text_width) / 2, (height - text_height) / 2)
        draw.text(position, text, font=font, fill=text_color, stroke_width=1, stroke_fill=stroke_color)
        img.save(output_path, "PNG")
        return True
    except Exception as e:
        logger.error(f"Watermark creation failed: {e}")
        return False


def apply_watermark(input_video, watermark_img, output_video):
    """Overlay *watermark_img* on *input_video* with a time-varying position.

    The overlay coordinates are ffmpeg expressions that switch between five
    3-second phases of a 15-second cycle. The video is re-encoded with
    libx264 and the audio stream is copied.

    Returns:
        True when ffmpeg succeeded, False otherwise (logged).
    """
    try:
        speed = 8
        expr_x = (
            f"if(lt(mod(t,15),3), 10 + {speed}*mod(t,3), "
            f"if(lt(mod(t,15),6), (W-w-10) - {speed}*mod(t,3), "
            f"if(lt(mod(t,15),9), (W-w-10) - {speed}*mod(t,3), "
            f"if(lt(mod(t,15),12), 10 + {speed}*mod(t,3), "
            "(W-w)/2 + 40*sin(t)))))"
        )
        expr_y = (
            f"if(lt(mod(t,15),3), 10 + {speed}*mod(t,3), "
            f"if(lt(mod(t,15),6), (H-h-10) - {speed}*mod(t,3), "
            f"if(lt(mod(t,15),9), 10 + {speed}*mod(t,3), "
            f"if(lt(mod(t,15),12), (H-h-10) - {speed}*mod(t,3), "
            "(H-h)/2 + 20*cos(t)))))"
        )
        subprocess.run([
            "ffmpeg", "-y", "-i", input_video, "-i", watermark_img,
            "-filter_complex", f"[0:v][1:v]overlay=x='{expr_x}':y='{expr_y}'",
            "-c:a", "copy", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "24",
            output_video
        ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return True
    except Exception as e:
        logger.error(f"Apply watermark failed: {e}")
        return False


# --- Smart audio processing (precise cutting) ---
def _export_segment(chunk, job_id, part_idx):
    """Write *chunk* as WAV segment number *part_idx* and return its path."""
    out_name = f"static/uploads/{job_id}_seg{part_idx}.wav"
    chunk.export(out_name, format="wav")
    return out_name


def _find_silence_cut(audio, window_start, window_end, min_silence_len):
    """Return the midpoint (absolute ms) of the first silence in a window.

    Returns None when the window is empty or contains no silence of at least
    *min_silence_len* milliseconds.
    """
    if window_end <= window_start:
        return None
    silences = detect_silence(
        audio[window_start:window_end],
        min_silence_len=min_silence_len,
        silence_thresh=SILENCE_THRESH,
    )
    if not silences:
        return None
    silence_start, silence_end = silences[0]
    return window_start + silence_start + (silence_end - silence_start) // 2


def smart_split_audio(file_path, job_id):
    """Split an audio file into segments, cutting inside silences if possible.

    For every segment the cut is placed, in order of preference, at the
    middle of the first silence found 14 s .. MAX_CUT_TIME after the segment
    start, then at the middle of the first silence found 9 s .. 14 s after
    it, and otherwise exactly MAX_CUT_TIME after it. A remainder of at most
    MAX_CUT_TIME becomes the last segment.

    Returns:
        The ordered list of exported WAV paths. If anything raises, the
        result of ``fallback_split`` is returned instead.
    """
    try:
        audio = AudioSegment.from_file(file_path)
        total_len = len(audio)
        parts = []
        start = 0
        part_idx = 0
        while start < total_len:
            if total_len - start <= MAX_CUT_TIME:
                parts.append(_export_segment(audio[start:], job_id, part_idx))
                break

            cut_point = _find_silence_cut(
                audio,
                start + PRIMARY_SEARCH_OFFSET,
                min(start + MAX_CUT_TIME, total_len),
                SILENCE_MIN_LEN,
            )
            if cut_point is None:
                cut_point = _find_silence_cut(
                    audio,
                    start + FALLBACK_SEARCH_OFFSET,
                    start + PRIMARY_SEARCH_OFFSET,
                    FALLBACK_SILENCE_MIN_LEN,
                )
            if cut_point is None:
                cut_point = start + MAX_CUT_TIME

            parts.append(_export_segment(audio[start:cut_point], job_id, part_idx))
            start = cut_point
            part_idx += 1
        return parts
    except Exception as e:
        logger.error(f"Smart Split Error: {e}")
        return fallback_split(file_path, job_id)


def _segment_index(segment_path):
    """Return the numeric index embedded in a ``..._seg<N>.wav`` path."""
    match = re.search(r"_seg(\d+)\.wav$", segment_path)
    return int(match.group(1)) if match else -1


def fallback_split(input_path, job_id):
    """Split *input_path* into fixed 14-second WAV segments with ffmpeg.

    Returns:
        The segment paths ordered by their numeric index (a plain string sort
        would put ``seg10`` before ``seg2``), or an empty list on failure.
    """
    pattern = f"static/uploads/{job_id}_seg*.wav"
    # Segments left behind by an interrupted smart split must not be mixed
    # into the result of this run.
    for stale in glob.glob(pattern):
        _remove_quietly(stale)
    try:
        cmd = ["ffmpeg", "-i", input_path, "-f", "segment", "-segment_time", "14", "-c", "copy", f"static/uploads/{job_id}_seg%d.wav"]
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return sorted(glob.glob(pattern), key=_segment_index)
    except (subprocess.CalledProcessError, OSError) as e:
        logger.error(f"Fallback split failed: {e}")
        return []


def convert_audio_to_wav(input_path, output_path):
    """Convert any ffmpeg-readable audio to 16-bit, 44.1 kHz, stereo WAV.

    Returns:
        True when ffmpeg succeeded, False otherwise (logged).
    """
    try:
        subprocess.run(["ffmpeg", "-y", "-i", input_path, "-acodec", "pcm_s16le", "-ar", "44100", "-ac", "2", output_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return True
    except (subprocess.CalledProcessError, OSError) as e:
        logger.error(f"Audio conversion failed: {e}")
        return False


def merge_videos_smart(video_map, total_parts, output_path):
    """Concatenate the parts of a job, in index order, into *output_path*.

    Args:
        video_map: mapping of part index (as a string) to a video file path.
        total_parts: number of parts that must all be present in *video_map*.
        output_path: destination file.

    Returns:
        True on success; False when a part is missing, there are no parts,
        or ffmpeg fails (logged).
    """
    list_path = None
    try:
        videos = []
        for i in range(total_parts):
            if str(i) not in video_map:
                return False
            videos.append(video_map[str(i)])
        if not videos:
            return False
        if len(videos) == 1:
            shutil.copy(videos[0], output_path)
            return True

        # A unique list file per merge: nothing is shared between jobs and
        # nothing is left behind in the working directory.
        with tempfile.NamedTemporaryFile(
            "w", suffix=".txt", prefix="concat_", delete=False, encoding="utf-8"
        ) as f:
            list_path = f.name
            for v in videos:
                # Inside single quotes ffmpeg needs a quote written as '\''.
                escaped = os.path.abspath(v).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")

        subprocess.run([
            "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path,
            "-c:v", "libx264", "-preset", "ultrafast", "-crf", "24", "-c:a", "aac",
            output_path
        ], check=True)
        return True
    except Exception as e:
        logger.error(f"Merge Error: {e}")
        return False
    finally:
        if list_path is not None:
            _remove_quietly(list_path)


# --- Work distribution ---
async def dispatch_all_parallel(job_id, img_path, audio_files, resolution):
    """Post every audio segment of a job to a worker, all requests at once.

    Segment ``i`` goes to worker ``i % len(WORKER_URLS)``. Dispatch failures
    are logged; the affected parts stay "PENDING".
    """
    num_workers = len(WORKER_URLS)
    if num_workers == 0:
        logger.error(f"No workers configured; job {job_id} was not dispatched")
        return

    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = []
        targets = []
        for i, audio_file in enumerate(audio_files):
            worker = WORKER_URLS[i % num_workers]
            payload = _build_worker_payload(job_id, i, img_path, audio_file, resolution)
            logger.info(f"🚀 Dispatching Part {i+1} to {worker}")
            tasks.append(client.post(f"{worker}/process", json=payload))
            targets.append((i, worker))
        results = await asyncio.gather(*tasks, return_exceptions=True)

    # With return_exceptions=True failures come back as values; report them
    # instead of dropping them.
    for (i, worker), result in zip(targets, results):
        if isinstance(result, BaseException):
            logger.warning(f"Dispatch of Part {i+1} to {worker} failed: {result!r}")
        elif result.status_code >= 400:
            logger.warning(f"Dispatch of Part {i+1} to {worker} returned HTTP {result.status_code}")


# --- Watchdog ---
async def _watchdog_pass():
    """Re-dispatch the PENDING parts of every job idle beyond the threshold."""
    db = SessionLocal()
    try:
        processing_jobs = db.query(Job).filter(Job.status == "PROCESSING").all()
        async with httpx.AsyncClient(timeout=5.0) as client:
            for job in processing_jobs:
                idle_seconds = (datetime.utcnow() - job.last_activity).total_seconds()
                if idle_seconds <= WATCHDOG_TIMEOUT_THRESHOLD:
                    continue

                retry_needed = False
                for idx_str, status in dict(job.parts_status).items():
                    if status != "PENDING":
                        continue
                    logger.warning(f"⚠️ Watchdog: Retrying Part {idx_str} (> 2 hours)...")
                    current_audio = job.audio_parts[int(idx_str)]
                    worker = random.choice(WORKER_URLS)
                    payload = _build_worker_payload(
                        job.id, idx_str, job.original_image_path, current_audio, job.resolution
                    )
                    try:
                        await client.post(f"{worker}/process", json=payload)
                    except Exception as e:
                        # Best effort: the next pass tries again.
                        logger.warning(f"Watchdog: retry of Part {idx_str} on {worker} failed: {e!r}")
                    retry_needed = True

                if retry_needed:
                    # Restart the idle clock so the retry gets a full window.
                    job.last_activity = datetime.utcnow()
                    db.commit()
    finally:
        db.close()


async def watchdog_loop():
    """Run a watchdog pass every WATCHDOG_CHECK_INTERVAL seconds, forever."""
    while True:
        try:
            await asyncio.sleep(WATCHDOG_CHECK_INTERVAL)
            await _watchdog_pass()
        except Exception as e:
            logger.error(f"Watchdog Error: {e}")


@app.on_event("startup")
async def startup_event():
    """Start the watchdog when the application starts."""
    global _watchdog_task
    # The event loop keeps only a weak reference to tasks, so hold one here.
    _watchdog_task = asyncio.create_task(watchdog_loop())


# --- Routes ---
@app.get("/")
async def home(request: Request):
    """Serve the index page."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/api/generate")
async def create_job(background_tasks: BackgroundTasks, image: UploadFile = File(...), audio: UploadFile = File(...), resolution: str = Form(...), subscriptionStatus: str = Form(default="free")):
    """Store the uploads, split the audio, record a job and dispatch it.

    Non-premium jobs are forced to "480P" and keep only the first audio
    segment.

    Returns:
        ``{"status": "success", "job_id": ..., "total_parts": ...}`` or a
        400 JSON error when the audio cannot be converted or split.
    """
    job_id = str(uuid.uuid4())
    # NOTE(review): the premium flag is taken directly from a client-supplied
    # form field; confirm it is verified somewhere before this service.
    is_premium = (subscriptionStatus == 'paid')
    final_res = resolution if is_premium else "480P"

    img_ext = _safe_extension(image.filename)
    img_path = f"static/uploads/{job_id}.{img_ext}"
    temp_audio = f"static/uploads/{job_id}_temp"
    final_audio = f"static/uploads/{job_id}_original.wav"

    with open(img_path, "wb") as f:
        f.write(await image.read())
    with open(temp_audio, "wb") as f:
        f.write(await audio.read())

    converted = convert_audio_to_wav(temp_audio, final_audio)
    # The raw upload is only the input of the conversion.
    _remove_quietly(temp_audio)
    if not converted:
        _remove_quietly(img_path)
        return JSONResponse(status_code=400, content={"error": "Audio conversion failed"})

    # Split the audio
    audio_parts = smart_split_audio(final_audio, job_id)
    if not audio_parts:
        # A job with zero parts would never receive a callback and would stay
        # in PROCESSING forever.
        _remove_quietly(img_path)
        _remove_quietly(final_audio)
        return JSONResponse(status_code=400, content={"error": "Audio split failed"})

    # --- Free-tier logic ---
    # A free user with more than one part keeps only the first part.
    if not is_premium and len(audio_parts) > 1:
        # Delete the extra files from disk
        for part in audio_parts[1:]:
            _remove_quietly(part)
        # Keep only the first part (which is under 19 seconds)
        audio_parts = [audio_parts[0]]
        initial_msg = "شروع پردازش (محدود به ۱۹ ثانیه نسخه رایگان)..."
    else:
        initial_msg = f"شروع پردازش همزمان {len(audio_parts)} قسمت..."

    total_parts = len(audio_parts)
    parts_status_init = {str(i): "PENDING" for i in range(total_parts)}

    db = SessionLocal()
    try:
        new_job = Job(
            id=job_id,
            status="PROCESSING",
            resolution=final_res,
            is_premium=is_premium,
            original_image_path=img_path,
            original_audio_path=final_audio,
            total_parts=total_parts,
            parts_status=parts_status_init,
            audio_parts=audio_parts,
            video_parts={},
            message=initial_msg,
            last_activity=datetime.utcnow()
        )
        db.add(new_job)
        db.commit()
    finally:
        db.close()

    background_tasks.add_task(dispatch_all_parallel, job_id, img_path, audio_parts, final_res)
    return {"status": "success", "job_id": job_id, "total_parts": total_parts}


def _parse_combined_id(combined_id):
    """Split ``"<job_id>___<part_index>"`` into ``(job_id, part_index)``.

    Returns None when the value is not a string of that form.
    """
    if not isinstance(combined_id, str) or "___" not in combined_id:
        return None
    real_job_id, _, part_index_str = combined_id.rpartition("___")
    try:
        return real_job_id, int(part_index_str)
    except ValueError:
        return None


async def _download_video(video_url, dest_path):
    """Download *video_url* to *dest_path*; return True only on HTTP 200."""
    if not video_url:
        logger.warning("Callback without a video_url")
        return False
    try:
        # NOTE(review): the URL comes from the request body of the callback
        # endpoint and is fetched as-is; consider restricting it to the
        # known worker hosts.
        async with httpx.AsyncClient() as client:
            resp = await client.get(video_url, follow_redirects=True, timeout=DOWNLOAD_TIMEOUT)
        if resp.status_code != 200:
            logger.warning(f"Download of {video_url} returned HTTP {resp.status_code}")
            return False
        with open(dest_path, "wb") as f:
            f.write(resp.content)
        return True
    except Exception as e:
        logger.warning(f"Download of {video_url} failed: {e!r}")
        return False


def _finalize_job(db, job, v_parts):
    """Merge all parts, watermark non-premium output, set the final status.

    The caller commits the final state.
    """
    job_id = job.id
    final_output = f"static/videos/{job_id}_final.mp4"
    temp_merged = f"static/temp/{job_id}_merged.mp4"

    if not merge_videos_smart(v_parts, job.total_parts, temp_merged):
        job.status = "FAILED"
        job.message = "خطا در ادغام ویدیوها"
        return

    watermarked = False
    if not job.is_premium:
        job.message = "در حال افزودن واترمارک..."
        db.commit()
        watermark_path = "static/assets/watermark.png"
        if not os.path.exists(watermark_path):
            create_watermark_image("هوش مصنوعی آلفا", watermark_path)
        watermarked = apply_watermark(temp_merged, watermark_path, final_output)
    if not watermarked:
        # Premium job, or watermarking failed: publish the merged video.
        shutil.copy(temp_merged, final_output)

    job.status = "COMPLETED"
    job.message = "تکمیل شد!"
    job.final_video_url = f"/static/videos/{job_id}_final.mp4"

    # Each file is removed independently so one failure does not stop the
    # rest of the clean-up.
    for leftover in glob.glob(f"static/temp/{job_id}_*"):
        _remove_quietly(leftover)
    for leftover in glob.glob(f"static/uploads/{job_id}_seg*"):
        _remove_quietly(leftover)


@app.post("/api/callback")
async def callback(data: dict, background_tasks: BackgroundTasks):
    """Receive the result of one part from a worker.

    Expects ``job_id`` ("<job_id>___<part_index>"), ``status`` and
    ``video_url`` in the body. A "FAILED" status marks the part as failed;
    anything else downloads the video and marks the part as done. When all
    parts are done the job is merged and finalised.

    Returns:
        ``{"ack": True}`` when the callback was accepted or can be ignored,
        ``{"ack": False}`` when it is malformed or the download failed, and
        ``{"error": "Job not found"}`` for an unknown job.
    """
    parsed = _parse_combined_id(data.get("job_id"))
    if parsed is None:
        return {"ack": False}
    real_job_id, part_index = parsed
    status = data.get("status")
    video_url = data.get("video_url")

    async with callback_lock:
        db = SessionLocal()
        try:
            job = db.query(Job).filter(Job.id == real_job_id).first()
            if not job:
                return {"error": "Job not found"}

            # A late or duplicate callback for a finished job must not run
            # the merge again: its temp files are already deleted and the
            # job would be flipped to FAILED.
            if job.status != "PROCESSING":
                return {"ack": True}

            # An index outside the job would otherwise be counted as DONE.
            if not 0 <= part_index < job.total_parts:
                return {"ack": False}

            job.last_activity = datetime.utcnow()

            if status == "FAILED":
                p_status = dict(job.parts_status)
                p_status[str(part_index)] = "FAILED"
                job.parts_status = p_status
                db.commit()
                return {"ack": True}

            current_vid_path = f"static/temp/{real_job_id}_part_{part_index}.mp4"
            if not await _download_video(video_url, current_vid_path):
                return {"ack": False}

            # JSON columns are reassigned (not mutated in place) so the
            # change is detected and written.
            v_parts = dict(job.video_parts)
            v_parts[str(part_index)] = current_vid_path
            job.video_parts = v_parts

            p_status = dict(job.parts_status)
            p_status[str(part_index)] = "DONE"
            job.parts_status = p_status

            done_count = sum(1 for s in p_status.values() if s == "DONE")
            if done_count < job.total_parts:
                job.message = f"تکمیل {done_count} از {job.total_parts} قسمت..."
                db.commit()
                return {"ack": True}

            job.message = "در حال میکس نهایی ویدیوها..."
            db.commit()
            _finalize_job(db, job, v_parts)
            db.commit()
            return {"ack": True}
        finally:
            db.close()


@app.get("/api/status/{job_id}")
def get_status(job_id: str):
    """Return the status, message and per-part progress of a job (or 404)."""
    db = SessionLocal()
    try:
        job = db.query(Job).filter(Job.id == job_id).first()
        if not job:
            return JSONResponse(status_code=404, content={"status": "not_found"})
        # Read every attribute while the session is still open.
        p_status = job.parts_status or {}
        done_count = sum(1 for s in p_status.values() if s == "DONE")
        return {
            "status": job.status,
            "message": job.message,
            "video_url": job.final_video_url,
            "is_premium": job.is_premium,
            "current_part": done_count,
            "total_parts": job.total_parts,
            "parts_map": p_status
        }
    finally:
        db.close()