# Spaces: Running  (page-status text captured together with the source; not part of the program)
| from dotenv import load_dotenv | |
| import google.generativeai as genai | |
| import json | |
| import os | |
| import requests | |
| from pypdf import PdfReader | |
| import gradio as gr | |
| import re | |
| import glob | |
| from collections import defaultdict | |
# Load settings from a local .env file before any os.getenv() call below reads them.
# override=True asks python-dotenv to let values from .env replace variables that
# are already set in the process environment.
load_dotenv(override=True)
def push(text):
    """Send ``text`` as a Pushover notification (best effort).

    The application token and user key are read from the ``PUSHOVER_TOKEN``
    and ``PUSHOVER_USER`` environment variables.

    Args:
        text: Message body to deliver.

    Returns:
        None. Network failures are reported on stdout instead of being
        raised, so a notification problem never aborts the chat turn that
        triggered it.
    """
    try:
        requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": os.getenv("PUSHOVER_TOKEN"),
                "user": os.getenv("PUSHOVER_USER"),
                "message": text,
            },
            # Without a timeout, requests may wait indefinitely on a stalled
            # connection, which would block the chat reply.
            timeout=10,
        )
    except requests.RequestException as exc:
        # Only the exception type is printed; the payload carries credentials.
        print(f"Warning: Pushover notification failed: {type(exc).__name__}", flush=True)
def record_user_details(email, name="Name not provided", notes="not provided"):
    """Notify the site owner that a visitor left contact details.

    Args:
        email: The visitor's email address.
        name: The visitor's name, when known.
        notes: Free-form context about the conversation.

    Returns:
        The acknowledgement payload ``{"recorded": "ok"}``.
    """
    notification = f"Recording {name} with email {email} and notes {notes}"
    push(notification)
    return dict(recorded="ok")
def record_unknown_question(question):
    """Notify the site owner about a question the assistant could not answer.

    Args:
        question: The visitor's question, forwarded verbatim.

    Returns:
        The acknowledgement payload ``{"recorded": "ok"}``.
    """
    notification = f"Recording {question}"
    push(notification)
    return dict(recorded="ok")
# Function-calling declarations (JSON-schema style) describing the two
# notification helpers defined above.

# Schema for record_user_details(email, name, notes); only the email is mandatory.
record_user_details_json = dict(
    name="record_user_details",
    description="Use this tool to record that a user is interested in being in touch and provided an email address",
    parameters=dict(
        type="object",
        properties=dict(
            email=dict(
                type="string",
                description="The email address of this user",
            ),
            name=dict(
                type="string",
                description="The user's name, if they provided it",
            ),
            notes=dict(
                type="string",
                description="Any additional information about the conversation that's worth recording to give context",
            ),
        ),
        required=["email"],
        additionalProperties=False,
    ),
)

# Schema for record_unknown_question(question).
record_unknown_question_json = dict(
    name="record_unknown_question",
    description="Always use this tool to record any question that couldn't be answered as you didn't know the answer",
    parameters=dict(
        type="object",
        properties=dict(
            question=dict(
                type="string",
                description="The question that couldn't be answered",
            ),
        ),
        required=["question"],
        additionalProperties=False,
    ),
)

# Both declarations, in the order they were defined.
tools = [record_user_details_json, record_unknown_question_json]
class Me:
    """Chat persona that answers on behalf of the site owner using local documents.

    On construction the Gemini client is configured from ``GEMINI_API_KEY`` and
    every ``*.txt`` / ``*.pdf`` file in the document folder is read into
    ``self.knowledge_base`` (a ``{filename: text}`` dict). Each chat turn
    ranks those documents against the user's message with simple keyword
    scoring and embeds the best matches in the prompt sent to the model.

    NOTE(review): the module-level tool declarations are not passed to the
    model in this class, so ``chat`` falls back to pattern matching on the
    message/reply to decide when to send notifications.
    """

    # Names that handle_tool_call is allowed to resolve and invoke.
    _ALLOWED_TOOLS = frozenset({"record_user_details", "record_unknown_question"})

    # Keyword groups used for relevance scoring. A keyword listed in several
    # groups is counted once per group, exactly as the groups are written.
    _KEYWORDS = {
        'personal': ['duc nguyen', 'linkedin', 'career', 'experience', 'education', 'background', 'profile'],
        'business': ['digitizedbrains', 'company', 'services', 'solutions', 'automation', 'ai agent'],
        'digital_transformation': ['chuyển đổi số', 'digital transformation', 'technology', 'broadcasting', 'htv'],
        'experience': ['kinh nghiệm', 'experience', 'học', 'tham luận', 'diễn đàn'],
        'hunan_broadcasting': ['hồ nam', 'hunan', 'truyền hình', 'broadcasting', 'television', 'đài', 'tập đoàn', 'ngụy văn bân', 'mango', 'bài học', 'lesson', 'kinh nghiệm'],
    }

    def __init__(self):
        """Configure the Gemini model and load the document knowledge base."""
        genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
        self.model = genai.GenerativeModel("gemini-2.0-flash")
        self.owner_name = "Duc Nguyen"  # Owner of the website and of DigitizedBrains
        self.chatbot_name = "DigitizedBrains"  # Persona the chatbot speaks as

        # RAG knowledge base: every document found in the document folder.
        self.knowledge_base = self.load_all_documents()
        print(f"Loaded {len(self.knowledge_base)} documents into RAG knowledge base")

        # Core documents are also exposed as attributes (backwards compatibility).
        self.linkedin = self.knowledge_base.get('linkedin_profile.txt', '[LinkedIn profile not found]')
        self.summary = self.knowledge_base.get('summary.txt', '[Summary not found]')
        self.digitizedbrains_info = self.knowledge_base.get('digitizedbrains_profile.txt', '[DigitizedBrains profile not found]')

    @staticmethod
    def _ascii_safe(text):
        """Return ``text`` with non-ASCII characters replaced by ``?`` for console output."""
        return str(text).encode('ascii', errors='replace').decode('ascii')

    @staticmethod
    def _truncate(text, limit):
        """Return ``text`` cut to ``limit`` characters, adding "..." only when it was cut."""
        return text[:limit] + "..." if len(text) > limit else text

    def load_all_documents(self, document_dir="document/"):
        """Read every ``*.txt`` and ``*.pdf`` file in ``document_dir``.

        Args:
            document_dir: Folder to scan (non-recursive). Defaults to the
                ``document/`` folder relative to the working directory.

        Returns:
            Dict mapping each file's base name to its extracted text. Files
            that cannot be read are skipped with a warning on stdout.
        """
        knowledge_base = {}

        # Plain-text documents.
        for txt_file in glob.glob(os.path.join(document_dir, "*.txt")):
            filename = os.path.basename(txt_file)
            # File names are printed ASCII-only, presumably to avoid console
            # encoding errors with non-ASCII names.
            safe_filename = self._ascii_safe(filename)
            try:
                with open(txt_file, "r", encoding="utf-8") as f:
                    content = f.read()
            except (OSError, UnicodeError) as e:
                print(f"Warning: Could not load {safe_filename}: text loading error ({type(e).__name__})")
                continue
            knowledge_base[filename] = content
            print(f"Loaded text document: {safe_filename} ({len(content)} chars)")

        # PDF documents: concatenate the text of every page that yields any.
        for pdf_file in glob.glob(os.path.join(document_dir, "*.pdf")):
            filename = os.path.basename(pdf_file)
            safe_filename = self._ascii_safe(filename)
            try:
                reader = PdfReader(pdf_file)
                page_texts = (page.extract_text() for page in reader.pages)
                pdf_content = "".join(f"{text}\n" for text in page_texts if text)
            except Exception as e:
                # Deliberately broad: one unreadable PDF must not prevent the
                # remaining documents from loading.
                print(f"Warning: Could not load PDF {safe_filename}: PDF loading error ({type(e).__name__})")
                continue
            knowledge_base[filename] = pdf_content
            print(f"Loaded PDF document: {safe_filename} ({len(pdf_content)} chars)")

        return knowledge_base

    def search_relevant_content(self, query):
        """Rank knowledge-base documents against ``query`` by keyword overlap.

        Scoring per document (all comparisons are case-insensitive substring
        tests): +10 when the whole query occurs in the document, +5 for each
        topic keyword that occurs in both the query and the document, and +2
        for each query word longer than two characters that occurs in the
        document.

        Args:
            query: The user's message.

        Returns:
            Up to five dicts with keys ``filename``, ``content`` and
            ``score``, highest score first. An empty list when the query is
            blank or nothing matches.
        """
        query_lower = (query or "").strip().lower()
        if not query_lower:
            # An empty string is a substring of every document, so a blank
            # query would otherwise "match" the whole knowledge base.
            return []

        # Both lists depend only on the query; compute them once.
        query_words = [word for word in query_lower.split() if len(word) > 2]
        active_keywords = [
            keyword
            for category_keywords in self._KEYWORDS.values()
            for keyword in category_keywords
            if keyword in query_lower
        ]

        doc_scores = {}
        for filename, content in self.knowledge_base.items():
            content_lower = content.lower()
            score = 10 if query_lower in content_lower else 0
            score += 5 * sum(keyword in content_lower for keyword in active_keywords)
            score += 2 * sum(word in content_lower for word in query_words)
            if score > 0:
                doc_scores[filename] = score

        # sorted() is stable, so ties keep knowledge-base order.
        ranked = sorted(doc_scores.items(), key=lambda item: item[1], reverse=True)
        return [
            {
                'filename': filename,
                'content': self.knowledge_base[filename],
                'score': score,
            }
            for filename, score in ranked[:5]
        ]

    def handle_tool_call(self, tool_calls):
        """Execute tool calls and return their results as tool messages.

        Args:
            tool_calls: Iterable of objects exposing ``id``,
                ``function.name`` and ``function.arguments`` (a JSON string).

        Returns:
            One ``{"role": "tool", "content": <json>, "tool_call_id": <id>}``
            dict per call. Calls naming an unknown tool yield ``{}`` as
            their content.
        """
        results = []
        for tool_call in tool_calls:
            tool_name = tool_call.function.name
            arguments = json.loads(tool_call.function.arguments)
            print(f"Tool called: {tool_name}", flush=True)
            # The name comes from model output: resolve it only when it is
            # one of the known tools, never an arbitrary module global.
            tool = globals().get(tool_name) if tool_name in self._ALLOWED_TOOLS else None
            result = tool(**arguments) if tool else {}
            results.append({"role": "tool", "content": json.dumps(result), "tool_call_id": tool_call.id})
        return results

    def system_prompt(self, relevant_docs=None):
        """Build the system prompt, optionally embedding retrieved documents.

        Args:
            relevant_docs: Optional list of dicts as returned by
                ``search_relevant_content``.

        Returns:
            The prompt text: persona instructions, the owner's summary and
            the business profile (each capped at 2000 characters), then each
            relevant document (capped at 3000 characters) and a closing
            instruction. "..." marks a section only when it was actually cut.
        """
        owner = self.owner_name
        bot = self.chatbot_name

        parts = [
            f"You are {bot}, an AI representative acting on behalf of {owner}. "
            f"You are answering questions on {owner}'s website, representing both {owner} personally and the {bot} company/brand. "
            f"\n\nYour responsibilities include: "
            f"1. Representing {owner}'s career, background, skills and experience using his comprehensive knowledge base "
            f"2. Representing {bot} as a digital transformation and AI solutions company "
            f"3. Answering questions about digital transformation, broadcasting, and technology expertise "
            f"4. Using the extensive document knowledge base to provide detailed, accurate responses "
            f"\n\nYou have access to a comprehensive RAG knowledge base with {len(self.knowledge_base)} documents including: "
            f"- Personal information about {owner} (career, LinkedIn, education, experience) "
            f"- Business information about {bot} (services, solutions, capabilities) "
            f"- Digital transformation expertise and case studies "
            f"- Broadcasting and media technology knowledge "
            f"- Academic papers and industry presentations "
            f"\n\nBe professional and engaging, using the knowledge base to provide comprehensive answers. "
            f"When discussing {owner}, speak about him in first person as his representative. "
            f"When discussing {bot}, represent the company's capabilities and services. "
            f"\n\nIf you don't know the answer to any question, use your record_unknown_question tool to record it. "
            f"Only ask for contact information if the user specifically expresses interest in getting in touch or requests services. Do not proactively push for contact details or add unnecessary calls-to-action about API services."
        ]

        # Core information that is always included.
        parts.append("\n\n## Core Information:")
        parts.append(f"\n### {owner}'s Summary:\n{self._truncate(self.summary, 2000)}")
        parts.append(f"\n\n### {bot} Business Profile:\n{self._truncate(self.digitizedbrains_info, 2000)}")

        # Retrieved documents, truncated to keep the prompt within context limits.
        if relevant_docs:
            parts.append("\n\n## Relevant Knowledge Base Documents:")
            for doc in relevant_docs:
                parts.append(f"\n\n### Document: {doc['filename']} (Relevance Score: {doc['score']})\n")
                parts.append(self._truncate(doc['content'], 3000))

        parts.append(
            f"\n\nWith this comprehensive RAG knowledge base, please provide detailed and accurate responses as {bot}, "
            f"representing both {owner} personally and the {bot} business professionally."
        )
        return "".join(parts)

    def chat(self, message, history):
        """Answer one chat turn.

        Args:
            message: The user's latest message.
            history: Earlier turns as dicts with ``role`` and ``content``.

        Returns:
            The model's reply text, or a short apology when the model
            returned no usable text.

        Side effects:
            Sends a notification when the message contains an email address,
            and when the reply signals that the question went unanswered.
        """
        # RAG retrieval: find the documents relevant to the user's question.
        relevant_docs = self.search_relevant_content(message)
        try:
            safe_message = self._ascii_safe(message[:100])
            print(f"Found {len(relevant_docs)} relevant documents for query: {safe_message}...")
        except (TypeError, UnicodeError):
            print(f"Found {len(relevant_docs)} relevant documents for user query")

        # Prompt = system prompt + transcript of earlier turns + the new message.
        transcript = "".join(f"{h['role'].capitalize()}: {h['content']}\n" for h in history)
        prompt = f"{self.system_prompt(relevant_docs)}\n\n{transcript}User: {message}\nAssistant:"

        response = self.model.generate_content(prompt)
        unanswered = False
        try:
            reply = response.text
        except ValueError:
            # The .text accessor raises ValueError when the response carries
            # no text part (for example a blocked or empty candidate).
            reply = "Sorry, I could not generate a response to that. Could you rephrase your question?"
            unanswered = True

        # Look for an email address in the user's message.
        email_match = re.search(r'[\w\.-]+@[\w\.-]+', message)
        if email_match:
            email = email_match.group(0)
            name = "Contact from website"  # a real name could be extracted here if desired
            notes = f"User provided email via {self.chatbot_name} chat with RAG knowledge base"
            record_user_details(email, name, notes)

        # When the model says it does not know the answer, record the question.
        unknown_markers = ("I don't know", "I'm not sure", "Tôi không biết")
        if unanswered or any(marker in reply for marker in unknown_markers):
            record_unknown_question(message)

        return reply
# Script entry point: build the persona once and serve it through a Gradio chat UI.
if __name__ == "__main__":
    me = Me()
    # type="messages" makes Gradio pass history as role/content dicts,
    # which is the shape Me.chat reads.
    chat_ui = gr.ChatInterface(me.chat, type="messages")
    chat_ui.launch()