Spaces:
Running
Running
| use crate::domain::error::AppResult; | |
| use crate::infrastructure::db::DbPool; | |
| pub struct IntelligenceService { | |
| pool: DbPool, | |
| } | |
| impl IntelligenceService { | |
| pub fn new(pool: DbPool) -> Self { | |
| Self { pool } | |
| } | |
| pub async fn get_pincode_volatility(&self, pincode: &str) -> AppResult<f64> { | |
| let stats = sqlx::query("SELECT volatility_score FROM pincode_stats WHERE pincode = $1") | |
| .bind(pincode) | |
| .fetch_optional(&self.pool) | |
| .await?; | |
| use sqlx::Row; | |
| Ok(stats | |
| .map(|s| s.get::<Option<f64>, _>("volatility_score").unwrap_or(0.0)) | |
| .unwrap_or(0.0)) | |
| } | |
| pub async fn refresh_network_intelligence(&self) -> AppResult<()> { | |
| // 1. Refresh Pincode Stats | |
| sqlx::query( | |
| r#" | |
| INSERT INTO pincode_stats (pincode, total_orders, total_disputes, avg_delivery_time_hours, volatility_score) | |
| SELECT | |
| shipping_pincode, | |
| COUNT(*), | |
| SUM(CASE WHEN status = 'DISPUTED_HELD' THEN 1 ELSE 0 END), | |
| AVG(EXTRACT(EPOCH FROM (delivered_at - created_at))/3600), | |
| (SUM(CASE WHEN status = 'DISPUTED_HELD' THEN 1 ELSE 0 END)::DOUBLE PRECISION / NULLIF(COUNT(*), 0)) | |
| FROM orders | |
| WHERE shipping_pincode IS NOT NULL | |
| GROUP BY shipping_pincode | |
| ON CONFLICT (pincode) DO UPDATE SET | |
| total_orders = EXCLUDED.total_orders, | |
| total_disputes = EXCLUDED.total_disputes, | |
| avg_delivery_time_hours = EXCLUDED.avg_delivery_time_hours, | |
| volatility_score = EXCLUDED.volatility_score, | |
| last_updated = CURRENT_TIMESTAMP | |
| "# | |
| ) | |
| .execute(&self.pool) | |
| .await?; | |
| // 2. Refresh Merchant Reliability Scores | |
| sqlx::query( | |
| r#" | |
| UPDATE merchants m | |
| SET reliability_score = sub.score | |
| FROM ( | |
| SELECT | |
| merchant_id, | |
| (1.0 - (SUM(CASE WHEN status = 'DISPUTED_HELD' THEN 1 ELSE 0 END)::DOUBLE PRECISION / NULLIF(COUNT(*), 0))) as score | |
| FROM orders | |
| GROUP BY merchant_id | |
| ) sub | |
| WHERE m.merchant_id = sub.merchant_id | |
| "# | |
| ) | |
| .execute(&self.pool) | |
| .await?; | |
| Ok(()) | |
| } | |
| pub async fn verify_geofence_with_precision( | |
| &self, | |
| pincode: &str, | |
| lat: f64, | |
| lng: f64, | |
| ) -> AppResult<bool> { | |
| // Institutional Precision Geofencing | |
| // 1. Get base pincode coordinates (cached or DB) | |
| // 2. Calculate dynamic radius based on pincode order density | |
| // 3. Perform Haversine check | |
| let stats = sqlx::query("SELECT total_orders FROM pincode_stats WHERE pincode = $1") | |
| .bind(pincode) | |
| .fetch_optional(&self.pool) | |
| .await?; | |
| use sqlx::Row; | |
| let order_count = stats.map(|s| s.get::<i64, _>("total_orders")).unwrap_or(0); | |
| // Dynamic radius logic: higher density -> tighter tolerance | |
| // Base radius 5km, reduces to 1km in extremely high-volume areas | |
| let radius_km = if order_count > 1000 { | |
| 1.5 | |
| } else if order_count > 100 { | |
| 3.0 | |
| } else { | |
| 5.0 | |
| }; | |
| tracing::info!( | |
| "Performing precision geofence for pincode {} with radius {}km", | |
| pincode, | |
| radius_km | |
| ); | |
| // Mocking coordinates for the pincode (in a real system, these would come from a geospatial DB) | |
| // For E2E demonstration, we assume valid coordinates for common test pincodes | |
| let (p_lat, p_lng) = match pincode { | |
| "560001" => (12.9716, 77.5946), | |
| "110001" => (28.6139, 77.2090), | |
| _ => (lat, lng), // Fallback to current if unknown (graceful degradation) | |
| }; | |
| let distance = self.calculate_distance(lat, lng, p_lat, p_lng); | |
| Ok(distance <= radius_km) | |
| } | |
| pub async fn get_risk_forensics( | |
| &self, | |
| transaction_id: &str, | |
| ) -> AppResult<crate::domain::models::analytics::OrderForensics> { | |
| // Retrieve forensic telemetry for a specific transaction | |
| let order = sqlx::query("SELECT risk_score, risk_flags, created_at, status, device_fingerprint FROM orders WHERE transaction_id = $1") | |
| .bind(transaction_id) | |
| .fetch_one(&self.pool) | |
| .await?; | |
| use sqlx::Row; | |
| let score: f64 = order.get("risk_score"); | |
| let flags: Option<serde_json::Value> = order.get("risk_flags"); | |
| let created_at: chrono::NaiveDateTime = order.get("created_at"); | |
| let status: String = order.get("status"); | |
| let fingerprint: Option<String> = order.get("device_fingerprint"); | |
| let mut factors = Vec::new(); | |
| // 1. Static Flags | |
| if let Some(f_val) = flags { | |
| if let Some(f_str) = f_val.as_str() { | |
| for flag in f_str.split(',') { | |
| if flag.is_empty() { | |
| continue; | |
| } | |
| factors.push(crate::domain::models::analytics::RiskForensic { | |
| factor: flag.to_string(), | |
| score_contribution: score / 2.0, | |
| description: format!("Automated flag raised: {}", flag), | |
| severity: if score > 75.0 { | |
| "CRITICAL".into() | |
| } else { | |
| "MEDIUM".into() | |
| }, | |
| }); | |
| } | |
| } | |
| } | |
| // 2. Dynamic Velocity Context | |
| if let Some(ref f) = fingerprint { | |
| let activity = | |
| sqlx::query("SELECT activity_count FROM velocity_metrics WHERE fingerprint = $1") | |
| .bind(f) | |
| .fetch_optional(&self.pool) | |
| .await?; | |
| if let Some(row) = activity { | |
| let count: i32 = row.get("activity_count"); | |
| if count > 1 { | |
| factors.push(crate::domain::models::analytics::RiskForensic { | |
| factor: "VELOCITY_CLUSTER".into(), | |
| score_contribution: (count as f64 * 10.0).min(50.0), | |
| description: format!("Device has initiated {} transactions in the current window (Singleton Pattern).", count), | |
| severity: if count >= 3 { "CRITICAL".into() } else { "HIGH".into() }, | |
| }); | |
| } | |
| } | |
| } | |
| Ok(crate::domain::models::analytics::OrderForensics { | |
| transaction_id: transaction_id.to_string(), | |
| overall_risk_score: score, | |
| factors, | |
| timestamp: created_at.to_string(), | |
| status, | |
| device_fingerprint: fingerprint, | |
| }) | |
| } | |
| pub async fn evaluate_velocity_risk( | |
| &self, | |
| fingerprint: Option<&str>, | |
| ip: Option<&str>, | |
| merchant_id: &str, | |
| ) -> AppResult<f64> { | |
| // Allow opt-out via explicit env var only — never gate on infrastructure provider name | |
| if std::env::var("FRAUD_DETECTION_ENABLED").map(|v| v == "false").unwrap_or(false) { | |
| return Ok(0.0); | |
| } | |
| use sqlx::Row; | |
| let mut max_risk: f64 = 0.0; | |
| // 1. Blacklist Check (Hard Block) | |
| if let Some(f) = fingerprint { | |
| let is_blacklisted = | |
| sqlx::query("SELECT 1 FROM device_blacklist WHERE fingerprint = $1") | |
| .bind(f) | |
| .fetch_optional(&self.pool) | |
| .await? | |
| .is_some(); | |
| if is_blacklisted { | |
| tracing::warn!("VELOCITY GUARD: Blacklisted fingerprint {} detected.", f); | |
| return Ok(100.0); | |
| } | |
| } | |
| // 2. Singleton Attack Detection (Device Velocity) | |
| if let Some(f) = fingerprint { | |
| let mut tx = self.pool.begin().await?; | |
| // Serialize concurrent requests for the same device fingerprint using transactional advisory lock | |
| sqlx::query("SELECT pg_advisory_xact_lock(hashtext($1))") | |
| .bind(f) | |
| .execute(&mut *tx) | |
| .await?; | |
| let activity = sqlx::query( | |
| r#" | |
| SELECT activity_count, last_activity_at | |
| FROM velocity_metrics | |
| WHERE fingerprint = $1 AND window_start_at > CURRENT_TIMESTAMP - INTERVAL '1 hour' | |
| "#, | |
| ) | |
| .bind(f) | |
| .fetch_optional(&mut *tx) | |
| .await?; | |
| if let Some(row) = activity { | |
| let count: i32 = row.get("activity_count"); | |
| if count >= 3 { | |
| max_risk = max_risk.max(85.0_f64); | |
| } | |
| if count >= 5 { | |
| max_risk = max_risk.max(100.0_f64); | |
| tracing::error!( | |
| "VELOCITY GUARD: Singleton Devastation attempt by fingerprint {}.", | |
| f | |
| ); | |
| } | |
| // Update activity | |
| sqlx::query("UPDATE velocity_metrics SET activity_count = activity_count + 1, last_activity_at = CURRENT_TIMESTAMP WHERE fingerprint = $1") | |
| .bind(f) | |
| .execute(&mut *tx) | |
| .await?; | |
| } else { | |
| sqlx::query( | |
| "INSERT INTO velocity_metrics (fingerprint, merchant_id) VALUES ($1, $2)", | |
| ) | |
| .bind(f) | |
| .bind(merchant_id) | |
| .execute(&mut *tx) | |
| .await?; | |
| } | |
| tx.commit().await?; | |
| } | |
| // 3. Mass Deviation Detection (IP Velocity) | |
| if let Some(addr) = ip { | |
| let mut tx = self.pool.begin().await?; | |
| // Serialize concurrent requests for the same IP address using transactional advisory lock | |
| sqlx::query("SELECT pg_advisory_xact_lock(hashtext($1))") | |
| .bind(addr) | |
| .execute(&mut *tx) | |
| .await?; | |
| let ip_activity = sqlx::query( | |
| "SELECT activity_count FROM velocity_metrics WHERE ip_address = $1 AND window_start_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'" | |
| ) | |
| .bind(addr) | |
| .fetch_optional(&mut *tx) | |
| .await?; | |
| if let Some(row) = ip_activity { | |
| let count: i32 = row.get("activity_count"); | |
| if count >= 10 { | |
| max_risk = max_risk.max(70.0_f64); | |
| } | |
| sqlx::query("UPDATE velocity_metrics SET activity_count = activity_count + 1, last_activity_at = CURRENT_TIMESTAMP WHERE ip_address = $1") | |
| .bind(addr) | |
| .execute(&mut *tx) | |
| .await?; | |
| } else { | |
| sqlx::query( | |
| "INSERT INTO velocity_metrics (ip_address, merchant_id) VALUES ($1, $2)", | |
| ) | |
| .bind(addr) | |
| .bind(merchant_id) | |
| .execute(&mut *tx) | |
| .await?; | |
| } | |
| tx.commit().await?; | |
| } | |
| Ok(max_risk) | |
| } | |
| pub async fn blacklist_fingerprint(&self, fingerprint: &str, reason: &str) -> AppResult<()> { | |
| sqlx::query("INSERT INTO device_blacklist (fingerprint, reason) VALUES ($1, $2) ON CONFLICT (fingerprint) DO UPDATE SET reason = EXCLUDED.reason") | |
| .bind(fingerprint) | |
| .bind(reason) | |
| .execute(&self.pool) | |
| .await?; | |
| Ok(()) | |
| } | |
| fn calculate_distance(&self, lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 { | |
| let r = 6371.0; // Earth radius in km | |
| let d_lat = (lat2 - lat1).to_radians(); | |
| let d_lon = (lon2 - lon1).to_radians(); | |
| let a = (d_lat / 2.0).sin().powi(2) | |
| + lat1.to_radians().cos() * lat2.to_radians().cos() * (d_lon / 2.0).sin().powi(2); | |
| let c = 2.0 * a.sqrt().atan2((1.0 - a).sqrt()); | |
| r * c | |
| } | |
| pub async fn process_unanalyzed_telemetry(&self) -> AppResult<()> { | |
| let unanalyzed_errors = sqlx::query( | |
| "SELECT id, source, error_level, message, stack_trace, user_context FROM error_telemetry WHERE analyzed = false LIMIT 10" | |
| ) | |
| .fetch_all(&self.pool) | |
| .await?; | |
| if unanalyzed_errors.is_empty() { | |
| return Ok(()); | |
| } | |
| // Resolve AI provider — explicit config takes priority, then auto-detect GROQ_API_KEY | |
| let (api_url, api_key, model_name) = { | |
| let explicit_url = std::env::var("AI_OPENSOURCE_MODEL_URL").unwrap_or_default(); | |
| let explicit_key = std::env::var("AI_OPENSOURCE_API_KEY").unwrap_or_default(); | |
| let explicit_model = std::env::var("AI_OPENSOURCE_MODEL_NAME").unwrap_or_default(); | |
| if !explicit_url.is_empty() && !explicit_key.is_empty() { | |
| // Fully explicit config | |
| ( | |
| explicit_url, | |
| explicit_key, | |
| if explicit_model.is_empty() { "meta-llama/Llama-3-70b-chat-hf".to_string() } else { explicit_model }, | |
| ) | |
| } else if let Ok(groq_key) = std::env::var("GROQ_API_KEY") { | |
| // Auto-detect Groq — use their OpenAI-compatible endpoint | |
| static GROQ_DETECTED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); | |
| if !GROQ_DETECTED.swap(true, std::sync::atomic::Ordering::Relaxed) { | |
| tracing::info!("AI integration: Groq API key detected. Using Groq (llama-3.3-70b-versatile) for telemetry analysis."); | |
| } | |
| ( | |
| "https://api.groq.com/openai/v1/chat/completions".to_string(), | |
| groq_key, | |
| std::env::var("AI_OPENSOURCE_MODEL_NAME").unwrap_or_else(|_| "llama-3.3-70b-versatile".to_string()), | |
| ) | |
| } else { | |
| static WARNED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); | |
| if !WARNED.swap(true, std::sync::atomic::Ordering::Relaxed) { | |
| tracing::info!("AI integration not configured. Set GROQ_API_KEY or AI_OPENSOURCE_MODEL_URL + AI_OPENSOURCE_API_KEY to enable telemetry analysis."); | |
| } | |
| return Ok(()); | |
| } | |
| }; | |
| let client = reqwest::Client::new(); | |
| for error in unanalyzed_errors { | |
| use sqlx::Row; | |
| let id: uuid::Uuid = error.get("id"); | |
| let source: String = error.get("source"); | |
| let error_level: String = error.get("error_level"); | |
| let message: String = error.get("message"); | |
| let stack_trace: Option<String> = error.try_get("stack_trace").unwrap_or(None); | |
| let prompt = format!( | |
| "You are an expert AI software engineer. Analyze the following error and provide a fix in strict JSON format.\n\nSource: {}\nLevel: {}\nMessage: {}\nStack Trace: {}\n\nRespond ONLY with a JSON object containing the exact keys: 'issue_summary', 'root_cause_analysis', 'proposed_solution', and 'suggested_code_diff'. The 'suggested_code_diff' must be a valid git diff that can be applied to the codebase. Do not include markdown blocks around the JSON.", | |
| source, error_level, message, stack_trace.unwrap_or_default() | |
| ); | |
| let body = serde_json::json!({ | |
| "model": model_name, | |
| "messages": [ | |
| { | |
| "role": "system", | |
| "content": "You are a senior AI site reliability engineer. Output only valid JSON." | |
| }, | |
| { | |
| "role": "user", | |
| "content": prompt | |
| } | |
| ], | |
| "response_format": { "type": "json_object" } | |
| }); | |
| match client.post(&api_url) | |
| .bearer_auth(&api_key) | |
| .json(&body) | |
| .send() | |
| .await { | |
| Ok(resp) => { | |
| if let Ok(json_resp) = resp.json::<serde_json::Value>().await { | |
| if let Some(content) = json_resp["choices"][0]["message"]["content"].as_str() { | |
| if let Ok(ai_data) = serde_json::from_str::<serde_json::Value>(content) { | |
| let issue_summary = ai_data["issue_summary"].as_str().unwrap_or("Unknown Issue").to_string(); | |
| let root_cause = ai_data["root_cause_analysis"].as_str().unwrap_or("Could not determine root cause").to_string(); | |
| let solution = ai_data["proposed_solution"].as_str().unwrap_or("No solution provided").to_string(); | |
| let code_diff = ai_data["suggested_code_diff"].as_str().unwrap_or("").to_string(); | |
| let _ = sqlx::query( | |
| "INSERT INTO ai_engineer_insights (issue_summary, root_cause_analysis, proposed_solution, suggested_code_diff) VALUES ($1, $2, $3, $4)" | |
| ) | |
| .bind(&issue_summary) | |
| .bind(&root_cause) | |
| .bind(&solution) | |
| .bind(&code_diff) | |
| .execute(&self.pool) | |
| .await; | |
| } | |
| } | |
| } | |
| } | |
| Err(e) => tracing::error!("Failed to reach AI model: {:?}", e), | |
| } | |
| let _ = sqlx::query("UPDATE error_telemetry SET analyzed = true WHERE id = $1") | |
| .bind(id) | |
| .execute(&self.pool) | |
| .await; | |
| } | |
| Ok(()) | |
| } | |
| } | |