Spaces:
Sleeping
Sleeping
File size: 3,822 Bytes
bd0c393 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | from typing import Optional
from fastapi import Depends, HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from loguru import logger
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from open_notebook.utils.encryption import get_secret_from_env
class PasswordAuthMiddleware(BaseHTTPMiddleware):
"""
Middleware to check password authentication for all API requests.
Always active with default password if OPEN_NOTEBOOK_PASSWORD is not set.
Supports Docker secrets via OPEN_NOTEBOOK_PASSWORD_FILE.
"""
def __init__(self, app, excluded_paths: Optional[list] = None):
super().__init__(app)
self.password = get_secret_from_env("OPEN_NOTEBOOK_PASSWORD")
self.excluded_paths = excluded_paths or [
"/",
"/health",
"/docs",
"/openapi.json",
"/redoc",
]
async def dispatch(self, request: Request, call_next):
# Skip authentication if no password is set
if not self.password:
return await call_next(request)
# Skip authentication for excluded paths
if request.url.path in self.excluded_paths:
return await call_next(request)
# Skip authentication for CORS preflight requests (OPTIONS)
if request.method == "OPTIONS":
return await call_next(request)
# Check authorization header
auth_header = request.headers.get("Authorization")
if not auth_header:
return JSONResponse(
status_code=401,
content={"detail": "Missing authorization header"},
headers={"WWW-Authenticate": "Bearer"},
)
# Expected format: "Bearer {password}"
try:
scheme, credentials = auth_header.split(" ", 1)
if scheme.lower() != "bearer":
raise ValueError("Invalid authentication scheme")
except ValueError:
return JSONResponse(
status_code=401,
content={"detail": "Invalid authorization header format"},
headers={"WWW-Authenticate": "Bearer"},
)
# Check password
if credentials != self.password:
return JSONResponse(
status_code=401,
content={"detail": "Invalid password"},
headers={"WWW-Authenticate": "Bearer"},
)
# Password is correct, proceed with the request
response = await call_next(request)
return response
# Optional: HTTPBearer security scheme for OpenAPI documentation
security = HTTPBearer(auto_error=False)
def check_api_password(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
) -> bool:
"""
Utility function to check API password.
Can be used as a dependency in individual routes if needed.
Supports Docker secrets via OPEN_NOTEBOOK_PASSWORD_FILE.
Returns True without checking credentials if OPEN_NOTEBOOK_PASSWORD is not configured.
Raises 401 if credentials are missing or don't match the configured password.
"""
password = get_secret_from_env("OPEN_NOTEBOOK_PASSWORD")
# No password configured - skip authentication
if not password:
return True
# No credentials provided
if not credentials:
raise HTTPException(
status_code=401,
detail="Missing authorization",
headers={"WWW-Authenticate": "Bearer"},
)
# Check password
if credentials.credentials != password:
raise HTTPException(
status_code=401,
detail="Invalid password",
headers={"WWW-Authenticate": "Bearer"},
)
return True
|