"""Gradio Space that turns an image + concept into three sequential video prompts.

The prompts are produced by a local ``llama-server`` (llama.cpp) process that
serves the Sulphur multimodal prompt-enhancer GGUF model over HTTP on loopback.
"""

import base64
import io
import os
import pathlib
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import traceback

import gradio as gr
import requests as http_requests
from huggingface_hub import hf_hub_download
from PIL import Image
import spaces

ROOT = pathlib.Path(__file__).resolve().parent

# NOTE(review): not referenced in this file; the value matches the
# ``@spaces.GPU(duration=80)`` budget below -- confirm whether they should be linked.
DEFAULT_ENHANCE_BUDGET = 80

# Hub repository and in-repo paths of the enhancer model and its projector.
SULPHUR_REPO = "SulphurAI/Sulphur-2-base"
SULPHUR_MODEL_FILE = "prompt_enhancer_uncensored/prompt_enhancer_uncensored-q8_0.gguf"
SULPHUR_MMPROJ_FILE = "prompt_enhancer_uncensored/mmproj-prompt_enhancer_uncensored.gguf"

# Local (flattened) destinations for the two downloaded files.
SULPHUR_MODEL_DIR = ROOT / "sulphur_enhancer"
SULPHUR_MODEL_PATH = SULPHUR_MODEL_DIR / "prompt_enhancer_uncensored-q8_0.gguf"
SULPHUR_MMPROJ_PATH = SULPHUR_MODEL_DIR / "mmproj-prompt_enhancer_uncensored.gguf"

# llama.cpp checkout and the server binary produced by the CMake build.
LLAMA_CPP_DIR = ROOT / "llama.cpp"
LLAMA_SERVER_BIN = LLAMA_CPP_DIR / "build" / "bin" / "llama-server"

# NOTE(review): CACHE_REPO / CACHE_BINARY_FILENAME / CACHE_LIBS_TARBALL are not
# used anywhere in this file (the cache pull/push helpers are stubs).
CACHE_REPO = "signsur4739379373/ltx-dependencies"
CACHE_BINARY_FILENAME = "llama-server"
CACHE_LIBS_TARBALL = "llama-server-libs.tar.gz"
CACHED_BINARY_PATH = ROOT / "llama-server-cached"
CACHED_LIBS_DIR = ROOT / "llama-server-libs"

_enhancer_ready = False
_enhancer_lock = threading.Lock()
_enhancer_server_proc = None

# Loopback port llama-server is started on and queried at.
ENHANCER_PORT = 18642
LOG_PATH = ROOT / "llama_server.log"

# Tag patterns used to split the model output into its three parts.
_PART1_RE = re.compile(r'\[PART1\](.*?)(\[PART2\]|\[PART3\]|$)', re.DOTALL | re.IGNORECASE)
_PART2_RE = re.compile(r'\[PART2\](.*?)(\[PART3\]|$)', re.DOTALL | re.IGNORECASE)
_PART3_RE = re.compile(r'\[PART3\](.*)$', re.DOTALL | re.IGNORECASE)


def _server_binary_path() -> pathlib.Path:
    """Return the cached server binary if it exists, else the freshly built one."""
    if CACHED_BINARY_PATH.exists():
        return CACHED_BINARY_PATH
    return LLAMA_SERVER_BIN


def _have_server_artifacts() -> bool:
    """Stub: always reports that no prebuilt server artifacts are available."""
    return False


def _pull_cached_binary() -> bool:
    """Stub: never pulls a cached binary; always returns ``False``."""
    return False


def _push_cached_binary() -> None:
    """Stub: does nothing."""
    pass


def _find_cuda13_lib_dir() -> pathlib.Path | None:
    """Return the first candidate directory containing ``libcudart.so*``.

    Candidates are tried in order (CUDA 13.0 locations first, then generic
    CUDA and system library directories). Returns ``None`` if none match.
    """
    candidates = [
        "/cuda-image/usr/local/cuda-13.0/targets/x86_64-linux/lib",
        "/cuda-image/usr/local/cuda-13.0/lib64",
        "/usr/local/cuda-13.0/targets/x86_64-linux/lib",
        "/usr/local/cuda-13.0/lib64",
        "/usr/local/cuda/targets/x86_64-linux/lib",
        "/usr/local/cuda/lib64",
        "/usr/local/cuda/lib",
        "/usr/lib/x86_64-linux-gnu",
    ]
    for candidate in candidates:
        lib_dir = pathlib.Path(candidate)
        # The glob also matches the unversioned "libcudart.so" name.
        if any(lib_dir.glob("libcudart.so*")):
            return lib_dir
    return None


def _prepend_env_path(env: dict[str, str], key: str, entry: str) -> None:
    """Prepend *entry* to the colon-separated search path stored in ``env[key]``.

    When the variable is unset or empty the result is just *entry*; a trailing
    ``:`` is avoided because an empty element in these search paths is
    interpreted as the current working directory.
    """
    existing = env.get(key, "")
    env[key] = f"{entry}:{existing}" if existing else entry


def _run(
    cmd: list[str],
    cwd: pathlib.Path | None = None,
    check: bool = True,
    env: dict[str, str] | None = None,
) -> subprocess.CompletedProcess:
    """Log and run *cmd*, returning the completed process.

    Args:
        cmd: Argument vector (no shell is involved).
        cwd: Working directory, or ``None`` for the current one.
        check: Raise ``subprocess.CalledProcessError`` on non-zero exit.
        env: Environment for the child, or ``None`` to inherit.
    """
    print("[setup]", " ".join(cmd), flush=True)
    return subprocess.run(cmd, cwd=str(cwd) if cwd else None, check=check, env=env)


def _build_llama_cpp() -> None:
    """Clone (if needed) and build ``llama-server`` from the llama.cpp sources.

    CUDA support is enabled when a ``libcudart`` directory is found. After the
    build, shared objects from ``build/bin`` and selected CUDA runtime
    libraries are copied into ``CACHED_LIBS_DIR``.

    Raises:
        subprocess.CalledProcessError: if git, the CMake configure step, or
            both build attempts fail.
    """
    print("[enhancer] building llama.cpp from source specifically for this machine...", flush=True)
    if not LLAMA_CPP_DIR.exists():
        _run(["git", "clone", "--depth", "1", "https://github.com/ggml-org/llama.cpp.git", str(LLAMA_CPP_DIR)])

    cuda_lib = _find_cuda13_lib_dir()
    env = dict(os.environ)
    if cuda_lib:
        print(f"[enhancer] Linker targeting CUDA paths at {cuda_lib}", flush=True)
        _prepend_env_path(env, "LD_LIBRARY_PATH", str(cuda_lib))
        _prepend_env_path(env, "LIBRARY_PATH", str(cuda_lib))

    # Always configure from a clean build directory.
    shutil.rmtree(LLAMA_CPP_DIR / "build", ignore_errors=True)

    cmake_flags = [
        "cmake", "-B", "build",
        "-DCMAKE_BUILD_TYPE=Release",
        "-DLLAMA_BUILD_TESTS=OFF",
        "-DLLAMA_BUILD_EXAMPLES=OFF",
        "-DLLAMA_BUILD_TOOLS=ON",
        "-DLLAMA_CURL=OFF",
    ]
    if cuda_lib:
        cmake_flags += [
            "-DGGML_CUDA=ON",
            "-DCMAKE_CUDA_ARCHITECTURES=86",
            f"-DCMAKE_EXE_LINKER_FLAGS=-L{cuda_lib} -lcudart -Wl,-rpath,{cuda_lib}",
            f"-DCMAKE_SHARED_LINKER_FLAGS=-L{cuda_lib} -lcudart -Wl,-rpath,{cuda_lib}",
        ]
    _run(cmake_flags, cwd=LLAMA_CPP_DIR, env=env)

    build_cmd = ["cmake", "--build", "build", "--config", "Release", "--target", "llama-server"]
    try:
        _run(build_cmd + ["-j2"], cwd=LLAMA_CPP_DIR, env=env)
    except subprocess.CalledProcessError:
        # Retry once with a single job if the two-job build failed.
        _run(build_cmd + ["-j1"], cwd=LLAMA_CPP_DIR, env=env)

    CACHED_LIBS_DIR.mkdir(parents=True, exist_ok=True)
    for so in (LLAMA_CPP_DIR / "build" / "bin").glob("*.so*"):
        shutil.copy2(so, CACHED_LIBS_DIR / so.name)
    if cuda_lib:
        for pattern in ("libcudart.so*", "libcublas.so*", "libcublasLt.so*"):
            for so in cuda_lib.glob(pattern):
                if not (CACHED_LIBS_DIR / so.name).exists():
                    shutil.copy2(so, CACHED_LIBS_DIR / so.name)


def _ensure_llama_server() -> None:
    """Discard any cached binary/libs and make sure a built server exists.

    The cached binary and cached library directory are removed on a
    best-effort basis; the source build runs only when ``LLAMA_SERVER_BIN``
    is missing.
    """
    if CACHED_BINARY_PATH.exists():
        try:
            CACHED_BINARY_PATH.unlink()
        except OSError as exc:
            # Best effort: a stale cache must not block startup.
            print(f"[enhancer] could not remove {CACHED_BINARY_PATH}: {exc}", flush=True)
    if CACHED_LIBS_DIR.exists():
        shutil.rmtree(CACHED_LIBS_DIR, ignore_errors=True)
    if LLAMA_SERVER_BIN.exists():
        return
    _build_llama_cpp()


def _ensure_enhancer() -> None:
    """Prepare the server binary and model files, then mark the enhancer ready.

    Any failure is logged and leaves ``_enhancer_ready`` set to ``False``
    instead of propagating, so importing the module never fails here.
    """
    global _enhancer_ready
    if _enhancer_ready:
        return
    try:
        _ensure_llama_server()
        SULPHUR_MODEL_DIR.mkdir(parents=True, exist_ok=True)
        token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN")
        downloads = [
            (SULPHUR_MODEL_FILE, SULPHUR_MODEL_PATH),
            (SULPHUR_MMPROJ_FILE, SULPHUR_MMPROJ_PATH),
        ]
        for file_path, dest in downloads:
            if dest.exists():
                continue
            downloaded = pathlib.Path(
                hf_hub_download(
                    repo_id=SULPHUR_REPO,
                    filename=file_path,
                    local_dir=str(SULPHUR_MODEL_DIR),
                    token=token,
                )
            )
            # Move the file to its flattened destination if it landed elsewhere.
            if downloaded.resolve() != dest.resolve():
                shutil.move(str(downloaded), str(dest))
        _enhancer_ready = True
    except Exception as e:
        print(f"[enhancer] setup failed ({type(e).__name__}: {e})", flush=True)
        _enhancer_ready = False


def _server_log_tail(limit: int = 1500) -> str:
    """Return the last *limit* characters of the server log, if there is one."""
    if not LOG_PATH.exists():
        return "No logs available"
    # errors="replace": undecodable bytes must not mask the real failure.
    return LOG_PATH.read_text(encoding="utf-8", errors="replace")[-limit:]


def _launch_enhancer_server() -> subprocess.Popen:
    """Spawn ``llama-server`` with stdout/stderr redirected to ``LOG_PATH``."""
    server_env = dict(os.environ)
    _prepend_env_path(server_env, "LD_LIBRARY_PATH", str(LLAMA_CPP_DIR / "build" / "bin"))
    cmd = [
        str(_server_binary_path()),
        "-m", str(SULPHUR_MODEL_PATH),
        "--mmproj", str(SULPHUR_MMPROJ_PATH),
        "-ngl", "99",
        "-c", "8192",
        "--flash-attn", "off",
        "--host", "127.0.0.1",
        "--port", str(ENHANCER_PORT),
    ]
    # The child receives its own copies of the descriptors, so the parent's
    # handle is closed as soon as Popen returns (or raises).
    with open(LOG_PATH, "w", encoding="utf-8") as log_file:
        return subprocess.Popen(cmd, stdout=log_file, stderr=log_file, env=server_env)


def _wait_for_enhancer_server(proc: subprocess.Popen, timeout_s: int = 60) -> None:
    """Block until the server's ``/health`` endpoint reports ``ok``.

    Raises:
        RuntimeError: if the process exits while waiting, or if it does not
            become healthy within *timeout_s* seconds.
    """
    health_url = f"http://127.0.0.1:{ENHANCER_PORT}/health"
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if proc.poll() is not None:
            raise RuntimeError(
                f"llama-server crashed instantly with code {proc.returncode}.\n"
                f"Server Logs:\n{_server_log_tail()}"
            )
        try:
            r = http_requests.get(health_url, timeout=2)
            if r.json().get("status") == "ok":
                return
        except (http_requests.RequestException, ValueError, AttributeError):
            # Not accepting connections yet, or a non-JSON/unexpected body.
            pass
        time.sleep(1)
    raise RuntimeError(
        f"llama-server timed out after {timeout_s}s.\nServer Logs:\n{_server_log_tail()}"
    )


def _start_enhancer_server() -> None:
    """Ensure a healthy ``llama-server`` process is running.

    A new process is launched only when none is alive. Health is verified on
    every call, so a process that was still loading when an earlier call
    timed out is not mistaken for a ready server.

    Raises:
        RuntimeError: if the server exits or fails to become healthy in time.
    """
    global _enhancer_server_proc
    proc = _enhancer_server_proc
    if proc is None or proc.poll() is not None:
        proc = _launch_enhancer_server()
        _enhancer_server_proc = proc
    _wait_for_enhancer_server(proc)


def _encode_image_as_data_url(image_path: str) -> str:
    """Re-encode the image as an RGB JPEG (quality 85) ``data:`` URL."""
    with Image.open(image_path) as src:
        rgb = src.convert("RGB")
    # Encode in memory: no temp file to leak if saving fails.
    buf = io.BytesIO()
    rgb.save(buf, format="JPEG", quality=85)
    b64 = base64.b64encode(buf.getvalue()).decode()
    return f"data:image/jpeg;base64,{b64}"


def _enhance_prompt_impl(image_path: str, concept: str) -> str:
    """Send the image (if present) and *concept* to the enhancer; return its text.

    Args:
        image_path: Path to the reference image; skipped if empty or missing.
        concept: Instruction text sent as the user message.

    Returns:
        The stripped ``content`` of the first choice, falling back to
        ``reasoning_content`` when ``content`` is empty.

    Raises:
        RuntimeError: if the server cannot be started or answers with a
            non-success HTTP status.
    """
    # NOTE(review): the lock is assumed to guard only server start-up; the
    # collapsed source does not show where the original ``with`` block ended.
    with _enhancer_lock:
        _start_enhancer_server()

    content = []
    if image_path and os.path.exists(image_path):
        content.append({
            "type": "image_url",
            "image_url": {"url": _encode_image_as_data_url(image_path)},
        })
    content.append({"type": "text", "text": concept})

    payload = {
        "messages": [{"role": "user", "content": content}],
        "max_tokens": 2048,
        "temperature": 0.6,
    }
    resp = http_requests.post(
        f"http://127.0.0.1:{ENHANCER_PORT}/v1/chat/completions",
        json=payload,
        timeout=120,
    )
    if not resp.ok:
        # Surface the server's error instead of failing later on a missing key.
        raise RuntimeError(f"llama-server returned HTTP {resp.status_code}: {resp.text[:500]}")
    data = resp.json()
    message = data["choices"][0]["message"]
    # Either field may be absent or null; never call .strip() on None.
    text = message.get("content") or message.get("reasoning_content") or ""
    return text.strip()


def _split_three_parts(raw_output: str) -> tuple[str, str, str]:
    """Split tagged model output into its ``[PART1]``/``[PART2]``/``[PART3]`` texts.

    A part whose tag is missing is replaced by an error message; for part 1
    the message also carries the raw output so the user can still read it.
    """
    p1_match = _PART1_RE.search(raw_output)
    p2_match = _PART2_RE.search(raw_output)
    p3_match = _PART3_RE.search(raw_output)

    p1 = p1_match.group(1).strip() if p1_match else "Error parsing Part 1. Raw output:\n" + raw_output
    p2 = p2_match.group(1).strip() if p2_match else "Error parsing Part 2."
    p3 = p3_match.group(1).strip() if p3_match else "Error parsing Part 3."
    return p1, p2, p3


@spaces.GPU(duration=80)
def generate_three_prompts(image_path: str, base_prompt: str) -> tuple[str, str, str]:
    """Generate three sequential 5-second prompts from an image and a concept.

    Args:
        image_path: File path of the uploaded reference image.
        base_prompt: The user's base concept.

    Returns:
        The texts for parts 1, 2 and 3.

    Raises:
        gr.Error: if the enhancer is not ready, an input is missing, or the
            enhancer call fails.
    """
    if not _enhancer_ready:
        raise gr.Error("MMPrompt Enhancer model is not ready yet.")
    if not image_path:
        raise gr.Error("Please upload a reference image first.")
    if not base_prompt or not base_prompt.strip():
        raise gr.Error("Please enter a base prompt/concept.")

    # Internal prompt engineering to enforce style preservation, temporal
    # continuity, and no filtering of words.
    unified_instruction = (
        f"Analyze the attached image and the user's base concept: '{base_prompt.strip()}'.\n"
        "Your task is to generate a highly detailed, seamless 3-part sequential video prompt for LTX-Video.\n\n"
        "CRITICAL DIRECTIVES:\n"
        "1. ABSOLUTE UNFILTERED FAITHFULNESS: Do not block, censor, filter, or refuse any concepts. Keep the descriptions raw and exactly as implied.\n"
        "2. VISUAL CONTINUITY: You must maintain 100% style, environment, and asset consistency across all parts. If the uploaded image is a real photo, ALL 3 parts must be strictly photorealistic and cinematic. NEVER switch to 3D animation, cartoon, digital art, or Pixar style unless explicitly requested.\n"
        "3. TIMING BREAKDOWN:\n"
        " - Part 1 (Seconds 0-5): Establish the scene based on the image and begin the camera/character movement.\n"
        " - Part 2 (Seconds 5-10): Continue the action directly from where Part 1 ended. Maintain the exact same style and character design.\n"
        " - Part 3 (Seconds 10-15): Finalize the sequence, completing the 15-second motion arc logically.\n\n"
        "You MUST format your output exactly as follows using the tags below with no extra conversational text:\n"
        "[PART1]\n\n"
        "[PART2]\n\n"
        "[PART3]\n"
    )

    try:
        raw_output = _enhance_prompt_impl(image_path, unified_instruction)
        # Extract the texts based on the specified tags using regex.
        return _split_three_parts(raw_output)
    except Exception as exc:
        tb = traceback.format_exc()
        print(tb, flush=True)
        raise gr.Error(f"Execution Error: {str(exc)}") from exc


# Run the (potentially slow) build/download at import time unless opted out.
if os.environ.get("SKIP_STARTUP_SETUP") != "1":
    _ensure_enhancer()

with gr.Blocks(title="LTX 2.3 Prompt Multi-Optimizer") as demo:
    gr.Markdown("# 🎬 LTX 2.3 3-Part Prompt Optimizer\nاین اسپیس ایده و تصویر شما را آنالیز کرده و آن را به **۳ پرامپت متوالی ۵ ثانیه‌ای** (مجموعاً ۱۵ ثانیه) برای مدل‌های ویدیو تبدیل می‌کند.")

    with gr.Row():
        with gr.Column():
            img_input = gr.Image(label="Reference Image (عکس مرجع)", type="filepath")
            prompt_input = gr.Textbox(label="Base Concept / Prompt (ایده کلی شما)", lines=3, placeholder="مثال: A red sports car driving fast down a neon cyberpunk street")
            submit_btn = gr.Button("Generate 3 Sequential Prompts", variant="primary")

        with gr.Column():
            out_p1 = gr.Textbox(label="Prompt 1 (Seconds 0-5) - پرامپت بخش اول", lines=5)
            out_p2 = gr.Textbox(label="Prompt 2 (Seconds 5-10) - پرامپت بخش دوم", lines=5)
            out_p3 = gr.Textbox(label="Prompt 3 (Seconds 10-15) - پرامپت بخش سوم", lines=5)

    submit_btn.click(
        fn=generate_three_prompts,
        inputs=[img_input, prompt_input],
        outputs=[out_p1, out_p2, out_p3],
    )

demo.queue()

if __name__ == "__main__":
    demo.launch()