"""Gradio front-end for the Nugent Score AI 1.0 image trial demo.

The app lets a signed-in Hugging Face user send one image to the trial
gateway (``TRIAL_GATEWAY_URL``), watermarks the image that comes back and
shows it next to a status card.  When the gateway reports that the trial
quota is used up, an upgrade call-to-action is displayed.

Configuration is read from environment variables:

* ``TRIAL_GATEWAY_URL``  - base URL of the gateway (default
  ``http://localhost:3000``).
* ``TRIAL_EXAMPLES_DIR`` - directory with example images, relative to this
  file unless absolute (default ``examples``).
* ``TRIAL_EXAMPLES_MAX`` - maximum number of examples listed, clamped to
  the range 1..100 (default 12).
"""

from __future__ import annotations

import html
import io
import mimetypes
import os
import tempfile
from pathlib import Path
from typing import Any

import gradio as gr
import httpx
from PIL import Image, ImageDraw, ImageFont

TRIAL_GATEWAY_URL = os.getenv("TRIAL_GATEWAY_URL", "http://localhost:3000").rstrip("/")
TRIAL_EXAMPLES_DIR = os.getenv("TRIAL_EXAMPLES_DIR", "examples").strip() or "examples"
try:
    TRIAL_EXAMPLES_MAX = int(os.getenv("TRIAL_EXAMPLES_MAX", "12"))
except ValueError:
    # A non-numeric value falls back to the default instead of aborting startup.
    TRIAL_EXAMPLES_MAX = 12
TRIAL_EXAMPLES_MAX = max(1, min(TRIAL_EXAMPLES_MAX, 100))
ALLOWED_EXAMPLE_SUFFIXES = {".png", ".jpg", ".jpeg", ".webp"}
SPACE_ROOT = Path(__file__).resolve().parent
WATERMARK_TEXT = "go to carb-connect.com fully use"
UPGRADE_MESSAGE = "Go to carb-connect.com to keep using this app"
UPGRADE_URL = "https://carb-connect.com"
APP_CODE = "NUGENTSCORE_1"

# File suffix for each Pillow format that `_content_type_to_format` can return.
# The suffix of the saved output must follow the format the bytes were
# actually encoded in, not the content type announced by the gateway.
_FORMAT_EXTENSIONS = {"JPEG": ".jpg", "PNG": ".png", "WEBP": ".webp"}

CUSTOM_CSS = """
:root {
  --bg: #f6fbff;
  --ink: #0f172a;
  --muted: #475569;
  --primary: #0f766e;
  --ok-bg: #ecfdf5;
  --ok-border: #10b981;
  --error-bg: #fef2f2;
  --error-border: #ef4444;
  --info-bg: #eff6ff;
  --info-border: #3b82f6;
}

.gradio-container {
  background: linear-gradient(180deg, #f8fafc 0%, var(--bg) 100%);
}

#hero {
  border: 1px solid #dbe4f0;
  border-radius: 14px;
  padding: 18px 18px 16px 18px;
  background:
    radial-gradient(700px 220px at 92% -35%, rgba(15, 118, 110, 0.14) 0%, transparent 58%),
    #ffffff;
  box-shadow: 0 14px 34px rgba(15, 23, 42, 0.08);
}

#hero h1 {
  margin: 6px 0 0 0;
  color: var(--ink);
  font-size: 1.62rem;
  line-height: 1.2;
}

#hero p {
  margin: 10px 0 0 0;
  color: var(--muted);
  max-width: 760px;
}

.hero-eyebrow {
  display: inline-flex;
  align-items: center;
  gap: 6px;
  margin: 0;
  font-size: 0.76rem;
  letter-spacing: 0.06em;
  font-weight: 700;
  text-transform: uppercase;
  color: #0f766e;
}

.hero-highlight {
  color: #0f766e;
  font-weight: 700;
}

.hero-actions {
  margin-top: 14px;
  display: flex;
  gap: 10px;
  flex-wrap: wrap;
}

.hero-cta {
  display: inline-block;
  text-decoration: none;
  border-radius: 10px;
  padding: 9px 14px;
  font-weight: 700;
  background: #0f766e;
  color: #ffffff !important;
}

.hero-note {
  display: inline-block;
  border-radius: 10px;
  padding: 9px 12px;
  background: #f1f5f9;
  color: #334155;
  font-size: 0.9rem;
}

.panel {
  border: 1px solid #dbe4f0;
  border-radius: 14px;
  padding: 12px;
  background: #ffffff;
}

.status-card {
  border-radius: 10px;
  border: 1px solid;
  padding: 10px 12px;
  font-size: 0.95rem;
}

.status-ok {
  background: var(--ok-bg);
  border-color: var(--ok-border);
  color: #065f46;
}

.status-error {
  background: var(--error-bg);
  border-color: var(--error-border);
  color: #991b1b;
}

.status-info {
  background: var(--info-bg);
  border-color: var(--info-border);
  color: #1e3a8a;
}

#run-btn {
  background: var(--primary);
  color: #ffffff;
  border: none;
}

.upgrade-cta {
  margin-top: 12px;
  border-radius: 14px;
  border: 1px solid #f59e0b;
  background:
    radial-gradient(820px 240px at 88% -35%, rgba(245, 158, 11, 0.25) 0%, transparent 58%),
    linear-gradient(135deg, #fff8eb 0%, #ffedd5 100%);
  box-shadow: 0 12px 30px rgba(245, 158, 11, 0.18);
  padding: 16px 18px;
}

.upgrade-cta-title {
  margin: 0;
  color: #9a3412;
  font-weight: 800;
  font-size: 1.02rem;
}

.upgrade-cta-copy {
  margin: 7px 0 0 0;
  color: #7c2d12;
  font-size: 0.93rem;
}

.upgrade-cta-button {
  display: inline-block;
  margin-top: 12px;
  padding: 9px 14px;
  border-radius: 10px;
  text-decoration: none;
  background: #b45309;
  color: #ffffff !important;
  font-weight: 700;
}
"""


def _safe_json(response: httpx.Response) -> dict[str, Any]:
    """Return the response body as a dict, or ``{}`` when that is not possible.

    An empty dict is returned both when the body is not valid JSON and when
    the decoded JSON is not an object (for example a list or a string).
    """
    try:
        payload = response.json()
    except ValueError:
        return {}
    return payload if isinstance(payload, dict) else {}


def _render_status(message: str, level: str = "info") -> str:
    """Render the status card HTML for *message*.

    Args:
        message: Plain text to display; it is HTML-escaped before use.
        level: One of ``"ok"``, ``"error"`` or ``"info"``.  Any other value
            is treated as ``"info"``.

    Returns:
        An HTML fragment styled by the ``.status-card`` / ``.status-<level>``
        rules in ``CUSTOM_CSS``.
    """
    safe_message = html.escape(message)
    if level not in {"ok", "error", "info"}:
        level = "info"
    return f"<div class='status-card status-{level}'>{safe_message}</div>"


def _render_upgrade_cta(visible: bool) -> str:
    """Render the upgrade call-to-action, or an empty string when hidden.

    Args:
        visible: Whether the call-to-action should be shown.

    Returns:
        An HTML fragment styled by the ``.upgrade-cta*`` rules in
        ``CUSTOM_CSS``, or ``""`` when *visible* is false.
    """
    if not visible:
        return ""
    safe_message = html.escape(UPGRADE_MESSAGE)
    # quote=True also escapes quote characters, so the URL is safe inside
    # the quoted href attribute.
    safe_url = html.escape(UPGRADE_URL, quote=True)
    return (
        "<section class='upgrade-cta'>"
        f"<p class='upgrade-cta-title'>{safe_message}</p>"
        "<p class='upgrade-cta-copy'>You have reached your free limit for this demo.</p>"
        f"<a class='upgrade-cta-button' href='{safe_url}' target='_blank' rel='noopener noreferrer'>"
        "Go to carb-connect.com"
        "</a>"
        "</section>"
    )


def _write_temp_image(image_bytes: bytes, suffix: str = ".png") -> str:
    """Write *image_bytes* to a new temporary file and return its path.

    The file is created with ``delete=False``, so it still exists after this
    function returns; nothing in this module removes it afterwards.
    """
    with tempfile.NamedTemporaryFile(prefix="zoi-output-", suffix=suffix, delete=False) as tmp:
        tmp.write(image_bytes)
        return tmp.name


def _resolve_examples_dir() -> Path:
    """Return the examples directory, resolving relative paths against ``SPACE_ROOT``."""
    examples_dir = Path(TRIAL_EXAMPLES_DIR)
    if not examples_dir.is_absolute():
        examples_dir = SPACE_ROOT / examples_dir
    return examples_dir


def _load_example_choices() -> list[tuple[str, str]]:
    """Build the ``(label, path)`` choices for the example dropdown.

    Only regular files whose suffix is in ``ALLOWED_EXAMPLE_SUFFIXES`` are
    considered.  They are sorted case-insensitively by file name and cut off
    at ``TRIAL_EXAMPLES_MAX`` entries.  Labels are derived from the file stem
    with ``-`` and ``_`` replaced by spaces; duplicate labels get a
    `` (2)``, `` (3)``, ... suffix.

    Returns:
        The list of choices, or an empty list when the examples directory
        does not exist or is not a directory.
    """
    examples_dir = _resolve_examples_dir()
    if not examples_dir.is_dir():
        return []

    files = sorted(
        (
            path
            for path in examples_dir.iterdir()
            if path.is_file() and path.suffix.lower() in ALLOWED_EXAMPLE_SUFFIXES
        ),
        key=lambda path: path.name.lower(),
    )[:TRIAL_EXAMPLES_MAX]

    labels: set[str] = set()
    choices: list[tuple[str, str]] = []
    for path in files:
        base_label = path.stem.replace("-", " ").replace("_", " ").strip() or path.name
        label = base_label
        index = 2
        # Two files such as "a-b.png" and "a_b.jpg" map to the same label.
        while label in labels:
            label = f"{base_label} ({index})"
            index += 1
        labels.add(label)
        choices.append((label, str(path)))
    return choices


def _post_image_echo(image_path: str, hf_token: str) -> httpx.Response:
    """Upload the image at *image_path* to the trial gateway.

    The request is a multipart POST to ``{TRIAL_GATEWAY_URL}/v1/trial/infer``
    carrying the file under ``image`` and ``APP_CODE`` under ``app_code``,
    authenticated with *hf_token* as a bearer token, with a 45 second timeout.

    Raises:
        OSError: If the image file cannot be opened.
        httpx.HTTPError: If the request fails at the transport level.
    """
    infer_url = f"{TRIAL_GATEWAY_URL}/v1/trial/infer"
    headers = {"Authorization": f"Bearer {hf_token}"}
    mime_type = mimetypes.guess_type(image_path)[0] or "application/octet-stream"
    form_data = {"app_code": APP_CODE}
    with open(image_path, "rb") as img_file:
        files = {"image": (Path(image_path).name, img_file, mime_type)}
        return httpx.post(infer_url, headers=headers, files=files, data=form_data, timeout=45.0)


def _content_type_to_format(content_type: str) -> str:
    """Map a ``Content-Type`` value to the Pillow format used for saving.

    Parameters after ``;`` are ignored and matching is case-insensitive.
    Anything other than JPEG, PNG or WebP falls back to ``"PNG"``.
    """
    normalized = (content_type or "").split(";")[0].strip().lower()
    if normalized in {"image/jpeg", "image/jpg"}:
        return "JPEG"
    if normalized == "image/webp":
        return "WEBP"
    return "PNG"


def _apply_watermark(image_bytes: bytes, content_type: str) -> bytes:
    """Draw ``WATERMARK_TEXT`` near the bottom-right corner of the image.

    Args:
        image_bytes: Encoded source image.
        content_type: Content type of the source; it selects the output
            format through `_content_type_to_format`.

    Returns:
        The watermarked image, encoded in the selected output format.

    Raises:
        ValueError: If the decoded image has a non-positive width or height.
        Exception: Whatever Pillow raises when the image cannot be decoded
            or encoded.
    """
    with Image.open(io.BytesIO(image_bytes)) as source_image:
        base_image = source_image.convert("RGBA")

    width, height = base_image.size
    if width <= 0 or height <= 0:
        raise ValueError("Invalid image dimensions for watermarking.")

    overlay = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(overlay)

    # Margin and font size scale with the shorter side, with fixed minimums.
    shorter_side = min(width, height)
    margin = max(12, int(shorter_side * 0.03))
    font_size = max(14, int(shorter_side * 0.035))
    try:
        font = ImageFont.truetype("DejaVuSans.ttf", font_size)
    except OSError:
        # Font file not available: fall back to Pillow's built-in font.
        font = ImageFont.load_default()

    text_box = draw.textbbox((0, 0), WATERMARK_TEXT, font=font)
    text_width = text_box[2] - text_box[0]
    text_height = text_box[3] - text_box[1]
    # Never start left of / above the margin, even if the text is larger
    # than the image.
    text_x = max(margin, width - text_width - margin)
    text_y = max(margin, height - text_height - margin)

    # Four dark, offset copies behind the light text act as an outline.
    for offset_x, offset_y in [(-1, -1), (-1, 1), (1, -1), (1, 1)]:
        draw.text(
            (text_x + offset_x, text_y + offset_y),
            WATERMARK_TEXT,
            font=font,
            fill=(0, 0, 0, 110),
        )
    draw.text((text_x, text_y), WATERMARK_TEXT, font=font, fill=(255, 255, 255, 160))

    composited = Image.alpha_composite(base_image, overlay)
    output_format = _content_type_to_format(content_type)
    if output_format == "JPEG":
        # JPEG has no alpha channel.
        composited = composited.convert("RGB")
    output_buffer = io.BytesIO()
    composited.save(output_buffer, format=output_format)
    return output_buffer.getvalue()


def _trial_result(
    output_path: str | None,
    message: str,
    level: str,
    cta_visible: bool,
) -> tuple[str | None, str, str, bool]:
    """Assemble the four outputs that `_run_image_trial` hands back to the UI."""
    return (
        output_path,
        _render_status(message, level),
        _render_upgrade_cta(cta_visible),
        cta_visible,
    )


def _summarize_remaining_calls(remaining_calls: str | None) -> tuple[str, bool]:
    """Turn the ``x-remaining-calls`` header into a status message.

    Returns:
        ``(status_message, quota_exhausted)``.  ``quota_exhausted`` is true
        only when the header is a decimal number that is zero.
    """
    value = (remaining_calls or "").strip()
    if not value:
        return "Processing complete.", False
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as superscript digits, which int() rejects with ValueError.
    if value.isdecimal():
        remaining_value = int(value)
        if remaining_value <= 0:
            return f"Processing complete. Remaining attempts: 0. {UPGRADE_MESSAGE}", True
        return f"Processing complete. Remaining attempts: {remaining_value}", False
    # Not a plain number: show the raw value (escaped later by _render_status).
    return f"Processing complete. Remaining attempts: {value}", False


def _run_image_trial(
    input_image_path: str | None,
    upgrade_cta_visible: bool,
    oauth_token: gr.OAuthToken | None = None,
) -> tuple[str | None, str, str, bool]:
    """Send the uploaded image to the gateway and prepare the UI outputs.

    Args:
        input_image_path: Path of the uploaded image, or ``None``/empty when
            nothing was uploaded.
        upgrade_cta_visible: Current value of the upgrade call-to-action
            state; once true it stays true.
        oauth_token: Hugging Face OAuth token.  It is not listed in the click
            handler's ``inputs``; it is expected to be supplied by Gradio
            based on the ``gr.OAuthToken`` annotation.

    Returns:
        ``(output_image_path, status_html, upgrade_cta_html, cta_visible)``.
        ``output_image_path`` is ``None`` whenever no image was produced.
    """
    cta_visible = bool(upgrade_cta_visible)

    if oauth_token is None or not oauth_token.token:
        return _trial_result(None, "Please sign in with Hugging Face first.", "error", cta_visible)
    if not input_image_path:
        return _trial_result(None, "You must upload an input image.", "error", cta_visible)

    try:
        response = _post_image_echo(input_image_path, oauth_token.token)
    except httpx.HTTPError as error:
        return _trial_result(None, f"Could not connect to the backend: {error}", "error", cta_visible)
    except OSError as error:
        # The uploaded file could not be opened (for example it was removed).
        return _trial_result(None, f"Could not read the input image: {error}", "error", cta_visible)

    if response.status_code >= 400:
        parsed = _safe_json(response)
        message = str(
            parsed.get("error")
            or parsed.get("message")
            or f"Request failed with status {response.status_code}"
        )
        lower_message = message.lower()
        quota_reached = (
            response.status_code == 403
            or "no remaining uses" in lower_message
            or "remaining attempts: 0" in lower_message
        )
        if quota_reached:
            return _trial_result(None, UPGRADE_MESSAGE, "error", True)
        return _trial_result(None, message, "error", cta_visible)

    content_type = response.headers.get("content-type", "").split(";")[0].strip().lower()
    if not content_type.startswith("image/"):
        return _trial_result(None, "Backend response did not include an image.", "error", cta_visible)

    try:
        output_bytes = _apply_watermark(response.content, content_type)
    except Exception as error:  # UI boundary: report any imaging failure to the user.
        return _trial_result(None, f"Could not apply watermark: {error}", "error", cta_visible)

    # Name the file after the format the bytes were re-encoded in.
    # _apply_watermark falls back to PNG for image types it does not know,
    # so a suffix guessed from the content type could contradict the content.
    suffix = _FORMAT_EXTENSIONS[_content_type_to_format(content_type)]
    output_path = _write_temp_image(output_bytes, suffix)

    status_message, quota_exhausted = _summarize_remaining_calls(
        response.headers.get("x-remaining-calls")
    )
    if quota_exhausted:
        cta_visible = True
    return _trial_result(output_path, status_message, "ok", cta_visible)


def _select_example(example_path: str | None) -> str | None:
    """Return the selected example path, mapping an empty selection to ``None``."""
    return example_path or None


EXAMPLE_CHOICES = _load_example_choices()
NO_EXAMPLES_MESSAGE = (
    f"No examples found in `{_resolve_examples_dir()}`. "
    "Add images (`.png`, `.jpg`, `.jpeg`, `.webp`) to enable the selector."
)

# Hero banner; the markup uses the #hero / .hero-* hooks from CUSTOM_CSS.
HERO_HTML = f"""
<section id="hero">
  <div class="hero-eyebrow">Nugent Score AI 1.0</div>
  <h1>Nugent Score AI 1.0 Demo</h1>
  <p>Sign in with Hugging Face, upload your image, and get an instant preview while trial attempts are available.</p>
  <p>Want to use the app without restrictions? <span class="hero-highlight">Get full access at carb-connect.com.</span></p>
  <div class="hero-actions">
    <a class="hero-cta" href="{html.escape(UPGRADE_URL, quote=True)}" target="_blank" rel="noopener noreferrer">Unlock Full Access</a>
  </div>
</section>
"""

with gr.Blocks(
    title="Nugent Score AI 1.0 - Image Echo Demo",
    css=CUSTOM_CSS,
    theme=gr.themes.Base(
        primary_hue="teal",
        neutral_hue="slate",
        font=[gr.themes.GoogleFont("Space Grotesk"), "ui-sans-serif", "sans-serif"],
    ),
) as demo:
    gr.HTML(HERO_HTML)

    with gr.Row():
        gr.LoginButton("Sign in with Hugging Face")
        # Only add a logout button when this Gradio version provides one.
        if hasattr(gr, "LogoutButton"):
            gr.LogoutButton("Sign out")

    # Whether the upgrade call-to-action has been shown in this session.
    upgrade_cta_state = gr.State(False)
    upgrade_cta_html = gr.HTML(_render_upgrade_cta(False))

    with gr.Row(equal_height=True):
        with gr.Column(scale=1, elem_classes=["panel"]):
            example_selector = gr.Dropdown(
                label="Use example image",
                choices=EXAMPLE_CHOICES,
                value=None,
                interactive=bool(EXAMPLE_CHOICES),
            )
            if not EXAMPLE_CHOICES:
                gr.Markdown(NO_EXAMPLES_MESSAGE)
            gr.Textbox(
                label="Type of specimen",
                value="Vaginal Swab",
                interactive=False,
                info="Informational only",
            )
            input_image = gr.Image(type="filepath", label="Input image")
            run_button = gr.Button("Send image", elem_id="run-btn")
        with gr.Column(scale=1, elem_classes=["panel"]):
            output_image = gr.Image(type="filepath", label="Output image")
            trial_status = gr.HTML(
                _render_status("Sign in and upload an image to get started.", "info")
            )

    example_selector.change(
        _select_example,
        inputs=[example_selector],
        outputs=[input_image],
    )
    run_button.click(
        _run_image_trial,
        inputs=[input_image, upgrade_cta_state],
        outputs=[output_image, trial_status, upgrade_cta_html, upgrade_cta_state],
        show_progress="full",
    )

demo.queue().launch()