File size: 8,935 Bytes
3a4ffcf f34b650 3a4ffcf ea8f4fd 3a4ffcf f34b650 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 |
import logging
import os
from datetime import datetime
from functools import lru_cache
from typing import Dict, Iterable, List, Optional, Set
from bson import ObjectId
from dateutil.relativedelta import relativedelta
from dateutil.tz import gettz
from pymongo import MongoClient
from .schemas import Transaction
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def get_mongo_client() -> MongoClient:
"""Return a cached MongoClient instance."""
uri = os.getenv("MONGODB_URI")
if not uri:
raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.")
logger.info("Connecting to MongoDB host from URI")
return MongoClient(uri)
def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
if not category_ids:
return {}
cursor = db["categories"].find({"_id": {"$in": list(category_ids)}}, {"title": 1})
return {doc["_id"]: doc.get("title", "Uncategorized") for doc in cursor}
def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
if not currency_ids:
return {}
cursor = db["currencies"].find({"_id": {"$in": list(currency_ids)}}, {"code": 1})
return {doc["_id"]: doc.get("code", "USD") for doc in cursor}
def _safe_object_id(identifier: str) -> ObjectId:
try:
return ObjectId(identifier)
except Exception as exc: # noqa: BLE001
raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc
def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]:
"""
Fetch up to `months` of recent expense transactions for a user.
Returns an empty list if no data exists (new user scenario).
"""
if months <= 0:
return []
client = get_mongo_client()
default_db = client.get_default_database()
db = default_db if default_db is not None else client["expense"]
collection = db["transactions"]
user_obj_id = _safe_object_id(user_id)
start_date = datetime.utcnow() - relativedelta(months=months)
query = {
"user": user_obj_id,
"date": {"$gte": start_date},
"type": {"$in": ["EXPENSE", "TRANSFER"]},
}
projection = {
"date": 1,
"amount": 1,
"currency": 1,
"category": 1,
}
docs = list(collection.find(query, projection).sort("date", -1))
if not docs:
logger.info("No MongoDB transactions found for user_id=%s", user_id)
return []
category_ids = {doc["category"] for doc in docs if doc.get("category")}
currency_ids = {doc["currency"] for doc in docs if doc.get("currency")}
category_map = _resolve_category_titles(category_ids, db)
currency_map = _resolve_currency_codes(currency_ids, db)
transactions: List[Transaction] = []
for doc in docs:
date_value: Optional[datetime] = doc.get("date")
if not date_value:
continue
category_name = category_map.get(doc.get("category"), "Uncategorized")
currency_code = currency_map.get(doc.get("currency"), "USD")
try:
transactions.append(
Transaction(
timestamp=date_value,
category=category_name,
amount=float(doc.get("amount", 0)),
currency=currency_code,
)
)
except Exception as exc: # noqa: BLE001
logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc)
logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id)
return transactions
def log_api_hit(user_id: Optional[str], status: str, response_time: float) -> None:
"""
Log API hit to MongoDB with the specified format.
Stores logs in 'expense' database, 'api_logs' collection (wallet sync > expense > api_logs).
Args:
user_id: Optional user identifier
status: Status of the API call ("success" or "error")
response_time: Response time in seconds
"""
try:
client = get_mongo_client()
# Use 'expense' database for API logs (same as transactions)
default_db = client.get_default_database()
db = default_db if default_db is not None else client["expense"]
collection = db["api_logs"]
# Get IST timezone
ist_tz = gettz("Asia/Kolkata")
ist_now = datetime.now(ist_tz)
# Format date as "DD-MM-YYYY HH:MM:SS:IST"
formatted_date = ist_now.strftime("%d-%m-%Y %H:%M:%S:IST")
log_doc = {
"name": "AI_FINANCIAL_INSIGHTS",
"status": status,
"date": formatted_date,
"response_time": round(response_time, 3),
"user_id": user_id,
}
collection.insert_one(log_doc)
logger.debug("Logged API hit to MongoDB: user_id=%s, status=%s, response_time=%.3f", user_id, status, response_time)
except Exception as exc: # noqa: BLE001
# Don't fail the API if logging fails
logger.warning("Failed to log API hit to MongoDB: %s", exc)
# import logging
# import os
# from datetime import datetime
# from functools import lru_cache
# from typing import Dict, Iterable, List, Optional, Set
# from bson import ObjectId
# from dateutil.relativedelta import relativedelta
# from pymongo import MongoClient
# from .schemas import Transaction
# logger = logging.getLogger(__name__)
# @lru_cache(maxsize=1)
# def get_mongo_client() -> MongoClient:
# """Return a cached MongoClient instance."""
# uri = os.getenv("MONGODB_URI")
# if not uri:
# raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.")
# logger.info("Connecting to MongoDB host from URI")
# return MongoClient(uri)
# def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
# if not category_ids:
# return {}
# cursor = db["categories"].find({"_id": {"$in": list(category_ids)}}, {"title": 1})
# return {doc["_id"]: doc.get("title", "Uncategorized") for doc in cursor}
# def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
# if not currency_ids:
# return {}
# cursor = db["currencies"].find({"_id": {"$in": list(currency_ids)}}, {"code": 1})
# return {doc["_id"]: doc.get("code", "USD") for doc in cursor}
# def _safe_object_id(identifier: str) -> ObjectId:
# try:
# return ObjectId(identifier)
# except Exception as exc: # noqa: BLE001
# raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc
# def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]:
# """
# Fetch up to `months` of recent expense transactions for a user.
# Returns an empty list if no data exists (new user scenario).
# """
# if months <= 0:
# return []
# client = get_mongo_client()
# default_db = client.get_default_database()
# db = default_db if default_db is not None else client["expense"]
# collection = db["transactions"]
# user_obj_id = _safe_object_id(user_id)
# start_date = datetime.utcnow() - relativedelta(months=months)
# query = {
# "user": user_obj_id,
# "date": {"$gte": start_date},
# "type": {"$in": ["EXPENSE", "TRANSFER"]},
# }
# projection = {
# "date": 1,
# "amount": 1,
# "currency": 1,
# "category": 1,
# }
# docs = list(collection.find(query, projection).sort("date", -1))
# if not docs:
# logger.info("No MongoDB transactions found for user_id=%s", user_id)
# return []
# category_ids = {doc["category"] for doc in docs if doc.get("category")}
# currency_ids = {doc["currency"] for doc in docs if doc.get("currency")}
# category_map = _resolve_category_titles(category_ids, db)
# currency_map = _resolve_currency_codes(currency_ids, db)
# transactions: List[Transaction] = []
# for doc in docs:
# date_value: Optional[datetime] = doc.get("date")
# if not date_value:
# continue
# category_name = category_map.get(doc.get("category"), "Uncategorized")
# currency_code = currency_map.get(doc.get("currency"), "USD")
# try:
# transactions.append(
# Transaction(
# timestamp=date_value,
# category=category_name,
# amount=float(doc.get("amount", 0)),
# currency=currency_code,
# )
# )
# except Exception as exc: # noqa: BLE001
# logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc)
# logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id)
# return transactions
|