Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import base64 | |
| import hashlib | |
| import json | |
| import time | |
| import uuid | |
| from typing import Any, Dict, List, Optional | |
| from fastapi import FastAPI | |
| from pydantic import BaseModel, Field | |
| from cryptography.hazmat.primitives.asymmetric import ed25519 | |
| from cryptography.exceptions import InvalidSignature | |
# Wire-format version string. It is written into every event's "jep" field,
# into the protected JWS header, and is required again on verification.
JEP_WIRE_VERSION = "1"

# Profile label echoed back in API metadata and in validation results.
JEP_CORE_PROFILE = "jep-core-0.6"

# The only verb codes accepted when creating or verifying an event.
SUPPORTED_VERBS = {
    "J",
    "D",
    "T",
    "V",
}
# Demo signing key pair. A fresh Ed25519 key is generated each time this
# module is imported, so the key only lives as long as the process: a
# signature produced by one process will not verify against the key of
# another process (or of the same service after a restart).
_PRIVATE_KEY = ed25519.Ed25519PrivateKey.generate()
_PUBLIC_KEY = _PRIVATE_KEY.public_key()

# FastAPI application object for this service.
app = FastAPI(
    version="0.6.0",
    title="JEP API v0.6 Seed",
    description=(
        "API seed for creating, signing, hashing, and verifying JEP v0.6 events. "
        "This is a demo implementation and does not define new JEP-Core semantics."
    ),
)
def canonicalize(value: Any) -> bytes:
    """Serialize *value* to a deterministic, compact JSON byte string.

    Object keys are sorted, separators carry no whitespace, non-ASCII
    characters are emitted verbatim rather than escaped, and the resulting
    text is encoded as UTF-8. These bytes are what gets hashed and signed.
    """
    text = json.dumps(
        value,
        ensure_ascii=False,
        separators=(",", ":"),
        sort_keys=True,
    )
    return text.encode("utf-8")
def sha256_tagged(value: Any) -> str:
    """Return the SHA-256 hex digest of *value*, prefixed with ``sha256:``.

    ``bytes`` input is hashed as-is; anything else is first turned into
    bytes with ``canonicalize``.
    """
    if isinstance(value, bytes):
        payload = value
    else:
        payload = canonicalize(value)
    digest = hashlib.sha256(payload).hexdigest()
    return "sha256:" + digest
def b64u(data: bytes) -> str:
    """Encode *data* as URL-safe base64 text with the ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    unpadded = encoded.rstrip(b"=")
    return unpadded.decode("ascii")
def b64u_decode(value: str) -> bytes:
    """Decode unpadded URL-safe base64 text back into bytes.

    The ``=`` padding stripped by ``b64u`` is restored before decoding so
    the input length becomes a multiple of four.
    """
    missing = -len(value) % 4
    padded = value + "=" * missing
    return base64.urlsafe_b64decode(padded)
def detached_jws_sign(unsigned_event: Dict[str, Any]) -> str:
    """Sign *unsigned_event* and return a detached compact JWS string.

    The signature is computed with the module's private key over
    ``<protected header>.<payload>``, where both segments are the unpadded
    URL-safe base64 of their canonical JSON. The returned token has the
    form ``<protected header>..<signature>``: the payload segment is left
    empty, so a verifier must rebuild it from the event itself.
    """
    header = {
        "alg": "Ed25519",
        "typ": "jep-event+jws",
        "jep": JEP_WIRE_VERSION,
        "kid": "hf-space-demo-key",
    }
    header_segment = b64u(canonicalize(header))
    payload_segment = b64u(canonicalize(unsigned_event))

    # The payload takes part in the signed bytes but not in the token.
    message = ".".join((header_segment, payload_segment)).encode("ascii")
    signature_segment = b64u(_PRIVATE_KEY.sign(message))
    return header_segment + ".." + signature_segment
def detached_jws_verify(unsigned_event: Dict[str, Any], sig: str) -> bool:
    """Check a detached JWS *sig* against *unsigned_event*.

    The token must consist of exactly three dot-separated segments with an
    empty middle (payload) segment. The payload segment is rebuilt from the
    canonical form of *unsigned_event* and the signature is checked with the
    module's public key. The protected header is only used as signed bytes;
    it is not decoded or inspected here.

    Returns ``True`` only when the signature verifies. Every failure,
    whatever its cause (malformed token, bad base64, wrong signature, or an
    unexpected input type), yields ``False`` instead of an exception.
    """
    try:
        header_segment, middle, signature_segment = sig.split(".")
        if middle:
            # A detached token carries no inline payload.
            return False
        payload_segment = b64u(canonicalize(unsigned_event))
        message = (header_segment + "." + payload_segment).encode("ascii")
        raw_signature = b64u_decode(signature_segment)
        _PUBLIC_KEY.verify(raw_signature, message)
    except Exception:
        # Fail closed: anything that goes wrong means "not verified".
        return False
    return True
def unsigned_event_dict(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *event* without ``sig`` and without ``None`` values.

    The result is the portion of an event that is covered by the signature.
    The input mapping is not modified.
    """
    stripped: Dict[str, Any] = {}
    for key, val in event.items():
        if key == "sig" or val is None:
            continue
        stripped[key] = val
    return stripped
def validation_result(
    valid: bool,
    mode: str,
    event_hash: Optional[str] = None,
    warnings: Optional[List[Dict[str, Any]]] = None,
    errors: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Assemble the validation report returned by the API endpoints.

    ``level`` is 1 for a valid result and 0 otherwise. Missing or empty
    ``warnings`` / ``errors`` are reported as empty lists. The profile and
    the list of scopes are fixed values.
    """
    level = 1 if valid else 0
    warning_items = warnings if warnings else []
    error_items = errors if errors else []
    return {
        "valid": valid,
        "level": level,
        "mode": mode,
        "profile": JEP_CORE_PROFILE,
        "scopes": ["syntax", "hash", "signature"],
        "event_hash": event_hash,
        "warnings": warning_items,
        "errors": error_items,
    }
class CreateEventRequest(BaseModel):
    # Request body for event creation. Only `verb` and `what` are required;
    # every other field has a default.

    verb: str = Field(..., description="JEP verb: J, D, T, or V")
    who: Optional[str] = Field(default=None, description="Actor identifier")
    what: Any = Field(..., description="Claim, digest, or decision-related payload")
    aud: Optional[str] = Field(default=None, description="Audience / validation context")
    ref: Optional[str] = Field(default=None, description="Reference event hash")
    ttl_minutes: Optional[int] = Field(
        default=None,
        description="Optional TTL extension in minutes",
    )
    digest_only_who: bool = Field(
        default=False,
        description="Hash the who field before signing",
    )
    # Extension payloads and the list of extension names flagged as critical.
    ext: Dict[str, Any] = Field(default_factory=dict)
    ext_crit: List[str] = Field(default_factory=list)
class VerifyEventRequest(BaseModel):
    # Request body for event verification.

    # The complete event, including its "sig" field.
    event: Dict[str, Any]
    # Echoed back unchanged in the validation result.
    mode: str = Field(default="archival")
    # When true, the response carries a warning that nonce consumption is
    # not persisted.
    consume_nonce: bool = Field(default=False)
# NOTE(review): the handler was defined but never registered on `app`.
# The root path "/" is assumed here; confirm against the deployed Space.
@app.get("/")
def index() -> Dict[str, Any]:
    """Describe the service: name, version, profile and available endpoints.

    The ``boundary`` section spells out what this demo implementation does
    not claim to provide.
    """
    return {
        "name": "JEP API v0.6 Seed",
        "version": "0.6.0",
        "profile": JEP_CORE_PROFILE,
        "endpoints": [
            "GET /health",
            "POST /events/create",
            "POST /events/verify",
            "GET /docs",
        ],
        "boundary": {
            "implementation_seed": True,
            "not_legal_liability": True,
            "not_factual_truth": True,
            "not_regulatory_compliance": True,
            "not_complete_log_proof": True,
        },
    }
# Registered on the path that the service index advertises as "GET /health";
# without the decorator the handler is unreachable over HTTP.
@app.get("/health")
def health() -> Dict[str, Any]:
    """Report liveness together with the service, profile and wire versions."""
    return {
        "ok": True,
        "version": "0.6.0",
        "profile": JEP_CORE_PROFILE,
        "jep": JEP_WIRE_VERSION,
    }
# Registered on the path that the service index advertises as
# "POST /events/create"; without the decorator the handler is unreachable.
@app.post("/events/create")
def create_event(req: CreateEventRequest) -> Dict[str, Any]:
    """Build, sign and hash a new event from *req*.

    Returns a dict with three keys:

    * ``event`` - the signed event, or ``None`` when the request is rejected;
    * ``event_hash`` - tagged SHA-256 of the signed event (including
      ``sig``), or ``None`` when the request is rejected;
    * ``validation`` - the report built by ``validation_result`` in
      ``"create"`` mode.

    A request is rejected when the verb is not supported or ``what`` is
    ``None``. Optional fields that are ``None`` are omitted from the event.
    """
    errors: List[Dict[str, Any]] = []
    warnings: List[Dict[str, Any]] = []

    if req.verb not in SUPPORTED_VERBS:
        errors.append({"code": "ERR_UNKNOWN_VERB", "message": "verb must be J, D, T, or V"})
    if req.what is None:
        errors.append({"code": "ERR_MISSING_REQUIRED_FIELD", "message": "what is required"})
    if errors:
        return {
            "event": None,
            "event_hash": None,
            "validation": validation_result(False, "create", None, warnings, errors),
        }

    who_value = req.who or "anonymous"
    if req.digest_only_who:
        who_value = sha256_tagged(str(who_value).encode("utf-8"))
        warnings.append({
            "code": "WARN_DIGEST_ONLY_WHO",
            "message": "who was replaced by a digest for demo privacy behavior",
        })

    # Sample the clock once so that "when" and the TTL expiry are derived
    # from the same instant and cannot drift apart by a second.
    now = int(time.time())

    # Copy the request containers so the request model is never mutated.
    ext = dict(req.ext or {})
    ext_crit = list(req.ext_crit or [])
    if req.ttl_minutes is not None:
        ext["https://jep.dev/ext/ttl"] = {
            "ttl_minutes": req.ttl_minutes,
            "expires_at": now + req.ttl_minutes * 60,
        }

    unsigned_event = {
        "jep": JEP_WIRE_VERSION,
        "verb": req.verb,
        "who": who_value,
        "when": now,
        "what": req.what,
        "nonce": str(uuid.uuid4()),
        "aud": req.aud,
        "ref": req.ref,
    }
    if ext:
        unsigned_event["ext"] = ext
    if ext_crit:
        unsigned_event["ext_crit"] = ext_crit

    # Drop unset optional fields before signing so they are not part of the
    # signed payload.
    unsigned_event = unsigned_event_dict(unsigned_event)

    event = dict(unsigned_event)
    event["sig"] = detached_jws_sign(unsigned_event)
    event_hash = sha256_tagged(event)

    return {
        "event": event,
        "event_hash": event_hash,
        "validation": validation_result(True, "create", event_hash, warnings, []),
    }
# Registered on the path that the service index advertises as
# "POST /events/verify"; without the decorator the handler is unreachable.
@app.post("/events/verify")
def verify_event(req: VerifyEventRequest) -> Dict[str, Any]:
    """Validate the structure and signature of a submitted event.

    Checks, in order: the wire version, the verb, and the presence of the
    required fields. The signature is verified only when all of those
    checks pass. The returned report (see ``validation_result``) carries
    the tagged hash of the event as submitted, or ``None`` for an empty
    event, and echoes ``req.mode``.

    Nonces are not tracked: asking to consume one only adds a warning.
    """
    event = dict(req.event or {})
    errors: List[Dict[str, Any]] = []
    warnings: List[Dict[str, Any]] = []

    if event.get("jep") != JEP_WIRE_VERSION:
        errors.append({"code": "ERR_UNSUPPORTED_JEP_VERSION", "message": "jep must be '1'"})

    # The event is arbitrary JSON, so the verb may be a list or an object.
    # Testing an unhashable value for set membership raises TypeError, so
    # the type is checked first and such input is reported as an unknown verb.
    verb = event.get("verb")
    if not isinstance(verb, str) or verb not in SUPPORTED_VERBS:
        errors.append({"code": "ERR_UNKNOWN_VERB", "message": "verb must be J, D, T, or V"})

    for field in ["who", "when", "what", "nonce", "sig"]:
        if event.get(field) is None:
            errors.append({"code": "ERR_MISSING_REQUIRED_FIELD", "message": f"{field} is required"})

    event_hash = sha256_tagged(event) if event else None

    if not errors:
        unsigned = unsigned_event_dict(event)
        if not detached_jws_verify(unsigned, event.get("sig", "")):
            errors.append({"code": "ERR_INVALID_SIGNATURE", "message": "detached JWS signature verification failed"})

    if req.consume_nonce:
        warnings.append({
            "code": "WARN_NONCE_CONSUMPTION_DEMO_ONLY",
            "message": "nonce consumption is not persisted in this demo Space",
        })

    return validation_result(
        valid=not errors,
        mode=req.mode,
        event_hash=event_hash,
        warnings=warnings,
        errors=errors,
    )