Spaces:
Sleeping
Sleeping
Commit
·
339c97b
1
Parent(s):
ca2bd00
Bug fix
Browse files
__pycache__/app.cpython-311.pyc
CHANGED
|
Binary files a/__pycache__/app.cpython-311.pyc and b/__pycache__/app.cpython-311.pyc differ
|
|
|
src/api/routes.py
CHANGED
|
@@ -74,45 +74,66 @@ def _execute_mongo_op_in_thread(op_spec: Dict, client: MongoClient, database_nam
|
|
| 74 |
Executes a single MongoDB operation.
|
| 75 |
This function is intended to be run in a separate thread.
|
| 76 |
"""
|
| 77 |
-
|
| 78 |
-
|
| 79 |
-
|
| 80 |
-
|
| 81 |
-
|
| 82 |
-
|
| 83 |
-
|
| 84 |
-
|
| 85 |
-
|
| 86 |
-
|
| 87 |
-
|
| 88 |
-
|
| 89 |
-
|
| 90 |
-
|
| 91 |
-
|
| 92 |
-
|
| 93 |
-
|
| 94 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 95 |
|
| 96 |
# Batch processing for MongoDB operations
|
| 97 |
async def batch_mongodb_operations(operations: List[Dict]) -> None:
|
| 98 |
"""Execute MongoDB operations in batches."""
|
| 99 |
batch_size = 50
|
| 100 |
try:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 101 |
# Attempt to get DB name from the existing mongodb setup
|
| 102 |
-
|
| 103 |
-
|
| 104 |
-
|
| 105 |
-
|
| 106 |
-
|
| 107 |
-
|
| 108 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 109 |
|
| 110 |
for i in range(0, len(operations), batch_size):
|
| 111 |
batch = operations[i:i + batch_size]
|
| 112 |
-
|
| 113 |
-
asyncio.
|
| 114 |
-
|
| 115 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 116 |
|
| 117 |
#Getting recommendation using without user association
|
| 118 |
@router.get("/recommendations/", response_model=RecommendationResponse)
|
|
@@ -189,21 +210,26 @@ async def get_recommendations_by_id_endpoint(
|
|
| 189 |
# Add msid to the response
|
| 190 |
recommendations_data["msid"] = msid
|
| 191 |
|
| 192 |
-
# Store session in MongoDB asynchronously using batch processing
|
| 193 |
-
|
| 194 |
-
|
| 195 |
-
|
| 196 |
-
|
| 197 |
-
|
| 198 |
-
await batch_mongodb_operations([
|
| 199 |
-
{
|
| 200 |
-
"collection": "sessions",
|
| 201 |
-
"operation": "update_one",
|
| 202 |
-
"filter": {"user_id": user_id},
|
| 203 |
-
"update": {"$set": session_data},
|
| 204 |
-
"upsert": True
|
| 205 |
}
|
| 206 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 207 |
|
| 208 |
end_time = time.time()
|
| 209 |
logger.info(f"Recommendation generated in {(end_time - start_time)*1000:.2f}ms")
|
|
@@ -275,6 +301,11 @@ async def feedback_recommendation_endpoint(
|
|
| 275 |
|
| 276 |
# Store feedback in MongoDB
|
| 277 |
try:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 278 |
# First, try to find if the user document exists
|
| 279 |
user_doc = await asyncio.get_event_loop().run_in_executor(
|
| 280 |
thread_pool,
|
|
@@ -376,29 +407,33 @@ async def get_recommendations_with_summary_endpoint(
|
|
| 376 |
articles_details_map = {}
|
| 377 |
|
| 378 |
if doc_ids_to_fetch and (include_summary or include_smart_tip):
|
| 379 |
-
|
| 380 |
-
|
| 381 |
-
|
| 382 |
-
|
| 383 |
-
|
| 384 |
-
|
| 385 |
-
|
| 386 |
-
|
| 387 |
-
|
| 388 |
-
|
| 389 |
-
|
| 390 |
-
|
| 391 |
-
|
| 392 |
-
|
| 393 |
-
|
| 394 |
-
|
| 395 |
-
|
| 396 |
-
|
| 397 |
-
|
| 398 |
-
|
| 399 |
-
|
| 400 |
-
article
|
| 401 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 402 |
|
| 403 |
# Process documents in parallel using asyncio.gather
|
| 404 |
async def process_doc_batch(docs_batch):
|
|
|
|
| 74 |
Executes a single MongoDB operation.
|
| 75 |
This function is intended to be run in a separate thread.
|
| 76 |
"""
|
| 77 |
+
try:
|
| 78 |
+
if client is None:
|
| 79 |
+
logger.warning("MongoDB client is None. Skipping operation.")
|
| 80 |
+
return
|
| 81 |
+
|
| 82 |
+
db = client[database_name]
|
| 83 |
+
collection = db[op_spec["collection"]]
|
| 84 |
+
operation_name = op_spec["operation"]
|
| 85 |
+
|
| 86 |
+
if operation_name == "update_one":
|
| 87 |
+
collection.update_one(
|
| 88 |
+
op_spec["filter"],
|
| 89 |
+
op_spec["update"],
|
| 90 |
+
upsert=op_spec.get("upsert", False),
|
| 91 |
+
array_filters=op_spec.get("array_filters")
|
| 92 |
+
)
|
| 93 |
+
elif operation_name == "insert_one":
|
| 94 |
+
# Ensure 'document' key exists for insert_one
|
| 95 |
+
collection.insert_one(op_spec["document"])
|
| 96 |
+
# Add other specific operations as needed (e.g., find_one, delete_one)
|
| 97 |
+
else:
|
| 98 |
+
logger.error(f"Unsupported MongoDB operation in batch: {operation_name}")
|
| 99 |
+
raise ValueError(f"Unsupported MongoDB operation: {operation_name}")
|
| 100 |
+
except Exception as e:
|
| 101 |
+
logger.error(f"Error executing MongoDB operation {op_spec.get('operation', 'unknown')}: {e}")
|
| 102 |
+
# Don't raise the exception to avoid failing the entire batch
|
| 103 |
|
| 104 |
# Batch processing for MongoDB operations
|
| 105 |
async def batch_mongodb_operations(operations: List[Dict]) -> None:
|
| 106 |
"""Execute MongoDB operations in batches."""
|
| 107 |
batch_size = 50
|
| 108 |
try:
|
| 109 |
+
# Check if MongoDB is available
|
| 110 |
+
if mongodb.db is None:
|
| 111 |
+
logger.warning("MongoDB not available. Skipping batch operations.")
|
| 112 |
+
return
|
| 113 |
+
|
| 114 |
# Attempt to get DB name from the existing mongodb setup
|
| 115 |
+
try:
|
| 116 |
+
db_name = mongodb.news_collection.database.name
|
| 117 |
+
except AttributeError:
|
| 118 |
+
logger.warning(
|
| 119 |
+
"Could not determine database name from mongodb.news_collection. "
|
| 120 |
+
"Using fallback 'recommender_db'. Please configure DB name properly."
|
| 121 |
+
)
|
| 122 |
+
db_name = "recommender_db" # FIXME: This should be configured via settings or a central DB config
|
| 123 |
+
except Exception as e:
|
| 124 |
+
logger.warning(f"Could not access MongoDB: {e}. Skipping batch operations.")
|
| 125 |
+
return
|
| 126 |
|
| 127 |
for i in range(0, len(operations), batch_size):
|
| 128 |
batch = operations[i:i + batch_size]
|
| 129 |
+
try:
|
| 130 |
+
await asyncio.gather(*[
|
| 131 |
+
asyncio.to_thread(_execute_mongo_op_in_thread, op, mongodb._client, db_name)
|
| 132 |
+
for op in batch
|
| 133 |
+
])
|
| 134 |
+
except Exception as e:
|
| 135 |
+
logger.error(f"Error executing batch MongoDB operations: {e}")
|
| 136 |
+
# Don't raise the exception to avoid failing the entire request
|
| 137 |
|
| 138 |
#Getting recommendation using without user association
|
| 139 |
@router.get("/recommendations/", response_model=RecommendationResponse)
|
|
|
|
| 210 |
# Add msid to the response
|
| 211 |
recommendations_data["msid"] = msid
|
| 212 |
|
| 213 |
+
# Store session in MongoDB asynchronously using batch processing (optional)
|
| 214 |
+
try:
|
| 215 |
+
session_data = {
|
| 216 |
+
"user_id": user_id,
|
| 217 |
+
"recommendations": recommendations_data.get("retrieved_documents", []),
|
| 218 |
+
"timestamp": datetime.now()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 219 |
}
|
| 220 |
+
await batch_mongodb_operations([
|
| 221 |
+
{
|
| 222 |
+
"collection": "sessions",
|
| 223 |
+
"operation": "update_one",
|
| 224 |
+
"filter": {"user_id": user_id},
|
| 225 |
+
"update": {"$set": session_data},
|
| 226 |
+
"upsert": True
|
| 227 |
+
}
|
| 228 |
+
])
|
| 229 |
+
logger.info(f"Session data saved to MongoDB for user {user_id}")
|
| 230 |
+
except Exception as mongo_error:
|
| 231 |
+
logger.warning(f"Could not save session data to MongoDB: {mongo_error}")
|
| 232 |
+
# Don't fail the request if MongoDB is unavailable
|
| 233 |
|
| 234 |
end_time = time.time()
|
| 235 |
logger.info(f"Recommendation generated in {(end_time - start_time)*1000:.2f}ms")
|
|
|
|
| 301 |
|
| 302 |
# Store feedback in MongoDB
|
| 303 |
try:
|
| 304 |
+
# Check if MongoDB is available
|
| 305 |
+
if mongodb.db is None or mongodb.news_collection is None:
|
| 306 |
+
logger.warning("MongoDB not available. Skipping feedback storage.")
|
| 307 |
+
return {"message": "Response processed successfully (feedback storage unavailable)"}
|
| 308 |
+
|
| 309 |
# First, try to find if the user document exists
|
| 310 |
user_doc = await asyncio.get_event_loop().run_in_executor(
|
| 311 |
thread_pool,
|
|
|
|
| 407 |
articles_details_map = {}
|
| 408 |
|
| 409 |
if doc_ids_to_fetch and (include_summary or include_smart_tip):
|
| 410 |
+
try:
|
| 411 |
+
projection = {"_id": 0, "id": 1}
|
| 412 |
+
if include_summary:
|
| 413 |
+
projection.update({"story": 1, "syn": 1}) # Add syn field as fallback
|
| 414 |
+
if include_smart_tip:
|
| 415 |
+
projection.update({"seolocation": 1, "tn": 1, "hl": 1})
|
| 416 |
+
|
| 417 |
+
# Use batch size of 50 for MongoDB queries
|
| 418 |
+
batch_size = 50
|
| 419 |
+
for i in range(0, len(doc_ids_to_fetch), batch_size):
|
| 420 |
+
batch_ids = doc_ids_to_fetch[i:i + batch_size]
|
| 421 |
+
fetched_articles_list = await asyncio.get_event_loop().run_in_executor(
|
| 422 |
+
thread_pool,
|
| 423 |
+
lambda: list(mongodb.news_collection.find(
|
| 424 |
+
{"id": {"$in": batch_ids}},
|
| 425 |
+
projection
|
| 426 |
+
))
|
| 427 |
+
)
|
| 428 |
+
for article in fetched_articles_list:
|
| 429 |
+
if article.get("id"):
|
| 430 |
+
# Use synopsis as fallback if story is not available
|
| 431 |
+
if include_summary and not article.get("story") and article.get("syn"):
|
| 432 |
+
article["story"] = article["syn"]
|
| 433 |
+
articles_details_map[article["id"]] = article
|
| 434 |
+
except Exception as e:
|
| 435 |
+
logger.warning(f"Could not fetch article details from MongoDB: {e}")
|
| 436 |
+
# Continue without article details - recommendations will still work
|
| 437 |
|
| 438 |
# Process documents in parallel using asyncio.gather
|
| 439 |
async def process_doc_batch(docs_batch):
|
src/core/__pycache__/recommender.cpython-311.pyc
CHANGED
|
Binary files a/src/core/__pycache__/recommender.cpython-311.pyc and b/src/core/__pycache__/recommender.cpython-311.pyc differ
|
|
|
src/database/__pycache__/mongodb.cpython-311.pyc
CHANGED
|
Binary files a/src/database/__pycache__/mongodb.cpython-311.pyc and b/src/database/__pycache__/mongodb.cpython-311.pyc differ
|
|
|