| import sqlite3 |
| import re |
|
|
| class Task: |
| def __init__(self, task_id: int): |
| self.task_id = task_id |
| |
| def setup_db(self, conn: sqlite3.Connection): |
| raise NotImplementedError |
| |
| def get_goal(self) -> str: |
| raise NotImplementedError |
| |
| def grade(self, conn: sqlite3.Connection) -> float: |
| raise NotImplementedError |
|
|
| class EasyTask(Task): |
| def __init__(self): |
| super().__init__(1) |
|
|
| def setup_db(self, conn: sqlite3.Connection): |
| c = conn.cursor() |
| c.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, total_spent REAL)") |
| c.executemany("INSERT INTO customers (name, total_spent) VALUES (?, ?)", [ |
| ("Alice", 500.0), |
| ("Bob", 1200.0), |
| ("Charlie", 50.0), |
| ("Diana", 3000.0), |
| ("Eve", 1000.01) |
| ]) |
| conn.commit() |
|
|
| def get_goal(self) -> str: |
| return "Create a view named 'high_value_customers' containing all customers who have a 'total_spent' greater than 1000.0. The view should contain the exact same columns as the customers table." |
|
|
| def grade(self, conn: sqlite3.Connection) -> float: |
| c = conn.cursor() |
| try: |
| |
| c.execute("SELECT name FROM sqlite_master WHERE type='view' AND name='high_value_customers'") |
| if not c.fetchone(): |
| return 0.01 |
| |
| |
| c.execute("SELECT name, total_spent FROM high_value_customers ORDER BY name") |
| rows = c.fetchall() |
| |
| if len(rows) != 3: |
| return 0.5 |
| |
| expected = [("Bob", 1200.0), ("Diana", 3000.0), ("Eve", 1000.01)] |
| if rows == expected: |
| return 0.99 |
| return 0.5 |
| except Exception: |
| return 0.01 |
|
|
|
|
| class MediumTask(Task): |
| def __init__(self): |
| super().__init__(2) |
|
|
| def setup_db(self, conn: sqlite3.Connection): |
| c = conn.cursor() |
| c.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, category TEXT, price TEXT)") |
| c.executemany("INSERT INTO products (name, category, price) VALUES (?, ?, ?)", [ |
| ("Laptop", "Electronics", "$999.99"), |
| ("Mouse", "electronics", "25.50 USD"), |
| ("Desk", "FURNITURE", "150.0"), |
| ("Chair", "furniture", "$85.00"), |
| ("Headphones", "ELEC", "€45.00") |
| ]) |
| conn.commit() |
|
|
| def get_goal(self) -> str: |
| return ( |
| "The 'products' table is messy. " |
| "1. Standardize the 'category' column to be fully UPPERCASE. (Hint: treat 'ELEC' as 'ELECTRONICS'). " |
| "2. Create a new column 'price_usd' of type REAL. Extract the numeric value from the 'price' string and populate 'price_usd'. " |
| "Do not drop any original columns." |
| ) |
|
|
| def grade(self, conn: sqlite3.Connection) -> float: |
| score = 0.0 |
| c = conn.cursor() |
| try: |
| |
| c.execute("PRAGMA table_info(products)") |
| columns = [row[1] for row in c.fetchall()] |
| if 'price_usd' in columns: |
| score += 0.3 |
| |
| |
| c.execute("SELECT price_usd FROM products ORDER BY id") |
| prices = [row[0] for row in c.fetchall()] |
| expected_prices = [999.99, 25.50, 150.0, 85.0, 45.0] |
| |
| |
| correct_prices = sum(1 for p, e in zip(prices, expected_prices) if p is not None and abs(p - e) < 0.01) |
| score += (correct_prices / 5.0) * 0.4 |
|
|
| |
| c.execute("SELECT category FROM products ORDER BY id") |
| categories = [row[0] for row in c.fetchall()] |
| expected_cats = ["ELECTRONICS", "ELECTRONICS", "FURNITURE", "FURNITURE", "ELECTRONICS"] |
| |
| correct_cats = sum(1 for c, e in zip(categories, expected_cats) if c == e) |
| score += (correct_cats / 5.0) * 0.3 |
| |
| return min(max(score, 0.01), 0.99) |
| except Exception: |
| return min(max(score, 0.01), 0.99) |
|
|
|
|
| class HardTask(Task): |
| def __init__(self): |
| super().__init__(3) |
|
|
| def setup_db(self, conn: sqlite3.Connection): |
| c = conn.cursor() |
| c.execute(""" |
| CREATE TABLE hospital_records ( |
| patient_name TEXT, |
| patient_dob TEXT, |
| doctor_name TEXT, |
| doctor_specialty TEXT, |
| appointment_date TEXT, |
| diagnosis TEXT |
| ) |
| """) |
| records = [ |
| ("John Doe", "1980-01-01", "Dr. Smith", "Cardiology", "2023-10-01", "Hypertension"), |
| ("Jane Roe", "1992-05-15", "Dr. Jones", "Neurology", "2023-10-02", "Migraine"), |
| ("John Doe", "1980-01-01", "Dr. Smith", "Cardiology", "2023-11-01", "Follow-up"), |
| ("Bob Guy", "1975-11-20", "Dr. Smith", "Cardiology", "2023-10-05", "Checkup") |
| ] |
| c.executemany("INSERT INTO hospital_records VALUES (?, ?, ?, ?, ?, ?)", records) |
| conn.commit() |
|
|
| def get_goal(self) -> str: |
| return ( |
| "Normalize the flat 'hospital_records' table into 3 tables: " |
| "'patients' (id INTEGER PRIMARY KEY, name TEXT, dob TEXT), " |
| "'doctors' (id INTEGER PRIMARY KEY, name TEXT, specialty TEXT), and " |
| "'appointments' (id INTEGER PRIMARY KEY, patient_id INTEGER, doctor_id INTEGER, date TEXT, diagnosis TEXT). " |
| "Migrate all data from 'hospital_records' correctly without duplication. " |
| "Ensure foreign keys are correctly pointing to the new IDs." |
| ) |
|
|
| def grade(self, conn: sqlite3.Connection) -> float: |
| score = 0.0 |
| c = conn.cursor() |
| try: |
| |
| c.execute("SELECT name FROM sqlite_master WHERE type='table'") |
| tables = [row[0] for row in c.fetchall()] |
| |
| if 'patients' in tables: score += 0.1 |
| if 'doctors' in tables: score += 0.1 |
| if 'appointments' in tables: score += 0.2 |
| |
| if score < 0.4: |
| return max(0.01, score) |
| |
| |
| c.execute("SELECT COUNT(*) FROM patients") |
| if c.fetchone()[0] == 3: score += 0.1 |
| |
| c.execute("SELECT COUNT(*) FROM doctors") |
| if c.fetchone()[0] == 2: score += 0.1 |
| |
| c.execute("SELECT COUNT(*) FROM appointments") |
| if c.fetchone()[0] == 4: score += 0.1 |
| |
| |
| query = """ |
| SELECT p.name, p.dob, d.name, d.specialty, a.date, a.diagnosis |
| FROM appointments a |
| JOIN patients p ON a.patient_id = p.id |
| JOIN doctors d ON a.doctor_id = d.id |
| ORDER BY p.name, a.date |
| """ |
| c.execute(query) |
| reconstructed = c.fetchall() |
| if len(reconstructed) == 4: |
| score += 0.3 |
| |
| return min(max(score, 0.01), 0.99) |
| except Exception: |
| return min(max(score, 0.01), 0.99) |
|
|
| TASKS = { |
| 1: EasyTask(), |
| 2: MediumTask(), |
| 3: HardTask() |
| } |
|
|