import logging import os from datetime import datetime from functools import lru_cache from typing import Dict, Iterable, List, Optional, Set from bson import ObjectId from dateutil.relativedelta import relativedelta from dateutil.tz import gettz from pymongo import MongoClient from .schemas import Transaction logger = logging.getLogger(__name__) @lru_cache(maxsize=1) def get_mongo_client() -> MongoClient: """Return a cached MongoClient instance.""" uri = os.getenv("MONGODB_URI") if not uri: raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.") logger.info("Connecting to MongoDB host from URI") return MongoClient(uri) def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]: if not category_ids: return {} cursor = db["categories"].find({"_id": {"$in": list(category_ids)}}, {"title": 1}) return {doc["_id"]: doc.get("title", "Uncategorized") for doc in cursor} def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]: if not currency_ids: return {} cursor = db["currencies"].find({"_id": {"$in": list(currency_ids)}}, {"code": 1}) return {doc["_id"]: doc.get("code", "USD") for doc in cursor} def _safe_object_id(identifier: str) -> ObjectId: try: return ObjectId(identifier) except Exception as exc: # noqa: BLE001 raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]: """ Fetch up to `months` of recent expense transactions for a user. Returns an empty list if no data exists (new user scenario). """ if months <= 0: return [] client = get_mongo_client() default_db = client.get_default_database() db = default_db if default_db is not None else client["expense"] collection = db["transactions"] user_obj_id = _safe_object_id(user_id) start_date = datetime.utcnow() - relativedelta(months=months) query = { "user": user_obj_id, "date": {"$gte": start_date}, "type": {"$in": ["EXPENSE", "TRANSFER"]}, } projection = { "date": 1, "amount": 1, "currency": 1, "category": 1, } docs = list(collection.find(query, projection).sort("date", -1)) if not docs: logger.info("No MongoDB transactions found for user_id=%s", user_id) return [] category_ids = {doc["category"] for doc in docs if doc.get("category")} currency_ids = {doc["currency"] for doc in docs if doc.get("currency")} category_map = _resolve_category_titles(category_ids, db) currency_map = _resolve_currency_codes(currency_ids, db) transactions: List[Transaction] = [] for doc in docs: date_value: Optional[datetime] = doc.get("date") if not date_value: continue category_name = category_map.get(doc.get("category"), "Uncategorized") currency_code = currency_map.get(doc.get("currency"), "USD") try: transactions.append( Transaction( timestamp=date_value, category=category_name, amount=float(doc.get("amount", 0)), currency=currency_code, ) ) except Exception as exc: # noqa: BLE001 logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc) logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id) return transactions def log_api_hit(user_id: Optional[str], status: str, response_time: float) -> None: """ Log API hit to MongoDB with the specified format. Stores logs in 'expense' database, 'api_logs' collection (wallet sync > expense > api_logs). Args: user_id: Optional user identifier status: Status of the API call ("success" or "error") response_time: Response time in seconds """ try: client = get_mongo_client() # Use 'expense' database for API logs (same as transactions) default_db = client.get_default_database() db = default_db if default_db is not None else client["expense"] collection = db["api_logs"] # Get IST timezone ist_tz = gettz("Asia/Kolkata") ist_now = datetime.now(ist_tz) # Format date as "DD-MM-YYYY HH:MM:SS:IST" formatted_date = ist_now.strftime("%d-%m-%Y %H:%M:%S:IST") log_doc = { "name": "AI_FINANCIAL_INSIGHTS", "status": status, "date": formatted_date, "response_time": round(response_time, 3), "user_id": user_id, } collection.insert_one(log_doc) logger.debug("Logged API hit to MongoDB: user_id=%s, status=%s, response_time=%.3f", user_id, status, response_time) except Exception as exc: # noqa: BLE001 # Don't fail the API if logging fails logger.warning("Failed to log API hit to MongoDB: %s", exc) # import logging # import os # from datetime import datetime # from functools import lru_cache # from typing import Dict, Iterable, List, Optional, Set # from bson import ObjectId # from dateutil.relativedelta import relativedelta # from pymongo import MongoClient # from .schemas import Transaction # logger = logging.getLogger(__name__) # @lru_cache(maxsize=1) # def get_mongo_client() -> MongoClient: # """Return a cached MongoClient instance.""" # uri = os.getenv("MONGODB_URI") # if not uri: # raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.") # logger.info("Connecting to MongoDB host from URI") # return MongoClient(uri) # def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]: # if not category_ids: # return {} # cursor = db["categories"].find({"_id": {"$in": list(category_ids)}}, {"title": 1}) # return {doc["_id"]: doc.get("title", "Uncategorized") for doc in cursor} # def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]: # if not currency_ids: # return {} # cursor = db["currencies"].find({"_id": {"$in": list(currency_ids)}}, {"code": 1}) # return {doc["_id"]: doc.get("code", "USD") for doc in cursor} # def _safe_object_id(identifier: str) -> ObjectId: # try: # return ObjectId(identifier) # except Exception as exc: # noqa: BLE001 # raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc # def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]: # """ # Fetch up to `months` of recent expense transactions for a user. # Returns an empty list if no data exists (new user scenario). # """ # if months <= 0: # return [] # client = get_mongo_client() # default_db = client.get_default_database() # db = default_db if default_db is not None else client["expense"] # collection = db["transactions"] # user_obj_id = _safe_object_id(user_id) # start_date = datetime.utcnow() - relativedelta(months=months) # query = { # "user": user_obj_id, # "date": {"$gte": start_date}, # "type": {"$in": ["EXPENSE", "TRANSFER"]}, # } # projection = { # "date": 1, # "amount": 1, # "currency": 1, # "category": 1, # } # docs = list(collection.find(query, projection).sort("date", -1)) # if not docs: # logger.info("No MongoDB transactions found for user_id=%s", user_id) # return [] # category_ids = {doc["category"] for doc in docs if doc.get("category")} # currency_ids = {doc["currency"] for doc in docs if doc.get("currency")} # category_map = _resolve_category_titles(category_ids, db) # currency_map = _resolve_currency_codes(currency_ids, db) # transactions: List[Transaction] = [] # for doc in docs: # date_value: Optional[datetime] = doc.get("date") # if not date_value: # continue # category_name = category_map.get(doc.get("category"), "Uncategorized") # currency_code = currency_map.get(doc.get("currency"), "USD") # try: # transactions.append( # Transaction( # timestamp=date_value, # category=category_name, # amount=float(doc.get("amount", 0)), # currency=currency_code, # ) # ) # except Exception as exc: # noqa: BLE001 # logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc) # logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id) # return transactions