| |
| """ |
| Image Generation Tools Module |
| |
| Provides image generation via FAL.ai. Multiple FAL models are supported and |
| selectable via ``hermes tools`` → Image Generation; the active model is |
| persisted to ``image_gen.model`` in ``config.yaml``. |
| |
| Architecture: |
| - ``FAL_MODELS`` is a catalog of supported models with per-model metadata |
| (size-style family, defaults, ``supports`` whitelist, upscaler flag). |
| - ``_build_fal_payload()`` translates the agent's unified inputs (prompt + |
| aspect_ratio) into the model-specific payload and filters to the |
| ``supports`` whitelist so models never receive rejected keys. |
| - Upscaling via FAL's Clarity Upscaler is gated per-model via the ``upscale`` |
| flag — on for FLUX 2 Pro (backward-compat), off for all faster/newer models |
| where upscaling would either hurt latency or add marginal quality. |
| |
| Pricing shown in UI strings is as-of the initial commit; we accept drift and |
| update when it's noticed. |
| """ |
|
|
| import json |
| import logging |
| import os |
| import datetime |
| import threading |
| import uuid |
| from typing import Any, Dict, Optional, Union |
| from urllib.parse import urlencode |
|
|
| import fal_client |
|
|
| from tools.debug_helpers import DebugSession |
| from tools.managed_tool_gateway import resolve_managed_tool_gateway |
| from tools.tool_backend_helpers import ( |
| fal_key_is_configured, |
| managed_nous_tools_enabled, |
| prefers_gateway, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
# Catalog of supported FAL models, keyed by FAL endpoint id.
#
# Per-model fields:
#   display / speed / strengths / price -- human-readable strings for UI output.
#   size_style -- how the unified aspect ratio is expressed in the payload;
#                 _build_fal_payload() handles "image_size_preset",
#                 "gpt_literal" (both written to ``image_size``) and
#                 "aspect_ratio" (written to ``aspect_ratio``).
#   sizes      -- maps "landscape" / "square" / "portrait" to the model's
#                 native size value.
#   defaults   -- payload values merged in before caller overrides.
#   supports   -- whitelist of payload keys; anything else is dropped before
#                 the request is submitted.
#   upscale    -- when True, generated images are passed through the Clarity
#                 Upscaler by image_generate_tool().
FAL_MODELS: Dict[str, Dict[str, Any]] = {
    "fal-ai/flux-2/klein/9b": {
        "display": "FLUX 2 Klein 9B",
        "speed": "<1s",
        "strengths": "Fast, crisp text",
        "price": "$0.006/MP",
        "size_style": "image_size_preset",
        "sizes": {
            "landscape": "landscape_16_9",
            "square": "square_hd",
            "portrait": "portrait_16_9",
        },
        "defaults": {
            "num_inference_steps": 4,
            "output_format": "png",
            "enable_safety_checker": False,
        },
        "supports": {
            "prompt", "image_size", "num_inference_steps", "seed",
            "output_format", "enable_safety_checker",
        },
        "upscale": False,
    },
    # The only catalog entry with the upscaler enabled.
    "fal-ai/flux-2-pro": {
        "display": "FLUX 2 Pro",
        "speed": "~6s",
        "strengths": "Studio photorealism",
        "price": "$0.03/MP",
        "size_style": "image_size_preset",
        "sizes": {
            "landscape": "landscape_16_9",
            "square": "square_hd",
            "portrait": "portrait_16_9",
        },
        "defaults": {
            "num_inference_steps": 50,
            "guidance_scale": 4.5,
            "num_images": 1,
            "output_format": "png",
            "enable_safety_checker": False,
            "safety_tolerance": "5",
            "sync_mode": True,
        },
        "supports": {
            "prompt", "image_size", "num_inference_steps", "guidance_scale",
            "num_images", "output_format", "enable_safety_checker",
            "safety_tolerance", "sync_mode", "seed",
        },
        "upscale": True,
    },
    "fal-ai/z-image/turbo": {
        "display": "Z-Image Turbo",
        "speed": "~2s",
        "strengths": "Bilingual EN/CN, 6B",
        "price": "$0.005/MP",
        "size_style": "image_size_preset",
        "sizes": {
            "landscape": "landscape_16_9",
            "square": "square_hd",
            "portrait": "portrait_16_9",
        },
        "defaults": {
            "num_inference_steps": 8,
            "num_images": 1,
            "output_format": "png",
            "enable_safety_checker": False,
            "enable_prompt_expansion": False,
        },
        "supports": {
            "prompt", "image_size", "num_inference_steps", "num_images",
            "seed", "output_format", "enable_safety_checker",
            "enable_prompt_expansion",
        },
        "upscale": False,
    },
    # The only entry using the "aspect_ratio" size style (ratio strings
    # instead of an ``image_size`` value).
    "fal-ai/nano-banana-pro": {
        "display": "Nano Banana Pro (Gemini 3 Pro Image)",
        "speed": "~8s",
        "strengths": "Gemini 3 Pro, reasoning depth, text rendering",
        "price": "$0.15/image (1K)",
        "size_style": "aspect_ratio",
        "sizes": {
            "landscape": "16:9",
            "square": "1:1",
            "portrait": "9:16",
        },
        "defaults": {
            "num_images": 1,
            "output_format": "png",
            "safety_tolerance": "5",
            # Matches the "(1K)" tier quoted in the price string above.
            "resolution": "1K",
        },
        "supports": {
            "prompt", "aspect_ratio", "num_images", "output_format",
            "safety_tolerance", "seed", "sync_mode", "resolution",
            "enable_web_search", "limit_generations",
        },
        "upscale": False,
    },
    # The only entry using the "gpt_literal" size style: sizes are literal
    # "WIDTHxHEIGHT" strings sent as ``image_size``.
    "fal-ai/gpt-image-1.5": {
        "display": "GPT Image 1.5",
        "speed": "~15s",
        "strengths": "Prompt adherence",
        "price": "$0.034/image",
        "size_style": "gpt_literal",
        "sizes": {
            "landscape": "1536x1024",
            "square": "1024x1024",
            "portrait": "1024x1536",
        },
        "defaults": {
            # NOTE(review): "medium" presumably trades quality against
            # cost/latency -- confirm against the FAL model docs.
            "quality": "medium",
            "num_images": 1,
            "output_format": "png",
        },
        "supports": {
            "prompt", "image_size", "quality", "num_images", "output_format",
            "background", "sync_mode",
        },
        "upscale": False,
    },
    "fal-ai/gpt-image-2": {
        "display": "GPT Image 2",
        "speed": "~20s",
        "strengths": "SOTA text rendering + CJK, world-aware photorealism",
        "price": "$0.04–0.06/image",
        # Unlike 1.5 this entry uses preset names, and landscape/portrait map
        # to the 4:3 presets rather than the 16:9 presets used by the other
        # preset-style models.
        "size_style": "image_size_preset",
        "sizes": {
            "landscape": "landscape_4_3",
            "square": "square_hd",
            "portrait": "portrait_4_3",
        },
        "defaults": {
            # NOTE(review): "medium" presumably trades quality against
            # cost/latency -- confirm against the FAL model docs.
            "quality": "medium",
            "num_images": 1,
            "output_format": "png",
        },
        "supports": {
            "prompt", "image_size", "quality", "num_images", "output_format",
            "sync_mode",
        },
        "upscale": False,
    },
    "fal-ai/ideogram/v3": {
        "display": "Ideogram V3",
        "speed": "~5s",
        "strengths": "Best typography",
        "price": "$0.03-0.09/image",
        "size_style": "image_size_preset",
        "sizes": {
            "landscape": "landscape_16_9",
            "square": "square_hd",
            "portrait": "portrait_16_9",
        },
        "defaults": {
            "rendering_speed": "BALANCED",
            "expand_prompt": True,
            "style": "AUTO",
        },
        "supports": {
            "prompt", "image_size", "rendering_speed", "expand_prompt",
            "style", "seed",
        },
        "upscale": False,
    },
    "fal-ai/recraft/v4/pro/text-to-image": {
        "display": "Recraft V4 Pro",
        "speed": "~8s",
        "strengths": "Design, brand systems, production-ready",
        "price": "$0.25/image",
        "size_style": "image_size_preset",
        "sizes": {
            "landscape": "landscape_16_9",
            "square": "square_hd",
            "portrait": "portrait_16_9",
        },
        "defaults": {
            "enable_safety_checker": False,
        },
        "supports": {
            "prompt", "image_size", "enable_safety_checker",
            "colors", "background_color",
        },
        "upscale": False,
    },
    "fal-ai/qwen-image": {
        "display": "Qwen Image",
        "speed": "~12s",
        "strengths": "LLM-based, complex text",
        "price": "$0.02/MP",
        "size_style": "image_size_preset",
        "sizes": {
            "landscape": "landscape_16_9",
            "square": "square_hd",
            "portrait": "portrait_16_9",
        },
        "defaults": {
            "num_inference_steps": 30,
            "guidance_scale": 2.5,
            "num_images": 1,
            "output_format": "png",
            "acceleration": "regular",
        },
        "supports": {
            "prompt", "image_size", "num_inference_steps", "guidance_scale",
            "num_images", "output_format", "acceleration", "seed", "sync_mode",
        },
        "upscale": False,
    },
}
|
|
| |
# Model returned by _resolve_fal_model() when nothing is configured or the
# configured id is not a key of FAL_MODELS.
DEFAULT_MODEL = "fal-ai/flux-2/klein/9b"


# Unified aspect-ratio keywords; every model's ``sizes`` table is keyed by
# these, and invalid input falls back to DEFAULT_ASPECT_RATIO.
DEFAULT_ASPECT_RATIO = "landscape"
VALID_ASPECT_RATIOS = ("landscape", "square", "portrait")
|
|
|
|
| |
| |
| |
# Fixed request settings for the Clarity Upscaler; _upscale_image() sends
# them verbatim, prefixing UPSCALER_DEFAULT_PROMPT to the original prompt.
UPSCALER_MODEL = "fal-ai/clarity-upscaler"
UPSCALER_FACTOR = 2
UPSCALER_SAFETY_CHECKER = False
UPSCALER_DEFAULT_PROMPT = "masterpiece, best quality, highres"
UPSCALER_NEGATIVE_PROMPT = "(worst quality, low quality, normal quality:2)"
UPSCALER_CREATIVITY = 0.35
UPSCALER_RESEMBLANCE = 0.6
UPSCALER_GUIDANCE_SCALE = 4
UPSCALER_NUM_INFERENCE_STEPS = 18
|
|
|
|
# Debug recorder used by image_generate_tool(); presumably switched on through
# the IMAGE_TOOLS_DEBUG environment variable -- confirm in tools.debug_helpers.
_debug = DebugSession("image_tools", env_var="IMAGE_TOOLS_DEBUG")
# Cached managed-gateway client plus the (origin, token) pair it was built
# for; both are read and replaced only while holding the lock below.
_managed_fal_client = None
_managed_fal_client_config = None
_managed_fal_client_lock = threading.Lock()
|
|
|
|
| |
| |
| |
def _resolve_managed_fal_gateway():
    """Look up the managed ``fal-queue`` gateway, unless direct FAL is preferred.

    Returns:
        ``None`` when a FAL key is configured and the gateway is not the
        preferred route for ``image_gen``; otherwise whatever
        ``resolve_managed_tool_gateway("fal-queue")`` returns.
    """
    # ``and`` short-circuits, so the preference is only consulted when a
    # direct key actually exists.
    use_direct_key = fal_key_is_configured() and not prefers_gateway("image_gen")
    return None if use_direct_key else resolve_managed_tool_gateway("fal-queue")
|
|
|
|
| def _normalize_fal_queue_url_format(queue_run_origin: str) -> str: |
| normalized_origin = str(queue_run_origin or "").strip().rstrip("/") |
| if not normalized_origin: |
| raise ValueError("Managed FAL queue origin is required") |
| return f"{normalized_origin}/" |
|
|
|
|
| class _ManagedFalSyncClient: |
| """Small per-instance wrapper around fal_client.SyncClient for managed queue hosts.""" |
|
|
| def __init__(self, *, key: str, queue_run_origin: str): |
| sync_client_class = getattr(fal_client, "SyncClient", None) |
| if sync_client_class is None: |
| raise RuntimeError("fal_client.SyncClient is required for managed FAL gateway mode") |
|
|
| client_module = getattr(fal_client, "client", None) |
| if client_module is None: |
| raise RuntimeError("fal_client.client is required for managed FAL gateway mode") |
|
|
| self._queue_url_format = _normalize_fal_queue_url_format(queue_run_origin) |
| self._sync_client = sync_client_class(key=key) |
| self._http_client = getattr(self._sync_client, "_client", None) |
| self._maybe_retry_request = getattr(client_module, "_maybe_retry_request", None) |
| self._raise_for_status = getattr(client_module, "_raise_for_status", None) |
| self._request_handle_class = getattr(client_module, "SyncRequestHandle", None) |
| self._add_hint_header = getattr(client_module, "add_hint_header", None) |
| self._add_priority_header = getattr(client_module, "add_priority_header", None) |
| self._add_timeout_header = getattr(client_module, "add_timeout_header", None) |
|
|
| if self._http_client is None: |
| raise RuntimeError("fal_client.SyncClient._client is required for managed FAL gateway mode") |
| if self._maybe_retry_request is None or self._raise_for_status is None: |
| raise RuntimeError("fal_client.client request helpers are required for managed FAL gateway mode") |
| if self._request_handle_class is None: |
| raise RuntimeError("fal_client.client.SyncRequestHandle is required for managed FAL gateway mode") |
|
|
| def submit( |
| self, |
| application: str, |
| arguments: Dict[str, Any], |
| *, |
| path: str = "", |
| hint: Optional[str] = None, |
| webhook_url: Optional[str] = None, |
| priority: Any = None, |
| headers: Optional[Dict[str, str]] = None, |
| start_timeout: Optional[Union[int, float]] = None, |
| ): |
| url = self._queue_url_format + application |
| if path: |
| url += "/" + path.lstrip("/") |
| if webhook_url is not None: |
| url += "?" + urlencode({"fal_webhook": webhook_url}) |
|
|
| request_headers = dict(headers or {}) |
| if hint is not None and self._add_hint_header is not None: |
| self._add_hint_header(hint, request_headers) |
| if priority is not None: |
| if self._add_priority_header is None: |
| raise RuntimeError("fal_client.client.add_priority_header is required for priority requests") |
| self._add_priority_header(priority, request_headers) |
| if start_timeout is not None: |
| if self._add_timeout_header is None: |
| raise RuntimeError("fal_client.client.add_timeout_header is required for timeout requests") |
| self._add_timeout_header(start_timeout, request_headers) |
|
|
| response = self._maybe_retry_request( |
| self._http_client, |
| "POST", |
| url, |
| json=arguments, |
| timeout=getattr(self._sync_client, "default_timeout", 120.0), |
| headers=request_headers, |
| ) |
| self._raise_for_status(response) |
|
|
| data = response.json() |
| return self._request_handle_class( |
| request_id=data["request_id"], |
| response_url=data["response_url"], |
| status_url=data["status_url"], |
| cancel_url=data["cancel_url"], |
| client=self._http_client, |
| ) |
|
|
|
|
def _get_managed_fal_client(managed_gateway):
    """Return the shared managed FAL client, rebuilding it when config changes.

    The client is cached at module level so its underlying HTTP client is not
    recreated on every call. The cache key is the gateway origin (trailing
    slashes removed) together with the user token; a different key replaces
    the cached client.
    """
    global _managed_fal_client, _managed_fal_client_config

    origin = managed_gateway.gateway_origin
    token = managed_gateway.nous_user_token
    wanted_config = (origin.rstrip("/"), token)

    with _managed_fal_client_lock:
        cache_hit = (
            _managed_fal_client is not None
            and _managed_fal_client_config == wanted_config
        )
        if not cache_hit:
            # Assigned only after construction succeeds, so a failing
            # constructor leaves the previous cache entry untouched.
            _managed_fal_client = _ManagedFalSyncClient(
                key=token,
                queue_run_origin=origin,
            )
            _managed_fal_client_config = wanted_config
        return _managed_fal_client
|
|
|
|
def _submit_fal_request(model: str, arguments: Dict[str, Any]):
    """Send a FAL request directly or through the managed queue gateway.

    A fresh ``x-idempotency-key`` header is attached to every submission.

    Raises:
        ValueError: when the managed gateway answers with a 4xx status; the
            message tells the user how to switch to a direct key or another
            model. Every other exception propagates unchanged.
    """
    idempotency_headers = {"x-idempotency-key": str(uuid.uuid4())}

    gateway = _resolve_managed_fal_gateway()
    if gateway is None:
        return fal_client.submit(model, arguments=arguments, headers=idempotency_headers)

    client = _get_managed_fal_client(gateway)
    try:
        return client.submit(model, arguments=arguments, headers=idempotency_headers)
    except Exception as exc:
        status = _extract_http_status(exc)
        is_client_error = status is not None and 400 <= status < 500
        if not is_client_error:
            # Server-side and non-HTTP failures keep their original type.
            raise
        raise ValueError(
            f"Nous Subscription gateway rejected model '{model}' "
            f"(HTTP {status}). This model may not yet be enabled on "
            f"the Nous Portal's FAL proxy. Either:\n"
            f" • Set FAL_KEY in your environment to use FAL.ai directly, or\n"
            f" • Pick a different model via `hermes tools` → Image Generation."
        ) from exc
|
|
|
|
| def _extract_http_status(exc: BaseException) -> Optional[int]: |
| """Return an HTTP status code from httpx/fal exceptions, else None. |
| |
| Defensive across exception shapes — httpx.HTTPStatusError exposes |
| ``.response.status_code`` while fal_client wrappers may expose |
| ``.status_code`` directly. |
| """ |
| response = getattr(exc, "response", None) |
| if response is not None: |
| status = getattr(response, "status_code", None) |
| if isinstance(status, int): |
| return status |
| status = getattr(exc, "status_code", None) |
| if isinstance(status, int): |
| return status |
| return None |
|
|
|
|
| |
| |
| |
def _resolve_fal_model() -> tuple:
    """Pick the active FAL model and return ``(model_id, metadata_dict)``.

    Lookup order:
      1. ``image_gen.model`` from the loaded config (whitespace-trimmed),
      2. the ``FAL_IMAGE_MODEL`` environment variable,
      3. ``DEFAULT_MODEL``.

    An id that is not a key of ``FAL_MODELS`` triggers a warning and falls
    back to ``DEFAULT_MODEL``. Config loading errors are logged at debug
    level and treated as "nothing configured".
    """
    configured = ""
    try:
        from hermes_cli.config import load_config

        cfg = load_config()
        section = cfg.get("image_gen") if isinstance(cfg, dict) else None
        candidate = section.get("model") if isinstance(section, dict) else None
        if isinstance(candidate, str):
            configured = candidate.strip()
    except Exception as exc:
        logger.debug("Could not load image_gen.model from config: %s", exc)

    # The environment is consulted only when the config gave nothing usable.
    chosen = configured or os.getenv("FAL_IMAGE_MODEL", "").strip()

    if chosen in FAL_MODELS:
        return chosen, FAL_MODELS[chosen]

    if chosen:
        logger.warning(
            "Unknown FAL model '%s' in config; falling back to %s",
            chosen, DEFAULT_MODEL,
        )
    return DEFAULT_MODEL, FAL_MODELS[DEFAULT_MODEL]
|
|
|
|
def _build_fal_payload(
    model_id: str,
    prompt: str,
    aspect_ratio: str = DEFAULT_ASPECT_RATIO,
    seed: Optional[int] = None,
    overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Assemble the request payload for ``model_id`` from the unified inputs.

    Steps, in order: start from the model's ``defaults``, set the stripped
    prompt, translate ``aspect_ratio`` into the model's size field, add an
    integer ``seed``, apply non-``None`` ``overrides``, and finally drop
    every key that is not in the model's ``supports`` whitelist.

    Args:
        model_id: Key into ``FAL_MODELS`` (``KeyError`` if unknown).
        prompt: Text prompt; ``None``/empty becomes ``""``.
        aspect_ratio: Unified aspect keyword; unknown values fall back to
            ``DEFAULT_ASPECT_RATIO``.
        seed: Optional seed, included only when it is an ``int``.
        overrides: Optional extra payload values; ``None`` values are skipped.

    Raises:
        ValueError: if the model declares an unrecognized ``size_style``.
    """
    spec = FAL_MODELS[model_id]
    size_style = spec["size_style"]
    size_table = spec["sizes"]

    requested = (aspect_ratio or DEFAULT_ASPECT_RATIO).lower().strip()
    aspect_key = requested if requested in size_table else DEFAULT_ASPECT_RATIO

    request: Dict[str, Any] = {
        **spec.get("defaults", {}),
        "prompt": (prompt or "").strip(),
    }

    # Preset names and GPT literal sizes share the ``image_size`` field.
    if size_style == "aspect_ratio":
        size_field = "aspect_ratio"
    elif size_style in ("image_size_preset", "gpt_literal"):
        size_field = "image_size"
    else:
        raise ValueError(f"Unknown size_style: {size_style!r}")
    request[size_field] = size_table[aspect_key]

    if isinstance(seed, int):
        request["seed"] = seed

    if overrides:
        request.update(
            {key: value for key, value in overrides.items() if value is not None}
        )

    allowed = spec["supports"]
    return {key: value for key, value in request.items() if key in allowed}
|
|
|
|
| |
| |
| |
def _upscale_image(image_url: str, original_prompt: str) -> Optional[Dict[str, Any]]:
    """Run ``image_url`` through the Clarity Upscaler.

    Returns:
        A dict with ``url``, ``width``, ``height``, ``upscaled=True`` and
        ``upscale_factor`` on success; ``None`` when the response has no
        ``image`` entry or any exception occurs, so the caller can keep the
        original image.
    """
    try:
        logger.info("Upscaling image with Clarity Upscaler...")

        request_body = {
            "image_url": image_url,
            "prompt": f"{UPSCALER_DEFAULT_PROMPT}, {original_prompt}",
            "upscale_factor": UPSCALER_FACTOR,
            "negative_prompt": UPSCALER_NEGATIVE_PROMPT,
            "creativity": UPSCALER_CREATIVITY,
            "resemblance": UPSCALER_RESEMBLANCE,
            "guidance_scale": UPSCALER_GUIDANCE_SCALE,
            "num_inference_steps": UPSCALER_NUM_INFERENCE_STEPS,
            "enable_safety_checker": UPSCALER_SAFETY_CHECKER,
        }

        result = _submit_fal_request(UPSCALER_MODEL, arguments=request_body).get()

        if not result or "image" not in result:
            logger.error("Upscaler returned invalid response")
            return None

        upscaled = result["image"]
        logger.info(
            "Image upscaled successfully to %sx%s",
            upscaled.get("width", "unknown"),
            upscaled.get("height", "unknown"),
        )
        return {
            "url": upscaled["url"],
            "width": upscaled.get("width", 0),
            "height": upscaled.get("height", 0),
            "upscaled": True,
            "upscale_factor": UPSCALER_FACTOR,
        }
    except Exception as exc:
        # Best effort: any failure means "keep the original image".
        logger.error("Error upscaling image: %s", exc, exc_info=True)
        return None
|
|
|
|
| |
| |
| |
def image_generate_tool(
    prompt: str,
    aspect_ratio: str = DEFAULT_ASPECT_RATIO,
    num_inference_steps: Optional[int] = None,
    guidance_scale: Optional[float] = None,
    num_images: Optional[int] = None,
    output_format: Optional[str] = None,
    seed: Optional[int] = None,
) -> str:
    """Generate an image with the configured FAL model and report it as JSON.

    Only ``prompt`` and ``aspect_ratio`` are exposed to the agent. The other
    keyword arguments are overrides for direct Python callers; each one is
    forwarded only when it is not ``None`` and is later filtered against the
    model's ``supports`` whitelist, so unsupported overrides are dropped
    rather than rejected.

    Returns:
        A JSON string. On success: ``{"success": true, "image": <url>}``,
        where the URL is that of the first usable image. On failure:
        ``{"success": false, "image": null, "error": <message>,
        "error_type": <exception class name>}``. Exceptions raised inside the
        generation flow are converted into the failure shape.
    """
    model_id, meta = _resolve_fal_model()

    tuning = {
        "num_inference_steps": num_inference_steps,
        "guidance_scale": guidance_scale,
        "num_images": num_images,
        "output_format": output_format,
    }
    debug_call_data = {
        "model": model_id,
        "parameters": {
            "prompt": prompt,
            "aspect_ratio": aspect_ratio,
            **tuning,
            "seed": seed,
        },
        "error": None,
        "success": False,
        "images_generated": 0,
        "generation_time": 0,
    }

    started_at = datetime.datetime.now()

    def _elapsed_seconds() -> float:
        return (datetime.datetime.now() - started_at).total_seconds()

    try:
        if not prompt or not isinstance(prompt, str) or not prompt.strip():
            raise ValueError("Prompt is required and must be a non-empty string")

        if not fal_key_is_configured() and not _resolve_managed_fal_gateway():
            reason = "FAL_KEY environment variable not set"
            if managed_nous_tools_enabled():
                reason += " and managed FAL gateway is unavailable"
            raise ValueError(reason)

        aspect_lc = (aspect_ratio or DEFAULT_ASPECT_RATIO).lower().strip()
        if aspect_lc not in VALID_ASPECT_RATIOS:
            logger.warning(
                "Invalid aspect_ratio '%s', defaulting to '%s'",
                aspect_ratio, DEFAULT_ASPECT_RATIO,
            )
            aspect_lc = DEFAULT_ASPECT_RATIO

        # Forward only the overrides the caller actually supplied.
        overrides: Dict[str, Any] = {
            name: value for name, value in tuning.items() if value is not None
        }
        arguments = _build_fal_payload(
            model_id, prompt, aspect_lc, seed=seed, overrides=overrides,
        )

        logger.info(
            "Generating image with %s (%s) — prompt: %s",
            meta.get("display", model_id), model_id, prompt[:80],
        )

        result = _submit_fal_request(model_id, arguments=arguments).get()

        # Measured before upscaling, so upscale time is not included.
        generation_time = _elapsed_seconds()

        if not result or "images" not in result:
            raise ValueError("Invalid response from FAL.ai API — no images returned")

        images = result.get("images", [])
        if not images:
            raise ValueError("No images were generated")

        should_upscale = bool(meta.get("upscale", False))

        formatted_images = []
        for img in images:
            if not isinstance(img, dict) or "url" not in img:
                continue

            if should_upscale:
                upscaled = _upscale_image(img["url"], prompt.strip())
                if upscaled:
                    formatted_images.append(upscaled)
                    continue
                logger.warning("Using original image as fallback (upscale failed)")

            formatted_images.append({
                "url": img["url"],
                "width": img.get("width", 0),
                "height": img.get("height", 0),
                "upscaled": False,
            })

        if not formatted_images:
            raise ValueError("No valid image URLs returned from API")

        upscaled_count = sum(1 for entry in formatted_images if entry.get("upscaled"))
        logger.info(
            "Generated %s image(s) in %.1fs (%s upscaled) via %s",
            len(formatted_images), generation_time, upscaled_count, model_id,
        )

        response_data = {
            "success": True,
            "image": formatted_images[0]["url"],
        }

        debug_call_data.update(
            success=True,
            images_generated=len(formatted_images),
            generation_time=generation_time,
        )
        _debug.log_call("image_generate_tool", debug_call_data)
        _debug.save()

        return json.dumps(response_data, indent=2, ensure_ascii=False)

    except Exception as e:
        generation_time = _elapsed_seconds()
        error_msg = f"Error generating image: {str(e)}"
        logger.error("%s", error_msg, exc_info=True)

        response_data = {
            "success": False,
            "image": None,
            "error": str(e),
            "error_type": type(e).__name__,
        }

        debug_call_data.update(error=error_msg, generation_time=generation_time)
        _debug.log_call("image_generate_tool", debug_call_data)
        _debug.save()

        return json.dumps(response_data, indent=2, ensure_ascii=False)
|
|
|
|
def check_fal_api_key() -> bool:
    """Report whether FAL is reachable via a direct key or the managed gateway."""
    if fal_key_is_configured():
        return True
    # No direct key: fall back to whether a managed gateway resolves.
    return bool(_resolve_managed_fal_gateway())
|
|
|
|
def check_image_generation_requirements() -> bool:
    """Return True if any image generation backend is available.

    Backends are probed in this order:

    1. The in-tree FAL backend (direct key or managed gateway), via
       ``check_fal_api_key()``.
    2. Any plugin-registered provider whose ``is_available()`` returns True.

    Plugins are consulted only when the in-tree FAL path is not ready, so a
    configured FAL key is enough on its own to expose the tool. Which ready
    provider is actually used is resolved per call from ``image_gen.provider``.

    This function never raises. Failures while probing are logged at debug
    level and treated as "backend not available", so a broken plugin can
    still be diagnosed from the logs.
    """
    try:
        if check_fal_api_key():
            return True
    except ImportError as exc:
        logger.debug("FAL backend availability check skipped: %s", exc)

    try:
        from agent.image_gen_registry import list_providers
        from hermes_cli.plugins import _ensure_plugins_discovered

        _ensure_plugins_discovered()
        for provider in list_providers():
            try:
                if provider.is_available():
                    return True
            except Exception as exc:
                # One misbehaving provider must not hide the others.
                logger.debug(
                    "image_gen provider %r availability check failed: %s",
                    provider, exc,
                )
    except Exception as exc:
        logger.debug("image_gen plugin discovery skipped: %s", exc)

    return False
|
|
|
|
| |
| |
| |
# Manual self-check: run this module directly to see credential status, the
# active model and the model catalog.
if __name__ == "__main__":
    print("🎨 Image Generation Tools — FAL.ai multi-model support")
    print("=" * 60)

    if check_fal_api_key():
        print("✅ FAL.ai API key found")
    else:
        for help_line in (
            "❌ FAL_KEY environment variable not set",
            " Set it via: export FAL_KEY='your-key-here'",
            " Get a key: https://fal.ai/",
        ):
            print(help_line)
        raise SystemExit(1)

    try:
        import fal_client
    except ImportError:
        print("❌ fal_client library not found — pip install fal-client")
        raise SystemExit(1)
    else:
        print("✅ fal_client library available")

    model_id, meta = _resolve_fal_model()
    print(f"🤖 Active model: {meta.get('display', model_id)} ({model_id})")
    print(f" Speed: {meta.get('speed', '?')} · Price: {meta.get('price', '?')}")
    print(f" Upscaler: {'on' if meta.get('upscale') else 'off'}")

    print("\nAvailable models:")
    for candidate_id, candidate in FAL_MODELS.items():
        marker = " ← active" if candidate_id == model_id else ""
        print(f" {candidate_id:<32} {candidate.get('speed', '?'):<6} {candidate.get('price', '?')}{marker}")

    if _debug.active:
        print(f"\n🐛 Debug mode enabled — session {_debug.session_id}")
|
|
|
|
| |
| |
| |
| from tools.registry import registry, tool_error |
|
|
# Agent-facing tool schema. Only ``prompt`` and ``aspect_ratio`` are exposed;
# backend and model selection stay with the user's configuration.
IMAGE_GENERATE_SCHEMA = {
    "name": "image_generate",
    "description": (
        "Generate high-quality images from text prompts. The underlying "
        "backend (FAL, OpenAI, etc.) and model are user-configured and not "
        "selectable by the agent. Returns either a URL or an absolute file "
        "path in the `image` field; display it with markdown "
        # The concrete markdown example was missing, which left the sentence
        # ("display it with markdown  and the gateway ...") without an object.
        "![description](url-or-path) and the gateway will deliver it."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "prompt": {
                "type": "string",
                "description": "The text prompt describing the desired image. Be detailed and descriptive.",
            },
            "aspect_ratio": {
                "type": "string",
                "enum": list(VALID_ASPECT_RATIOS),
                "description": "The aspect ratio of the generated image. 'landscape' is 16:9 wide, 'portrait' is 16:9 tall, 'square' is 1:1.",
                "default": DEFAULT_ASPECT_RATIO,
            },
        },
        "required": ["prompt"],
    },
}
|
|
|
|
def _read_configured_image_provider():
    """Return the trimmed ``image_gen.provider`` config value, or ``None``.

    ``None`` is returned when the value is missing, not a string, blank, or
    when the config cannot be loaded (logged at debug level). Callers consult
    the plugin registry only when a value is explicitly set, which keeps
    users on the in-tree FAL path even if other providers happen to be
    registered.
    """
    try:
        from hermes_cli.config import load_config

        config = load_config()
        image_cfg = config.get("image_gen") if isinstance(config, dict) else None
        if isinstance(image_cfg, dict):
            provider_name = image_cfg.get("provider")
            if isinstance(provider_name, str):
                provider_name = provider_name.strip()
                if provider_name:
                    return provider_name
    except Exception as exc:
        logger.debug("Could not read image_gen.provider: %s", exc)
    return None
|
|
|
|
def _dispatch_to_plugin_provider(prompt: str, aspect_ratio: str):
    """Route the call to a plugin-registered provider when one is selected.

    Dispatch only fires when ``image_gen.provider`` is explicitly set AND it
    does not point to ``fal`` (FAL still lives in-tree in this PR; a later PR
    ports it into ``plugins/image_gen/fal/``). Any other value that matches a
    registered plugin provider wins.

    Args:
        prompt: Text prompt forwarded to the provider.
        aspect_ratio: Unified aspect keyword forwarded to the provider.

    Returns:
        ``None`` to fall through to the built-in FAL path (no provider
        configured, provider is ``fal``, or the plugin machinery cannot be
        imported). Otherwise a JSON string: the provider's result on success,
        or a ``{"success": false, ...}`` payload whose ``error_type`` is one of
        ``provider_not_registered``, ``provider_exception`` or
        ``provider_contract``. A result that is not a dict, or that cannot be
        JSON-encoded, is reported as ``provider_contract`` instead of raising.
    """
    configured = _read_configured_image_provider()
    if not configured or configured == "fal":
        return None

    try:
        from agent.image_gen_registry import get_provider
        from hermes_cli.plugins import _ensure_plugins_discovered

        _ensure_plugins_discovered()
        provider = get_provider(configured)
    except Exception as exc:
        logger.debug("image_gen plugin dispatch skipped: %s", exc)
        return None

    if provider is None:
        try:
            # Retry once with a forced rediscovery before reporting the
            # provider as unregistered.
            _ensure_plugins_discovered(force=True)
            provider = get_provider(configured)
        except Exception as exc:
            logger.debug("image_gen plugin force-refresh skipped: %s", exc)

    if provider is None:
        return json.dumps({
            "success": False,
            "image": None,
            "error": (
                f"image_gen.provider='{configured}' is set but no plugin "
                f"registered that name. Run `hermes plugins list` to see "
                f"available image gen backends."
            ),
            "error_type": "provider_not_registered",
        })

    provider_name = getattr(provider, "name", "?")

    try:
        result = provider.generate(prompt=prompt, aspect_ratio=aspect_ratio)
    except Exception as exc:
        logger.warning("Image gen provider '%s' raised: %s", provider_name, exc)
        return json.dumps({
            "success": False,
            "image": None,
            "error": f"Provider '{provider_name}' error: {exc}",
            "error_type": "provider_exception",
        })

    if not isinstance(result, dict):
        return json.dumps({
            "success": False,
            "image": None,
            "error": "Provider returned a non-dict result",
            "error_type": "provider_contract",
        })

    try:
        return json.dumps(result)
    except (TypeError, ValueError) as exc:
        # e.g. a Path or bytes value in the result: report it through the
        # same error contract instead of letting the encoder error escape.
        logger.warning(
            "Image gen provider '%s' returned a non-serializable result: %s",
            provider_name, exc,
        )
        return json.dumps({
            "success": False,
            "image": None,
            "error": (
                f"Provider '{provider_name}' returned a result that is not "
                f"JSON-serializable: {exc}"
            ),
            "error_type": "provider_contract",
        })
|
|
|
|
def _handle_image_generate(args, **kw):
    """Registry handler for ``image_generate``.

    Rejects a missing or empty prompt, offers the call to a configured
    plugin provider, and otherwise runs the built-in FAL implementation.
    Extra keyword arguments are accepted and ignored.
    """
    prompt = args.get("prompt", "")
    if not prompt:
        return tool_error("prompt is required for image generation")

    aspect_ratio = args.get("aspect_ratio", DEFAULT_ASPECT_RATIO)

    # A non-None answer means a plugin provider handled (or rejected) the call.
    plugin_response = _dispatch_to_plugin_provider(prompt, aspect_ratio)
    if plugin_response is None:
        return image_generate_tool(prompt=prompt, aspect_ratio=aspect_ratio)
    return plugin_response
|
|
|
|
# Register the tool at import time: IMAGE_GENERATE_SCHEMA is what the agent
# sees and calls go through _handle_image_generate.
# check_image_generation_requirements is passed as ``check_fn`` -- presumably
# the registry uses it to decide whether the tool is exposed; confirm in
# tools.registry. No environment variables are declared as required.
registry.register(
    name="image_generate",
    toolset="image_gen",
    schema=IMAGE_GENERATE_SCHEMA,
    handler=_handle_image_generate,
    check_fn=check_image_generation_requirements,
    requires_env=[],
    is_async=False,
    emoji="🎨",
)
|
|