Spaces:
Running
Running
| import google.generativeai as genai | |
| import json | |
| import os | |
| import requests | |
| import gradio as gr | |
| import re | |
| import glob | |
| from collections import defaultdict | |
# The Gemini API key is taken from the GEMINI_API_KEY environment variable
# so that it never has to be written into the source file.
genai.configure(api_key=os.environ.get("GEMINI_API_KEY"))
def push(text):
    """Send ``text`` as a Pushover notification on a best-effort basis.

    Credentials are read from the ``PUSHOVER_TOKEN`` and ``PUSHOVER_USER``
    environment variables. Delivery problems never propagate to the caller:
    when the request fails or the service answers with an HTTP error status,
    the message is printed to stdout instead.

    Args:
        text: The message body to deliver.
    """
    try:
        response = requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": os.getenv("PUSHOVER_TOKEN"),
                "user": os.getenv("PUSHOVER_USER"),
                "message": text,
            },
            # Without a timeout a stalled connection would block the chat
            # handler that triggered this notification indefinitely.
            timeout=10,
        )
        # Turn 4xx/5xx answers (e.g. rejected credentials) into an exception
        # so they take the same fallback path as network failures.
        response.raise_for_status()
    except requests.RequestException:
        # Only request-level failures are handled here; a bare ``except``
        # would also have swallowed KeyboardInterrupt and SystemExit.
        print(f"Push notification: {text}")
def record_user_details(email, name="Name not provided", notes="not provided"):
    """Forward a visitor's contact details through ``push``.

    Args:
        email: The visitor's email address.
        name: The visitor's name; a placeholder is used when omitted.
        notes: Free-form context; a placeholder is used when omitted.

    Returns:
        dict: Always ``{"recorded": "ok"}``.
    """
    notification = f"Recording {name} with email {email} and notes {notes}"
    push(notification)
    acknowledgement = {"recorded": "ok"}
    return acknowledgement
def record_unknown_question(question):
    """Forward a question the chatbot could not answer through ``push``.

    Args:
        question: The text of the unanswered question.

    Returns:
        dict: Always ``{"recorded": "ok"}``.
    """
    notification = f"Recording {question}"
    push(notification)
    acknowledgement = {"recorded": "ok"}
    return acknowledgement
# JSON-schema style declaration of the ``record_user_details`` tool:
# one required string argument (``email``) and two optional ones.
record_user_details_json = {
    "name": "record_user_details",
    "description": "Use this tool to record that a user is interested in being in touch and provided an email address",
    "parameters": {
        "type": "object",
        "properties": {
            "email": {"type": "string", "description": "The email address of this user"},
            "name": {"type": "string", "description": "The user's name, if they provided it"},
            "notes": {
                "type": "string",
                "description": "Any additional information about the conversation that's worth recording to give context",
            },
        },
        "required": ["email"],
        "additionalProperties": False,
    },
}
# JSON-schema style declaration of the ``record_unknown_question`` tool:
# a single required string argument holding the unanswered question.
record_unknown_question_json = {
    "name": "record_unknown_question",
    "description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer",
    "parameters": {
        "type": "object",
        "properties": {
            "question": {"type": "string", "description": "The question that couldn't be answered"},
        },
        "required": ["question"],
        "additionalProperties": False,
    },
}
# Both tool declarations gathered in one list: the contact-details recorder
# first, then the unknown-question recorder.
tools = [
    record_user_details_json,
    record_unknown_question_json,
]
class Me:
    """Chatbot persona that answers from a keyword-ranked set of text files.

    Constructing an instance creates a Gemini model handle and loads every
    ``*.txt`` file of the document directory into ``self.knowledge_base``
    (file name -> file content). ``chat`` is the per-message entry point.
    """

    # Requires at least one dot-separated label after the "@" and cannot end
    # on a dot, so "a@b" is rejected and "me@x.com." yields "me@x.com".
    _EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")

    def __init__(self):
        """Create the model handle and load the knowledge base from disk."""
        self.model = genai.GenerativeModel("gemini-1.5-flash")
        self.owner_name = "Duc Nguyen"
        self.chatbot_name = "DigitizedBrains"

        # RAG knowledge base - text documents only, so startup stays fast.
        self.knowledge_base = self.load_text_documents()
        print(f"Loaded {len(self.knowledge_base)} text documents into RAG knowledge base")

        # Core documents that are always included in the system prompt; a
        # placeholder string is used when the file was not loaded.
        self.linkedin = self.knowledge_base.get('linkedin_profile.txt', '[LinkedIn profile not found]')
        self.summary = self.knowledge_base.get('summary.txt', '[Summary not found]')
        self.digitizedbrains_info = self.knowledge_base.get('digitizedbrains_profile.txt', '[DigitizedBrains profile not found]')

    @staticmethod
    def _truncate(text, limit):
        """Return ``text`` cut to ``limit`` characters; "..." is added only if cut."""
        return text[:limit] + "..." if len(text) > limit else text

    def load_text_documents(self, document_dir="document/"):
        """Load every ``*.txt`` file found directly inside ``document_dir``.

        Args:
            document_dir: Directory to scan. Defaults to ``"document/"``,
                which is resolved against the current working directory.

        Returns:
            dict: Maps each file's base name to its content decoded as UTF-8.
            Files that cannot be read or decoded are reported and skipped.
        """
        knowledge_base = {}
        # glob returns paths in arbitrary order; sorting keeps the dict order
        # (and therefore tie-breaking between equally scored documents)
        # the same on every machine.
        for txt_file in sorted(glob.glob(os.path.join(document_dir, "*.txt"))):
            filename = os.path.basename(txt_file)
            try:
                with open(txt_file, "r", encoding="utf-8") as f:
                    content = f.read()
            except (OSError, UnicodeDecodeError) as e:
                # Keep loading the remaining files, but say why this one failed.
                print(f"Failed: {filename} ({e})")
                continue
            knowledge_base[filename] = content
            print(f"Loaded: {filename} ({len(content)} chars)")
        return knowledge_base

    def search_relevant_content(self, query, top_k=3):
        """Rank knowledge-base documents against ``query`` by keyword overlap.

        Scoring is case-insensitive substring matching: +10 when the whole
        (stripped) query occurs in a document, +2 for every query word longer
        than two characters that occurs in it.

        Args:
            query: The user's message.
            top_k: Maximum number of documents to return. Defaults to 3.

        Returns:
            list[dict]: Up to ``top_k`` entries with keys ``filename``,
            ``content`` and ``score``, highest score first. Documents scoring
            zero are never returned; an empty or whitespace-only query
            returns an empty list.
        """
        query_lower = query.lower().strip()
        if not query_lower:
            # "" is a substring of every string, so without this guard every
            # document would be reported as a full-query match.
            return []

        # Loop-invariant: tokenise the query once, not once per document.
        query_words = [word for word in query_lower.split() if len(word) > 2]

        doc_scores = {}
        for filename, content in self.knowledge_base.items():
            content_lower = content.lower()
            score = 10 if query_lower in content_lower else 0
            score += 2 * sum(1 for word in query_words if word in content_lower)
            if score > 0:
                doc_scores[filename] = score

        # sorted() is stable, so equally scored documents keep load order.
        ranked = sorted(doc_scores.items(), key=lambda item: item[1], reverse=True)
        return [
            {
                'filename': filename,
                'content': self.knowledge_base[filename],
                'score': score,
            }
            for filename, score in ranked[:top_k]
        ]

    def system_prompt(self, relevant_docs=None):
        """Build the instruction text placed in front of the conversation.

        Args:
            relevant_docs: Optional list as returned by
                ``search_relevant_content``; each entry is appended as its
                own section, truncated to 1500 characters.

        Returns:
            str: Persona instructions, the summary and business profile
            (each truncated to 800 characters), then the relevant documents.
        """
        # NOTE: the text tells the model to use two tools, but the model in
        # __init__ is created without tool declarations; chat() applies its
        # own email/phrase heuristics instead.
        sections = [
            f"You are {self.chatbot_name}, an AI representative for {self.owner_name}. "
            f"You represent both {self.owner_name} personally and {self.chatbot_name} company. "
            f"\n\nYou have access to a comprehensive knowledge base with {len(self.knowledge_base)} documents. "
            "Be professional, engaging, and use the knowledge base to provide accurate responses. "
            "\n\nIf you don't know something, use record_unknown_question tool. "
            "If users provide emails, use record_user_details tool.",
            # Core information, truncated to limit the prompt size.
            "\n\n## Core Information:",
            f"\n### {self.owner_name}'s Summary:\n{self._truncate(self.summary, 800)}",
            f"\n\n### {self.chatbot_name} Business:\n{self._truncate(self.digitizedbrains_info, 800)}",
        ]

        if relevant_docs:
            sections.append("\n\n## Relevant Documents:")
            for doc in relevant_docs:
                sections.append(f"\n\n### {doc['filename']} (Score: {doc['score']}):\n")
                sections.append(self._truncate(doc['content'], 1500))

        return "".join(sections)

    def chat(self, message, history):
        """Answer one user message.

        Retrieves relevant documents, sends a single flattened prompt to the
        model, and then triggers the notification helpers: one when the
        message contains an email address, one when the reply admits not
        knowing the answer.

        Args:
            message: The user's latest message.
            history: Earlier turns as dicts with ``role`` and ``content``
                keys; ``None`` is treated as an empty history.

        Returns:
            str: The model's reply, or an apology containing the error text
            when generation fails.
        """
        # RAG retrieval
        relevant_docs = self.search_relevant_content(message)
        print(f"\nQuery: {message[:50]}...")
        print(f"Found {len(relevant_docs)} relevant documents:")
        for doc in relevant_docs:
            print(f" - {doc['filename']} (score: {doc['score']})")

        # Flatten system prompt, history and the new message into one prompt.
        parts = [self.system_prompt(relevant_docs) + "\n\n"]
        parts.extend(
            f"{turn['role'].capitalize()}: {turn['content']}\n"
            for turn in (history or [])
        )
        parts.append(f"User: {message}\nAssistant:")
        prompt = "".join(parts)

        try:
            response = self.model.generate_content(prompt)
            reply = response.text
        except Exception as e:
            # Boundary handler: the UI must always receive a reply. Log the
            # failure as well so it shows up in the server output.
            print(f"Generation failed: {e!r}")
            reply = f"Xin lỗi, tôi gặp lỗi khi xử lý câu hỏi của bạn. Vui lòng thử lại. Error: {str(e)}"

        # Email detection
        email_match = self._EMAIL_RE.search(message)
        if email_match:
            record_user_details(email_match.group(0), "Website Contact", f"RAG chat: {message[:100]}")

        # Unknown question detection: compare case-insensitively for both
        # phrases and treat a typographic apostrophe like a plain one.
        reply_lower = reply.lower().replace("\u2019", "'")
        if "i don't know" in reply_lower or "không biết" in reply_lower:
            record_unknown_question(message)

        return reply
# Build the single chatbot instance at import time; its constructor loads
# the knowledge base and reports progress on stdout.
print("Starting RAG-Enhanced DigitizedBrains Chatbot...")
me = Me()

# Startup banner, printed line by line.
_banner_rule = "=" * 60
for _banner_line in (
    "\n" + _banner_rule,
    "RAG-ENHANCED DIGITIZEDBRAINS CHATBOT READY!",
    _banner_rule,
    "Features:",
    " - RAG-based knowledge retrieval",
    " - Multi-document search",
    " - Intelligent response generation",
    " - Lead capture & unknown question tracking",
    _banner_rule,
):
    print(_banner_line)
# Gradio chat UI wired to the chatbot's ``chat`` method; history is passed
# in the "messages" format (dicts with role/content).
_ui_title = "DigitizedBrains RAG Chatbot"
_ui_description = "AI-powered chatbot with comprehensive knowledge base about Duc Nguyen and DigitizedBrains services."
iface = gr.ChatInterface(
    me.chat,
    type="messages",
    title=_ui_title,
    description=_ui_description,
)

# Start the web server only when the file is run as a script.
if __name__ == "__main__":
    iface.launch(server_name="0.0.0.0", share=False)