# digitizedgemini / app_gemini.py
# Uploaded by ducnguyen1978 via huggingface_hub (commit a3a19b5, verified).
from dotenv import load_dotenv
import google.generativeai as genai
import json
import os
import requests
from pypdf import PdfReader
import gradio as gr
import re
import glob
from collections import defaultdict
# Load variables from a local .env file into the process environment,
# overriding values already set (supplies GEMINI_API_KEY, PUSHOVER_TOKEN,
# and PUSHOVER_USER read below).
load_dotenv(override=True)
def push(text):
    """Send a push notification via the Pushover API.

    Args:
        text: Message body to deliver.

    Credentials are read from the PUSHOVER_TOKEN and PUSHOVER_USER
    environment variables; the HTTP response is intentionally ignored
    (best-effort notification).
    """
    requests.post(
        "https://api.pushover.net/1/messages.json",
        data={
            "token": os.getenv("PUSHOVER_TOKEN"),
            "user": os.getenv("PUSHOVER_USER"),
            "message": text,
        },
        # Fix: requests has no default timeout; without one an unreachable
        # Pushover endpoint would hang the chat handler indefinitely.
        timeout=10,
    )
def record_user_details(email, name="Name not provided", notes="not provided"):
    """Record that a visitor shared contact details, via a push notification.

    Args:
        email: The visitor's email address (required).
        name: The visitor's name, if given.
        notes: Free-form context about the conversation.

    Returns:
        dict: a small acknowledgement payload for the caller.
    """
    message = f"Recording {name} with email {email} and notes {notes}"
    push(message)
    return {"recorded": "ok"}
def record_unknown_question(question):
    """Log a question the assistant could not answer, via a push notification.

    Args:
        question: The unanswered user question.

    Returns:
        dict: a small acknowledgement payload for the caller.
    """
    notification = f"Recording {question}"
    push(notification)
    return {"recorded": "ok"}
# JSON-schema function-calling declaration describing record_user_details,
# intended to be advertised to the LLM as an available tool.
record_user_details_json = {
    "name": "record_user_details",
    "description": "Use this tool to record that a user is interested in being in touch and provided an email address",
    "parameters": {
        "type": "object",
        "properties": {
            "email": {
                "type": "string",
                "description": "The email address of this user"
            },
            "name": {
                "type": "string",
                "description": "The user's name, if they provided it"
            },
            "notes": {
                "type": "string",
                "description": "Any additional information about the conversation that's worth recording to give context"
            }
        },
        "required": ["email"],
        "additionalProperties": False
    }
}

# JSON-schema function-calling declaration describing record_unknown_question.
record_unknown_question_json = {
    "name": "record_unknown_question",
    "description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer",
    "parameters": {
        "type": "object",
        "properties": {
            "question": {
                "type": "string",
                "description": "The question that couldn't be answered"
            }
        },
        "required": ["question"],
        "additionalProperties": False
    }
}

# Combined tool list.
# NOTE(review): nothing in this file passes `tools` to the Gemini model;
# tool behavior is approximated inside Me.chat with substring/regex checks.
# Confirm whether this list is consumed elsewhere before removing it.
tools = [record_user_details_json, record_unknown_question_json]
class Me:
    """Gemini-backed chat persona with a lightweight keyword-scoring RAG store.

    Speaks as "DigitizedBrains" on behalf of the site owner. Documents are
    loaded once from the local ``document/`` folder at startup; per-question
    retrieval scores documents by simple keyword matching (no embeddings).
    """

    def __init__(self):
        genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
        self.model = genai.GenerativeModel("gemini-2.0-flash")
        self.owner_name = "Duc Nguyen"  # Owner of the website and DigitizedBrains
        self.chatbot_name = "DigitizedBrains"  # Persona name the chatbot answers as
        # RAG knowledge base: maps filename -> full extracted document text.
        self.knowledge_base = self.load_all_documents()
        print(f"Loaded {len(self.knowledge_base)} documents into RAG knowledge base")
        # Core documents also exposed as attributes (backwards compatibility).
        self.linkedin = self.knowledge_base.get('linkedin_profile.txt', '[LinkedIn profile not found]')
        self.summary = self.knowledge_base.get('summary.txt', '[Summary not found]')
        self.digitizedbrains_info = self.knowledge_base.get('digitizedbrains_profile.txt', '[DigitizedBrains profile not found]')

    @staticmethod
    def _ascii_safe(text):
        """Return *text* with non-ASCII characters replaced, so print() cannot
        itself raise on consoles with limited encodings (names/errors may
        contain Vietnamese characters)."""
        return text.encode('ascii', errors='replace').decode('ascii')

    def load_all_documents(self):
        """Load every *.txt and *.pdf under document/ into the knowledge base.

        Returns:
            dict: filename -> extracted text. Files that fail to load are
            skipped with a warning rather than aborting startup.
        """
        knowledge_base = {}
        document_dir = "document/"
        # Plain-text documents.
        for txt_file in glob.glob(os.path.join(document_dir, "*.txt")):
            filename = os.path.basename(txt_file)
            safe_filename = self._ascii_safe(filename)
            try:
                with open(txt_file, "r", encoding="utf-8") as f:
                    content = f.read()
                knowledge_base[filename] = content
                print(f"Loaded text document: {safe_filename} ({len(content)} chars)")
            except Exception as e:
                # Fix: the original discarded the exception and printed a
                # generic "text loading error"; surface a sanitized detail.
                print(f"Warning: Could not load {safe_filename}: {self._ascii_safe(str(e))}")
        # PDF documents: concatenate the text of every page.
        for pdf_file in glob.glob(os.path.join(document_dir, "*.pdf")):
            filename = os.path.basename(pdf_file)
            safe_filename = self._ascii_safe(filename)
            try:
                reader = PdfReader(pdf_file)
                parts = []
                for page in reader.pages:
                    text = page.extract_text()
                    if text:  # extract_text() can return None/"" for image-only pages
                        parts.append(text)
                # join instead of repeated += (avoids quadratic concatenation);
                # preserves the original one-trailing-newline-per-page layout.
                pdf_content = "".join(part + "\n" for part in parts)
                knowledge_base[filename] = pdf_content
                print(f"Loaded PDF document: {safe_filename} ({len(pdf_content)} chars)")
            except Exception as e:
                # Fix: same as above — include sanitized error detail instead
                # of the generic "PDF loading error".
                print(f"Warning: Could not load PDF {safe_filename}: {self._ascii_safe(str(e))}")
        return knowledge_base

    def search_relevant_content(self, query):
        """Simple RAG retrieval: score documents by keyword overlap with *query*.

        Scoring (additive, per document):
          +10 if the whole query string appears verbatim in the document;
          +5 for each curated keyword present in both query and document;
          +2 for each query word (>2 chars) present in the document.

        Args:
            query: The user's question.

        Returns:
            list[dict]: up to 5 entries, each {'filename', 'content', 'score'},
            best score first; documents scoring 0 are excluded.
        """
        query_lower = query.lower()
        relevant_docs = []
        # Curated keyword groups (English + Vietnamese) for topical matching.
        keywords = {
            'personal': ['duc nguyen', 'linkedin', 'career', 'experience', 'education', 'background', 'profile'],
            'business': ['digitizedbrains', 'company', 'services', 'solutions', 'automation', 'ai agent'],
            'digital_transformation': ['chuyển đổi số', 'digital transformation', 'technology', 'broadcasting', 'htv'],
            'experience': ['kinh nghiệm', 'experience', 'học', 'tham luận', 'diễn đàn'],
            'hunan_broadcasting': ['hồ nam', 'hunan', 'truyền hình', 'broadcasting', 'television', 'đài', 'tập đoàn', 'ngụy văn bân', 'mango', 'bài học', 'lesson', 'kinh nghiệm']
        }
        doc_scores = defaultdict(int)
        for filename, content in self.knowledge_base.items():
            content_lower = content.lower()
            # Exact substring match of the full query is the strongest signal.
            if query_lower in content_lower:
                doc_scores[filename] += 10
            # Curated keywords shared by query and document.
            for category, category_keywords in keywords.items():
                for keyword in category_keywords:
                    if keyword in query_lower and keyword in content_lower:
                        doc_scores[filename] += 5
            # Individual query words (longer than 2 chars) found in the document.
            query_words = query_lower.split()
            for word in query_words:
                if len(word) > 2 and word in content_lower:
                    doc_scores[filename] += 2
        # Keep the top 5 positive-scoring documents, best first.
        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
        for filename, score in sorted_docs[:5]:
            if score > 0:
                relevant_docs.append({
                    'filename': filename,
                    'content': self.knowledge_base[filename],
                    'score': score
                })
        return relevant_docs

    def handle_tool_call(self, tool_calls):
        """Execute tool calls by dispatching to same-named module functions.

        NOTE(review): this expects OpenAI-format tool_call objects
        (``.function.name``, ``.function.arguments`` JSON string, ``.id``)
        and is not invoked anywhere in this module's Gemini-based ``chat``
        flow — it looks like a leftover from an OpenAI port. Kept unchanged
        for interface compatibility; confirm before removing.
        """
        results = []
        for tool_call in tool_calls:
            tool_name = tool_call.function.name
            arguments = json.loads(tool_call.function.arguments)
            print(f"Tool called: {tool_name}", flush=True)
            # Unknown tool names yield an empty result rather than raising.
            tool = globals().get(tool_name)
            result = tool(**arguments) if tool else {}
            results.append({"role": "tool", "content": json.dumps(result), "tool_call_id": tool_call.id})
        return results

    def system_prompt(self, relevant_docs=None):
        """Assemble the system prompt, optionally embedding retrieved documents.

        Args:
            relevant_docs: optional list of dicts as produced by
                search_relevant_content ({'filename', 'content', 'score'}).

        Returns:
            str: the full prompt text sent ahead of the conversation.
        """
        system_prompt = (
            f"You are {self.chatbot_name}, an AI representative acting on behalf of {self.owner_name}. "
            f"You are answering questions on {self.owner_name}'s website, representing both {self.owner_name} personally and the {self.chatbot_name} company/brand. "
            "\n\nYour responsibilities include: "
            f"1. Representing {self.owner_name}'s career, background, skills and experience using his comprehensive knowledge base "
            f"2. Representing {self.chatbot_name} as a digital transformation and AI solutions company "
            "3. Answering questions about digital transformation, broadcasting, and technology expertise "
            "4. Using the extensive document knowledge base to provide detailed, accurate responses "
            f"\n\nYou have access to a comprehensive RAG knowledge base with {len(self.knowledge_base)} documents including: "
            f"- Personal information about {self.owner_name} (career, LinkedIn, education, experience) "
            f"- Business information about {self.chatbot_name} (services, solutions, capabilities) "
            "- Digital transformation expertise and case studies "
            "- Broadcasting and media technology knowledge "
            "- Academic papers and industry presentations "
            "\n\nBe professional and engaging, using the knowledge base to provide comprehensive answers. "
            f"When discussing {self.owner_name}, speak about him in first person as his representative. "
            f"When discussing {self.chatbot_name}, represent the company's capabilities and services. "
            "\n\nIf you don't know the answer to any question, use your record_unknown_question tool to record it. "
            "Only ask for contact information if the user specifically expresses interest in getting in touch or requests services. Do not proactively push for contact details or add unnecessary calls-to-action about API services."
        )
        # Core identity documents, truncated to bound prompt size.
        system_prompt += "\n\n## Core Information:"
        system_prompt += f"\n### {self.owner_name}'s Summary:\n{self.summary[:2000]}..."
        system_prompt += f"\n\n### {self.chatbot_name} Business Profile:\n{self.digitizedbrains_info[:2000]}..."
        # Retrieved documents, each truncated to 3000 chars to respect the
        # model's context budget.
        if relevant_docs:
            system_prompt += "\n\n## Relevant Knowledge Base Documents:"
            for doc in relevant_docs:
                system_prompt += f"\n\n### Document: {doc['filename']} (Relevance Score: {doc['score']})\n"
                content = doc['content'][:3000] + "..." if len(doc['content']) > 3000 else doc['content']
                system_prompt += content
        system_prompt += (
            f"\n\nWith this comprehensive RAG knowledge base, please provide detailed and accurate responses as {self.chatbot_name}, "
            f"representing both {self.owner_name} personally and the {self.chatbot_name} business professionally."
        )
        return system_prompt

    def chat(self, message, history):
        """Answer one chat turn: retrieve context, prompt Gemini, log side effects.

        Args:
            message: The user's latest message.
            history: Gradio 'messages'-format history — list of dicts with
                'role' and 'content' keys.

        Returns:
            str: the model's reply text.
        """
        # RAG retrieval for this question.
        relevant_docs = self.search_relevant_content(message)
        try:
            safe_message = message[:100].encode('ascii', errors='replace').decode('ascii')
            print(f"Found {len(relevant_docs)} relevant documents for query: {safe_message}...")
        except Exception:
            # Fix: narrowed the original bare `except:`, which would also have
            # swallowed KeyboardInterrupt/SystemExit.
            print(f"Found {len(relevant_docs)} relevant documents for user query")
        # Flatten everything into a single text prompt:
        # system prompt + transcript + new user turn.
        prompt = self.system_prompt(relevant_docs) + "\n\n"
        for h in history:
            prompt += f"{h['role'].capitalize()}: {h['content']}\n"
        prompt += f"User: {message}\nAssistant:"
        response = self.model.generate_content(prompt)
        reply = response.text
        # If the user typed something email-shaped, record the lead via Pushover.
        email_match = re.search(r'[\w\.-]+@[\w\.-]+', message)
        if email_match:
            email = email_match.group(0)
            name = "Contact from website"  # a real name could be extracted later
            notes = f"User provided email via {self.chatbot_name} chat with RAG knowledge base"
            record_user_details(email, name, notes)
        # If Gemini admits it doesn't know (English or Vietnamese phrasing),
        # log the question for follow-up.
        if "I don't know" in reply or "I'm not sure" in reply or "Tôi không biết" in reply:
            record_unknown_question(message)
        return reply
if __name__ == "__main__":
me = Me()
gr.ChatInterface(me.chat, type="messages").launch()