| import gradio as gr |
| import requests |
| import json |
| import re |
| import time |
| from datetime import datetime, timedelta |
| import logging |
| from typing import Dict, Any, Optional, List |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
| MCP_URL = "https://bright-moons-tease.loca.lt/mcp" |
| API_URL = "https://api-inference.huggingface.co/models/microsoft/DialoGPT-medium" |
|
|
| |
| FALLBACK_APIS = [ |
| "https://api-inference.huggingface.co/models/microsoft/DialoGPT-small", |
| "https://api-inference.huggingface.co/models/gpt2" |
| ] |
|
|
| class MCPSAPClient: |
| def __init__(self): |
| self.session = requests.Session() |
| self.session.timeout = 30 |
| self.max_retries = 3 |
| self.retry_delay = 2 |
| |
| def query_llm_with_fallback(self, prompt: str, max_length: int = 150) -> str: |
| """Query LLM with fallback options and retry logic""" |
| apis_to_try = [API_URL] + FALLBACK_APIS |
| |
| for api_url in apis_to_try: |
| for attempt in range(self.max_retries): |
| try: |
| logger.info(f"Attempting LLM query with {api_url}, attempt {attempt + 1}") |
| |
| |
| if "DialoGPT" in api_url: |
| payload = { |
| "inputs": prompt, |
| "parameters": { |
| "max_new_tokens": max_length, |
| "temperature": 0.3, |
| "return_full_text": False |
| } |
| } |
| else: |
| payload = { |
| "inputs": prompt, |
| "parameters": { |
| "max_length": max_length, |
| "temperature": 0.3 |
| } |
| } |
| |
| response = self.session.post( |
| api_url, |
| json=payload, |
| timeout=30 |
| ) |
| |
| if response.status_code == 200: |
| result = response.json() |
| |
| |
| if isinstance(result, list) and len(result) > 0: |
| if "generated_text" in result[0]: |
| return result[0]["generated_text"] |
| elif "text" in result[0]: |
| return result[0]["text"] |
| elif isinstance(result, dict): |
| if "generated_text" in result: |
| return result["generated_text"] |
| elif "text" in result: |
| return result["text"] |
| |
| return str(result) |
| |
| elif response.status_code == 503: |
| logger.warning(f"Model loading, waiting {self.retry_delay} seconds...") |
| time.sleep(self.retry_delay) |
| continue |
| else: |
| logger.warning(f"API returned status {response.status_code}") |
| time.sleep(self.retry_delay) |
| continue |
| |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Request failed: {e}") |
| if attempt < self.max_retries - 1: |
| time.sleep(self.retry_delay) |
| continue |
| |
| except Exception as e: |
| logger.error(f"Unexpected error: {e}") |
| if attempt < self.max_retries - 1: |
| time.sleep(self.retry_delay) |
| continue |
| |
| logger.warning(f"All attempts failed for {api_url}, trying next API...") |
| |
| |
| return "Unable to process with LLM, using fallback parsing" |
| |
| def extract_query_params_robust(self, query: str) -> Dict[str, Any]: |
| """Extract parameters with multiple strategies""" |
| |
| |
| try: |
| prompt = f""" |
| Extract date and quantity from this query: '{query}' |
| Return only JSON format: |
| {{"date_from":"YYYY-MM-DD","min_quantity":number}} |
| |
| Examples: |
| - "orders after 2023-04-01 with at least 10 units" -> {{"date_from":"2023-04-01","min_quantity":10}} |
| - "show me orders from last month" -> {{"date_from":"2023-11-01","min_quantity":0}} |
| """ |
| |
| llm_response = self.query_llm_with_fallback(prompt) |
| logger.info(f"LLM Response: {llm_response}") |
| |
| |
| json_match = re.search(r'\\{[^}]*\\}', llm_response, re.DOTALL) |
| if json_match: |
| json_text = json_match.group() |
| parsed = json.loads(json_text) |
| if "date_from" in parsed and "min_quantity" in parsed: |
| return parsed |
| |
| except Exception as e: |
| logger.warning(f"LLM extraction failed: {e}") |
| |
| |
| return self.extract_params_rule_based(query) |
| |
| def extract_params_rule_based(self, query: str) -> Dict[str, Any]: |
| """Fallback rule-based parameter extraction""" |
| params = {"date_from": "2023-01-01", "min_quantity": 0} |
| |
| |
| date_patterns = [ |
| r'(\\d{4}-\\d{2}-\\d{2})', |
| r'(\\d{2}/\\d{2}/\\d{4})', |
| r'(\\d{1,2}/\\d{1,2}/\\d{4})', |
| ] |
| |
| for pattern in date_patterns: |
| match = re.search(pattern, query) |
| if match: |
| date_str = match.group(1) |
| try: |
| |
| if '-' in date_str: |
| params["date_from"] = date_str |
| elif '/' in date_str: |
| parts = date_str.split('/') |
| if len(parts) == 3: |
| params["date_from"] = f"{parts[2]}-{parts[0].zfill(2)}-{parts[1].zfill(2)}" |
| break |
| except: |
| continue |
| |
| |
| quantity_patterns = [ |
| r'at least (\\d+)', |
| r'minimum (\\d+)', |
| r'more than (\\d+)', |
| r'> ?(\\d+)', |
| r'greater than (\\d+)' |
| ] |
| |
| for pattern in quantity_patterns: |
| match = re.search(pattern, query, re.IGNORECASE) |
| if match: |
| try: |
| params["min_quantity"] = int(match.group(1)) |
| break |
| except: |
| continue |
| |
| |
| if "last month" in query.lower(): |
| last_month = datetime.now() - timedelta(days=30) |
| params["date_from"] = last_month.strftime("%Y-%m-%d") |
| elif "last week" in query.lower(): |
| last_week = datetime.now() - timedelta(days=7) |
| params["date_from"] = last_week.strftime("%Y-%m-%d") |
| elif "yesterday" in query.lower(): |
| yesterday = datetime.now() - timedelta(days=1) |
| params["date_from"] = yesterday.strftime("%Y-%m-%d") |
| elif "today" in query.lower(): |
| params["date_from"] = datetime.now().strftime("%Y-%m-%d") |
| |
| return params |
| |
| def query_mcp_server(self, params: Dict[str, Any]) -> Dict[str, Any]: |
| """Query MCP server with proper error handling""" |
| for attempt in range(self.max_retries): |
| try: |
| payload = { |
| "jsonrpc": "2.0", |
| "method": "SAP.getFilteredProcOrder", |
| "params": params, |
| "id": f"dynamic-{int(time.time())}" |
| } |
| |
| logger.info(f"MCP Request (attempt {attempt + 1}): {json.dumps(payload, indent=2)}") |
| |
| response = self.session.post( |
| MCP_URL, |
| json=payload, |
| timeout=30 |
| ) |
| |
| if response.status_code == 200: |
| result = response.json() |
| logger.info(f"MCP Response: {json.dumps(result, indent=2)}") |
| return result |
| else: |
| logger.warning(f"MCP server returned status {response.status_code}: {response.text}") |
| if attempt < self.max_retries - 1: |
| time.sleep(self.retry_delay) |
| continue |
| else: |
| return {"error": f"MCP server error: {response.status_code}"} |
| |
| except requests.exceptions.RequestException as e: |
| logger.error(f"MCP request failed (attempt {attempt + 1}): {e}") |
| if attempt < self.max_retries - 1: |
| time.sleep(self.retry_delay) |
| continue |
| else: |
| return {"error": f"Connection failed: {str(e)}"} |
| |
| return {"error": "Max retries exceeded"} |
| |
| def format_sap_results(self, results: List[Dict[str, Any]]) -> str: |
| """Format SAP results for display""" |
| if not results: |
| return "β
Query executed successfully, but no matching orders found.\\n\\nTry adjusting your criteria:\\n- Use a different date range\\n- Lower the quantity threshold\\n- Check if the SAP system has data for the specified period" |
| |
| formatted_results = [] |
| formatted_results.append(f"π **Found {len(results)} matching orders:**\\n") |
| |
| for i, result in enumerate(results, 1): |
| order_id = result.get('ProcessOrderConfirmation', 'N/A') |
| material = result.get('Material', 'N/A') |
| quantity = result.get('ConfirmedYieldQuantity', 'N/A') |
| unit = result.get('ConfirmedYieldQuantityUnit', '') |
| posting_date = result.get('PostingDate', 'N/A') |
| |
| formatted_results.append( |
| f"**Order {i}:**\\n" |
| f"β’ Order ID: {order_id}\\n" |
| f"β’ Material: {material}\\n" |
| f"β’ Quantity: {quantity} {unit}\\n" |
| f"β’ Posting Date: {posting_date}\\n" |
| ) |
| |
| return "\\n".join(formatted_results) |
| |
| def agent_query(self, user_query: str) -> str: |
| """Main query processing function""" |
| if not user_query.strip(): |
| return "β Please enter a query about SAP orders." |
| |
| try: |
| |
| logger.info(f"Processing query: {user_query}") |
| params = self.extract_query_params_robust(user_query) |
| logger.info(f"Extracted parameters: {params}") |
| |
| |
| response = self.query_mcp_server(params) |
| |
| |
| if "error" in response: |
| return f"β **Error:** {response['error']}\\n\\n**Troubleshooting:**\\n- Check if your MCP server is running\\n- Verify the server URL is correct\\n- Ensure the SAP system is accessible" |
| |
| results = response.get("result", []) |
| |
| |
| formatted_output = self.format_sap_results(results) |
| |
| |
| query_info = f"\\n\\nπ **Query Details:**\\nβ’ Date from: {params.get('date_from', 'N/A')}\\nβ’ Minimum quantity: {params.get('min_quantity', 0)}" |
| |
| return formatted_output + query_info |
| |
| except Exception as e: |
| logger.error(f"Unexpected error in agent_query: {e}") |
| return f"β **Unexpected Error:** {str(e)}\\n\\nPlease try again or contact support if the issue persists." |
|
|
| |
| client = MCPSAPClient() |
|
|
| |
| example_queries = [ |
| "Show me orders after 2023-04-01 with at least 10 units", |
| "Orders from last month with minimum 5 quantity", |
| "Find orders from 2023-06-15 with at least 20 units", |
| "Show me all orders from last week", |
| "Orders with quantity greater than 50 from 2023-05-01" |
| ] |
|
|
| |
| def create_interface(): |
| with gr.Blocks(title="SAP MCP Agent", theme=gr.themes.Soft()) as iface: |
| gr.Markdown(""" |
| # π SAP MCP Agent |
| |
| Query your SAP system naturally using the Model Context Protocol (MCP). |
| |
| **How to use:** |
| - Ask about orders using natural language |
| - Specify dates and quantities in your query |
| - The system will extract parameters and query SAP for you |
| |
| **Example queries:** |
| """) |
| |
| with gr.Row(): |
| with gr.Column(): |
| query_input = gr.Textbox( |
| label="Enter your SAP query", |
| placeholder="e.g., 'Show me orders after 2023-04-01 with at least 10 units'", |
| lines=2 |
| ) |
| |
| submit_btn = gr.Button("π Query SAP", variant="primary") |
| |
| gr.Markdown("**Example queries:**") |
| example_buttons = [] |
| for example in example_queries: |
| btn = gr.Button(f"π {example}", size="sm") |
| example_buttons.append(btn) |
| |
| with gr.Column(): |
| output = gr.Textbox( |
| label="Results", |
| lines=15, |
| max_lines=20, |
| show_copy_button=True |
| ) |
| |
| |
| submit_btn.click( |
| client.agent_query, |
| inputs=[query_input], |
| outputs=[output] |
| ) |
| |
| |
| for btn, example in zip(example_buttons, example_queries): |
| btn.click( |
| lambda x=example: x, |
| outputs=[query_input] |
| ) |
| |
| return iface |
|
|
| |
| if __name__ == "__main__": |
| iface = create_interface() |
| iface.launch( |
| server_name="0.0.0.0", |
| server_port=7860, |
| share=True, |
| show_error=True |
| ) |