Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| import os | |
| import io | |
| import json | |
| import zipfile | |
| import mimetypes | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import requests | |
| import gradio as gr | |
| from openai import OpenAI | |
# --------------------- Configuration ---------------------
# NV_BASE_URL and NV_VLM_URL may be overridden through the environment;
# NV_API_KEY has no default and is mandatory.
NV_API_KEY = os.getenv("NV_API_KEY")
NV_BASE_URL = os.getenv("NV_BASE_URL", "https://integrate.api.nvidia.com/v1")
NV_VLM_URL = os.getenv(
    "NV_VLM_URL",
    "https://ai.api.nvidia.com/v1/vlm/microsoft/florence-2",
)
NVCF_ASSETS_URL = "https://api.nvcf.nvidia.com/v2/nvcf/assets"

# Fail at import time: every request below sends this key as a Bearer token.
if not NV_API_KEY:
    raise RuntimeError("NV_API_KEY не задан. В HF Space: Settings → Secrets → NV_API_KEY")

# OpenAI-compatible client pointed at NV_BASE_URL; used by respond().
llm = OpenAI(base_url=NV_BASE_URL, api_key=NV_API_KEY)
| # --------------------- Florence utils --------------------- | |
| def _guess_mime(path: str) -> str: | |
| return mimetypes.guess_type(path)[0] or "image/jpeg" | |
def nvcf_upload_asset(image_path: str, description: str = "Chat image") -> str:
    """Upload a local image as an NVCF asset and return its asset id.

    Two HTTP calls are made: a POST to ``NVCF_ASSETS_URL`` that registers the
    asset (the JSON reply is read for ``uploadUrl`` and ``assetId``), then a
    PUT of the file bytes to that upload URL.

    Args:
        image_path: Path of the file to upload; its MIME type is guessed
            from the name via ``_guess_mime``.
        description: Sent both in the registration body and as the
            ``x-amz-meta-nvcf-asset-description`` upload header.

    Returns:
        The asset id converted to ``str``.

    Raises:
        requests.HTTPError: If either request returns an error status.
        KeyError: If the registration reply lacks ``uploadUrl``/``assetId``.
        OSError: If *image_path* cannot be opened.
    """
    content_type = _guess_mime(image_path)

    registration = requests.post(
        NVCF_ASSETS_URL,
        headers={
            "Authorization": f"Bearer {NV_API_KEY}",
            "Content-Type": "application/json",
            "accept": "application/json",
        },
        json={"contentType": content_type, "description": description},
        timeout=30,
    )
    registration.raise_for_status()

    registered = registration.json()
    upload_url = registered["uploadUrl"]
    asset_id = str(registered["assetId"])

    upload_headers = {
        "x-amz-meta-nvcf-asset-description": description,
        "content-type": content_type,
    }
    # The open file object is handed to requests as the request body.
    with open(image_path, "rb") as image_file:
        upload = requests.put(
            upload_url,
            data=image_file,
            headers=upload_headers,
            timeout=300,
        )
    upload.raise_for_status()
    return asset_id
| def _vlm_content(task_token: str, asset_id: str, text_prompt: Optional[str] = None) -> str: | |
| parts = [task_token] | |
| if text_prompt and text_prompt.strip(): | |
| parts.append(text_prompt.strip()) | |
| parts.append(f'<img src="data:image/jpeg;asset_id,{asset_id}" />') | |
| return "".join(parts) | |
| PRIORITY_TEXT_KEYS = [ | |
| "more_detailed_caption", "detailed_caption", "caption", | |
| "generated_text", "text", "ocr", "description", | |
| ] | |
| def _deep_text_candidates(obj: Any) -> List[str]: | |
| out = [] | |
| def walk(o): | |
| if isinstance(o, dict): | |
| for k in PRIORITY_TEXT_KEYS: | |
| if k in o and isinstance(o[k], str) and o[k].strip(): | |
| out.append(o[k].strip()) | |
| for v in o.values(): | |
| walk(v) | |
| elif isinstance(o, list): | |
| for it in o: | |
| walk(it) | |
| elif isinstance(o, str): | |
| if o.strip(): | |
| out.append(o.strip()) | |
| walk(obj) | |
| return out | |
def _debug_dump_from_response(resp: requests.Response) -> str:
    """Render an HTTP response as multi-line text for the debug panel.

    The dump always starts with status, content type and body size. What
    follows depends on the body:

    * JSON content type (and no ``PK`` magic): the body text as is.
    * ``PK`` magic, or a content type mentioning ``zip``/``octet-stream``:
      the archive's member names, then the text of every ``.json``/``.txt``
      member. Archive or member errors are written into the dump instead of
      being raised.
    * Anything else: the body decoded as UTF-8 with undecodable bytes dropped.
    """
    body = resp.content
    content_type = (resp.headers.get("content-type") or "").lower()
    has_zip_magic = body.startswith(b"PK")

    report = [
        "=== Florence HTTP Response ===",
        f"status: {resp.status_code}",
        f"content-type: {content_type}",
        f"bytes: {len(body)}",
    ]

    if "application/json" in content_type and not has_zip_magic:
        try:
            raw_json = resp.text
        except Exception:
            raw_json = body.decode("utf-8", errors="ignore")
        report += ["--- RAW JSON ---", raw_json]
        return "\n".join(report)

    if has_zip_magic or "zip" in content_type or "octet-stream" in content_type:
        report.append("--- ZIP CONTENTS ---")
        try:
            with zipfile.ZipFile(io.BytesIO(body), "r") as archive:
                members = archive.namelist()
                report.extend(f"* {member}" for member in members)
                for member in members:
                    if not member.lower().endswith((".json", ".txt")):
                        continue
                    try:
                        with archive.open(member) as handle:
                            decoded = handle.read().decode("utf-8", errors="ignore")
                        report.append(f"\n--- FILE: {member} ---\n{decoded}")
                    except Exception as exc:
                        report.append(f"\n--- FILE: {member} --- [read error: {exc}]")
        except Exception as exc:
            report.append(f"[zip parse error: {exc}]")
        return "\n".join(report)

    try:
        raw_body = body.decode("utf-8", errors="ignore")
    except Exception:
        raw_body = "[binary body]"
    report += ["--- RAW BODY ---", raw_body]
    return "\n".join(report)
def _first_text_in_archive(archive: zipfile.ZipFile) -> str:
    """Return the first usable text in *archive*, or ``""`` if there is none.

    ``.json`` members are tried first (best candidate from
    ``_deep_text_candidates``), then ``.txt`` members (stripped content).
    Members that cannot be read or parsed are skipped.
    """
    names = archive.namelist()

    for name in names:
        if not name.lower().endswith(".json"):
            continue
        try:
            with archive.open(name) as member:
                parsed = json.loads(member.read().decode("utf-8", errors="ignore"))
            candidates = _deep_text_candidates(parsed)
        except Exception:
            # Best effort: one broken member must not hide the others.
            continue
        if candidates:
            return candidates[0]

    for name in names:
        if not name.lower().endswith(".txt"):
            continue
        try:
            with archive.open(name) as member:
                content = member.read().decode("utf-8", errors="ignore").strip()
        except Exception:
            continue
        if content:
            return content

    return ""


def _parse_vlm_text(resp: requests.Response) -> str:
    """Extract the text answer from a VLM HTTP response.

    * JSON content type (and no ``PK`` magic): first candidate from
      ``_deep_text_candidates``.
    * ``PK`` magic, or a content type mentioning ``zip``/``octet-stream``:
      first text found in the archive's ``.json`` members, then its ``.txt``
      members.
    * Anything else: the body decoded as UTF-8 and stripped.

    Returns:
        The extracted text, or ``""`` when nothing usable is found or the
        body cannot be parsed. An archive without usable text now yields
        ``""``; it used to fall through to the last branch, which returned
        the archive's raw bytes decoded as text.
    """
    body = resp.content
    content_type = (resp.headers.get("content-type") or "").lower()
    has_zip_magic = body.startswith(b"PK")

    if "application/json" in content_type and not has_zip_magic:
        try:
            candidates = _deep_text_candidates(resp.json())
        except Exception:
            return ""
        return candidates[0] if candidates else ""

    if has_zip_magic or "zip" in content_type or "octet-stream" in content_type:
        try:
            with zipfile.ZipFile(io.BytesIO(body), "r") as archive:
                return _first_text_in_archive(archive)
        except Exception:
            # Not a readable archive after all; there is no text to offer.
            return ""

    # errors="ignore" makes this decode unable to fail, so no guard is needed.
    return body.decode("utf-8", errors="ignore").strip()
def _call_florence(task_token: str, asset_id: str, text_prompt: Optional[str] = None) -> Tuple[str, str]:
    """Send one task request to ``NV_VLM_URL`` for an uploaded asset.

    Args:
        task_token: Task marker placed at the start of the message content.
        asset_id: Id of the uploaded asset; also sent in the
            ``NVCF-INPUT-ASSET-REFERENCES`` and ``NVCF-FUNCTION-ASSET-IDS``
            headers.
        text_prompt: Optional extra text placed after the task token.

    Returns:
        ``(text, raw_dump)``. ``raw_dump`` is the debug rendering of the
        response. ``text`` is the parsed answer, or the marker
        ``"[VLM HTTP <status>]"`` when the response status is not OK.
    """
    request_body = {
        "messages": [
            {"role": "user", "content": _vlm_content(task_token, asset_id, text_prompt)}
        ]
    }
    request_headers = {
        "Authorization": f"Bearer {NV_API_KEY}",
        "Accept": "application/zip, application/json, */*",
        "Content-Type": "application/json",
        "NVCF-INPUT-ASSET-REFERENCES": asset_id,
        "NVCF-FUNCTION-ASSET-IDS": asset_id,
    }
    resp = requests.post(NV_VLM_URL, headers=request_headers, json=request_body, timeout=300)

    if resp is None:
        raw_dump = "[no response]"
    else:
        raw_dump = _debug_dump_from_response(resp)

    if resp.ok:
        return _parse_vlm_text(resp), raw_dump
    return f"[VLM HTTP {resp.status_code}]", raw_dump
| def _is_good(text: str) -> bool: | |
| return isinstance(text, str) and len(text.strip()) >= 3 and "изображений-результатов" not in text.lower() | |
def get_caption_with_debug(image_path: str) -> Tuple[str, str, str]:
    """Upload an image and ask the VLM for a description, with fallbacks.

    The tasks are tried in order (most detailed caption, detailed caption,
    plain caption, OCR) and the first answer accepted by ``_is_good`` wins.

    Returns:
        ``(caption, asset_id, debug_text)``. ``caption`` is ``""`` when no
        attempt produced an acceptable answer. ``debug_text`` concatenates
        the raw dump of every attempt that was made.
    """
    asset_id = nvcf_upload_asset(image_path)
    task_tokens = (
        "<MORE_DETAILED_CAPTION>",
        "<DETAILED_CAPTION>",
        "<CAPTION>",
        "<OCR>",
    )

    debug_log: List[str] = []
    caption = ""
    for task_token in task_tokens:
        answer, raw_dump = _call_florence(task_token, asset_id, None)
        debug_log.append(f"=== Attempt {task_token} ===\n{raw_dump}\n")
        if _is_good(answer):
            caption = answer
            break

    return caption, asset_id, "\n".join(debug_log)
| # --------------------- LLM streaming utils --------------------- | |
| def _extract_text_from_stream_chunk(chunk: Any) -> str: | |
| try: | |
| if hasattr(chunk, "choices"): | |
| choices = getattr(chunk, "choices") | |
| if choices: | |
| c0 = choices[0] | |
| delta = getattr(c0, "delta", None) | |
| if delta is not None: | |
| txt = getattr(delta, "reasoning_content", None) or getattr(delta, "content", None) | |
| if txt: | |
| return str(txt) | |
| text_attr = getattr(c0, "text", None) | |
| if text_attr: | |
| return str(text_attr) | |
| if isinstance(chunk, dict): | |
| choices = chunk.get("choices") or [] | |
| if choices: | |
| delta = choices[0].get("delta") or {} | |
| return str(delta.get("content") or delta.get("reasoning_content") or choices[0].get("text") or "") | |
| except Exception: | |
| pass | |
| return "" | |
# --------------------- Chat logic ---------------------
def _completion_text(resp: Any) -> str:
    """Pull the answer text out of a non-streaming completion result.

    Handles objects with a ``choices`` attribute and plain dicts; anything
    else (or an object that cannot be read) is rendered with ``str``.
    """
    if hasattr(resp, "choices"):
        try:
            first = resp.choices[0]
            return getattr(first.message, "content", "") or getattr(first, "text", "") or ""
        except Exception:
            return str(resp)
    if isinstance(resp, dict):
        choices = resp.get("choices", [])
        if not choices:
            return str(resp)
        message = choices[0].get("message") or choices[0]
        return message.get("content") or message.get("text") or str(message)
    return str(resp)


def respond(
    message: Dict[str, Any],
    chat_history: List[Dict[str, str]],
    last_caption: str,
    last_asset_id: str,
    last_debug: str
):
    """Handle one chat turn: caption an attached image, then stream the reply.

    This is a generator used as a Gradio event handler. Every yielded value
    is a 5-tuple: an emptied multimodal textbox value, the chat history, the
    current caption, the current asset id and the debug text.

    Steps:
        1. Append the user's message and an empty assistant message.
        2. If an image is attached, caption it through Florence. A failure
           leaves the caption empty and puts the error in the debug text.
        3. Ask the LLM, streaming partial text into the assistant message.
           If streaming raises, retry once without streaming; if that also
           raises, show the error as the assistant message.

    Only the system prompt (holding the caption, if any) and the current
    user text are sent to the model. Earlier turns of *chat_history* are
    not included.

    Args:
        message: Multimodal textbox value with ``"text"`` and ``"files"``;
            a non-dict value is treated as plain text.
        chat_history: Messages-format history; may be ``None``.
        last_caption: Caption kept from an earlier turn, reused when this
            turn has no image.
        last_asset_id: Asset id kept from an earlier turn.
        last_debug: Debug text kept from an earlier turn.
    """
    if isinstance(message, dict):
        # ``or`` also covers keys that are present but set to None.
        raw_text = message.get("text") or ""
        files = message.get("files") or []
    else:
        raw_text = str(message or "")
        files = []
    # Strip once so whitespace-only input counts as "no text" for the
    # visible bubble and for the prompt sent to the model alike.
    text = raw_text.strip()

    def first_image_path(candidates) -> Optional[str]:
        for item in candidates:
            if isinstance(item, dict):
                path = item.get("path")
                if path and (item.get("mime_type") or _guess_mime(path)).startswith("image/"):
                    return path
            elif isinstance(item, str) and _guess_mime(item).startswith("image/"):
                return item
        return None

    img_path = first_image_path(files)

    visible_parts = []
    if text:
        visible_parts.append(text)
    if img_path:
        visible_parts.append("🖼️ [изображение]")
    user_visible = "\n".join(visible_parts) if visible_parts else "🖐️"

    chat_history = chat_history or []
    chat_history.append({"role": "user", "content": user_visible})
    chat_history.append({"role": "assistant", "content": ""})

    caption = last_caption or ""
    asset_id = last_asset_id or ""
    debug_raw = last_debug or ""

    def frame():
        # One UI update; reads the current values of the enclosing variables.
        return {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")

    yield frame()

    if img_path:
        chat_history[-1]["content"] = "🔎 Обрабатываю изображение во Florence…"
        yield frame()
        try:
            caption, asset_id, debug_raw = get_caption_with_debug(img_path)
        except Exception as e:
            caption, debug_raw = "", f"[Florence error] {e}"

    if caption:
        system_prompt = (
            "You are a helpful multimodal assistant. "
            "Use the provided 'More Detailed Caption' as visual context. "
            "If something is not visible or uncertain, say so.\n\n"
            "Image Caption START >>>\n"
            f"{caption}\n"
            "<<< Image Caption END."
        )
    else:
        system_prompt = (
            "You are a helpful assistant. "
            "If the user refers to an image but no caption is available, ask them to reattach the image."
        )

    user_text_for_llm = text or ("Describe the attached image." if caption else "Hi")

    # Shared by the streaming call and its non-streaming fallback.
    request_kwargs = {
        "model": "openai/gpt-oss-120b",
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_text_for_llm},
        ],
        "temperature": 0.7,
        "top_p": 1.0,
        "max_tokens": 768,
    }

    assistant_accum = ""
    try:
        stream = llm.chat.completions.create(stream=True, **request_kwargs)
        for chunk in stream:
            piece = _extract_text_from_stream_chunk(chunk)
            if not piece:
                continue
            assistant_accum += piece
            chat_history[-1]["content"] = assistant_accum
            yield frame()
    except Exception:
        # Streaming failed (possibly mid-way): retry once without streaming
        # and replace whatever partial text was shown.
        try:
            resp = llm.chat.completions.create(stream=False, **request_kwargs)
            chat_history[-1]["content"] = _completion_text(resp)
        except Exception as e2:
            chat_history[-1]["content"] = f"[Ошибка LLM: {e2}]"
        yield frame()
    else:
        # The stream ended without text: do not leave the "processing image"
        # placeholder standing as if it were the model's answer.
        if not assistant_accum and chat_history[-1]["content"]:
            chat_history[-1]["content"] = ""
            yield frame()
# --------------------- UI ---------------------
# Custom CSS giving the page a messenger-like layout; selectors refer to the
# elem_id values assigned to the components below.
messenger_css = """
:root {
--radius-xl: 16px;
}
.gradio-container { max-width: 780px !important; margin: auto; }
#title { text-align: center; padding: 8px 0 10px; font-size: 18px; }
#chat-wrap { border: 1px solid rgba(0,0,0,0.07); border-radius: var(--radius-xl); overflow: hidden; }
#chat { height: 520px; }
#bottom-bar { position: sticky; bottom: 0; background: var(--body-background-fill); border-top: 1px solid rgba(0,0,0,0.06); padding: 8px; display: flex; gap: 8px; align-items: center; }
#send { min-width: 42px; max-width: 42px; height: 42px; border-radius: 999px; }
#msg .mm-wrap { border: 1px solid rgba(0,0,0,0.08); border-radius: 999px; }
#raw-wrap .wrap>label { font-weight: 600; }
"""

theme = gr.themes.Soft(
    primary_hue="cyan",
    neutral_hue="slate",
).set(
    button_large_radius="999px",
    button_small_radius="999px",
    block_radius="16px",
)

with gr.Blocks(theme=theme, css=messenger_css, analytics_enabled=False) as demo:
    gr.Markdown("✨ <div id='title'>Визуальный чат: Florence → GPT‑OSS</div>")

    # Per-session values carried from one turn to the next.
    caption_state = gr.State(value="")
    asset_state = gr.State(value="")

    with gr.Group(elem_id="chat-wrap"):
        chatbot = gr.Chatbot(label="", height=520, elem_id="chat", type="messages")
        with gr.Row(elem_id="bottom-bar"):
            msg = gr.MultimodalTextbox(
                show_label=False,
                placeholder="Напишите сообщение… (иконка слева — добавить изображение)",
                elem_id="msg",
            )
            send = gr.Button("➤", variant="primary", elem_id="send")

    with gr.Accordion("Raw Florence output", open=True, elem_id="raw-wrap"):
        raw_out = gr.Textbox(
            label="",
            value="",
            lines=14,
            show_copy_button=True,
            # Kept read-only: it is wired as an input below only so that its
            # current text can be handed back to respond().
            interactive=False,
        )

    # respond() yields the debug text as its 5th value and takes the previous
    # debug text as its 5th argument, so the same component must sit in both
    # lists. Feeding a separate state that is never written made every turn
    # without an image blank this panel.
    handler_inputs = [msg, chatbot, caption_state, asset_state, raw_out]
    handler_outputs = [msg, chatbot, caption_state, asset_state, raw_out]

    # Pressing Enter in the textbox and clicking the send button do the same.
    msg.submit(respond, inputs=handler_inputs, outputs=handler_outputs)
    send.click(respond, inputs=handler_inputs, outputs=handler_outputs)

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)), share=False)