File size: 6,966 Bytes
65ec2a1
 
 
 
 
87deda2
8c3dcd1
 
9f6e9fd
65ec2a1
d0108c1
 
 
aca2800
3f25383
65ec2a1
01a3239
 
 
fad7cd4
01a3239
 
 
 
 
 
 
 
fad7cd4
9f6e9fd
fad7cd4
 
87deda2
d0108c1
01a3239
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d0108c1
8c3dcd1
d0108c1
 
01a3239
 
 
 
 
d0108c1
65ec2a1
 
 
01a3239
 
 
 
aca2800
 
 
87deda2
65ec2a1
 
 
8c3dcd1
f037a8f
 
87deda2
 
d0108c1
01a3239
d0108c1
91b3954
d0108c1
 
01a3239
 
 
 
d0108c1
fad7cd4
 
 
bdb8def
01a3239
54151d7
65ec2a1
 
 
01a3239
87deda2
7ef2fc9
87deda2
80c4ab2
f3167fb
7ef2fc9
 
 
 
 
3dffc39
87deda2
65ec2a1
80c4ab2
65ec2a1
 
f037a8f
8c3dcd1
f037a8f
 
65ec2a1
 
 
aca2800
8c3dcd1
aca2800
 
f3167fb
8c3dcd1
aca2800
 
8c3dcd1
65ec2a1
8c3dcd1
bdb8def
f3167fb
01a3239
65ec2a1
 
 
87deda2
bdb8def
d1e9476
 
 
bdb8def
01a3239
 
 
670ecf3
6916c39
 
d0108c1
6916c39
01a3239
 
 
 
 
 
 
 
 
 
 
 
 
 
6916c39
 
 
 
 
01a3239
 
6916c39
01a3239
d1e9476
 
01a3239
 
 
d1e9476
d0108c1
01a3239
 
 
 
 
 
 
 
 
d1e9476
 
 
01a3239
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import os
import base64
import json
from datetime import datetime
import traceback

import gradio as gr
from huggingface_hub import HfApi, InferenceClient
from pydantic import BaseModel, Field

# -------------------------------
# Environment variables / Constants
# -------------------------------
# Dataset repository that receives uploaded robot images (overridable via env).
HF_DATASET_REPO = os.getenv("HF_DATASET_REPO", "OppaAI/Robot_MCP")
# Vision-language model used for image analysis (overridable via env).
HF_VLM_MODEL = os.getenv("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")

# -------------------------------
# Pydantic schema for the tool payload
# -------------------------------
class RobotWatchPayload(BaseModel):
    """
    Input schema for the robot VLM analysis tool.

    Fields:
        hf_token (str): Hugging Face API token; required.
        robot_id (str): Identifier of the robot; defaults to "unknown".
        image_b64 (str): Base64 encoded image to analyze; required.
    """
    # `...` marks a field as required; a plain value is the field default.
    hf_token: str = Field(..., description="Your Hugging Face API token.")
    robot_id: str = Field("unknown", description="Robot identifier.")
    image_b64: str = Field(..., description="Base64 encoded image data.")


# -------------------------------
# Helper function: Upload image to Hugging Face dataset
# -------------------------------
def upload_image(image_b64: str, hf_token: str):
    """
    Decodes a base64 image string, saves it locally, and uploads to Hugging Face dataset.

    The payload is validated before anything is written or uploaded:
    whitespace (e.g. line wrapping added by encoders) is removed, the
    remainder must be strictly valid base64, and it must decode to at
    least one byte. This prevents corrupt or empty files from being pushed
    to the dataset.

    Args:
        image_b64 (str): Base64 encoded image data.
        hf_token (str): Hugging Face API token.
    Returns:
        tuple: (local_path, hf_url, filename, size_bytes) on success, or
        (None, None, None, 0) if the input is empty/invalid or the upload fails.
    """
    failure = (None, None, None, 0)

    if not image_b64:
        return failure

    try:
        if isinstance(image_b64, (bytes, bytearray)):
            image_b64 = bytes(image_b64).decode("ascii")
        # Strict validation rejects whitespace too, so drop it first;
        # without validate=True stray characters are silently discarded
        # and the decoded bytes would be garbage.
        compact = "".join(image_b64.split())
        image_bytes = base64.b64decode(compact, validate=True)
    except (ValueError, TypeError, AttributeError):
        # binascii.Error (bad alphabet / padding) is a ValueError subclass.
        traceback.print_exc()
        return failure

    if not image_bytes:
        return failure

    try:
        os.makedirs("/tmp", exist_ok=True)

        # Generate unique timestamped filename
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        local_path = f"/tmp/robot_img_{timestamp}.jpg"

        # Save locally; the path is part of the return value, so the file
        # is intentionally left in place for the caller.
        with open(local_path, "wb") as f:
            f.write(image_bytes)

        filename = f"robot_{timestamp}.jpg"

        # Upload to Hugging Face dataset
        api = HfApi()
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=f"tmp/{filename}",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            token=hf_token
        )

        hf_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/tmp/{filename}"
        return local_path, hf_url, filename, len(image_bytes)

    except Exception:
        # Best-effort boundary: any filesystem or network failure is
        # reported to the caller through the failure tuple.
        traceback.print_exc()
        return failure


# -------------------------------
# Helper function: Parse JSON safely
# -------------------------------
def safe_parse_json_from_text(text: str):
    """
    Attempts to parse a JSON object from text returned by the VLM model.

    Three strategies are tried in order:
      1. Parse the whole text as JSON.
      2. Parse the span from the first "{" to the last "}" (handles
         Markdown code fences and surrounding prose).
      3. Decode the first complete JSON value starting at the first "{"
         (handles trailing text that itself contains a "}").

    Only JSON objects are accepted; scalars and arrays yield None so the
    return value always honours the "dict or None" contract.

    Args:
        text (str): Raw text output from the model.
    Returns:
        dict or None: Parsed JSON dictionary, or None if parsing fails.
    """
    if not text:
        return None
    if isinstance(text, (bytes, bytearray)):
        text = bytes(text).decode("utf-8", errors="replace")
    if not isinstance(text, str):
        return None

    def _loads_object(candidate):
        # json.JSONDecodeError is a ValueError subclass.
        try:
            value = json.loads(candidate)
        except ValueError:
            return None
        return value if isinstance(value, dict) else None

    parsed = _loads_object(text)
    if parsed is not None:
        return parsed

    start = text.find("{")
    if start == -1:
        return None

    end = text.rfind("}")
    if end > start:
        parsed = _loads_object(text[start:end + 1])
        if parsed is not None:
            return parsed

    # Last resort: take the first well-formed value and ignore the rest.
    try:
        value, _ = json.JSONDecoder().raw_decode(text[start:])
    except ValueError:
        return None
    return value if isinstance(value, dict) else None


# -------------------------------
# Core VLM analysis function
# -------------------------------
def run_vlm_analysis(payload: RobotWatchPayload):
    """
    Main logic for analyzing an image using Hugging Face VLM model.

    The image is first uploaded to the dataset repository, then sent inline
    (as a base64 data URL) to the chat-completion endpoint together with a
    system prompt that requests a strict JSON answer.

    Args:
        payload (RobotWatchPayload): Validated payload containing token, robot_id, and image.
    Returns:
        dict: On success, a dict with "status": "success", the robot id,
        image size and URL, the parsed "result" (empty dict if the model
        output could not be parsed) and the raw model text in "vlm_raw".
        On failure, a dict with "status": "error" and a "message"; the
        upload-failure case also keeps the legacy "error" key.
    """
    hf_token = payload.hf_token
    image_b64 = payload.image_b64
    robot_id = payload.robot_id

    # Upload the image to Hugging Face dataset
    _, hf_url, _, size_bytes = upload_image(image_b64, hf_token)
    if not hf_url:
        failure_text = "Image upload failed"
        # "error" is kept for existing consumers; "status"/"message" match
        # the shape used by the inference-failure path below.
        return {"status": "error", "message": failure_text, "error": failure_text}

    # System prompt instructs VLM to return strict JSON. The template must
    # itself be valid JSON, otherwise the model tends to copy its mistakes.
    system_prompt = """
Respond in STRICT JSON ONLY. Put more details in Description. Ensure all the fields are never empty; list general items if specific ones are not clear.
{
 "description": "...",
 "environment": "...",
 "indoor_or_outdoor": "...",
 "lighting_condition": "...",
 "human": "...",
 "animals": "...",
 "objects": [],
 "hazards": "..."
}
"""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": [
            {"type": "text", "text": "Analyze the image."},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
        ]}
    ]

    client = InferenceClient(token=hf_token)
    try:
        resp = client.chat.completions.create(
            model=HF_VLM_MODEL,
            messages=messages,
            max_tokens=500,
            temperature=0.1
        )
    except Exception as e:
        return {"status": "error", "message": str(e)}

    # Guard against an empty choices list or a null message body instead of
    # raising IndexError / AttributeError.
    choices = getattr(resp, "choices", None) or []
    content = choices[0].message.content if choices else None
    vlm_output = (content or "").strip()
    parsed = safe_parse_json_from_text(vlm_output) or {}

    return {
        "status": "success",
        "robot_id": robot_id,
        "file_size_bytes": size_bytes,
        "image_url": hf_url,
        "result": parsed,
        "vlm_raw": vlm_output
    }


# -------------------------------
# Gradio interface function
# -------------------------------
def robot_watch(
    hf_token_input: str,
    robot_id_input: str,
    image_b64_input: str
):
    """
    Gradio wrapper for run_vlm_analysis.
    Converts individual fields into Pydantic model and calls core logic.

    Input problems are reported as an error dict rather than raised, so
    the UI / API caller always receives a JSON response.

    Args:
        hf_token_input (str): Hugging Face API token input from UI.
        robot_id_input (str): Robot ID input from UI; a missing or empty
            value falls back to "unknown".
        image_b64_input (str): Base64 image input from UI.
    Returns:
        dict: Result from run_vlm_analysis, or a dict with
        "status": "error", "message" and the legacy "error" key when the
        inputs are rejected.
    """
    def _rejected(reason):
        # "error" is the original key; "status"/"message" mirror the shape
        # the core analysis uses for inference failures.
        return {"status": "error", "message": reason, "error": reason}

    image_is_blank = isinstance(image_b64_input, str) and not image_b64_input.strip()
    if not image_b64_input or image_is_blank:
        return _rejected("Base64 image string is empty.")

    # Create the payload instance
    try:
        payload_instance = RobotWatchPayload(
            hf_token=hf_token_input,
            robot_id=robot_id_input or "unknown",
            image_b64=image_b64_input
        )
    except ValueError:
        # pydantic's ValidationError derives from ValueError. The details
        # are not echoed back because they can contain the submitted values.
        return _rejected("Invalid input: hf_token, robot_id and image_b64 must be strings.")

    # Run core analysis
    return run_vlm_analysis(payload_instance)


# -------------------------------
# Gradio App
# -------------------------------
app = gr.Interface(
    title="Robot CV MCP Server",
    description="Interface for robot VLM analysis using individual fields, including base64 image string.",
    fn=robot_watch,
    # One textbox per robot_watch argument, in positional order.
    inputs=[
        gr.Textbox(lines=1, label="Hugging Face Token"),
        gr.Textbox(lines=1, label="Robot ID", value="unknown"),
        gr.Textbox(lines=5, label="Image Base64 String"),
    ],
    outputs=gr.Json(label="Tool Output"),
    api_name="predict",
)

if __name__ == "__main__":
    # Start the Gradio app and expose it as an MCP server as well.
    app.launch(mcp_server=True)