mohsin-devs's picture
Deploy HF-ready DocVault with HF storage backend
2fe2727
"""API routes for DocVault."""
import mimetypes
import requests
from flask import Blueprint, Response, jsonify, request, stream_with_context
from werkzeug.utils import secure_filename
from server import config
from server.storage.factory import get_storage
from server.utils.logger import setup_logger
from server.utils.validators import PathValidator
api_bp = Blueprint("api", __name__, url_prefix="/api")
logger = setup_logger(__name__)
def get_user_id_from_request() -> str:
return request.headers.get("X-User-ID", config.DEFAULT_USER_ID)
def allowed_file(filename: str) -> bool:
if "." not in filename:
return False
return filename.rsplit(".", 1)[1].lower() in config.ALLOWED_EXTENSIONS
def _storage():
return get_storage()
def _item_type(storage, user_id: str, path: str) -> str | None:
if hasattr(storage, "get_item_type"):
return storage.get_item_type(user_id, path)
return None
@api_bp.route("/health", methods=["GET"])
def health_check():
return jsonify(
{
"storage": config.HF_STORAGE_LABEL,
"repo": config.HF_REPO_ID,
"status": "ok",
}
), 200
@api_bp.route("/create-folder", methods=["POST"])
def create_folder():
try:
user_id = get_user_id_from_request()
data = request.get_json(silent=True) or {}
folder_path = (data.get("folder_path") or "").strip()
if not folder_path:
return jsonify({"success": False, "error": "folder_path is required"}), 400
result = _storage().create_folder(user_id, folder_path)
return jsonify(result), 201 if result.get("success") else 400
except ValueError as exc:
return jsonify({"success": False, "error": str(exc)}), 400
except Exception as exc:
logger.error("create_folder failed: %s", exc, exc_info=True)
return jsonify({"success": False, "error": str(exc)}), 500
@api_bp.route("/upload", methods=["POST"])
@api_bp.route("/upload-file", methods=["POST"])
def upload_file():
try:
user_id = get_user_id_from_request()
folder_path = (request.form.get("folder_path") or "").strip()
if "file" not in request.files:
return jsonify({"success": False, "error": "No file provided"}), 400
file = request.files["file"]
if not file.filename:
return jsonify({"success": False, "error": "No file selected"}), 400
filename = secure_filename(file.filename)
if not filename:
return jsonify({"success": False, "error": "Invalid filename"}), 400
if not allowed_file(filename):
return jsonify({"success": False, "error": "File type not allowed"}), 400
if not PathValidator.is_valid_filename(filename):
return jsonify({"success": False, "error": "Invalid filename"}), 400
result = _storage().upload_file(user_id, folder_path, filename, file)
return jsonify(result), 201 if result.get("success") else 400
except ValueError as exc:
return jsonify({"success": False, "error": str(exc)}), 400
except Exception as exc:
logger.error("upload failed: %s", exc, exc_info=True)
return jsonify({"success": False, "error": str(exc)}), 500
@api_bp.route("/delete", methods=["POST"])
def delete_item():
try:
user_id = get_user_id_from_request()
data = request.get_json(silent=True) or {}
storage = _storage()
file_path = (data.get("file_path") or "").strip()
folder_path = (data.get("folder_path") or "").strip()
path = (data.get("path") or "").strip()
item_type = (data.get("type") or "").strip().lower()
if file_path:
result = storage.delete_file(user_id, file_path)
elif folder_path:
result = storage.delete_folder(user_id, folder_path)
elif path:
if item_type == "file":
result = storage.delete_file(user_id, path)
elif item_type == "folder":
result = storage.delete_folder(user_id, path)
else:
resolved_type = _item_type(storage, user_id, path)
if resolved_type == "file":
result = storage.delete_file(user_id, path)
elif resolved_type == "folder":
result = storage.delete_folder(user_id, path)
else:
result = {"success": False, "error": "Item not found", "code": "NOT_FOUND"}
else:
return jsonify({"success": False, "error": "path is required"}), 400
return jsonify(result), 200 if result.get("success") else 400
except Exception as exc:
logger.error("delete failed: %s", exc, exc_info=True)
return jsonify({"success": False, "error": str(exc)}), 500
@api_bp.route("/delete-file", methods=["POST"])
def delete_file_alias():
try:
user_id = get_user_id_from_request()
data = request.get_json(silent=True) or {}
file_path = (data.get("file_path") or data.get("path") or "").strip()
if not file_path:
return jsonify({"success": False, "error": "file_path is required"}), 400
result = _storage().delete_file(user_id, file_path)
return jsonify(result), 200 if result.get("success") else 400
except Exception as exc:
logger.error("delete_file failed: %s", exc, exc_info=True)
return jsonify({"success": False, "error": str(exc)}), 500
@api_bp.route("/delete-folder", methods=["POST"])
def delete_folder_alias():
try:
user_id = get_user_id_from_request()
data = request.get_json(silent=True) or {}
folder_path = (data.get("folder_path") or data.get("path") or "").strip()
if not folder_path:
return jsonify({"success": False, "error": "folder_path is required"}), 400
result = _storage().delete_folder(user_id, folder_path)
return jsonify(result), 200 if result.get("success") else 400
except Exception as exc:
logger.error("delete_folder failed: %s", exc, exc_info=True)
return jsonify({"success": False, "error": str(exc)}), 500
@api_bp.route("/list", methods=["GET"])
def list_contents():
try:
user_id = get_user_id_from_request()
folder_path = (
request.args.get("folder_path", request.args.get("path", "")) or ""
).strip()
result = _storage().list(user_id, folder_path)
return jsonify(result), 200 if result.get("success") else 400
except ValueError as exc:
return jsonify({"success": False, "error": str(exc)}), 400
except Exception as exc:
logger.error("list failed: %s", exc, exc_info=True)
return jsonify({"success": False, "error": str(exc)}), 500
@api_bp.route("/rename", methods=["POST"])
def rename_item():
try:
user_id = get_user_id_from_request()
data = request.get_json(silent=True) or {}
item_path = (data.get("item_path") or "").strip()
new_name = (data.get("new_name") or "").strip()
if not item_path or not new_name:
return jsonify({"success": False, "error": "item_path and new_name are required"}), 400
storage = _storage()
item_type = _item_type(storage, user_id, item_path)
if item_type == "file":
result = storage.rename_file(user_id, item_path, new_name)
elif item_type == "folder":
result = storage.rename_folder(user_id, item_path, new_name)
else:
result = {"success": False, "error": "Item not found", "code": "NOT_FOUND"}
return jsonify(result), 200 if result.get("success") else 400
except ValueError as exc:
return jsonify({"success": False, "error": str(exc)}), 400
except Exception as exc:
logger.error("rename failed: %s", exc, exc_info=True)
return jsonify({"success": False, "error": str(exc)}), 500
@api_bp.route("/storage-stats", methods=["GET"])
def storage_stats():
try:
user_id = get_user_id_from_request()
result = _storage().get_stats(user_id)
return jsonify(result), 200 if result.get("success") else 400
except Exception as exc:
logger.error("storage_stats failed: %s", exc, exc_info=True)
return jsonify({"success": False, "error": str(exc)}), 500
@api_bp.route("/download/<path:file_path>", methods=["GET"])
def download_file(file_path: str):
try:
user_id = get_user_id_from_request()
download_info = _storage().download(user_id, file_path)
headers = {"Authorization": f"Bearer {config.HF_TOKEN}"}
upstream = requests.get(download_info["url"], headers=headers, stream=True, timeout=30)
upstream.raise_for_status()
content_type = upstream.headers.get("Content-Type") or mimetypes.guess_type(file_path)[0]
if not content_type:
content_type = "application/octet-stream"
filename = file_path.split("/")[-1]
disposition = "attachment" if request.args.get("download") == "true" else "inline"
return Response(
stream_with_context(upstream.iter_content(chunk_size=8192)),
content_type=content_type,
headers={"Content-Disposition": f'{disposition}; filename="{filename}"'},
)
except FileNotFoundError as exc:
return jsonify({"success": False, "error": str(exc)}), 404
except Exception as exc:
logger.error("download failed: %s", exc, exc_info=True)
return jsonify({"success": False, "error": str(exc)}), 500
@api_bp.route("/history", methods=["GET"])
def get_history():
try:
user_id = get_user_id_from_request()
path = (request.args.get("path") or "").strip()
if not path:
return jsonify({"success": False, "error": "path is required"}), 400
return jsonify({"success": True, "history": _storage().get_history(user_id, path)}), 200
except Exception as exc:
logger.error("history failed: %s", exc, exc_info=True)
return jsonify({"success": False, "error": str(exc)}), 500
@api_bp.route("/restore", methods=["POST"])
def restore_version():
try:
user_id = get_user_id_from_request()
data = request.get_json(silent=True) or {}
path = (data.get("path") or "").strip()
revision = (data.get("revision") or "").strip()
as_copy = bool(data.get("as_copy", False))
if not path or not revision:
return jsonify({"success": False, "error": "path and revision are required"}), 400
result = _storage().restore(user_id, path, revision, as_copy=as_copy)
return jsonify(result), 200 if result.get("success") else 400
except Exception as exc:
logger.error("restore failed: %s", exc, exc_info=True)
return jsonify({"success": False, "error": str(exc)}), 500
@api_bp.errorhandler(404)
def not_found(error):
return jsonify({"success": False, "error": "Endpoint not found"}), 404
@api_bp.errorhandler(500)
def internal_error(error):
logger.error("Internal server error: %s", error)
return jsonify({"success": False, "error": "Internal server error"}), 500