"""Flask backend for an incident-response MVP.

Exposes JWT-protected endpoints to create cases from alerts, ask an Elastic
agent (via the Agent Framework A2A client) for an analysis, approve and
execute proposed mitigations, and read back cases with their audit trail.
All state lives in the in-memory ``db`` dict and is lost on restart.
A background scheduler re-runs ``seed_logs.py`` daily at 00:00 UTC.
"""

import asyncio
import atexit
import json
import logging
import os
import secrets
import subprocess
import sys
import time
import uuid
from datetime import datetime, timezone

import httpx
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from dotenv import load_dotenv
from flask import Flask, jsonify, request
from flask_cors import CORS
from flask_jwt_extended import (
    JWTManager,
    create_access_token,
    get_jwt_identity,
    jwt_required,
)

# Import the Microsoft Agent Framework A2A classes
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent

load_dotenv()

# ── SCHEDULER SETUP ──────────────────────────────────────────────────────────
# Resolve the absolute path to seed_logs.py so the job works regardless of
# the working directory Flask was launched from.
BACKEND_DIR = os.path.dirname(os.path.abspath(__file__))  # .../backend/
SEED_SCRIPT = os.path.join(BACKEND_DIR, "seed_logs.py")


def run_seed_logs():
    """Run seed_logs.py in a child process and log its outcome.

    The script is launched with the same Python interpreter that runs this
    module (``sys.executable``) and with ``BACKEND_DIR`` as its working
    directory. Its stdout/stderr are captured and written through the root
    ``logging`` module: at INFO on exit code 0, at ERROR otherwise. Any
    exception raised while spawning the process is logged and swallowed so a
    failed run never propagates into the scheduler thread.
    """
    logging.info("[Scheduler] Starting daily seed_logs.py run...")
    try:
        result = subprocess.run(
            [sys.executable, SEED_SCRIPT],  # same venv Python → same packages
            capture_output=True,
            text=True,
            cwd=BACKEND_DIR,  # run from backend/ so .env is found
        )
        if result.returncode == 0:
            logging.info(
                "[Scheduler] seed_logs.py completed successfully.\n%s", result.stdout
            )
        else:
            logging.error(
                "[Scheduler] seed_logs.py failed (exit %d).\nSTDOUT: %s\nSTDERR: %s",
                result.returncode,
                result.stdout,
                result.stderr,
            )
    except Exception as exc:
        logging.exception("[Scheduler] Unexpected error running seed_logs.py: %s", exc)


app = Flask(__name__)

_jwt_secret = os.getenv("JWT_SECRET_KEY")
if not _jwt_secret:
    # A freshly generated secret differs on every process start, so tokens
    # issued before a restart (or by another worker process) stop validating.
    logging.warning(
        "JWT_SECRET_KEY is not set; using an ephemeral random secret for this process."
    )
    _jwt_secret = secrets.token_hex(32)
app.config["JWT_SECRET_KEY"] = _jwt_secret

jwt = JWTManager(app)
CORS(app)

# Create the scheduler (BackgroundScheduler runs in a daemon thread —
# it doesn't block Flask and stops automatically when the process exits).
scheduler = BackgroundScheduler(timezone="UTC")
scheduler.add_job(
    func=run_seed_logs,
    trigger=CronTrigger(
        hour=0,  # midnight UTC
        minute=0,
        second=0,
        timezone="UTC",
    ),
    id="daily_seed_logs",
    name="Daily seed_logs.py at 00:00 UTC",
    replace_existing=True,  # safe to call even if job already registered
    # Allow a run to start up to 5 minutes late instead of being skipped.
    # NOTE(review): no persistent job store is configured here, so a run
    # missed while the process was down is presumably not recovered — confirm.
    misfire_grace_time=60 * 5,
)
scheduler.start()
logging.info(
    "[Scheduler] APScheduler started. Next seed run: %s",
    scheduler.get_job("daily_seed_logs").next_run_time,
)

# Cleanly shut the scheduler down when Flask/Gunicorn exits
atexit.register(lambda: scheduler.shutdown(wait=False))

# ---------------------------------------------------------
# Configuration
# ---------------------------------------------------------
# NOTE: Make sure this is your Kibana URL
# (e.g. https://xxxx.kb.asia-south1.gcp.elastic-cloud.com)
KIBANA_URL = os.getenv("KIBANA_URL", "http://localhost:5601").rstrip("/")
ES_API_KEY = os.getenv("ES_API_KEY")
AGENT_ID = os.getenv("AGENT_ID")

# ---------------------------------------------------------
# Mock Database (In-Memory for MVP)
# ---------------------------------------------------------
db = {"cases": {}, "actions": {}, "audit_logs": []}


def now_iso():
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def add_audit_log(case_id, actor, action_type, details):
    """Append an audit record to ``db["audit_logs"]`` and echo it to stdout."""
    log = {
        "id": uuid.uuid4().hex[:8],
        "case_id": case_id,
        "actor": actor,
        "action_type": action_type,
        "details": details,
        "timestamp": now_iso(),
    }
    db["audit_logs"].append(log)
    print(f"[AUDIT] {action_type}: {details}")


def _json_body():
    """Return the request's JSON body when it is an object, otherwise ``{}``.

    Guards the handlers against ``null``, list or scalar JSON bodies, which
    would otherwise fail on ``.get``.
    """
    payload = request.json
    return payload if isinstance(payload, dict) else {}


def _credentials_match(supplied, expected):
    """Compare a supplied credential with the configured one in constant time.

    Returns False when the expected value is not configured (``None``) or the
    supplied value is not a string, so an unset environment variable can never
    be matched by sending JSON ``null``.
    """
    if expected is None or not isinstance(supplied, str):
        return False
    # Compare bytes: compare_digest only accepts ASCII when given str.
    return secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


def _strip_code_fence(text):
    """Remove a surrounding Markdown code fence (``` or ```json) from *text*.

    Works on prefixes/suffixes rather than ``str.strip(chars)``, which treats
    its argument as a set of characters instead of a literal marker.
    """
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        if cleaned[:4].lower() == "json":
            cleaned = cleaned[4:]
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
    return cleaned.strip()


def _get_or_stub_action(action_id):
    """Return the stored action, creating a placeholder if it does not exist.

    NOTE(review): unknown action ids are not rejected with 404 (that response
    was commented out in the handlers); a hard-coded PENDING demo action is
    inserted instead. Behaviour kept as-is — confirm whether this MVP shortcut
    should be replaced by a real 404.
    """
    action = db["actions"].get(action_id)
    if not action:
        action = {
            "id": action_id,
            "case_id": "CASE-34225",
            "type": "patch-fix",
            "action": "Apply patch of error logs to checkout-api to fix the vulnerability",
            "status": "PENDING",
            "created_at": now_iso(),
        }
        db["actions"][action_id] = action
    return action


@app.route("/api/admin/seed", methods=["POST"])
def trigger_seed():
    """Manually queue a one-off run of seed_logs.py on the scheduler.

    NOTE(review): this endpoint performs no authentication or header check,
    unlike the other mutating routes which use ``@jwt_required()``. Left
    unchanged to stay compatible with existing callers — consider protecting it.
    """
    scheduler.add_job(
        func=run_seed_logs,
        id="manual_seed",
        replace_existing=True,
    )
    return jsonify({"status": "seeder job queued"}), 202


# AUTH ENDPOINTS
@app.route("/api/auth/login", methods=["POST"])
def login():
    """Exchange the configured UI username/password for a JWT access token.

    Credentials come from the ``UI_AUTH_USERNAME`` / ``UI_AUTH_PASSWORD``
    environment variables. Returns 401 on any mismatch, including when the
    variables are not set.
    """
    data = _json_body()
    username = data.get("username", "")
    password = data.get("password", "")

    # Evaluate both checks unconditionally so the response does not reveal
    # which of the two fields was wrong.
    username_ok = _credentials_match(username, os.getenv("UI_AUTH_USERNAME"))
    password_ok = _credentials_match(password, os.getenv("UI_AUTH_PASSWORD"))
    if not (username_ok and password_ok):
        return jsonify({"msg": "Bad username or password"}), 401

    token = create_access_token(identity=username)
    return jsonify({"access_token": token})


@app.route("/api/auth/me", methods=["GET"])
@jwt_required()
def me():
    """Return the identity stored in the caller's JWT."""
    return jsonify({"user": get_jwt_identity()})


@app.route("/api/auth/logout", methods=["POST"])
@jwt_required()
def logout():
    """Acknowledge a logout request; no server-side state is changed."""
    # MVP: client deletes token; real logout would use token revocation/denylist
    return jsonify({"ok": True})


# ---------------------------------------------------------
# 1. Alert Webhook (Simulates an alert triggering a Case)
# ---------------------------------------------------------
@app.route("/api/alerts", methods=["POST"])
@jwt_required()
def receive_alert():
    """Create an OPEN case from an alert payload and return its id (201)."""
    data = _json_body()
    case_id = f"CASE-{uuid.uuid4().hex[:6].upper()}"
    case = {
        "id": case_id,
        "status": "OPEN",
        "severity": data.get("severity", "P1"),
        "title": data.get("title", "High 500 Error Rate detected"),
        "service": data.get("service", "checkout-api"),
        "start_time": data.get("start_time"),
        "end_time": data.get("end_time"),
        "created_at": now_iso(),
        "analysis": None,
    }
    db["cases"][case_id] = case
    # Use the stored service so the audit entry matches the case even when
    # the payload omitted "service" and the default was applied.
    add_audit_log(
        case_id, "System", "CASE_CREATED", f"Alert triggered for {case['service']}"
    )
    return jsonify({"message": "Case created", "case_id": case_id}), 201


# ---------------------------------------------------------
# 2. Analyze (Calls Elastic Agent via Microsoft Agent Framework)
# ---------------------------------------------------------
@app.route("/api/cases/<case_id>/analyze", methods=["POST"])
@jwt_required()
async def analyze_case(case_id):
    """Ask the Elastic A2A agent to analyse a case and store the result.

    On success the parsed JSON reply is saved on the case, the case status
    becomes ``INVESTIGATED`` and one ``PENDING`` action is created per entry
    in the reply's ``mitigations`` list. Returns 404 for an unknown case and
    500 with the error text if the agent call or reply parsing fails; in the
    failure case neither the case nor the action store is modified.
    """
    case = db["cases"].get(case_id)
    if not case:
        return jsonify({"error": "Case not found"}), 404

    prompt = (
        f"An alert fired for {case['service']} between {case['start_time']} and {case['end_time']}. "
        "Please investigate this using your tools. Follow your system instructions to find the root cause "
        "and return ONLY the raw JSON format with causal_chain, hypotheses, and mitigations. "
        "Do not include markdown blocks like ```json."
    )
    add_audit_log(
        case_id, "System", "ANALYSIS_STARTED", "Sent context to Elastic AI Agent"
    )

    # The Kibana A2A base URL
    a2a_agent_host = f"{KIBANA_URL}/api/agent_builder/a2a"
    custom_headers = {"Authorization": f"ApiKey {ES_API_KEY}", "kbn-xsrf": "true"}

    try:
        # Use httpx AsyncClient as required by the Agent Framework
        async with httpx.AsyncClient(
            timeout=120.0, headers=custom_headers
        ) as http_client:
            # 1. Resolve the A2A Agent Card using the Agent ID
            resolver = A2ACardResolver(
                httpx_client=http_client, base_url=a2a_agent_host
            )
            # The Agent Card path uses your Agent ID
            agent_card = await resolver.get_agent_card(
                relative_card_path=f"/{AGENT_ID}.json"
            )
            print(f"Found Agent: {agent_card.name} - {agent_card.description}")

            # 2. Use the Agent Framework to connect to the Elastic Agent
            agent = A2AAgent(
                name=agent_card.name,
                description=agent_card.description,
                agent_card=agent_card,
                url=a2a_agent_host,
                http_client=http_client,
            )
            print("Sending prompt to Elastic A2A agent...")

            # 3. Execute the Run command (this handles the JSON-RPC complexity automatically)
            response = await agent.run(prompt)

        # Extract the text from the response; tolerate messages without text.
        agent_reply = "".join(message.text or "" for message in response.messages)

        # Clean up Markdown formatting if the LLM accidentally added it
        analysis_data = json.loads(_strip_code_fence(agent_reply))
        if not isinstance(analysis_data, dict):
            raise ValueError("Agent reply is not a JSON object")

        # Build the PENDING actions first so a malformed mitigation entry
        # fails before any shared state has been touched.
        new_actions = {}
        for mitigation in analysis_data.get("mitigations") or []:
            action_id = f"ACT-{uuid.uuid4().hex[:6].upper()}"
            new_actions[action_id] = {
                "id": action_id,
                "case_id": case_id,
                "type": mitigation.get("type", "UNKNOWN"),
                "action": mitigation.get("action", ""),
                "status": "PENDING",
                "created_at": now_iso(),
            }

        case["analysis"] = analysis_data
        case["status"] = "INVESTIGATED"
        db["actions"].update(new_actions)

        add_audit_log(
            case_id,
            "System",
            "ANALYSIS_COMPLETE",
            "Agent identified root cause and proposed mitigations",
        )
        return jsonify(
            {
                "message": "Analysis complete",
                "analysis": analysis_data,
                "actions": db["actions"],
            }
        )
    except Exception as e:
        print("Error calling agent via Agent Framework:", e)
        return jsonify({"error": str(e)}), 500


# ---------------------------------------------------------
# 3. Human-in-the-Loop (HITL) Approval
# ---------------------------------------------------------
@app.route("/api/actions/<action_id>/approve", methods=["POST"])
@jwt_required()
def approve_action(action_id):
    """Mark an action as APPROVED and record who approved it.

    The approver name is read from the optional ``user`` field of the JSON
    body and defaults to ``"OnCall-Engineer-1"``.
    """
    action = _get_or_stub_action(action_id)

    data = _json_body()
    user = data.get("user", "OnCall-Engineer-1")

    action["status"] = "APPROVED"
    action["approved_by"] = user
    action["approved_at"] = now_iso()
    add_audit_log(
        action["case_id"],
        user,
        "ACTION_APPROVED",
        f"Approved execution of {action['type']}",
    )
    return jsonify({"message": "Action approved", "action": action})


# ---------------------------------------------------------
# 4. Execute Action (Calls the Mock Executor)
# ---------------------------------------------------------
@app.route("/api/actions/<action_id>/execute", methods=["POST"])
@jwt_required()
def execute_action(action_id):
    """Run an APPROVED action through the mock executor.

    Returns 400 if the action is not in status ``APPROVED``. Otherwise the
    action moves to ``RUNNING`` and then to ``SUCCESS`` or ``FAILED``
    depending on the executor result, with an audit entry for each step.
    """
    action = _get_or_stub_action(action_id)

    if action["status"] != "APPROVED":
        return (
            jsonify({"error": f"Cannot execute action in status: {action['status']}"}),
            400,
        )

    action["status"] = "RUNNING"
    add_audit_log(
        action["case_id"],
        "System",
        "EXECUTION_STARTED",
        f"Running runbook for {action['type']}",
    )

    # perform the mock execution directly rather than via HTTP
    result, status_code = mock_executor_logic(
        {
            "action": action["action"],
            "type": action["type"],
        }
    )

    if status_code == 200 and result.get("status") == "OK":
        action["status"] = "SUCCESS"
        add_audit_log(
            action["case_id"],
            "MockExecutor",
            "EXECUTION_SUCCESS",
            "Runbook completed successfully",
        )
    else:
        action["status"] = "FAILED"
        add_audit_log(
            action["case_id"], "MockExecutor", "EXECUTION_FAILED", "Runbook failed"
        )

    return jsonify({"message": "Execution finished", "action": action})


# ---------------------------------------------------------
# helper for mock execution logic (used by both the API handler and direct calls)
# ---------------------------------------------------------
def mock_executor_logic(data: dict):
    """Simulate performing an action. Returns tuple (response_dict, status_code).

    Sleeps for two seconds, prints what was "applied" and always reports
    success with a freshly generated run id.
    """
    time.sleep(2)
    print(
        f"[MOCK EXECUTOR] Successfully applied {data.get('type')}: {data.get('action')}"
    )
    return {"status": "OK", "run_id": f"RUN-{uuid.uuid4().hex[:6]}"}, 200


# ---------------------------------------------------------
# 5. Mock HTTP Executor
# ---------------------------------------------------------
@app.route("/api/mock_executor", methods=["POST"])
@jwt_required()
def mock_executor():
    """HTTP wrapper around ``mock_executor_logic``."""
    # this route is retained for compatibility but now simply invokes the
    # shared logic rather than duplicating it or being called by other
    # endpoints.
    result, status_code = mock_executor_logic(_json_body())
    return jsonify(result), status_code


# ---------------------------------------------------------
# 6. Read Endpoints
# ---------------------------------------------------------
@app.route("/api/cases", methods=["GET"])
@jwt_required()
def get_cases():
    """Return every stored case as a JSON list."""
    return jsonify(list(db["cases"].values()))


@app.route("/api/cases/<case_id>", methods=["GET"])
@jwt_required()
def get_case(case_id):
    """Return one case together with its actions and audit log entries."""
    case = db["cases"].get(case_id)
    if not case:
        return jsonify({"error": "Not found"}), 404

    case_actions = [a for a in db["actions"].values() if a["case_id"] == case_id]
    case_audits = [entry for entry in db["audit_logs"] if entry["case_id"] == case_id]
    return jsonify({"case": case, "actions": case_actions, "audit_logs": case_audits})


@app.route("/api/cases", methods=["DELETE"])
@jwt_required()
def delete_all_cases():
    """Clear all cases, actions and audit logs; report how many cases were removed."""
    count = len(db["cases"])
    db["cases"].clear()
    db["actions"].clear()
    db["audit_logs"].clear()
    return (
        jsonify(
            {"status": "ok", "message": f"Deleted {count} case(s) and all related data"}
        ),
        200,
    )


@app.route("/api/health")
def health():
    """Unauthenticated liveness probe."""
    return jsonify({"status": "ok"}), 200


if __name__ == "__main__":
    app.run(port=5000, debug=True)