subhash4face committed on
Commit
420030c
·
verified ·
1 Parent(s): 125ea2c

Updated to include API keys

Browse files
Files changed (1) hide show
  1. app.py +197 -83
app.py CHANGED
@@ -1,4 +1,3 @@
1
-
2
  import os
3
  import io
4
  import json
@@ -9,13 +8,14 @@ from typing import Optional
9
  import gradio as gr
10
  from pydantic import BaseModel
11
 
12
- # Try optional dependencies
13
  try:
14
  import openai
15
  OPENAI_AVAILABLE = True
16
  except Exception:
17
  OPENAI_AVAILABLE = False
18
 
 
19
  try:
20
  from PIL import Image
21
  import requests
@@ -24,18 +24,23 @@ try:
24
  except Exception:
25
  HF_BLIP_AVAILABLE = False
26
 
27
- # Config
 
 
28
  OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
29
  ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")
 
30
 
31
  if OPENAI_API_KEY and OPENAI_AVAILABLE:
32
  openai.api_key = OPENAI_API_KEY
33
 
34
- ELEVEN_VOICE_ID = os.environ.get("ELEVEN_VOICE_ID", "EXAVITQu4vr4xnSDxMaL")
 
35
  ELEVEN_API_URL = "https://api.elevenlabs.io/v1/text-to-speech"
36
 
37
-
38
- # MCP server shim
 
39
  class ToolResult(BaseModel):
40
  content: str
41
  meta: Optional[dict] = None
@@ -48,161 +53,270 @@ class MCPServer:
48
 
49
  def tool(self, name: str, description: str = ""):
50
  def decorator(fn):
51
- self.tools[name] = {"fn": fn, "description": description}
 
 
 
52
  return fn
53
  return decorator
54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55
  server = MCPServer("accessibility_voice_mcp")
56
 
57
- # STT utilities
 
 
 
58
  def transcribe_with_openai(audio_file_path: str) -> str:
 
59
  if not OPENAI_AVAILABLE:
60
- return "OpenAI not available"
61
- try:
62
- with open(audio_file_path, "rb") as f:
63
- tr = openai.Audio.transcriptions.create(model="whisper-1", file=f)
64
- if isinstance(tr, dict):
65
- return tr.get("text", "")
66
- return getattr(tr, "text", "")
67
- except Exception as e:
68
- return f"OpenAI transcription error: {e}"
 
 
 
69
 
70
  def transcribe_fallback(audio_file_path: str) -> str:
 
71
  try:
72
  import whisper
73
  model = whisper.load_model("small")
74
  res = model.transcribe(audio_file_path)
75
  return res.get("text", "")
76
  except Exception as e:
77
- return f"Local STT fallback failed: {e}"
 
78
 
79
- # TTS
80
  def tts_elevenlabs(text: str) -> bytes:
 
81
  if not ELEVENLABS_API_KEY:
82
- raise RuntimeError("ELEVENLABS_API_KEY missing")
83
  url = f"{ELEVEN_API_URL}/{ELEVEN_VOICE_ID}"
84
- headers = {"xi-api-key": ELEVENLABS_API_KEY, "Content-Type": "application/json"}
85
- payload = {"text": text, "voice_settings": {"stability": 0.5, "similarity_boost": 0.75}}
86
- r = requests.post(url, headers=headers, json=payload, stream=True)
87
- if r.status_code != 200:
88
- raise RuntimeError(f"ElevenLabs error {r.status_code}: {r.text}")
89
- return r.content
90
-
91
- # Gemini Vision
 
 
 
 
 
 
 
 
 
 
 
92
  def describe_image_gemini(image_path: str) -> str:
 
93
  try:
94
  import google.generativeai as genai
95
- key = os.environ.get("GOOGLE_GEMINI_API_KEY")
96
- if not key:
97
  return "GOOGLE_GEMINI_API_KEY not set"
98
 
99
- genai.configure(api_key=key)
100
  model = genai.GenerativeModel("gemini-1.5-flash")
101
 
102
  with open(image_path, "rb") as f:
103
- img_bytes = f.read()
104
 
105
- resp = model.generate_content([
106
- "Describe this image for a visually impaired user.",
107
- {"mime_type": "image/jpeg", "data": img_bytes}
108
- ])
109
- return resp.text
110
  except Exception as e:
111
  return f"Gemini describe error: {e}"
112
 
113
- # BLIP fallback
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
114
  def describe_image_blip(image_path: str) -> str:
115
  if not HF_BLIP_AVAILABLE:
116
- return "BLIP not available"
117
  try:
118
  processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
119
  model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
120
- raw = Image.open(image_path).convert("RGB")
121
- inputs = processor(raw, return_tensors="pt")
122
  out = model.generate(**inputs)
123
- return processor.decode(out[0], skip_special_tokens=True)
 
124
  except Exception as e:
125
  return f"BLIP caption error: {e}"
126
 
 
127
  # MCP Tools
128
- @server.tool("speak_text", "Convert text to speech using ElevenLabs")
 
129
  def speak_text_tool(text: str) -> ToolResult:
130
  try:
131
- audio = tts_elevenlabs(text)
132
- enc = base64.b64encode(audio).decode("utf-8")
133
- return ToolResult(content=enc, meta={"format": "base64-audio"})
134
  except Exception as e:
135
- return ToolResult(content=f"TTS error: {e}")
136
 
137
- @server.tool("describe_image", "Describe an uploaded image")
 
138
  def describe_image_tool(image_path: str) -> ToolResult:
139
- desc = describe_image_gemini(image_path)
140
- if "error" not in desc.lower() and "not set" not in desc.lower():
 
 
 
 
 
141
  return ToolResult(content=desc)
142
- desc = describe_image_blip(image_path)
143
- return ToolResult(content=desc)
144
 
145
- @server.tool("transcribe_audio", "Transcribe audio to text")
146
- def transcribe_audio_tool(path: str) -> ToolResult:
147
  if OPENAI_AVAILABLE:
148
- return ToolResult(content=transcribe_with_openai(path))
149
- return ToolResult(content=transcribe_fallback(path))
 
 
 
 
 
 
 
150
 
151
- # Gradio UI
152
  def decode_base64_audio(b64: str) -> bytes:
153
  return base64.b64decode(b64)
154
 
155
  with gr.Blocks() as demo:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
  gr.Markdown("# Accessibility Voice Agent — MCP Tools")
157
 
158
  with gr.Row():
159
  with gr.Column(scale=2):
160
- chatbox = gr.Chatbot(type="messages")
161
- user_input = gr.Textbox(placeholder="Type or speak...", show_label=False)
162
 
163
  with gr.Row():
164
- mic = gr.Audio(sources=["microphone"], type="filepath")
165
  send_btn = gr.Button("Send")
166
 
167
- with gr.Accordion("Tools"):
168
- tts_text = gr.Textbox(label="Text to speak")
169
- tts_btn = gr.Button("Speak")
170
 
171
- img_upload = gr.File(label="Upload image")
172
  img_btn = gr.Button("Describe image")
173
 
174
  with gr.Column(scale=1):
 
175
  tools_log = gr.Textbox(value="Ready.", lines=20)
176
 
177
- def on_send_text(text, history, mic_file):
178
- log = tools_log.value
 
179
  if mic_file:
180
- log += "\nTranscribing..."
 
 
 
181
  tr = transcribe_audio_tool(mic_file)
182
- text = tr.content
183
-
184
- history = history or []
185
- history.append({"role": "user", "content": text})
186
- history.append({"role": "assistant", "content": "You said: " + text})
187
- return history, log
188
-
189
- send_btn.click(on_send_text, [user_input, chatbox, mic], [chatbox, tools_log])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
190
 
191
  def on_tts(text):
192
  res = speak_text_tool(text)
193
  if res.meta and res.meta.get("format") == "base64-audio":
194
- audio = decode_base64_audio(res.content)
195
- return (audio, 16000)
 
196
 
197
- tts_btn.click(on_tts, [tts_text], [gr.Audio()])
198
 
199
- def on_describe(file_obj):
200
  if not file_obj:
201
  return "No file uploaded"
202
- desc = describe_image_tool(file_obj.name)
203
- return {"role": "assistant", "content": desc.content}
 
204
 
205
- img_btn.click(on_describe, [img_upload], [chatbox])
206
 
207
  if __name__ == "__main__":
208
- demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))
 
 
1
  import os
2
  import io
3
  import json
 
8
  import gradio as gr
9
  from pydantic import BaseModel
10
 
11
+ # Optional: use openai if available for transcription and image captioning
12
  try:
13
  import openai
14
  OPENAI_AVAILABLE = True
15
  except Exception:
16
  OPENAI_AVAILABLE = False
17
 
18
+ # Optional: HF transformers fallbacks
19
  try:
20
  from PIL import Image
21
  import requests
 
24
  except Exception:
25
  HF_BLIP_AVAILABLE = False
26
 
27
+ # -----------------------------
28
+ # Configuration
29
+ # -----------------------------
30
  OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
31
  ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")
32
+ HUGGINGFACE_API_TOKEN = os.environ.get("HUGGINGFACE_API_TOKEN")
33
 
34
  if OPENAI_API_KEY and OPENAI_AVAILABLE:
35
  openai.api_key = OPENAI_API_KEY
36
 
37
+ # ElevenLabs defaults
38
+ ELEVEN_VOICE_ID = os.environ.get("ELEVEN_VOICE_ID", "EXAVITQu4vr4xnSDxMaL") # placeholder
39
  ELEVEN_API_URL = "https://api.elevenlabs.io/v1/text-to-speech"
40
 
41
+ # -----------------------------
42
+ # Minimal MCP Server shim
43
+ # -----------------------------
44
  class ToolResult(BaseModel):
45
  content: str
46
  meta: Optional[dict] = None
 
53
 
54
  def tool(self, name: str, description: str = ""):
55
  def decorator(fn):
56
+ self.tools[name] = {
57
+ "fn": fn,
58
+ "description": description,
59
+ }
60
  return fn
61
  return decorator
62
 
63
+ async def run_tool(self, name: str, *args, **kwargs):
64
+ tool = self.tools.get(name)
65
+ if not tool:
66
+ raise ValueError(f"Tool {name} not found")
67
+ fn = tool["fn"]
68
+ if asyncio.iscoroutinefunction(fn):
69
+ res = await fn(*args, **kwargs)
70
+ else:
71
+ res = fn(*args, **kwargs)
72
+ if isinstance(res, ToolResult):
73
+ return res
74
+ return ToolResult(content=str(res))
75
+
76
  server = MCPServer("accessibility_voice_mcp")
77
 
78
+ # -----------------------------
79
+ # Utilities: STT, TTS, Image describe
80
+ # -----------------------------
81
+
82
  def transcribe_with_openai(audio_file_path: str) -> str:
83
+ """Transcribe audio using OpenAI Whisper (if available)."""
84
  if not OPENAI_AVAILABLE:
85
+ return "OpenAI library not available"
86
+ with open(audio_file_path, "rb") as f:
87
+ # Uses the OpenAI Audio transcription API (may vary by SDK version)
88
+ try:
89
+ transcript = openai.Audio.transcriptions.create(model="whisper-1", file=f)
90
+ # Some SDKs return .text
91
+ if isinstance(transcript, dict):
92
+ return transcript.get("text", "")
93
+ return getattr(transcript, "text", "")
94
+ except Exception as e:
95
+ return f"OpenAI transcription error: {e}"
96
+
97
 
98
  def transcribe_fallback(audio_file_path: str) -> str:
99
+ """Fallback: invoke whisper from local package (if installed)."""
100
  try:
101
  import whisper
102
  model = whisper.load_model("small")
103
  res = model.transcribe(audio_file_path)
104
  return res.get("text", "")
105
  except Exception as e:
106
+ return f"Local transcription fallback failed: {e}"
107
+
108
 
 
109
  def tts_elevenlabs(text: str) -> bytes:
110
+ """Call ElevenLabs API to synthesize speech. Returns raw audio bytes (wav/mp3 depending on API)."""
111
  if not ELEVENLABS_API_KEY:
112
+ raise RuntimeError("ELEVENLABS_API_KEY not set in environment")
113
  url = f"{ELEVEN_API_URL}/{ELEVEN_VOICE_ID}"
114
+ headers = {
115
+ "xi-api-key": ELEVENLABS_API_KEY,
116
+ "Content-Type": "application/json",
117
+ }
118
+ payload = {
119
+ "text": text,
120
+ "voice_settings": {"stability": 0.5, "similarity_boost": 0.75}
121
+ }
122
+ resp = requests.post(url, headers=headers, json=payload, stream=True)
123
+ if resp.status_code != 200:
124
+ raise RuntimeError(f"ElevenLabs TTS failed: {resp.status_code} {resp.text}")
125
+ return resp.content
126
+
127
+
128
+
129
+ def # -----------------------------
130
+ # Gemini Image Description
131
+ # -----------------------------
132
+
133
  def describe_image_gemini(image_path: str) -> str:
134
+ """Describe an image using Google Gemini Vision."""
135
  try:
136
  import google.generativeai as genai
137
+ GEMINI_KEY = os.environ.get("GOOGLE_GEMINI_API_KEY")
138
+ if not GEMINI_KEY:
139
  return "GOOGLE_GEMINI_API_KEY not set"
140
 
141
+ genai.configure(api_key=GEMINI_KEY)
142
  model = genai.GenerativeModel("gemini-1.5-flash")
143
 
144
  with open(image_path, "rb") as f:
145
+ image_bytes = f.read()
146
 
147
+ response = model.generate_content(["Describe this image for a visually impaired user.", {"mime_type":"image/jpeg", "data": image_bytes}])
148
+ return response.text
 
 
 
149
  except Exception as e:
150
  return f"Gemini describe error: {e}"
151
 
152
+ # (OpenAI code removed for simplicity)
153
+ (image_path: str) -> str:
154
+ """Attempt to describe an image using OpenAI vision (if available)."""
155
+ if not OPENAI_AVAILABLE:
156
+ return "OpenAI not available for image captioning"
157
+ try:
158
+ with open(image_path, "rb") as f:
159
+ # Example using the OpenAI image understanding endpoints (SDKs vary)
160
+ # We'll call the Chat Completions with system prompt and base64 image as a fallback
161
+ b64 = base64.b64encode(f.read()).decode("utf-8")
162
+ prompt = (
163
+ "You are an assistant that describes images for visually impaired users. "
164
+ "Provide a concise, vivid, and accessible description of the image."
165
+
166
+ Image(base64):" + b64
167
+ )
168
+ resp = openai.ChatCompletion.create(
169
+ model="gpt-4o-mini", messages=[{"role":"user","content":prompt}], max_tokens=300
170
+ )
171
+ return resp.choices[0].message.content.strip()
172
+ except Exception as e:
173
+ return f"OpenAI image describe error: {e}"
174
+
175
+
176
  def describe_image_blip(image_path: str) -> str:
177
  if not HF_BLIP_AVAILABLE:
178
+ return "HF BLIP not available in this runtime"
179
  try:
180
  processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
181
  model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
182
+ raw_image = Image.open(image_path).convert("RGB")
183
+ inputs = processor(raw_image, return_tensors="pt")
184
  out = model.generate(**inputs)
185
+ caption = processor.decode(out[0], skip_special_tokens=True)
186
+ return caption
187
  except Exception as e:
188
  return f"BLIP caption error: {e}"
189
 
190
+ # -----------------------------
191
  # MCP Tools
192
+ # -----------------------------
193
+ @server.tool(name="speak_text", description="Convert text to speech using ElevenLabs")
194
  def speak_text_tool(text: str) -> ToolResult:
195
  try:
196
+ audio_bytes = tts_elevenlabs(text)
197
+ encoded = base64.b64encode(audio_bytes).decode("utf-8")
198
+ return ToolResult(content=encoded, meta={"format": "base64-audio"})
199
  except Exception as e:
200
+ return ToolResult(content=f"TTS Error: {e}")
201
 
202
+
203
+ @server.tool(name="describe_image", description="Describe an uploaded image for visually impaired users")
204
  def describe_image_tool(image_path: str) -> ToolResult:
205
+ # Prioritize OpenAI -> HF BLIP -> error
206
+ if OPENAI_AVAILABLE:
207
+ desc = describe_image_openai(image_path)
208
+ if desc and not desc.startswith("OpenAI image describe error"):
209
+ return ToolResult(content=desc)
210
+ if HF_BLIP_AVAILABLE:
211
+ desc = describe_image_blip(image_path)
212
  return ToolResult(content=desc)
213
+ return ToolResult(content="No image captioning backend available. Set OPENAI_API_KEY or install transformers + pillow.")
214
+
215
 
216
+ @server.tool(name="transcribe_audio", description="Transcribe user audio to text")
217
+ def transcribe_audio_tool(audio_path: str) -> ToolResult:
218
  if OPENAI_AVAILABLE:
219
+ text = transcribe_with_openai(audio_path)
220
+ return ToolResult(content=text)
221
+ else:
222
+ text = transcribe_fallback(audio_path)
223
+ return ToolResult(content=text)
224
+
225
+ # -----------------------------
226
+ # Gradio UI (client)
227
+ # -----------------------------
228
 
 
229
  def decode_base64_audio(b64: str) -> bytes:
230
  return base64.b64decode(b64)
231
 
232
  with gr.Blocks() as demo:
233
+
234
+ with gr.Accordion("🔑 API Keys (stored only in session)", open=False):
235
+ openai_key = gr.Textbox(label="OpenAI API Key", type="password")
236
+ eleven_key = gr.Textbox(label="ElevenLabs API Key", type="password")
237
+ gemini_key = gr.Textbox(label="Gemini API Key", type="password")
238
+
239
+ def set_keys(ok, ek, gk):
240
+ if ok: os.environ["OPENAI_API_KEY"] = ok
241
+ if ek: os.environ["ELEVENLABS_API_KEY"] = ek
242
+ if gk: os.environ["GOOGLE_GEMINI_API_KEY"] = gk
243
+ return "API keys set for this session."
244
+
245
+ set_btn = gr.Button("Save API Keys")
246
+ set_output = gr.Textbox(label="Status")
247
+ set_btn.click(set_keys, [openai_key, eleven_key, gemini_key], [set_output])
248
+
249
  gr.Markdown("# Accessibility Voice Agent — MCP Tools")
250
 
251
  with gr.Row():
252
  with gr.Column(scale=2):
253
+ chatbox = gr.Chatbot(label="Assistant")
254
+ user_input = gr.Textbox(placeholder="Type or press the microphone to speak...", show_label=False)
255
 
256
  with gr.Row():
257
+ mic = gr.Audio(source="microphone", type="filepath", label="Record voice (press to record)")
258
  send_btn = gr.Button("Send")
259
 
260
+ with gr.Accordion("Advanced / Tools", open=False):
261
+ tts_text = gr.Textbox(label="Text to speak (ElevenLabs)")
262
+ tts_btn = gr.Button("Speak (TTS)")
263
 
264
+ img_upload = gr.File(label="Upload image (for description)")
265
  img_btn = gr.Button("Describe image")
266
 
267
  with gr.Column(scale=1):
268
+ gr.Markdown("### Tools Log")
269
  tools_log = gr.Textbox(value="Ready.", lines=20)
270
 
271
+ # Callbacks
272
+ def on_send_text(text, chat_history, mic_file):
273
+ # If there's a mic file, prefer transcribing audio
274
  if mic_file:
275
+ tools_log_val = tools_log.value if hasattr(tools_log, 'value') else ''
276
+ tools_log_val = (tools_log_val + "
277
+ Transcribing audio...")
278
+ # transcribe
279
  tr = transcribe_audio_tool(mic_file)
280
+ user_text = tr.content
281
+ else:
282
+ user_text = text
283
+ # append user->assistant exchange
284
+ chat_history = chat_history or []
285
+ chat_history.append((user_text, "..."))
286
+ # For demo: assistant echoes + uses describe_image if commands detected
287
+ if user_text.strip().lower().startswith("describe image:"):
288
+ # expects: "describe image: filename"
289
+ _, _, fname = user_text.partition(":")
290
+ fname = fname.strip()
291
+ if fname:
292
+ desc = describe_image_tool(fname)
293
+ assistant = desc.content
294
+ else:
295
+ assistant = "Please upload an image using the Describe Image tool."
296
+ else:
297
+ assistant = "I heard: " + user_text
298
+ chat_history[-1] = (user_text, assistant)
299
+ return chat_history, tools_log_val
300
+
301
+ send_btn.click(on_send_text, inputs=[user_input, chatbox, mic], outputs=[chatbox, tools_log])
302
 
303
  def on_tts(text):
304
  res = speak_text_tool(text)
305
  if res.meta and res.meta.get("format") == "base64-audio":
306
+ audio_bytes = decode_base64_audio(res.content)
307
+ return (audio_bytes, 16000)
308
+ return None
309
 
310
+ tts_btn.click(on_tts, inputs=[tts_text], outputs=[gr.Audio(label="TTS Output")])
311
 
312
+ def on_describe_image(file_obj):
313
  if not file_obj:
314
  return "No file uploaded"
315
+ # file_obj is a tempfile path in hf spaces; pass path to tool
316
+ desc = describe_image_tool(file_obj.name if hasattr(file_obj, 'name') else file_obj)
317
+ return desc.content
318
 
319
+ img_btn.click(on_describe_image, inputs=[img_upload], outputs=[chatbox])
320
 
321
  if __name__ == "__main__":
322
+ demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))