LogicGoInfotechSpaces's picture
Update app.py
9f46740 verified
import json
import os
import time
from datetime import datetime, timedelta
from bson import ObjectId
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from openai import OpenAI
from pydantic import BaseModel
from pymongo import MongoClient
from pymongo.errors import PyMongoError
# ===========================
# ENV SETUP
# ===========================
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not MONGO_URI:
raise RuntimeError("❌ MONGO_URI missing")
if not OPENAI_API_KEY:
raise RuntimeError("❌ OPENAI_API_KEY missing")
mongo_client = MongoClient(MONGO_URI)
default_db = mongo_client.get_default_database()
budget_collection = default_db["budgets"]
transaction_collection = default_db["transactions"]
currencies_collection = default_db["currencies"]
creditcards_collection = default_db["creditcards"]
api_logs_collection = default_db["api_logs"]
openai = OpenAI(api_key=OPENAI_API_KEY)
app = FastAPI(title="Financial Health Score Service")
# ===========================
# MODELS
# ===========================
class ScoreRequest(BaseModel):
userId: str
# ===========================
# HELPERS
# ===========================
def ist_now():
return datetime.now().strftime("%d-%m-%Y %H:%M:%S:IST")
def log_api_event(
*,
status: str,
response_time: float,
user_id: str | None = None,
error_message: str | None = None
):
payload = {
"name": "Financial Health Score",
"status": status,
"date": ist_now(),
"response_time": round(response_time, 3),
}
if user_id:
payload["user_id"] = user_id
if error_message:
payload["error_message"] = error_message
try:
api_logs_collection.insert_one(payload)
except Exception:
pass # logging must never break API
def safe_number(v):
try:
return float(v)
except:
return None
def normalize_budgets(budgets):
out = []
for b in budgets:
heads = []
for h in b.get("headCategories", []) or []:
heads.append({
"spendLimitType": h.get("spendLimitType"),
"spendAmount": safe_number(h.get("spendAmount")),
"maxAmount": safe_number(h.get("maxAmount")),
"remainingAmount": safe_number(h.get("remainingAmount")),
"notifications": h.get("notifications") or []
})
out.append({
"name": b.get("name"),
"status": b.get("status"),
"period": b.get("period"),
"maxAmount": safe_number(b.get("maxAmount")),
"spendAmount": safe_number(b.get("spendAmount")),
"remainingAmount": safe_number(b.get("remainingAmount")),
"rollover": b.get("rollover"),
"notifications": b.get("notifications") or [],
"headCategories": heads,
})
return out
def normalize_transactions(txns):
out = []
for txn in txns:
date_val = txn.get("date")
date_str = date_val.date().isoformat() if isinstance(date_val, datetime) else None
currency_code = None
currency_id = txn.get("currency")
try:
doc = currencies_collection.find_one({"_id": ObjectId(currency_id)})
if doc:
currency_code = doc.get("code") or doc.get("currency")
except:
pass
out.append({
"type": txn.get("type"),
"amount": safe_number(txn.get("amount")),
"currency": currency_code,
"date": date_str,
})
return out
def normalize_creditcards(cards):
out = []
for c in cards:
try:
start_date = c.get("billing_cycle", {}).get("start_date")
end_date = c.get("billing_cycle", {}).get("end_date")
due_date = c.get("due_date")
start_date = start_date.date().isoformat() if isinstance(start_date, datetime) else None
end_date = end_date.date().isoformat() if isinstance(end_date, datetime) else None
due_date = due_date.date().isoformat() if isinstance(due_date, datetime) else None
except:
start_date = end_date = due_date = None
out.append({
"card_name": c.get("card_name"),
"credit_limit": safe_number(c.get("credit_limit")),
"current_balance": safe_number(c.get("current_balance")),
"total_due_amount": safe_number(c.get("total_due_amount")),
"minimum_due": safe_number(c.get("minimum_due")),
"billing_cycle_start": start_date,
"billing_cycle_end": end_date,
"due_date": due_date,
})
return out
def build_prompt(budgets, transactions, creditcards):
return f"""
You are a financial health scoring engine.
Compute a score (0–100) considering:
- Budget usage
- Spending behavior
- Credit card utilization and dues
Return JSON only.
Budgets:
{json.dumps(budgets, indent=2)}
Transactions:
{json.dumps(transactions, indent=2)}
Credit Cards:
{json.dumps(creditcards, indent=2)}
"""
# ===========================
# HEALTH ENDPOINT
# ===========================
@app.get("/health")
def health():
report = {
"service": "Financial Health Score Service",
"status": "healthy",
"checks": {}
}
try:
mongo_client.admin.command("ping")
report["checks"]["mongodb"] = "ok"
except PyMongoError as e:
report["checks"]["mongodb"] = f"fail: {e}"
report["status"] = "degraded"
try:
openai.models.list()
report["checks"]["openai"] = "ok"
except Exception as e:
report["checks"]["openai"] = f"fail: {e}"
report["status"] = "degraded"
report["timestamp"] = datetime.utcnow().isoformat()
return report
# ===========================
# FINANCIAL SCORE
# ===========================
@app.post("/financial-score")
def financial_score(payload: ScoreRequest):
start_time = time.time()
try:
user_oid = ObjectId(payload.userId)
except:
log_api_event(
status="fail",
response_time=time.time() - start_time,
user_id=payload.userId,
error_message="Invalid userId"
)
raise HTTPException(status_code=400, detail="Invalid userId")
try:
budgets = normalize_budgets(
list(budget_collection.find({"createdBy": user_oid}))
)
txns = normalize_transactions(
list(
transaction_collection.find(
{"user": user_oid, "date": {"$gte": datetime.utcnow() - timedelta(days=30)}}
).sort("date", -1).limit(200)
)
)
cards = normalize_creditcards(
list(creditcards_collection.find({"user_id": user_oid}))
)
if not budgets and not txns and not cards:
log_api_event(
status="success",
response_time=time.time() - start_time,
user_id=payload.userId
)
return {
"status": "success",
"message": "No financial data found",
"userId": payload.userId,
"score": 0,
"explanation": "No financial data available."
}
response = openai.chat.completions.create(
model="gpt-4o-mini",
temperature=0.6,
response_format={
"type": "json_schema",
"json_schema": {
"name": "financial_score",
"schema": {
"type": "object",
"properties": {
"score": {"type": "number"},
"explanation": {"type": "string"}
},
"required": ["score", "explanation"]
}
}
},
messages=[
{"role": "system", "content": "You calculate financial health."},
{"role": "user", "content": build_prompt(budgets, txns, cards)}
]
)
parsed = json.loads(response.choices[0].message.content)
score = max(0, min(100, int(float(parsed.get("score", 0)))))
log_api_event(
status="success",
response_time=time.time() - start_time,
user_id=payload.userId
)
return {
"status": "success",
"message": "Financial health score calculated successfully",
"userId": payload.userId,
"score": score,
"explanation": parsed.get("explanation")
}
except Exception as exc:
log_api_event(
status="fail",
response_time=time.time() - start_time,
user_id=payload.userId,
error_message=str(exc)
)
raise HTTPException(status_code=502, detail=str(exc))
# # main.py
# """
# Financial Health Score Service (FastAPI)
# FINAL + CREDIT CARD VERSION:
# - Budget score
# - Transaction score
# - Credit card utilization & repayment score
# - GPT-4o-mini with JSON schema
# - Universal JSON parsing
# """
# import json
# import os
# from datetime import datetime, timedelta
# from bson import ObjectId
# from dotenv import load_dotenv
# from fastapi import FastAPI, HTTPException
# from openai import OpenAI
# from pydantic import BaseModel
# from pymongo import MongoClient
# # ===========================
# # ENV SETUP
# # ===========================
# load_dotenv()
# MONGO_URI = os.getenv("MONGO_URI")
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# if not MONGO_URI:
# raise RuntimeError("❌ MONGO_URI missing")
# if not OPENAI_API_KEY:
# raise RuntimeError("❌ OPENAI_API_KEY missing")
# mongo_client = MongoClient(MONGO_URI)
# default_db = mongo_client.get_default_database()
# budget_collection = default_db["budgets"]
# transaction_collection = default_db["transactions"]
# currencies_collection = default_db["currencies"]
# creditcards_collection = default_db["creditcards"]
# openai = OpenAI(api_key=OPENAI_API_KEY)
# app = FastAPI(title="Financial Health Score Service")
# # ===========================
# # MODELS
# # ===========================
# class ScoreRequest(BaseModel):
# userId: str
# # ===========================
# # HELPERS
# # ===========================
# def safe_number(v):
# try:
# return float(v)
# except:
# return None
# def normalize_budgets(budgets):
# out = []
# for b in budgets:
# heads = []
# for h in b.get("headCategories", []) or []:
# heads.append({
# "spendLimitType": h.get("spendLimitType"),
# "spendAmount": safe_number(h.get("spendAmount")),
# "maxAmount": safe_number(h.get("maxAmount")),
# "remainingAmount": safe_number(h.get("remainingAmount")),
# "notifications": h.get("notifications") or []
# })
# out.append({
# "name": b.get("name"),
# "status": b.get("status"),
# "period": b.get("period"),
# "maxAmount": safe_number(b.get("maxAmount")),
# "spendAmount": safe_number(b.get("spendAmount")),
# "remainingAmount": safe_number(b.get("remainingAmount")),
# "rollover": b.get("rollover"),
# "notifications": b.get("notifications") or [],
# "headCategories": heads,
# })
# return out
# def normalize_transactions(txns):
# out = []
# for txn in txns:
# date_val = txn.get("date")
# date_str = date_val.date().isoformat() if isinstance(date_val, datetime) else None
# currency_code = None
# currency_id = txn.get("currency")
# try:
# if isinstance(currency_id, ObjectId):
# doc = currencies_collection.find_one({"_id": currency_id})
# else:
# try:
# doc = currencies_collection.find_one({"_id": ObjectId(currency_id)})
# except:
# doc = None
# if doc:
# currency_code = doc.get("code") or doc.get("currency")
# except:
# currency_code = None
# out.append({
# "type": txn.get("type"),
# "amount": safe_number(txn.get("amount")),
# "currency": currency_code,
# "date": date_str,
# })
# return out
# def normalize_creditcards(cards):
# out = []
# for c in cards:
# try:
# start_date = c.get("billing_cycle", {}).get("start_date")
# end_date = c.get("billing_cycle", {}).get("end_date")
# start_date = start_date.date().isoformat() if isinstance(start_date, datetime) else None
# end_date = end_date.date().isoformat() if isinstance(end_date, datetime) else None
# due_date = c.get("due_date")
# due_date = due_date.date().isoformat() if isinstance(due_date, datetime) else None
# except:
# start_date = end_date = due_date = None
# out.append({
# "card_name": c.get("card_name"),
# "credit_limit": safe_number(c.get("credit_limit")),
# "current_balance": safe_number(c.get("current_balance")),
# "total_due_amount": safe_number(c.get("total_due_amount")),
# "minimum_due": safe_number(c.get("minimum_due")),
# "billing_cycle_start": start_date,
# "billing_cycle_end": end_date,
# "due_date": due_date,
# })
# return out
# def build_prompt(budgets, transactions, creditcards):
# return f"""
# You are a financial health scoring engine.
# Compute a score (0–100) considering:
# 1. Budgets usage
# 2. Spending patterns from recent transactions (last 30 days)
# 3. Credit card health:
# - utilization ratio (balance / credit_limit)
# - due amount status
# - repayment behavior
# - risk from minimum-due payments
# - future risk if due_date is near
# Scoring rules:
# - A lower credit utilization (<30%) increases score.
# - High utilization (>70%) lowers score sharply.
# - Overdue or large due amounts reduce score.
# - Good budget management increases score.
# - Overspending reduces score.
# - Make explanation short (2 sentences max).
# - MUST follow JSON schema.
# Budgets:
# {json.dumps(budgets, indent=2)}
# Transactions:
# {json.dumps(transactions, indent=2)}
# CreditCards:
# {json.dumps(creditcards, indent=2)}
# """
# # ===========================
# # ROUTES
# # ===========================
# @app.post("/financial-score")
# def financial_score(payload: ScoreRequest):
# # Validate ID
# try:
# user_id = ObjectId(payload.userId)
# except:
# raise HTTPException(status_code=400, detail="Invalid userId")
# # Fetch budgets
# budgets_raw = list(budget_collection.find({"createdBy": user_id}))
# budgets = normalize_budgets(budgets_raw)
# # Fetch last 30 days transactions
# thirty_days_ago = datetime.utcnow() - timedelta(days=30)
# txns_raw = list(
# transaction_collection.find(
# {"user": user_id, "date": {"$gte": thirty_days_ago}}
# ).sort("date", -1).limit(200)
# )
# transactions = normalize_transactions(txns_raw)
# # Fetch credit cards
# cards_raw = list(creditcards_collection.find({"user_id": user_id}))
# creditcards = normalize_creditcards(cards_raw)
# # No data
# if not budgets and not transactions and not creditcards:
# return {
# "userId": payload.userId,
# "score": 0,
# "explanation": "No financial data found."
# }
# prompt = build_prompt(budgets, transactions, creditcards)
# try:
# response = openai.chat.completions.create(
# model="gpt-4o-mini",
# temperature=0.6,
# response_format={
# "type": "json_schema",
# "json_schema": {
# "name": "financial_score",
# "schema": {
# "type": "object",
# "properties": {
# "score": {"type": "number"},
# "explanation": {"type": "string"}
# },
# "required": ["score", "explanation"],
# "additionalProperties": False
# }
# }
# },
# messages=[
# {"role": "system", "content": "You calculate financial health."},
# {"role": "user", "content": prompt}
# ]
# )
# except Exception as exc:
# raise HTTPException(status_code=502, detail=f"OpenAI request failed: {exc}")
# # UNIVERSAL JSON PARSING
# try:
# content = response.choices[0].message.content
# if isinstance(content, list):
# raw_json = content[0].get("text", content[0])
# else:
# raw_json = content
# if not isinstance(raw_json, str):
# raw_json = str(raw_json)
# parsed = json.loads(raw_json)
# except Exception as e:
# raise HTTPException(
# status_code=502,
# detail=f"Could not parse JSON output: {e}"
# )
# score_val = parsed.get("score", 0)
# try:
# score_val = int(float(score_val))
# except:
# score_val = 0
# score_val = max(0, min(100, score_val))
# return {
# "userId": payload.userId,
# "score": score_val,
# "explanation": parsed.get("explanation")
# }