| from __future__ import annotations |
|
|
| import html |
| import io |
| import mimetypes |
| import os |
| import tempfile |
| from pathlib import Path |
| from typing import Any |
|
|
| import gradio as gr |
| import httpx |
| from PIL import Image, ImageDraw, ImageFont |
|
|
# Base URL of the trial gateway. Trailing slashes are removed so endpoint
# paths can be appended with a single "/".
TRIAL_GATEWAY_URL = os.environ.get("TRIAL_GATEWAY_URL", "http://localhost:3000").rstrip("/")

# Directory that holds the example images. A blank/whitespace-only value falls
# back to "examples".
TRIAL_EXAMPLES_DIR = os.environ.get("TRIAL_EXAMPLES_DIR", "examples").strip() or "examples"

# Maximum number of example images offered. A non-numeric value falls back to
# 12, and the final value is clamped to the range 1..100.
try:
    TRIAL_EXAMPLES_MAX = int(os.environ.get("TRIAL_EXAMPLES_MAX", "12"))
except ValueError:
    TRIAL_EXAMPLES_MAX = 12
TRIAL_EXAMPLES_MAX = min(100, max(1, TRIAL_EXAMPLES_MAX))
|
|
# Lower-case file suffixes accepted when scanning the examples directory.
ALLOWED_EXAMPLE_SUFFIXES = {".png", ".jpg", ".jpeg", ".webp"}
# Directory containing this file; relative example paths are resolved against it.
SPACE_ROOT = Path(__file__).resolve().parent
# Text drawn onto every image returned by the backend.
WATERMARK_TEXT = "go to carb-connect.com fully use"
# Message shown in the status card and the upgrade banner once the quota is used up.
UPGRADE_MESSAGE = "Go to carb-connect.com to keep using this app"
# Link target of the upgrade banner button.
UPGRADE_URL = "https://carb-connect.com"
# Sent to the gateway as the "app_code" form field.
APP_CODE = "ZOI_PRO"
# Initial value of the "Diameter (mm)" input; also sent as "dish_diameter".
DEFAULT_DISH_DIAMETER = 90
|
|
# Stylesheet handed to ``gr.Blocks(css=...)``.
# Sections, in order: colour variables, page background, the "#hero" intro
# card and its ".hero-*" parts, the ".panel" columns, the ".status-*" cards
# produced by ``_render_status``, the "#run-btn" button, and the
# ".upgrade-cta*" banner produced by ``_render_upgrade_cta``.
CUSTOM_CSS = """
:root {
--bg: #f6fbff;
--ink: #0f172a;
--muted: #475569;
--primary: #0f766e;
--ok-bg: #ecfdf5;
--ok-border: #10b981;
--error-bg: #fef2f2;
--error-border: #ef4444;
--info-bg: #eff6ff;
--info-border: #3b82f6;
}

.gradio-container {
background: linear-gradient(180deg, #f8fafc 0%, var(--bg) 100%);
}

#hero {
border: 1px solid #dbe4f0;
border-radius: 14px;
padding: 18px 18px 16px 18px;
background:
radial-gradient(700px 220px at 92% -35%, rgba(15, 118, 110, 0.14) 0%, transparent 58%),
#ffffff;
box-shadow: 0 14px 34px rgba(15, 23, 42, 0.08);
}

#hero h1 {
margin: 6px 0 0 0;
color: var(--ink);
font-size: 1.62rem;
line-height: 1.2;
}

#hero p {
margin: 10px 0 0 0;
color: var(--muted);
max-width: 760px;
}

.hero-eyebrow {
display: inline-flex;
align-items: center;
gap: 6px;
margin: 0;
font-size: 0.76rem;
letter-spacing: 0.06em;
font-weight: 700;
text-transform: uppercase;
color: #0f766e;
}

.hero-highlight {
color: #0f766e;
font-weight: 700;
}

.hero-actions {
margin-top: 14px;
display: flex;
gap: 10px;
flex-wrap: wrap;
}

.hero-cta {
display: inline-block;
text-decoration: none;
border-radius: 10px;
padding: 9px 14px;
font-weight: 700;
background: #0f766e;
color: #ffffff !important;
}

.hero-note {
display: inline-block;
border-radius: 10px;
padding: 9px 12px;
background: #f1f5f9;
color: #334155;
font-size: 0.9rem;
}

.panel {
border: 1px solid #dbe4f0;
border-radius: 14px;
padding: 12px;
background: #ffffff;
}

.status-card {
border-radius: 10px;
border: 1px solid;
padding: 10px 12px;
font-size: 0.95rem;
}

.status-ok {
background: var(--ok-bg);
border-color: var(--ok-border);
color: #065f46;
}

.status-error {
background: var(--error-bg);
border-color: var(--error-border);
color: #991b1b;
}

.status-info {
background: var(--info-bg);
border-color: var(--info-border);
color: #1e3a8a;
}

#run-btn {
background: var(--primary);
color: #ffffff;
border: none;
}

.upgrade-cta {
margin-top: 12px;
border-radius: 14px;
border: 1px solid #f59e0b;
background:
radial-gradient(820px 240px at 88% -35%, rgba(245, 158, 11, 0.25) 0%, transparent 58%),
linear-gradient(135deg, #fff8eb 0%, #ffedd5 100%);
box-shadow: 0 12px 30px rgba(245, 158, 11, 0.18);
padding: 16px 18px;
}

.upgrade-cta-title {
margin: 0;
color: #9a3412;
font-weight: 800;
font-size: 1.02rem;
}

.upgrade-cta-copy {
margin: 7px 0 0 0;
color: #7c2d12;
font-size: 0.93rem;
}

.upgrade-cta-button {
display: inline-block;
margin-top: 12px;
padding: 9px 14px;
border-radius: 10px;
text-decoration: none;
background: #b45309;
color: #ffffff !important;
font-weight: 700;
}
"""
|
|
|
|
| def _safe_json(response: httpx.Response) -> dict[str, Any]: |
| try: |
| payload = response.json() |
| except ValueError: |
| return {} |
| return payload if isinstance(payload, dict) else {} |
|
|
|
|
| def _render_status(message: str, level: str = "info") -> str: |
| safe_message = html.escape(message) |
| if level not in {"ok", "error", "info"}: |
| level = "info" |
| return f"<div class='status-card status-{level}'>{safe_message}</div>" |
|
|
|
|
def _render_upgrade_cta(visible: bool) -> str:
    """Build the HTML for the upgrade banner.

    Args:
        visible: Whether the banner should be shown.

    Returns:
        An empty string when ``visible`` is falsy; otherwise a ``<section>``
        containing the escaped ``UPGRADE_MESSAGE``, a fixed explanatory line
        and a link to the escaped ``UPGRADE_URL``.
    """
    if visible:
        title = html.escape(UPGRADE_MESSAGE)
        href = html.escape(UPGRADE_URL, quote=True)
        fragments = [
            "<section class='upgrade-cta'>",
            f"<p class='upgrade-cta-title'>{title}</p>",
            "<p class='upgrade-cta-copy'>You have reached your free limit for this demo.</p>",
            f"<a class='upgrade-cta-button' href='{href}' target='_blank' rel='noopener noreferrer'>",
            "Go to carb-connect.com",
            "</a>",
            "</section>",
        ]
        return "".join(fragments)
    return ""
|
|
|
|
| def _write_temp_image(image_bytes: bytes, suffix: str = ".png") -> str: |
| with tempfile.NamedTemporaryFile(prefix="zoi-output-", suffix=suffix, delete=False) as tmp: |
| tmp.write(image_bytes) |
| return tmp.name |
|
|
|
|
def _resolve_examples_dir() -> Path:
    """Return the examples directory as configured by ``TRIAL_EXAMPLES_DIR``.

    Returns:
        The configured path unchanged when it is absolute; otherwise the
        configured path joined onto ``SPACE_ROOT``.
    """
    configured = Path(TRIAL_EXAMPLES_DIR)
    return configured if configured.is_absolute() else SPACE_ROOT / configured
|
|
|
|
def _load_example_choices() -> list[tuple[str, str]]:
    """Collect ``(label, path)`` pairs for the example-image dropdown.

    Files directly inside the examples directory whose suffix is in
    ``ALLOWED_EXAMPLE_SUFFIXES`` are sorted by lower-cased file name and
    truncated to ``TRIAL_EXAMPLES_MAX`` entries. Labels are derived from the
    file stem with "-" and "_" replaced by spaces; duplicate labels get a
    " (2)", " (3)", ... suffix.

    Returns:
        The list of choices, or an empty list when the examples directory is
        missing or is not a directory.
    """
    folder = _resolve_examples_dir()
    # ``is_dir()`` is already False for a missing path.
    if not folder.is_dir():
        return []

    candidates = [
        entry
        for entry in folder.iterdir()
        if entry.is_file() and entry.suffix.lower() in ALLOWED_EXAMPLE_SUFFIXES
    ]
    candidates.sort(key=lambda entry: entry.name.lower())

    used_labels: set[str] = set()
    choices: list[tuple[str, str]] = []
    for entry in candidates[:TRIAL_EXAMPLES_MAX]:
        # Fall back to the full file name when the stem cleans up to nothing.
        readable = entry.stem.replace("-", " ").replace("_", " ").strip() or entry.name
        label, counter = readable, 2
        while label in used_labels:
            label = f"{readable} ({counter})"
            counter += 1
        used_labels.add(label)
        choices.append((label, str(entry)))
    return choices
|
|
|
|
| def _normalize_dish_diameter(diameter: float | None) -> int | None: |
| if diameter is None: |
| return None |
| try: |
| parsed = float(diameter) |
| except (TypeError, ValueError): |
| return None |
| if parsed <= 0 or not parsed.is_integer(): |
| return None |
| return int(parsed) |
|
|
|
|
def _post_image_echo(image_path: str, hf_token: str, dish_diameter: int) -> httpx.Response:
    """Upload an image to the trial gateway's inference endpoint.

    Sends a multipart POST to ``{TRIAL_GATEWAY_URL}/v1/trial/infer`` with the
    image in the "image" file field and "app_code" / "dish_diameter" as form
    fields. The token is sent as a Bearer ``Authorization`` header. The
    request uses a 45 second timeout.

    Args:
        image_path: Path of the image file to upload.
        hf_token: Token placed in the ``Authorization`` header.
        dish_diameter: Value sent as the "dish_diameter" form field.

    Returns:
        The gateway's response, whatever its status code.
    """
    upload_name = Path(image_path).name
    guessed_type, _ = mimetypes.guess_type(image_path)
    payload: dict[str, str | int] = {"app_code": APP_CODE, "dish_diameter": dish_diameter}

    # The file must stay open for the duration of the request.
    with open(image_path, "rb") as stream:
        return httpx.post(
            f"{TRIAL_GATEWAY_URL}/v1/trial/infer",
            headers={"Authorization": f"Bearer {hf_token}"},
            files={"image": (upload_name, stream, guessed_type or "application/octet-stream")},
            data=payload,
            timeout=45.0,
        )
|
|
|
|
| def _content_type_to_format(content_type: str) -> str: |
| normalized = (content_type or "").split(";")[0].strip().lower() |
| if normalized in {"image/jpeg", "image/jpg"}: |
| return "JPEG" |
| if normalized == "image/png": |
| return "PNG" |
| if normalized == "image/webp": |
| return "WEBP" |
| return "PNG" |
|
|
|
|
def _apply_watermark(image_bytes: bytes, content_type: str) -> bytes:
    """Draw ``WATERMARK_TEXT`` near the bottom-right corner of an image.

    The text is white and semi-transparent with a dark one-pixel outline,
    sized relative to the shorter image side. The result is re-encoded in the
    format chosen by ``_content_type_to_format``.

    Args:
        image_bytes: Encoded source image.
        content_type: Content-Type of the source image; selects the output
            format.

    Returns:
        The encoded, watermarked image.

    Raises:
        ValueError: If the decoded image has a non-positive width or height.
    """
    with Image.open(io.BytesIO(image_bytes)) as opened:
        canvas = opened.convert("RGBA")

    width, height = canvas.size
    if width <= 0 or height <= 0:
        raise ValueError("Invalid image dimensions for watermarking.")

    # Margin and font size scale with the shorter side, with fixed minimums.
    shorter_side = min(width, height)
    margin = max(12, int(shorter_side * 0.03))
    font_size = max(14, int(shorter_side * 0.035))
    try:
        font = ImageFont.truetype("DejaVuSans.ttf", font_size)
    except OSError:
        # The TrueType font could not be loaded; use Pillow's default font.
        font = ImageFont.load_default()

    # The text goes on a transparent layer so its alpha blends with the image.
    text_layer = Image.new("RGBA", canvas.size, (0, 0, 0, 0))
    pen = ImageDraw.Draw(text_layer)

    left, top, right, bottom = pen.textbbox((0, 0), WATERMARK_TEXT, font=font)
    text_width = right - left
    text_height = bottom - top
    # Never position the text closer than ``margin`` to the top-left edges.
    anchor_x = max(margin, width - text_width - margin)
    anchor_y = max(margin, height - text_height - margin)

    # Dark copies offset by one pixel on each diagonal form the outline.
    for shift_x in (-1, 1):
        for shift_y in (-1, 1):
            pen.text(
                (anchor_x + shift_x, anchor_y + shift_y),
                WATERMARK_TEXT,
                font=font,
                fill=(0, 0, 0, 110),
            )
    pen.text((anchor_x, anchor_y), WATERMARK_TEXT, font=font, fill=(255, 255, 255, 160))

    merged = Image.alpha_composite(canvas, text_layer)
    target_format = _content_type_to_format(content_type)
    if target_format == "JPEG":
        # The image is converted to RGB (alpha dropped) before JPEG encoding.
        merged = merged.convert("RGB")

    encoded = io.BytesIO()
    merged.save(encoded, format=target_format)
    return encoded.getvalue()
|
|
|
|
# File extension for each image format that ``_content_type_to_format`` can
# return, i.e. for the bytes ``_apply_watermark`` actually produces.
_OUTPUT_EXTENSIONS = {"JPEG": ".jpg", "PNG": ".png", "WEBP": ".webp"}


def _trial_result(
    image_path: str | None,
    message: str,
    level: str,
    cta_visible: bool,
) -> tuple[str | None, str, str, bool]:
    """Assemble the 4-tuple returned by ``_run_image_trial``."""
    return (
        image_path,
        _render_status(message, level),
        _render_upgrade_cta(cta_visible),
        cta_visible,
    )


def _run_image_trial(
    input_image_path: str | None,
    diameter: float | None,
    upgrade_cta_visible: bool,
    oauth_token: gr.OAuthToken | None = None,
) -> tuple[str | None, str, str, bool]:
    """Handle a click on the run button: validate, call the gateway, watermark.

    Args:
        input_image_path: Path of the uploaded/selected image, or ``None``.
        diameter: Raw value of the diameter input.
        upgrade_cta_visible: Whether the upgrade banner is currently shown.
            Once shown it stays shown; this function only ever turns it on.
        oauth_token: Hugging Face OAuth token. It is not listed in the click
            handler's ``inputs``; presumably Gradio supplies it based on the
            type annotation -- confirm against the Gradio OAuth docs.

    Returns:
        A tuple ``(output_image_path, status_html, upgrade_cta_html,
        upgrade_cta_visible)``. ``output_image_path`` is ``None`` on every
        failure path.
    """
    cta_visible = bool(upgrade_cta_visible)

    if oauth_token is None or not oauth_token.token:
        return _trial_result(None, "Please sign in with Hugging Face first.", "error", cta_visible)

    if not input_image_path:
        return _trial_result(None, "You must upload an input image.", "error", cta_visible)

    normalized_diameter = _normalize_dish_diameter(diameter)
    if normalized_diameter is None:
        return _trial_result(
            None, "Diameter must be a whole number greater than 0.", "error", cta_visible
        )

    try:
        response = _post_image_echo(input_image_path, oauth_token.token, normalized_diameter)
    except httpx.HTTPError as error:
        return _trial_result(None, f"Could not connect to the backend: {error}", "error", cta_visible)

    if response.status_code >= 400:
        parsed = _safe_json(response)
        message = str(
            parsed.get("error")
            or parsed.get("message")
            or f"Request failed with status {response.status_code}"
        )
        lower_message = message.lower()
        # A 403 or an explicit "nothing left" message means the quota is used up.
        quota_reached = (
            response.status_code == 403
            or "no remaining uses" in lower_message
            or "remaining attempts: 0" in lower_message
        )
        if quota_reached:
            return _trial_result(None, UPGRADE_MESSAGE, "error", True)
        return _trial_result(None, message, "error", cta_visible)

    content_type = response.headers.get("content-type", "").split(";")[0].strip().lower()
    if not content_type.startswith("image/"):
        return _trial_result(None, "Backend response did not include an image.", "error", cta_visible)

    try:
        output_bytes = _apply_watermark(response.content, content_type)
    except Exception as error:
        # UI boundary: any decoding/drawing failure is reported in the status
        # card instead of propagating out of the handler.
        return _trial_result(None, f"Could not apply watermark: {error}", "error", cta_visible)

    # Name the file after the format that was actually written. The response
    # Content-Type is not reliable for this: ``_apply_watermark`` re-encodes
    # unrecognised image types (e.g. image/gif) as PNG, and "image/jpg" is
    # encoded as JPEG even though ``mimetypes`` may not know that type.
    output_format = _content_type_to_format(content_type)
    output_path = _write_temp_image(output_bytes, _OUTPUT_EXTENSIONS.get(output_format, ".png"))

    remaining_raw = (response.headers.get("x-remaining-calls") or "").strip()
    status_message = "Processing complete."
    # ``isdecimal`` (not ``isdigit``): every string it accepts is parseable by
    # ``int``, whereas ``isdigit`` also accepts characters such as "²".
    if remaining_raw.isdecimal():
        remaining_value = int(remaining_raw)
        if remaining_value <= 0:
            cta_visible = True
            status_message = f"Processing complete. Remaining attempts: 0. {UPGRADE_MESSAGE}"
        else:
            status_message = f"Processing complete. Remaining attempts: {remaining_value}"
    elif remaining_raw:
        # Non-numeric header value: show it verbatim.
        status_message = f"Processing complete. Remaining attempts: {remaining_raw}"

    return _trial_result(output_path, status_message, "ok", cta_visible)
|
|
|
|
| def _select_example(example_path: str | None) -> str | None: |
| return example_path or None |
|
|
|
|
# The examples directory is scanned once, when this module is executed.
EXAMPLE_CHOICES = _load_example_choices()
# Markdown shown under the (disabled) example dropdown when no examples exist.
NO_EXAMPLES_MESSAGE = (
    f"No examples found in `{_resolve_examples_dir()}`. "
    "Add images (`.png`, `.jpg`, `.jpeg`, `.webp`) to enable the selector."
)
|
|
# Page layout and event wiring. Components are created in display order, so
# the order of the statements below matters.
with gr.Blocks(
    title="ZOI PRO - Image Echo Demo",
    css=CUSTOM_CSS,
    theme=gr.themes.Base(
        primary_hue="teal",
        neutral_hue="slate",
        font=[gr.themes.GoogleFont("Space Grotesk"), "ui-sans-serif", "sans-serif"],
    ),
) as demo:
    # Static intro card, styled by the "#hero" / ".hero-*" rules in CUSTOM_CSS.
    # NOTE(review): the link below repeats the value of UPGRADE_URL as a literal.
    gr.HTML(
        """
<section id="hero">
<p class="hero-eyebrow">ZOI PRO</p>
<h1>ZOI PRO Demo</h1>
<p>Sign in with Hugging Face, upload your image, and get an instant preview while trial attempts are available.</p>
<p class="hero-note">Want to use the app <span class="hero-highlight">without restrictions</span>? Get full access at carb-connect.com.</p>
<div class="hero-actions">
<a class="hero-cta" href="https://carb-connect.com" target="_blank" rel="noopener noreferrer">Unlock Full Access</a>
</div>
</section>
"""
    )

    with gr.Row():
        gr.LoginButton("Sign in with Hugging Face")
        # Only add a sign-out button when this Gradio build provides one.
        if hasattr(gr, "LogoutButton"):
            gr.LogoutButton("Sign out")

    # Whether the upgrade banner is shown; starts hidden and is updated by
    # ``_run_image_trial`` together with the banner HTML.
    upgrade_cta_state = gr.State(False)
    upgrade_cta_html = gr.HTML(_render_upgrade_cta(False))

    with gr.Row(equal_height=True):
        # Input column: example picker, image upload, diameter, run button.
        with gr.Column(scale=1, elem_classes=["panel"]):
            example_selector = gr.Dropdown(
                label="Use example image",
                choices=EXAMPLE_CHOICES,
                value=None,
                # Disabled when there is nothing to choose from.
                interactive=bool(EXAMPLE_CHOICES),
            )
            if not EXAMPLE_CHOICES:
                gr.Markdown(NO_EXAMPLES_MESSAGE)
            input_image = gr.Image(type="filepath", label="Input image")
            diameter_input = gr.Number(
                label="Diameter (mm)",
                value=DEFAULT_DISH_DIAMETER,
                minimum=1,
                precision=0,
                placeholder="Enter diameter in mm",
            )
            run_button = gr.Button("Send image", elem_id="run-btn")
        # Output column: the watermarked image returned by the backend.
        with gr.Column(scale=1, elem_classes=["panel"]):
            output_image = gr.Image(type="filepath", label="Output image")

    trial_status = gr.HTML(_render_status("Sign in and upload an image to get started.", "info"))

    # Choosing an example loads its path into the image input.
    example_selector.change(
        _select_example,
        inputs=[example_selector],
        outputs=[input_image],
    )

    # The four outputs match, in order, the tuple returned by _run_image_trial.
    run_button.click(
        _run_image_trial,
        inputs=[input_image, diameter_input, upgrade_cta_state],
        outputs=[output_image, trial_status, upgrade_cta_html, upgrade_cta_state],
        show_progress="full",
    )


# Runs whenever this module is executed or imported (no ``__main__`` guard).
demo.queue().launch()
|
|