kofdai's picture
Deploy NullAI Knowledge System to Spaces
075a2b6 verified
from fastapi import Request, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime
from backend.app.utils.jwt_utils import verify_token
from backend.app.schemas.auth import TokenData
@dataclass
class User:
"""認証済みユーザー"""
id: str
email: str = ""
role: str = "viewer" # guest, viewer, editor, expert, admin
is_expert: bool = False
orcid_id: Optional[str] = None
verified_at: Optional[datetime] = None
display_name: str = ""
@dataclass
class GuestUser:
"""ゲストユーザー(未認証)"""
id: str = "guest"
role: str = "guest"
is_expert: bool = False
orcid_id: Optional[str] = None
# OAuth2スキーム(トークン取得エンドポイント)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""
JWTトークンを検証し、現在のユーザーを取得する。
Raises:
HTTPException: トークンが無効または期限切れの場合
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
token_data = verify_token(token)
if token_data is None:
raise credentials_exception
return User(
id=token_data.user_id,
role=token_data.role or "viewer",
is_expert=token_data.is_expert,
orcid_id=token_data.orcid_id,
display_name=token_data.display_name or ""
)
async def get_current_user_optional(
token: Optional[str] = Depends(OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False))
) -> Optional[User]:
"""
オプショナルなユーザー認証。トークンがない場合はNoneを返す。
認証が任意のエンドポイント用。
"""
if token is None:
return None
token_data = verify_token(token)
if token_data is None:
return None
return User(
id=token_data.user_id,
role=token_data.role or "viewer",
is_expert=token_data.is_expert,
orcid_id=token_data.orcid_id,
display_name=token_data.display_name or ""
)
async def get_user_or_guest(
token: Optional[str] = Depends(OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False))
) -> User:
"""
ゲストアクセスを許可するエンドポイント用。
認証済みの場合はUserを返し、未認証の場合はGuestUserを返す。
"""
if token is None:
return GuestUser()
token_data = verify_token(token)
if token_data is None:
return GuestUser()
return User(
id=token_data.user_id,
role=token_data.role or "viewer",
is_expert=token_data.is_expert,
orcid_id=token_data.orcid_id,
display_name=token_data.display_name or ""
)
def require_role(required_role: str):
"""
特定のロールを必要とする依存性デコレータ。
使用例:
@router.post("/admin-only")
async def admin_endpoint(user: User = Depends(require_role("admin"))):
...
"""
async def role_checker(user: User = Depends(get_current_user)) -> User:
if user.role != required_role and user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Role '{required_role}' required"
)
return user
return role_checker
def require_expert():
"""
ORCID認証済み専門家を必要とする依存性デコレータ。
"""
async def expert_checker(user: User = Depends(get_current_user)) -> User:
if not user.is_expert or not user.orcid_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="ORCID-verified expert status required"
)
return user
return expert_checker
def require_authenticated():
"""
ゲストを除外し、認証済みユーザーのみを許可する依存性デコレータ。
"""
async def auth_checker(user: User = Depends(get_user_or_guest)) -> User:
if isinstance(user, GuestUser) or user.role == "guest":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required"
)
return user
return auth_checker
class JWTMiddleware:
"""
JWT認証ミドルウェア(オプショナル - 依存性注入推奨)
"""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] == "http":
request = Request(scope)
# Authorization ヘッダーからトークンを取得
auth_header = request.headers.get("authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1]
token_data = verify_token(token)
if token_data:
# requestのstateにユーザー情報を格納
scope["state"] = scope.get("state", {})
scope["state"]["user"] = User(
id=token_data.user_id,
role=token_data.role or "viewer"
)
await self.app(scope, receive, send)