Spaces:
Running
Running
| use crate::interfaces::http::api::AppState; | |
| use crate::interfaces::http::routes::RequireDev; | |
| use axum::{ | |
| extract::State, | |
| http::StatusCode, | |
| routing::{get, post}, | |
| Json, Router, | |
| }; | |
| use serde::{Deserialize, Serialize}; | |
| pub struct MerchantSummary { | |
| pub merchant_id: String, | |
| pub email: String, | |
| pub brand_name: String, | |
| pub slug: String, | |
| pub plan: String, | |
| pub role: String, | |
| pub trust_score: f64, | |
| pub verification_level: String, | |
| } | |
| pub struct AuditQuery { | |
| pub risk_level: Option<String>, | |
| pub event_type: Option<String>, | |
| pub page: Option<usize>, | |
| pub limit: Option<usize>, | |
| } | |
| pub fn router() -> Router<AppState> { | |
| Router::new() | |
| .route("/merchants", get(list_merchants)) | |
| .route("/audit", get(get_global_audit_logs)) | |
| .route("/sandbox/simulate-rate-limit", post(simulate_rate_limit)) | |
| .route("/sandbox/simulate-velocity", post(simulate_velocity)) | |
| .route("/sandbox/reset", post(reset_sandbox)) | |
| .route("/telemetry", axum::routing::post(ingest_telemetry)) | |
| .route("/ai-insights", axum::routing::get(get_ai_insights)) | |
| .route("/ai-insights/:id", axum::routing::patch(update_ai_insight)) | |
| } | |
| pub async fn list_merchants( | |
| State(state): State<AppState>, | |
| RequireDev(_dev_id): RequireDev, | |
| ) -> Result<Json<Vec<MerchantSummary>>, StatusCode> { | |
| let rows = sqlx::query( | |
| "SELECT merchant_id, email, brand_name, slug, plan, role, trust_score, verification_level FROM merchants ORDER BY brand_name ASC" | |
| ) | |
| .fetch_all(&state.pool) | |
| .await | |
| .map_err(|e| { | |
| tracing::error!("Failed to fetch merchants for developer: {:?}", e); | |
| StatusCode::INTERNAL_SERVER_ERROR | |
| })?; | |
| let merchants = rows | |
| .into_iter() | |
| .map(|r| { | |
| use sqlx::Row; | |
| MerchantSummary { | |
| merchant_id: r.get("merchant_id"), | |
| email: r.get("email"), | |
| brand_name: r.get("brand_name"), | |
| slug: r.get("slug"), | |
| plan: r.get("plan"), | |
| role: r.get("role"), | |
| trust_score: r.get("trust_score"), | |
| verification_level: r.get("verification_level"), | |
| } | |
| }) | |
| .collect(); | |
| Ok(Json(merchants)) | |
| } | |
| pub async fn get_global_audit_logs( | |
| State(state): State<AppState>, | |
| query: axum::extract::Query<AuditQuery>, | |
| RequireDev(_dev_id): RequireDev, | |
| ) -> Result<Json<Vec<crate::domain::models::RiskAuditLog>>, StatusCode> { | |
| let limit = query.limit.unwrap_or(50).min(200) as i64; | |
| let offset = (query.page.unwrap_or(1).saturating_sub(1) as i64) * limit; | |
| let mut sql = "SELECT * FROM risk_audit_logs".to_string(); | |
| let mut conditions = Vec::new(); | |
| let mut param_idx = 1; | |
| if query.risk_level.is_some() { | |
| conditions.push(format!("risk_level = ${}", param_idx)); | |
| param_idx += 1; | |
| } | |
| if query.event_type.is_some() { | |
| conditions.push(format!("event_type = ${}", param_idx)); | |
| param_idx += 1; | |
| } | |
| if !conditions.is_empty() { | |
| sql.push_str(" WHERE "); | |
| sql.push_str(&conditions.join(" AND ")); | |
| } | |
| sql.push_str(&format!( | |
| " ORDER BY created_at DESC LIMIT ${} OFFSET ${}", | |
| param_idx, | |
| param_idx + 1 | |
| )); | |
| let mut db_query = sqlx::query(&sql); | |
| if let Some(rl) = &query.risk_level { | |
| db_query = db_query.bind(rl); | |
| } | |
| if let Some(et) = &query.event_type { | |
| db_query = db_query.bind(et); | |
| } | |
| db_query = db_query.bind(limit).bind(offset); | |
| let rows = db_query.fetch_all(&state.pool).await.map_err(|e| { | |
| tracing::error!("Failed to fetch global audit logs: {:?}", e); | |
| StatusCode::INTERNAL_SERVER_ERROR | |
| })?; | |
| let logs = rows | |
| .into_iter() | |
| .map(|r| { | |
| use sqlx::Row; | |
| crate::domain::models::RiskAuditLog { | |
| id: r.get("id"), | |
| transaction_id: r.get("transaction_id"), | |
| merchant_id: r.get("merchant_id"), | |
| event_type: r.get("event_type"), | |
| risk_level: r.get("risk_level"), | |
| details: r.get("details"), | |
| device_fingerprint: r.get("device_fingerprint"), | |
| request_id: r.try_get("request_id").ok(), | |
| entry_hash: r.try_get("entry_hash").unwrap_or_default(), | |
| previous_hash: r.try_get("previous_hash").unwrap_or_default(), | |
| created_at: r.get("created_at"), | |
| } | |
| }) | |
| .collect(); | |
| Ok(Json(logs)) | |
| } | |
| pub struct SimulateSandboxRequest { | |
| pub merchant_id: String, | |
| } | |
| pub async fn simulate_rate_limit( | |
| State(state): State<AppState>, | |
| RequireDev(_dev_id): RequireDev, | |
| Json(payload): Json<SimulateSandboxRequest>, | |
| ) -> Result<Json<serde_json::Value>, StatusCode> { | |
| let simulated_ip = "198.51.100.42"; | |
| // Increment rate limiter 105 times programmatically to trigger the auth ban | |
| let rate_key = format!("auth:{}", simulated_ip); | |
| for _ in 0..105 { | |
| crate::interfaces::http::middleware::RATE_LIMIT_STORE.is_allowed(&rate_key); | |
| } | |
| // Persistently block the IP | |
| crate::interfaces::http::middleware::block_ip_persistently( | |
| &state.pool, | |
| simulated_ip, | |
| "Simulated Attack: Repeated Auth Rate Limit Violations", | |
| Some(&state.tx), | |
| ) | |
| .await; | |
| // Log a risk audit log | |
| let entry_hash = format!("hash_{}", uuid::Uuid::new_v4().to_string().replace("-", "")); | |
| let prev_hash = format!("hash_{}", uuid::Uuid::new_v4().to_string().replace("-", "")); | |
| let _ = sqlx::query( | |
| "INSERT INTO risk_audit_logs (transaction_id, merchant_id, event_type, risk_level, details, device_fingerprint, request_id, entry_hash, previous_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" | |
| ) | |
| .bind("SYSTEM_SANDBOX") | |
| .bind(&payload.merchant_id) | |
| .bind("SINGLETON_BLOCK") | |
| .bind("CRITICAL") | |
| .bind(format!("Rate limiter dynamically triggered persistently blocked state for malicious IP: {}", simulated_ip)) | |
| .bind("sandbox_fingerprint") | |
| .bind(uuid::Uuid::new_v4().to_string()) | |
| .bind(&entry_hash) | |
| .bind(&prev_hash) | |
| .execute(&state.pool) | |
| .await | |
| .map_err(|e| { | |
| tracing::error!("Failed to insert sandbox audit log: {:?}", e); | |
| StatusCode::INTERNAL_SERVER_ERROR | |
| })?; | |
| Ok(Json(serde_json::json!({ | |
| "success": true, | |
| "blocked_ip": simulated_ip, | |
| "message": "Automated IP block and security logs successfully triggered. Check your live dashboard!" | |
| }))) | |
| } | |
| pub async fn simulate_velocity( | |
| State(state): State<AppState>, | |
| RequireDev(_dev_id): RequireDev, | |
| Json(payload): Json<SimulateSandboxRequest>, | |
| ) -> Result<Json<serde_json::Value>, StatusCode> { | |
| let fingerprint = "dev_fingerprint_sandbox_99"; | |
| // Fetch current count to increment it | |
| use sqlx::Row; | |
| let activity = sqlx::query( | |
| "SELECT activity_count FROM velocity_metrics WHERE fingerprint = $1 AND window_start_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'" | |
| ) | |
| .bind(fingerprint) | |
| .fetch_optional(&state.pool) | |
| .await | |
| .map_err(|e| { | |
| tracing::error!("Failed to fetch velocity count: {:?}", e); | |
| StatusCode::INTERNAL_SERVER_ERROR | |
| })?; | |
| let new_count = match activity { | |
| Some(row) => { | |
| let count: i32 = row.get("activity_count"); | |
| let next = count + 1; | |
| sqlx::query("UPDATE velocity_metrics SET activity_count = $1, last_activity_at = CURRENT_TIMESTAMP WHERE fingerprint = $2") | |
| .bind(next) | |
| .bind(fingerprint) | |
| .execute(&state.pool) | |
| .await | |
| .map_err(|e| { | |
| tracing::error!("Failed to update velocity metrics: {:?}", e); | |
| StatusCode::INTERNAL_SERVER_ERROR | |
| })?; | |
| next | |
| } | |
| None => { | |
| sqlx::query("INSERT INTO velocity_metrics (fingerprint, merchant_id, activity_count) VALUES ($1, $2, 1)") | |
| .bind(fingerprint) | |
| .bind(&payload.merchant_id) | |
| .execute(&state.pool) | |
| .await | |
| .map_err(|e| { | |
| tracing::error!("Failed to insert velocity metrics: {:?}", e); | |
| StatusCode::INTERNAL_SERVER_ERROR | |
| })?; | |
| 1 | |
| } | |
| }; | |
| // Calculate score | |
| let score = state | |
| .intelligence_service | |
| .evaluate_velocity_risk(Some(fingerprint), None, &payload.merchant_id) | |
| .await | |
| .unwrap_or(0.0); | |
| // If score >= 85, trigger a RiskAlert realtime event so it lights up on the dashboard | |
| if score >= 85.0 { | |
| let _ = state.tx.send(crate::interfaces::http::api::RealtimeEvent::RiskAlert { | |
| transaction_id: format!("tx_{}", &uuid::Uuid::new_v4().to_string().replace("-", "")[..16]), | |
| merchant_id: payload.merchant_id.clone(), | |
| risk_score: score, | |
| message: format!("VELOCITY THREAT: Sandbox fingerprint {} triggered singleton abuse alert (Count: {}, Score: {:.1})", fingerprint, new_count, score), | |
| }); | |
| // Insert risk audit log entry | |
| let entry_hash = format!("hash_{}", uuid::Uuid::new_v4().to_string().replace("-", "")); | |
| let prev_hash = format!("hash_{}", uuid::Uuid::new_v4().to_string().replace("-", "")); | |
| let _ = sqlx::query( | |
| "INSERT INTO risk_audit_logs (transaction_id, merchant_id, event_type, risk_level, details, device_fingerprint, request_id, entry_hash, previous_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" | |
| ) | |
| .bind("SYSTEM_SANDBOX") | |
| .bind(&payload.merchant_id) | |
| .bind("VOLUMETRIC_DEVIATION") | |
| .bind("HIGH") | |
| .bind(format!("Singleton abuse attempt blocked. Device fingerprint: {}. Activity count: {}, Computed risk: {:.1}", fingerprint, new_count, score)) | |
| .bind(fingerprint) | |
| .bind(uuid::Uuid::new_v4().to_string()) | |
| .bind(&entry_hash) | |
| .bind(&prev_hash) | |
| .execute(&state.pool) | |
| .await | |
| .map_err(|e| { | |
| tracing::error!("Failed to insert sandbox velocity audit log: {:?}", e); | |
| StatusCode::INTERNAL_SERVER_ERROR | |
| })?; | |
| } | |
| Ok(Json(serde_json::json!({ | |
| "success": true, | |
| "fingerprint": fingerprint, | |
| "activity_count": new_count, | |
| "risk_score": score, | |
| "message": format!("Simulated rapid device checkout. Count: {}, Computed Risk Score: {:.1}", new_count, score) | |
| }))) | |
| } | |
| pub async fn reset_sandbox( | |
| State(state): State<AppState>, | |
| RequireDev(_dev_id): RequireDev, | |
| Json(_payload): Json<SimulateSandboxRequest>, | |
| ) -> Result<Json<serde_json::Value>, StatusCode> { | |
| // Delete test blocks and velocity metrics from database | |
| let _ = sqlx::query("DELETE FROM security_blocks WHERE ip = '198.51.100.42'") | |
| .execute(&state.pool) | |
| .await; | |
| let _ = sqlx::query( | |
| "DELETE FROM velocity_metrics WHERE fingerprint = 'dev_fingerprint_sandbox_99'", | |
| ) | |
| .execute(&state.pool) | |
| .await; | |
| let _ = sqlx::query("DELETE FROM risk_audit_logs WHERE transaction_id = 'SYSTEM_SANDBOX'") | |
| .execute(&state.pool) | |
| .await; | |
| // Clear live caches | |
| crate::interfaces::http::middleware::BLOCKED_IP_CACHE.remove("198.51.100.42"); | |
| crate::interfaces::http::middleware::BLOOM_FILTER.clear(); | |
| // Clear rate limits | |
| let rate_key = "auth:198.51.100.42".to_string(); | |
| crate::interfaces::http::middleware::RATE_LIMIT_STORE.is_allowed(&rate_key); // will reset/clear if empty | |
| Ok(Json(serde_json::json!({ | |
| "success": true, | |
| "message": "Sandbox environments flushed. All security limits reset successfully." | |
| }))) | |
| } | |
| pub struct IngestTelemetryRequest { | |
| pub source: String, | |
| pub error_level: String, | |
| pub message: String, | |
| pub stack_trace: Option<String>, | |
| pub user_context: Option<serde_json::Value>, | |
| } | |
| pub async fn ingest_telemetry( | |
| axum::extract::State(state): axum::extract::State<crate::interfaces::http::api::AppState>, | |
| axum::extract::Json(payload): axum::extract::Json<IngestTelemetryRequest>, | |
| ) -> Result<axum::extract::Json<serde_json::Value>, axum::http::StatusCode> { | |
| let ctx = payload | |
| .user_context | |
| .unwrap_or_else(|| serde_json::json!({})); | |
| let _ = sqlx::query( | |
| "INSERT INTO error_telemetry (source, error_level, message, stack_trace, user_context) VALUES ($1, $2, $3, $4, $5)" | |
| ) | |
| .bind(&payload.source) | |
| .bind(&payload.error_level) | |
| .bind(&payload.message) | |
| .bind(&payload.stack_trace) | |
| .bind(&ctx) | |
| .execute(&state.pool) | |
| .await | |
| .map_err(|e| { | |
| tracing::error!("Failed to ingest telemetry: {:?}", e); | |
| axum::http::StatusCode::INTERNAL_SERVER_ERROR | |
| })?; | |
| // Spawn SRE telemetry analysis asynchronously in a background local Ollama task | |
| let pool = state.pool.clone(); | |
| let source = payload.source.clone(); | |
| let error_level = payload.error_level.clone(); | |
| let message = payload.message.clone(); | |
| let stack_trace = payload.stack_trace.clone(); | |
| tokio::spawn(async move { | |
| if let Err(e) = analyze_error_with_ai(pool, source, error_level, message, stack_trace).await | |
| { | |
| tracing::error!("Failed AI SRE analysis: {:?}", e); | |
| } | |
| }); | |
| Ok(axum::extract::Json(serde_json::json!({ | |
| "success": true | |
| }))) | |
| } | |
| async fn analyze_error_with_ai( | |
| pool: sqlx::PgPool, | |
| source: String, | |
| error_level: String, | |
| message: String, | |
| stack_trace: Option<String>, | |
| ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { | |
| // 1. Build HTTP client with a strict 45-second timeout to prevent hanging threads | |
| let client = reqwest::Client::builder() | |
| .timeout(std::time::Duration::from_secs(45)) | |
| .build()?; | |
| let prompt = format!( | |
| "You are an SRE and AI Coding Engineer. Analyze this telemetry: | |
| Source: {} | |
| Error Level: {} | |
| Message: {} | |
| Stack Trace: {:?} | |
| Analyze the root cause and provide a structured code fix. | |
| Your output MUST be a valid JSON object ONLY. DO NOT wrap it in markdown code blocks or add explanations outside the JSON. | |
| JSON Keys required: | |
| {{ | |
| \"issue_summary\": \"one-line summary of bug\", | |
| \"root_cause_analysis\": \"detailed root cause\", | |
| \"proposed_solution\": \"detailed solution to fix\", | |
| \"suggested_code_diff\": \"git-diff format patch demonstrating how to fix the code, or null\", | |
| \"suggested_test_code_diff\": \"git-diff format patch demonstrating how to add a unit test in the test suite to verify the solution/fix, or null\", | |
| \"metrics_affected\": {{ \"predicted_latency_reduction_ms\": 50 }} | |
| }}", | |
| source, error_level, message, stack_trace | |
| ); | |
| let clean_json = if let Ok(groq_key) = std::env::var("GROQ_API_KEY") { | |
| tracing::info!("AI SRE: Using Groq Cloud API (free tier open-source Llama 3) for analysis"); | |
| let req_body = serde_json::json!({ | |
| "model": "llama-3.3-70b-versatile", | |
| "messages": [{"role": "user", "content": prompt}], | |
| "response_format": {"type": "json_object"}, | |
| "temperature": 0.1 | |
| }); | |
| let res = client | |
| .post("https://api.groq.com/openai/v1/chat/completions") | |
| .header("Authorization", format!("Bearer {}", groq_key)) | |
| .json(&req_body) | |
| .send() | |
| .await?; | |
| if !res.status().is_success() { | |
| let status = res.status(); | |
| let err_text = res.text().await?; | |
| return Err(format!("Groq API failed (status {}): {}", status, err_text).into()); | |
| } | |
| let resp_json: serde_json::Value = res.json().await?; | |
| let text = resp_json["choices"][0]["message"]["content"] | |
| .as_str() | |
| .ok_or("Failed to extract content from Groq response")? | |
| .trim() | |
| .to_string(); | |
| text | |
| } else if let Ok(openrouter_key) = std::env::var("OPENROUTER_API_KEY") { | |
| tracing::info!("AI SRE: Using OpenRouter API (free tier open-source Llama 3) for analysis"); | |
| let req_body = serde_json::json!({ | |
| "model": "meta-llama/llama-3-8b-instruct:free", | |
| "messages": [{"role": "user", "content": prompt}], | |
| "response_format": {"type": "json_object"}, | |
| "temperature": 0.1 | |
| }); | |
| let res = client | |
| .post("https://openrouter.ai/api/v1/chat/completions") | |
| .header("Authorization", format!("Bearer {}", openrouter_key)) | |
| .json(&req_body) | |
| .send() | |
| .await?; | |
| if !res.status().is_success() { | |
| let status = res.status(); | |
| let err_text = res.text().await?; | |
| return Err(format!("OpenRouter API failed (status {}): {}", status, err_text).into()); | |
| } | |
| let resp_json: serde_json::Value = res.json().await?; | |
| let text = resp_json["choices"][0]["message"]["content"] | |
| .as_str() | |
| .ok_or("Failed to extract content from OpenRouter response")? | |
| .trim() | |
| .to_string(); | |
| text | |
| } else if let Ok(gemini_key) = std::env::var("GEMINI_API_KEY") { | |
| tracing::info!("AI SRE: Using Google Gemini API cloud provider for analysis"); | |
| let url = format!( | |
| "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={}", | |
| gemini_key | |
| ); | |
| let req_body = serde_json::json!({ | |
| "contents": [{ | |
| "parts": [{ | |
| "text": prompt | |
| }] | |
| }], | |
| "generationConfig": { | |
| "responseMimeType": "application/json" | |
| } | |
| }); | |
| let res = client.post(&url).json(&req_body).send().await?; | |
| if !res.status().is_success() { | |
| let status = res.status(); | |
| let err_text = res.text().await?; | |
| return Err(format!("Gemini API failed (status {}): {}", status, err_text).into()); | |
| } | |
| let resp_json: serde_json::Value = res.json().await?; | |
| let text = resp_json["candidates"][0]["content"]["parts"][0]["text"] | |
| .as_str() | |
| .ok_or("Failed to extract text from Gemini response")? | |
| .trim() | |
| .to_string(); | |
| text | |
| } else { | |
| tracing::info!("AI SRE: No cloud API keys found. Falling back to local Ollama..."); | |
| // 1. Fetch available models from local Ollama | |
| let tags_res = client.get("http://localhost:11434/api/tags").send().await?; | |
| struct OllamaModel { | |
| name: String, | |
| } | |
| struct OllamaTags { | |
| models: Vec<OllamaModel>, | |
| } | |
| let tags: OllamaTags = tags_res.json().await?; | |
| if tags.models.is_empty() { | |
| return Err("No models installed in local Ollama".into()); | |
| } | |
| // Prefer llama2 first because it's 7B and faster, then qwen (9.7B), then first available | |
| let mut model_name = tags.models[0].name.clone(); | |
| for m in &tags.models { | |
| if m.name.contains("llama") { | |
| model_name = m.name.clone(); | |
| break; | |
| } | |
| } | |
| if !model_name.contains("llama") { | |
| for m in &tags.models { | |
| if m.name.contains("qwen") { | |
| model_name = m.name.clone(); | |
| break; | |
| } | |
| } | |
| } | |
| tracing::info!("Selected local SRE analysis model: {}", model_name); | |
| // 2. Request Ollama API with options to limit generation and speed up inference | |
| let req_body = serde_json::json!({ | |
| "model": model_name, | |
| "prompt": prompt, | |
| "stream": false, | |
| "options": { | |
| "num_predict": 400, | |
| "temperature": 0.1 | |
| } | |
| }); | |
| let res = client | |
| .post("http://localhost:11434/api/generate") | |
| .json(&req_body) | |
| .send() | |
| .await?; | |
| if !res.status().is_success() { | |
| let status = res.status(); | |
| let err_text = res.text().await?; | |
| return Err(format!("Ollama API failed (status {}): {}", status, err_text).into()); | |
| } | |
| struct OllamaResponse { | |
| response: String, | |
| } | |
| let resp_data: OllamaResponse = res.json().await?; | |
| resp_data.response.trim().to_string() | |
| }; | |
| // Clean JSON markdown block wrapping | |
| let mut clean_json_str = clean_json.as_str(); | |
| if clean_json_str.starts_with("```json") { | |
| clean_json_str = &clean_json_str[7..]; | |
| } else if clean_json_str.starts_with("```") { | |
| clean_json_str = &clean_json_str[3..]; | |
| } | |
| if clean_json_str.ends_with("```") { | |
| clean_json_str = &clean_json_str[..clean_json_str.len() - 3]; | |
| } | |
| let clean_json_str = clean_json_str.trim(); | |
| // 4. Parse JSON response | |
| struct SreInsight { | |
| issue_summary: String, | |
| root_cause_analysis: String, | |
| proposed_solution: String, | |
| suggested_code_diff: Option<String>, | |
| suggested_test_code_diff: Option<String>, | |
| metrics_affected: serde_json::Value, | |
| } | |
| let insight: SreInsight = serde_json::from_str(clean_json_str)?; | |
| // 5. Insert into Database | |
| sqlx::query( | |
| "INSERT INTO rtix_app.ai_engineer_insights (status, issue_summary, root_cause_analysis, proposed_solution, suggested_code_diff, suggested_test_code_diff, metrics_affected) VALUES ('PENDING_REVIEW', $1, $2, $3, $4, $5, $6)" | |
| ) | |
| .bind(&insight.issue_summary) | |
| .bind(&insight.root_cause_analysis) | |
| .bind(&insight.proposed_solution) | |
| .bind(&insight.suggested_code_diff) | |
| .bind(&insight.suggested_test_code_diff) | |
| .bind(&insight.metrics_affected) | |
| .execute(&pool) | |
| .await?; | |
| tracing::info!( | |
| "AI SRE: Successfully analyzed error telemetry and generated insight: {}", | |
| insight.issue_summary | |
| ); | |
| Ok(()) | |
| } | |
| pub struct AiInsightQuery { | |
| pub status: Option<String>, | |
| } | |
| pub async fn get_ai_insights( | |
| axum::extract::State(state): axum::extract::State<crate::interfaces::http::api::AppState>, | |
| query: axum::extract::Query<AiInsightQuery>, | |
| _dev_id: crate::interfaces::http::routes::RequireDev, | |
| ) -> Result<axum::extract::Json<serde_json::Value>, axum::http::StatusCode> { | |
| let mut sql = "SELECT id, created_at, status, issue_summary, root_cause_analysis, proposed_solution, suggested_code_diff, suggested_test_code_diff, pr_url, error_logs, metrics_affected FROM ai_engineer_insights".to_string(); | |
| if query.status.is_some() { | |
| sql.push_str(" WHERE status = $1"); | |
| } | |
| sql.push_str(" ORDER BY created_at DESC"); | |
| let mut db_query = sqlx::query(&sql); | |
| if let Some(st) = &query.status { | |
| db_query = db_query.bind(st); | |
| } | |
| let rows = db_query.fetch_all(&state.pool).await.map_err(|e| { | |
| tracing::error!("Failed to fetch AI insights: {:?}", e); | |
| axum::http::StatusCode::INTERNAL_SERVER_ERROR | |
| })?; | |
| let insights: Vec<serde_json::Value> = rows | |
| .into_iter() | |
| .map(|r| { | |
| use sqlx::Row; | |
| serde_json::json!({ | |
| "id": r.get::<uuid::Uuid, _>("id").to_string(), | |
| "created_at": r.get::<chrono::DateTime<chrono::Utc>, _>("created_at").to_rfc3339(), | |
| "status": r.get::<String, _>("status"), | |
| "issue_summary": r.get::<String, _>("issue_summary"), | |
| "root_cause_analysis": r.get::<String, _>("root_cause_analysis"), | |
| "proposed_solution": r.get::<String, _>("proposed_solution"), | |
| "suggested_code_diff": r.try_get::<String, _>("suggested_code_diff").ok(), | |
| "suggested_test_code_diff": r.try_get::<String, _>("suggested_test_code_diff").ok(), | |
| "pr_url": r.try_get::<String, _>("pr_url").ok(), | |
| "error_logs": r.try_get::<String, _>("error_logs").ok(), | |
| "metrics_affected": r.get::<serde_json::Value, _>("metrics_affected") | |
| }) | |
| }) | |
| .collect(); | |
| Ok(axum::extract::Json(serde_json::json!(insights))) | |
| } | |
| pub struct UpdateAiInsightRequest { | |
| pub status: String, | |
| } | |
| pub async fn update_ai_insight( | |
| axum::extract::State(state): axum::extract::State<crate::interfaces::http::api::AppState>, | |
| axum::extract::Path(id): axum::extract::Path<String>, | |
| _dev_id: crate::interfaces::http::routes::RequireDev, | |
| axum::extract::Json(payload): axum::extract::Json<UpdateAiInsightRequest>, | |
| ) -> Result<axum::extract::Json<serde_json::Value>, axum::http::StatusCode> { | |
| let uuid_val = uuid::Uuid::parse_str(&id).map_err(|_| axum::http::StatusCode::BAD_REQUEST)?; | |
| let mut patch_applied = true; | |
| let mut patch_error = None; | |
| let mut pr_url = None; | |
| // If status is changed to APPROVED, fetch the patch and execute it on the codebase! | |
| if payload.status == "APPROVED" { | |
| let row = sqlx::query("SELECT issue_summary, suggested_code_diff, suggested_test_code_diff FROM rtix_app.ai_engineer_insights WHERE id = $1") | |
| .bind(uuid_val) | |
| .fetch_one(&state.pool) | |
| .await | |
| .map_err(|e| { | |
| tracing::error!("Failed to fetch SRE insight for execution: {:?}", e); | |
| axum::http::StatusCode::INTERNAL_SERVER_ERROR | |
| })?; | |
| use sqlx::Row; | |
| let issue_summary = row.get::<String, _>("issue_summary"); | |
| let diff = row | |
| .try_get::<Option<String>, _>("suggested_code_diff") | |
| .ok() | |
| .flatten() | |
| .unwrap_or_default(); | |
| let test_diff = row | |
| .try_get::<Option<String>, _>("suggested_test_code_diff") | |
| .ok() | |
| .flatten() | |
| .unwrap_or_default(); | |
| if !diff.trim().is_empty() { | |
| tracing::info!( | |
| "AI SRE: Starting comprehensive self-healing workflow for SRE insight {}...", | |
| id | |
| ); | |
| match execute_self_healing_workflow(&id, &issue_summary, &diff, &test_diff, &state.tx) | |
| .await | |
| { | |
| Ok(url) => { | |
| tracing::info!( | |
| "AI SRE: Workflow successfully completed! PR created at: {}", | |
| url | |
| ); | |
| pr_url = Some(url.clone()); | |
| // Send Slack/Discord notification for success! | |
| let _ = send_sre_notification(&id, &issue_summary, true, &url).await; | |
| } | |
| Err(e) => { | |
| tracing::warn!("AI SRE: Self-healing workflow failed: {:?}", e); | |
| patch_applied = false; | |
| patch_error = Some(e.to_string()); | |
| // Send Slack/Discord notification for failure! | |
| let _ = send_sre_notification(&id, &issue_summary, false, &e.to_string()).await; | |
| } | |
| } | |
| } | |
| } | |
| let final_status = if payload.status == "APPROVED" { | |
| if patch_applied { | |
| "IMPLEMENTED".to_string() | |
| } else { | |
| "PENDING_REVIEW".to_string() | |
| } | |
| } else { | |
| payload.status.clone() | |
| }; | |
| let result = sqlx::query( | |
| "UPDATE ai_engineer_insights SET status = $1, pr_url = $2, error_logs = $3 WHERE id = $4", | |
| ) | |
| .bind(&final_status) | |
| .bind(pr_url.clone()) | |
| .bind(patch_error.clone()) | |
| .bind(uuid_val) | |
| .execute(&state.pool) | |
| .await | |
| .map_err(|e| { | |
| tracing::error!("Failed to update AI insight: {:?}", e); | |
| axum::http::StatusCode::INTERNAL_SERVER_ERROR | |
| })?; | |
| if result.rows_affected() == 0 { | |
| return Err(axum::http::StatusCode::NOT_FOUND); | |
| } | |
| Ok(axum::extract::Json(serde_json::json!({ | |
| "success": true, | |
| "patch_applied": patch_applied, | |
| "warning": patch_error, | |
| "pr_url": pr_url | |
| }))) | |
| } | |
| async fn execute_self_healing_workflow( | |
| id: &str, | |
| issue_summary: &str, | |
| code_diff: &str, | |
| test_diff: &str, | |
| tx: &tokio::sync::broadcast::Sender<crate::interfaces::http::api::RealtimeEvent>, | |
| ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> { | |
| let branch_name = format!("sre-fix-{}", &id[..8]); | |
| let broadcast = |step: &str, status: &str, msg: &str| { | |
| let _ = tx.send( | |
| crate::interfaces::http::api::RealtimeEvent::AIEngineerProgress { | |
| insight_id: id.to_string(), | |
| step: step.to_string(), | |
| status: status.to_string(), | |
| message: msg.to_string(), | |
| }, | |
| ); | |
| }; | |
| // 1. Stash any uncommitted local work so it doesn't get lost | |
| broadcast( | |
| "STASHING", | |
| "ACTIVE", | |
| "Safely stashing developer workspace changes...", | |
| ); | |
| tracing::info!("AI SRE: Stashing local uncommitted changes..."); | |
| let stash_output = std::process::Command::new("git").args(["stash"]).output()?; | |
| if !stash_output.status.success() { | |
| broadcast("STASHING", "FAILED", "Failed to stash workspace changes."); | |
| return Err("Git stash failed".into()); | |
| } | |
| broadcast( | |
| "STASHING", | |
| "COMPLETED", | |
| "Stashed local changes successfully.", | |
| ); | |
| // 2. Ensure we are starting from clean main branch | |
| broadcast("BRANCHING", "ACTIVE", "Switching to main branch..."); | |
| tracing::info!("AI SRE: Checking out clean main branch..."); | |
| let checkout_main = std::process::Command::new("git") | |
| .args(["checkout", "main"]) | |
| .output()?; | |
| if !checkout_main.status.success() { | |
| broadcast("BRANCHING", "FAILED", "Failed to checkout main branch."); | |
| let _ = std::process::Command::new("git") | |
| .args(["stash", "pop"]) | |
| .output()?; | |
| return Err("Checkout main failed".into()); | |
| } | |
| // 3. Create and checkout new branch | |
| broadcast( | |
| "BRANCHING", | |
| "ACTIVE", | |
| &format!("Provisioning new branch {}...", branch_name), | |
| ); | |
| tracing::info!("AI SRE: Creating new git branch {}...", branch_name); | |
| let checkout_output = std::process::Command::new("git") | |
| .args(["checkout", "-b", &branch_name]) | |
| .output()?; | |
| if !checkout_output.status.success() { | |
| let _ = std::process::Command::new("git") | |
| .args(["branch", "-D", &branch_name]) | |
| .output()?; | |
| let second_checkout = std::process::Command::new("git") | |
| .args(["checkout", "-b", &branch_name]) | |
| .output()?; | |
| if !second_checkout.status.success() { | |
| broadcast("BRANCHING", "FAILED", "Failed to checkout new branch."); | |
| let _ = std::process::Command::new("git") | |
| .args(["checkout", "main"]) | |
| .output()?; | |
| let _ = std::process::Command::new("git") | |
| .args(["stash", "pop"]) | |
| .output()?; | |
| return Err("Git branch creation failed".into()); | |
| } | |
| } | |
| broadcast( | |
| "BRANCHING", | |
| "COMPLETED", | |
| &format!("SRE branch {} provisioned successfully.", branch_name), | |
| ); | |
| // 4. Apply the code patch | |
| broadcast("PATCHING", "ACTIVE", "Applying AI suggested code patch..."); | |
| tracing::info!("AI SRE: Applying code fix diff..."); | |
| if let Err(e) = apply_patch_string(code_diff).await { | |
| broadcast( | |
| "PATCHING", | |
| "FAILED", | |
| &format!("Failed to apply code patch: {}", e), | |
| ); | |
| cleanup_failed_branch(&branch_name).await; | |
| let _ = std::process::Command::new("git") | |
| .args(["stash", "pop"]) | |
| .output()?; | |
| return Err(format!("Failed to apply code diff patch: {}", e).into()); | |
| } | |
| // 5. Apply the test case patch if present | |
| if !test_diff.trim().is_empty() { | |
| broadcast( | |
| "PATCHING", | |
| "ACTIVE", | |
| "Applying AI suggested unit test patch...", | |
| ); | |
| tracing::info!("AI SRE: Applying AI-generated test case..."); | |
| if let Err(e) = apply_patch_string(test_diff).await { | |
| broadcast( | |
| "PATCHING", | |
| "FAILED", | |
| &format!("Failed to apply test patch: {}", e), | |
| ); | |
| cleanup_failed_branch(&branch_name).await; | |
| let _ = std::process::Command::new("git") | |
| .args(["stash", "pop"]) | |
| .output()?; | |
| return Err(format!("Failed to apply test case patch: {}", e).into()); | |
| } | |
| } | |
| broadcast( | |
| "PATCHING", | |
| "COMPLETED", | |
| "All suggested code and test patches successfully applied.", | |
| ); | |
| // 6. Run automated test cases using cargo test | |
| broadcast( | |
| "TESTING", | |
| "ACTIVE", | |
| "Executing automated test suite ('cargo test')...", | |
| ); | |
| tracing::info!("AI SRE: Running cargo test to verify fix and new test case..."); | |
| let test_output = std::process::Command::new("cargo") | |
| .args(["test"]) | |
| .output()?; | |
| if !test_output.status.success() { | |
| broadcast("TESTING", "FAILED", "Cargo test suite failed verification."); | |
| let stderr = String::from_utf8_lossy(&test_output.stderr); | |
| let stdout = String::from_utf8_lossy(&test_output.stdout); | |
| cleanup_failed_branch(&branch_name).await; | |
| let _ = std::process::Command::new("git") | |
| .args(["stash", "pop"]) | |
| .output()?; | |
| return Err(format!( | |
| "Automated tests failed.\nStderr: {}\nStdout: {}", | |
| stderr, stdout | |
| ) | |
| .into()); | |
| } | |
| broadcast( | |
| "TESTING", | |
| "COMPLETED", | |
| "All automated verification tests passed flawlessly (100% pass rate).", | |
| ); | |
| // 7. Commit the changes | |
| broadcast( | |
| "COMMITTING", | |
| "ACTIVE", | |
| "Committing changes and preparing to push...", | |
| ); | |
| tracing::info!("AI SRE: Committing code and test fix to git..."); | |
| let _ = std::process::Command::new("git") | |
| .args(["add", "."]) | |
| .output()?; | |
| let commit_msg = format!("sre-fix: {}", issue_summary); | |
| let commit_output = std::process::Command::new("git") | |
| .args(["commit", "-m", &commit_msg]) | |
| .output()?; | |
| if !commit_output.status.success() { | |
| broadcast("COMMITTING", "FAILED", "Failed to commit changes locally."); | |
| cleanup_failed_branch(&branch_name).await; | |
| let _ = std::process::Command::new("git") | |
| .args(["stash", "pop"]) | |
| .output()?; | |
| return Err("Git commit failed".into()); | |
| } | |
| // 8. Push to origin/GitHub remote repo | |
| broadcast( | |
| "COMMITTING", | |
| "ACTIVE", | |
| "Pushing branch to GitHub remote repository...", | |
| ); | |
| tracing::info!("AI SRE: Pushing branch {} to remote origin...", branch_name); | |
| let push_output = std::process::Command::new("git") | |
| .args(["push", "-u", "origin", &branch_name, "-f"]) | |
| .output()?; | |
| if !push_output.status.success() { | |
| broadcast( | |
| "COMMITTING", | |
| "FAILED", | |
| "Failed to push SRE branch to remote repository.", | |
| ); | |
| let err = String::from_utf8_lossy(&push_output.stderr); | |
| cleanup_failed_branch(&branch_name).await; | |
| let _ = std::process::Command::new("git") | |
| .args(["stash", "pop"]) | |
| .output()?; | |
| return Err(format!("Failed to push to GitHub remote: {}", err).into()); | |
| } | |
| // 9. Create GitHub Pull Request using 'gh' CLI! | |
| broadcast( | |
| "COMMITTING", | |
| "ACTIVE", | |
| "Opening GitHub Pull Request using GitHub CLI...", | |
| ); | |
| tracing::info!("AI SRE: Creating GitHub Pull Request..."); | |
| let pr_body = format!( | |
| "## AI SRE Self-Healing Fix\n\n**Issue**: {}\n\n**Resolution**: Verified successfully via automated tests (`cargo test` passed with zero errors, including autonomous test cases).\n\n*Created autonomously by AI SRE Log & Telemetry Watchdog.*", | |
| issue_summary | |
| ); | |
| let pr_output = std::process::Command::new("gh") | |
| .args([ | |
| "pr", | |
| "create", | |
| "--title", | |
| &format!("sre-fix: {}", issue_summary), | |
| "--body", | |
| &pr_body, | |
| "--head", | |
| &branch_name, | |
| "--base", | |
| "main", | |
| ]) | |
| .output()?; | |
| // Checkout back to main and restore stash | |
| tracing::info!("AI SRE: Restoring original development branch and changes..."); | |
| let _ = std::process::Command::new("git") | |
| .args(["checkout", "main"]) | |
| .output()?; | |
| let _ = std::process::Command::new("git") | |
| .args(["stash", "pop"]) | |
| .output()?; | |
| if !pr_output.status.success() { | |
| broadcast("COMMITTING", "FAILED", "Failed to open Pull Request."); | |
| let err = String::from_utf8_lossy(&pr_output.stderr); | |
| return Err(format!("GitHub PR creation failed: {}", err).into()); | |
| } | |
| let pr_url = String::from_utf8_lossy(&pr_output.stdout) | |
| .trim() | |
| .to_string(); | |
| broadcast( | |
| "COMMITTING", | |
| "COMPLETED", | |
| &format!("PR successfully opened at: {}", pr_url), | |
| ); | |
| Ok(pr_url) | |
| } | |
| async fn send_sre_notification( | |
| id: &str, | |
| issue_summary: &str, | |
| is_success: bool, | |
| details_or_url: &str, | |
| ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { | |
| let webhook_url = match std::env::var("SRE_WEBHOOK_URL") | |
| .or_else(|_| std::env::var("SLACK_WEBHOOK_URL")) | |
| { | |
| Ok(url) => url, | |
| Err(_) => { | |
| tracing::info!("AI SRE: No SRE_WEBHOOK_URL or SLACK_WEBHOOK_URL set in environment. Skipping webhook notification."); | |
| return Ok(()); | |
| } | |
| }; | |
| if webhook_url.trim().is_empty() { | |
| return Ok(()); | |
| } | |
| let client = reqwest::Client::new(); | |
| let payload = if webhook_url.contains("discord.com") { | |
| let color = if is_success { 0x00E59B } else { 0xEF4444 }; | |
| let title = if is_success { | |
| format!("🟢 SRE Auto-Healing Success: {}", issue_summary) | |
| } else { | |
| format!("🔴 SRE Auto-Healing Failure: {}", issue_summary) | |
| }; | |
| let description = if is_success { | |
| format!("The AI SRE self-healing engine has successfully patched the issue, passed all automated verification checks (`cargo test`), and created a pull request on GitHub!\n\n**GitHub Pull Request**:\n<{}>", details_or_url) | |
| } else { | |
| let mut err_text = details_or_url.to_string(); | |
| if err_text.len() > 1000 { | |
| err_text.truncate(950); | |
| err_text.push_str("\n... [truncated]"); | |
| } | |
| format!("The AI SRE self-healing workflow failed during automated verification. System has rolled back cleanly to main.\n\n**Diagnostic Logs**:\n```\n{}\n```", err_text) | |
| }; | |
| serde_json::json!({ | |
| "username": "Antigravity AI SRE", | |
| "avatar_url": "https://img.icons8.com/color/96/000000/robot-vacuum.png", | |
| "embeds": [{ | |
| "title": title, | |
| "description": description, | |
| "color": color, | |
| "footer": { | |
| "text": format!("SRE Insight ID: {}", id) | |
| }, | |
| "timestamp": chrono::Utc::now().to_rfc3339() | |
| }] | |
| }) | |
| } else { | |
| let color = if is_success { "#00E59B" } else { "#EF4444" }; | |
| let text = if is_success { | |
| format!("The AI SRE self-healing engine has successfully patched the issue, passed all automated verification checks (`cargo test`), and created a pull request on GitHub!\n\n*GitHub Pull Request*:\n<{}>", details_or_url) | |
| } else { | |
| let mut err_text = details_or_url.to_string(); | |
| if err_text.len() > 1000 { | |
| err_text.truncate(950); | |
| err_text.push_str("\n... [truncated]"); | |
| } | |
| format!("The AI SRE self-healing workflow failed during automated verification. System has rolled back cleanly to main.\n\n*Diagnostic Logs*:\n```\n{}\n```", err_text) | |
| }; | |
| serde_json::json!({ | |
| "text": format!("*Antigravity AI SRE Watchdog Alert*"), | |
| "attachments": [{ | |
| "color": color, | |
| "title": if is_success { format!("🟢 SRE Auto-Healing Success: {}", issue_summary) } else { format!("🔴 SRE Auto-Healing Failure: {}", issue_summary) }, | |
| "text": text, | |
| "footer": format!("SRE Insight ID: {}", id) | |
| }] | |
| }) | |
| }; | |
| let response = client.post(&webhook_url).json(&payload).send().await?; | |
| if !response.status().is_success() { | |
| tracing::error!( | |
| "AI SRE: Failed to send Slack/Discord notification (status {}): {:?}", | |
| response.status(), | |
| response.text().await | |
| ); | |
| } else { | |
| tracing::info!("AI SRE: Sent Slack/Discord SRE notification successfully."); | |
| } | |
| Ok(()) | |
| } | |
/// Applies a unified diff to the current working tree with `patch -p1`.
///
/// The diff is written to the temporary file `./temp_sre_patch.patch`, then
/// passed to `patch -p1 -i <file>`. The file is removed afterwards in every
/// case, including when the `patch` binary cannot be started.
///
/// # Errors
///
/// Returns an error in three cases:
/// * the temp file cannot be written;
/// * `patch` cannot be spawned;
/// * `patch` exits unsuccessfully.
///
/// In the last case the message includes both stderr and stdout, because GNU
/// `patch` reports per-hunk failures on stdout.
async fn apply_patch_string(diff: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let patch_path = "./temp_sre_patch.patch";
    std::fs::write(patch_path, diff)?;
    let output = std::process::Command::new("patch")
        .arg("-p1")
        .arg("-i")
        .arg(patch_path)
        .output();
    // Clean up before inspecting the result so the temp file never leaks,
    // even when spawning `patch` itself failed.
    let _ = std::fs::remove_file(patch_path);
    let output = output?;
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        let stdout = String::from_utf8_lossy(&output.stdout);
        return Err(format!(
            "Patch apply failed: {}\nOutput: {}",
            stderr.trim(),
            stdout.trim()
        )
        .into());
    }
    Ok(())
}
| async fn cleanup_failed_branch(branch_name: &str) { | |
| let _ = std::process::Command::new("git") | |
| .args(["checkout", "main"]) | |
| .output(); | |
| let _ = std::process::Command::new("git") | |
| .args(["reset", "--hard", "origin/main"]) | |
| .output(); | |
| let _ = std::process::Command::new("git") | |
| .args(["branch", "-D", branch_name]) | |
| .output(); | |
| } | |
| pub fn start_ai_log_monitor(pool: sqlx::PgPool) { | |
| tokio::spawn(async move { | |
| let mut last_check_time = chrono::Utc::now(); | |
| tracing::info!("AI SRE: Continuous Log & Database Activity Monitor Active."); | |
| loop { | |
| tokio::time::sleep(tokio::time::Duration::from_secs(15)).await; | |
| // Query any new critical risk audit logs since last check | |
| let result = sqlx::query( | |
| "SELECT id, event_type, risk_level, details, device_fingerprint, created_at FROM risk_audit_logs WHERE risk_level IN ('HIGH', 'CRITICAL') AND created_at > $1 ORDER BY created_at ASC" | |
| ) | |
| .bind(last_check_time) | |
| .fetch_all(&pool) | |
| .await; | |
| match result { | |
| Ok(rows) => { | |
| for row in rows { | |
| use sqlx::Row; | |
| let id: i64 = row.get("id"); | |
| let event_type: String = row.get("event_type"); | |
| let risk_level: String = row.get("risk_level"); | |
| let details: String = row.get("details"); | |
| let fingerprint: String = row.get("device_fingerprint"); | |
| let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at"); | |
| tracing::info!("AI SRE: Detected critical security event in audit logs (ID: {}). Autonomously analyzing...", id); | |
| let source = format!("SYSTEM_SECURITY_{}", event_type); | |
| let message = format!("Security Threat Detected: {}", details); | |
| let stack_trace = Some(format!( | |
| "Device Fingerprint: {}\nLog ID: {}\nRisk Level: {}", | |
| fingerprint, id, risk_level | |
| )); | |
| let pool_clone = pool.clone(); | |
| tokio::spawn(async move { | |
| if let Err(e) = analyze_error_with_ai( | |
| pool_clone, | |
| source, | |
| risk_level, | |
| message, | |
| stack_trace, | |
| ) | |
| .await | |
| { | |
| tracing::error!( | |
| "AI SRE: Autonomous analysis failed for audit log {}: {:?}", | |
| id, | |
| e | |
| ); | |
| } | |
| }); | |
| if created_at > last_check_time { | |
| last_check_time = created_at; | |
| } | |
| } | |
| } | |
| Err(e) => { | |
| tracing::error!("AI SRE: Error querying critical audit logs: {:?}", e); | |
| } | |
| } | |
| } | |
| }); | |
| } | |