| """ |
| Layer 1 – Virtual File System. |
| |
| Stores all service codebases as in-memory strings keyed by |
| (service, filename). Every task starts from a clean snapshot of |
| its own buggy codebase; edits accumulate on top of that snapshot. |
| """ |
| from __future__ import annotations |
| import copy |
| from typing import Dict, List, Optional, Tuple |
|
|
| |
| |
| |
| |
| |
|
|
# Source files common to every task, keyed by service name and then filename.
# VirtualFileSystem.reset() copies these first and then layers the selected
# task's TASK_SNAPSHOTS entry on top, so a task file with the same
# service/filename replaces the shared version.
SHARED_FILES: Dict[str, Dict[str, str]] = {
    "ad_ranking": {
        "utils.py": """\
from typing import Dict, List


def normalize_scores(ads: List[Dict]) -> List[Dict]:
    if not ads:
        return ads
    max_score = max(ad['score'] for ad in ads)
    min_score = min(ad['score'] for ad in ads)
    score_range = max_score - min_score or 1.0
    return [
        {**ad, 'normalized_score': (ad['score'] - min_score) / score_range}
        for ad in ads
    ]


def filter_by_budget(ads: List[Dict], daily_budget_cents: int) -> List[Dict]:
    return [ad for ad in ads if ad.get('spend_today_cents', 0) < daily_budget_cents]


def compute_roas(revenue: float, spend: float) -> float:
    return revenue / spend if spend > 0 else 0.0
""",
        "models.py": """\
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Ad:
    ad_id: str
    campaign_id: str
    category: str
    target_age: str
    clicks: int = 0
    impressions: int = 0
    spend_today_cents: int = 0
    active: bool = True
    score: float = 0.0


@dataclass
class UserContext:
    user_id: str
    interest: str
    age_group: str
    country: str
""",
    },
    "capi_pipeline": {
        # `Tuple` must be imported: validate_event's return annotation is
        # evaluated when the function is defined, so without the import the
        # module raises NameError as soon as it is loaded.
        "validator.py": """\
from typing import Dict, Any, Tuple


REQUIRED_FIELDS = {'event_name', 'event_time', 'event_id'}


def validate_event(event: Dict[str, Any]) -> Tuple[bool, str]:
    missing = REQUIRED_FIELDS - set(event.keys())
    if missing:
        return False, f'Missing fields: {missing}'
    if not isinstance(event.get('event_time'), (int, float)):
        return False, 'event_time must be numeric'
    return True, 'ok'
""",
    },
    "whatsapp_sync": {
        "models.py": """\
from dataclasses import dataclass
from typing import Optional


@dataclass
class Message:
    id: int
    user_id: int
    sender_id: int
    content: str
    timestamp: int
    synced: bool = False
    thread_id: Optional[int] = None
""",
    },
}
|
|
|
|
| |
| |
| |
|
|
# Per-task starting codebases: task id -> service -> filename -> source text.
# VirtualFileSystem.reset() layers the selected task's files over
# SHARED_FILES. As the module docstring says, each snapshot is the task's
# *buggy* codebase, so the embedded sources are intentionally defective and
# must be kept verbatim — do not "fix" them here.
TASK_SNAPSHOTS: Dict[int, Dict[str, Dict[str, str]]] = {
    # Task 1 — ad_ranking/ranker.py only.
    # NOTE(review): score_ads calls `ad.get_clicks()`, while ads are handled
    # as dicts everywhere else in the file (`ad.get(...)`, `{**ad, ...}`);
    # presumably this is the seeded defect for this task — confirm.
    1: {
        "ad_ranking": {
            "ranker.py": """\
import logging
from typing import List, Dict

logger = logging.getLogger(__name__)


class AdRanker:
    \"\"\"Scores and ranks candidate ads for a user.\"\"\"

    def __init__(self, api_client):
        self.api = api_client
        self.model_version = "v2.3.1"
        self._cache = {}

    def fetch_candidate_ads(self, user_id: str) -> List[Dict]:
        ads = self.api.get_all_ads(user_id)
        return [ad for ad in ads if ad.get('active', False)]

    def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]:
        scored = []
        for ad in ads:
            click_rate = ad.get_clicks() / max(ad.get('impressions', 1), 1)
            relevance = self._compute_relevance(ad, user_context)
            score = (click_rate * 0.4) + (relevance * 0.6)
            scored.append({**ad, 'score': round(score, 4)})
        return sorted(scored, key=lambda x: x['score'], reverse=True)

    def _compute_relevance(self, ad: Dict, context: Dict) -> float:
        category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3
        age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5
        return round((category_match + age_match) / 2.0, 4)

    def rank(self, user_id: str, user_context: Dict) -> List[Dict]:
        candidates = self.fetch_candidate_ads(user_id)
        if not candidates:
            logger.warning(f"No candidates for user {user_id}")
            return []
        return self.score_ads(candidates, user_context)
""",
        },
    },
    # Task 2 — capi_pipeline transformer.py / ingestor.py plus
    # ad_ranking/ranker.py.
    # transformer.py carries a marked BUG: _normalize_timestamp divides by
    # 1000 above 1_000_000_000 instead of above 1_000_000_000_000.
    # ranker.py here reads clicks with ad.get('clicks', 0) (no get_clicks()).
    2: {
        "capi_pipeline": {
            "transformer.py": """\
import logging
from typing import Dict, Any, List
from datetime import datetime

logger = logging.getLogger(__name__)


class EventTransformer:
    \"\"\"Transforms raw CAPI events into normalised format.\"\"\"

    SUPPORTED_EVENTS = {
        'Purchase', 'AddToCart', 'ViewContent', 'Lead', 'CompleteRegistration'
    }

    def __init__(self):
        self._processed_count = 0

    def transform(self, raw_event: Dict[str, Any]) -> Dict[str, Any]:
        if raw_event.get('event_name') not in self.SUPPORTED_EVENTS:
            logger.warning(f"Unknown event type: {raw_event.get('event_name')}")
            return None

        event_time = self._normalize_timestamp(raw_event.get('event_time', 0))

        transformed = {
            'event_id': raw_event.get('event_id'),
            'event_name': raw_event.get('event_name'),
            'event_time': event_time,
            'user_data': self._hash_user_data(raw_event.get('user_data', {})),
            'custom_data': raw_event.get('custom_data', {}),
            'processed_at': int(datetime.utcnow().timestamp()),
        }
        self._processed_count += 1
        return transformed

    def _normalize_timestamp(self, ts: Any) -> int:
        \"\"\"Normalise event timestamp to Unix seconds.\"\"\"
        ts = int(ts)
        # BUG: threshold is 1_000_000_000 (10 digits) instead of
        # 1_000_000_000_000 (13 digits for milliseconds).
        # A normal unix-second timestamp like 1_700_000_000 passes the
        # condition and gets divided by 1000 → year ~1970+20 days.
        if ts > 1_000_000_000:
            return ts // 1000
        return ts

    def _hash_user_data(self, user_data: Dict) -> Dict:
        import hashlib
        hashed = {}
        for key, val in user_data.items():
            if key in ('email', 'phone', 'fn', 'ln'):
                hashed[key] = hashlib.sha256(
                    str(val).lower().encode()
                ).hexdigest()
            else:
                hashed[key] = val
        return hashed

    def batch_transform(self, events: List[Dict]) -> List[Dict]:
        return [t for e in events if (t := self.transform(e)) is not None]
""",
            "ingestor.py": """\
import logging
from typing import Dict, Any
from .transformer import EventTransformer

logger = logging.getLogger(__name__)


class EventIngestor:
    \"\"\"Ingests and validates CAPI event payloads.\"\"\"

    def __init__(self, transformer: EventTransformer):
        self.transformer = transformer
        self._event_buffer = []

    def ingest(self, raw_payload: Dict[str, Any]) -> Dict[str, Any]:
        try:
            events = raw_payload.get('data', [])
            if not events:
                return {'status': 'error', 'message': 'No events in payload'}
            transformed = self.transformer.batch_transform(events)
            self._event_buffer.extend(transformed)
            return {'status': 'ok', 'processed': len(transformed)}
        except Exception as e:
            logger.error(f"Ingest failed: {e}", exc_info=True)
            return {'status': 'error', 'message': str(e)}

    def flush(self) -> int:
        count = len(self._event_buffer)
        self._event_buffer.clear()
        logger.info(f"Flushed {count} events")
        return count
""",
        },
        "ad_ranking": {
            "ranker.py": """\
import logging
from typing import List, Dict

logger = logging.getLogger(__name__)


class AdRanker:
    \"\"\"Scores and ranks candidate ads for a user.\"\"\"

    def __init__(self, api_client):
        self.api = api_client
        self.model_version = "v2.3.1"

    def fetch_candidate_ads(self, user_id: str) -> List[Dict]:
        ads = self.api.get_all_ads(user_id)
        return [ad for ad in ads if ad.get('active', False)]

    def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]:
        scored = []
        for ad in ads:
            click_rate = ad.get('clicks', 0) / max(ad.get('impressions', 1), 1)
            relevance = self._compute_relevance(ad, user_context)
            score = (click_rate * 0.4) + (relevance * 0.6)
            scored.append({**ad, 'score': round(score, 4)})
        return sorted(scored, key=lambda x: x['score'], reverse=True)

    def _compute_relevance(self, ad: Dict, context: Dict) -> float:
        category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3
        age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5
        return round((category_match + age_match) / 2.0, 4)

    def rank(self, user_id: str, user_context: Dict) -> List[Dict]:
        candidates = self.fetch_candidate_ads(user_id)
        if not candidates:
            logger.warning(f"No candidates for user {user_id}")
            return []
        return self.score_ads(candidates, user_context)
""",
        },
    },
    # Task 3 — whatsapp_sync handler.py and db.py.
    # handler.py carries a marked BUG: sync_user_messages acquires a pooled
    # connection but never releases it (no `finally` with db_pool.release).
    3: {
        "whatsapp_sync": {
            "handler.py": """\
import asyncio
import logging
from typing import List, Dict

logger = logging.getLogger(__name__)


class MessageSyncHandler:
    \"\"\"Handles real-time WhatsApp message synchronisation.\"\"\"

    def __init__(self, db_pool, message_queue):
        self.db_pool = db_pool
        self.queue = message_queue
        self._sync_count = 0

    async def sync_user_messages(self, user_id: str) -> List[Dict]:
        \"\"\"Fetch and mark-as-synced all pending messages for a user.\"\"\"
        conn = await self.db_pool.acquire()
        try:
            messages = await conn.fetch(
                "SELECT id, content, sender_id, timestamp "
                "FROM messages WHERE user_id = $1 AND synced = FALSE "
                "ORDER BY timestamp",
                user_id,
            )
            processed = []
            for msg in messages:
                await conn.execute(
                    "UPDATE messages SET synced = TRUE WHERE id = $1",
                    msg['id'],
                )
                processed.append(dict(msg))
            self._sync_count += len(processed)
            return processed
        except Exception as e:
            logger.error(f"Sync failed for user {user_id}: {e}")
            raise
        # BUG: missing `finally: await self.db_pool.release(conn)`
        # Under load the pool exhausts → all sync requests hang indefinitely.

    async def process_queue(self, batch_size: int = 50) -> int:
        processed = 0
        while processed < batch_size:
            try:
                user_id = await asyncio.wait_for(
                    self.queue.get(), timeout=1.0
                )
                await self.sync_user_messages(user_id)
                processed += 1
            except asyncio.TimeoutError:
                break
        return processed
""",
            "db.py": """\
import logging
from typing import Dict, List

logger = logging.getLogger(__name__)

MIGRATIONS: List[Dict] = [
    {
        "version": "001",
        "description": "Create messages table",
        "up": (
            "CREATE TABLE messages ("
            " id SERIAL PRIMARY KEY,"
            " user_id INTEGER NOT NULL,"
            " content TEXT,"
            " sender_id INTEGER,"
            " timestamp BIGINT,"
            " synced BOOLEAN DEFAULT FALSE"
            ");"
        ),
    },
]


class MigrationRunner:
    def __init__(self, db_conn):
        self.conn = db_conn
        self._applied: List[str] = []

    async def apply(self, migration: Dict) -> bool:
        await self.conn.execute(migration['up'])
        self._applied.append(migration['version'])
        logger.info(f"Applied migration {migration['version']}")
        return True
""",
        },
    },
    # Task 4 — whatsapp_sync, capi_pipeline and ad_ranking files.
    # db.py carries a marked BUG: migration 003 adds messages.thread_id
    # referencing message_threads, whose parent_message_id references
    # messages. handler.py here releases its connection in `finally`, and
    # transformer.py uses the 1_000_000_000_000 threshold.
    # NOTE(review): this task's transformer.py passes user_data through
    # without hashing it, unlike tasks 2 and 5 — confirm that is intended.
    4: {
        "whatsapp_sync": {
            "db.py": """\
import logging
from typing import Dict, List

logger = logging.getLogger(__name__)

# Migration 003 introduces a circular FK:
# message_threads.parent_message_id → messages.id
# messages.thread_id → message_threads.id
# PostgreSQL refuses the self-referential constraint during ALTER TABLE,
# causing FK violation errors that cascade to all consumers of both tables.

MIGRATIONS: List[Dict] = [
    {
        "version": "001",
        "description": "Create messages table",
        "up": (
            "CREATE TABLE IF NOT EXISTS messages ("
            " id SERIAL PRIMARY KEY,"
            " user_id INTEGER NOT NULL,"
            " content TEXT,"
            " sender_id INTEGER,"
            " timestamp BIGINT,"
            " synced BOOLEAN DEFAULT FALSE"
            ");"
        ),
    },
    {
        "version": "002",
        "description": "Add user preferences",
        "up": (
            "CREATE TABLE IF NOT EXISTS user_preferences ("
            " id SERIAL PRIMARY KEY,"
            " user_id INTEGER NOT NULL,"
            " notification_enabled BOOLEAN DEFAULT TRUE,"
            " sync_frequency INTEGER DEFAULT 30"
            ");"
        ),
    },
    {
        "version": "003",
        "description": "Add message threads with back-reference",
        "up": (
            "CREATE TABLE IF NOT EXISTS message_threads ("
            " id SERIAL PRIMARY KEY,"
            " parent_message_id INTEGER REFERENCES messages(id) ON DELETE CASCADE,"
            " participant_ids INTEGER[] NOT NULL,"
            " created_at BIGINT"
            ");"
            "ALTER TABLE messages"
            " ADD COLUMN thread_id INTEGER REFERENCES message_threads(id);"
        ),
        # BUG: circular FK — messages → message_threads → messages
        # Fix: remove the ALTER TABLE line (messages should NOT reference threads)
    },
]


class MigrationRunner:
    def __init__(self, db_conn):
        self.conn = db_conn
        self._applied: List[str] = []

    async def apply(self, migration: Dict) -> bool:
        await self.conn.execute(migration['up'])
        self._applied.append(migration['version'])
        logger.info(f"Applied migration {migration['version']}: {migration['description']}")
        return True

    async def rollback_version(self, version: str) -> bool:
        logger.warning(f"Rolling back migration {version}")
        self._applied = [v for v in self._applied if v != version]
        return True

    async def run_all(self):
        for migration in MIGRATIONS:
            await self.apply(migration)
""",
            "handler.py": """\
import asyncio
import logging
from typing import List, Dict

logger = logging.getLogger(__name__)


class MessageSyncHandler:
    def __init__(self, db_pool, message_queue):
        self.db_pool = db_pool
        self.queue = message_queue
        self._sync_count = 0

    async def sync_user_messages(self, user_id: str) -> List[Dict]:
        conn = await self.db_pool.acquire()
        try:
            messages = await conn.fetch(
                "SELECT id, content, sender_id, timestamp "
                "FROM messages WHERE user_id = $1 AND synced = FALSE "
                "ORDER BY timestamp",
                user_id,
            )
            processed = []
            for msg in messages:
                await conn.execute(
                    "UPDATE messages SET synced = TRUE WHERE id = $1",
                    msg['id'],
                )
                processed.append(dict(msg))
            self._sync_count += len(processed)
            return processed
        except Exception as e:
            logger.error(f"Sync failed for user {user_id}: {e}")
            raise
        finally:
            await self.db_pool.release(conn)

    async def process_queue(self, batch_size: int = 50) -> int:
        processed = 0
        while processed < batch_size:
            try:
                user_id = await asyncio.wait_for(
                    self.queue.get(), timeout=1.0
                )
                await self.sync_user_messages(user_id)
                processed += 1
            except asyncio.TimeoutError:
                break
        return processed
""",
        },
        "capi_pipeline": {
            "ingestor.py": """\
import logging
from typing import Dict, Any
from .transformer import EventTransformer

logger = logging.getLogger(__name__)


class EventIngestor:
    def __init__(self, transformer: EventTransformer):
        self.transformer = transformer
        self._event_buffer = []

    def ingest(self, raw_payload: Dict[str, Any]) -> Dict[str, Any]:
        try:
            events = raw_payload.get('data', [])
            if not events:
                return {'status': 'error', 'message': 'No events in payload'}
            transformed = self.transformer.batch_transform(events)
            self._event_buffer.extend(transformed)
            return {'status': 'ok', 'processed': len(transformed)}
        except Exception as e:
            logger.error(f"Ingest failed: {e}", exc_info=True)
            return {'status': 'error', 'message': str(e)}

    def flush(self) -> int:
        count = len(self._event_buffer)
        self._event_buffer.clear()
        return count
""",
            "transformer.py": """\
import logging
from typing import Dict, Any, List
from datetime import datetime

logger = logging.getLogger(__name__)


class EventTransformer:
    SUPPORTED_EVENTS = {
        'Purchase', 'AddToCart', 'ViewContent', 'Lead', 'CompleteRegistration'
    }

    def __init__(self):
        self._processed_count = 0

    def transform(self, raw_event: Dict[str, Any]) -> Dict[str, Any]:
        if raw_event.get('event_name') not in self.SUPPORTED_EVENTS:
            return None
        event_time = self._normalize_timestamp(raw_event.get('event_time', 0))
        transformed = {
            'event_id': raw_event.get('event_id'),
            'event_name': raw_event.get('event_name'),
            'event_time': event_time,
            'user_data': raw_event.get('user_data', {}),
            'custom_data': raw_event.get('custom_data', {}),
            'processed_at': int(datetime.utcnow().timestamp()),
        }
        self._processed_count += 1
        return transformed

    def _normalize_timestamp(self, ts: Any) -> int:
        ts = int(ts)
        if ts > 1_000_000_000_000:
            return ts // 1000
        return ts

    def batch_transform(self, events: List[Dict]) -> List[Dict]:
        return [t for e in events if (t := self.transform(e)) is not None]
""",
        },
        "ad_ranking": {
            "ranker.py": """\
import logging
from typing import List, Dict

logger = logging.getLogger(__name__)


class AdRanker:
    def __init__(self, api_client):
        self.api = api_client
        self.model_version = "v2.3.1"

    def fetch_candidate_ads(self, user_id: str) -> List[Dict]:
        ads = self.api.get_all_ads(user_id)
        return [ad for ad in ads if ad.get('active', False)]

    def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]:
        scored = []
        for ad in ads:
            click_rate = ad.get('clicks', 0) / max(ad.get('impressions', 1), 1)
            relevance = self._compute_relevance(ad, user_context)
            score = (click_rate * 0.4) + (relevance * 0.6)
            scored.append({**ad, 'score': round(score, 4)})
        return sorted(scored, key=lambda x: x['score'], reverse=True)

    def _compute_relevance(self, ad: Dict, context: Dict) -> float:
        category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3
        age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5
        return round((category_match + age_match) / 2.0, 4)

    def rank(self, user_id: str, user_context: Dict) -> List[Dict]:
        candidates = self.fetch_candidate_ads(user_id)
        if not candidates:
            return []
        return self.score_ads(candidates, user_context)
""",
        },
    },
    # Task 5 — capi_pipeline, ad_ranking and whatsapp_sync files.
    # ingestor.py carries a marked BUG: DEBUG_MODE = True makes ingest()
    # include the raw payload and every event's user_data in its response.
    5: {
        "capi_pipeline": {
            "ingestor.py": """\
import logging
from typing import Dict, Any
from .transformer import EventTransformer

logger = logging.getLogger(__name__)

DEBUG_MODE = True # BUG: must be False in production – leaks raw user PII


class EventIngestor:
    \"\"\"Ingests and validates CAPI event payloads.\"\"\"

    def __init__(self, transformer: EventTransformer):
        self.transformer = transformer
        self._event_buffer = []

    def ingest(self, raw_payload: Dict[str, Any]) -> Dict[str, Any]:
        try:
            events = raw_payload.get('data', [])
            if not events:
                return {'status': 'error', 'message': 'No events in payload'}

            transformed = self.transformer.batch_transform(events)
            self._event_buffer.extend(transformed)

            if DEBUG_MODE:
                # SECURITY BUG: exposes raw PII (emails, phone numbers) in the
                # HTTP response – visible in CDN logs, browser network tabs, etc.
                return {
                    'status': 'ok',
                    'processed': len(transformed),
                    'debug_data': {
                        'raw_payload': raw_payload,
                        'user_emails': [e.get('user_data', {}) for e in events],
                        'buffer_state': self._event_buffer,
                    },
                }

            return {'status': 'ok', 'processed': len(transformed)}

        except Exception as e:
            logger.error(f"Ingest failed: {e}", exc_info=True)
            return {'status': 'error', 'message': str(e)}

    def flush(self) -> int:
        count = len(self._event_buffer)
        self._event_buffer.clear()
        logger.info(f"Flushed {count} events")
        return count
""",
            "transformer.py": """\
import logging
from typing import Dict, Any, List
from datetime import datetime

logger = logging.getLogger(__name__)


class EventTransformer:
    SUPPORTED_EVENTS = {
        'Purchase', 'AddToCart', 'ViewContent', 'Lead', 'CompleteRegistration'
    }

    def __init__(self):
        self._processed_count = 0

    def transform(self, raw_event: Dict[str, Any]) -> Dict[str, Any]:
        if raw_event.get('event_name') not in self.SUPPORTED_EVENTS:
            return None
        event_time = self._normalize_timestamp(raw_event.get('event_time', 0))
        transformed = {
            'event_id': raw_event.get('event_id'),
            'event_name': raw_event.get('event_name'),
            'event_time': event_time,
            'user_data': self._hash_user_data(raw_event.get('user_data', {})),
            'custom_data': raw_event.get('custom_data', {}),
            'processed_at': int(datetime.utcnow().timestamp()),
        }
        self._processed_count += 1
        return transformed

    def _normalize_timestamp(self, ts: Any) -> int:
        ts = int(ts)
        if ts > 1_000_000_000_000:
            return ts // 1000
        return ts

    def _hash_user_data(self, user_data: Dict) -> Dict:
        import hashlib
        hashed = {}
        for key, val in user_data.items():
            if key in ('email', 'phone', 'fn', 'ln'):
                hashed[key] = hashlib.sha256(
                    str(val).lower().encode()
                ).hexdigest()
            else:
                hashed[key] = val
        return hashed

    def batch_transform(self, events: List[Dict]) -> List[Dict]:
        return [t for e in events if (t := self.transform(e)) is not None]
""",
        },
        "ad_ranking": {
            "ranker.py": """\
import logging
from typing import List, Dict

logger = logging.getLogger(__name__)


class AdRanker:
    def __init__(self, api_client):
        self.api = api_client
        self.model_version = "v2.3.1"

    def fetch_candidate_ads(self, user_id: str) -> List[Dict]:
        ads = self.api.get_all_ads(user_id)
        return [ad for ad in ads if ad.get('active', False)]

    def score_ads(self, ads: List[Dict], user_context: Dict) -> List[Dict]:
        scored = []
        for ad in ads:
            click_rate = ad.get('clicks', 0) / max(ad.get('impressions', 1), 1)
            relevance = self._compute_relevance(ad, user_context)
            score = (click_rate * 0.4) + (relevance * 0.6)
            scored.append({**ad, 'score': round(score, 4)})
        return sorted(scored, key=lambda x: x['score'], reverse=True)

    def _compute_relevance(self, ad: Dict, context: Dict) -> float:
        category_match = 1.0 if ad.get('category') == context.get('interest') else 0.3
        age_match = 1.0 if ad.get('target_age') == context.get('age_group') else 0.5
        return round((category_match + age_match) / 2.0, 4)

    def rank(self, user_id: str, user_context: Dict) -> List[Dict]:
        candidates = self.fetch_candidate_ads(user_id)
        if not candidates:
            return []
        return self.score_ads(candidates, user_context)
""",
        },
        "whatsapp_sync": {
            "handler.py": """\
import asyncio
import logging
from typing import List, Dict

logger = logging.getLogger(__name__)


class MessageSyncHandler:
    def __init__(self, db_pool, message_queue):
        self.db_pool = db_pool
        self.queue = message_queue
        self._sync_count = 0

    async def sync_user_messages(self, user_id: str) -> List[Dict]:
        conn = await self.db_pool.acquire()
        try:
            messages = await conn.fetch(
                "SELECT id, content, sender_id, timestamp "
                "FROM messages WHERE user_id = $1 AND synced = FALSE "
                "ORDER BY timestamp",
                user_id,
            )
            processed = []
            for msg in messages:
                await conn.execute(
                    "UPDATE messages SET synced = TRUE WHERE id = $1",
                    msg['id'],
                )
                processed.append(dict(msg))
            self._sync_count += len(processed)
            return processed
        except Exception as e:
            logger.error(f"Sync failed for user {user_id}: {e}")
            raise
        finally:
            await self.db_pool.release(conn)

    async def process_queue(self, batch_size: int = 50) -> int:
        processed = 0
        while processed < batch_size:
            try:
                user_id = await asyncio.wait_for(
                    self.queue.get(), timeout=1.0
                )
                await self.sync_user_messages(user_id)
                processed += 1
            except asyncio.TimeoutError:
                break
        return processed
""",
            "db.py": """\
import logging
from typing import Dict, List

logger = logging.getLogger(__name__)

MIGRATIONS: List[Dict] = [
    {
        "version": "001",
        "description": "Create messages table",
        "up": (
            "CREATE TABLE IF NOT EXISTS messages ("
            " id SERIAL PRIMARY KEY,"
            " user_id INTEGER NOT NULL,"
            " content TEXT,"
            " sender_id INTEGER,"
            " timestamp BIGINT,"
            " synced BOOLEAN DEFAULT FALSE"
            ");"
        ),
    },
]


class MigrationRunner:
    def __init__(self, db_conn):
        self.conn = db_conn
        self._applied: List[str] = []

    async def apply(self, migration: Dict) -> bool:
        await self.conn.execute(migration['up'])
        self._applied.append(migration['version'])
        return True
""",
        },
    },
}
|
|
|
|
| |
| |
| |
|
|
class EditRecord:
    """A single line replacement applied through the virtual file system.

    ``line_idx`` is kept 0-based internally; :meth:`to_dict` reports it as a
    1-based ``line_number``.
    """

    __slots__ = ("step", "service", "filename", "line_idx", "old_code", "new_code")

    def __init__(self, step, service, filename, line_idx, old_code, new_code):
        # Where the edit happened.
        self.step, self.service, self.filename = step, service, filename
        self.line_idx = line_idx
        # What the line looked like before and after.
        self.old_code, self.new_code = old_code, new_code

    def to_dict(self):
        """Return the record as a plain dict, with a 1-based line number."""
        return dict(
            step=self.step,
            service=self.service,
            filename=self.filename,
            line_number=self.line_idx + 1,
            old_code=self.old_code,
            new_code=self.new_code,
        )
|
|
|
|
class VirtualFileSystem:
    """In-memory multi-service file system with history tracking.

    Files live in ``self._files`` as ``{service: {filename: source_text}}``.
    Every successful :meth:`edit_line` appends an :class:`EditRecord` to the
    history. The history backs :meth:`get_edit_history`, :meth:`git_blame`
    and :meth:`build_git_diff`.
    """

    def __init__(self):
        self._files: Dict[str, Dict[str, str]] = {}
        self._history: List[EditRecord] = []
        self._task_id: int = 0

    # ------------------------------------------------------------------ #
    # Task lifecycle
    # ------------------------------------------------------------------ #

    def reset(self, task_id: int) -> None:
        """Load the buggy snapshot for a specific task.

        The shared files are copied first, then the task's snapshot is layered
        on top: a snapshot file replaces a shared file with the same service
        and filename. The edit history is cleared. An unknown ``task_id`` is
        not an error; in that case only the shared files are loaded.
        """
        self._task_id = task_id
        self._history.clear()

        snapshot = TASK_SNAPSHOTS.get(task_id, {})

        merged: Dict[str, Dict[str, str]] = {}
        for service, files in SHARED_FILES.items():
            # Copy each per-service dict so edits never touch the module-level
            # templates (the str values themselves are immutable).
            merged[service] = dict(files)
        for service, files in snapshot.items():
            merged.setdefault(service, {}).update(files)

        self._files = merged

    # ------------------------------------------------------------------ #
    # Read access
    # ------------------------------------------------------------------ #

    def list_files(self, service: str) -> List[str]:
        """Return the sorted filenames of *service* (empty if unknown)."""
        return sorted(self._files.get(service, {}).keys())

    def list_services(self) -> List[str]:
        """Return the sorted names of all loaded services."""
        return sorted(self._files.keys())

    def read_file(self, service: str, filename: str) -> Tuple[bool, str]:
        """Return (found, content); on a miss, content is an error message."""
        content = self._files.get(service, {}).get(filename)
        if content is None:
            return False, f"File not found: {service}/{filename}"
        return True, content

    def get_file_lines(self, service: str, filename: str) -> Optional[List[str]]:
        """Return the file's lines without terminators, or None if missing."""
        found, content = self.read_file(service, filename)
        if not found:
            return None
        return content.splitlines()

    # ------------------------------------------------------------------ #
    # Mutation
    # ------------------------------------------------------------------ #

    @staticmethod
    def _splice_line(content: str, idx: int, new_code: str) -> Tuple[str, str]:
        """Replace line *idx* (0-based) of *content*, keeping its terminator.

        Returns ``(new_content, old_code)``, where ``old_code`` is the replaced
        line without its line ending. All other lines, including the file's
        trailing newline and any ``\\r\\n`` endings, are left untouched. The
        caller must ensure *idx* is in range.
        """
        lines = content.splitlines(keepends=True)
        old_with_end = lines[idx]
        # keepends=True never yields an empty element, so [0] always exists.
        old_code = old_with_end.splitlines()[0]
        ending = old_with_end[len(old_code):]
        lines[idx] = new_code + ending
        return "".join(lines), old_code

    def edit_line(
        self,
        service: str,
        filename: str,
        line_number: int,
        new_code: str,
        step: int = 0,
    ) -> Tuple[bool, str]:
        """Replace a single line (1-based). Returns (success, message).

        Only the targeted line changes; its original line ending is kept.
        (Previously the whole file was rebuilt with ``"\\n".join``, which
        dropped the trailing newline and normalised every other line ending.)
        On success the edit is appended to the history, tagged with *step*.
        """
        found, content = self.read_file(service, filename)
        if not found:
            return False, f"File not found: {service}/{filename}"

        line_count = len(content.splitlines())
        idx = line_number - 1
        if not (0 <= idx < line_count):
            return False, f"Line {line_number} out of range (file has {line_count} lines)"

        new_content, old_code = self._splice_line(content, idx, new_code)
        self._files[service][filename] = new_content

        self._history.append(
            EditRecord(step, service, filename, idx, old_code, new_code)
        )
        return True, "ok"

    # ------------------------------------------------------------------ #
    # History / introspection
    # ------------------------------------------------------------------ #

    def get_edit_history(
        self,
        service: Optional[str] = None,
        filename: Optional[str] = None,
    ) -> List[dict]:
        """Return edit records as dicts in the order they were applied.

        Falsy filters (``None`` or ``""``) are ignored.
        """
        records = self._history
        if service:
            records = [r for r in records if r.service == service]
        if filename:
            records = [r for r in records if r.filename == filename]
        return [r.to_dict() for r in records]

    def git_blame(self, service: str, filename: str, line_number: int) -> str:
        """Describe who last changed *line_number* (1-based) as a string.

        If the line was edited, the most recent matching edit is described
        with its old and new text. Otherwise a fixed attribution message is
        returned. Matching is by recorded line index only, so line shifts
        from earlier multi-line replacements are not taken into account.
        """
        idx = line_number - 1
        last = next(
            (
                r for r in reversed(self._history)
                if r.service == service and r.filename == filename and r.line_idx == idx
            ),
            None,
        )
        if last is not None:
            return (
                f"Step {last.step}: agent changed line {line_number} in "
                f"{service}/{filename}\n"
                f"  - {last.old_code!r}\n"
                f"  + {last.new_code!r}"
            )
        return (
            f"Line {line_number} in {service}/{filename} was last modified by: "
            f"Junior AI code-gen bot (commit a3f91b2, 2026-04-23 02:14 UTC)"
        )

    def build_git_diff(self) -> Optional[str]:
        """Render the edit history as diff-like text, or None if unedited."""
        if not self._history:
            return None
        lines = [f"--- Task {self._task_id} working diff ---"]
        for r in self._history:
            lines.append(
                f"@@ {r.service}/{r.filename} line {r.line_idx + 1} @@\n"
                f"-{r.old_code}\n"
                f"+{r.new_code}"
            )
        return "\n".join(lines)
|
|