Spaces:
Sleeping
Sleeping
File size: 3,171 Bytes
6498ac1 5a05423 6498ac1 d47ae8b 96b82bc 69241fb 6498ac1 5a05423 96b82bc d47ae8b 96b82bc d47ae8b 96b82bc d47ae8b 96b82bc d47ae8b 6e87ff8 6498ac1 96b82bc d47ae8b 6498ac1 d47ae8b 96b82bc 6e87ff8 d47ae8b 96b82bc 6498ac1 6e87ff8 d47ae8b 6498ac1 d8ef0e2 d47ae8b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# app.py (FINAL w/ contradiction scan)
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Literal
from datetime import datetime
import json
app = FastAPI()
class Metrics(BaseModel):
TCS: float = Field(ge=0.0, le=1.0, default=0.0)
HDI: float = 0.0
PDS: float = Field(ge=0.0, le=1.0, default=0.0)
EVI: int = 0
CBS: float = Field(ge=0.0, le=1.0, default=0.0)
LQS: float = Field(ge=0.0, le=1.0, default=0.0)
class AuditResult(BaseModel):
metrics: Metrics
decision: Literal['APPROVE','REWORK']
deltas: List[str] = Field(default_factory=list)
@app.get("/")
def root():
return JSONResponse({"ok": True, "service": "auditor", "hint": "use /health or POST /audit"})
@app.get("/health")
def health():
return {"ok": True, "service": "auditor", "ts": datetime.utcnow().isoformat()}
def contradiction_hits(text: str) -> int:
flags = [
"no significant", "not associated", "fails to", "did not find",
"contradict", "contradiction", "inconsistent", "null result",
"lack of association", "negative finding", "non-significant"
]
t = text.lower()
return sum(1 for f in flags if f in t)
@app.post("/audit", response_model=AuditResult)
def audit(payload: Dict[str, Any]):
bundle = payload.get("bundle", {})
m_in = payload.get("metrics", {})
# seed metrics from input
m = Metrics(**{
"TCS": float(m_in.get("TCS", 0.0)),
"HDI": float(m_in.get("HDI", 0.0)),
"PDS": float(m_in.get("PDS", 0.0)),
"EVI": int(m_in.get("EVI", 0)),
"CBS": float(m_in.get("CBS", 0.0)),
"LQS": float(m_in.get("LQS", 0.0)),
})
edges = len(bundle.get("network",{}).get("data",[]) or [])
regions = len((bundle.get("regions",{}).get("data") or {}).get("regions_ranked",[]) or [])
lit_ok = bool(bundle.get("literature"))
# heuristic patches
m.TCS = min(1.0, (1 if edges>0 else 0) + (1 if regions>0 else 0) + (1 if lit_ok else 0)) * 0.9
if regions > 8: m.HDI = max(m.HDI, 1.2)
elif regions > 4: m.HDI = max(m.HDI, 1.0)
else: m.HDI = max(m.HDI, 0.8)
m.EVI = (1 if edges>0 else 0) + (1 if regions>0 else 0) + (1 if lit_ok else 0)
m.CBS = max(0.2, float(m.CBS))
# NEW: contradiction scan → raises CBS if many “conflict” phrases are present
try:
blob = json.dumps(bundle, default=str)
except Exception:
blob = str(bundle)
hits = contradiction_hits(blob)
# add up to +0.7 in steps of +0.1 (cap at 0.9 total)
m.CBS = min(0.9, max(m.CBS, 0.2 + 0.1 * hits))
deltas: List[str] = []
decision: Literal['APPROVE','REWORK'] = 'APPROVE'
if (m.HDI < 1.0) or (m.EVI < 3) or (m.CBS > 0.6):
decision = 'REWORK'
if m.HDI < 1.0: deltas.append("Increase hypothesis diversity: broaden regions search.")
if m.EVI < 3: deltas.append("Add at least one more evidence type (e.g., trials or review).")
if m.CBS > 0.6: deltas.append("Surface and reconcile conflicting findings explicitly.")
return AuditResult(metrics=m, decision=decision, deltas=deltas)
|