import asyncio
import logging
import os
from contextlib import AsyncExitStack
from functools import wraps
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
from anthropic import Anthropic
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from tool_utils import filter_tools_for_context, summarize_latest_results, count_tokens, trim_conversation

load_dotenv()
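# tool_utils is a local helper module (not shown here). From its call sites below,
# the assumed contracts are roughly:
#   filter_tools_for_context(tools, conv, exclude) -> tool subset relevant this round
#   summarize_latest_results(conv)                 -> short summary of the newest tool output
#   count_tokens(conv)                             -> approximate token count of the conversation
#   trim_conversation(conv, keep_last_n=n)         -> conversation cut down to the last n messages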
# Logger configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("MCPClient")

# Number of recent messages kept when the conversation is trimmed.
MAX_HISTORY_MESSAGES = 5
def retry_async(max_attempts: int = 3, delay: float = 1.0):
    """Retry decorator for async functions."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        raise
                    logger.warning(f"Attempt {attempt}/{max_attempts} failed: {e}. Retrying in {delay}s...")
                    await asyncio.sleep(delay)
            # Should never be reached
            raise RuntimeError("Retry loop exited unexpectedly")
        return wrapper
    return decorator
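# Example usage (a sketch: the decorator is defined here but not applied anywhere
# in this file; _connect below is a natural candidate):
#
#     @retry_async(max_attempts=3, delay=2.0)
#     async def _connect(self) -> str:
#         ...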
class MCPClient:
    """Robust MCP client with connection management and retries."""

    def __init__(self):
        # A dedicated event loop lets synchronous Gradio callbacks drive the async MCP API.
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.session: Optional[ClientSession] = None
        self.tools: List[Dict[str, Any]] = []
        self.connected: bool = False
        self.max_iterations: int = 3
        self.client: Optional[Anthropic] = None
        self.exit_stack: Optional[AsyncExitStack] = None
        self._init_client()
    def _init_client(self):
        key = os.getenv("ANTHROPIC_API_KEY")
        if not key:
            raise EnvironmentError("❌ ANTHROPIC_API_KEY is missing from the environment")
        # Anthropic() picks up ANTHROPIC_API_KEY from the environment by itself.
        self.client = Anthropic()
    def connect(self) -> str:
        """Synchronous MCP connection (wraps the async version)."""
        return self.loop.run_until_complete(self._connect())

    async def _connect(self) -> str:
        """Asynchronous MCP connection over stdio."""
        # Close any previous connection before reconnecting.
        if self.exit_stack:
            await self.exit_stack.aclose()
        self.exit_stack = AsyncExitStack()
        params = StdioServerParameters(
            command="python",
            args=["gradio_mcp_server.py"],  # MCP server script, expected next to this file
            env={"PYTHONIOENCODING": "utf-8", "PYTHONUNBUFFERED": "1"},
        )
        try:
            stdio_transport = await self.exit_stack.enter_async_context(stdio_client(params))
            self.stdio, self.write = stdio_transport
            self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
            await self.session.initialize()
            resp = await self.session.list_tools()
            self.tools = [
                {"name": t.name, "description": t.description, "input_schema": t.inputSchema}
                for t in resp.tools
            ]
            self.connected = True
            return f"✅ MCP connected ({len(self.tools)} tools available)"
        except Exception as e:
            self.connected = False
            return f"❌ MCP connection failed: {e}"
    def _read_file(self, path: str) -> str:
        """Robust file reading based on the file extension."""
        import PyPDF2  # lazy import: only needed for PDF uploads

        ext = os.path.splitext(path)[1].lower()
        try:
            if ext in [".txt", ".md", ".py", ".json", ".csv"]:
                with open(path, "r", encoding="utf-8") as f:
                    return f.read()
            elif ext == ".pdf":
                with open(path, "rb") as f:
                    # extract_text() can return None for image-only pages; coerce to "".
                    return "\n".join((page.extract_text() or "") for page in PyPDF2.PdfReader(f).pages)
            else:
                # Fall back to plain-text reading for unknown extensions.
                with open(path, "r", encoding="utf-8") as f:
                    return f.read()
        except Exception as e:
            return f"[Error reading file {os.path.basename(path)}: {e}]"
    def process_message(
        self, message: str, files: Optional[List] = None, history: Optional[List[List[str]]] = None
    ) -> Tuple[List[List[str]], str, None]:
        """High-level pipeline (message + files -> reply)."""
        history = history or []
        if not self.session or not self.connected:
            return history + [[message, "❌ MCP server not connected."]], "", None
        file_content = ""
        if files:
            for file in files:
                # Gradio upload objects expose .name; plain path strings work too.
                path = getattr(file, "name", file)
                file_content += f"\nFile: {os.path.basename(path)}\n{self._read_file(path)}\n"
        full_message = (file_content + message).strip()
        new_msgs = self.loop.run_until_complete(self._process_query(full_message, history))
        assistant_reply = "\n\n".join(m.get("content", "") for m in new_msgs if m.get("role") == "assistant")
        return history + [[message, assistant_reply]], "", None
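    # The returned triple mirrors the Gradio outputs wired below: the updated chat
    # history as [user, assistant] pairs, a cleared textbox, and a cleared file input.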
    async def _process_query(self, message: str, history: List[Any]):
        """Run the user query, delegating tool handling to the tool loop."""
        if not self.client:
            return [{"role": "assistant", "content": "❌ Claude client unavailable."}]
        # System prompt (LEXICON)
        sys_prompt = (
            "You are LEXICON, an intelligent agricultural and weather data assistant with access to "
            "specialized tools. Your mission: produce complete, accurate answers using planning + multiple tool calls."
        )
        # Flatten [user, assistant] pairs into Anthropic-style messages, skipping empty turns.
        conv = [
            {"role": r, "content": c}
            for h in history
            for r, c in zip(["user", "assistant"], h)
            if c
        ]
        conv.append({"role": "user", "content": message})
        try:
            return await self._tool_loop(conv, sys_prompt)
        except Exception as e:
            return [{"role": "assistant", "content": f"❌ Claude error: {e}"}]
    async def _tool_loop(self, messages: List[Dict[str, str]], sys_prompt: str):
        """Main planning/execution loop with MCP tools."""
        result_msgs: List[Dict[str, str]] = []
        conv = messages.copy()
        seen_tool_calls = set()
        iteration = 0
        last_summary = None
        max_context_tokens = 2000
        tool_timeout = 10.0  # seconds per tool call
        while iteration < self.max_iterations:
            iteration += 1
            # Narrow the tool list to what is relevant for the current context.
            tools_this_round = filter_tools_for_context(self.tools, conv, [])
            try:
                resp = self.client.messages.create(
                    model=os.getenv("CLAUDE_MODEL", "claude-3-5-sonnet-20241022"),
                    max_tokens=int(os.getenv("CLAUDE_MAX_TOKENS", "8192")),
                    system=sys_prompt,
                    messages=conv,
                    tools=tools_this_round,
                )
            except Exception as e:
                result_msgs.append({"role": "assistant", "content": f"❌ Model call error: {e}"})
                break
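            # resp.content is a list of typed blocks: "text" blocks carry prose,
            # while "tool_use" blocks carry an id, the tool name, and its input arguments.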
            has_tool_calls = False
            iteration_changes = False
            for c in resp.content:
                if c.type == "tool_use":
                    has_tool_calls = True
                    tool_name, tool_args, tool_call_id = c.name, c.input, c.id
                    # str() keeps the dedup key hashable even when arguments contain lists or dicts.
                    key = (tool_name, str(sorted(tool_args.items())))
                    if key in seen_tool_calls:
                        result_msgs.append({"role": "assistant", "content": f"ℹ️ Tool already called: {tool_name}({tool_args})"})
                        continue
                    seen_tool_calls.add(key)
                    try:
                        tool_result = await asyncio.wait_for(
                            self.session.call_tool(tool_name, tool_args), timeout=tool_timeout
                        )
                        raw_str = "\n".join(str(item) for item in tool_result.content)
                        # Record the tool_use/tool_result exchange in Anthropic's block
                        # format so the model sees the output on the next iteration.
                        conv.extend([
                            {"role": "assistant", "content": [{"type": "tool_use", "id": tool_call_id, "name": tool_name, "input": tool_args}]},
                            {"role": "user", "content": [{"type": "tool_result", "tool_use_id": tool_call_id, "content": raw_str}]}
                        ])
                        result_msgs.append({"role": "assistant", "content": f"🔧 {tool_name}({tool_args})\n```json\n{raw_str}\n```"})
                        iteration_changes = True
                    except asyncio.TimeoutError:
                        msg = f"❌ Tool timeout: {tool_name}({tool_args})"
                        result_msgs.append({"role": "assistant", "content": msg})
                    except Exception as e:
                        msg = f"❌ Tool error {tool_name}({tool_args}): {e}"
                        result_msgs.append({"role": "assistant", "content": msg})
                elif c.type == "text":
                    text = c.text.strip()
                    if text:
                        result_msgs.append({"role": "assistant", "content": text})
                        conv.append({"role": "assistant", "content": text})
                        iteration_changes = True
            # Stop conditions
            if not has_tool_calls or not iteration_changes:
                break
            summary = summarize_latest_results(conv)
            if last_summary is not None and summary == last_summary:
                result_msgs.append({"role": "assistant", "content": "ℹ️ No new information, stopping."})
                break
            last_summary = summary
            # Trim the conversation when it grows past the token budget.
            if max_context_tokens and count_tokens(conv) > max_context_tokens:
                conv = trim_conversation(conv, keep_last_n=MAX_HISTORY_MESSAGES)
        # Final synthesis
        result_msgs.append({"role": "assistant", "content": "## 📋 Final synthesis:"})
        try:
            final_prompt = "Based on the data collected, write a clear and useful answer to the initial question."
            conv.append({"role": "user", "content": final_prompt})
            final_resp = self.client.messages.create(
                model=os.getenv("CLAUDE_MODEL", "claude-3-5-sonnet-20241022"),
                max_tokens=int(os.getenv("CLAUDE_MAX_TOKENS", "8192")),
                system="You are the assistant producing the final analysis.",
                messages=conv,
                tools=[],
            )
            for c in final_resp.content:
                if c.type == "text":
                    result_msgs.append({"role": "assistant", "content": c.text.strip()})
        except Exception as e:
            result_msgs.append({"role": "assistant", "content": f"❌ Final synthesis error: {e}"})
        return result_msgs
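# Minimal programmatic usage sketch (assumes gradio_mcp_server.py sits next to
# this file and ANTHROPIC_API_KEY is set):
#
#     client = MCPClient()
#     print(client.connect())
#     chat, _, _ = client.process_message("Which tools are available?", None, [])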
client = MCPClient()


def gradio_interface():
    # Keep the custom orange and red theme
    theme = gr.themes.Default(
        primary_hue=gr.themes.colors.orange,
        secondary_hue=gr.themes.colors.red,
        neutral_hue=gr.themes.colors.slate,
    )
    with gr.Blocks(title="MCP LEXICON", theme=theme, css=".gradio-container {max-width: 95% !important;}") as demo:
        # 1. Top row with title and the dynamic status button
        with gr.Row():
            with gr.Column(scale=8):
                gr.Markdown("## 🌾 LEXICON CHATBOT")
            with gr.Column(scale=10, min_width=220):
                status_button = gr.Button(
                    "Connecting...",
                    variant="stop",
                )
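                # The status button doubles as a manual reconnect control; it is
                # wired to auto_connect in the event handlers below.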
        # 2. Main chat interface with a clear button
        with gr.Row():
            chatbot = gr.Chatbot(
                label="Conversation",
                value=[],
                height=650,
                show_copy_button=True,
                avatar_images=("👤", "🌾"),
                bubble_full_width=False,
            )
            clear_btn = gr.Button("🗑️ Clear", scale=0)

        # 3. Concise input bar at the bottom (standard chatbot layout)
        with gr.Row():
            with gr.Column(scale=10):
                msg = gr.Textbox(
                    label="User Prompt",
                    placeholder="Ask a question about agriculture, weather, or geography...",
                    show_label=False,
                    container=False,  # removes the border for a cleaner look
                )
            file_btn = gr.UploadButton("📎", file_count="multiple", scale=1)
            submit_btn = gr.Button(
                "Ask",
                variant="primary",
                scale=1
            )

        # Examples accordion remains at the bottom
        with gr.Accordion("💡 Example Queries", open=False):
            gr.Examples(
                examples=[
                    "What's the complete agricultural profile of Bignan including weather stations, cadastral parcels, and production data?",
                    "Find all weather stations near Paris, get their latest data, and analyze weather patterns",
                    "I need comprehensive information about vine varieties and which phytosanitary products are recommended for vineyard management",
                ],
                inputs=msg
            )
        # Event handlers
        def auto_connect():
            return client.connect()

        def process_and_clear(message, files, history):
            if not message.strip() and not files:
                return history, "", None
            # Simply return the result from the client method
            return client.process_message(message, files, history)

        # Setup events
        demo.load(auto_connect, outputs=status_button)
        status_button.click(auto_connect, outputs=status_button)
        submit_btn.click(
            process_and_clear,
            inputs=[msg, file_btn, chatbot],
            outputs=[chatbot, msg, file_btn]
        )
        msg.submit(
            process_and_clear,
            inputs=[msg, file_btn, chatbot],
            outputs=[chatbot, msg, file_btn]
        )
        clear_btn.click(lambda: ([], "", None), outputs=[chatbot, msg, file_btn], queue=False)

    return demo
if __name__ == "__main__":
    if not os.getenv("ANTHROPIC_API_KEY"):
        print("Warning: ANTHROPIC_API_KEY not found in environment.")
        print("Please set it in your .env file: ANTHROPIC_API_KEY=your_key_here")
    else:
        print("Found Anthropic API key")
    print("Starting Enhanced MCP Client with Multi-Step Planning...")
    print("API endpoint: https://lexicon.osfarm.org")
    interface = gradio_interface()
    interface.launch(debug=True, share=True)