# import gradio as gr # import os # import time # import requests # import concurrent.futures # import json # import re # from huggingface_hub import hf_hub_download # from llama_cpp import Llama # # ========================= # # ENV VARIABLES # # ========================= # client_id = os.getenv("sap_client_id") # client_secret = os.getenv("sap_client_secret") # token_url = os.getenv("sap_token_url") # urls = { # "customers": os.getenv("sap_cap_service_url_customers"), # "products": os.getenv("sap_cap_service_url_products"), # "orders": os.getenv("sap_cap_service_url_salesorders"), # "order_items": os.getenv("sap_cap_service_url_salesorderitems"), # } # # ========================= # # LOAD POWERFUL 3B MODEL # # ========================= # # This model is 6x larger than the 0.5B and much smarter at reasoning # model_path = hf_hub_download( # repo_id="Qwen/Qwen2.5-Coder-3B-Instruct-GGUF", # filename="qwen2.5-coder-3b-instruct-q4_k_m.gguf" # ) # llm = Llama( # model_path=model_path, # n_ctx=2048, # n_threads=4 # Increased threads for the larger model # ) # # ========================= # # CACHE & DATA FETCHING # # ========================= # access_token = None # data_cache = {} # last_refresh = 0 # def get_token(): # global access_token # if not token_url: return # try: # res = requests.post(token_url, data={"grant_type": "client_credentials"}, auth=(client_id, client_secret), timeout=10) # access_token = res.json().get("access_token") # except Exception as e: # print(f"Auth Error: {e}") # def fetch_data(): # global data_cache, last_refresh # if time.time() - last_refresh < 600 and data_cache: # return data_cache # get_token() # headers = {"Authorization": f"Bearer {access_token}"} # def fetch(url): # try: # r = requests.get(url, headers=headers, timeout=10) # return r.json().get("value", []) # except: return [] # with concurrent.futures.ThreadPoolExecutor() as executor: # futures = {k: executor.submit(fetch, v) for k, v in urls.items()} # data_cache = {k: f.result() 
for k, f in futures.items()} # last_refresh = time.time() # return data_cache # # ========================= # # SMART LLM QUERY GENERATOR # # ========================= # def generate_query(user_prompt): # prompt = f"""<|im_start|>system # You are an SAP Data Expert. Convert user requests into a JSON query. # Rules: # 1. If the user wants a list/table, use "type": "select". # 2. If the user wants totals, counts, or "top" spending, use "type": "aggregation". # 3. Tables: "customers", "products", "orders", "order_items". # Output format: # {{ # "type": "select" | "aggregation", # "table": "string", # "limit": number # }} # <|im_end|> # <|im_start|>user # {user_prompt} # <|im_end|> # <|im_start|>assistant # """ # output = llm(prompt, max_tokens=128, temperature=0.1, stop=["<|im_end|>"]) # text = output["choices"][0]["text"].strip() # # Extract JSON using regex (handles model chatter) # match = re.search(r'(\{.*\})', text, re.DOTALL) # if match: # try: # return json.loads(match.group(1)) # except: # return None # return None # # ========================= # # ENHANCED EXECUTION ENGINE # # ========================= # def execute_query(q, data): # if not q: # return "System could not parse your request into a valid query." 
# query_type = q.get("type") # table_name = q.get("table") # limit = q.get("limit", 10) # try: # # AGGREGATION LOGIC (Totals/Summing) # if query_type == "aggregation": # items = data.get("order_items", []) # orders = {o['ID']: o for o in data.get("orders", [])} # customers = {c['ID']: c for c in data.get("customers", [])} # summary = {} # for item in items: # order = orders.get(item.get("parent_ID")) # if order: # cust = customers.get(order.get("customer_ID")) # name = cust.get("name", "Unknown") if cust else "Unknown Customer" # amount = float(item.get("netAmount", 0)) # summary[name] = summary.get(name, 0) + amount # sorted_res = sorted(summary.items(), key=lambda x: x[1], reverse=True) # return "\n".join([f"{name}: ${amt:,.2f}" for name, amt in sorted_res[:limit]]) # # SELECT LOGIC (Listing data) # elif query_type == "select": # rows = data.get(table_name, []) # if not rows: return f"No data found in {table_name}." # # Clean up the output for better readability in the UI # formatted_list = [] # for row in rows[:limit]: # # If it's an order, try to make it readable # if table_name == "orders": # formatted_list.append(f"Order ID: {row.get('ID')} | Date: {row.get('createdAt', 'N/A')} | Status: {row.get('lifecycleStatus', 'N/A')}") # else: # formatted_list.append(str(row)) # return "\n".join(formatted_list) # except Exception as e: # return f"Execution Error: {str(e)}" # return "Invalid query parameters generated by LLM." 
# # ========================= # # GRADIO UI # # ========================= # def main_process(user_prompt): # data = fetch_data() # query_obj = generate_query(user_prompt) # print(f"DEBUG: Logic identified as: {query_obj}") # return execute_query(query_obj, data) # with gr.Blocks(theme=gr.themes.Default(primary_hue="blue")) as demo: # gr.Markdown("# 🚀 Enterprise SAP AI Assistant") # with gr.Row(): # with gr.Column(): # inp = gr.Textbox(placeholder="Try: 'List all orders' or 'Who are my top customers?'", label="Query") # btn = gr.Button("Execute", variant="primary") # with gr.Column(): # out = gr.Textbox(label="Result", lines=15) # btn.click(main_process, inputs=inp, outputs=out) # demo.launch(server_name="0.0.0.0") # ==================================================================================== # import gradio as gr # import os # import time # import requests # import json # import re # import numpy as np # import faiss # from huggingface_hub import hf_hub_download # from llama_cpp import Llama # from sentence_transformers import SentenceTransformer # # ========================= # # CONFIG & MODEL LOAD # # ========================= # # Using the 3B model as the "Brain" and SentenceTransformer as the "Searcher" # model_path = hf_hub_download( # repo_id="Qwen/Qwen2.5-Coder-3B-Instruct-GGUF", # filename="qwen2.5-coder-3b-instruct-q4_k_m.gguf" # ) # llm = Llama(model_path=model_path, n_ctx=2048, n_threads=4) # embed_model = SentenceTransformer('all-MiniLM-L6-v2') # # SAP URLs (Placeholders) # urls = { # "customers": os.getenv("sap_cap_service_url_customers"), # "orders": os.getenv("sap_cap_service_url_salesorders"), # } # # ========================= # # DATA FETCHING # # ========================= # def fetch_all_data(): # # In a real scenario, use your Auth token logic here # # For now, this combines all text data for the RAG engine # all_docs = [] # # Mock retrieval for demonstration - Replace with your fetch_data() logic # raw_data = { # "customers": [{"name": 
"Acme Corp", "city": "Berlin"}, {"name": "Stark Ind", "city": "NY"}], # "orders": [{"ID": "101", "date": "2023-10-01", "total": 500}] # } # for table, rows in raw_data.items(): # for row in rows: # all_docs.append(f"Table: {table} | Data: {json.dumps(row)}") # return all_docs # # ========================= # # RAG ENGINE (The "Filter") # # ========================= # def get_relevant_context(query, documents): # if not documents: return "" # # 1. Create Embeddings # doc_embeddings = embed_model.encode(documents) # query_embedding = embed_model.encode([query]) # # 2. Setup Vector DB (FAISS) # dimension = doc_embeddings.shape[1] # index = faiss.IndexFlatL2(dimension) # index.add(np.array(doc_embeddings).astype('float32')) # # 3. Search for top 3 most relevant rows # D, I = index.search(np.array(query_embedding).astype('float32'), k=3) # retrieved_context = "\n".join([documents[i] for i in I[0]]) # return retrieved_context # # ========================= # # MAIN PROCESS # # ========================= # def rag_process(user_prompt): # # Step 1: Get all data # documents = fetch_all_data() # # Step 2: RAG Filtering (Retrieve only what matters) # context = get_relevant_context(user_prompt, documents) # # Step 3: LLM Generation (Only looks at filtered data) # prompt = f"""<|im_start|>system # You are an SAP assistant. Use the provided context to answer the user request. 
# Context:
# {context}
# <|im_end|>
# <|im_start|>user
# {user_prompt}
# <|im_end|>
# <|im_start|>assistant
# """
#     output = llm(prompt, max_tokens=256, temperature=0.1, stop=["<|im_end|>"])
#     return output["choices"][0]["text"].strip()
# # =========================
# # UI
# # =========================
# demo = gr.Interface(
#     fn=rag_process,
#     inputs=gr.Textbox(
#         lines=2,
#         placeholder="Ask about SAP data (e.g., Find customers in Berlin)"
#     ),
#     outputs="text",
#     title="SAP Assistant",
#     description="Ask questions about SAP data using RAG"
# )
# demo.launch()
# =============================================================================================================

import os
import time
from typing import List, Optional, Sequence, Tuple

import gradio as gr
import pandas as pd
import requests
from huggingface_hub import hf_hub_download
from llama_cpp import Llama

# =========================
# ENV VARIABLES
# =========================
# SECURITY: the XSUAA client secret used to be hard-coded in this file. It is
# now read from the environment only (e.g. a Hugging Face Space secret named
# "sap_client_secret"). The previously committed secret must be considered
# leaked and should be rotated in the SAP BTP cockpit.
client_id = os.getenv("sap_client_id", "sb-cap1-3c4588e0trial-dev!t617058")
client_secret = os.getenv("sap_client_secret")
token_url = os.getenv(
    "sap_token_url",
    "https://3c4588e0trial.authentication.us10.hana.ondemand.com/oauth/token",
)

# Non-secret service endpoints keep their previous values as defaults so the
# app behaves as before when only the secret is configured.
cap_service_url_customers = os.getenv(
    "sap_cap_service_url_customers",
    "https://3c4588e0trial-dev-cap1-srv.cfapps.us10-001.hana.ondemand.com/odata/v4/sales/Customers?$top=2",
)
cap_service_url_products = os.getenv(
    "sap_cap_service_url_products",
    "https://3c4588e0trial-dev-cap1-srv.cfapps.us10-001.hana.ondemand.com/odata/v4/sales/Products?$top=2",
)
cap_service_url_saleorders = os.getenv(
    "sap_cap_service_url_salesorders",
    "https://3c4588e0trial-dev-cap1-srv.cfapps.us10-001.hana.ondemand.com/odata/v4/sales/SalesOrders?$top=2",
)
cap_service_url_saleorderitems = os.getenv(
    "sap_cap_service_url_salesorderitems",
    "https://3c4588e0trial-dev-cap1-srv.cfapps.us10-001.hana.ondemand.com/odata/v4/sales/SalesOrderItems?$top=2",
)

# Seconds to wait for any single SAP HTTP call; without a timeout `requests`
# can block forever and freeze the Gradio worker.
REQUEST_TIMEOUT_SECONDS = 30

# Seconds the fetched SAP data stays valid before it is re-downloaded.
CACHE_TTL_SECONDS = 300

# Status codes that trigger one token refresh + retry (same set as before).
_TOKEN_RETRY_STATUSES = frozenset({400, 401, 403})

# Columns kept per entity set, in the order the entity sets are fetched:
# customers, products, sales orders, sales order items.
_ENTITY_COLUMNS: Tuple[Tuple[str, ...], ...] = (
    ("ID", "name", "country", "industry"),
    ("ID", "name", "category", "price", "currency"),
    ("ID", "customer_ID", "orderDate", "status"),
    ("ID", "parent_ID", "product_ID", "quantity", "netAmount"),
)

# System prompt skeleton; the four data sections are filled in per request.
_SYSTEM_PROMPT_TEMPLATE = """
You are an intelligent Corporate SAP Assistant bot. Your sole purpose is to answer the user's questions based strictly on the database records provided to you.

Customers Data:
{customers_text}

Products Data:
{products_text}

Sale orders Data:
{saleorders_text}

Sale order items Data:
{saleorderitems_text}

CRITICAL RULES:
1. NO HALLUCINATIONS: You must base your answer ONLY on the data provided.
2. MISSING DATA: If the provided data does not contain the answer, do not guess. Say: "I could not find that information in the current SAP database."
3. FORMATTING: You must output your response in Markdown. Use bold text for important nouns and bullet points for lists to make it easy to read.
4. TONE: Be concise, highly professional, and helpful.
"""

# =========================
# GLOBAL VARIABLES
# =========================
access_token = None
cached_customers = None
cached_products = None
cached_salesorders = None
cached_salesorderitems = None
last_refresh = 0

# =========================
# LOAD GGUF MODEL (once)
# =========================
print("Downloading/Locating GGUF model...")
model_path = hf_hub_download(
    repo_id="Qwen/Qwen2.5-1.5B-Instruct-GGUF",
    filename="qwen2.5-1.5b-instruct-q4_k_m.gguf",
)

print("Loading model into Llama CPP...")
llm = Llama(
    model_path=model_path,
    n_ctx=4096,      # Increased context window to ensure SAP data fits
    n_threads=None,  # None lets llama-cpp-python choose its default thread count
    verbose=False,   # Set to True if you want to see inference speed logs
)


# =========================
# TOKEN FUNCTION
# =========================
def generate_sap_xsuaa_token() -> Optional[str]:
    """Request an OAuth client-credentials token and store it globally.

    Returns:
        The access token, or ``None`` when credentials are missing or the
        token endpoint answers with a non-200 status. ``access_token`` is
        only overwritten on a 200 response.

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    global access_token
    print("Generating SAP token...")

    if not client_secret:
        # Fail early with a clear log line instead of sending an empty secret.
        print("Token Error: environment variable 'sap_client_secret' is not set")
        return None

    auth_response = requests.post(
        token_url,
        data={"grant_type": "client_credentials"},
        auth=(client_id, client_secret),
        timeout=REQUEST_TIMEOUT_SECONDS,
    )

    if auth_response.status_code != 200:
        print("Token Error:", auth_response.text)
        return None

    access_token = auth_response.json().get("access_token")
    print("Token generated!")
    return access_token


# =========================
# FETCH SAP DATA
# =========================
def _request_entity_sets(headers: dict) -> List[requests.Response]:
    """GET the four entity sets (customers, products, orders, order items)."""
    entity_urls = (
        cap_service_url_customers,
        cap_service_url_products,
        cap_service_url_saleorders,
        cap_service_url_saleorderitems,
    )
    return [
        requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
        for url in entity_urls
    ]


def _response_to_frame(
    response: requests.Response, wanted_columns: Sequence[str]
) -> pd.DataFrame:
    """Turn one OData response into a DataFrame limited to ``wanted_columns``.

    Columns that the service did not return are skipped instead of raising
    ``KeyError``.
    """
    # Surface HTTP failures instead of silently producing (and caching) an
    # empty table.
    response.raise_for_status()
    frame = pd.DataFrame(response.json().get("value", []))
    if frame.empty:
        return frame
    present = [column for column in wanted_columns if column in frame.columns]
    return frame[present]


def _require_token() -> str:
    """Return the current token, generating one if needed; raise if unavailable."""
    if not access_token:
        generate_sap_xsuaa_token()
    if not access_token:
        raise RuntimeError("Could not obtain an SAP access token.")
    return access_token


def fetch_sap_data() -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Download customers, products, sales orders and sales order items.

    If any request comes back with 400/401/403 the token is regenerated and
    all four requests are repeated once.

    Returns:
        ``(df_customers, df_products, df_saleorders, df_saleorderitems)``.

    Raises:
        RuntimeError: when no access token can be obtained.
        requests.RequestException: on network failure, timeout or a non-2xx
            response after the retry.
    """
    global access_token

    headers = {
        "Authorization": f"Bearer {_require_token()}",
        "Accept": "application/json",
    }
    responses = _request_entity_sets(headers)

    # Retry if token expired (checked on every response, not only the first)
    if any(r.status_code in _TOKEN_RETRY_STATUSES for r in responses):
        print("Token expired. Regenerating...")
        access_token = None
        headers["Authorization"] = f"Bearer {_require_token()}"
        responses = _request_entity_sets(headers)

    # Keep only important columns if they exist
    df_customers, df_products, df_saleorders, df_saleorderitems = (
        _response_to_frame(response, columns)
        for response, columns in zip(responses, _ENTITY_COLUMNS)
    )
    return df_customers, df_products, df_saleorders, df_saleorderitems


# =========================
# CACHE LOGIC
# =========================
def get_cached_data() -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Return the SAP tables, re-fetching them when the cache is stale.

    The cache is refreshed when it is older than ``CACHE_TTL_SECONDS`` or has
    never been filled. If the fetch raises, the previous cache contents and
    timestamp are left untouched, so the next call tries again.
    """
    global cached_customers, cached_products, cached_salesorders, cached_salesorderitems, last_refresh

    # Refresh every 5 minutes
    if time.time() - last_refresh > CACHE_TTL_SECONDS or cached_customers is None:
        print("Refreshing SAP data...")
        (
            cached_customers,
            cached_products,
            cached_salesorders,
            cached_salesorderitems,
        ) = fetch_sap_data()
        last_refresh = time.time()

    return cached_customers, cached_products, cached_salesorders, cached_salesorderitems


# =========================
# MAIN FUNCTION (LLM)
# =========================
def _frame_to_text(frame: pd.DataFrame) -> str:
    """Render a table for the prompt, or ``"No Data"`` when it is empty."""
    return frame.to_string(index=False) if not frame.empty else "No Data"


def generate_response(user_prompt):
    """Answer ``user_prompt`` with the local LLM, grounded in cached SAP data.

    Args:
        user_prompt: The question typed into the UI / sent to the API.

    Returns:
        The model's answer, a short hint when the question is blank, or
        ``"Error: <message>"`` if anything fails. Exceptions are not
        propagated so the Gradio callback always returns text.
    """
    if not user_prompt or not user_prompt.strip():
        # Nothing to answer; skip the SAP round-trip and the LLM call.
        return "Please enter a question."

    try:
        # Get cached SAP data
        df_customers, df_products, df_saleorders, df_saleorderitems = get_cached_data()

        # Build system prompt
        system_prompt = _SYSTEM_PROMPT_TEMPLATE.format(
            customers_text=_frame_to_text(df_customers),
            products_text=_frame_to_text(df_products),
            saleorders_text=_frame_to_text(df_saleorders),
            saleorderitems_text=_frame_to_text(df_saleorderitems),
        )

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        # Generate response using llama-cpp-python
        response = llm.create_chat_completion(
            messages=messages,
            max_tokens=150,
            temperature=0.2,
        )

        # Extract the content from the response dictionary
        return response["choices"][0]["message"]["content"].strip()

    except Exception as e:  # top-level UI boundary: report instead of crashing
        return f"Error: {str(e)}"


# =========================
# GRADIO UI + API
# =========================
with gr.Blocks() as demo:
    user_input = gr.Textbox(label="User Question")
    output = gr.Textbox(label="Response")

    btn = gr.Button("Generate")
    btn.click(
        fn=generate_response,
        inputs=[user_input],
        outputs=output,
        api_name="predict",
    )

# REQUIRED for API exposure
demo.queue()
demo.launch()