File size: 3,814 Bytes
24e6f5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import numpy as np
import pandas as pd
from datetime import datetime
from expense_tracker.utils import MongoDBClient
from bson import ObjectId

class AnomalyDetector:
    """Flag unusual expenses for a single user with an Isolation Forest.

    The detector reads the user's expense records, turns each usable record
    into three numeric features (amount, encoded category, day of week),
    fits an ``IsolationForest`` on them and reports the records the model
    labels as outliers.
    """

    # Fewest usable expense records required before a model is fitted.
    MIN_SAMPLES = 10
    # Share of records the model is told to expect as anomalous (5%).
    DEFAULT_CONTAMINATION = 0.05

    def __init__(self, user_id, contamination=DEFAULT_CONTAMINATION):
        """Create a detector for *user_id*.

        Args:
            user_id: An ``ObjectId`` or a value ``ObjectId()`` can parse.
            contamination: Passed to ``IsolationForest(contamination=...)``.
        """
        # scikit-learn is imported here rather than at module level, so it is
        # only loaded when a detector is actually created.
        from sklearn.ensemble import IsolationForest
        from sklearn.preprocessing import LabelEncoder

        self.user_id = user_id
        self.model = IsolationForest(contamination=contamination, random_state=42)
        self.le = LabelEncoder()

    def fetch_data(self):
        """Return the user's stored expense records.

        Returns:
            The ``financial_data.expenses`` list of the user document, or an
            empty list when ``user_id`` cannot be converted to an
            ``ObjectId``, no user document matches, or the document holds no
            expenses.
        """
        from bson.errors import InvalidId

        uid = self.user_id
        if not isinstance(uid, ObjectId):
            try:
                uid = ObjectId(uid)
            except (InvalidId, TypeError):
                # Not a valid id: there is nothing to look up.
                return []

        db = MongoDBClient.get_client()
        user = db.users.find_one({'_id': uid}, {'financial_data.expenses': 1})
        if not user:
            return []
        # ``or`` also covers fields that are present but stored as null.
        financial_data = user.get('financial_data') or {}
        return financial_data.get('expenses') or []

    def detect(self, expenses=None):
        """Return the expenses the model considers anomalous.

        Args:
            expenses: Optional iterable of expense dicts. When omitted, the
                records are loaded with :meth:`fetch_data`.

        Returns:
            A list of dicts with the keys ``id``, ``title``, ``amount``,
            ``category``, ``date`` (``YYYY-MM-DD`` or ``None``), ``score``
            (0-1, higher is more anomalous) and ``reason``. The list is empty
            when fewer than ``MIN_SAMPLES`` usable records are available.
        """
        if expenses is None:
            expenses = self.fetch_data()

        df = self._build_frame(expenses or [])
        # The minimum applies to usable records, not to the raw list.
        if len(df) < self.MIN_SAMPLES:
            return []

        # Features: amount, label-encoded category, day of week.
        df['category_encoded'] = self.le.fit_transform(df['category'].astype(str))
        # Missing or unparsable dates become NaT instead of raising.
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        df['day_of_week'] = df['date'].dt.dayofweek

        # NOTE(review): fillna(0) makes records without a date look like
        # Mondays (dayofweek 0) to the model - confirm this is acceptable.
        features = df[['amount', 'category_encoded', 'day_of_week']].fillna(0)

        self.model.fit(features)
        # predict() labels anomalies -1 and normal points 1.
        flagged = self.model.predict(features) == -1
        # Lower decision_function values are more anomalous.
        raw_scores = self.model.decision_function(features)

        results = []
        flagged_rows = df[flagged].iterrows()
        for (_, row), raw_score in zip(flagged_rows, raw_scores[flagged]):
            has_date = pd.notnull(row['date'])
            results.append({
                'id': row['id'],
                'title': row['title'],
                'amount': float(row['amount']),
                'category': row['category'],
                'date': row['date'].strftime('%Y-%m-%d') if has_date else None,
                'score': self._to_ui_score(raw_score),
                'reason': self._explain_anomaly(row, df),
            })

        return results

    @staticmethod
    def _build_frame(expenses):
        """Build a DataFrame from the records that carry a usable amount and category."""
        rows = []
        for expense in expenses:
            if 'amount' not in expense or 'category' not in expense:
                continue
            try:
                amount = float(expense['amount'])
            except (TypeError, ValueError):
                # None or non-numeric amount: skip the record, keep the rest.
                continue
            if not np.isfinite(amount):
                continue
            rows.append({
                'id': str(expense.get('_id')),
                'amount': amount,
                'category': expense['category'],
                'date': expense.get('date'),
                'title': expense.get('title', 'Unknown'),
            })
        return pd.DataFrame(rows, columns=['id', 'amount', 'category', 'date', 'title'])

    @staticmethod
    def _to_ui_score(raw_score):
        """Map a raw decision-function value to a 0-1 score (higher is more anomalous).

        Heuristic: ``0.5 - raw_score`` clipped to ``[0, 1]``, so more negative
        raw values give higher scores.
        """
        return min(max(0.5 - float(raw_score), 0.0), 1.0)

    def _explain_anomaly(self, row, df):
        """Return the reason text for an anomaly; currently a fixed string."""
        return "Unusual spending pattern"

def get_anomalies(user_id):
    """Build an ``AnomalyDetector`` for *user_id* and return what its ``detect()`` yields."""
    return AnomalyDetector(user_id).detect()