"""API routes for DocVault.""" import mimetypes import requests from flask import Blueprint, Response, jsonify, request, stream_with_context from werkzeug.utils import secure_filename from server import config from server.storage.factory import get_storage from server.utils.logger import setup_logger from server.utils.validators import PathValidator api_bp = Blueprint("api", __name__, url_prefix="/api") logger = setup_logger(__name__) def get_user_id_from_request() -> str: return request.headers.get("X-User-ID", config.DEFAULT_USER_ID) def allowed_file(filename: str) -> bool: if "." not in filename: return False return filename.rsplit(".", 1)[1].lower() in config.ALLOWED_EXTENSIONS def _storage(): return get_storage() def _item_type(storage, user_id: str, path: str) -> str | None: if hasattr(storage, "get_item_type"): return storage.get_item_type(user_id, path) return None @api_bp.route("/health", methods=["GET"]) def health_check(): return jsonify( { "storage": config.HF_STORAGE_LABEL, "repo": config.HF_REPO_ID, "status": "ok", } ), 200 @api_bp.route("/create-folder", methods=["POST"]) def create_folder(): try: user_id = get_user_id_from_request() data = request.get_json(silent=True) or {} folder_path = (data.get("folder_path") or "").strip() if not folder_path: return jsonify({"success": False, "error": "folder_path is required"}), 400 result = _storage().create_folder(user_id, folder_path) return jsonify(result), 201 if result.get("success") else 400 except ValueError as exc: return jsonify({"success": False, "error": str(exc)}), 400 except Exception as exc: logger.error("create_folder failed: %s", exc, exc_info=True) return jsonify({"success": False, "error": str(exc)}), 500 @api_bp.route("/upload", methods=["POST"]) @api_bp.route("/upload-file", methods=["POST"]) def upload_file(): try: user_id = get_user_id_from_request() folder_path = (request.form.get("folder_path") or "").strip() if "file" not in request.files: return jsonify({"success": False, "error": "No file provided"}), 400 file = request.files["file"] if not file.filename: return jsonify({"success": False, "error": "No file selected"}), 400 filename = secure_filename(file.filename) if not filename: return jsonify({"success": False, "error": "Invalid filename"}), 400 if not allowed_file(filename): return jsonify({"success": False, "error": "File type not allowed"}), 400 if not PathValidator.is_valid_filename(filename): return jsonify({"success": False, "error": "Invalid filename"}), 400 result = _storage().upload_file(user_id, folder_path, filename, file) return jsonify(result), 201 if result.get("success") else 400 except ValueError as exc: return jsonify({"success": False, "error": str(exc)}), 400 except Exception as exc: logger.error("upload failed: %s", exc, exc_info=True) return jsonify({"success": False, "error": str(exc)}), 500 @api_bp.route("/delete", methods=["POST"]) def delete_item(): try: user_id = get_user_id_from_request() data = request.get_json(silent=True) or {} storage = _storage() file_path = (data.get("file_path") or "").strip() folder_path = (data.get("folder_path") or "").strip() path = (data.get("path") or "").strip() item_type = (data.get("type") or "").strip().lower() if file_path: result = storage.delete_file(user_id, file_path) elif folder_path: result = storage.delete_folder(user_id, folder_path) elif path: if item_type == "file": result = storage.delete_file(user_id, path) elif item_type == "folder": result = storage.delete_folder(user_id, path) else: resolved_type = _item_type(storage, user_id, path) if resolved_type == "file": result = storage.delete_file(user_id, path) elif resolved_type == "folder": result = storage.delete_folder(user_id, path) else: result = {"success": False, "error": "Item not found", "code": "NOT_FOUND"} else: return jsonify({"success": False, "error": "path is required"}), 400 return jsonify(result), 200 if result.get("success") else 400 except Exception as exc: logger.error("delete failed: %s", exc, exc_info=True) return jsonify({"success": False, "error": str(exc)}), 500 @api_bp.route("/delete-file", methods=["POST"]) def delete_file_alias(): try: user_id = get_user_id_from_request() data = request.get_json(silent=True) or {} file_path = (data.get("file_path") or data.get("path") or "").strip() if not file_path: return jsonify({"success": False, "error": "file_path is required"}), 400 result = _storage().delete_file(user_id, file_path) return jsonify(result), 200 if result.get("success") else 400 except Exception as exc: logger.error("delete_file failed: %s", exc, exc_info=True) return jsonify({"success": False, "error": str(exc)}), 500 @api_bp.route("/delete-folder", methods=["POST"]) def delete_folder_alias(): try: user_id = get_user_id_from_request() data = request.get_json(silent=True) or {} folder_path = (data.get("folder_path") or data.get("path") or "").strip() if not folder_path: return jsonify({"success": False, "error": "folder_path is required"}), 400 result = _storage().delete_folder(user_id, folder_path) return jsonify(result), 200 if result.get("success") else 400 except Exception as exc: logger.error("delete_folder failed: %s", exc, exc_info=True) return jsonify({"success": False, "error": str(exc)}), 500 @api_bp.route("/list", methods=["GET"]) def list_contents(): try: user_id = get_user_id_from_request() folder_path = ( request.args.get("folder_path", request.args.get("path", "")) or "" ).strip() result = _storage().list(user_id, folder_path) return jsonify(result), 200 if result.get("success") else 400 except ValueError as exc: return jsonify({"success": False, "error": str(exc)}), 400 except Exception as exc: logger.error("list failed: %s", exc, exc_info=True) return jsonify({"success": False, "error": str(exc)}), 500 @api_bp.route("/rename", methods=["POST"]) def rename_item(): try: user_id = get_user_id_from_request() data = request.get_json(silent=True) or {} item_path = (data.get("item_path") or "").strip() new_name = (data.get("new_name") or "").strip() if not item_path or not new_name: return jsonify({"success": False, "error": "item_path and new_name are required"}), 400 storage = _storage() item_type = _item_type(storage, user_id, item_path) if item_type == "file": result = storage.rename_file(user_id, item_path, new_name) elif item_type == "folder": result = storage.rename_folder(user_id, item_path, new_name) else: result = {"success": False, "error": "Item not found", "code": "NOT_FOUND"} return jsonify(result), 200 if result.get("success") else 400 except ValueError as exc: return jsonify({"success": False, "error": str(exc)}), 400 except Exception as exc: logger.error("rename failed: %s", exc, exc_info=True) return jsonify({"success": False, "error": str(exc)}), 500 @api_bp.route("/storage-stats", methods=["GET"]) def storage_stats(): try: user_id = get_user_id_from_request() result = _storage().get_stats(user_id) return jsonify(result), 200 if result.get("success") else 400 except Exception as exc: logger.error("storage_stats failed: %s", exc, exc_info=True) return jsonify({"success": False, "error": str(exc)}), 500 @api_bp.route("/download/", methods=["GET"]) def download_file(file_path: str): try: user_id = get_user_id_from_request() download_info = _storage().download(user_id, file_path) headers = {"Authorization": f"Bearer {config.HF_TOKEN}"} upstream = requests.get(download_info["url"], headers=headers, stream=True, timeout=30) upstream.raise_for_status() content_type = upstream.headers.get("Content-Type") or mimetypes.guess_type(file_path)[0] if not content_type: content_type = "application/octet-stream" filename = file_path.split("/")[-1] disposition = "attachment" if request.args.get("download") == "true" else "inline" return Response( stream_with_context(upstream.iter_content(chunk_size=8192)), content_type=content_type, headers={"Content-Disposition": f'{disposition}; filename="{filename}"'}, ) except FileNotFoundError as exc: return jsonify({"success": False, "error": str(exc)}), 404 except Exception as exc: logger.error("download failed: %s", exc, exc_info=True) return jsonify({"success": False, "error": str(exc)}), 500 @api_bp.route("/history", methods=["GET"]) def get_history(): try: user_id = get_user_id_from_request() path = (request.args.get("path") or "").strip() if not path: return jsonify({"success": False, "error": "path is required"}), 400 return jsonify({"success": True, "history": _storage().get_history(user_id, path)}), 200 except Exception as exc: logger.error("history failed: %s", exc, exc_info=True) return jsonify({"success": False, "error": str(exc)}), 500 @api_bp.route("/restore", methods=["POST"]) def restore_version(): try: user_id = get_user_id_from_request() data = request.get_json(silent=True) or {} path = (data.get("path") or "").strip() revision = (data.get("revision") or "").strip() as_copy = bool(data.get("as_copy", False)) if not path or not revision: return jsonify({"success": False, "error": "path and revision are required"}), 400 result = _storage().restore(user_id, path, revision, as_copy=as_copy) return jsonify(result), 200 if result.get("success") else 400 except Exception as exc: logger.error("restore failed: %s", exc, exc_info=True) return jsonify({"success": False, "error": str(exc)}), 500 @api_bp.errorhandler(404) def not_found(error): return jsonify({"success": False, "error": "Endpoint not found"}), 404 @api_bp.errorhandler(500) def internal_error(error): logger.error("Internal server error: %s", error) return jsonify({"success": False, "error": "Internal server error"}), 500