Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import uuid | |
| import httpx | |
| import asyncio | |
| from datetime import datetime, timezone | |
| from flask import Flask, request, jsonify | |
| from flask_jwt_extended import ( | |
| JWTManager, | |
| create_access_token, | |
| jwt_required, | |
| get_jwt_identity, | |
| ) | |
| import secrets | |
| from flask_cors import CORS | |
| from dotenv import load_dotenv | |
| # Import the Microsoft Agent Framework A2A classes | |
| from a2a.client import A2ACardResolver | |
| from agent_framework.a2a import A2AAgent | |
| import subprocess, sys, os, logging | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| from apscheduler.triggers.cron import CronTrigger | |
| import atexit | |
# Populate environment variables from a .env file (python-dotenv) before any of
# the os.getenv() lookups further down in this module run.
load_dotenv()

# ── SCHEDULER SETUP ───────────────────────────────────────────────────────────
# The seeder's location is derived from this module's own path rather than the
# current working directory, so the scheduled job finds seed_logs.py no matter
# which directory the server process was launched from.
BACKEND_DIR = os.path.abspath(os.path.dirname(__file__))  # directory containing this module
SEED_SCRIPT = os.path.join(BACKEND_DIR, "seed_logs.py")
def run_seed_logs(timeout_seconds=60 * 60):
    """Run ``seed_logs.py`` once in a child process and log the outcome.

    The child is started with ``sys.executable`` (the interpreter running this
    server, hence the same virtualenv and installed packages) and with
    ``BACKEND_DIR`` as its working directory. Its captured stdout/stderr are
    written through the root ``logging`` logger (not Flask's ``app.logger``).
    Whether the INFO-level lines are visible depends on how logging is
    configured elsewhere.

    Args:
        timeout_seconds: Maximum wall-clock time the seeder may run before the
            child is killed and the run is logged as timed out. ``None`` waits
            indefinitely. A bound matters because a hung child would otherwise
            occupy this scheduled job forever.

    Exceptions are logged rather than propagated, so a failing seeder never
    escapes into the scheduler thread.
    """
    logging.info("[Scheduler] Starting daily seed_logs.py run...")
    try:
        result = subprocess.run(
            [sys.executable, SEED_SCRIPT],  # same interpreter -> same venv/packages
            capture_output=True,
            text=True,
            cwd=BACKEND_DIR,  # presumably so seed_logs.py's load_dotenv() finds backend/.env
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired as exc:
        # The child has been killed by subprocess.run; log whatever it produced.
        logging.error(
            "[Scheduler] seed_logs.py timed out after %s seconds.\nSTDOUT: %s\nSTDERR: %s",
            timeout_seconds,
            exc.stdout,
            exc.stderr,
        )
        return
    except Exception as exc:
        logging.exception("[Scheduler] Unexpected error running seed_logs.py: %s", exc)
        return

    if result.returncode == 0:
        logging.info(
            "[Scheduler] seed_logs.py completed successfully.\n%s", result.stdout
        )
    else:
        logging.error(
            "[Scheduler] seed_logs.py failed (exit %d).\nSTDOUT: %s\nSTDERR: %s",
            result.returncode,
            result.stdout,
            result.stderr,
        )
app = Flask(__name__)

# JWT signing key. Without JWT_SECRET_KEY a random key is generated per process:
# every restart invalidates all issued tokens, and separate worker processes
# would each sign with a different key. Warn instead of failing silently.
_jwt_secret_key = os.getenv("JWT_SECRET_KEY")
if not _jwt_secret_key:
    logging.warning(
        "JWT_SECRET_KEY is not set; using a random per-process key. "
        "Issued tokens will not survive a restart or be valid across workers."
    )
    _jwt_secret_key = secrets.token_hex(32)
app.config["JWT_SECRET_KEY"] = _jwt_secret_key
jwt = JWTManager(app)
CORS(app)

# ── Daily seeder schedule ─────────────────────────────────────────────────────
# BackgroundScheduler executes jobs off the request-handling thread, so it does
# not block Flask. This module-level code runs in every process that imports the
# module, so each server worker process starts its own scheduler.
scheduler = BackgroundScheduler(timezone="UTC")
scheduler.add_job(
    func=run_seed_logs,
    trigger=CronTrigger(
        hour=0,  # midnight UTC
        minute=0,
        second=0,
        timezone="UTC",
    ),
    id="daily_seed_logs",
    name="Daily seed_logs.py at 00:00 UTC",
    replace_existing=True,  # safe to call even if the job is already registered
    # Late-start tolerance: if the scheduler is running but the job could not
    # start exactly on time (e.g. the process was suspended), it still runs when
    # it is at most 5 minutes late. No job store is configured, so jobs live in
    # memory only: a run missed while the server was down is NOT made up later.
    misfire_grace_time=60 * 5,
)
scheduler.start()
logging.info(
    "[Scheduler] APScheduler started. Next seed run: %s",
    scheduler.get_job("daily_seed_logs").next_run_time,
)


def _shutdown_scheduler():
    """Stop the scheduler at interpreter exit without waiting for running jobs."""
    # Guard: shutdown() on an already-stopped scheduler raises.
    if scheduler.running:
        scheduler.shutdown(wait=False)


# Cleanly shut the scheduler down when Flask/Gunicorn exits.
atexit.register(_shutdown_scheduler)
# ---------------------------------------------------------
# Configuration (read from the environment / .env)
# ---------------------------------------------------------
# KIBANA_URL must be the Kibana URL of the deployment,
# e.g. https://xxxx.kb.asia-south1.gcp.elastic-cloud.com
# A trailing slash is removed so API paths can be appended as f"{KIBANA_URL}/api/...".
KIBANA_URL = os.environ.get("KIBANA_URL", "http://localhost:5601").rstrip("/")
ES_API_KEY = os.environ.get("ES_API_KEY")  # None when unset
AGENT_ID = os.environ.get("AGENT_ID")  # None when unset

# ---------------------------------------------------------
# Mock Database (In-Memory for MVP)
# ---------------------------------------------------------
# Process-local storage: contents are lost on restart and are not shared
# between worker processes.
db = dict(
    cases={},  # case_id -> case record
    actions={},  # action_id -> action record
    audit_logs=[],  # audit entries, in insertion order
)
def now_iso():
    """Return the current time in UTC as an ISO-8601 string (``+00:00`` offset)."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def add_audit_log(case_id, actor, action_type, details):
    """Append one entry to the in-memory audit trail and echo it to stdout.

    Args:
        case_id: Case the entry belongs to.
        actor: Who performed the step (e.g. "System" or a user name).
        action_type: Event tag such as "CASE_CREATED".
        details: Human-readable description.

    The entry gets an 8-hex-character random id (not checked for uniqueness) and
    a UTC timestamp. Returns None.
    """
    entry = dict(
        id=uuid.uuid4().hex[:8],
        case_id=case_id,
        actor=actor,
        action_type=action_type,
        details=details,
        timestamp=now_iso(),
    )
    db["audit_logs"].append(entry)
    print(f"[AUDIT] {action_type}: {details}")
def trigger_seed():
    """Queue a one-off run of ``run_seed_logs`` on the background scheduler.

    Protection: when the ``SEED_TRIGGER_TOKEN`` environment variable is set, the
    request must send the same value in the ``X-Seed-Token`` header, otherwise
    401 is returned. When the variable is unset the endpoint stays open, so set
    it in any deployment where this route is reachable.

    Returns:
        ``({"status": "seeder job queued"}, 202)`` once queued, or
        ``({"error": "unauthorized"}, 401)``.
    """
    expected_token = os.getenv("SEED_TRIGGER_TOKEN")
    if expected_token:
        supplied_token = request.headers.get("X-Seed-Token", "")
        # Constant-time comparison so response timing does not reveal the token.
        if not secrets.compare_digest(
            supplied_token.encode("utf-8"), expected_token.encode("utf-8")
        ):
            return jsonify({"error": "unauthorized"}), 401

    # No trigger given: APScheduler runs the job once, as soon as possible. The
    # fixed id plus replace_existing replaces a still-pending manual run instead
    # of stacking duplicates.
    scheduler.add_job(
        func=run_seed_logs,
        id="manual_seed",
        replace_existing=True,
    )
    return jsonify({"status": "seeder job queued"}), 202
| # AUTH ENDPOINTS | |
def _secret_matches(supplied, expected):
    """Constant-time check that ``supplied`` equals a configured, non-empty ``expected``.

    Returns False when ``expected`` is unset/empty (fail closed) or when
    ``supplied`` is not a string (e.g. JSON null or a number).
    """
    if not expected or not isinstance(supplied, str):
        return False
    return secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


def login():
    """Exchange UI credentials for a JWT access token.

    Expects a JSON object ``{"username": ..., "password": ...}`` and checks it
    against the ``UI_AUTH_USERNAME`` / ``UI_AUTH_PASSWORD`` environment variables.
    A missing, non-JSON or non-object body is treated as empty credentials.

    Returns:
        ``{"access_token": ...}`` on success, or
        ``({"msg": "Bad username or password"}, 401)``. This includes the case
        where the credentials are not configured at all: without that guard,
        ``os.getenv`` returns None and ``{"username": null, "password": null}``
        would have matched.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get("username", "")
    password = data.get("password", "")

    # Evaluate both checks (no short-circuit) so timing does not reveal which
    # field was wrong.
    username_ok = _secret_matches(username, os.getenv("UI_AUTH_USERNAME"))
    password_ok = _secret_matches(password, os.getenv("UI_AUTH_PASSWORD"))
    if not (username_ok and password_ok):
        return jsonify({"msg": "Bad username or password"}), 401

    token = create_access_token(identity=username)
    return jsonify({"access_token": token})
def me():
    """Report the identity stored in the caller's JWT (the username given at login).

    NOTE(review): presumably registered behind ``@jwt_required()``;
    ``get_jwt_identity()`` needs a verified token in the request context.
    """
    identity = get_jwt_identity()
    return jsonify({"user": identity})
def logout():
    """Acknowledge a logout; the server holds no session state to clear.

    MVP behaviour: tokens are stateless, so the client is expected to discard
    its own token. Real logout would add the token to a revocation/denylist.
    """
    payload = {"ok": True}
    return jsonify(payload)
| # --------------------------------------------------------- | |
| # 1. Alert Webhook (Simulates an alert triggering a Case) | |
| # --------------------------------------------------------- | |
def receive_alert():
    """Create an OPEN case from an incoming alert payload.

    Accepts an optional JSON object with ``severity``, ``title``, ``service``,
    ``start_time`` and ``end_time``. Missing fields fall back to demo defaults,
    and a missing, non-JSON or non-object body is treated as ``{}`` instead of
    crashing with a 500.

    Returns:
        ``({"message": "Case created", "case_id": ...}, 201)``.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # 6 hex chars gives ~16M ids; regenerate on the rare collision rather than
    # silently overwriting an existing case.
    case_id = f"CASE-{uuid.uuid4().hex[:6].upper()}"
    while case_id in db["cases"]:
        case_id = f"CASE-{uuid.uuid4().hex[:6].upper()}"

    case = {
        "id": case_id,
        "status": "OPEN",
        "severity": data.get("severity", "P1"),
        "title": data.get("title", "High 500 Error Rate detected"),
        "service": data.get("service", "checkout-api"),
        "start_time": data.get("start_time"),
        "end_time": data.get("end_time"),
        "created_at": now_iso(),
        "analysis": None,
    }
    db["cases"][case_id] = case
    # Log the service actually stored on the case (including the default), not
    # the raw payload value, which may be absent.
    add_audit_log(
        case_id, "System", "CASE_CREATED", f"Alert triggered for {case['service']}"
    )
    return jsonify({"message": "Case created", "case_id": case_id}), 201
| # --------------------------------------------------------- | |
| # 2. Analyze (Calls Elastic Agent via Microsoft Agent Framework) | |
| # --------------------------------------------------------- | |
def _strip_code_fence(text):
    """Return ``text`` without a surrounding Markdown code fence (```json … ``` or ``` … ```).

    ``str.strip("```json")`` is not suitable for this: it removes any of the
    characters ` j s o n from both ends rather than the literal fence.
    """
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        # Drop an optional "json" language tag on the opening fence.
        if cleaned[:4].lower() == "json":
            cleaned = cleaned[4:]
        cleaned = cleaned.strip()
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
    return cleaned.strip()


async def _ask_agent(prompt):
    """Send ``prompt`` to the Elastic agent over A2A and return the concatenated reply text.

    Raises:
        RuntimeError: if ``AGENT_ID`` or ``ES_API_KEY`` is not configured.
        Any error raised by httpx or the A2A client (network, auth, protocol).
    """
    if not AGENT_ID or not ES_API_KEY:
        raise RuntimeError("AGENT_ID and ES_API_KEY must be set to call the Elastic agent")

    # The Kibana A2A base URL
    a2a_agent_host = f"{KIBANA_URL}/api/agent_builder/a2a"
    custom_headers = {"Authorization": f"ApiKey {ES_API_KEY}", "kbn-xsrf": "true"}
    # The Agent Framework's A2A classes take an httpx AsyncClient.
    async with httpx.AsyncClient(timeout=120.0, headers=custom_headers) as http_client:
        # 1. Resolve the A2A Agent Card; its path is derived from the Agent ID.
        resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
        agent_card = await resolver.get_agent_card(relative_card_path=f"/{AGENT_ID}.json")
        print(f"Found Agent: {agent_card.name} - {agent_card.description}")
        # 2. Connect to the Elastic agent through the Agent Framework.
        agent = A2AAgent(
            name=agent_card.name,
            description=agent_card.description,
            agent_card=agent_card,
            url=a2a_agent_host,
            http_client=http_client,
        )
        print("Sending prompt to Elastic A2A agent...")
        # 3. Run the prompt (the framework handles the JSON-RPC exchange).
        response = await agent.run(prompt)
    # Guard against a message whose text is None.
    return "".join(message.text or "" for message in response.messages)


def _create_pending_actions(case_id, mitigations):
    """Store one PENDING action per agent mitigation for ``case_id``.

    Each mitigation is expected to be an object with ``type`` and ``action``
    keys; a bare string is accepted as the action text. Any other entry, or a
    non-list ``mitigations`` value, is ignored.
    """
    if not isinstance(mitigations, list):
        return
    for mitigation in mitigations:
        if isinstance(mitigation, str):
            mitigation = {"action": mitigation}
        elif not isinstance(mitigation, dict):
            continue
        action_id = f"ACT-{uuid.uuid4().hex[:6].upper()}"
        db["actions"][action_id] = {
            "id": action_id,
            "case_id": case_id,
            "type": mitigation.get("type", "UNKNOWN"),
            "action": mitigation.get("action", ""),
            "status": "PENDING",
            "created_at": now_iso(),
        }


async def analyze_case(case_id):
    """Ask the Elastic AI agent to investigate a case and record its analysis.

    On success the parsed JSON reply is stored as ``case["analysis"]``, the case
    becomes INVESTIGATED, and each proposed mitigation becomes a PENDING action.
    The case is only mutated after the reply has parsed into a JSON object, so a
    bad reply leaves it untouched.

    Returns:
        404 if the case is unknown. 500 with ``{"error": ...}`` if the agent call
        or parsing fails (also audited as ANALYSIS_FAILED). Otherwise
        ``{"message", "analysis", "actions"}``, where ``actions`` maps action id
        to action for this case only.
    """
    case = db["cases"].get(case_id)
    if not case:
        return jsonify({"error": "Case not found"}), 404

    prompt = (
        f"An alert fired for {case['service']} between {case['start_time']} and {case['end_time']}. "
        "Please investigate this using your tools. Follow your system instructions to find the root cause "
        "and return ONLY the raw JSON format with causal_chain, hypotheses, and mitigations. "
        "Do not include markdown blocks like ```json."
    )
    add_audit_log(
        case_id, "System", "ANALYSIS_STARTED", "Sent context to Elastic AI Agent"
    )

    try:
        agent_reply = await _ask_agent(prompt)
        # The LLM may still wrap its answer in a Markdown fence despite the prompt.
        analysis_data = json.loads(_strip_code_fence(agent_reply))
        if not isinstance(analysis_data, dict):
            raise ValueError("Agent reply is not a JSON object")
    except Exception as e:
        print("Error calling agent via Agent Framework:", e)
        add_audit_log(case_id, "System", "ANALYSIS_FAILED", str(e))
        return jsonify({"error": str(e)}), 500

    case["analysis"] = analysis_data
    case["status"] = "INVESTIGATED"
    # Auto-create PENDING actions from the agent's mitigations.
    _create_pending_actions(case_id, analysis_data.get("mitigations"))
    add_audit_log(
        case_id,
        "System",
        "ANALYSIS_COMPLETE",
        "Agent identified root cause and proposed mitigations",
    )
    # Scope the returned actions to this case, as get_case does.
    case_actions = {
        action_id: action
        for action_id, action in db["actions"].items()
        if action["case_id"] == case_id
    }
    return jsonify(
        {
            "message": "Analysis complete",
            "analysis": analysis_data,
            "actions": case_actions,
        }
    )
| # --------------------------------------------------------- | |
| # 3. Human-in-the-Loop (HITL) Approval | |
| # --------------------------------------------------------- | |
def _demo_placeholder_action(action_id):
    """Create, store and return a PENDING placeholder action for an unknown ``action_id``.

    MVP/demo fallback shared by the HITL endpoints: instead of answering 404 for
    an unknown id they fabricate this fixed "patch-fix" action tied to
    CASE-34225, a case that may not exist in ``db["cases"]``.
    TODO(review): replace with a 404 once the demo no longer relies on it.
    """
    action = {
        "id": action_id,
        "case_id": "CASE-34225",
        "type": "patch-fix",
        "action": "Apply patch of error logs to checkout-api to fix the vulnerability",
        "status": "PENDING",
        "created_at": now_iso(),
    }
    db["actions"][action_id] = action
    return action


def approve_action(action_id):
    """Human-in-the-loop gate: mark a PENDING action as APPROVED.

    The approver name comes from the optional JSON body ``{"user": ...}``
    (default "OnCall-Engineer-1"). It is client-supplied, not taken from the JWT.
    A missing or non-JSON body is treated as ``{}``.

    Returns:
        ``{"message": "Action approved", "action": ...}``, or 400 when the action
        is not PENDING, so running or finished actions cannot be flipped back to
        APPROVED and executed again.
    """
    action = db["actions"].get(action_id) or _demo_placeholder_action(action_id)
    if action["status"] != "PENDING":
        return (
            jsonify({"error": f"Cannot approve action in status: {action['status']}"}),
            400,
        )

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    user = data.get("user", "OnCall-Engineer-1")
    action["status"] = "APPROVED"
    action["approved_by"] = user
    action["approved_at"] = now_iso()
    add_audit_log(
        action["case_id"],
        user,
        "ACTION_APPROVED",
        f"Approved execution of {action['type']}",
    )
    return jsonify({"message": "Action approved", "action": action})


# ---------------------------------------------------------
# 4. Execute Action (Calls the Mock Executor)
# ---------------------------------------------------------
def execute_action(action_id):
    """Run an APPROVED action through the mock executor and record the outcome.

    Status moves APPROVED -> RUNNING -> SUCCESS or FAILED. If the executor
    raises, the action is marked FAILED instead of being left in RUNNING.

    Returns:
        ``{"message": "Execution finished", "action": ...}``, or 400 when the
        action is not APPROVED.
    """
    action = db["actions"].get(action_id) or _demo_placeholder_action(action_id)
    if action["status"] != "APPROVED":
        return (
            jsonify({"error": f"Cannot execute action in status: {action['status']}"}),
            400,
        )

    action["status"] = "RUNNING"
    add_audit_log(
        action["case_id"],
        "System",
        "EXECUTION_STARTED",
        f"Running runbook for {action['type']}",
    )
    # Perform the mock execution in-process rather than via HTTP.
    try:
        result, status_code = mock_executor_logic(
            {
                "action": action["action"],
                "type": action["type"],
            }
        )
    except Exception as exc:
        logging.exception("Mock executor raised for action %s", action_id)
        result, status_code = {"error": str(exc)}, 500

    if status_code == 200 and result.get("status") == "OK":
        action["status"] = "SUCCESS"
        add_audit_log(
            action["case_id"],
            "MockExecutor",
            "EXECUTION_SUCCESS",
            "Runbook completed successfully",
        )
    else:
        action["status"] = "FAILED"
        add_audit_log(
            action["case_id"], "MockExecutor", "EXECUTION_FAILED", "Runbook failed"
        )
    return jsonify({"message": "Execution finished", "action": action})
| # --------------------------------------------------------- | |
| # helper for mock execution logic (used by both the API handler and direct calls) | |
| # --------------------------------------------------------- | |
def mock_executor_logic(data: dict, *, delay_seconds: float = 2.0):
    """Simulate performing an action. Returns tuple (response_dict, status_code).

    Args:
        data: Action description; only ``type`` and ``action`` are read, and only
            for the log line.
        delay_seconds: Simulated run time. The default of 2 s matches the
            previous hard-coded value. Values <= 0 skip the sleep, which keeps
            tests fast. The sleep blocks the calling thread.

    Returns:
        ``({"status": "OK", "run_id": "RUN-<6 hex chars>"}, 200)``. The mock
        always succeeds.
    """
    import time

    if delay_seconds > 0:
        time.sleep(delay_seconds)
    print(
        f"[MOCK EXECUTOR] Successfully applied {data.get('type')}: {data.get('action')}"
    )
    return {"status": "OK", "run_id": f"RUN-{uuid.uuid4().hex[:6]}"}, 200
| # --------------------------------------------------------- | |
| # 5. Mock HTTP Executor | |
| # --------------------------------------------------------- | |
def mock_executor():
    """HTTP wrapper around ``mock_executor_logic``, kept for API compatibility.

    The request body, if it is a JSON object, is passed through as the action
    description. A missing, non-JSON or non-object body (e.g. a JSON array) is
    treated as ``{}`` so the shared logic's ``data.get`` calls cannot fail.

    Returns:
        The executor's response dict as JSON, with its status code.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    result, status_code = mock_executor_logic(payload)
    return jsonify(result), status_code
| # --------------------------------------------------------- | |
| # 6. Read Endpoints | |
| # --------------------------------------------------------- | |
def get_cases():
    """Return every stored case as a JSON array, in insertion (creation) order."""
    all_cases = list(db["cases"].values())
    return jsonify(all_cases)
def get_case(case_id):
    """Return one case with its actions and audit-log entries, or 404 if unknown."""
    case = db["cases"].get(case_id)
    if case:
        payload = {
            "case": case,
            "actions": [
                action for action in db["actions"].values() if action["case_id"] == case_id
            ],
            "audit_logs": [
                entry for entry in db["audit_logs"] if entry["case_id"] == case_id
            ],
        }
        return jsonify(payload)
    return jsonify({"error": "Not found"}), 404
def delete_all_cases():
    """Wipe the in-memory store: all cases, actions and audit-log entries.

    Returns a 200 JSON response reporting how many cases were removed.
    """
    removed = len(db["cases"])
    for store in (db["cases"], db["actions"], db["audit_logs"]):
        store.clear()
    message = f"Deleted {removed} case(s) and all related data"
    return jsonify({"status": "ok", "message": message}), 200
def health():
    """Health check: always answers 200 with ``{"status": "ok"}``."""
    body = {"status": "ok"}
    return jsonify(body), 200
if __name__ == "__main__":
    # Development server only.
    # - The Werkzeug reloader (on by default with debug=True) re-runs this module
    #   in a child process. That would start a second BackgroundScheduler and
    #   fire the daily seeder twice, so it is disabled.
    # - Debug mode stays on by default for local development. Its interactive
    #   debugger allows code execution, so set FLASK_DEBUG=0 anywhere reachable.
    # - PORT overrides the default port 5000.
    _debug = os.getenv("FLASK_DEBUG", "1").strip().lower() not in ("0", "false", "no", "off")
    app.run(port=int(os.getenv("PORT", "5000")), debug=_debug, use_reloader=False)