RTIX / src /interfaces /http /routes /developer.rs
github-actions
deploy: clean backend production release
c33971d
use crate::interfaces::http::api::AppState;
use crate::interfaces::http::routes::RequireDev;
use axum::{
extract::State,
http::StatusCode,
routing::{get, post},
Json, Router,
};
use serde::{Deserialize, Serialize};
/// One merchant as returned by the developer `/merchants` listing.
///
/// Each field is filled from the identically named column that
/// `list_merchants` selects from the `merchants` table, and is serialized
/// to JSON under the same name.
#[derive(Serialize)]
pub struct MerchantSummary {
/// Value of the `merchant_id` column.
pub merchant_id: String,
/// Value of the `email` column.
pub email: String,
/// Value of the `brand_name` column; the listing is ordered by it.
pub brand_name: String,
/// Value of the `slug` column.
pub slug: String,
/// Value of the `plan` column.
pub plan: String,
/// Value of the `role` column.
pub role: String,
/// Value of the `trust_score` column, decoded as a 64-bit float.
pub trust_score: f64,
/// Value of the `verification_level` column.
pub verification_level: String,
}
/// Query-string parameters accepted by `get_global_audit_logs`.
#[derive(Deserialize)]
pub struct AuditQuery {
/// When present, only rows whose `risk_level` equals this value are returned.
pub risk_level: Option<String>,
/// When present, only rows whose `event_type` equals this value are returned.
pub event_type: Option<String>,
/// 1-based page number; the handler treats a missing value as page 1.
pub page: Option<usize>,
/// Page size; the handler defaults to 50 and caps the value at 200.
pub limit: Option<usize>,
}
/// Builds the router for the developer endpoints defined in this module.
///
/// Paths are relative to wherever the caller mounts the returned router.
pub fn router() -> Router<AppState> {
    use axum::routing::patch;

    // Read-only developer views.
    let routes = Router::new()
        .route("/merchants", get(list_merchants))
        .route("/audit", get(get_global_audit_logs));

    // Sandbox attack simulations and their reset.
    let routes = routes
        .route("/sandbox/simulate-rate-limit", post(simulate_rate_limit))
        .route("/sandbox/simulate-velocity", post(simulate_velocity))
        .route("/sandbox/reset", post(reset_sandbox));

    // Telemetry ingestion and the AI-generated insights built from it.
    routes
        .route("/telemetry", post(ingest_telemetry))
        .route("/ai-insights", get(get_ai_insights))
        .route("/ai-insights/:id", patch(update_ai_insight))
}
pub async fn list_merchants(
State(state): State<AppState>,
RequireDev(_dev_id): RequireDev,
) -> Result<Json<Vec<MerchantSummary>>, StatusCode> {
let rows = sqlx::query(
"SELECT merchant_id, email, brand_name, slug, plan, role, trust_score, verification_level FROM merchants ORDER BY brand_name ASC"
)
.fetch_all(&state.pool)
.await
.map_err(|e| {
tracing::error!("Failed to fetch merchants for developer: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let merchants = rows
.into_iter()
.map(|r| {
use sqlx::Row;
MerchantSummary {
merchant_id: r.get("merchant_id"),
email: r.get("email"),
brand_name: r.get("brand_name"),
slug: r.get("slug"),
plan: r.get("plan"),
role: r.get("role"),
trust_score: r.get("trust_score"),
verification_level: r.get("verification_level"),
}
})
.collect();
Ok(Json(merchants))
}
/// Returns one page of `risk_audit_logs`, newest first, across all merchants.
///
/// Optional `risk_level` and `event_type` query parameters become equality
/// filters bound as SQL parameters. `limit` defaults to 50 and is capped at
/// 200; `page` is 1-based and defaults to 1.
///
/// The offset is computed with checked conversion and saturating
/// multiplication, so an absurdly large `page` yields an empty page rather
/// than a wrapped (possibly negative) `OFFSET` or an overflow panic.
///
/// # Errors
///
/// Returns `500 Internal Server Error` when the query fails.
pub async fn get_global_audit_logs(
    State(state): State<AppState>,
    query: axum::extract::Query<AuditQuery>,
    RequireDev(_dev_id): RequireDev,
) -> Result<Json<Vec<crate::domain::models::RiskAuditLog>>, StatusCode> {
    use sqlx::Row;

    // The cap of 200 guarantees the conversion succeeds; the fallback is defensive.
    let limit = i64::try_from(query.limit.unwrap_or(50).min(200)).unwrap_or(200);
    let page_index =
        i64::try_from(query.page.unwrap_or(1).saturating_sub(1)).unwrap_or(i64::MAX);
    let offset = page_index.saturating_mul(limit);

    // Placeholders are numbered in the same order the values are bound below.
    let mut conditions: Vec<String> = Vec::new();
    if query.risk_level.is_some() {
        let placeholder = conditions.len() + 1;
        conditions.push(format!("risk_level = ${}", placeholder));
    }
    if query.event_type.is_some() {
        let placeholder = conditions.len() + 1;
        conditions.push(format!("event_type = ${}", placeholder));
    }
    let limit_placeholder = conditions.len() + 1;

    let mut sql = "SELECT * FROM risk_audit_logs".to_string();
    if !conditions.is_empty() {
        sql.push_str(" WHERE ");
        sql.push_str(&conditions.join(" AND "));
    }
    sql.push_str(&format!(
        " ORDER BY created_at DESC LIMIT ${} OFFSET ${}",
        limit_placeholder,
        limit_placeholder + 1
    ));

    let mut db_query = sqlx::query(&sql);
    if let Some(rl) = &query.risk_level {
        db_query = db_query.bind(rl);
    }
    if let Some(et) = &query.event_type {
        db_query = db_query.bind(et);
    }
    db_query = db_query.bind(limit).bind(offset);

    let rows = db_query.fetch_all(&state.pool).await.map_err(|e| {
        tracing::error!("Failed to fetch global audit logs: {:?}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    let logs = rows
        .into_iter()
        .map(|r| crate::domain::models::RiskAuditLog {
            id: r.get("id"),
            transaction_id: r.get("transaction_id"),
            merchant_id: r.get("merchant_id"),
            event_type: r.get("event_type"),
            risk_level: r.get("risk_level"),
            details: r.get("details"),
            device_fingerprint: r.get("device_fingerprint"),
            // These three columns are read leniently: a decode failure falls
            // back to `None` / the default value instead of failing the row.
            request_id: r.try_get("request_id").ok(),
            entry_hash: r.try_get("entry_hash").unwrap_or_default(),
            previous_hash: r.try_get("previous_hash").unwrap_or_default(),
            created_at: r.get("created_at"),
        })
        .collect();

    Ok(Json(logs))
}
/// JSON body shared by the three `/sandbox/*` endpoints.
#[derive(Deserialize)]
pub struct SimulateSandboxRequest {
/// Merchant the simulated rows and events are attributed to.
/// `reset_sandbox` deserializes this body but does not read the field.
pub merchant_id: String,
}
/// Sandbox endpoint that stages a rate-limit violation from a fixed test IP.
///
/// It calls the rate limiter 105 times under the `auth:<ip>` key, persists a
/// block for the IP through `block_ip_persistently`, and records a
/// `CRITICAL` / `SINGLETON_BLOCK` row in `risk_audit_logs` under the
/// transaction id `SYSTEM_SANDBOX` (the marker `reset_sandbox` deletes by).
///
/// # Errors
///
/// Returns `500 Internal Server Error` when the audit-log insert fails.
pub async fn simulate_rate_limit(
    State(state): State<AppState>,
    RequireDev(_dev_id): RequireDev,
    Json(payload): Json<SimulateSandboxRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    use crate::interfaces::http::middleware;

    let simulated_ip = "198.51.100.42";

    // Hammer the limiter for this key; presumably 105 calls exceed the auth
    // threshold - TODO confirm against the limiter configuration.
    let rate_key = format!("auth:{}", simulated_ip);
    (0..105).for_each(|_| {
        middleware::RATE_LIMIT_STORE.is_allowed(&rate_key);
    });

    // Record the block persistently, as a real violation would.
    middleware::block_ip_persistently(
        &state.pool,
        simulated_ip,
        "Simulated Attack: Repeated Auth Rate Limit Violations",
        Some(&state.tx),
    )
    .await;

    // The hash columns get random placeholders here; they are not derived
    // from any previous log entry.
    let fresh_hash = || format!("hash_{}", uuid::Uuid::new_v4().to_string().replace("-", ""));
    let entry_hash = fresh_hash();
    let prev_hash = fresh_hash();
    let details = format!(
        "Rate limiter dynamically triggered persistently blocked state for malicious IP: {}",
        simulated_ip
    );
    let request_id = uuid::Uuid::new_v4().to_string();

    let _ = sqlx::query(
        "INSERT INTO risk_audit_logs (transaction_id, merchant_id, event_type, risk_level, details, device_fingerprint, request_id, entry_hash, previous_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"
    )
    .bind("SYSTEM_SANDBOX")
    .bind(&payload.merchant_id)
    .bind("SINGLETON_BLOCK")
    .bind("CRITICAL")
    .bind(details)
    .bind("sandbox_fingerprint")
    .bind(request_id)
    .bind(&entry_hash)
    .bind(&prev_hash)
    .execute(&state.pool)
    .await
    .map_err(|e| {
        tracing::error!("Failed to insert sandbox audit log: {:?}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    let body = serde_json::json!({
        "success": true,
        "blocked_ip": simulated_ip,
        "message": "Automated IP block and security logs successfully triggered. Check your live dashboard!"
    });
    Ok(Json(body))
}
pub async fn simulate_velocity(
State(state): State<AppState>,
RequireDev(_dev_id): RequireDev,
Json(payload): Json<SimulateSandboxRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let fingerprint = "dev_fingerprint_sandbox_99";
// Fetch current count to increment it
use sqlx::Row;
let activity = sqlx::query(
"SELECT activity_count FROM velocity_metrics WHERE fingerprint = $1 AND window_start_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'"
)
.bind(fingerprint)
.fetch_optional(&state.pool)
.await
.map_err(|e| {
tracing::error!("Failed to fetch velocity count: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let new_count = match activity {
Some(row) => {
let count: i32 = row.get("activity_count");
let next = count + 1;
sqlx::query("UPDATE velocity_metrics SET activity_count = $1, last_activity_at = CURRENT_TIMESTAMP WHERE fingerprint = $2")
.bind(next)
.bind(fingerprint)
.execute(&state.pool)
.await
.map_err(|e| {
tracing::error!("Failed to update velocity metrics: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
next
}
None => {
sqlx::query("INSERT INTO velocity_metrics (fingerprint, merchant_id, activity_count) VALUES ($1, $2, 1)")
.bind(fingerprint)
.bind(&payload.merchant_id)
.execute(&state.pool)
.await
.map_err(|e| {
tracing::error!("Failed to insert velocity metrics: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
1
}
};
// Calculate score
let score = state
.intelligence_service
.evaluate_velocity_risk(Some(fingerprint), None, &payload.merchant_id)
.await
.unwrap_or(0.0);
// If score >= 85, trigger a RiskAlert realtime event so it lights up on the dashboard
if score >= 85.0 {
let _ = state.tx.send(crate::interfaces::http::api::RealtimeEvent::RiskAlert {
transaction_id: format!("tx_{}", &uuid::Uuid::new_v4().to_string().replace("-", "")[..16]),
merchant_id: payload.merchant_id.clone(),
risk_score: score,
message: format!("VELOCITY THREAT: Sandbox fingerprint {} triggered singleton abuse alert (Count: {}, Score: {:.1})", fingerprint, new_count, score),
});
// Insert risk audit log entry
let entry_hash = format!("hash_{}", uuid::Uuid::new_v4().to_string().replace("-", ""));
let prev_hash = format!("hash_{}", uuid::Uuid::new_v4().to_string().replace("-", ""));
let _ = sqlx::query(
"INSERT INTO risk_audit_logs (transaction_id, merchant_id, event_type, risk_level, details, device_fingerprint, request_id, entry_hash, previous_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"
)
.bind("SYSTEM_SANDBOX")
.bind(&payload.merchant_id)
.bind("VOLUMETRIC_DEVIATION")
.bind("HIGH")
.bind(format!("Singleton abuse attempt blocked. Device fingerprint: {}. Activity count: {}, Computed risk: {:.1}", fingerprint, new_count, score))
.bind(fingerprint)
.bind(uuid::Uuid::new_v4().to_string())
.bind(&entry_hash)
.bind(&prev_hash)
.execute(&state.pool)
.await
.map_err(|e| {
tracing::error!("Failed to insert sandbox velocity audit log: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
}
Ok(Json(serde_json::json!({
"success": true,
"fingerprint": fingerprint,
"activity_count": new_count,
"risk_score": score,
"message": format!("Simulated rapid device checkout. Count: {}, Computed Risk Score: {:.1}", new_count, score)
})))
}
pub async fn reset_sandbox(
State(state): State<AppState>,
RequireDev(_dev_id): RequireDev,
Json(_payload): Json<SimulateSandboxRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
// Delete test blocks and velocity metrics from database
let _ = sqlx::query("DELETE FROM security_blocks WHERE ip = '198.51.100.42'")
.execute(&state.pool)
.await;
let _ = sqlx::query(
"DELETE FROM velocity_metrics WHERE fingerprint = 'dev_fingerprint_sandbox_99'",
)
.execute(&state.pool)
.await;
let _ = sqlx::query("DELETE FROM risk_audit_logs WHERE transaction_id = 'SYSTEM_SANDBOX'")
.execute(&state.pool)
.await;
// Clear live caches
crate::interfaces::http::middleware::BLOCKED_IP_CACHE.remove("198.51.100.42");
crate::interfaces::http::middleware::BLOOM_FILTER.clear();
// Clear rate limits
let rate_key = "auth:198.51.100.42".to_string();
crate::interfaces::http::middleware::RATE_LIMIT_STORE.is_allowed(&rate_key); // will reset/clear if empty
Ok(Json(serde_json::json!({
"success": true,
"message": "Sandbox environments flushed. All security limits reset successfully."
})))
}
/// JSON body accepted by `ingest_telemetry`.
///
/// The fields are stored in the identically named columns of
/// `error_telemetry`, and all but `user_context` are also forwarded to the
/// AI analysis.
#[derive(serde::Deserialize)]
pub struct IngestTelemetryRequest {
/// Origin of the report, as supplied by the client.
pub source: String,
/// Severity label, as supplied by the client.
pub error_level: String,
/// Error message text.
pub message: String,
/// Optional stack trace.
pub stack_trace: Option<String>,
/// Optional free-form JSON context; the handler stores `{}` when absent.
pub user_context: Option<serde_json::Value>,
}
pub async fn ingest_telemetry(
axum::extract::State(state): axum::extract::State<crate::interfaces::http::api::AppState>,
axum::extract::Json(payload): axum::extract::Json<IngestTelemetryRequest>,
) -> Result<axum::extract::Json<serde_json::Value>, axum::http::StatusCode> {
let ctx = payload
.user_context
.unwrap_or_else(|| serde_json::json!({}));
let _ = sqlx::query(
"INSERT INTO error_telemetry (source, error_level, message, stack_trace, user_context) VALUES ($1, $2, $3, $4, $5)"
)
.bind(&payload.source)
.bind(&payload.error_level)
.bind(&payload.message)
.bind(&payload.stack_trace)
.bind(&ctx)
.execute(&state.pool)
.await
.map_err(|e| {
tracing::error!("Failed to ingest telemetry: {:?}", e);
axum::http::StatusCode::INTERNAL_SERVER_ERROR
})?;
// Spawn SRE telemetry analysis asynchronously in a background local Ollama task
let pool = state.pool.clone();
let source = payload.source.clone();
let error_level = payload.error_level.clone();
let message = payload.message.clone();
let stack_trace = payload.stack_trace.clone();
tokio::spawn(async move {
if let Err(e) = analyze_error_with_ai(pool, source, error_level, message, stack_trace).await
{
tracing::error!("Failed AI SRE analysis: {:?}", e);
}
});
Ok(axum::extract::Json(serde_json::json!({
"success": true
})))
}
/// Sends `prompt` to an OpenAI-compatible chat-completions endpoint and
/// returns the trimmed text of the first choice.
///
/// `provider` is only used to label error messages.
async fn request_chat_completion(
    client: &reqwest::Client,
    provider: &str,
    endpoint: &str,
    api_key: &str,
    model: &str,
    prompt: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    let req_body = serde_json::json!({
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "response_format": {"type": "json_object"},
        "temperature": 0.1
    });
    let res = client
        .post(endpoint)
        .header("Authorization", format!("Bearer {}", api_key))
        .json(&req_body)
        .send()
        .await?;
    if !res.status().is_success() {
        let status = res.status();
        let err_text = res.text().await?;
        return Err(format!("{} API failed (status {}): {}", provider, status, err_text).into());
    }
    let resp_json: serde_json::Value = res.json().await?;
    let text = resp_json["choices"][0]["message"]["content"]
        .as_str()
        .ok_or_else(|| format!("Failed to extract content from {} response", provider))?;
    Ok(text.trim().to_string())
}

/// Asks a language model to analyse one error report and stores the result.
///
/// The provider is chosen from the environment, first match wins:
/// `GROQ_API_KEY`, `OPENROUTER_API_KEY`, `GEMINI_API_KEY`, otherwise a local
/// Ollama instance on `localhost:11434`. The model must answer with a JSON
/// object carrying the keys listed in the prompt; the parsed object is
/// inserted into `rtix_app.ai_engineer_insights` with status `PENDING_REVIEW`.
///
/// # Errors
///
/// Returns an error when the HTTP client cannot be built, a provider request
/// fails or returns a non-success status, the reply is not the expected JSON,
/// or the insert fails.
async fn analyze_error_with_ai(
    pool: sqlx::PgPool,
    source: String,
    error_level: String,
    message: String,
    stack_trace: Option<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // 1. One HTTP client with a 45-second timeout so a stalled provider
    //    cannot keep this task alive indefinitely.
    let client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(45))
        .build()?;

    // 2. Build the prompt. The literal is part of runtime behaviour: its
    //    continuation lines intentionally start at column 0.
    let prompt = format!(
        "You are an SRE and AI Coding Engineer. Analyze this telemetry:
Source: {}
Error Level: {}
Message: {}
Stack Trace: {:?}
Analyze the root cause and provide a structured code fix.
Your output MUST be a valid JSON object ONLY. DO NOT wrap it in markdown code blocks or add explanations outside the JSON.
JSON Keys required:
{{
\"issue_summary\": \"one-line summary of bug\",
\"root_cause_analysis\": \"detailed root cause\",
\"proposed_solution\": \"detailed solution to fix\",
\"suggested_code_diff\": \"git-diff format patch demonstrating how to fix the code, or null\",
\"suggested_test_code_diff\": \"git-diff format patch demonstrating how to add a unit test in the test suite to verify the solution/fix, or null\",
\"metrics_affected\": {{ \"predicted_latency_reduction_ms\": 50 }}
}}",
        source, error_level, message, stack_trace
    );

    // 3. Query the first configured provider.
    let raw_response = if let Ok(groq_key) = std::env::var("GROQ_API_KEY") {
        tracing::info!("AI SRE: Using Groq Cloud API (free tier open-source Llama 3) for analysis");
        request_chat_completion(
            &client,
            "Groq",
            "https://api.groq.com/openai/v1/chat/completions",
            &groq_key,
            "llama-3.3-70b-versatile",
            &prompt,
        )
        .await?
    } else if let Ok(openrouter_key) = std::env::var("OPENROUTER_API_KEY") {
        tracing::info!("AI SRE: Using OpenRouter API (free tier open-source Llama 3) for analysis");
        request_chat_completion(
            &client,
            "OpenRouter",
            "https://openrouter.ai/api/v1/chat/completions",
            &openrouter_key,
            "meta-llama/llama-3-8b-instruct:free",
            &prompt,
        )
        .await?
    } else if let Ok(gemini_key) = std::env::var("GEMINI_API_KEY") {
        tracing::info!("AI SRE: Using Google Gemini API cloud provider for analysis");
        let req_body = serde_json::json!({
            "contents": [{
                "parts": [{
                    "text": prompt
                }]
            }],
            "generationConfig": {
                "responseMimeType": "application/json"
            }
        });
        // The key travels in a header, not the query string: reqwest errors
        // carry the request URL, and callers log those errors.
        let res = client
            .post("https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent")
            .header("x-goog-api-key", gemini_key.as_str())
            .json(&req_body)
            .send()
            .await?;
        if !res.status().is_success() {
            let status = res.status();
            let err_text = res.text().await?;
            return Err(format!("Gemini API failed (status {}): {}", status, err_text).into());
        }
        let resp_json: serde_json::Value = res.json().await?;
        resp_json["candidates"][0]["content"]["parts"][0]["text"]
            .as_str()
            .ok_or("Failed to extract text from Gemini response")?
            .trim()
            .to_string()
    } else {
        tracing::info!("AI SRE: No cloud API keys found. Falling back to local Ollama...");

        #[derive(serde::Deserialize)]
        struct OllamaModel {
            name: String,
        }
        #[derive(serde::Deserialize)]
        struct OllamaTags {
            models: Vec<OllamaModel>,
        }
        #[derive(serde::Deserialize)]
        struct OllamaResponse {
            response: String,
        }

        // 3a. Discover which models the local Ollama has installed.
        let tags_res = client.get("http://localhost:11434/api/tags").send().await?;
        let tags: OllamaTags = tags_res.json().await?;
        let first_model = tags
            .models
            .first()
            .ok_or("No models installed in local Ollama")?;

        // Preference order: first name containing "llama", then first name
        // containing "qwen", then whatever is listed first.
        let model_name = tags
            .models
            .iter()
            .find(|m| m.name.contains("llama"))
            .or_else(|| tags.models.iter().find(|m| m.name.contains("qwen")))
            .unwrap_or(first_model)
            .name
            .clone();
        tracing::info!("Selected local SRE analysis model: {}", model_name);

        // 3b. Single non-streaming generation with a capped output length.
        let req_body = serde_json::json!({
            "model": model_name,
            "prompt": prompt,
            "stream": false,
            "options": {
                "num_predict": 400,
                "temperature": 0.1
            }
        });
        let res = client
            .post("http://localhost:11434/api/generate")
            .json(&req_body)
            .send()
            .await?;
        if !res.status().is_success() {
            let status = res.status();
            let err_text = res.text().await?;
            return Err(format!("Ollama API failed (status {}): {}", status, err_text).into());
        }
        let resp_data: OllamaResponse = res.json().await?;
        resp_data.response.trim().to_string()
    };

    // 4. Models sometimes wrap the JSON in a markdown fence despite the
    //    instructions; peel one leading and one trailing fence if present.
    let unfenced = raw_response
        .strip_prefix("```json")
        .or_else(|| raw_response.strip_prefix("```"))
        .unwrap_or(raw_response.as_str());
    let unfenced = unfenced.strip_suffix("```").unwrap_or(unfenced);
    let clean_json_str = unfenced.trim();

    // 5. Parse the reply into the expected shape.
    #[derive(serde::Deserialize)]
    struct SreInsight {
        issue_summary: String,
        root_cause_analysis: String,
        proposed_solution: String,
        suggested_code_diff: Option<String>,
        suggested_test_code_diff: Option<String>,
        metrics_affected: serde_json::Value,
    }
    let insight: SreInsight = serde_json::from_str(clean_json_str)?;

    // 6. Persist the insight for human review.
    sqlx::query(
        "INSERT INTO rtix_app.ai_engineer_insights (status, issue_summary, root_cause_analysis, proposed_solution, suggested_code_diff, suggested_test_code_diff, metrics_affected) VALUES ('PENDING_REVIEW', $1, $2, $3, $4, $5, $6)"
    )
    .bind(&insight.issue_summary)
    .bind(&insight.root_cause_analysis)
    .bind(&insight.proposed_solution)
    .bind(&insight.suggested_code_diff)
    .bind(&insight.suggested_test_code_diff)
    .bind(&insight.metrics_affected)
    .execute(&pool)
    .await?;

    tracing::info!(
        "AI SRE: Successfully analyzed error telemetry and generated insight: {}",
        insight.issue_summary
    );
    Ok(())
}
/// Query-string parameters accepted by `get_ai_insights`.
#[derive(serde::Deserialize)]
pub struct AiInsightQuery {
/// When present, only insights whose `status` equals this value are returned.
pub status: Option<String>,
}
pub async fn get_ai_insights(
axum::extract::State(state): axum::extract::State<crate::interfaces::http::api::AppState>,
query: axum::extract::Query<AiInsightQuery>,
_dev_id: crate::interfaces::http::routes::RequireDev,
) -> Result<axum::extract::Json<serde_json::Value>, axum::http::StatusCode> {
let mut sql = "SELECT id, created_at, status, issue_summary, root_cause_analysis, proposed_solution, suggested_code_diff, suggested_test_code_diff, pr_url, error_logs, metrics_affected FROM ai_engineer_insights".to_string();
if query.status.is_some() {
sql.push_str(" WHERE status = $1");
}
sql.push_str(" ORDER BY created_at DESC");
let mut db_query = sqlx::query(&sql);
if let Some(st) = &query.status {
db_query = db_query.bind(st);
}
let rows = db_query.fetch_all(&state.pool).await.map_err(|e| {
tracing::error!("Failed to fetch AI insights: {:?}", e);
axum::http::StatusCode::INTERNAL_SERVER_ERROR
})?;
let insights: Vec<serde_json::Value> = rows
.into_iter()
.map(|r| {
use sqlx::Row;
serde_json::json!({
"id": r.get::<uuid::Uuid, _>("id").to_string(),
"created_at": r.get::<chrono::DateTime<chrono::Utc>, _>("created_at").to_rfc3339(),
"status": r.get::<String, _>("status"),
"issue_summary": r.get::<String, _>("issue_summary"),
"root_cause_analysis": r.get::<String, _>("root_cause_analysis"),
"proposed_solution": r.get::<String, _>("proposed_solution"),
"suggested_code_diff": r.try_get::<String, _>("suggested_code_diff").ok(),
"suggested_test_code_diff": r.try_get::<String, _>("suggested_test_code_diff").ok(),
"pr_url": r.try_get::<String, _>("pr_url").ok(),
"error_logs": r.try_get::<String, _>("error_logs").ok(),
"metrics_affected": r.get::<serde_json::Value, _>("metrics_affected")
})
})
.collect();
Ok(axum::extract::Json(serde_json::json!(insights)))
}
/// JSON body accepted by `update_ai_insight`.
#[derive(serde::Deserialize)]
pub struct UpdateAiInsightRequest {
/// Requested status. The value `"APPROVED"` makes the handler run the
/// self-healing workflow; any other value is written to the row as given.
pub status: String,
}
/// Updates the status of one AI insight, running the self-healing workflow on approval.
///
/// When the requested status is `"APPROVED"` and the insight has a non-empty
/// code diff, the diff (and test diff, if any) is applied through
/// `execute_self_healing_workflow` and a webhook notification is attempted.
/// The stored status then becomes `IMPLEMENTED` on success or
/// `PENDING_REVIEW` on failure, with the failure text saved in `error_logs`.
/// Any other requested status is written as given.
///
/// NOTE(review): approving an insight whose diff is empty stores
/// `IMPLEMENTED` without running anything - confirm this is intended.
/// NOTE(review): the workflow runs blocking child processes inside this
/// async handler, so the request stays open until it finishes.
///
/// # Errors
///
/// * `400 Bad Request` - `id` is not a valid UUID.
/// * `404 Not Found` - no insight with that id exists. On approval this is
///   detected up front; previously the lookup failure surfaced as a 500.
/// * `500 Internal Server Error` - a database statement failed.
pub async fn update_ai_insight(
    axum::extract::State(state): axum::extract::State<crate::interfaces::http::api::AppState>,
    axum::extract::Path(id): axum::extract::Path<String>,
    _dev_id: crate::interfaces::http::routes::RequireDev,
    axum::extract::Json(payload): axum::extract::Json<UpdateAiInsightRequest>,
) -> Result<axum::extract::Json<serde_json::Value>, axum::http::StatusCode> {
    use sqlx::Row;

    let uuid_val = uuid::Uuid::parse_str(&id).map_err(|_| axum::http::StatusCode::BAD_REQUEST)?;
    let approved = payload.status == "APPROVED";

    let mut patch_applied = true;
    let mut patch_error: Option<String> = None;
    let mut pr_url: Option<String> = None;

    if approved {
        // `fetch_optional` separates "no such insight" (404) from a real
        // database failure (500).
        let row = sqlx::query("SELECT issue_summary, suggested_code_diff, suggested_test_code_diff FROM rtix_app.ai_engineer_insights WHERE id = $1")
            .bind(uuid_val)
            .fetch_optional(&state.pool)
            .await
            .map_err(|e| {
                tracing::error!("Failed to fetch SRE insight for execution: {:?}", e);
                axum::http::StatusCode::INTERNAL_SERVER_ERROR
            })?
            .ok_or(axum::http::StatusCode::NOT_FOUND)?;

        let issue_summary = row.get::<String, _>("issue_summary");
        // NULL or unreadable diffs are treated as "no diff".
        let diff = row
            .try_get::<Option<String>, _>("suggested_code_diff")
            .ok()
            .flatten()
            .unwrap_or_default();
        let test_diff = row
            .try_get::<Option<String>, _>("suggested_test_code_diff")
            .ok()
            .flatten()
            .unwrap_or_default();

        if !diff.trim().is_empty() {
            tracing::info!(
                "AI SRE: Starting comprehensive self-healing workflow for SRE insight {}...",
                id
            );
            match execute_self_healing_workflow(&id, &issue_summary, &diff, &test_diff, &state.tx)
                .await
            {
                Ok(url) => {
                    tracing::info!(
                        "AI SRE: Workflow successfully completed! PR created at: {}",
                        url
                    );
                    // Notification is best-effort; its failure must not
                    // change the outcome of the approval.
                    let _ = send_sre_notification(&id, &issue_summary, true, &url).await;
                    pr_url = Some(url);
                }
                Err(e) => {
                    tracing::warn!("AI SRE: Self-healing workflow failed: {:?}", e);
                    let reason = e.to_string();
                    let _ = send_sre_notification(&id, &issue_summary, false, &reason).await;
                    patch_applied = false;
                    patch_error = Some(reason);
                }
            }
        }
    }

    let final_status = match (approved, patch_applied) {
        (true, true) => "IMPLEMENTED".to_string(),
        (true, false) => "PENDING_REVIEW".to_string(),
        (false, _) => payload.status.clone(),
    };

    let result = sqlx::query(
        "UPDATE ai_engineer_insights SET status = $1, pr_url = $2, error_logs = $3 WHERE id = $4",
    )
    .bind(&final_status)
    .bind(&pr_url)
    .bind(&patch_error)
    .bind(uuid_val)
    .execute(&state.pool)
    .await
    .map_err(|e| {
        tracing::error!("Failed to update AI insight: {:?}", e);
        axum::http::StatusCode::INTERNAL_SERVER_ERROR
    })?;

    if result.rows_affected() == 0 {
        return Err(axum::http::StatusCode::NOT_FOUND);
    }

    Ok(axum::extract::Json(serde_json::json!({
        "success": true,
        "patch_applied": patch_applied,
        "warning": patch_error,
        "pr_url": pr_url
    })))
}
/// Applies an AI-suggested patch on a fresh branch, verifies it and opens a pull request.
///
/// Steps, each announced through an `AIEngineerProgress` realtime event:
/// stash local changes, check out `main`, create `sre-fix-<id prefix>`,
/// apply the code patch and optional test patch, run `cargo test`, commit,
/// force-push the branch to `origin`, and open a PR with the `gh` CLI.
/// On any failure the branch is cleaned up and the workspace restored.
///
/// Returns the PR URL printed by `gh pr create`.
///
/// The stash is popped only when this run actually created a stash entry.
/// `git stash` exits successfully when there is nothing to stash, and an
/// unconditional `git stash pop` would then pop an unrelated older entry.
///
/// NOTE(review): every child process here is run synchronously, so the
/// calling async task is blocked for the duration of `cargo test`.
///
/// # Errors
///
/// Returns an error naming the failed step, or the I/O error when a command
/// on the main path cannot be spawned.
async fn execute_self_healing_workflow(
    id: &str,
    issue_summary: &str,
    code_diff: &str,
    test_diff: &str,
    tx: &tokio::sync::broadcast::Sender<crate::interfaces::http::api::RealtimeEvent>,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    // Runs `git` with `args` in the process working directory, capturing output.
    fn run_git(args: &[&str]) -> std::io::Result<std::process::Output> {
        std::process::Command::new("git").args(args).output()
    }

    // Commit id at the top of the stash, or `None` when the stash is empty.
    fn stash_tip() -> Option<String> {
        let out = run_git(&["rev-parse", "--quiet", "--verify", "refs/stash"]).ok()?;
        if out.status.success() {
            Some(String::from_utf8_lossy(&out.stdout).trim().to_string())
        } else {
            None
        }
    }

    // Pops the stash entry created by this run; failures are logged so they
    // never replace the error that is actually being reported.
    fn restore_stash(stashed: bool) {
        if !stashed {
            return;
        }
        match run_git(&["stash", "pop"]) {
            Ok(out) if out.status.success() => {}
            Ok(out) => tracing::warn!(
                "AI SRE: git stash pop failed: {}",
                String::from_utf8_lossy(&out.stderr)
            ),
            Err(e) => tracing::warn!("AI SRE: could not run git stash pop: {:?}", e),
        }
    }

    // Use only hex digits for the branch suffix: a canonical UUID gives the
    // same 8 characters as before, while braced or URN forms no longer put
    // `{` or `:` into the ref name, and a short id cannot cause a slice panic.
    let short_id: String = id
        .chars()
        .filter(|c| c.is_ascii_hexdigit())
        .take(8)
        .collect();
    if short_id.is_empty() {
        return Err("Insight id contains no hexadecimal characters".into());
    }
    let branch_name = format!("sre-fix-{}", short_id);

    let broadcast = |step: &str, status: &str, msg: &str| {
        let _ = tx.send(
            crate::interfaces::http::api::RealtimeEvent::AIEngineerProgress {
                insight_id: id.to_string(),
                step: step.to_string(),
                status: status.to_string(),
                message: msg.to_string(),
            },
        );
    };

    // 1. Stash any uncommitted local work so it doesn't get lost.
    broadcast(
        "STASHING",
        "ACTIVE",
        "Safely stashing developer workspace changes...",
    );
    tracing::info!("AI SRE: Stashing local uncommitted changes...");
    let stash_before = stash_tip();
    let stash_output = run_git(&["stash"])?;
    if !stash_output.status.success() {
        broadcast("STASHING", "FAILED", "Failed to stash workspace changes.");
        return Err("Git stash failed".into());
    }
    // A changed stash tip means this run really created an entry.
    let stashed = stash_tip() != stash_before;
    broadcast(
        "STASHING",
        "COMPLETED",
        "Stashed local changes successfully.",
    );

    // 2. Start from the main branch.
    broadcast("BRANCHING", "ACTIVE", "Switching to main branch...");
    tracing::info!("AI SRE: Checking out clean main branch...");
    let checkout_main = run_git(&["checkout", "main"])?;
    if !checkout_main.status.success() {
        broadcast("BRANCHING", "FAILED", "Failed to checkout main branch.");
        restore_stash(stashed);
        return Err("Checkout main failed".into());
    }

    // 3. Create and check out the fix branch; if it already exists from an
    //    earlier attempt, delete it and try once more.
    broadcast(
        "BRANCHING",
        "ACTIVE",
        &format!("Provisioning new branch {}...", branch_name),
    );
    tracing::info!("AI SRE: Creating new git branch {}...", branch_name);
    let checkout_output = run_git(&["checkout", "-b", branch_name.as_str()])?;
    if !checkout_output.status.success() {
        let _ = run_git(&["branch", "-D", branch_name.as_str()])?;
        let second_checkout = run_git(&["checkout", "-b", branch_name.as_str()])?;
        if !second_checkout.status.success() {
            broadcast("BRANCHING", "FAILED", "Failed to checkout new branch.");
            let _ = run_git(&["checkout", "main"]);
            restore_stash(stashed);
            return Err("Git branch creation failed".into());
        }
    }
    broadcast(
        "BRANCHING",
        "COMPLETED",
        &format!("SRE branch {} provisioned successfully.", branch_name),
    );

    // 4. Apply the code patch.
    broadcast("PATCHING", "ACTIVE", "Applying AI suggested code patch...");
    tracing::info!("AI SRE: Applying code fix diff...");
    if let Err(e) = apply_patch_string(code_diff).await {
        broadcast(
            "PATCHING",
            "FAILED",
            &format!("Failed to apply code patch: {}", e),
        );
        cleanup_failed_branch(&branch_name).await;
        restore_stash(stashed);
        return Err(format!("Failed to apply code diff patch: {}", e).into());
    }

    // 5. Apply the test case patch if present.
    if !test_diff.trim().is_empty() {
        broadcast(
            "PATCHING",
            "ACTIVE",
            "Applying AI suggested unit test patch...",
        );
        tracing::info!("AI SRE: Applying AI-generated test case...");
        if let Err(e) = apply_patch_string(test_diff).await {
            broadcast(
                "PATCHING",
                "FAILED",
                &format!("Failed to apply test patch: {}", e),
            );
            cleanup_failed_branch(&branch_name).await;
            restore_stash(stashed);
            return Err(format!("Failed to apply test case patch: {}", e).into());
        }
    }
    broadcast(
        "PATCHING",
        "COMPLETED",
        "All suggested code and test patches successfully applied.",
    );

    // 6. Verify with the project's test suite.
    broadcast(
        "TESTING",
        "ACTIVE",
        "Executing automated test suite ('cargo test')...",
    );
    tracing::info!("AI SRE: Running cargo test to verify fix and new test case...");
    let test_output = std::process::Command::new("cargo")
        .args(["test"])
        .output()?;
    if !test_output.status.success() {
        broadcast("TESTING", "FAILED", "Cargo test suite failed verification.");
        let stderr = String::from_utf8_lossy(&test_output.stderr);
        let stdout = String::from_utf8_lossy(&test_output.stdout);
        cleanup_failed_branch(&branch_name).await;
        restore_stash(stashed);
        return Err(format!(
            "Automated tests failed.\nStderr: {}\nStdout: {}",
            stderr, stdout
        )
        .into());
    }
    broadcast(
        "TESTING",
        "COMPLETED",
        "All automated verification tests passed flawlessly (100% pass rate).",
    );

    // 7. Commit the changes.
    broadcast(
        "COMMITTING",
        "ACTIVE",
        "Committing changes and preparing to push...",
    );
    tracing::info!("AI SRE: Committing code and test fix to git...");
    let _ = run_git(&["add", "."])?;
    let commit_msg = format!("sre-fix: {}", issue_summary);
    let commit_output = run_git(&["commit", "-m", commit_msg.as_str()])?;
    if !commit_output.status.success() {
        broadcast("COMMITTING", "FAILED", "Failed to commit changes locally.");
        cleanup_failed_branch(&branch_name).await;
        restore_stash(stashed);
        return Err("Git commit failed".into());
    }

    // 8. Push the branch to the remote (forced, so a stale remote branch of
    //    the same name is overwritten).
    broadcast(
        "COMMITTING",
        "ACTIVE",
        "Pushing branch to GitHub remote repository...",
    );
    tracing::info!("AI SRE: Pushing branch {} to remote origin...", branch_name);
    let push_output = run_git(&["push", "-u", "origin", branch_name.as_str(), "-f"])?;
    if !push_output.status.success() {
        broadcast(
            "COMMITTING",
            "FAILED",
            "Failed to push SRE branch to remote repository.",
        );
        let err = String::from_utf8_lossy(&push_output.stderr);
        cleanup_failed_branch(&branch_name).await;
        restore_stash(stashed);
        return Err(format!("Failed to push to GitHub remote: {}", err).into());
    }

    // 9. Open the pull request with the `gh` CLI.
    broadcast(
        "COMMITTING",
        "ACTIVE",
        "Opening GitHub Pull Request using GitHub CLI...",
    );
    tracing::info!("AI SRE: Creating GitHub Pull Request...");
    let pr_body = format!(
        "## AI SRE Self-Healing Fix\n\n**Issue**: {}\n\n**Resolution**: Verified successfully via automated tests (`cargo test` passed with zero errors, including autonomous test cases).\n\n*Created autonomously by AI SRE Log & Telemetry Watchdog.*",
        issue_summary
    );
    let pr_output = std::process::Command::new("gh")
        .args([
            "pr",
            "create",
            "--title",
            commit_msg.as_str(),
            "--body",
            pr_body.as_str(),
            "--head",
            branch_name.as_str(),
            "--base",
            "main",
        ])
        .output()?;

    // Return the workspace to main and restore the developer's changes
    // whether or not the PR was created.
    tracing::info!("AI SRE: Restoring original development branch and changes...");
    let _ = run_git(&["checkout", "main"]);
    restore_stash(stashed);

    if !pr_output.status.success() {
        broadcast("COMMITTING", "FAILED", "Failed to open Pull Request.");
        let err = String::from_utf8_lossy(&pr_output.stderr);
        return Err(format!("GitHub PR creation failed: {}", err).into());
    }
    let pr_url = String::from_utf8_lossy(&pr_output.stdout)
        .trim()
        .to_string();
    broadcast(
        "COMMITTING",
        "COMPLETED",
        &format!("PR successfully opened at: {}", pr_url),
    );
    Ok(pr_url)
}
/// Posts the outcome of a self-healing run to a Discord or Slack webhook.
///
/// The webhook URL is read from `SRE_WEBHOOK_URL`, falling back to
/// `SLACK_WEBHOOK_URL`; when neither is set (or the value is blank) the
/// function does nothing. URLs containing `discord.com` get a Discord embed
/// payload, everything else a Slack attachment payload.
///
/// `details_or_url` is the PR URL on success and the diagnostic text on
/// failure. Diagnostics longer than 1000 bytes are shortened to at most 950
/// bytes on a character boundary; cutting at a fixed byte offset would panic
/// on multi-byte output.
///
/// # Errors
///
/// Returns an error when the HTTP client cannot be built or the request
/// cannot be sent. A non-success response status is only logged.
async fn send_sre_notification(
    id: &str,
    issue_summary: &str,
    is_success: bool,
    details_or_url: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Shortens long diagnostics without splitting a UTF-8 sequence.
    fn clip_diagnostics(raw: &str) -> String {
        if raw.len() <= 1000 {
            return raw.to_string();
        }
        let mut cut = 950;
        // Offset 0 is always a boundary, so this loop terminates.
        while !raw.is_char_boundary(cut) {
            cut -= 1;
        }
        let mut clipped = raw[..cut].to_string();
        clipped.push_str("\n... [truncated]");
        clipped
    }

    let webhook_url = match std::env::var("SRE_WEBHOOK_URL")
        .or_else(|_| std::env::var("SLACK_WEBHOOK_URL"))
    {
        Ok(url) => url,
        Err(_) => {
            tracing::info!("AI SRE: No SRE_WEBHOOK_URL or SLACK_WEBHOOK_URL set in environment. Skipping webhook notification.");
            return Ok(());
        }
    };
    if webhook_url.trim().is_empty() {
        return Ok(());
    }

    // The caller awaits this notification, so bound how long a slow or dead
    // webhook can hold it up.
    let client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(15))
        .build()?;

    let title = if is_success {
        format!("🟢 SRE Auto-Healing Success: {}", issue_summary)
    } else {
        format!("🔴 SRE Auto-Healing Failure: {}", issue_summary)
    };

    let payload = if webhook_url.contains("discord.com") {
        let color = if is_success { 0x00E59B } else { 0xEF4444 };
        let description = if is_success {
            format!("The AI SRE self-healing engine has successfully patched the issue, passed all automated verification checks (`cargo test`), and created a pull request on GitHub!\n\n**GitHub Pull Request**:\n<{}>", details_or_url)
        } else {
            format!("The AI SRE self-healing workflow failed during automated verification. System has rolled back cleanly to main.\n\n**Diagnostic Logs**:\n```\n{}\n```", clip_diagnostics(details_or_url))
        };
        serde_json::json!({
            "username": "Antigravity AI SRE",
            "avatar_url": "https://img.icons8.com/color/96/000000/robot-vacuum.png",
            "embeds": [{
                "title": title,
                "description": description,
                "color": color,
                "footer": {
                    "text": format!("SRE Insight ID: {}", id)
                },
                "timestamp": chrono::Utc::now().to_rfc3339()
            }]
        })
    } else {
        let color = if is_success { "#00E59B" } else { "#EF4444" };
        let text = if is_success {
            format!("The AI SRE self-healing engine has successfully patched the issue, passed all automated verification checks (`cargo test`), and created a pull request on GitHub!\n\n*GitHub Pull Request*:\n<{}>", details_or_url)
        } else {
            format!("The AI SRE self-healing workflow failed during automated verification. System has rolled back cleanly to main.\n\n*Diagnostic Logs*:\n```\n{}\n```", clip_diagnostics(details_or_url))
        };
        serde_json::json!({
            "text": "*Antigravity AI SRE Watchdog Alert*",
            "attachments": [{
                "color": color,
                "title": title,
                "text": text,
                "footer": format!("SRE Insight ID: {}", id)
            }]
        })
    };

    let response = client.post(&webhook_url).json(&payload).send().await?;
    if !response.status().is_success() {
        tracing::error!(
            "AI SRE: Failed to send Slack/Discord notification (status {}): {:?}",
            response.status(),
            response.text().await
        );
    } else {
        tracing::info!("AI SRE: Sent Slack/Discord SRE notification successfully.");
    }
    Ok(())
}
/// Applies a unified diff to the current working directory with `patch -p1`.
///
/// The diff is written to a uniquely named file in the system temp directory
/// (process id plus a per-process counter), so concurrent calls cannot
/// overwrite each other's patch and the file never appears inside the
/// working tree. The file is created with `create_new`, which refuses to
/// reuse an existing path, and it is removed on every exit path, including
/// when `patch` cannot be started.
///
/// # Errors
///
/// Returns an error when the temp file cannot be written, `patch` cannot be
/// spawned, or `patch` exits unsuccessfully (its stderr is included).
async fn apply_patch_string(diff: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use std::io::Write;
    use std::sync::atomic::{AtomicU64, Ordering};

    static PATCH_SEQ: AtomicU64 = AtomicU64::new(0);
    let seq = PATCH_SEQ.fetch_add(1, Ordering::Relaxed);
    let patch_path = std::env::temp_dir().join(format!(
        "temp_sre_patch_{}_{}.patch",
        std::process::id(),
        seq
    ));

    let mut patch_file = std::fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(&patch_path)?;
    if let Err(e) = patch_file.write_all(diff.as_bytes()) {
        drop(patch_file);
        let _ = std::fs::remove_file(&patch_path);
        return Err(e.into());
    }
    // Close the handle before `patch` reads the file.
    drop(patch_file);

    let outcome = std::process::Command::new("patch")
        .arg("-p1")
        .arg("-i")
        .arg(&patch_path)
        .output();
    // Remove the temp file before inspecting the outcome so no path leaks it.
    let _ = std::fs::remove_file(&patch_path);
    let output = outcome?;

    if !output.status.success() {
        let err_msg = String::from_utf8_lossy(&output.stderr).to_string();
        return Err(format!("Patch apply failed: {}", err_msg).into());
    }
    Ok(())
}
/// Best-effort rollback after a failed self-healing attempt.
///
/// Runs, in order and ignoring every result: `git checkout main`,
/// `git reset --hard origin/main`, `git branch -D <branch_name>`.
///
/// NOTE(review): the hard reset targets `origin/main`, so local `main`
/// commits that were never pushed would be discarded - confirm the service
/// only ever runs in a checkout where `main` tracks `origin/main` exactly.
async fn cleanup_failed_branch(branch_name: &str) {
    let rollback_steps: [&[&str]; 3] = [
        &["checkout", "main"],
        &["reset", "--hard", "origin/main"],
        &["branch", "-D", branch_name],
    ];
    for step in rollback_steps.iter() {
        // Failures are ignored on purpose: each step is attempted regardless
        // of how the previous one went.
        let _ = std::process::Command::new("git")
            .args(step.iter())
            .output();
    }
}
pub fn start_ai_log_monitor(pool: sqlx::PgPool) {
tokio::spawn(async move {
let mut last_check_time = chrono::Utc::now();
tracing::info!("AI SRE: Continuous Log & Database Activity Monitor Active.");
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
// Query any new critical risk audit logs since last check
let result = sqlx::query(
"SELECT id, event_type, risk_level, details, device_fingerprint, created_at FROM risk_audit_logs WHERE risk_level IN ('HIGH', 'CRITICAL') AND created_at > $1 ORDER BY created_at ASC"
)
.bind(last_check_time)
.fetch_all(&pool)
.await;
match result {
Ok(rows) => {
for row in rows {
use sqlx::Row;
let id: i64 = row.get("id");
let event_type: String = row.get("event_type");
let risk_level: String = row.get("risk_level");
let details: String = row.get("details");
let fingerprint: String = row.get("device_fingerprint");
let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
tracing::info!("AI SRE: Detected critical security event in audit logs (ID: {}). Autonomously analyzing...", id);
let source = format!("SYSTEM_SECURITY_{}", event_type);
let message = format!("Security Threat Detected: {}", details);
let stack_trace = Some(format!(
"Device Fingerprint: {}\nLog ID: {}\nRisk Level: {}",
fingerprint, id, risk_level
));
let pool_clone = pool.clone();
tokio::spawn(async move {
if let Err(e) = analyze_error_with_ai(
pool_clone,
source,
risk_level,
message,
stack_trace,
)
.await
{
tracing::error!(
"AI SRE: Autonomous analysis failed for audit log {}: {:?}",
id,
e
);
}
});
if created_at > last_check_time {
last_check_time = created_at;
}
}
}
Err(e) => {
tracing::error!("AI SRE: Error querying critical audit logs: {:?}", e);
}
}
}
});
}