# Spc1 / app.py
# Vedant104's picture
# Update app.py
# ab63a8a verified
# import gradio as gr
# import os
# import time
# import requests
# import concurrent.futures
# import json
# import re
# from huggingface_hub import hf_hub_download
# from llama_cpp import Llama
# # =========================
# # ENV VARIABLES
# # =========================
# client_id = os.getenv("sap_client_id")
# client_secret = os.getenv("sap_client_secret")
# token_url = os.getenv("sap_token_url")
# urls = {
# "customers": os.getenv("sap_cap_service_url_customers"),
# "products": os.getenv("sap_cap_service_url_products"),
# "orders": os.getenv("sap_cap_service_url_salesorders"),
# "order_items": os.getenv("sap_cap_service_url_salesorderitems"),
# }
# # =========================
# # LOAD POWERFUL 3B MODEL
# # =========================
# # This model is 6x larger than the 0.5B and much smarter at reasoning
# model_path = hf_hub_download(
# repo_id="Qwen/Qwen2.5-Coder-3B-Instruct-GGUF",
# filename="qwen2.5-coder-3b-instruct-q4_k_m.gguf"
# )
# llm = Llama(
# model_path=model_path,
# n_ctx=2048,
# n_threads=4 # Increased threads for the larger model
# )
# # =========================
# # CACHE & DATA FETCHING
# # =========================
# access_token = None
# data_cache = {}
# last_refresh = 0
# def get_token():
# global access_token
# if not token_url: return
# try:
# res = requests.post(token_url, data={"grant_type": "client_credentials"}, auth=(client_id, client_secret), timeout=10)
# access_token = res.json().get("access_token")
# except Exception as e:
# print(f"Auth Error: {e}")
# def fetch_data():
# global data_cache, last_refresh
# if time.time() - last_refresh < 600 and data_cache:
# return data_cache
# get_token()
# headers = {"Authorization": f"Bearer {access_token}"}
# def fetch(url):
# try:
# r = requests.get(url, headers=headers, timeout=10)
# return r.json().get("value", [])
# except: return []
# with concurrent.futures.ThreadPoolExecutor() as executor:
# futures = {k: executor.submit(fetch, v) for k, v in urls.items()}
# data_cache = {k: f.result() for k, f in futures.items()}
# last_refresh = time.time()
# return data_cache
# # =========================
# # SMART LLM QUERY GENERATOR
# # =========================
# def generate_query(user_prompt):
# prompt = f"""<|im_start|>system
# You are an SAP Data Expert. Convert user requests into a JSON query.
# Rules:
# 1. If the user wants a list/table, use "type": "select".
# 2. If the user wants totals, counts, or "top" spending, use "type": "aggregation".
# 3. Tables: "customers", "products", "orders", "order_items".
# Output format:
# {{
# "type": "select" | "aggregation",
# "table": "string",
# "limit": number
# }}
# <|im_end|>
# <|im_start|>user
# {user_prompt}
# <|im_end|>
# <|im_start|>assistant
# """
# output = llm(prompt, max_tokens=128, temperature=0.1, stop=["<|im_end|>"])
# text = output["choices"][0]["text"].strip()
# # Extract JSON using regex (handles model chatter)
# match = re.search(r'(\{.*\})', text, re.DOTALL)
# if match:
# try:
# return json.loads(match.group(1))
# except:
# return None
# return None
# # =========================
# # ENHANCED EXECUTION ENGINE
# # =========================
# def execute_query(q, data):
# if not q:
# return "System could not parse your request into a valid query."
# query_type = q.get("type")
# table_name = q.get("table")
# limit = q.get("limit", 10)
# try:
# # AGGREGATION LOGIC (Totals/Summing)
# if query_type == "aggregation":
# items = data.get("order_items", [])
# orders = {o['ID']: o for o in data.get("orders", [])}
# customers = {c['ID']: c for c in data.get("customers", [])}
# summary = {}
# for item in items:
# order = orders.get(item.get("parent_ID"))
# if order:
# cust = customers.get(order.get("customer_ID"))
# name = cust.get("name", "Unknown") if cust else "Unknown Customer"
# amount = float(item.get("netAmount", 0))
# summary[name] = summary.get(name, 0) + amount
# sorted_res = sorted(summary.items(), key=lambda x: x[1], reverse=True)
# return "\n".join([f"{name}: ${amt:,.2f}" for name, amt in sorted_res[:limit]])
# # SELECT LOGIC (Listing data)
# elif query_type == "select":
# rows = data.get(table_name, [])
# if not rows: return f"No data found in {table_name}."
# # Clean up the output for better readability in the UI
# formatted_list = []
# for row in rows[:limit]:
# # If it's an order, try to make it readable
# if table_name == "orders":
# formatted_list.append(f"Order ID: {row.get('ID')} | Date: {row.get('createdAt', 'N/A')} | Status: {row.get('lifecycleStatus', 'N/A')}")
# else:
# formatted_list.append(str(row))
# return "\n".join(formatted_list)
# except Exception as e:
# return f"Execution Error: {str(e)}"
# return "Invalid query parameters generated by LLM."
# # =========================
# # GRADIO UI
# # =========================
# def main_process(user_prompt):
# data = fetch_data()
# query_obj = generate_query(user_prompt)
# print(f"DEBUG: Logic identified as: {query_obj}")
# return execute_query(query_obj, data)
# with gr.Blocks(theme=gr.themes.Default(primary_hue="blue")) as demo:
# gr.Markdown("# 🚀 Enterprise SAP AI Assistant")
# with gr.Row():
# with gr.Column():
# inp = gr.Textbox(placeholder="Try: 'List all orders' or 'Who are my top customers?'", label="Query")
# btn = gr.Button("Execute", variant="primary")
# with gr.Column():
# out = gr.Textbox(label="Result", lines=15)
# btn.click(main_process, inputs=inp, outputs=out)
# demo.launch(server_name="0.0.0.0")
# ====================================================================================
# import gradio as gr
# import os
# import time
# import requests
# import json
# import re
# import numpy as np
# import faiss
# from huggingface_hub import hf_hub_download
# from llama_cpp import Llama
# from sentence_transformers import SentenceTransformer
# # =========================
# # CONFIG & MODEL LOAD
# # =========================
# # Using the 3B model as the "Brain" and SentenceTransformer as the "Searcher"
# model_path = hf_hub_download(
# repo_id="Qwen/Qwen2.5-Coder-3B-Instruct-GGUF",
# filename="qwen2.5-coder-3b-instruct-q4_k_m.gguf"
# )
# llm = Llama(model_path=model_path, n_ctx=2048, n_threads=4)
# embed_model = SentenceTransformer('all-MiniLM-L6-v2')
# # SAP URLs (Placeholders)
# urls = {
# "customers": os.getenv("sap_cap_service_url_customers"),
# "orders": os.getenv("sap_cap_service_url_salesorders"),
# }
# # =========================
# # DATA FETCHING
# # =========================
# def fetch_all_data():
# # In a real scenario, use your Auth token logic here
# # For now, this combines all text data for the RAG engine
# all_docs = []
# # Mock retrieval for demonstration - Replace with your fetch_data() logic
# raw_data = {
# "customers": [{"name": "Acme Corp", "city": "Berlin"}, {"name": "Stark Ind", "city": "NY"}],
# "orders": [{"ID": "101", "date": "2023-10-01", "total": 500}]
# }
# for table, rows in raw_data.items():
# for row in rows:
# all_docs.append(f"Table: {table} | Data: {json.dumps(row)}")
# return all_docs
# # =========================
# # RAG ENGINE (The "Filter")
# # =========================
# def get_relevant_context(query, documents):
# if not documents: return ""
# # 1. Create Embeddings
# doc_embeddings = embed_model.encode(documents)
# query_embedding = embed_model.encode([query])
# # 2. Setup Vector DB (FAISS)
# dimension = doc_embeddings.shape[1]
# index = faiss.IndexFlatL2(dimension)
# index.add(np.array(doc_embeddings).astype('float32'))
# # 3. Search for top 3 most relevant rows
# D, I = index.search(np.array(query_embedding).astype('float32'), k=3)
# retrieved_context = "\n".join([documents[i] for i in I[0]])
# return retrieved_context
# # =========================
# # MAIN PROCESS
# # =========================
# def rag_process(user_prompt):
# # Step 1: Get all data
# documents = fetch_all_data()
# # Step 2: RAG Filtering (Retrieve only what matters)
# context = get_relevant_context(user_prompt, documents)
# # Step 3: LLM Generation (Only looks at filtered data)
# prompt = f"""<|im_start|>system
# You are an SAP assistant. Use the provided context to answer the user request.
# Context:
# {context}
# <|im_end|>
# <|im_start|>user
# {user_prompt}
# <|im_end|>
# <|im_start|>assistant
# """
# output = llm(prompt, max_tokens=256, temperature=0.1, stop=["<|im_end|>"])
# return output["choices"][0]["text"].strip()
# # =========================
# # UI
# # =========================
# demo = gr.Interface(
# fn=rag_process,
# inputs=gr.Textbox(
# lines=2,
# placeholder="Ask about SAP data (e.g., Find customers in Berlin)"
# ),
# outputs="text",
# title="SAP Assistant",
# description="Ask questions about SAP data using RAG"
# )
# demo.launch()
# =============================================================================================================
import gradio as gr
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import requests
import os
import pandas as pd
import time
# =========================
# ENV VARIABLES
# =========================
# SAP XSUAA OAuth client credentials and CAP OData endpoints.
#
# Every value can be overridden through the environment variable named in the
# corresponding os.getenv() call. The non-secret values fall back to the trial
# landscape defaults below.
#
# SECURITY: the client secret is deliberately read from the environment only
# and has no in-source default. A secret that has ever been committed to a
# repository must be treated as compromised and rotated.
client_id = os.getenv("sap_client_id", "sb-cap1-3c4588e0trial-dev!t617058")
client_secret = os.getenv("sap_client_secret")
token_url = os.getenv(
    "sap_token_url",
    "https://3c4588e0trial.authentication.us10.hana.ondemand.com/oauth/token",
)
# Each service URL carries "$top=2", so only two rows per entity set are fetched.
cap_service_url_customers = os.getenv(
    "sap_cap_service_url_customers",
    "https://3c4588e0trial-dev-cap1-srv.cfapps.us10-001.hana.ondemand.com/odata/v4/sales/Customers?$top=2",
)
cap_service_url_products = os.getenv(
    "sap_cap_service_url_products",
    "https://3c4588e0trial-dev-cap1-srv.cfapps.us10-001.hana.ondemand.com/odata/v4/sales/Products?$top=2",
)
cap_service_url_saleorders = os.getenv(
    "sap_cap_service_url_salesorders",
    "https://3c4588e0trial-dev-cap1-srv.cfapps.us10-001.hana.ondemand.com/odata/v4/sales/SalesOrders?$top=2",
)
cap_service_url_saleorderitems = os.getenv(
    "sap_cap_service_url_salesorderitems",
    "https://3c4588e0trial-dev-cap1-srv.cfapps.us10-001.hana.ondemand.com/odata/v4/sales/SalesOrderItems?$top=2",
)
# =========================
# GLOBAL VARIABLES
# =========================
# Bearer token for the SAP service; None until the first successful token request.
access_token = None

# Cached DataFrames, one per entity set; None means "never fetched".
cached_customers = cached_products = None
cached_salesorders = cached_salesorderitems = None

# time.time() stamp of the last cache refresh; 0 forces a refresh on first use.
last_refresh = 0
# =========================
# LOAD GGUF MODEL (once)
# =========================
# Resolve the quantized GGUF weights through the Hugging Face hub client.
print("Downloading/Locating GGUF model...")
model_path = hf_hub_download(
    filename="qwen2.5-1.5b-instruct-q4_k_m.gguf",
    repo_id="Qwen/Qwen2.5-1.5B-Instruct-GGUF",
)

# Load the model once at import time; every request reuses this instance.
print("Loading model into Llama CPP...")
llm = Llama(
    # Inference logging stays off; set verbose=True to see llama.cpp's logs.
    verbose=False,
    # None defers the thread count to llama-cpp-python's own default.
    # NOTE(review): confirm that default suits the host's CPU budget.
    n_threads=None,
    # Context window of 4096 tokens so the SAP tables fit into the prompt.
    n_ctx=4096,
    model_path=model_path,
)
# =========================
# TOKEN FUNCTION
# =========================
def generate_sap_xsuaa_token():
    """Request an OAuth2 client-credentials token from the SAP XSUAA endpoint.

    Posts ``grant_type=client_credentials`` to ``token_url`` using HTTP basic
    auth built from ``client_id`` / ``client_secret``.

    Returns:
        The value of ``access_token`` from the JSON response (also stored in
        the module-level ``access_token``), or ``None`` when the endpoint
        answers with a status other than 200. In the failure case the
        module-level ``access_token`` is left untouched.

    Raises:
        requests.exceptions.RequestException: on connection problems or when
            the endpoint does not answer within the timeout.
    """
    global access_token
    print("Generating SAP token...")
    auth_response = requests.post(
        token_url,
        data={"grant_type": "client_credentials"},
        auth=(client_id, client_secret),
        # requests has no default timeout; without one a stalled token
        # endpoint would block this worker indefinitely.
        timeout=10,
    )
    if auth_response.status_code != 200:
        print("Token Error:", auth_response.text)
        return None
    access_token = auth_response.json().get("access_token")
    print("Token generated!")
    return access_token
# =========================
# FETCH SAP DATA
# =========================
def fetch_sap_data():
    """Fetch the four SAP CAP entity sets and return them as trimmed DataFrames.

    A token is generated first when none is held. All four endpoints are then
    queried; if any of them answers 400/401/403 the token is regenerated and
    all four requests are repeated once.

    Each JSON payload's ``"value"`` list becomes a DataFrame. Non-empty frames
    are reduced to the columns of interest that are actually present, so a
    missing column no longer raises ``KeyError``.

    Returns:
        tuple: ``(customers, products, sale orders, sale order items)``
        DataFrames, in that order. A frame is empty when the payload has no
        ``"value"`` entries.

    Raises:
        requests.exceptions.RequestException: on connection problems or when
            an endpoint does not answer within the timeout.
    """
    global access_token

    # (endpoint URL, columns worth keeping), in the order the frames are returned.
    endpoints = (
        (cap_service_url_customers, ("ID", "name", "country", "industry")),
        (cap_service_url_products, ("ID", "name", "category", "price", "currency")),
        (cap_service_url_saleorders, ("ID", "customer_ID", "orderDate", "status")),
        (
            cap_service_url_saleorderitems,
            ("ID", "parent_ID", "product_ID", "quantity", "netAmount"),
        ),
    )

    def _request_all(token):
        # One GET per endpoint with the given bearer token. The timeout keeps
        # a stalled service from blocking the caller forever.
        headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
        return [
            requests.get(url, headers=headers, timeout=10) for url, _ in endpoints
        ]

    if not access_token:
        generate_sap_xsuaa_token()
    responses = _request_all(access_token)

    # Retry once with a fresh token if any endpoint rejected the request,
    # not just the first one.
    if any(resp.status_code in (400, 401, 403) for resp in responses):
        print("Token expired. Regenerating...")
        access_token = None
        generate_sap_xsuaa_token()
        responses = _request_all(access_token)

    frames = []
    for resp, (_, wanted) in zip(responses, endpoints):
        frame = pd.DataFrame(resp.json().get("value", []))
        if not frame.empty:
            # Keep only the important columns that exist in this payload.
            frame = frame[[col for col in wanted if col in frame.columns]]
        frames.append(frame)
    return tuple(frames)
# =========================
# CACHE LOGIC
# =========================
def get_cached_data():
    """Return the cached SAP DataFrames, refreshing them when stale.

    The cache is refreshed through ``fetch_sap_data()`` when more than 300
    seconds (5 minutes) have passed since ``last_refresh`` or when nothing has
    been cached yet.

    Returns:
        tuple: ``(cached_customers, cached_products, cached_salesorders,
        cached_salesorderitems)``.
    """
    global cached_customers, cached_products, cached_salesorders, cached_salesorderitems, last_refresh

    age_seconds = time.time() - last_refresh
    still_fresh = age_seconds <= 300 and cached_customers is not None
    if not still_fresh:
        print("Refreshing SAP data...")
        (
            cached_customers,
            cached_products,
            cached_salesorders,
            cached_salesorderitems,
        ) = fetch_sap_data()
        last_refresh = time.time()

    return (
        cached_customers,
        cached_products,
        cached_salesorders,
        cached_salesorderitems,
    )
# =========================
# MAIN FUNCTION (LLM)
# =========================
def generate_response(user_prompt):
    """Answer ``user_prompt`` with the local LLM, grounded on the cached SAP data.

    The four cached DataFrames are rendered as plain text ("No Data" for an
    empty frame) and embedded in the system prompt. The model's reply is
    returned with surrounding whitespace stripped.

    Any exception raised along the way is caught and reported as the string
    ``"Error: <message>"`` rather than propagated.
    """

    def _render(frame):
        # Plain-text table without the index column, or a placeholder when empty.
        if frame.empty:
            return "No Data"
        return frame.to_string(index=False)

    try:
        # Pull the (possibly refreshed) SAP tables from the cache.
        customers, products, orders, order_items = get_cached_data()

        customers_text = _render(customers)
        products_text = _render(products)
        saleorders_text = _render(orders)
        saleorderitems_text = _render(order_items)

        # System prompt: role, the four data tables, then the answering rules.
        system_prompt = f"""
You are an intelligent Corporate SAP Assistant bot.
Your sole purpose is to answer the user's questions based strictly on the database records provided to you.
Customers Data: {customers_text}
Products Data: {products_text}
Sale orders Data: {saleorders_text}
Sale order items Data: {saleorderitems_text}
CRITICAL RULES:
1. NO HALLUCINATIONS: You must base your answer ONLY on the data provided.
2. MISSING DATA: If the provided data does not contain the answer, do not guess. Say: "I could not find that information in the current SAP database."
3. FORMATTING: You must output your response in Markdown. Use bold text for important nouns and bullet points for lists to make it easy to read.
4. TONE: Be concise, highly professional, and helpful.
"""

        # Chat-style completion through llama-cpp-python.
        completion = llm.create_chat_completion(
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            max_tokens=150,
            temperature=0.2,
        )

        # The reply text lives in the first choice's message content.
        return completion["choices"][0]["message"]["content"].strip()
    except Exception as e:
        return f"Error: {str(e)}"
# =========================
# GRADIO UI + API
# =========================
# Minimal UI: one question box, one response box, one button.
with gr.Blocks() as demo:
    user_input = gr.Textbox(label="User Question")
    output = gr.Textbox(label="Response")
    btn = gr.Button("Generate")

    # The click handler is also exposed as the "predict" API endpoint.
    btn.click(
        fn=generate_response,
        inputs=[user_input],
        outputs=output,
        api_name="predict",
    )

# Queueing is REQUIRED for API exposure; enable it, then start the server.
demo.queue().launch()