use crate::interfaces::http::api::AppState; use crate::interfaces::http::routes::RequireDev; use axum::{ extract::State, http::StatusCode, routing::{get, post}, Json, Router, }; use serde::{Deserialize, Serialize}; #[derive(Serialize)] pub struct MerchantSummary { pub merchant_id: String, pub email: String, pub brand_name: String, pub slug: String, pub plan: String, pub role: String, pub trust_score: f64, pub verification_level: String, } #[derive(Deserialize)] pub struct AuditQuery { pub risk_level: Option, pub event_type: Option, pub page: Option, pub limit: Option, } pub fn router() -> Router { Router::new() .route("/merchants", get(list_merchants)) .route("/audit", get(get_global_audit_logs)) .route("/sandbox/simulate-rate-limit", post(simulate_rate_limit)) .route("/sandbox/simulate-velocity", post(simulate_velocity)) .route("/sandbox/reset", post(reset_sandbox)) .route("/telemetry", axum::routing::post(ingest_telemetry)) .route("/ai-insights", axum::routing::get(get_ai_insights)) .route("/ai-insights/:id", axum::routing::patch(update_ai_insight)) } pub async fn list_merchants( State(state): State, RequireDev(_dev_id): RequireDev, ) -> Result>, StatusCode> { let rows = sqlx::query( "SELECT merchant_id, email, brand_name, slug, plan, role, trust_score, verification_level FROM merchants ORDER BY brand_name ASC" ) .fetch_all(&state.pool) .await .map_err(|e| { tracing::error!("Failed to fetch merchants for developer: {:?}", e); StatusCode::INTERNAL_SERVER_ERROR })?; let merchants = rows .into_iter() .map(|r| { use sqlx::Row; MerchantSummary { merchant_id: r.get("merchant_id"), email: r.get("email"), brand_name: r.get("brand_name"), slug: r.get("slug"), plan: r.get("plan"), role: r.get("role"), trust_score: r.get("trust_score"), verification_level: r.get("verification_level"), } }) .collect(); Ok(Json(merchants)) } pub async fn get_global_audit_logs( State(state): State, query: axum::extract::Query, RequireDev(_dev_id): RequireDev, ) -> Result>, StatusCode> { let limit = query.limit.unwrap_or(50).min(200) as i64; let offset = (query.page.unwrap_or(1).saturating_sub(1) as i64) * limit; let mut sql = "SELECT * FROM risk_audit_logs".to_string(); let mut conditions = Vec::new(); let mut param_idx = 1; if query.risk_level.is_some() { conditions.push(format!("risk_level = ${}", param_idx)); param_idx += 1; } if query.event_type.is_some() { conditions.push(format!("event_type = ${}", param_idx)); param_idx += 1; } if !conditions.is_empty() { sql.push_str(" WHERE "); sql.push_str(&conditions.join(" AND ")); } sql.push_str(&format!( " ORDER BY created_at DESC LIMIT ${} OFFSET ${}", param_idx, param_idx + 1 )); let mut db_query = sqlx::query(&sql); if let Some(rl) = &query.risk_level { db_query = db_query.bind(rl); } if let Some(et) = &query.event_type { db_query = db_query.bind(et); } db_query = db_query.bind(limit).bind(offset); let rows = db_query.fetch_all(&state.pool).await.map_err(|e| { tracing::error!("Failed to fetch global audit logs: {:?}", e); StatusCode::INTERNAL_SERVER_ERROR })?; let logs = rows .into_iter() .map(|r| { use sqlx::Row; crate::domain::models::RiskAuditLog { id: r.get("id"), transaction_id: r.get("transaction_id"), merchant_id: r.get("merchant_id"), event_type: r.get("event_type"), risk_level: r.get("risk_level"), details: r.get("details"), device_fingerprint: r.get("device_fingerprint"), request_id: r.try_get("request_id").ok(), entry_hash: r.try_get("entry_hash").unwrap_or_default(), previous_hash: r.try_get("previous_hash").unwrap_or_default(), created_at: r.get("created_at"), } }) .collect(); Ok(Json(logs)) } #[derive(Deserialize)] pub struct SimulateSandboxRequest { pub merchant_id: String, } pub async fn simulate_rate_limit( State(state): State, RequireDev(_dev_id): RequireDev, Json(payload): Json, ) -> Result, StatusCode> { let simulated_ip = "198.51.100.42"; // Increment rate limiter 105 times programmatically to trigger the auth ban let rate_key = format!("auth:{}", simulated_ip); for _ in 0..105 { crate::interfaces::http::middleware::RATE_LIMIT_STORE.is_allowed(&rate_key); } // Persistently block the IP crate::interfaces::http::middleware::block_ip_persistently( &state.pool, simulated_ip, "Simulated Attack: Repeated Auth Rate Limit Violations", Some(&state.tx), ) .await; // Log a risk audit log let entry_hash = format!("hash_{}", uuid::Uuid::new_v4().to_string().replace("-", "")); let prev_hash = format!("hash_{}", uuid::Uuid::new_v4().to_string().replace("-", "")); let _ = sqlx::query( "INSERT INTO risk_audit_logs (transaction_id, merchant_id, event_type, risk_level, details, device_fingerprint, request_id, entry_hash, previous_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" ) .bind("SYSTEM_SANDBOX") .bind(&payload.merchant_id) .bind("SINGLETON_BLOCK") .bind("CRITICAL") .bind(format!("Rate limiter dynamically triggered persistently blocked state for malicious IP: {}", simulated_ip)) .bind("sandbox_fingerprint") .bind(uuid::Uuid::new_v4().to_string()) .bind(&entry_hash) .bind(&prev_hash) .execute(&state.pool) .await .map_err(|e| { tracing::error!("Failed to insert sandbox audit log: {:?}", e); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(serde_json::json!({ "success": true, "blocked_ip": simulated_ip, "message": "Automated IP block and security logs successfully triggered. Check your live dashboard!" }))) } pub async fn simulate_velocity( State(state): State, RequireDev(_dev_id): RequireDev, Json(payload): Json, ) -> Result, StatusCode> { let fingerprint = "dev_fingerprint_sandbox_99"; // Fetch current count to increment it use sqlx::Row; let activity = sqlx::query( "SELECT activity_count FROM velocity_metrics WHERE fingerprint = $1 AND window_start_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'" ) .bind(fingerprint) .fetch_optional(&state.pool) .await .map_err(|e| { tracing::error!("Failed to fetch velocity count: {:?}", e); StatusCode::INTERNAL_SERVER_ERROR })?; let new_count = match activity { Some(row) => { let count: i32 = row.get("activity_count"); let next = count + 1; sqlx::query("UPDATE velocity_metrics SET activity_count = $1, last_activity_at = CURRENT_TIMESTAMP WHERE fingerprint = $2") .bind(next) .bind(fingerprint) .execute(&state.pool) .await .map_err(|e| { tracing::error!("Failed to update velocity metrics: {:?}", e); StatusCode::INTERNAL_SERVER_ERROR })?; next } None => { sqlx::query("INSERT INTO velocity_metrics (fingerprint, merchant_id, activity_count) VALUES ($1, $2, 1)") .bind(fingerprint) .bind(&payload.merchant_id) .execute(&state.pool) .await .map_err(|e| { tracing::error!("Failed to insert velocity metrics: {:?}", e); StatusCode::INTERNAL_SERVER_ERROR })?; 1 } }; // Calculate score let score = state .intelligence_service .evaluate_velocity_risk(Some(fingerprint), None, &payload.merchant_id) .await .unwrap_or(0.0); // If score >= 85, trigger a RiskAlert realtime event so it lights up on the dashboard if score >= 85.0 { let _ = state.tx.send(crate::interfaces::http::api::RealtimeEvent::RiskAlert { transaction_id: format!("tx_{}", &uuid::Uuid::new_v4().to_string().replace("-", "")[..16]), merchant_id: payload.merchant_id.clone(), risk_score: score, message: format!("VELOCITY THREAT: Sandbox fingerprint {} triggered singleton abuse alert (Count: {}, Score: {:.1})", fingerprint, new_count, score), }); // Insert risk audit log entry let entry_hash = format!("hash_{}", uuid::Uuid::new_v4().to_string().replace("-", "")); let prev_hash = format!("hash_{}", uuid::Uuid::new_v4().to_string().replace("-", "")); let _ = sqlx::query( "INSERT INTO risk_audit_logs (transaction_id, merchant_id, event_type, risk_level, details, device_fingerprint, request_id, entry_hash, previous_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" ) .bind("SYSTEM_SANDBOX") .bind(&payload.merchant_id) .bind("VOLUMETRIC_DEVIATION") .bind("HIGH") .bind(format!("Singleton abuse attempt blocked. Device fingerprint: {}. Activity count: {}, Computed risk: {:.1}", fingerprint, new_count, score)) .bind(fingerprint) .bind(uuid::Uuid::new_v4().to_string()) .bind(&entry_hash) .bind(&prev_hash) .execute(&state.pool) .await .map_err(|e| { tracing::error!("Failed to insert sandbox velocity audit log: {:?}", e); StatusCode::INTERNAL_SERVER_ERROR })?; } Ok(Json(serde_json::json!({ "success": true, "fingerprint": fingerprint, "activity_count": new_count, "risk_score": score, "message": format!("Simulated rapid device checkout. Count: {}, Computed Risk Score: {:.1}", new_count, score) }))) } pub async fn reset_sandbox( State(state): State, RequireDev(_dev_id): RequireDev, Json(_payload): Json, ) -> Result, StatusCode> { // Delete test blocks and velocity metrics from database let _ = sqlx::query("DELETE FROM security_blocks WHERE ip = '198.51.100.42'") .execute(&state.pool) .await; let _ = sqlx::query( "DELETE FROM velocity_metrics WHERE fingerprint = 'dev_fingerprint_sandbox_99'", ) .execute(&state.pool) .await; let _ = sqlx::query("DELETE FROM risk_audit_logs WHERE transaction_id = 'SYSTEM_SANDBOX'") .execute(&state.pool) .await; // Clear live caches crate::interfaces::http::middleware::BLOCKED_IP_CACHE.remove("198.51.100.42"); crate::interfaces::http::middleware::BLOOM_FILTER.clear(); // Clear rate limits let rate_key = "auth:198.51.100.42".to_string(); crate::interfaces::http::middleware::RATE_LIMIT_STORE.is_allowed(&rate_key); // will reset/clear if empty Ok(Json(serde_json::json!({ "success": true, "message": "Sandbox environments flushed. All security limits reset successfully." }))) } #[derive(serde::Deserialize)] pub struct IngestTelemetryRequest { pub source: String, pub error_level: String, pub message: String, pub stack_trace: Option, pub user_context: Option, } pub async fn ingest_telemetry( axum::extract::State(state): axum::extract::State, axum::extract::Json(payload): axum::extract::Json, ) -> Result, axum::http::StatusCode> { let ctx = payload .user_context .unwrap_or_else(|| serde_json::json!({})); let _ = sqlx::query( "INSERT INTO error_telemetry (source, error_level, message, stack_trace, user_context) VALUES ($1, $2, $3, $4, $5)" ) .bind(&payload.source) .bind(&payload.error_level) .bind(&payload.message) .bind(&payload.stack_trace) .bind(&ctx) .execute(&state.pool) .await .map_err(|e| { tracing::error!("Failed to ingest telemetry: {:?}", e); axum::http::StatusCode::INTERNAL_SERVER_ERROR })?; // Spawn SRE telemetry analysis asynchronously in a background local Ollama task let pool = state.pool.clone(); let source = payload.source.clone(); let error_level = payload.error_level.clone(); let message = payload.message.clone(); let stack_trace = payload.stack_trace.clone(); tokio::spawn(async move { if let Err(e) = analyze_error_with_ai(pool, source, error_level, message, stack_trace).await { tracing::error!("Failed AI SRE analysis: {:?}", e); } }); Ok(axum::extract::Json(serde_json::json!({ "success": true }))) } async fn analyze_error_with_ai( pool: sqlx::PgPool, source: String, error_level: String, message: String, stack_trace: Option, ) -> Result<(), Box> { // 1. Build HTTP client with a strict 45-second timeout to prevent hanging threads let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(45)) .build()?; let prompt = format!( "You are an SRE and AI Coding Engineer. Analyze this telemetry: Source: {} Error Level: {} Message: {} Stack Trace: {:?} Analyze the root cause and provide a structured code fix. Your output MUST be a valid JSON object ONLY. DO NOT wrap it in markdown code blocks or add explanations outside the JSON. JSON Keys required: {{ \"issue_summary\": \"one-line summary of bug\", \"root_cause_analysis\": \"detailed root cause\", \"proposed_solution\": \"detailed solution to fix\", \"suggested_code_diff\": \"git-diff format patch demonstrating how to fix the code, or null\", \"suggested_test_code_diff\": \"git-diff format patch demonstrating how to add a unit test in the test suite to verify the solution/fix, or null\", \"metrics_affected\": {{ \"predicted_latency_reduction_ms\": 50 }} }}", source, error_level, message, stack_trace ); let clean_json = if let Ok(groq_key) = std::env::var("GROQ_API_KEY") { tracing::info!("AI SRE: Using Groq Cloud API (free tier open-source Llama 3) for analysis"); let req_body = serde_json::json!({ "model": "llama-3.3-70b-versatile", "messages": [{"role": "user", "content": prompt}], "response_format": {"type": "json_object"}, "temperature": 0.1 }); let res = client .post("https://api.groq.com/openai/v1/chat/completions") .header("Authorization", format!("Bearer {}", groq_key)) .json(&req_body) .send() .await?; if !res.status().is_success() { let status = res.status(); let err_text = res.text().await?; return Err(format!("Groq API failed (status {}): {}", status, err_text).into()); } let resp_json: serde_json::Value = res.json().await?; let text = resp_json["choices"][0]["message"]["content"] .as_str() .ok_or("Failed to extract content from Groq response")? .trim() .to_string(); text } else if let Ok(openrouter_key) = std::env::var("OPENROUTER_API_KEY") { tracing::info!("AI SRE: Using OpenRouter API (free tier open-source Llama 3) for analysis"); let req_body = serde_json::json!({ "model": "meta-llama/llama-3-8b-instruct:free", "messages": [{"role": "user", "content": prompt}], "response_format": {"type": "json_object"}, "temperature": 0.1 }); let res = client .post("https://openrouter.ai/api/v1/chat/completions") .header("Authorization", format!("Bearer {}", openrouter_key)) .json(&req_body) .send() .await?; if !res.status().is_success() { let status = res.status(); let err_text = res.text().await?; return Err(format!("OpenRouter API failed (status {}): {}", status, err_text).into()); } let resp_json: serde_json::Value = res.json().await?; let text = resp_json["choices"][0]["message"]["content"] .as_str() .ok_or("Failed to extract content from OpenRouter response")? .trim() .to_string(); text } else if let Ok(gemini_key) = std::env::var("GEMINI_API_KEY") { tracing::info!("AI SRE: Using Google Gemini API cloud provider for analysis"); let url = format!( "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={}", gemini_key ); let req_body = serde_json::json!({ "contents": [{ "parts": [{ "text": prompt }] }], "generationConfig": { "responseMimeType": "application/json" } }); let res = client.post(&url).json(&req_body).send().await?; if !res.status().is_success() { let status = res.status(); let err_text = res.text().await?; return Err(format!("Gemini API failed (status {}): {}", status, err_text).into()); } let resp_json: serde_json::Value = res.json().await?; let text = resp_json["candidates"][0]["content"]["parts"][0]["text"] .as_str() .ok_or("Failed to extract text from Gemini response")? .trim() .to_string(); text } else { tracing::info!("AI SRE: No cloud API keys found. Falling back to local Ollama..."); // 1. Fetch available models from local Ollama let tags_res = client.get("http://localhost:11434/api/tags").send().await?; #[derive(serde::Deserialize)] struct OllamaModel { name: String, } #[derive(serde::Deserialize)] struct OllamaTags { models: Vec, } let tags: OllamaTags = tags_res.json().await?; if tags.models.is_empty() { return Err("No models installed in local Ollama".into()); } // Prefer llama2 first because it's 7B and faster, then qwen (9.7B), then first available let mut model_name = tags.models[0].name.clone(); for m in &tags.models { if m.name.contains("llama") { model_name = m.name.clone(); break; } } if !model_name.contains("llama") { for m in &tags.models { if m.name.contains("qwen") { model_name = m.name.clone(); break; } } } tracing::info!("Selected local SRE analysis model: {}", model_name); // 2. Request Ollama API with options to limit generation and speed up inference let req_body = serde_json::json!({ "model": model_name, "prompt": prompt, "stream": false, "options": { "num_predict": 400, "temperature": 0.1 } }); let res = client .post("http://localhost:11434/api/generate") .json(&req_body) .send() .await?; if !res.status().is_success() { let status = res.status(); let err_text = res.text().await?; return Err(format!("Ollama API failed (status {}): {}", status, err_text).into()); } #[derive(serde::Deserialize)] struct OllamaResponse { response: String, } let resp_data: OllamaResponse = res.json().await?; resp_data.response.trim().to_string() }; // Clean JSON markdown block wrapping let mut clean_json_str = clean_json.as_str(); if clean_json_str.starts_with("```json") { clean_json_str = &clean_json_str[7..]; } else if clean_json_str.starts_with("```") { clean_json_str = &clean_json_str[3..]; } if clean_json_str.ends_with("```") { clean_json_str = &clean_json_str[..clean_json_str.len() - 3]; } let clean_json_str = clean_json_str.trim(); // 4. Parse JSON response #[derive(serde::Deserialize)] struct SreInsight { issue_summary: String, root_cause_analysis: String, proposed_solution: String, suggested_code_diff: Option, suggested_test_code_diff: Option, metrics_affected: serde_json::Value, } let insight: SreInsight = serde_json::from_str(clean_json_str)?; // 5. Insert into Database sqlx::query( "INSERT INTO rtix_app.ai_engineer_insights (status, issue_summary, root_cause_analysis, proposed_solution, suggested_code_diff, suggested_test_code_diff, metrics_affected) VALUES ('PENDING_REVIEW', $1, $2, $3, $4, $5, $6)" ) .bind(&insight.issue_summary) .bind(&insight.root_cause_analysis) .bind(&insight.proposed_solution) .bind(&insight.suggested_code_diff) .bind(&insight.suggested_test_code_diff) .bind(&insight.metrics_affected) .execute(&pool) .await?; tracing::info!( "AI SRE: Successfully analyzed error telemetry and generated insight: {}", insight.issue_summary ); Ok(()) } #[derive(serde::Deserialize)] pub struct AiInsightQuery { pub status: Option, } pub async fn get_ai_insights( axum::extract::State(state): axum::extract::State, query: axum::extract::Query, _dev_id: crate::interfaces::http::routes::RequireDev, ) -> Result, axum::http::StatusCode> { let mut sql = "SELECT id, created_at, status, issue_summary, root_cause_analysis, proposed_solution, suggested_code_diff, suggested_test_code_diff, pr_url, error_logs, metrics_affected FROM ai_engineer_insights".to_string(); if query.status.is_some() { sql.push_str(" WHERE status = $1"); } sql.push_str(" ORDER BY created_at DESC"); let mut db_query = sqlx::query(&sql); if let Some(st) = &query.status { db_query = db_query.bind(st); } let rows = db_query.fetch_all(&state.pool).await.map_err(|e| { tracing::error!("Failed to fetch AI insights: {:?}", e); axum::http::StatusCode::INTERNAL_SERVER_ERROR })?; let insights: Vec = rows .into_iter() .map(|r| { use sqlx::Row; serde_json::json!({ "id": r.get::("id").to_string(), "created_at": r.get::, _>("created_at").to_rfc3339(), "status": r.get::("status"), "issue_summary": r.get::("issue_summary"), "root_cause_analysis": r.get::("root_cause_analysis"), "proposed_solution": r.get::("proposed_solution"), "suggested_code_diff": r.try_get::("suggested_code_diff").ok(), "suggested_test_code_diff": r.try_get::("suggested_test_code_diff").ok(), "pr_url": r.try_get::("pr_url").ok(), "error_logs": r.try_get::("error_logs").ok(), "metrics_affected": r.get::("metrics_affected") }) }) .collect(); Ok(axum::extract::Json(serde_json::json!(insights))) } #[derive(serde::Deserialize)] pub struct UpdateAiInsightRequest { pub status: String, } pub async fn update_ai_insight( axum::extract::State(state): axum::extract::State, axum::extract::Path(id): axum::extract::Path, _dev_id: crate::interfaces::http::routes::RequireDev, axum::extract::Json(payload): axum::extract::Json, ) -> Result, axum::http::StatusCode> { let uuid_val = uuid::Uuid::parse_str(&id).map_err(|_| axum::http::StatusCode::BAD_REQUEST)?; let mut patch_applied = true; let mut patch_error = None; let mut pr_url = None; // If status is changed to APPROVED, fetch the patch and execute it on the codebase! if payload.status == "APPROVED" { let row = sqlx::query("SELECT issue_summary, suggested_code_diff, suggested_test_code_diff FROM rtix_app.ai_engineer_insights WHERE id = $1") .bind(uuid_val) .fetch_one(&state.pool) .await .map_err(|e| { tracing::error!("Failed to fetch SRE insight for execution: {:?}", e); axum::http::StatusCode::INTERNAL_SERVER_ERROR })?; use sqlx::Row; let issue_summary = row.get::("issue_summary"); let diff = row .try_get::, _>("suggested_code_diff") .ok() .flatten() .unwrap_or_default(); let test_diff = row .try_get::, _>("suggested_test_code_diff") .ok() .flatten() .unwrap_or_default(); if !diff.trim().is_empty() { tracing::info!( "AI SRE: Starting comprehensive self-healing workflow for SRE insight {}...", id ); match execute_self_healing_workflow(&id, &issue_summary, &diff, &test_diff, &state.tx) .await { Ok(url) => { tracing::info!( "AI SRE: Workflow successfully completed! PR created at: {}", url ); pr_url = Some(url.clone()); // Send Slack/Discord notification for success! let _ = send_sre_notification(&id, &issue_summary, true, &url).await; } Err(e) => { tracing::warn!("AI SRE: Self-healing workflow failed: {:?}", e); patch_applied = false; patch_error = Some(e.to_string()); // Send Slack/Discord notification for failure! let _ = send_sre_notification(&id, &issue_summary, false, &e.to_string()).await; } } } } let final_status = if payload.status == "APPROVED" { if patch_applied { "IMPLEMENTED".to_string() } else { "PENDING_REVIEW".to_string() } } else { payload.status.clone() }; let result = sqlx::query( "UPDATE ai_engineer_insights SET status = $1, pr_url = $2, error_logs = $3 WHERE id = $4", ) .bind(&final_status) .bind(pr_url.clone()) .bind(patch_error.clone()) .bind(uuid_val) .execute(&state.pool) .await .map_err(|e| { tracing::error!("Failed to update AI insight: {:?}", e); axum::http::StatusCode::INTERNAL_SERVER_ERROR })?; if result.rows_affected() == 0 { return Err(axum::http::StatusCode::NOT_FOUND); } Ok(axum::extract::Json(serde_json::json!({ "success": true, "patch_applied": patch_applied, "warning": patch_error, "pr_url": pr_url }))) } async fn execute_self_healing_workflow( id: &str, issue_summary: &str, code_diff: &str, test_diff: &str, tx: &tokio::sync::broadcast::Sender, ) -> Result> { let branch_name = format!("sre-fix-{}", &id[..8]); let broadcast = |step: &str, status: &str, msg: &str| { let _ = tx.send( crate::interfaces::http::api::RealtimeEvent::AIEngineerProgress { insight_id: id.to_string(), step: step.to_string(), status: status.to_string(), message: msg.to_string(), }, ); }; // 1. Stash any uncommitted local work so it doesn't get lost broadcast( "STASHING", "ACTIVE", "Safely stashing developer workspace changes...", ); tracing::info!("AI SRE: Stashing local uncommitted changes..."); let stash_output = std::process::Command::new("git").args(["stash"]).output()?; if !stash_output.status.success() { broadcast("STASHING", "FAILED", "Failed to stash workspace changes."); return Err("Git stash failed".into()); } broadcast( "STASHING", "COMPLETED", "Stashed local changes successfully.", ); // 2. Ensure we are starting from clean main branch broadcast("BRANCHING", "ACTIVE", "Switching to main branch..."); tracing::info!("AI SRE: Checking out clean main branch..."); let checkout_main = std::process::Command::new("git") .args(["checkout", "main"]) .output()?; if !checkout_main.status.success() { broadcast("BRANCHING", "FAILED", "Failed to checkout main branch."); let _ = std::process::Command::new("git") .args(["stash", "pop"]) .output()?; return Err("Checkout main failed".into()); } // 3. Create and checkout new branch broadcast( "BRANCHING", "ACTIVE", &format!("Provisioning new branch {}...", branch_name), ); tracing::info!("AI SRE: Creating new git branch {}...", branch_name); let checkout_output = std::process::Command::new("git") .args(["checkout", "-b", &branch_name]) .output()?; if !checkout_output.status.success() { let _ = std::process::Command::new("git") .args(["branch", "-D", &branch_name]) .output()?; let second_checkout = std::process::Command::new("git") .args(["checkout", "-b", &branch_name]) .output()?; if !second_checkout.status.success() { broadcast("BRANCHING", "FAILED", "Failed to checkout new branch."); let _ = std::process::Command::new("git") .args(["checkout", "main"]) .output()?; let _ = std::process::Command::new("git") .args(["stash", "pop"]) .output()?; return Err("Git branch creation failed".into()); } } broadcast( "BRANCHING", "COMPLETED", &format!("SRE branch {} provisioned successfully.", branch_name), ); // 4. Apply the code patch broadcast("PATCHING", "ACTIVE", "Applying AI suggested code patch..."); tracing::info!("AI SRE: Applying code fix diff..."); if let Err(e) = apply_patch_string(code_diff).await { broadcast( "PATCHING", "FAILED", &format!("Failed to apply code patch: {}", e), ); cleanup_failed_branch(&branch_name).await; let _ = std::process::Command::new("git") .args(["stash", "pop"]) .output()?; return Err(format!("Failed to apply code diff patch: {}", e).into()); } // 5. Apply the test case patch if present if !test_diff.trim().is_empty() { broadcast( "PATCHING", "ACTIVE", "Applying AI suggested unit test patch...", ); tracing::info!("AI SRE: Applying AI-generated test case..."); if let Err(e) = apply_patch_string(test_diff).await { broadcast( "PATCHING", "FAILED", &format!("Failed to apply test patch: {}", e), ); cleanup_failed_branch(&branch_name).await; let _ = std::process::Command::new("git") .args(["stash", "pop"]) .output()?; return Err(format!("Failed to apply test case patch: {}", e).into()); } } broadcast( "PATCHING", "COMPLETED", "All suggested code and test patches successfully applied.", ); // 6. Run automated test cases using cargo test broadcast( "TESTING", "ACTIVE", "Executing automated test suite ('cargo test')...", ); tracing::info!("AI SRE: Running cargo test to verify fix and new test case..."); let test_output = std::process::Command::new("cargo") .args(["test"]) .output()?; if !test_output.status.success() { broadcast("TESTING", "FAILED", "Cargo test suite failed verification."); let stderr = String::from_utf8_lossy(&test_output.stderr); let stdout = String::from_utf8_lossy(&test_output.stdout); cleanup_failed_branch(&branch_name).await; let _ = std::process::Command::new("git") .args(["stash", "pop"]) .output()?; return Err(format!( "Automated tests failed.\nStderr: {}\nStdout: {}", stderr, stdout ) .into()); } broadcast( "TESTING", "COMPLETED", "All automated verification tests passed flawlessly (100% pass rate).", ); // 7. Commit the changes broadcast( "COMMITTING", "ACTIVE", "Committing changes and preparing to push...", ); tracing::info!("AI SRE: Committing code and test fix to git..."); let _ = std::process::Command::new("git") .args(["add", "."]) .output()?; let commit_msg = format!("sre-fix: {}", issue_summary); let commit_output = std::process::Command::new("git") .args(["commit", "-m", &commit_msg]) .output()?; if !commit_output.status.success() { broadcast("COMMITTING", "FAILED", "Failed to commit changes locally."); cleanup_failed_branch(&branch_name).await; let _ = std::process::Command::new("git") .args(["stash", "pop"]) .output()?; return Err("Git commit failed".into()); } // 8. Push to origin/GitHub remote repo broadcast( "COMMITTING", "ACTIVE", "Pushing branch to GitHub remote repository...", ); tracing::info!("AI SRE: Pushing branch {} to remote origin...", branch_name); let push_output = std::process::Command::new("git") .args(["push", "-u", "origin", &branch_name, "-f"]) .output()?; if !push_output.status.success() { broadcast( "COMMITTING", "FAILED", "Failed to push SRE branch to remote repository.", ); let err = String::from_utf8_lossy(&push_output.stderr); cleanup_failed_branch(&branch_name).await; let _ = std::process::Command::new("git") .args(["stash", "pop"]) .output()?; return Err(format!("Failed to push to GitHub remote: {}", err).into()); } // 9. Create GitHub Pull Request using 'gh' CLI! broadcast( "COMMITTING", "ACTIVE", "Opening GitHub Pull Request using GitHub CLI...", ); tracing::info!("AI SRE: Creating GitHub Pull Request..."); let pr_body = format!( "## AI SRE Self-Healing Fix\n\n**Issue**: {}\n\n**Resolution**: Verified successfully via automated tests (`cargo test` passed with zero errors, including autonomous test cases).\n\n*Created autonomously by AI SRE Log & Telemetry Watchdog.*", issue_summary ); let pr_output = std::process::Command::new("gh") .args([ "pr", "create", "--title", &format!("sre-fix: {}", issue_summary), "--body", &pr_body, "--head", &branch_name, "--base", "main", ]) .output()?; // Checkout back to main and restore stash tracing::info!("AI SRE: Restoring original development branch and changes..."); let _ = std::process::Command::new("git") .args(["checkout", "main"]) .output()?; let _ = std::process::Command::new("git") .args(["stash", "pop"]) .output()?; if !pr_output.status.success() { broadcast("COMMITTING", "FAILED", "Failed to open Pull Request."); let err = String::from_utf8_lossy(&pr_output.stderr); return Err(format!("GitHub PR creation failed: {}", err).into()); } let pr_url = String::from_utf8_lossy(&pr_output.stdout) .trim() .to_string(); broadcast( "COMMITTING", "COMPLETED", &format!("PR successfully opened at: {}", pr_url), ); Ok(pr_url) } async fn send_sre_notification( id: &str, issue_summary: &str, is_success: bool, details_or_url: &str, ) -> Result<(), Box> { let webhook_url = match std::env::var("SRE_WEBHOOK_URL") .or_else(|_| std::env::var("SLACK_WEBHOOK_URL")) { Ok(url) => url, Err(_) => { tracing::info!("AI SRE: No SRE_WEBHOOK_URL or SLACK_WEBHOOK_URL set in environment. Skipping webhook notification."); return Ok(()); } }; if webhook_url.trim().is_empty() { return Ok(()); } let client = reqwest::Client::new(); let payload = if webhook_url.contains("discord.com") { let color = if is_success { 0x00E59B } else { 0xEF4444 }; let title = if is_success { format!("🟢 SRE Auto-Healing Success: {}", issue_summary) } else { format!("🔴 SRE Auto-Healing Failure: {}", issue_summary) }; let description = if is_success { format!("The AI SRE self-healing engine has successfully patched the issue, passed all automated verification checks (`cargo test`), and created a pull request on GitHub!\n\n**GitHub Pull Request**:\n<{}>", details_or_url) } else { let mut err_text = details_or_url.to_string(); if err_text.len() > 1000 { err_text.truncate(950); err_text.push_str("\n... [truncated]"); } format!("The AI SRE self-healing workflow failed during automated verification. System has rolled back cleanly to main.\n\n**Diagnostic Logs**:\n```\n{}\n```", err_text) }; serde_json::json!({ "username": "Antigravity AI SRE", "avatar_url": "https://img.icons8.com/color/96/000000/robot-vacuum.png", "embeds": [{ "title": title, "description": description, "color": color, "footer": { "text": format!("SRE Insight ID: {}", id) }, "timestamp": chrono::Utc::now().to_rfc3339() }] }) } else { let color = if is_success { "#00E59B" } else { "#EF4444" }; let text = if is_success { format!("The AI SRE self-healing engine has successfully patched the issue, passed all automated verification checks (`cargo test`), and created a pull request on GitHub!\n\n*GitHub Pull Request*:\n<{}>", details_or_url) } else { let mut err_text = details_or_url.to_string(); if err_text.len() > 1000 { err_text.truncate(950); err_text.push_str("\n... [truncated]"); } format!("The AI SRE self-healing workflow failed during automated verification. System has rolled back cleanly to main.\n\n*Diagnostic Logs*:\n```\n{}\n```", err_text) }; serde_json::json!({ "text": format!("*Antigravity AI SRE Watchdog Alert*"), "attachments": [{ "color": color, "title": if is_success { format!("🟢 SRE Auto-Healing Success: {}", issue_summary) } else { format!("🔴 SRE Auto-Healing Failure: {}", issue_summary) }, "text": text, "footer": format!("SRE Insight ID: {}", id) }] }) }; let response = client.post(&webhook_url).json(&payload).send().await?; if !response.status().is_success() { tracing::error!( "AI SRE: Failed to send Slack/Discord notification (status {}): {:?}", response.status(), response.text().await ); } else { tracing::info!("AI SRE: Sent Slack/Discord SRE notification successfully."); } Ok(()) } async fn apply_patch_string(diff: &str) -> Result<(), Box> { let patch_path = "./temp_sre_patch.patch"; std::fs::write(patch_path, diff)?; let output = std::process::Command::new("patch") .arg("-p1") .arg("-i") .arg(patch_path) .output()?; let _ = std::fs::remove_file(patch_path); if !output.status.success() { let err_msg = String::from_utf8_lossy(&output.stderr).to_string(); return Err(format!("Patch apply failed: {}", err_msg).into()); } Ok(()) } async fn cleanup_failed_branch(branch_name: &str) { let _ = std::process::Command::new("git") .args(["checkout", "main"]) .output(); let _ = std::process::Command::new("git") .args(["reset", "--hard", "origin/main"]) .output(); let _ = std::process::Command::new("git") .args(["branch", "-D", branch_name]) .output(); } pub fn start_ai_log_monitor(pool: sqlx::PgPool) { tokio::spawn(async move { let mut last_check_time = chrono::Utc::now(); tracing::info!("AI SRE: Continuous Log & Database Activity Monitor Active."); loop { tokio::time::sleep(tokio::time::Duration::from_secs(15)).await; // Query any new critical risk audit logs since last check let result = sqlx::query( "SELECT id, event_type, risk_level, details, device_fingerprint, created_at FROM risk_audit_logs WHERE risk_level IN ('HIGH', 'CRITICAL') AND created_at > $1 ORDER BY created_at ASC" ) .bind(last_check_time) .fetch_all(&pool) .await; match result { Ok(rows) => { for row in rows { use sqlx::Row; let id: i64 = row.get("id"); let event_type: String = row.get("event_type"); let risk_level: String = row.get("risk_level"); let details: String = row.get("details"); let fingerprint: String = row.get("device_fingerprint"); let created_at: chrono::DateTime = row.get("created_at"); tracing::info!("AI SRE: Detected critical security event in audit logs (ID: {}). Autonomously analyzing...", id); let source = format!("SYSTEM_SECURITY_{}", event_type); let message = format!("Security Threat Detected: {}", details); let stack_trace = Some(format!( "Device Fingerprint: {}\nLog ID: {}\nRisk Level: {}", fingerprint, id, risk_level )); let pool_clone = pool.clone(); tokio::spawn(async move { if let Err(e) = analyze_error_with_ai( pool_clone, source, risk_level, message, stack_trace, ) .await { tracing::error!( "AI SRE: Autonomous analysis failed for audit log {}: {:?}", id, e ); } }); if created_at > last_check_time { last_check_time = created_at; } } } Err(e) => { tracing::error!("AI SRE: Error querying critical audit logs: {:?}", e); } } } }); }