from typing import Any, Dict, List, Optional import calendar from datetime import datetime from pathlib import Path from helper import ( add_row_to_csv, generate_unique_id, get_csv_path, get_current_time_epoch, update_row_in_csv, list_csv_files, get_table_metadata, get_csv_columns, execute_sql_query, ) # ===== TASK MANAGEMENT ===== def create_task( description: str, schedule_type: str = "one_time", scheduled_at_epoch: int | None = None, interval_seconds: int = 0, scheduled_input: str = "", ) -> str: """ Creates a new task in the tasks CSV file. Args: description: Short human-friendly description of the task. schedule_type: Either "one_time" or "recurring". scheduled_at_epoch: Unix timestamp (UTC) for the next run. interval_seconds: Interval in seconds for recurring tasks (0 for one-time). scheduled_input: Original human text for the schedule (optional). """ csv_path = get_csv_path("tasks") task_data: Dict[str, Any] = { "task_id": generate_unique_id("task"), "created_at_epoch": get_current_time_epoch(), "description": description, "status": "pending", "schedule_type": schedule_type, "scheduled_at_epoch": scheduled_at_epoch or 0, "interval_seconds": interval_seconds, "scheduled_input": scheduled_input, } add_row_to_csv(csv_path, task_data) return f"Created task {task_data['task_id']} ({description})" def list_tasks(status: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]: """ List tasks from tasks.csv with optional filtering. Args: status: Filter by status (e.g., 'pending', 'done'). limit: Maximum number of tasks to return. Returns: A list of tasks (each task is a dict). Empty list if none match. """ query = "SELECT * FROM tasks" if status: query += f" WHERE status = '{status}'" # Sort by scheduled_at_epoch if available, else created_at_epoch # We assume these columns exist. If not, DuckDB might complain if we order by them. # But tasks.csv is created by us, so we know the schema. query += " ORDER BY scheduled_at_epoch DESC, created_at_epoch DESC" query += f" LIMIT {limit}" return execute_sql_query(query) # ===== TIME UTILITIES ===== def build_datetime_epoch( year: int, month: int, day: int, hour: int, minute: int ) -> int: """ Convert a specific date and time to a Unix timestamp (UTC). Args: year: The year (e.g. 2023). month: The month (1-12). day: The day of the month (1-31). hour: The hour (0-23). minute: The minute (0-59). """ dt = datetime(year, month, day, hour, minute) return int(calendar.timegm(dt.utctimetuple())) # ===== GENERIC TABLE OPERATIONS ===== def list_tables() -> Dict[str, Dict[str, Any]]: """ List all available tables with their columns and metadata. Returns: Dictionary mapping table name -> {columns, description, example_sentence}. Excludes tables.csv from results. """ result = {} files = list_csv_files() for file_path in files: path = Path(file_path) table_name = path.stem # Exclude tables.csv from MCP resource if table_name == "tables": continue columns = get_csv_columns(table_name) metadata = get_table_metadata(table_name) if metadata: result[table_name] = { "columns": columns, "description": metadata.get("description", ""), "example_sentence": metadata.get("example_sentence", ""), } else: result[table_name] = { "columns": columns, "description": "", "example_sentence": "", } return result def append_row(table_name: str, row_json: str) -> Dict[str, Any]: """ Append a row to any table using JSON payload. Args: table_name: Name of the table to append to. row_json: JSON object string with column values. Returns: Dict with message and echoed row data. On error, includes error key. """ import json csv_path = get_csv_path(table_name) if not csv_path.exists(): return {"error": f"Table '{table_name}' does not exist."} try: row_data = json.loads(row_json) if row_json.strip() else {} except json.JSONDecodeError as exc: return {"error": f"Error parsing JSON: {exc}"} if not isinstance(row_data, dict): return {"error": "row_json must decode to a JSON object."} add_row_to_csv(csv_path, row_data) return {"message": f"Added row to '{table_name}'.", "row": row_data} def run_sql_query(query: str) -> List[Dict[str, Any]]: """ Run a SQL query against the CSV files. IMPORTANT: This tool is for READ-ONLY operations (SELECT) only. Do NOT use this tool for INSERT, UPDATE, or DELETE operations. To add rows, use 'append_row'. To update rows, use 'update_row'. The query can reference any CSV file in the data directory by its name (without .csv). Example: "SELECT * FROM expenses WHERE amount > 100" """ return execute_sql_query(query) def update_row( table_name: str, row_id: int, field_to_update: str, new_value: str, ) -> str: """ Update a specific field in a row using its row number (rowid). Args: table_name: Name of the table (e.g. 'tasks'). row_id: The row number (rowid) of the item to update. This is a 0-based index. field_to_update: The name of the column to modify. new_value: The new value to set. """ csv_path = get_csv_path(table_name) if not csv_path.exists(): return f"Table '{table_name}' does not exist." success = update_row_in_csv(csv_path, "rowid", row_id, {field_to_update: new_value}) if success: return f"Updated '{table_name}': set {field_to_update}='{new_value}' where rowid={row_id}." else: return f"Row not found in '{table_name}' where rowid={row_id}."