kofdai's picture
Upload folder using huggingface_hub
594ed40 verified
from fastapi import Request, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime
from backend.app.utils.jwt_utils import verify_token
from backend.app.schemas.auth import TokenData
@dataclass
class User:
"""認証済みユーザー"""
id: str
email: str = ""
role: str = "viewer" # guest, viewer, editor, expert, admin
is_expert: bool = False
orcid_id: Optional[str] = None
verified_at: Optional[datetime] = None
display_name: str = ""
@dataclass
class GuestUser:
"""ゲストユーザー(未認証)"""
id: str = "guest"
role: str = "guest"
is_expert: bool = False
orcid_id: Optional[str] = None
# OAuth2スキーム(トークン取得エンドポイント)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""
JWTトークンを検証し、現在のユーザーを取得する。
Raises:
HTTPException: トークンが無効または期限切れの場合
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
token_data = verify_token(token)
if token_data is None:
raise credentials_exception
return User(
id=token_data.user_id,
role=token_data.role or "viewer",
is_expert=token_data.is_expert,
orcid_id=token_data.orcid_id,
display_name=token_data.display_name or ""
)
async def get_current_user_optional(
token: Optional[str] = Depends(OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False))
) -> Optional[User]:
"""
オプショナルなユーザー認証。トークンがない場合はNoneを返す。
認証が任意のエンドポイント用。
"""
if token is None:
return None
token_data = verify_token(token)
if token_data is None:
return None
return User(
id=token_data.user_id,
role=token_data.role or "viewer",
is_expert=token_data.is_expert,
orcid_id=token_data.orcid_id,
display_name=token_data.display_name or ""
)
async def get_user_or_guest(
token: Optional[str] = Depends(OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False))
) -> User:
"""
ゲストアクセスを許可するエンドポイント用。
認証済みの場合はUserを返し、未認証の場合はGuestUserを返す。
"""
if token is None:
return GuestUser()
token_data = verify_token(token)
if token_data is None:
return GuestUser()
return User(
id=token_data.user_id,
role=token_data.role or "viewer",
is_expert=token_data.is_expert,
orcid_id=token_data.orcid_id,
display_name=token_data.display_name or ""
)
def require_role(required_role: str):
"""
特定のロールを必要とする依存性デコレータ。
使用例:
@router.post("/admin-only")
async def admin_endpoint(user: User = Depends(require_role("admin"))):
...
"""
async def role_checker(user: User = Depends(get_current_user)) -> User:
if user.role != required_role and user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Role '{required_role}' required"
)
return user
return role_checker
def require_expert():
"""
ORCID認証済み専門家を必要とする依存性デコレータ。
"""
async def expert_checker(user: User = Depends(get_current_user)) -> User:
if not user.is_expert or not user.orcid_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="ORCID-verified expert status required"
)
return user
return expert_checker
def require_authenticated():
"""
ゲストを除外し、認証済みユーザーのみを許可する依存性デコレータ。
"""
async def auth_checker(user: User = Depends(get_user_or_guest)) -> User:
if isinstance(user, GuestUser) or user.role == "guest":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required"
)
return user
return auth_checker
class JWTMiddleware:
"""
JWT認証ミドルウェア(オプショナル - 依存性注入推奨)
"""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] == "http":
request = Request(scope)
# Authorization ヘッダーからトークンを取得
auth_header = request.headers.get("authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1]
token_data = verify_token(token)
if token_data:
# requestのstateにユーザー情報を格納
scope["state"] = scope.get("state", {})
scope["state"]["user"] = User(
id=token_data.user_id,
role=token_data.role or "viewer"
)
await self.app(scope, receive, send)