import os
import uuid
import asyncio
import logging
import shutil
import glob
import subprocess
import math
import random
from datetime import datetime, timedelta
from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import JSONResponse
from sqlalchemy import create_engine, Column, String, Integer, PickleType, DateTime, Boolean, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import httpx
from pydub import AudioSegment
from pydub.silence import detect_silence
from PIL import Image, ImageDraw, ImageFont

# --- Settings ---
# Full list of all 16 workers
WORKER_URLS = [
    "https://ezmary-avatarworker1.hf.space",
    "https://ezmary-avatarworker2.hf.space",
    "https://ezmary-avatarworker3.hf.space",
    "https://ezmary-avatarworker4.hf.space",
    "https://ezmary-avatarworker5.hf.space",
    "https://ezmary-avatarworker6.hf.space",
    "https://ezmary-avatarworker7.hf.space",
    "https://ezmary-avatarworker8.hf.space",
    "https://ezmary-avatarworker9.hf.space",
    "https://ezmary-avatarworker10.hf.space",
    "https://ezmary-avatarworker11.hf.space",
    "https://ezmary-avatarworker12.hf.space",
    "https://ezmary-avatarworker13.hf.space",
    "https://ezmary-avatarworker14.hf.space",
    "https://ezmary-avatarworker15.hf.space",
    "https://ezmary-avatarworker16.hf.space"
]
MANAGER_BASE_URL = "https://ezmary-avatarsokhango.hf.space"

# --- Smart-cut settings (12 to 19 seconds) ---
MIN_CUT_TIME = 12000   # 12 seconds
MAX_CUT_TIME = 19000   # 19 seconds
SILENCE_THRESH = -35   # lower sensitivity so silences are easier to find
SILENCE_MIN_LEN = 250  # shorter minimum silence length so more cut points are found

# --- Watchdog settings ---
WATCHDOG_CHECK_INTERVAL = 60        # check every 1 minute
WATCHDOG_TIMEOUT_THRESHOLD = 7200   # 2 hours

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Database lock (serializes callback handling)
callback_lock = asyncio.Lock()

# Create working directories
for path in ["static/uploads", "static/temp", "static/videos", "data", "static/assets"]:
    os.makedirs(path, exist_ok=True)

app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# --- Database ---
SQLALCHEMY_DATABASE_URL = "sqlite:///./data/jobs_final.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

class Job(Base):
    __tablename__ = "jobs"
    id = Column(String, primary_key=True, index=True)
    status = Column(String, default="PROCESSING")
    resolution = Column(String)
    is_premium = Column(Boolean, default=False)
    original_image_path = Column(String)
    original_audio_path = Column(String)
    total_parts = Column(Integer, default=1)
    parts_status = Column(JSON, default={})
    audio_parts = Column(PickleType, default=[])
    video_parts = Column(JSON, default={})
    final_video_url = Column(String, nullable=True)
    message = Column(String, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    last_activity = Column(DateTime, default=datetime.utcnow)

Base.metadata.create_all(bind=engine)
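
# Note: JSON and PickleType columns are not mutation-tracked, so the handlers
# below always copy the stored value (e.g. dict(job.parts_status)), update the
# copy and reassign it to the attribute before committing; mutating the value
# in place would not be persisted.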

# --- Watermark functions ---
def create_watermark_image(text, output_path):
    try:
        width, height = 250, 60
        img = Image.new('RGBA', (width, height), (255, 255, 255, 0))
        draw = ImageDraw.Draw(img)
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 20)
        except:
            try:
                font = ImageFont.truetype("/usr/share/fonts/truetype/noto/NotoSansArabic-Bold.ttf", 20)
            except:
                font = ImageFont.load_default()
        text_color = (255, 255, 255, 70)
        stroke_color = (0, 0, 0, 80)
        text_bbox = draw.textbbox((0, 0), text, font=font)
        text_width = text_bbox[2] - text_bbox[0]
        text_height = text_bbox[3] - text_bbox[1]
        position = ((width - text_width) / 2, (height - text_height) / 2)
        draw.text(position, text, font=font, fill=text_color, stroke_width=1, stroke_fill=stroke_color)
        img.save(output_path, "PNG")
        return True
    except Exception as e:
        logger.error(f"Watermark creation failed: {e}")
        return False

def apply_watermark(input_video, watermark_img, output_video):
    try:
        # The overlay expressions move the watermark on a 15-second cycle:
        # it drifts near each of the four corners during the first 12 seconds,
        # then floats around the centre with a sin/cos wobble for the last 3.
        speed = 8
        expr_x = (
            f"if(lt(mod(t,15),3), 10 + {speed}*mod(t,3), "
            f"if(lt(mod(t,15),6), (W-w-10) - {speed}*mod(t,3), "
            f"if(lt(mod(t,15),9), (W-w-10) - {speed}*mod(t,3), "
            f"if(lt(mod(t,15),12), 10 + {speed}*mod(t,3), "
            "(W-w)/2 + 40*sin(t)))))"
        )
        expr_y = (
            f"if(lt(mod(t,15),3), 10 + {speed}*mod(t,3), "
            f"if(lt(mod(t,15),6), (H-h-10) - {speed}*mod(t,3), "
            f"if(lt(mod(t,15),9), 10 + {speed}*mod(t,3), "
            f"if(lt(mod(t,15),12), (H-h-10) - {speed}*mod(t,3), "
            "(H-h)/2 + 20*cos(t)))))"
        )
        subprocess.run([
            "ffmpeg", "-y", "-i", input_video, "-i", watermark_img,
            "-filter_complex", f"[0:v][1:v]overlay=x='{expr_x}':y='{expr_y}'",
            "-c:a", "copy",
            "-c:v", "libx264", "-preset", "ultrafast", "-crf", "24",
            output_video
        ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return True
    except Exception as e:
        logger.error(f"Apply watermark failed: {e}")
        return False

# --- Smart audio processing (precise cutting) ---
def smart_split_audio(file_path, job_id):
    try:
        audio = AudioSegment.from_file(file_path)
        total_len = len(audio)
        parts = []
        start = 0
        part_idx = 0
        while start < total_len:
            # If whatever is left fits in one segment, export it and stop.
            if total_len - start <= MAX_CUT_TIME:
                chunk = audio[start:]
                out_name = f"static/uploads/{job_id}_seg{part_idx}.wav"
                chunk.export(out_name, format="wav")
                parts.append(out_name)
                break
            cut_point = 0
            found_cut = False
            # Preferred window: look for a silence between 14 s and MAX_CUT_TIME.
            search_start = start + 14000
            search_end = min(start + MAX_CUT_TIME, total_len)
            if search_end > search_start:
                chunk_search = audio[search_start:search_end]
                silences = detect_silence(chunk_search, min_silence_len=SILENCE_MIN_LEN, silence_thresh=SILENCE_THRESH)
                if silences:
                    s = silences[0]
                    mid = s[0] + (s[1] - s[0]) // 2
                    cut_point = search_start + mid
                    found_cut = True
            # Fallback window: look for a shorter silence between 9 s and 14 s.
            if not found_cut:
                search_start_safe = start + 9000
                search_end_safe = start + 14000
                chunk_safe = audio[search_start_safe:search_end_safe]
                silences_safe = detect_silence(chunk_safe, min_silence_len=200, silence_thresh=SILENCE_THRESH)
                if silences_safe:
                    s = silences_safe[0]
                    mid = s[0] + (s[1] - s[0]) // 2
                    cut_point = search_start_safe + mid
                    found_cut = True
            # Last resort: hard cut at the maximum segment length.
            if not found_cut:
                cut_point = start + MAX_CUT_TIME
            chunk = audio[start:cut_point]
            out_name = f"static/uploads/{job_id}_seg{part_idx}.wav"
            chunk.export(out_name, format="wav")
            parts.append(out_name)
            start = cut_point
            part_idx += 1
        return parts
    except Exception as e:
        logger.error(f"Smart Split Error: {e}")
        return fallback_split(file_path, job_id)
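
# Illustrative call (the file name and job id below are placeholders, not used
# by the app):
#   smart_split_audio("static/uploads/demo_original.wav", "demo-job")
# returns something like ["static/uploads/demo-job_seg0.wav",
# "static/uploads/demo-job_seg1.wav", ...], each segment at most ~19 seconds long.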

def fallback_split(input_path, job_id):
    try:
        cmd = ["ffmpeg", "-i", input_path, "-f", "segment", "-segment_time", "14", "-c", "copy", f"static/uploads/{job_id}_seg%d.wav"]
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # Sort numerically by segment index so "..._seg10.wav" does not sort before "..._seg2.wav".
        return sorted(glob.glob(f"static/uploads/{job_id}_seg*.wav"),
                      key=lambda p: int(p.rsplit("_seg", 1)[-1].split(".")[0]))
    except:
        return []

def convert_audio_to_wav(input_path, output_path):
    try:
        subprocess.run(["ffmpeg", "-y", "-i", input_path, "-acodec", "pcm_s16le", "-ar", "44100", "-ac", "2", output_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return True
    except:
        return False

def merge_videos_smart(video_map, total_parts, output_path):
    try:
        videos = []
        for i in range(total_parts):
            if str(i) in video_map:
                videos.append(video_map[str(i)])
            else:
                return False
        if not videos:
            return False
        if len(videos) == 1:
            shutil.copy(videos[0], output_path)
            return True
        with open("list.txt", "w") as f:
            for v in videos:
                f.write(f"file '{os.path.abspath(v)}'\n")
        subprocess.run([
            "ffmpeg", "-y",
            "-f", "concat",
            "-safe", "0",
            "-i", "list.txt",
            "-c:v", "libx264", "-preset", "ultrafast", "-crf", "24",
            "-c:a", "aac",
            output_path
        ], check=True)
        return True
    except Exception as e:
        logger.error(f"Merge Error: {e}")
        return False
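
# merge_videos_smart expects video_map keyed by the stringified part index
# ("0", "1", ...) and returns False if any part is missing, so the final video
# is only assembled once every worker has reported back.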

# --- Work distribution ---
async def dispatch_all_parallel(job_id, img_path, audio_files, resolution):
    async with httpx.AsyncClient(timeout=10.0) as client:
        tasks = []
        full_img = f"{MANAGER_BASE_URL}/{img_path}"
        callback = f"{MANAGER_BASE_URL}/api/callback"
        num_workers = len(WORKER_URLS)
        if num_workers == 0:
            return
        for i, audio_file in enumerate(audio_files):
            worker = WORKER_URLS[i % num_workers]
            full_aud = f"{MANAGER_BASE_URL}/{audio_file}"
            combined_id = f"{job_id}___{i}"
            payload = {
                "job_id": combined_id,
                "image_url": full_img,
                "audio_url": full_aud,
                "resolution": resolution,
                "callback_url": callback
            }
            logger.info(f"🚀 Dispatching Part {i+1} to {worker}")
            tasks.append(client.post(f"{worker}/process", json=payload))
        await asyncio.gather(*tasks, return_exceptions=True)

# --- Watchdog ---
async def watchdog_loop():
    while True:
        try:
            await asyncio.sleep(WATCHDOG_CHECK_INTERVAL)
            db = SessionLocal()
            processing_jobs = db.query(Job).filter(Job.status == "PROCESSING").all()
            for job in processing_jobs:
                now = datetime.utcnow()
                time_diff = (now - job.last_activity).total_seconds()
                if time_diff > WATCHDOG_TIMEOUT_THRESHOLD:
                    p_status = dict(job.parts_status)
                    retry_needed = False
                    for idx_str, status in p_status.items():
                        if status == "PENDING":
                            logger.warning(f"⚠️ Watchdog: Retrying Part {idx_str} (> 2 hours)...")
                            current_audio = job.audio_parts[int(idx_str)]
                            worker = random.choice(WORKER_URLS)
                            full_img = f"{MANAGER_BASE_URL}/{job.original_image_path}"
                            full_aud = f"{MANAGER_BASE_URL}/{current_audio}"
                            callback = f"{MANAGER_BASE_URL}/api/callback"
                            combined_id = f"{job.id}___{idx_str}"
                            payload = {
                                "job_id": combined_id, "image_url": full_img, "audio_url": full_aud,
                                "resolution": job.resolution, "callback_url": callback
                            }
                            async with httpx.AsyncClient(timeout=5.0) as client:
                                try:
                                    await client.post(f"{worker}/process", json=payload)
                                except:
                                    pass
                            retry_needed = True
                    if retry_needed:
                        job.last_activity = datetime.utcnow()
                        db.commit()
            db.close()
        except Exception as e:
            logger.error(f"Watchdog Error: {e}")

# Start the watchdog in the background when the app boots (standard FastAPI startup hook).
@app.on_event("startup")
async def startup_event():
    asyncio.create_task(watchdog_loop())

# --- Routes ---
# Route paths: only /api/callback is pinned down elsewhere in this file (it is the
# callback_url handed to the workers); "/", "/api/create_job" and the status path
# further below are assumptions and should be matched to the front end.
@app.get("/")
async def home(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})

@app.post("/api/create_job")
async def create_job(background_tasks: BackgroundTasks, image: UploadFile = File(...), audio: UploadFile = File(...), resolution: str = Form(...), subscriptionStatus: str = Form(default="free")):
    job_id = str(uuid.uuid4())
    is_premium = (subscriptionStatus == 'paid')
    final_res = resolution if is_premium else "480P"
    img_ext = image.filename.split('.')[-1]
    img_path = f"static/uploads/{job_id}.{img_ext}"
    temp_audio = f"static/uploads/{job_id}_temp"
    final_audio = f"static/uploads/{job_id}_original.wav"
    with open(img_path, "wb") as f:
        f.write(await image.read())
    with open(temp_audio, "wb") as f:
        f.write(await audio.read())
    if not convert_audio_to_wav(temp_audio, final_audio):
        return JSONResponse(status_code=400, content={"error": "Audio conversion failed"})
    # Split the audio
    audio_parts = smart_split_audio(final_audio, job_id)
    # --- Free-user logic ---
    # If the user is on the free plan and there is more than one part, keep only the first part.
    if not is_premium and len(audio_parts) > 1:
        # Remove the extra segment files from disk
        for part in audio_parts[1:]:
            try:
                os.remove(part)
            except:
                pass
        # Keep only the first part (which is under 19 seconds)
        audio_parts = [audio_parts[0]]
        initial_msg = "شروع پردازش (محدود به ۱۹ ثانیه نسخه رایگان)..."
    else:
        initial_msg = f"شروع پردازش همزمان {len(audio_parts)} قسمت..."
    total_parts = len(audio_parts)
    parts_status_init = {str(i): "PENDING" for i in range(total_parts)}
    db = SessionLocal()
    new_job = Job(
        id=job_id, status="PROCESSING", resolution=final_res, is_premium=is_premium,
        original_image_path=img_path, original_audio_path=final_audio,
        total_parts=total_parts, parts_status=parts_status_init,
        audio_parts=audio_parts, video_parts={},
        message=initial_msg,
        last_activity=datetime.utcnow()
    )
    db.add(new_job)
    db.commit()
    db.close()
    background_tasks.add_task(dispatch_all_parallel, job_id, img_path, audio_parts, final_res)
    return {"status": "success", "job_id": job_id, "total_parts": total_parts}

@app.post("/api/callback")
async def callback(data: dict, background_tasks: BackgroundTasks):
    combined_id = data.get("job_id")
    status = data.get("status")
    video_url = data.get("video_url")
    if "___" in combined_id:
        real_job_id, part_index_str = combined_id.split("___")
        part_index = int(part_index_str)
    else:
        return {"ack": False}
    async with callback_lock:
        db = SessionLocal()
        job = db.query(Job).filter(Job.id == real_job_id).first()
        if not job:
            db.close()
            return {"error": "Job not found"}
        job.last_activity = datetime.utcnow()
        if status == "FAILED":
            p_status = dict(job.parts_status)
            p_status[str(part_index)] = "FAILED"
            job.parts_status = p_status
            db.commit()
            db.close()
            return {"ack": True}
        current_vid_path = f"static/temp/{real_job_id}_part_{part_index}.mp4"
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.get(video_url, follow_redirects=True, timeout=None)
                if resp.status_code == 200:
                    with open(current_vid_path, "wb") as f:
                        f.write(resp.content)
                else:
                    raise Exception("Download failed")
        except:
            db.close()
            return {"ack": False}
        v_parts = dict(job.video_parts)
        v_parts[str(part_index)] = current_vid_path
        job.video_parts = v_parts
        p_status = dict(job.parts_status)
        p_status[str(part_index)] = "DONE"
        job.parts_status = p_status
        done_count = sum(1 for s in p_status.values() if s == "DONE")
        if done_count < job.total_parts:
            job.message = f"تکمیل {done_count} از {job.total_parts} قسمت..."
            db.commit()
        else:
            job.message = "در حال میکس نهایی ویدیوها..."
            db.commit()
            final_output = f"static/videos/{real_job_id}_final.mp4"
            temp_merged = f"static/temp/{real_job_id}_merged.mp4"
            if merge_videos_smart(v_parts, job.total_parts, temp_merged):
                if not job.is_premium:
                    job.message = "در حال افزودن واترمارک..."
                    db.commit()
                    watermark_path = "static/assets/watermark.png"
                    if not os.path.exists(watermark_path):
                        create_watermark_image("هوش مصنوعی آلفا", watermark_path)
                    if apply_watermark(temp_merged, watermark_path, final_output):
                        pass
                    else:
                        shutil.copy(temp_merged, final_output)
                else:
                    shutil.copy(temp_merged, final_output)
                job.status = "COMPLETED"
                job.message = "تکمیل شد!"
                job.final_video_url = f"/static/videos/{real_job_id}_final.mp4"
                try:
                    for f in glob.glob(f"static/temp/{real_job_id}_*"):
                        os.remove(f)
                    for f in glob.glob(f"static/uploads/{real_job_id}_seg*"):
                        os.remove(f)
                except:
                    pass
            else:
                job.status = "FAILED"
                job.message = "خطا در ادغام ویدیوها"
            db.commit()
        db.close()
        return {"ack": True}

# /api/status/{job_id} is an assumed path (see the note in the routes section above).
@app.get("/api/status/{job_id}")
def get_status(job_id: str):
    db = SessionLocal()
    job = db.query(Job).filter(Job.id == job_id).first()
    db.close()
    if not job:
        return JSONResponse(status_code=404, content={"status": "not_found"})
    p_status = job.parts_status
    done_count = sum(1 for s in p_status.values() if s == "DONE")
    return {
        "status": job.status,
        "message": job.message,
        "video_url": job.final_video_url,
        "is_premium": job.is_premium,
        "current_part": done_count,
        "total_parts": job.total_parts,
        "parts_map": p_status
    }
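
# Minimal local entry point, offered as an assumption rather than something
# dictated above: on Hugging Face Spaces the Docker CMD usually starts uvicorn
# on port 7860 itself, in which case this block is never reached.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)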