jep-api / main.py
yuqiangJEP's picture
Upload 5 files
263e3ce verified
from __future__ import annotations
import base64
import hashlib
import json
import time
import uuid
from typing import Any, Dict, List, Optional
from fastapi import FastAPI
from pydantic import BaseModel, Field
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.exceptions import InvalidSignature
# Wire-format version: written to each event's "jep" field, to the JWS
# protected header, and compared against incoming events on verification.
JEP_WIRE_VERSION = "1"
# Profile identifier reported in validation results and the info endpoints.
JEP_CORE_PROFILE = "jep-core-0.6"
# Verbs accepted by both the create and the verify endpoint.
SUPPORTED_VERBS = {"J", "D", "T", "V"}
# Demo signing key pair, generated anew every time this module is imported.
# Nothing in this file stores or exports the key, so a signature can only be
# verified by the same process that produced it.
_PRIVATE_KEY = ed25519.Ed25519PrivateKey.generate()
_PUBLIC_KEY = _PRIVATE_KEY.public_key()
# FastAPI application object; the route decorators in this module register
# their handlers on it. The title, description, and version are metadata
# passed to FastAPI.
app = FastAPI(
    title="JEP API v0.6 Seed",
    description=(
        "API seed for creating, signing, hashing, and verifying JEP v0.6 events. "
        "This is a demo implementation and does not define new JEP-Core semantics."
    ),
    version="0.6.0",
)
def canonicalize(value: Any) -> bytes:
    """Serialize *value* to deterministic, compact JSON encoded as UTF-8.

    Object keys are sorted, separators carry no whitespace, and non-ASCII
    characters are written literally instead of being escaped, so equal
    values always produce the same bytes.
    """
    text = json.dumps(
        value,
        ensure_ascii=False,
        separators=(",", ":"),
        sort_keys=True,
    )
    return text.encode("utf-8")
def sha256_tagged(value: Any) -> str:
    """Return the SHA-256 hex digest of *value* prefixed with ``sha256:``.

    ``bytes`` input is hashed as-is; any other value is first serialized
    with ``canonicalize``.
    """
    if isinstance(value, bytes):
        payload = value
    else:
        payload = canonicalize(value)
    digest = hashlib.sha256(payload).hexdigest()
    return f"sha256:{digest}"
def b64u(data: bytes) -> str:
    """Encode *data* as URL-safe base64 text with trailing ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    return encoded.rstrip(b"=").decode("ascii")
def b64u_decode(value: str) -> bytes:
    """Decode URL-safe base64 text, with or without trailing ``=`` padding.

    Args:
        value: Base64url text. Missing padding is restored before decoding.

    Returns:
        The decoded bytes.

    Raises:
        ValueError: If *value* contains characters outside the base64
            alphabet, is not ASCII, or has an impossible length
            (``binascii.Error`` is a ``ValueError`` subclass).
    """
    padded = value + "=" * (-len(value) % 4)
    # validate=True rejects stray characters. The lenient default silently
    # drops them, which lets many different strings decode to the same bytes
    # and therefore pass as the same signature.
    return base64.b64decode(padded, altchars=b"-_", validate=True)
def detached_jws_sign(unsigned_event: Dict[str, Any]) -> str:
    """Sign *unsigned_event* and return a detached compact JWS string.

    The signature is computed over ``<protected>.<payload>``, where both
    segments are the base64url form of their canonical JSON. The returned
    string leaves the payload segment empty: ``<protected>..<signature>``.
    """
    header = {
        "alg": "Ed25519",
        "typ": "jep-event+jws",
        "jep": JEP_WIRE_VERSION,
        "kid": "hf-space-demo-key",
    }
    header_segment = b64u(canonicalize(header))
    payload_segment = b64u(canonicalize(unsigned_event))
    message = ".".join((header_segment, payload_segment)).encode("ascii")
    signature_segment = b64u(_PRIVATE_KEY.sign(message))
    return header_segment + ".." + signature_segment
def detached_jws_verify(unsigned_event: Dict[str, Any], sig: str) -> bool:
    """Return True when *sig* is a valid detached JWS over *unsigned_event*.

    *sig* must have exactly three dot-separated segments with an empty
    middle (payload) segment. The payload is rebuilt from *unsigned_event*
    and the signature is checked with the module's public key. Any failure,
    including malformed input of any kind, yields False rather than raising.
    """
    try:
        segments = sig.split(".")
        if len(segments) != 3:
            return False
        header_segment, payload_slot, signature_segment = segments
        if payload_slot:
            # A detached signature must not carry an inline payload.
            return False
        payload_segment = b64u(canonicalize(unsigned_event))
        message = (header_segment + "." + payload_segment).encode("ascii")
        _PUBLIC_KEY.verify(b64u_decode(signature_segment), message)
    except Exception:
        # Covers bad encoding, a failed signature check, and non-string input.
        return False
    return True
def unsigned_event_dict(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *event* without ``sig`` and without None-valued keys.

    Only top-level keys are filtered; nested values are passed through
    untouched, and the input mapping is not modified.
    """
    stripped: Dict[str, Any] = {}
    for key, value in event.items():
        if key == "sig" or value is None:
            continue
        stripped[key] = value
    return stripped
def validation_result(
    valid: bool,
    mode: str,
    event_hash: Optional[str] = None,
    warnings: Optional[List[Dict[str, Any]]] = None,
    errors: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Build the validation report dictionary returned by the endpoints.

    ``level`` is 1 for a valid result and 0 otherwise. Missing or empty
    *warnings* / *errors* are reported as empty lists. ``profile`` and
    ``scopes`` are fixed values.
    """
    level = 1 if valid else 0
    warning_list = warnings if warnings else []
    error_list = errors if errors else []

    report: Dict[str, Any] = {"valid": valid, "level": level, "mode": mode}
    report["profile"] = JEP_CORE_PROFILE
    report["scopes"] = ["syntax", "hash", "signature"]
    report["event_hash"] = event_hash
    report["warnings"] = warning_list
    report["errors"] = error_list
    return report
class CreateEventRequest(BaseModel):
    """Request body for ``POST /events/create``."""

    # Checked against SUPPORTED_VERBS by the create handler.
    verb: str = Field(..., description="JEP verb: J, D, T, or V")
    # The create handler substitutes "anonymous" when this is missing or empty.
    who: Optional[str] = Field(None, description="Actor identifier")
    # Required; an explicit null is reported as a missing field by the handler.
    what: Any = Field(..., description="Claim, digest, or decision-related payload")
    # aud and ref are left out of the event entirely when not provided.
    aud: Optional[str] = Field(None, description="Audience / validation context")
    ref: Optional[str] = Field(None, description="Reference event hash")
    # When set, the handler adds a TTL entry (with a computed expiry) to ext.
    ttl_minutes: Optional[int] = Field(None, description="Optional TTL extension in minutes")
    # When true, who is replaced by its tagged SHA-256 digest.
    digest_only_who: bool = Field(False, description="Hash the who field before signing")
    # Copied into the event only when non-empty.
    ext: Dict[str, Any] = Field(default_factory=dict)
    ext_crit: List[str] = Field(default_factory=list)
class VerifyEventRequest(BaseModel):
    """Request body for ``POST /events/verify``."""

    # The complete event to check, including its "sig" field.
    event: Dict[str, Any]
    # Echoed back in the validation result; not otherwise interpreted here.
    mode: str = "archival"
    # Only triggers a warning in the verify handler; no nonce state is kept.
    consume_nonce: bool = False
@app.get("/")
def index() -> Dict[str, Any]:
    """Describe the service: name, version, profile, routes, and scope disclaimers."""
    routes = [
        "GET /health",
        "POST /events/create",
        "POST /events/verify",
        "GET /docs",
    ]
    # Flags stating what this demo implementation does not claim to provide.
    disclaimers = {
        "implementation_seed": True,
        "not_legal_liability": True,
        "not_factual_truth": True,
        "not_regulatory_compliance": True,
        "not_complete_log_proof": True,
    }
    summary: Dict[str, Any] = {
        "name": "JEP API v0.6 Seed",
        "version": "0.6.0",
        "profile": JEP_CORE_PROFILE,
    }
    summary["endpoints"] = routes
    summary["boundary"] = disclaimers
    return summary
@app.get("/health")
def health() -> Dict[str, Any]:
    """Report liveness together with the API version, profile, and wire version."""
    status: Dict[str, Any] = {"ok": True, "version": "0.6.0"}
    status["profile"] = JEP_CORE_PROFILE
    status["jep"] = JEP_WIRE_VERSION
    return status
@app.post("/events/create")
def create_event(req: CreateEventRequest) -> Dict[str, Any]:
    """Build, sign, and hash a new event from the request.

    Args:
        req: Parsed request body describing the event to create.

    Returns:
        A dict with ``event`` (the signed event, or None on failure),
        ``event_hash`` (tagged SHA-256 of the signed event, or None), and
        ``validation`` (a report built by ``validation_result`` in
        ``"create"`` mode). Input problems are reported in the validation
        errors rather than raised.
    """
    errors: List[Dict[str, Any]] = []
    warnings: List[Dict[str, Any]] = []

    if req.verb not in SUPPORTED_VERBS:
        errors.append({"code": "ERR_UNKNOWN_VERB", "message": "verb must be J, D, T, or V"})
    if req.what is None:
        errors.append({"code": "ERR_MISSING_REQUIRED_FIELD", "message": "what is required"})
    if errors:
        return {
            "event": None,
            "event_hash": None,
            "validation": validation_result(False, "create", None, warnings, errors),
        }

    # Read the clock once so that `when` and the TTL expiry share the same
    # base; two separate reads can land in different seconds.
    now = int(time.time())

    who_value = req.who or "anonymous"
    if req.digest_only_who:
        who_value = sha256_tagged(str(who_value).encode("utf-8"))
        warnings.append({
            "code": "WARN_DIGEST_ONLY_WHO",
            "message": "who was replaced by a digest for demo privacy behavior",
        })

    # Copy so the request model's own containers are never mutated.
    ext = dict(req.ext or {})
    ext_crit = list(req.ext_crit or [])
    if req.ttl_minutes is not None:
        # Replaces any caller-supplied entry under the same key.
        ext["https://jep.dev/ext/ttl"] = {
            "ttl_minutes": req.ttl_minutes,
            "expires_at": now + req.ttl_minutes * 60,
        }

    unsigned_event = {
        "jep": JEP_WIRE_VERSION,
        "verb": req.verb,
        "who": who_value,
        "when": now,
        "what": req.what,
        "nonce": str(uuid.uuid4()),
        "aud": req.aud,
        "ref": req.ref,
    }
    if ext:
        unsigned_event["ext"] = ext
    if ext_crit:
        unsigned_event["ext_crit"] = ext_crit

    # Drops aud/ref when they were not provided, so they are neither signed
    # nor emitted as nulls.
    unsigned_event = unsigned_event_dict(unsigned_event)
    sig = detached_jws_sign(unsigned_event)

    event = dict(unsigned_event)
    event["sig"] = sig
    # The hash covers the complete event, signature included.
    event_hash = sha256_tagged(event)

    return {
        "event": event,
        "event_hash": event_hash,
        "validation": validation_result(True, "create", event_hash, warnings, []),
    }
@app.post("/events/verify")
def verify_event(req: VerifyEventRequest) -> Dict[str, Any]:
    """Check an event's version, verb, required fields, and detached signature.

    Args:
        req: Parsed request body holding the event, the mode to echo back,
            and the ``consume_nonce`` flag.

    Returns:
        A report built by ``validation_result``. ``valid`` is True only when
        no error was recorded. ``event_hash`` is the tagged SHA-256 of the
        event as received, or None for an empty event. The signature is only
        checked when the structural checks found no errors.
    """
    event = dict(req.event or {})
    errors: List[Dict[str, Any]] = []
    warnings: List[Dict[str, Any]] = []

    if event.get("jep") != JEP_WIRE_VERSION:
        errors.append({"code": "ERR_UNSUPPORTED_JEP_VERSION", "message": "jep must be '1'"})

    # The event is arbitrary JSON, so `verb` may be a list or an object.
    # Set membership hashes its operand and raises TypeError for those, so
    # check the type first and report them as an unknown verb instead.
    verb = event.get("verb")
    if not isinstance(verb, str) or verb not in SUPPORTED_VERBS:
        errors.append({"code": "ERR_UNKNOWN_VERB", "message": "verb must be J, D, T, or V"})

    # A key that is absent and a key set to null are both treated as missing.
    for field in ("who", "when", "what", "nonce", "sig"):
        if event.get(field) is None:
            errors.append({"code": "ERR_MISSING_REQUIRED_FIELD", "message": f"{field} is required"})

    event_hash = sha256_tagged(event) if event else None

    if not errors:
        # NOTE(review): unsigned_event_dict also drops top-level keys whose
        # value is None, so such keys are ignored by the signature check but
        # still included in event_hash above. Confirm this is intended.
        unsigned = unsigned_event_dict(event)
        if not detached_jws_verify(unsigned, event.get("sig", "")):
            errors.append({"code": "ERR_INVALID_SIGNATURE", "message": "detached JWS signature verification failed"})

    if req.consume_nonce:
        warnings.append({
            "code": "WARN_NONCE_CONSUMPTION_DEMO_ONLY",
            "message": "nonce consumption is not persisted in this demo Space",
        })

    return validation_result(
        valid=not errors,
        mode=req.mode,
        event_hash=event_hash,
        warnings=warnings,
        errors=errors,
    )