FinMK / backend /analytics /anomaly.py
Kumar
Refactor: Exclude PDF and CSV files from Git to fix HF push error
24e6f5b
import numpy as np
import pandas as pd
from datetime import datetime
from expense_tracker.utils import MongoDBClient
from bson import ObjectId
class AnomalyDetector:
    """Flag unusual expense records of a single user with an Isolation Forest.

    The detector loads the user's expenses from MongoDB, builds three
    features per record (amount, label-encoded category, day of week), fits
    an ``IsolationForest`` on them and reports the records the model marks
    as outliers.
    """

    # Minimum number of usable expense records required before fitting.
    MIN_SAMPLES = 10
    # Share of records the model is told to expect as anomalies (5%).
    CONTAMINATION = 0.05

    def __init__(self, user_id):
        """Create a detector for ``user_id`` (an ``ObjectId`` or a value convertible to one)."""
        self.user_id = user_id
        # Imported here rather than at module level, so scikit-learn is only
        # loaded when a detector is actually instantiated.
        from sklearn.ensemble import IsolationForest
        from sklearn.preprocessing import LabelEncoder

        self.model = IsolationForest(
            contamination=self.CONTAMINATION, random_state=42
        )
        self.le = LabelEncoder()

    def fetch_data(self):
        """Return the user's stored expense records.

        Returns:
            The list found under ``financial_data.expenses`` of the user
            document, or ``[]`` when the id is not a valid ``ObjectId``, the
            user does not exist, or no expenses are stored.
        """
        uid = self.user_id
        if not isinstance(uid, ObjectId):
            from bson.errors import InvalidId

            try:
                uid = ObjectId(uid)
            except (InvalidId, TypeError):
                # Malformed id: treated as "no data" rather than an error.
                return []

        db = MongoDBClient.get_client()
        user = db.users.find_one({'_id': uid}, {'financial_data.expenses': 1})
        if not user:
            return []
        # `or` fallbacks also cover fields that exist but are stored as None.
        financial_data = user.get('financial_data') or {}
        return financial_data.get('expenses') or []

    def detect(self):
        """Fit the model on the user's expenses and return the anomalous ones.

        Returns:
            A list of dicts with the keys ``id``, ``title``, ``amount``,
            ``category``, ``date`` (``YYYY-MM-DD`` string or ``None``),
            ``score`` (0-1, higher means more anomalous) and ``reason``.
            The list is empty when fewer than ``MIN_SAMPLES`` usable records
            are available.
        """
        records = self._collect_records(self.fetch_data() or [])
        # Checked after filtering, so the model is never fitted on fewer
        # usable rows than the minimum.
        if len(records) < self.MIN_SAMPLES:
            return []

        df = pd.DataFrame(records)

        # Features: amount (numeric), category (label-encoded) and day of
        # week (spending patterns vary by day).
        df['category_encoded'] = self.le.fit_transform(df['category'].astype(str))
        # Missing or unparseable dates become NaT instead of raising.
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        df['day_of_week'] = df['date'].dt.dayofweek

        # NaT dates yield a NaN day of week, which is filled with 0 here.
        X = df[['amount', 'category_encoded', 'day_of_week']].fillna(0)

        self.model.fit(X)
        # predict(): -1 marks an anomaly, 1 a normal record.
        df['anomaly'] = self.model.predict(X)
        # decision_function(): lower is more anomalous (negative for outliers).
        df['score_raw'] = self.model.decision_function(X)

        anomalies = df[df['anomaly'] == -1]
        return [self._format_row(row) for _, row in anomalies.iterrows()]

    @staticmethod
    def _collect_records(expenses):
        """Keep only expenses that have a category and a finite numeric amount."""
        records = []
        for expense in expenses:
            if not isinstance(expense, dict):
                continue
            if 'amount' not in expense or 'category' not in expense:
                continue
            try:
                amount = float(expense['amount'])
            except (TypeError, ValueError):
                # A single malformed amount must not abort the whole run.
                continue
            if not np.isfinite(amount):
                continue
            records.append({
                'id': str(expense.get('_id')),
                'amount': amount,
                'category': expense['category'],
                'date': expense.get('date'),
                'title': expense.get('title', 'Unknown'),
            })
        return records

    @staticmethod
    def _format_row(row):
        """Convert one anomalous DataFrame row into the result dict."""
        # Map the raw decision score to a 0-1 value where higher means more
        # anomalous: 0.5 - score (score is negative for anomalies), clipped.
        ui_score = min(max(0.5 - float(row['score_raw']), 0.0), 1.0)
        date = row['date']
        return {
            'id': row['id'],
            'title': row['title'],
            'amount': float(row['amount']),
            'category': row['category'],
            'date': date.strftime('%Y-%m-%d') if pd.notnull(date) else None,
            'score': ui_score,
            'reason': "Unusual spending pattern",
        }

    def _explain_anomaly(self, row, df):
        """Return the explanation text for an anomaly (currently a fixed string)."""
        # Earlier per-row logic was replaced by a static string; the method
        # is kept so it can be extended later.
        return "Unusual spending pattern"
def get_anomalies(user_id):
    """Build an ``AnomalyDetector`` for ``user_id`` and return its ``detect()`` result."""
    return AnomalyDetector(user_id).detect()