"""GitLab Context Brain: a LangGraph pipeline that reviews a merge request.

Each CI stage (prep / analyze / finops / devops / test / results / all) builds a
small graph of agent nodes. Stages exchange data through JSON artifacts on disk
(context.json, *_results.json) so they can run as separate CI jobs.
"""
import argparse
import contextlib
import hashlib
import html
import json
import os
import subprocess
import sys
import uuid
from typing import Any, Dict, List, TypedDict

from dotenv import load_dotenv

# LangGraph Imports
from langgraph.graph import StateGraph, END

from src.providers import GitLabProvider, GCPProvider, GeminiBrain

load_dotenv()


# ==========================================
# 1. State Definition
# ==========================================
class AgentState(TypedDict):
    """Shared graph state.

    Nodes return partial dicts that LangGraph merges into this state; writing to
    the ``state`` argument inside a node does not update the graph.
    """

    project_id: str
    mr_iid: str
    gl_provider: Any
    gcp_provider: Any
    brain_provider: Any
    stage: str
    # Context Data
    mr_data: Dict[str, Any]
    log_data: str
    gcp_data: str
    # Generated Outputs
    raw_output: str
    report_markdown: str
    code_quality: List[Dict[str, Any]]
    metadata: Dict[str, Any]
    mermaid_diagram: str
    error: str
    critical_failure: bool
    early_exit: bool
    cost_estimation: str
    test_report: str
    devops_report: str
    # True only when the generated pytest suite ran and exited with code 0.
    tests_passed: bool


# ==========================================
# 2. Agent Nodes (The Graph)
# ==========================================
def _should_skip(state: AgentState) -> bool:
    """Return True when an upstream node reported an error or requested an early exit."""
    return bool(state.get("error") or state.get("early_exit"))


def librarian_node(state: AgentState):
    """Fetch the MR bundle and CI job logs from GitLab.

    Sets ``early_exit`` when the MR diff is empty, and ``error`` when the MR
    cannot be fetched at all.
    """
    print("[*] Agent: Librarian 📚 - Fetching GitLab Context & Job Logs...")
    gl = state["gl_provider"]
    mr_data = gl.fetch_mr_bundle(state["mr_iid"])
    if not mr_data:
        return {"error": "Failed to fetch MR Data. Check project ID and MR IID."}

    log_data = gl.fetch_job_logs()
    # A missing or None diff is treated the same as an empty diff.
    if not (mr_data.get("diff") or "").strip():
        print(" -> No actionable files found in MR diff. Triggering early exit.")
        return {"mr_data": mr_data, "log_data": log_data, "early_exit": True}
    return {"mr_data": mr_data, "log_data": log_data}


def sentinel_node(state: AgentState):
    """Fetch production signals from the GCP provider."""
    if _should_skip(state):
        return {}
    print("[*] Agent: Sentinel 🛡️ - Fetching GCP Production Signals...")
    return {"gcp_data": state["gcp_provider"].get_context()}


def reasoner_node(state: AgentState):
    """Ask the LLM brain for the main analysis and parse it into report/code-quality/metadata."""
    if _should_skip(state):
        return {}
    print("[*] Agent: Reasoner 🧠 - Synthesizing Intelligence Report & Automations...")
    brain = state["brain_provider"]
    try:
        raw = brain.synthesize(state["mr_data"], state["gcp_data"], state["log_data"])
        report, cq, meta = brain.parse_response(raw)
    except Exception as e:
        return {"error": f"Brain failed to synthesize: {e}"}
    return {
        "raw_output": raw,
        "report_markdown": report,
        "code_quality": cq,
        "metadata": meta,
    }


def cost_estimator_node(state: AgentState):
    """Estimate the production cost impact of the MR (best effort)."""
    if _should_skip(state):
        return {}
    print("[*] Agent: FinOps 💸 - Estimating Production Cost Impact...")
    try:
        return {"cost_estimation": state["brain_provider"].estimate_cost(state["mr_data"])}
    except Exception as e:  # best effort: a FinOps failure must not halt the pipeline
        print(f" -> ⚠️ Cost estimation failed: {e}")
        return {"cost_estimation": "*Cost estimation failed.*"}


def devops_node(state: AgentState):
    """Review infrastructure/reliability aspects of the MR (best effort)."""
    if _should_skip(state):
        return {}
    print("[*] Agent: DevOps ⚙️ - Auditing Infrastructure and Reliability...")
    try:
        return {"devops_report": state["brain_provider"].review_devops(state["mr_data"])}
    except Exception as e:  # best effort: a DevOps failure must not halt the pipeline
        print(f" -> ⚠️ DevOps audit failed: {e}")
        return {"devops_report": "*DevOps audit failed.*"}


_TEST_REPORT_HEADER = "### 🧪 Automated Testing Report\n\n"
# Generated suites containing any of these are rejected: mocks let the LLM
# "pass" tests without exercising the real business logic.
_FORBIDDEN_TEST_TOKENS = ("patch(", "@patch", "unittest.mock", "MagicMock", "autospec=")


def _run_pytest(test_filename: str) -> "subprocess.CompletedProcess[str]":
    """Run pytest on *test_filename* with the project root (``..``) on PYTHONPATH."""
    env = os.environ.copy()
    project_root = os.path.abspath("..")
    # Skip empty parts so an unset PYTHONPATH does not inject a stray "" entry.
    env["PYTHONPATH"] = os.pathsep.join(
        part for part in (env.get("PYTHONPATH", ""), project_root) if part
    )
    print(f" -> Executing generated pytest suite locally ({os.path.basename(test_filename)})...")
    return subprocess.run(
        ["pytest", test_filename, "-v", "--junitxml=junit-report.xml"],
        capture_output=True,
        text=True,
        timeout=30,
        env=env,
    )


def test_engineer_node(state: AgentState):
    """Generate a pytest suite with the LLM, run it, and commit it to the MR if it passes.

    Returns a partial update with ``test_report`` and ``tests_passed``. When the
    suite fails, it also returns ``critical_failure=True`` so the orchestrator can
    fail the CI job.
    """
    if _should_skip(state):
        return {}
    print("[*] Agent: Test Engineer 🧪 - Generating & Validating CI Tests...")

    # Written to the target project root (..) so Python resolves local imports natively.
    # Assigned before any try/finally so cleanup can never hit an unbound name.
    test_filename = "../test_ai_generated.py"
    try:
        test_code = state["brain_provider"].generate_tests(state["mr_data"])
        if not test_code or "NO_TESTS_NEEDED" in test_code:
            return {"test_report": _TEST_REPORT_HEADER + "No explicitly testable business logic detected in this MR."}

        if any(token in test_code for token in _FORBIDDEN_TEST_TOKENS):
            print(" -> ⚠️ AI hallucinated mocks/patch! Rejecting test suite for safety.")
            return {
                "test_report": _TEST_REPORT_HEADER
                + "AI generated a test suite with forbidden mocks/patch. I've rejected it to maintain test integrity. Please review your business logic's testability."
            }

        try:
            with open(test_filename, "w", encoding="utf-8") as f:
                f.write(test_code)
            result = _run_pytest(test_filename)
        finally:
            # Remove the temporary file as soon as pytest has finished.
            with contextlib.suppress(OSError):
                os.remove(test_filename)

        print(f" [PYTEST STDOUT]\n{result.stdout}\n [PYTEST STDERR]\n{result.stderr}")
        passed = result.returncode == 0

        report = _TEST_REPORT_HEADER + "I generated Unit Tests for the new feature and ran them automatically in the CI environment.\n\n"
        if passed:
            report += "✅ **Tests Passed!** The new logic appears solid based on my scenarios. I have committed these tests to your branch!\n\n"
            print(" -> 🚀 Tests passed natively! Injecting them back into the MR repository...")
            state["gl_provider"].commit_test_file(
                state["mr_iid"],
                "tests/test_ai_generated.py",
                test_code,
                commit_message="test: add Context Brain auto-generated test suite",
            )
        else:
            report += "❌ **Tests Failed natively!** The new logic failed validation against my generated scenarios.\n\n"

        report += (
            "<details><summary>View Pytest Execution Logs</summary>\n\n```text\nSTDOUT:\n"
            + result.stdout[-1500:]
            + "\nSTDERR:\n"
            + result.stderr[-1000:]
            + "\n```\n</details>"
        )

        update: Dict[str, Any] = {"test_report": report, "tests_passed": passed}
        if not passed:
            # Must be returned: LangGraph ignores in-place writes to `state`.
            update["critical_failure"] = True
        return update
    except subprocess.TimeoutExpired:
        return {"test_report": _TEST_REPORT_HEADER + "*Execute failed: Tests timed out.*"}
    except Exception as e:
        return {"test_report": _TEST_REPORT_HEADER + f"*Execute failed: {e}*"}


_CRITICAL_SEVERITIES = frozenset({"critical", "blocker", "high"})


def _safe_line(value: Any, default: int = 1) -> int:
    """Coerce an LLM-provided line number to a positive int, falling back to *default*."""
    try:
        line = int(value)
    except (TypeError, ValueError):
        return default
    return line if line >= 1 else default


def _write_json(path: str, payload: Any, indent: Any = None) -> None:
    """Write *payload* as JSON to *path* (UTF-8)."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=indent)


def _cq_fingerprint(path: Any, line: int, description: Any) -> str:
    """Build a stable Code Quality fingerprint.

    The description is included so that distinct issues on the same line are
    not collapsed into one by GitLab.
    """
    return hashlib.sha256(f"{path}:{line}:{description}".encode("utf-8")).hexdigest()


def _publish_code_quality(gl: Any, mr_iid: Any, issues: List[Dict[str, Any]]) -> None:
    """Write gl-code-quality-report.json, post inline suggestions, and write a SAST report for critical issues."""
    gl_cq = []
    critical_issues = []
    for issue in issues:
        path = issue.get("file")
        line = _safe_line(issue.get("line", 1))
        description = issue.get("description", "Issue found")
        gl_cq.append({
            "description": description,
            "check_name": "ContextBrainAudit",
            "fingerprint": _cq_fingerprint(path, line, description),
            "severity": issue.get("severity", "major"),
            "location": {"path": path, "lines": {"begin": line}},
        })
        if str(issue.get("severity", "")).lower() in _CRITICAL_SEVERITIES:
            critical_issues.append(issue)
        if issue.get("suggestion"):
            gl.create_inline_suggestion(
                mr_iid,
                path,
                issue.get("line"),
                issue.get("suggestion"),
                discussion_body=issue.get("description"),
            )

    _write_json("gl-code-quality-report.json", gl_cq, indent=2)

    if not critical_issues:
        return
    # NOTE(review): GitLab's security report schema may also require a top-level
    # "scan" object and per-vulnerability "identifiers" — confirm against your GitLab version.
    sast = {
        "version": "15.0.0",
        "vulnerabilities": [
            {
                "id": str(uuid.uuid4()),
                "category": "sast",
                "name": "Context Brain Security Review",
                "message": issue.get("description", "Vulnerability"),
                "severity": "High",
                "scanner": {"id": "context_brain", "name": "Context Brain"},
                "location": {
                    "file": issue.get("file"),
                    "start_line": _safe_line(issue.get("line", 1)),
                },
            }
            for issue in critical_issues
        ],
    }
    _write_json("gl-sast-report.json", sast)
    print(" -> ⛔ Critical issues found. Applying Security SAST artifact.")


def reporter_node(state: AgentState):
    """Persist this stage's results as local artifacts (labels/CQ/SAST for analysis stages)."""
    if state.get("error"):
        print(f"❌ PIPELINE HALTED: {state['error']}")
        return {}
    if state.get("early_exit"):
        print(" -> Early Exit. No actionable codebase diffs.")
        return {}

    print("[*] Agent: Reporter 📢 - Dispatching Data to Artifacts...")
    gl = state["gl_provider"]
    mr_iid = state["mr_iid"]
    stage = state.get("stage", "all")

    if stage in ("analyze", "all"):
        # `or` guards against keys that are present but None.
        meta = state.get("metadata") or {}
        cq = state.get("code_quality") or []

        # Action A: Labels ONLY (the description is deferred to the Assembler)
        if meta:
            print(f" -> Applying Intelligent Labels: {meta.get('labels')}")
            gl.update_mr_metadata(mr_iid, labels=meta.get("labels"))

        # Action B: Code Quality, SAST and inline suggestions
        if cq:
            _publish_code_quality(gl, mr_iid, cq)

        # Action C: compile a local summary instead of posting comments
        body = state.get("report_markdown", "") or state.get("raw_output", "")
        _write_json("analyze_results.json", {"summary": meta.get("summary", ""), "body": body, "issues": len(cq)})
    elif stage == "finops":
        _write_json("finops_results.json", {"cost": state.get("cost_estimation", "")})
    elif stage == "devops":
        _write_json("devops_results.json", {"report": state.get("devops_report", "")})
    elif stage == "test":
        _write_json("test_results.json", {
            "passed": bool(state.get("tests_passed", False)),
            "report": state.get("test_report", ""),
        })
    return {}


def _load_stage_results(filename: str) -> Dict[str, Any]:
    """Load one stage's JSON artifact; return {} (with a warning) if it is missing or unreadable."""
    if not os.path.exists(filename):
        print(f" -> ⚠️ {filename} NOT FOUND - check artifact paths")
        return {}
    try:
        with open(filename, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        print(f" -> ⚠️ {filename} could not be read ({e}) - check artifact paths")
        return {}
    print(f" -> ✅ Loaded {filename}")
    return data if isinstance(data, dict) else {}


def _render_dashboard_html(an_data: Dict[str, Any], fin_data: Dict[str, Any],
                           dev_data: Dict[str, Any], test_data: Dict[str, Any]) -> str:
    """Render the static HTML dashboard.

    LLM/MR-derived text is HTML-escaped so it cannot inject markup or script.
    """
    sections = (
        ("🕵️ Code Quality Analysis", str(an_data.get("body") or "No critical issues found.").replace("```", "")),
        ("💸 FinOps & Cost Impact", str(fin_data.get("cost") or "Cost efficient.")),
        ("⚙️ DevOps & SRE Signals", str(dev_data.get("report") or "Infrastructure verified.")),
        ("🧪 Automated SDET Tests", str(test_data.get("report") or "No functional tests executed.")),
    )
    cards = "\n".join(
        f'<section class="card">\n<h2>{title}</h2>\n<pre>{html.escape(text)}</pre>\n</section>'
        for title, text in sections
    )
    return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Context Brain Intelligence Dashboard</title>
<style>
body {{ font-family: system-ui, sans-serif; margin: 2rem; background: #f6f7fb; color: #1f2330; }}
.card {{ background: #fff; border-radius: 8px; padding: 1rem 1.5rem; margin-bottom: 1.5rem; box-shadow: 0 1px 3px rgba(0, 0, 0, .1); }}
pre {{ white-space: pre-wrap; word-wrap: break-word; }}
</style>
</head>
<body>
<header>
<h1>🧠 Context Brain</h1>
<p>Intelligence Platform Audit Dashboard</p>
</header>
{cards}
</body>
</html>
"""


def results_assembler_node(state: AgentState):
    """Merge every stage's artifacts into the MR description, a comment, metrics.txt and dashboard.html."""
    print("[*] Agent: Assembler 📊 - Building Native GitLab UX...")
    an_data = _load_stage_results("analyze_results.json")
    fin_data = _load_stage_results("finops_results.json")
    dev_data = _load_stage_results("devops_results.json")
    test_data = _load_stage_results("test_results.json")

    # 1. Update the MR description (`or` also covers JSON nulls, which would break slicing)
    summary = an_data.get("summary") or "Complete Platform Insights delivered."
    cost = str(fin_data.get("cost") or "No significant cost impact.")
    devops_txt = str(dev_data.get("report") or "Infrastructure verified.")
    test_status = "✅ **Passed** (Secure)" if test_data.get("passed") else "❌ **Failed** (Security/Logic Risks)"

    mr_injection = (
        "### 🧠 Context Brain: Intelligence Report\n"
        f"**Analysis**: {summary}\n"
        f"- **FinOps**: {cost[:200]}\n"
        f"- **DevOps**: {devops_txt[:200]}\n"
        f"- **Tests**: {test_status}\n\n"
        "---"
    )
    state["gl_provider"].update_mr_metadata(state["mr_iid"], description_prefix=mr_injection)

    # 2. Detailed comment thread
    full_report = (
        "## 🧩 Context Brain: Full Platform Audit\n\n"
        f"#### 🕵️ Code Analyst Audit\n{an_data.get('body') or 'No insights.'}\n\n"
        f"#### 💸 Cloud FinOps & Cost Logic\n{fin_data.get('cost') or 'Optimized.'}\n\n"
        f"#### ⚙️ DevOps & SRE Review\n{dev_data.get('report') or 'Ready for Prod.'}\n\n"
        f"#### 🧪 Quality SDET Validation\n{test_data.get('report') or 'No logic tested.'}\n\n"
        "---\n*Powered by Context Brain LangGraph Engine*"
    )
    state["gl_provider"].post_comment(state["mr_iid"], full_report)
    print(" -> 📝 Full Markdown Report posted as thread comment!")

    # 3. metrics.txt for the MR metrics widget
    with open("metrics.txt", "w", encoding="utf-8") as f:
        f.write(f"context_brain_issues {an_data.get('issues', 0)}\n")
        f.write(f"context_brain_test_pass_rate {100 if test_data.get('passed') else 0}\n")

    # 4. HTML dashboard
    with open("dashboard.html", "w", encoding="utf-8") as f:
        f.write(_render_dashboard_html(an_data, fin_data, dev_data, test_data))
    print(" -> 🎨 Native UI Assets generated (Metrics Widget, Premium HTML Dashboard)!")
    return {}


# ==========================================
# 3. Main Orchestrator
# ==========================================
# Execution stages that run one worker node followed by the Reporter.
_STAGE_WORKERS = {
    "analyze": ("Reasoner", reasoner_node),
    "all": ("Reasoner", reasoner_node),
    "finops": ("CostEstimator", cost_estimator_node),
    "devops": ("DevOps", devops_node),
    "test": ("TestEngineer", test_engineer_node),
}
STAGES = ("prep", "results", *_STAGE_WORKERS)
_CONTEXT_FILE = "context.json"


def _load_cached_context() -> Dict[str, Any]:
    """Return the context cached by the ``prep`` stage, or {} if there is none."""
    if not os.path.exists(_CONTEXT_FILE):
        return {}
    with open(_CONTEXT_FILE, "r", encoding="utf-8") as f:
        data = json.load(f)
    return data if isinstance(data, dict) else {}


def _make_context_injector(cached_context: Dict[str, Any]):
    """Build a node that seeds the graph with the cached context instead of re-fetching it."""
    def context_injector_node(state: AgentState):
        print("[*] ⚡ Loading Shared Fast-Context from context.json Artifact...")
        return {
            "mr_data": cached_context.get("mr_data") or {},
            "log_data": cached_context.get("log_data") or "",
            "gcp_data": cached_context.get("gcp_data") or "",
            "early_exit": bool(cached_context.get("early_exit", False)),
        }
    return context_injector_node


def prep_reporter(state: AgentState):
    """Cache the fetched context to disk so the parallel CI stages can reuse it."""
    print("[*] 💾 Caching Context state to disk for parallel nodes...")
    _write_json(_CONTEXT_FILE, {
        "mr_data": state.get("mr_data"),
        "log_data": state.get("log_data"),
        "gcp_data": state.get("gcp_data"),
        "early_exit": state.get("early_exit"),
    })
    return {}


def build_and_run_graph(project_id, mr_iid, stage="all"):
    """Build and run the LangGraph workflow for *stage* on one merge request.

    Exits the process with status 1 when a node reports ``critical_failure``.
    """
    print(f"--- 🧠 Context Brain LangGraph Workflow Initializing (Project {project_id} / MR {mr_iid} / Stage {stage}) ---")
    gl_token = os.getenv("GITLAB_TOKEN")
    gcp_project = os.getenv("GCP_PROJECT_ID")
    ai_key = os.getenv("GOOGLE_API_KEY")
    if not gl_token or not ai_key:
        print("❌ CRITICAL: Missing required API keys. Check .env file.")
        return
    if stage not in STAGES:
        print(f"❌ ERROR: Unknown stage '{stage}'. Expected one of: {', '.join(STAGES)}.")
        return

    # Build Providers
    gl = GitLabProvider(project_id, gl_token)
    gcp = GCPProvider(gcp_project)
    brain = GeminiBrain(ai_key)

    workflow = StateGraph(AgentState)
    if stage == "prep":
        workflow.add_node("Librarian", librarian_node)
        workflow.add_node("Sentinel", sentinel_node)
        workflow.add_node("PrepReporter", prep_reporter)
        workflow.set_entry_point("Librarian")
        workflow.add_edge("Librarian", "Sentinel")
        workflow.add_edge("Sentinel", "PrepReporter")
        workflow.add_edge("PrepReporter", END)
    else:
        # Execution stages skip Librarian/Sentinel when prep already cached the context.
        cached_context = _load_cached_context()
        if cached_context:
            workflow.add_node("Injector", _make_context_injector(cached_context))
            workflow.set_entry_point("Injector")
            upstream = "Injector"
        else:
            workflow.add_node("Librarian", librarian_node)
            workflow.add_node("Sentinel", sentinel_node)
            workflow.add_edge("Librarian", "Sentinel")
            workflow.set_entry_point("Librarian")
            upstream = "Sentinel"

        if stage == "results":
            workflow.add_node("Assembler", results_assembler_node)
            workflow.add_edge(upstream, "Assembler")
            workflow.add_edge("Assembler", END)
        else:
            worker_name, worker_fn = _STAGE_WORKERS[stage]
            workflow.add_node(worker_name, worker_fn)
            workflow.add_node("Reporter", reporter_node)
            workflow.add_edge(upstream, worker_name)
            workflow.add_edge(worker_name, "Reporter")
            workflow.add_edge("Reporter", END)

    app = workflow.compile()

    # Run the Pipeline
    initial_state = {
        "project_id": project_id,
        "mr_iid": mr_iid,
        "gl_provider": gl,
        "gcp_provider": gcp,
        "brain_provider": brain,
        "stage": stage,
        "mermaid_diagram": "",
        "error": "",
        "early_exit": False,
        "cost_estimation": "",
        "test_report": "",
        "devops_report": "",
    }

    print("\n🚀 EXECUTING LANGGRAPH PIPELINE...\n")
    final_state = app.invoke(initial_state)
    print("\n✅ PIPELINE COMPLETE.")

    if final_state.get("critical_failure"):
        print("\n⛔ EXITED WITH CRITICAL SECURITY ISSUES. FAILING PIPELINE.")
        sys.exit(1)


def _mr_iid_from_flow_context(default):
    """Return the MR IID from the AI_FLOW_CONTEXT env JSON, or *default* if it is absent or invalid."""
    raw = os.getenv("AI_FLOW_CONTEXT")
    if not raw:
        return default
    try:
        context = json.loads(raw)
    except json.JSONDecodeError:
        return default
    iid = context.get("iid") if isinstance(context, dict) else None
    # Never clobber a valid CLI/CI value with a missing iid.
    return str(iid) if iid is not None else default


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="GitLab Context Brain LangGraph Agent")
    parser.add_argument("--project_id", default=os.getenv("CI_PROJECT_ID"), help="GitLab Project ID")
    parser.add_argument("--mr_iid", default=os.getenv("CI_MERGE_REQUEST_IID"), help="Merge Request IID")
    parser.add_argument(
        "--stage",
        default="all",
        choices=STAGES,
        help="Pipeline Stage (prep, analyze, finops, devops, test, results, all)",
    )
    args = parser.parse_args()

    args.mr_iid = _mr_iid_from_flow_context(args.mr_iid)

    if args.project_id and args.mr_iid:
        build_and_run_graph(args.project_id, args.mr_iid, args.stage)
    else:
        print("❌ ERROR: Missing project_id or mr_iid.")