File size: 5,586 Bytes
594ed40 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Union

from fastapi import Request, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer

from backend.app.schemas.auth import TokenData
from backend.app.utils.jwt_utils import verify_token
@dataclass
class User:
    """An authenticated user.

    Only ``id`` is required; every other field falls back to its default.
    """

    # Identifier of the user.
    id: str
    # E-mail address; empty when not supplied.
    email: str = field(default="")
    # Role name: guest, viewer, editor, expert, or admin.
    role: str = field(default="viewer")
    # Expert flag; off unless set explicitly.
    is_expert: bool = field(default=False)
    # ORCID identifier, when the user has one.
    orcid_id: Optional[str] = field(default=None)
    # Verification timestamp, when known.
    # NOTE(review): presumably the ORCID/expert verification time - confirm.
    verified_at: Optional[datetime] = field(default=None)
    # Name shown for the user; empty when not supplied.
    display_name: str = field(default="")
@dataclass
class GuestUser:
    """An unauthenticated (guest) user.

    Instances are handed out in places that are annotated as returning
    ``User``, so this class exposes the same attribute names as ``User``.
    The trailing fields mirror ``User``'s defaults so that code reading
    ``email``, ``verified_at`` or ``display_name`` does not raise
    ``AttributeError`` when given a guest.
    """

    # Fixed identifier shared by all guests.
    id: str = "guest"
    # Role name; always "guest" unless overridden by the caller.
    role: str = "guest"
    # Guests are not experts by default.
    is_expert: bool = False
    # Guests have no ORCID identifier by default.
    orcid_id: Optional[str] = None
    # Fields below are appended after the original ones so existing positional
    # construction keeps working; the defaults match ``User``'s.
    email: str = ""
    verified_at: Optional[datetime] = None
    display_name: str = ""
# OAuth2 bearer scheme; tokenUrl names the endpoint that issues tokens.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Verify the bearer JWT and return the user it describes.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        A ``User`` built from the verified token data. A missing role falls
        back to ``"viewer"`` and a missing display name to ``""``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``verify_token`` returns ``None`` for the token.
    """
    claims = verify_token(token)
    if claims is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    user_fields = {
        "id": claims.user_id,
        "role": claims.role or "viewer",
        "is_expert": claims.is_expert,
        "orcid_id": claims.orcid_id,
        "display_name": claims.display_name or "",
    }
    return User(**user_fields)
async def get_current_user_optional(
    token: Optional[str] = Depends(OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False))
) -> Optional[User]:
    """Authenticate the caller when possible, for endpoints where login is optional.

    Args:
        token: Bearer token, or ``None`` when the request carried none (the
            scheme is created with ``auto_error=False``).

    Returns:
        A ``User`` built from the verified token data, or ``None`` when no
        token was supplied or ``verify_token`` returned ``None`` for it.
    """
    # verify_token is only consulted when a token is actually present.
    claims = verify_token(token) if token is not None else None
    if claims is None:
        return None

    user_fields = {
        "id": claims.user_id,
        "role": claims.role or "viewer",
        "is_expert": claims.is_expert,
        "orcid_id": claims.orcid_id,
        "display_name": claims.display_name or "",
    }
    return User(**user_fields)
async def get_user_or_guest(
    token: Optional[str] = Depends(OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False))
) -> Union[User, GuestUser]:
    """Resolve the caller to a ``User`` or, failing that, a ``GuestUser``.

    Intended for endpoints that allow guest access.

    Args:
        token: Bearer token, or ``None`` when the request carried none (the
            scheme is created with ``auto_error=False``).

    Returns:
        A ``User`` built from the verified token data, or a fresh
        ``GuestUser`` when no token was supplied or ``verify_token`` returned
        ``None`` for it. The annotation names both types because a guest is
        not a ``User`` instance.
    """
    if token is None:
        return GuestUser()

    token_data = verify_token(token)
    if token_data is None:
        # A token that fails verification is treated like a missing one.
        return GuestUser()

    return User(
        id=token_data.user_id,
        role=token_data.role or "viewer",
        is_expert=token_data.is_expert,
        orcid_id=token_data.orcid_id,
        display_name=token_data.display_name or "",
    )
def require_role(required_role: str):
    """Build a dependency that admits only users holding ``required_role``.

    Users whose role is ``"admin"`` are admitted whatever ``required_role`` is.

    Example:
        @router.post("/admin-only")
        async def admin_endpoint(user: User = Depends(require_role("admin"))):
            ...

    Args:
        required_role: Role name the user must hold.

    Returns:
        An async dependency that returns the current ``User`` or raises
        ``HTTPException`` 403 when the role does not match.
    """
    async def role_checker(user: User = Depends(get_current_user)) -> User:
        # Exact role match, with "admin" accepted everywhere.
        if user.role in (required_role, "admin"):
            return user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Role '{required_role}' required"
        )

    return role_checker
def require_expert():
    """Build a dependency that admits only ORCID-verified experts.

    Returns:
        An async dependency that returns the current ``User`` when both
        ``is_expert`` and ``orcid_id`` are truthy, and raises
        ``HTTPException`` 403 otherwise.
    """
    async def expert_checker(user: User = Depends(get_current_user)) -> User:
        # Both the expert flag and an ORCID id must be present.
        if user.is_expert and user.orcid_id:
            return user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="ORCID-verified expert status required"
        )

    return expert_checker
def require_authenticated():
    """Build a dependency that rejects guests and admits authenticated users.

    Returns:
        An async dependency that returns the resolved user, or raises
        ``HTTPException`` 401 when the caller is a ``GuestUser`` or has the
        role ``"guest"``.
    """
    async def auth_checker(user: User = Depends(get_user_or_guest)) -> User:
        if isinstance(user, GuestUser) or user.role == "guest":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required",
                # A 401 response is expected to carry a WWW-Authenticate
                # challenge; this matches the 401 raised by get_current_user.
                headers={"WWW-Authenticate": "Bearer"},
            )
        return user

    return auth_checker
class JWTMiddleware:
    """ASGI middleware that records the JWT-authenticated user on the scope.

    Optional: dependency injection is the recommended way to authenticate.
    For HTTP requests carrying a valid bearer token, a ``User`` is stored
    under ``scope["state"]["user"]``. Requests without a valid token pass
    through unchanged; this middleware never rejects a request.
    """

    def __init__(self, app):
        """Store the wrapped ASGI application.

        Args:
            app: The downstream ASGI callable.
        """
        self.app = app

    @staticmethod
    def _extract_bearer_token(auth_header: Optional[str]) -> Optional[str]:
        """Return the token from a ``Bearer <token>`` header value, or ``None``.

        The scheme is compared case-insensitively, and surrounding whitespace
        is stripped from the token so extra spaces after the scheme do not
        produce an empty token.
        """
        if not auth_header:
            return None
        scheme, _, credentials = auth_header.partition(" ")
        token = credentials.strip()
        if scheme.lower() != "bearer" or not token:
            return None
        return token

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection, then delegate to the wrapped app.

        Args:
            scope: ASGI connection scope; only ``"http"`` scopes are inspected.
            receive: ASGI receive callable, passed through untouched.
            send: ASGI send callable, passed through untouched.
        """
        if scope["type"] == "http":
            request = Request(scope)
            token = self._extract_bearer_token(request.headers.get("authorization"))
            token_data = verify_token(token) if token else None
            if token_data:
                # Reuse an existing state mapping if one is already present.
                state = scope.setdefault("state", {})
                # Populate the same fields the dependency functions do, so the
                # user seen through the middleware is not missing expert data.
                state["user"] = User(
                    id=token_data.user_id,
                    role=token_data.role or "viewer",
                    is_expert=token_data.is_expert,
                    orcid_id=token_data.orcid_id,
                    display_name=token_data.display_name or "",
                )
        # Always forward, including non-HTTP scopes, so the wrapped app
        # still receives every connection.
        await self.app(scope, receive, send)