"""FastAPI routes for the recommendation service.

Exposes query-based, item-based (msid), feedback and summary-enriched
recommendation endpoints on ``router``. Recommender calls are memoised with
``functools.lru_cache``; MongoDB access is best-effort and never fails a
request on its own.
"""
import asyncio
import hashlib
import logging
import re
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import lru_cache
from typing import Dict, List, Optional

from fastapi import APIRouter, HTTPException, Depends, Query, Path
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

from src.models.schemas import (
    MsgPayload,
    RecommendationRequest,
    UserItemRecommendationRequest,
    RecommendationResponse,
    OutputResponse,
    OutputData,
    FeedbackRecommendationRequest,
    PastFeedbackItem,
    SmartTip,
    SmartTipSuggestion,
    RecommendationResponseWithSummary,
    RetrievedDocumentWithSummary,
)
from src.core.recommender import recommender
from src.database.mongodb import mongodb
from src.config.settings import (
    EMBED_MODEL_NAME,
    GENERATOR_MODEL_NAME,
    RERANKER_MODEL_NAME,
    INDEX_PATH,
    INTERACTION_LOG_PATH,
    INDIC_NLP_RESOURCES_PATH,
    HEADLINE_COL,
    ID_COL,
    TOPIC_COL,
    PROPERTY_COL,
    DEFAULT_K,
    SIMILARITY_THRESHOLD,
    CANDIDATE_MULTIPLIER,
)
from src.test_summarize import get_summary_points

logger = logging.getLogger(__name__)

router = APIRouter()

# In-memory storage for messages (consider moving to MongoDB if needed)
messages_list: dict[int, MsgPayload] = {}

# Shared thread pool used to run blocking recommender / MongoDB / summary calls
# off the event loop.
thread_pool = ThreadPoolExecutor(max_workers=8)

# Pooled MongoDB client.
# NOTE(review): nothing in this module reads `mongodb_client`; the handlers go
# through `src.database.mongodb.mongodb` instead. Kept because other modules
# may import it -- confirm before removing.
mongodb_client = MongoClient(
    host='localhost',
    port=27017,
    maxPoolSize=50,
    minPoolSize=10,
    maxIdleTimeMS=30000,
    waitQueueTimeoutMS=10000
)


# ---------------------------------------------------------------------------
# Cached recommender wrappers
#
# lru_cache hands every caller the SAME dict object for a given key, so the
# returned value must be treated as read-only; copy it before modifying.
# ---------------------------------------------------------------------------

@lru_cache(maxsize=1000)
def get_cached_recommendations(query: str, k: int) -> Dict:
    """Return (and memoise) ``recommender.get_recommendations(query, k)``.

    The returned dict is shared between callers; do not mutate it.
    """
    return recommender.get_recommendations(query, k)


@lru_cache(maxsize=1000)
def get_cached_recommendations_by_id(msid: str, k: int) -> Dict:
    """Return (and memoise) ``recommender.get_recommendations_by_id(msid, k)``.

    The returned dict is shared between callers; do not mutate it.
    """
    return recommender.get_recommendations_by_id(msid, k)


@lru_cache(maxsize=1000)
def get_cached_recommendations_user_feedback(user_id: str, msid: str, clicked_msid: str, k: int) -> Dict:
    """Return (and memoise) ``recommender.get_recommendations_user_feedback(...)``.

    The returned dict is shared between callers; do not mutate it.
    """
    return recommender.get_recommendations_user_feedback(user_id, msid, clicked_msid, k)


@lru_cache(maxsize=1000)
def get_cached_recommendations_with_summary(query: str, k: int, include_summary: bool, include_smart_tip: bool) -> Dict:
    """
    Get cached recommendations with summary and smart tip.
    This function is intended to be used with query parameters.

    The returned dict is shared between callers; do not mutate it.
    """
    return recommender.get_recommendations_with_summary(query, k, include_summary, include_smart_tip)


# ---------------------------------------------------------------------------
# MongoDB helpers
# ---------------------------------------------------------------------------

def _execute_mongo_op_in_thread(op_spec: Dict, client: MongoClient, database_name: str):
    """
    Execute a single MongoDB operation; intended to run in a worker thread.

    Args:
        op_spec: Operation description. Requires ``"collection"`` and
            ``"operation"``; ``"update_one"`` additionally reads ``"filter"``,
            ``"update"`` and the optional ``"upsert"`` / ``"array_filters"``;
            ``"insert_one"`` reads ``"document"``.
        client: MongoDB client to use. ``None`` skips the operation.
        database_name: Name of the database holding the collection.

    Returns:
        None. Every failure (including an unsupported operation name) is
        logged and swallowed so one bad operation cannot fail a whole batch.
    """
    try:
        if client is None:
            logger.warning("MongoDB client is None. Skipping operation.")
            return

        db = client[database_name]
        collection = db[op_spec["collection"]]
        operation_name = op_spec["operation"]

        if operation_name == "update_one":
            collection.update_one(
                op_spec["filter"],
                op_spec["update"],
                upsert=op_spec.get("upsert", False),
                array_filters=op_spec.get("array_filters")
            )
        elif operation_name == "insert_one":
            # 'document' is mandatory for insert_one; a missing key is logged below.
            collection.insert_one(op_spec["document"])
        else:
            logger.error(f"Unsupported MongoDB operation in batch: {operation_name}")
            raise ValueError(f"Unsupported MongoDB operation: {operation_name}")
    except Exception as e:
        # Deliberately not re-raised: batch writes are best-effort.
        logger.error(f"Error executing MongoDB operation {op_spec.get('operation', 'unknown')}: {e}")


async def batch_mongodb_operations(operations: List[Dict]) -> None:
    """Execute MongoDB operations concurrently, in batches of 50.

    Args:
        operations: Operation specs as accepted by
            ``_execute_mongo_op_in_thread``.

    Returns:
        None. The function returns early (after logging) when MongoDB is not
        available, and logs instead of raising when a batch fails.
    """
    batch_size = 50

    try:
        if mongodb.db is None:
            logger.warning("MongoDB not available. Skipping batch operations.")
            return

        # Derive the database name from the existing mongodb setup.
        try:
            db_name = mongodb.news_collection.database.name
        except AttributeError:
            logger.warning(
                "Could not determine database name from mongodb.news_collection. "
                "Using fallback 'recommender_db'. Please configure DB name properly."
            )
            db_name = "recommender_db"  # FIXME: This should be configured via settings or a central DB config
    except Exception as e:
        logger.warning(f"Could not access MongoDB: {e}. Skipping batch operations.")
        return

    for start in range(0, len(operations), batch_size):
        batch = operations[start:start + batch_size]
        try:
            await asyncio.gather(*[
                asyncio.to_thread(_execute_mongo_op_in_thread, op, mongodb._client, db_name)
                for op in batch
            ])
        except Exception as e:
            # Best-effort: never fail the calling request because of storage.
            logger.error(f"Error executing batch MongoDB operations: {e}")


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

# Recommendations without any user association
@router.get("/recommendations/", response_model=RecommendationResponse)
async def get_recommendations_endpoint(
    query: str = Query(..., description="The search query or input text for which recommendations are sought."),
    k: Optional[int] = Query(DEFAULT_K, description=f"The number of recommendations to return, defaults to {DEFAULT_K}.")
) -> RecommendationResponse:
    """
    Get recommendations based on a user query.

    Args:
        query: The search query or input text.
        k: The number of recommendations to return.

    Returns:
        RecommendationResponse: The generated recommendations.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    start_time = time.time()
    logger.info(f"Received recommendation request: query='{query}', k={k}")

    try:
        recommendations_data = get_cached_recommendations(query, k)

        end_time = time.time()
        logger.info(f"Recommendation generated in {(end_time - start_time)*1000:.2f}ms")

        return RecommendationResponse(**recommendations_data)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting recommendations: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")


def _parse_bool_query_param(value: Optional[str]) -> bool:
    """Parse a query-parameter string into a boolean.

    ``None``, empty and whitespace-only values are ``False``; so are
    ``"false"``, ``"0"``, ``"off"`` and ``"no"`` (case-insensitive). Every
    other non-empty value is ``True``.
    """
    if value is None:  # Parameter not present
        return False
    normalized = value.strip().lower()
    if not normalized:  # Empty string or just whitespace
        return False
    return normalized not in ("false", "0", "off", "no")


@router.get("/recommendations/msid", response_model=RecommendationResponse)
async def get_recommendations_by_id_endpoint(
    msid: str = Query(..., description="The item ID (msid) to get recommendations for."),
    user_id: Optional[str] = Query(None, description="Optional user ID. If not provided, an anonymous ID will be generated."),
    k: Optional[int] = Query(DEFAULT_K, description=f"Number of recommendations to return, defaults to {DEFAULT_K}.")
) -> RecommendationResponse:
    """
    Get personalized recommendations based on an item ID (msid).
    Handles both authenticated and anonymous users.
    Stores recommendation history in MongoDB.

    Raises:
        HTTPException: 404 when nothing is found for ``msid``; 500 on any
            unexpected error.
    """
    start_time = time.time()

    if not user_id:
        user_id = f"anonymous_{str(uuid.uuid4())[:8]}"
        logger.info(f"Generated anonymous user ID: {user_id}")

    logger.info(f"Received recommendation request for user '{user_id}', id (msid): '{msid}', k={k}")

    try:
        cached = get_cached_recommendations_by_id(msid, k)

        if not cached or not cached.get("retrieved_documents"):
            logger.warning(f"No recommendations found for msid '{msid}'")
            raise HTTPException(status_code=404, detail=f"No recommendations found for msid '{msid}'")

        # Copy before adding the msid: the cached dict is shared by every
        # caller of the lru_cache'd function and must not be modified.
        recommendations_data = dict(cached)
        recommendations_data["msid"] = msid

        # Persist the session (optional, best-effort).
        try:
            session_data = {
                "user_id": user_id,
                "recommendations": recommendations_data.get("retrieved_documents", []),
                "timestamp": datetime.now()
            }
            await batch_mongodb_operations([
                {
                    "collection": "sessions",
                    "operation": "update_one",
                    "filter": {"user_id": user_id},
                    "update": {"$set": session_data},
                    "upsert": True
                }
            ])
            logger.info(f"Session data saved to MongoDB for user {user_id}")
        except Exception as mongo_error:
            # Don't fail the request if MongoDB is unavailable
            logger.warning(f"Could not save session data to MongoDB: {mongo_error}")

        end_time = time.time()
        logger.info(f"Recommendation generated in {(end_time - start_time)*1000:.2f}ms")

        return RecommendationResponse(**recommendations_data)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting recommendations by id: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")


@router.get("/recommendations/feedback/user", response_model=Dict[str, str])
async def feedback_recommendation_endpoint(
    user_id: str = Query(..., description="The ID of the user."),
    msid: str = Query(..., description="The item ID (msid) to get recommendations for."),
    clicked_msid: str = Query(..., description="The item ID (msid) that the user clicked from a previous recommendation list."),
    k: Optional[int] = Query(DEFAULT_K, description=f"Number of recommendations to return, defaults to {DEFAULT_K}.")
) -> Dict[str, str]:
    """
    Record which recommended items a user clicked.

    ``clicked_msid`` may hold several comma-separated msids. Recommendations
    for each clicked item are computed through the cache, and the clicks are
    stored in the ``user_feedback_tracking`` collection under ``msid``.

    Returns:
        A dict with a single ``"message"`` key describing the outcome.

    Raises:
        HTTPException: 400 when ``clicked_msid`` contains no usable msid;
            500 on any unexpected error.
    """
    start_time = time.time()

    actual_clicked_msids = [s.strip() for s in clicked_msid.split(',') if s.strip()]
    if not actual_clicked_msids:
        raise HTTPException(status_code=400, detail="clicked_msid parameter is invalid or does not contain valid MSIDs.")

    logger.info(
        f"Feedback recommendation for user '{user_id}', based on clicked msids: {actual_clicked_msids}, original context msid: '{msid}', k={k}"
    )

    try:
        loop = asyncio.get_running_loop()

        # Compute recommendations for every clicked item in parallel. The
        # response model only carries a status message, so the results are
        # used for cache population and diagnostics only.
        batch_results = await asyncio.gather(
            *[
                loop.run_in_executor(thread_pool, get_cached_recommendations_by_id, c_msid, k)
                for c_msid in actual_clicked_msids
            ],
            return_exceptions=True,
        )

        seen_recommendation_ids = set()
        for c_msid, result in zip(actual_clicked_msids, batch_results):
            if isinstance(result, Exception):
                logger.warning(f"Recommendation lookup failed for clicked msid '{c_msid}': {result}")
                continue
            if not result:
                continue
            for doc in result.get("retrieved_documents", []):
                seen_recommendation_ids.add(doc['id'])

        if not seen_recommendation_ids:
            logger.warning(f"No recommendations could be generated for any of clicked_msids: {actual_clicked_msids}")

        # Store feedback in MongoDB (best-effort).
        try:
            if mongodb.db is None or mongodb.news_collection is None:
                logger.warning("MongoDB not available. Skipping feedback storage.")
                return {"message": "Response processed successfully (feedback storage unavailable)"}

            feedback_collection = mongodb.news_collection.database["user_feedback_tracking"]

            # One atomic upsert instead of find-then-insert/update: no race
            # between concurrent requests, and new and existing user documents
            # get the same {"msid": ..., "Read": [...]} entry shape. ($each is
            # only a modifier when it is the direct value of the $addToSet
            # field, so it must not be nested inside the entry.)
            await loop.run_in_executor(
                thread_pool,
                lambda: feedback_collection.update_one(
                    {"user_id": user_id},
                    {
                        "$addToSet": {
                            "Articles": {
                                "msid": msid,
                                "Read": actual_clicked_msids
                            }
                        }
                    },
                    upsert=True,
                )
            )

            logger.info(f"Successfully saved feedback for user {user_id}")
        except Exception as e:
            # Don't surface storage errors to the user, just log them.
            logger.error(f"Error saving feedback to MongoDB: {e}", exc_info=True)

        end_time = time.time()
        logger.info(f"Feedback recommendation processed in {(end_time - start_time)*1000:.2f}ms")

        return {"message": "Response saved successfully"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in feedback recommendation: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")


@router.get("/recommendations/summary", response_model=RecommendationResponseWithSummary)
async def get_recommendations_with_summary_endpoint(
    query: Optional[str] = Query(None, description="The search query or input text. Provide either query or msid."),
    msid: Optional[str] = Query(None, description="The item ID (msid) to get recommendations for. Provide either query or msid."),
    k: Optional[int] = Query(DEFAULT_K, description=f"The number of recommendations to return, defaults to {DEFAULT_K}."),
    summary_str: Optional[str] = Query(None, alias="summary", description="Whether to include a generated summary for each recommendation (true/false)."),
    smart_tip_str: Optional[str] = Query(None, alias="smart_tip", description="Whether to include a smart tip with related articles for each recommendation (true/false).")
) -> RecommendationResponseWithSummary:
    """Get recommendations with summaries and SEO-friendly links.

    Exactly one of ``query`` / ``msid`` must be supplied. When ``summary``
    and/or ``smart_tip`` are truthy, article details are fetched from MongoDB
    and each recommendation is enriched accordingly; if that fetch fails the
    plain recommendations are still returned.

    Raises:
        HTTPException: 400 when both or neither of ``query`` / ``msid`` are
            given; 500 on any unexpected error.
    """
    start_time = time.time()

    if query and msid:
        raise HTTPException(status_code=400, detail="Provide either 'query' or 'msid', not both.")
    if not query and not msid:
        raise HTTPException(status_code=400, detail="Either 'query' or 'msid' must be provided.")

    include_summary = _parse_bool_query_param(summary_str)
    include_smart_tip = _parse_bool_query_param(smart_tip_str)

    log_identifier = f"query='{query}'" if query else f"msid='{msid}'"
    logger.info(f"Received recommendation request: {log_identifier}, k={k}, include_summary={include_summary}, include_smart_tip={include_smart_tip}")

    # Human-readable description of what the request was based on, so that
    # query-based requests are not reported as "item ID 'None'".
    log_subject = f"msid '{msid}'" if msid else f"query '{query}'"
    response_subject = f"item ID '{msid}'" if msid else f"query '{query}'"

    try:
        if query:
            cached = get_cached_recommendations(query, k)
        else:  # msid is guaranteed to be set by the checks above
            cached = get_cached_recommendations_by_id(msid, k)

        # Work on a copy (the cached dict is shared), tolerating a None result.
        recommendations_data = dict(cached) if cached else {}

        if "retrieved_documents" not in recommendations_data:
            logger.warning(f"No recommendations found for {log_subject} or data malformed. Returning empty.")
            recommendations_data = {
                "generated_response": f"No recommendations found for {response_subject}.",
                "retrieved_documents": []
            }
        elif "generated_response" not in recommendations_data:
            recommendations_data["generated_response"] = f"Recommendations based on {response_subject}."

        retrieved_docs = recommendations_data.get("retrieved_documents", [])

        if not retrieved_docs:
            return RecommendationResponseWithSummary(**recommendations_data)

        loop = asyncio.get_running_loop()

        # Batch fetch article details from MongoDB
        doc_ids_to_fetch = [doc["id"] for doc in retrieved_docs if doc.get("id")]
        articles_details_map = {}

        if doc_ids_to_fetch and (include_summary or include_smart_tip):
            try:
                projection = {"_id": 0, "id": 1}
                if include_summary:
                    projection.update({"story": 1, "syn": 1})  # 'syn' is the fallback for 'story'
                if include_smart_tip:
                    projection.update({"seolocation": 1, "tn": 1, "hl": 1})

                fetch_batch_size = 50
                for start in range(0, len(doc_ids_to_fetch), fetch_batch_size):
                    batch_ids = doc_ids_to_fetch[start:start + fetch_batch_size]
                    # Bind batch_ids as a default so the lambda cannot observe
                    # a later loop iteration's value.
                    fetched_articles_list = await loop.run_in_executor(
                        thread_pool,
                        lambda ids=batch_ids: list(mongodb.news_collection.find(
                            {"id": {"$in": ids}},
                            projection
                        ))
                    )
                    for article in fetched_articles_list:
                        if article.get("id"):
                            # Use synopsis as fallback if story is not available
                            if include_summary and not article.get("story") and article.get("syn"):
                                article["story"] = article["syn"]
                            articles_details_map[article["id"]] = article
            except Exception as e:
                # Continue without article details - recommendations still work
                logger.warning(f"Could not fetch article details from MongoDB: {e}")

        # Enrich documents concurrently, 10 at a time.
        process_batch_size = 10
        processed_documents = []
        for start in range(0, len(retrieved_docs), process_batch_size):
            docs_batch = retrieved_docs[start:start + process_batch_size]
            batch_results = await asyncio.gather(*[
                _process_doc_with_summary_and_related(
                    doc.copy(),
                    articles_details_map.get(doc.get("id")),
                    include_summary,
                    include_smart_tip
                )
                for doc in docs_batch
            ])
            processed_documents.extend(batch_results)

        recommendations_data["retrieved_documents"] = [RetrievedDocumentWithSummary(**doc) for doc in processed_documents]

        end_time = time.time()
        logger.info(f"Recommendations with summary generated in {(end_time - start_time)*1000:.2f}ms")

        return RecommendationResponseWithSummary(**recommendations_data)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting recommendations with summary: {e}", exc_info=True)
        # NOTE(review): unlike the other endpoints this returns the raw error
        # text to the client; kept as-is because it appears deliberate, but it
        # can expose internal details -- confirm this is wanted in production.
        raise HTTPException(status_code=500, detail=str(e))


# ---------------------------------------------------------------------------
# Per-document enrichment helpers
# ---------------------------------------------------------------------------

async def _process_doc_with_summary_and_related(
    doc: Dict,
    article_data: Optional[Dict],
    include_summary: bool,
    include_smart_tip: bool
) -> Dict:
    """Attach a summary and/or smart tip to a recommendation document.

    Args:
        doc: The recommendation document; modified in place and returned.
        article_data: Article details fetched from MongoDB. When falsy, ``doc``
            is returned untouched.
        include_summary: Add a ``"summary"`` key (``None`` if none generated).
        include_smart_tip: Add a ``"smart_tip"`` key (an empty default
            ``SmartTip`` if none generated).

    Returns:
        The same ``doc`` object.
    """
    if not article_data:
        return doc

    # Defaults, so the keys exist even when generation yields nothing.
    if include_summary:
        doc["summary"] = None
    if include_smart_tip:
        doc["smart_tip"] = SmartTip(
            title="Smart Tip",
            description="More to read.",
            suggestions=[]
        )

    if include_summary and include_smart_tip:
        # Both requested: run them concurrently.
        summary, smart_tip = await asyncio.gather(
            _generate_summary(article_data),
            _generate_smart_tip(article_data),
        )
        if summary:
            doc["summary"] = summary
        if smart_tip is not None:
            doc["smart_tip"] = smart_tip
    elif include_summary:
        summary = await _generate_summary(article_data)
        if summary:
            doc["summary"] = summary
    elif include_smart_tip:
        smart_tip = await _generate_smart_tip(article_data)
        if smart_tip is not None:
            doc["smart_tip"] = smart_tip

    return doc


async def _generate_summary(article_data: Dict) -> Optional[str]:
    """Generate a summary string from ``article_data["story"]``.

    ``get_summary_points`` is run in the shared thread pool. A list result is
    joined with spaces; a string result is returned as is.

    Returns:
        The summary, or ``None`` when there is no story, the result is empty
        or blank, has an unexpected type, or generation raised.
    """
    try:
        story = article_data.get("story", "")
        if not story:
            logger.warning("No story content found in article data")
            return None

        summary_points = await asyncio.get_running_loop().run_in_executor(
            thread_pool,
            get_summary_points,
            story
        )

        if isinstance(summary_points, list):
            if not summary_points:
                logger.warning("No summary points generated")
                return None
            summary = " ".join(summary_points)
            logger.info(f"Generated summary with {len(summary_points)} points")
            return summary if summary.strip() else None
        elif isinstance(summary_points, str):
            logger.info("Generated summary as single string")
            return summary_points if summary_points.strip() else None
        else:
            logger.warning(f"Unexpected summary_points type: {type(summary_points)}")
            return None
    except Exception as e:
        logger.error(f"Error generating summary: {e}", exc_info=True)
        return None


async def _generate_smart_tip(article_data: Dict) -> Optional[SmartTip]:
    """Generate a smart tip with up to three related articles.

    Related articles are those whose ``tn`` or ``hl`` field matches the
    article's ``tn`` value (case-insensitive), excluding the article itself.

    Returns:
        ``None`` when ``seolocation``, ``tn`` or ``hl`` is missing; otherwise a
        ``SmartTip``. If no related article is found the tip links back to the
        article itself, and if the lookup raises a generic fallback tip is
        returned.
    """
    # Bound before the try block so the except handler can always read them.
    seolocation = None
    headline = None
    try:
        seolocation = article_data.get("seolocation")
        title = article_data.get("tn")
        headline = article_data.get("hl")

        if not all([seolocation, title, headline]):
            return None

        # Escape the title: it is data, not a pattern, and may contain regex
        # metacharacters that would otherwise break or widen the match.
        topic_pattern = re.escape(title.lower())

        query = {
            "$or": [
                {"tn": {"$regex": topic_pattern, "$options": "i"}},
                {"hl": {"$regex": topic_pattern, "$options": "i"}}
            ]
        }

        # Exclude current article
        if article_data.get("id"):
            query["id"] = {"$ne": article_data["id"]}

        related_articles = await asyncio.get_running_loop().run_in_executor(
            thread_pool,
            lambda: list(mongodb.news_collection.find(
                query,
                {"hl": 1, "seolocation": 1, "tn": 1, "_id": 0}
            ).limit(3))
        )

        suggestions = [
            SmartTipSuggestion(
                label=rel_article.get("hl", ""),  # Use headline as label
                url=rel_article.get("seolocation", "")
            )
            for rel_article in related_articles
            if rel_article.get("hl") and rel_article.get("seolocation")
        ]

        if not suggestions:
            # No related articles found: fall back to the current article.
            suggestions = [SmartTipSuggestion(
                label=headline,
                url=seolocation
            )]

        return SmartTip(
            title=f"🔍 Smart Tip: {title}",
            description="You might also be interested in:",
            suggestions=suggestions
        )
    except Exception as e:
        logger.error(f"Error generating smart tip: {e}", exc_info=True)
        # Return a default smart tip instead of None
        return SmartTip(
            title="🔍 Smart Tip",
            description="More to read",
            suggestions=[SmartTipSuggestion(
                label=headline or "Click to read more",
                url=seolocation or ""
            )]
        )