CodeTribunal / tests /test_integration.py
amine-yagoub's picture
feat: Add initial CodeTribunal implementation
d5341cc
"""Test integration: GritQL evidence → CrewAI agent analysis."""
import os
import subprocess
from pathlib import Path
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, LLM
# Read settings from the .env file that sits in the parent of this file's
# directory (the project root), before any os.environ lookups happen.
load_dotenv(dotenv_path=Path(__file__).resolve().parent.parent.joinpath(".env"))

# --- Configuration ---
# Fixture directory, located next to this file, that the GritQL scans target.
LOCALE_DIR = os.path.join(os.path.dirname(__file__), "fixtures", "locale")
# Patterns verified against test fixtures.
# JS patterns use // comments, Python patterns use # comments.
# Some patterns target Python specifically via --language flag.
#
# Each entry is a dict with three keys:
#   "category": short identifier used to label the scan result,
#   "pattern":  the GritQL pattern text handed to `grit apply`,
#   "language": value for grit's --language flag; None means the flag is
#               not passed at all.
GRITQL_PATTERNS = [
    # --- Cross-language: hardcoded secrets ---
    # Assignments whose left-hand name looks like a credential
    # (case-insensitive match on password/key/secret/token).
    {
        "category": "hardcoded_secrets_js",
        "pattern": '`$VAR = "$VAL"` where { $VAR <: r"(?i).*(password|key|secret|token).*" }',
        "language": None,  # no --language flag; per the original note, JS works natively
    },
    {
        "category": "hardcoded_secrets_py",
        "pattern": '`$VAR = $VAL` where { $VAR <: r"(?i).*(PASSWORD|KEY|SECRET|TOKEN).*" }',
        "language": "python",
    },
    # --- Connection strings ---
    # String literals that start with a mysql:// URL.
    {
        "category": "connection_strings",
        "pattern": '`"$CONN"` where { $CONN <: r"mysql://.+" }',
        "language": None,
    },
    # --- TODO / FIXME / HACK comments ---
    # One entry per marker and comment syntax ("#" for Python, "//" for JS).
    {
        "category": "todo_py",
        "pattern": "`# TODO: $_`",
        "language": "python",
    },
    {
        "category": "todo_js",
        "pattern": "`// TODO: $_`",
        "language": None,
    },
    {
        "category": "fixme_py",
        "pattern": "`# FIXME: $_`",
        "language": "python",
    },
    {
        "category": "fixme_js",
        "pattern": "`// FIXME: $_`",
        "language": None,
    },
    {
        "category": "hack_py",
        "pattern": "`# HACK: $_`",
        "language": "python",
    },
    {
        "category": "hack_js",
        "pattern": "`// HACK: $_`",
        "language": None,
    },
    # --- Dangerous function calls ---
    # Python-only call shapes; `$_` matches the call's argument.
    {
        "category": "eval_usage",
        "pattern": "`eval($_)`",
        "language": "python",
    },
    {
        "category": "pickle_load",
        "pattern": "`pickle.load($_)`",
        "language": "python",
    },
    {
        "category": "os_system",
        "pattern": "`os.system($_)`",
        "language": "python",
    },
    {
        "category": "subprocess_shell",
        "pattern": "`subprocess.call($_, shell=True)`",
        "language": "python",
    },
    {
        "category": "md5_hash",
        "pattern": "`hashlib.md5($_)`",
        "language": "python",
    },
    # --- SQL injection ---
    # Interpolated strings that begin with SELECT: a Python f-string with a
    # {...} placeholder, and a JS template literal with a ${...} placeholder.
    # Both are Python raw strings so the backslashes reach grit untouched.
    {
        "category": "sql_injection_fstring",
        "pattern": r'`$S` where { $S <: r"f\"SELECT.*\{.*\}\"" }',
        "language": "python",
    },
    {
        "category": "sql_injection_js",
        "pattern": r'`$STR` where { $STR <: r"`SELECT.*\$\{.*\}`" }',
        "language": None,
    },
]
def run_gritql(pattern: str, target_dir: str, language: str | None = None) -> dict:
"""Run a single GritQL pattern and return structured results."""
cmd = ["grit", "apply", pattern, target_dir]
if language:
cmd += ["--language", language]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
)
output = result.stdout.strip()
errors = result.stderr.strip()
# Grit prints "Processed X files and found Y matches" to stderr
match_line = [l for l in errors.splitlines() if "found" in l]
return {
"pattern": pattern,
"findings": output or None,
"summary": match_line[0] if match_line else None,
"returncode": result.returncode,
}
except FileNotFoundError:
return {"pattern": pattern, "findings": None, "error": "'grit' CLI not found. Run: npm install -g @getgrit/cli"}
except Exception as e:
return {"pattern": pattern, "findings": None, "error": str(e)}
def gather_evidence(target_dir: str) -> list[dict]:
    """Scan ``target_dir`` with every entry of ``GRITQL_PATTERNS``.

    Prints a progress line per pattern and returns one result dict per
    pattern, in list order, each tagged with the ``category`` of the pattern
    that produced it.
    """
    collected = []
    for spec in GRITQL_PATTERNS:
        category = spec["category"]
        print(f" Scanning: {category}...")
        scan = run_gritql(spec["pattern"], target_dir, spec.get("language"))
        # Label the raw scan result so later stages know which check it was.
        scan["category"] = category
        collected.append(scan)
    return collected
def format_evidence_for_agent(evidence: list[dict]) -> str:
    """Format evidence into a readable report for the LLM agent.

    Args:
        evidence: Scan result dicts. Entries with a truthy ``findings`` value
            must also provide ``category`` and ``pattern``; an optional
            ``error`` value marks a scan that failed.

    Returns:
        The report text: a title, a "categories with findings" total, one
        section per entry that has findings, and - only when at least one
        entry carries an error - a trailing scan-errors section. Without
        that section a failed scan would look the same as a clean one.
    """
    sections = []
    failures = []
    hits = 0
    for item in evidence:
        if item.get("findings"):
            hits += 1
            sections += [
                f"--- {item['category'].upper()} ---",
                f"Pattern: {item['pattern']}",
                f"Findings:\n{item['findings']}",
                "",
            ]
        if item.get("error"):
            failures.append(f"{item.get('category', 'unknown')}: {item['error']}")

    lines = [
        "=== FORENSIC EVIDENCE REPORT ===\n",
        f"Total categories with findings: {hits} / {len(evidence)}\n",
        *sections,
    ]
    if failures:
        # Tell the reader which scans never produced a trustworthy result.
        lines.append("--- SCAN ERRORS (results incomplete) ---")
        lines.extend(failures)
        lines.append("")
    return "\n".join(lines)
def run_crewai_analysis(evidence_report: str) -> str:
    """Hand the evidence report to a one-agent CrewAI crew and return its answer.

    The model name is read from ``MODEL_NAME`` (default ``"zai/glm-5.1"``)
    and the API key from ``ZAI_API_KEY``. Returns the kickoff result's
    ``raw`` attribute when it has one, otherwise ``str()`` of the result.
    """
    model_name = os.environ.get("MODEL_NAME", "zai/glm-5.1")

    investigator = Agent(
        role="Senior Code Forensic Investigator",
        goal="Analyze code evidence and identify critical security vulnerabilities and code quality issues",
        backstory=(
            "You are a veteran code auditor with 15 years of experience. "
            "You've seen every trick in the book — from hardcoded credentials to SQL injection. "
            "You analyze deterministic scan results and provide clear, severity-ranked findings."
        ),
        llm=LLM(model=model_name, api_key=os.environ.get("ZAI_API_KEY")),
        verbose=True,
    )

    # The full evidence report is embedded at the end of the task prompt.
    instructions = (
        "Analyze the following forensic evidence report from a codebase scan. "
        "For each finding, assess severity (CRITICAL / HIGH / MEDIUM / LOW), "
        "explain the risk, and suggest a fix.\n\n"
        f"{evidence_report}"
    )

    crew = Crew(
        agents=[investigator],
        tasks=[
            Task(
                description=instructions,
                agent=investigator,
                expected_output="A structured forensic analysis report with severity-ranked findings.",
            )
        ],
        verbose=True,
    )

    outcome = crew.kickoff()
    if hasattr(outcome, "raw"):
        return outcome.raw
    return str(outcome)
def main():
    """Run the two-phase integration check and print its output.

    Phase 1 scans ``LOCALE_DIR`` with GritQL and prints the evidence report.
    Phase 2 sends that report to the CrewAI agent, and is skipped when
    ``ZAI_API_KEY`` is not set.
    """
    banner = "=" * 60
    print(banner)
    print("CodeTribunal Integration Test")
    print(banner)

    # Phase 1: GritQL evidence gathering
    print("\n[Phase 1] Gathering evidence with GritQL...")
    evidence = gather_evidence(LOCALE_DIR)
    hit_count = len([entry for entry in evidence if entry.get("findings")])
    print(f"\n Patterns scanned: {len(evidence)}")
    print(f" Hits: {hit_count}")

    evidence_report = format_evidence_for_agent(evidence)
    print("\n" + evidence_report)

    # Phase 2: CrewAI analysis (needs an API key)
    if os.environ.get("ZAI_API_KEY"):
        print("\n[Phase 2] Running CrewAI analysis with GLM 5.1...")
        report = run_crewai_analysis(evidence_report)
        print("\n" + banner)
        print("AGENT REPORT")
        print(banner)
        print(report)
    else:
        print("\n[Phase 2] SKIPPED — set ZAI_API_KEY to test CrewAI integration")


if __name__ == "__main__":
    main()