# pms-copilot / app.py
# (Hugging Face Space upload by MavareeSwimmingPool, commit 750bb35 — header
# converted to comments so the module parses as Python.)
import os
import json
import html
from typing import Any, Dict, List, Optional, Tuple
import requests
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
from openai import OpenAI
# ===============================
# ENV / CONFIG (PROD-like)
# ===============================
load_dotenv()
# Verbose startup prints are opt-in so production logs stay quiet.
DEBUG_STARTUP_LOGS = os.getenv("DEBUG_STARTUP_LOGS", "0").strip().lower() in ("1", "true", "yes")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
if not OPENAI_API_KEY:
    # Fail fast at import time: nothing in this service works without the key.
    raise RuntimeError("OPENAI_API_KEY is missing. Put it into .env")
QDRANT_URL = os.getenv("QDRANT_URL", "http://127.0.0.1:6333").strip().rstrip("/")
QDRANT_COLLECTION = os.getenv("QDRANT_COLLECTION", "pms_equipment").strip()
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", "").strip()
EMBED_MODEL = os.getenv("EMBED_MODEL", "text-embedding-3-small").strip()
# VECTOR_SIZE must match EMBED_MODEL's output dimension; embed() enforces it.
VECTOR_SIZE = int(os.getenv("VECTOR_SIZE", "1536").strip())
TOP_K = int(os.getenv("TOP_K", "5").strip())
# ===============================
# Evidence gate (PROD)
# ===============================
# A hit counts as "strong" when its similarity score reaches SCORE_THRESHOLD;
# the LLM is only invoked when at least MIN_STRONG_HITS strong hits exist.
SCORE_THRESHOLD = float(os.getenv("SCORE_THRESHOLD", "0.55"))
MIN_STRONG_HITS = int(os.getenv("MIN_STRONG_HITS", "1"))
# ===============================
# Payload / token hygiene
# ===============================
MAX_QUERY_CHARS = int(os.getenv("MAX_QUERY_CHARS", "800").strip())
MIN_QUERY_CHARS = int(os.getenv("MIN_QUERY_CHARS", "3").strip())
# Cap on the evidence text fed to the LLM, to bound token usage.
MAX_EVIDENCE_CHARS = int(os.getenv("MAX_EVIDENCE_CHARS", "12000").strip())
# When enabled, /ask responses include the raw Qdrant hits alongside sources.
RETURN_RAW_HITS = os.getenv("RETURN_RAW_HITS", "1").strip().lower() in ("1", "true", "yes")
# ===============================
# LLM
# ===============================
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini").strip()  # JSON-only audit answer
if DEBUG_STARTUP_LOGS:
    print("QDRANT_URL =", QDRANT_URL)
    print("QDRANT_COLLECTION =", QDRANT_COLLECTION)
    # Never print the key itself — only whether it is configured.
    print("QDRANT_API_KEY =", "SET" if QDRANT_API_KEY else "MISSING")
    print("EMBED_MODEL =", EMBED_MODEL)
    print("VECTOR_SIZE =", VECTOR_SIZE)
    print("TOP_K =", TOP_K)
    print("LLM_MODEL =", LLM_MODEL)
# ===============================
# CLIENTS
# ===============================
oai = OpenAI(api_key=OPENAI_API_KEY)
# ===============================
# APP
# ===============================
app = FastAPI(title="PMS Copilot — RAG MVP")
# ============================================================
# SCHEMAS
# ============================================================
class AskRequest(BaseModel):
    """Request body for POST /ask."""
    # Free-text audit question; emptiness and length limits are enforced
    # in the /ask handler, not by the schema.
    q: str
# ============================================================
# HELPERS
# ============================================================
def embed(text: str) -> List[float]:
    """Embed *text* with the configured OpenAI model and return the vector.

    Raises:
        RuntimeError: when the returned dimensionality differs from
            VECTOR_SIZE (usually an EMBED_MODEL / VECTOR_SIZE mismatch
            in .env).
    """
    embedding = oai.embeddings.create(model=EMBED_MODEL, input=text).data[0].embedding
    if len(embedding) == VECTOR_SIZE:
        return embedding
    raise RuntimeError(
        f"Embedding dim mismatch: got {len(embedding)} but VECTOR_SIZE={VECTOR_SIZE}. "
        f"Check EMBED_MODEL / VECTOR_SIZE in .env"
    )
def qdrant_search_rest(query_vec: List[float], limit: int) -> List[Dict[str, Any]]:
    """Run a vector similarity search against Qdrant over plain REST.

    Using requests directly avoids qdrant_client version / SyncApis
    incompatibilities. Returns the raw result points:
    [{"id": ..., "score": ..., "payload": {...}}, ...]
    """
    endpoint = f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points/search"
    body = {
        "vector": query_vec,
        "limit": limit,
        "with_payload": True,
        "with_vectors": False,
    }
    # Qdrant Cloud (and secured self-hosted nodes) authenticate via the
    # "api-key" header; send no auth header at all when no key is configured.
    auth_headers: Dict[str, str] = {"api-key": QDRANT_API_KEY} if QDRANT_API_KEY else {}
    response = requests.post(endpoint, json=body, headers=auth_headers, timeout=30)
    response.raise_for_status()
    return response.json().get("result", [])
def pick_text_from_payload(payload: Dict[str, Any]) -> Optional[str]:
    """Pull a human-readable snippet out of a Qdrant point payload.

    Tries the common free-text field names first (in priority order); failing
    that, joins the well-known structured PMS columns into a
    "KEY: value | KEY: value" line. Returns None when nothing usable exists.
    """
    for field in ("text", "chunk", "content", "page_content", "body", "passage", "PROCEDURE"):
        value = payload.get(field)
        if isinstance(value, str) and value.strip():
            return value.strip()
    if not payload:
        return None
    # Fallback: assemble the structured PMS columns that are present.
    structured = [
        f"{col}: {payload[col]}"
        for col in ("GROUPS", "FREQUENCY TYPE", "MAINTENANCE HEAD", "RESPONSIBILITY", "PROCEDURE")
        if col in payload and payload[col] not in (None, "")
    ]
    if structured:
        return " | ".join(structured)
    return None
def build_evidence_blocks(hits: List[Dict[str, Any]]) -> Tuple[str, List[Dict[str, Any]]]:
    """Turn normalized Qdrant hits into (evidence_text, sources).

    evidence_text is a newline-joined list of "[n] <text>" lines fed to the
    LLM (capped at MAX_EVIDENCE_CHARS); sources is the compact per-hit
    metadata the UI renders as a table, numbered to match the [n] markers.
    """
    numbered_lines: List[str] = []
    sources: List[Dict[str, Any]] = []
    for n, hit in enumerate(hits, start=1):
        payload = hit.get("payload") or {}
        snippet = (pick_text_from_payload(payload) or "").replace("\r", " ").replace("\n", " ").strip()
        if not snippet:
            # No readable field at all — dump the raw payload so the hit
            # still contributes evidence instead of an empty line.
            snippet = json.dumps(payload, ensure_ascii=False)
        numbered_lines.append(f"[{n}] {snippet}")
        sources.append({
            "n": n,
            "id": hit.get("id"),
            "score": hit.get("score"),
            "GROUPS": payload.get("GROUPS"),
            "FREQUENCY TYPE": payload.get("FREQUENCY TYPE"),
            "MAINTENANCE HEAD": payload.get("MAINTENANCE HEAD"),
            "RESPONSIBILITY": payload.get("RESPONSIBILITY"),
        })
    evidence_text = "\n".join(numbered_lines)
    if len(evidence_text) > MAX_EVIDENCE_CHARS:
        # Hard cap to keep the LLM prompt within budget.
        evidence_text = evidence_text[:MAX_EVIDENCE_CHARS] + "\n...[TRUNCATED]"
    return evidence_text, sources
def _extract_first_json_object(s: str) -> str:
"""
Best-effort recovery if LLM outputs extra text.
Returns substring from first '{' to last '}'.
"""
if not s:
return s
start = s.find("{")
end = s.rfind("}")
if start == -1 or end == -1 or end <= start:
return s
return s[start : end + 1]
def run_llm_audit_json(query: str, evidence_text: str) -> Dict[str, Any]:
    """
    LLM audit-style answer.
    STRICT JSON ONLY (enforced by system contract + JSON parse).

    Sends the audit question plus the numbered evidence lines to the OpenAI
    Responses API at temperature 0, then parses the output with json.loads
    after best-effort brace extraction. Raises RuntimeError (with the raw
    model output attached) when the result is still not valid JSON.
    """
    # Contract prompt — kept verbatim: the UI and the JSON parser depend on
    # these exact field names ("summary", "findings", "conclusion", ...).
    system_prompt = """
You are a maritime audit assistant.
RULES (MANDATORY):
- Output MUST be valid JSON
- NO markdown
- NO explanations
- NO text outside JSON
- Use ONLY the provided evidence
- If information is missing, use "Not found in provided records"
JSON SCHEMA (exact):
{
"summary": string,
"findings": [
{
"topic": string,
"requirement": string,
"observation": string,
"risk": string,
"evidence_refs": [number]
}
],
"conclusion": string
}
""".strip()
    user_prompt = f"""
AUDIT QUESTION:
{query}
EVIDENCE:
{evidence_text}
""".strip()
    resp = oai.responses.create(
        model=LLM_MODEL,
        input=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0,  # deterministic output for audit reproducibility
    )
    raw = resp.output_text or ""
    # Trim any stray prose the model emitted around the JSON object.
    candidate = _extract_first_json_object(raw)
    try:
        return json.loads(candidate)
    except json.JSONDecodeError as e:
        raise RuntimeError(f"LLM returned invalid JSON: {e}\n\nRAW OUTPUT:\n{raw}")
# ============================================================
# API: HEALTH
# ============================================================
@app.get("/health")
def health():
    """Liveness probe: unconditionally reports the service as up."""
    status_payload = {"status": "ok"}
    return status_payload
# ============================================================
# UI (HTML)
# ============================================================
@app.get("/", response_class=HTMLResponse)
def home():
    """Serve the single-page UI (inline HTML/CSS/JS, no build step).

    Config values interpolated into the header banner are HTML-escaped
    first. NOTE: literal CSS/JS braces inside the f-string are doubled
    ({{ }}); single braces are Python interpolations.
    """
    qdrant_url_html = html.escape(QDRANT_URL)
    coll_html = html.escape(QDRANT_COLLECTION)
    embed_html = html.escape(EMBED_MODEL)
    llm_html = html.escape(LLM_MODEL)
    return f"""
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>PMS Copilot — RAG MVP</title>
<style>
body {{
font-family: Arial, sans-serif;
max-width: 1200px;
margin: 34px auto;
padding: 0 16px;
}}
h1 {{ margin: 0 0 14px 0; font-size: 44px; letter-spacing: -0.5px; }}
.meta {{
color:#666; font-size: 13px; margin: 8px 0 18px 0;
}}
.row {{ display:flex; gap:10px; margin: 14px 0; align-items: stretch; }}
input {{
flex:1; padding: 14px; font-size: 16px;
border: 1px solid #bbb; border-radius: 6px;
}}
button {{
padding: 14px 18px; font-size: 16px; cursor: pointer;
border: 2px solid #222; background: #eee; border-radius: 6px;
min-width: 88px;
}}
.panel {{
background: #f6f6f6;
border-radius: 12px;
padding: 16px;
margin-top: 14px;
border: 1px solid #e2e2e2;
}}
.error {{
background: #fdecec;
border: 1px solid #f3b6b6;
}}
.title {{ font-size: 18px; font-weight: 700; margin: 0 0 10px 0; }}
.sub {{ color:#333; margin: 0 0 10px 0; }}
.kv {{ margin: 0; color:#111; }}
.kv b {{ display:inline-block; min-width: 140px; }}
.findings {{
margin-top: 14px;
display: grid;
grid-template-columns: 1fr;
gap: 10px;
}}
.card {{
background: #fff;
border-radius: 10px;
border: 1px solid #e5e5e5;
padding: 14px;
}}
.card h3 {{
margin: 0 0 8px 0;
font-size: 16px;
}}
.muted {{ color:#666; font-size: 13px; }}
.evidence {{
margin-top: 14px;
}}
table {{
width: 100%;
border-collapse: collapse;
background: #fff;
border-radius: 10px;
overflow: hidden;
border: 1px solid #e5e5e5;
}}
th, td {{
padding: 10px;
border-bottom: 1px solid #eee;
font-size: 13px;
vertical-align: top;
}}
th {{ text-align: left; background: #fafafa; }}
.row2 {{
display:flex; justify-content: space-between; align-items: center;
gap: 12px; margin-top: 10px;
}}
pre {{
margin: 0;
white-space: pre-wrap;
background: #111;
color: #eee;
padding: 12px;
border-radius: 10px;
overflow: auto;
font-size: 12px;
}}
.right {{
display:flex; gap: 10px; align-items: center;
}}
.checkbox {{
display:flex; gap: 8px; align-items: center;
font-size: 13px; color:#333;
}}
</style>
</head>
<body>
<h1>PMS Copilot — RAG MVP</h1>
<div class="meta">
Qdrant: <b>{qdrant_url_html}</b> · Collection: <b>{coll_html}</b> ·
Embed: <b>{embed_html}</b> · TopK: <b>{TOP_K}</b> · LLM: <b>{llm_html}</b>
</div>
<div class="row">
<input id="q" placeholder="Введите запрос..." />
<button onclick="send()">Ask</button>
</div>
<div id="result" class="panel" style="display:none;"></div>
<script>
function esc(s) {{
return String(s ?? "").replaceAll("&", "&amp;").replaceAll("<","&lt;").replaceAll(">","&gt;");
}}
function renderAudit(audit) {{
const summary = audit?.summary ?? "";
const findings = Array.isArray(audit?.findings) ? audit.findings : [];
const conclusion = audit?.conclusion ?? "";
let html = '';
html += `<div class="title">Summary</div>`;
html += `<div class="sub">${{esc(summary)}}</div>`;
html += `<div class="title" style="margin-top:14px;">Findings</div>`;
if (!findings.length) {{
html += `<div class="muted">No findings returned.</div>`;
}} else {{
html += `<div class="findings">`;
for (const f of findings) {{
const refs = Array.isArray(f?.evidence_refs) ? f.evidence_refs.join(", ") : "";
html += `
<div class="card">
<h3>${{esc(f?.topic ?? "Finding")}}</h3>
<p class="kv"><b>Requirement:</b> ${{esc(f?.requirement ?? "")}}</p>
<p class="kv"><b>Observation:</b> ${{esc(f?.observation ?? "")}}</p>
<p class="kv"><b>Risk:</b> ${{esc(f?.risk ?? "")}}</p>
<p class="muted"><b>Evidence refs:</b> ${{esc(refs)}}</p>
</div>
`;
}}
html += `</div>`;
}}
html += `<div class="title" style="margin-top:14px;">Conclusion</div>`;
html += `<div class="sub">${{esc(conclusion)}}</div>`;
return html;
}}
function renderEvidenceTable(sources) {{
if (!Array.isArray(sources) || !sources.length) return '';
let rows = '';
for (const s of sources) {{
rows += `
<tr>
<td>${{esc(s.n)}}</td>
<td>${{esc(s.id)}}</td>
<td>${{esc(s.score)}}</td>
<td>${{esc(s["GROUPS"])}}</td>
<td>${{esc(s["FREQUENCY TYPE"])}}</td>
<td>${{esc(s["RESPONSIBILITY"])}}</td>
</tr>
`;
}}
return `
<div class="evidence">
<div class="title">Evidence</div>
<table>
<thead>
<tr>
<th>#</th>
<th>ID</th>
<th>Score</th>
<th>GROUPS</th>
<th>FREQUENCY</th>
<th>RESPONSIBILITY</th>
</tr>
</thead>
<tbody>${{rows}}</tbody>
</table>
</div>
`;
}}
async function send() {{
const q = document.getElementById('q').value;
const panel = document.getElementById('result');
panel.style.display = 'block';
panel.className = 'panel';
panel.innerHTML = `<div class="title">Working...</div><div class="muted">Embedding → Qdrant → LLM</div>`;
try {{
const r = await fetch('/ask', {{
method: 'POST',
headers: {{ 'Content-Type': 'application/json' }},
body: JSON.stringify({{ q }})
}});
const data = await r.json();
if (!data.ok) {{
panel.className = 'panel error';
panel.innerHTML = `
<div class="title">Error</div>
<div class="sub">${{esc(data.error ?? "Request failed")}}</div>
<pre>${{esc(JSON.stringify(data, null, 2))}}</pre>
`;
return;
}}
const audit = data.audit;
const sources = data.sources;
const auditHtml = renderAudit(audit);
const evidenceHtml = renderEvidenceTable(sources);
panel.innerHTML = `
${{auditHtml}}
${{evidenceHtml}}
<div class="row2">
<div class="checkbox">
<input id="rawToggle" type="checkbox" onchange="toggleRaw()" />
<label for="rawToggle">Show raw JSON</label>
</div>
<div class="right muted">TopK: ${{esc(data.debug?.top_k)}}</div>
</div>
<div id="rawBlock" style="display:none; margin-top:10px;">
<pre>${{esc(JSON.stringify(data, null, 2))}}</pre>
</div>
`;
}} catch (e) {{
panel.className = 'panel error';
panel.innerHTML = `<div class="title">Error</div><pre>${{esc(String(e))}}</pre>`;
}}
}}
function toggleRaw() {{
const cb = document.getElementById('rawToggle');
const block = document.getElementById('rawBlock');
if (!cb || !block) return;
block.style.display = cb.checked ? 'block' : 'none';
}}
</script>
</body>
</html>
""".strip()
# ============================================================
# API
# ============================================================
@app.post("/ask")
def ask(req: AskRequest):
    """RAG pipeline endpoint: validate -> embed -> Qdrant search -> evidence
    gate -> strict-JSON LLM audit.

    Success responses are {"ok": True, "query", "audit", "sources", "hits",
    "debug"}; failures return {"ok": False, "error", "details", ...} with an
    appropriate HTTP status. The exact dict shapes are consumed by the JS UI.
    """
    q = (req.q or "").strip()
    if not q:
        return JSONResponse({"ok": False, "error": "Empty query"}, status_code=400)
    if len(q) < MIN_QUERY_CHARS:
        return JSONResponse(
            {"ok": False, "error": f"Query too short (min {MIN_QUERY_CHARS} chars)"},
            status_code=400,
        )
    if len(q) > MAX_QUERY_CHARS:
        # Over-long queries are silently truncated rather than rejected.
        q = q[:MAX_QUERY_CHARS]
    # 1) Embedding
    try:
        query_vec = embed(q)
    except Exception as e:
        return JSONResponse(
            {
                "ok": False,
                "error": "Embedding failed",
                "details": str(e),
                "debug": {
                    "embed_model": EMBED_MODEL,
                    "vector_size": VECTOR_SIZE,
                },
            },
            status_code=500,
        )
    # 2) Qdrant search (REST)
    try:
        raw_points = qdrant_search_rest(query_vec, TOP_K)
    except Exception as e:
        return JSONResponse(
            {
                "ok": False,
                "error": "Qdrant search failed",
                "details": str(e),
                "debug": {
                    "qdrant_url": QDRANT_URL,
                    "collection": QDRANT_COLLECTION,
                    "qdrant_api_key_set": bool(QDRANT_API_KEY),
                },
            },
            status_code=500,
        )
    # Normalize hits for downstream
    hits: List[Dict[str, Any]] = []
    for p in raw_points:
        hits.append(
            {
                "id": p.get("id"),
                "score": p.get("score"),
                "payload": p.get("payload") or {},
            }
        )
    # Evidence gate: count hits at/above the threshold (missing score -> 0).
    strong_hits = sum(1 for h in hits if (h.get("score") or 0) >= SCORE_THRESHOLD)
    evidence_text, sources = build_evidence_blocks(hits)
    if strong_hits < MIN_STRONG_HITS:
        # Not enough grounded evidence: return a canned "insufficient
        # evidence" audit instead of calling the LLM (llm_called=False).
        return {
            "ok": True,
            "query": q,
            "audit": {
                "summary": "Insufficient evidence found in PMS data for a grounded audit answer.",
                "findings": [
                    {
                        "topic": "Evidence gating",
                        "requirement": f"At least {MIN_STRONG_HITS} hits with score >= {SCORE_THRESHOLD}",
                        "observation": f"Only {strong_hits} strong hits were retrieved.",
                        "risk": "Answer may be speculative without sufficient PMS evidence.",
                        "evidence_refs": [],
                    }
                ],
                "conclusion": "Please refine the question or ensure the relevant PMS/manual records exist in the collection.",
            },
            "sources": sources,
            "hits": hits if RETURN_RAW_HITS else [],
            "debug": {
                "qdrant_url": QDRANT_URL,
                "collection": QDRANT_COLLECTION,
                "top_k": TOP_K,
                "embed_model": EMBED_MODEL,
                "vector_size": VECTOR_SIZE,
                "llm_model": LLM_MODEL,
                "strong_hits": strong_hits,
                "score_threshold": SCORE_THRESHOLD,
                "min_strong_hits": MIN_STRONG_HITS,
                "llm_called": False,
            },
        }
    # 3) LLM audit JSON (strict)
    try:
        audit = run_llm_audit_json(q, evidence_text)
    except Exception as e:
        # Still return sources/hits so the UI can show what was retrieved.
        return JSONResponse(
            {
                "ok": False,
                "error": "LLM failed",
                "details": str(e),
                "debug": {"llm_model": LLM_MODEL},
                "sources": sources,
                "hits": hits if RETURN_RAW_HITS else [],
            },
            status_code=500,
        )
    return {
        "ok": True,
        "query": q,
        "audit": audit,  # STRICT JSON (parsed)
        "sources": sources,  # compact evidence table for UI
        "hits": hits if RETURN_RAW_HITS else [],
        "debug": {
            "qdrant_url": QDRANT_URL,
            "collection": QDRANT_COLLECTION,
            "top_k": TOP_K,
            "embed_model": EMBED_MODEL,
            "vector_size": VECTOR_SIZE,
            "llm_model": LLM_MODEL,
            "strong_hits": strong_hits,
            "score_threshold": SCORE_THRESHOLD,
            "min_strong_hits": MIN_STRONG_HITS,
            "llm_called": True,
        },
    }