LogicGoInfotechSpaces commited on
Commit
3a4ffcf
·
0 Parent(s):

Add AI Financial Insights FastAPI service

Browse files
.history/app/db_20251124161247.py ADDED
@@ -0,0 +1,109 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import os
3
+ from datetime import datetime
4
+ from functools import lru_cache
5
+ from typing import Dict, Iterable, List, Optional, Set
6
+
7
+ from bson import ObjectId
8
+ from dateutil.relativedelta import relativedelta
9
+ from pymongo import MongoClient
10
+
11
+ from .schemas import Transaction
12
+
13
+ logger = logging.getLogger(__name__)
14
+
15
+
16
+ @lru_cache(maxsize=1)
17
+ def get_mongo_client() -> MongoClient:
18
+ """Return a cached MongoClient instance."""
19
+ uri = os.getenv("MONGODB_URI")
20
+ if not uri:
21
+ raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.")
22
+ logger.info("Connecting to MongoDB host from URI")
23
+ return MongoClient(uri)
24
+
25
+
26
+ def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
27
+ if not category_ids:
28
+ return {}
29
+ cursor = db["categories"].find({"_id": {"$in": list(category_ids)}}, {"title": 1})
30
+ return {doc["_id"]: doc.get("title", "Uncategorized") for doc in cursor}
31
+
32
+
33
+ def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
34
+ if not currency_ids:
35
+ return {}
36
+ cursor = db["currencies"].find({"_id": {"$in": list(currency_ids)}}, {"code": 1})
37
+ return {doc["_id"]: doc.get("code", "USD") for doc in cursor}
38
+
39
+
40
+ def _safe_object_id(identifier: str) -> ObjectId:
41
+ try:
42
+ return ObjectId(identifier)
43
+ except Exception as exc: # noqa: BLE001
44
+ raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc
45
+
46
+
47
+ def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]:
48
+ """
49
+ Fetch up to `months` of recent expense transactions for a user.
50
+
51
+ Returns an empty list if no data exists (new user scenario).
52
+ """
53
+ if months <= 0:
54
+ return []
55
+
56
+ client = get_mongo_client()
57
+ db = client.get_default_database() or client["expense"]
58
+ collection = db["transactions"]
59
+
60
+ user_obj_id = _safe_object_id(user_id)
61
+ start_date = datetime.utcnow() - relativedelta(months=months)
62
+
63
+ query = {
64
+ "user": user_obj_id,
65
+ "date": {"$gte": start_date},
66
+ "type": {"$in": ["EXPENSE", "TRANSFER"]},
67
+ }
68
+
69
+ projection = {
70
+ "date": 1,
71
+ "amount": 1,
72
+ "currency": 1,
73
+ "category": 1,
74
+ }
75
+ docs = list(collection.find(query, projection).sort("date", -1))
76
+ if not docs:
77
+ logger.info("No MongoDB transactions found for user_id=%s", user_id)
78
+ return []
79
+
80
+ category_ids = {doc["category"] for doc in docs if doc.get("category")}
81
+ currency_ids = {doc["currency"] for doc in docs if doc.get("currency")}
82
+
83
+ category_map = _resolve_category_titles(category_ids, db)
84
+ currency_map = _resolve_currency_codes(currency_ids, db)
85
+
86
+ transactions: List[Transaction] = []
87
+ for doc in docs:
88
+ date_value: Optional[datetime] = doc.get("date")
89
+ if not date_value:
90
+ continue
91
+
92
+ category_name = category_map.get(doc.get("category"), "Uncategorized")
93
+ currency_code = currency_map.get(doc.get("currency"), "USD")
94
+
95
+ try:
96
+ transactions.append(
97
+ Transaction(
98
+ timestamp=date_value,
99
+ category=category_name,
100
+ amount=float(doc.get("amount", 0)),
101
+ currency=currency_code,
102
+ )
103
+ )
104
+ except Exception as exc: # noqa: BLE001
105
+ logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc)
106
+
107
+ logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id)
108
+ return transactions
109
+
.history/app/db_20251124163521.py ADDED
@@ -0,0 +1,109 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import os
3
+ from datetime import datetime
4
+ from functools import lru_cache
5
+ from typing import Dict, Iterable, List, Optional, Set
6
+
7
+ from bson import ObjectId
8
+ from dateutil.relativedelta import relativedelta
9
+ from pymongo import MongoClient
10
+
11
+ from .schemas import Transaction
12
+
13
+ logger = logging.getLogger(__name__)
14
+
15
+
16
+ @lru_cache(maxsize=1)
17
+ def get_mongo_client() -> MongoClient:
18
+ """Return a cached MongoClient instance."""
19
+ uri = os.getenv("MONGODB_URI")
20
+ if not uri:
21
+ raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.")
22
+ logger.info("Connecting to MongoDB host from URI")
23
+ return MongoClient(uri)
24
+
25
+
26
+ def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
27
+ if not category_ids:
28
+ return {}
29
+ cursor = db["categories"].find({"_id": {"$in": list(category_ids)}}, {"title": 1})
30
+ return {doc["_id"]: doc.get("title", "Uncategorized") for doc in cursor}
31
+
32
+
33
+ def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
34
+ if not currency_ids:
35
+ return {}
36
+ cursor = db["currencies"].find({"_id": {"$in": list(currency_ids)}}, {"code": 1})
37
+ return {doc["_id"]: doc.get("code", "USD") for doc in cursor}
38
+
39
+
40
+ def _safe_object_id(identifier: str) -> ObjectId:
41
+ try:
42
+ return ObjectId(identifier)
43
+ except Exception as exc: # noqa: BLE001
44
+ raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc
45
+
46
+
47
+ def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]:
48
+ """
49
+ Fetch up to `months` of recent expense transactions for a user.
50
+
51
+ Returns an empty list if no data exists (new user scenario).
52
+ """
53
+ if months <= 0:
54
+ return []
55
+
56
+ client = get_mongo_client()
57
+ db = client.get_default_database() or client["expense"]
58
+ collection = db["transactions"]
59
+
60
+ user_obj_id = _safe_object_id(user_id)
61
+ start_date = datetime.utcnow() - relativedelta(months=months)
62
+
63
+ query = {
64
+ "user": user_obj_id,
65
+ "date": {"$gte": start_date},
66
+ "type": {"$in": ["EXPENSE", "TRANSFER"]},
67
+ }
68
+
69
+ projection = {
70
+ "date": 1,
71
+ "amount": 1,
72
+ "currency": 1,
73
+ "category": 1,
74
+ }
75
+ docs = list(collection.find(query, projection).sort("date", -1))
76
+ if not docs:
77
+ logger.info("No MongoDB transactions found for user_id=%s", user_id)
78
+ return []
79
+
80
+ category_ids = {doc["category"] for doc in docs if doc.get("category")}
81
+ currency_ids = {doc["currency"] for doc in docs if doc.get("currency")}
82
+
83
+ category_map = _resolve_category_titles(category_ids, db)
84
+ currency_map = _resolve_currency_codes(currency_ids, db)
85
+
86
+ transactions: List[Transaction] = []
87
+ for doc in docs:
88
+ date_value: Optional[datetime] = doc.get("date")
89
+ if not date_value:
90
+ continue
91
+
92
+ category_name = category_map.get(doc.get("category"), "Uncategorized")
93
+ currency_code = currency_map.get(doc.get("currency"), "USD")
94
+
95
+ try:
96
+ transactions.append(
97
+ Transaction(
98
+ timestamp=date_value,
99
+ category=category_name,
100
+ amount=float(doc.get("amount", 0)),
101
+ currency=currency_code,
102
+ )
103
+ )
104
+ except Exception as exc: # noqa: BLE001
105
+ logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc)
106
+
107
+ logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id)
108
+ return transactions
109
+
.history/app/db_20251124164204.py ADDED
@@ -0,0 +1,109 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import os
3
+ from datetime import datetime
4
+ from functools import lru_cache
5
+ from typing import Dict, Iterable, List, Optional, Set
6
+
7
+ from bson import ObjectId
8
+ from dateutil.relativedelta import relativedelta
9
+ from pymongo import MongoClient
10
+
11
+ from .schemas import Transaction
12
+
13
+ logger = logging.getLogger(__name__)
14
+
15
+
16
+ @lru_cache(maxsize=1)
17
+ def get_mongo_client() -> MongoClient:
18
+ """Return a cached MongoClient instance."""
19
+ uri = os.getenv("MONGODB_URI")
20
+ if not uri:
21
+ raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.")
22
+ logger.info("Connecting to MongoDB host from URI")
23
+ return MongoClient(uri)
24
+
25
+
26
+ def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
27
+ if not category_ids:
28
+ return {}
29
+ cursor = db["categories"].find({"_id": {"$in": list(category_ids)}}, {"title": 1})
30
+ return {doc["_id"]: doc.get("title", "Uncategorized") for doc in cursor}
31
+
32
+
33
+ def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
34
+ if not currency_ids:
35
+ return {}
36
+ cursor = db["currencies"].find({"_id": {"$in": list(currency_ids)}}, {"code": 1})
37
+ return {doc["_id"]: doc.get("code", "USD") for doc in cursor}
38
+
39
+
40
+ def _safe_object_id(identifier: str) -> ObjectId:
41
+ try:
42
+ return ObjectId(identifier)
43
+ except Exception as exc: # noqa: BLE001
44
+ raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc
45
+
46
+
47
+ def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]:
48
+ """
49
+ Fetch up to `months` of recent expense transactions for a user.
50
+
51
+ Returns an empty list if no data exists (new user scenario).
52
+ """
53
+ if months <= 0:
54
+ return []
55
+
56
+ client = get_mongo_client()
57
+ db = client.get_default_database() or client["expense"]
58
+ collection = db["transactions"]
59
+
60
+ user_obj_id = _safe_object_id(user_id)
61
+ start_date = datetime.utcnow() - relativedelta(months=months)
62
+
63
+ query = {
64
+ "user": user_obj_id,
65
+ "date": {"$gte": start_date},
66
+ "type": {"$in": ["EXPENSE", "TRANSFER"]},
67
+ }
68
+
69
+ projection = {
70
+ "date": 1,
71
+ "amount": 1,
72
+ "currency": 1,
73
+ "category": 1,
74
+ }
75
+ docs = list(collection.find(query, projection).sort("date", -1))
76
+ if not docs:
77
+ logger.info("No MongoDB transactions found for user_id=%s", user_id)
78
+ return []
79
+
80
+ category_ids = {doc["category"] for doc in docs if doc.get("category")}
81
+ currency_ids = {doc["currency"] for doc in docs if doc.get("currency")}
82
+
83
+ category_map = _resolve_category_titles(category_ids, db)
84
+ currency_map = _resolve_currency_codes(currency_ids, db)
85
+
86
+ transactions: List[Transaction] = []
87
+ for doc in docs:
88
+ date_value: Optional[datetime] = doc.get("date")
89
+ if not date_value:
90
+ continue
91
+
92
+ category_name = category_map.get(doc.get("category"), "Uncategorized")
93
+ currency_code = currency_map.get(doc.get("currency"), "USD")
94
+
95
+ try:
96
+ transactions.append(
97
+ Transaction(
98
+ timestamp=date_value,
99
+ category=category_name,
100
+ amount=float(doc.get("amount", 0)),
101
+ currency=currency_code,
102
+ )
103
+ )
104
+ except Exception as exc: # noqa: BLE001
105
+ logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc)
106
+
107
+ logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id)
108
+ return transactions
109
+
Dockerfile ADDED
@@ -0,0 +1,21 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ FROM python:3.11-slim
2
+
3
+ ENV PYTHONDONTWRITEBYTECODE=1 \
4
+ PYTHONUNBUFFERED=1 \
5
+ PATH="/home/user/.local/bin:$PATH"
6
+
7
+ RUN useradd -m -u 1000 user
8
+ USER user
9
+
10
+ WORKDIR /app
11
+
12
+ COPY --chown=user:user requirements.txt .
13
+ RUN pip install --no-cache-dir --upgrade pip && \
14
+ pip install --no-cache-dir -r requirements.txt
15
+
16
+ COPY --chown=user:user . .
17
+
18
+ EXPOSE 7860
19
+
20
+ CMD ["python3", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "7860"]
21
+
README.md ADDED
@@ -0,0 +1,74 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ## AI Financial Insights
2
+
3
+ AI Financial Insights provides personalized spending summaries that mirror the AI-driven budgeting direction outlined for WalletSync, helping users spot changing expense patterns before overspending occurs. The service analyzes up to the three most recent months of transaction history (or gracefully handles brand-new users) and surfaces conversational insights such as “Your grocery expenses increased by 22% this month compared to last month.”
4
+
5
+ ### Features
6
+ - FastAPI backend with `/insights` endpoint that accepts a list of transactions and returns contextual insights per category plus overall month-over-month highlights.
7
+ - Logic considers up to the latest three months of data; if less history is available, it adapts without errors.
8
+ - Dockerfile tailored for Hugging Face Spaces (listening on port `7860`).
9
+
10
+ ### Project structure
11
+ ```
12
+ .
13
+ ├── app
14
+ │ ├── __init__.py
15
+ │ ├── main.py # FastAPI entrypoint
16
+ │ ├── schemas.py # Pydantic models
17
+ │ └── services.py # Insight generation logic
18
+ ├── requirements.txt
19
+ └── Dockerfile
20
+ ```
21
+
22
+ ### Local development
23
+ 1. Create a virtual environment and install dependencies:
24
+ ```
25
+ python -m venv .venv
26
+ .\.venv\Scripts\activate
27
+ python -m pip install --upgrade pip
28
+ python -m pip install -r requirements.txt
29
+ ```
30
+ 2. Export your Mongo connection string (provided by WalletSync) before running the API:
31
+ ```
32
+ setx MONGODB_URI "mongodb://expenseuser:Kem_6o%3F%3F@165.227.69.221:27017/expense?authSource=admin"
33
+ ```
34
+ 3. Run the FastAPI server (per user rule, use `python3` to launch uvicorn):
35
+ ```
36
+ python3 -m uvicorn app.main:app --host 0.0.0.0 --port 8000
37
+ ```
38
+ 4. Open the docs at `http://localhost:8000/docs`.
39
+
40
+ ### Request example
41
+ ```
42
+ POST /insights
43
+ {
44
+ "user_id": "demo-user",
45
+ "transactions": [
46
+ {"timestamp": "2025-09-05T10:00:00Z", "category": "Groceries", "amount": 120.5, "currency": "INR"},
47
+ {"timestamp": "2025-10-04T11:00:00Z", "category": "Groceries", "amount": 150.0, "currency": "INR"},
48
+ {"timestamp": "2025-10-15T09:30:00Z", "category": "Travel", "amount": 2000.0, "currency": "INR"}
49
+ ]
50
+ }
51
+ ```
52
+
53
+ ### Docker (Hugging Face Space)
54
+ Build and test locally:
55
+ ```
56
+ docker build -t ai-financial-insights .
57
+ docker run -it -p 7860:7860 ai-financial-insights
58
+ ```
59
+ When deploying on Hugging Face, add `MONGODB_URI` as a Space secret so the container can reach your database.
60
+
61
+ ### Deploying to Hugging Face
62
+ 1. Log in and clone the Space:
63
+ ```
64
+ git clone https://huggingface.co/spaces/LogicGoInfotechSpaces/AI_FINANCIAL_INSIGHTS
65
+ cd AI_FINANCIAL_INSIGHTS
66
+ ```
67
+ 2. Copy the project files into the cloned repo and commit:
68
+ ```
69
+ git add .
70
+ git commit -m "Add AI Financial Insights FastAPI app"
71
+ git push
72
+ ```
73
+ 3. The Docker Space automatically builds and serves the FastAPI app on port `7860`. Monitor the Space page for build logs and status.
74
+
app/__init__.py ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ """
2
+ AI Financial Insights FastAPI application package.
3
+ """
4
+
app/__pycache__/__init__.cpython-310.pyc ADDED
Binary file (206 Bytes). View file
 
app/__pycache__/db.cpython-310.pyc ADDED
Binary file (3.66 kB). View file
 
app/__pycache__/main.cpython-310.pyc ADDED
Binary file (2.25 kB). View file
 
app/__pycache__/schemas.cpython-310.pyc ADDED
Binary file (2.48 kB). View file
 
app/__pycache__/services.cpython-310.pyc ADDED
Binary file (4.61 kB). View file
 
app/db.py ADDED
@@ -0,0 +1,109 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import os
3
+ from datetime import datetime
4
+ from functools import lru_cache
5
+ from typing import Dict, Iterable, List, Optional, Set
6
+
7
+ from bson import ObjectId
8
+ from dateutil.relativedelta import relativedelta
9
+ from pymongo import MongoClient
10
+
11
+ from .schemas import Transaction
12
+
13
+ logger = logging.getLogger(__name__)
14
+
15
+
16
+ @lru_cache(maxsize=1)
17
+ def get_mongo_client() -> MongoClient:
18
+ """Return a cached MongoClient instance."""
19
+ uri = os.getenv("MONGODB_URI")
20
+ if not uri:
21
+ raise RuntimeError("MONGODB_URI environment variable is required for MongoDB access.")
22
+ logger.info("Connecting to MongoDB host from URI")
23
+ return MongoClient(uri)
24
+
25
+
26
+ def _resolve_category_titles(category_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
27
+ if not category_ids:
28
+ return {}
29
+ cursor = db["categories"].find({"_id": {"$in": list(category_ids)}}, {"title": 1})
30
+ return {doc["_id"]: doc.get("title", "Uncategorized") for doc in cursor}
31
+
32
+
33
+ def _resolve_currency_codes(currency_ids: Set[ObjectId], db) -> Dict[ObjectId, str]:
34
+ if not currency_ids:
35
+ return {}
36
+ cursor = db["currencies"].find({"_id": {"$in": list(currency_ids)}}, {"code": 1})
37
+ return {doc["_id"]: doc.get("code", "USD") for doc in cursor}
38
+
39
+
40
+ def _safe_object_id(identifier: str) -> ObjectId:
41
+ try:
42
+ return ObjectId(identifier)
43
+ except Exception as exc: # noqa: BLE001
44
+ raise ValueError(f"Invalid Mongo ObjectId: {identifier}") from exc
45
+
46
+
47
+ def fetch_recent_transactions(user_id: str, months: int = 3) -> List[Transaction]:
48
+ """
49
+ Fetch up to `months` of recent expense transactions for a user.
50
+
51
+ Returns an empty list if no data exists (new user scenario).
52
+ """
53
+ if months <= 0:
54
+ return []
55
+
56
+ client = get_mongo_client()
57
+ db = client.get_default_database() or client["expense"]
58
+ collection = db["transactions"]
59
+
60
+ user_obj_id = _safe_object_id(user_id)
61
+ start_date = datetime.utcnow() - relativedelta(months=months)
62
+
63
+ query = {
64
+ "user": user_obj_id,
65
+ "date": {"$gte": start_date},
66
+ "type": {"$in": ["EXPENSE", "TRANSFER"]},
67
+ }
68
+
69
+ projection = {
70
+ "date": 1,
71
+ "amount": 1,
72
+ "currency": 1,
73
+ "category": 1,
74
+ }
75
+ docs = list(collection.find(query, projection).sort("date", -1))
76
+ if not docs:
77
+ logger.info("No MongoDB transactions found for user_id=%s", user_id)
78
+ return []
79
+
80
+ category_ids = {doc["category"] for doc in docs if doc.get("category")}
81
+ currency_ids = {doc["currency"] for doc in docs if doc.get("currency")}
82
+
83
+ category_map = _resolve_category_titles(category_ids, db)
84
+ currency_map = _resolve_currency_codes(currency_ids, db)
85
+
86
+ transactions: List[Transaction] = []
87
+ for doc in docs:
88
+ date_value: Optional[datetime] = doc.get("date")
89
+ if not date_value:
90
+ continue
91
+
92
+ category_name = category_map.get(doc.get("category"), "Uncategorized")
93
+ currency_code = currency_map.get(doc.get("currency"), "USD")
94
+
95
+ try:
96
+ transactions.append(
97
+ Transaction(
98
+ timestamp=date_value,
99
+ category=category_name,
100
+ amount=float(doc.get("amount", 0)),
101
+ currency=currency_code,
102
+ )
103
+ )
104
+ except Exception as exc: # noqa: BLE001
105
+ logger.warning("Skipping malformed transaction doc=%s error=%s", doc.get("_id"), exc)
106
+
107
+ logger.info("Fetched %d transactions from MongoDB for user_id=%s", len(transactions), user_id)
108
+ return transactions
109
+
app/main.py ADDED
@@ -0,0 +1,63 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import os
3
+ from typing import Dict, List
4
+
5
+ from fastapi import FastAPI, HTTPException
6
+
7
+ from .db import fetch_recent_transactions
8
+ from .schemas import InsightRequest, InsightResponse, Transaction
9
+ from .services import generate_insights
10
+
11
+ logging.basicConfig(
12
+ level=os.environ.get("LOG_LEVEL", "INFO"),
13
+ format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
14
+ )
15
+ logger = logging.getLogger(__name__)
16
+
17
+ app = FastAPI(
18
+ title="AI Financial Insights",
19
+ description="AI-powered personalized spending insight generator for WalletSync.",
20
+ version="1.0.0",
21
+ )
22
+
23
+
24
+ @app.get("/")
25
+ def root() -> Dict[str, str]:
26
+ """Simple description endpoint."""
27
+ return {
28
+ "message": "AI Financial Insights is ready. POST /insights with transactions to receive insights.",
29
+ "docs": "/docs",
30
+ }
31
+
32
+
33
+ @app.get("/health")
34
+ def health() -> Dict[str, str]:
35
+ """Health probe for monitoring."""
36
+ return {"status": "ok"}
37
+
38
+
39
+ def _resolve_transactions(payload: InsightRequest) -> List[Transaction]:
40
+ if payload.transactions:
41
+ return payload.transactions
42
+ if not payload.user_id:
43
+ raise HTTPException(
44
+ status_code=400,
45
+ detail="Provide `transactions` or `user_id` to fetch them from MongoDB.",
46
+ )
47
+ try:
48
+ return fetch_recent_transactions(payload.user_id)
49
+ except ValueError as exc:
50
+ raise HTTPException(status_code=400, detail=str(exc)) from exc
51
+ except Exception as exc: # noqa: BLE001
52
+ logger.exception("Failed to fetch MongoDB data for user_id=%s", payload.user_id)
53
+ raise HTTPException(status_code=502, detail="Unable to query transaction store.") from exc
54
+
55
+
56
+ @app.post("/insights", response_model=InsightResponse)
57
+ def create_insights(payload: InsightRequest) -> InsightResponse:
58
+ """Generate insights from a transaction payload."""
59
+ logger.info("Received request for user_id=%s", payload.user_id)
60
+ transactions = _resolve_transactions(payload)
61
+ insights, months = generate_insights(transactions)
62
+ return InsightResponse(user_id=payload.user_id, insights=insights, evaluated_months=months)
63
+
app/schemas.py ADDED
@@ -0,0 +1,50 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from datetime import datetime
2
+ from typing import List, Optional
3
+
4
+ from pydantic import BaseModel, Field, field_validator
5
+
6
+
7
+ class Transaction(BaseModel):
8
+ """Incoming transaction payload."""
9
+
10
+ timestamp: datetime = Field(..., description="Timestamp of the transaction (ISO 8601).")
11
+ category: str = Field(..., min_length=1, max_length=100, description="Spending category label.")
12
+ amount: float = Field(..., gt=0, description="Spending amount in the provided currency.")
13
+ currency: str = Field("USD", min_length=1, max_length=5, description="ISO currency code.")
14
+
15
+ @field_validator("category")
16
+ def normalize_category(cls, value: str) -> str:
17
+ normalized = value.strip()
18
+ if not normalized:
19
+ raise ValueError("category must contain visible characters")
20
+ return normalized.title()
21
+
22
+ @field_validator("currency")
23
+ def uppercase_currency(cls, value: str) -> str:
24
+ normalized = value.strip().upper()
25
+ if not normalized:
26
+ raise ValueError("currency must contain visible characters")
27
+ return normalized
28
+
29
+
30
+ class InsightRequest(BaseModel):
31
+ """Request model for insight generation."""
32
+
33
+ user_id: Optional[str] = Field(None, description="Optional identifier for the user.")
34
+ transactions: List[Transaction] = Field(default_factory=list)
35
+
36
+
37
+ class Insight(BaseModel):
38
+ """Single insight message."""
39
+
40
+ message: str
41
+ category: Optional[str] = None
42
+
43
+
44
+ class InsightResponse(BaseModel):
45
+ """Response payload for generated insights."""
46
+
47
+ user_id: Optional[str] = None
48
+ insights: List[Insight] = Field(default_factory=list)
49
+ evaluated_months: List[str] = Field(default_factory=list, description="ISO year-month strings considered.")
50
+
app/services.py ADDED
@@ -0,0 +1,147 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+ from collections import defaultdict
5
+ from dataclasses import dataclass
6
+ from datetime import datetime
7
+ from typing import Dict, Iterable, List, Tuple
8
+
9
+ from dateutil.relativedelta import relativedelta
10
+
11
+ from .schemas import Insight, Transaction
12
+
13
+ logger = logging.getLogger(__name__)
14
+
15
+ CURRENCY_SYMBOLS: Dict[str, str] = {
16
+ "INR": "₹",
17
+ "USD": "$",
18
+ "EUR": "€",
19
+ "GBP": "£",
20
+ }
21
+
22
+
23
+ @dataclass(frozen=True)
24
+ class MonthlySummary:
25
+ """Aggregate spending for a single month and category."""
26
+
27
+ year: int
28
+ month: int
29
+ category: str
30
+ total: float
31
+
32
+ @property
33
+ def iso_month(self) -> str:
34
+ return f"{self.year:04d}-{self.month:02d}"
35
+
36
+
37
+ def _bucket_transactions(transactions: Iterable[Transaction]) -> List[MonthlySummary]:
38
+ """Aggregate transactions per month and category."""
39
+ buckets: Dict[Tuple[int, int, str], float] = defaultdict(float)
40
+ for txn in transactions:
41
+ key = (txn.timestamp.year, txn.timestamp.month, txn.category)
42
+ buckets[key] += txn.amount
43
+
44
+ summaries = [
45
+ MonthlySummary(year=year, month=month, category=category, total=round(total, 2))
46
+ for (year, month, category), total in buckets.items()
47
+ ]
48
+ logger.debug("Created %d monthly summaries", len(summaries))
49
+ return summaries
50
+
51
+
52
+ def _format_currency(amount: float, currency: str) -> str:
53
+ symbol = CURRENCY_SYMBOLS.get(currency.upper(), "")
54
+ formatted_amount = f"{amount:,.2f}"
55
+ return f"{symbol}{formatted_amount}" if symbol else f"{formatted_amount} {currency.upper()}"
56
+
57
+
58
+ def _month_key(summary: MonthlySummary) -> datetime:
59
+ return datetime(summary.year, summary.month, 1)
60
+
61
+
62
+ def generate_insights(transactions: Iterable[Transaction]) -> Tuple[List[Insight], List[str]]:
63
+ """Create AI-inspired insights from historical spending."""
64
+ txns = list(transactions)
65
+ if not txns:
66
+ logger.info("No transactions provided, returning onboarding insight")
67
+ return [Insight(message="Add a few expenses to start seeing personalized insights.")], []
68
+
69
+ summaries = _bucket_transactions(txns)
70
+ if not summaries:
71
+ return [Insight(message="No spending data yet. Track expenses to unlock insights.")], []
72
+
73
+ summaries.sort(key=_month_key, reverse=True)
74
+ recent_months = summaries[:12] # guardrail, though we only surface up to 3 months
75
+
76
+ grouped: Dict[str, List[MonthlySummary]] = defaultdict(list)
77
+ for summary in recent_months:
78
+ grouped[summary.category].append(summary)
79
+
80
+ latest_month = max(recent_months, key=_month_key)
81
+ latest_month_dt = _month_key(latest_month)
82
+ evaluated_months = []
83
+
84
+ insights: List[Insight] = []
85
+ for offset in range(3):
86
+ evaluated_months.append((latest_month_dt - relativedelta(months=offset)).strftime("%Y-%m"))
87
+ evaluated_months = sorted(set(evaluated_months))
88
+
89
+ currency = txns[0].currency
90
+
91
+ for category, entries in grouped.items():
92
+ entries.sort(key=_month_key, reverse=True)
93
+ current = entries[0]
94
+ history = entries[1:3]
95
+ if not history:
96
+ continue
97
+
98
+ history_avg = sum(e.total for e in history) / len(history)
99
+ if history_avg == 0:
100
+ continue
101
+
102
+ change_pct = ((current.total - history_avg) / history_avg) * 100
103
+ diff_amount = current.total - history_avg
104
+
105
+ if abs(change_pct) < 10:
106
+ continue
107
+
108
+ trend = "increased" if change_pct > 0 else "decreased"
109
+ insights.append(
110
+ Insight(
111
+ category=category,
112
+ message=(
113
+ f"Your {category.lower()} spending {trend} by {abs(change_pct):.0f}% this month "
114
+ f"versus your prior average, about {_format_currency(abs(diff_amount), currency)} difference."
115
+ ),
116
+ )
117
+ )
118
+
119
+ # Additional highlight: biggest month-over-month change regardless of category
120
+ monthly_totals: Dict[str, float] = defaultdict(float)
121
+ for summary in summaries:
122
+ monthly_totals[summary.iso_month] += summary.total
123
+
124
+ sorted_months = sorted(monthly_totals.items(), reverse=True)
125
+ if len(sorted_months) >= 2:
126
+ latest_label, latest_total = sorted_months[0]
127
+ prev_label, prev_total = sorted_months[1]
128
+ delta = latest_total - prev_total
129
+ if abs(delta) >= 1:
130
+ descriptor = "more" if delta > 0 else "less"
131
+ insights.append(
132
+ Insight(
133
+ message=(
134
+ f"You spent {_format_currency(abs(delta), currency)} {descriptor} in {latest_label} compared "
135
+ f"to {prev_label}. Consider reviewing large outliers."
136
+ )
137
+ )
138
+ )
139
+
140
+ if not insights:
141
+ insights.append(
142
+ Insight(message="Spending is stable across tracked months. Keep up the steady habits!")
143
+ )
144
+
145
+ logger.info("Generated %d insights", len(insights))
146
+ return insights, evaluated_months
147
+
requirements.txt ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ fastapi==0.115.0
2
+ uvicorn[standard]==0.30.4
3
+ python-dateutil==2.9.0.post0
4
+ pymongo==4.8.0
5
+
tests/__pycache__/test_main.cpython-310-pytest-7.4.0.pyc ADDED
Binary file (3.45 kB). View file
 
tests/__pycache__/test_services.cpython-310-pytest-7.4.0.pyc ADDED
Binary file (2.55 kB). View file
 
tests/test_main.py ADDED
@@ -0,0 +1,49 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from datetime import datetime
2
+ from typing import List
3
+
4
+ import pytest
5
+ from fastapi import HTTPException
6
+ from fastapi.testclient import TestClient
7
+
8
+ from app.main import app, _resolve_transactions
9
+ from app.schemas import InsightRequest, Transaction
10
+
11
+
12
+ def _txn() -> Transaction:
13
+ return Transaction(
14
+ timestamp=datetime.utcnow(),
15
+ category="Travel",
16
+ amount=100.0,
17
+ currency="INR",
18
+ )
19
+
20
+
21
+ def test_resolve_transactions_handles_payload_list(monkeypatch):
22
+ payload = InsightRequest(user_id="abc", transactions=[_txn()])
23
+ result = _resolve_transactions(payload)
24
+ assert len(result) == 1
25
+
26
+
27
+ def test_resolve_transactions_requires_user_id_when_no_transactions():
28
+ payload = InsightRequest(user_id=None, transactions=[])
29
+ with pytest.raises(HTTPException):
30
+ _resolve_transactions(payload)
31
+
32
+
33
+ def test_resolve_transactions_fetches_from_db(monkeypatch):
34
+ payload = InsightRequest(user_id="6686280844fc8cbae596aa51", transactions=[])
35
+
36
+ def fake_fetch(user_id: str) -> List[Transaction]:
37
+ assert user_id == payload.user_id
38
+ return [_txn()]
39
+
40
+ monkeypatch.setattr("app.main.fetch_recent_transactions", fake_fetch)
41
+ result = _resolve_transactions(payload)
42
+ assert len(result) == 1
43
+
44
+
45
+ def test_insights_endpoint_requires_payload_when_empty(monkeypatch):
46
+ client = TestClient(app)
47
+ response = client.post("/insights", json={"user_id": None, "transactions": []})
48
+ assert response.status_code == 400
49
+
tests/test_services.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from datetime import datetime
2
+
3
+ from app.schemas import Transaction
4
+ from app.services import generate_insights
5
+
6
+
7
+ def _txn(ts: str, category: str, amount: float) -> Transaction:
8
+ return Transaction(
9
+ timestamp=datetime.fromisoformat(ts),
10
+ category=category,
11
+ amount=amount,
12
+ currency="INR",
13
+ )
14
+
15
+
16
+ def test_generate_insights_detects_increase():
17
+ txns = [
18
+ _txn("2025-09-05T00:00:00", "Groceries", 100),
19
+ _txn("2025-10-05T00:00:00", "Groceries", 130),
20
+ _txn("2025-11-05T00:00:00", "Groceries", 200),
21
+ ]
22
+ insights, months = generate_insights(txns)
23
+ assert insights, "Expected at least one insight"
24
+ assert any("increased" in insight.message for insight in insights)
25
+ assert months, "Expected evaluated months to be returned"
26
+
27
+
28
+ def test_generate_insights_handles_new_user():
29
+ insights, months = generate_insights([])
30
+ assert months == []
31
+ assert insights[0].message.startswith("Add a few expenses")
32
+