Spaces:
Build error
Build error
| import uuid | |
| import os | |
| import gradio as gr | |
| import pandas as pd | |
| import torch | |
| import numpy as np | |
| from sentence_transformers import util | |
| import google.generativeai as genai | |
| import chromadb | |
| from langchain_chroma import Chroma | |
| import gspread | |
| from google.oauth2.service_account import Credentials | |
| from langgraph.checkpoint.sqlite import SqliteSaver | |
| import sqlite3 | |
| import json | |
| from datetime import datetime | |
| import re | |
| from typing import Dict, List, Tuple | |
| # === Configuration === | |
| genai.configure(api_key=os.environ["GEMINI_API_KEY"]) | |
| embedding_model = "models/embedding-001" | |
| llm_model_name = "models/gemma-3-4b-it" | |
| collection_name = "xeno_collection" | |
| # === Google Sheets Setup for Hugging Face === | |
| def get_google_sheets_credentials(): | |
| credentials_json = os.environ.get("GOOGLE_SHEETS_CREDENTIALS") | |
| if not credentials_json: | |
| raise ValueError("GOOGLE_SHEETS_CREDENTIALS environment variable not set.") | |
| credentials_dict = json.loads(credentials_json) | |
| scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"] | |
| creds = Credentials.from_service_account_info(credentials_dict, scopes=scope) | |
| return creds | |
| # Authenticate with Google Sheets | |
| client_gspread = gspread.authorize(get_google_sheets_credentials()) | |
| # Open the Google Sheet | |
| sheet = client_gspread.open("Response_Log").sheet1 | |
| def log_response(question, answer, source_ids, knowledge_pairs, session_id): | |
| """ | |
| Log a question, answer, source IDs, and knowledge base question-answer pairs to the Google Sheet. | |
| Args: | |
| question (str): The question asked by the user. | |
| answer (str): The answer provided by the model. | |
| source_ids (str): Comma-separated list of source IDs used. | |
| knowledge_pairs (list): List of tuples containing (question, answer) from the knowledge base. | |
| """ | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| knowledge_question_1 = knowledge_pairs[0][0] if len(knowledge_pairs) > 0 else "N/A" | |
| knowledge_answer_1 = knowledge_pairs[0][1] if len(knowledge_pairs) > 0 else "N/A" | |
| knowledge_question_2 = knowledge_pairs[1][0] if len(knowledge_pairs) > 1 else "N/A" | |
| knowledge_answer_2 = knowledge_pairs[1][1] if len(knowledge_pairs) > 1 else "N/A" | |
| row = [ | |
| timestamp, | |
| session_id, | |
| question, | |
| answer, | |
| source_ids, | |
| knowledge_question_1, | |
| knowledge_answer_1, | |
| knowledge_question_2, | |
| knowledge_answer_2 | |
| ] | |
| try: | |
| sheet.append_row(row) | |
| print(f"Logged: {question} | Source IDs: {source_ids}") | |
| except Exception as e: | |
| print(f"Failed to log to Google Sheet: {e}") | |
| with open("/tmp/response_log.txt", "a") as f: | |
| f.write(f"{timestamp},{question},{answer},{source_ids},{knowledge_question_1},{knowledge_answer_1},{knowledge_question_2},{knowledge_answer_2}\n") | |
| # === LangGraph Memory Setup === | |
| conn = sqlite3.connect("xeno_memory.db", check_same_thread=False) | |
| memory = SqliteSaver(conn=conn) | |
| def update_memory(config, user_message, assistant_message): | |
| full_checkpoint = memory.get(config) or {} | |
| messages = full_checkpoint.get("channel_values", {}).get("messages", []) | |
| messages.append({"role": "user", "content": user_message}) | |
| messages.append({"role": "assistant", "content": assistant_message}) | |
| checkpoint_to_save = { | |
| "v": 1, | |
| "id": str(uuid.uuid4()), | |
| "ts": datetime.now().isoformat(), | |
| "channel_values": {"messages": messages}, | |
| "channel_versions": {}, | |
| "versions_seen": {}, | |
| } | |
| memory.put(config, checkpoint_to_save, {}, {}) | |
| # === Intent Classification System === | |
| class IntentClassifier: | |
| def __init__(self): | |
| # Define intent patterns and responses | |
| self.intent_patterns = { | |
| 'greeting': { | |
| 'patterns': [ | |
| r'\b(hi|hello|hey|good morning|good afternoon|good evening|greetings)\b', | |
| r'^(hi|hello|hey)[\s!.]*$', | |
| r'\b(how are you|how do you do)\b' | |
| ], | |
| 'responses': [ | |
| "Hello! I'm XENO Assistant. How can I help you with XENO financial services today?", | |
| "Hi there! I'm here to assist you with any questions about XENO services. What can I help you with?", | |
| "Good day! Welcome to XENO Support. How may I assist you today?" | |
| ] | |
| }, | |
| 'thanks': { | |
| 'patterns': [ | |
| r'\b(thank you|thanks|thank u|thx|appreciate|grateful)\b', | |
| r'^(thanks|thank you)[\s!.]*$', | |
| r'\b(much appreciated|thanks a lot|thank you so much)\b' | |
| ], | |
| 'responses': [ | |
| "You're welcome! Is there anything else I can help you with regarding XENO services?", | |
| "Happy to help! Feel free to ask if you have any other questions about XENO.", | |
| "Glad I could assist you! Let me know if you need help with anything else." | |
| ] | |
| }, | |
| 'goodbye': { | |
| 'patterns': [ | |
| r'\b(bye|goodbye|see you|farewell|take care|have a good day)\b', | |
| r'^(bye|goodbye)[\s!.]*$', | |
| r'\b(talk to you later|see you later|until next time)\b' | |
| ], | |
| 'responses': [ | |
| "Goodbye! Thank you for using XENO services. Have a great day!", | |
| "Take care! Feel free to return anytime you need help with XENO services.", | |
| "Have a wonderful day! Don't hesitate to reach out if you need assistance with XENO." | |
| ] | |
| } | |
| } | |
| def classify_intent(self, message: str) -> Tuple[str, str]: | |
| """ | |
| Classify the intent of a message and return appropriate response if it's a simple intent. | |
| Returns: (intent_name, response) - response is empty string if intent requires RAG | |
| """ | |
| message_lower = message.lower().strip() | |
| for intent_name, intent_data in self.intent_patterns.items(): | |
| for pattern in intent_data['patterns']: | |
| if re.search(pattern, message_lower, re.IGNORECASE): | |
| import random | |
| response = random.choice(intent_data['responses']) | |
| return intent_name, response | |
| return 'query', '' | |
| def is_simple_intent(self, intent: str) -> bool: | |
| """Check if intent can be handled without RAG""" | |
| simple_intents = ['greeting', 'thanks'] | |
| return intent in simple_intents | |
| # Initialize intent classifier | |
| intent_classifier = IntentClassifier() | |
| # === Load and Clean Knowledge Base === | |
| df_kb = pd.read_json("XENO_Uganda_KnowledgeBase_Advisory.json") | |
| df_kb.dropna(subset=['Content'], inplace=True) | |
| def prepare_documents(data): | |
| documents, metadatas, ids = [], [], [] | |
| for item in data: | |
| documents.append(f"Question: {item['Question']}\nAnswer: {item['Content']}") | |
| metadatas.append({ | |
| "question": item["Question"], | |
| "content": item["Content"], | |
| "section": item.get("Section", ""), | |
| "source": item.get("Source", ""), | |
| "owner": item.get("Owner", ""), | |
| "tag": item.get("Tag", ""), | |
| "id": item["ID"] | |
| }) | |
| ids.append(item["ID"]) | |
| return documents, metadatas, ids | |
| xeno_data_list = df_kb.to_dict('records') | |
| documents, metadatas, ids = prepare_documents(xeno_data_list) | |
| # === Setup ChromaDB === | |
| try: | |
| client = chromadb.PersistentClient(path="/tmp/xeno_db") | |
| try: | |
| collection = client.get_collection(name=collection_name) | |
| print(f"Loaded existing ChromaDB collection: {collection_name}") | |
| except: | |
| print(f"Creating new ChromaDB collection: {collection_name}") | |
| collection = client.create_collection(name=collection_name) | |
| collection.add(documents=documents, metadatas=metadatas, ids=ids) | |
| except Exception as e: | |
| print(f"Failed to initialize ChromaDB: {e}") | |
| raise | |
| vector_store = Chroma(client=client, collection_name=collection_name) | |
| retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 4}) | |
| # === Prompt System === | |
| SYSTEM_PROMPT = """You are a friendly XENO Support Assistant, an AI-powered helpful and professional customer service representative. | |
| Use only the information provided in the knowledge base context to answer user queries. | |
| Do not hallucinate. If context doesn't contain relevant info, say so in a calm polite manner by saying I'm sorry, I can't assist with that. | |
| Only use context that is clearly relevant to the user's question. | |
| For greetings like “hi” or “hello”, respond politely without using the context. | |
| remember previous conversations.""" | |
| # === Context Processing === | |
| def process_context(results, cosine_scores, max_results=2): | |
| sorted_indices = np.argsort(cosine_scores)[::-1][:max_results] | |
| formatted_context = "" | |
| source_ids = [] | |
| knowledge_pairs = [] | |
| for i, idx in enumerate(sorted_indices, 1): | |
| result = results[idx] | |
| score = cosine_scores[idx] | |
| question = result.metadata.get('question', 'N/A') | |
| answer = result.metadata.get('content', 'N/A') | |
| formatted_context += f"Knowledge Entry {i}:\n" | |
| formatted_context += f"Q: {question}\n" | |
| formatted_context += f"A: {answer}\n" | |
| formatted_context += "-" * 40 + "\n" | |
| source_ids.append(result.metadata.get('id', 'N/A')) | |
| knowledge_pairs.append((question, answer)) | |
| return formatted_context, source_ids, knowledge_pairs | |
| # === LLM Generation (Refactored) === | |
| def generate_xeno_response(context, question, chat_history): | |
| """Generates a response but does NOT handle memory.""" | |
| model = genai.GenerativeModel(llm_model_name) | |
| formatted_history = "\n".join( | |
| [f"{msg['role'].capitalize()}: {msg['content']}" for msg in chat_history] | |
| ) if chat_history else "None" | |
| prompt = f"{SYSTEM_PROMPT}\n### HISTORY ###\n{formatted_history}\n### CONTEXT ###\n{context}\n### QUESTION ###\n{question}" | |
| response = model.generate_content(prompt) | |
| return response.text.strip() | |
| # === Main Interface Logic (Refactored) === | |
| def get_context_and_answer(message, history, session_id="default"): | |
| """ | |
| Handles intent classification, RAG, and memory updates in one place. | |
| """ | |
| config = {"configurable": {"thread_id": str(session_id), "checkpoint_ns": ""}} | |
| full_checkpoint = memory.get(config) or {} | |
| chat_history = full_checkpoint.get("channel_values", {}).get("messages", []) | |
| intent, direct_response = intent_classifier.classify_intent(message) | |
| answer = "" | |
| source_ids = "N/A" | |
| knowledge_pairs = [] | |
| if intent != 'query': | |
| answer = direct_response | |
| else: | |
| if len(message.strip()) < 3: | |
| answer = "I'd be happy to help! Could you please provide more details about what you'd like to know?" | |
| else: | |
| try: | |
| queried_results = retriever.invoke(message) | |
| query_embedding = genai.embed_content(model=embedding_model, content=message, task_type="retrieval_query")['embedding'] | |
| doc_embeddings = [genai.embed_content(model=embedding_model, content=doc.page_content, task_type="retrieval_document")['embedding'] for doc in queried_results] | |
| cosine_scores = util.cos_sim(torch.tensor(query_embedding).float(), torch.tensor(doc_embeddings).float())[0].tolist() | |
| if max(cosine_scores) < 0.4: | |
| answer = "I'm sorry, I couldn't find specific information for your question. Could you try rephrasing it, or contact XENO support directly?" | |
| else: | |
| context, source_ids_list, knowledge_pairs = process_context(queried_results, cosine_scores) | |
| answer = generate_xeno_response(context, message, chat_history) | |
| source_ids = ", ".join(source_ids_list) | |
| except Exception as e: | |
| print(f"Error during RAG processing: {e}") | |
| answer = "I apologize, but I'm having a technical issue. Please try again shortly or contact XENO support." | |
| update_memory(config, message, answer) | |
| log_response(message, answer, source_ids, knowledge_pairs, session_id) | |
| return answer | |
| # === Enhanced Gradio UI === | |
| def respond(message, history, session_id): | |
| """Gradio's main response function.""" | |
| if not session_id: | |
| session_id = str(uuid.uuid4()) | |
| response = get_context_and_answer(message, history, session_id) | |
| config = {"configurable": {"thread_id": str(session_id), "checkpoint_ns": ""}} | |
| updated_messages = (memory.get(config) or {}).get("messages", []) | |
| history.append({"role": "user", "content": message}) | |
| history.append({"role": "assistant", "content": response}) | |
| return "", history | |
| def create_interface(): | |
| with gr.Blocks() as demo: | |
| gr.Markdown("""ASKXENO | |
| **Welcome to XENO AI Support!** | |
| I can help you with questions about XENO financial services including: | |
| • Account management and setup | |
| • Transaction processes and fees | |
| • Platform features and troubleshooting | |
| • General service information | |
| *Simply type your question below to get started!* | |
| """) | |
| session_id_box = gr.Textbox(label="Session ID", value=str(uuid.uuid4()), interactive=True) | |
| chatbot = gr.Chatbot(label="XENO Assistant", bubble_full_width=False, height=500, type="messages") | |
| msg = gr.Textbox(label="Your Message", placeholder="Type your question here...") | |
| msg.submit(respond, [msg, chatbot, session_id_box], [msg, chatbot]) | |
| return demo | |
| if __name__ == "__main__": | |
| iface = create_interface() | |
| iface.launch(share=False, server_name="0.0.0.0", server_port=7860) |