|
|
import json
import logging
import os
import re
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import gradio as gr
import requests
|
|
|
|
|
|
|
|
# Configure root logging once, at import time, so INFO-level messages from
# this module are emitted.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# JSON-RPC endpoint of the MCP server. The default is a localtunnel address,
# which is only valid while that particular tunnel is up, so deployments can
# point the app elsewhere through the MCP_URL environment variable instead of
# editing the source. An unset or empty variable keeps the original default.
MCP_URL = os.environ.get("MCP_URL") or "https://long-planets-wish.loca.lt/"

# Primary Hugging Face Inference API endpoint used for parameter extraction.
API_URL = "https://api-inference.huggingface.co/models/microsoft/DialoGPT-medium"

# Endpoints tried, in order, after the primary endpoint has failed.
FALLBACK_APIS = [
    "https://api-inference.huggingface.co/models/microsoft/DialoGPT-small",
    "https://api-inference.huggingface.co/models/gpt2",
]
|
|
|
|
|
class MCPSAPClient:
    """Turn natural-language questions into SAP process-order queries over MCP.

    The client extracts a start date and a minimum quantity from the user's
    text (first through a hosted LLM, then through regex rules as a fallback),
    sends them to the MCP server as a JSON-RPC 2.0 request and renders the
    returned records as Markdown-flavoured text.

    Attributes:
        session: Shared ``requests.Session`` used for every HTTP call.
        timeout: Per-request timeout in seconds.
        max_retries: Attempts made against one endpoint before giving up on it.
        retry_delay: Seconds slept between two attempts.
    """

    # Parameters used when the query yields nothing usable.
    _DEFAULT_PARAMS = {"date_from": "2023-01-01", "min_quantity": 0}

    # First flat (non-nested) JSON object inside free text.
    _JSON_OBJECT_RE = re.compile(r"\{[^}]*\}", re.DOTALL)

    # Explicit dates, tried in this order; the first valid hit wins.
    _DATE_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            r"(\d{4}-\d{2}-\d{2})",
            r"(\d{2}/\d{2}/\d{4})",
            r"(\d{1,2}/\d{1,2}/\d{4})",
        )
    )

    # Quantity phrases, tried in this order; the first hit wins.
    # NOTE(review): "more than N" / "> N" are treated like "at least N".
    _QUANTITY_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r"at least (\d+)",
            r"minimum (\d+)",
            r"more than (\d+)",
            r"> ?(\d+)",
            r"greater than (\d+)",
        )
    )

    # Relative-date keywords and how many days back they reach ("last month"
    # is approximated as 30 days). Checked in this order; the first hit wins.
    _RELATIVE_DATES = (
        ("last month", 30),
        ("last week", 7),
        ("yesterday", 1),
        ("today", 0),
    )

    def __init__(self):
        """Create the HTTP session and set the retry/timeout policy."""
        self.session = requests.Session()
        # requests ignores a ``timeout`` attribute set on the Session, so the
        # value is kept here and passed explicitly with every request.
        self.timeout = 30
        self.max_retries = 3
        self.retry_delay = 2

    @staticmethod
    def _build_llm_payload(api_url: str, prompt: str, max_length: int) -> Dict[str, Any]:
        """Return the request body for *api_url*; DialoGPT endpoints get their own parameter set."""
        if "DialoGPT" in api_url:
            parameters = {
                "max_new_tokens": max_length,
                "temperature": 0.3,
                "return_full_text": False,
            }
        else:
            parameters = {"max_length": max_length, "temperature": 0.3}
        return {"inputs": prompt, "parameters": parameters}

    @staticmethod
    def _extract_generated_text(result: Any) -> str:
        """Pull the generated text out of a decoded response, or stringify the whole response."""
        candidate = result[0] if isinstance(result, list) and result else result
        if isinstance(candidate, dict):
            for key in ("generated_text", "text"):
                if key in candidate:
                    return str(candidate[key])
        return str(result)

    def query_llm_with_fallback(self, prompt: str, max_length: int = 150) -> str:
        """Send *prompt* to the primary LLM endpoint, then to each fallback.

        Every endpoint is tried up to ``max_retries`` times. Client errors
        other than 429 are not retried because repeating the same request
        cannot succeed; the next endpoint is tried instead.

        Args:
            prompt: Text sent as the model input.
            max_length: Generation length limit forwarded to the endpoint.

        Returns:
            The generated text, or the fixed message
            ``"Unable to process with LLM, using fallback parsing"`` when
            every endpoint failed. This method does not raise.
        """
        for api_url in [API_URL] + FALLBACK_APIS:
            payload = self._build_llm_payload(api_url, prompt, max_length)

            for attempt in range(self.max_retries):
                logger.info("Attempting LLM query with %s, attempt %d", api_url, attempt + 1)
                try:
                    response = self.session.post(api_url, json=payload, timeout=self.timeout)
                    status = response.status_code
                    if status == 200:
                        return self._extract_generated_text(response.json())
                    if status == 503:
                        logger.warning("Model loading (HTTP 503) at %s", api_url)
                    else:
                        logger.warning("API returned status %s", status)
                        if 400 <= status < 500 and status != 429:
                            break  # not retryable: move on to the next endpoint
                except requests.exceptions.RequestException as e:
                    logger.error("Request failed: %s", e)
                except Exception as e:
                    # Best-effort boundary: e.g. a 200 response whose body is
                    # not JSON, or JSON of an unexpected shape.
                    logger.error("Unexpected error: %s", e)

                # Only wait when another attempt against this endpoint follows.
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)

            logger.warning("All attempts failed for %s, trying next API...", api_url)

        return "Unable to process with LLM, using fallback parsing"

    @staticmethod
    def _validate_llm_params(parsed: Any) -> Optional[Dict[str, Any]]:
        """Return cleaned ``date_from``/``min_quantity`` from LLM JSON, or None if unusable."""
        if not isinstance(parsed, dict):
            return None
        date_from = parsed.get("date_from")
        min_quantity = parsed.get("min_quantity")
        if not isinstance(date_from, str):
            return None
        try:
            datetime.strptime(date_from, "%Y-%m-%d")
        except ValueError:
            return None
        # bool is a subclass of int but is never a meaningful quantity.
        if isinstance(min_quantity, bool) or not isinstance(min_quantity, (int, float)):
            return None
        return {"date_from": date_from, "min_quantity": min_quantity}

    def extract_query_params_robust(self, query: str) -> Dict[str, Any]:
        """Extract ``date_from`` and ``min_quantity`` from *query*.

        The LLM is asked first. Its answer is used only when it contains a
        JSON object with a real ``YYYY-MM-DD`` date and a numeric quantity;
        in every other case (including any error) the rule-based extractor
        is used.

        Args:
            query: The user's natural-language question.

        Returns:
            A dict with the keys ``date_from`` and ``min_quantity``.
        """
        prompt = f"""
            Extract date and quantity from this query: '{query}'
            Return only JSON format:
            {{"date_from":"YYYY-MM-DD","min_quantity":number}}

            Examples:
            - "orders after 2023-04-01 with at least 10 units" -> {{"date_from":"2023-04-01","min_quantity":10}}
            - "show me orders from last month" -> {{"date_from":"2023-11-01","min_quantity":0}}
            """

        try:
            llm_response = self.query_llm_with_fallback(prompt)
            logger.info("LLM Response: %s", llm_response)

            # Some endpoints echo the prompt; drop it so the template and the
            # examples inside the prompt are not mistaken for the answer.
            if llm_response.startswith(prompt):
                llm_response = llm_response[len(prompt):]

            json_match = self._JSON_OBJECT_RE.search(llm_response)
            if json_match:
                params = self._validate_llm_params(json.loads(json_match.group()))
                if params is not None:
                    return params
        except Exception as e:
            # Deliberate best-effort: any LLM-path failure falls back to rules.
            logger.warning("LLM extraction failed: %s", e)

        return self.extract_params_rule_based(query)

    @staticmethod
    def _normalise_date(date_str: str) -> Optional[str]:
        """Convert ``YYYY-MM-DD`` or ``M/D/YYYY`` text to ISO form; None if it is not a real date."""
        if "-" in date_str:
            year, month, day = date_str.split("-")
        else:
            month, day, year = date_str.split("/")
            # A "month" above 12 with a day of at most 12 can only be
            # day-first notation (15/06/2023), so swap the two fields.
            if int(month) > 12 and int(day) <= 12:
                month, day = day, month
        try:
            return datetime(int(year), int(month), int(day)).date().isoformat()
        except ValueError:
            return None

    def _find_explicit_date(self, query: str) -> Optional[str]:
        """Return the first valid explicit date found in *query* as ISO text, else None."""
        for pattern in self._DATE_PATTERNS:
            match = pattern.search(query)
            if match is None:
                continue
            iso_date = self._normalise_date(match.group(1))
            if iso_date is not None:
                return iso_date
        return None

    def extract_params_rule_based(self, query: str) -> Dict[str, Any]:
        """Extract query parameters with regular expressions and keywords.

        Starts from the defaults (``2023-01-01``, quantity ``0``), then applies
        an explicit date, a quantity phrase and finally a relative-date keyword
        ("last month", "last week", "yesterday", "today"). A relative keyword
        takes precedence over an explicit date.

        Args:
            query: The user's natural-language question.

        Returns:
            A dict with ``date_from`` (``YYYY-MM-DD`` string) and
            ``min_quantity`` (int).
        """
        params = dict(self._DEFAULT_PARAMS)

        explicit_date = self._find_explicit_date(query)
        if explicit_date is not None:
            params["date_from"] = explicit_date

        for pattern in self._QUANTITY_PATTERNS:
            match = pattern.search(query)
            if match:
                params["min_quantity"] = int(match.group(1))
                break

        lowered = query.lower()
        for keyword, days_back in self._RELATIVE_DATES:
            if keyword in lowered:
                start = datetime.now() - timedelta(days=days_back)
                params["date_from"] = start.strftime("%Y-%m-%d")
                break

        return params

    def query_mcp_server(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Call ``SAP.getFilteredProcOrder`` on the MCP server.

        The request is retried up to ``max_retries`` times on transport
        errors, non-200 statuses and undecodable response bodies.

        Args:
            params: Value sent as the JSON-RPC ``params`` member.

        Returns:
            The decoded JSON response on success, otherwise a dict with a
            single ``"error"`` key describing the last failure.
        """
        payload = {
            "jsonrpc": "2.0",
            "method": "SAP.getFilteredProcOrder",
            "params": params,
            "id": f"dynamic-{int(time.time())}",
        }

        for attempt in range(self.max_retries):
            logger.info(
                "MCP Request (attempt %d): %s", attempt + 1, json.dumps(payload, indent=2)
            )
            try:
                response = self.session.post(MCP_URL, json=payload, timeout=self.timeout)
            except requests.exceptions.RequestException as e:
                logger.error("MCP request failed (attempt %d): %s", attempt + 1, e)
                failure = {"error": f"Connection failed: {str(e)}"}
            else:
                if response.status_code == 200:
                    try:
                        result = response.json()
                    except ValueError as e:
                        # Covers both json.JSONDecodeError and requests' own
                        # JSONDecodeError, whichever the installed version raises.
                        logger.error("MCP server returned invalid JSON: %s", e)
                        failure = {"error": f"MCP server returned invalid JSON: {str(e)}"}
                    else:
                        logger.info("MCP Response: %s", json.dumps(result, indent=2))
                        return result
                else:
                    logger.warning(
                        "MCP server returned status %s: %s", response.status_code, response.text
                    )
                    failure = {"error": f"MCP server error: {response.status_code}"}

            if attempt >= self.max_retries - 1:
                return failure
            time.sleep(self.retry_delay)

        # Only reached when max_retries is zero or negative.
        return {"error": "Max retries exceeded"}

    def format_sap_results(self, results: List[Dict[str, Any]]) -> str:
        """Render order records as Markdown-flavoured text.

        Args:
            results: Order records; the fields read are
                ``ProcessOrderConfirmation``, ``Material``,
                ``ConfirmedYieldQuantity``, ``ConfirmedYieldQuantityUnit``
                and ``PostingDate``. Missing fields are shown as ``N/A``
                (the unit as an empty string).

        Returns:
            A summary line followed by one section per record, or a hint
            text when *results* is empty.
        """
        if not results:
            return (
                "✅ Query executed successfully, but no matching orders found.\n\n"
                "Try adjusting your criteria:\n"
                "- Use a different date range\n"
                "- Lower the quantity threshold\n"
                "- Check if the SAP system has data for the specified period"
            )

        sections = [f"📊 **Found {len(results)} matching orders:**\n"]
        for i, result in enumerate(results, 1):
            order_id = result.get("ProcessOrderConfirmation", "N/A")
            material = result.get("Material", "N/A")
            quantity = result.get("ConfirmedYieldQuantity", "N/A")
            unit = result.get("ConfirmedYieldQuantityUnit", "")
            posting_date = result.get("PostingDate", "N/A")
            sections.append(
                f"**Order {i}:**\n"
                f"• Order ID: {order_id}\n"
                f"• Material: {material}\n"
                f"• Quantity: {quantity} {unit}\n"
                f"• Posting Date: {posting_date}\n"
            )

        return "\n".join(sections)

    def agent_query(self, user_query: str) -> str:
        """Answer one user question end to end.

        Extracts the parameters, queries the MCP server and formats the
        outcome. This method does not raise; every failure is reported in
        the returned text.

        Args:
            user_query: The user's natural-language question.

        Returns:
            The formatted results followed by the parameters that were used,
            or an error message.
        """
        if not user_query or not user_query.strip():
            return "❌ Please enter a query about SAP orders."

        try:
            logger.info("Processing query: %s", user_query)
            params = self.extract_query_params_robust(user_query)
            logger.info("Extracted parameters: %s", params)

            response = self.query_mcp_server(params)

            if not isinstance(response, dict):
                error = "MCP server returned an unexpected response"
            elif "error" in response:
                error = response["error"]
                # A JSON-RPC error object carries its text under "message".
                if isinstance(error, dict):
                    error = error.get("message", error)
            else:
                error = None

            if error is not None:
                return (
                    f"❌ **Error:** {error}\n\n"
                    "**Troubleshooting:**\n"
                    "- Check if your MCP server is running\n"
                    "- Verify the server URL is correct\n"
                    "- Ensure the SAP system is accessible"
                )

            # A null or absent "result" is treated as "no orders".
            results = response.get("result") or []
            formatted_output = self.format_sap_results(results)
            query_info = (
                "\n\n🔍 **Query Details:**\n"
                f"• Date from: {params.get('date_from', 'N/A')}\n"
                f"• Minimum quantity: {params.get('min_quantity', 0)}"
            )
            return formatted_output + query_info

        except Exception as e:
            # UI boundary: report instead of crashing the Gradio callback.
            logger.exception("Unexpected error in agent_query: %s", e)
            return (
                f"❌ **Unexpected Error:** {str(e)}\n\n"
                "Please try again or contact support if the issue persists."
            )
|
|
|
|
|
|
|
|
# Single shared client instance; the UI callbacks are bound to its methods.
client = MCPSAPClient()
|
|
|
|
|
|
|
|
# Sample questions offered in the UI as one-click example buttons.
example_queries = [
    "Show me orders after 2023-04-01 with at least 10 units",
    "Orders from last month with minimum 5 quantity",
    "Find orders from 2023-06-15 with at least 20 units",
    "Show me all orders from last week",
    "Orders with quantity greater than 50 from 2023-05-01",
]
|
|
|
|
|
|
|
|
def create_interface():
    """Build the Gradio UI for the SAP MCP agent.

    The layout has two columns: on the left the query box, the submit button
    and one button per entry of ``example_queries``; on the right the results
    box. Submitting runs ``client.agent_query`` on the query text; clicking an
    example button copies that example into the query box.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(title="SAP MCP Agent", theme=gr.themes.Soft()) as iface:
        # Built from explicit lines so the Markdown does not depend on how
        # the source happens to be indented.
        gr.Markdown(
            "# 🚀 SAP MCP Agent\n\n"
            "Query your SAP system naturally using the Model Context Protocol (MCP).\n\n"
            "**How to use:**\n"
            "- Ask about orders using natural language\n"
            "- Specify dates and quantities in your query\n"
            "- The system will extract parameters and query SAP for you\n"
        )

        with gr.Row():
            with gr.Column():
                query_input = gr.Textbox(
                    label="Enter your SAP query",
                    placeholder="e.g., 'Show me orders after 2023-04-01 with at least 10 units'",
                    lines=2,
                )
                submit_btn = gr.Button("🔍 Query SAP", variant="primary")

                gr.Markdown("**Example queries:**")
                for example in example_queries:
                    example_btn = gr.Button(f"📝 {example}", size="sm")
                    # The default argument binds this iteration's text; a plain
                    # closure would make every button return the last example.
                    example_btn.click(lambda x=example: x, outputs=[query_input])

            with gr.Column():
                # NOTE(review): agent_query returns Markdown-style text
                # (**bold**), which a Textbox shows verbatim — confirm whether
                # a gr.Markdown output is wanted instead.
                output = gr.Textbox(
                    label="Results",
                    lines=15,
                    max_lines=20,
                    show_copy_button=True,
                )

        # Run the query from the button and from the textbox's submit event.
        submit_btn.click(client.agent_query, inputs=[query_input], outputs=[output])
        query_input.submit(client.agent_query, inputs=[query_input], outputs=[output])

    return iface
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it on all interfaces at
    # port 7860 with share and show_error switched on.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "show_error": True,
    }
    iface = create_interface()
    iface.launch(**launch_options)