from __future__ import annotations import json import os import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional from urllib.parse import urlencode from fastapi import FastAPI, Request from fastapi.responses import FileResponse, HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates APP_TITLE = "Human Intelligence" DATA_DIR = Path(os.environ.get("DATA_DIR", "/data")) CONVERSATIONS_FILE = DATA_DIR / "conversations.json" EMBED_FILE = DATA_DIR / "embeddings.json" PAGE_SIZE = 20 DEFAULT_TEMPLATES_DIR = Path(__file__).resolve().parent / "templates" DOCKER_TEMPLATES_DIR = Path("/app/templates") TEMPLATES_DIR = DOCKER_TEMPLATES_DIR if DOCKER_TEMPLATES_DIR.exists() else DEFAULT_TEMPLATES_DIR DATA_DIR.mkdir(parents=True, exist_ok=True) TEMPLATES_DIR.mkdir(parents=True, exist_ok=True) app = FastAPI(title=APP_TITLE) templates = Jinja2Templates(directory=str(TEMPLATES_DIR)) app.mount("/templates", StaticFiles(directory=str(TEMPLATES_DIR)), name="templates") # Utilities def now_iso() -> str: return datetime.now(timezone.utc).isoformat(timespec="seconds") def read_json(path: Path, default: Any): if not path.exists(): return default try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return default def write_json(path: Path, data: Any) -> None: tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text( json.dumps(data, ensure_ascii=False, indent=2, default=str), encoding="utf-8", ) tmp.replace(path) def get_client_id(request: Request, payload: dict | None = None) -> str: header_value = request.headers.get("x-client-id", "").strip() if header_value: return header_value if payload: payload_value = str(payload.get("client_id", "")).strip() if payload_value: return payload_value return "anon" def anon_label(client_id: str) -> str: return "Anonymous" def clean_text(value: Any) -> str: return str(value or "").strip() def to_dt(value: Any) -> datetime: text = clean_text(value) if not text: return datetime.fromtimestamp(0, tz=timezone.utc) try: return datetime.fromisoformat(text.replace("Z", "+00:00")) except Exception: return datetime.fromtimestamp(0, tz=timezone.utc) def clamp_page(value: Any) -> int: try: page = int(value) except Exception: page = 1 return page if page > 0 else 1 def build_list_query(status: str, q: str, page: int) -> str: params = { "status": status, "page": page, } if q: params["q"] = q return urlencode(params) # Conversations CRUD def load_conversations() -> list[dict[str, Any]]: data = read_json(CONVERSATIONS_FILE, []) if isinstance(data, dict) and "conversations" in data: data = data["conversations"] return data if isinstance(data, list) else [] def save_conversations(conversations: list[dict[str, Any]]) -> None: write_json(CONVERSATIONS_FILE, conversations) def normalize_version(version: dict[str, Any]) -> dict[str, Any]: v = dict(version or {}) v.setdefault("id", uuid.uuid4().hex) v.setdefault("text", "") v.setdefault("author", "Anonymous") v.setdefault("created_at", now_iso()) v.setdefault("votes", 0) v.setdefault("votes_by_client", {}) if not isinstance(v["votes_by_client"], dict): v["votes_by_client"] = {} v["votes"] = int(v.get("votes", 0)) return v def normalize_answer(answer: dict[str, Any]) -> dict[str, Any]: a = dict(answer or {}) a.setdefault("id", uuid.uuid4().hex) a.setdefault("versions", []) a.setdefault("active_version", "") a.setdefault("created_at", now_iso()) a.setdefault("updated_at", a["created_at"]) versions = [ normalize_version(v) for v in a.get("versions", []) if isinstance(v, dict) ] a["versions"] = versions if versions: version_ids = {v["id"] for v in versions} if a["active_version"] not in version_ids: a["active_version"] = max( versions, key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))), )["id"] return a def normalize_conversation(conversation: dict[str, Any]) -> dict[str, Any]: c = dict(conversation or {}) c.setdefault("id", uuid.uuid4().hex) c.setdefault("question", "") c.setdefault("author", "Anonymous") c.setdefault("created_at", now_iso()) c.setdefault("updated_at", c["created_at"]) c.setdefault("turns", []) c.setdefault("answers", []) turns: list[dict[str, Any]] = [] for turn in c.get("turns", []): if not isinstance(turn, dict): continue t = dict(turn) t.setdefault("id", uuid.uuid4().hex) t.setdefault("role", "user") t.setdefault("text", "") t.setdefault("author", "Anonymous") t.setdefault("ts", now_iso()) turns.append(t) c["turns"] = turns c["answers"] = [ normalize_answer(a) for a in c.get("answers", []) if isinstance(a, dict) ] if not c["turns"] and c["question"]: c["turns"].append( { "id": uuid.uuid4().hex, "role": "user", "text": c["question"], "author": c.get("author", "Anonymous"), "ts": c["created_at"], } ) return c def load_conversation(conversation_id: str) -> Optional[dict[str, Any]]: if not conversation_id: return None for conv in load_conversations(): if str(conv.get("id")) == conversation_id: return normalize_conversation(conv) return None def save_conversation(conversation: dict[str, Any]) -> dict[str, Any]: conversation = normalize_conversation(conversation) conversation["updated_at"] = now_iso() conversations = [normalize_conversation(c) for c in load_conversations()] replaced = False for i, existing in enumerate(conversations): if str(existing.get("id")) == str(conversation["id"]): conversations[i] = conversation replaced = True break if not replaced: conversations.insert(0, conversation) save_conversations(conversations) return conversation # Question summaries and search def active_version(answer: dict[str, Any]) -> Optional[dict[str, Any]]: versions = answer.get("versions", []) if not versions: return None active_id = answer.get("active_version") for version in versions: if version.get("id") == active_id: return version return max( versions, key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))), ) def answer_score(answer: dict[str, Any]) -> tuple[int, str]: av = active_version(answer) if av is None: return 0, str(answer.get("created_at", "")) return int(av.get("votes", 0)), str(answer.get("created_at", "")) def best_answer_payload(conversation: dict[str, Any]) -> Optional[dict[str, Any]]: answers = conversation.get("answers", []) if not answers: return None best = max(answers, key=answer_score) av = active_version(best) if av is None: return None return { "answer_id": best["id"], "version_id": av["id"], "text": av["text"], "votes": int(av.get("votes", 0)), "author": av.get("author", "Anonymous"), "created_at": av.get("created_at", ""), } def answer_text_length(conversation: dict[str, Any]) -> int: best = best_answer_payload(conversation) if not best: return 0 return len(clean_text(best.get("text"))) def conversation_search_blob(conversation: dict[str, Any]) -> str: parts = [clean_text(conversation.get("question"))] for answer in conversation.get("answers", []): for version in answer.get("versions", []): parts.append(clean_text(version.get("text"))) return "\n".join(part for part in parts if part).casefold() def matches_query(conversation: dict[str, Any], query: str) -> bool: query = clean_text(query).casefold() if not query: return True return query in conversation_search_blob(conversation) def conversation_summary(conversation: dict[str, Any]) -> dict[str, Any]: conversation = normalize_conversation(conversation) answers = conversation.get("answers", []) best = best_answer_payload(conversation) return { "id": conversation["id"], "question": clean_text(conversation.get("question")), "created_at": conversation.get("created_at", ""), "updated_at": conversation.get("updated_at", ""), "answer_count": len(answers), "has_answers": bool(answers), "best_answer": best, "best_answer_length": answer_text_length(conversation), "best_answer_preview": clean_text(best.get("text"))[:220] if best else "", } def sort_conversations_for_queue( conversations: list[dict[str, Any]], status: str, query: str, ) -> list[dict[str, Any]]: status = clean_text(status).lower() or "unanswered" query = clean_text(query) if status == "unanswered" and not query: return sorted( conversations, key=lambda c: to_dt(c.get("created_at") or c.get("updated_at")), reverse=True, ) def key(conversation: dict[str, Any]): has_answers = bool(conversation.get("answers")) best_length = answer_text_length(conversation) if has_answers else 0 updated = to_dt(conversation.get("updated_at") or conversation.get("created_at")) return ( 0 if not has_answers else 1, 0 if not has_answers else best_length, -updated.timestamp(), ) return sorted(conversations, key=key) def list_questions(status: str = "unanswered", q: str = "", page: int = 1) -> dict[str, Any]: status = clean_text(status).lower() or "unanswered" if status not in {"unanswered", "answered", "all"}: status = "unanswered" q = clean_text(q) page = clamp_page(page) conversations = [normalize_conversation(c) for c in load_conversations()] if status == "unanswered": conversations = [c for c in conversations if not c.get("answers")] elif status == "answered": conversations = [c for c in conversations if c.get("answers")] if q: conversations = [c for c in conversations if matches_query(c, q)] conversations = sort_conversations_for_queue(conversations, status=status, query=q) total = len(conversations) start = (page - 1) * PAGE_SIZE end = start + PAGE_SIZE items = [conversation_summary(c) for c in conversations[start:end]] return { "items": items, "page": page, "page_size": PAGE_SIZE, "total": total, "has_prev": page > 1, "has_next": end < total, "status": status, "q": q, } # Write actions def create_conversation(question: str, author: str = "Anonymous") -> dict[str, Any]: question = question.strip() now = now_iso() conversation = { "id": uuid.uuid4().hex, "question": question, "author": author, "created_at": now, "updated_at": now, "turns": [ { "id": uuid.uuid4().hex, "role": "user", "text": question, "author": author, "ts": now, } ], "answers": [], } return save_conversation(conversation) def add_answer( conversation_id: str, text: str, author: str = "Anonymous", question_if_new: str | None = None, ) -> tuple[Optional[dict[str, Any]], str]: text = text.strip() if not text: return None, "empty answer" conversation = load_conversation(conversation_id) if question_if_new: if conversation is None or conversation.get("question") != question_if_new: conversation = create_conversation(question_if_new, author) elif conversation is None: return None, "conversation not found" now = now_iso() version = normalize_version( { "text": text, "author": author, "created_at": now, "votes": 0, "votes_by_client": {}, } ) answer = normalize_answer( { "id": uuid.uuid4().hex, "versions": [version], "active_version": version["id"], "created_at": now, "updated_at": now, } ) conversation["answers"].append(answer) conversation["turns"].append( { "id": uuid.uuid4().hex, "role": "assistant", "text": text, "author": author, "answer_id": answer["id"], "version_id": version["id"], "ts": now, } ) conversation = save_conversation(conversation) return conversation, "ok" def propose_version( conversation_id: str, answer_id: str, text: str, author: str = "Anonymous", ) -> tuple[Optional[dict[str, Any]], str]: text = text.strip() if not text: return None, "empty proposal" conversation = load_conversation(conversation_id) if conversation is None: return None, "conversation not found" for answer in conversation["answers"]: if str(answer.get("id")) != answer_id: continue now = now_iso() version = normalize_version( { "text": text, "author": author, "created_at": now, "votes": 0, "votes_by_client": {}, } ) answer["versions"].append(version) answer["updated_at"] = now conversation = save_conversation(conversation) return conversation, "ok" return None, "answer not found" def vote_version( conversation_id: str, answer_id: str, version_id: str, client_id: str, delta: int, ) -> tuple[Optional[dict[str, Any]], str]: conversation = load_conversation(conversation_id) if conversation is None: return None, "conversation not found" delta = 1 if int(delta) >= 0 else -1 for answer in conversation["answers"]: if str(answer.get("id")) != answer_id: continue for version in answer.get("versions", []): if str(version.get("id")) != version_id: continue votes_by_client = version.setdefault("votes_by_client", {}) if not isinstance(votes_by_client, dict): votes_by_client = {} version["votes_by_client"] = votes_by_client current = int(votes_by_client.get(client_id, 0)) if current == delta: return conversation, "already_voted" votes_by_client[client_id] = delta version["votes"] = int(sum(int(v) for v in votes_by_client.values())) if answer.get("versions"): answer["active_version"] = max( answer["versions"], key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))), )["id"] conversation = save_conversation(conversation) return conversation, "ok" return None, "version not found" # Routes @app.get("/", response_class=HTMLResponse) def home(request: Request): status = clean_text(request.query_params.get("status")).lower() or "unanswered" if status not in {"unanswered", "answered", "all"}: status = "unanswered" q = clean_text(request.query_params.get("q")) page = clamp_page(request.query_params.get("page", 1)) conversation_id = clean_text(request.query_params.get("conversation_id")) list_state = list_questions(status=status, q=q, page=page) detail = load_conversation(conversation_id) if conversation_id else None init = { "ok": True, "client_id": get_client_id(request), "view": "detail" if detail else "list", "filters": { "status": list_state["status"], "q": list_state["q"], "page": list_state["page"], "page_size": list_state["page_size"], }, "list": list_state, "detail": detail, "conversation_id": conversation_id, "back_query": build_list_query(list_state["status"], list_state["q"], list_state["page"]), } return templates.TemplateResponse( request, "index.html", { "app_title": APP_TITLE, "init_json": json.dumps(init, ensure_ascii=False), }, ) @app.get("/logo.png") def logo(): logo_path = TEMPLATES_DIR / "logo.png" if logo_path.exists(): return FileResponse(logo_path) return JSONResponse({"ok": False, "error": "logo not found"}, status_code=404) @app.get("/health") def health(): return {"ok": True} @app.post("/api") async def api(request: Request): try: payload = await request.json() except Exception: return JSONResponse({"ok": False, "error": "bad payload"}) action = clean_text(payload.get("action")) client_id = get_client_id(request, payload) author = anon_label(client_id) if action == "init": status = clean_text(payload.get("status")).lower() or "unanswered" q = clean_text(payload.get("q")) page = clamp_page(payload.get("page", 1)) conversation_id = clean_text(payload.get("conversation_id")) return JSONResponse( { "ok": True, "client_id": client_id, "list": list_questions(status=status, q=q, page=page), "detail": load_conversation(conversation_id) if conversation_id else None, } ) if action == "list_questions": status = clean_text(payload.get("status")).lower() or "unanswered" q = clean_text(payload.get("q")) page = clamp_page(payload.get("page", 1)) return JSONResponse({"ok": True, **list_questions(status=status, q=q, page=page)}) if action in {"get_question_detail", "get_conversation"}: conversation_id = clean_text(payload.get("conversation_id")) conversation = load_conversation(conversation_id) if conversation is None: return JSONResponse({"ok": False, "error": "not found"}) return JSONResponse({"ok": True, "conversation": conversation}) if action in {"add_answer", "answer"}: conversation_id = clean_text(payload.get("conversation_id")) text = clean_text(payload.get("text")) question = clean_text(payload.get("question")) or None conversation, msg = add_answer( conversation_id=conversation_id, text=text, author=author, question_if_new=question, ) if conversation is None: return JSONResponse({"ok": False, "error": msg}) return JSONResponse({"ok": True, "conversation": conversation}) if action in {"propose_version", "propose"}: conversation_id = clean_text(payload.get("conversation_id")) answer_id = clean_text(payload.get("answer_id")) text = clean_text(payload.get("text")) conversation, msg = propose_version( conversation_id=conversation_id, answer_id=answer_id, text=text, author=author, ) if conversation is None: return JSONResponse({"ok": False, "error": msg}) return JSONResponse({"ok": True, "conversation": conversation}) if action in {"vote_version", "vote"}: conversation_id = clean_text(payload.get("conversation_id")) answer_id = clean_text(payload.get("answer_id")) version_id = clean_text(payload.get("version_id")) delta = int(payload.get("delta", 1)) conversation, msg = vote_version( conversation_id=conversation_id, answer_id=answer_id, version_id=version_id, client_id=client_id, delta=delta, ) if conversation is None: return JSONResponse({"ok": False, "error": msg}) if msg == "already_voted": return JSONResponse({"ok": False, "error": "already voted"}) return JSONResponse({"ok": True, "conversation": conversation}) return JSONResponse({"ok": False, "error": f"unknown action: {action}"}) if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=False)