"""Test integration: GritQL evidence → CrewAI agent analysis.""" import os import subprocess from pathlib import Path from dotenv import load_dotenv from crewai import Agent, Task, Crew, LLM # Load .env from project root load_dotenv(Path(__file__).resolve().parent.parent / ".env") # --- Configuration --- LOCALE_DIR = os.path.join(os.path.dirname(__file__), "fixtures", "locale") # Patterns verified against test fixtures. # JS patterns use // comments, Python patterns use # comments. # Some patterns target Python specifically via --language flag. GRITQL_PATTERNS = [ # --- Cross-language: hardcoded secrets --- { "category": "hardcoded_secrets_js", "pattern": '`$VAR = "$VAL"` where { $VAR <: r"(?i).*(password|key|secret|token).*" }', "language": None, # auto-detect (JS works natively) }, { "category": "hardcoded_secrets_py", "pattern": '`$VAR = $VAL` where { $VAR <: r"(?i).*(PASSWORD|KEY|SECRET|TOKEN).*" }', "language": "python", }, # --- Connection strings --- { "category": "connection_strings", "pattern": '`"$CONN"` where { $CONN <: r"mysql://.+" }', "language": None, }, # --- TODO / FIXME / HACK comments --- { "category": "todo_py", "pattern": "`# TODO: $_`", "language": "python", }, { "category": "todo_js", "pattern": "`// TODO: $_`", "language": None, }, { "category": "fixme_py", "pattern": "`# FIXME: $_`", "language": "python", }, { "category": "fixme_js", "pattern": "`// FIXME: $_`", "language": None, }, { "category": "hack_py", "pattern": "`# HACK: $_`", "language": "python", }, { "category": "hack_js", "pattern": "`// HACK: $_`", "language": None, }, # --- Dangerous function calls --- { "category": "eval_usage", "pattern": "`eval($_)`", "language": "python", }, { "category": "pickle_load", "pattern": "`pickle.load($_)`", "language": "python", }, { "category": "os_system", "pattern": "`os.system($_)`", "language": "python", }, { "category": "subprocess_shell", "pattern": "`subprocess.call($_, shell=True)`", "language": "python", }, { "category": "md5_hash", "pattern": "`hashlib.md5($_)`", "language": "python", }, # --- SQL injection --- { "category": "sql_injection_fstring", "pattern": r'`$S` where { $S <: r"f\"SELECT.*\{.*\}\"" }', "language": "python", }, { "category": "sql_injection_js", "pattern": r'`$STR` where { $STR <: r"`SELECT.*\$\{.*\}`" }', "language": None, }, ] def run_gritql(pattern: str, target_dir: str, language: str | None = None) -> dict: """Run a single GritQL pattern and return structured results.""" cmd = ["grit", "apply", pattern, target_dir] if language: cmd += ["--language", language] try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=30, ) output = result.stdout.strip() errors = result.stderr.strip() # Grit prints "Processed X files and found Y matches" to stderr match_line = [l for l in errors.splitlines() if "found" in l] return { "pattern": pattern, "findings": output or None, "summary": match_line[0] if match_line else None, "returncode": result.returncode, } except FileNotFoundError: return {"pattern": pattern, "findings": None, "error": "'grit' CLI not found. Run: npm install -g @getgrit/cli"} except Exception as e: return {"pattern": pattern, "findings": None, "error": str(e)} def gather_evidence(target_dir: str) -> list[dict]: """Run all GritQL patterns against the target directory.""" evidence = [] for p in GRITQL_PATTERNS: print(f" Scanning: {p['category']}...") result = run_gritql(p["pattern"], target_dir, p.get("language")) result["category"] = p["category"] evidence.append(result) return evidence def format_evidence_for_agent(evidence: list[dict]) -> str: """Format evidence into a readable report for the LLM agent.""" lines = ["=== FORENSIC EVIDENCE REPORT ===\n"] hits = 0 for item in evidence: if item.get("findings"): hits += 1 lines.append(f"--- {item['category'].upper()} ---") lines.append(f"Pattern: {item['pattern']}") lines.append(f"Findings:\n{item['findings']}") lines.append("") lines.insert(1, f"Total categories with findings: {hits} / {len(evidence)}\n") return "\n".join(lines) def run_crewai_analysis(evidence_report: str) -> str: """Pass evidence to a CrewAI agent for analysis.""" llm = LLM( model=os.environ.get("MODEL_NAME", "zai/glm-5.1"), api_key=os.environ.get("ZAI_API_KEY"), ) investigator = Agent( role="Senior Code Forensic Investigator", goal="Analyze code evidence and identify critical security vulnerabilities and code quality issues", backstory=( "You are a veteran code auditor with 15 years of experience. " "You've seen every trick in the book — from hardcoded credentials to SQL injection. " "You analyze deterministic scan results and provide clear, severity-ranked findings." ), llm=llm, verbose=True, ) analysis_task = Task( description=( "Analyze the following forensic evidence report from a codebase scan. " "For each finding, assess severity (CRITICAL / HIGH / MEDIUM / LOW), " "explain the risk, and suggest a fix.\n\n" f"{evidence_report}" ), agent=investigator, expected_output="A structured forensic analysis report with severity-ranked findings.", ) crew = Crew( agents=[investigator], tasks=[analysis_task], verbose=True, ) result = crew.kickoff() return result.raw if hasattr(result, "raw") else str(result) def main(): print("=" * 60) print("CodeTribunal Integration Test") print("=" * 60) # Phase 1: GritQL evidence gathering print("\n[Phase 1] Gathering evidence with GritQL...") evidence = gather_evidence(LOCALE_DIR) hits = sum(1 for e in evidence if e.get("findings")) print(f"\n Patterns scanned: {len(evidence)}") print(f" Hits: {hits}") evidence_report = format_evidence_for_agent(evidence) print("\n" + evidence_report) # Phase 2: CrewAI analysis api_key = os.environ.get("ZAI_API_KEY") if not api_key: print("\n[Phase 2] SKIPPED — set ZAI_API_KEY to test CrewAI integration") return print("\n[Phase 2] Running CrewAI analysis with GLM 5.1...") report = run_crewai_analysis(evidence_report) print("\n" + "=" * 60) print("AGENT REPORT") print("=" * 60) print(report) if __name__ == "__main__": main()