# Hugging Face Space page residue (status: "Sleeping") — kept as a comment so the file stays valid Python.
| # Setup Hugging Face Inference API for LLAMA3 | |
| import os | |
| import requests | |
| import json | |
| import gradio as gr | |
| from typing import List, Dict, Any, Optional | |
| import logging | |
| import time | |
# --- Logging -----------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Configuration -----------------------------------------------------------
# Credentials are injected through Hugging Face Space secrets; both may be
# absent, in which case the app degrades gracefully at startup.
SAP_API_KEY = os.getenv('SAP_API_KEY')  # Set in Space secrets
HF_TOKEN = os.getenv('HF_TOKEN')  # Set in Space secrets

# Root of the SAP S/4HANA sandbox OData services.
SAP_BASE_URL = "https://sandbox.api.sap.com/s4hanacloud/sap/opu/odata/sap"

# Hugging Face serverless Inference API endpoint and model id.
HF_API_BASE = "https://api-inference.huggingface.co/models"
LLAMA3_MODEL = "meta-llama/Meta-Llama-3-8B-Instruct"  # Using inference API
class LLAMA3Client:
    """Client for Meta-Llama-3-8B-Instruct on the HF serverless Inference API.

    Handles auth headers, a best-effort model warm-up, retry-on-503 ("model
    loading") logic, and wrapping prompts in the LLAMA3 chat template.
    """

    def __init__(self, hf_token: str):
        self.hf_token = hf_token
        self.api_url = f"{HF_API_BASE}/{LLAMA3_MODEL}"
        self.headers = {
            "Authorization": f"Bearer {hf_token}",
            "Content-Type": "application/json"
        }
        # Warm up the model so the first real user query avoids a cold start.
        self._warm_up_model()

    def _warm_up_model(self):
        """Send a tiny request so the endpoint loads the model weights.

        Best-effort: failures are logged, never raised, so construction
        always succeeds even when the API is unreachable.
        """
        try:
            logger.info("Warming up LLAMA3 model...")
            self._make_inference_request("Hello", max_new_tokens=10)
            logger.info("Model warmed up successfully")
        except Exception as e:
            logger.warning(f"Model warm-up failed: {e}")

    def _make_inference_request(self, prompt: str, max_new_tokens: int = 500,
                                temperature: float = 0.1, max_retries: int = 3) -> str:
        """POST *prompt* to the inference endpoint with retry logic.

        Retries on HTTP 503 (model still loading) with a growing wait, and on
        transport errors with exponential backoff. Returns the generated text
        or a human-readable error string — request-level failures never raise.
        """
        # BUG FIX: the Inference API rejects temperature <= 0 when
        # do_sample=True (sampling requires a strictly positive temperature),
        # and callers pass temperature=0 for deterministic classification.
        # Use greedy decoding in that case instead of failing the request.
        parameters: Dict[str, Any] = {
            "max_new_tokens": max_new_tokens,
            "return_full_text": False,
        }
        if temperature > 0:
            parameters["temperature"] = temperature
            parameters["do_sample"] = True
            parameters["top_p"] = 0.9
        else:
            parameters["do_sample"] = False
        payload = {"inputs": prompt, "parameters": parameters}

        for attempt in range(max_retries):
            try:
                response = requests.post(
                    self.api_url,
                    headers=self.headers,
                    json=payload,
                    timeout=60
                )
                if response.status_code == 503:
                    # Model is loading; wait (capped at 60s) and retry.
                    wait_time = min(20 * (attempt + 1), 60)
                    logger.info(f"Model loading, waiting {wait_time}s...")
                    time.sleep(wait_time)
                    continue
                response.raise_for_status()
                result = response.json()
                # The API returns either a list of generations or a dict.
                if isinstance(result, list) and len(result) > 0:
                    return result[0].get('generated_text', '').strip()
                elif isinstance(result, dict) and 'generated_text' in result:
                    return result['generated_text'].strip()
                else:
                    logger.error(f"Unexpected response format: {result}")
                    return "I received an unexpected response format."
            except requests.exceptions.RequestException as e:
                logger.error(f"Request failed (attempt {attempt + 1}): {e}")
                if attempt == max_retries - 1:
                    return f"Failed to get response after {max_retries} attempts: {str(e)}"
                time.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                return f"An unexpected error occurred: {str(e)}"
        return "Failed to generate response"

    def generate_response(self, prompt: str, max_length: int = 500, temperature: float = 0.1) -> str:
        """Generate a chat response, wrapping *prompt* in the LLAMA3 template.

        *max_length* is capped at 400 new tokens to keep requests within the
        API timeout. With temperature <= 0 decoding is greedy (deterministic).
        """
        formatted_prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a helpful SAP data analyst. Provide clear, concise answers based on the provided data. Keep responses under 300 words.<|eot_id|><|start_header_id|>user<|end_header_id|>
{prompt}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
"""
        try:
            response = self._make_inference_request(
                formatted_prompt,
                max_new_tokens=min(max_length, 400),  # Limit tokens to avoid timeouts
                temperature=temperature
            )
            if response and len(response.strip()) > 0:
                return response
            else:
                return "I couldn't generate a proper response. Please try rephrasing your question."
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            return f"I encountered an error while processing your question: {str(e)}"
class SAPDataFetcher:
    """Read-only client for the SAP sandbox OData services.

    Each fetch method returns a list of flat dicts reduced to the handful of
    fields the agent actually reasons about; failures yield an empty list.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {
            "APIKey": api_key,
            "Accept": "application/json",
            "Content-Type": "application/json"
        }

    def _make_request(self, url: str, timeout: int = 30) -> Optional[Dict]:
        """GET *url* and return the parsed JSON body, or None on any failure."""
        try:
            logger.info(f"Making request to: {url}")
            reply = requests.get(url, headers=self.headers, timeout=timeout)
            reply.raise_for_status()
            body = reply.json()
            logger.info(f"Request successful. Response size: {len(str(body))}")
            return body
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {e}")
            return None
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {e}")
            return None

    @staticmethod
    def _project(record: Dict, defaults: Dict[str, str]) -> Dict:
        """Keep only the keys listed in *defaults*, filling in missing ones."""
        return {key: record.get(key, fallback) for key, fallback in defaults.items()}

    def fetch_sales_orders(self, top: int = 30) -> List[Dict]:
        """Fetch up to *top* sales order headers, reduced to the key fields."""
        url = f"{SAP_BASE_URL}/API_SALES_ORDER_SRV/A_SalesOrder?$top={top}&$inlinecount=allpages"
        data = self._make_request(url)
        if not (data and 'd' in data and 'results' in data['d']):
            logger.error("Failed to fetch sales orders or invalid response format")
            return []
        defaults = {
            "SalesOrder": "",
            "SalesOrderType": "",
            "SalesOrganization": "",
            "SoldToParty": "",
            "CreationDate": "",
            "CreatedByUser": "",
            "TransactionCurrency": "",
            "TotalNetAmount": "0",
        }
        return [self._project(order, defaults) for order in data['d']['results']]

    def fetch_purchase_orders(self, top: int = 30) -> List[Dict]:
        """Fetch up to *top* purchase order headers, reduced to the key fields."""
        url = f"{SAP_BASE_URL}/API_PURCHASEORDER_PROCESS_SRV/A_PurchaseOrder?$top={top}&$inlinecount=allpages"
        data = self._make_request(url)
        if not (data and 'd' in data and 'results' in data['d']):
            logger.error("Failed to fetch purchase orders or invalid response format")
            return []
        defaults = {
            "PurchaseOrder": "",
            "CompanyCode": "",
            "PurchaseOrderType": "",
            "CreatedByUser": "",
            "CreationDate": "",
            "Supplier": "",
            "PurchasingOrganization": "",
            "PurchasingGroup": "",
            "PurchaseOrderDate": "",
            "DocumentCurrency": "",
            "ExchangeRate": "1.0",
        }
        return [self._project(order, defaults) for order in data['d']['results']]

    def fetch_purchase_order_items(self, purchase_orders: List[str]) -> List[Dict]:
        """Fetch line items for (at most the first five) purchase order numbers."""
        defaults = {
            "PurchaseOrder": "",
            "PurchaseOrderItem": "",
            "Plant": "",
            "StorageLocation": "",
            "MaterialGroup": "",
            "OrderQuantity": "0",
            "PurchaseOrderQuantityUnit": "",
            "DocumentCurrency": "",
            "NetPriceAmount": "0",
            "NetPriceQuantity": "0",
        }
        collected: List[Dict] = []
        # One request per order; capped at five for faster processing.
        for po_number in purchase_orders[:5]:
            url = (f"{SAP_BASE_URL}/API_PURCHASEORDER_PROCESS_SRV/A_PurchaseOrderItem"
                   f"?$filter=PurchaseOrder eq '{po_number}'")
            data = self._make_request(url)
            if data and 'd' in data and 'results' in data['d']:
                collected.extend(self._project(item, defaults) for item in data['d']['results'])
        return collected
class SAPAgent:
    """Orchestrator: routes a question, fetches SAP data, asks the LLM."""

    def __init__(self, data_fetcher: SAPDataFetcher, llama_client: LLAMA3Client):
        self.data_fetcher = data_fetcher
        self.llama_client = llama_client

    def categorize_query(self, question: str) -> str:
        """Classify *question* as "sales" or "purchase" via the LLM.

        Defaults to "purchase" on any failure or ambiguous answer.
        """
        classification_prompt = f"""Analyze this question and determine if it's about Sales Orders or Purchase Orders:
Question: "{question}"
Guidelines:
- Sales Orders: customer orders, sales transactions, revenue, sold to party
- Purchase Orders: supplier orders, procurement, purchasing, vendor transactions
Respond with exactly one word: "sales" or "purchase" """
        try:
            verdict = self.llama_client.generate_response(
                classification_prompt, max_length=20, temperature=0
            ).strip().lower()
            return "sales" if "sales" in verdict else "purchase"
        except Exception as e:
            logger.error(f"Error in categorization: {e}")
            return "purchase"  # Default to purchase

    def needs_item_details(self, question: str) -> bool:
        """Ask the LLM whether *question* needs line-item-level data."""
        detail_prompt = f"""Does this question require detailed item-level information (quantities, prices, materials, line items)?
Question: "{question}"
Answer only "yes" or "no" """
        try:
            verdict = self.llama_client.generate_response(
                detail_prompt, max_length=20, temperature=0
            ).strip().lower()
            return "yes" in verdict
        except Exception as e:
            logger.error(f"Error determining detail needs: {e}")
            return False

    @staticmethod
    def _sum_item_value(items: List[Dict]) -> float:
        """Total of NetPriceAmount * OrderQuantity, skipping unparseable rows."""
        total = 0.0
        for item in items:
            try:
                total += float(item.get("NetPriceAmount", 0)) * float(item.get("OrderQuantity", 0))
            except (ValueError, TypeError):
                continue
        return total

    def process_query(self, question: str) -> str:
        """Answer *question*: categorize, fetch matching data, generate reply."""
        logger.info(f"Processing query: {question}")
        category = self.categorize_query(question)
        logger.info(f"Query categorized as: {category}")

        if category == "sales":
            context = {"orders": self.data_fetcher.fetch_sales_orders()}
            return self.generate_response(question, context, "Sales Orders")

        # Purchase path: start with headers, optionally drill into line items.
        po_headers = self.data_fetcher.fetch_purchase_orders()
        context = {"headers": po_headers}
        data_type = "Purchase Order Headers"
        if self.needs_item_details(question) and po_headers:
            logger.info("Fetching item-level details")
            # Limit to five orders for performance.
            po_numbers = [po["PurchaseOrder"] for po in po_headers[:5] if po["PurchaseOrder"]]
            po_items = self.data_fetcher.fetch_purchase_order_items(po_numbers)
            context["items"] = po_items
            context["total_value"] = self._sum_item_value(po_items)
            data_type = "Purchase Orders with Item Details"
        return self.generate_response(question, context, data_type)

    def generate_response(self, question: str, context: Dict, data_type: str) -> str:
        """Serialize *context* into a prompt and ask the LLM for the answer."""
        context_str = json.dumps(context, indent=2)
        # Truncate the serialized context to keep inference requests small.
        if len(context_str) > 2000:
            context_str = context_str[:2000] + "... (truncated)"
        prompt = f"""Data Type: {data_type}
Available Data:
{context_str}
User Question: {question}
Instructions:
1. Provide a clear, concise answer based on the data
2. Include specific numbers, dates, or values when relevant
3. If the data doesn't contain enough information to answer fully, mention this
4. Format your response in a user-friendly way
5. Keep response under 250 words"""
        try:
            return self.llama_client.generate_response(prompt, max_length=400, temperature=0.1)
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            return f"I encountered an error while processing your question: {str(e)}"
# ---------------------------------------------------------------------------
# One-time system initialization at import time.
# sap_agent stays None on any misconfiguration so the UI can show a helpful
# message instead of crashing.
# ---------------------------------------------------------------------------
sap_agent = None
try:
    if not HF_TOKEN:
        logger.error("HF_TOKEN not found in environment variables")
    else:
        llama_client = LLAMA3Client(HF_TOKEN)
        if SAP_API_KEY:
            data_fetcher = SAPDataFetcher(SAP_API_KEY)
            sap_agent = SAPAgent(data_fetcher, llama_client)
            logger.info("SAP Agent initialized successfully")
        else:
            logger.warning("SAP_API_KEY not found. Demo mode enabled.")
except Exception as e:
    logger.error(f"Failed to initialize SAP Agent: {e}")
    sap_agent = None
| # Gradio Interface | |
| def chat_with_sap(message, history): | |
| """Handle chat interactions""" | |
| if not sap_agent: | |
| return history + [("System", "SAP Agent not initialized. Please check your HF_TOKEN and SAP_API_KEY in Space secrets.")] | |
| if not message.strip(): | |
| return history | |
| try: | |
| # Add typing indicator | |
| history = history or [] | |
| history.append((message, "π€ Thinking...")) | |
| yield history | |
| # Process the query | |
| response = sap_agent.process_query(message) | |
| history[-1] = (message, response) | |
| yield history | |
| except Exception as e: | |
| error_msg = f"Error processing your request: {str(e)}" | |
| history = history or [] | |
| if history and history[-1][1] == "π€ Thinking...": | |
| history[-1] = (message, error_msg) | |
| else: | |
| history.append((message, error_msg)) | |
| yield history | |
def clear_chat():
    """Return a fresh empty history, resetting the chatbot display."""
    return []
# ---------------------------------------------------------------------------
# Gradio interface
# ---------------------------------------------------------------------------
with gr.Blocks(title="SAP Order Analytics Agent with LLAMA3") as demo:
    gr.Markdown("""
# π SAP Order Analytics Agent (Powered by LLAMA3 via Inference API)
This AI agent uses Meta's LLAMA3 model via Hugging Face Inference API to analyze SAP data. Ask questions like:
- "How many sales orders do we have?"
- "What's the total value of all purchase orders?"
- "Show me recent purchase orders"
- "What are the top suppliers?"
**Setup Required:**
1. Set `HF_TOKEN` in Space secrets (your Hugging Face token)
2. Set `SAP_API_KEY` in Space secrets (your SAP API key)
3. Ensure you have access to LLAMA3 model on Hugging Face
""")
    conversation = gr.Chatbot(
        height=500,
        placeholder="Ask me anything about your SAP orders...",
        show_copy_button=True
    )
    with gr.Row():
        question_box = gr.Textbox(
            label="Your Question",
            placeholder="Type your question here...",
            scale=4
        )
        send_button = gr.Button("Send", scale=1, variant="primary")
        reset_button = gr.Button("Clear", scale=1)

    # Wire events: the Send button and pressing Enter both submit the question.
    send_button.click(chat_with_sap, [question_box, conversation], [conversation])
    question_box.submit(chat_with_sap, [question_box, conversation], [conversation])
    reset_button.click(clear_chat, outputs=[conversation])
    # A second handler on each trigger empties the input box after sending.
    send_button.click(lambda: "", outputs=[question_box])
    question_box.submit(lambda: "", outputs=[question_box])

# Launch the app when executed as a script (share=True creates a public link).
if __name__ == "__main__":
    demo.launch(share=True)