"""Gradio front end that turns natural-language questions into SAP MCP queries.

A user query is reduced to two parameters (``date_from`` and ``min_quantity``),
first by asking a hosted language model and, if that yields nothing usable, by
rule-based parsing.  The parameters are sent to an MCP server as a JSON-RPC
``SAP.getFilteredProcOrder`` request and the returned orders are formatted as
text for display.
"""

import json
import logging
import re
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import gradio as gr
import requests

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
MCP_URL = "https://long-planets-wish.loca.lt/"
API_URL = "https://api-inference.huggingface.co/models/microsoft/DialoGPT-medium"

# Fallback API URLs in case primary fails
FALLBACK_APIS = [
    "https://api-inference.huggingface.co/models/microsoft/DialoGPT-small",
    "https://api-inference.huggingface.co/models/gpt2",
]

_ISO_DATE_FORMAT = "%Y-%m-%d"

# First brace-delimited span in a model reply (no nested braces expected).
_JSON_OBJECT_RE = re.compile(r"\{[^}]*\}", re.DOTALL)

# Explicit date spellings recognised in a query, paired with their strptime
# format.  Tried in order; the first one that yields a real date wins.
_DATE_PATTERNS = (
    (re.compile(r"(\d{4}-\d{2}-\d{2})"), "%Y-%m-%d"),      # YYYY-MM-DD
    (re.compile(r"(\d{1,2}/\d{1,2}/\d{4})"), "%m/%d/%Y"),  # MM/DD/YYYY, M/D/YYYY
)

# Phrases that introduce a minimum quantity; tried in order.
_QUANTITY_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"at least (\d+)",
        r"minimum (\d+)",
        r"more than (\d+)",
        r"> ?(\d+)",
        r"greater than (\d+)",
    )
)

# Relative date phrases and how many days back from now they map to.
# "last month" is approximated as 30 days.
_RELATIVE_DATES = (
    ("last month", 30),
    ("last week", 7),
    ("yesterday", 1),
    ("today", 0),
)

_INTRO_MARKDOWN = """\
# 🔍 SAP MCP Agent

Query your SAP system naturally using the Model Context Protocol (MCP).

**How to use:**
- Ask about orders using natural language
- Specify dates and quantities in your query
- The system will extract parameters and query SAP for you

**Example queries:**
"""


class MCPSAPClient:
    """Extracts query parameters from free text and queries the MCP server."""

    def __init__(self):
        self.session = requests.Session()
        # requests has no session-wide timeout setting, so the value is kept
        # here and passed explicitly on every request.
        self.timeout = 30
        self.max_retries = 3
        self.retry_delay = 2

    def _wait_before_retry(self, attempt: int) -> None:
        """Sleep ``retry_delay`` seconds unless *attempt* was the last one."""
        if attempt < self.max_retries - 1:
            time.sleep(self.retry_delay)

    @staticmethod
    def _build_llm_payload(api_url: str, prompt: str, max_length: int) -> Dict[str, Any]:
        """Return the request body for *api_url*.

        DialoGPT endpoints get ``max_new_tokens`` and ``return_full_text``;
        every other endpoint gets ``max_length``.
        """
        if "DialoGPT" in api_url:
            parameters = {
                "max_new_tokens": max_length,
                "temperature": 0.3,
                "return_full_text": False,
            }
        else:
            parameters = {"max_length": max_length, "temperature": 0.3}
        return {"inputs": prompt, "parameters": parameters}

    @staticmethod
    def _extract_generated_text(result: Any) -> str:
        """Pull the generated text out of a decoded model response.

        Accepts either a dict or a non-empty list whose first item is a dict,
        and looks for ``generated_text`` then ``text``.  Anything else is
        returned as ``str(result)``.
        """
        candidate = result[0] if isinstance(result, list) and result else result
        if isinstance(candidate, dict):
            for key in ("generated_text", "text"):
                if key in candidate:
                    return str(candidate[key])
        return str(result)

    def query_llm_with_fallback(self, prompt: str, max_length: int = 150) -> str:
        """Query the primary LLM endpoint, then each fallback, with retries.

        Args:
            prompt: Text sent to the model.
            max_length: Generation length limit forwarded to the endpoint.

        Returns:
            The generated text of the first successful response, or a fixed
            failure message when every endpoint and attempt fails.  Request
            and decoding errors are logged, not raised.
        """
        for api_url in [API_URL, *FALLBACK_APIS]:
            payload = self._build_llm_payload(api_url, prompt, max_length)
            for attempt in range(self.max_retries):
                logger.info(
                    "Attempting LLM query with %s, attempt %d", api_url, attempt + 1
                )
                try:
                    response = self.session.post(
                        api_url, json=payload, timeout=self.timeout
                    )
                    if response.status_code == 200:
                        return self._extract_generated_text(response.json())
                    if response.status_code == 503:
                        logger.warning(
                            "Model loading, waiting %s seconds...", self.retry_delay
                        )
                    else:
                        logger.warning(
                            "API returned status %s", response.status_code
                        )
                except requests.exceptions.RequestException as exc:
                    logger.error("Request failed: %s", exc)
                except ValueError as exc:
                    # A 200 response whose body is not valid JSON.
                    logger.error("Unexpected error: %s", exc)
                self._wait_before_retry(attempt)
            logger.warning("All attempts failed for %s, trying next API...", api_url)

        # If all APIs fail, return a fallback response
        return "Unable to process with LLM, using fallback parsing"

    @staticmethod
    def _build_extraction_prompt(query: str) -> str:
        """Build the few-shot prompt asking the model for JSON parameters."""
        # The relative-date example is computed so it never goes stale.
        last_month = (datetime.now() - timedelta(days=30)).strftime(_ISO_DATE_FORMAT)
        return (
            f"Extract date and quantity from this query: '{query}'\n"
            'Return only JSON format: {"date_from":"YYYY-MM-DD","min_quantity":number}\n'
            "Examples:\n"
            '- "orders after 2023-04-01 with at least 10 units" -> '
            '{"date_from":"2023-04-01","min_quantity":10}\n'
            '- "show me orders from last month" -> '
            f'{{"date_from":"{last_month}","min_quantity":0}}\n'
        )

    @staticmethod
    def _parse_llm_params(llm_response: str) -> Optional[Dict[str, Any]]:
        """Return validated parameters from a model reply, or ``None``.

        Only the first brace-delimited span is considered.  It must decode to
        a JSON object whose ``date_from`` is a real ``YYYY-MM-DD`` date and
        whose ``min_quantity`` is a non-negative number.  Other keys are
        dropped so nothing unvetted is forwarded to the MCP server.
        """
        match = _JSON_OBJECT_RE.search(llm_response)
        if match is None:
            return None
        try:
            parsed = json.loads(match.group())
        except ValueError:
            return None
        if not isinstance(parsed, dict):
            return None

        date_from = parsed.get("date_from")
        quantity = parsed.get("min_quantity")
        if not isinstance(date_from, str):
            return None
        try:
            datetime.strptime(date_from, _ISO_DATE_FORMAT)
        except ValueError:
            return None
        # bool is a subclass of int, so it is excluded explicitly.
        if isinstance(quantity, bool) or not isinstance(quantity, (int, float)):
            return None
        if quantity < 0:
            return None
        return {"date_from": date_from, "min_quantity": quantity}

    def extract_query_params_robust(self, query: str) -> Dict[str, Any]:
        """Extract ``date_from`` and ``min_quantity`` from *query*.

        Strategy 1 asks the language model and accepts its reply only if it
        passes validation.  Strategy 2, used otherwise, is
        :meth:`extract_params_rule_based`.
        """
        # Strategy 1: Try LLM extraction
        try:
            llm_response = self.query_llm_with_fallback(
                self._build_extraction_prompt(query)
            )
            logger.info("LLM Response: %s", llm_response)
            params = self._parse_llm_params(llm_response)
            if params is not None:
                return params
        except Exception as exc:
            # Best-effort step: any failure here just means falling back.
            logger.warning("LLM extraction failed: %s", exc)

        # Strategy 2: Rule-based extraction
        return self.extract_params_rule_based(query)

    def extract_params_rule_based(self, query: str) -> Dict[str, Any]:
        """Extract parameters from *query* with regular expressions.

        Returns a dict with ``date_from`` (``YYYY-MM-DD`` string, default
        ``"2023-01-01"``) and ``min_quantity`` (int, default ``0``).  A
        relative phrase such as "last week" takes precedence over an explicit
        date found in the same query.
        """
        params: Dict[str, Any] = {"date_from": "2023-01-01", "min_quantity": 0}

        # Explicit dates: normalise to YYYY-MM-DD and skip impossible ones
        # (e.g. month 13) instead of forwarding them.
        for pattern, date_format in _DATE_PATTERNS:
            match = pattern.search(query)
            if match is None:
                continue
            try:
                parsed_date = datetime.strptime(match.group(1), date_format)
            except ValueError:
                continue
            params["date_from"] = parsed_date.strftime(_ISO_DATE_FORMAT)
            break

        # Quantity extraction
        for pattern in _QUANTITY_PATTERNS:
            match = pattern.search(query)
            if match is not None:
                params["min_quantity"] = int(match.group(1))
                break

        # Handle relative dates
        lowered = query.lower()
        for phrase, days_back in _RELATIVE_DATES:
            if phrase in lowered:
                start = datetime.now() - timedelta(days=days_back)
                params["date_from"] = start.strftime(_ISO_DATE_FORMAT)
                break

        return params

    def query_mcp_server(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Send a ``SAP.getFilteredProcOrder`` JSON-RPC request to the MCP server.

        Args:
            params: Parameters placed in the request's ``params`` field.

        Returns:
            The decoded JSON body of the first HTTP 200 response.  If every
            attempt fails, a dict ``{"error": <message>}`` describing the
            last failure.
        """
        last_error: Dict[str, Any] = {"error": "Max retries exceeded"}

        for attempt in range(self.max_retries):
            payload = {
                "jsonrpc": "2.0",
                "method": "SAP.getFilteredProcOrder",
                "params": params,
                "id": f"dynamic-{int(time.time())}",
            }
            logger.info(
                "MCP Request (attempt %d): %s",
                attempt + 1,
                json.dumps(payload, indent=2),
            )

            try:
                response = self.session.post(
                    MCP_URL, json=payload, timeout=self.timeout
                )
            except requests.exceptions.RequestException as exc:
                logger.error("MCP request failed (attempt %d): %s", attempt + 1, exc)
                last_error = {"error": f"Connection failed: {str(exc)}"}
            else:
                if response.status_code == 200:
                    try:
                        result = response.json()
                    except ValueError as exc:
                        # HTTP 200 with a body that is not JSON.
                        logger.error(
                            "MCP response was not valid JSON (attempt %d): %s",
                            attempt + 1,
                            exc,
                        )
                        last_error = {
                            "error": f"MCP server returned invalid JSON: {exc}"
                        }
                    else:
                        logger.info("MCP Response: %s", json.dumps(result, indent=2))
                        return result
                else:
                    logger.warning(
                        "MCP server returned status %s: %s",
                        response.status_code,
                        response.text,
                    )
                    last_error = {
                        "error": f"MCP server error: {response.status_code}"
                    }

            self._wait_before_retry(attempt)

        return last_error

    def format_sap_results(self, results: List[Dict[str, Any]]) -> str:
        """Format SAP order records as display text.

        Each record is read with ``.get`` for ``ProcessOrderConfirmation``,
        ``Material``, ``ConfirmedYieldQuantity``, ``ConfirmedYieldQuantityUnit``
        and ``PostingDate``; missing fields show as ``N/A`` (unit: empty).
        An empty *results* yields a "no matching orders" message with hints.
        """
        if not results:
            return (
                "✅ Query executed successfully, but no matching orders found.\n\n"
                "Try adjusting your criteria:\n"
                "- Use a different date range\n"
                "- Lower the quantity threshold\n"
                "- Check if the SAP system has data for the specified period"
            )

        formatted_results = [f"📊 **Found {len(results)} matching orders:**\n"]

        for i, result in enumerate(results, 1):
            order_id = result.get("ProcessOrderConfirmation", "N/A")
            material = result.get("Material", "N/A")
            quantity = result.get("ConfirmedYieldQuantity", "N/A")
            unit = result.get("ConfirmedYieldQuantityUnit", "")
            posting_date = result.get("PostingDate", "N/A")

            formatted_results.append(
                f"**Order {i}:**\n"
                f"• Order ID: {order_id}\n"
                f"• Material: {material}\n"
                f"• Quantity: {quantity} {unit}\n"
                f"• Posting Date: {posting_date}\n"
            )

        return "\n".join(formatted_results)

    @staticmethod
    def _format_error(error: Any) -> str:
        """Render an ``error`` value from a response as user-facing text."""
        # A JSON-RPC error is an object; prefer its "message" when present.
        if isinstance(error, dict) and "message" in error:
            error = error["message"]
        return (
            f"❌ **Error:** {error}\n\n"
            "**Troubleshooting:**\n"
            "- Check if your MCP server is running\n"
            "- Verify the server URL is correct\n"
            "- Ensure the SAP system is accessible"
        )

    def agent_query(self, user_query: str) -> str:
        """Process one user query end to end and return display text.

        Extracts parameters, queries the MCP server, and formats either the
        returned orders plus the parameters used, or an error message.  Never
        raises: unexpected exceptions are logged and reported in the text.
        """
        if not user_query or not user_query.strip():
            return "❌ Please enter a query about SAP orders."

        try:
            # Step 1: Extract parameters
            logger.info("Processing query: %s", user_query)
            params = self.extract_query_params_robust(user_query)
            logger.info("Extracted parameters: %s", params)

            # Step 2: Query MCP server
            response = self.query_mcp_server(params)

            # Step 3: Handle response
            if not isinstance(response, dict):
                return self._format_error("Unexpected response format from MCP server")
            if "error" in response:
                return self._format_error(response["error"])

            # A null result means "nothing"; a single object is one record.
            results = response.get("result") or []
            if isinstance(results, dict):
                results = [results]

            # Step 4: Format results
            formatted_output = self.format_sap_results(results)

            # Add query info
            query_info = (
                "\n\n📝 **Query Details:**\n"
                f"• Date from: {params.get('date_from', 'N/A')}\n"
                f"• Minimum quantity: {params.get('min_quantity', 0)}"
            )
            return formatted_output + query_info

        except Exception as exc:
            # Top-level boundary for the UI: report the failure as text.
            logger.error("Unexpected error in agent_query: %s", exc)
            return (
                f"❌ **Unexpected Error:** {str(exc)}\n\n"
                "Please try again or contact support if the issue persists."
            )


# Initialize the client
client = MCPSAPClient()

# Example queries for users
example_queries = [
    "Show me orders after 2023-04-01 with at least 10 units",
    "Orders from last month with minimum 5 quantity",
    "Find orders from 2023-06-15 with at least 20 units",
    "Show me all orders from last week",
    "Orders with quantity greater than 50 from 2023-05-01",
]


# Create Gradio interface
def create_interface():
    """Build and return the Gradio Blocks UI.

    The left column holds the query box, the submit button and one button per
    example query; the right column holds the results box.  Clicking an
    example button copies its text into the query box.
    """
    with gr.Blocks(title="SAP MCP Agent", theme=gr.themes.Soft()) as iface:
        gr.Markdown(_INTRO_MARKDOWN)

        with gr.Row():
            with gr.Column():
                query_input = gr.Textbox(
                    label="Enter your SAP query",
                    placeholder="e.g., 'Show me orders after 2023-04-01 with at least 10 units'",
                    lines=2,
                )
                submit_btn = gr.Button("🔍 Query SAP", variant="primary")

                gr.Markdown("**Example queries:**")
                example_buttons = [
                    gr.Button(f"📋 {example}", size="sm")
                    for example in example_queries
                ]

            with gr.Column():
                output = gr.Textbox(
                    label="Results",
                    lines=15,
                    max_lines=20,
                    show_copy_button=True,
                )

        # Event handlers
        submit_btn.click(
            client.agent_query,
            inputs=[query_input],
            outputs=[output],
        )

        # Example button handlers
        for btn, example in zip(example_buttons, example_queries):
            # The default argument binds this iteration's text; a plain
            # closure would see only the last example.
            btn.click(lambda x=example: x, outputs=[query_input])

    return iface


# Create and launch the interface
if __name__ == "__main__":
    iface = create_interface()
    iface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        show_error=True,
    )