CodeLens / codelens_env /scenarios.py
ArshVerma's picture
feat: finalize CodeLens. rebranding and production environment polish
adea8c3
Raw
History Blame Contribute Delete
32.3 kB
from codelens_env.models import Scenario, FileChanged, GroundTruthIssue, Category, Severity, TaskId, Verdict
def get_scenario(task_id: TaskId, seed: int) -> Scenario:
    """Pick one scenario for ``task_id``, chosen deterministically by ``seed``.

    The candidates are the entries of ``ALL_SCENARIOS`` whose ``task_id``
    equals the requested one, kept in registry order. ``seed`` is reduced
    modulo the number of candidates, so any integer seed maps to a valid
    entry and the same seed always yields the same scenario.

    Raises:
        ValueError: if no registered scenario has the requested ``task_id``.
    """
    matching = []
    for candidate in ALL_SCENARIOS:
        if candidate.task_id == task_id:
            matching.append(candidate)
    if len(matching) == 0:
        raise ValueError(f"No scenarios found for task: {task_id}")
    index = seed % len(matching)
    return matching[index]
def all_scenarios() -> list[Scenario]:
    """Return every registered scenario, in registry order.

    A shallow copy of ``ALL_SCENARIOS`` is returned so that callers may
    sort, filter or extend the result without altering the module-level
    registry that ``get_scenario`` indexes by seed.
    """
    return list(ALL_SCENARIOS)
# --- BUG DETECTION SCENARIOS ---
# Bug-detection scenario (easy): the patch shortens the pagination slice by
# one element. The PR description refers to the slice bound actually present
# in the patch rather than to a range() loop the diff does not contain.
bug_001 = Scenario(
    task_id=TaskId.BUG_DETECTION,
    pr_title="Add pagination to user list endpoint",
    pr_description="Paginating the user list but missing the last item of each page due to slicing up to page * size + size - 1.",
    files_changed=[
        FileChanged(
            filename="api/users.py",
            language="python",
            patch="""--- a/api/users.py
+++ b/api/users.py
@@ -10,3 +10,3 @@
def get_users(page, size):
items = db.get_all_users()
- return items[page * size : (page + 1) * size]
+ return items[page * size : page * size + size - 1]""",
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="bug_001",
            category=Category.BUG,
            severity=Severity.MEDIUM,
            filename="api/users.py",
            line_number=12,
            description="Off-by-one error in pagination slice loses last item per page",
            keywords=["off-by-one", "pagination"]
        )
    ],
    hash="bug_001",
    difficulty="easy"
)
# Bug-detection scenario (easy): the patch replaces a `tags=None` default
# with `tags=[]` and appends to it; the single expected finding is the
# mutable default argument.
bug_002 = Scenario(
    task_id=TaskId.BUG_DETECTION,
    pr_title="Refactor user profile builder",
    pr_description="New helper to fetch data with a default empty list for items.",
    files_changed=[
        FileChanged(
            filename="models/profile.py",
            language="python",
            patch="""--- a/models/profile.py
+++ b/models/profile.py
@@ -3,3 +3,5 @@
-def build_profile(name, tags=None):
- tags = tags or []
+def build_profile(name, tags=[]):
+ tags.append("user")
+ return {"name": name, "tags": tags}""",
            additions=3,
            deletions=2,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="bug_002",
            category=Category.BUG,
            severity=Severity.MEDIUM,
            filename="models/profile.py",
            line_number=5,
            description="Mutable default argument causes state leakage between calls",
            keywords=["mutable", "default"]
        )
    ],
    hash="bug_002",
    difficulty="easy"
)
# Bug-detection scenario (medium): the patch drops the `if user and ...`
# guard and reads `user.is_admin` directly; expected finding is a None
# dereference.
bug_003 = Scenario(
    task_id=TaskId.BUG_DETECTION,
    pr_title="Add session-based auth check",
    pr_description="Lookup user by ID and access properties without guard.",
    files_changed=[
        FileChanged(
            filename="auth.py",
            language="python",
            patch="""--- a/auth.py
+++ b/auth.py
@@ -14,3 +14,3 @@
def check_auth(session_id):
user = get_user(session_id)
- if user and user.is_active:
+ return user.is_admin""",
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="bug_003",
            category=Category.BUG,
            severity=Severity.HIGH,
            filename="auth.py",
            line_number=16,
            description="None dereference — get_user can return None, user.is_admin will crash",
            keywords=["None", "dereference"]
        )
    ],
    hash="bug_003",
    difficulty="medium"
)
# Bug-detection scenario (hard): the patch removes the `with lock:` wrapper
# around the global counter increment; expected finding is a race condition.
bug_004 = Scenario(
    task_id=TaskId.BUG_DETECTION,
    pr_title="Add global request counter",
    pr_description="Parallel threads updating shared cache without locking.",
    files_changed=[
        FileChanged(
            filename="middleware/counter.py",
            language="python",
            patch="""--- a/middleware/counter.py
+++ b/middleware/counter.py
@@ -5,3 +5,3 @@
-def increment():
- with lock:
- global count
- count += 1
+def increment():
+ global count
+ count += 1""",
            # Counts match the patch body above: three '+' lines, four '-' lines.
            additions=3,
            deletions=4,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="bug_004",
            category=Category.BUG,
            severity=Severity.HIGH,
            filename="middleware/counter.py",
            line_number=7,
            description="Race condition in counter update: multiple threads may overwrite each other's increments.",
            keywords=["race condition", "thread"]
        )
    ],
    hash="bug_004",
    difficulty="hard"
)
# Bug-detection scenario (medium): the patch widens a specific
# `psycopg2.OperationalError` handler to `except Exception: pass`; expected
# finding is the broad, silent catch-all.
bug_005 = Scenario(
    task_id=TaskId.BUG_DETECTION,
    pr_title="Handle DB connection errors",
    pr_description="Swallow all errors during data import.",
    files_changed=[
        FileChanged(
            filename="db/connection.py",
            language="python",
            patch="""--- a/db/connection.py
+++ b/db/connection.py
@@ -8,3 +8,3 @@
- except psycopg2.OperationalError:
- log.error("DB down")
+ except Exception:
+ pass""",
            additions=2,
            deletions=2,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="bug_005",
            category=Category.BUG,
            severity=Severity.MEDIUM,
            filename="db/connection.py",
            line_number=9,
            description="Broad exception catch-all suppresses real errors and hides bugs.",
            keywords=["broad exception", "catch"]
        )
    ],
    hash="bug_005",
    difficulty="medium"
)
# Bug-detection scenario (medium): the patch drops the `* 100` from the
# percentage calculation; the ground truth accepts either a division or a
# missing-multiplier explanation.
bug_006 = Scenario(
    task_id=TaskId.BUG_DETECTION,
    pr_title="Add score percentage calculator",
    pr_description="Integer division result truncated.",
    files_changed=[
        FileChanged(
            filename="scoring/calc.py",
            language="python",
            patch="""--- a/scoring/calc.py
+++ b/scoring/calc.py
@@ -4,3 +4,3 @@
def get_percentage(score, total):
- return (score / total) * 100
+ return score / total""",
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="bug_006",
            category=Category.BUG,
            severity=Severity.LOW,
            filename="scoring/calc.py",
            line_number=5,
            description="Integer division truncation or missing multiplier in percentage calculation",
            keywords=["division", "truncat"]
        )
    ],
    hash="bug_006",
    difficulty="medium"
)
# Bug-detection scenario (medium): the patch returns unconditionally and
# leaves a log call after the return; expected finding is unreachable code.
bug_007 = Scenario(
    task_id=TaskId.BUG_DETECTION,
    pr_title="Simplify status checker",
    pr_description="Unreachable code after return.",
    files_changed=[
        FileChanged(
            filename="utils/status.py",
            language="python",
            patch="""--- a/utils/status.py
+++ b/utils/status.py
@@ -5,5 +5,3 @@
def is_active(user):
- if user.deleted:
- return False
- return user.active
+ return True
+ log.info("Checked user status")""",
            additions=2,
            deletions=3,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="bug_007",
            category=Category.BUG,
            severity=Severity.LOW,
            filename="utils/status.py",
            line_number=8,
            description="Unreachable code after return statement",
            keywords=["unreachable", "dead code"]
        )
    ],
    hash="bug_007",
    difficulty="medium"
)
# Bug-detection scenario (medium): the patch swaps chained `.get()` lookups
# for direct subscripting; expected finding is a possible KeyError.
bug_008 = Scenario(
    task_id=TaskId.BUG_DETECTION,
    pr_title="Parse webhook payload",
    pr_description="Dict key assumed present — will KeyError if user absent.",
    files_changed=[
        FileChanged(
            filename="webhooks/parser.py",
            language="python",
            patch="""--- a/webhooks/parser.py
+++ b/webhooks/parser.py
@@ -12,2 +12,2 @@
def parse_event(data):
- email = data.get("user", {}).get("email")
+ email = data["user"]["email"]""",
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="bug_008",
            category=Category.BUG,
            severity=Severity.HIGH,
            filename="webhooks/parser.py",
            line_number=13,
            description="Unsafe dictionary access will raise KeyError if 'user' or 'email' keys are missing",
            keywords=["KeyError", "dict"]
        )
    ],
    hash="bug_008",
    difficulty="medium"
)
# Bug-detection scenario (medium): the patch replaces a `< 0.01` threshold
# with an exact `== 0.0` float comparison on the balance. The PR description
# names the balance, matching the title and the patched function.
bug_009 = Scenario(
    task_id=TaskId.BUG_DETECTION,
    pr_title="Add balance check to payment flow",
    pr_description="Check if balance is exactly 0.0.",
    files_changed=[
        FileChanged(
            filename="payments/validator.py",
            language="python",
            patch="""--- a/payments/validator.py
+++ b/payments/validator.py
@@ -7,3 +7,3 @@
def validate_tx(balance, amount):
- if balance < 0.01:
+ if balance == 0.0:
return False""",
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="bug_009",
            category=Category.BUG,
            severity=Severity.MEDIUM,
            filename="payments/validator.py",
            line_number=8,
            description="Floating point equality comparison is unreliable due to precision issues",
            keywords=["float", "comparison"]
        )
    ],
    hash="bug_009",
    difficulty="medium"
)
# Bug-detection scenario (medium): the patch replaces `copy.deepcopy` with
# `.copy()` and then mutates a nested key; expected finding is the shallow
# copy leaking changes into the original.
bug_010 = Scenario(
    task_id=TaskId.BUG_DETECTION,
    pr_title="Clone user config before mutation",
    pr_description="Shallow copy treated as deep copy — affects original.",
    files_changed=[
        FileChanged(
            filename="config/user_config.py",
            language="python",
            patch="""--- a/config/user_config.py
+++ b/config/user_config.py
@@ -10,3 +10,3 @@
def update_config(original):
- import copy
- cfg = copy.deepcopy(original)
+ cfg = original.copy()
+ cfg["settings"]["theme"] = "dark" """,
            additions=2,
            deletions=2,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="bug_010",
            category=Category.BUG,
            severity=Severity.MEDIUM,
            filename="config/user_config.py",
            line_number=11,
            description="Shallow copy used for nested dictionary mutation; will modify the original object",
            keywords=["shallow copy", "deep copy"]
        )
    ],
    hash="bug_010",
    difficulty="medium"
)
# --- SECURITY AUDIT SCENARIOS ---
# Security-audit scenario (hard): the patch replaces an ORM filter with an
# f-string SQL query passed to `db.execute_raw`; expected finding is SQL
# injection (critical).
sec_001 = Scenario(
    task_id=TaskId.SECURITY_AUDIT,
    pr_title="Add user search endpoint",
    pr_description="Bypassing ORM for a raw SQL query.",
    files_changed=[
        FileChanged(
            filename="api/search.py",
            language="python",
            patch="""--- a/api/search.py
+++ b/api/search.py
@@ -15,3 +15,3 @@
def find_user(name):
- return db.users.filter(name=name).first()
+ query = f"SELECT * FROM users WHERE name = '{name}'"
+ return db.execute_raw(query)""",
            additions=2,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="sec_001",
            category=Category.SECURITY,
            severity=Severity.CRITICAL,
            filename="api/search.py",
            line_number=16,
            description="SQL injection vulnerability via f-string in raw query. Use parameterized queries.",
            keywords=["SQL injection", "injection"]
        )
    ],
    hash="sec_001",
    difficulty="hard"
)
# Security-audit scenario (easy): the patch replaces an `os.getenv` lookup
# with a literal key string; expected finding is a hardcoded credential
# (critical). The key in the patch is a dummy placeholder value.
sec_002 = Scenario(
    task_id=TaskId.SECURITY_AUDIT,
    pr_title="Add Stripe webhook handler",
    pr_description="Hardcoded secret key in configuration.",
    files_changed=[
        FileChanged(
            filename="payments/webhook.py",
            language="python",
            patch="""--- a/payments/webhook.py
+++ b/payments/webhook.py
@@ -5,1 +5,1 @@
-stripe_secret = os.getenv("STRIPE_SECRET")
+SECRET_KEY = "sk_live_dummy_secret_key_for_testing_12345" """,
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="sec_002",
            category=Category.SECURITY,
            severity=Severity.CRITICAL,
            filename="payments/webhook.py",
            line_number=5,
            description="Hardcoded sensitive credentials in source code",
            keywords=["hardcoded", "secret"]
        )
    ],
    hash="sec_002",
    difficulty="easy"
)
# Security-audit scenario (medium): the patch drops `os.path.basename` and
# concatenates user input onto BASE_DIR; expected finding is path traversal.
sec_003 = Scenario(
    task_id=TaskId.SECURITY_AUDIT,
    pr_title="Add file download endpoint",
    pr_description="New endpoint to read local audit logs based on path (no sanitization).",
    files_changed=[
        FileChanged(
            filename="api/files.py",
            language="python",
            patch="""--- a/api/files.py
+++ b/api/files.py
@@ -10,3 +10,3 @@
def download_file(user_input):
- safe_path = os.path.join(BASE_DIR, os.path.basename(user_input))
- return open(safe_path, "rb").read()
+ filepath = BASE_DIR + "/" + user_input
+ return open(filepath, "rb").read()""",
            additions=2,
            deletions=2,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="sec_003",
            category=Category.SECURITY,
            severity=Severity.HIGH,
            filename="api/files.py",
            line_number=11,
            description="Path traversal vulnerability: user input is directly concatenated to the base path",
            keywords=["path traversal", "directory traversal"]
        )
    ],
    hash="sec_003",
    difficulty="medium"
)
# Security-audit scenario (medium): the patch replaces a list-argument
# `subprocess.run` with an f-string passed to `os.system`; expected finding
# is command injection (critical).
sec_004 = Scenario(
    task_id=TaskId.SECURITY_AUDIT,
    pr_title="Add system ping utility",
    pr_description="Command injection using os.system with user input.",
    files_changed=[
        FileChanged(
            filename="utils/network.py",
            language="python",
            patch="""--- a/utils/network.py
+++ b/utils/network.py
@@ -8,3 +8,3 @@
def ping_host(host):
- import subprocess
- return subprocess.run(["ping", "-c", "1", host])
+ import os
+ os.system(f"ping -c 1 {host}")""",
            additions=2,
            deletions=2,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="sec_004",
            category=Category.SECURITY,
            severity=Severity.CRITICAL,
            filename="utils/network.py",
            line_number=10,
            description="Command injection vulnerability via os.system and shell formatting",
            keywords=["command injection", "os.system"]
        )
    ],
    hash="sec_004",
    difficulty="medium"
)
# Security-audit scenario (medium): the patch replaces `json.loads` with
# `pickle.loads` on data read from redis; expected finding is insecure
# deserialization.
sec_005 = Scenario(
    task_id=TaskId.SECURITY_AUDIT,
    pr_title="Add session state caching",
    pr_description="Faster state loading by using pickle format for internal caches.",
    files_changed=[
        FileChanged(
            filename="cache/session.py",
            language="python",
            patch="""--- a/cache/session.py
+++ b/cache/session.py
@@ -10,3 +10,3 @@
def get_session(key):
- data = redis.get(key)
- return json.loads(data)
+ import pickle
+ return pickle.loads(redis.get(key))""",
            additions=2,
            deletions=2,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="sec_005",
            category=Category.SECURITY,
            severity=Severity.HIGH,
            filename="cache/session.py",
            line_number=12,
            description="Insecure deserialization using pickle leads to Arbitrary Code Execution (RCE)",
            keywords=["pickle", "deserialization"]
        )
    ],
    hash="sec_005",
    difficulty="medium"
)
# Security-audit scenario (hard): the patch decodes JWTs with
# `verify_signature` set to False; expected finding is missing signature
# verification (critical).
sec_006 = Scenario(
    task_id=TaskId.SECURITY_AUDIT,
    pr_title="Add JWT decode helper",
    pr_description="Allow bypassing JWT checks for faster local development loop.",
    files_changed=[
        FileChanged(
            filename="auth/jwt_helper.py",
            language="python",
            patch="""--- a/auth/jwt_helper.py
+++ b/auth/jwt_helper.py
@@ -15,3 +15,3 @@
def decode_token(token):
- return jwt.decode(token, SECRET, algorithms=["HS256"])
+ return jwt.decode(token, options={"verify_signature": False})""",
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="sec_006",
            category=Category.SECURITY,
            severity=Severity.CRITICAL,
            filename="auth/jwt_helper.py",
            line_number=16,
            description="JWT decoded without signature verification; attackers can forge any account",
            keywords=["JWT", "signature"]
        )
    ],
    hash="sec_006",
    difficulty="hard"
)
# Security-audit scenario (medium): the patch removes the
# `validate_internal_url` check and redirects to the raw `next` parameter;
# expected finding is an open redirect.
sec_007 = Scenario(
    task_id=TaskId.SECURITY_AUDIT,
    pr_title="Add login redirect",
    pr_description="Allow all origins for login redirect.",
    files_changed=[
        FileChanged(
            filename="views/auth.py",
            language="python",
            patch="""--- a/views/auth.py
+++ b/views/auth.py
@@ -20,3 +20,3 @@
def login_complete(request):
- next_url = validate_internal_url(request.args.get("next"))
- return redirect(next_url or "/dashboard")
+ return redirect(request.args.get("next"))""",
            additions=1,
            deletions=2,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="sec_007",
            category=Category.SECURITY,
            severity=Severity.MEDIUM,
            filename="views/auth.py",
            line_number=21,
            description="Open redirect vulnerability allows attackers to phish users",
            keywords=["open redirect", "redirect"]
        )
    ],
    hash="sec_007",
    difficulty="medium"
)
# Security-audit scenario (easy): the patch flips DEBUG and TESTING to True
# in the settings file; expected finding is debug mode enabled in production.
sec_008 = Scenario(
    task_id=TaskId.SECURITY_AUDIT,
    pr_title="Update app configuration",
    pr_description="DEBUG mode enabled in production settings.",
    files_changed=[
        FileChanged(
            filename="config/settings.py",
            language="python",
            patch="""--- a/config/settings.py
+++ b/config/settings.py
@@ -35,3 +35,4 @@
-# Production settings
-DEBUG = False
-TESTING = False
+# Debug settings for prod troubleshooting
+DEBUG = True
+TESTING = True""",
            additions=3,
            deletions=3,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="sec_008",
            category=Category.SECURITY,
            severity=Severity.HIGH,
            filename="config/settings.py",
            line_number=37,
            description="DEBUG mode enabled in production settings discloses system secrets",
            keywords=["debug", "production"]
        )
    ],
    hash="sec_008",
    difficulty="easy"
)
# Security-audit scenario (medium): the patch widens `allow_origins` to
# ["*"] while `allow_credentials=True` stays set; expected finding is the
# wildcard CORS policy.
sec_009 = Scenario(
    task_id=TaskId.SECURITY_AUDIT,
    pr_title="Enable CORS for frontend",
    pr_description="Resolving frontend browser errors by allowing all origins.",
    files_changed=[
        FileChanged(
            filename="app.py",
            language="python",
            patch="""--- a/app.py
+++ b/app.py
@@ -55,3 +55,3 @@
app.add_middleware(CORSMiddleware,
- allow_origins=["https://secure.app.com"],
+ allow_origins=["*"],
allow_credentials=True)""",
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="sec_009",
            category=Category.SECURITY,
            severity=Severity.MEDIUM,
            filename="app.py",
            line_number=56,
            description="Sensitive CORS policy with wildcard (*) allows data theft via CSRF",
            keywords=["CORS", "wildcard"]
        )
    ],
    hash="sec_009",
    difficulty="medium"
)
# Security-audit scenario (medium): the patch replaces
# `secrets.compare_digest` with `==` for the admin password check; expected
# finding is a timing attack.
sec_010 = Scenario(
    task_id=TaskId.SECURITY_AUDIT,
    pr_title="Add admin password check",
    pr_description="Faster password check by using native equality.",
    files_changed=[
        FileChanged(
            filename="admin/auth.py",
            language="python",
            patch="""--- a/admin/auth.py
+++ b/admin/auth.py
@@ -10,3 +10,3 @@
def verify_admin(provided_password):
- import secrets
- return secrets.compare_digest(ADMIN_PASS, provided_password)
+ return ADMIN_PASS == provided_password""",
            additions=1,
            deletions=2,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="sec_010",
            category=Category.SECURITY,
            severity=Severity.HIGH,
            filename="admin/auth.py",
            line_number=11,
            description="Timing attack vulnerability in password comparison; use secrets.compare_digest",
            keywords=["timing attack", "constant time"]
        )
    ],
    hash="sec_010",
    difficulty="medium"
)
# --- ARCHITECTURAL REVIEW SCENARIOS ---
# Architectural-review scenario (medium): the patch merges three small
# classes into one `UserManager` with auth, payment, email, profile and
# marketing methods; expected finding is a god class, and the issue carries
# a required verdict of REQUEST_CHANGES.
arch_001 = Scenario(
    task_id=TaskId.ARCHITECTURAL_REVIEW,
    pr_title="Add UserManager service",
    pr_description="A 200-line class that handles auth, email sending, billing, and profile.",
    files_changed=[
        FileChanged(
            filename="services/user_manager.py",
            language="python",
            patch="""--- a/services/user_manager.py
+++ b/services/user_manager.py
@@ -1,5 +1,10 @@
-class UserAuth: pass
-class UserBilling: pass
-class UserEmail: pass
+class UserManager:
+ def authenticate(self, user): pass
+ def process_payment(self, amount): pass
+ def send_welcome_email(self, email): pass
+ def update_profile_picture(self, img): pass
+ def sync_to_marketing_tool(self): pass""",
            additions=6,
            deletions=3,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="arch_001",
            category=Category.ARCHITECTURE,
            severity=Severity.HIGH,
            filename="services/user_manager.py",
            line_number=2,
            description="God class violation: UserManager handles multiple unrelated domains (auth, billing, email)",
            keywords=["single responsibility", "god class"],
            required_verdict=Verdict.REQUEST_CHANGES
        )
    ],
    hash="arch_001",
    difficulty="medium"
)
# Architectural-review scenario (hard): the patch replaces a `joinedload`
# query with a per-order item query inside a loop; expected finding is an
# N+1 query pattern, with a required verdict of REQUEST_CHANGES.
arch_002 = Scenario(
    task_id=TaskId.ARCHITECTURAL_REVIEW,
    pr_title="Add order details endpoint",
    pr_description="Fetching order items inside a loop (N+1 query).",
    files_changed=[
        FileChanged(
            filename="api/orders.py",
            language="python",
            patch="""--- a/api/orders.py
+++ b/api/orders.py
@@ -25,3 +25,4 @@
def get_order_history(user_id):
- return db.query(Order).options(joinedload(Order.items)).all()
+ orders = db.query(Order).filter_by(user_id=user_id).all()
+ for o in orders:
+ o.items = db.query(Item).filter_by(order_id=o.id).all()
+ return orders""",
            # Count matches the patch body above: four '+' lines, one '-' line.
            additions=4,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="arch_002",
            category=Category.ARCHITECTURE,
            severity=Severity.HIGH,
            filename="api/orders.py",
            line_number=27,
            description="N+1 query pattern: fetching items in a loop will cause DB performance collapse",
            keywords=["N+1", "query"],
            required_verdict=Verdict.REQUEST_CHANGES
        )
    ],
    hash="arch_002",
    difficulty="hard"
)
# Architectural-review scenario (medium): the patch swaps an injected
# `MailProvider` parameter for a direct `integrations.sendgrid` import;
# expected finding is tight coupling, with a required verdict of
# REQUEST_CHANGES.
arch_003 = Scenario(
    task_id=TaskId.ARCHITECTURAL_REVIEW,
    pr_title="Add notification system",
    pr_description="Tight coupling via hardwired SendGrid import.",
    files_changed=[
        FileChanged(
            filename="services/notifier.py",
            language="python",
            patch="""--- a/services/notifier.py
+++ b/services/notifier.py
@@ -1,3 +1,3 @@
-from services.interfaces import MailProvider
+from integrations.sendgrid import send_email
-def notify(user, provider: MailProvider):
- provider.send(user.email)
+def notify(user):
+ send_email(user.email)""",
            additions=3,
            deletions=3,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="arch_003",
            category=Category.ARCHITECTURE,
            severity=Severity.MEDIUM,
            filename="services/notifier.py",
            line_number=2,
            description="Tight coupling: service depends on concrete implementation instead of abstraction",
            keywords=["tight coupling", "dependency injection"],
            required_verdict=Verdict.REQUEST_CHANGES
        )
    ],
    hash="arch_003",
    difficulty="medium"
)
# Architectural-review scenario (medium): the patch replaces an awaited
# aiohttp request with a synchronous `requests.get` inside an async handler;
# expected finding is a blocking call in async code, with a required verdict
# of REQUEST_CHANGES.
arch_004 = Scenario(
    task_id=TaskId.ARCHITECTURAL_REVIEW,
    pr_title="Add external price fetch to checkout",
    pr_description="Synchronous blocking call inside async checkout handler.",
    files_changed=[
        FileChanged(
            filename="checkout/handler.py",
            language="python",
            patch="""--- a/checkout/handler.py
+++ b/checkout/handler.py
@@ -10,3 +10,4 @@
async def checkout(cart):
- async with aiohttp.ClientSession() as s:
- price = await s.get(PRICE_API)
+ import requests
+ price = requests.get(PRICE_API)
+ return process_order(price)""",
            # Count matches the patch body above: three '+' lines, two '-' lines.
            additions=3,
            deletions=2,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="arch_004",
            category=Category.ARCHITECTURE,
            severity=Severity.HIGH,
            filename="checkout/handler.py",
            line_number=12,
            description="Blocking HTTP call inside async function will stall the entire event loop",
            keywords=["blocking", "async"],
            required_verdict=Verdict.REQUEST_CHANGES
        )
    ],
    hash="arch_004",
    difficulty="medium"
)
# Architectural-review scenario (medium): the patch replaces a
# circuit-breaker call with a bare `requests.get`; expected finding is
# missing resilience on an external call, with a required verdict of
# REQUEST_CHANGES.
arch_005 = Scenario(
    task_id=TaskId.ARCHITECTURAL_REVIEW,
    pr_title="Integrate weather API",
    pr_description="Missing retry/resilience on external call.",
    files_changed=[
        FileChanged(
            filename="services/weather.py",
            language="python",
            patch="""--- a/services/weather.py
+++ b/services/weather.py
@@ -5,3 +5,3 @@
def get_temp(city):
- return circuit_breaker.call(WEATHER_URL, timeout=2)
+ return requests.get(WEATHER_URL).json()""",
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="arch_005",
            category=Category.ARCHITECTURE,
            severity=Severity.MEDIUM,
            filename="services/weather.py",
            line_number=6,
            description="Missing resilience (retry, timeout, circuit breaker) on external API dependency",
            keywords=["retry", "resilience"],
            required_verdict=Verdict.REQUEST_CHANGES
        )
    ],
    hash="arch_005",
    difficulty="medium"
)
# Architectural-review scenario (hard): the patch makes the Order model
# import User and hold a `user: User` field; expected finding is a circular
# dependency risk, with a required verdict of REQUEST_CHANGES.
arch_006 = Scenario(
    task_id=TaskId.ARCHITECTURAL_REVIEW,
    pr_title="Refactor model relationships",
    pr_description="Circular import between User and Order models.",
    files_changed=[
        FileChanged(
            filename="models/order.py",
            language="python",
            patch="""--- a/models/order.py
+++ b/models/order.py
@@ -1,1 +1,2 @@
+from models.user import User
class Order(BaseModel):
- user_id: int
+ user: User""",
            additions=2,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="arch_006",
            category=Category.ARCHITECTURE,
            severity=Severity.MEDIUM,
            filename="models/order.py",
            line_number=1,
            description="Circular dependency risk: order depends on user while user likely imports order",
            keywords=["circular import", "circular dependency"],
            required_verdict=Verdict.REQUEST_CHANGES
        )
    ],
    hash="arch_006",
    difficulty="hard"
)
# Architectural-review scenario (medium): the patch removes `.limit(50)`
# from the product query; expected finding is missing pagination, with a
# required verdict of REQUEST_CHANGES.
arch_007 = Scenario(
    task_id=TaskId.ARCHITECTURAL_REVIEW,
    pr_title="Add all-products endpoint",
    pr_description="Missing pagination on unbounded list endpoint.",
    files_changed=[
        FileChanged(
            filename="api/products.py",
            language="python",
            patch="""--- a/api/products.py
+++ b/api/products.py
@@ -10,3 +10,3 @@
def list_products():
- return db.query(Product).limit(50).all()
+ return db.query(Product).all()""",
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="arch_007",
            category=Category.ARCHITECTURE,
            severity=Severity.HIGH,
            filename="api/products.py",
            line_number=11,
            description="Missing pagination on list endpoint will lead to memory exhaustion",
            keywords=["pagination", "limit"],
            required_verdict=Verdict.REQUEST_CHANGES
        )
    ],
    hash="arch_007",
    difficulty="medium"
)
# Architectural-review scenario (medium): the patch adds a comment that
# contains an API key; expected finding is a secret leaked in a comment,
# with a required verdict of REQUEST_CHANGES.
arch_008 = Scenario(
    task_id=TaskId.ARCHITECTURAL_REVIEW,
    pr_title="Document the payment integration",
    pr_description="Sensitive API key included in documentation comment.",
    files_changed=[
        FileChanged(
            filename="docs/payment_notes.py",
            language="python",
            patch="""--- a/docs/payment_notes.py
+++ b/docs/payment_notes.py
@@ -1,2 +1,3 @@
# Payment integration notes
+# Use API key: pk_test_abc123 for testing
def init(): pass""",
            additions=1,
            deletions=0,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="arch_008",
            category=Category.ARCHITECTURE,
            severity=Severity.MEDIUM,
            filename="docs/payment_notes.py",
            line_number=2,
            description="Secret leaked in code comment; should be in environment variables only",
            keywords=["secret", "comment"],
            required_verdict=Verdict.REQUEST_CHANGES
        )
    ],
    hash="arch_008",
    difficulty="medium"
)
# Architectural-review scenario (medium): the patch adds the password to
# the login log message; expected finding is sensitive data in logs, with a
# required verdict of REQUEST_CHANGES.
arch_009 = Scenario(
    task_id=TaskId.ARCHITECTURAL_REVIEW,
    pr_title="Add detailed auth logging",
    pr_description="Logging sensitive user password in cleartext.",
    files_changed=[
        FileChanged(
            filename="auth/logger.py",
            language="python",
            patch="""--- a/auth/logger.py
+++ b/auth/logger.py
@@ -5,3 +5,3 @@
def log_login(email, password):
- logger.info(f"Attempt for {email}")
+ logger.info(f"Login attempt: user={email} password={password}")""",
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="arch_009",
            category=Category.ARCHITECTURE,
            severity=Severity.HIGH,
            filename="auth/logger.py",
            line_number=6,
            description="PII/Security Leak: logging plain-text passwords violates security policy",
            keywords=["sensitive", "log"],
            required_verdict=Verdict.REQUEST_CHANGES
        )
    ],
    hash="arch_009",
    difficulty="medium"
)
# Architectural-review scenario (medium): the patch replaces the
# `DATABASE_URL` environment lookup with a literal connection string that
# includes credentials; expected finding is hardcoded configuration, with a
# required verdict of REQUEST_CHANGES.
arch_010 = Scenario(
    task_id=TaskId.ARCHITECTURAL_REVIEW,
    pr_title="Set up database connection",
    pr_description="Hardcoded DB connection string with credentials.",
    files_changed=[
        FileChanged(
            filename="db/setup.py",
            language="python",
            patch="""--- a/db/setup.py
+++ b/db/setup.py
@@ -5,3 +5,3 @@
def connect():
- url = os.environ.get("DATABASE_URL")
+ url = "postgresql://admin:password123@localhost:5432/mydb"
return create_engine(url)""",
            additions=1,
            deletions=1,
        )
    ],
    ground_truth_issues=[
        GroundTruthIssue(
            id="arch_010",
            category=Category.ARCHITECTURE,
            severity=Severity.HIGH,
            filename="db/setup.py",
            line_number=6,
            description="Hardcoded environment configuration and credentials",
            keywords=["hardcoded", "configuration"],
            required_verdict=Verdict.REQUEST_CHANGES
        )
    ],
    hash="arch_010",
    difficulty="medium"
)
# Registry of every scenario, ten per task. Order is significant:
# get_scenario() filters this list by task_id and indexes the result with
# `seed % len(...)`, so reordering entries changes which scenario a given
# seed selects.
# NOTE(review): bug_003 is listed before bug_002 — confirm whether that is
# intentional before "fixing" it, since it affects the seed mapping.
ALL_SCENARIOS = [
    bug_001, bug_003, bug_002, bug_004, bug_005, bug_006, bug_007, bug_008, bug_009, bug_010,
    sec_001, sec_002, sec_003, sec_004, sec_005, sec_006, sec_007, sec_008, sec_009, sec_010,
    arch_001, arch_002, arch_003, arch_004, arch_005, arch_006, arch_007, arch_008, arch_009, arch_010
]