OppaAI committed on
Commit 08216b8 · verified · 1 Parent(s): c687f9a

Update app.py

Files changed (1)
  1. app.py +158 -55
app.py CHANGED
@@ -1,38 +1,110 @@
 import os
 import base64
+import json
 import gradio as gr
 from huggingface_hub import upload_file, InferenceClient
-import json
+from datetime import datetime
 
 # --- Config ---
 HF_DATASET_REPO = "OppaAI/Robot_MCP"
 HF_VLM_MODEL = "Qwen/Qwen2.5-VL-7B-Instruct"
 
-def say_hi(greeting_text: str = "Hi there!") -> dict:
-    """Return a greeting command in JSON."""
-    return {"command": "say_hi", "text": greeting_text}
+# ==========================================
+# 1. DEFINE ROBOT TOOLS
+# ==========================================
+
+def tool_speak(text: str, emotion: str = "neutral") -> dict:
+    """
+    Command the robot to speak text via TTS.
+    """
+    # In a real scenario, this would send a signal to the robot's speaker driver
+    return {
+        "status": "success",
+        "action_executed": "speak",
+        "payload": {"text": text, "emotion": emotion}
+    }
+
+def tool_navigate(direction: str, distance_meters: float) -> dict:
+    """
+    Move the robot. Direction options: 'forward', 'backward', 'left', 'right'.
+    """
+    if distance_meters > 5.0:
+        return {"status": "error", "message": "Safety limit: Cannot move more than 5m at once."}
+
+    return {
+        "status": "success",
+        "action_executed": "navigate",
+        "payload": {"direction": direction, "distance": distance_meters}
+    }
+
+def tool_scan_hazard(hazard_type: str, severity: str) -> dict:
+    """
+    Log a safety hazard if seen in the image (e.g., 'fire', 'water', 'obstacle').
+    """
+    timestamp = datetime.now().isoformat()
+    log_entry = f"[{timestamp}] WARNING: {hazard_type} detected (Severity: {severity})"
+    # Here you would write to a log file or trigger an alarm
+    return {
+        "status": "warning_logged",
+        "log": log_entry
+    }
+
+def tool_analyze_human(clothing_color: str, estimated_action: str) -> dict:
+    """
+    Specialized analysis when a human is detected.
+    """
+    return {
+        "status": "human_tracked",
+        "details": f"Human wearing {clothing_color} is likely {estimated_action}."
+    }
+
+# --- Tool Dispatcher ---
+# This maps string names to the actual Python functions
+TOOL_REGISTRY = {
+    "speak": tool_speak,
+    "navigate": tool_navigate,
+    "scan_hazard": tool_scan_hazard,
+    "analyze_human": tool_analyze_human
+}
+
+# ==========================================
+# 2. HELPER FUNCTIONS
+# ==========================================
 
-# --- Helper Functions ---
 def save_and_upload_image(image_b64: str, hf_token: str):
-    image_bytes = base64.b64decode(image_b64)
-    local_tmp_path = "/tmp/tmp.jpg"
-    with open(local_tmp_path, "wb") as f:
-        f.write(image_bytes)
-
-    path_in_repo = "images/tmp.jpg"
-    upload_file(
-        path_or_fileobj=local_tmp_path,
-        path_in_repo=path_in_repo,
-        repo_id=HF_DATASET_REPO,
-        token=hf_token,
-        repo_type="dataset"
-    )
-
-    hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
-    return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)
-
-# --- Main MCP function ---
+    try:
+        image_bytes = base64.b64decode(image_b64)
+        local_tmp_path = "/tmp/tmp.jpg"
+        with open(local_tmp_path, "wb") as f:
+            f.write(image_bytes)
+
+        # Create unique filename to avoid overwriting
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        path_in_repo = f"images/robot_{timestamp}.jpg"
+
+        upload_file(
+            path_or_fileobj=local_tmp_path,
+            path_in_repo=path_in_repo,
+            repo_id=HF_DATASET_REPO,
+            token=hf_token,
+            repo_type="dataset"
+        )
+
+        hf_image_url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{path_in_repo}"
+        return local_tmp_path, hf_image_url, path_in_repo, len(image_bytes)
+    except Exception as e:
+        print(f"Upload failed: {e}")
+        return None, None, None, 0
+
+# ==========================================
+# 3. MAIN LOGIC
+# ==========================================
+
 def process_and_describe(payload: dict):
+    tool_result = None
+    vlm_text = ""
+    action_data = {}
+
     try:
         hf_token = payload.get("hf_token")
         if not hf_token:
@@ -40,33 +112,49 @@ def process_and_describe(payload: dict):
 
         robot_id = payload.get("robot_id", "unknown")
         image_b64 = payload.get("image_b64")
+
         if not image_b64:
             return {"error": "No image provided."}
 
-        # Save image & upload
+        # Upload Image
        local_tmp_path, hf_url, path_in_repo, size_bytes = save_and_upload_image(image_b64, hf_token)
 
-        # Initialize HF client
+        # Initialize HF Client
         hf_client = InferenceClient(token=hf_token)
 
-        system_prompt = """
-        You are a helpful robot assistant.
-        When you receive an image, you must:
-        1. Describe the image in detail.
-        2. Decide actions for the robot. Example:
-           - Human figure call the `say_hi` tool with a friendly greeting (vary every time)
-        Always respond in JSON with:
-        {
-          "description": "...",
-          "action": "say_hi",
-          "greeting_text": "a friendly greeting"
-        }
+        # --- Dynamic System Prompt Construction ---
+        tools_desc = json.dumps({
+            "speak": {"text": "string", "emotion": "string"},
+            "navigate": {"direction": "forward/left/right", "distance_meters": "float"},
+            "scan_hazard": {"hazard_type": "string", "severity": "low/medium/high"},
+            "analyze_human": {"clothing_color": "string", "estimated_action": "string"}
+        }, indent=2)
+
+        system_prompt = f"""
+        You are a Robot Control AI. Analyze the image and choose ONE tool to execute.
+
+        AVAILABLE TOOLS (JSON Schema):
+        {tools_desc}
+
+        INSTRUCTIONS:
+        1. Describe what you see briefly.
+        2. Select the most appropriate tool based on the visual context.
+           - If you see a person -> use 'analyze_human' OR 'speak'.
+           - If you see a clear path -> use 'navigate'.
+           - If you see fire/mess -> use 'scan_hazard'.
+
+        RESPONSE FORMAT (Strict JSON):
+        {{
+          "description": "Brief visual description",
+          "tool_name": "name_of_tool",
+          "arguments": {{ ...args matching schema... }}
+        }}
         """
 
         messages_payload = [
             {"role": "system", "content": system_prompt},
             {"role": "user", "content": [
-                {"type": "text", "text": "Here is an image."},
+                {"type": "text", "text": "Analyze this camera feed and decide on an action."},
                 {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
             ]}
         ]
@@ -75,40 +163,55 @@ def process_and_describe(payload: dict):
         chat_completion = hf_client.chat.completions.create(
             model=HF_VLM_MODEL,
             messages=messages_payload,
-            max_tokens=300
+            max_tokens=300,
+            temperature=0.1  # Low temp for reliable JSON
         )
 
         vlm_text = chat_completion.choices[0].message.content.strip()
+
+        # Clean up markdown code blocks if the model adds them (```json ... ```)
+        if vlm_text.startswith("```"):
+            vlm_text = vlm_text.strip("`").replace("json", "").strip()
 
-        # Parse JSON from VLM
+        # Parse JSON
         try:
             action_data = json.loads(vlm_text)
+
+            # --- TOOL EXECUTION BLOCK ---
+            tool_name = action_data.get("tool_name")
+            tool_args = action_data.get("arguments", {})
+
+            if tool_name in TOOL_REGISTRY:
+                # Execute the Python function dynamically
+                print(f"Executing tool: {tool_name} with args {tool_args}")
+                tool_result = TOOL_REGISTRY[tool_name](**tool_args)
+            else:
+                tool_result = {"error": f"Tool '{tool_name}' not found in registry."}
+
         except json.JSONDecodeError:
-            action_data = {"description": vlm_text, "action": None, "greeting_text": None}
+            action_data = {"description": vlm_text, "tool_name": None}
+            tool_result = {"error": "Model did not return valid JSON."}
 
         return {
-            "saved_to_hf_hub": True,
-            "repo_id": HF_DATASET_REPO,
-            "path_in_repo": path_in_repo,
-            "image_url": hf_url,
-            "file_size_bytes": size_bytes,
+            "status": "success",
             "robot_id": robot_id,
-            "vlm_response": vlm_text,
-            "vlm_action": action_data.get("action"),
-            "vlm_description": action_data.get("description"),
-            "tool_result": tool_result
+            "image_url": hf_url,
+            "analysis": action_data.get("description"),
+            "chosen_tool": action_data.get("tool_name"),
+            "tool_arguments": action_data.get("arguments"),
+            "tool_execution_result": tool_result
        }
 
     except Exception as e:
-        return {"error": f"An API error occurred: {str(e)}"}
+        return {"error": f"Server error: {str(e)}", "raw_response": vlm_text}
 
-# --- Gradio MCP Interface ---
+# --- Gradio Interface ---
 demo = gr.Interface(
     fn=process_and_describe,
-    inputs=gr.JSON(label="Input Payload"),
-    outputs=gr.JSON(label="Reply to Jetson"),
+    inputs=gr.JSON(label="Input (JSON with 'image_b64' and 'hf_token')"),
+    outputs=gr.JSON(label="Robot Command Output"),
     api_name="predict"
 )
 
 if __name__ == "__main__":
-    demo.launch()
+    demo.launch()
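With this change the Space still exposes a single JSON-in/JSON-out endpoint, so the robot-side caller (the Jetson referenced in the old "Reply to Jetson" label) only has to send a payload containing hf_token, robot_id, and a base64-encoded camera frame. A minimal client sketch, not part of this commit; the Space id, token value, and file name below are placeholders:

# Hypothetical Jetson-side client (requires the gradio_client package)
import base64
from gradio_client import Client

client = Client("OppaAI/<space_id>")  # placeholder: the hosting Space id is not shown in this diff

with open("frame.jpg", "rb") as f:
    image_b64 = base64.b64encode(f.read()).decode("utf-8")

result = client.predict(
    {"hf_token": "hf_xxx", "robot_id": "jetson_01", "image_b64": image_b64},
    api_name="/predict",
)
print(result)  # e.g. {"status": "success", "chosen_tool": "speak", ...}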
 
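Because the model is held to the strict JSON response format, the TOOL_REGISTRY dispatch step can be exercised offline without calling the VLM. A minimal sketch, assuming the app.py from this commit is importable; the sample model output is invented for illustration:

# Offline test of the dispatch contract (illustrative only)
import json
from app import TOOL_REGISTRY  # assumes app.py above is on the path

# Invented example of a well-formed VLM reply
sample_vlm_text = '{"description": "A person in a red jacket waves.", "tool_name": "speak", "arguments": {"text": "Hello there!", "emotion": "happy"}}'

action = json.loads(sample_vlm_text)
tool = TOOL_REGISTRY.get(action.get("tool_name"))
if tool:
    print(tool(**action.get("arguments", {})))
    # -> {'status': 'success', 'action_executed': 'speak', 'payload': {'text': 'Hello there!', 'emotion': 'happy'}}
else:
    print({"error": "unknown tool"})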