RTIX / src /application /services /intelligence.rs
github-actions
deploy: clean backend production release
d8ffec9
use crate::domain::error::AppResult;
use crate::infrastructure::db::DbPool;
pub struct IntelligenceService {
pool: DbPool,
}
impl IntelligenceService {
pub fn new(pool: DbPool) -> Self {
Self { pool }
}
pub async fn get_pincode_volatility(&self, pincode: &str) -> AppResult<f64> {
let stats = sqlx::query("SELECT volatility_score FROM pincode_stats WHERE pincode = $1")
.bind(pincode)
.fetch_optional(&self.pool)
.await?;
use sqlx::Row;
Ok(stats
.map(|s| s.get::<Option<f64>, _>("volatility_score").unwrap_or(0.0))
.unwrap_or(0.0))
}
pub async fn refresh_network_intelligence(&self) -> AppResult<()> {
// 1. Refresh Pincode Stats
sqlx::query(
r#"
INSERT INTO pincode_stats (pincode, total_orders, total_disputes, avg_delivery_time_hours, volatility_score)
SELECT
shipping_pincode,
COUNT(*),
SUM(CASE WHEN status = 'DISPUTED_HELD' THEN 1 ELSE 0 END),
AVG(EXTRACT(EPOCH FROM (delivered_at - created_at))/3600),
(SUM(CASE WHEN status = 'DISPUTED_HELD' THEN 1 ELSE 0 END)::DOUBLE PRECISION / NULLIF(COUNT(*), 0))
FROM orders
WHERE shipping_pincode IS NOT NULL
GROUP BY shipping_pincode
ON CONFLICT (pincode) DO UPDATE SET
total_orders = EXCLUDED.total_orders,
total_disputes = EXCLUDED.total_disputes,
avg_delivery_time_hours = EXCLUDED.avg_delivery_time_hours,
volatility_score = EXCLUDED.volatility_score,
last_updated = CURRENT_TIMESTAMP
"#
)
.execute(&self.pool)
.await?;
// 2. Refresh Merchant Reliability Scores
sqlx::query(
r#"
UPDATE merchants m
SET reliability_score = sub.score
FROM (
SELECT
merchant_id,
(1.0 - (SUM(CASE WHEN status = 'DISPUTED_HELD' THEN 1 ELSE 0 END)::DOUBLE PRECISION / NULLIF(COUNT(*), 0))) as score
FROM orders
GROUP BY merchant_id
) sub
WHERE m.merchant_id = sub.merchant_id
"#
)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn verify_geofence_with_precision(
&self,
pincode: &str,
lat: f64,
lng: f64,
) -> AppResult<bool> {
// Institutional Precision Geofencing
// 1. Get base pincode coordinates (cached or DB)
// 2. Calculate dynamic radius based on pincode order density
// 3. Perform Haversine check
let stats = sqlx::query("SELECT total_orders FROM pincode_stats WHERE pincode = $1")
.bind(pincode)
.fetch_optional(&self.pool)
.await?;
use sqlx::Row;
let order_count = stats.map(|s| s.get::<i64, _>("total_orders")).unwrap_or(0);
// Dynamic radius logic: higher density -> tighter tolerance
// Base radius 5km, reduces to 1km in extremely high-volume areas
let radius_km = if order_count > 1000 {
1.5
} else if order_count > 100 {
3.0
} else {
5.0
};
tracing::info!(
"Performing precision geofence for pincode {} with radius {}km",
pincode,
radius_km
);
// Mocking coordinates for the pincode (in a real system, these would come from a geospatial DB)
// For E2E demonstration, we assume valid coordinates for common test pincodes
let (p_lat, p_lng) = match pincode {
"560001" => (12.9716, 77.5946),
"110001" => (28.6139, 77.2090),
_ => (lat, lng), // Fallback to current if unknown (graceful degradation)
};
let distance = self.calculate_distance(lat, lng, p_lat, p_lng);
Ok(distance <= radius_km)
}
pub async fn get_risk_forensics(
&self,
transaction_id: &str,
) -> AppResult<crate::domain::models::analytics::OrderForensics> {
// Retrieve forensic telemetry for a specific transaction
let order = sqlx::query("SELECT risk_score, risk_flags, created_at, status, device_fingerprint FROM orders WHERE transaction_id = $1")
.bind(transaction_id)
.fetch_one(&self.pool)
.await?;
use sqlx::Row;
let score: f64 = order.get("risk_score");
let flags: Option<serde_json::Value> = order.get("risk_flags");
let created_at: chrono::NaiveDateTime = order.get("created_at");
let status: String = order.get("status");
let fingerprint: Option<String> = order.get("device_fingerprint");
let mut factors = Vec::new();
// 1. Static Flags
if let Some(f_val) = flags {
if let Some(f_str) = f_val.as_str() {
for flag in f_str.split(',') {
if flag.is_empty() {
continue;
}
factors.push(crate::domain::models::analytics::RiskForensic {
factor: flag.to_string(),
score_contribution: score / 2.0,
description: format!("Automated flag raised: {}", flag),
severity: if score > 75.0 {
"CRITICAL".into()
} else {
"MEDIUM".into()
},
});
}
}
}
// 2. Dynamic Velocity Context
if let Some(ref f) = fingerprint {
let activity =
sqlx::query("SELECT activity_count FROM velocity_metrics WHERE fingerprint = $1")
.bind(f)
.fetch_optional(&self.pool)
.await?;
if let Some(row) = activity {
let count: i32 = row.get("activity_count");
if count > 1 {
factors.push(crate::domain::models::analytics::RiskForensic {
factor: "VELOCITY_CLUSTER".into(),
score_contribution: (count as f64 * 10.0).min(50.0),
description: format!("Device has initiated {} transactions in the current window (Singleton Pattern).", count),
severity: if count >= 3 { "CRITICAL".into() } else { "HIGH".into() },
});
}
}
}
Ok(crate::domain::models::analytics::OrderForensics {
transaction_id: transaction_id.to_string(),
overall_risk_score: score,
factors,
timestamp: created_at.to_string(),
status,
device_fingerprint: fingerprint,
})
}
pub async fn evaluate_velocity_risk(
&self,
fingerprint: Option<&str>,
ip: Option<&str>,
merchant_id: &str,
) -> AppResult<f64> {
// Allow opt-out via explicit env var only — never gate on infrastructure provider name
if std::env::var("FRAUD_DETECTION_ENABLED").map(|v| v == "false").unwrap_or(false) {
return Ok(0.0);
}
use sqlx::Row;
let mut max_risk: f64 = 0.0;
// 1. Blacklist Check (Hard Block)
if let Some(f) = fingerprint {
let is_blacklisted =
sqlx::query("SELECT 1 FROM device_blacklist WHERE fingerprint = $1")
.bind(f)
.fetch_optional(&self.pool)
.await?
.is_some();
if is_blacklisted {
tracing::warn!("VELOCITY GUARD: Blacklisted fingerprint {} detected.", f);
return Ok(100.0);
}
}
// 2. Singleton Attack Detection (Device Velocity)
if let Some(f) = fingerprint {
let mut tx = self.pool.begin().await?;
// Serialize concurrent requests for the same device fingerprint using transactional advisory lock
sqlx::query("SELECT pg_advisory_xact_lock(hashtext($1))")
.bind(f)
.execute(&mut *tx)
.await?;
let activity = sqlx::query(
r#"
SELECT activity_count, last_activity_at
FROM velocity_metrics
WHERE fingerprint = $1 AND window_start_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
"#,
)
.bind(f)
.fetch_optional(&mut *tx)
.await?;
if let Some(row) = activity {
let count: i32 = row.get("activity_count");
if count >= 3 {
max_risk = max_risk.max(85.0_f64);
}
if count >= 5 {
max_risk = max_risk.max(100.0_f64);
tracing::error!(
"VELOCITY GUARD: Singleton Devastation attempt by fingerprint {}.",
f
);
}
// Update activity
sqlx::query("UPDATE velocity_metrics SET activity_count = activity_count + 1, last_activity_at = CURRENT_TIMESTAMP WHERE fingerprint = $1")
.bind(f)
.execute(&mut *tx)
.await?;
} else {
sqlx::query(
"INSERT INTO velocity_metrics (fingerprint, merchant_id) VALUES ($1, $2)",
)
.bind(f)
.bind(merchant_id)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
}
// 3. Mass Deviation Detection (IP Velocity)
if let Some(addr) = ip {
let mut tx = self.pool.begin().await?;
// Serialize concurrent requests for the same IP address using transactional advisory lock
sqlx::query("SELECT pg_advisory_xact_lock(hashtext($1))")
.bind(addr)
.execute(&mut *tx)
.await?;
let ip_activity = sqlx::query(
"SELECT activity_count FROM velocity_metrics WHERE ip_address = $1 AND window_start_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'"
)
.bind(addr)
.fetch_optional(&mut *tx)
.await?;
if let Some(row) = ip_activity {
let count: i32 = row.get("activity_count");
if count >= 10 {
max_risk = max_risk.max(70.0_f64);
}
sqlx::query("UPDATE velocity_metrics SET activity_count = activity_count + 1, last_activity_at = CURRENT_TIMESTAMP WHERE ip_address = $1")
.bind(addr)
.execute(&mut *tx)
.await?;
} else {
sqlx::query(
"INSERT INTO velocity_metrics (ip_address, merchant_id) VALUES ($1, $2)",
)
.bind(addr)
.bind(merchant_id)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
}
Ok(max_risk)
}
pub async fn blacklist_fingerprint(&self, fingerprint: &str, reason: &str) -> AppResult<()> {
sqlx::query("INSERT INTO device_blacklist (fingerprint, reason) VALUES ($1, $2) ON CONFLICT (fingerprint) DO UPDATE SET reason = EXCLUDED.reason")
.bind(fingerprint)
.bind(reason)
.execute(&self.pool)
.await?;
Ok(())
}
fn calculate_distance(&self, lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
let r = 6371.0; // Earth radius in km
let d_lat = (lat2 - lat1).to_radians();
let d_lon = (lon2 - lon1).to_radians();
let a = (d_lat / 2.0).sin().powi(2)
+ lat1.to_radians().cos() * lat2.to_radians().cos() * (d_lon / 2.0).sin().powi(2);
let c = 2.0 * a.sqrt().atan2((1.0 - a).sqrt());
r * c
}
pub async fn process_unanalyzed_telemetry(&self) -> AppResult<()> {
let unanalyzed_errors = sqlx::query(
"SELECT id, source, error_level, message, stack_trace, user_context FROM error_telemetry WHERE analyzed = false LIMIT 10"
)
.fetch_all(&self.pool)
.await?;
if unanalyzed_errors.is_empty() {
return Ok(());
}
// Resolve AI provider — explicit config takes priority, then auto-detect GROQ_API_KEY
let (api_url, api_key, model_name) = {
let explicit_url = std::env::var("AI_OPENSOURCE_MODEL_URL").unwrap_or_default();
let explicit_key = std::env::var("AI_OPENSOURCE_API_KEY").unwrap_or_default();
let explicit_model = std::env::var("AI_OPENSOURCE_MODEL_NAME").unwrap_or_default();
if !explicit_url.is_empty() && !explicit_key.is_empty() {
// Fully explicit config
(
explicit_url,
explicit_key,
if explicit_model.is_empty() { "meta-llama/Llama-3-70b-chat-hf".to_string() } else { explicit_model },
)
} else if let Ok(groq_key) = std::env::var("GROQ_API_KEY") {
// Auto-detect Groq — use their OpenAI-compatible endpoint
static GROQ_DETECTED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
if !GROQ_DETECTED.swap(true, std::sync::atomic::Ordering::Relaxed) {
tracing::info!("AI integration: Groq API key detected. Using Groq (llama-3.3-70b-versatile) for telemetry analysis.");
}
(
"https://api.groq.com/openai/v1/chat/completions".to_string(),
groq_key,
std::env::var("AI_OPENSOURCE_MODEL_NAME").unwrap_or_else(|_| "llama-3.3-70b-versatile".to_string()),
)
} else {
static WARNED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
if !WARNED.swap(true, std::sync::atomic::Ordering::Relaxed) {
tracing::info!("AI integration not configured. Set GROQ_API_KEY or AI_OPENSOURCE_MODEL_URL + AI_OPENSOURCE_API_KEY to enable telemetry analysis.");
}
return Ok(());
}
};
let client = reqwest::Client::new();
for error in unanalyzed_errors {
use sqlx::Row;
let id: uuid::Uuid = error.get("id");
let source: String = error.get("source");
let error_level: String = error.get("error_level");
let message: String = error.get("message");
let stack_trace: Option<String> = error.try_get("stack_trace").unwrap_or(None);
let prompt = format!(
"You are an expert AI software engineer. Analyze the following error and provide a fix in strict JSON format.\n\nSource: {}\nLevel: {}\nMessage: {}\nStack Trace: {}\n\nRespond ONLY with a JSON object containing the exact keys: 'issue_summary', 'root_cause_analysis', 'proposed_solution', and 'suggested_code_diff'. The 'suggested_code_diff' must be a valid git diff that can be applied to the codebase. Do not include markdown blocks around the JSON.",
source, error_level, message, stack_trace.unwrap_or_default()
);
let body = serde_json::json!({
"model": model_name,
"messages": [
{
"role": "system",
"content": "You are a senior AI site reliability engineer. Output only valid JSON."
},
{
"role": "user",
"content": prompt
}
],
"response_format": { "type": "json_object" }
});
match client.post(&api_url)
.bearer_auth(&api_key)
.json(&body)
.send()
.await {
Ok(resp) => {
if let Ok(json_resp) = resp.json::<serde_json::Value>().await {
if let Some(content) = json_resp["choices"][0]["message"]["content"].as_str() {
if let Ok(ai_data) = serde_json::from_str::<serde_json::Value>(content) {
let issue_summary = ai_data["issue_summary"].as_str().unwrap_or("Unknown Issue").to_string();
let root_cause = ai_data["root_cause_analysis"].as_str().unwrap_or("Could not determine root cause").to_string();
let solution = ai_data["proposed_solution"].as_str().unwrap_or("No solution provided").to_string();
let code_diff = ai_data["suggested_code_diff"].as_str().unwrap_or("").to_string();
let _ = sqlx::query(
"INSERT INTO ai_engineer_insights (issue_summary, root_cause_analysis, proposed_solution, suggested_code_diff) VALUES ($1, $2, $3, $4)"
)
.bind(&issue_summary)
.bind(&root_cause)
.bind(&solution)
.bind(&code_diff)
.execute(&self.pool)
.await;
}
}
}
}
Err(e) => tracing::error!("Failed to reach AI model: {:?}", e),
}
let _ = sqlx::query("UPDATE error_telemetry SET analyzed = true WHERE id = $1")
.bind(id)
.execute(&self.pool)
.await;
}
Ok(())
}
}