yxc20098 commited on
Commit
9422de7
·
1 Parent(s): 3a2bab2

Security hardening: XSS prevention, input validation, rate limiting

Browse files

- HTML-escape agent_name/agent_url in rendered links (blocks javascript: URIs)
- Validate numeric field types, string lengths, URL schemes
- Sanitize CSV string values to prevent formula injection
- Add global rate limit (20 submissions/hour)
- Replay file size limit (10 MB), sanitized filenames
- Safe regex fallback in search to prevent ReDoS
- 24 new security tests (68 total)

Files changed (2) hide show
  1. app.py +108 -22
  2. tests/test_app.py +168 -0
app.py CHANGED
@@ -11,8 +11,12 @@ Deploy on HuggingFace Spaces:
11
  """
12
 
13
  import csv
 
14
  import json
15
  import os
 
 
 
16
  from datetime import datetime, timezone
17
  from pathlib import Path
18
 
@@ -49,6 +53,29 @@ DISPLAY_COLUMNS = [
49
  ]
50
 
51
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52
  def load_data() -> pd.DataFrame:
53
  """Load leaderboard data from CSV."""
54
  if not DATA_PATH.exists():
@@ -58,28 +85,18 @@ def load_data() -> pd.DataFrame:
58
  df = df.sort_values("score", ascending=False).reset_index(drop=True)
59
  df.insert(0, "Rank", range(1, len(df) + 1))
60
 
61
- # Build agent name with optional hyperlink
62
  if "agent_url" in df.columns:
63
  df["Agent"] = df.apply(
64
- lambda r: (
65
- f'<a href="{r["agent_url"]}" target="_blank">{r["agent_name"]}</a>'
66
- if pd.notna(r.get("agent_url")) and str(r["agent_url"]).strip()
67
- else r["agent_name"]
68
- ),
69
  axis=1,
70
  )
71
  else:
72
- df["Agent"] = df["agent_name"]
73
 
74
- # Build replay download link
75
  if "replay_url" in df.columns:
76
- df["Replay"] = df["replay_url"].apply(
77
- lambda u: (
78
- f'<a href="/replays/{u}" download title="Download replay">&#11015;</a>'
79
- if pd.notna(u) and str(u).strip()
80
- else ""
81
- )
82
- )
83
  else:
84
  df["Replay"] = ""
85
 
@@ -136,12 +153,17 @@ def filter_leaderboard(
136
  if opponent and opponent != "All":
137
  df = df[df["Opponent"] == opponent]
138
 
139
- # Search by agent name (regex)
140
  if search and search.strip():
141
  patterns = [p.strip() for p in search.split(",") if p.strip()]
142
  mask = pd.Series([False] * len(df), index=df.index)
143
  for pattern in patterns:
144
- mask |= df["Agent"].str.contains(pattern, case=False, regex=True, na=False)
 
 
 
 
 
145
  df = df[mask]
146
 
147
  # Re-rank after filtering
@@ -173,6 +195,32 @@ if os.environ.get("HF_TOKEN") and os.environ.get("SPACE_ID"):
173
  pass # Running locally without HF token — skip
174
 
175
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
176
  def save_submission(results: dict) -> None:
177
  """Append results to local JSONL and CSV."""
178
  # JSONL for CommitScheduler → HF dataset
@@ -188,15 +236,18 @@ def save_submission(results: dict) -> None:
188
  "score", "avg_kills", "avg_deaths", "kd_ratio", "avg_economy",
189
  "avg_game_length", "timestamp", "replay_url", "agent_url",
190
  ]
 
191
  with open(csv_path, "a", newline="") as f:
192
  writer = csv.DictWriter(f, fieldnames=fieldnames)
193
  if not file_exists:
194
  writer.writeheader()
195
- writer.writerow(results)
196
 
197
 
198
  # ── Submission Handling ───────────────────────────────────────────────────────
199
 
 
 
200
  VALID_OPPONENTS = {"Beginner", "Easy", "Medium", "Normal", "Hard"}
201
  VALID_AGENT_TYPES = {"Scripted", "LLM", "RL"}
202
  REQUIRED_FIELDS = [
@@ -226,6 +277,22 @@ def validate_submission(data: dict) -> tuple[bool, str]:
226
  f"Must be one of: {', '.join(sorted(VALID_OPPONENTS))}"
227
  )
228
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
229
  return True, ""
230
 
231
 
@@ -269,6 +336,10 @@ def handle_upload(json_file, replay_file) -> tuple[str, pd.DataFrame]:
269
  if json_file is None:
270
  return "Please upload a JSON file.", add_type_badges(load_data())
271
 
 
 
 
 
272
  try:
273
  with open(json_file.name) as f:
274
  data = json.load(f)
@@ -284,8 +355,14 @@ def handle_upload(json_file, replay_file) -> tuple[str, pd.DataFrame]:
284
  # Save replay if provided
285
  if replay_file is not None:
286
  import shutil
287
- replay_name = Path(replay_file.name).name
288
- shutil.copy2(replay_file.name, SUBMISSIONS_DIR / replay_name)
 
 
 
 
 
 
289
  results_row["replay_url"] = replay_name
290
 
291
  save_submission(results_row)
@@ -299,6 +376,10 @@ def handle_upload(json_file, replay_file) -> tuple[str, pd.DataFrame]:
299
 
300
  def handle_api_submit(json_data: str) -> str:
301
  """API endpoint: accept JSON string submission. Used by CLI auto-upload."""
 
 
 
 
302
  try:
303
  data = json.loads(json_data)
304
  except (json.JSONDecodeError, Exception) as e:
@@ -319,6 +400,10 @@ def handle_api_submit(json_data: str) -> str:
319
 
320
  def handle_api_submit_with_replay(json_data: str, replay_file) -> str:
321
  """API endpoint: accept JSON + replay file. Used by CLI with --replay."""
 
 
 
 
322
  try:
323
  data = json.loads(json_data)
324
  except (json.JSONDecodeError, Exception) as e:
@@ -333,11 +418,12 @@ def handle_api_submit_with_replay(json_data: str, replay_file) -> str:
333
  # Save replay if provided
334
  if replay_file is not None:
335
  import shutil
336
- from datetime import datetime, timezone
337
 
338
  orig = Path(replay_file) if isinstance(replay_file, str) else Path(replay_file.name)
 
 
339
  ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
340
- slug = data["agent_name"].replace("/", "_").replace(" ", "_")[:30]
341
  replay_name = f"replay-{slug}-{ts}.orarep"
342
  shutil.copy2(str(orig), SUBMISSIONS_DIR / replay_name)
343
  results_row["replay_url"] = replay_name
 
11
  """
12
 
13
  import csv
14
+ import html
15
  import json
16
  import os
17
+ import re
18
+ import time
19
+ from collections import defaultdict
20
  from datetime import datetime, timezone
21
  from pathlib import Path
22
 
 
53
  ]
54
 
55
 
56
+ def _safe_agent_link(name: str, url) -> str:
57
+ """Render agent name, optionally as a hyperlink. HTML-escaped to prevent XSS."""
58
+ safe_name = html.escape(str(name))
59
+ if pd.notna(url) and str(url).strip():
60
+ url_str = str(url).strip()
61
+ # Only allow http/https URLs — block javascript:, data:, etc.
62
+ if url_str.startswith(("http://", "https://")):
63
+ safe_url = html.escape(url_str, quote=True)
64
+ return f'<a href="{safe_url}" target="_blank" rel="noopener">{safe_name}</a>'
65
+ return safe_name
66
+
67
+
68
+ def _safe_replay_link(url) -> str:
69
+ """Render replay download link. Filename is sanitized to prevent XSS."""
70
+ if pd.notna(url) and str(url).strip():
71
+ # Sanitize: only allow alphanumeric, dash, underscore, dot
72
+ safe_name = re.sub(r"[^a-zA-Z0-9._-]", "", str(url).strip())
73
+ if safe_name:
74
+ escaped = html.escape(safe_name, quote=True)
75
+ return f'<a href="/replays/{escaped}" download title="Download replay">&#11015;</a>'
76
+ return ""
77
+
78
+
79
  def load_data() -> pd.DataFrame:
80
  """Load leaderboard data from CSV."""
81
  if not DATA_PATH.exists():
 
85
  df = df.sort_values("score", ascending=False).reset_index(drop=True)
86
  df.insert(0, "Rank", range(1, len(df) + 1))
87
 
88
+ # Build agent name with optional hyperlink (XSS-safe)
89
  if "agent_url" in df.columns:
90
  df["Agent"] = df.apply(
91
+ lambda r: _safe_agent_link(r.get("agent_name", ""), r.get("agent_url", "")),
 
 
 
 
92
  axis=1,
93
  )
94
  else:
95
+ df["Agent"] = df["agent_name"].apply(lambda n: html.escape(str(n)))
96
 
97
+ # Build replay download link (XSS-safe)
98
  if "replay_url" in df.columns:
99
+ df["Replay"] = df["replay_url"].apply(_safe_replay_link)
 
 
 
 
 
 
100
  else:
101
  df["Replay"] = ""
102
 
 
153
  if opponent and opponent != "All":
154
  df = df[df["Opponent"] == opponent]
155
 
156
+ # Search by agent name (regex with fallback to literal on invalid patterns)
157
  if search and search.strip():
158
  patterns = [p.strip() for p in search.split(",") if p.strip()]
159
  mask = pd.Series([False] * len(df), index=df.index)
160
  for pattern in patterns:
161
+ try:
162
+ mask |= df["Agent"].str.contains(pattern, case=False, regex=True, na=False)
163
+ except re.error:
164
+ mask |= df["Agent"].str.contains(
165
+ re.escape(pattern), case=False, regex=True, na=False
166
+ )
167
  df = df[mask]
168
 
169
  # Re-rank after filtering
 
195
  pass # Running locally without HF token — skip
196
 
197
 
198
+ def _sanitize_csv_value(val):
199
+ """Strip leading characters that trigger formula execution in spreadsheets."""
200
+ if isinstance(val, str):
201
+ while val and val[0] in ("=", "+", "-", "@", "\t", "\r", "\n"):
202
+ val = val[1:]
203
+ val = val.replace("\n", " ").replace("\r", " ")
204
+ return val
205
+
206
+
207
+ # ── Rate Limiting ────────────────────────────────────────────────────────────
208
+
209
+ _submit_times: dict[str, list[float]] = defaultdict(list)
210
+ MAX_SUBMITS_PER_HOUR = 20
211
+
212
+
213
+ def _check_rate_limit(identifier: str = "global") -> tuple[bool, str]:
214
+ """Simple in-memory rate limiter. Returns (allowed, error_message)."""
215
+ now = time.time()
216
+ times = _submit_times[identifier]
217
+ _submit_times[identifier] = [t for t in times if now - t < 3600]
218
+ if len(_submit_times[identifier]) >= MAX_SUBMITS_PER_HOUR:
219
+ return False, "Rate limit exceeded (max 20 submissions per hour). Try again later."
220
+ _submit_times[identifier].append(now)
221
+ return True, ""
222
+
223
+
224
  def save_submission(results: dict) -> None:
225
  """Append results to local JSONL and CSV."""
226
  # JSONL for CommitScheduler → HF dataset
 
236
  "score", "avg_kills", "avg_deaths", "kd_ratio", "avg_economy",
237
  "avg_game_length", "timestamp", "replay_url", "agent_url",
238
  ]
239
+ safe_results = {k: _sanitize_csv_value(v) for k, v in results.items()}
240
  with open(csv_path, "a", newline="") as f:
241
  writer = csv.DictWriter(f, fieldnames=fieldnames)
242
  if not file_exists:
243
  writer.writeheader()
244
+ writer.writerow(safe_results)
245
 
246
 
247
  # ── Submission Handling ───────────────────────────────────────────────────────
248
 
249
+ MAX_REPLAY_SIZE = 10 * 1024 * 1024 # 10 MB
250
+
251
  VALID_OPPONENTS = {"Beginner", "Easy", "Medium", "Normal", "Hard"}
252
  VALID_AGENT_TYPES = {"Scripted", "LLM", "RL"}
253
  REQUIRED_FIELDS = [
 
277
  f"Must be one of: {', '.join(sorted(VALID_OPPONENTS))}"
278
  )
279
 
280
+ # Type checks for numeric fields
281
+ for field in ("ticks", "kills_cost", "deaths_cost", "assets_value"):
282
+ if not isinstance(data[field], (int, float)):
283
+ return False, f"Field '{field}' must be a number"
284
+
285
+ # String length limits
286
+ if len(str(data["agent_name"])) > 100:
287
+ return False, "agent_name must be 100 characters or fewer"
288
+
289
+ # agent_url: optional, but must be http(s) if provided
290
+ agent_url = str(data.get("agent_url", "")).strip()
291
+ if agent_url and not agent_url.startswith(("http://", "https://")):
292
+ return False, "agent_url must be an HTTP(S) URL"
293
+ if len(agent_url) > 500:
294
+ return False, "agent_url must be 500 characters or fewer"
295
+
296
  return True, ""
297
 
298
 
 
336
  if json_file is None:
337
  return "Please upload a JSON file.", add_type_badges(load_data())
338
 
339
+ allowed, err = _check_rate_limit()
340
+ if not allowed:
341
+ return err, add_type_badges(load_data())
342
+
343
  try:
344
  with open(json_file.name) as f:
345
  data = json.load(f)
 
355
  # Save replay if provided
356
  if replay_file is not None:
357
  import shutil
358
+
359
+ orig = Path(replay_file.name)
360
+ if orig.stat().st_size > MAX_REPLAY_SIZE:
361
+ return "Replay file too large (max 10 MB).", add_type_badges(load_data())
362
+ ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
363
+ slug = re.sub(r"[^a-zA-Z0-9_-]", "", data["agent_name"].replace("/", "_").replace(" ", "_"))[:30]
364
+ replay_name = f"replay-{slug}-{ts}.orarep"
365
+ shutil.copy2(str(orig), SUBMISSIONS_DIR / replay_name)
366
  results_row["replay_url"] = replay_name
367
 
368
  save_submission(results_row)
 
376
 
377
  def handle_api_submit(json_data: str) -> str:
378
  """API endpoint: accept JSON string submission. Used by CLI auto-upload."""
379
+ allowed, err = _check_rate_limit()
380
+ if not allowed:
381
+ return err
382
+
383
  try:
384
  data = json.loads(json_data)
385
  except (json.JSONDecodeError, Exception) as e:
 
400
 
401
  def handle_api_submit_with_replay(json_data: str, replay_file) -> str:
402
  """API endpoint: accept JSON + replay file. Used by CLI with --replay."""
403
+ allowed, err = _check_rate_limit()
404
+ if not allowed:
405
+ return err
406
+
407
  try:
408
  data = json.loads(json_data)
409
  except (json.JSONDecodeError, Exception) as e:
 
418
  # Save replay if provided
419
  if replay_file is not None:
420
  import shutil
 
421
 
422
  orig = Path(replay_file) if isinstance(replay_file, str) else Path(replay_file.name)
423
+ if orig.exists() and orig.stat().st_size > MAX_REPLAY_SIZE:
424
+ return "Replay file too large (max 10 MB)"
425
  ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
426
+ slug = re.sub(r"[^a-zA-Z0-9_-]", "", data["agent_name"].replace("/", "_").replace(" ", "_"))[:30]
427
  replay_name = f"replay-{slug}-{ts}.orarep"
428
  shutil.copy2(str(orig), SUBMISSIONS_DIR / replay_name)
429
  results_row["replay_url"] = replay_name
tests/test_app.py CHANGED
@@ -12,7 +12,13 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
12
  from app import (
13
  AGENT_TYPE_COLORS,
14
  DISPLAY_COLUMNS,
 
15
  VALID_OPPONENTS,
 
 
 
 
 
16
  add_type_badges,
17
  build_app,
18
  filter_leaderboard,
@@ -308,3 +314,165 @@ class TestReplayColumn:
308
  # The default test data has no replay
309
  replay_val = df["Replay"].iloc[0]
310
  assert replay_val == "" or not str(replay_val).strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
  from app import (
13
  AGENT_TYPE_COLORS,
14
  DISPLAY_COLUMNS,
15
+ MAX_SUBMITS_PER_HOUR,
16
  VALID_OPPONENTS,
17
+ _check_rate_limit,
18
+ _safe_agent_link,
19
+ _safe_replay_link,
20
+ _sanitize_csv_value,
21
+ _submit_times,
22
  add_type_badges,
23
  build_app,
24
  filter_leaderboard,
 
314
  # The default test data has no replay
315
  replay_val = df["Replay"].iloc[0]
316
  assert replay_val == "" or not str(replay_val).strip()
317
+
318
+
319
+ class TestXssPrevention:
320
+ """Test that user input is HTML-escaped to prevent XSS."""
321
+
322
+ def test_javascript_url_blocked(self):
323
+ """javascript: URLs should NOT produce a clickable link."""
324
+ result = _safe_agent_link("Bot", "javascript:alert(1)")
325
+ assert "javascript:" not in result
326
+ assert "Bot" in result
327
+
328
+ def test_data_url_blocked(self):
329
+ result = _safe_agent_link("Bot", "data:text/html,<script>alert(1)</script>")
330
+ assert "data:" not in result
331
+
332
+ def test_html_in_name_escaped(self):
333
+ result = _safe_agent_link('<script>alert("xss")</script>', "")
334
+ assert "<script>" not in result
335
+ assert "&lt;script&gt;" in result
336
+
337
+ def test_quote_injection_in_url_escaped(self):
338
+ result = _safe_agent_link("Bot", 'https://ok.com" onclick="alert(1)')
339
+ assert 'onclick' not in result or '&quot;' in result
340
+
341
+ def test_valid_https_url_works(self):
342
+ result = _safe_agent_link("Bot", "https://github.com/user/repo")
343
+ assert '<a href="https://github.com/user/repo"' in result
344
+ assert 'rel="noopener"' in result
345
+
346
+ def test_replay_link_sanitized(self):
347
+ result = _safe_replay_link('"><script>alert(1)</script>.orarep')
348
+ assert "<script>" not in result
349
+
350
+ def test_replay_path_traversal_stripped(self):
351
+ """Path traversal characters (/) are stripped from replay filenames."""
352
+ result = _safe_replay_link("replay/../../../etc/passwd")
353
+ # The href after /replays/ should have no slashes (traversal stripped)
354
+ href_part = result.split('href="')[1].split('"')[0]
355
+ filename = href_part.replace("/replays/", "")
356
+ assert "/" not in filename
357
+
358
+
359
+ class TestInputValidation:
360
+ """Test stricter input validation."""
361
+
362
+ def _valid_data(self):
363
+ return {
364
+ "agent_name": "TestBot",
365
+ "agent_type": "LLM",
366
+ "opponent": "Beginner",
367
+ "result": "loss",
368
+ "ticks": 27000,
369
+ "kills_cost": 1000,
370
+ "deaths_cost": 2900,
371
+ "assets_value": 9050,
372
+ }
373
+
374
+ def test_string_ticks_rejected(self):
375
+ data = self._valid_data()
376
+ data["ticks"] = "not a number"
377
+ valid, err = validate_submission(data)
378
+ assert not valid
379
+ assert "must be a number" in err
380
+
381
+ def test_dict_kills_rejected(self):
382
+ data = self._valid_data()
383
+ data["kills_cost"] = {"nested": True}
384
+ valid, err = validate_submission(data)
385
+ assert not valid
386
+
387
+ def test_long_agent_name_rejected(self):
388
+ data = self._valid_data()
389
+ data["agent_name"] = "A" * 101
390
+ valid, err = validate_submission(data)
391
+ assert not valid
392
+ assert "100 characters" in err
393
+
394
+ def test_javascript_agent_url_rejected(self):
395
+ data = self._valid_data()
396
+ data["agent_url"] = "javascript:alert(1)"
397
+ valid, err = validate_submission(data)
398
+ assert not valid
399
+ assert "HTTP(S)" in err
400
+
401
+ def test_valid_agent_url_accepted(self):
402
+ data = self._valid_data()
403
+ data["agent_url"] = "https://github.com/user/repo"
404
+ valid, _ = validate_submission(data)
405
+ assert valid
406
+
407
+ def test_empty_agent_url_accepted(self):
408
+ data = self._valid_data()
409
+ data["agent_url"] = ""
410
+ valid, _ = validate_submission(data)
411
+ assert valid
412
+
413
+ def test_long_agent_url_rejected(self):
414
+ data = self._valid_data()
415
+ data["agent_url"] = "https://example.com/" + "a" * 500
416
+ valid, err = validate_submission(data)
417
+ assert not valid
418
+ assert "500 characters" in err
419
+
420
+
421
+ class TestCsvSanitization:
422
+ """Test CSV injection prevention."""
423
+
424
+ def test_formula_trigger_stripped(self):
425
+ assert _sanitize_csv_value("=cmd|'/c calc'!A0") == "cmd|'/c calc'!A0"
426
+
427
+ def test_plus_trigger_stripped(self):
428
+ assert _sanitize_csv_value("+cmd") == "cmd"
429
+
430
+ def test_at_trigger_stripped(self):
431
+ assert _sanitize_csv_value("@SUM(A1)") == "SUM(A1)"
432
+
433
+ def test_newlines_replaced(self):
434
+ assert _sanitize_csv_value("line1\nline2\rline3") == "line1 line2 line3"
435
+
436
+ def test_normal_string_unchanged(self):
437
+ assert _sanitize_csv_value("DeathBot-9000") == "DeathBot-9000"
438
+
439
+ def test_numbers_unchanged(self):
440
+ assert _sanitize_csv_value(42) == 42
441
+ assert _sanitize_csv_value(3.14) == 3.14
442
+
443
+
444
+ class TestRateLimiting:
445
+ """Test rate limiting on submissions."""
446
+
447
+ def test_rate_limit_allows_normal_usage(self):
448
+ _submit_times.clear()
449
+ allowed, _ = _check_rate_limit("test_normal")
450
+ assert allowed
451
+
452
+ def test_rate_limit_blocks_after_max(self):
453
+ _submit_times.clear()
454
+ key = "test_flood"
455
+ for _ in range(MAX_SUBMITS_PER_HOUR):
456
+ allowed, _ = _check_rate_limit(key)
457
+ assert allowed
458
+ allowed, err = _check_rate_limit(key)
459
+ assert not allowed
460
+ assert "Rate limit" in err
461
+
462
+ def test_rate_limit_resets_after_expiry(self):
463
+ import time as _time
464
+ _submit_times.clear()
465
+ key = "test_expiry"
466
+ # Fill with old timestamps
467
+ _submit_times[key] = [_time.time() - 3601] * MAX_SUBMITS_PER_HOUR
468
+ allowed, _ = _check_rate_limit(key)
469
+ assert allowed
470
+
471
+
472
+ class TestSearchSafety:
473
+ """Test that malformed regex doesn't crash the search."""
474
+
475
+ def test_invalid_regex_falls_back(self):
476
+ """An invalid regex pattern should not raise an exception."""
477
+ df = filter_leaderboard("[invalid(regex", [], "All")
478
+ assert isinstance(df, pd.DataFrame)