| """ |
| Chat handling logic for Universal MCP Client - Fixed Version with File Upload Support |
| """ |
import json
import logging
import os
import re
import time
import traceback
from datetime import datetime
from typing import Dict, Any, List, Tuple, Optional

import gradio as gr
import httpx
from gradio import ChatMessage
from gradio_client import Client

from config import AppConfig
from mcp_client import UniversalMCPClient

logger = logging.getLogger(__name__)

class ChatHandler:
    """Handles chat interactions with HF Inference Providers and MCP servers using the ChatMessage dataclass"""

    def __init__(self, mcp_client: UniversalMCPClient):
        self.mcp_client = mcp_client
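
        # Client for a public "file uploader" Space, used to re-host local
        # uploads at a URL that remote MCP servers can fetch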
        try:
            self.uploader_client = Client("abidlabs/file-uploader")
            logger.info("✅ File uploader client initialized")
        except Exception as e:
            logger.error(f"Failed to initialize file uploader: {e}")
            self.uploader_client = None

    def _upload_file_to_gradio_server(self, file_path: str) -> str:
        """Upload a file to the Gradio server and return a public URL (falls back to the local path on failure)"""
        if not self.uploader_client:
            logger.error("File uploader client not initialized")
            return file_path

        try:
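            # POST through the Space's file-upload route. This leans on
            # gradio_client internals (Client.upload_url and Client.src),
            # which may change between gradio_client releases.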
            with open(file_path, "rb") as f_:
                files = [("files", (os.path.basename(file_path), f_))]
                r = httpx.post(
                    self.uploader_client.upload_url,
                    files=files,
                )
                r.raise_for_status()
                result = r.json()
            uploaded_path = result[0]

            public_url = f"{self.uploader_client.src}/gradio_api/file={uploaded_path}"
            logger.info(f"✅ Uploaded {file_path} -> {public_url}")
            return public_url
        except Exception as e:
            logger.error(f"Failed to upload file {file_path}: {e}")
            return file_path

    def process_multimodal_message(self, message: Dict[str, Any], history: List) -> Tuple[List[ChatMessage], Dict[str, Any]]:
        """Enhanced MCP chat function with multimodal input support and ChatMessage formatting"""
        if not self.mcp_client.hf_client:
            error_msg = "❌ HuggingFace token not configured. Please set HF_TOKEN environment variable or login."
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

        if not self.mcp_client.current_provider or not self.mcp_client.current_model:
            error_msg = "❌ Please select an inference provider and model first."
            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

        user_text = ""
        user_files = []
        uploaded_file_urls = []
        self.file_url_mapping = {}
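
        # Parse the payload; gr.MultimodalTextbox submits {"text": ..., "files": [...]}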
        try:
            if isinstance(message, str):
                user_text = message
                user_files = []
            elif message:
                user_text = message.get("text", "")
                user_files = message.get("files", []) or []

            logger.info("💬 Processing multimodal message:")
            logger.info(f"  📝 Text: {user_text}")
            logger.info(f"  📁 Files: {len(user_files)} files uploaded")
            logger.info(f"  📋 History type: {type(history)}, length: {len(history)}")
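
            # Normalize history: Gradio may return entries as plain dicts, so
            # coerce everything to ChatMessage before appending new turns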
            converted_history = []
            for i, msg in enumerate(history):
                try:
                    if isinstance(msg, dict):
                        logger.info(f"  📝 Converting dict message {i}: {msg.get('role', 'unknown')}")
                        converted_history.append(ChatMessage(
                            role=msg.get('role', 'assistant'),
                            content=msg.get('content', ''),
                            metadata=msg.get('metadata', None)
                        ))
                    else:
                        logger.info(f"  ✅ ChatMessage {i}: {getattr(msg, 'role', 'unknown')}")
                        converted_history.append(msg)
                except Exception as conv_error:
                    logger.error(f"Error converting message {i}: {conv_error}")
                    logger.error(f"Message content: {msg}")
                    continue

            history = converted_history
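
            # Re-host each uploaded file at a public URL so remote MCP servers
            # can fetch it; keep a local-path -> URL mapping for later tool calls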
            for file_path in user_files:
                logger.info(f"  📄 Local File: {file_path}")
                try:
                    uploaded_url = self._upload_file_to_gradio_server(file_path)
                    self.file_url_mapping[file_path] = uploaded_url
                    uploaded_file_urls.append(uploaded_url)
                    logger.info(f"  ✅ Uploaded File URL: {uploaded_url}")

                    history.append(ChatMessage(role="user", content={"path": uploaded_url}))
                except Exception as upload_error:
                    logger.error(f"Failed to upload file {file_path}: {upload_error}")
                    # Fall back to the local path so the file still appears in chat
                    history.append(ChatMessage(role="user", content={"path": file_path}))
                    logger.warning(f"⚠️ Using local path for {file_path} - MCP servers may not be able to access it")

            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))
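
            # Nothing to send (no text and no files): just reset the input box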
            if not (user_text and user_text.strip()) and not user_files:
                return history, gr.MultimodalTextbox(value=None, interactive=False)

            messages = self._prepare_hf_messages(history, uploaded_file_urls)
            response_messages = self._call_hf_api(messages, uploaded_file_urls)
            history.extend(response_messages)

            return history, gr.MultimodalTextbox(value=None, interactive=False)

        except Exception as e:
            error_msg = f"❌ Error: {str(e)}"
            logger.error(f"Chat error: {e}")
            logger.error(traceback.format_exc())

            # Still show what the user sent, then surface the error in-chat
            if user_text and user_text.strip():
                history.append(ChatMessage(role="user", content=user_text))
            if user_files:
                for file_path in user_files:
                    history.append(ChatMessage(role="user", content={"path": file_path}))

            history.append(ChatMessage(role="assistant", content=error_msg))
            return history, gr.MultimodalTextbox(value=None, interactive=False)

    def _prepare_hf_messages(self, history: List, uploaded_file_urls: List[str] = None) -> List[Dict[str, Any]]:
        """Convert history (ChatMessage or dict) to HuggingFace Inference API format"""
        messages = []

        # Trim history to fit the model's context budget
        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                len(self.mcp_client.get_enabled_servers())
            )
            max_history = context_settings['recommended_history_limit']
        else:
            max_history = 20

        recent_history = history[-max_history:] if len(history) > max_history else history
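
        # Flatten non-text content (file dicts, lists) into bracketed text
        # placeholders that a text-only chat API can carry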
        for msg in recent_history:
            if hasattr(msg, 'role'):
                role = msg.role
                content = msg.content
            elif isinstance(msg, dict) and 'role' in msg:
                role = msg.get('role')
                content = msg.get('content')
            else:
                continue

            if role in ["user", "assistant"]:
                if isinstance(content, dict):
                    if "path" in content:
                        file_path = content.get('path', 'unknown')

                        if file_path.startswith('http'):
                            if AppConfig.is_image_file(file_path):
                                content = f"[User uploaded an image: {file_path}]"
                            elif AppConfig.is_audio_file(file_path):
                                content = f"[User uploaded an audio file: {file_path}]"
                            elif AppConfig.is_video_file(file_path):
                                content = f"[User uploaded a video file: {file_path}]"
                            else:
                                content = f"[User uploaded a file: {file_path}]"
                        else:
                            content = f"[User uploaded a file (local path, not accessible to remote servers): {file_path}]"
                    else:
                        content = f"[Object: {str(content)[:50]}...]"
                elif isinstance(content, (list, tuple)):
                    content = f"[List: {str(content)[:50]}...]"
                elif content is None:
                    content = "[Empty]"
                else:
                    content = str(content)

                messages.append({
                    "role": role,
                    "content": content
                })

        return messages

    def _call_hf_api(self, messages: List[Dict[str, Any]], uploaded_file_urls: List[str] = None) -> List[ChatMessage]:
        """Call HuggingFace Inference API and return structured ChatMessage responses"""
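        # Route through the MCP tool-calling path only when servers are enabled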
        enabled_servers = self.mcp_client.get_enabled_servers()
        if not enabled_servers:
            return self._call_hf_without_mcp(messages)
        else:
            return self._call_hf_with_mcp(messages, uploaded_file_urls)

    def _call_hf_without_mcp(self, messages: List[Dict[str, Any]]) -> List[ChatMessage]:
        """Call HF Inference API without MCP servers"""
        logger.info("💬 No MCP servers available, using regular HF Inference chat")

        system_prompt = self._get_native_system_prompt()

        # Merge with any existing system message rather than stacking two
        if messages and messages[0].get("role") == "system":
            messages[0]["content"] = system_prompt + "\n\n" + messages[0]["content"]
        else:
            messages.insert(0, {"role": "system", "content": system_prompt})

        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                0
            )
            max_tokens = context_settings['max_response_tokens']
        else:
            max_tokens = 8192

        try:
            response = self.mcp_client.generate_chat_completion(messages, max_tokens=max_tokens)
            response_text = response.choices[0].message.content

            if not response_text:
                response_text = "I understand your request and I'm here to help."

            return [ChatMessage(role="assistant", content=response_text)]
        except Exception as e:
            logger.error(f"HF Inference API call failed: {e}")
            return [ChatMessage(role="assistant", content=f"❌ API call failed: {str(e)}")]

    def _call_hf_with_mcp(self, messages: List[Dict[str, Any]], uploaded_file_urls: List[str] = None) -> List[ChatMessage]:
        """Call HF Inference API with MCP servers and return structured responses"""
        system_prompt = self._get_mcp_system_prompt(uploaded_file_urls)

        # Merge with any existing system message rather than stacking two
        if messages and messages[0].get("role") == "system":
            messages[0]["content"] = system_prompt + "\n\n" + messages[0]["content"]
        else:
            messages.insert(0, {"role": "system", "content": system_prompt})

        enabled_servers = self.mcp_client.get_enabled_servers()
        if self.mcp_client.current_model and self.mcp_client.current_provider:
            context_settings = AppConfig.get_optimal_context_settings(
                self.mcp_client.current_model,
                self.mcp_client.current_provider,
                len(enabled_servers)
            )
            max_tokens = context_settings['max_response_tokens']
        else:
            max_tokens = 8192

        logger.info(f"📤 Sending {len(messages)} messages to HF Inference API")
        logger.info(f"🔧 Using {len(enabled_servers)} enabled MCP servers")
        logger.info(f"🤖 Model: {self.mcp_client.current_model} via {self.mcp_client.current_provider}")
        logger.info(f"📏 Max tokens: {max_tokens}")

        start_time = time.time()

        try:
            # Expose the local-path -> public-URL mapping so the MCP client
            # can substitute accessible URLs into tool arguments
            if hasattr(self, 'file_url_mapping'):
                self.mcp_client.chat_handler_file_mapping = self.file_url_mapping

            response = self.mcp_client.generate_chat_completion_with_mcp_tools(messages, max_tokens=max_tokens)

            return self._process_hf_response(response, start_time)
        except Exception as e:
            logger.error(f"HF Inference API call with MCP failed: {e}")
            return [ChatMessage(role="assistant", content=f"❌ API call failed: {str(e)}")]

    def _process_hf_response(self, response, start_time: float) -> List[ChatMessage]:
        """Process HF Inference response with simplified media handling and nested errors"""
        chat_messages = []

        try:
            response_text = response.choices[0].message.content

            if not response_text:
                response_text = "I understand your request and I'm here to help."
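
            # The MCP client attaches tool-run details to the response object
            # as `_tool_execution` (keys: tool, server, success, result)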
            if hasattr(response, '_tool_execution'):
                tool_info = response._tool_execution
                logger.info(f"🔧 Processing response with tool execution: {tool_info}")

                duration = round(time.time() - start_time, 2)
                tool_id = f"tool_{tool_info['tool']}_{int(time.time())}"

                if tool_info['success']:
                    tool_result = str(tool_info['result'])

                    media_url = self._extract_media_url(tool_result, tool_info.get('server', ''))

                    # Collapsible "tool used" header; children attach via parent_id
                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="",
                        metadata={
                            "title": f"🔧 Used {tool_info['tool']}",
                            "status": "done",
                            "duration": duration,
                            "id": tool_id
                        }
                    ))

                    if media_url:
                        result_preview = f"✅ Successfully generated media\nURL: {media_url[:100]}..."
                    else:
                        result_preview = f"✅ Tool executed successfully\nResult: {tool_result[:200]}..."

                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=result_preview,
                        metadata={
                            "title": "📊 Server Response",
                            "parent_id": tool_id,
                            "status": "done"
                        }
                    ))

                    # Show the model's own text, minus the raw tool-call JSON
                    # and any markdown link/image syntax (media is rendered
                    # separately below); note this strips ALL [bracketed] text
                    if response_text and not response_text.startswith('{"use_tool"'):
                        clean_response = response_text
                        if media_url and media_url in clean_response:
                            clean_response = clean_response.replace(media_url, "").strip()

                        clean_response = re.sub(r'\{"use_tool"[^}]+\}', '', clean_response).strip()

                        clean_response = re.sub(r'!\[([^\]]*)\]\([^)]*\)', '', clean_response)
                        clean_response = re.sub(r'\[([^\]]*)\]\([^)]*\)', '', clean_response)
                        clean_response = re.sub(r'!\[([^\]]*)\]', '', clean_response)
                        clean_response = re.sub(r'\[([^\]]*)\]', '', clean_response)
                        clean_response = re.sub(r'\(\s*\)', '', clean_response)
                        clean_response = clean_response.strip()

                        # Keep the text only if something substantial survives
                        if clean_response and len(clean_response) > 10:
                            chat_messages.append(ChatMessage(
                                role="assistant",
                                content=clean_response
                            ))

                    if media_url:
                        # Hand Gradio a file path so it embeds playable media
                        chat_messages.append(ChatMessage(
                            role="assistant",
                            content={"path": media_url}
                        ))
                    else:
                        # No media and no usable text: fall back to a preview
                        if not response_text or response_text.startswith('{"use_tool"'):
                            if len(tool_result) > 500:
                                result_preview = f"Operation completed successfully. Result preview: {tool_result[:500]}..."
                            else:
                                result_preview = f"Operation completed successfully. Result: {tool_result}"

                            chat_messages.append(ChatMessage(
                                role="assistant",
                                content=result_preview
                            ))

                else:
                    # Tool failed: error header, nested details, and hints
                    error_details = tool_info['result']

                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="",
                        metadata={
                            "title": f"❌ Used {tool_info['tool']}",
                            "status": "error",
                            "duration": duration,
                            "id": tool_id
                        }
                    ))

                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content=f"❌ Tool execution failed\n```\n{error_details}\n```",
                        metadata={
                            "title": "📊 Server Response",
                            "parent_id": tool_id,
                            "status": "error"
                        }
                    ))

                    chat_messages.append(ChatMessage(
                        role="assistant",
                        content="**Suggestions:**\n• Try modifying your request slightly\n• Wait a moment and try again\n• Use a different MCP server if available",
                        metadata={
                            "title": "💡 Possible Solutions",
                            "parent_id": tool_id,
                            "status": "info"
                        }
                    ))
            else:
                # No tool call: pass the model's text straight through
                chat_messages.append(ChatMessage(
                    role="assistant",
                    content=response_text
                ))

        except Exception as e:
            logger.error(f"Error processing HF response: {e}")
            logger.error(traceback.format_exc())
            chat_messages.append(ChatMessage(
                role="assistant",
                content="I understand your request and I'm here to help."
            ))

        return chat_messages

    def _extract_media_url(self, result_text: str, server_name: str) -> Optional[str]:
        """Extract media URL from MCP response with improved pattern matching"""
        if not isinstance(result_text, str):
            return None

        logger.info(f"🔍 Extracting media from result: {result_text[:500]}...")
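
        # Try structured JSON first: many Gradio/MCP tools return a list or
        # dict payload with nested {"url": ...} fields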
        try:
            if result_text.strip().startswith('[') or result_text.strip().startswith('{'):
                data = json.loads(result_text.strip())

                if isinstance(data, list) and len(data) > 0:
                    item = data[0]
                    if isinstance(item, dict):
                        for media_type in ['audio', 'video', 'image']:
                            if media_type in item and isinstance(item[media_type], dict):
                                if 'url' in item[media_type]:
                                    url = item[media_type]['url'].strip('\'"')
                                    logger.info(f"🎯 Found {media_type} URL in JSON: {url}")
                                    return url

                        if 'url' in item:
                            url = item['url'].strip('\'"')
                            logger.info(f"🎯 Found direct URL in JSON: {url}")
                            return url

                elif isinstance(data, dict):
                    for media_type in ['audio', 'video', 'image']:
                        if media_type in data and isinstance(data[media_type], dict):
                            if 'url' in data[media_type]:
                                url = data[media_type]['url'].strip('\'"')
                                logger.info(f"🎯 Found {media_type} URL in JSON: {url}")
                                return url

                    if 'url' in data:
                        url = data['url'].strip('\'"')
                        logger.info(f"🎯 Found direct URL in JSON: {url}")
                        return url

        except json.JSONDecodeError:
            pass
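
        # Fall back to regex: Gradio file-route URLs first, then any URL with
        # a known media extension, then inline data: URLs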
        gradio_patterns = [
            r'https://[^/]+\.hf\.space/gradio_api/file=/[^/]+/[^/]+/[^\s"\'<>,]+',
            r'https://[^/]+\.hf\.space/file=[^\s"\'<>,]+',
            r'/gradio_api/file=/[^\s"\'<>,]+'
        ]

        for pattern in gradio_patterns:
            match = re.search(pattern, result_text)
            if match:
                url = match.group(0).rstrip('\'",:;')
                logger.info(f"🎯 Found Gradio file URL: {url}")
                return url

        url_pattern = r'https?://[^\s"\'<>]+\.(?:mp3|wav|ogg|m4a|flac|aac|opus|wma|mp4|webm|avi|mov|mkv|m4v|wmv|png|jpg|jpeg|gif|webp|bmp|svg)'
        match = re.search(url_pattern, result_text, re.IGNORECASE)
        if match:
            url = match.group(0)
            logger.info(f"🎯 Found media URL by extension: {url}")
            return url

        if result_text.startswith('data:'):
            logger.info("🎯 Found data URL")
            return result_text

        logger.info("❌ No media URL found in result")
        return None

    def _get_native_system_prompt(self) -> str:
        """Get system prompt for HF Inference without MCP servers"""
        model_info = AppConfig.AVAILABLE_MODELS.get(self.mcp_client.current_model, {})
        context_length = model_info.get("context_length", 128000)

        return f"""You are an AI assistant powered by {self.mcp_client.current_model} via {self.mcp_client.current_provider}. You have native capabilities for:
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code
- **Reasoning**: You can perform step-by-step reasoning and problem-solving
- **Context Window**: You have access to {context_length:,} tokens of context
Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Please provide helpful, accurate, and engaging responses to user queries."""

    def _get_mcp_system_prompt(self, uploaded_file_urls: List[str] = None) -> str:
        """Get enhanced system prompt for HF Inference with MCP servers"""
        model_info = AppConfig.AVAILABLE_MODELS.get(self.mcp_client.current_model, {})
        context_length = model_info.get("context_length", 128000)

        # Describe any re-hosted uploads so the model passes public URLs to tools
        uploaded_files_context = ""
        if uploaded_file_urls:
            uploaded_files_context = "\n\nFILES UPLOADED BY USER (Public URLs accessible to MCP servers):\n"
            for i, file_url in enumerate(uploaded_file_urls, 1):
                file_name = file_url.split('/')[-1] if '/' in file_url else file_url
                if AppConfig.is_image_file(file_url):
                    file_type = "Image"
                elif AppConfig.is_audio_file(file_url):
                    file_type = "Audio"
                elif AppConfig.is_video_file(file_url):
                    file_type = "Video"
                else:
                    file_type = "File"
                uploaded_files_context += f"{i}. {file_type}: {file_name}\n   URL: {file_url}\n"

        # One bullet per enabled server for the tools list below
        enabled_servers = self.mcp_client.get_enabled_servers()
        tools_info = []
        for server_name, config in enabled_servers.items():
            tools_info.append(f"- **{server_name}**: {config.description}")

        return f"""You are an AI assistant powered by {self.mcp_client.current_model} via {self.mcp_client.current_provider}, with access to various MCP tools.
YOUR NATIVE CAPABILITIES:
- **Text Processing**: You can analyze, summarize, translate, and process text directly
- **General Knowledge**: You can answer questions, explain concepts, and have conversations
- **Code Analysis**: You can read, analyze, and explain code
- **Reasoning**: You can perform step-by-step reasoning and problem-solving
- **Context Window**: You have access to {context_length:,} tokens of context
AVAILABLE MCP TOOLS:
You have access to the following MCP servers:
{chr(10).join(tools_info)}
WHEN TO USE MCP TOOLS:
- **Image Generation**: Creating new images from text prompts
- **Image Editing**: Modifying, enhancing, or transforming existing images
- **Audio Processing**: Transcribing audio, generating speech, audio enhancement
- **Video Processing**: Creating or editing videos
- **Text to Speech**: Converting text to audio
- **Specialized Analysis**: Tasks requiring specific models or APIs
TOOL USAGE FORMAT:
When you need to use an MCP tool, respond with JSON in this exact format:
{{"use_tool": true, "server": "exact_server_name", "tool": "exact_tool_name", "arguments": {{"param": "value"}}}}
IMPORTANT: Always describe what you're going to do BEFORE the JSON tool call. For example:
"I'll generate speech for your text using the TTS tool."
{{"use_tool": true, "server": "text to speech", "tool": "Kokoro_TTS_mcp_test_generate_first", "arguments": {{"text": "hello"}}}}
IMPORTANT TOOL NAME MAPPING:
- For TTS server: use tool name "Kokoro_TTS_mcp_test_generate_first"
- For image generation: use tool name "dalle_3_xl_lora_v2_generate"
- For video generation: use tool name "ysharma_ltx_video_distilledtext_to_video"
- For letter counting: use tool name "gradio_app_dummy1_letter_counter"
EXACT SERVER NAMES TO USE:
{', '.join([f'"{name}"' for name in enabled_servers.keys()])}
FILE HANDLING FOR MCP TOOLS:
When using MCP tools with uploaded files, always use the public URLs provided above.
These URLs are accessible to remote MCP servers.
{uploaded_files_context}
MEDIA HANDLING:
When tool results contain media URLs (images, audio, videos), the system will automatically embed them as playable media.
IMPORTANT NOTES:
- Always use the EXACT server names and tool names as specified above
- Use proper JSON format for tool calls
- Include all required parameters in arguments
- For file inputs to MCP tools, use the public URLs provided, not local paths
- ALWAYS provide a descriptive message before the JSON tool call
- After tool execution, you can provide additional context or ask if the user needs anything else
Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Current model: {self.mcp_client.current_model} via {self.mcp_client.current_provider}"""