Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| import os | |
| import uuid | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| from urllib.parse import urlencode | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import FileResponse, HTMLResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| APP_TITLE = "Human Intelligence" | |
| DATA_DIR = Path(os.environ.get("DATA_DIR", "/data")) | |
| CONVERSATIONS_FILE = DATA_DIR / "conversations.json" | |
| EMBED_FILE = DATA_DIR / "embeddings.json" | |
| PAGE_SIZE = 20 | |
| DEFAULT_TEMPLATES_DIR = Path(__file__).resolve().parent / "templates" | |
| DOCKER_TEMPLATES_DIR = Path("/app/templates") | |
| TEMPLATES_DIR = DOCKER_TEMPLATES_DIR if DOCKER_TEMPLATES_DIR.exists() else DEFAULT_TEMPLATES_DIR | |
| DATA_DIR.mkdir(parents=True, exist_ok=True) | |
| TEMPLATES_DIR.mkdir(parents=True, exist_ok=True) | |
| app = FastAPI(title=APP_TITLE) | |
| templates = Jinja2Templates(directory=str(TEMPLATES_DIR)) | |
| app.mount("/templates", StaticFiles(directory=str(TEMPLATES_DIR)), name="templates") | |
| # Utilities | |
| def now_iso() -> str: | |
| return datetime.now(timezone.utc).isoformat(timespec="seconds") | |
| def read_json(path: Path, default: Any): | |
| if not path.exists(): | |
| return default | |
| try: | |
| return json.loads(path.read_text(encoding="utf-8")) | |
| except Exception: | |
| return default | |
| def write_json(path: Path, data: Any) -> None: | |
| tmp = path.with_suffix(path.suffix + ".tmp") | |
| tmp.write_text( | |
| json.dumps(data, ensure_ascii=False, indent=2, default=str), | |
| encoding="utf-8", | |
| ) | |
| tmp.replace(path) | |
| def get_client_id(request: Request, payload: dict | None = None) -> str: | |
| header_value = request.headers.get("x-client-id", "").strip() | |
| if header_value: | |
| return header_value | |
| if payload: | |
| payload_value = str(payload.get("client_id", "")).strip() | |
| if payload_value: | |
| return payload_value | |
| return "anon" | |
| def anon_label(client_id: str) -> str: | |
| return "Anonymous" | |
| def clean_text(value: Any) -> str: | |
| return str(value or "").strip() | |
| def to_dt(value: Any) -> datetime: | |
| text = clean_text(value) | |
| if not text: | |
| return datetime.fromtimestamp(0, tz=timezone.utc) | |
| try: | |
| return datetime.fromisoformat(text.replace("Z", "+00:00")) | |
| except Exception: | |
| return datetime.fromtimestamp(0, tz=timezone.utc) | |
| def clamp_page(value: Any) -> int: | |
| try: | |
| page = int(value) | |
| except Exception: | |
| page = 1 | |
| return page if page > 0 else 1 | |
| def build_list_query(status: str, q: str, page: int) -> str: | |
| params = { | |
| "status": status, | |
| "page": page, | |
| } | |
| if q: | |
| params["q"] = q | |
| return urlencode(params) | |
| # Conversations CRUD | |
| def load_conversations() -> list[dict[str, Any]]: | |
| data = read_json(CONVERSATIONS_FILE, []) | |
| if isinstance(data, dict) and "conversations" in data: | |
| data = data["conversations"] | |
| return data if isinstance(data, list) else [] | |
| def save_conversations(conversations: list[dict[str, Any]]) -> None: | |
| write_json(CONVERSATIONS_FILE, conversations) | |
| def normalize_version(version: dict[str, Any]) -> dict[str, Any]: | |
| v = dict(version or {}) | |
| v.setdefault("id", uuid.uuid4().hex) | |
| v.setdefault("text", "") | |
| v.setdefault("author", "Anonymous") | |
| v.setdefault("created_at", now_iso()) | |
| v.setdefault("votes", 0) | |
| v.setdefault("votes_by_client", {}) | |
| if not isinstance(v["votes_by_client"], dict): | |
| v["votes_by_client"] = {} | |
| v["votes"] = int(v.get("votes", 0)) | |
| return v | |
| def normalize_answer(answer: dict[str, Any]) -> dict[str, Any]: | |
| a = dict(answer or {}) | |
| a.setdefault("id", uuid.uuid4().hex) | |
| a.setdefault("versions", []) | |
| a.setdefault("active_version", "") | |
| a.setdefault("created_at", now_iso()) | |
| a.setdefault("updated_at", a["created_at"]) | |
| versions = [ | |
| normalize_version(v) | |
| for v in a.get("versions", []) | |
| if isinstance(v, dict) | |
| ] | |
| a["versions"] = versions | |
| if versions: | |
| version_ids = {v["id"] for v in versions} | |
| if a["active_version"] not in version_ids: | |
| a["active_version"] = max( | |
| versions, | |
| key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))), | |
| )["id"] | |
| return a | |
| def normalize_conversation(conversation: dict[str, Any]) -> dict[str, Any]: | |
| c = dict(conversation or {}) | |
| c.setdefault("id", uuid.uuid4().hex) | |
| c.setdefault("question", "") | |
| c.setdefault("author", "Anonymous") | |
| c.setdefault("created_at", now_iso()) | |
| c.setdefault("updated_at", c["created_at"]) | |
| c.setdefault("turns", []) | |
| c.setdefault("answers", []) | |
| turns: list[dict[str, Any]] = [] | |
| for turn in c.get("turns", []): | |
| if not isinstance(turn, dict): | |
| continue | |
| t = dict(turn) | |
| t.setdefault("id", uuid.uuid4().hex) | |
| t.setdefault("role", "user") | |
| t.setdefault("text", "") | |
| t.setdefault("author", "Anonymous") | |
| t.setdefault("ts", now_iso()) | |
| turns.append(t) | |
| c["turns"] = turns | |
| c["answers"] = [ | |
| normalize_answer(a) | |
| for a in c.get("answers", []) | |
| if isinstance(a, dict) | |
| ] | |
| if not c["turns"] and c["question"]: | |
| c["turns"].append( | |
| { | |
| "id": uuid.uuid4().hex, | |
| "role": "user", | |
| "text": c["question"], | |
| "author": c.get("author", "Anonymous"), | |
| "ts": c["created_at"], | |
| } | |
| ) | |
| return c | |
| def load_conversation(conversation_id: str) -> Optional[dict[str, Any]]: | |
| if not conversation_id: | |
| return None | |
| for conv in load_conversations(): | |
| if str(conv.get("id")) == conversation_id: | |
| return normalize_conversation(conv) | |
| return None | |
| def save_conversation(conversation: dict[str, Any]) -> dict[str, Any]: | |
| conversation = normalize_conversation(conversation) | |
| conversation["updated_at"] = now_iso() | |
| conversations = [normalize_conversation(c) for c in load_conversations()] | |
| replaced = False | |
| for i, existing in enumerate(conversations): | |
| if str(existing.get("id")) == str(conversation["id"]): | |
| conversations[i] = conversation | |
| replaced = True | |
| break | |
| if not replaced: | |
| conversations.insert(0, conversation) | |
| save_conversations(conversations) | |
| return conversation | |
| # Question summaries and search | |
| def active_version(answer: dict[str, Any]) -> Optional[dict[str, Any]]: | |
| versions = answer.get("versions", []) | |
| if not versions: | |
| return None | |
| active_id = answer.get("active_version") | |
| for version in versions: | |
| if version.get("id") == active_id: | |
| return version | |
| return max( | |
| versions, | |
| key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))), | |
| ) | |
| def answer_score(answer: dict[str, Any]) -> tuple[int, str]: | |
| av = active_version(answer) | |
| if av is None: | |
| return 0, str(answer.get("created_at", "")) | |
| return int(av.get("votes", 0)), str(answer.get("created_at", "")) | |
| def best_answer_payload(conversation: dict[str, Any]) -> Optional[dict[str, Any]]: | |
| answers = conversation.get("answers", []) | |
| if not answers: | |
| return None | |
| best = max(answers, key=answer_score) | |
| av = active_version(best) | |
| if av is None: | |
| return None | |
| return { | |
| "answer_id": best["id"], | |
| "version_id": av["id"], | |
| "text": av["text"], | |
| "votes": int(av.get("votes", 0)), | |
| "author": av.get("author", "Anonymous"), | |
| "created_at": av.get("created_at", ""), | |
| } | |
| def answer_text_length(conversation: dict[str, Any]) -> int: | |
| best = best_answer_payload(conversation) | |
| if not best: | |
| return 0 | |
| return len(clean_text(best.get("text"))) | |
| def conversation_search_blob(conversation: dict[str, Any]) -> str: | |
| parts = [clean_text(conversation.get("question"))] | |
| for answer in conversation.get("answers", []): | |
| for version in answer.get("versions", []): | |
| parts.append(clean_text(version.get("text"))) | |
| return "\n".join(part for part in parts if part).casefold() | |
| def matches_query(conversation: dict[str, Any], query: str) -> bool: | |
| query = clean_text(query).casefold() | |
| if not query: | |
| return True | |
| return query in conversation_search_blob(conversation) | |
| def conversation_summary(conversation: dict[str, Any]) -> dict[str, Any]: | |
| conversation = normalize_conversation(conversation) | |
| answers = conversation.get("answers", []) | |
| best = best_answer_payload(conversation) | |
| return { | |
| "id": conversation["id"], | |
| "question": clean_text(conversation.get("question")), | |
| "created_at": conversation.get("created_at", ""), | |
| "updated_at": conversation.get("updated_at", ""), | |
| "answer_count": len(answers), | |
| "has_answers": bool(answers), | |
| "best_answer": best, | |
| "best_answer_length": answer_text_length(conversation), | |
| "best_answer_preview": clean_text(best.get("text"))[:220] if best else "", | |
| } | |
| def sort_conversations_for_queue( | |
| conversations: list[dict[str, Any]], | |
| status: str, | |
| query: str, | |
| ) -> list[dict[str, Any]]: | |
| status = clean_text(status).lower() or "unanswered" | |
| query = clean_text(query) | |
| if status == "unanswered" and not query: | |
| return sorted( | |
| conversations, | |
| key=lambda c: to_dt(c.get("created_at") or c.get("updated_at")), | |
| reverse=True, | |
| ) | |
| def key(conversation: dict[str, Any]): | |
| has_answers = bool(conversation.get("answers")) | |
| best_length = answer_text_length(conversation) if has_answers else 0 | |
| updated = to_dt(conversation.get("updated_at") or conversation.get("created_at")) | |
| return ( | |
| 0 if not has_answers else 1, | |
| 0 if not has_answers else best_length, | |
| -updated.timestamp(), | |
| ) | |
| return sorted(conversations, key=key) | |
| def list_questions(status: str = "unanswered", q: str = "", page: int = 1) -> dict[str, Any]: | |
| status = clean_text(status).lower() or "unanswered" | |
| if status not in {"unanswered", "answered", "all"}: | |
| status = "unanswered" | |
| q = clean_text(q) | |
| page = clamp_page(page) | |
| conversations = [normalize_conversation(c) for c in load_conversations()] | |
| if status == "unanswered": | |
| conversations = [c for c in conversations if not c.get("answers")] | |
| elif status == "answered": | |
| conversations = [c for c in conversations if c.get("answers")] | |
| if q: | |
| conversations = [c for c in conversations if matches_query(c, q)] | |
| conversations = sort_conversations_for_queue(conversations, status=status, query=q) | |
| total = len(conversations) | |
| start = (page - 1) * PAGE_SIZE | |
| end = start + PAGE_SIZE | |
| items = [conversation_summary(c) for c in conversations[start:end]] | |
| return { | |
| "items": items, | |
| "page": page, | |
| "page_size": PAGE_SIZE, | |
| "total": total, | |
| "has_prev": page > 1, | |
| "has_next": end < total, | |
| "status": status, | |
| "q": q, | |
| } | |
| # Write actions | |
| def create_conversation(question: str, author: str = "Anonymous") -> dict[str, Any]: | |
| question = question.strip() | |
| now = now_iso() | |
| conversation = { | |
| "id": uuid.uuid4().hex, | |
| "question": question, | |
| "author": author, | |
| "created_at": now, | |
| "updated_at": now, | |
| "turns": [ | |
| { | |
| "id": uuid.uuid4().hex, | |
| "role": "user", | |
| "text": question, | |
| "author": author, | |
| "ts": now, | |
| } | |
| ], | |
| "answers": [], | |
| } | |
| return save_conversation(conversation) | |
| def add_answer( | |
| conversation_id: str, | |
| text: str, | |
| author: str = "Anonymous", | |
| question_if_new: str | None = None, | |
| ) -> tuple[Optional[dict[str, Any]], str]: | |
| text = text.strip() | |
| if not text: | |
| return None, "empty answer" | |
| conversation = load_conversation(conversation_id) | |
| if question_if_new: | |
| if conversation is None or conversation.get("question") != question_if_new: | |
| conversation = create_conversation(question_if_new, author) | |
| elif conversation is None: | |
| return None, "conversation not found" | |
| now = now_iso() | |
| version = normalize_version( | |
| { | |
| "text": text, | |
| "author": author, | |
| "created_at": now, | |
| "votes": 0, | |
| "votes_by_client": {}, | |
| } | |
| ) | |
| answer = normalize_answer( | |
| { | |
| "id": uuid.uuid4().hex, | |
| "versions": [version], | |
| "active_version": version["id"], | |
| "created_at": now, | |
| "updated_at": now, | |
| } | |
| ) | |
| conversation["answers"].append(answer) | |
| conversation["turns"].append( | |
| { | |
| "id": uuid.uuid4().hex, | |
| "role": "assistant", | |
| "text": text, | |
| "author": author, | |
| "answer_id": answer["id"], | |
| "version_id": version["id"], | |
| "ts": now, | |
| } | |
| ) | |
| conversation = save_conversation(conversation) | |
| return conversation, "ok" | |
| def propose_version( | |
| conversation_id: str, | |
| answer_id: str, | |
| text: str, | |
| author: str = "Anonymous", | |
| ) -> tuple[Optional[dict[str, Any]], str]: | |
| text = text.strip() | |
| if not text: | |
| return None, "empty proposal" | |
| conversation = load_conversation(conversation_id) | |
| if conversation is None: | |
| return None, "conversation not found" | |
| for answer in conversation["answers"]: | |
| if str(answer.get("id")) != answer_id: | |
| continue | |
| now = now_iso() | |
| version = normalize_version( | |
| { | |
| "text": text, | |
| "author": author, | |
| "created_at": now, | |
| "votes": 0, | |
| "votes_by_client": {}, | |
| } | |
| ) | |
| answer["versions"].append(version) | |
| answer["updated_at"] = now | |
| conversation = save_conversation(conversation) | |
| return conversation, "ok" | |
| return None, "answer not found" | |
| def vote_version( | |
| conversation_id: str, | |
| answer_id: str, | |
| version_id: str, | |
| client_id: str, | |
| delta: int, | |
| ) -> tuple[Optional[dict[str, Any]], str]: | |
| conversation = load_conversation(conversation_id) | |
| if conversation is None: | |
| return None, "conversation not found" | |
| delta = 1 if int(delta) >= 0 else -1 | |
| for answer in conversation["answers"]: | |
| if str(answer.get("id")) != answer_id: | |
| continue | |
| for version in answer.get("versions", []): | |
| if str(version.get("id")) != version_id: | |
| continue | |
| votes_by_client = version.setdefault("votes_by_client", {}) | |
| if not isinstance(votes_by_client, dict): | |
| votes_by_client = {} | |
| version["votes_by_client"] = votes_by_client | |
| current = int(votes_by_client.get(client_id, 0)) | |
| if current == delta: | |
| return conversation, "already_voted" | |
| votes_by_client[client_id] = delta | |
| version["votes"] = int(sum(int(v) for v in votes_by_client.values())) | |
| if answer.get("versions"): | |
| answer["active_version"] = max( | |
| answer["versions"], | |
| key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))), | |
| )["id"] | |
| conversation = save_conversation(conversation) | |
| return conversation, "ok" | |
| return None, "version not found" | |
| # Routes | |
| def home(request: Request): | |
| status = clean_text(request.query_params.get("status")).lower() or "unanswered" | |
| if status not in {"unanswered", "answered", "all"}: | |
| status = "unanswered" | |
| q = clean_text(request.query_params.get("q")) | |
| page = clamp_page(request.query_params.get("page", 1)) | |
| conversation_id = clean_text(request.query_params.get("conversation_id")) | |
| list_state = list_questions(status=status, q=q, page=page) | |
| detail = load_conversation(conversation_id) if conversation_id else None | |
| init = { | |
| "ok": True, | |
| "client_id": get_client_id(request), | |
| "view": "detail" if detail else "list", | |
| "filters": { | |
| "status": list_state["status"], | |
| "q": list_state["q"], | |
| "page": list_state["page"], | |
| "page_size": list_state["page_size"], | |
| }, | |
| "list": list_state, | |
| "detail": detail, | |
| "conversation_id": conversation_id, | |
| "back_query": build_list_query(list_state["status"], list_state["q"], list_state["page"]), | |
| } | |
| return templates.TemplateResponse( | |
| request, | |
| "index.html", | |
| { | |
| "app_title": APP_TITLE, | |
| "init_json": json.dumps(init, ensure_ascii=False), | |
| }, | |
| ) | |
| def logo(): | |
| logo_path = TEMPLATES_DIR / "logo.png" | |
| if logo_path.exists(): | |
| return FileResponse(logo_path) | |
| return JSONResponse({"ok": False, "error": "logo not found"}, status_code=404) | |
| def health(): | |
| return {"ok": True} | |
| async def api(request: Request): | |
| try: | |
| payload = await request.json() | |
| except Exception: | |
| return JSONResponse({"ok": False, "error": "bad payload"}) | |
| action = clean_text(payload.get("action")) | |
| client_id = get_client_id(request, payload) | |
| author = anon_label(client_id) | |
| if action == "init": | |
| status = clean_text(payload.get("status")).lower() or "unanswered" | |
| q = clean_text(payload.get("q")) | |
| page = clamp_page(payload.get("page", 1)) | |
| conversation_id = clean_text(payload.get("conversation_id")) | |
| return JSONResponse( | |
| { | |
| "ok": True, | |
| "client_id": client_id, | |
| "list": list_questions(status=status, q=q, page=page), | |
| "detail": load_conversation(conversation_id) if conversation_id else None, | |
| } | |
| ) | |
| if action == "list_questions": | |
| status = clean_text(payload.get("status")).lower() or "unanswered" | |
| q = clean_text(payload.get("q")) | |
| page = clamp_page(payload.get("page", 1)) | |
| return JSONResponse({"ok": True, **list_questions(status=status, q=q, page=page)}) | |
| if action in {"get_question_detail", "get_conversation"}: | |
| conversation_id = clean_text(payload.get("conversation_id")) | |
| conversation = load_conversation(conversation_id) | |
| if conversation is None: | |
| return JSONResponse({"ok": False, "error": "not found"}) | |
| return JSONResponse({"ok": True, "conversation": conversation}) | |
| if action in {"add_answer", "answer"}: | |
| conversation_id = clean_text(payload.get("conversation_id")) | |
| text = clean_text(payload.get("text")) | |
| question = clean_text(payload.get("question")) or None | |
| conversation, msg = add_answer( | |
| conversation_id=conversation_id, | |
| text=text, | |
| author=author, | |
| question_if_new=question, | |
| ) | |
| if conversation is None: | |
| return JSONResponse({"ok": False, "error": msg}) | |
| return JSONResponse({"ok": True, "conversation": conversation}) | |
| if action in {"propose_version", "propose"}: | |
| conversation_id = clean_text(payload.get("conversation_id")) | |
| answer_id = clean_text(payload.get("answer_id")) | |
| text = clean_text(payload.get("text")) | |
| conversation, msg = propose_version( | |
| conversation_id=conversation_id, | |
| answer_id=answer_id, | |
| text=text, | |
| author=author, | |
| ) | |
| if conversation is None: | |
| return JSONResponse({"ok": False, "error": msg}) | |
| return JSONResponse({"ok": True, "conversation": conversation}) | |
| if action in {"vote_version", "vote"}: | |
| conversation_id = clean_text(payload.get("conversation_id")) | |
| answer_id = clean_text(payload.get("answer_id")) | |
| version_id = clean_text(payload.get("version_id")) | |
| delta = int(payload.get("delta", 1)) | |
| conversation, msg = vote_version( | |
| conversation_id=conversation_id, | |
| answer_id=answer_id, | |
| version_id=version_id, | |
| client_id=client_id, | |
| delta=delta, | |
| ) | |
| if conversation is None: | |
| return JSONResponse({"ok": False, "error": msg}) | |
| if msg == "already_voted": | |
| return JSONResponse({"ok": False, "error": "already voted"}) | |
| return JSONResponse({"ok": True, "conversation": conversation}) | |
| return JSONResponse({"ok": False, "error": f"unknown action: {action}"}) | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=False) | |