OppaAI committed on
Commit
8c3dcd1
·
verified ·
1 Parent(s): 00f5c0d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +25 -61
app.py CHANGED
@@ -1,25 +1,24 @@
1
  import os
2
  import base64
3
  import json
4
- import gradio as gr
5
- from huggingface_hub import HfApi, InferenceClient
6
  from datetime import datetime
7
  import traceback
8
  from typing import Optional, Dict, Any
9
 
 
 
10
  from fastmcp import FastMCP
11
 
12
- # --- Configuration ---
13
  HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
14
  HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")
15
 
16
- # Create MCP server
17
  mcp = FastMCP("Robot_MCP_Server")
18
 
19
 
20
- # -----------------------------------------------------
21
- # Save and upload image to HF
22
- # -----------------------------------------------------
23
  def upload_image(image_b64: str, hf_token: str):
24
  try:
25
  image_bytes = base64.b64decode(image_b64)
@@ -43,21 +42,18 @@ def upload_image(image_b64: str, hf_token: str):
43
  token=hf_token
44
  )
45
 
46
- # FIXED URL
47
  url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/tmp/{filename}"
48
-
49
  return local_path, url, filename, size_bytes
50
 
51
- except Exception as e:
52
- print(f"[Error] during image upload: {e}")
53
  traceback.print_exc()
54
  return None, None, None, 0
55
 
56
 
57
- # -----------------------------------------------------
58
- # JSON parsing helper
59
- # -----------------------------------------------------
60
- def safe_parse_json_from_text(text: str) -> Optional[Dict[str, Any]]:
61
  if not text:
62
  return None
63
  try:
@@ -72,19 +68,16 @@ def safe_parse_json_from_text(text: str) -> Optional[Dict[str, Any]]:
72
  try:
73
  start = cleaned.find("{")
74
  end = cleaned.rfind("}")
75
- if start >= 0 and end > start:
76
- return json.loads(cleaned[start:end + 1])
77
  except:
78
  return None
79
 
80
- return None
81
-
82
 
83
- # -----------------------------------------------------
84
- # MCP Tool: image → VLM → structured JSON
85
- # -----------------------------------------------------
86
  @mcp.tool()
87
- def robot_watch(payload: Dict[str, Any]) -> Dict[str, Any]:
88
  if isinstance(payload, str):
89
  try:
90
  payload = json.loads(payload)
@@ -100,15 +93,14 @@ def robot_watch(payload: Dict[str, Any]) -> Dict[str, Any]:
100
  if not image_b64:
101
  return {"error": "image_b64 missing"}
102
 
103
- # 1. Save + Upload
104
  _, hf_url, _, size_bytes = upload_image(image_b64, hf_token)
105
  if not hf_url:
106
  return {"error": "Image upload failed"}
107
 
108
- # 2. VLM prompt
109
  system_prompt = """
110
  Respond in STRICT JSON ONLY.
111
- Output format:
112
  {
113
  "description": "...",
114
  "human": "...",
@@ -119,7 +111,7 @@ Output format:
119
  messages = [
120
  {"role": "system", "content": system_prompt},
121
  {"role": "user", "content": [
122
- {"type": "text", "text": "Analyze the image and provide the description."},
123
  {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
124
  ]}
125
  ]
@@ -127,24 +119,23 @@ Output format:
127
  client = InferenceClient(token=hf_token)
128
 
129
  try:
130
- response = client.chat.completions.create(
131
  model=HF_VLM_MODEL,
132
  messages=messages,
133
  max_tokens=300,
134
- temperature=0.1,
135
  )
136
  except Exception as e:
137
- return {"status": "error", "message": f"Inference API call failed: {e}"}
138
 
139
- vlm_output = response.choices[0].message.content.strip()
140
  parsed = safe_parse_json_from_text(vlm_output)
141
 
142
  if parsed is None:
143
  return {
144
  "status": "model_no_json",
145
- "robot_id": robot_id,
146
  "vlm_raw": vlm_output,
147
- "message": "VLM returned invalid JSON"
148
  }
149
 
150
  return {
@@ -152,31 +143,4 @@ Output format:
152
  "robot_id": robot_id,
153
  "file_size_bytes": size_bytes,
154
  "image_url": hf_url,
155
- "description": parsed.get("description"),
156
- "human": parsed.get("human"),
157
- "environment": parsed.get("environment"),
158
- "vlm_raw": vlm_output
159
- }
160
-
161
-
162
- # -----------------------------------------------------
163
- # Gradio Interface wrapper
164
- # -----------------------------------------------------
165
- def process_and_describe(payload):
166
- return robot_watch(payload)
167
-
168
-
169
- app = gr.Interface(
170
- fn=process_and_describe,
171
- inputs=gr.JSON(label="Input JSON Payload (must include hf_token & image_b64)"),
172
- outputs=gr.JSON(label="Output JSON Result"),
173
- api_name="predict",
174
- flagging_mode="never"
175
- )
176
-
177
-
178
- # -----------------------------------------------------
179
- # Entry
180
- # -----------------------------------------------------
181
- if __name__ == "__main__":
182
- app.launch(mcp_server=True)
 
1
  import os
2
  import base64
3
  import json
 
 
4
  from datetime import datetime
5
  import traceback
6
  from typing import Optional, Dict, Any
7
 
8
+ import gradio as gr
9
+ from huggingface_hub import HfApi, InferenceClient
10
  from fastmcp import FastMCP
11
 
12
+
13
  HF_DATASET_REPO = os.environ.get("HF_DATASET_REPO", "OppaAI/Robot_MCP")
14
  HF_VLM_MODEL = os.environ.get("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct")
15
 
 
16
  mcp = FastMCP("Robot_MCP_Server")
17
 
18
 
19
+ # -------------------------------
20
+ # Upload helper
21
+ # -------------------------------
22
  def upload_image(image_b64: str, hf_token: str):
23
  try:
24
  image_bytes = base64.b64decode(image_b64)
 
42
  token=hf_token
43
  )
44
 
 
45
  url = f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/tmp/{filename}"
 
46
  return local_path, url, filename, size_bytes
47
 
48
+ except Exception:
 
49
  traceback.print_exc()
50
  return None, None, None, 0
51
 
52
 
53
+ # -------------------------------
54
+ # Safe JSON parse
55
+ # -------------------------------
56
+ def safe_parse_json_from_text(text: str):
57
  if not text:
58
  return None
59
  try:
 
68
  try:
69
  start = cleaned.find("{")
70
  end = cleaned.rfind("}")
71
+ return json.loads(cleaned[start:end + 1])
 
72
  except:
73
  return None
74
 
 
 
75
 
76
+ # -------------------------------
77
+ # MCP TOOL
78
+ # -------------------------------
79
  @mcp.tool()
80
+ def robot_watch(payload: Dict[str, Any]):
81
  if isinstance(payload, str):
82
  try:
83
  payload = json.loads(payload)
 
93
  if not image_b64:
94
  return {"error": "image_b64 missing"}
95
 
96
+ # Upload image
97
  _, hf_url, _, size_bytes = upload_image(image_b64, hf_token)
98
  if not hf_url:
99
  return {"error": "Image upload failed"}
100
 
101
+ # VLM call
102
  system_prompt = """
103
  Respond in STRICT JSON ONLY.
 
104
  {
105
  "description": "...",
106
  "human": "...",
 
111
  messages = [
112
  {"role": "system", "content": system_prompt},
113
  {"role": "user", "content": [
114
+ {"type": "text", "text": "Analyze the image."},
115
  {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
116
  ]}
117
  ]
 
119
  client = InferenceClient(token=hf_token)
120
 
121
  try:
122
+ resp = client.chat.completions.create(
123
  model=HF_VLM_MODEL,
124
  messages=messages,
125
  max_tokens=300,
126
+ temperature=0.1
127
  )
128
  except Exception as e:
129
+ return {"status": "error", "message": str(e)}
130
 
131
+ vlm_output = resp.choices[0].message.content.strip()
132
  parsed = safe_parse_json_from_text(vlm_output)
133
 
134
  if parsed is None:
135
  return {
136
  "status": "model_no_json",
 
137
  "vlm_raw": vlm_output,
138
+ "message": "Invalid JSON returned"
139
  }
140
 
141
  return {
 
143
  "robot_id": robot_id,
144
  "file_size_bytes": size_bytes,
145
  "image_url": hf_url,
146
+ "de