# JEP API v0.6 seed.
#
# A small FastAPI application that creates, signs, hashes, and verifies JEP
# v0.6 events. Events are signed with an Ed25519 key pair that is generated
# when this module is imported, so signatures only verify inside the process
# that produced them.

from __future__ import annotations

import base64
import hashlib
import json
import time
import uuid
from typing import Any, Dict, List, Optional

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ed25519
from fastapi import FastAPI
from pydantic import BaseModel, Field

JEP_WIRE_VERSION = "1"
JEP_CORE_PROFILE = "jep-core-0.6"
SUPPORTED_VERBS = {"J", "D", "T", "V"}

# Process-local demo key pair: regenerated on every import, never persisted.
_PRIVATE_KEY = ed25519.Ed25519PrivateKey.generate()
_PUBLIC_KEY = _PRIVATE_KEY.public_key()

app = FastAPI(
    title="JEP API v0.6 Seed",
    description=(
        "API seed for creating, signing, hashing, and verifying JEP v0.6 events. "
        "This is a demo implementation and does not define new JEP-Core semantics."
    ),
    version="0.6.0",
)


def canonicalize(value: Any) -> bytes:
    """Serialize *value* to the byte form used for hashing and signing.

    The value is dumped as JSON with sorted keys, no insignificant
    whitespace, and non-ASCII characters left unescaped, then encoded as
    UTF-8.
    """
    return json.dumps(
        value,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    ).encode("utf-8")


def sha256_tagged(value: Any) -> str:
    """Return the SHA-256 hex digest of *value*, prefixed with ``sha256:``.

    ``bytes`` input is hashed as-is; any other value is passed through
    :func:`canonicalize` first.
    """
    raw = value if isinstance(value, bytes) else canonicalize(value)
    return "sha256:" + hashlib.sha256(raw).hexdigest()


def b64u(data: bytes) -> str:
    """Encode *data* as URL-safe base64 with the ``=`` padding stripped."""
    return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")


def b64u_decode(value: str) -> bytes:
    """Decode unpadded URL-safe base64 text produced by :func:`b64u`.

    Raises:
        ValueError: (``binascii.Error`` is a subclass) if *value* contains
            characters outside the base64 alphabet or cannot be padded to a
            valid length.
    """
    padded = value + "=" * (-len(value) % 4)
    # validate=True: the lenient decoder silently drops characters outside
    # the alphabet, which would let differently spelled signature strings
    # decode to the same bytes.
    return base64.b64decode(padded, altchars=b"-_", validate=True)


def detached_jws_sign(unsigned_event: Dict[str, Any]) -> str:
    """Sign *unsigned_event* and return a detached compact JWS string.

    The result has the form ``<protected>..<signature>``: the payload
    segment is left empty, and the signature covers
    ``<protected>.<base64url(canonical event)>``.
    """
    protected = {
        "alg": "Ed25519",
        "typ": "jep-event+jws",
        "jep": JEP_WIRE_VERSION,
        "kid": "hf-space-demo-key",
    }
    protected_b64 = b64u(canonicalize(protected))
    payload_b64 = b64u(canonicalize(unsigned_event))
    signing_input = f"{protected_b64}.{payload_b64}".encode("ascii")
    signature = _PRIVATE_KEY.sign(signing_input)
    return f"{protected_b64}..{b64u(signature)}"


def detached_jws_verify(unsigned_event: Dict[str, Any], sig: str) -> bool:
    """Check a detached JWS produced by :func:`detached_jws_sign`.

    Args:
        unsigned_event: The event without its ``sig`` field.
        sig: The detached JWS string, ``<protected>..<signature>``.

    Returns:
        ``True`` only if *sig* has exactly three dot-separated segments with
        an empty middle one and the signature verifies against the module's
        public key. Every failure, including malformed input, yields
        ``False``; this function does not raise.
    """
    if not isinstance(sig, str):
        return False
    try:
        protected_b64, empty, signature_b64 = sig.split(".")
        if empty != "":
            return False
        payload_b64 = b64u(canonicalize(unsigned_event))
        signing_input = f"{protected_b64}.{payload_b64}".encode("ascii")
        _PUBLIC_KEY.verify(b64u_decode(signature_b64), signing_input)
    except Exception:
        # Deliberately broad: the input is caller-supplied, and a bad
        # segment count, invalid base64, non-ASCII text, an unserializable
        # event, or a bad signature must all read as "not verified".
        return False
    return True


def unsigned_event_dict(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *event* without ``sig`` and without ``None`` values."""
    return {k: v for k, v in event.items() if k != "sig" and v is not None}


def validation_result(
    valid: bool,
    mode: str,
    event_hash: Optional[str] = None,
    warnings: Optional[List[Dict[str, Any]]] = None,
    errors: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Build the validation report dictionary returned by the endpoints.

    ``level`` is 1 when *valid* is true and 0 otherwise; missing *warnings*
    or *errors* are reported as empty lists.
    """
    return {
        "valid": valid,
        "level": 1 if valid else 0,
        "mode": mode,
        "profile": JEP_CORE_PROFILE,
        "scopes": ["syntax", "hash", "signature"],
        "event_hash": event_hash,
        "warnings": warnings or [],
        "errors": errors or [],
    }


# Request body for POST /events/create.
# (Documented with comments rather than docstrings here and on the endpoints
# below, so the generated OpenAPI schema stays unchanged.)
class CreateEventRequest(BaseModel):
    verb: str = Field(..., description="JEP verb: J, D, T, or V")
    who: Optional[str] = Field(None, description="Actor identifier")
    what: Any = Field(..., description="Claim, digest, or decision-related payload")
    aud: Optional[str] = Field(None, description="Audience / validation context")
    ref: Optional[str] = Field(None, description="Reference event hash")
    ttl_minutes: Optional[int] = Field(None, description="Optional TTL extension in minutes")
    digest_only_who: bool = Field(False, description="Hash the who field before signing")
    ext: Dict[str, Any] = Field(default_factory=dict)
    ext_crit: List[str] = Field(default_factory=list)


# Request body for POST /events/verify.
class VerifyEventRequest(BaseModel):
    event: Dict[str, Any]
    mode: str = "archival"
    consume_nonce: bool = False


# Service descriptor: name, version, profile, endpoint list, and the
# boundary flags describing what this demo does not claim to provide.
@app.get("/")
def index() -> Dict[str, Any]:
    return {
        "name": "JEP API v0.6 Seed",
        "version": "0.6.0",
        "profile": JEP_CORE_PROFILE,
        "endpoints": [
            "GET /health",
            "POST /events/create",
            "POST /events/verify",
            "GET /docs",
        ],
        "boundary": {
            "implementation_seed": True,
            "not_legal_liability": True,
            "not_factual_truth": True,
            "not_regulatory_compliance": True,
            "not_complete_log_proof": True,
        },
    }


# Liveness probe reporting the API version, profile, and wire version.
@app.get("/health")
def health() -> Dict[str, Any]:
    return {
        "ok": True,
        "version": "0.6.0",
        "profile": JEP_CORE_PROFILE,
        "jep": JEP_WIRE_VERSION,
    }


# Build, sign, and hash a new event from the request.
# Returns {"event", "event_hash", "validation"}; on a request error the first
# two are None and the validation report lists the errors.
@app.post("/events/create")
def create_event(req: CreateEventRequest) -> Dict[str, Any]:
    errors: List[Dict[str, Any]] = []
    warnings: List[Dict[str, Any]] = []

    if req.verb not in SUPPORTED_VERBS:
        errors.append({"code": "ERR_UNKNOWN_VERB", "message": "verb must be J, D, T, or V"})
    if req.what is None:
        errors.append({"code": "ERR_MISSING_REQUIRED_FIELD", "message": "what is required"})
    if errors:
        return {
            "event": None,
            "event_hash": None,
            "validation": validation_result(False, "create", None, warnings, errors),
        }

    who_value = req.who or "anonymous"
    if req.digest_only_who:
        who_value = sha256_tagged(str(who_value).encode("utf-8"))
        warnings.append({
            "code": "WARN_DIGEST_ONLY_WHO",
            "message": "who was replaced by a digest for demo privacy behavior",
        })

    # Read the clock once so "when" and the TTL expiry share the same base.
    now = int(time.time())

    ext = dict(req.ext or {})
    ext_crit = list(req.ext_crit or [])
    if req.ttl_minutes is not None:
        ext["https://jep.dev/ext/ttl"] = {
            "ttl_minutes": req.ttl_minutes,
            "expires_at": now + req.ttl_minutes * 60,
        }

    unsigned_event = {
        "jep": JEP_WIRE_VERSION,
        "verb": req.verb,
        "who": who_value,
        "when": now,
        "what": req.what,
        "nonce": str(uuid.uuid4()),
        "aud": req.aud,
        "ref": req.ref,
    }
    if ext:
        unsigned_event["ext"] = ext
    if ext_crit:
        unsigned_event["ext_crit"] = ext_crit

    # Drop unset optional fields (aud/ref) before signing.
    unsigned_event = unsigned_event_dict(unsigned_event)
    event = dict(unsigned_event)
    event["sig"] = detached_jws_sign(unsigned_event)
    # The event hash covers the signed event, including "sig".
    event_hash = sha256_tagged(event)

    return {
        "event": event,
        "event_hash": event_hash,
        "validation": validation_result(True, "create", event_hash, warnings, []),
    }


# Check wire version, verb, required fields, and the detached signature of a
# submitted event, and return the validation report.
@app.post("/events/verify")
def verify_event(req: VerifyEventRequest) -> Dict[str, Any]:
    event = dict(req.event or {})
    errors: List[Dict[str, Any]] = []
    warnings: List[Dict[str, Any]] = []

    if event.get("jep") != JEP_WIRE_VERSION:
        errors.append({"code": "ERR_UNSUPPORTED_JEP_VERSION", "message": "jep must be '1'"})

    verb = event.get("verb")
    # Type-check first: an unhashable JSON value (list/object) would make the
    # set membership test raise TypeError instead of reporting a bad verb.
    if not isinstance(verb, str) or verb not in SUPPORTED_VERBS:
        errors.append({"code": "ERR_UNKNOWN_VERB", "message": "verb must be J, D, T, or V"})

    for field in ["who", "when", "what", "nonce", "sig"]:
        if event.get(field) is None:
            errors.append({"code": "ERR_MISSING_REQUIRED_FIELD", "message": f"{field} is required"})

    event_hash = sha256_tagged(event) if event else None

    # Only attempt signature verification on a structurally valid event.
    if not errors:
        unsigned = unsigned_event_dict(event)
        if not detached_jws_verify(unsigned, event.get("sig", "")):
            errors.append({"code": "ERR_INVALID_SIGNATURE", "message": "detached JWS signature verification failed"})

    if req.consume_nonce:
        warnings.append({
            "code": "WARN_NONCE_CONSUMPTION_DEMO_ONLY",
            "message": "nonce consumption is not persisted in this demo Space",
        })

    return validation_result(
        valid=not errors,
        mode=req.mode,
        event_hash=event_hash,
        warnings=warnings,
        errors=errors,
    )