# arabic-chatbot / App/utils.py
# Last change: "Fixes" (commit f7111ce) by itsmehardawood
import os
import json
import pickle
from typing import List, Optional, BinaryIO, Dict, Any
from langchain_community.document_loaders import Docx2txtLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.prompts import ChatPromptTemplate
from langchain.schema import Document
import tempfile
from pymongo import MongoClient
from bson.binary import Binary
import uuid
import tiktoken
from googleapiclient.discovery import build
import numpy as np
# Route HuggingFace caches to /tmp (writable on HF Spaces) and ensure the
# directories exist before any model download happens.
_CACHE_DIRS = {
    'TRANSFORMERS_CACHE': '/tmp/transformers_cache',
    'HF_HOME': '/tmp/huggingface_home',
    'HUGGINGFACE_HUB_CACHE': '/tmp/huggingface_cache',
}
for _env_var, _cache_path in _CACHE_DIRS.items():
    os.environ[_env_var] = _cache_path
    os.makedirs(_cache_path, exist_ok=True)

# MongoDB connection string — must come from the environment.
# SECURITY: the previous hard-coded fallback embedded live cluster
# credentials in source control; that secret is compromised and must be
# rotated on the Atlas side.
MONGODB_URI = os.getenv("MONGODB_URI")
if not MONGODB_URI:
    raise RuntimeError("MONGODB_URI environment variable is not set")

# MongoDB client
client = MongoClient(MONGODB_URI)
db = client["Cluster0"]
chroma_db_collection = db["chroma_db_store"]  # Collection for storing Chroma DB

# YouTube Data API v3 client.
# SECURITY: the previous inline API key was committed to source control and
# should be revoked; supply a fresh key via the environment.
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
# Helper that persists Chroma collection contents in MongoDB instead of on disk.
class MongoChromaStore:
    """Custom storage for Chroma that uses MongoDB instead of disk.

    NOTE(review): payloads are stored with pickle; reading them back via
    ``pickle.loads`` executes arbitrary code, so this is only safe while the
    MongoDB contents are fully trusted.
    """

    @staticmethod
    def save_chroma(chroma_db, collection_name="default"):
        """Serialize a Chroma DB's contents and upsert them into MongoDB.

        Args:
            chroma_db: LangChain ``Chroma`` vector store to snapshot.
            collection_name: Logical collection id, used as the MongoDB ``_id``.

        Returns:
            True on success, False if extraction or the upsert failed.
        """
        from datetime import datetime, timezone
        try:
            # Extract documents/metadatas/ids from the underlying collection.
            # This is a simplification — a full implementation would need to
            # extract more data (e.g. the embedding vectors themselves).
            embeddings = chroma_db._collection.get()
            # Prepare data for MongoDB storage.
            chroma_data = {
                "_id": collection_name,
                "embeddings": Binary(pickle.dumps(embeddings)),
                # Bug fix: "last_updated" previously held a pickled copy of the
                # metadatas list rather than a timestamp.
                "last_updated": datetime.now(timezone.utc).isoformat(),
            }
            # Store or update in MongoDB (upsert keeps one document per collection).
            chroma_db_collection.replace_one(
                {"_id": collection_name},
                chroma_data,
                upsert=True
            )
            return True
        except Exception as e:
            print(f"Error saving Chroma DB to MongoDB: {e}")
            return False
def count_tokens(text, model_name="gpt-3.5-turbo"):
    """Return the number of tokens *text* occupies under *model_name*'s encoding."""
    try:
        encoder = tiktoken.encoding_for_model(model_name)
    except KeyError:
        # Unknown model: fall back to the generic cl100k_base encoding.
        encoder = tiktoken.get_encoding("cl100k_base")
    token_ids = encoder.encode(text)
    return len(token_ids)
# Video search
def search_youtube_video(query: str) -> str:
    """Return the videoId of the top YouTube search result for *query*.

    Raises:
        ValueError: If the search returns no results.
    """
    request = youtube.search().list(
        q=query,
        part="id,snippet",
        type="video",
        maxResults=1,
    )
    response = request.execute()
    hits = response.get("items", [])
    if not hits:
        raise ValueError("No video found for query.")
    top_hit = hits[0]
    return top_hit["id"]["videoId"]
# Load and split DOCX into chunks (from file path)
def load_and_split(filepath: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Document]:
    """Load a DOCX file from disk and split its text into overlapping chunks."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    documents = Docx2txtLoader(filepath).load()
    return splitter.split_documents(documents)
# Load and split DOCX from bytes (for MongoDB storage)
def load_and_split_bytes(file_bytes: BinaryIO, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Document]:
    """Chunk DOCX content held in memory (raw bytes or a file-like object).

    The content is written to a temporary .docx file, processed through
    ``load_and_split``, and the temp file is removed afterwards.
    """
    with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as tmp:
        temp_path = tmp.name
        if hasattr(file_bytes, 'read'):
            # File-like object: rewind and copy its full contents.
            file_bytes.seek(0)
            tmp.write(file_bytes.read())
        else:
            # Raw bytes can be written directly.
            tmp.write(file_bytes)
    try:
        return load_and_split(temp_path, chunk_size, chunk_overlap)
    finally:
        # Always remove the temp file, even if splitting failed.
        if os.path.exists(temp_path):
            os.remove(temp_path)
# Build Chroma index and save to MongoDB
def build_chroma_index(docs, embedding_model: str = "Omartificial-Intelligence-Space/GATE-AraBert-v1", collection_name: str = "default"):
    """Embed *docs*, build a Chroma index, persist it to MongoDB, and return a retriever.

    Args:
        docs: List of LangChain Documents to index.
        embedding_model: HuggingFace model name used for sentence embeddings.
        collection_name: Logical collection id (also the MongoDB _id).

    Returns:
        A similarity-search retriever over the new index (top k=3).
    """
    # Create temporary directory for Chroma
    temp_dir = tempfile.mkdtemp()
    try:
        embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
        # Create or update Chroma DB
        chroma_db = Chroma.from_documents(
            docs,
            embeddings,
            persist_directory=temp_dir,
            collection_name=collection_name
        )
        # Save Chroma DB to MongoDB
        MongoChromaStore.save_chroma(chroma_db, collection_name)
        # Return the retriever
        # NOTE(review): temp_dir is deleted in the finally block below while the
        # returned retriever is still in use — presumably this works only because
        # Chroma keeps the collection in memory; confirm with the installed
        # Chroma version.
        return chroma_db.as_retriever(search_type="similarity", search_kwargs={"k": 3})
    finally:
        # Clean up temporary directory
        import shutil
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
# Get existing Chroma DB from MongoDB
def get_existing_retriever(embedding_model: str = "Omartificial-Intelligence-Space/GATE-AraBert-v1", collection_name: str = "default"):
    """Rebuild a Chroma retriever from documents previously saved to MongoDB.

    Args:
        embedding_model: HuggingFace model name used for sentence embeddings.
        collection_name: Logical collection id (MongoDB _id) to load.

    Returns:
        A similarity-search retriever (top k=3), or None when the collection
        does not exist, holds no documents, or deserialization fails.
    """
    # Check if collection exists in MongoDB
    chroma_data = chroma_db_collection.find_one({"_id": collection_name})
    if not chroma_data:
        return None
    # Bug fix: create the temp dir *before* the try block — previously a
    # mkdtemp failure raised NameError in the finally clause (temp_dir
    # undefined), masking the original error.
    temp_dir = tempfile.mkdtemp()
    try:
        # Deserialize the stored documents/metadatas from MongoDB.
        embeddings_data = pickle.loads(chroma_data["embeddings"])
        embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
        # Simplified reconstruction: re-embed the stored document texts into a
        # fresh Chroma DB. A production system would restore the original
        # vectors instead of recomputing them.
        if "documents" in embeddings_data and embeddings_data["documents"]:
            docs = []
            for i, text in enumerate(embeddings_data["documents"]):
                metadata = embeddings_data["metadatas"][i] if "metadatas" in embeddings_data else {}
                docs.append(Document(page_content=text, metadata=metadata))
            chroma_db = Chroma.from_documents(
                docs,
                embeddings,
                persist_directory=temp_dir,
                collection_name=collection_name
            )
            return chroma_db.as_retriever(search_type="similarity", search_kwargs={"k": 3})
        # Record exists but holds no documents: nothing to rebuild.
        return None
    except Exception as e:
        print(f"Error loading Chroma DB from MongoDB: {e}")
        return None
    finally:
        # Clean up temporary directory
        import shutil
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of two 1-D vectors.

    Bug fix: previously a zero-norm input caused a division by zero
    (returning nan/inf with a runtime warning); such degenerate inputs now
    yield 0.0.
    """
    denom = float(np.linalg.norm(a) * np.linalg.norm(b))
    if denom == 0.0:
        # Zero vector has no direction — define similarity as 0.
        return 0.0
    return float(np.dot(a, b) / denom)
# Get document count in the collection
def get_collection_stats(collection_name: str = "default"):
    """Report whether a stored collection exists and how many documents it holds.

    Returns:
        dict with keys ``exists`` (bool) and ``document_count`` (int).
    """
    missing = {"exists": False, "document_count": 0}
    record = chroma_db_collection.find_one({"_id": collection_name})
    if not record:
        return missing
    try:
        # Unpack the pickled Chroma snapshot and count its document texts.
        stored = pickle.loads(record["embeddings"])
        doc_count = len(stored["documents"]) if "documents" in stored else 0
    except Exception as e:
        print(f"Error getting collection stats: {e}")
        return missing
    return {
        "exists": True,
        "document_count": doc_count
    }
# Instantiate the chat LLM (OpenAI o4-mini).
def get_llm(temperature: float = 0.0):
    """Build the ChatOpenAI client used for answering.

    SECURITY fix: the API key is now read from the OPENAI_API_KEY environment
    variable; the previous inline key was committed to source control and must
    be revoked/rotated.

    Note: *temperature* is kept for interface compatibility but is not
    forwarded — it was already ignored by the original implementation, and
    o-series reasoning models reject non-default temperature values.
    """
    return ChatOpenAI(model="o4-mini", api_key=os.getenv("OPENAI_API_KEY"))
def create_rag_chain_with_history(retriever, llm, lan, level, diacritics=False, history=None):
    """Build a retrieval-augmented chat chain with conversation history.

    Args:
        retriever: Vector-store retriever supplying context snippets.
        llm: Chat model used to generate answers.
        lan: Target response language (e.g. 'arabic', 'english').
        level: Student level ('beginner' / 'intermediate' / 'advanced'),
            interpolated into the system prompt.
        diacritics: For Arabic only — whether answers must carry full diacritics.
        history: Optional list of {"role": ..., "content": ...} messages
            replayed into the prompt before the current question.

    Returns:
        A LangChain retrieval chain (invoke with {"input": question}).
    """
    if history is None:
        history = []

    # Shared teaching instructions — previously duplicated verbatim across
    # three branches; only the language directive differed.
    base_prompt = (
        "You are an Assistant for answering questions. "
        "Use the following retrieved context snippets to answer. "
        "Look for the relevance between the context and the question before answering. "
        "If you do not know the answer, say that you do not know. "
        "Be polite, act like a teacher, and provide as detailed an answer as possible based on the context. "
        "Consider the conversation history when responding. "
        "You are designed to help Muslims learn Arabic, so explanations should be culturally respectful and appropriate. "
        "Be responsive to the user's needs—if the user seems stuck or confused during the chat, proactively offer helpful suggestions, clarifications, or encouragement. "
        "Adjust your explanations according to the student's level: "
        "for 'beginner', use very simple language, break down grammar and context step-by-step, and give clear examples; "
        "for 'intermediate', provide more detailed grammar and usage insights with moderate complexity; "
        "for 'advanced', include deeper linguistic explanations, nuanced examples, and encourage self-reflection. "
        "Grammar and contextual explanations should start at the appropriate level and build gradually. "
        "Include examples from the connected knowledge base when possible; otherwise, generate clear and relevant examples yourself. "
    )

    # Language directive varies by language and (for Arabic) diacritics flag.
    # Bug fix: the no-diacritics variant previously lacked ". " and ran
    # straight into the "Student level" sentence.
    if lan.lower() == 'arabic' and diacritics:
        language_rule = f"Always respond in {lan} *with all proper diacritics*. "
    elif lan.lower() == 'arabic':
        language_rule = f"Always respond in {lan} *without diacritics*. "
    else:
        language_rule = f"Always respond in {lan}. "

    # "{context}" is left for create_stuff_documents_chain to fill with
    # retrieved snippets.
    system_prompt = base_prompt + language_rule + f"Student level: {level}. " + "{context}"

    # Replay prior turns between the system prompt and the current question.
    messages = [('system', system_prompt)]
    for message in history:
        messages.append((message["role"], message["content"]))
    messages.append(('human', '{input}'))

    prompt = ChatPromptTemplate.from_messages(messages)
    question_answer_chain = create_stuff_documents_chain(llm, prompt)
    return create_retrieval_chain(retriever, question_answer_chain)
# Additional function to add documents to existing index
def add_documents_to_index(docs, embedding_model: str = "Omartificial-Intelligence-Space/GATE-AraBert-v1", collection_name: str = "default"):
    """Merge *docs* into the stored collection and rebuild the Chroma index.

    Falls back to building a brand-new index when nothing is stored yet.
    This is a simplified implementation: the combined document set is
    re-embedded from scratch rather than appended in place.

    Returns:
        A similarity-search retriever (top k=3) over the merged index.
    """
    # No existing index → just build a fresh one from the new docs.
    if not get_existing_retriever(embedding_model, collection_name):
        return build_chroma_index(docs, embedding_model, collection_name)

    temp_dir = tempfile.mkdtemp()
    try:
        embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
        record = chroma_db_collection.find_one({"_id": collection_name})
        if not record:
            # Stored data vanished between checks; rebuild from scratch.
            return build_chroma_index(docs, embedding_model, collection_name)

        # Reconstruct the previously stored documents from the pickled snapshot.
        stored = pickle.loads(record["embeddings"])
        previous_docs = []
        if "documents" in stored and stored["documents"]:
            for idx, content in enumerate(stored["documents"]):
                meta = stored["metadatas"][idx] if "metadatas" in stored else {}
                previous_docs.append(Document(page_content=content, metadata=meta))

        # Re-embed old + new documents into a fresh Chroma DB and persist it.
        combined = previous_docs + docs
        chroma_db = Chroma.from_documents(
            combined,
            embeddings,
            persist_directory=temp_dir,
            collection_name=collection_name,
        )
        MongoChromaStore.save_chroma(chroma_db, collection_name)
        return chroma_db.as_retriever(search_type="similarity", search_kwargs={"k": 3})
    finally:
        # Clean up temporary directory
        import shutil
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)