File size: 8,935 Bytes
3a4ffcf
 
 
 
 
 
 
 
f34b650
3a4ffcf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ea8f4fd
 
3a4ffcf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f34b650
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
import logging
import os
from datetime import datetime
from functools import lru_cache
from typing import Dict, Iterable, List, Optional, Set

from bson import ObjectId
from dateutil.relativedelta import relativedelta
from dateutil.tz import gettz
from pymongo import MongoClient

from .schemas import Transaction

logger = logging.getLogger(__name__)


@lru_cache(maxsize=1)
def get_mongo_client() -> MongoClient:
    """Return a cached MongoClient instance."""
    uri = os.getenv("MONGODB_URI")
    if not uri:
        raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.")
    logger.info("Connecting to MongoDB host from URI")
    return MongoClient(uri)


def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
    if not category_ids:
        return {}
    cursor = db["categories"].find({"_id": {"$in": list(category_ids)}}, {"title": 1})
    return {doc["_id"]: doc.get("title", "Uncategorized") for doc in cursor}


def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
    if not currency_ids:
        return {}
    cursor = db["currencies"].find({"_id": {"$in": list(currency_ids)}}, {"code": 1})
    return {doc["_id"]: doc.get("code", "USD") for doc in cursor}


def _safe_object_id(identifier: str) -> ObjectId:
    try:
        return ObjectId(identifier)
    except Exception as exc:  # noqa: BLE001
        raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc


def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]:
    """
    Fetch up to `months` of recent expense transactions for a user.

    Returns an empty list if no data exists (new user scenario).
    """
    if months <= 0:
        return []

    client = get_mongo_client()
    default_db = client.get_default_database()
    db = default_db if default_db is not None else client["expense"]
    collection = db["transactions"]

    user_obj_id = _safe_object_id(user_id)
    start_date = datetime.utcnow() - relativedelta(months=months)

    query = {
        "user": user_obj_id,
        "date": {"$gte": start_date},
        "type": {"$in": ["EXPENSE", "TRANSFER"]},
    }

    projection = {
        "date": 1,
        "amount": 1,
        "currency": 1,
        "category": 1,
    }
    docs = list(collection.find(query, projection).sort("date", -1))
    if not docs:
        logger.info("No MongoDB transactions found for user_id=%s", user_id)
        return []

    category_ids = {doc["category"] for doc in docs if doc.get("category")}
    currency_ids = {doc["currency"] for doc in docs if doc.get("currency")}

    category_map = _resolve_category_titles(category_ids, db)
    currency_map = _resolve_currency_codes(currency_ids, db)

    transactions: List[Transaction] = []
    for doc in docs:
        date_value: Optional[datetime] = doc.get("date")
        if not date_value:
            continue

        category_name = category_map.get(doc.get("category"), "Uncategorized")
        currency_code = currency_map.get(doc.get("currency"), "USD")

        try:
            transactions.append(
                Transaction(
                    timestamp=date_value,
                    category=category_name,
                    amount=float(doc.get("amount", 0)),
                    currency=currency_code,
                )
            )
        except Exception as exc:  # noqa: BLE001
            logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc)

    logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id)
    return transactions


def log_api_hit(user_id: Optional[str], status: str, response_time: float) -> None:
    """
    Log API hit to MongoDB with the specified format.
    Stores logs in 'expense' database, 'api_logs' collection (wallet sync > expense > api_logs).
    
    Args:
        user_id: Optional user identifier
        status: Status of the API call ("success" or "error")
        response_time: Response time in seconds
    """
    try:
        client = get_mongo_client()
        # Use 'expense' database for API logs (same as transactions)
        default_db = client.get_default_database()
        db = default_db if default_db is not None else client["expense"]
        collection = db["api_logs"]
        
        # Get IST timezone
        ist_tz = gettz("Asia/Kolkata")
        ist_now = datetime.now(ist_tz)
        
        # Format date as "DD-MM-YYYY HH:MM:SS:IST"
        formatted_date = ist_now.strftime("%d-%m-%Y %H:%M:%S:IST")
        
        log_doc = {
            "name": "AI_FINANCIAL_INSIGHTS",
            "status": status,
            "date": formatted_date,
            "response_time": round(response_time, 3),
            "user_id": user_id,
        }
        
        collection.insert_one(log_doc)
        logger.debug("Logged API hit to MongoDB: user_id=%s, status=%s, response_time=%.3f", user_id, status, response_time)
    except Exception as exc:  # noqa: BLE001
        # Don't fail the API if logging fails
        logger.warning("Failed to log API hit to MongoDB: %s", exc)



# import logging
# import os
# from datetime import datetime
# from functools import lru_cache
# from typing import Dict, Iterable, List, Optional, Set

# from bson import ObjectId
# from dateutil.relativedelta import relativedelta
# from pymongo import MongoClient

# from .schemas import Transaction

# logger = logging.getLogger(__name__)


# @lru_cache(maxsize=1)
# def get_mongo_client() -> MongoClient:
#     """Return a cached MongoClient instance."""
#     uri = os.getenv("MONGODB_URI")
#     if not uri:
#         raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.")
#     logger.info("Connecting to MongoDB host from URI")
#     return MongoClient(uri)


# def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
#     if not category_ids:
#         return {}
#     cursor = db["categories"].find({"_id": {"$in": list(category_ids)}}, {"title": 1})
#     return {doc["_id"]: doc.get("title", "Uncategorized") for doc in cursor}


# def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
#     if not currency_ids:
#         return {}
#     cursor = db["currencies"].find({"_id": {"$in": list(currency_ids)}}, {"code": 1})
#     return {doc["_id"]: doc.get("code", "USD") for doc in cursor}


# def _safe_object_id(identifier: str) -> ObjectId:
#     try:
#         return ObjectId(identifier)
#     except Exception as exc:  # noqa: BLE001
#         raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc


# def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]:
#     """
#     Fetch up to `months` of recent expense transactions for a user.

#     Returns an empty list if no data exists (new user scenario).
#     """
#     if months <= 0:
#         return []

#     client = get_mongo_client()
#     default_db = client.get_default_database()
#     db = default_db if default_db is not None else client["expense"]
#     collection = db["transactions"]

#     user_obj_id = _safe_object_id(user_id)
#     start_date = datetime.utcnow() - relativedelta(months=months)

#     query = {
#         "user": user_obj_id,
#         "date": {"$gte": start_date},
#         "type": {"$in": ["EXPENSE", "TRANSFER"]},
#     }

#     projection = {
#         "date": 1,
#         "amount": 1,
#         "currency": 1,
#         "category": 1,
#     }
#     docs = list(collection.find(query, projection).sort("date", -1))
#     if not docs:
#         logger.info("No MongoDB transactions found for user_id=%s", user_id)
#         return []

#     category_ids = {doc["category"] for doc in docs if doc.get("category")}
#     currency_ids = {doc["currency"] for doc in docs if doc.get("currency")}

#     category_map = _resolve_category_titles(category_ids, db)
#     currency_map = _resolve_currency_codes(currency_ids, db)

#     transactions: List[Transaction] = []
#     for doc in docs:
#         date_value: Optional[datetime] = doc.get("date")
#         if not date_value:
#             continue

#         category_name = category_map.get(doc.get("category"), "Uncategorized")
#         currency_code = currency_map.get(doc.get("currency"), "USD")

#         try:
#             transactions.append(
#                 Transaction(
#                     timestamp=date_value,
#                     category=category_name,
#                     amount=float(doc.get("amount", 0)),
#                     currency=currency_code,
#                 )
#             )
#         except Exception as exc:  # noqa: BLE001
#             logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc)

#     logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id)
#     return transactions