Spaces:
Running
Running
| import base64 | |
| import hashlib | |
| import json | |
| import mimetypes | |
| import os | |
| import textwrap | |
| import time | |
| import urllib.error | |
| import urllib.parse | |
| import urllib.request | |
| import zipfile | |
| from pathlib import Path | |
| from agent_core.config import ( | |
| DEFAULT_LUX3D_OUTPUT_PATH, | |
| IMAGE_SUFFIXES, | |
| LUX3D_API_KEY_ENV, | |
| LUX3D_CREATE_URL, | |
| LUX3D_GET_URL, | |
| LUX3D_POLL_INTERVAL_SECONDS, | |
| LUX3D_TIMEOUT_SECONDS, | |
| ) | |
| from agent_core.outputs import ( | |
| resolve_lux3d_run, | |
| safe_path, | |
| update_latest_link, | |
| workspace_relative, | |
| write_lux3d_manifest, | |
| ) | |
| from agent_core.utils import json_response, tail_text | |
| def validate_image_path(image_path: str) -> Path: | |
| if not image_path or not image_path.strip(): | |
| raise ValueError("image_path is required for image-to-3D generation.") | |
| path = safe_path(image_path) | |
| if not path.exists(): | |
| raise FileNotFoundError(f"Image file does not exist: {image_path}") | |
| if not path.is_file(): | |
| raise ValueError(f"image_path must be a file: {image_path}") | |
| if path.suffix.lower() not in IMAGE_SUFFIXES: | |
| raise ValueError(f"Unsupported image format: {path.suffix}. Use one of: {', '.join(IMAGE_SUFFIXES)}") | |
| return path | |
| def image_to_data_url(image_path: Path) -> str: | |
| mime_type = mimetypes.guess_type(str(image_path))[0] | |
| if not mime_type: | |
| mime_type = "image/jpeg" if image_path.suffix.lower() in (".jpg", ".jpeg") else f"image/{image_path.suffix.lower().lstrip('.')}" | |
| encoded = base64.b64encode(image_path.read_bytes()).decode("ascii") | |
| return f"data:{mime_type};base64,{encoded}" | |
| def parse_lux3d_api_key() -> dict: | |
| api_key = os.getenv(LUX3D_API_KEY_ENV, "").strip() | |
| if not api_key: | |
| raise ValueError(f"Missing {LUX3D_API_KEY_ENV}. Set it in your environment or .env file.") | |
| try: | |
| decoded = base64.b64decode(api_key, validate=True).decode("utf-8") | |
| except Exception as exc: | |
| raise ValueError(f"Invalid {LUX3D_API_KEY_ENV}: expected a base64-encoded API key.") from exc | |
| parts = decoded.split(":") | |
| if len(parts) != 4 or not all(parts): | |
| raise ValueError(f"Invalid {LUX3D_API_KEY_ENV}: expected decoded format version:appkey:appsecret:appuid.") | |
| version, appkey, appsecret, appuid = parts | |
| return { | |
| "version": version, | |
| "appkey": appkey, | |
| "appsecret": appsecret, | |
| "appuid": appuid, | |
| } | |
| def lux3d_query(credentials: dict, extra: dict | None = None) -> str: | |
| timestamp = str(int(time.time() * 1000)) | |
| sign = hashlib.md5( | |
| (credentials["appsecret"] + credentials["appkey"] + credentials["appuid"] + timestamp).encode("utf-8") | |
| ).hexdigest() | |
| params = { | |
| "appuid": credentials["appuid"], | |
| "appkey": credentials["appkey"], | |
| "timestamp": timestamp, | |
| "sign": sign, | |
| } | |
| if extra: | |
| params.update(extra) | |
| return urllib.parse.urlencode(params) | |
| def lux3d_json_request(method: str, url: str, payload: dict | None = None, timeout: int = 30) -> dict: | |
| data = None | |
| headers = {} | |
| if payload is not None: | |
| data = json.dumps(payload, ensure_ascii=False).encode("utf-8") | |
| headers["Content-Type"] = "application/json" | |
| request = urllib.request.Request(url, data=data, headers=headers, method=method) | |
| try: | |
| with urllib.request.urlopen(request, timeout=timeout) as response: | |
| raw = response.read().decode("utf-8") | |
| except urllib.error.HTTPError as exc: | |
| body = exc.read().decode("utf-8", errors="replace") | |
| raise RuntimeError(f"Lux3D API HTTP {exc.code}: {tail_text(body)}") from exc | |
| except urllib.error.URLError as exc: | |
| raise RuntimeError(f"Lux3D API request failed: {exc.reason}") from exc | |
| try: | |
| return json.loads(raw) | |
| except Exception as exc: | |
| raise RuntimeError(f"Lux3D API returned invalid JSON: {tail_text(raw)}") from exc | |
| def lux3d_create_task(credentials: dict, image_data_url: str) -> str | int: | |
| query = lux3d_query(credentials) | |
| payload = lux3d_json_request("POST", f"{LUX3D_CREATE_URL}?{query}", {"img": image_data_url}) | |
| busid = payload.get("d") | |
| if not busid: | |
| raise RuntimeError(f"Lux3D create task response did not include busid: {payload}") | |
| return busid | |
| def lux3d_get_task(credentials: dict, busid: str | int) -> dict: | |
| query = lux3d_query(credentials, {"busid": busid}) | |
| payload = lux3d_json_request("GET", f"{LUX3D_GET_URL}?{query}") | |
| data = payload.get("d") | |
| if not isinstance(data, dict): | |
| raise RuntimeError(f"Lux3D get task response did not include task data: {payload}") | |
| return data | |
| def infer_lux3d_output_name(result_url: str) -> str: | |
| parsed = urllib.parse.urlparse(result_url) | |
| name = Path(urllib.parse.unquote(parsed.path)).name | |
| if name and "." in name: | |
| return name | |
| return "model.glb" | |
| def download_lux3d_result(result_url: str, output_path: Path) -> None: | |
| request = urllib.request.Request(result_url, headers={"User-Agent": "aigc-3dcad/0.1"}) | |
| try: | |
| with urllib.request.urlopen(request, timeout=120) as response: | |
| data = response.read() | |
| except urllib.error.HTTPError as exc: | |
| body = exc.read().decode("utf-8", errors="replace") | |
| raise RuntimeError(f"Lux3D result download HTTP {exc.code}: {tail_text(body)}") from exc | |
| except urllib.error.URLError as exc: | |
| raise RuntimeError(f"Lux3D result download failed: {exc.reason}") from exc | |
| output_path.parent.mkdir(parents=True, exist_ok=True) | |
| output_path.write_bytes(data) | |
| def safe_zip_target(extract_dir: Path, member_name: str) -> Path: | |
| target = (extract_dir / member_name).resolve() | |
| if not target.is_relative_to(extract_dir.resolve()): | |
| raise RuntimeError(f"Unsafe path in Lux3D zip result: {member_name}") | |
| return target | |
| def extract_lux3d_glb_preview(zip_path: Path, run_dir: Path) -> Path: | |
| extract_dir = run_dir / "extracted" | |
| try: | |
| with zipfile.ZipFile(zip_path) as archive: | |
| glb_members = sorted( | |
| ( | |
| item | |
| for item in archive.infolist() | |
| if not item.is_dir() and Path(item.filename).suffix.lower() == ".glb" | |
| ), | |
| key=lambda item: item.filename, | |
| ) | |
| if not glb_members: | |
| raise RuntimeError(f"Lux3D zip result did not include a .glb file: {workspace_relative(zip_path)}") | |
| member = glb_members[0] | |
| preview_path = safe_zip_target(extract_dir, member.filename) | |
| preview_path.parent.mkdir(parents=True, exist_ok=True) | |
| preview_path.write_bytes(archive.read(member)) | |
| return preview_path | |
| except zipfile.BadZipFile as exc: | |
| raise RuntimeError(f"Lux3D result is not a valid zip file: {workspace_relative(zip_path)}") from exc | |
| def resolve_lux3d_preview_path(output_path: Path, run_dir: Path) -> Path | None: | |
| if zipfile.is_zipfile(output_path): | |
| return extract_lux3d_glb_preview(output_path, run_dir) | |
| if output_path.suffix.lower() == ".glb": | |
| return output_path | |
| return None | |
| def run_generate_3d_model( | |
| image_path: str, | |
| output_path: str | None = None, | |
| prompt: str | None = None, | |
| poll_interval_seconds: int = LUX3D_POLL_INTERVAL_SECONDS, | |
| timeout_seconds: int = LUX3D_TIMEOUT_SECONDS, | |
| ) -> str: | |
| busid = None | |
| status = None | |
| result_url = None | |
| output = None | |
| preview = None | |
| poll_count = 0 | |
| run_dir = None | |
| run_id = None | |
| requested_output_path = output_path or DEFAULT_LUX3D_OUTPUT_PATH | |
| try: | |
| image = validate_image_path(image_path) | |
| credentials = parse_lux3d_api_key() | |
| output_root, run_dir, run_id, requested_output_name, requested_output_path = resolve_lux3d_run(output_path) | |
| image_data_url = image_to_data_url(image) | |
| except Exception as exc: | |
| return json_response({ | |
| "ok": False, | |
| "stage": "input", | |
| "error_type": type(exc).__name__, | |
| "error": str(exc), | |
| "busid": busid, | |
| }) | |
| try: | |
| busid = lux3d_create_task(credentials, image_data_url) | |
| deadline = time.time() + timeout_seconds | |
| while True: | |
| if time.time() >= deadline: | |
| raise TimeoutError(f"Lux3D task timed out after {timeout_seconds} seconds.") | |
| time.sleep(poll_interval_seconds) | |
| poll_count += 1 | |
| task = lux3d_get_task(credentials, busid) | |
| raw_status = task.get("status") | |
| status = int(raw_status) if raw_status is not None else None | |
| if status in (0, 1): | |
| continue | |
| if status == 3: | |
| outputs = task.get("outputs") or [] | |
| result_url = next( | |
| (item.get("content") for item in outputs if isinstance(item, dict) and item.get("content")), | |
| None, | |
| ) | |
| if not result_url: | |
| raise RuntimeError(f"Lux3D task succeeded but did not include a result URL: {task}") | |
| output_name = requested_output_name or infer_lux3d_output_name(result_url) | |
| output = run_dir / output_name | |
| download_lux3d_result(result_url, output) | |
| preview = resolve_lux3d_preview_path(output, run_dir) | |
| manifest_path = write_lux3d_manifest( | |
| run_dir=run_dir, | |
| run_id=run_id, | |
| prompt=prompt, | |
| image_path=image, | |
| requested_output_path=requested_output_path, | |
| output_path=output, | |
| preview_path=preview, | |
| busid=busid, | |
| status=status, | |
| result_url=result_url, | |
| poll_count=poll_count, | |
| ) | |
| latest_warning = update_latest_link(output_root, run_dir) | |
| payload = { | |
| "ok": True, | |
| "stage": "done", | |
| "run_id": run_id, | |
| "run_dir": workspace_relative(run_dir), | |
| "output_path": workspace_relative(output), | |
| "preview_path": workspace_relative(preview) if preview else None, | |
| "manifest_path": workspace_relative(manifest_path), | |
| "latest_path": workspace_relative(output_root / "latest"), | |
| "busid": busid, | |
| "status": status, | |
| "result_url": result_url, | |
| "poll_count": poll_count, | |
| } | |
| if latest_warning: | |
| payload["warning"] = latest_warning | |
| return json_response(payload) | |
| if status == 4: | |
| raise RuntimeError(f"Lux3D task failed: {task}") | |
| raise RuntimeError(f"Lux3D task returned unknown status {raw_status}: {task}") | |
| except Exception as exc: | |
| manifest_path = None | |
| if run_dir and run_id: | |
| manifest_path = write_lux3d_manifest( | |
| run_dir=run_dir, | |
| run_id=run_id, | |
| prompt=prompt, | |
| image_path=image, | |
| requested_output_path=requested_output_path, | |
| output_path=output, | |
| preview_path=preview, | |
| busid=busid, | |
| status=status, | |
| result_url=result_url, | |
| poll_count=poll_count, | |
| error=str(exc), | |
| ) | |
| payload = { | |
| "ok": False, | |
| "stage": "lux3d", | |
| "error_type": type(exc).__name__, | |
| "error": str(exc), | |
| "busid": busid, | |
| "status": status, | |
| "result_url": result_url, | |
| "poll_count": poll_count, | |
| } | |
| if manifest_path: | |
| payload["run_id"] = run_id | |
| payload["run_dir"] = workspace_relative(run_dir) | |
| payload["manifest_path"] = workspace_relative(manifest_path) | |
| return json_response(payload) | |
| TOOL_SCHEMA = { | |
| "name": "generate_3d_model", | |
| "description": textwrap.dedent(""" | |
| Generate a non-precise 3D model from a local image using the Lux3D image-to-3D API. | |
| The input is a workspace or artifact image path, not image bytes or base64 text. | |
| The tool reads the image, creates a Lux3D task, polls for completion, downloads the result, and writes a manifest. | |
| """).strip(), | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "image_path": { | |
| "type": "string", | |
| "description": "Path to a source image file in the workspace or artifact directory. Supported formats: .png, .jpg, .jpeg, .webp.", | |
| }, | |
| "output_path": { | |
| "type": "string", | |
| "description": "Advanced: custom model file path or output directory. Omit this field to use the configured artifact directory (recommended).", | |
| }, | |
| }, | |
| "required": ["image_path"], | |
| }, | |
| } | |