import numpy as np
import pandas as pd
from datetime import datetime
from expense_tracker.utils import MongoDBClient
from bson import ObjectId
from bson.errors import InvalidId


class AnomalyDetector:
    """Flag unusual expenses for a single user with an Isolation Forest.

    The model is re-fitted from scratch on every ``detect()`` call using
    three features per expense: amount, label-encoded category and day of
    week.
    """

    # Minimum number of usable expense records required to fit the model.
    MIN_SAMPLES = 10
    # Explanation attached to every reported anomaly.
    DEFAULT_REASON = "Unusual spending pattern"

    def __init__(self, user_id, contamination=0.05):
        """Create a detector for ``user_id``.

        Args:
            user_id: An ``ObjectId`` or a value convertible to one
                (typically its 24-character hex string).
            contamination: Expected share of anomalies passed to
                ``IsolationForest``; defaults to a 5% assumption.
        """
        self.user_id = user_id
        # Imported lazily so scikit-learn is only loaded when a detector is
        # actually constructed.
        from sklearn.ensemble import IsolationForest
        from sklearn.preprocessing import LabelEncoder

        self.model = IsolationForest(
            contamination=contamination, random_state=42
        )
        self.le = LabelEncoder()

    def fetch_data(self):
        """Return the user's stored expense documents.

        Returns:
            The list under ``financial_data.expenses`` of the user document,
            or an empty list when the id is not a valid ``ObjectId``, the
            user does not exist, or no expenses are stored.
        """
        uid = self.user_id
        if not isinstance(uid, ObjectId):
            try:
                uid = ObjectId(uid)
            except (InvalidId, TypeError):
                # Malformed or wrongly-typed id: treat as "no data" instead
                # of propagating, but do not swallow unrelated errors.
                return []

        db = MongoDBClient.get_client()
        user = db.users.find_one({'_id': uid}, {'financial_data.expenses': 1})
        if not user:
            return []
        # Tolerate ``financial_data`` / ``expenses`` being absent or null.
        financial_data = user.get('financial_data') or {}
        return financial_data.get('expenses') or []

    def detect(self):
        """Fit the model on the user's expenses and return the anomalies.

        Returns:
            A list of dicts with the keys ``id``, ``title``, ``amount``,
            ``category``, ``date`` (``YYYY-MM-DD`` or ``None``), ``score``
            (0-1, higher means more anomalous) and ``reason``. The list is
            empty when fewer than ``MIN_SAMPLES`` usable expenses exist.
        """
        expenses = self.fetch_data()
        if len(expenses) < self.MIN_SAMPLES:
            return []

        records = self._collect_records(expenses)
        # Re-check after filtering: records without a usable amount or
        # category were dropped, and the model needs enough data points.
        if len(records) < self.MIN_SAMPLES:
            return []

        df = pd.DataFrame(records)

        # Features: amount (numeric), category (label-encoded) and day of
        # week (spending patterns vary by day).
        df['category_encoded'] = self.le.fit_transform(
            df['category'].astype(str)
        )
        # Unparseable dates become NaT instead of aborting the detection.
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        df['day_of_week'] = df['date'].dt.dayofweek

        # Missing day-of-week values (NaT dates) are filled with 0.
        features = df[['amount', 'category_encoded', 'day_of_week']].fillna(0)

        self.model.fit(features)
        # predict(): -1 marks an anomaly, 1 a normal point.
        df['anomaly'] = self.model.predict(features)
        # decision_function(): lower is more anomalous, negative for
        # anomalies. The sign is flipped below when building the UI score.
        df['score_raw'] = self.model.decision_function(features)

        anomalies = df[df['anomaly'] == -1]

        results = []
        for row in anomalies.itertuples(index=False):
            # Heuristic mapping of the raw score (roughly -0.5..0.5) onto a
            # 0-1 scale where a more negative raw score gives a higher
            # value: 0.5 - raw, clipped to [0, 1].
            ui_score = min(max(0.5 - float(row.score_raw), 0.0), 1.0)
            has_date = pd.notnull(row.date)
            results.append({
                'id': row.id,
                'title': row.title,
                'amount': float(row.amount),
                'category': row.category,
                'date': row.date.strftime('%Y-%m-%d') if has_date else None,
                'score': ui_score,
                'reason': self.DEFAULT_REASON,
            })
        return results

    @staticmethod
    def _collect_records(expenses):
        """Keep only expenses that have a category and a finite amount."""
        records = []
        for expense in expenses:
            if 'amount' not in expense or 'category' not in expense:
                continue
            try:
                amount = float(expense['amount'])
            except (TypeError, ValueError):
                # Malformed amount: skip this record rather than failing
                # the whole detection run.
                continue
            if not np.isfinite(amount):
                # NaN/inf cannot be scored meaningfully and would break
                # model fitting or JSON serialisation of the result.
                continue
            records.append({
                'id': str(expense.get('_id')),
                'amount': amount,
                'category': expense['category'],
                'date': expense.get('date'),
                'title': expense.get('title', 'Unknown'),
            })
        return records

    def _explain_anomaly(self, row, df):
        """Return the explanation for an anomaly.

        Currently a static string; ``row`` and ``df`` are accepted but
        unused. The method is kept in case a real explanation is added
        later.
        """
        return self.DEFAULT_REASON


def get_anomalies(user_id):
    """Run anomaly detection for ``user_id`` and return the result list."""
    detector = AnomalyDetector(user_id)
    return detector.detect()