ssra-auditor / app.py
darkfrostx's picture
Update app.py
6498ac1 verified
# app.py (FINAL w/ contradiction scan)
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Literal
from datetime import datetime
import json
app = FastAPI()
class Metrics(BaseModel):
TCS: float = Field(ge=0.0, le=1.0, default=0.0)
HDI: float = 0.0
PDS: float = Field(ge=0.0, le=1.0, default=0.0)
EVI: int = 0
CBS: float = Field(ge=0.0, le=1.0, default=0.0)
LQS: float = Field(ge=0.0, le=1.0, default=0.0)
class AuditResult(BaseModel):
metrics: Metrics
decision: Literal['APPROVE','REWORK']
deltas: List[str] = Field(default_factory=list)
@app.get("/")
def root():
return JSONResponse({"ok": True, "service": "auditor", "hint": "use /health or POST /audit"})
@app.get("/health")
def health():
return {"ok": True, "service": "auditor", "ts": datetime.utcnow().isoformat()}
def contradiction_hits(text: str) -> int:
flags = [
"no significant", "not associated", "fails to", "did not find",
"contradict", "contradiction", "inconsistent", "null result",
"lack of association", "negative finding", "non-significant"
]
t = text.lower()
return sum(1 for f in flags if f in t)
@app.post("/audit", response_model=AuditResult)
def audit(payload: Dict[str, Any]):
bundle = payload.get("bundle", {})
m_in = payload.get("metrics", {})
# seed metrics from input
m = Metrics(**{
"TCS": float(m_in.get("TCS", 0.0)),
"HDI": float(m_in.get("HDI", 0.0)),
"PDS": float(m_in.get("PDS", 0.0)),
"EVI": int(m_in.get("EVI", 0)),
"CBS": float(m_in.get("CBS", 0.0)),
"LQS": float(m_in.get("LQS", 0.0)),
})
edges = len(bundle.get("network",{}).get("data",[]) or [])
regions = len((bundle.get("regions",{}).get("data") or {}).get("regions_ranked",[]) or [])
lit_ok = bool(bundle.get("literature"))
# heuristic patches
m.TCS = min(1.0, (1 if edges>0 else 0) + (1 if regions>0 else 0) + (1 if lit_ok else 0)) * 0.9
if regions > 8: m.HDI = max(m.HDI, 1.2)
elif regions > 4: m.HDI = max(m.HDI, 1.0)
else: m.HDI = max(m.HDI, 0.8)
m.EVI = (1 if edges>0 else 0) + (1 if regions>0 else 0) + (1 if lit_ok else 0)
m.CBS = max(0.2, float(m.CBS))
# NEW: contradiction scan → raises CBS if many “conflict” phrases are present
try:
blob = json.dumps(bundle, default=str)
except Exception:
blob = str(bundle)
hits = contradiction_hits(blob)
# add up to +0.7 in steps of +0.1 (cap at 0.9 total)
m.CBS = min(0.9, max(m.CBS, 0.2 + 0.1 * hits))
deltas: List[str] = []
decision: Literal['APPROVE','REWORK'] = 'APPROVE'
if (m.HDI < 1.0) or (m.EVI < 3) or (m.CBS > 0.6):
decision = 'REWORK'
if m.HDI < 1.0: deltas.append("Increase hypothesis diversity: broaden regions search.")
if m.EVI < 3: deltas.append("Add at least one more evidence type (e.g., trials or review).")
if m.CBS > 0.6: deltas.append("Surface and reconcile conflicting findings explicitly.")
return AuditResult(metrics=m, decision=decision, deltas=deltas)