Spaces:
Running
Running
| import sqlite3 | |
| from datetime import datetime, date, timedelta | |
| # βββ Init ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def init_db(): | |
| conn = sqlite3.connect('tasks.db') | |
| cursor = conn.cursor() | |
| # Create table with new date_context column | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS tasks ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| title TEXT NOT NULL, | |
| time_context TEXT NOT NULL, | |
| date_context TEXT NOT NULL DEFAULT 'today', | |
| status TEXT DEFAULT 'pending' | |
| ) | |
| ''') | |
| # Migrate existing DB: add date_context if it doesn't exist yet | |
| try: | |
| cursor.execute("ALTER TABLE tasks ADD COLUMN date_context TEXT NOT NULL DEFAULT 'today'") | |
| print("Migration: added date_context column.") | |
| except sqlite3.OperationalError: | |
| pass # Column already exists β safe to ignore | |
| conn.commit() | |
| conn.close() | |
| # βββ Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_db_connection(): | |
| conn = sqlite3.connect('tasks.db') | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| def resolve_date(date_context: str) -> str: | |
| """ | |
| Converts natural language date strings into ISO format (YYYY-MM-DD). | |
| Accepts: 'today', 'tomorrow', 'YYYY-MM-DD', or any existing value. | |
| Returns the resolved ISO date string, or the raw value if unrecognised. | |
| """ | |
| if not date_context: | |
| return date.today().isoformat() | |
| normalised = date_context.strip().lower() | |
| if normalised == "today": | |
| return date.today().isoformat() | |
| elif normalised == "tomorrow": | |
| return (date.today() + timedelta(days=1)).isoformat() | |
| elif normalised == "yesterday": | |
| return (date.today() - timedelta(days=1)).isoformat() | |
| # Already an ISO date β return as-is | |
| try: | |
| datetime.strptime(date_context.strip(), "%Y-%m-%d") | |
| return date_context.strip() | |
| except ValueError: | |
| pass | |
| # Unrecognised β store raw so AI-generated strings like "next Monday" are kept | |
| return date_context.strip() | |
| # βββ CRUD ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_all_tasks(): | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| cursor.execute("SELECT * FROM tasks WHERE status = 'pending' ORDER BY date_context, time_context") | |
| rows = cursor.fetchall() | |
| conn.close() | |
| return [dict(row) for row in rows] | |
| def get_tasks_by_date(date_context: str): | |
| """Fetch pending tasks for a specific date (accepts 'today', 'tomorrow', or ISO date).""" | |
| resolved = resolve_date(date_context) | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "SELECT * FROM tasks WHERE status = 'pending' AND date_context = ? ORDER BY time_context", | |
| (resolved,) | |
| ) | |
| rows = cursor.fetchall() | |
| conn.close() | |
| return [dict(row) for row in rows] | |
| def create_task(title: str, time_context: str, date_context: str = "today"): | |
| resolved_date = resolve_date(date_context) | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "INSERT INTO tasks (title, time_context, date_context) VALUES (?, ?, ?)", | |
| (title, time_context, resolved_date) | |
| ) | |
| conn.commit() | |
| new_id = cursor.lastrowid | |
| conn.close() | |
| # Return the created task so main.py can track last_task_id | |
| return {"id": new_id, "title": title, "time_context": time_context, "date_context": resolved_date} | |
| def delete_task(task_id: int): | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,)) | |
| conn.commit() | |
| conn.close() | |
| def update_task(task_id: int, new_time: str = None, new_date: str = None, new_title: str = None): | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| if new_time: | |
| cursor.execute("UPDATE tasks SET time_context = ? WHERE id = ?", (new_time, task_id)) | |
| if new_date: | |
| resolved = resolve_date(new_date) | |
| cursor.execute("UPDATE tasks SET date_context = ? WHERE id = ?", (resolved, task_id)) | |
| if new_title: | |
| cursor.execute("UPDATE tasks SET title = ? WHERE id = ?", (new_title, task_id)) | |
| conn.commit() | |
| conn.close() | |
| def complete_task(task_id: int): | |
| """Mark a task as done without deleting it.""" | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| cursor.execute("UPDATE tasks SET status = 'done' WHERE id = ?", (task_id,)) | |
| conn.commit() | |
| conn.close() | |
| if __name__ == "__main__": | |
| init_db() | |
| print("Database initialised successfully.") |