OppaAI committed on
Commit
f037a8f
·
verified ·
1 Parent(s): aca2800

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +43 -100
app.py CHANGED
@@ -6,17 +6,13 @@ import gradio as gr
6
  from huggingface_hub import upload_file, InferenceClient
7
  from datetime import datetime
8
  import traceback
9
- from typing import Optional, Dict, Any, Tuple
10
 
11
  from fastmcp import FastMCP
12
 
13
- # --- Configuration using Environment Variables ---
14
- # It is best practice to manage sensitive info outside of the code.
15
- # Use os.environ.get() to safely retrieve these values.
16
  HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
17
  HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")
18
- # The token will be required in the payload, but we define the env var name here.
19
- # HF_TOKEN_ENV_VAR_NAME = "HF_TOKEN"
20
 
21
  mcp = FastMCP("Robot_MCP")
22
 
@@ -26,53 +22,33 @@ mcp = FastMCP("Robot_MCP")
26
  @mcp.tool()
27
def speak(text: str, emotion: str = "neutral"):
    """Report a speech action for the robot.

    The utterance and its emotion are echoed back inside a success
    envelope; nothing is synthesised by this function itself.

    Args:
        text: Sentence the robot should say.
        emotion: Tone tag to attach to the utterance.

    Returns:
        A dict with ``status``, ``action_executed`` and ``payload`` keys.
    """
    spoken = {"text": text, "emotion": emotion}
    return dict(status="success", action_executed="speak", payload=spoken)
34
-
35
 
36
  @mcp.tool()
37
def navigate(direction: str, distance_meters: float):
    """Moves the robot a specified distance in a direction (max 5m).

    Args:
        direction: Direction label echoed back in the payload.
        distance_meters: Requested travel distance in metres.

    Returns:
        A success envelope carrying the direction and distance, or an
        error dict when the request violates the 5 m safety limit.
    """
    import math  # function-scope import keeps this block self-contained

    # Check the magnitude and require a finite number: a plain `> 5.0`
    # comparison is False for -100.0 and for NaN, so both used to slip
    # through the safety limit.
    if not math.isfinite(distance_meters) or abs(distance_meters) > 5.0:
        return {"status": "error", "message": "Safety limit exceeded"}
    return {
        "status": "success",
        "action_executed": "navigate",
        "payload": {"direction": direction, "distance": distance_meters},
    }
46
-
47
 
48
  @mcp.tool()
49
def scan_hazard(hazard_type: str, severity: str):
    """Produce a timestamped log entry for a detected hazard.

    Args:
        hazard_type: Short label for the hazard.
        severity: Severity label included in the log line.

    Returns:
        A dict with ``status`` set to ``"warning_logged"`` and a ``log``
        line prefixed by the current local time in ISO-8601 form.
    """
    report = {"status": "warning_logged"}
    timestamp = datetime.now().isoformat()
    report["log"] = f"[{timestamp}] HAZARD: {hazard_type} (Severity: {severity})"
    return report
56
-
57
 
58
  @mcp.tool()
59
def analyze_human(clothing_color: str, estimated_action: str):
    """Summarise an observed person as a tracking record.

    Args:
        clothing_color: Colour of the clothing seen on the person.
        estimated_action: What the person appears to be doing.

    Returns:
        A dict with ``status`` set to ``"human_tracked"`` and a
        one-sentence ``details`` description.
    """
    summary = f"Human wearing {clothing_color} is {estimated_action}"
    return {"status": "human_tracked", "details": summary}
65
 
66
  # -----------------------------------------------------
67
- # Save + Upload
68
  # -----------------------------------------------------
69
  def save_and_upload_image(image_b64: str, hf_token: str):
70
- """Decodes a base64 image, saves it locally, and uploads to Hugging Face Hub."""
71
  try:
72
  image_bytes = base64.b64decode(image_b64)
73
- size_bytes = len(image_bytes)
74
-
75
- # Ensure the /tmp directory exists
76
  os.makedirs("/tmp", exist_ok=True)
77
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
78
  local_path = f"/tmp/robot_img_{timestamp}.jpg"
@@ -81,91 +57,77 @@ def save_and_upload_image(image_b64: str, hf_token: str):
81
  f.write(image_bytes)
82
 
83
  filename = f"robot_{timestamp}.jpg"
84
-
85
- upload_file(
86
- path_or_fileobj=local_path,
87
- path_in_repo=filename,
88
- repo_id=HF_DATASET_REPO,
89
- token=hf_token,
90
- repo_type="dataset",
91
- )
92
-
93
  url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{filename}"
94
- return local_path, url, filename, size_bytes
95
 
96
  except Exception as e:
97
- print(f"Error during image upload: {e}")
98
  traceback.print_exc()
99
  return None, None, None, 0
100
 
101
  # -----------------------------------------------------
102
- # JSON Parse
103
  # -----------------------------------------------------
104
def safe_parse_json_from_text(text: str) -> Optional[Dict[str, Any]]:
    """Attempts to safely parse a JSON object from potentially messy text output.

    The text is first parsed as-is. If that does not yield a JSON object,
    surrounding backticks and a leading ``json`` tag are stripped and the
    text is scanned for the first ``{`` from which a complete JSON object
    can be decoded.

    Args:
        text: Raw model output; may be empty, fenced, or wrapped in prose.

    Returns:
        The decoded object as a dict, or ``None`` when no JSON object can
        be recovered (including when the text is valid JSON of another
        type, such as a list or a number).
    """
    if not text or not isinstance(text, str):
        return None

    # Fast path: the whole reply is already valid JSON.
    try:
        whole = json.loads(text)
    except json.JSONDecodeError:
        whole = None
    # Callers use .get() on the result, so only a dict is acceptable.
    if isinstance(whole, dict):
        return whole

    cleaned = text.strip().strip("`").strip()
    # Remove leading 'json' if present after stripping backticks
    if cleaned.lower().startswith("json"):
        cleaned = cleaned[4:].strip()

    # Try every '{' as a possible object start. raw_decode stops at the end
    # of the first complete value, so trailing prose or stray braces after
    # the object no longer break parsing.
    decoder = json.JSONDecoder()
    start = cleaned.find("{")
    while start >= 0:
        try:
            candidate, _ = decoder.raw_decode(cleaned, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = cleaned.find("{", start + 1)

    return None
127
 
128
  # -----------------------------------------------------
129
- # Validate and Call Tool
130
  # -----------------------------------------------------
131
def validate_and_call_tool(tool_name: str, tool_args: dict) -> Dict[str, Any]:
    """Look up a tool by name in the MCP registry and invoke it.

    Args:
        tool_name: Name of the tool to run.
        tool_args: Keyword arguments forwarded to the tool function.

    Returns:
        Whatever the tool function returns, or a dict with an ``error``
        key when the name is not registered or the call raises.
    """
    # NOTE(review): assumes mcp._tools maps tool names to dicts holding a
    # "function" entry; this is a private attribute -- confirm it exists
    # in the installed FastMCP version.
    registry = mcp._tools
    if tool_name in registry:
        try:
            return registry[tool_name]["function"](**tool_args)
        except Exception as exc:
            # Print the full stack for the server log; the caller only
            # receives the short message.
            traceback.print_exc()
            return {"error": f"Tool error: {str(exc)}"}
    return {"error": f"Unknown or unauthorized tool '{tool_name}'"}
143
 
144
  # -----------------------------------------------------
145
- # Main Pipeline
146
  # -----------------------------------------------------
147
  def process_and_describe(payload: Dict[str, Any]) -> Dict[str, Any]:
148
- """Main pipeline function to process image, call VLM, and execute tool."""
149
-
150
- # Input handling for gradio.JSON input which sometimes arrives as a string
151
  if isinstance(payload, str):
152
  try:
153
  payload = json.loads(payload)
154
- except json.JSONDecodeError:
155
- return {"error": "Invalid JSON payload provided to the function"}
156
 
157
  hf_token = payload.get("hf_token")
158
  if not hf_token:
159
- return {"error": "hf_token missing in payload. Cannot authenticate with HF Hub."}
160
 
161
  robot_id = payload.get("robot_id", "unknown")
162
  image_b64 = payload.get("image_b64")
163
  if not image_b64:
164
- return {"error": "image_b64 missing in payload"}
165
 
166
  # Save + Upload
167
  _, hf_url, _, size_bytes = save_and_upload_image(image_b64, hf_token)
168
-
169
  if not hf_url:
170
  return {"error": "Image upload failed"}
171
 
@@ -174,27 +136,19 @@ def process_and_describe(payload: Dict[str, Any]) -> Dict[str, Any]:
174
  Respond in STRICT JSON ONLY:
175
  {{
176
  "description": "short visual description",
177
- "tool_name": "{' | '.join(mcp._tools.keys())}",
178
  "arguments": {{ ... }}
179
  }}
180
  """
181
-
182
  messages = [
183
  {"role": "system", "content": system_prompt},
184
- {
185
- "role": "user",
186
- "content": [
187
- {"type": "text", "text": "Analyze the image and choose ONE tool."},
188
- {
189
- "type": "image_url",
190
- "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
191
- },
192
- ],
193
- },
194
  ]
195
 
196
  client = InferenceClient(token=hf_token)
197
-
198
  try:
199
  response = client.chat.completions.create(
200
  model=HF_VLM_MODEL,
@@ -203,25 +157,15 @@ Respond in STRICT JSON ONLY:
203
  temperature=0.1,
204
  )
205
  except Exception as e:
206
- return {"status": "error", "message": f"Inference API call failed: {str(e)}"}
207
-
208
 
209
  vlm_output = response.choices[0].message.content.strip()
210
-
211
  parsed = safe_parse_json_from_text(vlm_output)
212
-
213
  if parsed is None:
214
- return {
215
- "status": "model_no_json",
216
- "robot_id": robot_id,
217
- "image_url": hf_url,
218
- "vlm_raw": vlm_output,
219
- "message": "VLM returned invalid JSON format",
220
- }
221
 
222
  tool_name = parsed.get("tool_name")
223
  tool_args = parsed.get("arguments") or {}
224
-
225
  tool_result = validate_and_call_tool(tool_name, tool_args)
226
 
227
  return {
@@ -233,7 +177,7 @@ Respond in STRICT JSON ONLY:
233
  "chosen_tool": tool_name,
234
  "tool_arguments": tool_args,
235
  "tool_execution_result": tool_result,
236
- "vlm_raw": vlm_output,
237
  }
238
 
239
  # ------------------------------
@@ -241,7 +185,7 @@ Respond in STRICT JSON ONLY:
241
  # ------------------------------
242
  iface = gr.Interface(
243
  fn=process_and_describe,
244
- inputs=gr.JSON(label="Input JSON Payload (must contain hf_token and image_b64)"),
245
  outputs=gr.JSON(label="Output JSON Result"),
246
  api_name="predict",
247
  flagging_mode="never"
@@ -255,4 +199,3 @@ if __name__ == "__main__":
255
  print(f"[Config] HF_VLM_MODEL: {HF_VLM_MODEL}")
256
  print("[Gradio] Launching interface...")
257
  iface.launch(server_name="0.0.0.0", server_port=7860)
258
-
 
6
  from huggingface_hub import upload_file, InferenceClient
7
  from datetime import datetime
8
  import traceback
9
+ from typing import Optional, Dict, Any
10
 
11
  from fastmcp import FastMCP
12
 
13
+ # --- Configuration ---
 
 
14
  HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
15
  HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")
 
 
16
 
17
  mcp = FastMCP("Robot_MCP")
18
 
 
22
  @mcp.tool()
23
def speak(text: str, emotion: str = "neutral"):
    """Build the response describing a spoken utterance.

    Args:
        text: Sentence the robot should say.
        emotion: Tone tag attached to the utterance.

    Returns:
        A success dict naming the ``speak`` action and echoing the text
        and emotion under ``payload``.
    """
    response = {"status": "success", "action_executed": "speak"}
    response["payload"] = {"text": text, "emotion": emotion}
    return response
 
 
 
 
 
26
 
27
  @mcp.tool()
28
def navigate(direction: str, distance_meters: float):
    """Moves the robot a specified distance in a direction (max 5m).

    Args:
        direction: Direction label echoed back in the payload.
        distance_meters: Requested travel distance in metres.

    Returns:
        A success envelope carrying the direction and distance, or an
        error dict when the request violates the 5 m safety limit.
    """
    import math  # function-scope import keeps this block self-contained

    # A plain `> 5.0` test is False for large negative values and for NaN,
    # which let them bypass the limit; compare the magnitude of a finite
    # number instead.
    distance_is_safe = math.isfinite(distance_meters) and abs(distance_meters) <= 5.0
    if not distance_is_safe:
        return {"status": "error", "message": "Safety limit exceeded"}
    return {
        "status": "success",
        "action_executed": "navigate",
        "payload": {"direction": direction, "distance": distance_meters},
    }
 
 
 
 
 
33
 
34
  @mcp.tool()
35
def scan_hazard(hazard_type: str, severity: str):
    """Record a hazard sighting as a timestamped log line.

    Args:
        hazard_type: Short label for the hazard.
        severity: Severity label included in the log line.

    Returns:
        A dict whose ``status`` is ``"warning_logged"`` and whose ``log``
        entry starts with the current local time in ISO-8601 form.
    """
    timestamp = datetime.now().isoformat()
    log_line = f"[{timestamp}] HAZARD: {hazard_type} (Severity: {severity})"
    return dict(status="warning_logged", log=log_line)
 
 
 
 
39
 
40
  @mcp.tool()
41
def analyze_human(clothing_color: str, estimated_action: str):
    """Describe a tracked person from clothing colour and apparent action.

    Args:
        clothing_color: Colour of the clothing seen on the person.
        estimated_action: What the person appears to be doing.

    Returns:
        A dict with ``status`` ``"human_tracked"`` and a ``details``
        sentence combining both inputs.
    """
    tracked = {"status": "human_tracked"}
    tracked["details"] = f"Human wearing {clothing_color} is {estimated_action}"
    return tracked
 
 
 
44
 
45
  # -----------------------------------------------------
46
+ # Save and upload image to HF
47
  # -----------------------------------------------------
48
  def save_and_upload_image(image_b64: str, hf_token: str):
49
+ """Decode base64 image, save locally, and upload to HF dataset repo."""
50
  try:
51
  image_bytes = base64.b64decode(image_b64)
 
 
 
52
  os.makedirs("/tmp", exist_ok=True)
53
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
54
  local_path = f"/tmp/robot_img_{timestamp}.jpg"
 
57
  f.write(image_bytes)
58
 
59
  filename = f"robot_{timestamp}.jpg"
60
+ upload_file(local_path, path_in_repo=filename, repo_id=HF_DATASET_REPO, token=hf_token, repo_type="dataset")
 
 
 
 
 
 
 
 
61
  url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{filename}"
62
+ return local_path, url, filename, len(image_bytes)
63
 
64
  except Exception as e:
65
+ print(f"[Error] Image upload failed: {e}")
66
  traceback.print_exc()
67
  return None, None, None, 0
68
 
69
  # -----------------------------------------------------
70
+ # JSON parsing helper
71
  # -----------------------------------------------------
72
def safe_parse_json_from_text(text: str) -> Optional[Dict[str, Any]]:
    """Safely extract a JSON object from messy VLM output.

    The text is first parsed as-is. If that does not yield a JSON object,
    surrounding backticks and a leading ``json`` tag are stripped and the
    text is scanned for the first ``{`` from which a complete JSON object
    can be decoded.

    Args:
        text: Raw model output; may be empty, fenced, or wrapped in prose.

    Returns:
        The decoded object as a dict, or ``None`` when no JSON object can
        be recovered (including when the text is valid JSON of another
        type, such as a list or a number).
    """
    if not text or not isinstance(text, str):
        return None

    # Catch only decode failures; a bare `except:` would also swallow
    # KeyboardInterrupt/SystemExit and hide unrelated bugs.
    try:
        whole = json.loads(text)
    except json.JSONDecodeError:
        whole = None
    # The caller uses .get() on the result, so a list/str/number must not
    # be returned even though it is valid JSON.
    if isinstance(whole, dict):
        return whole

    # Drop Markdown code-fence backticks and an optional "json" language tag.
    cleaned = text.strip().strip("`").strip()
    if cleaned.lower().startswith("json"):
        cleaned = cleaned[4:].strip()

    # Try every '{' as a possible object start. raw_decode stops at the end
    # of the first complete value, so prose or stray braces around the
    # object do not break extraction the way first-'{'/last-'}' slicing did.
    decoder = json.JSONDecoder()
    start = cleaned.find("{")
    while start >= 0:
        try:
            candidate, _ = decoder.raw_decode(cleaned, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = cleaned.find("{", start + 1)

    return None
91
 
92
  # -----------------------------------------------------
93
+ # Call MCP tool safely using public API
94
  # -----------------------------------------------------
95
def validate_and_call_tool(tool_name: str, tool_args: dict) -> Dict[str, Any]:
    """Dispatch a model-chosen tool call through the FastMCP server object.

    ``mcp.call_tool`` is used when the server exposes it; otherwise the
    function falls back to an attribute of the same name on ``mcp``. If the
    call returns an awaitable, it is driven to completion so the caller
    receives the actual result rather than a pending coroutine.

    Args:
        tool_name: Tool name taken from the model's JSON reply.
        tool_args: Arguments object taken from the model's JSON reply.

    Returns:
        The tool's result, or a dict with an ``error`` key when the name or
        arguments are invalid, the tool is unknown, or execution raises.
    """
    # Function-scope imports keep this block self-contained.
    import asyncio
    import inspect
    from concurrent.futures import ThreadPoolExecutor

    # Both values come from model output, so check their types before use.
    if not isinstance(tool_name, str) or not tool_name:
        return {"error": f"Unknown tool '{tool_name}'"}
    if not isinstance(tool_args, dict):
        return {
            "error": "Tool execution error: arguments must be a JSON object, "
            f"got {type(tool_args).__name__}"
        }

    # Names offered to the model in the system prompt. Only these may be
    # resolved via getattr, so a reply such as "run" cannot invoke an
    # arbitrary method on the server object. Keep in sync with the
    # registered tools.
    fallback_allowed = ("speak", "navigate", "scan_hazard", "analyze_human")

    def _resolve(value):
        """Return ``value``, awaiting it first when it is awaitable."""
        if not inspect.isawaitable(value):
            return value

        async def _await_value():
            return await value

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop in this thread: safe to start one here.
            return asyncio.run(_await_value())
        # A loop is already running in this thread and asyncio.run would
        # raise, so run the awaitable on a fresh loop in a helper thread.
        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, _await_value()).result()

    try:
        if hasattr(mcp, "call_tool"):
            # NOTE(review): the shape of call_tool's result depends on the
            # FastMCP version -- confirm it is JSON-serialisable for gr.JSON.
            return _resolve(mcp.call_tool(tool_name, tool_args))
        # fallback: call the registered function directly
        if tool_name in fallback_allowed and hasattr(mcp, tool_name):
            tool_fn = getattr(mcp, tool_name)
            return _resolve(tool_fn(**tool_args))
        return {"error": f"Unknown tool '{tool_name}'"}
    except Exception as e:
        # Print the full stack for the server log; the caller only
        # receives the short message.
        traceback.print_exc()
        return {"error": f"Tool execution error: {str(e)}"}
109
 
110
  # -----------------------------------------------------
111
+ # Main pipeline: image → VLM → tool
112
  # -----------------------------------------------------
113
  def process_and_describe(payload: Dict[str, Any]) -> Dict[str, Any]:
 
 
 
114
  if isinstance(payload, str):
115
  try:
116
  payload = json.loads(payload)
117
+ except:
118
+ return {"error": "Invalid JSON payload"}
119
 
120
  hf_token = payload.get("hf_token")
121
  if not hf_token:
122
+ return {"error": "hf_token missing"}
123
 
124
  robot_id = payload.get("robot_id", "unknown")
125
  image_b64 = payload.get("image_b64")
126
  if not image_b64:
127
+ return {"error": "image_b64 missing"}
128
 
129
  # Save + Upload
130
  _, hf_url, _, size_bytes = save_and_upload_image(image_b64, hf_token)
 
131
  if not hf_url:
132
  return {"error": "Image upload failed"}
133
 
 
136
  Respond in STRICT JSON ONLY:
137
  {{
138
  "description": "short visual description",
139
+ "tool_name": "speak | navigate | scan_hazard | analyze_human",
140
  "arguments": {{ ... }}
141
  }}
142
  """
 
143
  messages = [
144
  {"role": "system", "content": system_prompt},
145
+ {"role": "user", "content": [
146
+ {"type": "text", "text": "Analyze the image and choose ONE tool."},
147
+ {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
148
+ ]}
 
 
 
 
 
 
149
  ]
150
 
151
  client = InferenceClient(token=hf_token)
 
152
  try:
153
  response = client.chat.completions.create(
154
  model=HF_VLM_MODEL,
 
157
  temperature=0.1,
158
  )
159
  except Exception as e:
160
+ return {"status": "error", "message": f"Inference API call failed: {e}"}
 
161
 
162
  vlm_output = response.choices[0].message.content.strip()
 
163
  parsed = safe_parse_json_from_text(vlm_output)
 
164
  if parsed is None:
165
+ return {"status": "model_no_json", "robot_id": robot_id, "image_url": hf_url, "vlm_raw": vlm_output, "message": "VLM returned invalid JSON"}
 
 
 
 
 
 
166
 
167
  tool_name = parsed.get("tool_name")
168
  tool_args = parsed.get("arguments") or {}
 
169
  tool_result = validate_and_call_tool(tool_name, tool_args)
170
 
171
  return {
 
177
  "chosen_tool": tool_name,
178
  "tool_arguments": tool_args,
179
  "tool_execution_result": tool_result,
180
+ "vlm_raw": vlm_output
181
  }
182
 
183
  # ------------------------------
 
185
  # ------------------------------
186
  iface = gr.Interface(
187
  fn=process_and_describe,
188
+ inputs=gr.JSON(label="Input JSON Payload (must include hf_token & image_b64)"),
189
  outputs=gr.JSON(label="Output JSON Result"),
190
  api_name="predict",
191
  flagging_mode="never"
 
199
  print(f"[Config] HF_VLM_MODEL: {HF_VLM_MODEL}")
200
  print("[Gradio] Launching interface...")
201
  iface.launch(server_name="0.0.0.0", server_port=7860)