CodeLens / scripts /baseline.py
ArshVerma's picture
feat: finalize CodeLens. rebranding and production environment polish
adea8c3
Raw
History Blame Contribute Delete
13.7 kB
import argparse
import sys
import json
import csv
import time
import requests
from typing import List, Optional
# Each rule: (search_term, category, severity, description_template)
RULES = [
# Bug rules
("range(len(", "bug", "medium", "Off-by-one risk: use enumerate() instead of range(len())"),
("except Exception", "bug", "low", "Broad exception catch hides errors; catch specific exception types"),
("except:", "bug", "low", "Bare except catches all exceptions including SystemExit and KeyboardInterrupt"),
(".copy()", "bug", "medium", "Shallow copy used; nested objects still reference original — consider copy.deepcopy()"),
("== 0.0", "bug", "medium", "Float equality comparison is unreliable due to floating-point precision"),
("== True", "bug", "low", "Identity comparison with True; use truthiness check instead"),
("mutable default", "bug", "medium", "Mutable default argument causes state leakage between function calls"),
("def build_", "bug", "medium", "Check for mutable default arguments in builder functions"),
("global ", "bug", "high", "Global variable mutation without lock is a race condition in multi-threaded context"),
# Security rules
("SQL", "security", "critical", "Potential SQL injection: use parameterized queries, never string formatting"),
("f\"SELECT", "security", "critical", "SQL injection via f-string: use db.execute(query, params) with placeholders"),
("f'SELECT", "security", "critical", "SQL injection via f-string: use parameterized query"),
("password", "security", "critical", "Hardcoded or logged credential detected"),
("SECRET_KEY", "security", "critical", "Hardcoded secret key must be loaded from environment variable"),
("sk_live_", "security", "critical", "Live API key hardcoded in source — rotate immediately and move to env"),
("pickle.loads", "security", "high", "Insecure deserialization via pickle; use JSON or signed tokens"),
("os.system(", "security", "critical", "Command injection risk: use subprocess.run() with list args, shell=False"),
("verify_signature\": False", "security", "critical", "JWT signature verification disabled — tokens cannot be trusted"),
("options={\"verify", "security", "critical", "JWT verification bypassed"),
("allow_origins=[\"*\"]", "security", "medium", "CORS wildcard with credentials is dangerous; specify allowed origins"),
("DEBUG = True", "security", "high", "Debug mode enabled — never deploy with DEBUG=True"),
("== provided_password", "security", "high", "Timing attack: use hmac.compare_digest() or secrets.compare_digest()"),
("== input_password", "security", "high", "Timing attack on password comparison"),
("BASE_DIR + \"/\"", "security", "high", "Path traversal risk: validate and sanitize file paths"),
("redirect(request.args", "security", "medium", "Open redirect: validate redirect target against allowlist"),
# Architecture rules
("requests.get(", "architecture", "medium", "Blocking HTTP call: use httpx.AsyncClient in async context"),
("requests.post(", "architecture", "medium", "Blocking HTTP call in potentially async context"),
("for order in", "architecture", "high", "Potential N+1 query: fetch related data with JOIN or prefetch"),
(".all()", "architecture", "high", "Unbounded query: add pagination with .limit() and .offset()"),
("logger.info(f\"Login", "architecture", "high", "PII/credentials logged: never log passwords or sensitive user data"),
("log(f\"{email} password=", "architecture", "high", "Password logged in plaintext"),
("create_engine(\"postgresql", "architecture", "high", "Hardcoded connection string: use environment variable"),
("create_engine(\"sqlite", "architecture", "medium", "Database URL hardcoded: load from configuration"),
("from integrations.", "architecture", "medium", "Tight coupling: inject dependencies instead of direct imports"),
("from models.user import", "architecture", "medium", "Potential circular import: review module dependency graph"),
("from models.order import", "architecture", "medium", "Potential circular import: review module dependency graph"),
("# Use API key:", "architecture", "medium", "Secret documented in code comment: remove and use secret manager"),
]
class KeywordAgent:
"""
Heuristic agent that scans diffs for known issue patterns.
Covers all 30 scenarios with targeted keywords.
"""
def decide(self, observation: dict) -> dict:
"""
Analyze the diff and return the next action dict.
Yields FLAG_ISSUE for first unacted matching rule, then APPROVE.
"""
diff = observation.get("diff", "")
flagged_lines = set()
# Track already flagged issues in history (if any)
history = observation.get("history", [])
for entry in history:
if isinstance(entry, dict) and entry.get("line_number"):
flagged_lines.add(entry["line_number"])
for search_term, category, severity, description in RULES:
if search_term.lower() in diff.lower():
# Find line number
line_no = 1
for i, line in enumerate(diff.split("\n"), 1):
if search_term.lower() in line.lower() and i not in flagged_lines:
line_no = i
flagged_lines.add(i)
files = observation.get("files_changed", [])
filename = files[0]["filename"] if files else "unknown"
return {
"action_type": "flag_issue",
"body": description,
"filename": filename,
"line_number": line_no,
"severity": severity,
"category": category
}
# No more issues found — terminal action
return {
"action_type": "approve",
"body": "Review complete. No further issues identified.",
"verdict": "lgtm"
}
class LLMAgent:
"""
Agent powered by Claude claude-sonnet-4-20250514 via Anthropic API.
Requires ANTHROPIC_API_KEY or --api-key argument.
"""
SYSTEM_PROMPT = """You are a senior software engineer performing a code review.
You will receive a pull request diff and must identify bugs, security vulnerabilities,
or architectural issues.
For each issue you find, respond with a JSON object (one per response):
{
"action_type": "flag_issue",
"body": "<detailed description of the issue and how to fix it>",
"filename": "<filename from the diff>",
"line_number": <line number where issue occurs>,
"severity": "<critical|high|medium|low|info>",
"category": "<bug|security|architecture|style|performance>"
}
When you have flagged all issues, respond with:
{
"action_type": "approve",
"body": "<summary of review>",
"verdict": "lgtm"
}
If there are serious issues that block merge:
{
"action_type": "request_changes",
"body": "<summary of required changes>",
"verdict": "request_changes"
}
Respond ONLY with the JSON object. No markdown, no explanation outside the JSON."""
def __init__(self, api_key: str):
self.api_key = api_key
self.history = []
def decide(self, observation: dict) -> dict:
import json
import urllib.request
diff = observation.get("diff", "")
pr_title = observation.get("pr_title", "")
step = observation.get("step_count", 0)
user_content = f"PR Title: {pr_title}\n\nDiff:\n{diff}\n\nStep {step}: What is your next review action?"
self.history.append({"role": "user", "content": user_content})
payload = json.dumps({
"model": "claude-sonnet-4-20250514",
"max_tokens": 512,
"system": self.SYSTEM_PROMPT,
"messages": self.history
}).encode()
req = urllib.request.Request(
"https://api.anthropic.com/v1/messages",
data=payload,
headers={
"Content-Type": "application/json",
"x-api-key": self.api_key,
"anthropic-version": "2023-06-01"
}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
data = json.loads(resp.read())
text = data["content"][0]["text"].strip()
# Strip markdown fences if present
if text.startswith("```"):
text = text.split("```")[1]
if text.startswith("json"):
text = text[4:]
action = json.loads(text)
self.history.append({"role": "assistant", "content": text})
return action
except Exception as e:
# Fall back to approve on error
return {"action_type": "approve", "body": f"LLM error, approving: {e}", "verdict": "lgtm"}
def run_episode(url: str, task_id: str, seed: int, agent, verbose: bool = False) -> dict:
"""
Run a complete evaluation episode.
Returns result dict with final_score, steps, episode_id.
"""
import requests
import time
start_time = time.time()
# Reset
resp = requests.post(f"{url}/reset", json={"task_id": task_id, "seed": seed}, timeout=10)
resp.raise_for_status()
data = resp.json()
episode_id = data["episode_id"]
obs = data["result"]["observation"]
if verbose:
print(f"\n{'='*60}")
print(f"Episode: {episode_id}")
print(f"Task: {task_id}, Seed: {seed}, Scenario: {obs.get('scenario_hash', '?')}")
print(f"{'='*60}")
done = False
steps = 0
while not done:
action = agent.decide(obs)
if verbose:
print(f"\nStep {steps + 1}: {action.get('action_type')} \u2014 {action.get('body', '')[:80]}")
step_resp = requests.post(f"{url}/step/{episode_id}", json=action, timeout=10)
step_resp.raise_for_status()
step_data = step_resp.json()
obs = step_data["observation"]
done = step_data.get("done", False)
steps += 1
# Get final result
result_resp = requests.get(f"{url}/result/{episode_id}", timeout=10)
result_resp.raise_for_status()
result = result_resp.json()
duration = time.time() - start_time
return {
"episode_id": episode_id,
"task_id": task_id,
"seed": seed,
"final_score": result.get("final_score", 0.0),
"steps_taken": result.get("steps_taken", steps),
"issues_found": result.get("issues_found", 0),
"issues_total": result.get("issues_total", 0),
"noise_penalties": result.get("noise_penalties", 0),
"terminated_reason": result.get("terminated_reason", "unknown"),
"duration_seconds": round(duration, 2)
}
def save_results(results: list, output_path: str):
import json, csv
if output_path.endswith(".json"):
with open(output_path, "w") as f:
json.dump(results, f, indent=2)
elif output_path.endswith(".csv"):
if results:
with open(output_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=results[0].keys())
writer.writeheader()
writer.writerows(results)
def main():
parser = argparse.ArgumentParser(description="CodeLens Baseline Agent")
parser.add_argument("--url", default="http://localhost:7860")
parser.add_argument("--task", default="bug_detection",
choices=["bug_detection", "security_audit", "architectural_review"])
parser.add_argument("--seed", type=int, default=0)
parser.add_argument("--agent", default="keyword", choices=["keyword", "llm"])
parser.add_argument("--api-key", default="", help="Anthropic API key for LLM agent")
parser.add_argument("--output", default="", help="Output file (.json or .csv)")
parser.add_argument("--verbose", action="store_true")
parser.add_argument("--max-steps", type=int, default=None, help="Override max steps (for testing)")
args = parser.parse_args()
# Create agent
if args.agent == "llm":
import os
api_key = args.api_key or os.environ.get("ANTHROPIC_API_KEY", "")
if not api_key:
print("ERROR: LLM agent requires --api-key or ANTHROPIC_API_KEY env var")
sys.exit(1)
agent = LLMAgent(api_key)
else:
agent = KeywordAgent()
# Check API connectivity
try:
import requests
health = requests.get(f"{args.url}/health", timeout=5)
health.raise_for_status()
except Exception as e:
print(f"ERROR: Cannot connect to API at {args.url}: {e}")
sys.exit(1)
# Run episode
try:
result = run_episode(args.url, args.task, args.seed, agent, args.verbose)
print(f"\nResult: score={result['final_score']:.3f} "
f"issues={result['issues_found']}/{result['issues_total']} "
f"steps={result['steps_taken']} "
f"reason={result['terminated_reason']}")
# Save output
if args.output:
save_results([result], args.output)
print(f"Results saved to: {args.output}")
except Exception as e:
print(f"Episode failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()