import os
import json
import argparse
from typing import TypedDict, List, Dict, Any
from dotenv import load_dotenv
from src.providers import GitLabProvider, GCPProvider, GeminiBrain
# LangGraph Imports
from langgraph.graph import StateGraph, END
load_dotenv()
# ==========================================
# 1. State Definition
# ==========================================
class AgentState(TypedDict):
    """Shared state dictionary passed between the agent nodes of the graph.

    Nodes return partial dictionaries containing only the keys they update,
    and downstream code reads most keys with ``state.get(...)``, so not every
    key is guaranteed to be present at every step.
    """

    # Identifiers of the merge request being processed.
    project_id: str
    mr_iid: str
    # Provider objects built in build_and_run_graph (GitLab, GCP, LLM "brain").
    gl_provider: Any
    gcp_provider: Any
    brain_provider: Any
    # Pipeline stage name selected on the command line (e.g. "analyze", "all").
    stage: str
    # Context Data
    mr_data: Dict[str, Any]  # MR bundle fetched by the Librarian node
    log_data: str  # job logs fetched by the Librarian node
    gcp_data: str  # context fetched by the Sentinel node
    # Generated Outputs
    raw_output: str  # unparsed brain response (Reasoner)
    report_markdown: str  # parsed report body (Reasoner)
    code_quality: List[Dict[str, Any]]  # parsed issues (Reasoner)
    metadata: Dict[str, Any]  # parsed metadata; "labels"/"summary" are read by the Reporter
    mermaid_diagram: str  # initialised to "" by the orchestrator; not otherwise used in this file
    error: str  # non-empty value makes downstream nodes skip their work
    critical_failure: bool  # when truthy in the final state the process exits with status 1
    early_exit: bool  # set by the Librarian when the MR diff is empty
    cost_estimation: str  # output of the FinOps node
    test_report: str  # output of the Test Engineer node
    devops_report: str  # output of the DevOps node
# ==========================================
# 2. Agent Nodes (The Graph)
# ==========================================
def librarian_node(state: AgentState):
    """Fetch the merge-request bundle and job logs through the GitLab provider.

    Args:
        state: Graph state; reads ``gl_provider`` and ``mr_iid``.

    Returns:
        A partial state update:
        * ``{"error": ...}`` when the provider returns no MR data;
        * ``mr_data`` and ``log_data``, plus ``early_exit=True`` when the MR
          bundle carries no diff text (missing, ``None`` or whitespace only).
    """
    print("[*] Agent: Librarian ๐ - Fetching GitLab Context & Job Logs...")
    gitlab = state["gl_provider"]

    mr_data = gitlab.fetch_mr_bundle(state["mr_iid"])
    if not mr_data:
        return {"error": "Failed to fetch MR Data. Check project ID and MR IID."}

    update = {"mr_data": mr_data, "log_data": gitlab.fetch_job_logs()}

    # ``diff`` may be absent *or* explicitly None; ``or ""`` covers both so an
    # empty MR takes the early-exit path instead of raising AttributeError.
    diff_text = mr_data.get("diff") or ""
    if not diff_text.strip():
        print(" -> No actionable files found in MR diff. Triggering early exit.")
        update["early_exit"] = True
    return update
def sentinel_node(state: AgentState):
    """Collect context from the GCP provider.

    Returns the incoming state untouched when an earlier node recorded an
    error or requested an early exit; otherwise returns ``{"gcp_data": ...}``
    with whatever ``gcp_provider.get_context()`` produced.
    """
    should_skip = state.get("error") or state.get("early_exit")
    if should_skip:
        return state

    print("[*] Agent: Sentinel ๐ก๏ธ - Fetching GCP Production Signals...")
    return {"gcp_data": state["gcp_provider"].get_context()}
def reasoner_node(state: AgentState):
    """Ask the brain provider for a report and split it into its parts.

    Skips (returning the state as-is) when ``error`` or ``early_exit`` is set.
    On success returns ``raw_output``, ``report_markdown``, ``code_quality``
    and ``metadata``; any exception raised while synthesizing or parsing is
    converted into an ``error`` entry.
    """
    if state.get("error") or state.get("early_exit"):
        return state

    print("[*] Agent: Reasoner ๐ง  - Synthesizing Intelligence Report & Automations...")
    try:
        brain = state["brain_provider"]
        raw_text = brain.synthesize(
            state["mr_data"],
            state["gcp_data"],
            state["log_data"],
        )
        markdown, issues, metadata = brain.parse_response(raw_text)
    except Exception as exc:
        return {"error": f"Brain failed to synthesize: {exc}"}

    return {
        "raw_output": raw_text,
        "report_markdown": markdown,
        "code_quality": issues,
        "metadata": metadata,
    }
def cost_estimator_node(state: AgentState):
    """Ask the brain provider for a cost estimate of the merge request.

    Skips (returning the state as-is) when ``error`` or ``early_exit`` is set.
    Estimation is best-effort: a failure never aborts the pipeline, it yields
    a placeholder ``cost_estimation`` instead. The cause is printed so the
    failure can be diagnosed from the job log.
    """
    if state.get("error") or state.get("early_exit"):
        return state

    print("[*] Agent: FinOps ๐ธ - Estimating Production Cost Impact...")
    try:
        estimate = state["brain_provider"].estimate_cost(state["mr_data"])
    except Exception as exc:
        print(f" -> Cost estimation failed: {exc}")
        return {"cost_estimation": "*Cost estimation failed.*"}
    return {"cost_estimation": estimate}
def devops_node(state: AgentState):
    """Ask the brain provider for a DevOps review of the merge request.

    Skips (returning the state as-is) when ``error`` or ``early_exit`` is set.
    The audit is best-effort: a failure never aborts the pipeline, it yields a
    placeholder ``devops_report`` instead. The cause is printed so the failure
    can be diagnosed from the job log.
    """
    if state.get("error") or state.get("early_exit"):
        return state

    print("[*] Agent: DevOps โ๏ธ - Auditing Infrastructure and Reliability...")
    try:
        review = state["brain_provider"].review_devops(state["mr_data"])
    except Exception as exc:
        print(f" -> DevOps audit failed: {exc}")
        return {"devops_report": "*DevOps audit failed.*"}
    return {"devops_report": review}
def test_engineer_node(state: AgentState):
if state.get("error") or state.get("early_exit"): return state
print("[*] Agent: Test Engineer ๐งช - Generating & Validating CI Tests...")
try:
import subprocess
import tempfile
import os
test_code = state["brain_provider"].generate_tests(state["mr_data"])
if "NO_TESTS_NEEDED" in test_code:
return {"test_report": "### ๐งช Automated Testing Report\n\nNo explicitly testable business logic detected in this MR."}
# Safely create a temporary Python file
try:
# Add a safety check for forbidden keywords (hallucinated mocks)
forbidden = ["patch(", "@patch", "unittest.mock", "MagicMock", "autospec="]
if any(f in test_code for f in forbidden):
print(" -> โ ๏ธ AI hallucinated mocks/patch! Rejecting test suite for safety.")
return {"test_report": "### ๐งช Automated Testing Report\n\nAI generated a test suite with forbidden mocks/patch. I've rejected it to maintain test integrity. Please review your business logic's testability."}
# Write directly to the target project root (..) so Python resolves local imports natively
test_filename = "../test_ai_generated.py"
with open(test_filename, "w", encoding="utf-8") as f:
f.write(test_code)
env = os.environ.copy()
psep = ";" if os.name == "nt" else ":"
env["PYTHONPATH"] = f"{env.get('PYTHONPATH', '')}{psep}{os.path.abspath('..')}"
print(f" -> Executing generated pytest suite locally ({os.path.basename(test_filename)})...")
result = subprocess.run(["pytest", test_filename, "-v", "--junitxml=junit-report.xml"], capture_output=True, text=True, timeout=30, env=env)
print(f" [PYTEST STDOUT]\n{result.stdout}\n [PYTEST STDERR]\n{result.stderr}")
report = "### ๐งช Automated Testing Report\n\nI generated Unit Tests for the new feature and ran them automatically in the CI environment.\n\n"
if result.returncode == 0:
report += "โ
**Tests Passed!** The new logic appears solid based on my scenarios. I have committed these tests to your branch!\n\n"
print(" -> ๐ Tests passed natively! Injecting them back into the MR repository...")
state["gl_provider"].commit_test_file(
state["mr_iid"],
f"tests/test_ai_generated.py",
test_code,
commit_message="test: add Context Brain auto-generated test suite"
)
else:
report += "โ **Tests Failed natively!** The new logic failed validation against my generated scenarios.\n\n"
state["critical_failure"] = True
report += "View Pytest Execution Logs
\n\n```text\nSTDOUT:\n" + result.stdout[-1500:] + "\nSTDERR:\n" + result.stderr[-1000:] + "\n```\n "
return {"test_report": report}
finally:
# Clean up the temporary file immediately after execution
try:
os.remove(test_filename)
except OSError:
pass
except subprocess.TimeoutExpired:
return {"test_report": "### ๐งช Automated Testing Report\n\n*Execute failed: Tests timed out.*"}
except Exception as e:
return {"test_report": f"### ๐งช Automated Testing Report\n\n*Execute failed: {e}*"}
def _issue_line(issue):
    """Return the issue's line number as an int, falling back to 1 when it is missing or unusable."""
    try:
        return int(issue.get("line", 1))
    except (TypeError, ValueError):
        return 1


def reporter_node(state: AgentState):
    """Persist the current stage's results as local artifact files.

    Depending on ``state["stage"]`` this node:
      * analyze/all: applies MR labels, posts inline suggestions, writes
        ``gl-code-quality-report.json`` (and ``gl-sast-report.json`` when
        critical/blocker/high issues exist) and ``analyze_results.json``;
      * finops: writes ``finops_results.json``;
      * devops: writes ``devops_results.json``;
      * test: writes ``test_results.json``.

    Returns:
        The unchanged state when ``error`` is set, otherwise an empty update.
    """
    if state.get("error"):
        print(f"โ PIPELINE HALTED: {state['error']}")
        return state
    if state.get("early_exit"):
        print(" -> Early Exit. No actionable codebase diffs.")
        return {}
    print("[*] Agent: Reporter ๐ข - Dispatching Data to Artifacts...")
    gl = state["gl_provider"]
    mr_iid = state["mr_iid"]
    stage = state.get("stage", "all")
    analyzing = stage in ("analyze", "all")
    # ``or`` guards against the keys being present but None.
    meta = state.get("metadata") or {}
    cq = state.get("code_quality") or []

    # Action A: Labels ONLY (Description is deferred to Assembler)
    if meta and analyzing:
        print(f" -> Applying Intelligent Labels: {meta.get('labels')}")
        gl.update_mr_metadata(mr_iid, labels=meta.get('labels'))

    # Action B: Code Quality SAST and Inline Suggestions
    if cq and analyzing:
        gl_cq = []
        critical_issues = []
        for issue in cq:
            gl_cq.append({
                "description": issue.get("description", "Issue found"),
                "check_name": "ContextBrainAudit",
                "fingerprint": f"{issue.get('file')}-{issue.get('line')}",
                "severity": issue.get("severity", "major"),
                "location": {"path": issue.get("file"), "lines": {"begin": _issue_line(issue)}},
            })
            if str(issue.get("severity", "")).lower() in ("critical", "blocker", "high"):
                critical_issues.append(issue)
            if issue.get("suggestion"):
                gl.create_inline_suggestion(
                    mr_iid,
                    issue.get("file"),
                    issue.get("line"),
                    issue.get("suggestion"),
                    discussion_body=issue.get("description"),
                )
        with open("gl-code-quality-report.json", "w", encoding="utf-8") as f:
            json.dump(gl_cq, f, indent=2)
        if critical_issues:
            import uuid

            sast = {"version": "15.0.0", "vulnerabilities": []}
            for issue in critical_issues:
                sast["vulnerabilities"].append({
                    "id": str(uuid.uuid4()),
                    "category": "sast",
                    "name": "Context Brain Security Review",
                    "message": issue.get("description", "Vulnerability"),
                    "severity": "High",
                    "scanner": {"id": "context_brain", "name": "Context Brain"},
                    "location": {"file": issue.get("file"), "start_line": _issue_line(issue)},
                })
            with open("gl-sast-report.json", "w", encoding="utf-8") as f:
                json.dump(sast, f)
            print(" -> โ Critical issues found. Applying Security SAST artifact.")

    # Action C: Compile Local Summaries instead of Spamming Comments
    if analyzing:
        body = state.get("report_markdown", "") or state.get("raw_output", "")
        with open("analyze_results.json", "w", encoding="utf-8") as f:
            json.dump({"summary": meta.get("summary", ""), "body": body, "issues": len(cq)}, f)
    elif stage == "finops":
        with open("finops_results.json", "w", encoding="utf-8") as f:
            json.dump({"cost": state.get("cost_estimation", "")}, f)
    elif stage == "devops":
        with open("devops_results.json", "w", encoding="utf-8") as f:
            json.dump({"report": state.get("devops_report", "")}, f)
    elif stage == "test":
        test_report = state.get("test_report", "") or ""
        # The Test Engineer writes "**Tests Passed!**" (bold markdown) on
        # success, so match that exact marker; the previous un-bolded probe
        # never matched and every run was recorded as failed.
        passed = "**Tests Passed!**" in test_report
        with open("test_results.json", "w", encoding="utf-8") as f:
            json.dump({"passed": passed, "report": test_report}, f)
    return {}
def _load_stage_result(path):
    """Load one stage's JSON result file, returning {} when it is missing or unreadable."""
    if not os.path.exists(path):
        print(f" -> โ ๏ธ {path} NOT FOUND - check artifact paths")
        return {}
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # A corrupt artifact should degrade to defaults, not abort the report.
        print(f" -> WARNING: {path} could not be read ({exc}) - check artifact paths")
        return {}
    print(f" -> ✅ Loaded {path}")
    return data if isinstance(data, dict) else {}


def results_assembler_node(state: AgentState):
    """Merge the per-stage result files into the final MR outputs.

    Reads ``analyze_results.json``, ``finops_results.json``,
    ``devops_results.json`` and ``test_results.json`` from the working
    directory (each optional), then:
      1. prefixes the MR description with a short summary,
      2. posts the full report as an MR comment,
      3. writes ``metrics.txt``,
      4. writes ``dashboard.html``.

    Returns:
        An empty state update.
    """
    import html

    print("[*] Agent: Assembler ๐ - Building Native GitLab UX...")
    an_data = _load_stage_result("analyze_results.json")
    fin_data = _load_stage_result("finops_results.json")
    dev_data = _load_stage_result("devops_results.json")
    test_data = _load_stage_result("test_results.json")

    # 1. Update MR Description
    summary = an_data.get("summary", "Complete Platform Insights delivered.")
    # ``str()`` keeps the slices below safe if a stage stored a non-string.
    cost = str(fin_data.get("cost", "No significant cost impact."))
    devops_txt = str(dev_data.get("report", "Infrastructure verified."))
    test_status = "✅ **Passed** (Secure)" if test_data.get("passed") else "โ **Failed** (Security/Logic Risks)"
    # Premium MR Header Injection
    mr_injection = (
        "### ๐ง  Context Brain: Intelligence Report\n"
        f"**Analysis**: {summary}\n"
        f"- **FinOps**: {cost[:200]}\n"
        f"- **DevOps**: {devops_txt[:200]}\n"
        f"- **Tests**: {test_status}\n\n"
        "---"
    )
    state["gl_provider"].update_mr_metadata(state["mr_iid"], description_prefix=mr_injection)

    # 2. Detailed Comment Thread
    full_report = (
        "## ๐งฉ Context Brain: Full Platform Audit\n\n"
        f"#### ๐ต๏ธ Code Analyst Audit\n{an_data.get('body', 'No insights.')}\n\n"
        f"#### ๐ธ Cloud FinOps & Cost Logic\n{fin_data.get('cost', 'Optimized.')}\n\n"
        f"#### โ๏ธ DevOps & SRE Review\n{dev_data.get('report', 'Ready for Prod.')}\n\n"
        f"#### ๐งช Quality SDET Validation\n{test_data.get('report', 'No logic tested.')}\n\n"
        "---\n*Powered by Context Brain LangGraph Engine*"
    )
    state["gl_provider"].post_comment(state["mr_iid"], full_report)
    print(" -> ๐ Full Markdown Report posted as thread comment!")

    # 3. Generate metrics.txt for MR Widget
    with open("metrics.txt", "w", encoding="utf-8") as f:
        f.write(f"context_brain_issues {an_data.get('issues', 0)}\n")
        f.write(f"context_brain_test_pass_rate {100 if test_data.get('passed') else 0}\n")

    # 4. PREMIUM HTML Dashboard
    # The section bodies come from model output and MR content, so they are
    # HTML-escaped and shown verbatim inside <pre> blocks.
    sections = [
        ("๐ต๏ธ Code Quality Analysis", str(an_data.get('body', 'No critical issues found.')).replace('```', '')),
        ("๐ธ FinOps & Cost Impact", fin_data.get('cost', 'Cost efficient.')),
        ("โ๏ธ DevOps & SRE Signals", dev_data.get('report', 'Infrastructure verified.')),
        ("๐งช Automated SDET Tests", test_data.get('report', 'No functional tests executed.')),
    ]
    sections_html = "\n".join(
        f"<section>\n<h2>{html.escape(title)}</h2>\n<pre>{html.escape(str(text))}</pre>\n</section>"
        for title, text in sections
    )
    html_template = (
        "<!DOCTYPE html>\n"
        "<html lang=\"en\">\n"
        "<head>\n"
        "<meta charset=\"utf-8\">\n"
        "<title>Context Brain Intelligence Dashboard</title>\n"
        "</head>\n"
        "<body>\n"
        "<h1>Context Brain Intelligence Dashboard</h1>\n"
        f"{sections_html}\n"
        "</body>\n"
        "</html>\n"
    )
    with open("dashboard.html", "w", encoding="utf-8") as f:
        f.write(html_template)
    print(" -> ๐จ Native UI Assets generated (Metrics Widget, Premium HTML Dashboard)!")
    return {}
# ==========================================
# 3. Main Orchestrator
# ==========================================
def build_and_run_graph(project_id, mr_iid, stage="all"):
    """Build the LangGraph workflow for one pipeline stage and execute it.

    Args:
        project_id: GitLab project id handed to the GitLab provider.
        mr_iid: Merge request IID stored in the graph state.
        stage: One of ``prep``, ``analyze``, ``finops``, ``devops``, ``test``,
            ``results`` or ``all`` (``all`` wires the same graph as
            ``analyze``).

    Returns:
        None. Returns early (after printing a message) when ``GITLAB_TOKEN``
        or ``GOOGLE_API_KEY`` is not set.

    Raises:
        ValueError: If ``stage`` is not a known stage name.
        SystemExit: With status 1 when the final state has ``critical_failure``.
    """
    valid_stages = ("prep", "analyze", "finops", "devops", "test", "results", "all")
    if stage not in valid_stages:
        # An unknown stage would otherwise produce a graph with an entry point
        # but no stage nodes.
        raise ValueError(f"Unknown stage {stage!r}; expected one of {', '.join(valid_stages)}")

    print(f"--- ๐ง  Context Brain LangGraph Workflow Initializing (Project {project_id} / MR {mr_iid} / Stage {stage}) ---")
    gl_token = os.getenv("GITLAB_TOKEN")
    gcp_project = os.getenv("GCP_PROJECT_ID")
    ai_key = os.getenv("GOOGLE_API_KEY")
    if not gl_token or not ai_key:
        print("โ CRITICAL: Missing required API keys. Check .env file.")
        return

    # Build Providers
    gl = GitLabProvider(project_id, gl_token)
    gcp = GCPProvider(gcp_project)
    brain = GeminiBrain(ai_key)
    workflow = StateGraph(AgentState)

    # Check for cached context
    cached_context = {}
    if os.path.exists("context.json"):
        try:
            with open("context.json", "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as exc:
            # Fall back to fetching the context again rather than crashing.
            print(f" -> WARNING: ignoring unreadable context.json ({exc})")
        else:
            if isinstance(loaded, dict):
                cached_context = loaded

    # Define an Injection Node that seeds the graph with cached state
    def context_injector_node(state: AgentState):
        print("[*] โก Loading Shared Fast-Context from context.json Artifact...")
        # ``or`` replaces JSON nulls (written when the prep run had no value
        # for a key) with the same defaults used for missing keys.
        return {
            "mr_data": cached_context.get("mr_data") or {},
            "log_data": cached_context.get("log_data") or "",
            "gcp_data": cached_context.get("gcp_data") or "",
            "early_exit": bool(cached_context.get("early_exit")),
            # Re-raise a prep-stage failure so later stages halt on it
            # instead of working on empty context.
            "error": cached_context.get("error") or "",
        }

    def prep_reporter(state: AgentState):
        print("[*] ๐พ Caching Context state to disk for parallel parallel nodes...")
        with open("context.json", "w", encoding="utf-8") as f:
            json.dump({
                "mr_data": state.get("mr_data"),
                "log_data": state.get("log_data"),
                "gcp_data": state.get("gcp_data"),
                "early_exit": state.get("early_exit"),
                "error": state.get("error"),
            }, f)
        return {}

    # Stage name -> (node name, node function) for stages that end in Reporter.
    stage_nodes = {
        "analyze": ("Reasoner", reasoner_node),
        "all": ("Reasoner", reasoner_node),
        "finops": ("CostEstimator", cost_estimator_node),
        "devops": ("DevOps", devops_node),
        "test": ("TestEngineer", test_engineer_node),
    }

    if stage == "prep":
        workflow.add_node("Librarian", librarian_node)
        workflow.add_node("Sentinel", sentinel_node)
        workflow.add_node("PrepReporter", prep_reporter)
        workflow.set_entry_point("Librarian")
        workflow.add_edge("Librarian", "Sentinel")
        workflow.add_edge("Sentinel", "PrepReporter")
        workflow.add_edge("PrepReporter", END)
    else:
        # For execution stages, skip Librarian and Sentinel if cached
        if cached_context:
            workflow.add_node("Injector", context_injector_node)
            entry = upstream = "Injector"
        else:
            workflow.add_node("Librarian", librarian_node)
            workflow.add_node("Sentinel", sentinel_node)
            workflow.add_edge("Librarian", "Sentinel")
            entry, upstream = "Librarian", "Sentinel"
        workflow.set_entry_point(entry)

        if stage == "results":
            workflow.add_node("Assembler", results_assembler_node)
            workflow.add_edge(upstream, "Assembler")
            workflow.add_edge("Assembler", END)
        else:
            node_name, node_fn = stage_nodes[stage]
            workflow.add_node(node_name, node_fn)
            workflow.add_node("Reporter", reporter_node)
            workflow.add_edge(upstream, node_name)
            workflow.add_edge(node_name, "Reporter")
            workflow.add_edge("Reporter", END)

    app = workflow.compile()

    # Run the Pipeline
    initial_state = {
        "project_id": project_id,
        "mr_iid": mr_iid,
        "gl_provider": gl,
        "gcp_provider": gcp,
        "brain_provider": brain,
        "stage": stage,
        "mermaid_diagram": "",
        "error": "",
        "early_exit": False,
        "cost_estimation": "",
        "test_report": "",
        "devops_report": "",
    }
    print("\n๐ EXECUTING LANGGRAPH PIPELINE...\n")
    final_state = app.invoke(initial_state)
    print("\n✅ PIPELINE COMPLETE.")
    if final_state.get("critical_failure"):
        print("\nโ EXITED WITH CRITICAL SECURITY ISSUES. FAILING PIPELINE.")
        import sys

        sys.exit(1)
if __name__ == "__main__":
    # Command-line entry point: resolve the project/MR identifiers from the
    # arguments, the CI environment or AI_FLOW_CONTEXT, then run one stage.
    parser = argparse.ArgumentParser(description="GitLab Context Brain LangGraph Agent")
    parser.add_argument("--project_id", default=os.getenv("CI_PROJECT_ID"), help="GitLab Project ID")
    parser.add_argument("--mr_iid", default=os.getenv("CI_MERGE_REQUEST_IID"), help="Merge Request IID")
    parser.add_argument(
        "--stage",
        default="all",
        choices=["prep", "analyze", "finops", "devops", "test", "results", "all"],
        help="Pipeline Stage (prep, analyze, finops, devops, test, results, all)",
    )
    args = parser.parse_args()

    flow_context = os.getenv("AI_FLOW_CONTEXT")
    if flow_context:
        try:
            raw_context = json.loads(flow_context)
        except ValueError as exc:
            # Malformed context is ignored on purpose; the CLI/CI values stand.
            print(f" -> WARNING: ignoring malformed AI_FLOW_CONTEXT ({exc})")
        else:
            # Only override when the context actually carries an IID, so a
            # payload without one no longer wipes a valid --mr_iid.
            if isinstance(raw_context, dict) and raw_context.get("iid"):
                args.mr_iid = raw_context["iid"]

    if args.project_id and args.mr_iid:
        build_and_run_graph(args.project_id, args.mr_iid, args.stage)
    else:
        print("โ ERROR: Missing project_id or mr_iid.")