"""Controllers for creating, reading, listing, updating and deleting posts.

Every public controller returns a dict of the form
``{"status": "success" | "error", "message": ...}`` and never raises: any
exception is logged and converted into an error response.
"""

from asyncio import gather
from datetime import datetime
import math
import random
import time
from typing import Dict
from typing import List
from typing import Optional

from bson import ObjectId

from src.utils.helper import call_external_api, serialize_datetime
from src.utils.helper import deserialize_objectid
from src.utils.logger import logger
from src.utils.mongo import PostCRUD
from src.utils.mongo import UserCRUD, ReactionCRUD, DestinationCRUD


def _build_user_info_map(user_infos) -> Dict:
    """Index user documents by ``_id``, keeping only the public fields."""
    return {
        info.get("_id"): {
            "user_id": info.get("_id"),
            "name": info.get("name"),
            "picture": info.get("picture"),
        }
        for info in user_infos
        if info
    }


def _build_destination_name_map(destination_infos) -> Dict:
    """Map each destination document's ``_id`` to its ``name``."""
    return {
        info.get("_id"): info.get("name") for info in destination_infos if info
    }


async def _fetch_user_reaction_map(user_id: str, post_ids: list) -> Dict:
    """Return the reactions of ``user_id`` on ``post_ids``, keyed by post id."""
    if not user_id:
        # Anonymous request: there is no "current user reaction" to look up.
        return {}

    reactions = await ReactionCRUD.read(
        {"user_id": user_id, "post_id": {"$in": post_ids}}
    )
    reaction_map = {}
    for reaction in reactions:
        if not reaction:
            continue
        post_id = reaction.get("post_id")
        reaction_map[post_id] = {
            "id": serialize_datetime(reaction.get("_id")),
            "post_id": post_id,
            "user_id": reaction.get("user_id"),
            "reaction_type": reaction.get("type"),
        }
    return reaction_map


def _interleave_posts(primary: list, secondary: list, limit: int) -> list:
    """Alternate items from ``primary`` and ``secondary`` up to ``limit`` items."""
    combined = []
    i = j = 0
    while len(combined) < limit and (i < len(primary) or j < len(secondary)):
        if i < len(primary):
            combined.append(primary[i])
            i += 1
        if j < len(secondary) and len(combined) < limit:
            combined.append(secondary[j])
            j += 1
    return combined


async def create_a_post_controller(
    content: str, user_id: str, destination_id: str, images: list
) -> Dict:
    """Upload the post images to the external image service and store the post.

    Args:
        content: Text body of the post.
        user_id: Id of the author, stored on the post as given.
        destination_id: Id of the destination the post is about.
        images: Payload forwarded as ``base64_image`` to the upload endpoint.

    Returns:
        A success response, or an error response carrying the exception text.
    """
    try:
        # Local development endpoint: http://localhost:8000/upload_image
        image_url = await call_external_api(
            method="POST",
            url="https://abao77-image-retrieval-full.hf.space/upload_image",
            json={"base64_image": images},
        )
        print(f"IMAGE URL: {image_url}")
        post = {
            "content": content,
            "user_id": user_id,
            "destination_id": destination_id,
            "comment_count": 0,
            "reaction_count": 0,
            # A missing or empty "public_url" is stored as None.
            "picture": image_url.get("public_url") or None,
            "like": [],
        }
        await PostCRUD.create(post)
        return {"status": "success", "message": "Post created successfully"}
    except Exception as e:
        logger.error(f"Error creating post: {str(e)}")
        return {"status": "error", "message": str(e)}


async def get_a_post_controller(post_id: str) -> Dict:
    """Fetch one post by id and return its serialized fields.

    Returns:
        A success response whose ``message`` is the serialized post, or an
        error response when the post does not exist or the lookup fails.
    """
    try:
        post = await PostCRUD.find_by_id(post_id)
        if post is None:
            return {"status": "error", "message": "Post not found"}

        serialized_post = {
            "id": serialize_datetime(post.get("_id")),
            "content": post.get("content"),
            "user_id": post.get("user_id"),
            "destination_id": post.get("destination_id"),
            "comment_count": post.get("comment_count", 0),
            "reaction_count": post.get("reaction_count", 0),
            "created_at": serialize_datetime(post.get("created_at")),
            "updated_at": serialize_datetime(post.get("updated_at")),
        }
        return {"status": "success", "message": serialized_post}
    except Exception as e:
        logger.error(f"Error getting post: {str(e)}")
        return {"status": "error", "message": str(e)}


async def list_all_posts_controller_priority(
    user_id: str,
    page: int = 1,
    top_5_destinations: Optional[List[str]] = None,
    view_post_ids: Optional[List[str]] = None,
) -> Dict:
    """List one page of posts, mixing recommended-destination posts with others.

    Each page holds up to 10 posts: up to 7 from ``top_5_destinations`` and up
    to 3 from every other destination, interleaved. Posts whose ids appear in
    ``view_post_ids`` are excluded from both groups.

    Args:
        user_id: Current user; used for recommendations and reaction lookup.
        page: 1-based page number.
        top_5_destinations: Destination ids to prioritise. Defaults to empty.
        view_post_ids: Ids of posts to exclude. Defaults to empty.

    Returns:
        A success response with the page data and pagination metadata, or an
        error response carrying the exception text.
    """
    # None replaces the former mutable ``[]`` defaults.
    if top_5_destinations is None:
        top_5_destinations = []
    if view_post_ids is None:
        view_post_ids = []

    print("view_post_ids", view_post_ids)
    destinations_list = []
    try:
        # NOTE(review): recommendations are fetched only when page > 1, so the
        # first page relies on the caller-supplied top_5_destinations. This
        # looks inverted - confirm the intended behaviour with the API caller.
        if page > 1:
            response_recommendation_destinations = await call_external_api(
                method="GET",
                url=f"https://abao77-triventure-personalize.hf.space/model/get_recommendation_destinations/{user_id}/5",
            )
            top_5_destinations = response_recommendation_destinations.get(
                "destination_ids", []
            )
            destinations_list = response_recommendation_destinations.get(
                "destinations_list", None
            )

        start_time = time.time()
        PAGE_SIZE = 10
        TOP_LIMIT = int(PAGE_SIZE * 0.7)
        OTHER_LIMIT = PAGE_SIZE - TOP_LIMIT

        # The two groups are paginated independently of each other.
        skip_top = (page - 1) * TOP_LIMIT
        skip_other = (page - 1) * OTHER_LIMIT

        excluded_ids = [deserialize_objectid(pid) for pid in view_post_ids]
        top_filter = {
            "destination_id": {"$in": top_5_destinations},
            "_id": {"$nin": excluded_ids},
        }
        other_filter = {
            "destination_id": {"$nin": top_5_destinations},
            "_id": {"$nin": excluded_ids},
        }

        total_top = await PostCRUD.count(top_filter)
        total_other = await PostCRUD.count(other_filter)
        total_items = total_top + total_other
        total_pages = math.ceil(total_items / PAGE_SIZE)

        top_posts = await PostCRUD.find_many_with_score(
            filter=top_filter,
            top_destinations=top_5_destinations,
            limit=TOP_LIMIT,
            skip=skip_top,
        )
        random.shuffle(top_posts)
        other_posts = await PostCRUD.find_many_with_score(
            filter=other_filter,
            top_destinations=[],
            limit=OTHER_LIMIT,
            skip=skip_other,
        )

        combined_posts = _interleave_posts(top_posts, other_posts, PAGE_SIZE)

        user_ids = list(
            {deserialize_objectid(post.get("user_id")) for post in combined_posts}
        )
        destination_ids = list(
            {
                deserialize_objectid(post.get("destination_id"))
                for post in combined_posts
            }
        )
        post_ids = [serialize_datetime(post["_id"]) for post in combined_posts]

        user_infos = await UserCRUD.read({"_id": {"$in": user_ids}})
        destination_infos = await DestinationCRUD.read(
            {"_id": {"$in": destination_ids}}
        )
        user_reaction_map = await _fetch_user_reaction_map(user_id, post_ids)

        user_info_map = _build_user_info_map(user_infos)
        destination_info_map = _build_destination_name_map(destination_infos)

        serialized_posts = []
        for post in combined_posts:
            pid = serialize_datetime(post["_id"])
            uid = post.get("user_id")
            dest_id = post.get("destination_id")
            serialized_posts.append(
                {
                    "id": pid,
                    "content": post.get("content"),
                    "destination_id": dest_id,
                    "destination_name": destination_info_map.get(dest_id),
                    "comment_count": post.get("comment_count", 0),
                    "reaction_count": post.get("reaction_count", 0),
                    "current_user_reaction": user_reaction_map.get(pid),
                    "picture": post.get("picture", []),
                    "created_at": serialize_datetime(post.get("created_at")),
                    "updated_at": serialize_datetime(post.get("updated_at")),
                    "priority_score": post.get("PriorityScore", 0),
                    "destination_score": post.get("DestinationScore", 0),
                    "engagement_score": post.get("EngagementScore", 0),
                    "freshness_score": post.get("FreshnessScore", 0),
                    "user_info": user_info_map.get(uid),
                }
            )

        end_time = time.time()
        print(f"Time taken: {end_time - start_time} seconds")
        return {
            "status": "success",
            "message": {
                "data": serialized_posts,
                "page": page,
                "top_5_destinations": top_5_destinations or [],
                "destinations_list": destinations_list or [],
                "total_pages": total_pages,
                "total_items": total_items,
                "page_size": PAGE_SIZE,
            },
        }
    except Exception as e:
        logger.error(f"Error listing personalized posts: {str(e)}")
        return {"status": "error", "message": str(e)}


async def list_all_posts_controller(user_id: str, page: int = 1) -> Dict:
    """List one page (5 items) of all posts, newest first.

    Args:
        user_id: Current user; when truthy, their reaction on each post is
            attached as ``current_user_reaction``.
        page: 1-based page number.

    Returns:
        A success response with the page data and pagination metadata, or an
        error response carrying the exception text.
    """
    try:
        PAGE_SIZE = 5
        skip = (page - 1) * PAGE_SIZE

        total_items = await PostCRUD.count({})
        total_pages = math.ceil(total_items / PAGE_SIZE)

        posts = await PostCRUD.find_many(
            filter={},  # Empty filter matches every post.
            skip=skip,
            limit=PAGE_SIZE,
            sort=[("created_at", -1)],  # Newest first.
        )

        # Deduplicate ids so each author/destination is fetched once via $in.
        user_ids = list({deserialize_objectid(post.get("user_id")) for post in posts})
        destination_ids = list(
            {deserialize_objectid(post.get("destination_id")) for post in posts}
        )

        user_infos = await UserCRUD.read({"_id": {"$in": user_ids}})
        # NOTE(review): this prints full user documents to stdout - confirm it
        # is still wanted outside of debugging.
        print("user_infos", user_infos)
        destination_infos = await DestinationCRUD.read(
            {"_id": {"$in": destination_ids}}
        )

        user_info_map = _build_user_info_map(user_infos)
        destination_info_map = _build_destination_name_map(destination_infos)

        all_post_ids = [serialize_datetime(post.get("_id")) for post in posts]
        formatted_user_reactions_map = await _fetch_user_reaction_map(
            user_id, all_post_ids
        )

        serialized_posts = []
        for post in posts:
            post_id = serialize_datetime(post.get("_id"))
            dest_id = post.get("destination_id")
            serialized_posts.append(
                {
                    "id": post_id,
                    "content": post.get("content"),
                    "destination_id": dest_id,
                    "destination_name": destination_info_map.get(dest_id),
                    # Counts default to 0, matching the other controllers.
                    "comment_count": post.get("comment_count", 0),
                    "reaction_count": post.get("reaction_count", 0),
                    "current_user_reaction": formatted_user_reactions_map.get(
                        post_id
                    ),
                    "picture": post.get("picture", []),
                    "created_at": serialize_datetime(post.get("created_at")),
                    "updated_at": serialize_datetime(post.get("updated_at")),
                    "user_info": user_info_map.get(post.get("user_id")),
                }
            )

        return {
            "status": "success",
            "message": {
                "data": serialized_posts,
                "page": page,
                "total_pages": total_pages,
                "total_items": total_items,
                "page_size": PAGE_SIZE,
            },
        }
    except Exception as e:
        logger.error(f"Error listing posts: {str(e)}")
        return {"status": "error", "message": str(e)}


async def list_posts_by_destination_controller(
    destination_id: str, user_id: str, page: int = 1
) -> Dict:
    """List one page (5 items) of the posts of one destination, newest first.

    Args:
        destination_id: Destination whose posts are listed.
        user_id: Current user; when truthy, their reaction on each post is
            attached as ``current_user_reaction``.
        page: 1-based page number.

    Returns:
        A success response with the page data and pagination metadata (with
        empty ``data`` when the page has no posts), or an error response
        carrying the exception text.
    """
    try:
        PAGE_SIZE = 5
        skip = (page - 1) * PAGE_SIZE

        total_items = await PostCRUD.count({"destination_id": destination_id})
        total_pages = math.ceil(total_items / PAGE_SIZE)

        posts = await PostCRUD.find_many(
            filter={"destination_id": destination_id},
            skip=skip,
            limit=PAGE_SIZE,
            sort=[("created_at", -1)],  # Newest first.
        )

        serialized_posts = []
        if posts:
            user_ids = list(
                {deserialize_objectid(post.get("user_id")) for post in posts}
            )
            user_infos = await UserCRUD.read({"_id": {"$in": user_ids}})
            user_info_map = _build_user_info_map(user_infos)

            destination_info = await DestinationCRUD.find_by_id(destination_id)
            destination_name = (
                destination_info.get("name") if destination_info else None
            )

            all_post_ids = [serialize_datetime(post.get("_id")) for post in posts]
            formatted_user_reactions_map = await _fetch_user_reaction_map(
                user_id, all_post_ids
            )

            for post in posts:
                post_id = serialize_datetime(post.get("_id"))
                serialized_posts.append(
                    {
                        "id": post_id,
                        "content": post.get("content"),
                        "destination_id": destination_id,
                        "destination_info": destination_info,
                        "destination_name": destination_name,
                        # Counts default to 0, matching the other controllers.
                        "comment_count": post.get("comment_count", 0),
                        "reaction_count": post.get("reaction_count", 0),
                        "current_user_reaction": formatted_user_reactions_map.get(
                            post_id
                        ),
                        "picture": post.get("picture", []),
                        "created_at": serialize_datetime(post.get("created_at")),
                        "updated_at": serialize_datetime(post.get("updated_at")),
                        "user_info": user_info_map.get(post.get("user_id")),
                    }
                )

        return {
            "status": "success",
            "message": {
                "data": serialized_posts,
                "page": page,
                "total_pages": total_pages,
                "total_items": total_items,
                "page_size": PAGE_SIZE,
            },
        }
    except Exception as e:
        logger.error(f"Error listing posts by destination: {str(e)}")
        return {"status": "error", "message": str(e)}


async def update_a_post_controller(user_id: str, post_id: str, content: str) -> Dict:
    """Replace the content of a post owned by ``user_id``.

    Returns:
        A success response, or an error response when the post does not exist,
        belongs to another user, or the update fails.
    """
    try:
        exist_data = await PostCRUD.find_by_id(post_id)
        # Existence must be checked before ownership: subscripting None would
        # raise and hide the "Post not found" message.
        if exist_data is None:
            return {"status": "error", "message": "Post not found"}
        if exist_data["user_id"] != user_id:
            return {
                "status": "error",
                "message": "You are not allowed to update this post",
            }

        await PostCRUD.update(
            {"_id": ObjectId(post_id)},
            {
                "content": content,
            },
        )
        return {"status": "success", "message": "Post updated successfully"}
    except Exception as e:
        logger.error(f"Error updating post: {str(e)}")
        return {"status": "error", "message": str(e)}


async def delete_a_post_controller(user_id: str, post_id: str) -> Dict:
    """Delete a post owned by ``user_id``.

    Returns:
        A success response, or an error response when the post does not exist,
        belongs to another user, or the deletion fails.
    """
    try:
        exist_data = await PostCRUD.find_by_id(post_id)
        # Existence must be checked before ownership: subscripting None would
        # raise and hide the "Post not found" message.
        if exist_data is None:
            return {"status": "error", "message": "Post not found"}
        if exist_data["user_id"] != user_id:
            return {
                "status": "error",
                "message": "You are not allowed to delete this post",
            }

        await PostCRUD.delete({"_id": ObjectId(post_id)})
        return {"status": "success", "message": "Post deleted successfully"}
    except Exception as e:
        logger.error(f"Error deleting post: {str(e)}")
        return {"status": "error", "message": str(e)}