Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import html | |
| import io | |
| import mimetypes | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any | |
| import gradio as gr | |
| import httpx | |
| from PIL import Image, ImageDraw, ImageFont | |
| TRIAL_GATEWAY_URL = os.getenv("TRIAL_GATEWAY_URL", "http://localhost:3000").rstrip("/") | |
| TRIAL_EXAMPLES_DIR = os.getenv("TRIAL_EXAMPLES_DIR", "examples").strip() or "examples" | |
| try: | |
| TRIAL_EXAMPLES_MAX = int(os.getenv("TRIAL_EXAMPLES_MAX", "12")) | |
| except ValueError: | |
| TRIAL_EXAMPLES_MAX = 12 | |
| TRIAL_EXAMPLES_MAX = max(1, min(TRIAL_EXAMPLES_MAX, 100)) | |
| ALLOWED_EXAMPLE_SUFFIXES = {".png", ".jpg", ".jpeg", ".webp"} | |
| SPACE_ROOT = Path(__file__).resolve().parent | |
| WATERMARK_TEXT = "go to carb-connect.com fully use" | |
| UPGRADE_MESSAGE = "Go to carb-connect.com to keep using this app" | |
| UPGRADE_URL = "https://carb-connect.com" | |
| APP_CODE = "NUGENTSCORE_1" | |
| CUSTOM_CSS = """ | |
| :root { | |
| --bg: #f6fbff; | |
| --ink: #0f172a; | |
| --muted: #475569; | |
| --primary: #0f766e; | |
| --ok-bg: #ecfdf5; | |
| --ok-border: #10b981; | |
| --error-bg: #fef2f2; | |
| --error-border: #ef4444; | |
| --info-bg: #eff6ff; | |
| --info-border: #3b82f6; | |
| } | |
| .gradio-container { | |
| background: linear-gradient(180deg, #f8fafc 0%, var(--bg) 100%); | |
| } | |
| #hero { | |
| border: 1px solid #dbe4f0; | |
| border-radius: 14px; | |
| padding: 18px 18px 16px 18px; | |
| background: | |
| radial-gradient(700px 220px at 92% -35%, rgba(15, 118, 110, 0.14) 0%, transparent 58%), | |
| #ffffff; | |
| box-shadow: 0 14px 34px rgba(15, 23, 42, 0.08); | |
| } | |
| #hero h1 { | |
| margin: 6px 0 0 0; | |
| color: var(--ink); | |
| font-size: 1.62rem; | |
| line-height: 1.2; | |
| } | |
| #hero p { | |
| margin: 10px 0 0 0; | |
| color: var(--muted); | |
| max-width: 760px; | |
| } | |
| .hero-eyebrow { | |
| display: inline-flex; | |
| align-items: center; | |
| gap: 6px; | |
| margin: 0; | |
| font-size: 0.76rem; | |
| letter-spacing: 0.06em; | |
| font-weight: 700; | |
| text-transform: uppercase; | |
| color: #0f766e; | |
| } | |
| .hero-highlight { | |
| color: #0f766e; | |
| font-weight: 700; | |
| } | |
| .hero-actions { | |
| margin-top: 14px; | |
| display: flex; | |
| gap: 10px; | |
| flex-wrap: wrap; | |
| } | |
| .hero-cta { | |
| display: inline-block; | |
| text-decoration: none; | |
| border-radius: 10px; | |
| padding: 9px 14px; | |
| font-weight: 700; | |
| background: #0f766e; | |
| color: #ffffff !important; | |
| } | |
| .hero-note { | |
| display: inline-block; | |
| border-radius: 10px; | |
| padding: 9px 12px; | |
| background: #f1f5f9; | |
| color: #334155; | |
| font-size: 0.9rem; | |
| } | |
| .panel { | |
| border: 1px solid #dbe4f0; | |
| border-radius: 14px; | |
| padding: 12px; | |
| background: #ffffff; | |
| } | |
| .status-card { | |
| border-radius: 10px; | |
| border: 1px solid; | |
| padding: 10px 12px; | |
| font-size: 0.95rem; | |
| } | |
| .status-ok { | |
| background: var(--ok-bg); | |
| border-color: var(--ok-border); | |
| color: #065f46; | |
| } | |
| .status-error { | |
| background: var(--error-bg); | |
| border-color: var(--error-border); | |
| color: #991b1b; | |
| } | |
| .status-info { | |
| background: var(--info-bg); | |
| border-color: var(--info-border); | |
| color: #1e3a8a; | |
| } | |
| #run-btn { | |
| background: var(--primary); | |
| color: #ffffff; | |
| border: none; | |
| } | |
| .upgrade-cta { | |
| margin-top: 12px; | |
| border-radius: 14px; | |
| border: 1px solid #f59e0b; | |
| background: | |
| radial-gradient(820px 240px at 88% -35%, rgba(245, 158, 11, 0.25) 0%, transparent 58%), | |
| linear-gradient(135deg, #fff8eb 0%, #ffedd5 100%); | |
| box-shadow: 0 12px 30px rgba(245, 158, 11, 0.18); | |
| padding: 16px 18px; | |
| } | |
| .upgrade-cta-title { | |
| margin: 0; | |
| color: #9a3412; | |
| font-weight: 800; | |
| font-size: 1.02rem; | |
| } | |
| .upgrade-cta-copy { | |
| margin: 7px 0 0 0; | |
| color: #7c2d12; | |
| font-size: 0.93rem; | |
| } | |
| .upgrade-cta-button { | |
| display: inline-block; | |
| margin-top: 12px; | |
| padding: 9px 14px; | |
| border-radius: 10px; | |
| text-decoration: none; | |
| background: #b45309; | |
| color: #ffffff !important; | |
| font-weight: 700; | |
| } | |
| """ | |
| def _safe_json(response: httpx.Response) -> dict[str, Any]: | |
| try: | |
| payload = response.json() | |
| except ValueError: | |
| return {} | |
| return payload if isinstance(payload, dict) else {} | |
| def _render_status(message: str, level: str = "info") -> str: | |
| safe_message = html.escape(message) | |
| if level not in {"ok", "error", "info"}: | |
| level = "info" | |
| return f"<div class='status-card status-{level}'>{safe_message}</div>" | |
| def _render_upgrade_cta(visible: bool) -> str: | |
| if not visible: | |
| return "" | |
| safe_message = html.escape(UPGRADE_MESSAGE) | |
| safe_url = html.escape(UPGRADE_URL, quote=True) | |
| return ( | |
| "<section class='upgrade-cta'>" | |
| f"<p class='upgrade-cta-title'>{safe_message}</p>" | |
| "<p class='upgrade-cta-copy'>You have reached your free limit for this demo.</p>" | |
| f"<a class='upgrade-cta-button' href='{safe_url}' target='_blank' rel='noopener noreferrer'>" | |
| "Go to carb-connect.com" | |
| "</a>" | |
| "</section>" | |
| ) | |
| def _write_temp_image(image_bytes: bytes, suffix: str = ".png") -> str: | |
| with tempfile.NamedTemporaryFile(prefix="zoi-output-", suffix=suffix, delete=False) as tmp: | |
| tmp.write(image_bytes) | |
| return tmp.name | |
| def _resolve_examples_dir() -> Path: | |
| examples_dir = Path(TRIAL_EXAMPLES_DIR) | |
| if not examples_dir.is_absolute(): | |
| examples_dir = SPACE_ROOT / examples_dir | |
| return examples_dir | |
| def _load_example_choices() -> list[tuple[str, str]]: | |
| examples_dir = _resolve_examples_dir() | |
| if not examples_dir.exists() or not examples_dir.is_dir(): | |
| return [] | |
| files = sorted( | |
| [path for path in examples_dir.iterdir() if path.is_file() and path.suffix.lower() in ALLOWED_EXAMPLE_SUFFIXES], | |
| key=lambda path: path.name.lower(), | |
| )[:TRIAL_EXAMPLES_MAX] | |
| labels: set[str] = set() | |
| choices: list[tuple[str, str]] = [] | |
| for path in files: | |
| base_label = path.stem.replace("-", " ").replace("_", " ").strip() or path.name | |
| label = base_label | |
| index = 2 | |
| while label in labels: | |
| label = f"{base_label} ({index})" | |
| index += 1 | |
| labels.add(label) | |
| choices.append((label, str(path))) | |
| return choices | |
| def _post_image_echo(image_path: str, hf_token: str) -> httpx.Response: | |
| infer_url = f"{TRIAL_GATEWAY_URL}/v1/trial/infer" | |
| headers = {"Authorization": f"Bearer {hf_token}"} | |
| mime_type = mimetypes.guess_type(image_path)[0] or "application/octet-stream" | |
| form_data = {"app_code": APP_CODE} | |
| with open(image_path, "rb") as img_file: | |
| files = {"image": (Path(image_path).name, img_file, mime_type)} | |
| return httpx.post(infer_url, headers=headers, files=files, data=form_data, timeout=45.0) | |
| def _content_type_to_format(content_type: str) -> str: | |
| normalized = (content_type or "").split(";")[0].strip().lower() | |
| if normalized in {"image/jpeg", "image/jpg"}: | |
| return "JPEG" | |
| if normalized == "image/png": | |
| return "PNG" | |
| if normalized == "image/webp": | |
| return "WEBP" | |
| return "PNG" | |
| def _apply_watermark(image_bytes: bytes, content_type: str) -> bytes: | |
| with Image.open(io.BytesIO(image_bytes)) as source_image: | |
| base_image = source_image.convert("RGBA") | |
| width, height = base_image.size | |
| if width <= 0 or height <= 0: | |
| raise ValueError("Invalid image dimensions for watermarking.") | |
| overlay = Image.new("RGBA", base_image.size, (0, 0, 0, 0)) | |
| draw = ImageDraw.Draw(overlay) | |
| margin = max(12, int(min(width, height) * 0.03)) | |
| font_size = max(14, int(min(width, height) * 0.035)) | |
| try: | |
| font = ImageFont.truetype("DejaVuSans.ttf", font_size) | |
| except OSError: | |
| font = ImageFont.load_default() | |
| text_box = draw.textbbox((0, 0), WATERMARK_TEXT, font=font) | |
| text_width = text_box[2] - text_box[0] | |
| text_height = text_box[3] - text_box[1] | |
| text_x = max(margin, width - text_width - margin) | |
| text_y = max(margin, height - text_height - margin) | |
| for offset_x, offset_y in [(-1, -1), (-1, 1), (1, -1), (1, 1)]: | |
| draw.text( | |
| (text_x + offset_x, text_y + offset_y), | |
| WATERMARK_TEXT, | |
| font=font, | |
| fill=(0, 0, 0, 110), | |
| ) | |
| draw.text((text_x, text_y), WATERMARK_TEXT, font=font, fill=(255, 255, 255, 160)) | |
| composited = Image.alpha_composite(base_image, overlay) | |
| output_format = _content_type_to_format(content_type) | |
| output_buffer = io.BytesIO() | |
| if output_format == "JPEG": | |
| composited = composited.convert("RGB") | |
| composited.save(output_buffer, format=output_format) | |
| return output_buffer.getvalue() | |
| def _run_image_trial( | |
| input_image_path: str | None, | |
| upgrade_cta_visible: bool, | |
| oauth_token: gr.OAuthToken | None = None, | |
| ) -> tuple[str | None, str, str, bool]: | |
| current_cta_visible = bool(upgrade_cta_visible) | |
| if oauth_token is None or not oauth_token.token: | |
| return ( | |
| None, | |
| _render_status("Please sign in with Hugging Face first.", "error"), | |
| _render_upgrade_cta(current_cta_visible), | |
| current_cta_visible, | |
| ) | |
| if not input_image_path: | |
| return ( | |
| None, | |
| _render_status("You must upload an input image.", "error"), | |
| _render_upgrade_cta(current_cta_visible), | |
| current_cta_visible, | |
| ) | |
| try: | |
| response = _post_image_echo(input_image_path, oauth_token.token) | |
| except httpx.HTTPError as error: | |
| return ( | |
| None, | |
| _render_status(f"Could not connect to the backend: {error}", "error"), | |
| _render_upgrade_cta(current_cta_visible), | |
| current_cta_visible, | |
| ) | |
| if response.status_code >= 400: | |
| parsed = _safe_json(response) | |
| message = str(parsed.get("error") or parsed.get("message") or f"Request failed with status {response.status_code}") | |
| lower_message = message.lower() | |
| quota_reached = response.status_code == 403 or "no remaining uses" in lower_message or "remaining attempts: 0" in lower_message | |
| if quota_reached: | |
| current_cta_visible = True | |
| return ( | |
| None, | |
| _render_status(UPGRADE_MESSAGE, "error"), | |
| _render_upgrade_cta(current_cta_visible), | |
| current_cta_visible, | |
| ) | |
| return ( | |
| None, | |
| _render_status(message, "error"), | |
| _render_upgrade_cta(current_cta_visible), | |
| current_cta_visible, | |
| ) | |
| content_type = response.headers.get("content-type", "").split(";")[0].strip().lower() | |
| if not content_type.startswith("image/"): | |
| return ( | |
| None, | |
| _render_status("Backend response did not include an image.", "error"), | |
| _render_upgrade_cta(current_cta_visible), | |
| current_cta_visible, | |
| ) | |
| try: | |
| output_bytes = _apply_watermark(response.content, content_type) | |
| except Exception as error: | |
| return ( | |
| None, | |
| _render_status(f"Could not apply watermark: {error}", "error"), | |
| _render_upgrade_cta(current_cta_visible), | |
| current_cta_visible, | |
| ) | |
| ext = mimetypes.guess_extension(content_type) or ".png" | |
| output_path = _write_temp_image(output_bytes, ext) | |
| remaining_calls = response.headers.get("x-remaining-calls") | |
| status_message = "Processing complete." | |
| if remaining_calls is not None and remaining_calls.strip().isdigit(): | |
| remaining_value = int(remaining_calls.strip()) | |
| if remaining_value <= 0: | |
| current_cta_visible = True | |
| status_message = f"Processing complete. Remaining attempts: 0. {UPGRADE_MESSAGE}" | |
| else: | |
| status_message = f"Processing complete. Remaining attempts: {remaining_value}" | |
| elif remaining_calls is not None and remaining_calls.strip(): | |
| status_message = f"Processing complete. Remaining attempts: {remaining_calls.strip()}" | |
| return ( | |
| output_path, | |
| _render_status(status_message, "ok"), | |
| _render_upgrade_cta(current_cta_visible), | |
| current_cta_visible, | |
| ) | |
| def _select_example(example_path: str | None) -> str | None: | |
| return example_path or None | |
| EXAMPLE_CHOICES = _load_example_choices() | |
| NO_EXAMPLES_MESSAGE = ( | |
| f"No examples found in `{_resolve_examples_dir()}`. " | |
| "Add images (`.png`, `.jpg`, `.jpeg`, `.webp`) to enable the selector." | |
| ) | |
| with gr.Blocks( | |
| title="Nugent Score AI 1.0 - Image Echo Demo", | |
| css=CUSTOM_CSS, | |
| theme=gr.themes.Base( | |
| primary_hue="teal", | |
| neutral_hue="slate", | |
| font=[gr.themes.GoogleFont("Space Grotesk"), "ui-sans-serif", "sans-serif"], | |
| ), | |
| ) as demo: | |
| gr.HTML( | |
| """ | |
| <section id="hero"> | |
| <p class="hero-eyebrow">Nugent Score AI 1.0</p> | |
| <h1>Nugent Score AI 1.0 Demo</h1> | |
| <p>Sign in with Hugging Face, upload your image, and get an instant preview while trial attempts are available.</p> | |
| <p class="hero-note">Want to use the app <span class="hero-highlight">without restrictions</span>? Get full access at carb-connect.com.</p> | |
| <div class="hero-actions"> | |
| <a class="hero-cta" href="https://carb-connect.com" target="_blank" rel="noopener noreferrer">Unlock Full Access</a> | |
| </div> | |
| </section> | |
| """ | |
| ) | |
| with gr.Row(): | |
| gr.LoginButton("Sign in with Hugging Face") | |
| if hasattr(gr, "LogoutButton"): | |
| gr.LogoutButton("Sign out") | |
| upgrade_cta_state = gr.State(False) | |
| upgrade_cta_html = gr.HTML(_render_upgrade_cta(False)) | |
| with gr.Row(equal_height=True): | |
| with gr.Column(scale=1, elem_classes=["panel"]): | |
| example_selector = gr.Dropdown( | |
| label="Use example image", | |
| choices=EXAMPLE_CHOICES, | |
| value=None, | |
| interactive=bool(EXAMPLE_CHOICES), | |
| ) | |
| if not EXAMPLE_CHOICES: | |
| gr.Markdown(NO_EXAMPLES_MESSAGE) | |
| gr.Textbox( | |
| label="Type of speciment", | |
| value="Vaginal Swab", | |
| interactive=False, | |
| info="Informational only", | |
| ) | |
| input_image = gr.Image(type="filepath", label="Input image") | |
| run_button = gr.Button("Send image", elem_id="run-btn") | |
| with gr.Column(scale=1, elem_classes=["panel"]): | |
| output_image = gr.Image(type="filepath", label="Output image") | |
| trial_status = gr.HTML(_render_status("Sign in and upload an image to get started.", "info")) | |
| example_selector.change( | |
| _select_example, | |
| inputs=[example_selector], | |
| outputs=[input_image], | |
| ) | |
| run_button.click( | |
| _run_image_trial, | |
| inputs=[input_image, upgrade_cta_state], | |
| outputs=[output_image, trial_status, upgrade_cta_html, upgrade_cta_state], | |
| show_progress="full", | |
| ) | |
| demo.queue().launch() | |