|
|
import logging |
|
|
import os |
|
|
from datetime import datetime |
|
|
from functools import lru_cache |
|
|
from typing import Dict, Iterable, List, Optional, Set |
|
|
|
|
|
from bson import ObjectId |
|
|
from dateutil.relativedelta import relativedelta |
|
|
from dateutil.tz import gettz |
|
|
from pymongo import MongoClient |
|
|
|
|
|
from .schemas import Transaction |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
@lru_cache(maxsize=1)
def get_mongo_client() -> MongoClient:
    """Build the process-wide MongoClient on first use and reuse it afterwards.

    The connection string is read from the ``MONGODB_URI`` environment
    variable. Because ``lru_cache`` only stores successful results, a call
    that raises is retried from scratch on the next invocation.

    Returns:
        The MongoClient created from ``MONGODB_URI``.

    Raises:
        RuntimeError: If ``MONGODB_URI`` is unset or empty.
    """
    connection_uri = os.environ.get("MONGODB_URI")
    if connection_uri:
        # The URI itself is deliberately kept out of the log message.
        logger.info("Connecting to MongoDB host from URI")
        return MongoClient(connection_uri)
    raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.")
|
|
|
|
|
|
|
|
def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
    """Map category ids to their titles using the ``categories`` collection.

    Args:
        category_ids: Ids to look up; an empty set skips the query entirely.
        db: Database handle; indexed with ``db["categories"]``.

    Returns:
        A dict keyed by each found document's ``_id``. Documents lacking a
        ``title`` field map to ``"Uncategorized"``; ids with no matching
        document are absent from the result.
    """
    titles: Dict[ObjectId, str] = {}
    if category_ids:
        id_filter = {"_id": {"$in": list(category_ids)}}
        # Only the title is projected; ``_id`` is still read from each result.
        for record in db["categories"].find(id_filter, {"title": 1}):
            titles[record["_id"]] = record.get("title", "Uncategorized")
    return titles
|
|
|
|
|
|
|
|
def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
    """Map currency ids to their codes using the ``currencies`` collection.

    Args:
        currency_ids: Ids to look up; an empty set skips the query entirely.
        db: Database handle; indexed with ``db["currencies"]``.

    Returns:
        A dict keyed by each found document's ``_id``. Documents lacking a
        ``code`` field map to ``"USD"``; ids with no matching document are
        absent from the result.
    """
    codes: Dict[ObjectId, str] = {}
    if currency_ids:
        id_filter = {"_id": {"$in": list(currency_ids)}}
        # Only the code is projected; ``_id`` is still read from each result.
        for record in db["currencies"].find(id_filter, {"code": 1}):
            codes[record["_id"]] = record.get("code", "USD")
    return codes
|
|
|
|
|
|
|
|
def _safe_object_id(identifier: str) -> ObjectId:
    """Convert ``identifier`` to an ObjectId, normalising failures to ValueError.

    Args:
        identifier: Value handed to the ``ObjectId`` constructor.

    Returns:
        The constructed ObjectId.

    Raises:
        ValueError: If the ``ObjectId`` constructor raises for any reason; the
            original exception is chained as the cause.

    NOTE(review): ``ObjectId(None)`` generates a brand-new id instead of
    raising, so a ``None`` identifier is not rejected here — confirm whether
    callers rely on that.
    """
    try:
        parsed = ObjectId(identifier)
    except Exception as exc:
        raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc
    else:
        return parsed
|
|
|
|
|
|
|
|
def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]:
    """
    Fetch up to `months` of recent transactions for a user.

    Queries the ``transactions`` collection for documents owned by the user,
    dated within the last `months` calendar months, whose ``type`` is
    ``EXPENSE`` or ``TRANSFER``. Results are ordered newest first, and the
    category and currency references are resolved to titles and codes.

    Returns an empty list if no data exists (new user scenario).

    Args:
        user_id: String form of the user's Mongo ObjectId.
        months: Size of the look-back window in calendar months; values
            less than or equal to zero return an empty list without
            touching the database.

    Returns:
        One ``Transaction`` per usable document. Documents without a date, or
        that fail ``Transaction`` construction, are skipped (the latter with a
        warning log).

    Raises:
        ValueError: If `user_id` cannot be converted to an ObjectId.
    """
    if months <= 0:
        return []

    # Function-scope import keeps this block self-contained.
    from datetime import timezone

    client = get_mongo_client()
    # get_default_database() raises ConfigurationError (it never returns None)
    # when the URI names no database, so the "expense" fallback must be
    # supplied through the `default` argument to take effect.
    db = client.get_default_database(default="expense")
    collection = db["transactions"]

    user_obj_id = _safe_object_id(user_id)
    # Aware UTC "now" replaces the deprecated datetime.utcnow(); PyMongo
    # converts aware datetimes to UTC when encoding the query.
    start_date = datetime.now(timezone.utc) - relativedelta(months=months)

    query = {
        "user": user_obj_id,
        "date": {"$gte": start_date},
        "type": {"$in": ["EXPENSE", "TRANSFER"]},
    }
    projection = {
        "date": 1,
        "amount": 1,
        "currency": 1,
        "category": 1,
    }

    # Materialised because the documents are walked twice: once to collect
    # reference ids and once to build the Transaction objects.
    docs = list(collection.find(query, projection).sort("date", -1))
    if not docs:
        logger.info("No MongoDB transactions found for user_id=%s", user_id)
        return []

    category_ids = {doc["category"] for doc in docs if doc.get("category")}
    currency_ids = {doc["currency"] for doc in docs if doc.get("currency")}

    # One lookup per referenced collection instead of one per transaction.
    category_map = _resolve_category_titles(category_ids, db)
    currency_map = _resolve_currency_codes(currency_ids, db)

    transactions: List[Transaction] = []
    for doc in docs:
        date_value: Optional[datetime] = doc.get("date")
        if not date_value:
            continue

        category_name = category_map.get(doc.get("category"), "Uncategorized")
        currency_code = currency_map.get(doc.get("currency"), "USD")

        try:
            transactions.append(
                Transaction(
                    timestamp=date_value,
                    category=category_name,
                    amount=float(doc.get("amount", 0)),
                    currency=currency_code,
                )
            )
        except Exception as exc:
            # Best effort: one malformed document must not fail the whole fetch.
            logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc)

    logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id)
    return transactions
|
|
|
|
|
|
|
|
def log_api_hit(user_id: Optional[str], status: str, response_time: float) -> None:
    """
    Log API hit to MongoDB with the specified format.

    Stores logs in the 'api_logs' collection of the database named in the
    MongoDB URI, falling back to the 'expense' database when the URI names
    none (wallet sync > expense > api_logs).

    This is best effort: any failure is logged as a warning and never
    propagated to the caller.

    Args:
        user_id: Optional user identifier
        status: Status of the API call ("success" or "error")
        response_time: Response time in seconds
    """
    # Function-scope import keeps this block self-contained.
    from datetime import timedelta, timezone

    try:
        client = get_mongo_client()

        # get_default_database() raises ConfigurationError (it never returns
        # None) when the URI names no database, so the "expense" fallback must
        # be supplied through the `default` argument to take effect.
        db = client.get_default_database(default="expense")
        collection = db["api_logs"]

        ist_tz = gettz("Asia/Kolkata")
        if ist_tz is None:
            # gettz returns None when no tz data is found; datetime.now(None)
            # would then produce server-local time mislabelled as IST. India
            # has no DST, so a fixed +05:30 offset is an exact substitute.
            ist_tz = timezone(timedelta(hours=5, minutes=30))
        ist_now = datetime.now(ist_tz)

        # Stored as a preformatted string with a literal ":IST" suffix.
        formatted_date = ist_now.strftime("%d-%m-%Y %H:%M:%S:IST")

        log_doc = {
            "name": "AI_FINANCIAL_INSIGHTS",
            "status": status,
            "date": formatted_date,
            "response_time": round(response_time, 3),
            "user_id": user_id,
        }

        collection.insert_one(log_doc)
        logger.debug("Logged API hit to MongoDB: user_id=%s, status=%s, response_time=%.3f", user_id, status, response_time)
    except Exception as exc:
        # Logging must never break the API call it is recording.
        logger.warning("Failed to log API hit to MongoDB: %s", exc)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|