"""RAG-enhanced Gradio chatbot representing Duc Nguyen / DigitizedBrains.

Loads plain-text documents into an in-memory knowledge base, retrieves the
most relevant ones per query with simple keyword scoring, and answers via
the Gemini API. Pushover notifications record captured leads and questions
the assistant could not answer.
"""

import glob
import json
import os
import re
from collections import defaultdict

import google.generativeai as genai
import gradio as gr
import requests

# Configure Gemini API - Use environment variables for security
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))


def push(text):
    """Send *text* as a Pushover notification; log to stdout on HTTP failure.

    Best-effort by design: a failed notification must never break the chat
    flow, so network errors are caught and reported locally instead.
    """
    try:
        requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": os.getenv("PUSHOVER_TOKEN"),
                "user": os.getenv("PUSHOVER_USER"),
                "message": text,
            }
        )
    except requests.RequestException:
        # Narrowed from a bare except: only network/HTTP errors are expected
        # here, and they should not mask programming errors or Ctrl-C.
        print(f"Push notification: {text}")


def record_user_details(email, name="Name not provided", notes="not provided"):
    """Record a lead (email, optional name/notes) via push notification."""
    push(f"Recording {name} with email {email} and notes {notes}")
    return {"recorded": "ok"}


def record_unknown_question(question):
    """Record a question the assistant could not answer."""
    push(f"Recording {question}")
    return {"recorded": "ok"}


# JSON tool schemas advertised to the model (OpenAI-style function specs).
record_user_details_json = {
    "name": "record_user_details",
    "description": "Use this tool to record that a user is interested in being in touch and provided an email address",
    "parameters": {
        "type": "object",
        "properties": {
            "email": {
                "type": "string",
                "description": "The email address of this user"
            },
            "name": {
                "type": "string",
                "description": "The user's name, if they provided it"
            },
            "notes": {
                "type": "string",
                "description": "Any additional information about the conversation that's worth recording to give context"
            }
        },
        "required": ["email"],
        "additionalProperties": False
    }
}

record_unknown_question_json = {
    "name": "record_unknown_question",
    "description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer",
    "parameters": {
        "type": "object",
        "properties": {
            "question": {
                "type": "string",
                "description": "The question that couldn't be answered"
            }
        },
        "required": ["question"],
        "additionalProperties": False
    }
}

tools = [record_user_details_json, record_unknown_question_json]


class Me:
    """Chat persona backed by a text-file knowledge base and Gemini."""

    def __init__(self):
        self.model = genai.GenerativeModel("gemini-1.5-flash")
        self.owner_name = "Duc Nguyen"
        self.chatbot_name = "DigitizedBrains"
        # RAG Knowledge Base - Load text documents only (fast loading)
        self.knowledge_base = self.load_text_documents()
        print(f"Loaded {len(self.knowledge_base)} text documents into RAG knowledge base")
        # Core information (falls back to placeholder text if a file is missing)
        self.linkedin = self.knowledge_base.get('linkedin_profile.txt', '[LinkedIn profile not found]')
        self.summary = self.knowledge_base.get('summary.txt', '[Summary not found]')
        self.digitizedbrains_info = self.knowledge_base.get('digitizedbrains_profile.txt', '[DigitizedBrains profile not found]')

    def load_text_documents(self):
        """Load only text documents for fast startup.

        Returns:
            dict mapping filename -> file content for every ``*.txt`` file
            in ``document/``. Unreadable files are skipped with a log line.
        """
        knowledge_base = {}
        document_dir = "document/"
        # Load all text files (fast)
        for txt_file in glob.glob(os.path.join(document_dir, "*.txt")):
            filename = os.path.basename(txt_file)
            try:
                with open(txt_file, "r", encoding="utf-8") as f:
                    content = f.read()
                knowledge_base[filename] = content
                # Fixed: the log lines must show the actual filename (and
                # error), not a placeholder.
                print(f"Loaded: {filename} ({len(content)} chars)")
            except Exception as e:
                print(f"Failed: {filename} ({e})")
        return knowledge_base

    def search_relevant_content(self, query):
        """Simple RAG retrieval based on keyword matching.

        Scores each document: +10 for a whole-query substring match, +2 per
        query word (length > 2) found in the document. Returns up to the top
        3 documents as dicts with ``filename``, ``content`` and ``score``.
        """
        query_lower = query.lower()
        relevant_docs = []
        # Score documents based on relevance
        doc_scores = defaultdict(int)
        for filename, content in self.knowledge_base.items():
            content_lower = content.lower()
            # Direct query match (highest score)
            if query_lower in content_lower:
                doc_scores[filename] += 10
            # Word-by-word matching
            query_words = query_lower.split()
            for word in query_words:
                if len(word) > 2 and word in content_lower:
                    doc_scores[filename] += 2
        # Return top relevant documents
        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
        # Get top 3 most relevant documents
        for filename, score in sorted_docs[:3]:
            if score > 0:
                relevant_docs.append({
                    'filename': filename,
                    'content': self.knowledge_base[filename],
                    'score': score
                })
        return relevant_docs

    def system_prompt(self, relevant_docs=None):
        """Build the system prompt, optionally embedding retrieved documents.

        Core summaries are truncated to 800 chars and each retrieved document
        to 1500 chars to stay within the model's context budget.
        """
        system_prompt = (
            f"You are {self.chatbot_name}, an AI representative for {self.owner_name}. "
            f"You represent both {self.owner_name} personally and {self.chatbot_name} company. "
            f"\n\nYou have access to a comprehensive knowledge base with {len(self.knowledge_base)} documents. "
            "Be professional, engaging, and use the knowledge base to provide accurate responses. "
            "\n\nIf you don't know something, use record_unknown_question tool. "
            "If users provide emails, use record_user_details tool."
        )
        # Add core information (truncated for context limit)
        system_prompt += f"\n\n## Core Information:"
        system_prompt += f"\n### {self.owner_name}'s Summary:\n{self.summary[:800]}..."
        system_prompt += f"\n\n### {self.chatbot_name} Business:\n{self.digitizedbrains_info[:800]}..."
        # Add relevant documents
        if relevant_docs:
            system_prompt += f"\n\n## Relevant Documents:"
            for doc in relevant_docs:
                system_prompt += f"\n\n### {doc['filename']} (Score: {doc['score']}):\n"
                content = doc['content'][:1500] + "..." if len(doc['content']) > 1500 else doc['content']
                system_prompt += content
        return system_prompt

    def chat(self, message, history):
        """Gradio chat handler: retrieve, prompt, generate, capture leads.

        Args:
            message: the user's latest utterance.
            history: prior turns as ``{"role": ..., "content": ...}`` dicts
                (Gradio ``type="messages"`` format).
        Returns:
            the assistant's reply string.
        """
        # RAG Retrieval
        relevant_docs = self.search_relevant_content(message)
        print(f"\nQuery: {message[:50]}...")
        print(f"Found {len(relevant_docs)} relevant documents:")
        for doc in relevant_docs:
            print(f"  - {doc['filename']} (score: {doc['score']})")
        # Generate response
        prompt = self.system_prompt(relevant_docs) + "\n\n"
        # Add conversation history
        for h in history:
            prompt += f"{h['role'].capitalize()}: {h['content']}\n"
        prompt += f"User: {message}\nAssistant:"
        try:
            response = self.model.generate_content(prompt)
            reply = response.text
        except Exception as e:
            # User-facing fallback (Vietnamese): "Sorry, an error occurred..."
            reply = f"Xin lỗi, tôi gặp lỗi khi xử lý câu hỏi của bạn. Vui lòng thử lại. Error: {str(e)}"
        # Email detection — capture any address typed into the chat as a lead
        email_match = re.search(r'[\w\.-]+@[\w\.-]+', message)
        if email_match:
            email = email_match.group(0)
            record_user_details(email, "Website Contact", f"RAG chat: {message[:100]}")
        # Unknown question detection (English and Vietnamese phrasing)
        if "I don't know" in reply or "không biết" in reply.lower():
            record_unknown_question(message)
        return reply


# Initialize the chatbot
print("Starting RAG-Enhanced DigitizedBrains Chatbot...")
me = Me()
print("\n" + "="*60)
print("RAG-ENHANCED DIGITIZEDBRAINS CHATBOT READY!")
print("="*60)
print("Features:")
print("  - RAG-based knowledge retrieval")
print("  - Multi-document search")
print("  - Intelligent response generation")
print("  - Lead capture & unknown question tracking")
print("="*60)

# Launch Gradio interface
iface = gr.ChatInterface(
    me.chat,
    type="messages",
    title="DigitizedBrains RAG Chatbot",
    description="AI-powered chatbot with comprehensive knowledge base about Duc Nguyen and DigitizedBrains services."
)

if __name__ == "__main__":
    iface.launch(share=False, server_name="0.0.0.0")