# Duo-Guardian / src /main.py
# Daksh C Jain
# fix: resolve Pytest ModuleNotFoundError by natively saving tests to target repo root instead of /tmp
# 45ccfad
import os
import json
import argparse
from typing import TypedDict, List, Dict, Any
from dotenv import load_dotenv
from src.providers import GitLabProvider, GCPProvider, GeminiBrain
# LangGraph Imports
from langgraph.graph import StateGraph, END
# Load variables from a local .env file before anything below reads the
# environment (the API keys are fetched later via os.getenv).
load_dotenv()
# ==========================================
# 1. State Definition
# ==========================================
class AgentState(TypedDict):
    """State dictionary shared by every node of the LangGraph workflow.

    Nodes read from it and return partial dictionaries that update it.
    """
    # Identity of the merge request being processed
    project_id: str
    mr_iid: str
    # Provider objects built in build_and_run_graph (GitLab, GCP, Gemini)
    gl_provider: Any
    gcp_provider: Any
    brain_provider: Any
    # Pipeline stage selected on the command line (defaults to "all")
    stage: str
    # Context Data
    mr_data: Dict[str, Any]
    log_data: str
    gcp_data: str
    # Generated Outputs
    raw_output: str
    report_markdown: str
    code_quality: List[Dict[str, Any]]
    metadata: Dict[str, Any]
    # Seeded with "" in the initial state; no node in this file writes it
    mermaid_diagram: str
    # Non-empty once a node failed; later nodes skip their work when it is set
    error: str
    # Checked after the run to exit the process with status 1
    critical_failure: bool
    # Set by the Librarian when the MR diff is empty
    early_exit: bool
    cost_estimation: str
    test_report: str
    devops_report: str
# ==========================================
# 2. Agent Nodes (The Graph)
# ==========================================
def librarian_node(state: AgentState):
    """Fetch the merge-request bundle and the CI job logs from GitLab.

    Returns:
        A partial state update:
        * ``{"error": ...}`` when the MR bundle comes back empty;
        * ``mr_data`` / ``log_data`` plus ``early_exit=True`` when the bundle
          carries no diff text;
        * ``mr_data`` / ``log_data`` otherwise.
    """
    print("[*] Agent: Librarian 📚 - Fetching GitLab Context & Job Logs...")
    gl = state["gl_provider"]
    mr_data = gl.fetch_mr_bundle(state["mr_iid"])
    if not mr_data:
        return {"error": "Failed to fetch MR Data. Check project ID and MR IID."}

    log_data = gl.fetch_job_logs()
    update = {"mr_data": mr_data, "log_data": log_data}

    # `or ""` also covers a bundle that carries an explicit `"diff": None`,
    # which would otherwise fail on `.strip()`.
    diff_text = mr_data.get("diff") or ""
    if not diff_text.strip():
        print(" -> No actionable files found in MR diff. Triggering early exit.")
        update["early_exit"] = True
    return update
def sentinel_node(state: AgentState):
    """Collect production signals from the GCP provider.

    The state is handed back untouched when an earlier node recorded an
    error or requested an early exit; otherwise the provider's context is
    returned under ``gcp_data``.
    """
    halted = state.get("error") or state.get("early_exit")
    if halted:
        return state

    print("[*] Agent: Sentinel 🛡️ - Fetching GCP Production Signals...")
    signals = state["gcp_provider"].get_context()
    return {"gcp_data": signals}
def reasoner_node(state: AgentState):
    """Ask the brain provider for the intelligence report and parse it.

    Returns the state untouched when an earlier node set ``error`` or
    ``early_exit``. Otherwise returns ``raw_output``, ``report_markdown``,
    ``code_quality`` and ``metadata``; any exception raised while
    synthesizing or parsing is converted into an ``error`` entry.
    """
    if state.get("error") or state.get("early_exit"):
        return state

    print("[*] Agent: Reasoner 🧠 - Synthesizing Intelligence Report & Automations...")
    try:
        brain = state["brain_provider"]
        raw = brain.synthesize(state["mr_data"], state["gcp_data"], state["log_data"])
        parsed = brain.parse_response(raw)
        report, cq, meta = parsed
    except Exception as exc:
        return {"error": f"Brain failed to synthesize: {exc}"}

    update = {}
    update["raw_output"] = raw
    update["report_markdown"] = report
    update["code_quality"] = cq
    update["metadata"] = meta
    return update
def cost_estimator_node(state: AgentState):
    """Estimate the production cost impact of the MR via the brain provider.

    Returns the state untouched when an earlier node set ``error`` or
    ``early_exit``; otherwise a ``cost_estimation`` update. The stage is
    best-effort: a failure yields a placeholder text instead of an error.
    """
    if state.get("error") or state.get("early_exit"):
        return state

    print("[*] Agent: FinOps 💸 - Estimating Production Cost Impact...")
    try:
        return {"cost_estimation": state["brain_provider"].estimate_cost(state["mr_data"])}
    except Exception as e:
        # Keep the pipeline alive, but leave the cause in the job log rather
        # than discarding it.
        print(f" -> ⚠️ Cost estimation failed: {e}")
        return {"cost_estimation": "*Cost estimation failed.*"}
def devops_node(state: AgentState):
    """Audit the MR for infrastructure and reliability concerns via the brain provider.

    Returns the state untouched when an earlier node set ``error`` or
    ``early_exit``; otherwise a ``devops_report`` update. The stage is
    best-effort: a failure yields a placeholder text instead of an error.
    """
    if state.get("error") or state.get("early_exit"):
        return state

    print("[*] Agent: DevOps ⚙️ - Auditing Infrastructure and Reliability...")
    try:
        return {"devops_report": state["brain_provider"].review_devops(state["mr_data"])}
    except Exception as e:
        # Keep the pipeline alive, but leave the cause in the job log rather
        # than discarding it.
        print(f" -> ⚠️ DevOps audit failed: {e}")
        return {"devops_report": "*DevOps audit failed.*"}
def test_engineer_node(state: AgentState):
if state.get("error") or state.get("early_exit"): return state
print("[*] Agent: Test Engineer 🧪 - Generating & Validating CI Tests...")
try:
import subprocess
import tempfile
import os
test_code = state["brain_provider"].generate_tests(state["mr_data"])
if "NO_TESTS_NEEDED" in test_code:
return {"test_report": "### 🧪 Automated Testing Report\n\nNo explicitly testable business logic detected in this MR."}
# Safely create a temporary Python file
try:
# Add a safety check for forbidden keywords (hallucinated mocks)
forbidden = ["patch(", "@patch", "unittest.mock", "MagicMock", "autospec="]
if any(f in test_code for f in forbidden):
print(" -> ⚠️ AI hallucinated mocks/patch! Rejecting test suite for safety.")
return {"test_report": "### 🧪 Automated Testing Report\n\nAI generated a test suite with forbidden mocks/patch. I've rejected it to maintain test integrity. Please review your business logic's testability."}
# Write directly to the target project root (..) so Python resolves local imports natively
test_filename = "../test_ai_generated.py"
with open(test_filename, "w", encoding="utf-8") as f:
f.write(test_code)
env = os.environ.copy()
psep = ";" if os.name == "nt" else ":"
env["PYTHONPATH"] = f"{env.get('PYTHONPATH', '')}{psep}{os.path.abspath('..')}"
print(f" -> Executing generated pytest suite locally ({os.path.basename(test_filename)})...")
result = subprocess.run(["pytest", test_filename, "-v", "--junitxml=junit-report.xml"], capture_output=True, text=True, timeout=30, env=env)
print(f" [PYTEST STDOUT]\n{result.stdout}\n [PYTEST STDERR]\n{result.stderr}")
report = "### 🧪 Automated Testing Report\n\nI generated Unit Tests for the new feature and ran them automatically in the CI environment.\n\n"
if result.returncode == 0:
report += "✅ **Tests Passed!** The new logic appears solid based on my scenarios. I have committed these tests to your branch!\n\n"
print(" -> 🚀 Tests passed natively! Injecting them back into the MR repository...")
state["gl_provider"].commit_test_file(
state["mr_iid"],
f"tests/test_ai_generated.py",
test_code,
commit_message="test: add Context Brain auto-generated test suite"
)
else:
report += "❌ **Tests Failed natively!** The new logic failed validation against my generated scenarios.\n\n"
state["critical_failure"] = True
report += "<details><summary>View Pytest Execution Logs</summary>\n\n```text\nSTDOUT:\n" + result.stdout[-1500:] + "\nSTDERR:\n" + result.stderr[-1000:] + "\n```\n</details>"
return {"test_report": report}
finally:
# Clean up the temporary file immediately after execution
try:
os.remove(test_filename)
except OSError:
pass
except subprocess.TimeoutExpired:
return {"test_report": "### 🧪 Automated Testing Report\n\n*Execute failed: Tests timed out.*"}
except Exception as e:
return {"test_report": f"### 🧪 Automated Testing Report\n\n*Execute failed: {e}*"}
def _issue_line(issue):
    """Return the issue's line number as an int, or 1 when it is missing or not numeric."""
    try:
        return int(issue.get("line", 1))
    except (TypeError, ValueError):
        return 1


def reporter_node(state: AgentState):
    """Persist the current stage's findings as files in the working directory.

    Depending on ``stage`` this writes:
      * analyze / all: ``gl-code-quality-report.json`` (when there are
        findings), ``gl-sast-report.json`` (when any finding is critical,
        blocker or high) and ``analyze_results.json``; MR labels and inline
        suggestions are sent through ``gl_provider``;
      * finops: ``finops_results.json``;
      * devops: ``devops_results.json``;
      * test: ``test_results.json``.

    Returns:
        The unchanged state when ``error`` is set, otherwise an empty update.
    """
    if state.get("error"):
        print(f"❌ PIPELINE HALTED: {state['error']}")
        return state
    if state.get("early_exit"):
        print(" -> Early Exit. No actionable codebase diffs.")
        return {}

    print("[*] Agent: Reporter 📢 - Dispatching Data to Artifacts...")
    gl = state["gl_provider"]
    mr_iid = state["mr_iid"]
    stage = state.get("stage", "all")
    analyzing = stage in ["analyze", "all"]

    # Action A: Labels ONLY (Description is deferred to Assembler)
    # `or {}` / `or []` also cover keys that are present but hold None.
    meta = state.get("metadata") or {}
    if meta and analyzing:
        print(f" -> Applying Intelligent Labels: {meta.get('labels')}")
        gl.update_mr_metadata(mr_iid, labels=meta.get('labels'))

    # Action B: Code Quality SAST and Inline Suggestions
    cq = state.get("code_quality") or []
    if cq and analyzing:
        gl_cq = []
        critical_issues = []
        for issue in cq:
            gl_cq.append({
                "description": issue.get("description", "Issue found"),
                "check_name": "ContextBrainAudit",
                "fingerprint": f"{issue.get('file')}-{issue.get('line')}",
                "severity": issue.get("severity", "major"),
                "location": {"path": issue.get("file"), "lines": {"begin": _issue_line(issue)}},
            })
            if str(issue.get("severity", "")).lower() in ["critical", "blocker", "high"]:
                critical_issues.append(issue)
            if issue.get("suggestion"):
                gl.create_inline_suggestion(mr_iid, issue.get("file"), issue.get("line"), issue.get("suggestion"), discussion_body=issue.get("description"))
        with open("gl-code-quality-report.json", "w") as f:
            json.dump(gl_cq, f, indent=2)

        if critical_issues:
            import uuid
            sast = {"version": "15.0.0", "vulnerabilities": []}
            for issue in critical_issues:
                sast["vulnerabilities"].append({
                    "id": str(uuid.uuid4()),
                    "category": "sast",
                    "name": "Context Brain Security Review",
                    "message": issue.get("description", "Vulnerability"),
                    "severity": "High",
                    "scanner": {"id": "context_brain", "name": "Context Brain"},
                    "location": {"file": issue.get("file"), "start_line": _issue_line(issue)},
                })
            with open("gl-sast-report.json", "w") as f:
                json.dump(sast, f)
            print(" -> ⛔ Critical issues found. Applying Security SAST artifact.")

    # Action C: Compile Local Summaries instead of Spamming Comments
    if analyzing:
        body = state.get("report_markdown", "") or state.get("raw_output", "")
        with open("analyze_results.json", "w") as f:
            json.dump({"summary": meta.get("summary", ""), "body": body, "issues": len(cq)}, f)
    elif stage == "finops":
        with open("finops_results.json", "w") as f:
            json.dump({"cost": state.get("cost_estimation", "")}, f)
    elif stage == "devops":
        with open("devops_results.json", "w") as f:
            json.dump({"report": state.get("devops_report", "")}, f)
    elif stage == "test":
        test_report = state.get("test_report", "") or ""
        # Must match the success line exactly as the test stage emits it
        # (with the bold markers); the plain "✅ Tests Passed" never occurs.
        passed = "✅ **Tests Passed!**" in test_report
        with open("test_results.json", "w") as f:
            json.dump({"passed": passed, "report": test_report}, f)
    return {}
def _load_stage_results(filename):
    """Load one stage's JSON artifact from the working directory, or return {} when it is absent or unusable."""
    if not os.path.exists(filename):
        print(f" -> ⚠️ {filename} NOT FOUND - check artifact paths")
        return {}
    try:
        with open(filename) as f:
            data = json.load(f)
    except ValueError as e:  # json.JSONDecodeError is a ValueError
        print(f" -> ⚠️ {filename} could not be parsed ({e}) - ignoring it")
        return {}
    if not isinstance(data, dict):
        print(f" -> ⚠️ {filename} does not contain a JSON object - ignoring it")
        return {}
    print(f" -> ✅ Loaded {filename}")
    return data


def _result_text(data, key, default):
    """Return data[key] as text; a missing key or a null value yields the default."""
    value = data.get(key, default)
    return default if value is None else str(value)


def results_assembler_node(state: AgentState):
    """Merge the per-stage result files into the MR description, a comment and local assets.

    Reads ``analyze_results.json``, ``finops_results.json``,
    ``devops_results.json`` and ``test_results.json`` from the working
    directory (each one is optional), then:
      1. prefixes the MR description with a short summary,
      2. posts the full report as an MR comment,
      3. writes ``metrics.txt``,
      4. writes ``dashboard.html``.

    Returns:
        An empty state update.
    """
    import html

    print("[*] Agent: Assembler 📊 - Building Native GitLab UX...")
    an_data = _load_stage_results("analyze_results.json")
    fin_data = _load_stage_results("finops_results.json")
    dev_data = _load_stage_results("devops_results.json")
    test_data = _load_stage_results("test_results.json")

    # 1. Update MR Description
    summary = _result_text(an_data, "summary", "Complete Platform Insights delivered.")
    cost = _result_text(fin_data, "cost", "No significant cost impact.")
    devops_txt = _result_text(dev_data, "report", "Infrastructure verified.")
    test_status = "✅ **Passed** (Secure)" if test_data.get("passed") else "❌ **Failed** (Security/Logic Risks)"

    # Premium MR Header Injection
    mr_injection = (
        "### 🧠 Context Brain: Intelligence Report\n"
        f"**Analysis**: {summary}\n"
        f"- **FinOps**: {cost[:200]}\n"
        f"- **DevOps**: {devops_txt[:200]}\n"
        f"- **Tests**: {test_status}\n\n"
        "---"
    )
    state["gl_provider"].update_mr_metadata(state["mr_iid"], description_prefix=mr_injection)

    # 2. Detailed Comment Thread
    full_report = (
        "## 🧩 Context Brain: Full Platform Audit\n\n"
        f"#### 🕵️ Code Analyst Audit\n{_result_text(an_data, 'body', 'No insights.')}\n\n"
        f"#### 💸 Cloud FinOps & Cost Logic\n{_result_text(fin_data, 'cost', 'Optimized.')}\n\n"
        f"#### ⚙️ DevOps & SRE Review\n{_result_text(dev_data, 'report', 'Ready for Prod.')}\n\n"
        f"#### 🧪 Quality SDET Validation\n{_result_text(test_data, 'report', 'No logic tested.')}\n\n"
        "---\n*Powered by Context Brain LangGraph Engine*"
    )
    state["gl_provider"].post_comment(state["mr_iid"], full_report)
    print(" -> 📝 Full Markdown Report posted as thread comment!")

    # 3. Generate metrics.txt for MR Widget
    with open("metrics.txt", "w") as f:
        f.write(f"context_brain_issues {an_data.get('issues', 0)}\n")
        f.write(f"context_brain_test_pass_rate {100 if test_data.get('passed') else 0}\n")

    # 4. PREMIUM HTML Dashboard
    # The stage outputs are model- and MR-derived text, so they are escaped
    # before being placed into the page; the cards show them as plain text.
    analysis_html = html.escape(_result_text(an_data, "body", "No critical issues found.").replace("```", ""))
    cost_html = html.escape(_result_text(fin_data, "cost", "Cost efficient."))
    devops_html = html.escape(_result_text(dev_data, "report", "Infrastructure verified."))
    tests_html = html.escape(_result_text(test_data, "report", "No functional tests executed."))
    mr_ref_html = html.escape(str(state["mr_iid"]))
    html_template = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Context Brain Intelligence Dashboard</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Outfit:wght@300;400;600&family=JetBrains+Mono:wght@400&display=swap" rel="stylesheet">
<style>
:root {{
--bg: #0f172a;
--card-bg: rgba(30, 41, 59, 0.7);
--accent: #38bdf8;
--text: #f1f5f9;
--text-muted: #94a3b8;
--border: rgba(255, 255, 255, 0.1);
}}
body {{
font-family: 'Outfit', sans-serif;
background-color: var(--bg);
color: var(--text);
margin: 0;
padding: 40px;
line-height: 1.6;
background-image: radial-gradient(circle at 50% 50%, #1e293b 0%, #0f172a 100%);
}}
.container {{
max-width: 1000px;
margin: 0 auto;
}}
.header {{
text-align: center;
margin-bottom: 60px;
}}
.header h1 {{
font-size: 3rem;
margin: 0;
background: linear-gradient(90deg, #38bdf8, #818cf8);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
font-weight: 600;
}}
.header p {{
color: var(--text-muted);
font-size: 1.2rem;
}}
.grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(450px, 1fr));
gap: 25px;
}}
.card {{
background: var(--card-bg);
backdrop-filter: blur(12px);
border: 1px solid var(--border);
border-radius: 20px;
padding: 30px;
box-shadow: 0 10px 30px rgba(0,0,0,0.3);
transition: transform 0.3s ease;
}}
.card:hover {{
transform: translateY(-5px);
border-color: var(--accent);
}}
.card h2 {{
margin-top: 0;
font-size: 1.5rem;
display: flex;
align-items: center;
gap: 12px;
color: var(--accent);
}}
.content {{
background: rgba(0,0,0,0.2);
border-radius: 12px;
padding: 20px;
font-size: 0.95rem;
white-space: pre-wrap;
font-family: 'Outfit', sans-serif;
}}
.footer {{
text-align: center;
margin-top: 60px;
color: var(--text-muted);
font-size: 0.9rem;
}}
code {{
font-family: 'JetBrains Mono', monospace;
background: rgba(255,255,255,0.1);
padding: 2px 5px;
border-radius: 4px;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🧠 Context Brain</h1>
<p>Intelligence Platform Audit Dashboard</p>
</div>
<div class="grid">
<div class="card">
<h2>🕵️ Code Quality Analysis</h2>
<div class="content">{analysis_html}</div>
</div>
<div class="card">
<h2>💸 FinOps & Cost Impact</h2>
<div class="content">{cost_html}</div>
</div>
<div class="card">
<h2>⚙️ DevOps & SRE Signals</h2>
<div class="content">{devops_html}</div>
</div>
<div class="card">
<h2>🧪 Automated SDET Tests</h2>
<div class="content">{tests_html}</div>
</div>
</div>
<div class="footer">
Context Brain &copy; 2026 | Generated for MR !{mr_ref_html}
</div>
</div>
</body>
</html>
"""
    with open("dashboard.html", "w", encoding="utf-8") as f:
        f.write(html_template)
    print(" -> 🎨 Native UI Assets generated (Metrics Widget, Premium HTML Dashboard)!")
    return {}
# ==========================================
# 3. Main Orchestrator
# ==========================================
def build_and_run_graph(project_id, mr_iid, stage="all"):
    """Build the LangGraph workflow for one pipeline stage and execute it.

    Args:
        project_id: GitLab project ID.
        mr_iid: Merge request IID.
        stage: One of ``prep``, ``analyze``, ``all``, ``finops``, ``devops``,
            ``test`` or ``results``. ``prep`` fetches the context and caches
            it in ``context.json``; the other stages reuse that cache when it
            exists and otherwise fetch the context themselves.

    Returns:
        None. Returns early (after printing a message) when ``GITLAB_TOKEN``
        or ``GOOGLE_API_KEY`` is not set.

    Raises:
        ValueError: if ``stage`` is not one of the names above.
        SystemExit: with status 1 when the final state has
            ``critical_failure`` set.
    """
    print(f"--- 🧠 Context Brain LangGraph Workflow Initializing (Project {project_id} / MR {mr_iid} / Stage {stage}) ---")
    gl_token = os.getenv("GITLAB_TOKEN")
    gcp_project = os.getenv("GCP_PROJECT_ID")
    ai_key = os.getenv("GOOGLE_API_KEY")
    if not gl_token or not ai_key:
        print("❌ CRITICAL: Missing required API keys. Check .env file.")
        return

    # Reject unknown stages up front: they would otherwise build a graph
    # whose fetch nodes lead nowhere.
    known_stages = ("prep", "analyze", "all", "finops", "devops", "test", "results")
    if stage not in known_stages:
        raise ValueError(f"Unknown stage {stage!r}; expected one of: {', '.join(known_stages)}")

    # Build Providers
    gl = GitLabProvider(project_id, gl_token)
    gcp = GCPProvider(gcp_project)
    brain = GeminiBrain(ai_key)
    workflow = StateGraph(AgentState)

    # Check for cached context
    cached_context = {}
    if os.path.exists("context.json"):
        with open("context.json", "r") as f:
            cached_context = json.load(f)

    # Define an Injection Node that seeds the graph with cached state
    def context_injector_node(state: AgentState):
        print("[*] ⚡ Loading Shared Fast-Context from context.json Artifact...")
        # `or` also normalises JSON nulls, which the cache contains whenever
        # the prep run never produced the corresponding value.
        return {
            "mr_data": cached_context.get("mr_data") or {},
            "log_data": cached_context.get("log_data") or "",
            "gcp_data": cached_context.get("gcp_data") or "",
            "early_exit": cached_context.get("early_exit") or False,
        }

    def prep_reporter(state: AgentState):
        print(f"[*] 💾 Caching Context state to disk for parallel parallel nodes...")
        with open("context.json", "w") as f:
            json.dump({
                "mr_data": state.get("mr_data"),
                "log_data": state.get("log_data"),
                "gcp_data": state.get("gcp_data"),
                "early_exit": state.get("early_exit"),
            }, f)
        return {}

    if stage == "prep":
        workflow.add_node("Librarian", librarian_node)
        workflow.add_node("Sentinel", sentinel_node)
        workflow.add_node("PrepReporter", prep_reporter)
        workflow.set_entry_point("Librarian")
        workflow.add_edge("Librarian", "Sentinel")
        workflow.add_edge("Sentinel", "PrepReporter")
        workflow.add_edge("PrepReporter", END)
    else:
        # For execution stages, skip Librarian and Sentinel if cached
        if cached_context:
            workflow.add_node("Injector", context_injector_node)
            entry = upstream = "Injector"
        else:
            workflow.add_node("Librarian", librarian_node)
            workflow.add_node("Sentinel", sentinel_node)
            workflow.add_edge("Librarian", "Sentinel")
            entry, upstream = "Librarian", "Sentinel"
        workflow.set_entry_point(entry)

        if stage == "results":
            # The assembler reads the other stages' files itself, so it is
            # not followed by the Reporter.
            workflow.add_node("Assembler", results_assembler_node)
            workflow.add_edge(upstream, "Assembler")
            workflow.add_edge("Assembler", END)
        else:
            # Every remaining stage is: context -> one worker -> Reporter.
            stage_workers = {
                "analyze": ("Reasoner", reasoner_node),
                "all": ("Reasoner", reasoner_node),
                "finops": ("CostEstimator", cost_estimator_node),
                "devops": ("DevOps", devops_node),
                "test": ("TestEngineer", test_engineer_node),
            }
            worker_name, worker = stage_workers[stage]
            workflow.add_node(worker_name, worker)
            workflow.add_node("Reporter", reporter_node)
            workflow.add_edge(upstream, worker_name)
            workflow.add_edge(worker_name, "Reporter")
            workflow.add_edge("Reporter", END)

    app = workflow.compile()

    # Run the Pipeline
    initial_state = {
        "project_id": project_id,
        "mr_iid": mr_iid,
        "gl_provider": gl,
        "gcp_provider": gcp,
        "brain_provider": brain,
        "stage": stage,
        "mermaid_diagram": "",
        "error": "",
        "critical_failure": False,
        "early_exit": False,
        "cost_estimation": "",
        "test_report": "",
        "devops_report": "",
    }
    print("\n🚀 EXECUTING LANGGRAPH PIPELINE...\n")
    final_state = app.invoke(initial_state)
    print("\n✅ PIPELINE COMPLETE.")
    if final_state.get("critical_failure"):
        print("\n⛔ EXITED WITH CRITICAL SECURITY ISSUES. FAILING PIPELINE.")
        import sys
        sys.exit(1)
# Command-line entry point: resolve the project / MR identifiers and run one stage.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="GitLab Context Brain LangGraph Agent")
    parser.add_argument("--project_id", default=os.getenv("CI_PROJECT_ID"), help="GitLab Project ID")
    parser.add_argument("--mr_iid", default=os.getenv("CI_MERGE_REQUEST_IID"), help="Merge Request IID")
    parser.add_argument("--stage", default="all", help="Pipeline Stage (analyze, finops, test)")
    args = parser.parse_args()

    # AI_FLOW_CONTEXT, when set, is a JSON object whose "iid" takes precedence
    # over --mr_iid / CI_MERGE_REQUEST_IID.
    flow_context = os.getenv("AI_FLOW_CONTEXT")
    if flow_context:
        try:
            context_iid = json.loads(flow_context).get("iid")
        except (ValueError, AttributeError):
            # Malformed JSON, or JSON that is not an object: fall back to the
            # identifiers already resolved above.
            print(" -> ⚠️ Ignoring malformed AI_FLOW_CONTEXT.")
            context_iid = None
        # Only override when the context actually carries an iid, so a context
        # without one cannot wipe out a valid MR IID.
        if context_iid:
            args.mr_iid = context_iid

    if args.project_id and args.mr_iid:
        build_and_run_graph(args.project_id, args.mr_iid, args.stage)
    else:
        print("❌ ERROR: Missing project_id or mr_iid.")