import base64 import json from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding from fastapi import Request from fastapi.responses import JSONResponse from ..common.config import Config from ..common.logger import logger def get_client_ip(request: Request) -> str: forwarded_for = request.headers.get("X-Forwarded-For") if forwarded_for: return forwarded_for.split(",")[0].strip() real_ip = request.headers.get("X-Real-IP") if real_ip: return real_ip.strip() return request.client.host if request.client else "unknown" def _get_private_key(): private_key_str = Config.PRIVATE_KEY if not private_key_str: raise ValueError("PRIVATE_KEY environment variable is not set. ") private_key_bytes = private_key_str.encode("utf-8") return serialization.load_pem_private_key(private_key_bytes, password=None) def rsa_sign(data: dict) -> str: private_key = _get_private_key() payload = json.dumps(data, separators=(",", ":"), sort_keys=True).encode() signature = private_key.sign(payload, padding.PKCS1v15(), hashes.SHA256()) return base64.b64encode(signature).decode() def response_success(data: dict, status_code: int = 200): content = {"success": True, "data": data, "error": None} # logger.info(f"Response success: {content}") return JSONResponse(content=content, status_code=status_code) def response_error(code: str, message: str, status_code: int): content = { "success": False, "data": None, "error": {"code": code, "message": message}, } logger.info(f"Response error: {content}") return JSONResponse(content=content, status_code=status_code)