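# NOTE(review): the lines below look like file-viewer page chrome captured
# together with the source; kept as comments so the module can be parsed.
# MCP_test / app.py
# PD03's picture
# Update app.py
# 71ea048 verified
# raw
# history blame
# 14.9 kB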
import gradio as gr
import requests
import json
import re
import time
from datetime import datetime, timedelta
import logging
from typing import Dict, Any, Optional, List
# Logging setup: root logger at INFO plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Endpoint that receives the JSON-RPC requests built by the MCP client.
MCP_URL = "https://long-planets-wish.loca.lt/"

# Primary text-generation endpoint used for parameter extraction.
API_URL = "https://api-inference.huggingface.co/models/microsoft/DialoGPT-medium"

# Alternative endpoints, tried in this order once the primary one has
# exhausted its retries.
FALLBACK_APIS = [
    "https://api-inference.huggingface.co/models/microsoft/DialoGPT-small",
    "https://api-inference.huggingface.co/models/gpt2",
]
class MCPSAPClient:
    """Turn a natural-language order query into an MCP JSON-RPC call.

    Processing pipeline (see ``agent_query``):

    1. extract ``date_from`` / ``min_quantity`` from the user's text, first
       through a hosted LLM and, if that yields nothing usable, through
       regular expressions;
    2. POST a ``SAP.getFilteredProcOrder`` JSON-RPC request to ``MCP_URL``;
    3. render the returned records as display text.

    Attributes:
        session: shared ``requests.Session`` used for every HTTP call.
        timeout: per-request timeout in seconds.
        max_retries: number of attempts per endpoint.
        retry_delay: pause in seconds between two attempts.
    """

    # Explicit-date patterns, tried in this order.
    _DATE_PATTERNS = (
        r'(\d{4}-\d{2}-\d{2})',      # YYYY-MM-DD
        r'(\d{2}/\d{2}/\d{4})',      # MM/DD/YYYY
        r'(\d{1,2}/\d{1,2}/\d{4})',  # M/D/YYYY
    )

    # Quantity phrases, tried in this order (matched case-insensitively).
    _QUANTITY_PATTERNS = (
        r'at least (\d+)',
        r'minimum (\d+)',
        r'more than (\d+)',
        r'> ?(\d+)',
        r'greater than (\d+)',
    )

    # Relative-date phrases and how many days back they map to; the first
    # phrase found in the query wins.
    _RELATIVE_DATES = (
        ("last month", 30),
        ("last week", 7),
        ("yesterday", 1),
        ("today", 0),
    )

    # A flat (non-nested) JSON object somewhere inside free text.
    _JSON_OBJECT_RE = re.compile(r'\{[^{}]*\}')

    def __init__(self):
        self.session = requests.Session()
        # requests ignores a ``timeout`` attribute set on a Session, so the
        # value is stored here and passed explicitly with every request.
        self.timeout = 30
        # Kept only so existing readers of ``session.timeout`` still work.
        self.session.timeout = self.timeout
        self.max_retries = 3
        self.retry_delay = 2

    def _pause_before_retry(self, attempt: int) -> None:
        """Sleep ``retry_delay`` seconds unless ``attempt`` was the last one."""
        if attempt < self.max_retries - 1:
            time.sleep(self.retry_delay)

    @staticmethod
    def _build_llm_payload(api_url: str, prompt: str, max_length: int) -> Dict[str, Any]:
        """Build the request body; DialoGPT endpoints get different parameters."""
        if "DialoGPT" in api_url:
            parameters = {
                "max_new_tokens": max_length,
                "temperature": 0.3,
                "return_full_text": False,
            }
        else:
            parameters = {
                "max_length": max_length,
                "temperature": 0.3,
            }
        return {"inputs": prompt, "parameters": parameters}

    @staticmethod
    def _extract_generated_text(result: Any) -> str:
        """Pull the generated text out of a decoded LLM response.

        Accepts either a non-empty list whose first item is a dict, or a
        dict, carrying ``generated_text`` or ``text``. Anything else is
        returned as ``str(result)``.
        """
        candidate = result[0] if isinstance(result, list) and result else result
        if isinstance(candidate, dict):
            for key in ("generated_text", "text"):
                if key in candidate:
                    return candidate[key]
        return str(result)

    @staticmethod
    def _normalize_iso_date(value: Any) -> Optional[str]:
        """Return ``value`` as a zero-padded YYYY-MM-DD string, or None if it is not a real date."""
        if not isinstance(value, str):
            return None
        try:
            return datetime.strptime(value, "%Y-%m-%d").date().isoformat()
        except ValueError:
            return None

    @classmethod
    def _parse_llm_params(cls, text: str) -> Optional[Dict[str, Any]]:
        """Return the first valid ``date_from``/``min_quantity`` JSON object found in ``text``."""
        for match in cls._JSON_OBJECT_RE.finditer(text):
            try:
                candidate = json.loads(match.group())
            except ValueError:
                continue
            if not isinstance(candidate, dict):
                continue
            date_from = cls._normalize_iso_date(candidate.get("date_from"))
            quantity = candidate.get("min_quantity")
            # bool is an int subclass; True/False is never a real quantity.
            if date_from is None or isinstance(quantity, bool):
                continue
            try:
                quantity = int(quantity)
            except (TypeError, ValueError, OverflowError):
                continue
            return {"date_from": date_from, "min_quantity": quantity}
        return None

    def query_llm_with_fallback(self, prompt: str, max_length: int = 150) -> str:
        """Query the LLM endpoints in order, retrying each one.

        Args:
            prompt: text sent as the model input.
            max_length: generation length limit forwarded to the endpoint.

        Returns:
            The generated text of the first HTTP 200 response, or the fixed
            sentence "Unable to process with LLM, using fallback parsing"
            when every endpoint fails.
        """
        for api_url in [API_URL] + FALLBACK_APIS:
            payload = self._build_llm_payload(api_url, prompt, max_length)
            for attempt in range(self.max_retries):
                logger.info("Attempting LLM query with %s, attempt %d", api_url, attempt + 1)
                try:
                    response = self.session.post(api_url, json=payload, timeout=self.timeout)
                    if response.status_code == 200:
                        return self._extract_generated_text(response.json())
                    if response.status_code == 503:
                        logger.warning("Model loading (HTTP 503) at %s", api_url)
                    else:
                        logger.warning("API returned status %s", response.status_code)
                except requests.exceptions.RequestException as e:
                    logger.error("Request failed: %s", e)
                except Exception as e:
                    # Best effort: a malformed body must not abort the fallback chain.
                    logger.error("Unexpected error: %s", e)
                # No pointless sleep after the final attempt for this endpoint.
                self._pause_before_retry(attempt)
            logger.warning("All attempts failed for %s, trying next API...", api_url)
        # If all APIs fail, return a fallback response
        return "Unable to process with LLM, using fallback parsing"

    def extract_query_params_robust(self, query: str) -> Dict[str, Any]:
        """Extract ``date_from`` and ``min_quantity`` from ``query``.

        Strategy 1 asks the LLM for a JSON object and validates it; if that
        fails for any reason, strategy 2 (``extract_params_rule_based``) is
        used.

        Returns:
            A dict with the keys ``date_from`` (YYYY-MM-DD string) and
            ``min_quantity`` (int).
        """
        prompt = (
            "\n"
            f"Extract date and quantity from this query: '{query}'\n"
            "Return only JSON format:\n"
            '{"date_from":"YYYY-MM-DD","min_quantity":number}\n'
            "Examples:\n"
            '- "orders after 2023-04-01 with at least 10 units" -> '
            '{"date_from":"2023-04-01","min_quantity":10}\n'
            '- "show me orders from last month" -> '
            '{"date_from":"2023-11-01","min_quantity":0}\n'
        )
        # Strategy 1: Try LLM extraction
        try:
            llm_response = self.query_llm_with_fallback(prompt)
            logger.info("LLM Response: %s", llm_response)
            if isinstance(llm_response, str):
                # Some endpoints echo the prompt; drop it so the example JSON
                # inside the prompt is not mistaken for the answer.
                if llm_response.startswith(prompt):
                    llm_response = llm_response[len(prompt):]
                parsed = self._parse_llm_params(llm_response)
                if parsed is not None:
                    return parsed
        except Exception as e:
            logger.warning("LLM extraction failed: %s", e)
        # Strategy 2: Rule-based extraction
        return self.extract_params_rule_based(query)

    def extract_params_rule_based(self, query: str) -> Dict[str, Any]:
        """Extract parameters with regular expressions and keyword checks.

        Defaults are ``date_from="2023-01-01"`` and ``min_quantity=0``. An
        explicit date (YYYY-MM-DD or M/D/YYYY) and a quantity phrase
        ("at least N", "minimum N", "more than N", "> N", "greater than N")
        override them. A relative phrase ("last month" = 30 days back,
        "last week", "yesterday", "today") takes precedence over an explicit
        date. Matches that are not real calendar dates are ignored.
        """
        params: Dict[str, Any] = {"date_from": "2023-01-01", "min_quantity": 0}

        # Date extraction
        for pattern in self._DATE_PATTERNS:
            match = re.search(pattern, query)
            if not match:
                continue
            date_str = match.group(1)
            if '/' in date_str:
                month, day, year = date_str.split('/')
                date_str = f"{year}-{month}-{day}"
            normalized = self._normalize_iso_date(date_str)
            if normalized is not None:
                params["date_from"] = normalized
                break

        # Quantity extraction
        for pattern in self._QUANTITY_PATTERNS:
            match = re.search(pattern, query, re.IGNORECASE)
            if match:
                params["min_quantity"] = int(match.group(1))
                break

        # Handle relative dates
        lowered = query.lower()
        for phrase, days_back in self._RELATIVE_DATES:
            if phrase in lowered:
                start = datetime.now() - timedelta(days=days_back)
                params["date_from"] = start.strftime("%Y-%m-%d")
                break

        return params

    def query_mcp_server(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """POST a ``SAP.getFilteredProcOrder`` JSON-RPC request to ``MCP_URL``.

        Every failure mode (transport error, non-200 status, non-JSON body)
        is retried up to ``max_retries`` times.

        Returns:
            The decoded JSON response on success, otherwise a dict of the
            form ``{"error": "<description of the last failure>"}``.
        """
        last_error = "Max retries exceeded"
        for attempt in range(self.max_retries):
            payload = {
                "jsonrpc": "2.0",
                "method": "SAP.getFilteredProcOrder",
                "params": params,
                "id": f"dynamic-{int(time.time())}",
            }
            logger.info("MCP Request (attempt %d): %s", attempt + 1, json.dumps(payload, indent=2))
            try:
                response = self.session.post(MCP_URL, json=payload, timeout=self.timeout)
            except requests.exceptions.RequestException as e:
                logger.error("MCP request failed (attempt %d): %s", attempt + 1, e)
                last_error = f"Connection failed: {str(e)}"
            else:
                if response.status_code == 200:
                    try:
                        result = response.json()
                    except ValueError as e:
                        # e.g. an HTML page served by a proxy in front of the server
                        logger.error("MCP server returned a non-JSON body (attempt %d): %s", attempt + 1, e)
                        last_error = "MCP server returned a non-JSON response"
                    else:
                        logger.info("MCP Response: %s", json.dumps(result, indent=2))
                        return result
                else:
                    logger.warning("MCP server returned status %s: %s", response.status_code, response.text)
                    last_error = f"MCP server error: {response.status_code}"
            self._pause_before_retry(attempt)
        return {"error": last_error}

    def format_sap_results(self, results: List[Dict[str, Any]]) -> str:
        """Render order records as display text.

        Each record is read through the keys ``ProcessOrderConfirmation``,
        ``Material``, ``ConfirmedYieldQuantity``,
        ``ConfirmedYieldQuantityUnit`` and ``PostingDate``; missing keys are
        shown as ``N/A`` (empty string for the unit). An empty ``results``
        yields a "no matching orders" hint instead.
        """
        if not results:
            return (
                "✅ Query executed successfully, but no matching orders found.\n\n"
                "Try adjusting your criteria:\n"
                "- Use a different date range\n"
                "- Lower the quantity threshold\n"
                "- Check if the SAP system has data for the specified period"
            )

        sections = [f"📊 **Found {len(results)} matching orders:**\n"]
        for i, result in enumerate(results, 1):
            order_id = result.get('ProcessOrderConfirmation', 'N/A')
            material = result.get('Material', 'N/A')
            quantity = result.get('ConfirmedYieldQuantity', 'N/A')
            unit = result.get('ConfirmedYieldQuantityUnit', '')
            posting_date = result.get('PostingDate', 'N/A')
            sections.append(
                f"**Order {i}:**\n"
                f"• Order ID: {order_id}\n"
                f"• Material: {material}\n"
                f"• Quantity: {quantity} {unit}\n"
                f"• Posting Date: {posting_date}\n"
            )
        return "\n".join(sections)

    def agent_query(self, user_query: str) -> str:
        """Main entry point: extract parameters, query the server, format the answer.

        Never raises; every failure is converted into a message string that
        starts with "❌".
        """
        if not user_query or not user_query.strip():
            return "❌ Please enter a query about SAP orders."
        try:
            # Step 1: Extract parameters
            logger.info("Processing query: %s", user_query)
            params = self.extract_query_params_robust(user_query)
            logger.info("Extracted parameters: %s", params)

            # Step 2: Query MCP server
            response = self.query_mcp_server(params)

            # Step 3: Handle response
            if "error" in response:
                error = response["error"]
                # A JSON-RPC error arrives as an object; show only its message.
                if isinstance(error, dict):
                    error = error.get("message", error)
                return (
                    f"❌ **Error:** {error}\n\n"
                    "**Troubleshooting:**\n"
                    "- Check if your MCP server is running\n"
                    "- Verify the server URL is correct\n"
                    "- Ensure the SAP system is accessible"
                )
            # NOTE(review): assumes "result" holds a list of record dicts — confirm against the server.
            results = response.get("result", [])

            # Step 4: Format results
            formatted_output = self.format_sap_results(results)

            # Add query info
            query_info = (
                "\n\n📝 **Query Details:**\n"
                f"• Date from: {params.get('date_from', 'N/A')}\n"
                f"• Minimum quantity: {params.get('min_quantity', 0)}"
            )
            return formatted_output + query_info
        except Exception as e:
            logger.error("Unexpected error in agent_query: %s", e)
            return (
                f"❌ **Unexpected Error:** {str(e)}\n\n"
                "Please try again or contact support if the issue persists."
            )
# Sample questions offered to the user as one-click examples.
example_queries = [
    "Show me orders after 2023-04-01 with at least 10 units",
    "Orders from last month with minimum 5 quantity",
    "Find orders from 2023-06-15 with at least 20 units",
    "Show me all orders from last week",
    "Orders with quantity greater than 50 from 2023-05-01",
]

# Single module-wide client instance used by the UI callbacks.
client = MCPSAPClient()
# Create Gradio interface
def create_interface():
    """Build the Gradio Blocks UI for the SAP MCP agent.

    Layout: an introductory Markdown panel, then one row with two columns.
    The left column holds the query textbox, the submit button and one
    button per entry of ``example_queries``. The right column holds the
    results textbox.

    Wiring: the submit button passes the query text to
    ``client.agent_query`` and shows its return value in the results box;
    each example button copies its example text into the query textbox.

    Returns:
        The constructed ``gr.Blocks`` object (not launched).
    """
    # Blank lines separate the heading, the paragraph and the list labels so
    # they do not run together as one Markdown paragraph.
    intro_markdown = "\n".join([
        "# 🔍 SAP MCP Agent",
        "",
        "Query your SAP system naturally using the Model Context Protocol (MCP).",
        "",
        "**How to use:**",
        "- Ask about orders using natural language",
        "- Specify dates and quantities in your query",
        "- The system will extract parameters and query SAP for you",
        "",
        "**Example queries:**",
    ])

    with gr.Blocks(title="SAP MCP Agent", theme=gr.themes.Soft()) as iface:
        gr.Markdown(intro_markdown)

        with gr.Row():
            with gr.Column():
                query_input = gr.Textbox(
                    label="Enter your SAP query",
                    placeholder="e.g., 'Show me orders after 2023-04-01 with at least 10 units'",
                    lines=2,
                )
                submit_btn = gr.Button("🔍 Query SAP", variant="primary")
                gr.Markdown("**Example queries:**")
                example_buttons = [
                    gr.Button(f"📋 {example}", size="sm")
                    for example in example_queries
                ]
            with gr.Column():
                output = gr.Textbox(
                    label="Results",
                    lines=15,
                    max_lines=20,
                    show_copy_button=True,
                )

        # Event handlers
        submit_btn.click(
            client.agent_query,
            inputs=[query_input],
            outputs=[output],
        )

        # Example button handlers. The default argument binds each example at
        # definition time; a plain closure would make every button return the
        # last example.
        for btn, example in zip(example_buttons, example_queries):
            btn.click(
                lambda x=example: x,
                outputs=[query_input],
            )

    return iface
# Create and launch the interface
if __name__ == "__main__":
    # Keyword arguments handed unchanged to the interface's launch() call.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "show_error": True,
    }
    iface = create_interface()
    iface.launch(**launch_options)