# rbt2025's picture
# Deploy TextAI v2 - Clean architecture
# 02daacc verified
"""
API Module
All API endpoints for external access.
"""
import json
from datetime import datetime
from .config import VERSION
from .logger import logger
from .events import event_manager
from .file_manager import create_folder, create_file, read_file, write_file, delete_item, rename_item
from .system import get_system_info, get_space_info, get_storage_stats
def ping() -> dict:
    """Report that the service is alive.

    Returns:
        A dict with three keys: "status" (always "ok"), "version" (the
        imported VERSION constant) and "timestamp" (the current naive
        ``datetime.now()`` value rendered with ``isoformat()``).
    """
    health = dict(status="ok", version=VERSION)
    health["timestamp"] = datetime.now().isoformat()
    return health
def get_api_list() -> str:
    """Describe the advertised API calls as a JSON document.

    Returns:
        A JSON object (2-space indented) whose keys are the categories
        "system", "files", "hub", "events" and "logs", each mapping to a
        list of call signatures written as plain strings.
    """
    system_calls = [
        "ping()",
        "get_system_info()",
        "get_system_dashboard()",
        "get_space_info()",
        "get_storage_stats()",
    ]
    file_calls = [
        "create_folder(name)",
        "create_file(name,content)",
        "read_file(name)",
        "write_file(name,content)",
        "delete_item(name)",
        "rename_item(old,new)",
        "search_files(query)",
    ]
    hub_calls = [
        "search_hf_models(query,task,library,sort)",
        "download_model_file(model_id,filename)",
        "download_from_url(url,filename)",
        "list_downloaded_models()",
    ]
    event_calls = [
        "api_create_event(type,path,hours,enabled,desc)",
        "api_delete_event(id)",
        "api_toggle_event(id)",
        "api_run_event(id)",
        "api_get_events()",
    ]
    log_calls = [
        "api_get_logs(type,limit,level,category)",
        "api_clear_logs(type)",
        "api_export_logs(type)",
    ]

    # Insertion order is the order the categories appear in the output.
    catalogue = {
        "system": system_calls,
        "files": file_calls,
        "hub": hub_calls,
        "events": event_calls,
        "logs": log_calls,
    }
    return json.dumps(catalogue, indent=2)
# Event API
def api_create_event(event_type: str, target_path: str, interval_hours: float, enabled: bool, description: str) -> str:
    """Register a new scheduled event through ``event_manager``.

    Args:
        event_type: Forwarded unchanged as ``event_type``.
        target_path: Forwarded unchanged as ``target_path``.
        interval_hours: Converted with ``float()`` before being forwarded.
        enabled: Forwarded unchanged as ``enabled``.
        description: Forwarded unchanged as ``description``.

    Returns:
        A JSON string: ``{"success": true, "event_id": ...}`` when creation
        and serialization succeed, otherwise
        ``{"success": false, "error": "<exception text>"}``.
    """
    try:
        new_id = event_manager.create_event(
            event_type=event_type,
            target_path=target_path,
            interval_hours=float(interval_hours),
            enabled=enabled,
            description=description,
        )
        # Serialized inside the try so a failure here is reported as JSON too.
        reply = json.dumps({"success": True, "event_id": new_id})
        return reply
    except Exception as exc:
        failure = {"success": False, "error": str(exc)}
        return json.dumps(failure)
def api_delete_event(event_id: str) -> str:
    """Remove a scheduled event.

    Args:
        event_id: Identifier handed to ``event_manager.delete_event``.

    Returns:
        A JSON string ``{"success": <result>}`` where ``<result>`` is
        whatever ``event_manager.delete_event`` returned.
    """
    outcome = {"success": event_manager.delete_event(event_id)}
    return json.dumps(outcome)
def api_toggle_event(event_id: str) -> str:
    """Flip the enabled flag of a scheduled event.

    Args:
        event_id: Identifier handed to ``event_manager.toggle_event``.

    Returns:
        A JSON string. ``{"success": true, "enabled": ...}`` carries the
        event's current ``enabled`` attribute when the toggle reported
        success and the id is present in ``event_manager.events``;
        otherwise ``{"success": false}``.
    """
    toggled = event_manager.toggle_event(event_id)

    # Guard clause: a failed toggle or an unknown id yields the bare failure reply.
    if not toggled or event_id not in event_manager.events:
        return json.dumps({"success": False})

    current_state = event_manager.events[event_id].enabled
    return json.dumps({"success": True, "enabled": current_state})
def api_run_event(event_id: str) -> str:
    """Trigger a scheduled event right away.

    Args:
        event_id: Identifier handed to ``event_manager.run_now``.

    Returns:
        A JSON string ``{"success": <result>}`` where ``<result>`` is
        whatever ``event_manager.run_now`` returned.
    """
    outcome = {"success": event_manager.run_now(event_id)}
    return json.dumps(outcome)
def api_get_events() -> str:
    """List every scheduled event.

    Returns:
        A 2-space-indented JSON string ``{"events": ...}`` wrapping the
        value returned by ``event_manager.get_events()``.
    """
    listing = {"events": event_manager.get_events()}
    return json.dumps(listing, indent=2)
# Log API
def api_get_logs(log_type: str = "all", limit: int = 100, level: str = "", category: str = "") -> str:
    """Fetch log entries from ``logger``.

    Args:
        log_type: Forwarded unchanged as ``log_type``.
        limit: Converted with ``int()`` before being forwarded.
        level: Forwarded as-is when truthy; an empty/falsy value is sent as None.
        category: Forwarded as-is when truthy; an empty/falsy value is sent as None.

    Returns:
        A 2-space-indented JSON string with "count" (``len`` of the result
        from ``logger.get_logs``) and "logs" (that result).
    """
    entries = logger.get_logs(
        log_type=log_type,
        limit=int(limit),
        level=level or None,
        category=category or None,
    )
    body = {"count": len(entries), "logs": entries}
    return json.dumps(body, indent=2)
def api_clear_logs(log_type: str = "all") -> str:
    """Clear logs via ``logger``.

    Args:
        log_type: Forwarded to ``logger.clear_logs``; defaults to "all".

    Returns:
        A JSON string ``{"success": <result>}`` where ``<result>`` is
        whatever ``logger.clear_logs`` returned.
    """
    outcome = {"success": logger.clear_logs(log_type)}
    return json.dumps(outcome)
def api_export_logs(log_type: str = "all") -> str:
    """Export logs via ``logger``.

    Args:
        log_type: Forwarded to ``logger.export_logs``; defaults to "all".

    Returns:
        The value produced by ``logger.export_logs``, passed through
        unchanged.
    """
    exported = logger.export_logs(log_type)
    return exported