""" Layer 1 – Virtual File System. Stores all service codebases as in-memory strings keyed by (service, filename). Every task starts from a clean snapshot of its own buggy codebase; edits accumulate on top of that snapshot. """ from __future__ import annotations import copy from typing import Dict, List, Optional, Tuple # --------------------------------------------------------------------------- # Buggy source code snapshots – one per task # Each task mutates only the files it needs; unchanged files are shared via # SHARED_FILES and merged in at reset() time. # --------------------------------------------------------------------------- SHARED_FILES: Dict[str, Dict[str, str]] = { "ad_ranking": { "utils.py": """\ from typing import Dict, List def normalize_scores(ads: List[Dict]) -> List[Dict]: if not ads: return ads max_score = max(ad['score'] for ad in ads) min_score = min(ad['score'] for ad in ads) score_range = max_score - min_score or 1.0 return [ {**ad, 'normalized_score': (ad['score'] - min_score) / score_range} for ad in ads ] def filter_by_budget(ads: List[Dict], daily_budget_cents: int) -> List[Dict]: return [ad for ad in ads if ad.get('spend_today_cents', 0) < daily_budget_cents] def compute_roas(revenue: float, spend: float) -> float: return revenue / spend if spend > 0 else 0.0 """, "models.py": """\ from dataclasses import dataclass, field from typing import List, Optional @dataclass class Ad: ad_id: str campaign_id: str category: str target_age: str clicks: int = 0 impressions: int = 0 spend_today_cents: int = 0 active: bool = True score: float = 0.0 @dataclass class UserContext: user_id: str interest: str age_group: str country: str """, }, "capi_pipeline": { "validator.py": """\ from typing import Dict, Any REQUIRED_FIELDS = {'event_name', 'event_time', 'event_id'} def validate_event(event: Dict[str, Any]) -> Tuple[bool, str]: missing = REQUIRED_FIELDS - set(event.keys()) if missing: return False, f'Missing fields: {missing}' if not isinstance(event.get('event_time'), (int, float)): return False, 'event_time must be numeric' return True, 'ok' """, }, "whatsapp_sync": { "models.py": """\ from dataclasses import dataclass from typing import Optional @dataclass class Message: id: int user_id: int sender_id: int content: str timestamp: int synced: bool = False thread_id: Optional[int] = None """, }, } # --------------------------------------------------------------------------- # Task-specific buggy snapshots # --------------------------------------------------------------------------- TASK_SNAPSHOTS: Dict[int, Dict[str, Dict[str, str]]] = { # ------------------------------------------------------------------ # Task 1 – Easy: Hallucinated attribute (ad.get_clicks()) # ------------------------------------------------------------------ 1: { "ad_ranking": { "ranker.py": """\ import logging from typing import List, Dict logger = logging.getLogger(__name__) class AdRanker: \"\"\"Scores and ranks candidate ads for a user.\"\"\" def __init__(self, api_client): self.api = api_client self.model_version = "v2.3.1" self._cache = {} def fetch_candidate_ads(self, user_id: str) -> List[Dict]: ads = self.api.get_all_ads(user_id) return [ad for ad in ads if ad.get('active', False)] def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]: scored = [] for ad in ads: click_rate = ad.get_clicks() / max(ad.get('impressions', 1), 1) relevance = self._compute_relevance(ad, user_context) score = (click_rate * 0.4) + (relevance * 0.6) scored.append({**ad, 'score': round(score, 4)}) return sorted(scored, key=lambda x: x['score'], reverse=True) def _compute_relevance(self, ad: Dict, context: Dict) -> float: category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3 age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5 return round((category_match + age_match) / 2.0, 4) def rank(self, user_id: str, user_context: Dict) -> List[Dict]: candidates = self.fetch_candidate_ads(user_id) if not candidates: logger.warning(f"No candidates for user {user_id}") return [] return self.score_ads(candidates, user_context) """, }, }, # ------------------------------------------------------------------ # Task 2 – Medium: Silent timestamp corruption in CAPI → bad ROAS # ------------------------------------------------------------------ 2: { "capi_pipeline": { "transformer.py": """\ import logging from typing import Dict, Any, List from datetime import datetime logger = logging.getLogger(__name__) class EventTransformer: \"\"\"Transforms raw CAPI events into normalised format.\"\"\" SUPPORTED_EVENTS = { 'Purchase', 'AddToCart', 'ViewContent', 'Lead', 'CompleteRegistration' } def __init__(self): self._processed_count = 0 def transform(self, raw_event: Dict[str, Any]) -> Dict[str, Any]: if raw_event.get('event_name') not in self.SUPPORTED_EVENTS: logger.warning(f"Unknown event type: {raw_event.get('event_name')}") return None event_time = self._normalize_timestamp(raw_event.get('event_time', 0)) transformed = { 'event_id': raw_event.get('event_id'), 'event_name': raw_event.get('event_name'), 'event_time': event_time, 'user_data': self._hash_user_data(raw_event.get('user_data', {})), 'custom_data': raw_event.get('custom_data', {}), 'processed_at': int(datetime.utcnow().timestamp()), } self._processed_count += 1 return transformed def _normalize_timestamp(self, ts: Any) -> int: \"\"\"Normalise event timestamp to Unix seconds.\"\"\" ts = int(ts) # BUG: threshold is 1_000_000_000 (10 digits) instead of # 1_000_000_000_000 (13 digits for milliseconds). # A normal unix-second timestamp like 1_700_000_000 passes the # condition and gets divided by 1000 → year ~1970+20 days. if ts > 1_000_000_000: return ts // 1000 return ts def _hash_user_data(self, user_data: Dict) -> Dict: import hashlib hashed = {} for key, val in user_data.items(): if key in ('email', 'phone', 'fn', 'ln'): hashed[key] = hashlib.sha256( str(val).lower().encode() ).hexdigest() else: hashed[key] = val return hashed def batch_transform(self, events: List[Dict]) -> List[Dict]: return [t for e in events if (t := self.transform(e)) is not None] """, "ingestor.py": """\ import logging from typing import Dict, Any from .transformer import EventTransformer logger = logging.getLogger(__name__) class EventIngestor: \"\"\"Ingests and validates CAPI event payloads.\"\"\" def __init__(self, transformer: EventTransformer): self.transformer = transformer self._event_buffer = [] def ingest(self, raw_payload: Dict[str, Any]) -> Dict[str, Any]: try: events = raw_payload.get('data', []) if not events: return {'status': 'error', 'message': 'No events in payload'} transformed = self.transformer.batch_transform(events) self._event_buffer.extend(transformed) return {'status': 'ok', 'processed': len(transformed)} except Exception as e: logger.error(f"Ingest failed: {e}", exc_info=True) return {'status': 'error', 'message': str(e)} def flush(self) -> int: count = len(self._event_buffer) self._event_buffer.clear() logger.info(f"Flushed {count} events") return count """, }, "ad_ranking": { "ranker.py": """\ import logging from typing import List, Dict logger = logging.getLogger(__name__) class AdRanker: \"\"\"Scores and ranks candidate ads for a user.\"\"\" def __init__(self, api_client): self.api = api_client self.model_version = "v2.3.1" def fetch_candidate_ads(self, user_id: str) -> List[Dict]: ads = self.api.get_all_ads(user_id) return [ad for ad in ads if ad.get('active', False)] def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]: scored = [] for ad in ads: click_rate = ad.get('clicks', 0) / max(ad.get('impressions', 1), 1) relevance = self._compute_relevance(ad, user_context) score = (click_rate * 0.4) + (relevance * 0.6) scored.append({**ad, 'score': round(score, 4)}) return sorted(scored, key=lambda x: x['score'], reverse=True) def _compute_relevance(self, ad: Dict, context: Dict) -> float: category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3 age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5 return round((category_match + age_match) / 2.0, 4) def rank(self, user_id: str, user_context: Dict) -> List[Dict]: candidates = self.fetch_candidate_ads(user_id) if not candidates: logger.warning(f"No candidates for user {user_id}") return [] return self.score_ads(candidates, user_context) """, }, }, # ------------------------------------------------------------------ # Task 3 – Medium-Hard: DB connection leak in WhatsApp sync handler # ------------------------------------------------------------------ 3: { "whatsapp_sync": { "handler.py": """\ import asyncio import logging from typing import List, Dict logger = logging.getLogger(__name__) class MessageSyncHandler: \"\"\"Handles real-time WhatsApp message synchronisation.\"\"\" def __init__(self, db_pool, message_queue): self.db_pool = db_pool self.queue = message_queue self._sync_count = 0 async def sync_user_messages(self, user_id: str) -> List[Dict]: \"\"\"Fetch and mark-as-synced all pending messages for a user.\"\"\" conn = await self.db_pool.acquire() try: messages = await conn.fetch( "SELECT id, content, sender_id, timestamp " "FROM messages WHERE user_id = $1 AND synced = FALSE " "ORDER BY timestamp", user_id, ) processed = [] for msg in messages: await conn.execute( "UPDATE messages SET synced = TRUE WHERE id = $1", msg['id'], ) processed.append(dict(msg)) self._sync_count += len(processed) return processed except Exception as e: logger.error(f"Sync failed for user {user_id}: {e}") raise # BUG: missing `finally: await self.db_pool.release(conn)` # Under load the pool exhausts → all sync requests hang indefinitely. async def process_queue(self, batch_size: int = 50) -> int: processed = 0 while processed < batch_size: try: user_id = await asyncio.wait_for( self.queue.get(), timeout=1.0 ) await self.sync_user_messages(user_id) processed += 1 except asyncio.TimeoutError: break return processed """, "db.py": """\ import logging from typing import Dict, List logger = logging.getLogger(__name__) MIGRATIONS: List[Dict] = [ { "version": "001", "description": "Create messages table", "up": ( "CREATE TABLE messages (" " id SERIAL PRIMARY KEY," " user_id INTEGER NOT NULL," " content TEXT," " sender_id INTEGER," " timestamp BIGINT," " synced BOOLEAN DEFAULT FALSE" ");" ), }, ] class MigrationRunner: def __init__(self, db_conn): self.conn = db_conn self._applied: List[str] = [] async def apply(self, migration: Dict) -> bool: await self.conn.execute(migration['up']) self._applied.append(migration['version']) logger.info(f"Applied migration {migration['version']}") return True """, }, }, # ------------------------------------------------------------------ # Task 4 – Hard: Red-herring cascade from a bad DB migration (003) # ------------------------------------------------------------------ 4: { "whatsapp_sync": { "db.py": """\ import logging from typing import Dict, List logger = logging.getLogger(__name__) # Migration 003 introduces a circular FK: # message_threads.parent_message_id → messages.id # messages.thread_id → message_threads.id # PostgreSQL refuses the self-referential constraint during ALTER TABLE, # causing FK violation errors that cascade to all consumers of both tables. MIGRATIONS: List[Dict] = [ { "version": "001", "description": "Create messages table", "up": ( "CREATE TABLE IF NOT EXISTS messages (" " id SERIAL PRIMARY KEY," " user_id INTEGER NOT NULL," " content TEXT," " sender_id INTEGER," " timestamp BIGINT," " synced BOOLEAN DEFAULT FALSE" ");" ), }, { "version": "002", "description": "Add user preferences", "up": ( "CREATE TABLE IF NOT EXISTS user_preferences (" " id SERIAL PRIMARY KEY," " user_id INTEGER NOT NULL," " notification_enabled BOOLEAN DEFAULT TRUE," " sync_frequency INTEGER DEFAULT 30" ");" ), }, { "version": "003", "description": "Add message threads with back-reference", "up": ( "CREATE TABLE IF NOT EXISTS message_threads (" " id SERIAL PRIMARY KEY," " parent_message_id INTEGER REFERENCES messages(id) ON DELETE CASCADE," " participant_ids INTEGER[] NOT NULL," " created_at BIGINT" ");" "ALTER TABLE messages" " ADD COLUMN thread_id INTEGER REFERENCES message_threads(id);" ), # BUG: circular FK — messages → message_threads → messages # Fix: remove the ALTER TABLE line (messages should NOT reference threads) }, ] class MigrationRunner: def __init__(self, db_conn): self.conn = db_conn self._applied: List[str] = [] async def apply(self, migration: Dict) -> bool: await self.conn.execute(migration['up']) self._applied.append(migration['version']) logger.info(f"Applied migration {migration['version']}: {migration['description']}") return True async def rollback_version(self, version: str) -> bool: logger.warning(f"Rolling back migration {version}") self._applied = [v for v in self._applied if v != version] return True async def run_all(self): for migration in MIGRATIONS: await self.apply(migration) """, "handler.py": """\ import asyncio import logging from typing import List, Dict logger = logging.getLogger(__name__) class MessageSyncHandler: def __init__(self, db_pool, message_queue): self.db_pool = db_pool self.queue = message_queue self._sync_count = 0 async def sync_user_messages(self, user_id: str) -> List[Dict]: conn = await self.db_pool.acquire() try: messages = await conn.fetch( "SELECT id, content, sender_id, timestamp " "FROM messages WHERE user_id = $1 AND synced = FALSE " "ORDER BY timestamp", user_id, ) processed = [] for msg in messages: await conn.execute( "UPDATE messages SET synced = TRUE WHERE id = $1", msg['id'], ) processed.append(dict(msg)) self._sync_count += len(processed) return processed except Exception as e: logger.error(f"Sync failed for user {user_id}: {e}") raise finally: await self.db_pool.release(conn) async def process_queue(self, batch_size: int = 50) -> int: processed = 0 while processed < batch_size: try: user_id = await asyncio.wait_for( self.queue.get(), timeout=1.0 ) await self.sync_user_messages(user_id) processed += 1 except asyncio.TimeoutError: break return processed """, }, "capi_pipeline": { "ingestor.py": """\ import logging from typing import Dict, Any from .transformer import EventTransformer logger = logging.getLogger(__name__) class EventIngestor: def __init__(self, transformer: EventTransformer): self.transformer = transformer self._event_buffer = [] def ingest(self, raw_payload: Dict[str, Any]) -> Dict[str, Any]: try: events = raw_payload.get('data', []) if not events: return {'status': 'error', 'message': 'No events in payload'} transformed = self.transformer.batch_transform(events) self._event_buffer.extend(transformed) return {'status': 'ok', 'processed': len(transformed)} except Exception as e: logger.error(f"Ingest failed: {e}", exc_info=True) return {'status': 'error', 'message': str(e)} def flush(self) -> int: count = len(self._event_buffer) self._event_buffer.clear() return count """, "transformer.py": """\ import logging from typing import Dict, Any, List from datetime import datetime logger = logging.getLogger(__name__) class EventTransformer: SUPPORTED_EVENTS = { 'Purchase', 'AddToCart', 'ViewContent', 'Lead', 'CompleteRegistration' } def __init__(self): self._processed_count = 0 def transform(self, raw_event: Dict[str, Any]) -> Dict[str, Any]: if raw_event.get('event_name') not in self.SUPPORTED_EVENTS: return None event_time = self._normalize_timestamp(raw_event.get('event_time', 0)) transformed = { 'event_id': raw_event.get('event_id'), 'event_name': raw_event.get('event_name'), 'event_time': event_time, 'user_data': raw_event.get('user_data', {}), 'custom_data': raw_event.get('custom_data', {}), 'processed_at': int(datetime.utcnow().timestamp()), } self._processed_count += 1 return transformed def _normalize_timestamp(self, ts: Any) -> int: ts = int(ts) if ts > 1_000_000_000_000: return ts // 1000 return ts def batch_transform(self, events: List[Dict]) -> List[Dict]: return [t for e in events if (t := self.transform(e)) is not None] """, }, "ad_ranking": { "ranker.py": """\ import logging from typing import List, Dict logger = logging.getLogger(__name__) class AdRanker: def __init__(self, api_client): self.api = api_client self.model_version = "v2.3.1" def fetch_candidate_ads(self, user_id: str) -> List[Dict]: ads = self.api.get_all_ads(user_id) return [ad for ad in ads if ad.get('active', False)] def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]: scored = [] for ad in ads: click_rate = ad.get('clicks', 0) / max(ad.get('impressions', 1), 1) relevance = self._compute_relevance(ad, user_context) score = (click_rate * 0.4) + (relevance * 0.6) scored.append({**ad, 'score': round(score, 4)}) return sorted(scored, key=lambda x: x['score'], reverse=True) def _compute_relevance(self, ad: Dict, context: Dict) -> float: category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3 age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5 return round((category_match + age_match) / 2.0, 4) def rank(self, user_id: str, user_context: Dict) -> List[Dict]: candidates = self.fetch_candidate_ads(user_id) if not candidates: return [] return self.score_ads(candidates, user_context) """, }, }, # ------------------------------------------------------------------ # Task 5 – Hard: PII data-leak via DEBUG_MODE=True in production # ------------------------------------------------------------------ 5: { "capi_pipeline": { "ingestor.py": """\ import logging from typing import Dict, Any from .transformer import EventTransformer logger = logging.getLogger(__name__) DEBUG_MODE = True # BUG: must be False in production – leaks raw user PII class EventIngestor: \"\"\"Ingests and validates CAPI event payloads.\"\"\" def __init__(self, transformer: EventTransformer): self.transformer = transformer self._event_buffer = [] def ingest(self, raw_payload: Dict[str, Any]) -> Dict[str, Any]: try: events = raw_payload.get('data', []) if not events: return {'status': 'error', 'message': 'No events in payload'} transformed = self.transformer.batch_transform(events) self._event_buffer.extend(transformed) if DEBUG_MODE: # SECURITY BUG: exposes raw PII (emails, phone numbers) in the # HTTP response – visible in CDN logs, browser network tabs, etc. return { 'status': 'ok', 'processed': len(transformed), 'debug_data': { 'raw_payload': raw_payload, 'user_emails': [e.get('user_data', {}) for e in events], 'buffer_state': self._event_buffer, }, } return {'status': 'ok', 'processed': len(transformed)} except Exception as e: logger.error(f"Ingest failed: {e}", exc_info=True) return {'status': 'error', 'message': str(e)} def flush(self) -> int: count = len(self._event_buffer) self._event_buffer.clear() logger.info(f"Flushed {count} events") return count """, "transformer.py": """\ import logging from typing import Dict, Any, List from datetime import datetime logger = logging.getLogger(__name__) class EventTransformer: SUPPORTED_EVENTS = { 'Purchase', 'AddToCart', 'ViewContent', 'Lead', 'CompleteRegistration' } def __init__(self): self._processed_count = 0 def transform(self, raw_event: Dict[str, Any]) -> Dict[str, Any]: if raw_event.get('event_name') not in self.SUPPORTED_EVENTS: return None event_time = self._normalize_timestamp(raw_event.get('event_time', 0)) transformed = { 'event_id': raw_event.get('event_id'), 'event_name': raw_event.get('event_name'), 'event_time': event_time, 'user_data': self._hash_user_data(raw_event.get('user_data', {})), 'custom_data': raw_event.get('custom_data', {}), 'processed_at': int(datetime.utcnow().timestamp()), } self._processed_count += 1 return transformed def _normalize_timestamp(self, ts: Any) -> int: ts = int(ts) if ts > 1_000_000_000_000: return ts // 1000 return ts def _hash_user_data(self, user_data: Dict) -> Dict: import hashlib hashed = {} for key, val in user_data.items(): if key in ('email', 'phone', 'fn', 'ln'): hashed[key] = hashlib.sha256( str(val).lower().encode() ).hexdigest() else: hashed[key] = val return hashed def batch_transform(self, events: List[Dict]) -> List[Dict]: return [t for e in events if (t := self.transform(e)) is not None] """, }, "ad_ranking": { "ranker.py": """\ import logging from typing import List, Dict logger = logging.getLogger(__name__) class AdRanker: def __init__(self, api_client): self.api = api_client self.model_version = "v2.3.1" def fetch_candidate_ads(self, user_id: str) -> List[Dict]: ads = self.api.get_all_ads(user_id) return [ad for ad in ads if ad.get('active', False)] def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]: scored = [] for ad in ads: click_rate = ad.get('clicks', 0) / max(ad.get('impressions', 1), 1) relevance = self._compute_relevance(ad, user_context) score = (click_rate * 0.4) + (relevance * 0.6) scored.append({**ad, 'score': round(score, 4)}) return sorted(scored, key=lambda x: x['score'], reverse=True) def _compute_relevance(self, ad: Dict, context: Dict) -> float: category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3 age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5 return round((category_match + age_match) / 2.0, 4) def rank(self, user_id: str, user_context: Dict) -> List[Dict]: candidates = self.fetch_candidate_ads(user_id) if not candidates: return [] return self.score_ads(candidates, user_context) """, }, "whatsapp_sync": { "handler.py": """\ import asyncio import logging from typing import List, Dict logger = logging.getLogger(__name__) class MessageSyncHandler: def __init__(self, db_pool, message_queue): self.db_pool = db_pool self.queue = message_queue self._sync_count = 0 async def sync_user_messages(self, user_id: str) -> List[Dict]: conn = await self.db_pool.acquire() try: messages = await conn.fetch( "SELECT id, content, sender_id, timestamp " "FROM messages WHERE user_id = $1 AND synced = FALSE " "ORDER BY timestamp", user_id, ) processed = [] for msg in messages: await conn.execute( "UPDATE messages SET synced = TRUE WHERE id = $1", msg['id'], ) processed.append(dict(msg)) self._sync_count += len(processed) return processed except Exception as e: logger.error(f"Sync failed for user {user_id}: {e}") raise finally: await self.db_pool.release(conn) async def process_queue(self, batch_size: int = 50) -> int: processed = 0 while processed < batch_size: try: user_id = await asyncio.wait_for( self.queue.get(), timeout=1.0 ) await self.sync_user_messages(user_id) processed += 1 except asyncio.TimeoutError: break return processed """, "db.py": """\ import logging from typing import Dict, List logger = logging.getLogger(__name__) MIGRATIONS: List[Dict] = [ { "version": "001", "description": "Create messages table", "up": ( "CREATE TABLE IF NOT EXISTS messages (" " id SERIAL PRIMARY KEY," " user_id INTEGER NOT NULL," " content TEXT," " sender_id INTEGER," " timestamp BIGINT," " synced BOOLEAN DEFAULT FALSE" ");" ), }, ] class MigrationRunner: def __init__(self, db_conn): self.conn = db_conn self._applied: List[str] = [] async def apply(self, migration: Dict) -> bool: await self.conn.execute(migration['up']) self._applied.append(migration['version']) return True """, }, }, } # --------------------------------------------------------------------------- # VirtualFileSystem # --------------------------------------------------------------------------- class EditRecord: __slots__ = ("step", "service", "filename", "line_idx", "old_code", "new_code") def __init__(self, step, service, filename, line_idx, old_code, new_code): self.step = step self.service = service self.filename = filename self.line_idx = line_idx self.old_code = old_code self.new_code = new_code def to_dict(self): return { "step": self.step, "service": self.service, "filename": self.filename, "line_number": self.line_idx + 1, "old_code": self.old_code, "new_code": self.new_code, } class VirtualFileSystem: """In-memory multi-service file system with history tracking.""" def __init__(self): self._files: Dict[str, Dict[str, str]] = {} self._history: List[EditRecord] = [] self._task_id: int = 0 # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ def reset(self, task_id: int) -> None: """Load the buggy snapshot for a specific task.""" self._task_id = task_id self._history.clear() snapshot = TASK_SNAPSHOTS.get(task_id, {}) # Start from shared base, then overlay task-specific files merged: Dict[str, Dict[str, str]] = {} for service, files in SHARED_FILES.items(): merged[service] = dict(files) for service, files in snapshot.items(): if service not in merged: merged[service] = {} merged[service].update(files) self._files = merged # ------------------------------------------------------------------ # Read # ------------------------------------------------------------------ def list_files(self, service: str) -> List[str]: return sorted(self._files.get(service, {}).keys()) def list_services(self) -> List[str]: return sorted(self._files.keys()) def read_file(self, service: str, filename: str) -> Tuple[bool, str]: """Return (found, content).""" content = self._files.get(service, {}).get(filename) if content is None: return False, f"File not found: {service}/{filename}" return True, content def get_file_lines(self, service: str, filename: str) -> Optional[List[str]]: found, content = self.read_file(service, filename) if not found: return None return content.splitlines() # ------------------------------------------------------------------ # Write # ------------------------------------------------------------------ def edit_line( self, service: str, filename: str, line_number: int, # 1-based new_code: str, step: int = 0, ) -> Tuple[bool, str]: """Replace a single line (1-based). Returns (success, message).""" lines = self.get_file_lines(service, filename) if lines is None: return False, f"File not found: {service}/{filename}" idx = line_number - 1 if not (0 <= idx < len(lines)): return False, f"Line {line_number} out of range (file has {len(lines)} lines)" old_code = lines[idx] lines[idx] = new_code self._files[service][filename] = "\n".join(lines) self._history.append( EditRecord(step, service, filename, idx, old_code, new_code) ) return True, "ok" # ------------------------------------------------------------------ # History / blame # ------------------------------------------------------------------ def get_edit_history( self, service: Optional[str] = None, filename: Optional[str] = None, ) -> List[dict]: records = self._history if service: records = [r for r in records if r.service == service] if filename: records = [r for r in records if r.filename == filename] return [r.to_dict() for r in records] def git_blame(self, service: str, filename: str, line_number: int) -> str: """Return the last edit record for a specific line, or 'AI-generated' if untouched.""" idx = line_number - 1 matching = [ r for r in reversed(self._history) if r.service == service and r.filename == filename and r.line_idx == idx ] if matching: r = matching[0] return ( f"Step {r.step}: agent changed line {line_number} in " f"{service}/{filename}\n" f" - {r.old_code!r}\n" f" + {r.new_code!r}" ) return ( f"Line {line_number} in {service}/{filename} was last modified by: " f"Junior AI code-gen bot (commit a3f91b2, 2026-04-23 02:14 UTC)" ) def build_git_diff(self) -> Optional[str]: if not self._history: return None lines = [f"--- Task {self._task_id} working diff ---"] for r in self._history: lines.append( f"@@ {r.service}/{r.filename} line {r.line_idx + 1} @@\n" f"-{r.old_code}\n" f"+{r.new_code}" ) return "\n".join(lines)