import uuid import os import gradio as gr import pandas as pd import torch import numpy as np from sentence_transformers import util import google.generativeai as genai import chromadb from langchain_chroma import Chroma import gspread from google.oauth2.service_account import Credentials from langgraph.checkpoint.sqlite import SqliteSaver import sqlite3 import json from datetime import datetime import re from typing import Dict, List, Tuple # === Configuration === genai.configure(api_key=os.environ["GEMINI_API_KEY"]) embedding_model = "models/embedding-001" llm_model_name = "models/gemma-3-4b-it" collection_name = "xeno_collection" # === Google Sheets Setup for Hugging Face === def get_google_sheets_credentials(): credentials_json = os.environ.get("GOOGLE_SHEETS_CREDENTIALS") if not credentials_json: raise ValueError("GOOGLE_SHEETS_CREDENTIALS environment variable not set.") credentials_dict = json.loads(credentials_json) scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"] creds = Credentials.from_service_account_info(credentials_dict, scopes=scope) return creds # Authenticate with Google Sheets client_gspread = gspread.authorize(get_google_sheets_credentials()) # Open the Google Sheet sheet = client_gspread.open("Response_Log").sheet1 def log_response(question, answer, source_ids, knowledge_pairs, session_id): """ Log a question, answer, source IDs, and knowledge base question-answer pairs to the Google Sheet. Args: question (str): The question asked by the user. answer (str): The answer provided by the model. source_ids (str): Comma-separated list of source IDs used. knowledge_pairs (list): List of tuples containing (question, answer) from the knowledge base. """ timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") knowledge_question_1 = knowledge_pairs[0][0] if len(knowledge_pairs) > 0 else "N/A" knowledge_answer_1 = knowledge_pairs[0][1] if len(knowledge_pairs) > 0 else "N/A" knowledge_question_2 = knowledge_pairs[1][0] if len(knowledge_pairs) > 1 else "N/A" knowledge_answer_2 = knowledge_pairs[1][1] if len(knowledge_pairs) > 1 else "N/A" row = [ timestamp, session_id, question, answer, source_ids, knowledge_question_1, knowledge_answer_1, knowledge_question_2, knowledge_answer_2 ] try: sheet.append_row(row) print(f"Logged: {question} | Source IDs: {source_ids}") except Exception as e: print(f"Failed to log to Google Sheet: {e}") with open("/tmp/response_log.txt", "a") as f: f.write(f"{timestamp},{question},{answer},{source_ids},{knowledge_question_1},{knowledge_answer_1},{knowledge_question_2},{knowledge_answer_2}\n") # === LangGraph Memory Setup === conn = sqlite3.connect("xeno_memory.db", check_same_thread=False) memory = SqliteSaver(conn=conn) def update_memory(config, user_message, assistant_message): full_checkpoint = memory.get(config) or {} messages = full_checkpoint.get("channel_values", {}).get("messages", []) messages.append({"role": "user", "content": user_message}) messages.append({"role": "assistant", "content": assistant_message}) checkpoint_to_save = { "v": 1, "id": str(uuid.uuid4()), "ts": datetime.now().isoformat(), "channel_values": {"messages": messages}, "channel_versions": {}, "versions_seen": {}, } memory.put(config, checkpoint_to_save, {}, {}) # === Intent Classification System === class IntentClassifier: def __init__(self): # Define intent patterns and responses self.intent_patterns = { 'greeting': { 'patterns': [ r'\b(hi|hello|hey|good morning|good afternoon|good evening|greetings)\b', r'^(hi|hello|hey)[\s!.]*$', r'\b(how are you|how do you do)\b' ], 'responses': [ "Hello! I'm XENO Assistant. How can I help you with XENO financial services today?", "Hi there! I'm here to assist you with any questions about XENO services. What can I help you with?", "Good day! Welcome to XENO Support. How may I assist you today?" ] }, 'thanks': { 'patterns': [ r'\b(thank you|thanks|thank u|thx|appreciate|grateful)\b', r'^(thanks|thank you)[\s!.]*$', r'\b(much appreciated|thanks a lot|thank you so much)\b' ], 'responses': [ "You're welcome! Is there anything else I can help you with regarding XENO services?", "Happy to help! Feel free to ask if you have any other questions about XENO.", "Glad I could assist you! Let me know if you need help with anything else." ] }, 'goodbye': { 'patterns': [ r'\b(bye|goodbye|see you|farewell|take care|have a good day)\b', r'^(bye|goodbye)[\s!.]*$', r'\b(talk to you later|see you later|until next time)\b' ], 'responses': [ "Goodbye! Thank you for using XENO services. Have a great day!", "Take care! Feel free to return anytime you need help with XENO services.", "Have a wonderful day! Don't hesitate to reach out if you need assistance with XENO." ] } } def classify_intent(self, message: str) -> Tuple[str, str]: """ Classify the intent of a message and return appropriate response if it's a simple intent. Returns: (intent_name, response) - response is empty string if intent requires RAG """ message_lower = message.lower().strip() for intent_name, intent_data in self.intent_patterns.items(): for pattern in intent_data['patterns']: if re.search(pattern, message_lower, re.IGNORECASE): import random response = random.choice(intent_data['responses']) return intent_name, response return 'query', '' def is_simple_intent(self, intent: str) -> bool: """Check if intent can be handled without RAG""" simple_intents = ['greeting', 'thanks'] return intent in simple_intents # Initialize intent classifier intent_classifier = IntentClassifier() # === Load and Clean Knowledge Base === df_kb = pd.read_json("XENO_Uganda_KnowledgeBase_Advisory.json") df_kb.dropna(subset=['Content'], inplace=True) def prepare_documents(data): documents, metadatas, ids = [], [], [] for item in data: documents.append(f"Question: {item['Question']}\nAnswer: {item['Content']}") metadatas.append({ "question": item["Question"], "content": item["Content"], "section": item.get("Section", ""), "source": item.get("Source", ""), "owner": item.get("Owner", ""), "tag": item.get("Tag", ""), "id": item["ID"] }) ids.append(item["ID"]) return documents, metadatas, ids xeno_data_list = df_kb.to_dict('records') documents, metadatas, ids = prepare_documents(xeno_data_list) # === Setup ChromaDB === try: client = chromadb.PersistentClient(path="/tmp/xeno_db") try: collection = client.get_collection(name=collection_name) print(f"Loaded existing ChromaDB collection: {collection_name}") except: print(f"Creating new ChromaDB collection: {collection_name}") collection = client.create_collection(name=collection_name) collection.add(documents=documents, metadatas=metadatas, ids=ids) except Exception as e: print(f"Failed to initialize ChromaDB: {e}") raise vector_store = Chroma(client=client, collection_name=collection_name) retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 4}) # === Prompt System === SYSTEM_PROMPT = """You are a friendly XENO Support Assistant, an AI-powered helpful and professional customer service representative. Use only the information provided in the knowledge base context to answer user queries. Do not hallucinate. If context doesn't contain relevant info, say so in a calm polite manner by saying I'm sorry, I can't assist with that. Only use context that is clearly relevant to the user's question. For greetings like “hi” or “hello”, respond politely without using the context. remember previous conversations.""" # === Context Processing === def process_context(results, cosine_scores, max_results=2): sorted_indices = np.argsort(cosine_scores)[::-1][:max_results] formatted_context = "" source_ids = [] knowledge_pairs = [] for i, idx in enumerate(sorted_indices, 1): result = results[idx] score = cosine_scores[idx] question = result.metadata.get('question', 'N/A') answer = result.metadata.get('content', 'N/A') formatted_context += f"Knowledge Entry {i}:\n" formatted_context += f"Q: {question}\n" formatted_context += f"A: {answer}\n" formatted_context += "-" * 40 + "\n" source_ids.append(result.metadata.get('id', 'N/A')) knowledge_pairs.append((question, answer)) return formatted_context, source_ids, knowledge_pairs # === LLM Generation (Refactored) === def generate_xeno_response(context, question, chat_history): """Generates a response but does NOT handle memory.""" model = genai.GenerativeModel(llm_model_name) formatted_history = "\n".join( [f"{msg['role'].capitalize()}: {msg['content']}" for msg in chat_history] ) if chat_history else "None" prompt = f"{SYSTEM_PROMPT}\n### HISTORY ###\n{formatted_history}\n### CONTEXT ###\n{context}\n### QUESTION ###\n{question}" response = model.generate_content(prompt) return response.text.strip() # === Main Interface Logic (Refactored) === def get_context_and_answer(message, history, session_id="default"): """ Handles intent classification, RAG, and memory updates in one place. """ config = {"configurable": {"thread_id": str(session_id), "checkpoint_ns": ""}} full_checkpoint = memory.get(config) or {} chat_history = full_checkpoint.get("channel_values", {}).get("messages", []) intent, direct_response = intent_classifier.classify_intent(message) answer = "" source_ids = "N/A" knowledge_pairs = [] if intent != 'query': answer = direct_response else: if len(message.strip()) < 3: answer = "I'd be happy to help! Could you please provide more details about what you'd like to know?" else: try: queried_results = retriever.invoke(message) query_embedding = genai.embed_content(model=embedding_model, content=message, task_type="retrieval_query")['embedding'] doc_embeddings = [genai.embed_content(model=embedding_model, content=doc.page_content, task_type="retrieval_document")['embedding'] for doc in queried_results] cosine_scores = util.cos_sim(torch.tensor(query_embedding).float(), torch.tensor(doc_embeddings).float())[0].tolist() if max(cosine_scores) < 0.4: answer = "I'm sorry, I couldn't find specific information for your question. Could you try rephrasing it, or contact XENO support directly?" else: context, source_ids_list, knowledge_pairs = process_context(queried_results, cosine_scores) answer = generate_xeno_response(context, message, chat_history) source_ids = ", ".join(source_ids_list) except Exception as e: print(f"Error during RAG processing: {e}") answer = "I apologize, but I'm having a technical issue. Please try again shortly or contact XENO support." update_memory(config, message, answer) log_response(message, answer, source_ids, knowledge_pairs, session_id) return answer # === Enhanced Gradio UI === def respond(message, history, session_id): """Gradio's main response function.""" if not session_id: session_id = str(uuid.uuid4()) response = get_context_and_answer(message, history, session_id) config = {"configurable": {"thread_id": str(session_id), "checkpoint_ns": ""}} updated_messages = (memory.get(config) or {}).get("messages", []) history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": response}) return "", history def create_interface(): with gr.Blocks() as demo: gr.Markdown("""ASKXENO **Welcome to XENO AI Support!** I can help you with questions about XENO financial services including: • Account management and setup • Transaction processes and fees • Platform features and troubleshooting • General service information *Simply type your question below to get started!* """) session_id_box = gr.Textbox(label="Session ID", value=str(uuid.uuid4()), interactive=True) chatbot = gr.Chatbot(label="XENO Assistant", bubble_full_width=False, height=500, type="messages") msg = gr.Textbox(label="Your Message", placeholder="Type your question here...") msg.submit(respond, [msg, chatbot, session_id_box], [msg, chatbot]) return demo if __name__ == "__main__": iface = create_interface() iface.launch(share=False, server_name="0.0.0.0", server_port=7860)