"""PMS Copilot — RAG MVP.

FastAPI service answering maritime-PMS audit questions via a
retrieval-augmented pipeline:

    OpenAI embeddings -> Qdrant vector search (plain REST) ->
    evidence gate -> strict-JSON LLM "audit" answer.

Configuration is environment-driven (see the CONFIG section below);
`OPENAI_API_KEY` is mandatory and the module refuses to import without it.
"""

import os
import json
import html
from typing import Any, Dict, List, Optional, Tuple

import requests
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
from openai import OpenAI

# ===============================
# ENV / CONFIG (PROD-like)
# ===============================
load_dotenv()

DEBUG_STARTUP_LOGS = os.getenv("DEBUG_STARTUP_LOGS", "0").strip().lower() in ("1", "true", "yes")

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
if not OPENAI_API_KEY:
    # Fail fast at import time: every request needs embeddings + LLM.
    raise RuntimeError("OPENAI_API_KEY is missing. Put it into .env")

QDRANT_URL = os.getenv("QDRANT_URL", "http://127.0.0.1:6333").strip().rstrip("/")
QDRANT_COLLECTION = os.getenv("QDRANT_COLLECTION", "pms_equipment").strip()
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", "").strip()

EMBED_MODEL = os.getenv("EMBED_MODEL", "text-embedding-3-small").strip()
VECTOR_SIZE = int(os.getenv("VECTOR_SIZE", "1536").strip())
TOP_K = int(os.getenv("TOP_K", "5").strip())

# ===============================
# Evidence gate (PROD)
# ===============================
# A hit counts as "strong" when its similarity score >= SCORE_THRESHOLD;
# the LLM is only called when at least MIN_STRONG_HITS strong hits exist.
SCORE_THRESHOLD = float(os.getenv("SCORE_THRESHOLD", "0.55"))
MIN_STRONG_HITS = int(os.getenv("MIN_STRONG_HITS", "1"))

# ===============================
# Payload / token hygiene
# ===============================
MAX_QUERY_CHARS = int(os.getenv("MAX_QUERY_CHARS", "800").strip())
MIN_QUERY_CHARS = int(os.getenv("MIN_QUERY_CHARS", "3").strip())
MAX_EVIDENCE_CHARS = int(os.getenv("MAX_EVIDENCE_CHARS", "12000").strip())
RETURN_RAW_HITS = os.getenv("RETURN_RAW_HITS", "1").strip().lower() in ("1", "true", "yes")

# ===============================
# LLM
# ===============================
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini").strip()  # JSON-only audit answer

if DEBUG_STARTUP_LOGS:
    print("QDRANT_URL =", QDRANT_URL)
    print("QDRANT_COLLECTION =", QDRANT_COLLECTION)
    print("QDRANT_API_KEY =", "SET" if QDRANT_API_KEY else "MISSING")
    print("EMBED_MODEL =", EMBED_MODEL)
    print("VECTOR_SIZE =", VECTOR_SIZE)
    print("TOP_K =", TOP_K)
    print("LLM_MODEL =", LLM_MODEL)

# ===============================
# CLIENTS
# ===============================
oai = OpenAI(api_key=OPENAI_API_KEY)

# ===============================
# APP
# ===============================
app = FastAPI(title="PMS Copilot — RAG MVP")


# ============================================================
# SCHEMAS
# ============================================================
class AskRequest(BaseModel):
    # Free-text audit question from the UI.
    q: str


# ============================================================
# HELPERS
# ============================================================
def embed(text: str) -> List[float]:
    """OpenAI embeddings -> vector[VECTOR_SIZE].

    Raises:
        RuntimeError: when the returned vector length does not match
            VECTOR_SIZE (i.e. EMBED_MODEL / VECTOR_SIZE are inconsistent).
    """
    resp = oai.embeddings.create(model=EMBED_MODEL, input=text)
    vec = resp.data[0].embedding
    if len(vec) != VECTOR_SIZE:
        raise RuntimeError(
            f"Embedding dim mismatch: got {len(vec)} but VECTOR_SIZE={VECTOR_SIZE}. "
            f"Check EMBED_MODEL / VECTOR_SIZE in .env"
        )
    return vec


def qdrant_search_rest(query_vec: List[float], limit: int) -> List[Dict[str, Any]]:
    """
    Qdrant REST search (robust, avoids qdrant_client version/SyncApis issues).
    Returns list of points: [{"id":..., "score":..., "payload": {...}}, ...]

    Raises:
        requests.HTTPError: on a non-2xx response from Qdrant.
    """
    url = f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points/search"
    payload = {
        "vector": query_vec,
        "limit": limit,
        "with_payload": True,
        "with_vectors": False,
    }
    headers: Dict[str, str] = {}
    # Qdrant Cloud/self-host can require an API key. For Qdrant Cloud,
    # "api-key" is commonly used.
    if QDRANT_API_KEY:
        headers["api-key"] = QDRANT_API_KEY
    r = requests.post(url, json=payload, headers=headers, timeout=30)
    r.raise_for_status()
    data = r.json()
    return data.get("result", [])


def pick_text_from_payload(payload: Dict[str, Any]) -> Optional[str]:
    """Extract readable text from payload (support common field names).

    Falls back to joining a preferred subset of PMS columns when no plain
    text field is present; returns None when nothing usable exists.
    """
    for k in ("text", "chunk", "content", "page_content", "body", "passage", "PROCEDURE"):
        v = payload.get(k)
        if isinstance(v, str) and v.strip():
            return v.strip()
    if payload:
        keys_pref = ["GROUPS", "FREQUENCY TYPE", "MAINTENANCE HEAD", "RESPONSIBILITY", "PROCEDURE"]
        parts = []
        for k in keys_pref:
            if k in payload and payload[k] not in (None, ""):
                parts.append(f"{k}: {payload[k]}")
        if parts:
            return " | ".join(parts)
    return None


def build_evidence_blocks(hits: List[Dict[str, Any]]) -> Tuple[str, List[Dict[str, Any]]]:
    """
    Build evidence list for LLM:
      - evidence_text: lines like [1] ...
      - sources: minimal metadata for UI

    evidence_text is truncated to MAX_EVIDENCE_CHARS (with a marker) to
    bound prompt size.
    """
    evidence_lines: List[str] = []
    sources: List[Dict[str, Any]] = []
    for i, h in enumerate(hits, start=1):
        payload = h.get("payload") or {}
        text = pick_text_from_payload(payload) or ""
        text = text.replace("\r", " ").replace("\n", " ").strip()
        if not text:
            # Last resort: dump the raw payload so the reference stays usable.
            text = json.dumps(payload, ensure_ascii=False)
        evidence_lines.append(f"[{i}] {text}")
        sources.append(
            {
                "n": i,
                "id": h.get("id"),
                "score": h.get("score"),
                "GROUPS": payload.get("GROUPS"),
                "FREQUENCY TYPE": payload.get("FREQUENCY TYPE"),
                "MAINTENANCE HEAD": payload.get("MAINTENANCE HEAD"),
                "RESPONSIBILITY": payload.get("RESPONSIBILITY"),
            }
        )
    evidence_text = "\n".join(evidence_lines)
    if len(evidence_text) > MAX_EVIDENCE_CHARS:
        evidence_text = evidence_text[:MAX_EVIDENCE_CHARS] + "\n...[TRUNCATED]"
    return evidence_text, sources


def _extract_first_json_object(s: str) -> str:
    """
    Best-effort recovery if LLM outputs extra text.
    Returns substring from first '{' to last '}'; returns the input
    unchanged when no balanced-looking span exists.
    """
    if not s:
        return s
    start = s.find("{")
    end = s.rfind("}")
    if start == -1 or end == -1 or end <= start:
        return s
    return s[start : end + 1]


def run_llm_audit_json(query: str, evidence_text: str) -> Dict[str, Any]:
    """
    LLM audit-style answer. STRICT JSON ONLY (enforced by system contract +
    JSON parse).

    Raises:
        RuntimeError: when the model output cannot be parsed as JSON even
            after best-effort extraction (raw output included for debugging).
    """
    system_prompt = """
You are a maritime audit assistant.
RULES (MANDATORY):
- Output MUST be valid JSON
- NO markdown
- NO explanations
- NO text outside JSON
- Use ONLY the provided evidence
- If information is missing, use "Not found in provided records"
JSON SCHEMA (exact):
{
  "summary": string,
  "findings": [
    {
      "topic": string,
      "requirement": string,
      "observation": string,
      "risk": string,
      "evidence_refs": [number]
    }
  ],
  "conclusion": string
}
""".strip()

    user_prompt = f"""
AUDIT QUESTION:
{query}

EVIDENCE:
{evidence_text}
""".strip()

    resp = oai.responses.create(
        model=LLM_MODEL,
        input=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0,  # deterministic, audit-style output
    )
    raw = resp.output_text or ""
    candidate = _extract_first_json_object(raw)
    try:
        return json.loads(candidate)
    except json.JSONDecodeError as e:
        # Chain the original decode error so the traceback keeps its position info.
        raise RuntimeError(f"LLM returned invalid JSON: {e}\n\nRAW OUTPUT:\n{raw}") from e


# ============================================================
# API: HEALTH
# ============================================================
@app.get("/health")
def health():
    """Liveness probe."""
    return {"status": "ok"}


# ============================================================
# UI (HTML)
# ============================================================
@app.get("/", response_class=HTMLResponse)
def home():
    """Minimal landing page showing the active configuration.

    NOTE(review): the original inline markup appears truncated/garbled in
    the source (bare text, no tags); the visible text is preserved verbatim
    and wrapped in minimal valid HTML. Confirm against the original UI.
    """
    qdrant_url_html = html.escape(QDRANT_URL)
    coll_html = html.escape(QDRANT_COLLECTION)
    embed_html = html.escape(EMBED_MODEL)
    llm_html = html.escape(LLM_MODEL)
    return f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>PMS Copilot — RAG MVP</title>
</head>
<body>
<h1>PMS Copilot — RAG MVP</h1>
<p>Qdrant: {qdrant_url_html} · Collection: {coll_html} · Embed: {embed_html} · TopK: {TOP_K} · LLM: {llm_html}</p>
</body>
</html>""".strip()


# ============================================================
# API
# ============================================================
@app.post("/ask")
def ask(req: AskRequest):
    """Main RAG endpoint: validate -> embed -> search -> gate -> LLM audit.

    Returns a JSON envelope with the parsed audit answer, compact sources,
    optional raw hits, and debug metadata. Errors at each pipeline stage are
    reported as structured 4xx/5xx JSON responses rather than raised.
    """
    q = (req.q or "").strip()
    if not q:
        return JSONResponse({"ok": False, "error": "Empty query"}, status_code=400)
    if len(q) < MIN_QUERY_CHARS:
        return JSONResponse(
            {"ok": False, "error": f"Query too short (min {MIN_QUERY_CHARS} chars)"},
            status_code=400,
        )
    if len(q) > MAX_QUERY_CHARS:
        q = q[:MAX_QUERY_CHARS]

    # 1) Embedding
    try:
        query_vec = embed(q)
    except Exception as e:
        return JSONResponse(
            {
                "ok": False,
                "error": "Embedding failed",
                "details": str(e),
                "debug": {
                    "embed_model": EMBED_MODEL,
                    "vector_size": VECTOR_SIZE,
                },
            },
            status_code=500,
        )

    # 2) Qdrant search (REST)
    try:
        raw_points = qdrant_search_rest(query_vec, TOP_K)
    except Exception as e:
        return JSONResponse(
            {
                "ok": False,
                "error": "Qdrant search failed",
                "details": str(e),
                "debug": {
                    "qdrant_url": QDRANT_URL,
                    "collection": QDRANT_COLLECTION,
                    "qdrant_api_key_set": bool(QDRANT_API_KEY),
                },
            },
            status_code=500,
        )

    # Normalize hits for downstream
    hits: List[Dict[str, Any]] = [
        {
            "id": p.get("id"),
            "score": p.get("score"),
            "payload": p.get("payload") or {},
        }
        for p in raw_points
    ]

    # Evidence gate: refuse to call the LLM without enough strong evidence.
    strong_hits = sum(1 for h in hits if (h.get("score") or 0) >= SCORE_THRESHOLD)
    evidence_text, sources = build_evidence_blocks(hits)

    if strong_hits < MIN_STRONG_HITS:
        return {
            "ok": True,
            "query": q,
            "audit": {
                "summary": "Insufficient evidence found in PMS data for a grounded audit answer.",
                "findings": [
                    {
                        "topic": "Evidence gating",
                        "requirement": f"At least {MIN_STRONG_HITS} hits with score >= {SCORE_THRESHOLD}",
                        "observation": f"Only {strong_hits} strong hits were retrieved.",
                        "risk": "Answer may be speculative without sufficient PMS evidence.",
                        "evidence_refs": [],
                    }
                ],
                "conclusion": "Please refine the question or ensure the relevant PMS/manual records exist in the collection.",
            },
            "sources": sources,
            "hits": hits if RETURN_RAW_HITS else [],
            "debug": {
                "qdrant_url": QDRANT_URL,
                "collection": QDRANT_COLLECTION,
                "top_k": TOP_K,
                "embed_model": EMBED_MODEL,
                "vector_size": VECTOR_SIZE,
                "llm_model": LLM_MODEL,
                "strong_hits": strong_hits,
                "score_threshold": SCORE_THRESHOLD,
                "min_strong_hits": MIN_STRONG_HITS,
                "llm_called": False,
            },
        }

    # 3) LLM audit JSON (strict)
    try:
        audit = run_llm_audit_json(q, evidence_text)
    except Exception as e:
        return JSONResponse(
            {
                "ok": False,
                "error": "LLM failed",
                "details": str(e),
                "debug": {"llm_model": LLM_MODEL},
                "sources": sources,
                "hits": hits if RETURN_RAW_HITS else [],
            },
            status_code=500,
        )

    return {
        "ok": True,
        "query": q,
        "audit": audit,  # STRICT JSON (parsed)
        "sources": sources,  # compact evidence table for UI
        "hits": hits if RETURN_RAW_HITS else [],
        "debug": {
            "qdrant_url": QDRANT_URL,
            "collection": QDRANT_COLLECTION,
            "top_k": TOP_K,
            "embed_model": EMBED_MODEL,
            "vector_size": VECTOR_SIZE,
            "llm_model": LLM_MODEL,
            "strong_hits": strong_hits,
            "score_threshold": SCORE_THRESHOLD,
            "min_strong_hits": MIN_STRONG_HITS,
            "llm_called": True,
        },
    }