import os import time import base64 import hashlib from cryptography.hazmat.primitives.asymmetric import ed25519 from cryptography.hazmat.primitives import serialization class MEPIdentity: def __init__(self, key_path="private.pem"): self.key_path = key_path self._load_or_generate() def _load_or_generate(self): if os.path.exists(self.key_path): with open(self.key_path, "rb") as f: self.private_key = serialization.load_pem_private_key(f.read(), password=None) else: self.private_key = ed25519.Ed25519PrivateKey.generate() with open(self.key_path, "wb") as f: f.write(self.private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() )) self.public_key = self.private_key.public_key() self.pub_pem = self.public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ).decode('utf-8') sha = hashlib.sha256(self.pub_pem.encode('utf-8')).hexdigest() self.node_id = f"node_{sha[:12]}" def sign(self, payload: str, timestamp: str) -> str: message = f"{payload}{timestamp}".encode('utf-8') signature = self.private_key.sign(message) return base64.b64encode(signature).decode('utf-8') def get_auth_headers(self, payload: str) -> dict: ts = str(int(time.time())) sig = self.sign(payload, ts) return { "X-MEP-NodeID": self.node_id, "X-MEP-Timestamp": ts, "X-MEP-Signature": sig }