# Rowmind / tools.py
# renzoide: Add DuckDB dependency and refactor task management functions
# (commit d84b7a3)
from typing import Any, Dict, List, Optional
import calendar
from datetime import datetime
from pathlib import Path
from helper import (
add_row_to_csv,
generate_unique_id,
get_csv_path,
get_current_time_epoch,
update_row_in_csv,
list_csv_files,
get_table_metadata,
get_csv_columns,
execute_sql_query,
)
# ===== TASK MANAGEMENT =====
def create_task(
    description: str,
    schedule_type: str = "one_time",
    scheduled_at_epoch: int | None = None,
    interval_seconds: int = 0,
    scheduled_input: str = "",
) -> str:
    """
    Append a new task row, with status "pending", to the tasks CSV file.

    Args:
        description: Short human-friendly description of the task.
        schedule_type: Either "one_time" or "recurring" (stored as given,
            not validated here).
        scheduled_at_epoch: Unix timestamp (UTC) for the next run. ``None``
            is stored as 0.
        interval_seconds: Interval in seconds for recurring tasks
            (0 for one-time).
        scheduled_input: Original human text for the schedule (optional).

    Returns:
        A confirmation message containing the generated task id and the
        description.
    """
    tasks_file = get_csv_path("tasks")
    new_id = generate_unique_id("task")
    created_at = get_current_time_epoch()
    # A missing (None) or zero timestamp is persisted as 0.
    next_run = scheduled_at_epoch if scheduled_at_epoch else 0

    # Key order is kept stable because it may determine the CSV column order.
    record: Dict[str, Any] = dict(
        task_id=new_id,
        created_at_epoch=created_at,
        description=description,
        status="pending",
        schedule_type=schedule_type,
        scheduled_at_epoch=next_run,
        interval_seconds=interval_seconds,
        scheduled_input=scheduled_input,
    )
    add_row_to_csv(tasks_file, record)
    return "Created task {} ({})".format(record["task_id"], description)
def list_tasks(status: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
    """
    List tasks from the ``tasks`` table with optional status filtering.

    Results are ordered by ``scheduled_at_epoch`` and then
    ``created_at_epoch``, both descending.

    Args:
        status: Filter by status (e.g., 'pending', 'done'). ``None`` or an
            empty string disables the filter.
        limit: Maximum number of tasks to return. Must be convertible to
            ``int``.

    Returns:
        The result of ``execute_sql_query`` for the built query: a list of
        tasks (each task is a dict). Empty list if none match.

    Raises:
        TypeError, ValueError: If ``limit`` cannot be converted to an integer.
    """
    clauses = ["SELECT * FROM tasks"]
    if status:
        # The value is embedded in the SQL text, so double any single quote
        # (standard SQL string-literal escaping). Without this, a status
        # containing "'" breaks the query or lets the caller inject SQL.
        safe_status = str(status).replace("'", "''")
        clauses.append(f"WHERE status = '{safe_status}'")
    # Both ordering columns are part of the tasks.csv schema written by
    # create_task, so they are assumed to exist.
    clauses.append("ORDER BY scheduled_at_epoch DESC, created_at_epoch DESC")
    # int() guarantees that only a number can reach the LIMIT clause.
    clauses.append(f"LIMIT {int(limit)}")
    return execute_sql_query(" ".join(clauses))
# ===== TIME UTILITIES =====
def build_datetime_epoch(
    year: int, month: int, day: int, hour: int, minute: int
) -> int:
    """
    Turn calendar fields, interpreted as UTC, into a Unix timestamp.

    Args:
        year: The year (e.g. 2023).
        month: The month (1-12).
        day: The day of the month (1-31).
        hour: The hour (0-23).
        minute: The minute (0-59).

    Returns:
        Seconds since 1970-01-01 00:00 UTC for the given moment
        (seconds are always zero).

    Raises:
        ValueError: If the fields do not form a valid date/time.
    """
    # Building a datetime first validates the fields (e.g. rejects Feb 30).
    moment = datetime(year=year, month=month, day=day, hour=hour, minute=minute)
    # The datetime is naive; timegm reads its fields as UTC, not local time.
    utc_fields = moment.utctimetuple()
    epoch_seconds = calendar.timegm(utc_fields)
    return int(epoch_seconds)
# ===== GENERIC TABLE OPERATIONS =====
def list_tables() -> Dict[str, Dict[str, Any]]:
    """
    Describe every available table: its columns plus stored metadata.

    Returns:
        Dictionary mapping table name -> {columns, description,
        example_sentence}. Tables without metadata get empty strings for
        description and example_sentence. The ``tables`` file itself
        (tables.csv) is excluded.
    """
    tables: Dict[str, Dict[str, Any]] = {}
    for csv_file in list_csv_files():
        name = Path(csv_file).stem
        # tables.csv is skipped so it is never listed as a table itself.
        if name == "tables":
            continue

        column_names = get_csv_columns(name)
        meta = get_table_metadata(name)
        # Missing/empty metadata falls back to blank text fields.
        description = meta.get("description", "") if meta else ""
        example = meta.get("example_sentence", "") if meta else ""

        tables[name] = {
            "columns": column_names,
            "description": description,
            "example_sentence": example,
        }
    return tables
def append_row(table_name: str, row_json: str) -> Dict[str, Any]:
    """
    Add one row, given as a JSON object string, to an existing table.

    Args:
        table_name: Name of the table to append to.
        row_json: JSON object string with column values. A blank string is
            treated as an empty object.

    Returns:
        On success, a dict with a ``message`` and the echoed ``row`` data.
        On failure (unknown table, invalid JSON, or JSON that is not an
        object), a dict with a single ``error`` key.
    """
    import json

    target = get_csv_path(table_name)
    if not target.exists():
        return {"error": f"Table '{table_name}' does not exist."}

    # Blank input means "no values"; anything else must parse as JSON.
    payload = {}
    if row_json.strip():
        try:
            payload = json.loads(row_json)
        except json.JSONDecodeError as exc:
            return {"error": f"Error parsing JSON: {exc}"}

    # Valid JSON can still be a list, number, etc.; only objects map to rows.
    if not isinstance(payload, dict):
        return {"error": "row_json must decode to a JSON object."}

    add_row_to_csv(target, payload)
    return {"message": f"Added row to '{table_name}'.", "row": payload}
def run_sql_query(query: str) -> List[Dict[str, Any]]:
    """
    Execute a SQL query over the CSV files and return its result.

    IMPORTANT: This tool is for READ-ONLY operations (SELECT) only.
    Do NOT use this tool for INSERT, UPDATE, or DELETE operations.
    To add rows, use 'append_row'. To update rows, use 'update_row'.

    Any CSV file in the data directory can be referenced by its name
    (without .csv).
    Example: "SELECT * FROM expenses WHERE amount > 100"

    Note: the read-only rule is a usage contract; this function passes the
    query to ``execute_sql_query`` without inspecting it.
    """
    rows = execute_sql_query(query)
    return rows
def update_row(
    table_name: str,
    row_id: int,
    field_to_update: str,
    new_value: str,
) -> str:
    """
    Set one column of one row, addressed by its row number (rowid).

    Args:
        table_name: Name of the table (e.g. 'tasks').
        row_id: The row number (rowid) of the item to update. This is a
            0-based index.
        field_to_update: The name of the column to modify.
        new_value: The new value to set.

    Returns:
        A human-readable status message: success, table missing, or row
        not found.
    """
    target = get_csv_path(table_name)
    if not target.exists():
        return f"Table '{table_name}' does not exist."

    changes = {field_to_update: new_value}
    updated = update_row_in_csv(target, "rowid", row_id, changes)
    if not updated:
        return f"Row not found in '{table_name}' where rowid={row_id}."
    return f"Updated '{table_name}': set {field_to_update}='{new_value}' where rowid={row_id}."