File size: 4,041 Bytes
1295969
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//! Streaming fraud detection — velocity checks + play ratio analysis.
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;
use tracing::warn;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)]
pub enum RiskLevel {
    Clean,
    Suspicious,
    HighRisk,
    Confirmed,
}

#[derive(Debug, Clone, Deserialize)]
pub struct PlayEvent {
    pub track_isrc: String,
    pub user_id: String,
    pub ip_hash: String,
    pub device_id: String,
    pub country_code: String,
    pub play_duration_secs: f64,
    pub track_duration_secs: f64,
}

#[derive(Debug, Clone, Serialize)]
pub struct FraudAnalysis {
    pub risk_level: RiskLevel,
    pub signals: Vec<String>,
    pub action: FraudAction,
}

#[derive(Debug, Clone, Serialize, PartialEq)]
pub enum FraudAction {
    Allow,
    Flag,
    Throttle,
    Block,
    Suspend,
}

struct Window {
    count: u64,
    start: std::time::Instant,
}

pub struct FraudDetector {
    ip_vel: Mutex<HashMap<String, Window>>,
    usr_vel: Mutex<HashMap<String, Window>>,
    /// SECURITY FIX: Changed from Vec<String> (O(n) scan) to HashSet<String> (O(1) lookup).
    /// Prevents DoS via blocked-list inflation attack.
    blocked: Mutex<HashSet<String>>,
}

impl Default for FraudDetector {
    fn default() -> Self {
        Self::new()
    }
}

impl FraudDetector {
    pub fn new() -> Self {
        Self {
            ip_vel: Mutex::new(HashMap::new()),
            usr_vel: Mutex::new(HashMap::new()),
            blocked: Mutex::new(HashSet::new()),
        }
    }
    pub fn analyse(&self, e: &PlayEvent) -> FraudAnalysis {
        let mut signals = Vec::new();
        let mut risk = RiskLevel::Clean;
        let ratio = e.play_duration_secs / e.track_duration_secs.max(1.0);
        if ratio < 0.05 {
            signals.push(format!("play ratio {ratio:.2} — bot skip"));
            risk = RiskLevel::Suspicious;
        }
        let ip_c = self.inc(&self.ip_vel, &e.ip_hash);
        if ip_c > 200 {
            signals.push(format!("IP velocity {ip_c} — click farm"));
            risk = RiskLevel::HighRisk;
        } else if ip_c > 50 {
            signals.push(format!("IP velocity {ip_c} — suspicious"));
            if risk < RiskLevel::Suspicious {
                risk = RiskLevel::Suspicious;
            }
        }
        let usr_c = self.inc(&self.usr_vel, &e.user_id);
        if usr_c > 100 {
            signals.push(format!("user velocity {usr_c} — bot"));
            risk = RiskLevel::HighRisk;
        }
        if self.is_blocked(&e.track_isrc) {
            signals.push("ISRC blocklisted".into());
            risk = RiskLevel::Confirmed;
        }
        if risk >= RiskLevel::Suspicious {
            warn!(isrc=%e.track_isrc, risk=?risk, "Fraud signal");
        }
        let action = match risk {
            RiskLevel::Clean => FraudAction::Allow,
            RiskLevel::Suspicious => FraudAction::Flag,
            RiskLevel::HighRisk => FraudAction::Block,
            RiskLevel::Confirmed => FraudAction::Suspend,
        };
        FraudAnalysis {
            risk_level: risk,
            signals,
            action,
        }
    }
    fn inc(&self, m: &Mutex<HashMap<String, Window>>, k: &str) -> u64 {
        if let Ok(mut map) = m.lock() {
            let now = std::time::Instant::now();
            let e = map.entry(k.to_string()).or_insert(Window {
                count: 0,
                start: now,
            });
            if now.duration_since(e.start).as_secs() > 3600 {
                e.count = 0;
                e.start = now;
            }
            e.count += 1;
            e.count
        } else {
            0
        }
    }
    pub fn block_isrc(&self, isrc: &str) {
        if let Ok(mut s) = self.blocked.lock() {
            s.insert(isrc.to_string());
        }
    }
    pub fn is_blocked(&self, isrc: &str) -> bool {
        self.blocked
            .lock()
            .map(|s| s.contains(isrc))
            .unwrap_or(false)
    }
}