Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| import os | |
| import uuid | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| import numpy as np | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import FileResponse, HTMLResponse, JSONResponse | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.staticfiles import StaticFiles | |
| try: | |
| from sentence_transformers import SentenceTransformer | |
| except Exception: | |
| SentenceTransformer = None | |
# Configuration: every path/tunable can be overridden via environment variables
# so the same image runs locally or on a container with a /data volume.
APP_TITLE = "Human Intelligence"
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
CONVERSATIONS_FILE = DATA_DIR / "conversations.json"  # JSON list of conversations
EMBED_FILE = DATA_DIR / "embeddings.json"  # conversation-id -> {question, vector}
TEMPLATES_DIR = Path("/app/templates")
# Cosine-similarity cutoff for treating two questions as duplicates; embeddings
# are normalized at encode time, so a plain dot product is cosine similarity.
SIMILARITY_THRESHOLD = float(os.environ.get("SIMILARITY_THRESHOLD", "0.62"))
EMBED_MODEL_NAME = os.environ.get(
    "EMBED_MODEL_NAME",
    "sentence-transformers/paraphrase-MiniLM-L6-v2",
)
DATA_DIR.mkdir(parents=True, exist_ok=True)
TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
app = FastAPI(title=APP_TITLE)
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
# Lazily-populated SentenceTransformer instance; see load_embed_model().
_embed_model = None
app.mount("/templates", StaticFiles(directory=str(TEMPLATES_DIR)), name="templates")
# ────────────────────── Utilities ──────────────────────
def now_iso() -> str:
    """Current UTC time as an ISO-8601 string with seconds precision."""
    utc_now = datetime.now(timezone.utc)
    return utc_now.isoformat(timespec="seconds")
def read_json(path: Path, default: Any):
    """Load JSON from *path*; on any failure return *default* unchanged."""
    if not path.exists():
        return default
    try:
        raw = path.read_text(encoding="utf-8")
        return json.loads(raw)
    except Exception:
        # Corrupt or unreadable file: fall back to the caller's default.
        return default
def write_json(path: Path, data: Any) -> None:
    """Atomically write *data* as pretty-printed JSON to *path*.

    The payload goes to a sibling ``*.tmp`` file first and is then moved
    over the target with ``Path.replace`` (atomic on POSIX), so readers
    never observe a half-written file.  If serialization or the write
    fails, the temporary file is removed and the error re-raised, leaving
    the original target untouched (the original leaked the .tmp file).
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        tmp.write_text(
            # default=str keeps otherwise-unserializable values (e.g. datetimes)
            # from aborting the dump.
            json.dumps(data, ensure_ascii=False, indent=2, default=str),
            encoding="utf-8",
        )
        tmp.replace(path)
    except Exception:
        # Best-effort cleanup: never leave a stale .tmp behind.
        tmp.unlink(missing_ok=True)
        raise
def get_client_id(request: Request, payload: dict | None = None) -> str:
    """Resolve the caller's client id.

    Precedence: ``X-Client-Id`` request header, then a ``client_id`` field
    in the JSON payload, then the shared fallback ``"anon"``.
    """
    candidates = [request.headers.get("x-client-id", "")]
    if payload:
        candidates.append(str(payload.get("client_id", "")))
    for raw in candidates:
        cleaned = raw.strip()
        if cleaned:
            return cleaned
    return "anon"
def anon_label(client_id: str) -> str:
    """Public display name for *client_id*: everyone is shown as Anonymous."""
    del client_id  # intentionally unused; labels are not personalized
    return "Anonymous"
# ────────────────────── Embeddings ──────────────────────
def load_embed_model():
    """Return the shared SentenceTransformer, loading it on first use.

    Returns None when the sentence-transformers package failed to import.
    """
    global _embed_model
    if _embed_model is not None:
        return _embed_model
    if SentenceTransformer is None:
        return None
    _embed_model = SentenceTransformer(EMBED_MODEL_NAME)
    return _embed_model
def embed_text(text: str) -> list[float]:
    """Encode *text* into a unit-normalized embedding; [] when no model exists."""
    model = load_embed_model()
    if model is None:
        return []
    encoded = model.encode(text, normalize_embeddings=True)
    # numpy arrays expose tolist(); anything else is coerced element-wise.
    return encoded.tolist() if hasattr(encoded, "tolist") else list(encoded)
def load_embed_index() -> dict[str, dict[str, Any]]:
    """Read the conversation-id -> {question, vector} index from disk."""
    raw = read_json(EMBED_FILE, {})
    if isinstance(raw, dict):
        return raw
    return {}
def save_embed_index(idx: dict[str, dict[str, Any]]) -> None:
    """Persist the embedding index, overwriting the previous file."""
    write_json(EMBED_FILE, idx)
# ────────────────────── Conversations CRUD ──────────────────────
def load_conversations() -> list[dict[str, Any]]:
    """Load all conversations, accepting both stored layouts.

    The file may hold a bare JSON list or a {"conversations": [...]}
    wrapper; anything else yields an empty list.
    """
    raw = read_json(CONVERSATIONS_FILE, [])
    if isinstance(raw, dict) and "conversations" in raw:
        raw = raw["conversations"]
    if isinstance(raw, list):
        return raw
    return []
def save_conversations(conversations: list[dict[str, Any]]) -> None:
    """Persist the full conversation list, overwriting the previous file."""
    write_json(CONVERSATIONS_FILE, conversations)
def normalize_version(version: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *version* with every expected field present and typed."""
    out: dict[str, Any] = dict(version or {})
    defaults = {
        "id": uuid.uuid4().hex,
        "text": "",
        "author": "Anonymous",
        "created_at": now_iso(),
        "votes": 0,
        "votes_by_client": {},
    }
    for key, value in defaults.items():
        out.setdefault(key, value)
    # Repair malformed fields so downstream vote math never crashes.
    if not isinstance(out["votes_by_client"], dict):
        out["votes_by_client"] = {}
    out["votes"] = int(out.get("votes", 0))
    return out
def normalize_answer(answer: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *answer* with normalized versions and a valid active id."""
    out: dict[str, Any] = dict(answer or {})
    out.setdefault("id", uuid.uuid4().hex)
    out.setdefault("versions", [])
    out.setdefault("active_version", "")
    out.setdefault("created_at", now_iso())
    out.setdefault("updated_at", out["created_at"])
    cleaned: list[dict[str, Any]] = []
    for candidate in out.get("versions", []):
        if isinstance(candidate, dict):
            cleaned.append(normalize_version(candidate))
    out["versions"] = cleaned
    if cleaned and out["active_version"] not in {v["id"] for v in cleaned}:
        # Dangling/missing active id: elect the top-voted version,
        # breaking ties by the newest created_at string.
        winner = max(
            cleaned,
            key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))),
        )
        out["active_version"] = winner["id"]
    return out
def normalize_conversation(conversation: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *conversation* with fields, turns and answers normalized."""
    out: dict[str, Any] = dict(conversation or {})
    out.setdefault("id", uuid.uuid4().hex)
    out.setdefault("question", "")
    out.setdefault("author", "Anonymous")
    out.setdefault("created_at", now_iso())
    out.setdefault("updated_at", out["created_at"])
    out.setdefault("turns", [])
    out.setdefault("answers", [])
    cleaned_turns: list[dict[str, Any]] = []
    for raw_turn in out.get("turns", []):
        if not isinstance(raw_turn, dict):
            continue
        turn = dict(raw_turn)
        turn.setdefault("id", uuid.uuid4().hex)
        turn.setdefault("role", "user")
        turn.setdefault("text", "")
        turn.setdefault("author", "Anonymous")
        turn.setdefault("ts", now_iso())
        cleaned_turns.append(turn)
    out["turns"] = cleaned_turns
    out["answers"] = [
        normalize_answer(raw)
        for raw in out.get("answers", [])
        if isinstance(raw, dict)
    ]
    if not out["turns"] and out["question"]:
        # Records that carry only a question get a synthesized opening turn.
        out["turns"].append({
            "id": uuid.uuid4().hex,
            "role": "user",
            "text": out["question"],
            "author": out.get("author", "Anonymous"),
            "ts": out["created_at"],
        })
    return out
def load_conversation(conversation_id: str) -> Optional[dict[str, Any]]:
    """Find a conversation by id and return it normalized; None when absent."""
    if not conversation_id:
        return None
    match = next(
        (c for c in load_conversations() if str(c.get("id")) == conversation_id),
        None,
    )
    return None if match is None else normalize_conversation(match)
def save_conversation(conversation: dict[str, Any]) -> dict[str, Any]:
    """Upsert *conversation* into the store and return the normalized record."""
    record = normalize_conversation(conversation)
    record["updated_at"] = now_iso()
    existing = [normalize_conversation(c) for c in load_conversations()]
    target_id = str(record["id"])
    for position, candidate in enumerate(existing):
        if str(candidate.get("id")) == target_id:
            existing[position] = record
            break
    else:
        # New conversations go to the front so the most recent is listed first.
        existing.insert(0, record)
    save_conversations(existing)
    return record
def ensure_embedding(conversation: dict[str, Any]) -> None:
    """Compute and persist the question embedding for *conversation* (best effort)."""
    question = str(conversation.get("question", "")).strip()
    if not question:
        return
    vector = embed_text(question)
    if not vector:
        # No embedding model available (or empty encoding): skip silently.
        return
    index = load_embed_index()
    index[str(conversation["id"])] = {"question": question, "vector": vector}
    save_embed_index(index)
# ────────────────────── Semantic search ──────────────────────
def find_similar_conversation(
    question: str,
    exclude_id: str | None = None,
) -> Optional[dict[str, Any]]:
    """Return the stored conversation most similar to *question*, or None.

    Embeddings are unit-normalized, so a dot product is cosine similarity;
    a hit requires the best score to reach SIMILARITY_THRESHOLD.
    *exclude_id* removes one conversation (e.g. the question's own) from
    the candidate pool.
    """
    index = load_embed_index()
    if not index or SentenceTransformer is None:
        return None
    query = np.array(embed_text(question), dtype=float)
    if query.size == 0:
        return None
    candidate_ids = [cid for cid in index if cid != exclude_id]
    if not candidate_ids:
        return None
    try:
        matrix = np.array([index[cid]["vector"] for cid in candidate_ids], dtype=float)
    except Exception:
        # Ragged or non-numeric stored vectors: treat as "no match".
        return None
    if matrix.size == 0 or matrix.ndim != 2 or matrix.shape[1] != query.shape[0]:
        return None
    scores = matrix @ query
    winner = int(np.argmax(scores))
    best_score = float(scores[winner])
    if best_score < SIMILARITY_THRESHOLD:
        return None
    conversation = load_conversation(candidate_ids[winner])
    if conversation is None:
        return None
    return {"conversation": conversation, "score": best_score}
def find_top_k_similar(question: str, k: int = 3) -> list[dict[str, Any]]:
    """Top-*k* stored conversations by similarity to *question*.

    Only conversations that already have a best answer are returned; each
    entry carries the conversation id, its question, the best answer text,
    and the similarity score.
    """
    index = load_embed_index()
    if not index or SentenceTransformer is None:
        return []
    query = np.array(embed_text(question), dtype=float)
    if query.size == 0:
        return []
    scored: list[tuple[str, float]] = []
    for cid, entry in index.items():
        try:
            stored = np.array(entry["vector"], dtype=float)
        except Exception:
            continue
        # Skip vectors from a different (older) embedding model dimension.
        if stored.shape != query.shape:
            continue
        scored.append((cid, float(stored @ query)))
    scored.sort(key=lambda item: item[1], reverse=True)
    results: list[dict[str, Any]] = []
    for cid, score in scored[:k]:
        conversation = load_conversation(cid)
        if not conversation:
            continue
        best = best_answer_payload(conversation)
        if not best:
            continue
        results.append({
            "conversation_id": cid,
            "question": conversation.get("question"),
            "answer": best["text"],
            "score": score,
        })
    return results
# ────────────────────── Actions ──────────────────────
def create_conversation(question: str, author: str = "Anonymous") -> dict[str, Any]:
    """Create, persist, and index a brand-new conversation for *question*."""
    question = question.strip()
    timestamp = now_iso()
    record = {
        "id": uuid.uuid4().hex,
        "question": question,
        "author": author,
        "created_at": timestamp,
        "updated_at": timestamp,
        # The question itself becomes the opening user turn.
        "turns": [{
            "id": uuid.uuid4().hex,
            "role": "user",
            "text": question,
            "author": author,
            "ts": timestamp,
        }],
        "answers": [],
    }
    record = save_conversation(record)
    ensure_embedding(record)
    return record
def active_version(answer: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return the version marked active, falling back to the top-voted one.

    Returns None when the answer has no versions at all.
    """
    versions = answer.get("versions", [])
    if not versions:
        return None
    wanted = answer.get("active_version")
    chosen = next((v for v in versions if v.get("id") == wanted), None)
    if chosen is not None:
        return chosen
    # Dangling active id: highest votes wins, newest timestamp breaks ties.
    return max(
        versions,
        key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))),
    )
def answer_score(answer: dict[str, Any]) -> tuple[int, str]:
    """Sort key for answers: (active-version votes, answer creation time)."""
    created = str(answer.get("created_at", ""))
    version = active_version(answer)
    if version is None:
        return 0, created
    return int(version.get("votes", 0)), created
def best_answer_payload(conversation: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Flatten the best-ranked answer's active version into a response payload.

    Returns None when the conversation has no answers (or the best answer
    has no versions).
    """
    answers = conversation.get("answers", [])
    if not answers:
        return None
    top = max(answers, key=answer_score)
    version = active_version(top)
    if version is None:
        return None
    return {
        "answer_id": top["id"],
        "version_id": version["id"],
        "text": version["text"],
        "votes": int(version.get("votes", 0)),
        "author": version.get("author", "Anonymous"),
        "created_at": version.get("created_at", ""),
    }
def add_answer(
    conversation_id: str,
    text: str,
    author: str = "Anonymous",
    question_if_new: str | None = None,
) -> tuple[Optional[dict[str, Any]], str]:
    """Append a fresh answer (with its first version) to a conversation.

    When *question_if_new* is given and the target conversation is missing
    or holds a different question, a brand-new conversation is created
    rather than appending to the matched one.  Returns (conversation, "ok")
    on success, or (None, reason) on failure.
    """
    text = text.strip()
    if not text:
        return None, "empty answer"
    conversation = load_conversation(conversation_id)
    if question_if_new:
        if conversation is None or conversation.get("question") != question_if_new:
            conversation = create_conversation(question_if_new, author)
    elif conversation is None:
        return None, "conversation not found"
    timestamp = now_iso()
    first_version = normalize_version({
        "text": text,
        "author": author,
        "created_at": timestamp,
        "votes": 0,
        "votes_by_client": {},
    })
    new_answer = normalize_answer({
        "id": uuid.uuid4().hex,
        "versions": [first_version],
        "active_version": first_version["id"],
        "created_at": timestamp,
        "updated_at": timestamp,
    })
    conversation["answers"].append(new_answer)
    # Mirror the answer into the chat transcript as an assistant turn.
    conversation["turns"].append({
        "id": uuid.uuid4().hex,
        "role": "assistant",
        "text": text,
        "author": author,
        "answer_id": new_answer["id"],
        "version_id": first_version["id"],
        "ts": timestamp,
    })
    save_conversation(conversation)
    return conversation, "ok"
def propose_version(
    conversation_id: str,
    answer_id: str,
    text: str,
    author: str = "Anonymous",
) -> tuple[Optional[dict[str, Any]], str]:
    """Attach a new candidate version to an existing answer.

    Returns (conversation, "ok") on success, or (None, reason) when the
    text is empty or the conversation/answer cannot be found.
    """
    text = text.strip()
    if not text:
        return None, "empty proposal"
    conversation = load_conversation(conversation_id)
    if conversation is None:
        return None, "conversation not found"
    target = next(
        (a for a in conversation["answers"] if str(a.get("id")) == answer_id),
        None,
    )
    if target is None:
        return None, "answer not found"
    timestamp = now_iso()
    target["versions"].append(normalize_version({
        "text": text,
        "author": author,
        "created_at": timestamp,
        "votes": 0,
        "votes_by_client": {},
    }))
    target["updated_at"] = timestamp
    save_conversation(conversation)
    return conversation, "ok"
def vote_version(
    conversation_id: str,
    answer_id: str,
    version_id: str,
    client_id: str,
    delta: int,
) -> tuple[Optional[dict[str, Any]], str]:
    """Record one client's up/down vote on a specific answer version.

    Each client has exactly one standing vote per version, kept in the
    version's ``votes_by_client`` ledger.  Re-voting in the same direction
    returns ``"already_voted"``; voting the opposite direction flips the
    standing vote.  Returns (conversation, status) on success or
    (None, error-reason) when the target cannot be found.
    """
    conversation = load_conversation(conversation_id)
    if conversation is None:
        return None, "conversation not found"
    # Clamp any incoming delta to exactly +1 or -1.
    delta = 1 if int(delta) >= 0 else -1
    for answer in conversation["answers"]:
        if str(answer.get("id")) != answer_id:
            continue
        for version in answer.get("versions", []):
            if str(version.get("id")) != version_id:
                continue
            votes_by_client = version.setdefault("votes_by_client", {})
            if not isinstance(votes_by_client, dict):
                # Repair a malformed ledger before using it.
                votes_by_client = {}
                version["votes_by_client"] = votes_by_client
            current = int(votes_by_client.get(client_id, 0))
            if current == delta:
                return conversation, "already_voted"
            votes_by_client[client_id] = delta
            # The total is always recomputed from the per-client ledger.
            version["votes"] = int(sum(int(v) for v in votes_by_client.values()))
            if answer.get("versions"):
                # Re-elect the active version: most votes, newest breaks ties.
                answer["active_version"] = max(
                    answer["versions"],
                    key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))),
                )["id"]
            conversation["updated_at"] = now_iso()
            save_conversation(conversation)
            return conversation, "ok"
    # Reached when the answer id or the version id did not match anything.
    return None, "version not found"
# ────────────────────── Routes ──────────────────────
# NOTE(review): no @app.get decorator is visible in this chunk — confirm the
# route registration exists elsewhere, otherwise this page handler is unreachable.
def home(request: Request):
    """Render the single-page UI with a bootstrap payload for the client."""
    bootstrap = {
        "ok": True,
        "client_id": get_client_id(request),
        "conversation": None,
    }
    context = {
        "request": request,
        "app_title": APP_TITLE,
        "init_json": json.dumps(bootstrap, ensure_ascii=False),
    }
    return templates.TemplateResponse("index.html", context)
def logo():
    """Serve the bundled logo image, or a JSON 404 when the file is absent."""
    candidate = Path(__file__).with_name("logo.png")
    if not candidate.exists():
        return JSONResponse({"ok": False, "error": "logo not found"}, status_code=404)
    return FileResponse(candidate)
def health():
    """Liveness probe: always reports healthy."""
    return {"ok": True}
async def api(request: Request):
    """Single JSON endpoint dispatching on the body's ``action`` field.

    Supported actions: ``init``, ``get_conversation``, ``ask``, ``answer``,
    ``propose``, ``vote``.  All failures are reported in-band as
    ``{"ok": False, "error": ...}`` bodies (the HTTP status stays 200).
    """
    try:
        payload = await request.json()
    except Exception:
        return JSONResponse({"ok": False, "error": "bad payload"})
    action = str(payload.get("action", ""))
    client_id = get_client_id(request, payload)
    # All contributions are displayed under the same anonymous label.
    author = anon_label(client_id)
    # ── init: return the client id plus an optional existing conversation ──
    if action == "init":
        conversation_id = str(payload.get("conversation_id", "")).strip()
        conversation = load_conversation(conversation_id) if conversation_id else None
        return JSONResponse({
            "ok": True,
            "client_id": client_id,
            "conversation": conversation,
        })
    # ── get_conversation: fetch one conversation by id ──
    if action == "get_conversation":
        conversation_id = str(payload.get("conversation_id", "")).strip()
        conversation = load_conversation(conversation_id)
        if conversation is None:
            return JSONResponse({"ok": False, "error": "not found"})
        return JSONResponse({"ok": True, "conversation": conversation})
    # ── ask: semantic-match the question, creating a conversation only on miss ──
    if action == "ask":
        question = str(payload.get("question", "")).strip()
        if not question:
            return JSONResponse({"ok": False, "error": "empty question"})
        # 1) Search FIRST, before creating anything.
        match = find_similar_conversation(question)
        if match and match.get("conversation"):
            conversation = match["conversation"]
            best = best_answer_payload(conversation)
            related = find_top_k_similar(question, k=3)
            return JSONResponse({
                "ok": True,
                "matched": True,
                "similarity": match["score"],
                "conversation": conversation,
                "assistant_text": best["text"] if best else "No answer yet. You can write one.",
                "best_answer": best,
                "related": related,
            })
        # 2) No match: create a new conversation.
        conversation = create_conversation(question, author)
        return JSONResponse({
            "ok": True,
            "matched": False,
            "conversation": conversation,
            "assistant_text": "No answer yet. You can write one.",
            "best_answer": None,
            "related": [],
        })
    # ── answer: add an answer (optionally spawning a new conversation) ──
    if action == "answer":
        conversation_id = str(payload.get("conversation_id", "")).strip()
        text = str(payload.get("text", "")).strip()
        question = str(payload.get("question", "")).strip() or None
        conversation, msg = add_answer(
            conversation_id=conversation_id,
            text=text,
            author=author,
            question_if_new=question,
        )
        if conversation is None:
            return JSONResponse({"ok": False, "error": msg})
        return JSONResponse({"ok": True, "conversation": conversation})
    # ── propose: add an alternative version to an existing answer ──
    if action == "propose":
        conversation_id = str(payload.get("conversation_id", "")).strip()
        answer_id = str(payload.get("answer_id", "")).strip()
        text = str(payload.get("text", "")).strip()
        conversation, msg = propose_version(
            conversation_id=conversation_id,
            answer_id=answer_id,
            text=text,
            author=author,
        )
        if conversation is None:
            return JSONResponse({"ok": False, "error": msg})
        return JSONResponse({"ok": True, "conversation": conversation})
    # ── vote: one standing up/down vote per client per version ──
    if action == "vote":
        conversation_id = str(payload.get("conversation_id", "")).strip()
        answer_id = str(payload.get("answer_id", "")).strip()
        version_id = str(payload.get("version_id", "")).strip()
        delta = int(payload.get("delta", 1))
        conversation, msg = vote_version(
            conversation_id=conversation_id,
            answer_id=answer_id,
            version_id=version_id,
            client_id=client_id,
            delta=delta,
        )
        if conversation is None:
            return JSONResponse({"ok": False, "error": msg})
        if msg == "already_voted":
            return JSONResponse({"ok": False, "error": "already voted"})
        return JSONResponse({"ok": True, "conversation": conversation})
    return JSONResponse({"ok": False, "error": f"unknown action: {action}"})
if __name__ == "__main__":
    # Development entry point: serve the app directly with uvicorn.
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=False)