# Spaces: Sleeping
# (Web-page header text captured together with the file; not part of the program.)
| import os | |
| import gradio as gr | |
| import pandas as pd | |
| import torch | |
| import numpy as np | |
| from sentence_transformers import util | |
| import google.generativeai as genai | |
| import chromadb | |
| from langchain_chroma import Chroma | |
| import gspread | |
| from google.oauth2.service_account import Credentials | |
| from datetime import datetime | |
| import json | |
# === Configuration ===
# Model and collection identifiers used throughout this module.
collection_name = "xeno_collection"        # ChromaDB collection holding the knowledge base
llm_model_name = "models/gemma-3-4b-it"    # generative model used to write answers
embedding_model = "models/embedding-001"   # embedding model used to score query/document similarity

# The API key is mandatory: a missing GEMINI_API_KEY raises KeyError at import.
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
# === Google Sheets Setup for Hugging Face ===
def get_google_sheets_credentials():
    """Build Google service-account credentials from the environment.

    The service-account key is read as a JSON string from the
    ``GOOGLE_SHEETS_CREDENTIALS`` environment variable.

    Returns:
        Credentials: Service-account credentials created with the
        spreadsheets-feed and Drive scopes.

    Raises:
        ValueError: If the variable is unset or empty, is not valid JSON,
            or does not decode to a JSON object.
    """
    credentials_json = os.environ.get("GOOGLE_SHEETS_CREDENTIALS")
    if not credentials_json:
        raise ValueError("GOOGLE_SHEETS_CREDENTIALS environment variable not set.")

    try:
        credentials_dict = json.loads(credentials_json)
    except json.JSONDecodeError as err:
        # JSONDecodeError is a ValueError subclass, so callers catching
        # ValueError are unaffected; the message now names the variable.
        raise ValueError(
            "GOOGLE_SHEETS_CREDENTIALS environment variable is not valid JSON."
        ) from err

    if not isinstance(credentials_dict, dict):
        raise ValueError(
            "GOOGLE_SHEETS_CREDENTIALS environment variable must contain a JSON object."
        )

    scope = [
        "https://spreadsheets.google.com/feeds",
        "https://www.googleapis.com/auth/drive",
    ]
    return Credentials.from_service_account_info(credentials_dict, scopes=scope)
# Authenticate with Google Sheets using the service-account credentials.
_sheets_credentials = get_google_sheets_credentials()
client_gspread = gspread.authorize(_sheets_credentials)

# Open the "Response_Log" spreadsheet and keep its first worksheet; this is
# the object log_response() appends rows to.
_response_log_book = client_gspread.open("Response_Log")
sheet = _response_log_book.sheet1
def log_response(question, answer, source_ids, knowledge_pairs):
    """Log one chat exchange to the Google Sheet, with a local-file fallback.

    The row written is: timestamp, question, answer, source IDs, then the
    question/answer of the first two knowledge-base pairs ("N/A" where a
    pair is missing). If appending to the sheet fails for any reason, the
    same row is appended to ``/tmp/response_log.txt`` as a CSV record.
    Logging is best-effort: failures are printed, never raised.

    Args:
        question (str): The question asked by the user.
        answer (str): The answer provided by the model.
        source_ids (str): Comma-separated list of source IDs used.
        knowledge_pairs (list): List of (question, answer) tuples from the
            knowledge base; may be empty or None.
    """
    # Local import: only the fallback path needs the csv module.
    import csv

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Flatten the first two knowledge pairs into four columns, padding with
    # "N/A" so every row has the same width.
    pairs = list(knowledge_pairs or [])
    kb_columns = []
    for slot in range(2):
        if slot < len(pairs):
            kb_columns.extend([pairs[slot][0], pairs[slot][1]])
        else:
            kb_columns.extend(["N/A", "N/A"])

    row = [timestamp, question, answer, source_ids, *kb_columns]

    try:
        sheet.append_row(row)
        print(f"Logged: {question} | Source IDs: {source_ids}")
    except Exception as e:  # best-effort boundary: logging must not break the chat
        print(f"Failed to log to Google Sheet: {e}")
        try:
            # csv.writer quotes fields containing commas, quotes or newlines,
            # so free-text questions/answers cannot corrupt the record.
            with open("/tmp/response_log.txt", "a", encoding="utf-8", newline="") as f:
                csv.writer(f, lineterminator="\n").writerow(row)
        except OSError as file_err:
            print(f"Failed to write fallback log file: {file_err}")
# === Load and Clean Knowledge Base ===
# Read the knowledge base and discard rows that have no 'Content' value.
df_kb = pd.read_json("XENO_Uganda_KnowledgeBase_Advisory.json").dropna(subset=["Content"])
def _blank_if_missing(value):
    """Return "" for None/NaN (how pandas marks absent cells), else the value."""
    if value is None:
        return ""
    # NaN is the only float that is not equal to itself.
    if isinstance(value, float) and value != value:
        return ""
    return value


def prepare_documents(data):
    """Convert knowledge-base records into parallel lists for the vector store.

    Args:
        data: Iterable of dict records. Each must have "Question", "Content"
            and "ID"; "Section", "Source", "Owner" and "Tag" are optional.

    Returns:
        tuple: ``(documents, metadatas, ids)`` where ``documents`` holds
        "Question: ...\\nAnswer: ..." strings, ``metadatas`` holds one dict
        per record, and ``ids`` holds the record IDs as strings.

    Raises:
        KeyError: If a record lacks "Question", "Content" or "ID".
    """
    documents, metadatas, ids = [], [], []
    for item in data:
        # IDs are normalized to str: the ids list and the joined source-ID
        # string built by the chat handler both need string values.
        record_id = str(item["ID"])
        documents.append(f"Question: {item['Question']}\nAnswer: {item['Content']}")
        metadatas.append({
            "question": item["Question"],
            "content": item["Content"],
            # Records coming from a DataFrame carry NaN (not a missing key)
            # for absent optional fields, so the .get default alone is not enough.
            "section": _blank_if_missing(item.get("Section", "")),
            "source": _blank_if_missing(item.get("Source", "")),
            "owner": _blank_if_missing(item.get("Owner", "")),
            "tag": _blank_if_missing(item.get("Tag", "")),
            "id": record_id,
        })
        ids.append(record_id)
    return documents, metadatas, ids
# Turn the cleaned DataFrame into one dict per row, then split the records
# into the parallel lists used to populate the vector store.
xeno_data_list = df_kb.to_dict(orient="records")
documents, metadatas, ids = prepare_documents(xeno_data_list)
# === Setup ChromaDB ===
# Open (or create) the persistent collection and make sure it holds the
# knowledge base, then expose it through a LangChain retriever.
try:
    client = chromadb.PersistentClient(path="/tmp/xeno_db")
    # get_or_create_collection replaces the former get/bare-except/create
    # sequence, which also hid unrelated errors.
    collection = client.get_or_create_collection(name=collection_name)
    if collection.count() > 0:
        print(f"Loaded existing ChromaDB collection: {collection_name}")
    else:
        # Covers both a brand-new collection and one left empty by an
        # earlier start-up that failed before the documents were added.
        print(f"Creating new ChromaDB collection: {collection_name}")
        collection.add(documents=documents, metadatas=metadatas, ids=ids)
except Exception as e:
    # Report the failure, then re-raise so start-up still aborts.
    print(f"Failed to initialize ChromaDB: {e}")
    raise

vector_store = Chroma(client=client, collection_name=collection_name)
# Retrieve the 4 most similar entries per query.
retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 4})
# === Prompt System ===
# Instruction text used as the leading section of the prompt built in
# generate_xeno_response(); one instruction per line.
SYSTEM_PROMPT = "\n".join(
    [
        "You are a friendly XENO Support Assistant, an AI-powered helpful and professional customer service representative.",
        "Use only the information provided in the knowledge base context to answer user queries.",
        "Do not hallucinate. If context doesn't contain relevant info, say so in a calm polite manner by saying I'm sorry, I can't assist with that.",
        "Only use context that is clearly relevant to the user's question.",
        "For greetings like “hi” or “hello”, respond politely without using the context.",
        "remember previous conversations.",
    ]
)
# === Context Processing ===
def process_context(results, cosine_scores, max_results=2):
    """Format the best-scoring retrieved entries as prompt context.

    Args:
        results: Retrieved documents. Each must expose a ``metadata`` mapping;
            the keys read are ``question``, ``content`` and ``id`` ("N/A" is
            used for any that are missing).
        cosine_scores: One similarity score per entry of ``results``, in the
            same order.
        max_results: Maximum number of entries to keep (default 2).

    Returns:
        tuple: ``(formatted_context, source_ids, knowledge_pairs)`` where
        ``formatted_context`` is the text block for the prompt,
        ``source_ids`` lists the ``id`` of each kept entry, and
        ``knowledge_pairs`` lists their ``(question, answer)`` tuples, all
        ordered from highest to lowest score.
    """
    limit = max(max_results, 0)
    # sorted() is stable, so equal scores keep the order the retriever
    # returned them in; zip() also tolerates mismatched lengths.
    ranked = sorted(
        zip(results, cosine_scores),
        key=lambda pair: pair[1],
        reverse=True,
    )[:limit]

    separator = "-" * 40
    entries = []
    source_ids = []
    knowledge_pairs = []
    for position, (result, _score) in enumerate(ranked, 1):
        question = result.metadata.get('question', 'N/A')
        answer = result.metadata.get('content', 'N/A')
        entries.append(
            f"Knowledge Entry {position}:\n"
            f"Q: {question}\n"
            f"A: {answer}\n"
            f"{separator}\n"
        )
        source_ids.append(result.metadata.get('id', 'N/A'))
        knowledge_pairs.append((question, answer))

    return "".join(entries), source_ids, knowledge_pairs
# === LLM Generation ===
def generate_xeno_response(context, question):
    """Ask the LLM to answer ``question`` using only ``context``.

    The prompt is the system instructions, a "### CONTEXT ###" section and
    a "### QUESTION ###" section, sent as a single text prompt.

    Args:
        context: Formatted knowledge-base entries to ground the answer.
        question: The user's question.

    Returns:
        str: The model's answer with surrounding whitespace removed, or a
        polite refusal when the model returns no usable text.
    """
    fallback_answer = "I'm sorry, I can't assist with that."

    model = genai.GenerativeModel(llm_model_name)
    prompt = (
        f"{SYSTEM_PROMPT}\n"
        "### CONTEXT ###\n"
        f"{context}\n"
        "### QUESTION ###\n"
        f"{question}"
    )
    response = model.generate_content(prompt)

    try:
        text = response.text
    except ValueError as err:
        # The `.text` accessor raises ValueError when the response carries no
        # valid text part (for example when the reply was blocked).
        print(f"LLM returned no usable text: {err}")
        return fallback_answer

    return text.strip() or fallback_answer
# === Main Interface Logic ===
def get_context_and_answer(message, history):
    """Answer one chat message from the knowledge base and log the exchange.

    Steps: short-circuit plain greetings; retrieve candidate entries; score
    each candidate against the query with embedding cosine similarity; if
    the best score is below 0.6 (or nothing was retrieved) return a
    "not found" reply; otherwise build context from the best entries and ask
    the LLM. Every path logs the exchange via ``log_response``.

    Args:
        message (str): The user's message.
        history: Conversation history passed by the chat interface; not used
            by this function.

    Returns:
        str: The reply to show to the user.
    """
    min_similarity = 0.6
    not_found_answer = "I'm sorry, I couldn't find the specific information you're looking for in my knowledge base."

    if message.lower().strip() in {"hi", "hello", "hey"}:
        answer = "Hello! How can I assist you with XENO services today?"
        log_response(message, answer, "N/A", [])
        return answer

    queried_results = retriever.invoke(message)
    if not queried_results:
        # Nothing retrieved: max() below would raise on an empty list.
        log_response(message, not_found_answer, "N/A", [])
        return not_found_answer

    query_embedding = genai.embed_content(
        model=embedding_model,
        content=message,
        task_type="retrieval_query",
    )['embedding']
    # Built once: the query vector is the same for every candidate.
    query_vector = torch.tensor(query_embedding).float()

    cosine_scores = []
    for doc in queried_results:
        doc_embedding = genai.embed_content(
            model=embedding_model,
            content=doc.page_content,
            task_type="retrieval_document",
        )['embedding']
        doc_vector = torch.tensor(doc_embedding).float()
        cosine_scores.append(util.cos_sim(query_vector, doc_vector)[0][0].item())

    if max(cosine_scores) < min_similarity:
        log_response(message, not_found_answer, "N/A", [])
        return not_found_answer

    context, source_ids, knowledge_pairs = process_context(queried_results, cosine_scores)
    answer = generate_xeno_response(context, message)
    # IDs may not be strings (e.g. numeric IDs in the knowledge base);
    # str.join would raise TypeError on them.
    joined_ids = ", ".join(str(source_id) for source_id in source_ids)
    log_response(message, answer, joined_ids, knowledge_pairs)
    return answer
# === Gradio UI ===
# Chat front-end: every submitted message is routed to get_context_and_answer.
iface = gr.ChatInterface(
    get_context_and_answer,
    theme="soft",
    title="ASKXENO",
    description="Ask anything about XENO's financial services.",
)

# Start the app only when run as a script, without a public share link.
if __name__ == "__main__":
    iface.launch(share=False)