# Meta-SRE / app / engine / sandbox.py
# Anvit25: Deploy Meta-SRE OpenEnv benchmark FastAPI server (commit ad6248e)
"""
Layer 1 – Virtual File System.
Stores all service codebases as in-memory strings keyed by
(service, filename). Every task starts from a clean snapshot of
its own buggy codebase; edits accumulate on top of that snapshot.
"""
from __future__ import annotations
import copy
from typing import Dict, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Shared baseline files – common to every task
# Task-specific (buggy) snapshots live in TASK_SNAPSHOTS. Each task supplies
# only the files it needs; the files in SHARED_FILES are loaded first and the
# task's files are overlaid on top of them at reset() time.
# ---------------------------------------------------------------------------
# Baseline files loaded for every task, keyed as {service: {filename: content}}.
# VirtualFileSystem.reset() copies these first and then overlays the files of
# the selected task, so a task file with the same name replaces the shared one.
# The string bodies are runtime data served to the agent: edit them with care,
# because line numbers inside them are addressable through edit_line().
SHARED_FILES: Dict[str, Dict[str, str]] = {
    "ad_ranking": {
        "utils.py": """\
from typing import Dict, List
def normalize_scores(ads: List[Dict]) -> List[Dict]:
if not ads:
return ads
max_score = max(ad['score'] for ad in ads)
min_score = min(ad['score'] for ad in ads)
score_range = max_score - min_score or 1.0
return [
{**ad, 'normalized_score': (ad['score'] - min_score) / score_range}
for ad in ads
]
def filter_by_budget(ads: List[Dict], daily_budget_cents: int) -> List[Dict]:
return [ad for ad in ads if ad.get('spend_today_cents', 0) < daily_budget_cents]
def compute_roas(revenue: float, spend: float) -> float:
return revenue / spend if spend > 0 else 0.0
""",
        "models.py": """\
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class Ad:
ad_id: str
campaign_id: str
category: str
target_age: str
clicks: int = 0
impressions: int = 0
spend_today_cents: int = 0
active: bool = True
score: float = 0.0
@dataclass
class UserContext:
user_id: str
interest: str
age_group: str
country: str
""",
    },
    "capi_pipeline": {
        # validate_event() is annotated as returning Tuple[bool, str], so Tuple
        # must be imported alongside Dict and Any; without it the shared file
        # fails with NameError as soon as the function is defined.
        "validator.py": """\
from typing import Dict, Any, Tuple
REQUIRED_FIELDS = {'event_name', 'event_time', 'event_id'}
def validate_event(event: Dict[str, Any]) -> Tuple[bool, str]:
missing = REQUIRED_FIELDS - set(event.keys())
if missing:
return False, f'Missing fields: {missing}'
if not isinstance(event.get('event_time'), (int, float)):
return False, 'event_time must be numeric'
return True, 'ok'
""",
    },
    "whatsapp_sync": {
        "models.py": """\
from dataclasses import dataclass
from typing import Optional
@dataclass
class Message:
id: int
user_id: int
sender_id: int
content: str
timestamp: int
synced: bool = False
thread_id: Optional[int] = None
""",
    },
}
# ---------------------------------------------------------------------------
# Task-specific buggy snapshots
#
# Layout: {task_id: {service: {filename: file content}}}.
# VirtualFileSystem.reset(task_id) copies SHARED_FILES and then overlays the
# entry for that task, so a file listed here replaces a shared file with the
# same service/filename. Task ids 1-5 are defined below.
#
# The file contents are string literals served to the agent at runtime; the
# "BUG:" remarks inside them are part of that content, not comments of this
# module.
# ---------------------------------------------------------------------------
TASK_SNAPSHOTS: Dict[int, Dict[str, Dict[str, str]]] = {
# ------------------------------------------------------------------
# Task 1 – Easy: Hallucinated attribute (ad.get_clicks())
# ad_ranking/ranker.py: score_ads() calls ad.get_clicks() on an ad that is
# otherwise read with dict-style .get(...); the ranker.py copies in
# tasks 2, 4 and 5 use ad.get('clicks', 0) on that line instead.
# ------------------------------------------------------------------
1: {
"ad_ranking": {
"ranker.py": """\
import logging
from typing import List, Dict
logger = logging.getLogger(__name__)
class AdRanker:
\"\"\"Scores and ranks candidate ads for a user.\"\"\"
def __init__(self, api_client):
self.api = api_client
self.model_version = "v2.3.1"
self._cache = {}
def fetch_candidate_ads(self, user_id: str) -> List[Dict]:
ads = self.api.get_all_ads(user_id)
return [ad for ad in ads if ad.get('active', False)]
def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]:
scored = []
for ad in ads:
click_rate = ad.get_clicks() / max(ad.get('impressions', 1), 1)
relevance = self._compute_relevance(ad, user_context)
score = (click_rate * 0.4) + (relevance * 0.6)
scored.append({**ad, 'score': round(score, 4)})
return sorted(scored, key=lambda x: x['score'], reverse=True)
def _compute_relevance(self, ad: Dict, context: Dict) -> float:
category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3
age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5
return round((category_match + age_match) / 2.0, 4)
def rank(self, user_id: str, user_context: Dict) -> List[Dict]:
candidates = self.fetch_candidate_ads(user_id)
if not candidates:
logger.warning(f"No candidates for user {user_id}")
return []
return self.score_ads(candidates, user_context)
""",
},
},
# ------------------------------------------------------------------
# Task 2 – Medium: Silent timestamp corruption in CAPI → bad ROAS
# capi_pipeline/transformer.py: _normalize_timestamp() divides by 1000
# whenever ts > 1_000_000_000; the transformer.py copies in tasks 4 and 5
# compare against 1_000_000_000_000 instead.
# Also ships capi_pipeline/ingestor.py and ad_ranking/ranker.py.
# ------------------------------------------------------------------
2: {
"capi_pipeline": {
"transformer.py": """\
import logging
from typing import Dict, Any, List
from datetime import datetime
logger = logging.getLogger(__name__)
class EventTransformer:
\"\"\"Transforms raw CAPI events into normalised format.\"\"\"
SUPPORTED_EVENTS = {
'Purchase', 'AddToCart', 'ViewContent', 'Lead', 'CompleteRegistration'
}
def __init__(self):
self._processed_count = 0
def transform(self, raw_event: Dict[str, Any]) -> Dict[str, Any]:
if raw_event.get('event_name') not in self.SUPPORTED_EVENTS:
logger.warning(f"Unknown event type: {raw_event.get('event_name')}")
return None
event_time = self._normalize_timestamp(raw_event.get('event_time', 0))
transformed = {
'event_id': raw_event.get('event_id'),
'event_name': raw_event.get('event_name'),
'event_time': event_time,
'user_data': self._hash_user_data(raw_event.get('user_data', {})),
'custom_data': raw_event.get('custom_data', {}),
'processed_at': int(datetime.utcnow().timestamp()),
}
self._processed_count += 1
return transformed
def _normalize_timestamp(self, ts: Any) -> int:
\"\"\"Normalise event timestamp to Unix seconds.\"\"\"
ts = int(ts)
# BUG: threshold is 1_000_000_000 (10 digits) instead of
# 1_000_000_000_000 (13 digits for milliseconds).
# A normal unix-second timestamp like 1_700_000_000 passes the
# condition and gets divided by 1000 → year ~1970+20 days.
if ts > 1_000_000_000:
return ts // 1000
return ts
def _hash_user_data(self, user_data: Dict) -> Dict:
import hashlib
hashed = {}
for key, val in user_data.items():
if key in ('email', 'phone', 'fn', 'ln'):
hashed[key] = hashlib.sha256(
str(val).lower().encode()
).hexdigest()
else:
hashed[key] = val
return hashed
def batch_transform(self, events: List[Dict]) -> List[Dict]:
return [t for e in events if (t := self.transform(e)) is not None]
""",
"ingestor.py": """\
import logging
from typing import Dict, Any
from .transformer import EventTransformer
logger = logging.getLogger(__name__)
class EventIngestor:
\"\"\"Ingests and validates CAPI event payloads.\"\"\"
def __init__(self, transformer: EventTransformer):
self.transformer = transformer
self._event_buffer = []
def ingest(self, raw_payload: Dict[str, Any]) -> Dict[str, Any]:
try:
events = raw_payload.get('data', [])
if not events:
return {'status': 'error', 'message': 'No events in payload'}
transformed = self.transformer.batch_transform(events)
self._event_buffer.extend(transformed)
return {'status': 'ok', 'processed': len(transformed)}
except Exception as e:
logger.error(f"Ingest failed: {e}", exc_info=True)
return {'status': 'error', 'message': str(e)}
def flush(self) -> int:
count = len(self._event_buffer)
self._event_buffer.clear()
logger.info(f"Flushed {count} events")
return count
""",
},
"ad_ranking": {
"ranker.py": """\
import logging
from typing import List, Dict
logger = logging.getLogger(__name__)
class AdRanker:
\"\"\"Scores and ranks candidate ads for a user.\"\"\"
def __init__(self, api_client):
self.api = api_client
self.model_version = "v2.3.1"
def fetch_candidate_ads(self, user_id: str) -> List[Dict]:
ads = self.api.get_all_ads(user_id)
return [ad for ad in ads if ad.get('active', False)]
def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]:
scored = []
for ad in ads:
click_rate = ad.get('clicks', 0) / max(ad.get('impressions', 1), 1)
relevance = self._compute_relevance(ad, user_context)
score = (click_rate * 0.4) + (relevance * 0.6)
scored.append({**ad, 'score': round(score, 4)})
return sorted(scored, key=lambda x: x['score'], reverse=True)
def _compute_relevance(self, ad: Dict, context: Dict) -> float:
category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3
age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5
return round((category_match + age_match) / 2.0, 4)
def rank(self, user_id: str, user_context: Dict) -> List[Dict]:
candidates = self.fetch_candidate_ads(user_id)
if not candidates:
logger.warning(f"No candidates for user {user_id}")
return []
return self.score_ads(candidates, user_context)
""",
},
},
# ------------------------------------------------------------------
# Task 3 – Medium-Hard: DB connection leak in WhatsApp sync handler
# whatsapp_sync/handler.py: sync_user_messages() acquires a connection
# from db_pool but has no `finally` that releases it; the handler.py
# copies in tasks 4 and 5 include that release.
# Also ships whatsapp_sync/db.py containing migration 001 only.
# ------------------------------------------------------------------
3: {
"whatsapp_sync": {
"handler.py": """\
import asyncio
import logging
from typing import List, Dict
logger = logging.getLogger(__name__)
class MessageSyncHandler:
\"\"\"Handles real-time WhatsApp message synchronisation.\"\"\"
def __init__(self, db_pool, message_queue):
self.db_pool = db_pool
self.queue = message_queue
self._sync_count = 0
async def sync_user_messages(self, user_id: str) -> List[Dict]:
\"\"\"Fetch and mark-as-synced all pending messages for a user.\"\"\"
conn = await self.db_pool.acquire()
try:
messages = await conn.fetch(
"SELECT id, content, sender_id, timestamp "
"FROM messages WHERE user_id = $1 AND synced = FALSE "
"ORDER BY timestamp",
user_id,
)
processed = []
for msg in messages:
await conn.execute(
"UPDATE messages SET synced = TRUE WHERE id = $1",
msg['id'],
)
processed.append(dict(msg))
self._sync_count += len(processed)
return processed
except Exception as e:
logger.error(f"Sync failed for user {user_id}: {e}")
raise
# BUG: missing `finally: await self.db_pool.release(conn)`
# Under load the pool exhausts → all sync requests hang indefinitely.
async def process_queue(self, batch_size: int = 50) -> int:
processed = 0
while processed < batch_size:
try:
user_id = await asyncio.wait_for(
self.queue.get(), timeout=1.0
)
await self.sync_user_messages(user_id)
processed += 1
except asyncio.TimeoutError:
break
return processed
""",
"db.py": """\
import logging
from typing import Dict, List
logger = logging.getLogger(__name__)
MIGRATIONS: List[Dict] = [
{
"version": "001",
"description": "Create messages table",
"up": (
"CREATE TABLE messages ("
" id SERIAL PRIMARY KEY,"
" user_id INTEGER NOT NULL,"
" content TEXT,"
" sender_id INTEGER,"
" timestamp BIGINT,"
" synced BOOLEAN DEFAULT FALSE"
");"
),
},
]
class MigrationRunner:
def __init__(self, db_conn):
self.conn = db_conn
self._applied: List[str] = []
async def apply(self, migration: Dict) -> bool:
await self.conn.execute(migration['up'])
self._applied.append(migration['version'])
logger.info(f"Applied migration {migration['version']}")
return True
""",
},
},
# ------------------------------------------------------------------
# Task 4 – Hard: Red-herring cascade from a bad DB migration (003)
# whatsapp_sync/db.py: migration 003 creates message_threads with a
# reference to messages(id) and then alters messages to reference
# message_threads(id).
# The other files of this task (handler.py, ingestor.py, transformer.py,
# ranker.py) carry no marked bug.
# NOTE(review): this task's transformer.py passes user_data through
# unhashed, whereas the task 2 and task 5 copies call _hash_user_data();
# confirm whether that difference is intentional.
# ------------------------------------------------------------------
4: {
"whatsapp_sync": {
"db.py": """\
import logging
from typing import Dict, List
logger = logging.getLogger(__name__)
# Migration 003 introduces a circular FK:
# message_threads.parent_message_id → messages.id
# messages.thread_id → message_threads.id
# PostgreSQL refuses the self-referential constraint during ALTER TABLE,
# causing FK violation errors that cascade to all consumers of both tables.
MIGRATIONS: List[Dict] = [
{
"version": "001",
"description": "Create messages table",
"up": (
"CREATE TABLE IF NOT EXISTS messages ("
" id SERIAL PRIMARY KEY,"
" user_id INTEGER NOT NULL,"
" content TEXT,"
" sender_id INTEGER,"
" timestamp BIGINT,"
" synced BOOLEAN DEFAULT FALSE"
");"
),
},
{
"version": "002",
"description": "Add user preferences",
"up": (
"CREATE TABLE IF NOT EXISTS user_preferences ("
" id SERIAL PRIMARY KEY,"
" user_id INTEGER NOT NULL,"
" notification_enabled BOOLEAN DEFAULT TRUE,"
" sync_frequency INTEGER DEFAULT 30"
");"
),
},
{
"version": "003",
"description": "Add message threads with back-reference",
"up": (
"CREATE TABLE IF NOT EXISTS message_threads ("
" id SERIAL PRIMARY KEY,"
" parent_message_id INTEGER REFERENCES messages(id) ON DELETE CASCADE,"
" participant_ids INTEGER[] NOT NULL,"
" created_at BIGINT"
");"
"ALTER TABLE messages"
" ADD COLUMN thread_id INTEGER REFERENCES message_threads(id);"
),
# BUG: circular FK — messages → message_threads → messages
# Fix: remove the ALTER TABLE line (messages should NOT reference threads)
},
]
class MigrationRunner:
def __init__(self, db_conn):
self.conn = db_conn
self._applied: List[str] = []
async def apply(self, migration: Dict) -> bool:
await self.conn.execute(migration['up'])
self._applied.append(migration['version'])
logger.info(f"Applied migration {migration['version']}: {migration['description']}")
return True
async def rollback_version(self, version: str) -> bool:
logger.warning(f"Rolling back migration {version}")
self._applied = [v for v in self._applied if v != version]
return True
async def run_all(self):
for migration in MIGRATIONS:
await self.apply(migration)
""",
"handler.py": """\
import asyncio
import logging
from typing import List, Dict
logger = logging.getLogger(__name__)
class MessageSyncHandler:
def __init__(self, db_pool, message_queue):
self.db_pool = db_pool
self.queue = message_queue
self._sync_count = 0
async def sync_user_messages(self, user_id: str) -> List[Dict]:
conn = await self.db_pool.acquire()
try:
messages = await conn.fetch(
"SELECT id, content, sender_id, timestamp "
"FROM messages WHERE user_id = $1 AND synced = FALSE "
"ORDER BY timestamp",
user_id,
)
processed = []
for msg in messages:
await conn.execute(
"UPDATE messages SET synced = TRUE WHERE id = $1",
msg['id'],
)
processed.append(dict(msg))
self._sync_count += len(processed)
return processed
except Exception as e:
logger.error(f"Sync failed for user {user_id}: {e}")
raise
finally:
await self.db_pool.release(conn)
async def process_queue(self, batch_size: int = 50) -> int:
processed = 0
while processed < batch_size:
try:
user_id = await asyncio.wait_for(
self.queue.get(), timeout=1.0
)
await self.sync_user_messages(user_id)
processed += 1
except asyncio.TimeoutError:
break
return processed
""",
},
"capi_pipeline": {
"ingestor.py": """\
import logging
from typing import Dict, Any
from .transformer import EventTransformer
logger = logging.getLogger(__name__)
class EventIngestor:
def __init__(self, transformer: EventTransformer):
self.transformer = transformer
self._event_buffer = []
def ingest(self, raw_payload: Dict[str, Any]) -> Dict[str, Any]:
try:
events = raw_payload.get('data', [])
if not events:
return {'status': 'error', 'message': 'No events in payload'}
transformed = self.transformer.batch_transform(events)
self._event_buffer.extend(transformed)
return {'status': 'ok', 'processed': len(transformed)}
except Exception as e:
logger.error(f"Ingest failed: {e}", exc_info=True)
return {'status': 'error', 'message': str(e)}
def flush(self) -> int:
count = len(self._event_buffer)
self._event_buffer.clear()
return count
""",
"transformer.py": """\
import logging
from typing import Dict, Any, List
from datetime import datetime
logger = logging.getLogger(__name__)
class EventTransformer:
SUPPORTED_EVENTS = {
'Purchase', 'AddToCart', 'ViewContent', 'Lead', 'CompleteRegistration'
}
def __init__(self):
self._processed_count = 0
def transform(self, raw_event: Dict[str, Any]) -> Dict[str, Any]:
if raw_event.get('event_name') not in self.SUPPORTED_EVENTS:
return None
event_time = self._normalize_timestamp(raw_event.get('event_time', 0))
transformed = {
'event_id': raw_event.get('event_id'),
'event_name': raw_event.get('event_name'),
'event_time': event_time,
'user_data': raw_event.get('user_data', {}),
'custom_data': raw_event.get('custom_data', {}),
'processed_at': int(datetime.utcnow().timestamp()),
}
self._processed_count += 1
return transformed
def _normalize_timestamp(self, ts: Any) -> int:
ts = int(ts)
if ts > 1_000_000_000_000:
return ts // 1000
return ts
def batch_transform(self, events: List[Dict]) -> List[Dict]:
return [t for e in events if (t := self.transform(e)) is not None]
""",
},
"ad_ranking": {
"ranker.py": """\
import logging
from typing import List, Dict
logger = logging.getLogger(__name__)
class AdRanker:
def __init__(self, api_client):
self.api = api_client
self.model_version = "v2.3.1"
def fetch_candidate_ads(self, user_id: str) -> List[Dict]:
ads = self.api.get_all_ads(user_id)
return [ad for ad in ads if ad.get('active', False)]
def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]:
scored = []
for ad in ads:
click_rate = ad.get('clicks', 0) / max(ad.get('impressions', 1), 1)
relevance = self._compute_relevance(ad, user_context)
score = (click_rate * 0.4) + (relevance * 0.6)
scored.append({**ad, 'score': round(score, 4)})
return sorted(scored, key=lambda x: x['score'], reverse=True)
def _compute_relevance(self, ad: Dict, context: Dict) -> float:
category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3
age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5
return round((category_match + age_match) / 2.0, 4)
def rank(self, user_id: str, user_context: Dict) -> List[Dict]:
candidates = self.fetch_candidate_ads(user_id)
if not candidates:
return []
return self.score_ads(candidates, user_context)
""",
},
},
# ------------------------------------------------------------------
# Task 5 – Hard: PII data-leak via DEBUG_MODE=True in production
# capi_pipeline/ingestor.py: with DEBUG_MODE set to True, ingest() returns
# a 'debug_data' entry holding the raw payload, each event's user_data
# and the internal event buffer.
# Also ships transformer.py, ad_ranking/ranker.py, whatsapp_sync/handler.py
# and whatsapp_sync/db.py (migration 001 only).
# ------------------------------------------------------------------
5: {
"capi_pipeline": {
"ingestor.py": """\
import logging
from typing import Dict, Any
from .transformer import EventTransformer
logger = logging.getLogger(__name__)
DEBUG_MODE = True # BUG: must be False in production – leaks raw user PII
class EventIngestor:
\"\"\"Ingests and validates CAPI event payloads.\"\"\"
def __init__(self, transformer: EventTransformer):
self.transformer = transformer
self._event_buffer = []
def ingest(self, raw_payload: Dict[str, Any]) -> Dict[str, Any]:
try:
events = raw_payload.get('data', [])
if not events:
return {'status': 'error', 'message': 'No events in payload'}
transformed = self.transformer.batch_transform(events)
self._event_buffer.extend(transformed)
if DEBUG_MODE:
# SECURITY BUG: exposes raw PII (emails, phone numbers) in the
# HTTP response – visible in CDN logs, browser network tabs, etc.
return {
'status': 'ok',
'processed': len(transformed),
'debug_data': {
'raw_payload': raw_payload,
'user_emails': [e.get('user_data', {}) for e in events],
'buffer_state': self._event_buffer,
},
}
return {'status': 'ok', 'processed': len(transformed)}
except Exception as e:
logger.error(f"Ingest failed: {e}", exc_info=True)
return {'status': 'error', 'message': str(e)}
def flush(self) -> int:
count = len(self._event_buffer)
self._event_buffer.clear()
logger.info(f"Flushed {count} events")
return count
""",
"transformer.py": """\
import logging
from typing import Dict, Any, List
from datetime import datetime
logger = logging.getLogger(__name__)
class EventTransformer:
SUPPORTED_EVENTS = {
'Purchase', 'AddToCart', 'ViewContent', 'Lead', 'CompleteRegistration'
}
def __init__(self):
self._processed_count = 0
def transform(self, raw_event: Dict[str, Any]) -> Dict[str, Any]:
if raw_event.get('event_name') not in self.SUPPORTED_EVENTS:
return None
event_time = self._normalize_timestamp(raw_event.get('event_time', 0))
transformed = {
'event_id': raw_event.get('event_id'),
'event_name': raw_event.get('event_name'),
'event_time': event_time,
'user_data': self._hash_user_data(raw_event.get('user_data', {})),
'custom_data': raw_event.get('custom_data', {}),
'processed_at': int(datetime.utcnow().timestamp()),
}
self._processed_count += 1
return transformed
def _normalize_timestamp(self, ts: Any) -> int:
ts = int(ts)
if ts > 1_000_000_000_000:
return ts // 1000
return ts
def _hash_user_data(self, user_data: Dict) -> Dict:
import hashlib
hashed = {}
for key, val in user_data.items():
if key in ('email', 'phone', 'fn', 'ln'):
hashed[key] = hashlib.sha256(
str(val).lower().encode()
).hexdigest()
else:
hashed[key] = val
return hashed
def batch_transform(self, events: List[Dict]) -> List[Dict]:
return [t for e in events if (t := self.transform(e)) is not None]
""",
},
"ad_ranking": {
"ranker.py": """\
import logging
from typing import List, Dict
logger = logging.getLogger(__name__)
class AdRanker:
def __init__(self, api_client):
self.api = api_client
self.model_version = "v2.3.1"
def fetch_candidate_ads(self, user_id: str) -> List[Dict]:
ads = self.api.get_all_ads(user_id)
return [ad for ad in ads if ad.get('active', False)]
def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]:
scored = []
for ad in ads:
click_rate = ad.get('clicks', 0) / max(ad.get('impressions', 1), 1)
relevance = self._compute_relevance(ad, user_context)
score = (click_rate * 0.4) + (relevance * 0.6)
scored.append({**ad, 'score': round(score, 4)})
return sorted(scored, key=lambda x: x['score'], reverse=True)
def _compute_relevance(self, ad: Dict, context: Dict) -> float:
category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3
age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5
return round((category_match + age_match) / 2.0, 4)
def rank(self, user_id: str, user_context: Dict) -> List[Dict]:
candidates = self.fetch_candidate_ads(user_id)
if not candidates:
return []
return self.score_ads(candidates, user_context)
""",
},
"whatsapp_sync": {
"handler.py": """\
import asyncio
import logging
from typing import List, Dict
logger = logging.getLogger(__name__)
class MessageSyncHandler:
def __init__(self, db_pool, message_queue):
self.db_pool = db_pool
self.queue = message_queue
self._sync_count = 0
async def sync_user_messages(self, user_id: str) -> List[Dict]:
conn = await self.db_pool.acquire()
try:
messages = await conn.fetch(
"SELECT id, content, sender_id, timestamp "
"FROM messages WHERE user_id = $1 AND synced = FALSE "
"ORDER BY timestamp",
user_id,
)
processed = []
for msg in messages:
await conn.execute(
"UPDATE messages SET synced = TRUE WHERE id = $1",
msg['id'],
)
processed.append(dict(msg))
self._sync_count += len(processed)
return processed
except Exception as e:
logger.error(f"Sync failed for user {user_id}: {e}")
raise
finally:
await self.db_pool.release(conn)
async def process_queue(self, batch_size: int = 50) -> int:
processed = 0
while processed < batch_size:
try:
user_id = await asyncio.wait_for(
self.queue.get(), timeout=1.0
)
await self.sync_user_messages(user_id)
processed += 1
except asyncio.TimeoutError:
break
return processed
""",
"db.py": """\
import logging
from typing import Dict, List
logger = logging.getLogger(__name__)
MIGRATIONS: List[Dict] = [
{
"version": "001",
"description": "Create messages table",
"up": (
"CREATE TABLE IF NOT EXISTS messages ("
" id SERIAL PRIMARY KEY,"
" user_id INTEGER NOT NULL,"
" content TEXT,"
" sender_id INTEGER,"
" timestamp BIGINT,"
" synced BOOLEAN DEFAULT FALSE"
");"
),
},
]
class MigrationRunner:
def __init__(self, db_conn):
self.conn = db_conn
self._applied: List[str] = []
async def apply(self, migration: Dict) -> bool:
await self.conn.execute(migration['up'])
self._applied.append(migration['version'])
return True
""",
},
},
}
# ---------------------------------------------------------------------------
# VirtualFileSystem
# ---------------------------------------------------------------------------
class EditRecord:
    """Log entry describing one single-line replacement in the virtual FS.

    ``line_idx`` is kept zero-based internally; :meth:`to_dict` exposes it
    one-based under the ``line_number`` key.
    """

    __slots__ = ("step", "service", "filename", "line_idx", "old_code", "new_code")

    def __init__(self, step, service, filename, line_idx, old_code, new_code):
        # When and where the edit happened.
        self.step, self.service, self.filename = step, service, filename
        self.line_idx = line_idx
        # Line text before and after the edit.
        self.old_code, self.new_code = old_code, new_code

    def to_dict(self):
        """Return the record as a plain dict with a one-based ``line_number``."""
        exported = dict(step=self.step, service=self.service, filename=self.filename)
        exported["line_number"] = self.line_idx + 1
        exported.update(old_code=self.old_code, new_code=self.new_code)
        return exported
class VirtualFileSystem:
    """In-memory multi-service file system with history tracking.

    Files are stored as ``{service: {filename: content}}``. Every successful
    :meth:`edit_line` call appends an ``EditRecord`` to an internal history
    list, which backs :meth:`get_edit_history`, :meth:`git_blame` and
    :meth:`build_git_diff`.
    """

    def __init__(self):
        self._files: Dict[str, Dict[str, str]] = {}
        self._history: List[EditRecord] = []
        self._task_id: int = 0

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def reset(self, task_id: int) -> None:
        """Load the buggy snapshot for a specific task.

        Clears the edit history, copies every service of ``SHARED_FILES`` and
        overlays the files registered for ``task_id`` in ``TASK_SNAPSHOTS``.
        A ``task_id`` with no snapshot yields the shared files only.
        """
        self._task_id = task_id
        self._history.clear()
        snapshot = TASK_SNAPSHOTS.get(task_id, {})
        # Copy the per-service dicts so edits never touch the module-level
        # templates; the file contents themselves are immutable strings.
        merged: Dict[str, Dict[str, str]] = {
            service: dict(files) for service, files in SHARED_FILES.items()
        }
        # Task files win over shared files with the same service/filename.
        for service, files in snapshot.items():
            merged.setdefault(service, {}).update(files)
        self._files = merged

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------
    def list_files(self, service: str) -> List[str]:
        """Return the sorted filenames of ``service`` (empty if unknown)."""
        return sorted(self._files.get(service, {}).keys())

    def list_services(self) -> List[str]:
        """Return the sorted names of all loaded services."""
        return sorted(self._files.keys())

    def read_file(self, service: str, filename: str) -> Tuple[bool, str]:
        """Return ``(found, content)``; on a miss, ``content`` is an error message."""
        content = self._files.get(service, {}).get(filename)
        if content is None:
            return False, f"File not found: {service}/{filename}"
        return True, content

    def get_file_lines(self, service: str, filename: str) -> Optional[List[str]]:
        """Return the file split into lines without terminators, or ``None`` if missing."""
        found, content = self.read_file(service, filename)
        if not found:
            return None
        return content.splitlines()

    # ------------------------------------------------------------------
    # Write
    # ------------------------------------------------------------------
    def edit_line(
        self,
        service: str,
        filename: str,
        line_number: int,  # 1-based
        new_code: str,
        step: int = 0,
    ) -> Tuple[bool, str]:
        """Replace a single line (1-based). Returns (success, message).

        The file is re-joined with ``"\\n"`` and keeps its trailing newline if
        it had one, so an edit changes nothing but the addressed line.
        ``new_code`` is stored as given: embedded newlines are not rejected
        and will shift the numbering of the lines that follow.
        """
        found, content = self.read_file(service, filename)
        if not found:
            # `content` already carries the "File not found: ..." message.
            return False, content
        lines = content.splitlines()
        idx = line_number - 1
        if not (0 <= idx < len(lines)):
            return False, f"Line {line_number} out of range (file has {len(lines)} lines)"
        old_code = lines[idx]
        lines[idx] = new_code
        updated = "\n".join(lines)
        # splitlines() discards the final terminator; put it back so the
        # file does not silently lose its trailing newline on the first edit.
        if content.endswith(("\n", "\r")):
            updated += "\n"
        self._files[service][filename] = updated
        self._history.append(
            EditRecord(step, service, filename, idx, old_code, new_code)
        )
        return True, "ok"

    # ------------------------------------------------------------------
    # History / blame
    # ------------------------------------------------------------------
    def get_edit_history(
        self,
        service: Optional[str] = None,
        filename: Optional[str] = None,
    ) -> List[dict]:
        """Return edit records as dicts, oldest first.

        ``service`` and ``filename`` narrow the result when given; a falsy
        value (``None`` or ``""``) disables that filter.
        """
        records = self._history
        if service:
            records = [r for r in records if r.service == service]
        if filename:
            records = [r for r in records if r.filename == filename]
        return [r.to_dict() for r in records]

    def git_blame(self, service: str, filename: str, line_number: int) -> str:
        """Describe who last changed a line (1-based).

        Returns the most recent agent edit recorded for that line. If the
        line has no recorded edit, returns a fixed attribution to the
        "Junior AI code-gen bot"; the file and line are not checked for
        existence.
        """
        idx = line_number - 1
        # Walk the history newest-first and stop at the first hit.
        latest = next(
            (
                r
                for r in reversed(self._history)
                if r.service == service and r.filename == filename and r.line_idx == idx
            ),
            None,
        )
        if latest is not None:
            return (
                f"Step {latest.step}: agent changed line {line_number} in "
                f"{service}/{filename}\n"
                f" - {latest.old_code!r}\n"
                f" + {latest.new_code!r}"
            )
        return (
            f"Line {line_number} in {service}/{filename} was last modified by: "
            f"Junior AI code-gen bot (commit a3f91b2, 2026-04-23 02:14 UTC)"
        )

    def build_git_diff(self) -> Optional[str]:
        """Render all recorded edits as a diff-like text, or ``None`` if there are none."""
        if not self._history:
            return None
        lines = [f"--- Task {self._task_id} working diff ---"]
        for r in self._history:
            lines.append(
                f"@@ {r.service}/{r.filename} line {r.line_idx + 1} @@\n"
                f"-{r.old_code}\n"
                f"+{r.new_code}"
            )
        return "\n".join(lines)