# Hugging Face Spaces page header captured with the source: "Spaces: Sleeping".
import base64
import io
import os
import pathlib
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import traceback

import gradio as gr
import requests as http_requests
import spaces
from huggingface_hub import hf_hub_download
from PIL import Image
# Directory that contains this file; every other path below hangs off it.
ROOT = pathlib.Path(__file__).resolve().parent

# NOTE(review): not referenced anywhere in the visible part of this file.
DEFAULT_ENHANCE_BUDGET = 80

# Prompt-enhancer model: repository id, in-repo file names and local targets.
SULPHUR_REPO = "SulphurAI/Sulphur-2-base"
SULPHUR_MODEL_FILE = "prompt_enhancer_uncensored/prompt_enhancer_uncensored-q8_0.gguf"
SULPHUR_MMPROJ_FILE = "prompt_enhancer_uncensored/mmproj-prompt_enhancer_uncensored.gguf"
SULPHUR_MODEL_DIR = ROOT.joinpath("sulphur_enhancer")
SULPHUR_MODEL_PATH = SULPHUR_MODEL_DIR.joinpath("prompt_enhancer_uncensored-q8_0.gguf")
SULPHUR_MMPROJ_PATH = SULPHUR_MODEL_DIR.joinpath("mmproj-prompt_enhancer_uncensored.gguf")

# llama.cpp checkout and the server binary produced by the local build.
LLAMA_CPP_DIR = ROOT.joinpath("llama.cpp")
LLAMA_SERVER_BIN = LLAMA_CPP_DIR.joinpath("build", "bin", "llama-server")

# Names and locations for a cached server binary and its shared libraries.
# The pull/push helpers in this file are currently stubs.
CACHE_REPO = "signsur4739379373/ltx-dependencies"
CACHE_BINARY_FILENAME = "llama-server"
CACHE_LIBS_TARBALL = "llama-server-libs.tar.gz"
CACHED_BINARY_PATH = ROOT.joinpath("llama-server-cached")
CACHED_LIBS_DIR = ROOT.joinpath("llama-server-libs")

# Mutable module state for the enhancer server.
_enhancer_ready = False               # set by _ensure_enhancer()
_enhancer_lock = threading.Lock()     # taken around server start-up
_enhancer_server_proc = None          # subprocess.Popen handle once started

# Local port the llama-server listens on, and the file its output goes to.
ENHANCER_PORT = 18642
LOG_PATH = ROOT.joinpath("llama_server.log")
def _server_binary_path() -> pathlib.Path:
    """Return the path of the llama-server executable to launch.

    The cached binary is preferred whenever it exists on disk; otherwise the
    path of the locally built binary is returned (its existence is not
    checked here).
    """
    return CACHED_BINARY_PATH if CACHED_BINARY_PATH.exists() else LLAMA_SERVER_BIN
def _have_server_artifacts() -> bool:
    """Report whether prebuilt server artifacts are available.

    Stub implementation: the answer is unconditionally ``False``.
    """
    artifacts_available = False
    return artifacts_available
def _pull_cached_binary() -> bool:
    """Try to fetch a cached server binary.

    Stub implementation: nothing is fetched and ``False`` is always returned.
    """
    pulled = False
    return pulled
def _push_cached_binary() -> None:
    """Publish the built server binary to a cache.

    Stub implementation: does nothing and returns ``None``.
    """
    return None
| def _find_cuda13_lib_dir() -> pathlib.Path | None: | |
| candidates = [ | |
| "/cuda-image/usr/local/cuda-13.0/targets/x86_64-linux/lib", | |
| "/cuda-image/usr/local/cuda-13.0/lib64", | |
| "/usr/local/cuda-13.0/targets/x86_64-linux/lib", | |
| "/usr/local/cuda-13.0/lib64", | |
| "/usr/local/cuda/targets/x86_64-linux/lib", | |
| "/usr/local/cuda/lib64", | |
| "/usr/local/cuda/lib", | |
| "/usr/lib/x86_64-linux-gnu", | |
| ] | |
| for c in candidates: | |
| p = pathlib.Path(c) | |
| if (p / "libcudart.so").exists() or list(p.glob("libcudart.so*")): | |
| return p | |
| return None | |
| def _run(cmd: list[str], cwd: pathlib.Path | None = None, check: bool = True) -> subprocess.CompletedProcess: | |
| print("[setup]", " ".join(cmd), flush=True) | |
| return subprocess.run(cmd, cwd=str(cwd) if cwd else None, check=check) | |
def _prepend_search_path(env: dict[str, str], key: str, directory: pathlib.Path) -> None:
    """Put *directory* first in the colon-separated variable *key* of *env*.

    When the variable is unset or empty the result is just the directory,
    so no trailing ``:`` (an empty path entry) is produced.
    """
    current = env.get(key, "")
    env[key] = f"{directory}:{current}" if current else str(directory)


def _build_llama_cpp() -> None:
    """Clone (if needed) and build ``llama-server`` from the llama.cpp sources.

    Steps:
      1. Shallow-clone llama.cpp into ``LLAMA_CPP_DIR`` when it is missing.
      2. If a CUDA runtime directory is found, expose it to the linker and
         enable the CUDA backend.
      3. Configure from a clean ``build`` directory and build the
         ``llama-server`` target (``-j2`` first, ``-j1`` as a retry).
      4. Copy the produced shared objects, plus the CUDA runtime/cuBLAS
         libraries when available, into ``CACHED_LIBS_DIR``.

    Raises:
        subprocess.CalledProcessError: If the clone, the configure step or
            the final build attempt fails.
    """
    print("[enhancer] building llama.cpp from source specifically for this machine...", flush=True)
    if not LLAMA_CPP_DIR.exists():
        _run(["git", "clone", "--depth", "1", "https://github.com/ggml-org/llama.cpp.git", str(LLAMA_CPP_DIR)])

    cuda_lib = _find_cuda13_lib_dir()
    env = dict(os.environ)
    if cuda_lib:
        print(f"[enhancer] Linker targeting CUDA paths at {cuda_lib}", flush=True)
        # An empty LD_LIBRARY_PATH entry means "current directory" to the
        # dynamic loader, so avoid appending ':' to an unset variable.
        _prepend_search_path(env, "LD_LIBRARY_PATH", cuda_lib)
        _prepend_search_path(env, "LIBRARY_PATH", cuda_lib)

    def _run_env(cmd: list[str]) -> None:
        # Same as _run(), but inside the checkout and with the adjusted env.
        print("[setup]", " ".join(cmd), flush=True)
        subprocess.run(cmd, cwd=str(LLAMA_CPP_DIR), check=True, env=env)

    # Always configure from scratch so stale CMake caches cannot interfere.
    shutil.rmtree(LLAMA_CPP_DIR / "build", ignore_errors=True)

    cmake_flags = [
        "cmake", "-B", "build", "-DCMAKE_BUILD_TYPE=Release",
        "-DLLAMA_BUILD_TESTS=OFF", "-DLLAMA_BUILD_EXAMPLES=OFF", "-DLLAMA_BUILD_TOOLS=ON", "-DLLAMA_CURL=OFF",
    ]
    if cuda_lib:
        # NOTE(review): the CUDA architecture is pinned to 86 regardless of
        # the GPU that is present — confirm this matches the target hardware.
        cmake_flags += [
            "-DGGML_CUDA=ON", "-DCMAKE_CUDA_ARCHITECTURES=86",
            f"-DCMAKE_EXE_LINKER_FLAGS=-L{cuda_lib} -lcudart -Wl,-rpath,{cuda_lib}",
            f"-DCMAKE_SHARED_LINKER_FLAGS=-L{cuda_lib} -lcudart -Wl,-rpath,{cuda_lib}",
        ]
    _run_env(cmake_flags)

    build_cmd = ["cmake", "--build", "build", "--config", "Release", "--target", "llama-server"]
    try:
        _run_env(build_cmd + ["-j2"])
    except subprocess.CalledProcessError:
        print("[enhancer] parallel build failed; retrying with a single job", flush=True)
        _run_env(build_cmd + ["-j1"])

    CACHED_LIBS_DIR.mkdir(parents=True, exist_ok=True)
    for so in (LLAMA_CPP_DIR / "build" / "bin").glob("*.so*"):
        shutil.copy2(so, CACHED_LIBS_DIR / so.name)
    if cuda_lib:
        for pattern in ("libcudart.so*", "libcublas.so*", "libcublasLt.so*"):
            for so in cuda_lib.glob(pattern):
                # Never overwrite a library that the build itself produced.
                if not (CACHED_LIBS_DIR / so.name).exists():
                    shutil.copy2(so, CACHED_LIBS_DIR / so.name)
def _ensure_llama_server() -> None:
    """Make sure the locally built ``llama-server`` binary exists.

    Any cached binary (``CACHED_BINARY_PATH``) and cached library directory
    (``CACHED_LIBS_DIR``) are removed first. Removal is best-effort: a
    filesystem error is logged and otherwise ignored. Afterwards llama.cpp
    is built unless ``LLAMA_SERVER_BIN`` is already present.

    Raises:
        subprocess.CalledProcessError: Propagated from the build when it runs
            and fails.
    """
    try:
        CACHED_BINARY_PATH.unlink(missing_ok=True)
    except OSError as exc:
        print(f"[enhancer] could not remove cached binary {CACHED_BINARY_PATH}: {exc}", flush=True)

    if CACHED_LIBS_DIR.exists():
        try:
            shutil.rmtree(CACHED_LIBS_DIR)
        except OSError as exc:
            print(f"[enhancer] could not remove cached libs {CACHED_LIBS_DIR}: {exc}", flush=True)

    if not LLAMA_SERVER_BIN.exists():
        _build_llama_cpp()
def _ensure_enhancer() -> None:
    """Prepare the server binary and model files, then flag readiness.

    Does nothing when ``_enhancer_ready`` is already true. Otherwise it makes
    sure llama-server is available and downloads whichever of the two model
    files is missing into ``SULPHUR_MODEL_DIR``. Any exception is caught,
    reported on stdout, and leaves ``_enhancer_ready`` set to ``False``.
    """
    global _enhancer_ready
    if _enhancer_ready:
        return

    required_files = (
        (SULPHUR_MODEL_FILE, SULPHUR_MODEL_PATH),
        (SULPHUR_MMPROJ_FILE, SULPHUR_MMPROJ_PATH),
    )
    try:
        _ensure_llama_server()
        SULPHUR_MODEL_DIR.mkdir(parents=True, exist_ok=True)
        token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN")
        for repo_file, target in required_files:
            if target.exists():
                continue
            fetched = pathlib.Path(
                hf_hub_download(
                    repo_id=SULPHUR_REPO,
                    filename=repo_file,
                    local_dir=str(SULPHUR_MODEL_DIR),
                    token=token,
                )
            )
            # Move the download to the expected path if it landed elsewhere.
            if fetched.resolve() != target.resolve():
                shutil.move(str(fetched), str(target))
    except Exception as e:
        print(f"[enhancer] setup failed ({type(e).__name__}: {e})", flush=True)
        _enhancer_ready = False
    else:
        _enhancer_ready = True
def _read_server_log_tail(limit: int = 1500) -> str:
    """Return the last *limit* characters of the llama-server log file."""
    if not LOG_PATH.exists():
        return "No logs available"
    # errors="replace": a stray non-UTF-8 byte must not mask the real failure.
    return LOG_PATH.read_text(encoding="utf-8", errors="replace")[-limit:]


def _start_enhancer_server() -> None:
    """Launch llama-server if it is not already running and wait until healthy.

    If a previously started process is still alive the function returns
    immediately. Otherwise a new server is spawned with its stdout/stderr
    redirected to ``LOG_PATH`` and ``/health`` is polled once per second for
    up to 60 seconds.

    Raises:
        RuntimeError: If the process exits while waiting, or if the health
            endpoint does not report ``ok`` within 60 seconds. On a timeout
            the process is left running, so a later call returns at once
            while it is still alive.
        OSError: If the server binary cannot be executed or the log file
            cannot be opened.
    """
    global _enhancer_server_proc
    if _enhancer_server_proc is not None and _enhancer_server_proc.poll() is None:
        return

    server_bin = _server_binary_path()
    server_env = dict(os.environ)
    built_libs = str(LLAMA_CPP_DIR / "build" / "bin")
    inherited = server_env.get("LD_LIBRARY_PATH", "")
    # Avoid a trailing ':'; an empty entry makes the loader search the cwd.
    server_env["LD_LIBRARY_PATH"] = f"{built_libs}:{inherited}" if inherited else built_libs

    command = [
        str(server_bin), "-m", str(SULPHUR_MODEL_PATH), "--mmproj", str(SULPHUR_MMPROJ_PATH),
        "-ngl", "99", "-c", "8192", "--flash-attn", "off", "--host", "127.0.0.1", "--port", str(ENHANCER_PORT),
    ]
    # The child receives its own copy of the descriptor, so the parent's
    # handle can be closed as soon as Popen returns (also when it raises).
    with open(LOG_PATH, "w", encoding="utf-8") as log_file:
        proc = subprocess.Popen(command, stdout=log_file, stderr=log_file, env=server_env)
    _enhancer_server_proc = proc

    health_url = f"http://127.0.0.1:{ENHANCER_PORT}/health"
    for _ in range(60):
        time.sleep(1)
        exit_code = proc.poll()
        if exit_code is not None:
            raise RuntimeError(
                f"llama-server crashed instantly with code {exit_code}.\nServer Logs:\n{_read_server_log_tail()}"
            )
        try:
            r = http_requests.get(health_url, timeout=2)
            if r.json().get("status") == "ok":
                return
        except (http_requests.RequestException, ValueError, AttributeError):
            # Not listening yet, or not answering with the expected JSON.
            continue

    raise RuntimeError(f"llama-server timed out after 60s.\nServer Logs:\n{_read_server_log_tail()}")
def _enhance_prompt_impl(image_path: str, concept: str) -> str:
    """Send an optional image plus a text instruction to the local llama-server.

    Args:
        image_path: Path of the reference image. When it is empty or does not
            exist, only the text is sent.
        concept: Text placed in the user message.

    Returns:
        The stripped ``content`` of the first choice, falling back to its
        ``reasoning_content``; an empty string when both are empty.

    Raises:
        RuntimeError: If the server cannot be started, answers with something
            that is not JSON, or returns no choices.
    """
    # Only server start-up is serialized; the request runs outside the lock.
    with _enhancer_lock:
        _start_enhancer_server()

    content = []
    if image_path and os.path.exists(image_path):
        # Re-encode in memory: no temporary file to leak or clean up.
        with Image.open(image_path) as source:
            rgb = source.convert("RGB")
        encoded = io.BytesIO()
        rgb.save(encoded, format="JPEG", quality=85)
        b64 = base64.b64encode(encoded.getvalue()).decode()
        content.append({
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{b64}"},
        })
    content.append({"type": "text", "text": concept})

    payload = {
        "messages": [{"role": "user", "content": content}],
        "max_tokens": 2048,
        "temperature": 0.6,
    }
    resp = http_requests.post(
        f"http://127.0.0.1:{ENHANCER_PORT}/v1/chat/completions", json=payload, timeout=120
    )
    try:
        data = resp.json()
    except ValueError as exc:
        raise RuntimeError(
            f"llama-server returned a non-JSON response (HTTP {resp.status_code}): {resp.text[:500]}"
        ) from exc

    choices = data.get("choices") if isinstance(data, dict) else None
    if not choices:
        # Surface the server's own error body instead of a bare KeyError.
        raise RuntimeError(
            f"llama-server returned no completion (HTTP {resp.status_code}): {str(data)[:500]}"
        )

    message = choices[0]["message"]
    text = message.get("content") or message.get("reasoning_content") or ""
    return text.strip()
def _split_three_parts(raw_output: str) -> tuple[str, str, str]:
    """Split model output into the texts following [PART1], [PART2] and [PART3].

    Tags are matched case-insensitively and may contain whitespace (for
    example ``[Part 1]``). A missing tag yields an error placeholder for that
    part; the Part 1 placeholder also carries the raw output.
    """
    def tag(number: int) -> str:
        return rf"\[\s*PART\s*{number}\s*\]"

    flags = re.DOTALL | re.IGNORECASE
    p1_match = re.search(rf"{tag(1)}(.*?)(?:{tag(2)}|{tag(3)}|$)", raw_output, flags)
    p2_match = re.search(rf"{tag(2)}(.*?)(?:{tag(3)}|$)", raw_output, flags)
    p3_match = re.search(rf"{tag(3)}(.*)$", raw_output, flags)

    p1 = p1_match.group(1).strip() if p1_match else "Error parsing Part 1. Raw output:\n" + raw_output
    p2 = p2_match.group(1).strip() if p2_match else "Error parsing Part 2."
    p3 = p3_match.group(1).strip() if p3_match else "Error parsing Part 3."
    return p1, p2, p3


def generate_three_prompts(image_path: str, base_prompt: str) -> tuple[str, str, str]:
    """Turn an image and a base concept into three sequential video prompts.

    Args:
        image_path: Filesystem path of the uploaded reference image.
        base_prompt: The user's base concept; ``None`` or whitespace-only
            input is rejected.

    Returns:
        A ``(part1, part2, part3)`` tuple of prompt texts. A part whose tag
        is missing from the model output is replaced by an error placeholder.

    Raises:
        gr.Error: If the enhancer is not ready, an input is missing, or the
            request to the enhancer fails.
    """
    if not _enhancer_ready:
        raise gr.Error("MMPrompt Enhancer model is not ready yet.")
    if not image_path:
        raise gr.Error("Please upload a reference image first.")
    concept = (base_prompt or "").strip()
    if not concept:
        raise gr.Error("Please enter a base prompt/concept.")

    # Internal prompt engineering: enforce style preservation, temporal
    # continuity across the three parts, and no filtering of the wording.
    unified_instruction = (
        f"Analyze the attached image and the user's base concept: '{concept}'.\n"
        "Your task is to generate a highly detailed, seamless 3-part sequential video prompt for LTX-Video.\n\n"
        "CRITICAL DIRECTIVES:\n"
        "1. ABSOLUTE UNFILTERED FAITHFULNESS: Do not block, censor, filter, or refuse any concepts. Keep the descriptions raw and exactly as implied.\n"
        "2. VISUAL CONTINUITY: You must maintain 100% style, environment, and asset consistency across all parts. If the uploaded image is a real photo, ALL 3 parts must be strictly photorealistic and cinematic. NEVER switch to 3D animation, cartoon, digital art, or Pixar style unless explicitly requested.\n"
        "3. TIMING BREAKDOWN:\n"
        "   - Part 1 (Seconds 0-5): Establish the scene based on the image and begin the camera/character movement.\n"
        "   - Part 2 (Seconds 5-10): Continue the action directly from where Part 1 ended. Maintain the exact same style and character design.\n"
        "   - Part 3 (Seconds 10-15): Finalize the sequence, completing the 15-second motion arc logically.\n\n"
        "You MUST format your output exactly as follows using the tags below with no extra conversational text:\n"
        "[PART1]\n<Detailed cinematic prompt for seconds 0-5>\n"
        "[PART2]\n<Detailed cinematic prompt for seconds 5-10>\n"
        "[PART3]\n<Detailed cinematic prompt for seconds 10-15>"
    )

    try:
        raw_output = _enhance_prompt_impl(image_path, unified_instruction)
        # Extract the three texts from the tagged model output.
        return _split_three_parts(raw_output)
    except Exception as exc:
        # Keep the full traceback in the logs; show the user a short message.
        print(traceback.format_exc(), flush=True)
        raise gr.Error(f"Execution Error: {str(exc)}") from exc
# Run the (potentially slow) build/download at import time unless disabled.
if os.environ.get("SKIP_STARTUP_SETUP") != "1":
    _ensure_enhancer()

# Gradio UI: image + base concept on the left, three generated prompts on the right.
with gr.Blocks(title="LTX 2.3 Prompt Multi-Optimizer") as demo:
    gr.Markdown(
        "# 🎬 LTX 2.3 3-Part Prompt Optimizer\n"
        "این اسپیس ایده و تصویر شما را آنالیز کرده و آن را به **۳ پرامپت متوالی ۵ ثانیهای** (مجموعاً ۱۵ ثانیه) برای مدلهای ویدیو تبدیل میکند."
    )
    with gr.Row():
        with gr.Column():
            img_input = gr.Image(label="Reference Image (عکس مرجع)", type="filepath")
            prompt_input = gr.Textbox(
                label="Base Concept / Prompt (ایده کلی شما)",
                lines=3,
                placeholder="مثال: A red sports car driving fast down a neon cyberpunk street",
            )
            submit_btn = gr.Button("Generate 3 Sequential Prompts", variant="primary")
        with gr.Column():
            out_p1 = gr.Textbox(label="Prompt 1 (Seconds 0-5) - پرامپت بخش اول", lines=5)
            out_p2 = gr.Textbox(label="Prompt 2 (Seconds 5-10) - پرامپت بخش دوم", lines=5)
            out_p3 = gr.Textbox(label="Prompt 3 (Seconds 10-15) - پرامپت بخش سوم", lines=5)

    # One click fills all three output boxes from a single model call.
    submit_btn.click(
        fn=generate_three_prompts,
        inputs=[img_input, prompt_input],
        outputs=[out_p1, out_p2, out_p3],
    )

demo.queue()

if __name__ == "__main__":
    demo.launch()