| from fastapi import Request |
| from fastapi.responses import JSONResponse |
| from starlette.middleware.base import BaseHTTPMiddleware |
| from nacl.signing import VerifyKey |
| from nacl.exceptions import BadSignatureError |
| import hashlib |
| import secrets |
| import base58 |
| import requests |
| import json |
| import httpx |
| import jwt |
| from jwt.exceptions import ( |
| ExpiredSignatureError, |
| InvalidAudienceError, |
| InvalidSignatureError, |
| DecodeError |
| ) |
|
|
|
|
| import datetime |
| import time |
| import base64 |
| |
| |
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| def SignatureVerification(mpc,X_401_Addr,X_401_Nonce,X_401_Sign,challange): |
|
|
| |
| if X_401_Addr and X_401_Nonce and X_401_Sign: |
|
|
| try: |
|
|
| |
| signature_bytes=None |
| |
| if mpc=='true': |
| signature_bytes = base64.b64decode(X_401_Sign) |
| elif mpc=='false': |
| signature_bytes=base58.b58decode(X_401_Sign) |
| |
| |
| verify_key = VerifyKey(base58.b58decode(X_401_Addr)) |
| payload_bytes = challange.encode("utf-8") |
| verify_key.verify(payload_bytes, signature_bytes) |
| return True |
| |
| except BadSignatureError: |
| |
| return False |
|
|
|
|
|
|
| def SecretNonceGenerator(): |
| random_bytes = secrets.token_bytes(32) |
| return hashlib.sha256(random_bytes).hexdigest() |
|
|
|
|
|
|
| def verifyJWT(token_string,aud): |
|
|
| try: |
| payload = jwt.decode( |
| token_string, |
| "jwttoken", |
| algorithms=["HS256"], |
| audience=aud, |
| issuer="velocity401" |
| ) |
| |
| wallet_address = payload.get("sub") |
| print(f"✅ Token valid for wallet: {wallet_address}") |
| return wallet_address |
| |
| except ExpiredSignatureError: |
| print("🚨 Token rejected: Signature has expired.") |
| return None |
| except InvalidAudienceError: |
| print(f"🚨 Token rejected: Invalid audience. Expected '{EXPECTED_AUDIENCE}'.") |
| return None |
| except InvalidSignatureError: |
| print("🚨 Token rejected: Invalid signature (key mismatch).") |
| return None |
| except DecodeError as e: |
| print(f"🚨 Token rejected: Malformed JWT structure. Error: {e}") |
| return None |
| except Exception as e: |
| print(f"⚠️ An unexpected error occurred during verification: {e}") |
| return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| def generateJWT(wallet,aud): |
|
|
| EXPIRY_DURATION = datetime.timedelta(days=3) |
| try: |
| |
| payload = { |
| |
| "sub": wallet, |
| "aud":aud, |
| "issuer":"velocity401", |
| "exp": datetime.datetime.utcnow() + EXPIRY_DURATION, |
| "iat": datetime.datetime.utcnow() |
| |
| } |
|
|
| token = jwt.encode(payload, "jwttoken", algorithm="HS256") |
|
|
| return token |
| except Exception as e: |
| |
| return None |
|
|
|
|
|
|
|
|
| def TokenCheck(walletPublicKey,required_mint,mint_amount): |
| |
| url = f"https://mainnet.helius-rpc.com/?api-key=4e833ada-d32c-48c5-b020-c11b2253f25b" |
| payload = { |
| "jsonrpc": "2.0", |
| "id": "1", |
| "method": "getTokenAccountsByOwner", |
| "params": [ |
| walletPublicKey, |
| {"mint":required_mint}, |
| {"encoding": "jsonParsed"} |
| ] |
| } |
| |
| headers = {"Content-Type": "application/json"} |
| |
| try: |
| |
| response = requests.post(url, json=payload, headers=headers) |
| response.raise_for_status() |
| data = response.json() |
| |
| except requests.exceptions.RequestException as e: |
| |
| return {"status": False, "message": f"API Request Failed: {e}"} |
|
|
| |
| token_accounts = data.get("result", {}).get("value") |
|
|
| if not token_accounts: |
| |
| return False |
| |
| try: |
| |
| account_info = token_accounts[0]["account"]["data"]["parsed"]["info"] |
| token_amount = account_info["tokenAmount"] |
| ui_amount = float(token_amount.get("uiAmount")) |
| |
| except (TypeError, KeyError, IndexError): |
|
|
| return {"status": False, "message": "Could not parse token account data"} |
|
|
| if float(ui_amount) >= float(mint_amount): |
| |
| print(ui_amount) |
| print(True) |
| return True |
| |
| else: |
| print(ui_amount) |
| return False |
|
|
|
|
|
|
|
|
| class x401Kit(BaseHTTPMiddleware): |
|
|
|
|
| def __init__(self, app,protected_paths:list): |
| super().__init__(app) |
| self.protected_paths = protected_paths |
| |
| |
|
|
|
|
| async def dispatch(self, request: Request, call_next): |
| |
|
|
|
|
| if request.method == "OPTIONS": |
| return await call_next(request) |
| |
| if not any(request.url.path.startswith(p) for p in self.protected_paths): |
| return await call_next(request) |
| |
|
|
|
|
|
|
| |
| NONCE=SecretNonceGenerator() |
| aud=request.headers.get("origin") |
| X_401_Nonce=request.headers.get("X-401-Nonce") |
| X_401_Sign=request.headers.get("X-401-Signature") |
| X_401_Addr=request.headers.get("X-401-Addr") |
| client_jwt=request.headers.get("x-jwt") |
| |
|
|
| |
|
|
| |
|
|
| REQUIRED_SERVICE=None |
|
|
|
|
| |
| |
| |
|
|
| if client_jwt: |
| decoded=verifyJWT(client_jwt,aud) |
| return JSONResponse( |
| content={"status":"identified","token":decoded,"message":"identified already"}, |
| status_code=200, |
| headers={ |
| |
| "Access-Control-Allow-Origin": "*", |
| "Access-Control-Allow-Credentials": "true" |
| |
| }) |
|
|
| |
| if not X_401_Addr and not X_401_Nonce and not X_401_Sign : |
|
|
| |
|
|
| payload401={ |
| |
| "X-401-Status":"Authrequired", |
| "x-401-Mechanism":"SOLANA", |
| "X-401-Nonce":NONCE, |
| "Access-Control-Allow-Origin": "*", |
| "Access-Control-Allow-Credentials": "true", |
| "Access-Control-Expose-Headers": "x-401-Nonce, x-401-Mechanism, x-401-Status" |
| } |
| |
| return JSONResponse(content={ |
| |
| |
| "message":"401 Auth Required", |
| "information":"Non persistant stateless auth", |
| "issuer":"velocityinfra" |
| |
| },headers=payload401,status_code=401) |
| |
|
|
|
|
|
|
| required_mint = request.headers.get("required_mint") |
| mint_amount=float(request.headers.get("mint_amount")) |
| print(mint_amount) |
| helius_api_key = request.headers.get("helius_api_key") |
| mpc=request.headers.get("mpc") |
| |
|
|
| |
| |
|
|
| challange=f"CHALLENGE::{X_401_Nonce}::{request.url.path}::VELOCITY401" |
|
|
| signverify=SignatureVerification(mpc,X_401_Addr,X_401_Nonce,X_401_Sign,challange) |
| tokenverify=None |
| if required_mint=="no": |
| tokenverify=True |
| else: |
| |
| tokenverify=TokenCheck(X_401_Addr,required_mint,mint_amount) |
| |
| |
| print(tokenverify) |
|
|
|
|
| |
|
|
| if signverify == True and tokenverify ==True: |
|
|
|
|
| response = await call_next(request) |
|
|
| if response.headers.get("content-type") == "application/json": |
| |
| body_bytes = b"" |
| async for chunk in response.body_iterator: |
| body_bytes += chunk |
|
|
| try: |
| data = json.loads(body_bytes.decode()) |
| except json.JSONDecodeError: |
| return response |
|
|
|
|
| JWTTOKEN=generateJWT(X_401_Addr,aud) |
| data["token"] = JWTTOKEN |
| |
| response_headers = dict(response.headers) |
| response_headers.pop("content-length", None) |
| print(data) |
|
|
| return JSONResponse( |
| content=data, |
| status_code=response.status_code, |
| headers=response_headers |
| ) |
|
|
|
|
| return response |
| |
|
|
| elif tokenverify==False: |
| return JSONResponse( |
| content={"status": "tokenerror", "message": "Missing required token"}, |
| status_code=500, |
| headers={ |
| "Access-Control-Allow-Origin": "*", |
| "Access-Control-Allow-Credentials": "true" |
| }) |
| |
| elif signverify==False: |
| print("failed") |
| return JSONResponse( |
| content={"status": "signerror", "message": "bad signature"}, |
| status_code=500, |
| headers={ |
| "Access-Control-Allow-Origin": "*", |
| "Access-Control-Allow-Credentials": "true" |
| }) |
| |
| else: |
| return JSONResponse( |
| content={"status": "autherror", "message": "Authentication failed"}, |
| status_code=500, |
| headers={ |
| "Access-Control-Allow-Origin": "*", |
| "Access-Control-Allow-Credentials": "true" |
| }) |