# Captured from a Hugging Face Space (status at capture time: Sleeping).
# File size: 6,985 bytes; revisions: 531a2b2, fbdfc24, 478b91f.
# Add project root to Python path
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from pymongo import MongoClient, ReadPreference
from pymongo.errors import ServerSelectionTimeoutError, ConnectionFailure
from langchain_mongodb.vectorstores import MongoDBAtlasVectorSearch
from langchain_openai import OpenAIEmbeddings
from typing import Dict
import logging
from config.settings import settings
logger = logging.getLogger(__name__)


class MongoDBClient:
    """Own the MongoDB connection plus the Bénin / Madagascar collections and vector stores.

    Every attribute is ``None`` until :meth:`connect` succeeds. A failed
    :meth:`connect` and :meth:`close` both release the client and reset every
    attribute back to ``None``, so ``self.client is None`` reliably means
    "not connected".
    """

    def __init__(self):
        # pymongo MongoClient created by connect()
        self.client = None
        # Database handle for settings.DATABASE_NAME
        self.db = None
        # Per-country collections
        self.benin_collection = None
        self.madagascar_collection = None
        # LangChain Atlas vector-search wrappers over the two collections
        self.benin_vectorstore = None
        self.madagascar_vectorstore = None
        # OpenAI embedding model shared by both vector stores
        self.embedding_model = None

    def _reset(self):
        """Close the current client (if any) and clear every connection-derived attribute."""
        if self.client is not None:
            self.client.close()
        self.client = None
        self.db = None
        self.benin_collection = None
        self.madagascar_collection = None
        self.benin_vectorstore = None
        self.madagascar_vectorstore = None
        self.embedding_model = None

    def _build_vectorstore(self, collection):
        """Wrap ``collection`` in an Atlas vector store using the shared embedding model."""
        return MongoDBAtlasVectorSearch(
            collection=collection,
            embedding=self.embedding_model,
            index_name=settings.VECTOR_INDEX_NAME,
            text_key=settings.TEXT_KEY,
            embedding_key=settings.EMBEDDING_KEY,
        )

    def connect(self):
        """Connect to MongoDB and initialise the collections, embeddings and vector stores.

        Any client left over from a previous call is closed first. On failure
        the partially-created client is closed and all attributes are reset to
        ``None``; the error is logged and printed, never raised.

        Returns:
            ``True`` when the ping succeeded and everything was initialised,
            ``False`` otherwise.
        """
        # Release a previous connection so repeated connect() calls don't leak pools.
        self._reset()
        try:
            self.client = MongoClient(
                settings.MONGO_URI,
                # Allow reads from a secondary when the primary is unavailable.
                read_preference=ReadPreference.SECONDARY_PREFERRED,
                # Fail after 10 s instead of pymongo's 30 s server-selection default.
                serverSelectionTimeoutMS=10000,
                connectTimeoutMS=10000,
                # NOTE(review): this also caps every socket read/write (e.g. a slow
                # vector search) at 10 s.
                socketTimeoutMS=10000,
                retryWrites=True,
                retryReads=True,
                maxPoolSize=50,
                minPoolSize=10,
                # Writes still go to the primary; require majority + journal ack.
                w='majority',
                journal=True,
            )
            # Surface connectivity problems now rather than on the first query.
            self.client.admin.command('ping')
            logger.info("✅ MongoDB connection test successful")

            self.db = self.client[settings.DATABASE_NAME]
            self.benin_collection = self.db[settings.BENIN_COLLECTION]
            self.madagascar_collection = self.db[settings.MADAGASCAR_COLLECTION]

            # Sanity check that the collections exist and are populated.
            benin_count = self.benin_collection.count_documents({})
            madagascar_count = self.madagascar_collection.count_documents({})
            logger.info(f"📊 Bénin collection: {benin_count} documents")
            logger.info(f"📊 Madagascar collection: {madagascar_count} documents")

            self.embedding_model = OpenAIEmbeddings(
                model=settings.EMBEDDING_MODEL,
                openai_api_key=settings.OPENAI_API_KEY,
            )
            # The vector stores query through the collection objects above, which
            # inherit the client's read preference.
            self.benin_vectorstore = self._build_vectorstore(self.benin_collection)
            self.madagascar_vectorstore = self._build_vectorstore(self.madagascar_collection)

            print("✅ MongoDB connected successfully with SECONDARY_PREFERRED read preference")
            return True
        except (ServerSelectionTimeoutError, ConnectionFailure) as e:
            logger.error(f"❌ MongoDB connection failed: {e}")
            logger.error("🔍 Possible issues:")
            logger.error(" 1. MongoDB Atlas cluster is paused")
            logger.error(" 2. Network connectivity issues")
            logger.error(" 3. IP address not whitelisted in Atlas")
            logger.error(" 4. Cluster is undergoing maintenance")
            print(f"❌ MongoDB connection failed: {e}")
        except Exception as e:
            # Keep the traceback for errors we did not anticipate.
            logger.exception(f"❌ Unexpected error during MongoDB connection: {e}")
            print(f"❌ MongoDB connection failed: {e}")
        # Only reached on failure: don't keep a half-initialised client around.
        self._reset()
        return False

    @staticmethod
    def _collection_stats(collection, embedding_key) -> Dict:
        """Return document counts for ``collection`` plus the shape of one sample document."""
        sample = collection.find_one()
        return {
            "total_documents": collection.count_documents({}),
            "case_study_count": collection.count_documents({"doc_type": "case_study"}),
            "articles_count": collection.count_documents({"doc_type": "articles"}),
            "has_embeddings": bool(sample and embedding_key in sample),
            "sample_fields": list(sample.keys()) if sample else [],
            "sample_doc_type": sample.get('doc_type', 'NOT_SET') if sample else None,
        }

    def get_collection_stats(self, embedding_key=None) -> Dict:
        """Get statistics for both collections.

        Args:
            embedding_key: Field whose presence in a sample document marks it as
                embedded. Defaults to ``settings.EMBEDDING_KEY``, the same field
                the vector stores read.

        Returns:
            ``{"benin": {...}, "madagascar": {...}}``. Each inner dict has the
            keys total_documents, case_study_count, articles_count,
            has_embeddings, sample_fields and sample_doc_type. Returns ``{}``
            when not connected or when any query fails.
        """
        # `is None` checks: pymongo Collection objects refuse truth-value testing.
        if (
            self.client is None
            or self.benin_collection is None
            or self.madagascar_collection is None
        ):
            return {}
        try:
            key = settings.EMBEDDING_KEY if embedding_key is None else embedding_key
            return {
                "benin": self._collection_stats(self.benin_collection, key),
                "madagascar": self._collection_stats(self.madagascar_collection, key),
            }
        except Exception as e:
            logger.error(f"Error getting collection stats: {e}")
            print(f"Error getting collection stats: {e}")
            return {}

    def close(self):
        """Close the MongoDB connection and reset all handles; a no-op when not connected."""
        if self.client is None:
            return
        self._reset()
        logger.info("✅ MongoDB connection closed")
        print("✅ MongoDB connection closed")