# digitizedgemini / app.py
# Uploaded to Hugging Face Spaces via huggingface_hub
# (commit a3a19b5, user ducnguyen1978)
import google.generativeai as genai
import json
import os
import requests
import gradio as gr
import re
import glob
from collections import defaultdict
# Configure Gemini API - Use environment variables for security.
# GEMINI_API_KEY must be set in the environment; if it is missing,
# os.getenv returns None and later model calls will presumably fail — confirm.
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
def push(text):
    """Send *text* as a Pushover notification; fall back to stdout on failure.

    Credentials come from the PUSHOVER_TOKEN / PUSHOVER_USER environment
    variables. Delivery is best-effort: any request-level failure is logged
    locally instead of crashing the caller.
    """
    try:
        requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": os.getenv("PUSHOVER_TOKEN"),
                "user": os.getenv("PUSHOVER_USER"),
                "message": text,
            },
            timeout=10,  # don't hang the chat handler on a slow network
        )
    except requests.RequestException:
        # Bug fix: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt. Catch only network/HTTP failures and keep the
        # deliberate best-effort fallback.
        print(f"Push notification: {text}")
def record_user_details(email, name="Name not provided", notes="not provided"):
    """Record a contact lead (email plus optional name/notes) via push notification."""
    details = f"Recording {name} with email {email} and notes {notes}"
    push(details)
    return {"recorded": "ok"}
def record_unknown_question(question):
    """Record a question the assistant could not answer via push notification."""
    notification = f"Recording {question}"
    push(notification)
    return {"recorded": "ok"}
# --- Tool schemas (JSON-schema style) describing the functions the model
# --- may request. Composed from a shared parameter-schema shape.

_contact_parameters = {
    "type": "object",
    "properties": {
        "email": {
            "type": "string",
            "description": "The email address of this user",
        },
        "name": {
            "type": "string",
            "description": "The user's name, if they provided it",
        },
        "notes": {
            "type": "string",
            "description": "Any additional information about the conversation that's worth recording to give context",
        },
    },
    "required": ["email"],
    "additionalProperties": False,
}

_unknown_question_parameters = {
    "type": "object",
    "properties": {
        "question": {
            "type": "string",
            "description": "The question that couldn't be answered",
        },
    },
    "required": ["question"],
    "additionalProperties": False,
}

record_user_details_json = {
    "name": "record_user_details",
    "description": "Use this tool to record that a user is interested in being in touch and provided an email address",
    "parameters": _contact_parameters,
}

record_unknown_question_json = {
    "name": "record_unknown_question",
    "description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer",
    "parameters": _unknown_question_parameters,
}

# All tool declarations in one list. NOTE(review): `tools` is never passed to
# the model in this file — presumably intended for a tool-calling setup; verify.
tools = [record_user_details_json, record_unknown_question_json]
class Me:
    """RAG-backed chat persona representing Duc Nguyen / DigitizedBrains.

    Loads a plain-text knowledge base from ``document/``, retrieves the
    documents most relevant to each user message with simple keyword
    scoring, and answers via the Gemini API.
    """

    def __init__(self):
        # Requires genai.configure(...) to have run (GEMINI_API_KEY set).
        self.model = genai.GenerativeModel("gemini-1.5-flash")
        self.owner_name = "Duc Nguyen"
        self.chatbot_name = "DigitizedBrains"
        # RAG knowledge base - text documents only (fast startup).
        self.knowledge_base = self.load_text_documents()
        print(f"Loaded {len(self.knowledge_base)} text documents into RAG knowledge base")
        # Core documents surfaced in every system prompt; the bracketed
        # fallbacks keep prompt construction working when a file is missing.
        self.linkedin = self.knowledge_base.get('linkedin_profile.txt', '[LinkedIn profile not found]')
        self.summary = self.knowledge_base.get('summary.txt', '[Summary not found]')
        self.digitizedbrains_info = self.knowledge_base.get('digitizedbrains_profile.txt', '[DigitizedBrains profile not found]')

    def load_text_documents(self):
        """Load every ``*.txt`` file under ``document/``.

        Returns:
            dict[str, str]: mapping of file basename -> file contents.
            Files that fail to read/decode are skipped with a log line.
        """
        knowledge_base = {}
        document_dir = "document/"
        for txt_file in glob.glob(os.path.join(document_dir, "*.txt")):
            filename = os.path.basename(txt_file)
            try:
                with open(txt_file, "r", encoding="utf-8") as f:
                    content = f.read()
                knowledge_base[filename] = content
                # Bug fix: both log lines printed the literal "(unknown)"
                # instead of interpolating the filename / error.
                print(f"Loaded: {filename} ({len(content)} chars)")
            except (OSError, UnicodeDecodeError) as e:
                # Narrowed from `except Exception`: only I/O and decoding
                # errors are expected here.
                print(f"Failed: {filename} ({e})")
        return knowledge_base

    def search_relevant_content(self, query):
        """Simple RAG retrieval based on keyword matching.

        Scoring: +10 if the whole query appears in a document, +2 for each
        query word (>2 chars) found. Returns up to 3 documents as dicts with
        ``filename``, ``content`` and ``score`` keys, best match first.
        """
        query_lower = query.lower()
        relevant_docs = []
        doc_scores = defaultdict(int)
        query_words = query_lower.split()  # hoisted out of the document loop
        for filename, content in self.knowledge_base.items():
            content_lower = content.lower()
            # Direct query match (highest score).
            if query_lower in content_lower:
                doc_scores[filename] += 10
            # Word-by-word matching; skip very short words ("a", "is", ...).
            for word in query_words:
                if len(word) > 2 and word in content_lower:
                    doc_scores[filename] += 2
        # Top 3 documents by score, zero-score entries excluded.
        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
        for filename, score in sorted_docs[:3]:
            if score > 0:
                relevant_docs.append({
                    'filename': filename,
                    'content': self.knowledge_base[filename],
                    'score': score
                })
        return relevant_docs

    def system_prompt(self, relevant_docs=None):
        """Build the system prompt: persona, core info, and retrieved docs.

        Core sections are truncated to 800 chars and each retrieved document
        to 1500 chars to stay within the model's context budget.
        """
        system_prompt = f"You are {self.chatbot_name}, an AI representative for {self.owner_name}. \
You represent both {self.owner_name} personally and {self.chatbot_name} company. \
\n\nYou have access to a comprehensive knowledge base with {len(self.knowledge_base)} documents. \
Be professional, engaging, and use the knowledge base to provide accurate responses. \
\n\nIf you don't know something, use record_unknown_question tool. \
If users provide emails, use record_user_details tool."
        # Core information (truncated for context limit).
        system_prompt += f"\n\n## Core Information:"
        system_prompt += f"\n### {self.owner_name}'s Summary:\n{self.summary[:800]}..."
        system_prompt += f"\n\n### {self.chatbot_name} Business:\n{self.digitizedbrains_info[:800]}..."
        # Retrieved documents, if any.
        if relevant_docs:
            system_prompt += f"\n\n## Relevant Documents:"
            for doc in relevant_docs:
                system_prompt += f"\n\n### {doc['filename']} (Score: {doc['score']}):\n"
                content = doc['content'][:1500] + "..." if len(doc['content']) > 1500 else doc['content']
                system_prompt += content
        return system_prompt

    def chat(self, message, history):
        """Gradio chat callback: retrieve context, query Gemini, capture leads.

        ``history`` is a list of {"role": ..., "content": ...} dicts
        (gr.ChatInterface type="messages").
        """
        # RAG retrieval.
        relevant_docs = self.search_relevant_content(message)
        print(f"\nQuery: {message[:50]}...")
        print(f"Found {len(relevant_docs)} relevant documents:")
        for doc in relevant_docs:
            print(f" - {doc['filename']} (score: {doc['score']})")
        # Flatten system prompt + history + new message into one text prompt.
        prompt = self.system_prompt(relevant_docs) + "\n\n"
        for h in history:
            prompt += f"{h['role'].capitalize()}: {h['content']}\n"
        prompt += f"User: {message}\nAssistant:"
        try:
            response = self.model.generate_content(prompt)
            reply = response.text
        except Exception as e:
            # Top-level boundary: return a friendly (Vietnamese) apology
            # instead of crashing the UI.
            reply = f"Xin lỗi, tôi gặp lỗi khi xử lý câu hỏi của bạn. Vui lòng thử lại. Error: {str(e)}"
        # Lead capture: any email-looking token in the user's message.
        email_match = re.search(r'[\w\.-]+@[\w\.-]+', message)
        if email_match:
            email = email_match.group(0)
            record_user_details(email, "Website Contact", f"RAG chat: {message[:100]}")
        # Unknown-question detection. Bug fix: the English check was
        # case-sensitive while the Vietnamese one was not — compare both
        # against the lowercased reply.
        reply_lower = reply.lower()
        if "i don't know" in reply_lower or "không biết" in reply_lower:
            record_unknown_question(message)
        return reply
# Initialize the chatbot (loads the knowledge base from document/ and
# constructs the Gemini model wrapper).
print("Starting RAG-Enhanced DigitizedBrains Chatbot...")
me = Me()
print("\n" + "="*60)
print("RAG-ENHANCED DIGITIZEDBRAINS CHATBOT READY!")
print("="*60)
print("Features:")
print(" - RAG-based knowledge retrieval")
print(" - Multi-document search")
print(" - Intelligent response generation")
print(" - Lead capture & unknown question tracking")
print("="*60)
# Launch Gradio interface. type="messages" makes `history` a list of
# {"role", "content"} dicts, which is what Me.chat expects.
iface = gr.ChatInterface(
    me.chat,
    type="messages",
    title="DigitizedBrains RAG Chatbot",
    description="AI-powered chatbot with comprehensive knowledge base about Duc Nguyen and DigitizedBrains services."
)
if __name__ == "__main__":
    # server_name="0.0.0.0" binds all interfaces (required for Spaces/containers).
    iface.launch(share=False, server_name="0.0.0.0")