Spaces:
Sleeping
Sleeping
| import os | |
| import base64 | |
| import json | |
| from datetime import datetime | |
| import traceback | |
| import gradio as gr | |
| from huggingface_hub import HfApi, InferenceClient | |
| from pydantic import BaseModel, Field | |
| # ------------------------------- | |
| # Environment variables / Constants | |
| # ------------------------------- | |
# Dataset repository that receives uploaded images (override via env var).
HF_DATASET_REPO = os.getenv("HF_DATASET_REPO", "OppaAI/Robot_MCP")
# Vision-language model queried for image analysis (override via env var).
HF_VLM_MODEL = os.getenv("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")
| # ------------------------------- | |
| # Pydantic schema for the tool payload | |
| # ------------------------------- | |
class RobotWatchPayload(BaseModel):
    """
    Input schema for the robot VLM analysis tool.

    Attributes:
        hf_token (str): Hugging Face API token supplied by the caller (required).
        robot_id (str): Identifier of the robot; defaults to "unknown".
        image_b64 (str): Base64 encoded image to analyze (required).
    """

    # `...` marks a field as required, same as omitting the default.
    hf_token: str = Field(..., description="Your Hugging Face API token.")
    robot_id: str = Field(default="unknown", description="Robot identifier.")
    image_b64: str = Field(..., description="Base64 encoded image data.")
| # ------------------------------- | |
| # Helper function: Upload image to Hugging Face dataset | |
| # ------------------------------- | |
def upload_image(image_b64: str, hf_token: str):
    """
    Decodes a base64 image string, saves it locally, and uploads it to the
    Hugging Face dataset repository named by HF_DATASET_REPO.

    Args:
        image_b64 (str): Base64 encoded image data.
        hf_token (str): Hugging Face API token used for the upload.

    Returns:
        tuple: (local_path, hf_url, filename, size_bytes) on success.
        (None, None, None, 0) when the input is empty, is not decodable
        base64, or when saving/uploading fails (the traceback is printed).
    """
    failure = (None, None, None, 0)

    # Decode before touching the filesystem or the network, so bad input
    # never produces a stray temp file or an upload attempt.
    try:
        image_bytes = base64.b64decode(image_b64 or "")
    except (ValueError, TypeError):
        # binascii.Error (bad padding / alphabet) is a ValueError subclass.
        traceback.print_exc()
        return failure

    if not image_bytes:
        # Nothing decoded: do not upload a zero-byte "image".
        return failure

    try:
        os.makedirs("/tmp", exist_ok=True)

        # Microsecond-resolution timestamp keeps filenames unique per call.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"
        filename = f"robot_{timestamp}.jpg"

        # Save locally; the path is part of the returned tuple.
        with open(local_path, "wb") as f:
            f.write(image_bytes)

        # Upload to the Hugging Face dataset under tmp/.
        api = HfApi()
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=f"tmp/{filename}",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=hf_token,
        )
        hf_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/tmp/{filename}"
        return local_path, hf_url, filename, len(image_bytes)
    except Exception:
        # Best-effort boundary: callers detect failure through the None URL.
        traceback.print_exc()
        return failure
| # ------------------------------- | |
| # Helper function: Parse JSON safely | |
| # ------------------------------- | |
def safe_parse_json_from_text(text: str):
    """
    Attempts to parse a JSON object from text returned by the VLM model.

    The text is first parsed as-is. If that does not yield a JSON object,
    surrounding whitespace/backticks and a leading "json" tag (as found in
    Markdown code fences) are stripped, and the span from the first "{" to
    the last "}" is parsed instead.

    Args:
        text (str): Raw text output from the model.

    Returns:
        dict or None: Parsed JSON dictionary, or None if the text is empty
        or no JSON object can be extracted from it.
    """
    if not text:
        return None

    # RecursionError covers pathologically nested input; the parser raises
    # it instead of ValueError, and this helper must never propagate it.
    parse_errors = (ValueError, TypeError, RecursionError)

    # Fast path: the model returned clean JSON.
    try:
        parsed = json.loads(text)
    except parse_errors:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    if not isinstance(text, str):
        # The string clean-up below only makes sense for str input.
        return None

    cleaned = text.strip().strip("`").strip()
    if cleaned.lower().startswith("json"):
        cleaned = cleaned[4:].strip()

    # Extract the outermost {...} span, ignoring any chatter around it.
    start = cleaned.find("{")
    end = cleaned.rfind("}")
    if start == -1 or end < start:
        return None

    try:
        candidate = json.loads(cleaned[start:end + 1])
    except parse_errors:
        return None
    return candidate if isinstance(candidate, dict) else None
| # ------------------------------- | |
| # Core VLM analysis function | |
| # ------------------------------- | |
def run_vlm_analysis(payload: RobotWatchPayload):
    """
    Main logic for analyzing an image using a Hugging Face VLM model.

    The image is uploaded to the dataset repository via upload_image, then
    sent inline (as a base64 data URL) to the model named by HF_VLM_MODEL
    together with a system prompt requesting a JSON object.

    Args:
        payload (RobotWatchPayload): Validated payload containing token, robot_id, and image.

    Returns:
        dict: On success, a dict with "status": "success", the robot ID, the
        uploaded file size and URL, the parsed JSON ("result", {} when the
        model output could not be parsed) and the raw model text ("vlm_raw").
        On failure, a dict with "status": "error" and a "message"; an upload
        failure additionally keeps the legacy "error" key.
    """
    hf_token = payload.hf_token
    image_b64 = payload.image_b64
    robot_id = payload.robot_id

    # Upload the image to the Hugging Face dataset.
    _, hf_url, _, size_bytes = upload_image(image_b64, hf_token)
    if not hf_url:
        # "error" is kept for existing consumers; "status"/"message" match
        # the shape of the inference-failure result below.
        return {
            "status": "error",
            "error": "Image upload failed",
            "message": "Image upload failed",
        }

    # System prompt instructs the VLM to return strict JSON. The template
    # itself must be valid JSON (commas between members, no trailing comma),
    # otherwise the model tends to copy the malformed punctuation.
    system_prompt = """
Respond in STRICT JSON ONLY. Put more details in Description. Ensure all the fields are never empty; list general items if specific ones are not clear.
{
"description": "...",
"environment": "...",
"indoor_or_outdoor": "...",
"lighting_condition": "...",
"human": "...",
"animals": "...",
"objects": [],
"hazards": "..."
}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": [
            {"type": "text", "text": "Analyze the image."},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
        ]}
    ]

    client = InferenceClient(token=hf_token)
    try:
        resp = client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages,
            max_tokens=500,
            temperature=0.1
        )
        # content may be None (or choices empty) on a degenerate response;
        # keep the extraction inside the try so it is reported, not raised.
        vlm_output = (resp.choices[0].message.content or "").strip()
    except Exception as e:
        return {"status": "error", "message": str(e)}

    parsed = safe_parse_json_from_text(vlm_output) or {}
    return {
        "status": "success",
        "robot_id": robot_id,
        "file_size_bytes": size_bytes,
        "image_url": hf_url,
        "result": parsed,
        "vlm_raw": vlm_output
    }
| # ------------------------------- | |
| # Gradio interface function | |
| # ------------------------------- | |
def robot_watch(
    hf_token_input: str,
    robot_id_input: str,
    image_b64_input: str
):
    """
    Gradio wrapper for run_vlm_analysis.
    Normalizes the individual fields, converts them into the Pydantic model and calls the core logic.

    Args:
        hf_token_input (str): Hugging Face API token input from UI.
        robot_id_input (str): Robot ID input from UI; blank falls back to "unknown".
        image_b64_input (str): Base64 image input from UI.

    Returns:
        dict: Result from run_vlm_analysis, or {"error": ...} when no image data was provided.
    """
    # Textbox values may be None or carry stray whitespace/newlines from
    # copy-paste; normalize before validating.
    image_b64 = (image_b64_input or "").strip()
    if not image_b64:
        return {"error": "Base64 image string is empty."}

    hf_token = (hf_token_input or "").strip()
    # A blank ID would otherwise override the schema default.
    robot_id = (robot_id_input or "").strip() or "unknown"

    # Create the payload instance
    payload_instance = RobotWatchPayload(
        hf_token=hf_token,
        robot_id=robot_id,
        image_b64=image_b64
    )

    # Run core analysis
    return run_vlm_analysis(payload_instance)
| # ------------------------------- | |
| # Gradio App | |
| # ------------------------------- | |
# Input widgets, listed in the same order as robot_watch's parameters.
_token_box = gr.Textbox(label="Hugging Face Token", lines=1)
_robot_id_box = gr.Textbox(label="Robot ID", lines=1, value="unknown")
_image_box = gr.Textbox(label="Image Base64 String", lines=5)
# The result dict from robot_watch is rendered as JSON.
_result_view = gr.Json(label="Tool Output")

app = gr.Interface(
    fn=robot_watch,
    inputs=[_token_box, _robot_id_box, _image_box],
    outputs=_result_view,
    title="Robot CV MCP Server",
    description="Interface for robot VLM analysis using individual fields, including base64 image string.",
    api_name="predict",
)

if __name__ == "__main__":
    # Start the Gradio app with its MCP server enabled.
    app.launch(mcp_server=True)