# Task and table management tools backed by CSV files.
| from typing import Any, Dict, List, Optional | |
| import calendar | |
| from datetime import datetime | |
| from pathlib import Path | |
| from helper import ( | |
| add_row_to_csv, | |
| generate_unique_id, | |
| get_csv_path, | |
| get_current_time_epoch, | |
| update_row_in_csv, | |
| list_csv_files, | |
| get_table_metadata, | |
| get_csv_columns, | |
| execute_sql_query, | |
| ) | |
| # ===== TASK MANAGEMENT ===== | |
def create_task(
    description: str,
    schedule_type: str = "one_time",
    scheduled_at_epoch: int | None = None,
    interval_seconds: int = 0,
    scheduled_input: str = "",
) -> str:
    """
    Persist a new task row into the tasks CSV file.

    Args:
        description: Short human-friendly description of the task.
        schedule_type: Either "one_time" or "recurring".
        scheduled_at_epoch: Unix timestamp (UTC) for the next run; stored
            as 0 when not provided.
        interval_seconds: Interval in seconds for recurring tasks (0 for one-time).
        scheduled_input: Original human text for the schedule (optional).

    Returns:
        A confirmation message containing the generated task id.
    """
    new_task: Dict[str, Any] = {
        "task_id": generate_unique_id("task"),
        "created_at_epoch": get_current_time_epoch(),
        "description": description,
        "status": "pending",
        "schedule_type": schedule_type,
        # A missing/None timestamp is normalized to 0 ("unscheduled").
        "scheduled_at_epoch": scheduled_at_epoch or 0,
        "interval_seconds": interval_seconds,
        "scheduled_input": scheduled_input,
    }
    add_row_to_csv(get_csv_path("tasks"), new_task)
    return f"Created task {new_task['task_id']} ({description})"
def list_tasks(status: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
    """
    List tasks from tasks.csv with optional filtering.

    Args:
        status: Filter by status (e.g., 'pending', 'done').
        limit: Maximum number of tasks to return.

    Returns:
        A list of tasks (each task is a dict). Empty list if none match.
    """
    query = "SELECT * FROM tasks"
    if status:
        # Escape single quotes so a status value cannot break out of the
        # quoted SQL literal (the query helper offers no parameter binding).
        safe_status = status.replace("'", "''")
        query += f" WHERE status = '{safe_status}'"
    # tasks.csv is created by create_task, so both ordering columns exist.
    query += " ORDER BY scheduled_at_epoch DESC, created_at_epoch DESC"
    # Coerce to int so a non-numeric limit cannot inject SQL; raises
    # ValueError/TypeError for values that are not integer-like.
    query += f" LIMIT {int(limit)}"
    return execute_sql_query(query)
| # ===== TIME UTILITIES ===== | |
def build_datetime_epoch(
    year: int, month: int, day: int, hour: int, minute: int, second: int = 0
) -> int:
    """
    Convert a specific date and time to a Unix timestamp (UTC).

    Args:
        year: The year (e.g. 2023).
        month: The month (1-12).
        day: The day of the month (1-31).
        hour: The hour (0-23).
        minute: The minute (0-59).
        second: The second (0-59); defaults to 0 for backward compatibility.

    Returns:
        Seconds since the Unix epoch, interpreting the inputs as UTC.

    Raises:
        ValueError: If any component is out of range for a valid date/time.
    """
    # datetime() validates component ranges; calendar.timegm interprets
    # the resulting time tuple as UTC (no local-timezone conversion).
    dt = datetime(year, month, day, hour, minute, second)
    return int(calendar.timegm(dt.utctimetuple()))
| # ===== GENERIC TABLE OPERATIONS ===== | |
def list_tables() -> Dict[str, Dict[str, Any]]:
    """
    List all available tables with their columns and metadata.

    Returns:
        Dictionary mapping table name -> {columns, description, example_sentence}.
        The internal tables.csv registry is excluded from the result.
    """
    tables: Dict[str, Dict[str, Any]] = {}
    for file_path in list_csv_files():
        table_name = Path(file_path).stem
        # tables.csv is internal bookkeeping, not a user-facing table.
        if table_name == "tables":
            continue
        columns = get_csv_columns(table_name)
        # Missing metadata falls back to empty strings for both fields.
        metadata = get_table_metadata(table_name) or {}
        tables[table_name] = {
            "columns": columns,
            "description": metadata.get("description", ""),
            "example_sentence": metadata.get("example_sentence", ""),
        }
    return tables
def append_row(table_name: str, row_json: str) -> Dict[str, Any]:
    """
    Append a row to any table using a JSON payload.

    Args:
        table_name: Name of the table to append to.
        row_json: JSON object string with column values.

    Returns:
        Dict with message and echoed row data. On error, includes error key.
    """
    import json

    csv_path = get_csv_path(table_name)
    if not csv_path.exists():
        return {"error": f"Table '{table_name}' does not exist."}
    try:
        # A blank payload is treated as an empty row rather than a JSON error.
        row_data = json.loads(row_json) if row_json.strip() else {}
    except json.JSONDecodeError as exc:
        return {"error": f"Error parsing JSON: {exc}"}
    if not isinstance(row_data, dict):
        # Reject arrays/scalars: a CSV row needs column -> value pairs.
        return {"error": "row_json must decode to a JSON object."}
    add_row_to_csv(csv_path, row_data)
    return {"message": f"Added row to '{table_name}'.", "row": row_data}
def run_sql_query(query: str) -> List[Dict[str, Any]]:
    """
    Execute a read-only SQL query over the CSV-backed tables.

    IMPORTANT: This tool is for READ-ONLY operations (SELECT) only.
    Do NOT use this tool for INSERT, UPDATE, or DELETE operations.
    To add rows, use 'append_row'. To update rows, use 'update_row'.

    Any CSV file in the data directory can be referenced by its base
    name (without the .csv extension).

    Example: "SELECT * FROM expenses WHERE amount > 100"
    """
    # Delegate directly to the shared query helper.
    return execute_sql_query(query)
def update_row(
    table_name: str,
    row_id: int,
    field_to_update: str,
    new_value: str,
) -> str:
    """
    Update a single field of a row identified by its row number (rowid).

    Args:
        table_name: Name of the table (e.g. 'tasks').
        row_id: The row number (rowid) of the item to update. This is a 0-based index.
        field_to_update: The name of the column to modify.
        new_value: The new value to set.

    Returns:
        A human-readable message describing success or failure.
    """
    csv_path = get_csv_path(table_name)
    if not csv_path.exists():
        return f"Table '{table_name}' does not exist."
    if update_row_in_csv(csv_path, "rowid", row_id, {field_to_update: new_value}):
        return f"Updated '{table_name}': set {field_to_update}='{new_value}' where rowid={row_id}."
    return f"Row not found in '{table_name}' where rowid={row_id}."