import os
import base64
import secrets
import tempfile

import gradio as gr
from huggingface_hub import upload_file
from dashscope import MultiModalConversation
# --- Config ---
HF_TOKEN = os.environ.get("HF_CV_ROBOT_TOKEN")
HF_DATASET_REPO = "OppaAI/Robot_MCP"
MODEL = "qwen2.5-vl-7b-instruct"

if not HF_TOKEN:
    raise ValueError("HF_CV_ROBOT_TOKEN environment variable not set.")
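
# NOTE: dashscope reads its API key from the DASHSCOPE_API_KEY environment
# variable by default, so set it (e.g. as a Space secret) alongside
# HF_CV_ROBOT_TOKEN.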

# --- Helper Functions ---
def save_and_upload_image(image_b64):
    """Decode a base64 image, write it to a temp file, and upload it to the HF dataset."""
    image_bytes = base64.b64decode(image_b64)

    # Use a unique temp file so concurrent requests don't overwrite each other.
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
        f.write(image_bytes)
        local_tmp_path = f.name

    # A random token in the repo path avoids collisions between uploads.
    path_in_repo = f"images/uploaded_image_{secrets.token_hex(8)}.jpg"
    upload_file(
        path_or_fileobj=local_tmp_path,
        path_in_repo=path_in_repo,
        repo_id=HF_DATASET_REPO,
        token=HF_TOKEN,
        repo_type="dataset"
    )
    hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
    return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)

def prepare_vlm_message(image_path, text="Describe this image in detail."):
    """Read a local image, base64-encode it, and build a DashScope multimodal message."""
    with open(image_path, "rb") as f:
        image_b64 = base64.b64encode(f.read()).decode("utf-8")
    # DashScope's multimodal message format takes {"text": ...} and
    # {"image": ...} content items; base64 images are passed as a data URI.
    messages = [
        {
            "role": "user",
            "content": [
                {"text": text},
                {"image": f"data:image/jpeg;base64,{image_b64}"}
            ]
        }
    ]
    return messages
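
# NOTE: DashScope also accepts local files directly via
# {"image": f"file://{path}"}, which avoids the base64 round-trip when the
# image is already on disk.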

# --- Main MCP function ---
def process_and_describe(payload: dict):
    """Save an incoming robot image to the HF Hub and return a VLM description."""
    try:
        robot_id = payload.get("robot_id", "unknown")
        image_b64 = payload["image_b64"]

        # 1️⃣ Save & upload image
        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64)

        # 2️⃣ Prepare VLM message
        messages = prepare_vlm_message(local_tmp_path)

        # 3️⃣ Call VLM using MultiModalConversation. incremental_output=True makes
        # each streamed chunk carry only new text, so the pieces can be
        # concatenated without duplication.
        responses = MultiModalConversation.call(
            model=MODEL,
            messages=messages,
            stream=True,
            incremental_output=True
        )

        vlm_text = ""
        for resp in responses:
            if resp.status_code != 200:
                return {"error": f"VLM call failed: {resp.status_code}"}
            content = resp.output.choices[0].message.content
            # Extract the text pieces from the response content list
            for ele in content:
                if "text" in ele:
                    vlm_text += ele["text"]

        return {
            "saved_to_hf_hub": True,
            "repo_id": HF_DATASET_REPO,
            "path_in_repo": path_in_repo,
            "image_url": hf_url,
            "file_size_bytes": size_bytes,
            "robot_id": robot_id,
            "vlm_description": vlm_text
        }
    except Exception as e:
        return {"error": str(e)}

# --- Gradio MCP Interface ---
demo = gr.Interface(
    fn=process_and_describe,
    inputs=gr.JSON(label="Input Payload (Dict format with 'image_b64')"),
    outputs=gr.JSON(label="Reply to Jetson"),
    api_name="predict"
)

if __name__ == "__main__":
    demo.launch(mcp_server=True)
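
# --- Example client call ---
# A minimal sketch of how a Jetson-side client might invoke this Space over
# the Gradio API using gradio_client (the file name and robot_id below are
# illustrative assumptions):
#
#   from gradio_client import Client
#   import base64
#
#   client = Client("OppaAI/Robot_MCP")
#   with open("frame.jpg", "rb") as f:
#       payload = {
#           "robot_id": "jetson-01",
#           "image_b64": base64.b64encode(f.read()).decode("utf-8"),
#       }
#   print(client.predict(payload, api_name="/predict"))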