import hmac import hashlib import time import json import os import base64 from typing import Tuple def sign_payload (payload :dict ,secret :str )->Tuple [str ,str ]: payload ["_ts"]=int (time .time ()) body =json .dumps (payload ,separators =(",",":"),sort_keys =True ) sig =hmac .new ( secret .encode (), body .encode (), hashlib .sha256 ).hexdigest () return body ,sig def verify_payload (body :str ,sig :str ,secret :str ,max_age_seconds :int =300 )->dict : expected =hmac .new ( secret .encode (), body .encode (), hashlib .sha256 ).hexdigest () if not hmac .compare_digest (expected ,sig ): raise ValueError ("Invalid HMAC signature") data =json .loads (body ) ts =data .get ("_ts",0 ) if abs (time .time ()-ts )>max_age_seconds : raise ValueError (f"Stale request: {abs (time .time ()-ts ):.0f}s old") return data def generate_self_signed_cert (cert_path :str ,key_path :str ,cn :str ="localhost"): from cryptography import x509 from cryptography .x509 .oid import NameOID from cryptography .hazmat .primitives import hashes ,serialization from cryptography .hazmat .primitives .asymmetric import rsa from cryptography .hazmat .backends import default_backend import datetime key =rsa .generate_private_key ( public_exponent =65537 , key_size =2048 , backend =default_backend () ) subject =issuer =x509 .Name ([ x509 .NameAttribute (NameOID .COMMON_NAME ,cn ), ]) cert =( x509 .CertificateBuilder () .subject_name (subject ) .issuer_name (issuer ) .public_key (key .public_key ()) .serial_number (x509 .random_serial_number ()) .not_valid_before (datetime .datetime .utcnow ()) .not_valid_after (datetime .datetime .utcnow ()+datetime .timedelta (days =3650 )) .add_extension ( x509 .SubjectAlternativeName ([x509 .DNSName (cn )]), critical =False , ) .sign (key ,hashes .SHA256 (),default_backend ()) ) with open (cert_path ,"wb")as f : f .write (cert .public_bytes (serialization .Encoding .PEM )) with open (key_path ,"wb")as f : f .write (key .private_bytes ( serialization .Encoding .PEM , serialization .PrivateFormat .TraditionalOpenSSL , serialization .NoEncryption () )) def load_or_create_secret (path :str ="shared_secret.key")->str : if os .path .exists (path ): with open (path ,"r")as f : return f .read ().strip () secret =base64 .urlsafe_b64encode (os .urandom (32 )).decode () with open (path ,"w")as f : f .write (secret ) return secret