"""DigitizedBrains website chatbot.

Represents Duc Nguyen (the site owner) and the DigitizedBrains brand in a
Gradio chat UI.  Replies are generated by a Gemini model, grounded in a
simple keyword-matching RAG knowledge base loaded from the ``document/``
folder (plain-text and PDF files).  Pushover notifications record visitor
contact details and questions the assistant could not answer.
"""

from dotenv import load_dotenv
import google.generativeai as genai
import json
import os
import requests
from pypdf import PdfReader
import gradio as gr
import re
import glob
from collections import defaultdict

load_dotenv(override=True)

# Matches a conventional email address: local part, "@", and a dotted domain.
# Stricter than the previous [\w\.-]+@[\w\.-]+, which also matched strings
# with no domain dot at all (e.g. "a@b") or trailing punctuation.
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


def push(text):
    """Send *text* as a Pushover notification.

    Credentials come from the PUSHOVER_TOKEN / PUSHOVER_USER environment
    variables.  Best-effort fire-and-forget: the response is ignored, but
    network errors propagate to the caller.
    """
    requests.post(
        "https://api.pushover.net/1/messages.json",
        data={
            "token": os.getenv("PUSHOVER_TOKEN"),
            "user": os.getenv("PUSHOVER_USER"),
            "message": text,
        },
    )


def record_user_details(email, name="Name not provided", notes="not provided"):
    """Notify the owner that a visitor left contact details.

    Returns a small status dict so it can double as a tool-call result.
    """
    push(f"Recording {name} with email {email} and notes {notes}")
    return {"recorded": "ok"}


def record_unknown_question(question):
    """Notify the owner of a question the assistant could not answer."""
    push(f"Recording {question}")
    return {"recorded": "ok"}


# OpenAI-style JSON schemas for the two recorder functions above.
# NOTE(review): these schemas (and `tools` below) are never passed to the
# Gemini model -- chat() uses a plain text prompt, so the only tool
# invocation paths are the regex/keyword heuristics in chat().  Kept for a
# future function-calling integration via handle_tool_call().
record_user_details_json = {
    "name": "record_user_details",
    "description": "Use this tool to record that a user is interested in being in touch and provided an email address",
    "parameters": {
        "type": "object",
        "properties": {
            "email": {
                "type": "string",
                "description": "The email address of this user"
            },
            "name": {
                "type": "string",
                "description": "The user's name, if they provided it"
            },
            "notes": {
                "type": "string",
                "description": "Any additional information about the conversation that's worth recording to give context"
            }
        },
        "required": ["email"],
        "additionalProperties": False
    }
}

record_unknown_question_json = {
    "name": "record_unknown_question",
    "description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer",
    "parameters": {
        "type": "object",
        "properties": {
            "question": {
                "type": "string",
                "description": "The question that couldn't be answered"
            }
        },
        "required": ["question"],
        "additionalProperties": False
    }
}

tools = [record_user_details_json, record_unknown_question_json]


class Me:
    """Chat persona backed by a Gemini model and a document knowledge base."""

    def __init__(self):
        genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
        self.model = genai.GenerativeModel("gemini-2.0-flash")
        self.owner_name = "Duc Nguyen"         # Website owner represented by the bot
        self.chatbot_name = "DigitizedBrains"  # Chatbot persona / company brand

        # RAG knowledge base: every .txt/.pdf found under document/.
        self.knowledge_base = self.load_all_documents()
        print(f"Loaded {len(self.knowledge_base)} documents into RAG knowledge base")

        # Core documents exposed as attributes for backwards compatibility.
        self.linkedin = self.knowledge_base.get('linkedin_profile.txt', '[LinkedIn profile not found]')
        self.summary = self.knowledge_base.get('summary.txt', '[Summary not found]')
        self.digitizedbrains_info = self.knowledge_base.get('digitizedbrains_profile.txt', '[DigitizedBrains profile not found]')

    @staticmethod
    def _printable(filename):
        """ASCII-safe rendering of *filename* for console output.

        Filenames may contain Vietnamese characters that some consoles
        cannot encode; non-ASCII characters become '?'.  (The original PDF
        branch used a utf-8 encode/decode, which is a no-op and defeated
        this purpose.)
        """
        return filename.encode('ascii', errors='replace').decode('ascii')

    def load_all_documents(self):
        """Load all documents from the document folder using RAG technique.

        Returns a dict mapping basename -> full text content.  Files that
        fail to load are skipped with a warning (best-effort by design).
        """
        knowledge_base = {}
        document_dir = "document/"

        # Load all text files.
        for txt_file in glob.glob(os.path.join(document_dir, "*.txt")):
            filename = os.path.basename(txt_file)
            try:
                with open(txt_file, "r", encoding="utf-8") as f:
                    content = f.read()
                knowledge_base[filename] = content
                print(f"Loaded text document: {self._printable(filename)} ({len(content)} chars)")
            except Exception:
                # Deliberately generic: the exception text itself may contain
                # characters the console cannot print.
                print(f"Warning: Could not load {self._printable(filename)}: text loading error")

        # Load all PDF files.
        for pdf_file in glob.glob(os.path.join(document_dir, "*.pdf")):
            filename = os.path.basename(pdf_file)
            try:
                reader = PdfReader(pdf_file)
                pdf_content = ""
                for page in reader.pages:
                    text = page.extract_text()
                    if text:  # extract_text() may return None/"" for image-only pages
                        pdf_content += text + "\n"
                knowledge_base[filename] = pdf_content
                print(f"Loaded PDF document: {self._printable(filename)} ({len(pdf_content)} chars)")
            except Exception:
                print(f"Warning: Could not load PDF {self._printable(filename)}: PDF loading error")

        return knowledge_base

    def search_relevant_content(self, query):
        """Simple RAG retrieval - find most relevant documents by keyword matching.

        Scoring: +10 for a verbatim query substring match, +5 per category
        keyword present in both query and document, +2 per query word
        (>2 chars) present in the document.  Returns up to five documents as
        dicts with 'filename', 'content' and 'score', highest score first.
        """
        query_lower = query.lower()
        relevant_docs = []

        # Keyword categories (mixed English/Vietnamese to match the corpus).
        keywords = {
            'personal': ['duc nguyen', 'linkedin', 'career', 'experience', 'education', 'background', 'profile'],
            'business': ['digitizedbrains', 'company', 'services', 'solutions', 'automation', 'ai agent'],
            'digital_transformation': ['chuyển đổi số', 'digital transformation', 'technology', 'broadcasting', 'htv'],
            'experience': ['kinh nghiệm', 'experience', 'học', 'tham luận', 'diễn đàn'],
            'hunan_broadcasting': ['hồ nam', 'hunan', 'truyền hình', 'broadcasting', 'television', 'đài', 'tập đoàn', 'ngụy văn bân', 'mango', 'bài học', 'lesson', 'kinh nghiệm']
        }

        # Score every document against the query.
        doc_scores = defaultdict(int)
        for filename, content in self.knowledge_base.items():
            content_lower = content.lower()

            # Direct query match is the strongest signal.
            if query_lower in content_lower:
                doc_scores[filename] += 10

            # Category keyword co-occurrence.
            for category, category_keywords in keywords.items():
                for keyword in category_keywords:
                    if keyword in query_lower and keyword in content_lower:
                        doc_scores[filename] += 5

            # Per-word overlap (skip very short words to reduce noise).
            for word in query_lower.split():
                if len(word) > 2 and word in content_lower:
                    doc_scores[filename] += 2

        # Keep the top five positively-scored documents.
        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
        for filename, score in sorted_docs[:5]:
            if score > 0:
                relevant_docs.append({
                    'filename': filename,
                    'content': self.knowledge_base[filename],
                    'score': score
                })
        return relevant_docs

    def handle_tool_call(self, tool_calls):
        """Dispatch OpenAI-style tool calls to same-named module functions.

        NOTE(review): expects objects with .function.name/.function.arguments
        and .id (the OpenAI chat-completions shape).  The current Gemini
        prompt-based chat() never produces these, so this is dormant plumbing
        for a future function-calling flow.  Unknown tool names yield {}.
        """
        results = []
        for tool_call in tool_calls:
            tool_name = tool_call.function.name
            arguments = json.loads(tool_call.function.arguments)
            print(f"Tool called: {tool_name}", flush=True)
            tool = globals().get(tool_name)
            result = tool(**arguments) if tool else {}
            results.append({"role": "tool", "content": json.dumps(result), "tool_call_id": tool_call.id})
        return results

    @staticmethod
    def _clip(text, limit):
        """Truncate *text* to *limit* chars, appending '...' only when text
        was actually cut.  (The original appended '...' unconditionally,
        which misrepresented short documents as truncated.)"""
        return text[:limit] + "..." if len(text) > limit else text

    def system_prompt(self, relevant_docs=None):
        """Build the system prompt, optionally enriched with retrieved docs.

        relevant_docs: optional output of search_relevant_content(); each
        document's content is clipped to 3000 chars to bound context size.
        """
        system_prompt = f"You are {self.chatbot_name}, an AI representative acting on behalf of {self.owner_name}. " \
            f"You are answering questions on {self.owner_name}'s website, representing both {self.owner_name} personally and the {self.chatbot_name} company/brand. " \
            f"\n\nYour responsibilities include: " \
            f"1. Representing {self.owner_name}'s career, background, skills and experience using his comprehensive knowledge base " \
            f"2. Representing {self.chatbot_name} as a digital transformation and AI solutions company " \
            f"3. Answering questions about digital transformation, broadcasting, and technology expertise " \
            f"4. Using the extensive document knowledge base to provide detailed, accurate responses " \
            f"\n\nYou have access to a comprehensive RAG knowledge base with {len(self.knowledge_base)} documents including: " \
            f"- Personal information about {self.owner_name} (career, LinkedIn, education, experience) " \
            f"- Business information about {self.chatbot_name} (services, solutions, capabilities) " \
            f"- Digital transformation expertise and case studies " \
            f"- Broadcasting and media technology knowledge " \
            f"- Academic papers and industry presentations " \
            f"\n\nBe professional and engaging, using the knowledge base to provide comprehensive answers. " \
            f"When discussing {self.owner_name}, speak about him in first person as his representative. " \
            f"When discussing {self.chatbot_name}, represent the company's capabilities and services. " \
            f"\n\nIf you don't know the answer to any question, use your record_unknown_question tool to record it. " \
            f"Only ask for contact information if the user specifically expresses interest in getting in touch or requests services. Do not proactively push for contact details or add unnecessary calls-to-action about API services."

        # Core information (always present, clipped to bound prompt size).
        system_prompt += f"\n\n## Core Information:"
        system_prompt += f"\n### {self.owner_name}'s Summary:\n{self._clip(self.summary, 2000)}"
        system_prompt += f"\n\n### {self.chatbot_name} Business Profile:\n{self._clip(self.digitizedbrains_info, 2000)}"

        # Retrieved documents, if any.
        if relevant_docs:
            system_prompt += f"\n\n## Relevant Knowledge Base Documents:"
            for doc in relevant_docs:
                system_prompt += f"\n\n### Document: {doc['filename']} (Relevance Score: {doc['score']})\n"
                system_prompt += self._clip(doc['content'], 3000)

        system_prompt += f"\n\nWith this comprehensive RAG knowledge base, please provide detailed and accurate responses as {self.chatbot_name}, " \
            f"representing both {self.owner_name} personally and the {self.chatbot_name} business professionally."
        return system_prompt

    def chat(self, message, history):
        """Gradio chat callback: retrieve context, prompt Gemini, run heuristics.

        history: list of {'role': ..., 'content': ...} dicts (Gradio
        type="messages" format).  Side effects: Pushover notifications when
        an email address is detected or the model admits not knowing.
        """
        # RAG retrieval: find documents relevant to the user's question.
        relevant_docs = self.search_relevant_content(message)
        try:
            safe_message = message[:100].encode('ascii', errors='replace').decode('ascii')
            print(f"Found {len(relevant_docs)} relevant documents for query: {safe_message}...")
        except Exception:
            # Printing must never break the chat; fall back to a count-only line.
            print(f"Found {len(relevant_docs)} relevant documents for user query")

        # Assemble the full prompt: system context + transcript + new turn.
        prompt = self.system_prompt(relevant_docs) + "\n\n"
        for h in history:
            prompt += f"{h['role'].capitalize()}: {h['content']}\n"
        prompt += f"User: {message}\nAssistant:"

        # Generate the reply.
        response = self.model.generate_content(prompt)
        reply = response.text

        # Heuristic: if the user's message contains an email address, record it.
        email_match = EMAIL_RE.search(message)
        if email_match:
            email = email_match.group(0)
            name = "Contact from website"  # could be extracted from the message if desired
            notes = f"User provided email via {self.chatbot_name} chat with RAG knowledge base"
            record_user_details(email, name, notes)

        # Heuristic: if Gemini admits it doesn't know, record the question.
        if "I don't know" in reply or "I'm not sure" in reply or "Tôi không biết" in reply:
            record_unknown_question(message)

        return reply


if __name__ == "__main__":
    me = Me()
    gr.ChatInterface(me.chat, type="messages").launch()