import hashlib
import json
import secrets

import base58
import httpx
import requests
from fastapi import Request
from fastapi.responses import JSONResponse
from geopy.exc import GeocoderServiceError
from geopy.geocoders import Nominatim
from nacl.exceptions import BadSignatureError
from nacl.signing import VerifyKey
from starlette.concurrency import run_in_threadpool
from starlette.middleware.base import BaseHTTPMiddleware
|
|
# Module-level Nominatim client shared by GEOCODER for reverse lookups; it is
# constructed once at import time with the "velocity" user agent.
geolocator = Nominatim(user_agent="velocity")
|
|
|
|
|
|
|
|
|
|
|
|
def GEOCODER(coords):
    """Reverse-geocode a coordinate pair into a country name and code.

    Args:
        coords: Mapping read with ``.get("latitude")`` and
            ``.get("longitude")``.

    Returns:
        A dict with ``status``, ``message`` and ``data`` keys. ``status`` is
        ``"success"`` (``data`` holds ``country`` and an upper-cased
        ``code``), ``"not_found"`` or ``"error"``; ``data`` is ``None`` for
        the latter two. The function never raises: every failure is folded
        into an ``"error"`` result.
    """
    try:
        latitude, longitude = coords.get("latitude"), coords.get("longitude")

        # Both coordinates are required before the service is contacted.
        if latitude is None or longitude is None:
            return {
                "status": "error",
                "message": "Missing latitude or longitude.",
                "data": None
            }

        place = geolocator.reverse(f"{latitude}, {longitude}", exactly_one=True)

        # No match at all, or a match without an address breakdown.
        if not place or "address" not in place.raw:
            return {
                "status": "not_found",
                "message": "Location could not be resolved.",
                "data": None
            }

        address = place.raw["address"]
        country_name = address.get("country")
        country_code = address.get("country_code")
        return {
            "status": "success",
            "message": "Location resolved successfully.",
            "data": {
                "country": country_name,
                "code": country_code.upper() if country_code else None
            }
        }

    except GeocoderServiceError as exc:
        return {
            "status": "error",
            "message": f"Geocoding service error: {str(exc)}",
            "data": None
        }

    except Exception as exc:
        # Deliberate catch-all: callers only ever inspect the result dict.
        return {
            "status": "error",
            "message": f"Unexpected error: {str(exc)}",
            "data": None
        }
|
|
|
|
|
|
|
|
def SignatureVerification(X_401_Addr, X_401_Nonce, X_401_Sign, challange):
    """Check that ``X_401_Sign`` is a valid signature of ``challange``.

    The address and signature are base58-decoded; the decoded address is used
    as the ``nacl.signing.VerifyKey`` and the UTF-8 encoded challenge as the
    signed message.

    Args:
        X_401_Addr: Base58-encoded public key of the signer.
        X_401_Nonce: Nonce supplied by the client. Only its presence is
            checked here; the caller embeds it in ``challange``.
        X_401_Sign: Base58-encoded detached signature.
        challange: The exact string the client was expected to sign.

    Returns:
        ``True`` when the signature verifies. ``False`` when any of the three
        header values is missing/empty, when the address or signature is not
        valid base58, when the key or signature has the wrong size, or when
        the signature does not match.
    """
    # Previously this fell through and returned None; be explicit instead.
    if not (X_401_Addr and X_401_Nonce and X_401_Sign):
        return False

    try:
        signature_bytes = base58.b58decode(X_401_Sign)
        verify_key = VerifyKey(base58.b58decode(X_401_Addr))
        verify_key.verify(challange.encode("utf-8"), signature_bytes)
    except (BadSignatureError, ValueError, TypeError):
        # ValueError/TypeError cover malformed base58 and wrong-length keys
        # or signatures. These values come straight from request headers, so
        # garbage input must read as "not verified", not crash the request.
        return False

    return True
|
|
|
|
|
|
def SecretNonceGenerator():
    """Return a fresh nonce as a 64-character lowercase hex string.

    Thirty-two bytes are drawn from ``secrets`` and the SHA-256 hex digest of
    those bytes is returned.
    """
    hasher = hashlib.sha256()
    hasher.update(secrets.token_bytes(32))
    return hasher.hexdigest()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# SECURITY(review): this Helius API key is committed to source control. It is
# kept only as a fallback so existing ``TokenCheck(wallet)`` callers keep
# working; rotate it and pass the key explicitly (x401Kit does) instead.
_LEGACY_HELIUS_API_KEY = "4e833ada-d32c-48c5-b020-c11b2253f25b"
_LEGACY_REQUIRED_MINT = "3d4XyPWkUJzruF5c2qc1QfpLgsaNaDLMtTya1bWBpump"
_LEGACY_MIN_AMOUNT = 100000.0
_HELIUS_RPC_URL = "https://mainnet.helius-rpc.com/"
# Upper bound for the RPC round trip; without it a stalled connection would
# block the calling worker indefinitely.
_RPC_TIMEOUT_SECONDS = 10


def TokenCheck(walletPublicKey, required_mint=None, min_amount=None,
               helius_api_key=None):
    """Check whether a wallet holds at least ``min_amount`` of a token mint.

    Issues a ``getTokenAccountsByOwner`` JSON-RPC request (``jsonParsed``
    encoding) filtered by mint and sums ``tokenAmount.uiAmount`` over every
    returned token account.

    Args:
        walletPublicKey: Owner address passed as the first RPC parameter.
        required_mint: Mint address to filter on. Defaults to the mint that
            was previously hard-coded.
        min_amount: Minimum summed ``uiAmount`` required. Defaults to
            100000.0, the previously hard-coded threshold.
        helius_api_key: API key sent as the ``api-key`` query parameter.
            Defaults to the legacy embedded key (see the SECURITY note).

    Returns:
        ``True`` if the summed balance is >= ``min_amount``; ``False`` if the
        wallet is empty/missing, has no token account for the mint, or holds
        less than the threshold. On transport or parse failure a dict
        ``{"status": False, "message": ...}`` is returned instead, so callers
        must compare against ``True``/``False`` explicitly rather than rely
        on truthiness.
    """
    # Nothing to look up; avoid a pointless RPC round trip.
    if not walletPublicKey:
        return False

    mint = required_mint or _LEGACY_REQUIRED_MINT
    threshold = _LEGACY_MIN_AMOUNT if min_amount is None else min_amount
    api_key = helius_api_key or _LEGACY_HELIUS_API_KEY

    payload = {
        "jsonrpc": "2.0",
        "id": "1",
        "method": "getTokenAccountsByOwner",
        "params": [
            walletPublicKey,
            {"mint": mint},
            {"encoding": "jsonParsed"},
        ],
    }

    try:
        response = requests.post(
            _HELIUS_RPC_URL,
            params={"api-key": api_key},
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=_RPC_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        return {"status": False, "message": f"API Request Failed: {e}"}

    # An RPC-level error reply carries no usable "result" object.
    result = data.get("result") if isinstance(data, dict) else None
    token_accounts = result.get("value") if isinstance(result, dict) else None
    if not token_accounts:
        return False

    try:
        # A wallet may own several token accounts for the same mint, so the
        # balance is the sum over all of them, not just the first entry.
        total = 0.0
        for entry in token_accounts:
            token_amount = entry["account"]["data"]["parsed"]["info"]["tokenAmount"]
            total += float(token_amount.get("uiAmount"))
    except (TypeError, KeyError, IndexError, ValueError, AttributeError):
        return {"status": False, "message": "Could not parse token account data"}

    return total >= threshold


class x401Kit(BaseHTTPMiddleware):
    """Middleware gating protected paths behind a signed-challenge check.

    Per request on a protected path:

    1. With none of ``X-401-Addr`` / ``X-401-Nonce`` / ``X-401-Signature``
       present, reply 401 with a fresh nonce in the ``X-401-Nonce`` header.
    2. Otherwise verify the signature over
       ``CHALLENGE::<nonce>::<path>::<secret_domain>``.
    3. Check the wallet's token balance via ``TokenCheck`` using the mint,
       amount and API key given to the constructor.
    4. Optionally deny by country, resolved from ``X-Lat`` / ``X-Long``.
    5. Forward the request; when the downstream body is a JSON object, add
       the wallet address under the ``"address"`` key.

    ``OPTIONS`` requests and unprotected paths pass straight through.

    NOTE(review): the nonce issued in step 1 is not stored anywhere in this
    file, and step 2 uses whatever nonce the client sends back, so a captured
    header set appears replayable. Confirm this is acceptable.
    NOTE(review): the geo check trusts client-supplied coordinates.
    """

    # NOTE(review): browsers reject a wildcard origin combined with
    # credentials; values are kept as they were to avoid changing clients.
    _CORS_HEADERS = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Credentials": "true",
    }

    def __init__(self, app, protected_paths: list, required_mint: str,
                 mint_amount: int, helius_api_key: str, geo_code: bool,
                 geo_code_locs: list, secret_domain="VELOCITY"):
        """Store the gate configuration.

        Args:
            app: The wrapped ASGI application.
            protected_paths: Path prefixes that require authentication.
            required_mint: Token mint the wallet must hold.
            mint_amount: Minimum balance, compared against the summed
                ``uiAmount`` of the wallet's token accounts.
            helius_api_key: API key used for the balance lookup.
            geo_code: Enable the country deny-list check.
            geo_code_locs: Country codes to deny (compared upper-cased).
            secret_domain: Suffix mixed into the signed challenge string.
        """
        super().__init__(app)
        self.protected_paths = protected_paths
        self.required_mint = required_mint
        self.mint_amount = mint_amount
        self.helius_api_key = helius_api_key
        self.secret_domain = secret_domain
        self.geo_code = geo_code
        self.geo_code_locs = geo_code_locs

    def _reply(self, status, message, status_code):
        """Build a JSON ``{"status", "message"}`` reply with CORS headers."""
        return JSONResponse(
            content={"status": status, "message": message},
            headers=dict(self._CORS_HEADERS),
            status_code=status_code,
        )

    def _challenge(self):
        """Build the 401 reply that hands the client a fresh nonce to sign."""
        headers = {
            "X-401-Status": "Authrequired",
            "x-401-Mechanism": "SOLANA",
            "X-401-Nonce": SecretNonceGenerator(),
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Credentials": "true",
            "Access-Control-Expose-Headers": "x-401-Nonce, x-401-Mechanism, x-401-Status",
        }
        return JSONResponse(
            content={
                "message": "401 Auth Required",
                "information": "Non persistant stateless auth",
            },
            headers=headers,
            status_code=401,
        )

    async def _geo_denial(self, request):
        """Return a denial response for blocked/unresolved locations, else None."""
        coords = {
            "latitude": request.headers.get("X-Lat"),
            "longitude": request.headers.get("X-Long"),
        }
        # The geocoder does blocking network I/O; keep it off the event loop.
        lookup = await run_in_threadpool(GEOCODER, coords)
        place = lookup["data"]
        if place is None:
            return self._reply("locerror", "Auth error", 500)

        # GEOCODER upper-cases the code, so normalise the deny-list to match.
        blocked = {str(code).upper() for code in (self.geo_code_locs or ())}
        if place["code"] in blocked:
            return self._reply(
                "locdeny", f"access denied for {place['country']}", 401
            )
        return None

    @staticmethod
    async def _replay(body):
        """Yield ``body`` once, to stand in for a consumed body iterator."""
        yield body

    async def _with_address(self, response, wallet):
        """Add ``wallet`` under "address" when the body is a JSON object."""
        content_type = response.headers.get("content-type") or ""
        if content_type.split(";")[0].strip().lower() != "application/json":
            return response

        body = b"".join([chunk async for chunk in response.body_iterator])

        try:
            data = json.loads(body.decode())
        except ValueError:
            # Covers JSONDecodeError and UnicodeDecodeError.
            data = None

        if not isinstance(data, dict):
            # The iterator is exhausted at this point; returning the response
            # untouched would send an empty body, so replay the bytes.
            response.body_iterator = self._replay(body)
            return response

        data["address"] = wallet
        response_headers = dict(response.headers)
        # The body grew, so let JSONResponse recompute the length.
        response_headers.pop("content-length", None)
        return JSONResponse(
            content=data,
            status_code=response.status_code,
            headers=response_headers,
        )

    async def dispatch(self, request: Request, call_next):
        """Authenticate requests to protected paths before forwarding them.

        Returns the downstream response (possibly with the wallet address
        added) on success. Otherwise returns a JSON error: 401 for a missing
        or invalid signature, 403 when the wallet lacks the required tokens,
        and 500 when the balance lookup itself failed.
        """
        if request.method == "OPTIONS":
            return await call_next(request)

        path = request.url.path
        if not any(path.startswith(prefix) for prefix in self.protected_paths):
            return await call_next(request)

        X_401_Nonce = request.headers.get("X-401-Nonce")
        X_401_Sign = request.headers.get("X-401-Signature")
        X_401_Addr = request.headers.get("X-401-Addr")

        if not X_401_Addr and not X_401_Nonce and not X_401_Sign:
            return self._challenge()

        # Only some of the three headers were sent: reject without doing any
        # remote lookups.
        if not (X_401_Addr and X_401_Nonce and X_401_Sign):
            return self._reply("error", "Authentication failed", 401)

        challange = f"CHALLENGE::{X_401_Nonce}::{request.url.path}::{self.secret_domain}"

        # Verify the signature first so unauthenticated callers cannot
        # trigger RPC calls for arbitrary addresses.
        if SignatureVerification(X_401_Addr, X_401_Nonce, X_401_Sign, challange) is not True:
            return self._reply("error", "bad signature", 401)

        # TokenCheck does blocking HTTP; run it in the threadpool.
        tokenverify = await run_in_threadpool(
            TokenCheck,
            X_401_Addr,
            required_mint=self.required_mint,
            min_amount=self.mint_amount,
            helius_api_key=self.helius_api_key,
        )
        if tokenverify is False:
            return self._reply("error", "Missing required token", 403)
        if tokenverify is not True:
            # TokenCheck returned its failure dict: the lookup itself broke.
            return self._reply("error", "Authentication failed", 500)

        if self.geo_code:
            denial = await self._geo_denial(request)
            if denial is not None:
                return denial

        response = await call_next(request)
        return await self._with_address(response, X_401_Addr)
|
|
| |