Answer / main.py
wop's picture
Upload 6 files
2f6de88 verified
from __future__ import annotations
import json
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlencode
from fastapi import FastAPI, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
APP_TITLE = "Human Intelligence"
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
CONVERSATIONS_FILE = DATA_DIR / "conversations.json"
EMBED_FILE = DATA_DIR / "embeddings.json"
PAGE_SIZE = 20
DEFAULT_TEMPLATES_DIR = Path(__file__).resolve().parent / "templates"
DOCKER_TEMPLATES_DIR = Path("/app/templates")
TEMPLATES_DIR = DOCKER_TEMPLATES_DIR if DOCKER_TEMPLATES_DIR.exists() else DEFAULT_TEMPLATES_DIR
DATA_DIR.mkdir(parents=True, exist_ok=True)
TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
app = FastAPI(title=APP_TITLE)
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
app.mount("/templates", StaticFiles(directory=str(TEMPLATES_DIR)), name="templates")
# Utilities
def now_iso() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
def read_json(path: Path, default: Any):
if not path.exists():
return default
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return default
def write_json(path: Path, data: Any) -> None:
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(
json.dumps(data, ensure_ascii=False, indent=2, default=str),
encoding="utf-8",
)
tmp.replace(path)
def get_client_id(request: Request, payload: dict | None = None) -> str:
header_value = request.headers.get("x-client-id", "").strip()
if header_value:
return header_value
if payload:
payload_value = str(payload.get("client_id", "")).strip()
if payload_value:
return payload_value
return "anon"
def anon_label(client_id: str) -> str:
return "Anonymous"
def clean_text(value: Any) -> str:
return str(value or "").strip()
def to_dt(value: Any) -> datetime:
text = clean_text(value)
if not text:
return datetime.fromtimestamp(0, tz=timezone.utc)
try:
return datetime.fromisoformat(text.replace("Z", "+00:00"))
except Exception:
return datetime.fromtimestamp(0, tz=timezone.utc)
def clamp_page(value: Any) -> int:
try:
page = int(value)
except Exception:
page = 1
return page if page > 0 else 1
def build_list_query(status: str, q: str, page: int) -> str:
params = {
"status": status,
"page": page,
}
if q:
params["q"] = q
return urlencode(params)
# Conversations CRUD
def load_conversations() -> list[dict[str, Any]]:
data = read_json(CONVERSATIONS_FILE, [])
if isinstance(data, dict) and "conversations" in data:
data = data["conversations"]
return data if isinstance(data, list) else []
def save_conversations(conversations: list[dict[str, Any]]) -> None:
write_json(CONVERSATIONS_FILE, conversations)
def normalize_version(version: dict[str, Any]) -> dict[str, Any]:
v = dict(version or {})
v.setdefault("id", uuid.uuid4().hex)
v.setdefault("text", "")
v.setdefault("author", "Anonymous")
v.setdefault("created_at", now_iso())
v.setdefault("votes", 0)
v.setdefault("votes_by_client", {})
if not isinstance(v["votes_by_client"], dict):
v["votes_by_client"] = {}
v["votes"] = int(v.get("votes", 0))
return v
def normalize_answer(answer: dict[str, Any]) -> dict[str, Any]:
a = dict(answer or {})
a.setdefault("id", uuid.uuid4().hex)
a.setdefault("versions", [])
a.setdefault("active_version", "")
a.setdefault("created_at", now_iso())
a.setdefault("updated_at", a["created_at"])
versions = [
normalize_version(v)
for v in a.get("versions", [])
if isinstance(v, dict)
]
a["versions"] = versions
if versions:
version_ids = {v["id"] for v in versions}
if a["active_version"] not in version_ids:
a["active_version"] = max(
versions,
key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))),
)["id"]
return a
def normalize_conversation(conversation: dict[str, Any]) -> dict[str, Any]:
c = dict(conversation or {})
c.setdefault("id", uuid.uuid4().hex)
c.setdefault("question", "")
c.setdefault("author", "Anonymous")
c.setdefault("created_at", now_iso())
c.setdefault("updated_at", c["created_at"])
c.setdefault("turns", [])
c.setdefault("answers", [])
turns: list[dict[str, Any]] = []
for turn in c.get("turns", []):
if not isinstance(turn, dict):
continue
t = dict(turn)
t.setdefault("id", uuid.uuid4().hex)
t.setdefault("role", "user")
t.setdefault("text", "")
t.setdefault("author", "Anonymous")
t.setdefault("ts", now_iso())
turns.append(t)
c["turns"] = turns
c["answers"] = [
normalize_answer(a)
for a in c.get("answers", [])
if isinstance(a, dict)
]
if not c["turns"] and c["question"]:
c["turns"].append(
{
"id": uuid.uuid4().hex,
"role": "user",
"text": c["question"],
"author": c.get("author", "Anonymous"),
"ts": c["created_at"],
}
)
return c
def load_conversation(conversation_id: str) -> Optional[dict[str, Any]]:
if not conversation_id:
return None
for conv in load_conversations():
if str(conv.get("id")) == conversation_id:
return normalize_conversation(conv)
return None
def save_conversation(conversation: dict[str, Any]) -> dict[str, Any]:
conversation = normalize_conversation(conversation)
conversation["updated_at"] = now_iso()
conversations = [normalize_conversation(c) for c in load_conversations()]
replaced = False
for i, existing in enumerate(conversations):
if str(existing.get("id")) == str(conversation["id"]):
conversations[i] = conversation
replaced = True
break
if not replaced:
conversations.insert(0, conversation)
save_conversations(conversations)
return conversation
# Question summaries and search
def active_version(answer: dict[str, Any]) -> Optional[dict[str, Any]]:
versions = answer.get("versions", [])
if not versions:
return None
active_id = answer.get("active_version")
for version in versions:
if version.get("id") == active_id:
return version
return max(
versions,
key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))),
)
def answer_score(answer: dict[str, Any]) -> tuple[int, str]:
av = active_version(answer)
if av is None:
return 0, str(answer.get("created_at", ""))
return int(av.get("votes", 0)), str(answer.get("created_at", ""))
def best_answer_payload(conversation: dict[str, Any]) -> Optional[dict[str, Any]]:
answers = conversation.get("answers", [])
if not answers:
return None
best = max(answers, key=answer_score)
av = active_version(best)
if av is None:
return None
return {
"answer_id": best["id"],
"version_id": av["id"],
"text": av["text"],
"votes": int(av.get("votes", 0)),
"author": av.get("author", "Anonymous"),
"created_at": av.get("created_at", ""),
}
def answer_text_length(conversation: dict[str, Any]) -> int:
best = best_answer_payload(conversation)
if not best:
return 0
return len(clean_text(best.get("text")))
def conversation_search_blob(conversation: dict[str, Any]) -> str:
parts = [clean_text(conversation.get("question"))]
for answer in conversation.get("answers", []):
for version in answer.get("versions", []):
parts.append(clean_text(version.get("text")))
return "\n".join(part for part in parts if part).casefold()
def matches_query(conversation: dict[str, Any], query: str) -> bool:
query = clean_text(query).casefold()
if not query:
return True
return query in conversation_search_blob(conversation)
def conversation_summary(conversation: dict[str, Any]) -> dict[str, Any]:
conversation = normalize_conversation(conversation)
answers = conversation.get("answers", [])
best = best_answer_payload(conversation)
return {
"id": conversation["id"],
"question": clean_text(conversation.get("question")),
"created_at": conversation.get("created_at", ""),
"updated_at": conversation.get("updated_at", ""),
"answer_count": len(answers),
"has_answers": bool(answers),
"best_answer": best,
"best_answer_length": answer_text_length(conversation),
"best_answer_preview": clean_text(best.get("text"))[:220] if best else "",
}
def sort_conversations_for_queue(
conversations: list[dict[str, Any]],
status: str,
query: str,
) -> list[dict[str, Any]]:
status = clean_text(status).lower() or "unanswered"
query = clean_text(query)
if status == "unanswered" and not query:
return sorted(
conversations,
key=lambda c: to_dt(c.get("created_at") or c.get("updated_at")),
reverse=True,
)
def key(conversation: dict[str, Any]):
has_answers = bool(conversation.get("answers"))
best_length = answer_text_length(conversation) if has_answers else 0
updated = to_dt(conversation.get("updated_at") or conversation.get("created_at"))
return (
0 if not has_answers else 1,
0 if not has_answers else best_length,
-updated.timestamp(),
)
return sorted(conversations, key=key)
def list_questions(status: str = "unanswered", q: str = "", page: int = 1) -> dict[str, Any]:
status = clean_text(status).lower() or "unanswered"
if status not in {"unanswered", "answered", "all"}:
status = "unanswered"
q = clean_text(q)
page = clamp_page(page)
conversations = [normalize_conversation(c) for c in load_conversations()]
if status == "unanswered":
conversations = [c for c in conversations if not c.get("answers")]
elif status == "answered":
conversations = [c for c in conversations if c.get("answers")]
if q:
conversations = [c for c in conversations if matches_query(c, q)]
conversations = sort_conversations_for_queue(conversations, status=status, query=q)
total = len(conversations)
start = (page - 1) * PAGE_SIZE
end = start + PAGE_SIZE
items = [conversation_summary(c) for c in conversations[start:end]]
return {
"items": items,
"page": page,
"page_size": PAGE_SIZE,
"total": total,
"has_prev": page > 1,
"has_next": end < total,
"status": status,
"q": q,
}
# Write actions
def create_conversation(question: str, author: str = "Anonymous") -> dict[str, Any]:
question = question.strip()
now = now_iso()
conversation = {
"id": uuid.uuid4().hex,
"question": question,
"author": author,
"created_at": now,
"updated_at": now,
"turns": [
{
"id": uuid.uuid4().hex,
"role": "user",
"text": question,
"author": author,
"ts": now,
}
],
"answers": [],
}
return save_conversation(conversation)
def add_answer(
conversation_id: str,
text: str,
author: str = "Anonymous",
question_if_new: str | None = None,
) -> tuple[Optional[dict[str, Any]], str]:
text = text.strip()
if not text:
return None, "empty answer"
conversation = load_conversation(conversation_id)
if question_if_new:
if conversation is None or conversation.get("question") != question_if_new:
conversation = create_conversation(question_if_new, author)
elif conversation is None:
return None, "conversation not found"
now = now_iso()
version = normalize_version(
{
"text": text,
"author": author,
"created_at": now,
"votes": 0,
"votes_by_client": {},
}
)
answer = normalize_answer(
{
"id": uuid.uuid4().hex,
"versions": [version],
"active_version": version["id"],
"created_at": now,
"updated_at": now,
}
)
conversation["answers"].append(answer)
conversation["turns"].append(
{
"id": uuid.uuid4().hex,
"role": "assistant",
"text": text,
"author": author,
"answer_id": answer["id"],
"version_id": version["id"],
"ts": now,
}
)
conversation = save_conversation(conversation)
return conversation, "ok"
def propose_version(
conversation_id: str,
answer_id: str,
text: str,
author: str = "Anonymous",
) -> tuple[Optional[dict[str, Any]], str]:
text = text.strip()
if not text:
return None, "empty proposal"
conversation = load_conversation(conversation_id)
if conversation is None:
return None, "conversation not found"
for answer in conversation["answers"]:
if str(answer.get("id")) != answer_id:
continue
now = now_iso()
version = normalize_version(
{
"text": text,
"author": author,
"created_at": now,
"votes": 0,
"votes_by_client": {},
}
)
answer["versions"].append(version)
answer["updated_at"] = now
conversation = save_conversation(conversation)
return conversation, "ok"
return None, "answer not found"
def vote_version(
conversation_id: str,
answer_id: str,
version_id: str,
client_id: str,
delta: int,
) -> tuple[Optional[dict[str, Any]], str]:
conversation = load_conversation(conversation_id)
if conversation is None:
return None, "conversation not found"
delta = 1 if int(delta) >= 0 else -1
for answer in conversation["answers"]:
if str(answer.get("id")) != answer_id:
continue
for version in answer.get("versions", []):
if str(version.get("id")) != version_id:
continue
votes_by_client = version.setdefault("votes_by_client", {})
if not isinstance(votes_by_client, dict):
votes_by_client = {}
version["votes_by_client"] = votes_by_client
current = int(votes_by_client.get(client_id, 0))
if current == delta:
return conversation, "already_voted"
votes_by_client[client_id] = delta
version["votes"] = int(sum(int(v) for v in votes_by_client.values()))
if answer.get("versions"):
answer["active_version"] = max(
answer["versions"],
key=lambda v: (int(v.get("votes", 0)), str(v.get("created_at", ""))),
)["id"]
conversation = save_conversation(conversation)
return conversation, "ok"
return None, "version not found"
# Routes
@app.get("/", response_class=HTMLResponse)
def home(request: Request):
status = clean_text(request.query_params.get("status")).lower() or "unanswered"
if status not in {"unanswered", "answered", "all"}:
status = "unanswered"
q = clean_text(request.query_params.get("q"))
page = clamp_page(request.query_params.get("page", 1))
conversation_id = clean_text(request.query_params.get("conversation_id"))
list_state = list_questions(status=status, q=q, page=page)
detail = load_conversation(conversation_id) if conversation_id else None
init = {
"ok": True,
"client_id": get_client_id(request),
"view": "detail" if detail else "list",
"filters": {
"status": list_state["status"],
"q": list_state["q"],
"page": list_state["page"],
"page_size": list_state["page_size"],
},
"list": list_state,
"detail": detail,
"conversation_id": conversation_id,
"back_query": build_list_query(list_state["status"], list_state["q"], list_state["page"]),
}
return templates.TemplateResponse(
request,
"index.html",
{
"app_title": APP_TITLE,
"init_json": json.dumps(init, ensure_ascii=False),
},
)
@app.get("/logo.png")
def logo():
logo_path = TEMPLATES_DIR / "logo.png"
if logo_path.exists():
return FileResponse(logo_path)
return JSONResponse({"ok": False, "error": "logo not found"}, status_code=404)
@app.get("/health")
def health():
return {"ok": True}
@app.post("/api")
async def api(request: Request):
try:
payload = await request.json()
except Exception:
return JSONResponse({"ok": False, "error": "bad payload"})
action = clean_text(payload.get("action"))
client_id = get_client_id(request, payload)
author = anon_label(client_id)
if action == "init":
status = clean_text(payload.get("status")).lower() or "unanswered"
q = clean_text(payload.get("q"))
page = clamp_page(payload.get("page", 1))
conversation_id = clean_text(payload.get("conversation_id"))
return JSONResponse(
{
"ok": True,
"client_id": client_id,
"list": list_questions(status=status, q=q, page=page),
"detail": load_conversation(conversation_id) if conversation_id else None,
}
)
if action == "list_questions":
status = clean_text(payload.get("status")).lower() or "unanswered"
q = clean_text(payload.get("q"))
page = clamp_page(payload.get("page", 1))
return JSONResponse({"ok": True, **list_questions(status=status, q=q, page=page)})
if action in {"get_question_detail", "get_conversation"}:
conversation_id = clean_text(payload.get("conversation_id"))
conversation = load_conversation(conversation_id)
if conversation is None:
return JSONResponse({"ok": False, "error": "not found"})
return JSONResponse({"ok": True, "conversation": conversation})
if action in {"add_answer", "answer"}:
conversation_id = clean_text(payload.get("conversation_id"))
text = clean_text(payload.get("text"))
question = clean_text(payload.get("question")) or None
conversation, msg = add_answer(
conversation_id=conversation_id,
text=text,
author=author,
question_if_new=question,
)
if conversation is None:
return JSONResponse({"ok": False, "error": msg})
return JSONResponse({"ok": True, "conversation": conversation})
if action in {"propose_version", "propose"}:
conversation_id = clean_text(payload.get("conversation_id"))
answer_id = clean_text(payload.get("answer_id"))
text = clean_text(payload.get("text"))
conversation, msg = propose_version(
conversation_id=conversation_id,
answer_id=answer_id,
text=text,
author=author,
)
if conversation is None:
return JSONResponse({"ok": False, "error": msg})
return JSONResponse({"ok": True, "conversation": conversation})
if action in {"vote_version", "vote"}:
conversation_id = clean_text(payload.get("conversation_id"))
answer_id = clean_text(payload.get("answer_id"))
version_id = clean_text(payload.get("version_id"))
delta = int(payload.get("delta", 1))
conversation, msg = vote_version(
conversation_id=conversation_id,
answer_id=answer_id,
version_id=version_id,
client_id=client_id,
delta=delta,
)
if conversation is None:
return JSONResponse({"ok": False, "error": msg})
if msg == "already_voted":
return JSONResponse({"ok": False, "error": "already voted"})
return JSONResponse({"ok": True, "conversation": conversation})
return JSONResponse({"ok": False, "error": f"unknown action: {action}"})
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=False)