Spaces:
Running
Running
Daksh C Jain
fix: resolve Pytest ModuleNotFoundError by natively saving tests to target repo root instead of /tmp
45ccfad | import os | |
| import json | |
| import argparse | |
| from typing import TypedDict, List, Dict, Any | |
| from dotenv import load_dotenv | |
| from src.providers import GitLabProvider, GCPProvider, GeminiBrain | |
| # LangGraph Imports | |
| from langgraph.graph import StateGraph, END | |
| load_dotenv() | |
# ==========================================
# 1. State Definition
# ==========================================
class AgentState(TypedDict):
    """Shared state mapping handed to every node of the workflow graph.

    Nodes read the keys they need and return a dict holding only the keys
    they produce.
    """

    # Run identity, injected providers and the requested pipeline stage.
    project_id: str
    mr_iid: str
    gl_provider: Any
    gcp_provider: Any
    brain_provider: Any
    stage: str

    # Context Data
    mr_data: Dict[str, Any]
    log_data: str
    gcp_data: str

    # Generated Outputs
    raw_output: str
    report_markdown: str
    code_quality: List[Dict[str, Any]]
    metadata: Dict[str, Any]
    mermaid_diagram: str
    error: str
    critical_failure: bool
    early_exit: bool
    cost_estimation: str
    test_report: str
    devops_report: str
# ==========================================
# 2. Agent Nodes (The Graph)
# ==========================================
def librarian_node(state: AgentState):
    """Fetch the merge request bundle and the CI job logs from GitLab.

    Returns a partial state update:
      * ``{"error": ...}`` when the MR bundle could not be fetched;
      * ``mr_data`` and ``log_data`` otherwise, plus ``early_exit=True`` when
        the bundle carries no (non-blank) diff.
    """
    print("[*] Agent: Librarian 📚 - Fetching GitLab Context & Job Logs...")
    provider = state["gl_provider"]

    mr_data = provider.fetch_mr_bundle(state["mr_iid"])
    if not mr_data:
        return {"error": "Failed to fetch MR Data. Check project ID and MR IID."}

    update = {"mr_data": mr_data, "log_data": provider.fetch_job_logs()}

    # ``diff`` may be present but None; ``.get("diff", "")`` would hand that
    # None straight to ``.strip()`` and crash, so normalise it first.
    diff = mr_data.get("diff") or ""
    if not str(diff).strip():
        print(" -> No actionable files found in MR diff. Triggering early exit.")
        update["early_exit"] = True
    return update
def sentinel_node(state: AgentState):
    """Collect production signals from the GCP provider.

    Passes the state through untouched when an earlier node reported an error
    or requested an early exit; otherwise returns ``{"gcp_data": ...}``.
    """
    should_skip = bool(state.get("error")) or bool(state.get("early_exit"))
    if should_skip:
        return state

    print("[*] Agent: Sentinel 🛡️ - Fetching GCP Production Signals...")
    return {"gcp_data": state["gcp_provider"].get_context()}
def reasoner_node(state: AgentState):
    """Ask the brain provider to synthesize and parse the intelligence report.

    Passes the state through on a prior error / early exit. On success returns
    the raw model output together with the parsed report, code-quality items
    and metadata; any exception is converted into an ``error`` entry.
    """
    if state.get("error") or state.get("early_exit"):
        return state

    print("[*] Agent: Reasoner 🧠 - Synthesizing Intelligence Report & Automations...")
    try:
        brain = state["brain_provider"]
        raw_text = brain.synthesize(state["mr_data"], state["gcp_data"], state["log_data"])
        report_md, quality_items, meta = brain.parse_response(raw_text)
    except Exception as exc:
        return {"error": f"Brain failed to synthesize: {exc}"}

    return dict(
        raw_output=raw_text,
        report_markdown=report_md,
        code_quality=quality_items,
        metadata=meta,
    )
def cost_estimator_node(state: AgentState):
    """Estimate the production cost impact of the merge request.

    Passes the state through on a prior error / early exit. Otherwise returns
    ``{"cost_estimation": ...}``; a failing estimate is best-effort and yields
    a placeholder, but the cause is logged instead of being dropped silently.
    """
    if state.get("error") or state.get("early_exit"):
        return state

    print("[*] Agent: FinOps 💸 - Estimating Production Cost Impact...")
    try:
        return {"cost_estimation": state["brain_provider"].estimate_cost(state["mr_data"])}
    except Exception as e:
        # Keep the pipeline going, but leave a trace of why the estimate is missing.
        print(f" -> ⚠️ Cost estimation failed: {e}")
        return {"cost_estimation": "*Cost estimation failed.*"}
def devops_node(state: AgentState):
    """Run the infrastructure / reliability audit on the merge request.

    Passes the state through on a prior error / early exit. Otherwise returns
    ``{"devops_report": ...}``; a failing audit is best-effort and yields a
    placeholder, but the cause is logged instead of being dropped silently.
    """
    if state.get("error") or state.get("early_exit"):
        return state

    print("[*] Agent: DevOps ⚙️ - Auditing Infrastructure and Reliability...")
    try:
        return {"devops_report": state["brain_provider"].review_devops(state["mr_data"])}
    except Exception as e:
        # Keep the pipeline going, but leave a trace of why the audit is missing.
        print(f" -> ⚠️ DevOps audit failed: {e}")
        return {"devops_report": "*DevOps audit failed.*"}
def test_engineer_node(state: AgentState):
    """Generate a pytest suite for the MR, run it, and report the outcome.

    Passes the state through on a prior error / early exit. Otherwise returns
    a partial state update with ``test_report`` (Markdown) and, when the
    generated suite fails, ``critical_failure=True``.

    Flow:
      1. Ask the brain provider for test code; stop if it answers
         ``NO_TESTS_NEEDED`` or the code uses mocks/patching.
      2. Write the suite next to the target project (``..``), run pytest on it
         with a 30 s timeout, and always delete the file afterwards.
      3. On success commit the suite to the MR branch through the GitLab
         provider.

    Any exception (including a pytest timeout) is turned into an
    "Execute failed" report rather than propagated.
    """
    if state.get("error") or state.get("early_exit"):
        return state
    print("[*] Agent: Test Engineer 🧪 - Generating & Validating CI Tests...")

    # Function-scope imports, as in the rest of this file's nodes.
    import os
    import subprocess

    try:
        test_code = state["brain_provider"].generate_tests(state["mr_data"])
        if "NO_TESTS_NEEDED" in test_code:
            return {"test_report": "### 🧪 Automated Testing Report\n\nNo explicitly testable business logic detected in this MR."}

        # Reject suites that rely on mocks/patching. This check runs BEFORE the
        # try/finally that owns the temp file: previously the early return
        # triggered a cleanup of a not-yet-assigned ``test_filename`` and the
        # resulting UnboundLocalError replaced this report.
        forbidden = ("patch(", "@patch", "unittest.mock", "MagicMock", "autospec=")
        if any(token in test_code for token in forbidden):
            print(" -> ⚠️ AI hallucinated mocks/patch! Rejecting test suite for safety.")
            return {"test_report": "### 🧪 Automated Testing Report\n\nAI generated a test suite with forbidden mocks/patch. I've rejected it to maintain test integrity. Please review your business logic's testability."}

        # Write directly to the target project root (..) so Python resolves local imports natively
        test_filename = "../test_ai_generated.py"
        try:
            with open(test_filename, "w", encoding="utf-8") as f:
                f.write(test_code)

            env = os.environ.copy()
            # Join only non-empty entries: an empty PYTHONPATH must not produce
            # a leading separator (which Python treats as the current directory).
            path_entries = [env.get("PYTHONPATH"), os.path.abspath("..")]
            env["PYTHONPATH"] = os.pathsep.join(p for p in path_entries if p)

            print(f" -> Executing generated pytest suite locally ({os.path.basename(test_filename)})...")
            result = subprocess.run(
                ["pytest", test_filename, "-v", "--junitxml=junit-report.xml"],
                capture_output=True,
                text=True,
                timeout=30,
                env=env,
            )
        finally:
            # Clean up the temporary file immediately after execution
            try:
                os.remove(test_filename)
            except OSError:
                pass

        print(f" [PYTEST STDOUT]\n{result.stdout}\n [PYTEST STDERR]\n{result.stderr}")
        report = "### 🧪 Automated Testing Report\n\nI generated Unit Tests for the new feature and ran them automatically in the CI environment.\n\n"
        update = {}
        if result.returncode == 0:
            report += "✅ **Tests Passed!** The new logic appears solid based on my scenarios. I have committed these tests to your branch!\n\n"
            print(" -> 🚀 Tests passed natively! Injecting them back into the MR repository...")
            state["gl_provider"].commit_test_file(
                state["mr_iid"],
                "tests/test_ai_generated.py",
                test_code,
                commit_message="test: add Context Brain auto-generated test suite",
            )
        else:
            report += "❌ **Tests Failed natively!** The new logic failed validation against my generated scenarios.\n\n"
            # Must be part of the RETURNED update: mutating the input state is
            # not propagated by the graph, so the failure would be lost.
            update["critical_failure"] = True

        # Only the tail of each stream is embedded to keep the comment small.
        report += "<details><summary>View Pytest Execution Logs</summary>\n\n```text\nSTDOUT:\n" + result.stdout[-1500:] + "\nSTDERR:\n" + result.stderr[-1000:] + "\n```\n</details>"
        update["test_report"] = report
        return update
    except subprocess.TimeoutExpired:
        return {"test_report": "### 🧪 Automated Testing Report\n\n*Execute failed: Tests timed out.*"}
    except Exception as e:
        return {"test_report": f"### 🧪 Automated Testing Report\n\n*Execute failed: {e}*"}


# The name matches pytest's default ``test*`` pattern; mark it so pytest never
# collects this graph node as a test when the module is imported by a test file.
test_engineer_node.__test__ = False
def _issue_line(issue):
    """Return the issue's line number as an int, or 1 when missing/malformed."""
    try:
        return int(issue.get("line") or 1)
    except (TypeError, ValueError):
        return 1


def reporter_node(state: AgentState):
    """Write the current stage's results to GitLab and to local artifact files.

    * On ``error``: prints the failure and passes the state through.
    * On ``early_exit``: prints a notice and returns ``{}``.
    * Otherwise, depending on ``stage`` (default ``"all"``):
        - ``analyze`` / ``all``: applies MR labels, posts inline suggestions,
          writes ``gl-code-quality-report.json``, writes ``gl-sast-report.json``
          when critical issues exist, and writes ``analyze_results.json``;
        - ``finops`` / ``devops`` / ``test``: writes the matching
          ``*_results.json`` file.

    Always returns ``{}`` after dispatching.
    """
    if state.get("error"):
        print(f"❌ PIPELINE HALTED: {state['error']}")
        return state
    if state.get("early_exit"):
        print(" -> Early Exit. No actionable codebase diffs.")
        return {}

    print("[*] Agent: Reporter 📢 - Dispatching Data to Artifacts...")
    gl = state["gl_provider"]
    mr_iid = state["mr_iid"]
    stage = state.get("stage", "all")
    is_analysis = stage in ("analyze", "all")

    # Action A: Labels ONLY (Description is deferred to Assembler)
    # ``or {}`` / ``or []`` also cover keys that are present but None.
    meta = state.get("metadata") or {}
    if meta and is_analysis:
        print(f" -> Applying Intelligent Labels: {meta.get('labels')}")
        gl.update_mr_metadata(mr_iid, labels=meta.get('labels'))

    # Action B: Code Quality SAST and Inline Suggestions
    cq = state.get("code_quality") or []
    if cq and is_analysis:
        gl_cq = []
        critical_issues = []
        for issue in cq:
            gl_cq.append({
                "description": issue.get("description", "Issue found"),
                "check_name": "ContextBrainAudit",
                "fingerprint": f"{issue.get('file')}-{issue.get('line')}",
                "severity": issue.get("severity", "major"),
                "location": {"path": issue.get("file"), "lines": {"begin": _issue_line(issue)}},
            })
            if str(issue.get("severity", "")).lower() in ("critical", "blocker", "high"):
                critical_issues.append(issue)
            if issue.get("suggestion"):
                gl.create_inline_suggestion(mr_iid, issue.get("file"), issue.get("line"), issue.get("suggestion"), discussion_body=issue.get("description"))

        with open("gl-code-quality-report.json", "w") as f:
            json.dump(gl_cq, f, indent=2)

        if critical_issues:
            import uuid

            sast = {
                "version": "15.0.0",
                "vulnerabilities": [
                    {
                        "id": str(uuid.uuid4()),
                        "category": "sast",
                        "name": "Context Brain Security Review",
                        "message": issue.get("description", "Vulnerability"),
                        "severity": "High",
                        "scanner": {"id": "context_brain", "name": "Context Brain"},
                        "location": {"file": issue.get("file"), "start_line": _issue_line(issue)},
                    }
                    for issue in critical_issues
                ],
            }
            with open("gl-sast-report.json", "w") as f:
                json.dump(sast, f)
            print(" -> ⛔ Critical issues found. Applying Security SAST artifact.")

    # Action C: Compile Local Summaries instead of Spamming Comments
    if is_analysis:
        body = state.get("report_markdown", "") or state.get("raw_output", "")
        with open("analyze_results.json", "w") as f:
            json.dump({"summary": meta.get("summary", ""), "body": body, "issues": len(cq)}, f)
    elif stage == "finops":
        with open("finops_results.json", "w") as f:
            json.dump({"cost": state.get("cost_estimation", "")}, f)
    elif stage == "devops":
        with open("devops_results.json", "w") as f:
            json.dump({"report": state.get("devops_report", "")}, f)
    elif stage == "test":
        test_report = state.get("test_report", "") or ""
        # The report marks success as "✅ **Tests Passed!**"; the old plain
        # substring check ("✅ Tests Passed") could never match because of the
        # Markdown bold markers, so strip them before looking for the marker.
        passed = "✅ Tests Passed" in test_report.replace("*", "")
        with open("test_results.json", "w") as f:
            json.dump({"passed": passed, "report": test_report}, f)
    return {}
def results_assembler_node(state: AgentState):
    """Merge the per-stage result artifacts into the final MR-facing outputs.

    Reads ``analyze_results.json``, ``finops_results.json``,
    ``devops_results.json`` and ``test_results.json`` from the working
    directory (missing or unreadable files fall back to default text), then:

      1. prefixes the MR description with a short summary header;
      2. posts the full Markdown audit as an MR comment;
      3. writes ``metrics.txt`` (issue count and test pass rate);
      4. writes ``dashboard.html``.

    Text placed into the HTML dashboard is HTML-escaped because it originates
    from model output and pytest logs. Returns ``{}``.
    """
    import html  # function-scope import, matching the other nodes in this file

    print("[*] Agent: Assembler 📊 - Building Native GitLab UX...")

    def _load_artifact(path):
        """Load one JSON result artifact; return {} when absent or unusable."""
        if not os.path.exists(path):
            print(f" -> ⚠️ {path} NOT FOUND - check artifact paths")
            return {}
        try:
            with open(path) as f:
                data = json.load(f)
        except (OSError, json.JSONDecodeError) as exc:
            # A corrupt artifact should degrade to defaults, not abort the report.
            print(f" -> ⚠️ {path} could not be read ({exc}) - using defaults")
            return {}
        if not isinstance(data, dict):
            print(f" -> ⚠️ {path} has an unexpected format - using defaults")
            return {}
        print(f" -> ✅ Loaded {path}")
        return data

    def _text(data, key, default):
        """Fetch ``data[key]`` as text; a missing or null value yields ``default``."""
        value = data.get(key, default)
        return default if value is None else str(value)

    an_data = _load_artifact("analyze_results.json")
    fin_data = _load_artifact("finops_results.json")
    dev_data = _load_artifact("devops_results.json")
    test_data = _load_artifact("test_results.json")

    # 1. Update MR Description
    summary = _text(an_data, "summary", "Complete Platform Insights delivered.")
    cost = _text(fin_data, "cost", "No significant cost impact.")
    devops_txt = _text(dev_data, "report", "Infrastructure verified.")
    test_status = "✅ **Passed** (Secure)" if test_data.get("passed") else "❌ **Failed** (Security/Logic Risks)"

    # Premium MR Header Injection
    mr_injection = (
        "### 🧠 Context Brain: Intelligence Report\n"
        f"**Analysis**: {summary}\n"
        f"- **FinOps**: {cost[:200]}\n"
        f"- **DevOps**: {devops_txt[:200]}\n"
        f"- **Tests**: {test_status}\n\n"
        "---"
    )
    state["gl_provider"].update_mr_metadata(state["mr_iid"], description_prefix=mr_injection)

    # 2. Detailed Comment Thread (Markdown rendered by GitLab, so left unescaped)
    full_report = (
        "## 🧩 Context Brain: Full Platform Audit\n\n"
        f"#### 🕵️ Code Analyst Audit\n{_text(an_data, 'body', 'No insights.')}\n\n"
        f"#### 💸 Cloud FinOps & Cost Logic\n{_text(fin_data, 'cost', 'Optimized.')}\n\n"
        f"#### ⚙️ DevOps & SRE Review\n{_text(dev_data, 'report', 'Ready for Prod.')}\n\n"
        f"#### 🧪 Quality SDET Validation\n{_text(test_data, 'report', 'No logic tested.')}\n\n"
        "---\n*Powered by Context Brain LangGraph Engine*"
    )
    state["gl_provider"].post_comment(state["mr_iid"], full_report)
    print(" -> 📝 Full Markdown Report posted as thread comment!")

    # 3. Generate metrics.txt for MR Widget
    with open("metrics.txt", "w") as f:
        f.write(f"context_brain_issues {an_data.get('issues', 0)}\n")
        f.write(f"context_brain_test_pass_rate {100 if test_data.get('passed') else 0}\n")

    # 4. PREMIUM HTML Dashboard
    # Everything interpolated below is escaped first: raw "<" / "&" from model
    # output or pytest logs would otherwise break (or inject into) the page.
    an_html = html.escape(_text(an_data, "body", "No critical issues found.").replace("```", ""))
    cost_html = html.escape(_text(fin_data, "cost", "Cost efficient."))
    devops_html = html.escape(_text(dev_data, "report", "Infrastructure verified."))
    tests_html = html.escape(_text(test_data, "report", "No functional tests executed."))
    mr_ref = html.escape(str(state["mr_iid"]))

    html_template = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Context Brain Intelligence Dashboard</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Outfit:wght@300;400;600&family=JetBrains+Mono:wght@400&display=swap" rel="stylesheet">
<style>
:root {{
    --bg: #0f172a;
    --card-bg: rgba(30, 41, 59, 0.7);
    --accent: #38bdf8;
    --text: #f1f5f9;
    --text-muted: #94a3b8;
    --border: rgba(255, 255, 255, 0.1);
}}
body {{
    font-family: 'Outfit', sans-serif;
    background-color: var(--bg);
    color: var(--text);
    margin: 0;
    padding: 40px;
    line-height: 1.6;
    background-image: radial-gradient(circle at 50% 50%, #1e293b 0%, #0f172a 100%);
}}
.container {{
    max-width: 1000px;
    margin: 0 auto;
}}
.header {{
    text-align: center;
    margin-bottom: 60px;
}}
.header h1 {{
    font-size: 3rem;
    margin: 0;
    background: linear-gradient(90deg, #38bdf8, #818cf8);
    -webkit-background-clip: text;
    -webkit-text-fill-color: transparent;
    font-weight: 600;
}}
.header p {{
    color: var(--text-muted);
    font-size: 1.2rem;
}}
.grid {{
    display: grid;
    grid-template-columns: repeat(auto-fit, minmax(450px, 1fr));
    gap: 25px;
}}
.card {{
    background: var(--card-bg);
    backdrop-filter: blur(12px);
    border: 1px solid var(--border);
    border-radius: 20px;
    padding: 30px;
    box-shadow: 0 10px 30px rgba(0,0,0,0.3);
    transition: transform 0.3s ease;
}}
.card:hover {{
    transform: translateY(-5px);
    border-color: var(--accent);
}}
.card h2 {{
    margin-top: 0;
    font-size: 1.5rem;
    display: flex;
    align-items: center;
    gap: 12px;
    color: var(--accent);
}}
.content {{
    background: rgba(0,0,0,0.2);
    border-radius: 12px;
    padding: 20px;
    font-size: 0.95rem;
    white-space: pre-wrap;
    font-family: 'Outfit', sans-serif;
}}
.footer {{
    text-align: center;
    margin-top: 60px;
    color: var(--text-muted);
    font-size: 0.9rem;
}}
code {{
    font-family: 'JetBrains Mono', monospace;
    background: rgba(255,255,255,0.1);
    padding: 2px 5px;
    border-radius: 4px;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🧠 Context Brain</h1>
<p>Intelligence Platform Audit Dashboard</p>
</div>
<div class="grid">
<div class="card">
<h2>🕵️ Code Quality Analysis</h2>
<div class="content">{an_html}</div>
</div>
<div class="card">
<h2>💸 FinOps & Cost Impact</h2>
<div class="content">{cost_html}</div>
</div>
<div class="card">
<h2>⚙️ DevOps & SRE Signals</h2>
<div class="content">{devops_html}</div>
</div>
<div class="card">
<h2>🧪 Automated SDET Tests</h2>
<div class="content">{tests_html}</div>
</div>
</div>
<div class="footer">
Context Brain © 2026 | Generated for MR !{mr_ref}
</div>
</div>
</body>
</html>
"""
    with open("dashboard.html", "w", encoding="utf-8") as f:
        f.write(html_template)
    print(" -> 🎨 Native UI Assets generated (Metrics Widget, Premium HTML Dashboard)!")
    return {}
# ==========================================
# 3. Main Orchestrator
# ==========================================
def build_and_run_graph(project_id, mr_iid, stage="all"):
    """Build the LangGraph workflow for one pipeline stage and run it.

    Args:
        project_id: GitLab project ID.
        mr_iid: Merge request IID.
        stage: One of ``prep``, ``all``, ``analyze``, ``finops``, ``devops``,
            ``test`` or ``results``.

    Behaviour:
        * Returns ``None`` without running when ``GITLAB_TOKEN`` or
          ``GOOGLE_API_KEY`` is not set.
        * ``prep`` fetches context (Librarian -> Sentinel) and caches it to
          ``context.json``.
        * Every other stage starts from the cached ``context.json`` when it is
          present (Injector), otherwise fetches the context itself, then runs
          the stage's worker node; all stages except ``results`` finish with
          the Reporter node.
        * Exits the process with status 1 when the final state carries
          ``critical_failure``.

    Raises:
        ValueError: if ``stage`` is not one of the known stages.
    """
    print(f"--- 🧠 Context Brain LangGraph Workflow Initializing (Project {project_id} / MR {mr_iid} / Stage {stage}) ---")
    gl_token = os.getenv("GITLAB_TOKEN")
    gcp_project = os.getenv("GCP_PROJECT_ID")
    ai_key = os.getenv("GOOGLE_API_KEY")
    if not gl_token or not ai_key:
        print("❌ CRITICAL: Missing required API keys. Check .env file.")
        return

    # Fail loudly on a mistyped stage: previously it fell through every branch
    # and produced a graph containing no worker node at all.
    known_stages = ("prep", "all", "analyze", "finops", "devops", "test", "results")
    if stage not in known_stages:
        raise ValueError(f"Unknown stage {stage!r}; expected one of: {', '.join(known_stages)}")

    # Build Providers
    gl = GitLabProvider(project_id, gl_token)
    gcp = GCPProvider(gcp_project)
    brain = GeminiBrain(ai_key)
    workflow = StateGraph(AgentState)

    # Check for cached context
    cached_context = {}
    if os.path.exists("context.json"):
        try:
            with open("context.json", "r") as f:
                loaded = json.load(f)
        except (OSError, json.JSONDecodeError) as exc:
            # A corrupt cache is not fatal: fall back to fetching the context live.
            print(f" -> ⚠️ Ignoring unreadable context.json ({exc}); fetching context live.")
        else:
            if isinstance(loaded, dict):
                cached_context = loaded

    # Define an Injection Node that seeds the graph with cached state
    def context_injector_node(state: AgentState):
        print("[*] ⚡ Loading Shared Fast-Context from context.json Artifact...")
        return {
            "mr_data": cached_context.get("mr_data", {}),
            "log_data": cached_context.get("log_data", ""),
            "gcp_data": cached_context.get("gcp_data", ""),
            "early_exit": cached_context.get("early_exit", False),
        }

    def prep_reporter(state: AgentState):
        if state.get("error"):
            # Do not cache a failed fetch: later stages would load a context
            # with no MR data instead of fetching it themselves.
            print(f"❌ PIPELINE HALTED: {state['error']}")
            return {}
        print("[*] 💾 Caching Context state to disk for parallel parallel nodes...")
        with open("context.json", "w") as f:
            json.dump({
                "mr_data": state.get("mr_data"),
                "log_data": state.get("log_data"),
                "gcp_data": state.get("gcp_data"),
                "early_exit": state.get("early_exit"),
            }, f)
        return {}

    if stage == "prep":
        workflow.add_node("Librarian", librarian_node)
        workflow.add_node("Sentinel", sentinel_node)
        workflow.add_node("PrepReporter", prep_reporter)
        workflow.set_entry_point("Librarian")
        workflow.add_edge("Librarian", "Sentinel")
        workflow.add_edge("Sentinel", "PrepReporter")
        workflow.add_edge("PrepReporter", END)
    else:
        # For execution stages, skip Librarian and Sentinel if cached
        if cached_context:
            workflow.add_node("Injector", context_injector_node)
            entry = upstream = "Injector"
        else:
            workflow.add_node("Librarian", librarian_node)
            workflow.add_node("Sentinel", sentinel_node)
            workflow.add_edge("Librarian", "Sentinel")
            entry, upstream = "Librarian", "Sentinel"
        workflow.set_entry_point(entry)

        # stage -> (node name, node callable) of the stage's worker node.
        stage_workers = {
            "all": ("Reasoner", reasoner_node),
            "analyze": ("Reasoner", reasoner_node),
            "finops": ("CostEstimator", cost_estimator_node),
            "devops": ("DevOps", devops_node),
            "test": ("TestEngineer", test_engineer_node),
            "results": ("Assembler", results_assembler_node),
        }
        worker_name, worker_fn = stage_workers[stage]
        workflow.add_node(worker_name, worker_fn)
        workflow.add_edge(upstream, worker_name)
        if stage == "results":
            # The assembler publishes on its own; no Reporter step follows it.
            workflow.add_edge(worker_name, END)
        else:
            workflow.add_node("Reporter", reporter_node)
            workflow.add_edge(worker_name, "Reporter")
            workflow.add_edge("Reporter", END)

    app = workflow.compile()

    # Run the Pipeline
    initial_state = {
        "project_id": project_id,
        "mr_iid": mr_iid,
        "gl_provider": gl,
        "gcp_provider": gcp,
        "brain_provider": brain,
        "stage": stage,
        "mermaid_diagram": "",
        "error": "",
        "critical_failure": False,
        "early_exit": False,
        "cost_estimation": "",
        "test_report": "",
        "devops_report": "",
    }
    print("\n🚀 EXECUTING LANGGRAPH PIPELINE...\n")
    final_state = app.invoke(initial_state)
    print("\n✅ PIPELINE COMPLETE.")
    if final_state.get("critical_failure"):
        print("\n⛔ EXITED WITH CRITICAL SECURITY ISSUES. FAILING PIPELINE.")
        import sys
        sys.exit(1)
if __name__ == "__main__":
    # CLI entry point: resolve project / MR / stage from flags or CI variables.
    parser = argparse.ArgumentParser(description="GitLab Context Brain LangGraph Agent")
    parser.add_argument("--project_id", default=os.getenv("CI_PROJECT_ID"), help="GitLab Project ID")
    parser.add_argument("--mr_iid", default=os.getenv("CI_MERGE_REQUEST_IID"), help="Merge Request IID")
    parser.add_argument("--stage", default="all", help="Pipeline Stage (analyze, finops, test)")
    args = parser.parse_args()

    # AI_FLOW_CONTEXT, when set, is a JSON object whose "iid" overrides the MR IID.
    flow_context = os.getenv("AI_FLOW_CONTEXT")
    if flow_context:
        try:
            context_iid = json.loads(flow_context).get("iid")
        except (ValueError, AttributeError):
            # ValueError: not valid JSON; AttributeError: valid JSON but not an object.
            print("⚠️ Ignoring malformed AI_FLOW_CONTEXT; keeping the configured MR IID.")
            context_iid = None
        # Only override when the context actually carries an IID, so a context
        # without one no longer wipes a valid --mr_iid / CI_MERGE_REQUEST_IID.
        if context_iid:
            args.mr_iid = context_iid

    if args.project_id and args.mr_iid:
        build_and_run_graph(args.project_id, args.mr_iid, args.stage)
    else:
        print("❌ ERROR: Missing project_id or mr_iid.")