Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import pickle | |
| from typing import List, Optional, BinaryIO, Dict, Any | |
| from langchain_community.document_loaders import Docx2txtLoader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_openai import ChatOpenAI | |
| from langchain.chains import create_retrieval_chain | |
| from langchain.chains.combine_documents import create_stuff_documents_chain | |
| from langchain.prompts import ChatPromptTemplate | |
| from langchain.schema import Document | |
| import tempfile | |
| from pymongo import MongoClient | |
| from bson.binary import Binary | |
| import uuid | |
| import tiktoken | |
| from googleapiclient.discovery import build | |
| import numpy as np | |
# --- Hugging Face cache configuration ---------------------------------------
# Route all model caches to /tmp so the app works on read-only / ephemeral
# filesystems (e.g. containerized Spaces deployments).
for _env_var, _cache_dir in (
    ("TRANSFORMERS_CACHE", "/tmp/transformers_cache"),
    ("HF_HOME", "/tmp/huggingface_home"),
    ("HUGGINGFACE_HUB_CACHE", "/tmp/huggingface_cache"),
):
    os.environ[_env_var] = _cache_dir
    os.makedirs(_cache_dir, exist_ok=True)

# --- MongoDB connection ------------------------------------------------------
# SECURITY: this URI previously had a hardcoded fallback embedding live
# credentials. Those credentials were committed to source and must be rotated;
# the URI now comes exclusively from the MONGODB_URI environment variable
# (MongoClient(None) falls back to localhost for local development).
MONGODB_URI = os.getenv("MONGODB_URI")
client = MongoClient(MONGODB_URI)
db = client["Cluster0"]
chroma_db_collection = db["chroma_db_store"]  # Collection for storing Chroma DB

# --- YouTube Data API client -------------------------------------------------
# SECURITY: the API key was previously hardcoded here; it is compromised and
# must be rotated, then supplied via the YOUTUBE_API_KEY environment variable.
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
class MongoChromaStore:
    """Persists Chroma vector-store contents in MongoDB instead of on disk.

    Chroma normally persists to a local directory; on ephemeral filesystems
    that data is lost between restarts, so the collection contents are
    pickled and upserted into the ``chroma_db_store`` MongoDB collection.
    """

    @staticmethod  # was a bare def with no `self`; staticmethod makes the intent explicit
    def save_chroma(chroma_db, collection_name="default"):
        """Serialize *chroma_db*'s contents and upsert them into MongoDB.

        Args:
            chroma_db: A ``langchain_community.vectorstores.Chroma`` instance.
            collection_name: MongoDB document ``_id`` to store it under.

        Returns:
            True on success, False if serialization or the upsert failed.
        """
        from datetime import datetime, timezone
        try:
            # Dump the raw collection payload (documents, metadatas, ids, ...).
            # NOTE(review): ``_collection.get()`` does not include the
            # embedding vectors by default — confirm that re-embedding on
            # restore is intentional.
            payload = chroma_db._collection.get()
            record = {
                "_id": collection_name,
                # SECURITY: pickled payloads must only ever be loaded back
                # from this trusted database; never unpickle untrusted data.
                "embeddings": Binary(pickle.dumps(payload)),
                # Bug fix: this field previously held a second pickled copy of
                # the metadatas instead of a timestamp. Nothing reads the
                # field, so the schema change is safe.
                "last_updated": datetime.now(timezone.utc),
            }
            chroma_db_collection.replace_one(
                {"_id": collection_name},
                record,
                upsert=True,
            )
            return True
        except Exception as e:
            print(f"Error saving Chroma DB to MongoDB: {e}")
            return False
def count_tokens(text, model_name="gpt-3.5-turbo"):
    """Count tokens for a text string."""
    # Prefer the model-specific tokenizer; unknown model names fall back to
    # the general-purpose cl100k_base encoding.
    try:
        enc = tiktoken.encoding_for_model(model_name)
    except KeyError:
        enc = tiktoken.get_encoding("cl100k_base")
    tokens = enc.encode(text)
    return len(tokens)
# Video search
def search_youtube_video(query: str) -> str:
    """
    Search YouTube for the top video matching `query` and return its videoId.

    Raises:
        ValueError: If the search returns no results.
    """
    request = youtube.search().list(
        q=query,
        part="id,snippet",
        type="video",
        maxResults=1,
    )
    response = request.execute()
    hits = response.get("items", [])
    if not hits:
        raise ValueError("No video found for query.")
    top_hit = hits[0]
    return top_hit["id"]["videoId"]
# Load and split DOCX into chunks (from file path)
def load_and_split(filepath: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Document]:
    """Load a .docx file and split its text into overlapping chunks."""
    documents = Docx2txtLoader(filepath).load()
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return splitter.split_documents(documents)
# Load and split DOCX from bytes (for MongoDB storage)
def load_and_split_bytes(file_bytes: BinaryIO, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Document]:
    """Split a .docx supplied as bytes (or a readable binary stream) into chunks.

    The payload is spilled to a temporary file because Docx2txtLoader only
    accepts filesystem paths; the temp file is always removed afterwards.
    """
    with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as tmp:
        tmp_path = tmp.name
        if hasattr(file_bytes, "read"):
            # File-like object: rewind and copy its full contents.
            file_bytes.seek(0)
            tmp.write(file_bytes.read())
        else:
            # Already raw bytes.
            tmp.write(file_bytes)
    try:
        return load_and_split(tmp_path, chunk_size, chunk_overlap)
    finally:
        # Clean up the spill file even if splitting fails.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
# Build Chroma index and save to MongoDB
def build_chroma_index(docs, embedding_model: str = "Omartificial-Intelligence-Space/GATE-AraBert-v1", collection_name: str = "default"):
    """Embed *docs* into a fresh Chroma index, persist it to MongoDB, and
    return a top-3 similarity retriever over it."""
    import shutil

    # Chroma insists on a persist directory; use a throwaway one, since the
    # durable copy lives in MongoDB.
    scratch_dir = tempfile.mkdtemp()
    try:
        embedder = HuggingFaceEmbeddings(model_name=embedding_model)
        vector_store = Chroma.from_documents(
            docs,
            embedder,
            persist_directory=scratch_dir,
            collection_name=collection_name,
        )
        MongoChromaStore.save_chroma(vector_store, collection_name)
        return vector_store.as_retriever(
            search_type="similarity", search_kwargs={"k": 3}
        )
    finally:
        if os.path.exists(scratch_dir):
            shutil.rmtree(scratch_dir)
# Get existing Chroma DB from MongoDB
def get_existing_retriever(embedding_model: str = "Omartificial-Intelligence-Space/GATE-AraBert-v1", collection_name: str = "default"):
    """Rebuild a retriever from the Chroma snapshot stored in MongoDB.

    Returns:
        A top-3 similarity retriever, or None when no snapshot exists, the
        snapshot holds no documents, or reconstruction fails.

    NOTE(review): documents are re-embedded from scratch rather than
    restoring the stored vectors — acceptable for small collections but
    costly for large ones.
    """
    import shutil

    snapshot = chroma_db_collection.find_one({"_id": collection_name})
    if not snapshot:
        return None
    # Bug fix: mkdtemp() previously ran *inside* the try-block, so a failure
    # there raised UnboundLocalError in the finally-clause.
    temp_dir = tempfile.mkdtemp()
    try:
        # SECURITY: this pickle data originates from our own save_chroma()
        # writes; never point this collection at untrusted input.
        payload = pickle.loads(snapshot["embeddings"])
        texts = payload.get("documents") or []
        if not texts:
            # Explicit (was an implicit None fall-through in the original).
            return None
        metadatas = payload.get("metadatas") or []
        docs = [
            Document(
                page_content=text,
                # Guard against short or None metadata entries.
                metadata=(metadatas[i] if i < len(metadatas) and metadatas[i] else {}),
            )
            for i, text in enumerate(texts)
        ]
        embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
        chroma_db = Chroma.from_documents(
            docs,
            embeddings,
            persist_directory=temp_dir,
            collection_name=collection_name,
        )
        return chroma_db.as_retriever(search_type="similarity", search_kwargs={"k": 3})
    except Exception as e:
        print(f"Error loading Chroma DB from MongoDB: {e}")
        return None
    finally:
        # Clean up the scratch directory in all cases.
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine of the angle between vectors *a* and *b*."""
    dot_product = np.dot(a, b)
    norms_product = np.linalg.norm(a) * np.linalg.norm(b)
    return float(dot_product / norms_product)
# Get document count in the collection
def get_collection_stats(collection_name: str = "default"):
    """Report whether a Chroma snapshot exists in MongoDB and its size.

    Returns:
        dict with keys ``exists`` (bool) and ``document_count`` (int); any
        deserialization error is reported as a missing collection.
    """
    record = chroma_db_collection.find_one({"_id": collection_name})
    if not record:
        return {"exists": False, "document_count": 0}
    try:
        payload = pickle.loads(record["embeddings"])
        doc_count = len(payload["documents"]) if "documents" in payload else 0
        return {"exists": True, "document_count": doc_count}
    except Exception as e:
        print(f"Error getting collection stats: {e}")
        return {"exists": False, "document_count": 0}
# Instantiate LLM (OpenAI)
def get_llm(temperature: float = 0.0):
    """Instantiate the chat LLM used for answering.

    SECURITY: the OpenAI API key was previously hardcoded here and is
    therefore compromised — rotate it and supply the replacement via the
    OPENAI_API_KEY environment variable.

    Note: *temperature* is kept for interface compatibility but was never
    forwarded by the original code; it remains unforwarded because OpenAI
    "o"-series reasoning models reject non-default temperature values.
    """
    return ChatOpenAI(model="o4-mini", api_key=os.getenv("OPENAI_API_KEY"))
def create_rag_chain_with_history(retriever, llm, lan, level, diacritics=False, history=None):
    """Build a retrieval-augmented chat chain tuned to the target language,
    student level, and (for Arabic) diacritics preference.

    Args:
        retriever: Vector-store retriever supplying context snippets.
        llm: Chat model that answers the question.
        lan: Target response language (e.g. "Arabic", "English").
        level: Student proficiency ("beginner" / "intermediate" / "advanced").
        diacritics: Arabic only — request full diacritics in answers.
        history: Optional list of {"role": ..., "content": ...} dicts of
            prior conversation turns.

    Returns:
        A LangChain retrieval chain expecting {"input": ...}.
    """
    if history is None:
        history = []
    # The three previous near-identical 18-line prompts are collapsed into a
    # single template; only the "Always respond in ..." line varied.
    if lan.lower() == "arabic" and diacritics:
        respond_line = f"Always respond in {lan} *with all proper diacritics*. "
    elif lan.lower() == "arabic":
        # Bug fix: the original branch concatenated "...*without diacritics*"
        # and "Student level..." with no separating space/period.
        respond_line = f"Always respond in {lan} *without diacritics*. "
    else:
        respond_line = f"Always respond in {lan}. "
    system_prompt = (
        "You are an Assistant for answering questions. "
        "Use the following retrieved context snippets to answer. "
        "Look for the relevance between the context and the question before answering. "
        "If you do not know the answer, say that you do not know. "
        "Be polite, act like a teacher, and provide as detailed an answer as possible based on the context. "
        "Consider the conversation history when responding. "
        "You are designed to help Muslims learn Arabic, so explanations should be culturally respectful and appropriate. "
        "Be responsive to the user's needs—if the user seems stuck or confused during the chat, proactively offer helpful suggestions, clarifications, or encouragement. "
        "Adjust your explanations according to the student's level: "
        "for 'beginner', use very simple language, break down grammar and context step-by-step, and give clear examples; "
        "for 'intermediate', provide more detailed grammar and usage insights with moderate complexity; "
        "for 'advanced', include deeper linguistic explanations, nuanced examples, and encourage self-reflection. "
        "Grammar and contextual explanations should start at the appropriate level and build gradually. "
        "Include examples from the connected knowledge base when possible; otherwise, generate clear and relevant examples yourself. "
        + respond_line
        + f"Student level: {level}. "
        "{context}"  # placeholder filled in by the stuff-documents chain
    )
    # System prompt first, then prior turns, then the incoming user message.
    messages = [("system", system_prompt)]
    messages.extend((turn["role"], turn["content"]) for turn in history)
    messages.append(("human", "{input}"))
    prompt = ChatPromptTemplate.from_messages(messages)
    question_answer_chain = create_stuff_documents_chain(llm, prompt)
    return create_retrieval_chain(retriever, question_answer_chain)
# Additional function to add documents to existing index
def add_documents_to_index(docs, embedding_model: str = "Omartificial-Intelligence-Space/GATE-AraBert-v1", collection_name: str = "default"):
    """Append *docs* to the stored collection by rebuilding the whole index.

    Performance fix: the original called get_existing_retriever() — which
    re-embeds every stored document — merely to test whether a snapshot
    exists, then discarded the result and re-read MongoDB anyway. Existence
    is now checked with a single find_one().

    Returns:
        A top-3 similarity retriever over the combined (old + new) documents.
    """
    import shutil

    snapshot = chroma_db_collection.find_one({"_id": collection_name})
    # No stored data yet -> just build a fresh index from the new docs.
    if not snapshot:
        return build_chroma_index(docs, embedding_model, collection_name)
    temp_dir = tempfile.mkdtemp()
    try:
        # SECURITY: pickle data originates from our own save_chroma() writes.
        payload = pickle.loads(snapshot["embeddings"])
        texts = payload.get("documents") or []
        metadatas = payload.get("metadatas") or []
        existing_docs = [
            Document(
                page_content=text,
                # Guard against short or None metadata entries.
                metadata=(metadatas[i] if i < len(metadatas) and metadatas[i] else {}),
            )
            for i, text in enumerate(texts)
        ]
        # Re-embed everything; vectors are not stored in the snapshot.
        all_docs = existing_docs + docs
        embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
        chroma_db = Chroma.from_documents(
            all_docs,
            embeddings,
            persist_directory=temp_dir,
            collection_name=collection_name,
        )
        MongoChromaStore.save_chroma(chroma_db, collection_name)
        return chroma_db.as_retriever(search_type="similarity", search_kwargs={"k": 3})
    finally:
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)