| import json | |
| import os | |
| import time | |
| from datetime import datetime, timedelta | |
| from bson import ObjectId | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, HTTPException | |
| from openai import OpenAI | |
| from pydantic import BaseModel | |
| from pymongo import MongoClient | |
| from pymongo.errors import PyMongoError | |
| # =========================== | |
| # ENV SETUP | |
| # =========================== | |
# Pull variables from a local .env file (if any) into the process environment
# before any of them is read.
load_dotenv()

# Both settings are mandatory: refuse to start rather than fail on the first
# request.
if not (MONGO_URI := os.getenv("MONGO_URI")):
    raise RuntimeError("❌ MONGO_URI missing")
if not (OPENAI_API_KEY := os.getenv("OPENAI_API_KEY")):
    raise RuntimeError("❌ OPENAI_API_KEY missing")

# One shared client per process; the database is the one named in MONGO_URI.
mongo_client = MongoClient(MONGO_URI)
default_db = mongo_client.get_default_database()

# Collection handles, bound in the same order as the names on the right.
(
    budget_collection,
    transaction_collection,
    currencies_collection,
    creditcards_collection,
    api_logs_collection,
) = (
    default_db[collection_name]
    for collection_name in (
        "budgets",
        "transactions",
        "currencies",
        "creditcards",
        "api_logs",
    )
)

openai = OpenAI(api_key=OPENAI_API_KEY)
app = FastAPI(title="Financial Health Score Service")
| # =========================== | |
| # MODELS | |
| # =========================== | |
class ScoreRequest(BaseModel):
    """Request body for the financial score endpoint."""

    # Hex string form of the user's MongoDB ObjectId; the endpoint converts it
    # with ``ObjectId(...)`` and rejects values that cannot be converted.
    userId: str
| # =========================== | |
| # HELPERS | |
| # =========================== | |
def ist_now():
    """Return the current India Standard Time as a log timestamp string.

    The result is formatted ``DD-MM-YYYY HH:MM:SS:IST``. The time is derived
    from an explicit UTC+05:30 offset rather than the host's local clock, so
    the ``IST`` suffix is correct whatever timezone the server runs in.
    """
    # Function-scope import: the file-level import only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    india_standard_time = timezone(timedelta(hours=5, minutes=30))
    return datetime.now(india_standard_time).strftime("%d-%m-%Y %H:%M:%S:IST")
def log_api_event(
    *,
    status: str,
    response_time: float,
    user_id: str | None = None,
    error_message: str | None = None
):
    """Write one audit record for an API call to the ``api_logs`` collection.

    Args:
        status: Outcome label stored with the record (callers pass
            ``"success"`` or ``"fail"``).
        response_time: Elapsed seconds; stored rounded to 3 decimal places.
        user_id: Stored as ``user_id`` only when truthy.
        error_message: Stored as ``error_message`` only when truthy.

    Any exception raised by the insert is swallowed on purpose so that a
    logging failure can never fail the request being logged.
    """
    optional_fields = {"user_id": user_id, "error_message": error_message}
    record = {
        "name": "Financial Health Score",
        "status": status,
        "date": ist_now(),
        "response_time": round(response_time, 3),
        # Only attach the optional fields that actually carry a value.
        **{key: value for key, value in optional_fields.items() if value},
    }

    try:
        api_logs_collection.insert_one(record)
    except Exception:
        # logging must never break API
        return
def safe_number(v):
    """Convert *v* to ``float``, returning ``None`` when it is not numeric.

    Args:
        v: Any value; typically a number or numeric string read from a
            database document.

    Returns:
        ``float(v)``, or ``None`` if the conversion fails (wrong type,
        unparsable text, or an integer too large for a float).
    """
    try:
        return float(v)
    # Only conversion failures are treated as "not a number"; anything else
    # (e.g. KeyboardInterrupt, SystemExit) must propagate.
    except (TypeError, ValueError, OverflowError):
        return None
def normalize_budgets(budgets):
    """Reduce raw budget documents to the fields used for scoring.

    Args:
        budgets: Iterable of budget documents (dict-like).

    Returns:
        A list with one dict per budget. Amount fields are passed through
        ``safe_number``; a missing or empty ``notifications`` /
        ``headCategories`` value becomes an empty list.
    """

    def summarize_head(head):
        # Per-category slice of a budget.
        return {
            "spendLimitType": head.get("spendLimitType"),
            "spendAmount": safe_number(head.get("spendAmount")),
            "maxAmount": safe_number(head.get("maxAmount")),
            "remainingAmount": safe_number(head.get("remainingAmount")),
            "notifications": head.get("notifications") or [],
        }

    def summarize_budget(budget):
        # ``or []`` also covers a key that is present but set to None.
        raw_heads = budget.get("headCategories", []) or []
        head_summaries = [summarize_head(head) for head in raw_heads]
        return {
            "name": budget.get("name"),
            "status": budget.get("status"),
            "period": budget.get("period"),
            "maxAmount": safe_number(budget.get("maxAmount")),
            "spendAmount": safe_number(budget.get("spendAmount")),
            "remainingAmount": safe_number(budget.get("remainingAmount")),
            "rollover": budget.get("rollover"),
            "notifications": budget.get("notifications") or [],
            "headCategories": head_summaries,
        }

    return [summarize_budget(budget) for budget in budgets]
def _resolve_currency_code(currency_id, cache):
    """Look up the code of the currency referenced by *currency_id*.

    Args:
        currency_id: Value of a transaction's ``currency`` field.
        cache: Dict mapping ``ObjectId`` to an already-resolved code (or
            ``None``); updated in place.

    Returns:
        The currency document's ``code`` (falling back to its ``currency``
        field), or ``None`` when the reference is missing, malformed,
        unknown, or the lookup fails.
    """
    # ``ObjectId(None)`` would mint a brand-new id and trigger a query that
    # can never match, so missing/invalid references are rejected up front.
    if not ObjectId.is_valid(currency_id):
        return None

    oid = ObjectId(currency_id)
    if oid not in cache:
        try:
            doc = currencies_collection.find_one({"_id": oid})
        except PyMongoError:
            # Best-effort enrichment: a failed lookup must not fail scoring.
            doc = None
        cache[oid] = (doc.get("code") or doc.get("currency")) if doc else None
    return cache[oid]


def normalize_transactions(txns):
    """Reduce raw transaction documents to the fields used for scoring.

    Args:
        txns: Iterable of transaction documents (dict-like).

    Returns:
        A list with one dict per transaction holding ``type``, ``amount``
        (via ``safe_number``), ``currency`` (resolved code or ``None``) and
        ``date`` (ISO ``YYYY-MM-DD`` when the stored value is a ``datetime``,
        otherwise ``None``).
    """
    # Each distinct currency is fetched at most once per call instead of once
    # per transaction.
    currency_codes = {}
    out = []
    for txn in txns:
        date_val = txn.get("date")
        date_str = date_val.date().isoformat() if isinstance(date_val, datetime) else None
        out.append({
            "type": txn.get("type"),
            "amount": safe_number(txn.get("amount")),
            "currency": _resolve_currency_code(txn.get("currency"), currency_codes),
            "date": date_str,
        })
    return out
def _iso_date(value):
    """Return the ISO ``YYYY-MM-DD`` date of a ``datetime``, else ``None``."""
    return value.date().isoformat() if isinstance(value, datetime) else None


def normalize_creditcards(cards):
    """Reduce raw credit card documents to the fields used for scoring.

    Args:
        cards: Iterable of credit card documents (dict-like).

    Returns:
        A list with one dict per card. Amount fields are passed through
        ``safe_number``; each date is an ISO ``YYYY-MM-DD`` string when the
        stored value is a ``datetime`` and ``None`` otherwise.
    """
    out = []
    for card in cards:
        # A null or malformed ``billing_cycle`` only blanks the two cycle
        # dates; it must not discard an otherwise valid ``due_date``.
        cycle = card.get("billing_cycle")
        if not isinstance(cycle, dict):
            cycle = {}

        out.append({
            "card_name": card.get("card_name"),
            "credit_limit": safe_number(card.get("credit_limit")),
            "current_balance": safe_number(card.get("current_balance")),
            "total_due_amount": safe_number(card.get("total_due_amount")),
            "minimum_due": safe_number(card.get("minimum_due")),
            "billing_cycle_start": _iso_date(cycle.get("start_date")),
            "billing_cycle_end": _iso_date(cycle.get("end_date")),
            "due_date": _iso_date(card.get("due_date")),
        })
    return out
def build_prompt(budgets, transactions, creditcards):
    """Build the user prompt sent to the scoring model.

    Args:
        budgets: Normalized budget dicts.
        transactions: Normalized transaction dicts.
        creditcards: Normalized credit card dicts.

    Returns:
        The instruction text followed by the three data sets rendered as
        indented JSON.
    """

    def as_json(value):
        # Some fields are passed through from the database untouched and may
        # hold values json cannot encode natively (e.g. ObjectId, datetime).
        # The output is prompt text, so rendering those with ``str`` is
        # deliberate and avoids a TypeError.
        return json.dumps(value, indent=2, default=str)

    return f"""
You are a financial health scoring engine.
Compute a score (0–100) considering:
- Budget usage
- Spending behavior
- Credit card utilization and dues
Return JSON only.
Budgets:
{as_json(budgets)}
Transactions:
{as_json(transactions)}
Credit Cards:
{as_json(creditcards)}
"""
| # =========================== | |
| # HEALTH ENDPOINT | |
| # =========================== | |
# NOTE(review): the function had no route decorator, so it was never reachable
# over HTTP. "/health" is chosen by convention — confirm against whatever
# probes or clients call this service.
@app.get("/health")
def health():
    """Report whether the service's external dependencies are reachable.

    Pings MongoDB and lists OpenAI models. Each check is recorded under
    ``checks`` as ``"ok"`` or ``"fail: <error>"``; any failure flips the
    overall ``status`` from ``"healthy"`` to ``"degraded"``.

    Returns:
        A dict with ``service``, ``status``, ``checks`` and a UTC
        ``timestamp`` in ISO format.
    """
    report = {
        "service": "Financial Health Score Service",
        "status": "healthy",
        "checks": {}
    }

    try:
        mongo_client.admin.command("ping")
        report["checks"]["mongodb"] = "ok"
    except PyMongoError as e:
        report["checks"]["mongodb"] = f"fail: {e}"
        report["status"] = "degraded"

    try:
        openai.models.list()
        report["checks"]["openai"] = "ok"
    except Exception as e:
        report["checks"]["openai"] = f"fail: {e}"
        report["status"] = "degraded"

    report["timestamp"] = datetime.utcnow().isoformat()
    return report
| # =========================== | |
| # FINANCIAL SCORE | |
| # =========================== | |
# Structured-output contract sent to the model: a JSON object carrying a
# numeric ``score`` and a textual ``explanation``.
_SCORE_RESPONSE_FORMAT = {
    "type": "json_schema",
    "json_schema": {
        "name": "financial_score",
        "schema": {
            "type": "object",
            "properties": {
                "score": {"type": "number"},
                "explanation": {"type": "string"},
            },
            "required": ["score", "explanation"],
        },
    },
}


# The function had no route decorator, so the endpoint was never registered.
@app.post("/financial-score")
def financial_score(payload: ScoreRequest):
    """Compute a 0-100 financial health score for one user.

    Loads the user's budgets, up to 200 transactions from the last 30 days
    and credit cards, then asks the model for a score and explanation. When
    the user has no data at all, a score of 0 is returned without calling
    the model. Every outcome is recorded through ``log_api_event``.

    Args:
        payload: Request body carrying the user's id as a string.

    Returns:
        A dict with ``status``, ``message``, ``userId``, ``score`` (int
        clamped to 0..100) and ``explanation``.

    Raises:
        HTTPException: 400 when ``userId`` is not a valid ObjectId; 502 when
            loading data, calling the model or parsing its reply fails.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # wall-clock adjustments.
    started = time.perf_counter()

    def elapsed():
        return time.perf_counter() - started

    if not ObjectId.is_valid(payload.userId):
        log_api_event(
            status="fail",
            response_time=elapsed(),
            user_id=payload.userId,
            error_message="Invalid userId"
        )
        raise HTTPException(status_code=400, detail="Invalid userId")
    user_oid = ObjectId(payload.userId)

    try:
        budgets = normalize_budgets(
            list(budget_collection.find({"createdBy": user_oid}))
        )

        window_start = datetime.utcnow() - timedelta(days=30)
        txns = normalize_transactions(
            list(
                transaction_collection.find(
                    {"user": user_oid, "date": {"$gte": window_start}}
                ).sort("date", -1).limit(200)
            )
        )

        cards = normalize_creditcards(
            list(creditcards_collection.find({"user_id": user_oid}))
        )

        # Nothing to score: answer directly instead of calling the model.
        if not budgets and not txns and not cards:
            log_api_event(
                status="success",
                response_time=elapsed(),
                user_id=payload.userId
            )
            return {
                "status": "success",
                "message": "No financial data found",
                "userId": payload.userId,
                "score": 0,
                "explanation": "No financial data available."
            }

        response = openai.chat.completions.create(
            model="gpt-4o-mini",
            temperature=0.6,
            response_format=_SCORE_RESPONSE_FORMAT,
            messages=[
                {"role": "system", "content": "You calculate financial health."},
                {"role": "user", "content": build_prompt(budgets, txns, cards)}
            ]
        )

        parsed = json.loads(response.choices[0].message.content)
        # Truncate to an int and clamp into the documented 0..100 range.
        score = max(0, min(100, int(float(parsed.get("score", 0)))))

        log_api_event(
            status="success",
            response_time=elapsed(),
            user_id=payload.userId
        )
        return {
            "status": "success",
            "message": "Financial health score calculated successfully",
            "userId": payload.userId,
            "score": score,
            "explanation": parsed.get("explanation")
        }
    except Exception as exc:
        # Boundary handler: record the failure, then surface it as a 502 while
        # keeping the original exception chained for server-side tracebacks.
        log_api_event(
            status="fail",
            response_time=elapsed(),
            user_id=payload.userId,
            error_message=str(exc)
        )
        raise HTTPException(status_code=502, detail=str(exc)) from exc
| # # main.py | |
| # """ | |
| # Financial Health Score Service (FastAPI) | |
| # FINAL + CREDIT CARD VERSION: | |
| # - Budget score | |
| # - Transaction score | |
| # - Credit card utilization & repayment score | |
| # - GPT-4o-mini with JSON schema | |
| # - Universal JSON parsing | |
| # """ | |
| # import json | |
| # import os | |
| # from datetime import datetime, timedelta | |
| # from bson import ObjectId | |
| # from dotenv import load_dotenv | |
| # from fastapi import FastAPI, HTTPException | |
| # from openai import OpenAI | |
| # from pydantic import BaseModel | |
| # from pymongo import MongoClient | |
| # # =========================== | |
| # # ENV SETUP | |
| # # =========================== | |
| # load_dotenv() | |
| # MONGO_URI = os.getenv("MONGO_URI") | |
| # OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
| # if not MONGO_URI: | |
| # raise RuntimeError("❌ MONGO_URI missing") | |
| # if not OPENAI_API_KEY: | |
| # raise RuntimeError("❌ OPENAI_API_KEY missing") | |
| # mongo_client = MongoClient(MONGO_URI) | |
| # default_db = mongo_client.get_default_database() | |
| # budget_collection = default_db["budgets"] | |
| # transaction_collection = default_db["transactions"] | |
| # currencies_collection = default_db["currencies"] | |
| # creditcards_collection = default_db["creditcards"] | |
| # openai = OpenAI(api_key=OPENAI_API_KEY) | |
| # app = FastAPI(title="Financial Health Score Service") | |
| # # =========================== | |
| # # MODELS | |
| # # =========================== | |
| # class ScoreRequest(BaseModel): | |
| # userId: str | |
| # # =========================== | |
| # # HELPERS | |
| # # =========================== | |
| # def safe_number(v): | |
| # try: | |
| # return float(v) | |
| # except: | |
| # return None | |
| # def normalize_budgets(budgets): | |
| # out = [] | |
| # for b in budgets: | |
| # heads = [] | |
| # for h in b.get("headCategories", []) or []: | |
| # heads.append({ | |
| # "spendLimitType": h.get("spendLimitType"), | |
| # "spendAmount": safe_number(h.get("spendAmount")), | |
| # "maxAmount": safe_number(h.get("maxAmount")), | |
| # "remainingAmount": safe_number(h.get("remainingAmount")), | |
| # "notifications": h.get("notifications") or [] | |
| # }) | |
| # out.append({ | |
| # "name": b.get("name"), | |
| # "status": b.get("status"), | |
| # "period": b.get("period"), | |
| # "maxAmount": safe_number(b.get("maxAmount")), | |
| # "spendAmount": safe_number(b.get("spendAmount")), | |
| # "remainingAmount": safe_number(b.get("remainingAmount")), | |
| # "rollover": b.get("rollover"), | |
| # "notifications": b.get("notifications") or [], | |
| # "headCategories": heads, | |
| # }) | |
| # return out | |
| # def normalize_transactions(txns): | |
| # out = [] | |
| # for txn in txns: | |
| # date_val = txn.get("date") | |
| # date_str = date_val.date().isoformat() if isinstance(date_val, datetime) else None | |
| # currency_code = None | |
| # currency_id = txn.get("currency") | |
| # try: | |
| # if isinstance(currency_id, ObjectId): | |
| # doc = currencies_collection.find_one({"_id": currency_id}) | |
| # else: | |
| # try: | |
| # doc = currencies_collection.find_one({"_id": ObjectId(currency_id)}) | |
| # except: | |
| # doc = None | |
| # if doc: | |
| # currency_code = doc.get("code") or doc.get("currency") | |
| # except: | |
| # currency_code = None | |
| # out.append({ | |
| # "type": txn.get("type"), | |
| # "amount": safe_number(txn.get("amount")), | |
| # "currency": currency_code, | |
| # "date": date_str, | |
| # }) | |
| # return out | |
| # def normalize_creditcards(cards): | |
| # out = [] | |
| # for c in cards: | |
| # try: | |
| # start_date = c.get("billing_cycle", {}).get("start_date") | |
| # end_date = c.get("billing_cycle", {}).get("end_date") | |
| # start_date = start_date.date().isoformat() if isinstance(start_date, datetime) else None | |
| # end_date = end_date.date().isoformat() if isinstance(end_date, datetime) else None | |
| # due_date = c.get("due_date") | |
| # due_date = due_date.date().isoformat() if isinstance(due_date, datetime) else None | |
| # except: | |
| # start_date = end_date = due_date = None | |
| # out.append({ | |
| # "card_name": c.get("card_name"), | |
| # "credit_limit": safe_number(c.get("credit_limit")), | |
| # "current_balance": safe_number(c.get("current_balance")), | |
| # "total_due_amount": safe_number(c.get("total_due_amount")), | |
| # "minimum_due": safe_number(c.get("minimum_due")), | |
| # "billing_cycle_start": start_date, | |
| # "billing_cycle_end": end_date, | |
| # "due_date": due_date, | |
| # }) | |
| # return out | |
| # def build_prompt(budgets, transactions, creditcards): | |
| # return f""" | |
| # You are a financial health scoring engine. | |
| # Compute a score (0–100) considering: | |
| # 1. Budgets usage | |
| # 2. Spending patterns from recent transactions (last 30 days) | |
| # 3. Credit card health: | |
| # - utilization ratio (balance / credit_limit) | |
| # - due amount status | |
| # - repayment behavior | |
| # - risk from minimum-due payments | |
| # - future risk if due_date is near | |
| # Scoring rules: | |
| # - A lower credit utilization (<30%) increases score. | |
| # - High utilization (>70%) lowers score sharply. | |
| # - Overdue or large due amounts reduce score. | |
| # - Good budget management increases score. | |
| # - Overspending reduces score. | |
| # - Make explanation short (2 sentences max). | |
| # - MUST follow JSON schema. | |
| # Budgets: | |
| # {json.dumps(budgets, indent=2)} | |
| # Transactions: | |
| # {json.dumps(transactions, indent=2)} | |
| # CreditCards: | |
| # {json.dumps(creditcards, indent=2)} | |
| # """ | |
| # # =========================== | |
| # # ROUTES | |
| # # =========================== | |
| # @app.post("/financial-score") | |
| # def financial_score(payload: ScoreRequest): | |
| # # Validate ID | |
| # try: | |
| # user_id = ObjectId(payload.userId) | |
| # except: | |
| # raise HTTPException(status_code=400, detail="Invalid userId") | |
| # # Fetch budgets | |
| # budgets_raw = list(budget_collection.find({"createdBy": user_id})) | |
| # budgets = normalize_budgets(budgets_raw) | |
| # # Fetch last 30 days transactions | |
| # thirty_days_ago = datetime.utcnow() - timedelta(days=30) | |
| # txns_raw = list( | |
| # transaction_collection.find( | |
| # {"user": user_id, "date": {"$gte": thirty_days_ago}} | |
| # ).sort("date", -1).limit(200) | |
| # ) | |
| # transactions = normalize_transactions(txns_raw) | |
| # # Fetch credit cards | |
| # cards_raw = list(creditcards_collection.find({"user_id": user_id})) | |
| # creditcards = normalize_creditcards(cards_raw) | |
| # # No data | |
| # if not budgets and not transactions and not creditcards: | |
| # return { | |
| # "userId": payload.userId, | |
| # "score": 0, | |
| # "explanation": "No financial data found." | |
| # } | |
| # prompt = build_prompt(budgets, transactions, creditcards) | |
| # try: | |
| # response = openai.chat.completions.create( | |
| # model="gpt-4o-mini", | |
| # temperature=0.6, | |
| # response_format={ | |
| # "type": "json_schema", | |
| # "json_schema": { | |
| # "name": "financial_score", | |
| # "schema": { | |
| # "type": "object", | |
| # "properties": { | |
| # "score": {"type": "number"}, | |
| # "explanation": {"type": "string"} | |
| # }, | |
| # "required": ["score", "explanation"], | |
| # "additionalProperties": False | |
| # } | |
| # } | |
| # }, | |
| # messages=[ | |
| # {"role": "system", "content": "You calculate financial health."}, | |
| # {"role": "user", "content": prompt} | |
| # ] | |
| # ) | |
| # except Exception as exc: | |
| # raise HTTPException(status_code=502, detail=f"OpenAI request failed: {exc}") | |
| # # UNIVERSAL JSON PARSING | |
| # try: | |
| # content = response.choices[0].message.content | |
| # if isinstance(content, list): | |
| # raw_json = content[0].get("text", content[0]) | |
| # else: | |
| # raw_json = content | |
| # if not isinstance(raw_json, str): | |
| # raw_json = str(raw_json) | |
| # parsed = json.loads(raw_json) | |
| # except Exception as e: | |
| # raise HTTPException( | |
| # status_code=502, | |
| # detail=f"Could not parse JSON output: {e}" | |
| # ) | |
| # score_val = parsed.get("score", 0) | |
| # try: | |
| # score_val = int(float(score_val)) | |
| # except: | |
| # score_val = 0 | |
| # score_val = max(0, min(100, score_val)) | |
| # return { | |
| # "userId": payload.userId, | |
| # "score": score_val, | |
| # "explanation": parsed.get("explanation") | |
| # } | |