| from uuid import uuid4 | |
| from fastapi import Request | |
| from app.config import config | |
| from app.models.exception import HttpException | |
| def get_task_id(request: Request): | |
| task_id = request.headers.get("x-task-id") | |
| if not task_id: | |
| task_id = uuid4() | |
| return str(task_id) | |
| def get_api_key(request: Request): | |
| api_key = request.headers.get("x-api-key") | |
| return api_key | |
| def verify_token(request: Request): | |
| token = get_api_key(request) | |
| if token != config.app.get("api_key", ""): | |
| request_id = get_task_id(request) | |
| request_url = request.url | |
| user_agent = request.headers.get("user-agent") | |
| raise HttpException( | |
| task_id=request_id, | |
| status_code=401, | |
| message=f"invalid token: {request_url}, {user_agent}", | |
| ) | |