# Spc5 / app.py
# Vedant104's picture
# Update app.py
# 35e3341 verified
# from dotenv import load_dotenv
# from fastapi import FastAPI, Request, Response
# from botbuilder.core import BotFrameworkAdapter, BotFrameworkAdapterSettings, ActivityHandler, TurnContext
# from botbuilder.schema import Activity
# import gradio as gr
# import asyncio
# import os
# import sys
# import traceback
# import time
# import requests
# import pandas as pd
# from google import genai
# from google.genai import types
# import uvicorn
# # Load environment variables
# load_dotenv()
# # =========================
# # ENVIRONMENT VARIABLES
# # =========================
# client_id = os.getenv("sap_client_id")
# client_secret = os.getenv("sap_client_secret")
# token_url = os.getenv("sap_token_url")
# cap_service_url_customers = os.getenv("sap_cap_service_url_customers")
# cap_service_url_products = os.getenv("sap_cap_service_url_products")
# cap_service_url_saleorders = os.getenv("sap_cap_service_url_saleorders")
# cap_service_url_saleorderitems = os.getenv("sap_cap_service_url_saleorderitems")
# # Bot Framework Credentials
# APP_ID = os.environ.get("MicrosoftAppId", "")
# APP_PASSWORD = os.environ.get("MicrosoftAppPassword", "")
# # Initialize FastAPI
# app = FastAPI()
# # =========================
# # GLOBAL VARIABLES (CACHE)
# # =========================
# access_token = None
# cached_customers = None
# cached_products = None
# cached_salesorders = None
# cached_salesorderitems = None
# last_refresh = 0
# # =========================
# # SAP FETCH LOGIC
# # =========================
# def generate_sap_xsuaa_token():
# global access_token
# print("Generating SAP token...")
# auth_response = requests.post(
# token_url,
# data={"grant_type": "client_credentials"},
# auth=(client_id, client_secret)
# )
# if auth_response.status_code != 200:
# print("Token Error:", auth_response.text)
# return None
# access_token = auth_response.json().get("access_token")
# print("Token generated!")
# return access_token
# def fetch_sap_data():
# global access_token
# if not access_token:
# generate_sap_xsuaa_token()
# headers = {
# "Authorization": f"Bearer {access_token}",
# "Accept": "application/json"
# }
# try:
# res1 = requests.get(cap_service_url_customers, headers=headers)
# res2 = requests.get(cap_service_url_products, headers=headers)
# res3 = requests.get(cap_service_url_saleorders, headers=headers)
# res4 = requests.get(cap_service_url_saleorderitems, headers=headers)
# if res1.status_code in [400, 401, 403]:
# print("Token expired. Regenerating...")
# access_token = None
# generate_sap_xsuaa_token()
# headers["Authorization"] = f"Bearer {access_token}"
# res1 = requests.get(cap_service_url_customers, headers=headers)
# res2 = requests.get(cap_service_url_products, headers=headers)
# res3 = requests.get(cap_service_url_saleorders, headers=headers)
# res4 = requests.get(cap_service_url_saleorderitems, headers=headers)
# if res1.status_code == 200 and res2.status_code == 200:
# df_customers = pd.DataFrame(res1.json().get("value", []))[["ID", "name", "country", "industry"]]
# df_products = pd.DataFrame(res2.json().get("value", []))[["ID", "name", "category", "price", "currency"]]
# df_saleorders = pd.DataFrame(res3.json().get("value", []))[["ID", "customer_ID", "orderDate", "status"]]
# df_saleorderitems = pd.DataFrame(res4.json().get("value", []))[["ID", "parent_ID", "product_ID", "quantity", "netAmount"]]
# return df_customers, df_products, df_saleorders, df_saleorderitems
# else:
# raise Exception("SAP returned a non-200 status code.")
# except Exception as e:
# print(f"Error fetching from SAP: {e}")
# return pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
# def get_cached_data():
# global cached_customers, cached_products, cached_salesorders, cached_salesorderitems, last_refresh
# if time.time() - last_refresh > 3000 or cached_customers is None:
# print("Refreshing SAP data...")
# cached_customers, cached_products, cached_salesorders, cached_salesorderitems = fetch_sap_data()
# last_refresh = time.time()
# return cached_customers, cached_products, cached_salesorders, cached_salesorderitems
# # =========================
# # LLM LOGIC
# # =========================
# def generate_response(user_prompt):
# try:
# # Move the AI Client initialization here so it doesn't crash on startup!
# google_api_key = os.getenv("google_api_key")
# if not google_api_key:
# return "Error: The google_api_key is missing from Hugging Face settings."
# ai_client = genai.Client(api_key=google_api_key)
# df_customers, df_products, df_saleorders, df_saleorderitems = get_cached_data()
# db_context = f"""
# Customers Data:
# {df_customers.to_string(index=False)}
# Products Data:
# {df_products.to_string(index=False)}
# Sale Orders Data:
# {df_saleorders.to_string(index=False)}
# Sale Order Items Data:
# {df_saleorderitems.to_string(index=False)}
# """
# system_prompt = f"""
# Your purpose is to answer the user's questions based strictly on the database records provided to you.
# {db_context}
# CRITICAL RULES:
# 1. NO HALLUCINATIONS: You must base your answer ONLY on the data provided above.
# 2. MISSING DATA: If the provided data does not contain the answer, do not guess. Say: "I could not find that information in the current SAP database."
# 3. FORMATTING: You must output your response in Markdown. Use bold text for important nouns and bullet points for lists to make it easy to read.
# 4. TONE: Be concise, highly professional, and helpful.
# """
# response = ai_client.models.generate_content(
# model="gemini-2.5-flash", # Restored to Gemini model for the Google SDK
# contents=user_prompt,
# config=types.GenerateContentConfig(system_instruction=system_prompt)
# )
# return response.text
# except Exception as e:
# return f"Error communicating with AI/SAP: {str(e)}"
# # =========================
# # BOT FRAMEWORK LOGIC
# # =========================
# class MyBot(ActivityHandler):
# async def on_message_activity(self, turn_context: TurnContext):
# user_message = turn_context.activity.text
# await turn_context.send_activity("Thinking... fetching SAP data and generating answer ⏳")
# conversation_reference = TurnContext.get_conversation_reference(turn_context.activity)
# asyncio.create_task(self.run_llm_in_background(user_message, conversation_reference))
# async def run_llm_in_background(self, user_message: str, conversation_reference):
# try:
# bot_reply = await asyncio.to_thread(generate_response, user_message)
# except Exception as e:
# bot_reply = f"Error: {str(e)}"
# async def send_final_reply(turn_context: TurnContext):
# await turn_context.send_activity(bot_reply)
# await adapter.continue_conversation(conversation_reference, send_final_reply, APP_ID)
# adapter_settings = BotFrameworkAdapterSettings(APP_ID, APP_PASSWORD)
# adapter = BotFrameworkAdapter(adapter_settings)
# bot = MyBot()
# # =========================
# # FASTAPI ROUTES
# # =========================
# @app.post("/api/messages")
# async def messages(req: Request):
# if "application/json" in req.headers.get("Content-Type", ""):
# body = await req.json()
# else:
# return Response(status_code=415)
# activity = Activity().deserialize(body)
# auth_header = req.headers.get("Authorization", "")
# try:
# await adapter.process_activity(activity, auth_header, bot.on_turn)
# return Response(status_code=201)
# except Exception as e:
# print(f"Error processing activity: {e}", file=sys.stderr)
# traceback.print_exc()
# raise e
# # =========================
# # GRADIO UI (NEW)
# # =========================
# # with gr.Blocks() as demo:
# # gr.Markdown("The backend API for Teams bot")
# # app = gr.mount_gradio_app(app, demo, path="/")
# # with gr.Blocks() as demo:
# # gr.Markdown("Backend running...")
# # demo.queue().launch(server_name="0.0.0.0", server_port=7860)
# with gr.Blocks() as demo:
# gr.Markdown("Backend running...")
# # 1. Glue the UI to your FastAPI app so the /api/messages route actually works!
# app = gr.mount_gradio_app(app, demo, path="/")
# # 2. Actually START the FastAPI server on port 7860 so Hugging Face doesn't crash
# uvicorn.run(app, host="0.0.0.0", port=7860)
# =============================================================================================================================
# import os
# import sys
# import time
# import asyncio
# import traceback
# import requests
# import pandas as pd
# import gradio as gr
# import uvicorn
# from fastapi import FastAPI, Request, Response
# from dotenv import load_dotenv
# from botbuilder.core import (
# BotFrameworkAdapter,
# BotFrameworkAdapterSettings,
# ActivityHandler,
# TurnContext
# )
# from botbuilder.schema import Activity
# from google import genai
# from google.genai import types
# # Load local environment variables (if testing locally)
# load_dotenv()
# # =========================
# # ENVIRONMENT VARIABLES
# # =========================
# # SAP Credentials
# client_id = os.getenv("sap_client_id")
# client_secret = os.getenv("sap_client_secret")
# token_url = os.getenv("sap_token_url")
# cap_service_url_customers = os.getenv("sap_cap_service_url_customers")
# cap_service_url_products = os.getenv("sap_cap_service_url_products")
# cap_service_url_saleorders = os.getenv("sap_cap_service_url_saleorders")
# cap_service_url_saleorderitems = os.getenv("sap_cap_service_url_saleorderitems")
# # Bot Framework Credentials
# APP_ID = os.environ.get("MicrosoftAppId", "")
# APP_PASSWORD = os.environ.get("MicrosoftAppPassword", "")
# # Gemini API Key
# GOOGLE_API_KEY = os.environ.get("google_api_key")
# # Initialize FastAPI
# app = FastAPI()
# # =========================
# # GLOBAL VARIABLES (CACHE)
# # =========================
# access_token = None
# cached_customers = pd.DataFrame()
# cached_products = pd.DataFrame()
# cached_salesorders = pd.DataFrame()
# cached_salesorderitems = pd.DataFrame()
# last_refresh = 0
# # =========================
# # SAP FETCH LOGIC
# # =========================
# def generate_sap_xsuaa_token():
# global access_token
# print("Generating SAP token...")
# try:
# auth_response = requests.post(
# token_url,
# data={"grant_type": "client_credentials"},
# auth=(client_id, client_secret),
# timeout=10
# )
# if auth_response.status_code != 200:
# print("Token Error:", auth_response.text)
# return None
# access_token = auth_response.json().get("access_token")
# print("Token generated successfully!")
# return access_token
# except Exception as e:
# print(f"Error generating SAP token: {e}")
# return None
# def fetch_sap_data():
# global access_token
# if not access_token:
# generate_sap_xsuaa_token()
# headers = {
# "Authorization": f"Bearer {access_token}",
# "Accept": "application/json"
# }
# try:
# res1 = requests.get(cap_service_url_customers, headers=headers, timeout=15)
# res2 = requests.get(cap_service_url_products, headers=headers, timeout=15)
# res3 = requests.get(cap_service_url_saleorders, headers=headers, timeout=15)
# res4 = requests.get(cap_service_url_saleorderitems, headers=headers, timeout=15)
# # Handle token expiration
# if res1.status_code in [400, 401, 403]:
# print("Token expired. Regenerating...")
# generate_sap_xsuaa_token()
# headers["Authorization"] = f"Bearer {access_token}"
# res1 = requests.get(cap_service_url_customers, headers=headers, timeout=15)
# res2 = requests.get(cap_service_url_products, headers=headers, timeout=15)
# res3 = requests.get(cap_service_url_saleorders, headers=headers, timeout=15)
# res4 = requests.get(cap_service_url_saleorderitems, headers=headers, timeout=15)
# if res1.status_code == 200 and res2.status_code == 200:
# df_cust = pd.DataFrame(res1.json().get("value", []))
# df_prod = pd.DataFrame(res2.json().get("value", []))
# df_so = pd.DataFrame(res3.json().get("value", []))
# df_soi = pd.DataFrame(res4.json().get("value", []))
# # Keep only the needed columns; skip empty frames, which have no columns and would raise KeyError on selection
# if not df_cust.empty: df_cust = df_cust[["ID", "name", "country", "industry"]]
# if not df_prod.empty: df_prod = df_prod[["ID", "name", "category", "price", "currency"]]
# if not df_so.empty: df_so = df_so[["ID", "customer_ID", "orderDate", "status"]]
# if not df_soi.empty: df_soi = df_soi[["ID", "parent_ID", "product_ID", "quantity", "netAmount"]]
# return df_cust, df_prod, df_so, df_soi
# else:
# raise Exception(f"SAP returned non-200. Cust: {res1.status_code}, Prod: {res2.status_code}")
# except Exception as e:
# print(f"Error fetching from SAP: {e}")
# return pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
# def get_cached_data():
# global cached_customers, cached_products, cached_salesorders, cached_salesorderitems, last_refresh
# # Refresh cache every 50 minutes (3000 seconds)
# if time.time() - last_refresh > 3000 or cached_customers.empty:
# print("Refreshing SAP data cache...")
# cached_customers, cached_products, cached_salesorders, cached_salesorderitems = fetch_sap_data()
# last_refresh = time.time()
# return cached_customers, cached_products, cached_salesorders, cached_salesorderitems
# # =========================
# # LLM LOGIC
# # =========================
# def generate_response(user_prompt: str) -> str:
# try:
# if not GOOGLE_API_KEY:
# return "Error: The `google_api_key` is missing from Hugging Face Space secrets."
# ai_client = genai.Client(api_key=GOOGLE_API_KEY)
# df_customers, df_products, df_saleorders, df_saleorderitems = get_cached_data()
# db_context = f"""
# Customers Data:
# {df_customers.to_string(index=False)}
# Products Data:
# {df_products.to_string(index=False)}
# Sale Orders Data:
# {df_salesorders.to_string(index=False)}
# Sale Order Items Data:
# {df_salesorderitems.to_string(index=False)}
# """
# system_prompt = f"""
# Your purpose is to answer the user's questions based strictly on the database records provided to you.
# {db_context}
# CRITICAL RULES:
# 1. NO HALLUCINATIONS: You must base your answer ONLY on the data provided above.
# 2. MISSING DATA: If the provided data does not contain the answer, do not guess. Say: "I could not find that information in the current SAP database."
# 3. FORMATTING: You must output your response in Markdown. Use bold text for important nouns and bullet points for lists to make it easy to read.
# 4. TONE: Be concise, highly professional, and helpful.
# """
# response = ai_client.models.generate_content(
# model="gemini-2.5-flash",
# contents=user_prompt,
# config=types.GenerateContentConfig(system_instruction=system_prompt)
# )
# return response.text
# except Exception as e:
# return f"Error communicating with AI/SAP: {str(e)}"
# # =========================
# # BOT FRAMEWORK LOGIC
# # =========================
# class MyBot(ActivityHandler):
# async def on_message_activity(self, turn_context: TurnContext):
# user_message = turn_context.activity.text
# # 1. Immediately send "Thinking..." to prevent Teams/Emulator timeout
# await turn_context.send_activity("Thinking... fetching SAP data and generating answer ⏳")
# # 2. Get reference to continue the conversation later
# conversation_reference = TurnContext.get_conversation_reference(turn_context.activity)
# # 3. Trigger the background processing
# asyncio.create_task(self.process_and_reply(user_message, conversation_reference))
# async def process_and_reply(self, user_message: str, reference):
# # Run the synchronous SAP/LLM code in a background thread
# try:
# bot_reply = await asyncio.to_thread(generate_response, user_message)
# except Exception as e:
# bot_reply = f"Error generating response: {str(e)}"
# # Send the final response back to the user
# async def send_final_reply(turn_context: TurnContext):
# await turn_context.send_activity(bot_reply)
# await adapter.continue_conversation(reference, send_final_reply, APP_ID)
# adapter_settings = BotFrameworkAdapterSettings(APP_ID, APP_PASSWORD)
# adapter = BotFrameworkAdapter(adapter_settings)
# bot = MyBot()
# # Global error handler for Bot Framework
# async def on_error(context: TurnContext, error: Exception):
# print(f"\n [on_turn_error] unhandled error: {error}", file=sys.stderr)
# traceback.print_exc()
# await context.send_activity("The bot encountered an error or bug.")
# adapter.on_turn_error = on_error
# # =========================
# # FASTAPI ROUTES
# # =========================
# @app.post("/api/messages")
# async def messages(req: Request):
# if "application/json" in req.headers.get("Content-Type", ""):
# body = await req.json()
# else:
# return Response(status_code=415)
# activity = Activity().deserialize(body)
# auth_header = req.headers.get("Authorization", "")
# try:
# await adapter.process_activity(activity, auth_header, bot.on_turn)
# return Response(status_code=201)
# except Exception as e:
# raise e
# # =========================
# # GRADIO UI (Landing Page)
# # =========================
# with gr.Blocks() as demo:
# gr.Markdown("The FastAPI server")
# # Mount Gradio onto the FastAPI app so both run together
# app = gr.mount_gradio_app(app, demo, path="/")
# # =========================
# # RUN SERVER
# # =========================
# if __name__ == "__main__":
# # Host 0.0.0.0 and port 7860 are REQUIRED for Hugging Face Spaces
# uvicorn.run(app, host="0.0.0.0", port=7860)
# ==========================================================================================================
# import gradio as gr
# import requests
# import os
# import pandas as pd
# import time
# from dotenv import load_dotenv
# import pandas as pd
# from google import genai
# from google.genai import types
# import time
# load_dotenv()
# client_id = os.getenv("sap_client_id")
# client_secret = os.getenv("sap_client_secret")
# token_url = os.getenv("sap_token_url")
# cap_service_url_customers = os.getenv("sap_cap_service_url_customers")
# cap_service_url_products = os.getenv("sap_cap_service_url_products")
# cap_service_url_saleorders = os.getenv("sap_cap_service_url_saleorders")
# cap_service_url_saleorderitems = os.getenv("sap_cap_service_url_saleorderitems")
# APP_ID = os.environ.get("MicrosoftAppId", "")
# APP_PASSWORD = os.environ.get("MicrosoftAppPassword", "")
# GOOGLE_API_KEY = os.environ.get("google_api_key")
# # =========================
# # GLOBAL VARIABLES (CACHE)
# # =========================
# access_token = None
# cached_customers = pd.DataFrame()
# cached_products = pd.DataFrame()
# cached_salesorders = pd.DataFrame()
# cached_salesorderitems = pd.DataFrame()
# last_refresh = 0
# # =========================
# # TOKEN FUNCTION
# # =========================
# def generate_sap_xsuaa_token():
# global access_token
# print("Generating SAP token...")
# auth_response = requests.post(
# token_url,
# data={"grant_type": "client_credentials"},
# auth=(client_id, client_secret)
# )
# if auth_response.status_code != 200:
# print("Token Error:", auth_response.text)
# return None
# access_token = auth_response.json().get("access_token")
# print("Token generated!")
# return access_token
# # =========================
# # FETCH SAP DATA
# # =========================
# def fetch_sap_data():
# global access_token
# if not access_token:
# generate_sap_xsuaa_token()
# headers = {
# "Authorization": f"Bearer {access_token}",
# "Accept": "application/json"
# }
# res1 = requests.get(cap_service_url_customers, headers=headers)
# res2 = requests.get(cap_service_url_products, headers=headers)
# res3 = requests.get(cap_service_url_saleorders, headers=headers)
# res4 = requests.get(cap_service_url_saleorderitems, headers=headers)
# # Retry if token expired
# if res1.status_code in [400,401,403]:
# print("Token expired. Regenerating...")
# access_token = None
# generate_sap_xsuaa_token()
# headers["Authorization"] = f"Bearer {access_token}"
# res1 = requests.get(cap_service_url_customers, headers=headers)
# res2 = requests.get(cap_service_url_products, headers=headers)
# res3 = requests.get(cap_service_url_saleorders, headers=headers)
# res4 = requests.get(cap_service_url_saleorderitems, headers=headers)
# df_customers = pd.DataFrame(res1.json()["value"])
# df_products = pd.DataFrame(res2.json()["value"])
# df_salesorders = pd.DataFrame(res3.json()["value"])
# df_saleorderitems = pd.DataFrame(res4.json()["value"])
# # Keep only important columns
# df_customers = df_customers[["ID","name","country","industry"]]
# df_products = df_products[["ID","name","category","price","currency"]]
# df_salesorders = df_salesorders[["ID","customer_ID","orderDate","status"]]
# df_saleorderitems = df_saleorderitems[["ID","parent_ID","product_ID","quantity","netAmount"]]
# return df_customers, df_products, df_salesorders, df_saleorderitems
# # =========================
# # CACHE LOGIC
# # =========================
# def get_cached_data():
# global cached_customers, cached_products,cached_salesorders,cached_salesorderitems, last_refresh
# # Refresh every 50 minutes
# if time.time() - last_refresh > 3000 or cached_customers is None:
# print("Refreshing SAP data...")
# cached_customers, cached_products,cached_salesorders,cached_salesorderitems = fetch_sap_data()
# last_refresh = time.time()
# return cached_customers, cached_products,cached_salesorders,cached_salesorderitems
# # =========================
# # MAIN FUNCTION (LLM)
# # =========================
# # def generate_response(user_prompt):
# # try:
# # if not GOOGLE_API_KEY:
# # return "Error: The `google api key` is missing from Hugging Face Space secrets."
# # ai_client = genai.Client(api_key=GOOGLE_API_KEY)
# # df_customers, df_products, df_salesorders, df_salesorderitems = get_cached_data()
# # db_context = f"""
# # Customers Data:
# # {df_customers.to_string(index=False)}
# # Products Data:
# # {df_products.to_string(index=False)}
# # Sales Orders Data:
# # {df_salesorders.to_string(index=False)}
# # Sales Order Items Data:
# # {df_salesorderitems.to_string(index=False)}
# # """
# # system_prompt = f"""
# # Your purpose is to answer the user's questions based strictly on the database records provided to you.
# # {db_context}
# # CRITICAL RULES:
# # 1. NO HALLUCINATIONS: You must base your answer ONLY on the data provided above.
# # 2. MISSING DATA: If the provided data does not contain the answer, do not guess. Say: "I could not find that information in the current SAP database."
# # 3. FORMATTING: You must output your response in Markdown. Use bold text for important nouns and bullet points for lists to make it easy to read.
# # 4. TONE: Be concise, highly professional, and helpful.
# # """
# # response = ai_client.models.generate_content(
# # model = "gemini-2.5-pro",
# # contents=user_prompt,
# # config=types.GenerateContentConfig(system_instruction=system_prompt)
# # )
# # return response.text
# # except Exception as e:
# # return f"Error communicating with AI/SAP: {str(e)}"
# def generate_response(user_prompt):
# try:
# if not GOOGLE_API_KEY:
# return "Error: The `google api key` is missing from Hugging Face Space secrets."
# ai_client = genai.Client(api_key=GOOGLE_API_KEY)
# df_customers, df_products, df_salesorders, df_salesorderitems = get_cached_data()
# db_context = f"""
# Customers Data:
# {df_customers.to_string(index=False)}
# Products Data:
# {df_products.to_string(index=False)}
# Sales Orders Data:
# {df_salesorders.to_string(index=False)}
# Sales Order Items Data:
# {df_salesorderitems.to_string(index=False)}
# """
# system_prompt = f"""
# Your purpose is to answer the user's questions based strictly on the database records provided to you.
# {db_context}
# CRITICAL RULES:
# 1. NO HALLUCINATIONS: You must base your answer ONLY on the data provided above.
# 2. MISSING DATA: If the provided data does not contain the answer, do not guess. Say: "I could not find that information in the current SAP database."
# 3. FORMATTING: You must output your response in Markdown. Use bold text for important nouns and bullet points for lists to make it easy to read.
# 4. TONE: Be concise, highly professional, and helpful.
# """
# # Implement a retry mechanism for 503 Server Busy errors
# max_retries = 3
# for attempt in range(max_retries):
# try:
# response = ai_client.models.generate_content(
# # model="gemini-2.5-flash", # To test Gemma, change this to: "gemma-3-4b-it"
# model = "gemini-3.1-flash-live-preview",
# contents=user_prompt,
# config=types.GenerateContentConfig(system_instruction=system_prompt)
# )
# return response.text
# except Exception as api_e:
# # If the error is a 503 and we haven't run out of retries, wait and try again
# if "503" in str(api_e) and attempt < max_retries - 1:
# wait_time = 2 ** attempt # Exponential backoff: waits 1s, then 2s...
# print(f"API busy (503). Retrying in {wait_time} seconds...")
# time.sleep(wait_time)
# else:
# # If it's a different error (like Auth) or we are out of retries, fail out
# raise api_e
# except Exception as e:
# return f"Error communicating with AI/SAP: {str(e)}"
# # =========================
# # GRADIO UI + API
# # =========================
# with gr.Blocks() as demo:
# user_input = gr.Textbox(label="User Question")
# output = gr.Textbox(label="Response")
# btn = gr.Button("Generate")
# btn.click(
# fn=generate_response,
# inputs=[user_input],
# outputs=output,
# api_name="predict"
# )
# # REQUIRED for API exposure
# demo.queue()
# demo.launch()
# =========================================================================================================================
# import gradio as gr
# import requests
# import os
# import pandas as pd
# import time
# from dotenv import load_dotenv
# from google import genai
# from google.genai import types
# load_dotenv()
# client_id = os.getenv("sap_client_id")
# client_secret = os.getenv("sap_client_secret")
# token_url = os.getenv("sap_token_url")
# cap_service_url_customers = os.getenv("sap_cap_service_url_customers")
# cap_service_url_products = os.getenv("sap_cap_service_url_products")
# cap_service_url_saleorders = os.getenv("sap_cap_service_url_saleorders")
# cap_service_url_saleorderitems = os.getenv("sap_cap_service_url_saleorderitems")
# APP_ID = os.environ.get("MicrosoftAppId", "")
# APP_PASSWORD = os.environ.get("MicrosoftAppPassword", "")
# GOOGLE_API_KEY = os.environ.get("google_api_key")
# # =========================
# # GLOBAL VARIABLES (CACHE)
# # =========================
# access_token = None
# cached_customers = pd.DataFrame()
# cached_products = pd.DataFrame()
# cached_salesorders = pd.DataFrame()
# cached_salesorderitems = pd.DataFrame()
# last_refresh = 0
# # =========================
# # TOKEN FUNCTION
# # =========================
# def generate_sap_xsuaa_token():
# global access_token
# print("Generating SAP token...")
# auth_response = requests.post(
# token_url,
# data={"grant_type": "client_credentials"},
# auth=(client_id, client_secret)
# )
# if auth_response.status_code != 200:
# print("Token Error:", auth_response.text)
# return None
# access_token = auth_response.json().get("access_token")
# print("Token generated!")
# return access_token
# # =========================
# # FETCH SAP DATA
# # =========================
# def fetch_sap_data():
# global access_token
# if not access_token:
# generate_sap_xsuaa_token()
# headers = {
# "Authorization": f"Bearer {access_token}",
# "Accept": "application/json"
# }
# res1 = requests.get(cap_service_url_customers, headers=headers)
# res2 = requests.get(cap_service_url_products, headers=headers)
# res3 = requests.get(cap_service_url_saleorders, headers=headers)
# res4 = requests.get(cap_service_url_saleorderitems, headers=headers)
# # Retry if token expired
# if res1.status_code in [400,401,403]:
# print("Token expired. Regenerating...")
# access_token = None
# generate_sap_xsuaa_token()
# headers["Authorization"] = f"Bearer {access_token}"
# res1 = requests.get(cap_service_url_customers, headers=headers)
# res2 = requests.get(cap_service_url_products, headers=headers)
# res3 = requests.get(cap_service_url_saleorders, headers=headers)
# res4 = requests.get(cap_service_url_saleorderitems, headers=headers)
# df_customers = pd.DataFrame(res1.json()["value"])
# df_products = pd.DataFrame(res2.json()["value"])
# df_salesorders = pd.DataFrame(res3.json()["value"])
# df_saleorderitems = pd.DataFrame(res4.json()["value"])
# # Keep only important columns
# df_customers = df_customers[["ID","name","country","industry"]]
# df_products = df_products[["ID","name","category","price","currency"]]
# df_salesorders = df_salesorders[["ID","customer_ID","orderDate","status"]]
# df_saleorderitems = df_saleorderitems[["ID","parent_ID","product_ID","quantity","netAmount"]]
# return df_customers, df_products, df_salesorders, df_saleorderitems
# # =========================
# # CACHE LOGIC
# # =========================
# def get_cached_data():
# global cached_customers, cached_products,cached_salesorders,cached_salesorderitems, last_refresh
# # Refresh every 50 minutes
# if time.time() - last_refresh > 3000 or cached_customers is None:
# print("Refreshing SAP data...")
# cached_customers, cached_products,cached_salesorders,cached_salesorderitems = fetch_sap_data()
# last_refresh = time.time()
# return cached_customers, cached_products,cached_salesorders,cached_salesorderitems
# # =========================
# # MAIN FUNCTION (LLM)
# # =========================
# def generate_response(user_prompt):
# try:
# if not GOOGLE_API_KEY:
# return "Error: The `google api key` is missing from Hugging Face Space secrets."
# ai_client = genai.Client(api_key=GOOGLE_API_KEY)
# df_customers, df_products, df_salesorders, df_salesorderitems = get_cached_data()
# db_context = f"""
# Customers Data:
# {df_customers.to_string(index=False)}
# Products Data:
# {df_products.to_string(index=False)}
# Sales Orders Data:
# {df_salesorders.to_string(index=False)}
# Sales Order Items Data:
# {df_salesorderitems.to_string(index=False)}
# """
# system_prompt = f"""
# Your purpose is to answer the user's questions based strictly on the database records provided to you.
# {db_context}
# CRITICAL RULES:
# 1. NO HALLUCINATIONS: You must base your answer ONLY on the data provided above.
# 2. MISSING DATA: If the provided data does not contain the answer, do not guess. Say: "I could not find that information in the current SAP database."
# 3. FORMATTING: You must output your response in Markdown. Use bold text for important nouns and bullet points for lists to make it easy to read.
# 4. TONE: Be concise, highly professional, and helpful.
# """
# # FIX: Combine the system instructions and the user prompt into one string
# combined_prompt = f"{system_prompt}\n\nUSER QUESTION:\n{user_prompt}"
# # Implement a retry mechanism for 503 Server Busy errors
# max_retries = 3
# for attempt in range(max_retries):
# try:
# response = ai_client.models.generate_content(
# model="gemma-3-12b-it",
# contents=combined_prompt
# # FIX: Removed config=types.GenerateContentConfig(system_instruction=system_prompt)
# )
# return response.text
# except Exception as api_e:
# # If the error is a 503 and we haven't run out of retries, wait and try again
# if "503" in str(api_e) and attempt < max_retries - 1:
# wait_time = 2 ** attempt # Exponential backoff: waits 1s, then 2s...
# print(f"API busy (503). Retrying in {wait_time} seconds...")
# time.sleep(wait_time)
# else:
# # If it's a different error (like Auth) or we are out of retries, fail out
# raise api_e
# except Exception as e:
# return f"Error communicating with AI/SAP: {str(e)}"
# # =========================
# # GRADIO UI + API
# # =========================
# with gr.Blocks() as demo:
# user_input = gr.Textbox(label="User Question")
# output = gr.Textbox(label="Response")
# btn = gr.Button("Generate")
# btn.click(
# fn=generate_response,
# inputs=[user_input],
# outputs=output,
# api_name="predict"
# )
# # REQUIRED for API exposure
# demo.queue()
# demo.launch()
# =========================================================================================================
# import gradio as gr
# import requests
# import os
# import pandas as pd
# import time
# import asyncio
# import uvicorn
# from fastapi import FastAPI, Request
# from dotenv import load_dotenv
# from google import genai
# # =========================
# # LOAD ENV
# # =========================
# load_dotenv()
# client_id = os.getenv("sap_client_id")
# client_secret = os.getenv("sap_client_secret")
# token_url = os.getenv("sap_token_url")
# cap_service_url_customers = os.getenv("sap_cap_service_url_customers")
# cap_service_url_products = os.getenv("sap_cap_service_url_products")
# cap_service_url_salesorders = os.getenv("sap_cap_service_url_saleorders")
# cap_service_url_salesorderitems = os.getenv("sap_cap_service_url_saleorderitems")
# GOOGLE_API_KEY = os.getenv("google_api_key")
# app = FastAPI()
# # =========================
# # CACHE
# # =========================
# access_token = None
# cached_data = {}
# last_refresh = 0
# # =========================
# # TOKEN
# # =========================
# def generate_token():
# global access_token
# res = requests.post(
# token_url,
# data={"grant_type": "client_credentials"},
# auth=(client_id, client_secret)
# )
# access_token = res.json().get("access_token")
# return access_token
# # =========================
# # GENERIC FETCH
# # =========================
# def fetch_api(url):
# global access_token
# if not access_token:
# generate_token()
# headers = {"Authorization": f"Bearer {access_token}"}
# res = requests.get(url, headers=headers)
# if res.status_code in [401,403]:
# generate_token()
# headers["Authorization"] = f"Bearer {access_token}"
# res = requests.get(url, headers=headers)
# return res.json().get("value", [])
# # =========================
# # FETCH ALL DATA
# # =========================
# def fetch_all_data():
# print("🔄 Fetching all SAP data...")
# customers = pd.DataFrame(fetch_api(cap_service_url_customers))
# products = pd.DataFrame(fetch_api(cap_service_url_products))
# orders = pd.DataFrame(fetch_api(cap_service_url_salesorders))
# order_items = pd.DataFrame(fetch_api(cap_service_url_salesorderitems))
# customers = customers[["ID","name","country","industry"]]
# products = products[["ID","name","category","price","currency"]]
# orders = orders[["ID","customer_ID","orderDate","status"]]
# order_items = order_items[["ID","parent_ID","product_ID","quantity","netAmount"]]
# return {
# "customers": customers,
# "products": products,
# "orders": orders,
# "order_items": order_items
# }
# # =========================
# # CACHE
# # =========================
# def get_data():
# global cached_data, last_refresh
# if time.time() - last_refresh > 3000 or not cached_data:
# cached_data = fetch_all_data()
# last_refresh = time.time()
# return cached_data
# # =========================
# # AI RESPONSE
# # =========================
# def generate_ai_response(user_prompt):
# try:
# data = get_data()
# context = f"""
# CUSTOMERS:
# {data['customers'].to_string(index=False)}
# PRODUCTS:
# {data['products'].to_string(index=False)}
# SALES ORDERS:
# {data['orders'].to_string(index=False)}
# ORDER ITEMS:
# {data['order_items'].to_string(index=False)}
# """
# prompt = f"""
# You are SAP assistant.
# Answer ONLY using data below.
# {context}
# User Question:
# {user_prompt}
# """
# client = genai.Client(api_key=GOOGLE_API_KEY)
# response = client.models.generate_content(
# model="gemma-3-12b-it", # your choice
# contents=prompt
# )
# return response.text
# except Exception as e:
# return f"Error: {str(e)}"
# # =========================
# # GOOGLE CHAT WEBHOOK
# # =========================
# @app.post("/api/messages")
# async def webhook(req: Request):
# body = await req.json()
# print("Incoming:", body)
# user_message = body.get("message", {}).get("text", "").strip()
# if not user_message:
# return {"text": "Hello 👋 Ask me about SAP data"}
# # if "ping" in user_message.lower():
# # return {"text": "Pong 🏓"}
# # ⚡ DIRECT RESPONSE (IMPORTANT)
# bot_reply = await asyncio.to_thread(generate_ai_response, user_message)
# return {"text": bot_reply}
# # =========================
# # GRADIO TEST UI
# # =========================
# with gr.Blocks() as demo:
# gr.Markdown("## SAP AI Assistant")
# inp = gr.Textbox(label="Ask")
# out = gr.Textbox(label="Response")
# btn = gr.Button("Ask")
# btn.click(generate_ai_response, inp, out)
# app = gr.mount_gradio_app(app, demo, path="/")
# # =========================
# # RUN
# # =========================
# if __name__ == "__main__":
# uvicorn.run(app, host="0.0.0.0", port=7860)
# ==========================================================================================
# working code
import gradio as gr
import requests
import os
import pandas as pd
import time
import asyncio
import uvicorn
import json
import uuid
import matplotlib.pyplot as plt
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from dotenv import load_dotenv
from google import genai
import chromadb
# =========================
# LOAD ENV
# =========================
load_dotenv()  # make entries from a .env file readable through os.getenv

# SAP OAuth client credentials and token endpoint.
client_id = os.getenv("sap_client_id")
client_secret = os.getenv("sap_client_secret")
token_url = os.getenv("sap_token_url")

# One SAP CAP service URL per dataset. Note that the environment variable
# names are spelled "saleorders" / "saleorderitems" (no "s" after "sale").
cap_service_url_customers = os.getenv("sap_cap_service_url_customers")
cap_service_url_products = os.getenv("sap_cap_service_url_products")
cap_service_url_salesorders = os.getenv("sap_cap_service_url_saleorders")
cap_service_url_salesorderitems = os.getenv("sap_cap_service_url_saleorderitems")

GOOGLE_API_KEY = os.getenv("google_api_key")

app = FastAPI()

# =========================
# STATIC FILES FOR CHART IMAGES
# =========================
# Charts are written to ./static and served under /static so that
# Google Chat can load the generated images by URL.
os.makedirs("static", exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")

# =========================
# CHROMA DATABASE
# =========================
chroma_client = chromadb.CloudClient(
    tenant=os.getenv("CHROMA_TENANT"),
    database=os.getenv("CHROMA_DATABASE"),
    api_key=os.getenv("CHROMA_API_KEY"),
    cloud_host="europe-west1.gcp.trychroma.com",
    cloud_port=443,
)
# Open the cloud collection that stores past interactions, creating it if needed.
collection = chroma_client.get_or_create_collection(name="chat_history")

# =========================
# CACHE
# =========================
access_token = None  # bearer token; assigned by generate_token()
cached_data = {}  # SAP tables; assigned by get_data()
last_refresh = 0  # time.time() value recorded by get_data() after a reload
# =========================
# TOKEN & FETCH
# =========================
def generate_token():
    """Request a new client-credentials token and cache it in ``access_token``.

    Posts ``grant_type=client_credentials`` to ``token_url`` using HTTP basic
    auth built from ``client_id`` and ``client_secret``, then stores the
    ``access_token`` field of the JSON reply in the module-level global.

    Returns:
        The newly issued access token.

    Raises:
        requests.HTTPError: the token endpoint answered with an error status.
        requests.Timeout: the token endpoint did not answer in time.
        RuntimeError: the reply carried no ``access_token`` field.
    """
    global access_token
    res = requests.post(
        token_url,
        data={"grant_type": "client_credentials"},
        auth=(client_id, client_secret),
        # requests waits indefinitely unless a timeout is given.
        timeout=30,
    )
    # Fail loudly here instead of caching None and later sending "Bearer None".
    res.raise_for_status()
    token = res.json().get("access_token")
    if not token:
        raise RuntimeError("Token endpoint response did not contain an access_token")
    access_token = token
    return access_token
def fetch_api(url):
    """GET *url* with the cached bearer token and return its ``"value"`` list.

    A token is generated first when none is cached. If the service answers
    401 or 403, the token is regenerated once and the request is repeated.

    Args:
        url: Service URL to query.

    Returns:
        The ``"value"`` entry of the JSON response body, or ``[]`` when the
        body has no such key.

    Raises:
        requests.HTTPError: the final response has an error status.
        requests.Timeout: the service did not answer in time.
    """
    if not access_token:
        generate_token()
    headers = {"Authorization": f"Bearer {access_token}"}
    # requests waits indefinitely unless a timeout is given.
    res = requests.get(url, headers=headers, timeout=30)
    if res.status_code in (401, 403):
        # Token rejected: refresh it once and retry the same request.
        generate_token()
        headers["Authorization"] = f"Bearer {access_token}"
        res = requests.get(url, headers=headers, timeout=30)
    # Surface HTTP failures instead of parsing an error page as data.
    res.raise_for_status()
    return res.json().get("value", [])
def fetch_all_data():
    """Download the four SAP datasets and keep only the columns used downstream.

    Returns:
        A dict with the keys ``"customers"``, ``"products"``, ``"orders"`` and
        ``"order_items"``, each mapped to a pandas DataFrame restricted to a
        fixed list of columns.
    """
    print("Fetching all SAP data...")
    endpoints = {
        "customers": cap_service_url_customers,
        "products": cap_service_url_products,
        "orders": cap_service_url_salesorders,
        "order_items": cap_service_url_salesorderitems,
    }
    wanted_columns = {
        "customers": ["ID", "name", "country", "industry"],
        "products": ["ID", "name", "category", "price", "currency"],
        "orders": ["ID", "customer_ID", "orderDate", "status"],
        "order_items": ["ID", "parent_ID", "product_ID", "quantity", "netAmount"],
    }
    # Every dataset is fetched before any column selection takes place.
    raw_frames = {
        name: pd.DataFrame(fetch_api(url)) for name, url in endpoints.items()
    }
    return {
        name: raw_frames[name][columns] for name, columns in wanted_columns.items()
    }
def get_data():
    """Return the cached SAP tables, reloading them when stale or empty.

    The cache is reloaded through ``fetch_all_data()`` when more than 3000
    seconds (50 minutes) have passed since ``last_refresh`` or when
    ``cached_data`` is empty.
    """
    global cached_data, last_refresh
    cache_age = time.time() - last_refresh
    still_valid = cache_age <= 3000 and bool(cached_data)
    if still_valid:
        return cached_data
    cached_data = fetch_all_data()
    last_refresh = time.time()
    return cached_data
# # =========================
# # AI RESPONSE & CHART GEN
# # =========================
# def generate_ai_response(user_prompt):
# try:
# # Fetch relevant past conversation context from ChromaDB
# past_context = ""
# try:
# results = collection.query(
# query_texts=[user_prompt],
# n_results=3,
# where={"user_id": user_id}
# )
# if results.get('documents') and results['documents'][0]:
# past_context = "\n".join(results['documents'][0])
# except Exception as e:
# print(f"ChromaDB Query Error: {e}")
# # Get current SAP Data
# data = get_data()
# context = f"""CUSTOMERS:\n{data['customers'].to_string(index=False)}\nPRODUCTS:\n{data['products'].to_string(index=False)}\nSALES ORDERS:\n{data['orders'].to_string(index=False)}\nORDER ITEMS:\n{data['order_items'].to_string(index=False)}"""
# prompt = f"""You are an SAP data assistant.
# Answer ONLY using the data below. Decide if a chart helps answer the user's question.
# {context}
# User Question: {user_prompt}
# OUTPUT STRICTLY IN THIS JSON FORMAT:
# {{
# "response_text": "If making a chart, provide a brief 1-2 sentence summary of what the chart shows. If no chart, provide the full text answer here.",
# "needs_chart": true or false,
# "chart_type": "bar" or "pie" or "line",
# "chart_title": "Title of Chart",
# "chart_labels": ["Label1", "Label2"],
# "chart_values": [10.5, 20.0]
# }}"""
# client = genai.Client(api_key=GOOGLE_API_KEY)
# response = client.models.generate_content(
# # model="gemma-3-12b-it",
# model = "gemma-4-26b-a4b-it",
# # model = "gemma-4-31b-it",
# contents=prompt
# )
# # Clean up possible markdown wrappers from the LLM
# raw_output = response.text.strip()
# if raw_output.startswith("```json"):
# raw_output = raw_output[7:-3].strip()
# elif raw_output.startswith("```"):
# raw_output = raw_output[3:-3].strip()
# parsed_data = json.loads(raw_output)
# # Grab the text/summary generated by the LLM
# result = {"text": parsed_data.get("response_text", "Here is the data visualization you requested.")}
# # Save the interaction to ChromaDB for future memory
# try:
# interaction_text = f"User Question: {user_prompt}\nBot Answer: {result['text']}"
# collection.add(
# documents=[interaction_text],
# metadatas=[{"user_id": user_id}],
# ids=[str(uuid.uuid4())]
# )
# except Exception as e:
# print(f"ChromaDB Add Error: {e}")
# # Generate the chart if requested
# if parsed_data.get("needs_chart") and parsed_data.get("chart_labels") and parsed_data.get("chart_values"):
# plt.figure(figsize=(8, 5))
# c_type = parsed_data.get("chart_type", "bar")
# labels = parsed_data["chart_labels"]
# values = parsed_data["chart_values"]
# title = parsed_data.get("chart_title", "SAP Data Chart")
# if c_type == "pie":
# plt.pie(values, labels=labels, autopct='%1.1f%%', startangle=140)
# elif c_type == "line":
# plt.plot(labels, values, marker='o', linestyle='-', color='b')
# else:
# plt.bar(labels, values, color='skyblue')
# plt.title(title)
# plt.xticks(rotation=45, ha='right')
# plt.tight_layout()
# # Save the chart
# filename = f"chart_{uuid.uuid4().hex}.png"
# filepath = os.path.join("static", filename)
# plt.savefig(filepath)
# plt.close()
# result["image_path"] = filepath
# return result
# except Exception as e:
# return {"text": f"Error: {str(e)}"}
# =========================
# AI RESPONSE & CHART GEN
# =========================
def _strip_code_fence(raw_output: str) -> str:
    """Return *raw_output* without a surrounding Markdown code fence, if any."""
    text = raw_output.strip()
    if not text.startswith("```"):
        return text
    # Remove the opening fence, with or without the "json" language tag.
    text = text[3:]
    if text.startswith("json"):
        text = text[4:]
    # Only remove a closing fence that is really there; slicing blindly would
    # cut the last characters off an unterminated reply and corrupt the JSON.
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def _render_chart(parsed_data: dict) -> str:
    """Draw the chart described by *parsed_data* and return the saved PNG path."""
    labels = parsed_data["chart_labels"]
    values = parsed_data["chart_values"]
    c_type = parsed_data.get("chart_type", "bar")
    title = parsed_data.get("chart_title", "SAP Data Chart")

    # Draw on an explicit Figure/Axes pair rather than pyplot's implicit
    # "current figure": this function runs in worker threads, where the
    # implicit state could be shared between concurrent requests.
    fig, ax = plt.subplots(figsize=(8, 5))
    try:
        if c_type == "pie":
            ax.pie(values, labels=labels, autopct='%1.1f%%', startangle=140)
        elif c_type == "line":
            ax.plot(labels, values, marker='o', linestyle='-', color='b')
        else:
            # Any other chart type falls back to a bar chart.
            ax.bar(labels, values, color='skyblue')
        ax.set_title(title)
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
        fig.tight_layout()
        filename = f"chart_{uuid.uuid4().hex}.png"
        filepath = os.path.join("static", filename)
        fig.savefig(filepath)
    finally:
        # Always release the figure, even when plotting or saving fails.
        plt.close(fig)
    return filepath


def generate_ai_response(user_prompt: str, user_id: str) -> dict:
    """Answer *user_prompt* from the SAP data and optionally render a chart.

    Steps:
      1. Embed the prompt and look up at most three stored interactions for
         *user_id* in the Chroma collection (best effort; errors are printed).
      2. Build the LLM prompt from that history and the cached SAP tables.
      3. Ask the model for a JSON reply and parse it.
      4. Store the question/answer pair in the Chroma collection (best
         effort; errors are printed).
      5. Render a chart into ``static/`` when the reply asks for one.

    Args:
        user_prompt: The user's question.
        user_id: Identifier used to filter and tag stored chat history.

    Returns:
        A dict with a ``"text"`` entry and, when a chart was drawn, an
        ``"image_path"`` entry. If any step outside the two best-effort
        Chroma steps fails, ``{"text": "Error: <message>"}`` is returned
        instead of raising.
    """
    try:
        client = genai.Client(api_key=GOOGLE_API_KEY)

        # ---------------------------------------------------------
        # 1. SEARCH CHROMA DB (Using Google Embeddings)
        # ---------------------------------------------------------
        past_context = ""
        try:
            print("Embedding search query...")
            query_emb_res = client.models.embed_content(
                model="gemini-embedding-2",
                contents=user_prompt
            )
            query_vector = query_emb_res.embeddings[0].values
            results = collection.query(
                query_embeddings=[query_vector],
                n_results=3,
                where={"user_id": user_id}
            )
            if results.get('documents') and results['documents'][0]:
                past_context = "\n".join(results['documents'][0])
                print("Found past context!")
        except Exception as e:
            # History is optional: continue without it.
            print(f"ChromaDB Query Error: {e}")

        # ---------------------------------------------------------
        # 2. GET SAP DATA & GENERATE PROMPT
        # ---------------------------------------------------------
        data = get_data()
        sap_data = (
            f"CUSTOMERS:\n{data['customers'].to_string(index=False)}\n"
            f"PRODUCTS:\n{data['products'].to_string(index=False)}\n"
            f"SALES ORDERS:\n{data['orders'].to_string(index=False)}\n"
            f"ORDER ITEMS:\n{data['order_items'].to_string(index=False)}"
        )
        prompt = f"""You are an SAP data assistant.
Answer ONLY using the data below. Decide if a chart helps answer the user's question.
PAST CONVERSATION HISTORY:
{past_context if past_context else "No prior history."}
CURRENT SAP DATA:
{sap_data}
User Question: {user_prompt}
OUTPUT STRICTLY IN THIS JSON FORMAT:
{{
"response_text": "If making a chart, provide a brief summary. If no chart, provide the full text answer.",
"needs_chart": true or false,
"chart_type": "bar" or "pie" or "line",
"chart_title": "Title of Chart",
"chart_labels": ["Label1", "Label2"],
"chart_values": [10.5, 20.0]
}}"""

        # ---------------------------------------------------------
        # 3. GENERATE LLM RESPONSE
        # ---------------------------------------------------------
        # Other models tried earlier: gemma-4-31b-it, gemma-4-26b-a4b-it,
        # gemini-3.1-flash-lite-preview.
        response = client.models.generate_content(
            model="gemini-3-flash-preview",
            contents=prompt
        )
        parsed_data = json.loads(_strip_code_fence(response.text))
        result = {"text": parsed_data.get("response_text", "Here is the data visualization you requested.")}

        # ---------------------------------------------------------
        # 4. SAVE TO CHROMA DB (Using Google Embeddings)
        # ---------------------------------------------------------
        try:
            interaction_text = f"User Question: {user_prompt}\nBot Answer: {result['text']}"
            print("Embedding document to save...")
            doc_emb_res = client.models.embed_content(
                model="gemini-embedding-2",
                contents=interaction_text
            )
            doc_vector = doc_emb_res.embeddings[0].values
            # Store the text together with its vector.
            collection.add(
                documents=[interaction_text],
                embeddings=[doc_vector],
                metadatas=[{"user_id": user_id}],
                ids=[str(uuid.uuid4())]
            )
            print("Successfully saved to ChromaDB!")
        except Exception as e:
            # Saving history must not block the answer.
            print(f"ChromaDB Add Error: {e}")

        # ---------------------------------------------------------
        # 5. GENERATE CHART (If requested)
        # ---------------------------------------------------------
        if parsed_data.get("needs_chart") and parsed_data.get("chart_labels") and parsed_data.get("chart_values"):
            result["image_path"] = _render_chart(parsed_data)
        return result
    except Exception as e:
        return {"text": f"Error: {str(e)}"}
# =========================
# GOOGLE CHAT WEBHOOK
# =========================
@app.post("/api/messages")
async def webhook(req: Request):
    """Handle a chat event posted to ``/api/messages`` and build the reply.

    Reads the message text and the user name from the JSON body, runs
    ``generate_ai_response`` in a worker thread, and returns a text reply.
    When the AI result carries an image path, a ``cardsV2`` card pointing at
    the image URL is added to the reply.
    """
    payload = await req.json()
    user_message = payload.get("message", {}).get("text", "").strip()
    user_id = payload.get("user", {}).get("name", "unknown_user")
    if not user_message:
        return {"text": "How may I help you?"}

    # Rebuild the public base URL, preferring the forwarded host/scheme
    # headers over the values seen directly on the request.
    request_headers = req.headers
    direct_host = request_headers.get('host', req.url.netloc)
    host = request_headers.get('x-forwarded-host', direct_host)
    scheme = request_headers.get('x-forwarded-proto', req.url.scheme)
    base_url = f"{scheme}://{host}"

    # The AI call is blocking, so keep it off the event loop.
    ai_res = await asyncio.to_thread(generate_ai_response, user_message, user_id)
    reply = {"text": ai_res.get("text", "Here is your data.")}
    image_path = ai_res.get("image_path")
    if not image_path:
        # No chart was produced: answer with text only.
        return reply

    # A chart exists: send the summary text together with an image card.
    image_widget = {"image": {"imageUrl": f"{base_url}/{image_path}"}}
    chart_card = {
        "header": {
            "title": "Data Visualization",
            "subtitle": "Generated from SAP Data"
        },
        "sections": [{"widgets": [image_widget]}]
    }
    reply["cardsV2"] = [{"cardId": "chartCard", "card": chart_card}]
    return reply
# =========================
# GRADIO TEST UI
# =========================
def gradio_wrapper(user_prompt):
    """Adapt ``generate_ai_response`` to the two outputs of the Gradio test UI.

    Args:
        user_prompt: Question typed into the Gradio textbox.

    Returns:
        A ``(text, image_path)`` tuple; ``image_path`` is ``None`` when no
        chart was produced.
    """
    # generate_ai_response requires a user_id; calling it with the prompt
    # alone raises TypeError. The test UI has no real user, so a fixed
    # identifier is passed for the chat-history filter.
    res = generate_ai_response(user_prompt, "gradio_test_user")
    return res["text"], res.get("image_path")
# Minimal test page: a question box and button on the first row, the text
# answer and the chart image on the second.
with gr.Blocks() as demo:
    gr.Markdown("## SAP AI Assistant (With Charts)")
    with gr.Row():
        inp = gr.Textbox(label="Ask")
        btn = gr.Button("Ask")
    with gr.Row():
        out_text = gr.Textbox(label="Response")
        out_img = gr.Image(label="Chart", type="filepath")
    btn.click(fn=gradio_wrapper, inputs=inp, outputs=[out_text, out_img])

# Mount the Gradio page at the root path of the FastAPI app.
app = gr.mount_gradio_app(app, demo, path="/")

# =========================
# RUN
# =========================
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)