Spaces:
Running
Running
File size: 5,204 Bytes
bc10fd9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | import sqlite3
from datetime import datetime, date, timedelta
# βββ Init ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def init_db():
conn = sqlite3.connect('tasks.db')
cursor = conn.cursor()
# Create table with new date_context column
cursor.execute('''
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
time_context TEXT NOT NULL,
date_context TEXT NOT NULL DEFAULT 'today',
status TEXT DEFAULT 'pending'
)
''')
# Migrate existing DB: add date_context if it doesn't exist yet
try:
cursor.execute("ALTER TABLE tasks ADD COLUMN date_context TEXT NOT NULL DEFAULT 'today'")
print("Migration: added date_context column.")
except sqlite3.OperationalError:
pass # Column already exists β safe to ignore
conn.commit()
conn.close()
# βββ Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def get_db_connection():
conn = sqlite3.connect('tasks.db')
conn.row_factory = sqlite3.Row
return conn
def resolve_date(date_context: str) -> str:
"""
Converts natural language date strings into ISO format (YYYY-MM-DD).
Accepts: 'today', 'tomorrow', 'YYYY-MM-DD', or any existing value.
Returns the resolved ISO date string, or the raw value if unrecognised.
"""
if not date_context:
return date.today().isoformat()
normalised = date_context.strip().lower()
if normalised == "today":
return date.today().isoformat()
elif normalised == "tomorrow":
return (date.today() + timedelta(days=1)).isoformat()
elif normalised == "yesterday":
return (date.today() - timedelta(days=1)).isoformat()
# Already an ISO date β return as-is
try:
datetime.strptime(date_context.strip(), "%Y-%m-%d")
return date_context.strip()
except ValueError:
pass
# Unrecognised β store raw so AI-generated strings like "next Monday" are kept
return date_context.strip()
# βββ CRUD ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def get_all_tasks():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM tasks WHERE status = 'pending' ORDER BY date_context, time_context")
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
def get_tasks_by_date(date_context: str):
"""Fetch pending tasks for a specific date (accepts 'today', 'tomorrow', or ISO date)."""
resolved = resolve_date(date_context)
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM tasks WHERE status = 'pending' AND date_context = ? ORDER BY time_context",
(resolved,)
)
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
def create_task(title: str, time_context: str, date_context: str = "today"):
resolved_date = resolve_date(date_context)
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"INSERT INTO tasks (title, time_context, date_context) VALUES (?, ?, ?)",
(title, time_context, resolved_date)
)
conn.commit()
new_id = cursor.lastrowid
conn.close()
# Return the created task so main.py can track last_task_id
return {"id": new_id, "title": title, "time_context": time_context, "date_context": resolved_date}
def delete_task(task_id: int):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
conn.commit()
conn.close()
def update_task(task_id: int, new_time: str = None, new_date: str = None, new_title: str = None):
conn = get_db_connection()
cursor = conn.cursor()
if new_time:
cursor.execute("UPDATE tasks SET time_context = ? WHERE id = ?", (new_time, task_id))
if new_date:
resolved = resolve_date(new_date)
cursor.execute("UPDATE tasks SET date_context = ? WHERE id = ?", (resolved, task_id))
if new_title:
cursor.execute("UPDATE tasks SET title = ? WHERE id = ?", (new_title, task_id))
conn.commit()
conn.close()
def complete_task(task_id: int):
"""Mark a task as done without deleting it."""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("UPDATE tasks SET status = 'done' WHERE id = ?", (task_id,))
conn.commit()
conn.close()
if __name__ == "__main__":
init_db()
print("Database initialised successfully.") |