# Source: LogicGoInfotechSpaces - "Update app/db.py" (commit f34b650, verified)
import logging
import os
from datetime import datetime
from functools import lru_cache
from typing import Dict, Iterable, List, Optional, Set
from bson import ObjectId
from dateutil.relativedelta import relativedelta
from dateutil.tz import gettz
from pymongo import MongoClient
from .schemas import Transaction
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def get_mongo_client() -> MongoClient:
    """
    Build the process-wide MongoClient from the MONGODB_URI environment variable.

    The result is memoized by `lru_cache`, so the client created by the first
    successful call is handed back on every later call.

    Raises:
        RuntimeError: If MONGODB_URI is unset or empty.
    """
    mongo_uri = os.getenv("MONGODB_URI")
    if mongo_uri:
        # The URI itself is deliberately kept out of the log message.
        logger.info("Connecting to MongoDB host from URI")
        return MongoClient(mongo_uri)
    raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.")
def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
    """
    Map category ids to their titles using the `categories` collection.

    Ids with no matching document are simply absent from the result; a
    matching document without a `title` field maps to "Uncategorized".
    An empty id set returns an empty dict without querying the database.
    """
    titles = {}
    if category_ids:
        id_filter = {"_id": {"$in": list(category_ids)}}
        # Only the title is projected (Mongo always returns `_id` as well).
        for record in db["categories"].find(id_filter, {"title": 1}):
            titles[record["_id"]] = record.get("title", "Uncategorized")
    return titles
def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
    """
    Map currency ids to their codes using the `currencies` collection.

    Ids with no matching document are simply absent from the result; a
    matching document without a `code` field maps to "USD".
    An empty id set returns an empty dict without querying the database.
    """
    codes = {}
    if currency_ids:
        id_filter = {"_id": {"$in": list(currency_ids)}}
        # Only the code is projected (Mongo always returns `_id` as well).
        for record in db["currencies"].find(id_filter, {"code": 1}):
            codes[record["_id"]] = record.get("code", "USD")
    return codes
def _safe_object_id(identifier: str) -> ObjectId:
    """
    Convert `identifier` to an ObjectId, normalizing any failure to ValueError.

    Raises:
        ValueError: If `ObjectId(identifier)` raises for any reason; the
            original exception is attached as the cause.
    """
    try:
        object_id = ObjectId(identifier)
    except Exception as error:  # noqa: BLE001
        message = f"Invalid Mongo ObjectId: {identifier}"
        raise ValueError(message) from error
    return object_id
def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]:
    """
    Fetch a user's transactions dated within the last `months` months.

    Documents whose `type` is "EXPENSE" or "TRANSFER" are read from the
    `transactions` collection, newest first. Category and currency
    references are resolved to titles and codes; unresolved references fall
    back to "Uncategorized" and "USD".

    Args:
        user_id: String form of the user's Mongo ObjectId.
        months: Size of the look-back window in calendar months.

    Returns:
        The matching transactions, or an empty list when `months <= 0` or no
        documents match (new user scenario). Documents without a date are
        skipped, and documents that cannot be turned into a `Transaction`
        are logged and skipped.

    Raises:
        ValueError: If `user_id` is not a valid ObjectId.
    """
    if months <= 0:
        return []

    client = get_mongo_client()
    # get_default_database() raises ConfigurationError when the URI names no
    # database and no default is supplied, so checking its result for None
    # afterwards can never reach a fallback. Pass the fallback name instead.
    db = client.get_default_database(default="expense")
    collection = db["transactions"]

    user_obj_id = _safe_object_id(user_id)
    # Naive UTC cutoff: now minus `months` calendar months.
    start_date = datetime.utcnow() - relativedelta(months=months)

    query = {
        "user": user_obj_id,
        "date": {"$gte": start_date},
        "type": {"$in": ["EXPENSE", "TRANSFER"]},
    }
    projection = {
        "date": 1,
        "amount": 1,
        "currency": 1,
        "category": 1,
    }

    docs = list(collection.find(query, projection).sort("date", -1))
    if not docs:
        logger.info("No MongoDB transactions found for user_id=%s", user_id)
        return []

    # Resolve all referenced categories/currencies with one query each
    # instead of one lookup per transaction.
    category_ids = {doc["category"] for doc in docs if doc.get("category")}
    currency_ids = {doc["currency"] for doc in docs if doc.get("currency")}
    category_map = _resolve_category_titles(category_ids, db)
    currency_map = _resolve_currency_codes(currency_ids, db)

    transactions: List[Transaction] = []
    for doc in docs:
        date_value: Optional[datetime] = doc.get("date")
        if not date_value:
            continue

        category_name = category_map.get(doc.get("category"), "Uncategorized")
        currency_code = currency_map.get(doc.get("currency"), "USD")

        try:
            transactions.append(
                Transaction(
                    timestamp=date_value,
                    category=category_name,
                    amount=float(doc.get("amount", 0)),
                    currency=currency_code,
                )
            )
        except Exception as exc:  # noqa: BLE001
            # One bad document must not discard the rest of the result set.
            logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc)

    logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id)
    return transactions
def log_api_hit(user_id: Optional[str], status: str, response_time: float) -> None:
    """
    Record one API call in the `api_logs` collection (best effort).

    The document is written to the database named in the MongoDB URI, or to
    the `expense` database when the URI names none. Any failure is logged as
    a warning and swallowed so that logging can never fail the API call.

    Args:
        user_id: Optional user identifier
        status: Status of the API call ("success" or "error")
        response_time: Response time in seconds (stored rounded to 3 decimals)
    """
    try:
        client = get_mongo_client()
        # get_default_database() raises ConfigurationError when the URI names
        # no database and no default is supplied; without `default` that error
        # would be swallowed below and no log would ever be written.
        db = client.get_default_database(default="expense")
        collection = db["api_logs"]

        # Timestamp is rendered in India Standard Time.
        ist_tz = gettz("Asia/Kolkata")
        ist_now = datetime.now(ist_tz)
        # Format date as "DD-MM-YYYY HH:MM:SS:IST"
        formatted_date = ist_now.strftime("%d-%m-%Y %H:%M:%S:IST")

        log_doc = {
            "name": "AI_FINANCIAL_INSIGHTS",
            "status": status,
            "date": formatted_date,
            "response_time": round(response_time, 3),
            "user_id": user_id,
        }
        collection.insert_one(log_doc)
        logger.debug("Logged API hit to MongoDB: user_id=%s, status=%s, response_time=%.3f", user_id, status, response_time)
    except Exception as exc:  # noqa: BLE001
        # Don't fail the API if logging fails
        logger.warning("Failed to log API hit to MongoDB: %s", exc)
# import logging
# import os
# from datetime import datetime
# from functools import lru_cache
# from typing import Dict, Iterable, List, Optional, Set
# from bson import ObjectId
# from dateutil.relativedelta import relativedelta
# from pymongo import MongoClient
# from .schemas import Transaction
# logger = logging.getLogger(__name__)
# @lru_cache(maxsize=1)
# def get_mongo_client() -> MongoClient:
# """Return a cached MongoClient instance."""
# uri = os.getenv("MONGODB_URI")
# if not uri:
# raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.")
# logger.info("Connecting to MongoDB host from URI")
# return MongoClient(uri)
# def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
# if not category_ids:
# return {}
# cursor = db["categories"].find({"_id": {"$in": list(category_ids)}}, {"title": 1})
# return {doc["_id"]: doc.get("title", "Uncategorized") for doc in cursor}
# def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
# if not currency_ids:
# return {}
# cursor = db["currencies"].find({"_id": {"$in": list(currency_ids)}}, {"code": 1})
# return {doc["_id"]: doc.get("code", "USD") for doc in cursor}
# def _safe_object_id(identifier: str) -> ObjectId:
# try:
# return ObjectId(identifier)
# except Exception as exc: # noqa: BLE001
# raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc
# def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]:
# """
# Fetch up to `months` of recent expense transactions for a user.
# Returns an empty list if no data exists (new user scenario).
# """
# if months <= 0:
# return []
# client = get_mongo_client()
# default_db = client.get_default_database()
# db = default_db if default_db is not None else client["expense"]
# collection = db["transactions"]
# user_obj_id = _safe_object_id(user_id)
# start_date = datetime.utcnow() - relativedelta(months=months)
# query = {
# "user": user_obj_id,
# "date": {"$gte": start_date},
# "type": {"$in": ["EXPENSE", "TRANSFER"]},
# }
# projection = {
# "date": 1,
# "amount": 1,
# "currency": 1,
# "category": 1,
# }
# docs = list(collection.find(query, projection).sort("date", -1))
# if not docs:
# logger.info("No MongoDB transactions found for user_id=%s", user_id)
# return []
# category_ids = {doc["category"] for doc in docs if doc.get("category")}
# currency_ids = {doc["currency"] for doc in docs if doc.get("currency")}
# category_map = _resolve_category_titles(category_ids, db)
# currency_map = _resolve_currency_codes(currency_ids, db)
# transactions: List[Transaction] = []
# for doc in docs:
# date_value: Optional[datetime] = doc.get("date")
# if not date_value:
# continue
# category_name = category_map.get(doc.get("category"), "Uncategorized")
# currency_code = currency_map.get(doc.get("currency"), "USD")
# try:
# transactions.append(
# Transaction(
# timestamp=date_value,
# category=category_name,
# amount=float(doc.get("amount", 0)),
# currency=currency_code,
# )
# )
# except Exception as exc: # noqa: BLE001
# logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc)
# logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id)
# return transactions