# Snapshot 02daacc (file size: 3,448 bytes)
"""
API Module
All API endpoints for external access.
"""
import json
from datetime import datetime
from .config import VERSION
from .logger import logger
from .events import event_manager
from .file_manager import create_folder, create_file, read_file, write_file, delete_item, rename_item
from .system import get_system_info, get_space_info, get_storage_stats
def ping() -> dict:
    """Return a small health-check payload.

    Returns:
        A dict with a fixed ``"status"`` of ``"ok"``, the package
        ``VERSION`` under ``"version"``, and the current time under
        ``"timestamp"`` as an ISO 8601 string. The timestamp comes from
        ``datetime.now()`` called without a timezone, so it is a naive
        local time.
    """
    report = {"status": "ok", "version": VERSION}
    report["timestamp"] = datetime.now().isoformat()
    return report
def get_api_list() -> str:
    """Describe the available API endpoints, grouped by category.

    Returns:
        A JSON object (pretty-printed with a 2-space indent) mapping each
        category name to a list of call-signature strings.
    """
    # Insertion order is the order of the serialized keys, so keep it stable.
    catalog = {
        "system": [
            "ping()",
            "get_system_info()",
            "get_system_dashboard()",
            "get_space_info()",
            "get_storage_stats()",
        ],
        "files": [
            "create_folder(name)",
            "create_file(name,content)",
            "read_file(name)",
            "write_file(name,content)",
            "delete_item(name)",
            "rename_item(old,new)",
            "search_files(query)",
        ],
        "hub": [
            "search_hf_models(query,task,library,sort)",
            "download_model_file(model_id,filename)",
            "download_from_url(url,filename)",
            "list_downloaded_models()",
        ],
        "events": [
            "api_create_event(type,path,hours,enabled,desc)",
            "api_delete_event(id)",
            "api_toggle_event(id)",
            "api_run_event(id)",
            "api_get_events()",
        ],
        "logs": [
            "api_get_logs(type,limit,level,category)",
            "api_clear_logs(type)",
            "api_export_logs(type)",
        ],
    }
    return json.dumps(catalog, indent=2)
# Event API
def api_create_event(event_type: str, target_path: str, interval_hours: float, enabled: bool, description: str) -> str:
    """Register a scheduled event and report the outcome as JSON.

    Args:
        event_type: Passed through to ``event_manager.create_event``.
        target_path: Passed through to ``event_manager.create_event``.
        interval_hours: Coerced with ``float()`` before being passed on.
        enabled: Passed through to ``event_manager.create_event``.
        description: Passed through to ``event_manager.create_event``.

    Returns:
        ``{"success": true, "event_id": ...}`` on success, otherwise
        ``{"success": false, "error": "<message>"}``. No exception derived
        from ``Exception`` propagates to the caller; a failed float
        conversion is reported the same way.
    """
    try:
        event_fields = {
            "event_type": event_type,
            "target_path": target_path,
            "interval_hours": float(interval_hours),
            "enabled": enabled,
            "description": description,
        }
        new_id = event_manager.create_event(**event_fields)
        # Serialize inside the try so an unserializable id is also reported.
        response = json.dumps({"success": True, "event_id": new_id})
    except Exception as exc:
        response = json.dumps({"success": False, "error": str(exc)})
    return response
def api_delete_event(event_id: str) -> str:
    """Remove a scheduled event.

    Args:
        event_id: Identifier handed to ``event_manager.delete_event``.

    Returns:
        A JSON object whose ``"success"`` field is the value returned by
        ``event_manager.delete_event``.
    """
    return json.dumps({"success": event_manager.delete_event(event_id)})
def api_toggle_event(event_id: str) -> str:
    """Flip a scheduled event's enabled flag.

    Args:
        event_id: Identifier handed to ``event_manager.toggle_event``.

    Returns:
        ``{"success": true, "enabled": <flag>}`` when the toggle succeeded
        and the event is present in ``event_manager.events``; otherwise
        ``{"success": false}``.
    """
    toggled = event_manager.toggle_event(event_id)
    # Bail out early when the toggle failed or the event is not registered.
    if not toggled or event_id not in event_manager.events:
        return json.dumps({"success": False})
    event = event_manager.events[event_id]
    return json.dumps({"success": True, "enabled": event.enabled})
def api_run_event(event_id: str) -> str:
    """Trigger a scheduled event right away.

    Args:
        event_id: Identifier handed to ``event_manager.run_now``.

    Returns:
        A JSON object whose ``"success"`` field is the value returned by
        ``event_manager.run_now``.
    """
    outcome = {"success": event_manager.run_now(event_id)}
    return json.dumps(outcome)
def api_get_events() -> str:
    """List the scheduled events.

    Returns:
        A JSON object (2-space indent) with a single ``"events"`` field
        holding whatever ``event_manager.get_events()`` returns.
    """
    scheduled = event_manager.get_events()
    return json.dumps({"events": scheduled}, indent=2)
# Log API
def api_get_logs(log_type: str = "all", limit: int = 100, level: str = "", category: str = "") -> str:
    """Fetch log entries, optionally filtered.

    Args:
        log_type: Passed through to ``logger.get_logs``.
        limit: Coerced with ``int()`` before being passed on.
        level: Level filter; an empty (falsy) value is sent as ``None``.
        category: Category filter; an empty (falsy) value is sent as ``None``.

    Returns:
        A JSON object (2-space indent) with ``"count"`` (number of entries
        returned) and ``"logs"`` (the entries themselves).
    """
    entries = logger.get_logs(
        log_type=log_type,
        limit=int(limit),
        level=level or None,
        category=category or None,
    )
    payload = {"count": len(entries), "logs": entries}
    return json.dumps(payload, indent=2)
def api_clear_logs(log_type: str = "all") -> str:
    """Clear stored logs.

    Args:
        log_type: Passed through to ``logger.clear_logs``.

    Returns:
        A JSON object whose ``"success"`` field is the value returned by
        ``logger.clear_logs``.
    """
    return json.dumps({"success": logger.clear_logs(log_type)})
def api_export_logs(log_type: str = "all") -> str:
    """Export stored logs.

    Args:
        log_type: Passed through to ``logger.export_logs``.

    Returns:
        The value produced by ``logger.export_logs``, returned unchanged
        (not wrapped in JSON by this function).
    """
    exported = logger.export_logs(log_type)
    return exported
|