File size: 5,257 Bytes
3a4ffcf 8d6014f 3a4ffcf 8d6014f 3a4ffcf 8d6014f 3a4ffcf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
import logging
import os
import time
from typing import Dict, List
from fastapi import FastAPI, HTTPException
from .db import fetch_recent_transactions, log_api_hit
from .schemas import InsightRequest, InsightResponse, Transaction
from .services import generate_insights
# Configure root logging once, at import time. The level name is read from the
# LOG_LEVEL environment variable and normalised (trimmed, upper-cased) so that
# values such as "debug" or " info " are accepted; an unset or blank variable
# falls back to INFO. A name the logging module does not know is still
# rejected by logging itself.
logging.basicConfig(
    level=(os.environ.get("LOG_LEVEL", "").strip() or "INFO").upper(),
    format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# FastAPI application instance; the route decorators in this module register
# their handlers on it. The keyword arguments are the service metadata.
app = FastAPI(
    version="1.0.0",
    title="AI Financial Insights",
    description="AI-powered personalized spending insight generator for WalletSync.",
)
@app.get("/")
def root() -> Dict[str, str]:
    """Return a short description of the service.

    The payload has two string entries: ``message``, a human-readable hint
    on how to call the API, and ``docs``, which holds the ``/docs`` path.
    """
    banner: Dict[str, str] = {}
    banner["message"] = (
        "AI Financial Insights is ready. POST /insights with transactions to receive insights."
    )
    banner["docs"] = "/docs"
    return banner
@app.get("/health")
def health() -> Dict[str, str]:
    """Answer monitoring probes with a constant ``{"status": "ok"}`` payload."""
    return dict(status="ok")
def _resolve_transactions(payload: InsightRequest) -> List[Transaction]:
    """Pick the transactions an insights request should be evaluated on.

    Transactions supplied in the request take precedence. When none are
    supplied (missing or empty), they are fetched for ``payload.user_id``
    via ``fetch_recent_transactions``.

    Raises:
        HTTPException: 400 when neither transactions nor a user id is
            given, or when the fetch raises ``ValueError``; 502 when the
            fetch fails with any other exception.
    """
    supplied = payload.transactions
    if supplied:
        return supplied

    uid = payload.user_id
    if uid:
        try:
            return fetch_recent_transactions(uid)
        except ValueError as err:
            # The fetch rejected the input: surface its message to the client.
            raise HTTPException(status_code=400, detail=str(err)) from err
        except Exception as err:  # noqa: BLE001
            # Any other failure is reported as an upstream error; the
            # traceback goes to the log, not to the client.
            logger.exception("Failed to fetch MongoDB data for user_id=%s", uid)
            raise HTTPException(
                status_code=502,
                detail="Unable to query transaction store.",
            ) from err

    # Nothing to work with: no inline transactions and no user to look up.
    raise HTTPException(
        status_code=400,
        detail="Provide `transactions` or `user_id` to fetch them from MongoDB.",
    )
@app.post("/insights", response_model=InsightResponse)
def create_insights(payload: InsightRequest) -> InsightResponse:
    """Generate insights from a transaction payload.

    Resolves the transactions for the request, runs ``generate_insights``
    on them and wraps the result in an ``InsightResponse``. Whatever the
    outcome, the call is recorded through ``log_api_hit`` with the request's
    ``user_id``, a ``"success"``/``"error"`` status and the elapsed time in
    seconds; a failure of that recording is logged as a warning and never
    changes the outcome of the request.

    Args:
        payload: The parsed request body.

    Returns:
        The insights and evaluated months for the request, echoing back the
        ``user_id`` exactly as it was received.

    Raises:
        HTTPException: Re-raised unchanged when raised while resolving
            transactions or generating insights; any other exception is
            logged with its traceback and converted to a 500.
    """
    # perf_counter() is monotonic, so the measured duration cannot jump or go
    # negative if the system clock is adjusted mid-request (time.time() can).
    started = time.perf_counter()
    status = "success"
    # Keep the user_id exactly as sent in the request body (may be None).
    user_id = payload.user_id
    try:
        logger.info("Received request for user_id=%s", user_id)
        transactions = _resolve_transactions(payload)
        insights, months = generate_insights(transactions)
        return InsightResponse(user_id=user_id, insights=insights, evaluated_months=months)
    except HTTPException:
        # Already a well-formed HTTP error: only mark the hit as failed.
        status = "error"
        raise
    except Exception as exc:  # noqa: BLE001
        status = "error"
        logger.exception("Unexpected error processing insights request for user_id=%s", user_id)
        raise HTTPException(status_code=500, detail="Internal server error") from exc
    finally:
        response_time = time.perf_counter() - started
        try:
            log_api_hit(user_id, status, response_time)
        except Exception as log_exc:  # noqa: BLE001
            # Usage logging is best-effort: never fail the request over it.
            logger.warning("Failed to log API hit: %s", log_exc)
# import logging
# import os
# from typing import Dict, List
# from fastapi import FastAPI, HTTPException
# from .db import fetch_recent_transactions
# from .schemas import InsightRequest, InsightResponse, Transaction
# from .services import generate_insights
# logging.basicConfig(
# level=os.environ.get("LOG_LEVEL", "INFO"),
# format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
# )
# logger = logging.getLogger(__name__)
# app = FastAPI(
# title="AI Financial Insights",
# description="AI-powered personalized spending insight generator for WalletSync.",
# version="1.0.0",
# )
# @app.get("/")
# def root() -> Dict[str, str]:
# """Simple description endpoint."""
# return {
# "message": "AI Financial Insights is ready. POST /insights with transactions to receive insights.",
# "docs": "/docs",
# }
# @app.get("/health")
# def health() -> Dict[str, str]:
# """Health probe for monitoring."""
# return {"status": "ok"}
# def _resolve_transactions(payload: InsightRequest) -> List[Transaction]:
# if payload.transactions:
# return payload.transactions
# if not payload.user_id:
# raise HTTPException(
# status_code=400,
# detail="Provide `transactions` or `user_id` to fetch them from MongoDB.",
# )
# try:
# return fetch_recent_transactions(payload.user_id)
# except ValueError as exc:
# raise HTTPException(status_code=400, detail=str(exc)) from exc
# except Exception as exc: # noqa: BLE001
# logger.exception("Failed to fetch MongoDB data for user_id=%s", payload.user_id)
# raise HTTPException(status_code=502, detail="Unable to query transaction store.") from exc
# @app.post("/insights", response_model=InsightResponse)
# def create_insights(payload: InsightRequest) -> InsightResponse:
# """Generate insights from a transaction payload."""
# logger.info("Received request for user_id=%s", payload.user_id)
# transactions = _resolve_transactions(payload)
# insights, months = generate_insights(transactions)
# return InsightResponse(user_id=payload.user_id, insights=insights, evaluated_months=months)
|