Spaces:
Running
Running
| """Test integration: GritQL evidence → CrewAI agent analysis.""" | |
| import os | |
| import subprocess | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| from crewai import Agent, Task, Crew, LLM | |
| # Load .env from project root | |
| load_dotenv(Path(__file__).resolve().parent.parent / ".env") | |
| # --- Configuration --- | |
| LOCALE_DIR = os.path.join(os.path.dirname(__file__), "fixtures", "locale") | |
| # Patterns verified against test fixtures. | |
| # JS patterns use // comments, Python patterns use # comments. | |
| # Some patterns target Python specifically via --language flag. | |
| GRITQL_PATTERNS = [ | |
| # --- Cross-language: hardcoded secrets --- | |
| { | |
| "category": "hardcoded_secrets_js", | |
| "pattern": '`$VAR = "$VAL"` where { $VAR <: r"(?i).*(password|key|secret|token).*" }', | |
| "language": None, # auto-detect (JS works natively) | |
| }, | |
| { | |
| "category": "hardcoded_secrets_py", | |
| "pattern": '`$VAR = $VAL` where { $VAR <: r"(?i).*(PASSWORD|KEY|SECRET|TOKEN).*" }', | |
| "language": "python", | |
| }, | |
| # --- Connection strings --- | |
| { | |
| "category": "connection_strings", | |
| "pattern": '`"$CONN"` where { $CONN <: r"mysql://.+" }', | |
| "language": None, | |
| }, | |
| # --- TODO / FIXME / HACK comments --- | |
| { | |
| "category": "todo_py", | |
| "pattern": "`# TODO: $_`", | |
| "language": "python", | |
| }, | |
| { | |
| "category": "todo_js", | |
| "pattern": "`// TODO: $_`", | |
| "language": None, | |
| }, | |
| { | |
| "category": "fixme_py", | |
| "pattern": "`# FIXME: $_`", | |
| "language": "python", | |
| }, | |
| { | |
| "category": "fixme_js", | |
| "pattern": "`// FIXME: $_`", | |
| "language": None, | |
| }, | |
| { | |
| "category": "hack_py", | |
| "pattern": "`# HACK: $_`", | |
| "language": "python", | |
| }, | |
| { | |
| "category": "hack_js", | |
| "pattern": "`// HACK: $_`", | |
| "language": None, | |
| }, | |
| # --- Dangerous function calls --- | |
| { | |
| "category": "eval_usage", | |
| "pattern": "`eval($_)`", | |
| "language": "python", | |
| }, | |
| { | |
| "category": "pickle_load", | |
| "pattern": "`pickle.load($_)`", | |
| "language": "python", | |
| }, | |
| { | |
| "category": "os_system", | |
| "pattern": "`os.system($_)`", | |
| "language": "python", | |
| }, | |
| { | |
| "category": "subprocess_shell", | |
| "pattern": "`subprocess.call($_, shell=True)`", | |
| "language": "python", | |
| }, | |
| { | |
| "category": "md5_hash", | |
| "pattern": "`hashlib.md5($_)`", | |
| "language": "python", | |
| }, | |
| # --- SQL injection --- | |
| { | |
| "category": "sql_injection_fstring", | |
| "pattern": r'`$S` where { $S <: r"f\"SELECT.*\{.*\}\"" }', | |
| "language": "python", | |
| }, | |
| { | |
| "category": "sql_injection_js", | |
| "pattern": r'`$STR` where { $STR <: r"`SELECT.*\$\{.*\}`" }', | |
| "language": None, | |
| }, | |
| ] | |
| def run_gritql(pattern: str, target_dir: str, language: str | None = None) -> dict: | |
| """Run a single GritQL pattern and return structured results.""" | |
| cmd = ["grit", "apply", pattern, target_dir] | |
| if language: | |
| cmd += ["--language", language] | |
| try: | |
| result = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| timeout=30, | |
| ) | |
| output = result.stdout.strip() | |
| errors = result.stderr.strip() | |
| # Grit prints "Processed X files and found Y matches" to stderr | |
| match_line = [l for l in errors.splitlines() if "found" in l] | |
| return { | |
| "pattern": pattern, | |
| "findings": output or None, | |
| "summary": match_line[0] if match_line else None, | |
| "returncode": result.returncode, | |
| } | |
| except FileNotFoundError: | |
| return {"pattern": pattern, "findings": None, "error": "'grit' CLI not found. Run: npm install -g @getgrit/cli"} | |
| except Exception as e: | |
| return {"pattern": pattern, "findings": None, "error": str(e)} | |
| def gather_evidence(target_dir: str) -> list[dict]: | |
| """Run all GritQL patterns against the target directory.""" | |
| evidence = [] | |
| for p in GRITQL_PATTERNS: | |
| print(f" Scanning: {p['category']}...") | |
| result = run_gritql(p["pattern"], target_dir, p.get("language")) | |
| result["category"] = p["category"] | |
| evidence.append(result) | |
| return evidence | |
| def format_evidence_for_agent(evidence: list[dict]) -> str: | |
| """Format evidence into a readable report for the LLM agent.""" | |
| lines = ["=== FORENSIC EVIDENCE REPORT ===\n"] | |
| hits = 0 | |
| for item in evidence: | |
| if item.get("findings"): | |
| hits += 1 | |
| lines.append(f"--- {item['category'].upper()} ---") | |
| lines.append(f"Pattern: {item['pattern']}") | |
| lines.append(f"Findings:\n{item['findings']}") | |
| lines.append("") | |
| lines.insert(1, f"Total categories with findings: {hits} / {len(evidence)}\n") | |
| return "\n".join(lines) | |
| def run_crewai_analysis(evidence_report: str) -> str: | |
| """Pass evidence to a CrewAI agent for analysis.""" | |
| llm = LLM( | |
| model=os.environ.get("MODEL_NAME", "zai/glm-5.1"), | |
| api_key=os.environ.get("ZAI_API_KEY"), | |
| ) | |
| investigator = Agent( | |
| role="Senior Code Forensic Investigator", | |
| goal="Analyze code evidence and identify critical security vulnerabilities and code quality issues", | |
| backstory=( | |
| "You are a veteran code auditor with 15 years of experience. " | |
| "You've seen every trick in the book — from hardcoded credentials to SQL injection. " | |
| "You analyze deterministic scan results and provide clear, severity-ranked findings." | |
| ), | |
| llm=llm, | |
| verbose=True, | |
| ) | |
| analysis_task = Task( | |
| description=( | |
| "Analyze the following forensic evidence report from a codebase scan. " | |
| "For each finding, assess severity (CRITICAL / HIGH / MEDIUM / LOW), " | |
| "explain the risk, and suggest a fix.\n\n" | |
| f"{evidence_report}" | |
| ), | |
| agent=investigator, | |
| expected_output="A structured forensic analysis report with severity-ranked findings.", | |
| ) | |
| crew = Crew( | |
| agents=[investigator], | |
| tasks=[analysis_task], | |
| verbose=True, | |
| ) | |
| result = crew.kickoff() | |
| return result.raw if hasattr(result, "raw") else str(result) | |
| def main(): | |
| print("=" * 60) | |
| print("CodeTribunal Integration Test") | |
| print("=" * 60) | |
| # Phase 1: GritQL evidence gathering | |
| print("\n[Phase 1] Gathering evidence with GritQL...") | |
| evidence = gather_evidence(LOCALE_DIR) | |
| hits = sum(1 for e in evidence if e.get("findings")) | |
| print(f"\n Patterns scanned: {len(evidence)}") | |
| print(f" Hits: {hits}") | |
| evidence_report = format_evidence_for_agent(evidence) | |
| print("\n" + evidence_report) | |
| # Phase 2: CrewAI analysis | |
| api_key = os.environ.get("ZAI_API_KEY") | |
| if not api_key: | |
| print("\n[Phase 2] SKIPPED — set ZAI_API_KEY to test CrewAI integration") | |
| return | |
| print("\n[Phase 2] Running CrewAI analysis with GLM 5.1...") | |
| report = run_crewai_analysis(evidence_report) | |
| print("\n" + "=" * 60) | |
| print("AGENT REPORT") | |
| print("=" * 60) | |
| print(report) | |
| if __name__ == "__main__": | |
| main() | |