Spaces:
Paused
Paused
| """ | |
| Authentication Files API Router | |
| Endpoints for managing authentication profile files. | |
| """ | |
| import shutil | |
| from pathlib import Path | |
| from typing import Optional | |
| from fastapi import APIRouter, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel, Field | |
| from config.settings import ACTIVE_AUTH_DIR, SAVED_AUTH_DIR | |
| router = APIRouter(prefix="/api/auth", tags=["auth"]) | |
| class AuthFileInfo(BaseModel): | |
| """Information about an auth file.""" | |
| name: str | |
| path: str | |
| size_bytes: int | |
| is_active: bool = False | |
| class ActivateRequest(BaseModel): | |
| """Request to activate an auth file.""" | |
| filename: str = Field(..., description="The name of the auth file to activate") | |
| class AuthFilesResponse(BaseModel): | |
| """Response containing list of auth files.""" | |
| saved_files: list[AuthFileInfo] | |
| active_file: Optional[str] = None | |
| def _ensure_dirs() -> None: | |
| """Ensure auth directories exist.""" | |
| Path(ACTIVE_AUTH_DIR).mkdir(parents=True, exist_ok=True) | |
| Path(SAVED_AUTH_DIR).mkdir(parents=True, exist_ok=True) | |
| def _get_active_file() -> Optional[str]: | |
| """Get the currently active auth file name.""" | |
| active_dir = Path(ACTIVE_AUTH_DIR) | |
| if not active_dir.exists(): | |
| return None | |
| json_files = list(active_dir.glob("*.json")) | |
| if json_files: | |
| return sorted(json_files)[0].name | |
| return None | |
| def _list_saved_files() -> list[AuthFileInfo]: | |
| """List all saved auth files.""" | |
| saved_dir = Path(SAVED_AUTH_DIR) | |
| active_file = _get_active_file() | |
| files: list[AuthFileInfo] = [] | |
| if saved_dir.exists(): | |
| for f in sorted(saved_dir.glob("*.json")): | |
| files.append( | |
| AuthFileInfo( | |
| name=f.name, | |
| path=str(f), | |
| size_bytes=f.stat().st_size, | |
| is_active=(f.name == active_file), | |
| ) | |
| ) | |
| return files | |
| async def list_auth_files() -> JSONResponse: | |
| """List all saved auth files.""" | |
| _ensure_dirs() | |
| files = _list_saved_files() | |
| active = _get_active_file() | |
| return JSONResponse( | |
| content=AuthFilesResponse( | |
| saved_files=files, # type: ignore[arg-type] | |
| active_file=active, | |
| ).model_dump() | |
| ) | |
| async def get_active_auth() -> JSONResponse: | |
| """Get the currently active auth file.""" | |
| _ensure_dirs() | |
| active = _get_active_file() | |
| return JSONResponse(content={"active_file": active}) | |
| async def activate_auth_file(request: ActivateRequest) -> JSONResponse: | |
| """Activate the specified auth file.""" | |
| _ensure_dirs() | |
| filename = request.filename | |
| # Find source file | |
| source_path: Optional[Path] = None | |
| for search_dir in [SAVED_AUTH_DIR, ACTIVE_AUTH_DIR]: | |
| candidate = Path(search_dir) / filename | |
| if candidate.exists() and candidate.is_file(): | |
| source_path = candidate | |
| break | |
| if not source_path: | |
| raise HTTPException(status_code=404, detail=f"Auth file '{filename}' not found") | |
| # Clear existing active files | |
| active_dir = Path(ACTIVE_AUTH_DIR) | |
| for existing in active_dir.glob("*.json"): | |
| existing.unlink() | |
| # Copy to active directory | |
| dest_path = active_dir / filename | |
| shutil.copy2(source_path, dest_path) | |
| return JSONResponse( | |
| content={ | |
| "success": True, | |
| "message": f"Auth file '{filename}' activated", | |
| "active_file": filename, | |
| } | |
| ) | |
| async def deactivate_auth() -> JSONResponse: | |
| """Remove the currently active auth.""" | |
| _ensure_dirs() | |
| active_dir = Path(ACTIVE_AUTH_DIR) | |
| removed_count = 0 | |
| for f in active_dir.glob("*.json"): | |
| f.unlink() | |
| removed_count += 1 | |
| return JSONResponse( | |
| content={ | |
| "success": True, | |
| "message": f"Removed {removed_count} auth file(s)", | |
| "active_file": None, | |
| } | |
| ) | |