"""Crowd-sourced Q&A service built on FastAPI.

Conversations (a question, its turns, and community answers with votable
versions) are stored as JSON files under ``DATA_DIR``.  When the optional
``sentence-transformers`` package is importable, question embeddings are
stored alongside and used to match new questions against existing ones.
"""

from __future__ import annotations

import json
import logging
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

import numpy as np
from fastapi import FastAPI, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

try:
    from sentence_transformers import SentenceTransformer
except Exception:
    # Deliberately broad: the package is optional and any import-time failure
    # simply disables semantic search (see the ``is None`` checks below).
    SentenceTransformer = None

logger = logging.getLogger(__name__)

APP_TITLE = "Human Intelligence"
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
CONVERSATIONS_FILE = DATA_DIR / "conversations.json"
EMBED_FILE = DATA_DIR / "embeddings.json"
TEMPLATES_DIR = Path("/app/templates")
SIMILARITY_THRESHOLD = float(os.environ.get("SIMILARITY_THRESHOLD", "0.62"))
EMBED_MODEL_NAME = os.environ.get(
    "EMBED_MODEL_NAME",
    "sentence-transformers/paraphrase-MiniLM-L6-v2",
)

DATA_DIR.mkdir(parents=True, exist_ok=True)
TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)

app = FastAPI(title=APP_TITLE)
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
_embed_model = None  # lazily created by load_embed_model()

app.mount("/templates", StaticFiles(directory=str(TEMPLATES_DIR)), name="templates")


# ────────────────────── Utilities ──────────────────────

def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _safe_int(value: Any, default: int = 0) -> int:
    """Convert *value* to ``int``, returning *default* when it cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def read_json(path: Path, default: Any):
    """Load JSON from *path*; return *default* if it is missing or unreadable.

    Read and parse failures are logged rather than raised so that a damaged
    file degrades to *default* instead of breaking every request.
    """
    if not path.exists():
        return default
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not read JSON from %s; using default", path, exc_info=True)
        return default


def write_json(path: Path, data: Any) -> None:
    """Serialize *data* to *path* by writing a sibling ``.tmp`` file and renaming it."""
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(
        json.dumps(data, ensure_ascii=False, indent=2, default=str),
        encoding="utf-8",
    )
    # Rename over the target so readers never see a half-written file.
    tmp.replace(path)


def get_client_id(request: Request, payload: dict | None = None) -> str:
    """Resolve the caller's id.

    Order of precedence: the ``x-client-id`` header, then ``client_id`` in
    *payload*, then the literal ``"anon"``.
    """
    header_value = request.headers.get("x-client-id", "").strip()
    if header_value:
        return header_value
    if payload:
        payload_value = str(payload.get("client_id", "")).strip()
        if payload_value:
            return payload_value
    return "anon"


def anon_label(client_id: str) -> str:
    """Return the display name for *client_id* (currently always ``"Anonymous"``)."""
    return "Anonymous"


# ────────────────────── Embeddings ──────────────────────

def load_embed_model():
    """Return the shared embedding model, creating it on first use.

    Returns ``None`` when ``sentence_transformers`` could not be imported.
    """
    global _embed_model
    if _embed_model is None:
        if SentenceTransformer is None:
            return None
        _embed_model = SentenceTransformer(EMBED_MODEL_NAME)
    return _embed_model


def embed_text(text: str) -> list[float]:
    """Encode *text* with normalized embeddings; return ``[]`` if no model is available."""
    model = load_embed_model()
    if model is None:
        return []
    vec = model.encode(text, normalize_embeddings=True)
    if hasattr(vec, "tolist"):
        return vec.tolist()
    return list(vec)


def load_embed_index() -> dict[str, dict[str, Any]]:
    """Load the embedding index from ``EMBED_FILE`` (``{}`` if absent or not a dict)."""
    data = read_json(EMBED_FILE, {})
    return data if isinstance(data, dict) else {}


def save_embed_index(idx: dict[str, dict[str, Any]]) -> None:
    """Persist the embedding index to ``EMBED_FILE``."""
    write_json(EMBED_FILE, idx)


# ────────────────────── Conversations CRUD ──────────────────────

def load_conversations() -> list[dict[str, Any]]:
    """Load all stored conversations.

    Accepts either a bare list or a ``{"conversations": [...]}`` wrapper.
    Entries that are not dicts are dropped so callers can safely use ``.get``.
    """
    data = read_json(CONVERSATIONS_FILE, [])
    if isinstance(data, dict) and "conversations" in data:
        data = data["conversations"]
    if not isinstance(data, list):
        return []
    return [item for item in data if isinstance(item, dict)]


def save_conversations(conversations: list[dict[str, Any]]) -> None:
    """Persist the full conversation list to ``CONVERSATIONS_FILE``."""
    write_json(CONVERSATIONS_FILE, conversations)


def _version_rank(version: dict[str, Any]) -> tuple[int, str]:
    """Sort key for versions: vote total first, then the ``created_at`` string."""
    return _safe_int(version.get("votes", 0)), str(version.get("created_at", ""))


def normalize_version(version: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *version* with every expected key present.

    ``votes`` is coerced to ``int`` (0 if not convertible) and
    ``votes_by_client`` is replaced with ``{}`` when it is not a dict.
    """
    v = dict(version or {})
    v.setdefault("id", uuid.uuid4().hex)
    v.setdefault("text", "")
    v.setdefault("author", "Anonymous")
    v.setdefault("created_at", now_iso())
    v.setdefault("votes", 0)
    v.setdefault("votes_by_client", {})
    if not isinstance(v["votes_by_client"], dict):
        v["votes_by_client"] = {}
    v["votes"] = _safe_int(v.get("votes", 0))
    return v


def normalize_answer(answer: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *answer* with defaults filled and versions normalized.

    If ``active_version`` does not name one of the answer's versions, it is
    reset to the highest-ranked version (see ``_version_rank``).
    """
    a = dict(answer or {})
    a.setdefault("id", uuid.uuid4().hex)
    a.setdefault("versions", [])
    a.setdefault("active_version", "")
    a.setdefault("created_at", now_iso())
    a.setdefault("updated_at", a["created_at"])

    raw_versions = a.get("versions")
    if not isinstance(raw_versions, list):
        # Tolerate null / malformed stored data instead of raising.
        raw_versions = []
    versions = [normalize_version(v) for v in raw_versions if isinstance(v, dict)]
    a["versions"] = versions

    if versions:
        version_ids = {v["id"] for v in versions}
        if a["active_version"] not in version_ids:
            a["active_version"] = max(versions, key=_version_rank)["id"]
    return a


def normalize_conversation(conversation: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *conversation* with defaults filled in.

    Turns and answers that are not dicts are discarded.  If there are no
    turns but a question exists, a single user turn is synthesized from it.
    """
    c = dict(conversation or {})
    c.setdefault("id", uuid.uuid4().hex)
    c.setdefault("question", "")
    c.setdefault("author", "Anonymous")
    c.setdefault("created_at", now_iso())
    c.setdefault("updated_at", c["created_at"])
    c.setdefault("turns", [])
    c.setdefault("answers", [])

    raw_turns = c.get("turns")
    if not isinstance(raw_turns, list):
        raw_turns = []
    turns: list[dict[str, Any]] = []
    for turn in raw_turns:
        if not isinstance(turn, dict):
            continue
        t = dict(turn)
        t.setdefault("id", uuid.uuid4().hex)
        t.setdefault("role", "user")
        t.setdefault("text", "")
        t.setdefault("author", "Anonymous")
        t.setdefault("ts", now_iso())
        turns.append(t)
    c["turns"] = turns

    raw_answers = c.get("answers")
    if not isinstance(raw_answers, list):
        raw_answers = []
    c["answers"] = [normalize_answer(a) for a in raw_answers if isinstance(a, dict)]

    if not c["turns"] and c["question"]:
        c["turns"].append({
            "id": uuid.uuid4().hex,
            "role": "user",
            "text": c["question"],
            "author": c.get("author", "Anonymous"),
            "ts": c["created_at"],
        })
    return c


def load_conversation(conversation_id: str) -> Optional[dict[str, Any]]:
    """Return the normalized conversation with *conversation_id*, or ``None``."""
    if not conversation_id:
        return None
    for conv in load_conversations():
        if str(conv.get("id")) == conversation_id:
            return normalize_conversation(conv)
    return None


def save_conversation(conversation: dict[str, Any]) -> dict[str, Any]:
    """Insert or replace *conversation* in storage and return the stored copy.

    The conversation is normalized and its ``updated_at`` refreshed.  New
    conversations are inserted at the front of the list.
    """
    conversation = normalize_conversation(conversation)
    conversation["updated_at"] = now_iso()

    conversations = [normalize_conversation(c) for c in load_conversations()]
    for i, existing in enumerate(conversations):
        if str(existing.get("id")) == str(conversation["id"]):
            conversations[i] = conversation
            break
    else:
        conversations.insert(0, conversation)

    save_conversations(conversations)
    return conversation


def ensure_embedding(conversation: dict[str, Any]) -> None:
    """Store an embedding for the conversation's question in the index.

    Does nothing when the question is empty or no embedding can be produced.
    """
    question = str(conversation.get("question", "")).strip()
    if not question:
        return
    vec = embed_text(question)
    if not vec:
        return
    idx = load_embed_index()
    idx[str(conversation["id"])] = {
        "question": question,
        "vector": vec,
    }
    save_embed_index(idx)


# ────────────────────── Semantic search ──────────────────────

def find_similar_conversation(
    question: str,
    exclude_id: str | None = None,
) -> Optional[dict[str, Any]]:
    """Return the best-matching stored conversation for *question*.

    Returns ``{"conversation": ..., "score": ...}`` when the highest dot
    product against the indexed vectors reaches ``SIMILARITY_THRESHOLD``,
    otherwise ``None``.  *exclude_id* removes one conversation from the search.
    """
    idx = load_embed_index()
    if not idx or SentenceTransformer is None:
        return None

    q_vec = np.array(embed_text(question), dtype=float)
    if q_vec.size == 0:
        return None

    ids = [cid for cid in idx if cid != exclude_id]
    if not ids:
        return None

    try:
        vecs = np.array([idx[cid]["vector"] for cid in ids], dtype=float)
    except Exception:
        # Malformed index entries (missing key, ragged vectors): give up on
        # matching rather than fail the request.
        return None

    if vecs.size == 0 or vecs.ndim != 2:
        return None
    if vecs.shape[1] != q_vec.shape[0]:
        return None

    sims = vecs @ q_vec
    best_i = int(np.argmax(sims))
    score = float(sims[best_i])
    if score < SIMILARITY_THRESHOLD:
        return None

    conv = load_conversation(ids[best_i])
    if conv is None:
        return None
    return {"conversation": conv, "score": score}


def find_top_k_similar(question: str, k: int = 3) -> list[dict[str, Any]]:
    """Return up to *k* answered conversations most similar to *question*.

    Results are ordered by descending dot-product score.  Conversations that
    no longer exist or have no answer are skipped, and the search continues
    down the ranking until *k* results are collected.  No similarity
    threshold is applied here.
    """
    idx = load_embed_index()
    if not idx or SentenceTransformer is None:
        return []

    q_vec = np.array(embed_text(question), dtype=float)
    if q_vec.size == 0:
        return []

    scored: list[tuple[str, float]] = []
    for cid, data in idx.items():
        try:
            vec = np.array(data["vector"], dtype=float)
        except Exception:
            # Skip individual malformed entries; keep scoring the rest.
            continue
        if vec.shape != q_vec.shape:
            continue
        scored.append((cid, float(vec @ q_vec)))
    scored.sort(key=lambda item: item[1], reverse=True)

    # Read the conversations file once instead of once per candidate.
    # setdefault keeps the first entry for an id, matching load_conversation().
    stored: dict[str, dict[str, Any]] = {}
    for raw in load_conversations():
        stored.setdefault(str(raw.get("id")), raw)

    out: list[dict[str, Any]] = []
    for cid, score in scored:
        if len(out) >= k:
            break
        raw = stored.get(cid)
        if raw is None:
            continue
        conv = normalize_conversation(raw)
        best = best_answer_payload(conv)
        if not best:
            continue
        out.append({
            "conversation_id": cid,
            "question": conv.get("question"),
            "answer": best["text"],
            "score": score,
        })
    return out


# ────────────────────── Actions ──────────────────────

def create_conversation(question: str, author: str = "Anonymous") -> dict[str, Any]:
    """Create, persist and index a new conversation opened by *question*."""
    question = question.strip()
    now = now_iso()
    conversation = {
        "id": uuid.uuid4().hex,
        "question": question,
        "author": author,
        "created_at": now,
        "updated_at": now,
        "turns": [{
            "id": uuid.uuid4().hex,
            "role": "user",
            "text": question,
            "author": author,
            "ts": now,
        }],
        "answers": [],
    }
    conversation = save_conversation(conversation)
    ensure_embedding(conversation)
    return conversation


def active_version(answer: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return the answer's active version.

    Falls back to the highest-ranked version when ``active_version`` does not
    match any version id; returns ``None`` when the answer has no versions.
    """
    versions = answer.get("versions", [])
    if not versions:
        return None
    active_id = answer.get("active_version")
    for version in versions:
        if version.get("id") == active_id:
            return version
    return max(versions, key=_version_rank)


def answer_score(answer: dict[str, Any]) -> tuple[int, str]:
    """Sort key for answers: active version's votes, then the answer's ``created_at``."""
    created_at = str(answer.get("created_at", ""))
    av = active_version(answer)
    if av is None:
        return 0, created_at
    return _safe_int(av.get("votes", 0)), created_at


def best_answer_payload(conversation: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Describe the top-scoring answer of *conversation*, or ``None`` if there is none.

    Expects a normalized conversation (answers and versions carry ``id`` and
    ``text`` keys).
    """
    answers = conversation.get("answers", [])
    if not answers:
        return None
    best = max(answers, key=answer_score)
    av = active_version(best)
    if av is None:
        return None
    return {
        "answer_id": best["id"],
        "version_id": av["id"],
        "text": av["text"],
        "votes": _safe_int(av.get("votes", 0)),
        "author": av.get("author", "Anonymous"),
        "created_at": av.get("created_at", ""),
    }


def add_answer(
    conversation_id: str,
    text: str,
    author: str = "Anonymous",
    question_if_new: str | None = None,
) -> tuple[Optional[dict[str, Any]], str]:
    """Append a new answer (with one initial version) to a conversation.

    Returns ``(conversation, "ok")`` on success or ``(None, reason)`` on
    failure.  When *question_if_new* is given and differs from the loaded
    conversation's question (or no conversation was found), a new
    conversation is created for it first.
    """
    text = text.strip()
    if not text:
        return None, "empty answer"

    conversation = load_conversation(conversation_id)

    # If user is answering a NEW question (even if matched),
    # create a new conversation instead of polluting the matched one.
    if question_if_new:
        if conversation is None or conversation.get("question") != question_if_new:
            conversation = create_conversation(question_if_new, author)
    elif conversation is None:
        return None, "conversation not found"

    now = now_iso()
    version = normalize_version({
        "text": text,
        "author": author,
        "created_at": now,
        "votes": 0,
        "votes_by_client": {},
    })
    answer = normalize_answer({
        "id": uuid.uuid4().hex,
        "versions": [version],
        "active_version": version["id"],
        "created_at": now,
        "updated_at": now,
    })

    conversation["answers"].append(answer)
    conversation["turns"].append({
        "id": uuid.uuid4().hex,
        "role": "assistant",
        "text": text,
        "author": author,
        "answer_id": answer["id"],
        "version_id": version["id"],
        "ts": now,
    })
    save_conversation(conversation)
    return conversation, "ok"


def propose_version(
    conversation_id: str,
    answer_id: str,
    text: str,
    author: str = "Anonymous",
) -> tuple[Optional[dict[str, Any]], str]:
    """Add an alternative version to an existing answer.

    Returns ``(conversation, "ok")`` on success or ``(None, reason)`` when the
    text is empty or the conversation / answer cannot be found.
    """
    text = text.strip()
    if not text:
        return None, "empty proposal"

    conversation = load_conversation(conversation_id)
    if conversation is None:
        return None, "conversation not found"

    for answer in conversation["answers"]:
        if str(answer.get("id")) != answer_id:
            continue
        now = now_iso()
        version = normalize_version({
            "text": text,
            "author": author,
            "created_at": now,
            "votes": 0,
            "votes_by_client": {},
        })
        answer["versions"].append(version)
        answer["updated_at"] = now
        save_conversation(conversation)
        return conversation, "ok"

    return None, "answer not found"


def vote_version(
    conversation_id: str,
    answer_id: str,
    version_id: str,
    client_id: str,
    delta: int,
) -> tuple[Optional[dict[str, Any]], str]:
    """Record *client_id*'s vote on one version and re-pick the active version.

    *delta* is clamped to ``+1`` (any value >= 0) or ``-1``.  Each client holds
    a single vote per version; the version's total is the sum of all client
    votes.  Returns ``(conversation, "ok")``, ``(conversation,
    "already_voted")`` when the same vote is already recorded, or
    ``(None, reason)`` when the target cannot be found.
    """
    conversation = load_conversation(conversation_id)
    if conversation is None:
        return None, "conversation not found"

    delta = 1 if int(delta) >= 0 else -1

    for answer in conversation["answers"]:
        if str(answer.get("id")) != answer_id:
            continue
        for version in answer.get("versions", []):
            if str(version.get("id")) != version_id:
                continue

            votes_by_client = version.setdefault("votes_by_client", {})
            if not isinstance(votes_by_client, dict):
                votes_by_client = {}
                version["votes_by_client"] = votes_by_client

            current = _safe_int(votes_by_client.get(client_id, 0))
            if current == delta:
                return conversation, "already_voted"

            votes_by_client[client_id] = delta
            version["votes"] = sum(_safe_int(v) for v in votes_by_client.values())

            if answer.get("versions"):
                # The vote may have changed which version ranks highest.
                answer["active_version"] = max(
                    answer["versions"], key=_version_rank
                )["id"]

            conversation["updated_at"] = now_iso()
            save_conversation(conversation)
            return conversation, "ok"

    return None, "version not found"


# ────────────────────── Routes ──────────────────────

@app.get("/", response_class=HTMLResponse)
def home(request: Request):
    """Render ``index.html`` with the initial client state embedded as JSON."""
    init = {
        "ok": True,
        "client_id": get_client_id(request),
        "conversation": None,
    }
    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "app_title": APP_TITLE,
            "init_json": json.dumps(init, ensure_ascii=False),
        },
    )


@app.get("/logo.png")
def logo():
    """Serve ``logo.png`` from next to this file, or a 404 JSON error."""
    logo_path = Path(__file__).with_name("logo.png")
    if logo_path.exists():
        return FileResponse(logo_path)
    return JSONResponse({"ok": False, "error": "logo not found"}, status_code=404)


@app.get("/health")
def health():
    """Liveness probe."""
    return {"ok": True}


def _failure(message: str) -> JSONResponse:
    """Build the API's standard error envelope."""
    return JSONResponse({"ok": False, "error": message})


def _str_field(payload: dict[str, Any], key: str) -> str:
    """Read *key* from *payload* as a stripped string (``""`` when absent)."""
    return str(payload.get(key, "")).strip()


@app.post("/api")
async def api(request: Request):
    """Single JSON endpoint dispatching on the payload's ``action`` field.

    Supported actions: ``init``, ``get_conversation``, ``ask``, ``answer``,
    ``propose`` and ``vote``.  Every response is a JSON object with an ``ok``
    flag; failures carry an ``error`` message.
    """
    try:
        payload = await request.json()
    except Exception:
        return _failure("bad payload")
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, string, number, null).
        return _failure("bad payload")

    action = str(payload.get("action", ""))
    client_id = get_client_id(request, payload)
    author = anon_label(client_id)

    # ── init ──
    if action == "init":
        conversation_id = _str_field(payload, "conversation_id")
        conversation = load_conversation(conversation_id) if conversation_id else None
        return JSONResponse({
            "ok": True,
            "client_id": client_id,
            "conversation": conversation,
        })

    # ── get_conversation ──
    if action == "get_conversation":
        conversation = load_conversation(_str_field(payload, "conversation_id"))
        if conversation is None:
            return _failure("not found")
        return JSONResponse({"ok": True, "conversation": conversation})

    # ── ask ──
    if action == "ask":
        question = _str_field(payload, "question")
        if not question:
            return _failure("empty question")

        # 1) Search FIRST — before creating anything
        match = find_similar_conversation(question)
        if match and match.get("conversation"):
            conversation = match["conversation"]
            best = best_answer_payload(conversation)
            related = find_top_k_similar(question, k=3)
            return JSONResponse({
                "ok": True,
                "matched": True,
                "similarity": match["score"],
                "conversation": conversation,
                "assistant_text": best["text"] if best else "No answer yet. You can write one.",
                "best_answer": best,
                "related": related,
            })

        # 2) No match — create new
        conversation = create_conversation(question, author)
        return JSONResponse({
            "ok": True,
            "matched": False,
            "conversation": conversation,
            "assistant_text": "No answer yet. You can write one.",
            "best_answer": None,
            "related": [],
        })

    # ── answer ──
    if action == "answer":
        conversation, msg = add_answer(
            conversation_id=_str_field(payload, "conversation_id"),
            text=_str_field(payload, "text"),
            author=author,
            question_if_new=_str_field(payload, "question") or None,
        )
        if conversation is None:
            return _failure(msg)
        return JSONResponse({"ok": True, "conversation": conversation})

    # ── propose ──
    if action == "propose":
        conversation, msg = propose_version(
            conversation_id=_str_field(payload, "conversation_id"),
            answer_id=_str_field(payload, "answer_id"),
            text=_str_field(payload, "text"),
            author=author,
        )
        if conversation is None:
            return _failure(msg)
        return JSONResponse({"ok": True, "conversation": conversation})

    # ── vote ──
    if action == "vote":
        try:
            delta = int(payload.get("delta", 1))
        except (TypeError, ValueError, OverflowError):
            # Client-supplied value; reject instead of raising a 500.
            return _failure("bad delta")
        conversation, msg = vote_version(
            conversation_id=_str_field(payload, "conversation_id"),
            answer_id=_str_field(payload, "answer_id"),
            version_id=_str_field(payload, "version_id"),
            client_id=client_id,
            delta=delta,
        )
        if conversation is None:
            return _failure(msg)
        if msg == "already_voted":
            return _failure("already voted")
        return JSONResponse({"ok": True, "conversation": conversation})

    return _failure(f"unknown action: {action}")


if __name__ == "__main__":
    import uvicorn

    # Pass the app object rather than an import string so this works whatever
    # the file is named and the module is not imported a second time.
    uvicorn.run(app, host="0.0.0.0", port=7860, reload=False)