# Chat / main.py
# wop's picture
# Update main.py
# 20b22af verified
from __future__ import annotations
import json
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
import numpy as np
from fastapi import FastAPI, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
# sentence-transformers is treated as optional: if importing it fails for any
# reason the name is bound to None and the embedding helpers check for that.
try:
    from sentence_transformers import SentenceTransformer
except Exception:  # any import-time failure, not only a missing package
    SentenceTransformer = None
# ────────────────────── Configuration ──────────────────────
APP_TITLE = "Human Intelligence"

# Storage locations. DATA_DIR may be overridden through the environment.
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
CONVERSATIONS_FILE = DATA_DIR.joinpath("conversations.json")
EMBED_FILE = DATA_DIR.joinpath("embeddings.json")
TEMPLATES_DIR = Path("/app/templates")

# Minimum similarity score for a stored question to count as a match.
SIMILARITY_THRESHOLD = float(os.getenv("SIMILARITY_THRESHOLD", "0.62"))
EMBED_MODEL_NAME = os.getenv(
    "EMBED_MODEL_NAME",
    "sentence-transformers/paraphrase-MiniLM-L6-v2",
)

# Create both directories up front, before the template engine and the static
# mount below are built on top of TEMPLATES_DIR.
DATA_DIR.mkdir(parents=True, exist_ok=True)
TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)

app = FastAPI(title=APP_TITLE)
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))

# Cached embedding model; filled in by load_embed_model() on first use.
_embed_model = None

app.mount(
    "/templates",
    StaticFiles(directory=str(TEMPLATES_DIR)),
    name="templates",
)
# ────────────────────── Utilities ──────────────────────
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat(timespec="seconds")
def read_json(path: Path, default: Any):
    """Load and parse the JSON document stored at *path*.

    Returns *default* when the file does not exist, or when reading or
    parsing it raises any exception (best-effort read; the error is not
    reported).
    """
    if path.exists():
        try:
            raw = path.read_text(encoding="utf-8")
            return json.loads(raw)
        except Exception:
            # Unreadable or malformed content is treated like a missing file.
            pass
    return default
def write_json(path: Path, data: Any) -> None:
    """Serialize *data* as indented UTF-8 JSON and store it at *path*.

    The text is first written to a sibling file whose name is *path* plus a
    ``.tmp`` suffix, which is then moved over *path* with ``Path.replace``.
    Values the JSON encoder cannot handle natively are stringified
    (``default=str``).
    """
    staging = path.with_suffix(f"{path.suffix}.tmp")
    serialized = json.dumps(data, ensure_ascii=False, indent=2, default=str)
    staging.write_text(serialized, encoding="utf-8")
    staging.replace(path)
def get_client_id(request: Request, payload: dict | None = None) -> str:
    """Resolve the caller's client id.

    Precedence: a non-blank ``x-client-id`` request header, then a non-blank
    ``client_id`` entry in *payload*, then the literal ``"anon"``.
    """
    from_header = request.headers.get("x-client-id", "").strip()
    if from_header:
        return from_header
    from_payload = str(payload.get("client_id", "")).strip() if payload else ""
    return from_payload or "anon"
def anon_label(client_id: str) -> str:
    """Return the display name for a client.

    The *client_id* argument is currently ignored: every client is labelled
    "Anonymous".
    """
    return "Anonymous"
# ────────────────────── Embeddings ──────────────────────
def load_embed_model():
    """Return the shared embedding model, constructing it on first use.

    Returns None when the sentence-transformers import failed at module load.
    """
    global _embed_model
    if _embed_model is not None:
        return _embed_model
    if SentenceTransformer is None:
        return None
    _embed_model = SentenceTransformer(EMBED_MODEL_NAME)
    return _embed_model
def embed_text(text: str) -> list[float]:
    """Encode *text* into a normalized embedding returned as a plain list.

    Returns an empty list when no embedding model is available.
    """
    encoder = load_embed_model()
    if encoder is None:
        return []
    embedding = encoder.encode(text, normalize_embeddings=True)
    # Array-like results expose tolist(); anything else is copied into a list.
    return embedding.tolist() if hasattr(embedding, "tolist") else list(embedding)
def load_embed_index() -> dict[str, dict[str, Any]]:
    """Read the embedding index from EMBED_FILE.

    Returns an empty dict when the file is missing, unreadable, or does not
    hold a JSON object.
    """
    stored = read_json(EMBED_FILE, {})
    if isinstance(stored, dict):
        return stored
    return {}
def save_embed_index(idx: dict[str, dict[str, Any]]) -> None:
    """Persist the embedding index *idx* to EMBED_FILE."""
    write_json(path=EMBED_FILE, data=idx)
# ────────────────────── Conversations CRUD ──────────────────────
def load_conversations() -> list[dict[str, Any]]:
    """Read every stored conversation from CONVERSATIONS_FILE.

    Accepts either a bare JSON list or an object wrapping the list under a
    ``"conversations"`` key. Anything else yields an empty list.
    """
    stored = read_json(CONVERSATIONS_FILE, [])
    if isinstance(stored, dict) and "conversations" in stored:
        stored = stored["conversations"]
    if not isinstance(stored, list):
        return []
    return stored
def save_conversations(conversations: list[dict[str, Any]]) -> None:
    """Persist the full conversation list to CONVERSATIONS_FILE."""
    write_json(path=CONVERSATIONS_FILE, data=conversations)
def normalize_version(version: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *version* with every expected field present and typed.

    Missing fields receive defaults: a fresh hex id, empty text, the
    "Anonymous" author, the current timestamp, zero votes and an empty
    per-client vote map. A ``votes_by_client`` value that is not a dict is
    replaced by an empty dict. A ``votes`` value that cannot be converted to
    an integer (for example ``None`` or a non-numeric string in the stored
    data) is reset to 0 instead of raising, so one malformed record cannot
    make a whole conversation unloadable.

    The input mapping is not modified.
    """
    v = dict(version or {})
    # Only generate an id or timestamp when the field is actually missing.
    if "id" not in v:
        v["id"] = uuid.uuid4().hex
    v.setdefault("text", "")
    v.setdefault("author", "Anonymous")
    if "created_at" not in v:
        v["created_at"] = now_iso()
    v.setdefault("votes", 0)
    v.setdefault("votes_by_client", {})
    if not isinstance(v["votes_by_client"], dict):
        v["votes_by_client"] = {}
    try:
        v["votes"] = int(v["votes"])
    except (TypeError, ValueError, OverflowError):
        v["votes"] = 0
    return v
def normalize_answer(answer: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *answer* with defaults filled in and versions cleaned.

    Non-dict entries in ``versions`` are dropped and the rest are passed
    through normalize_version. When ``active_version`` does not name one of
    the remaining versions, it is pointed at the version with the most votes,
    with the greatest ``created_at`` string breaking ties.
    """
    normalized = dict(answer or {})
    for field, fallback in (
        ("id", uuid.uuid4().hex),
        ("versions", []),
        ("active_version", ""),
        ("created_at", now_iso()),
    ):
        normalized.setdefault(field, fallback)
    normalized.setdefault("updated_at", normalized["created_at"])

    cleaned = [
        normalize_version(candidate)
        for candidate in normalized.get("versions", [])
        if isinstance(candidate, dict)
    ]
    normalized["versions"] = cleaned
    if not cleaned:
        return normalized

    known_ids = {entry["id"] for entry in cleaned}
    if normalized["active_version"] in known_ids:
        return normalized

    leader = max(
        cleaned,
        key=lambda entry: (
            int(entry.get("votes", 0)),
            str(entry.get("created_at", "")),
        ),
    )
    normalized["active_version"] = leader["id"]
    return normalized
def normalize_conversation(conversation: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *conversation* with defaults, turns and answers normalized.

    Non-dict turns and answers are dropped. Each kept turn gets default
    ``id``, ``role``, ``text``, ``author`` and ``ts`` fields, and each kept
    answer goes through normalize_answer. A conversation that has a question
    but no turns is seeded with a single user turn carrying that question.
    """
    conv = dict(conversation or {})
    for field, fallback in (
        ("id", uuid.uuid4().hex),
        ("question", ""),
        ("author", "Anonymous"),
        ("created_at", now_iso()),
    ):
        conv.setdefault(field, fallback)
    conv.setdefault("updated_at", conv["created_at"])
    conv.setdefault("turns", [])
    conv.setdefault("answers", [])

    def _complete_turn(raw_turn: dict[str, Any]) -> dict[str, Any]:
        # Copy the turn and add whichever standard fields it lacks.
        turn = dict(raw_turn)
        for field, fallback in (
            ("id", uuid.uuid4().hex),
            ("role", "user"),
            ("text", ""),
            ("author", "Anonymous"),
            ("ts", now_iso()),
        ):
            turn.setdefault(field, fallback)
        return turn

    conv["turns"] = [
        _complete_turn(item)
        for item in conv.get("turns", [])
        if isinstance(item, dict)
    ]
    conv["answers"] = [
        normalize_answer(item)
        for item in conv.get("answers", [])
        if isinstance(item, dict)
    ]

    if conv["question"] and not conv["turns"]:
        # Older or partial records: rebuild the opening turn from the question.
        conv["turns"].append({
            "id": uuid.uuid4().hex,
            "role": "user",
            "text": conv["question"],
            "author": conv.get("author", "Anonymous"),
            "ts": conv["created_at"],
        })
    return conv
def load_conversation(conversation_id: str) -> Optional[dict[str, Any]]:
    """Return the normalized conversation whose id equals *conversation_id*.

    Returns None for an empty id or when no stored conversation matches.
    """
    if not conversation_id:
        return None
    found = next(
        (
            stored
            for stored in load_conversations()
            if str(stored.get("id")) == conversation_id
        ),
        None,
    )
    if found is None:
        return None
    return normalize_conversation(found)
def save_conversation(conversation: dict[str, Any]) -> dict[str, Any]:
    """Normalize *conversation*, stamp ``updated_at`` and persist it.

    An existing record with the same id is replaced in place. Otherwise the
    conversation is inserted at the front of the stored list. Returns the
    normalized conversation that was written.
    """
    fresh = normalize_conversation(conversation)
    fresh["updated_at"] = now_iso()

    stored = [normalize_conversation(item) for item in load_conversations()]
    target_id = str(fresh["id"])
    position = next(
        (
            index
            for index, item in enumerate(stored)
            if str(item.get("id")) == target_id
        ),
        None,
    )
    if position is None:
        stored.insert(0, fresh)
    else:
        stored[position] = fresh

    save_conversations(stored)
    return fresh
def ensure_embedding(conversation: dict[str, Any]) -> None:
    """Store an embedding of the conversation's question in the index.

    Does nothing when the question is blank or no embedding could be
    produced. Otherwise the index entry keyed by the conversation id is
    written (or overwritten) and the index is saved.
    """
    question = str(conversation.get("question", "")).strip()
    vector = embed_text(question) if question else []
    if not vector:
        return
    index = load_embed_index()
    index[str(conversation["id"])] = {"question": question, "vector": vector}
    save_embed_index(index)
# ────────────────────── Semantic search ──────────────────────
def find_similar_conversation(
    question: str,
    exclude_id: str | None = None,
) -> Optional[dict[str, Any]]:
    """Return the stored conversation whose question best matches *question*.

    The score is the dot product between the embedding of *question* and each
    stored vector (embed_text requests normalized embeddings). Index entries
    are validated one by one: an entry that is malformed, contains non-finite
    numbers, or has a different dimensionality than the query is skipped
    rather than disabling the whole search. Candidates are then tried from
    best to worst, so an index entry whose conversation no longer exists does
    not hide the next match.

    Args:
        question: Text to match against stored questions.
        exclude_id: Conversation id to leave out of the search, if any.

    Returns:
        ``{"conversation": <normalized conversation>, "score": <float>}`` for
        the best loadable candidate scoring at least SIMILARITY_THRESHOLD, or
        None when embeddings are unavailable or nothing qualifies.
    """
    idx = load_embed_index()
    if not idx or SentenceTransformer is None:
        return None
    q_vec = np.array(embed_text(question), dtype=float)
    if q_vec.size == 0:
        return None

    ids: list[str] = []
    rows: list[np.ndarray] = []
    for cid, entry in idx.items():
        if cid == exclude_id:
            continue
        try:
            vec = np.asarray(entry["vector"], dtype=float)
        except (KeyError, IndexError, TypeError, ValueError):
            continue
        if vec.shape != q_vec.shape or not np.isfinite(vec).all():
            continue
        ids.append(cid)
        rows.append(vec)
    if not rows:
        return None

    sims = np.stack(rows) @ q_vec
    # Stable descending order keeps the earliest entry first among equal scores.
    for i in np.argsort(-sims, kind="stable"):
        score = float(sims[i])
        if score < SIMILARITY_THRESHOLD:
            break
        conv = load_conversation(ids[int(i)])
        if conv is not None:
            return {"conversation": conv, "score": score}
    return None
def find_top_k_similar(question: str, k: int = 3) -> list[dict[str, Any]]:
    """Return up to *k* answered conversations most similar to *question*.

    Every index entry with a usable vector of the query's dimensionality is
    scored by dot product, and candidates are visited from highest to lowest
    score. Conversations that no longer exist or have no answer yet are
    skipped, and the walk continues until *k* results are collected, so
    skipped candidates do not shrink the result when further answered
    conversations exist. The conversations file is read once for the whole
    call.

    Args:
        question: Text to compare against stored questions.
        k: Maximum number of results; values <= 0 give an empty list.

    Returns:
        A list of dicts with ``conversation_id``, ``question``, ``answer``
        (text of the best answer) and ``score``, best match first.
    """
    if k <= 0:
        return []
    idx = load_embed_index()
    if not idx or SentenceTransformer is None:
        return []
    q_vec = np.array(embed_text(question), dtype=float)
    if q_vec.size == 0:
        return []

    scored: list[tuple[str, float]] = []
    for cid, data in idx.items():
        try:
            vec = np.array(data["vector"], dtype=float)
        except (KeyError, IndexError, TypeError, ValueError):
            continue
        if vec.shape != q_vec.shape:
            continue
        score = float(vec @ q_vec)
        # A NaN/inf score cannot be ranked meaningfully; drop the entry.
        if np.isfinite(score):
            scored.append((cid, score))
    if not scored:
        return []
    scored.sort(key=lambda pair: pair[1], reverse=True)

    # Index stored conversations by id once; the first record wins on
    # duplicate ids, matching load_conversation's first-match lookup.
    stored: dict[str, dict[str, Any]] = {}
    for raw in load_conversations():
        if isinstance(raw, dict):
            stored.setdefault(str(raw.get("id")), raw)

    out: list[dict[str, Any]] = []
    for cid, score in scored:
        raw = stored.get(cid)
        if raw is None:
            continue
        conv = normalize_conversation(raw)
        best = best_answer_payload(conv)
        if not best:
            continue
        out.append({
            "conversation_id": cid,
            "question": conv.get("question"),
            "answer": best["text"],
            "score": score,
        })
        if len(out) >= k:
            break
    return out
# ────────────────────── Actions ──────────────────────
def create_conversation(question: str, author: str = "Anonymous") -> dict[str, Any]:
    """Create, persist and index a new conversation opened by *question*.

    The stripped question becomes both the conversation's ``question`` and
    its first user turn. Returns the saved (normalized) conversation.
    """
    cleaned = question.strip()
    stamp = now_iso()
    opening_turn = {
        "id": uuid.uuid4().hex,
        "role": "user",
        "text": cleaned,
        "author": author,
        "ts": stamp,
    }
    record = {
        "id": uuid.uuid4().hex,
        "question": cleaned,
        "author": author,
        "created_at": stamp,
        "updated_at": stamp,
        "turns": [opening_turn],
        "answers": [],
    }
    saved = save_conversation(record)
    ensure_embedding(saved)
    return saved
def active_version(answer: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return the version of *answer* that should currently be shown.

    That is the version whose id equals ``answer["active_version"]``. When no
    version has that id, the one with the most votes is returned, with the
    greatest ``created_at`` string breaking ties. Returns None when the
    answer has no versions.
    """
    candidates = answer.get("versions", [])
    if not candidates:
        return None

    wanted = answer.get("active_version")
    selected = next(
        (entry for entry in candidates if entry.get("id") == wanted),
        None,
    )
    if selected is not None:
        return selected

    def _rank(entry: dict[str, Any]) -> tuple[int, str]:
        return int(entry.get("votes", 0)), str(entry.get("created_at", ""))

    return max(candidates, key=_rank)
def answer_score(answer: dict[str, Any]) -> tuple[int, str]:
    """Return the sort key ``(votes of the active version, answer created_at)``.

    An answer without any version counts as zero votes.
    """
    current = active_version(answer)
    created = str(answer.get("created_at", ""))
    votes = 0 if current is None else int(current.get("votes", 0))
    return votes, created
def best_answer_payload(conversation: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Describe the top-ranked answer of *conversation* for API responses.

    Answers are ranked with answer_score, and the winner's active version
    supplies the text, votes, author and timestamp. Returns None when there
    are no answers or the winning answer has no versions.
    """
    candidates = conversation.get("answers", [])
    if not candidates:
        return None

    winner = max(candidates, key=answer_score)
    shown = active_version(winner)
    if shown is None:
        return None

    return dict(
        answer_id=winner["id"],
        version_id=shown["id"],
        text=shown["text"],
        votes=int(shown.get("votes", 0)),
        author=shown.get("author", "Anonymous"),
        created_at=shown.get("created_at", ""),
    )
def add_answer(
    conversation_id: str,
    text: str,
    author: str = "Anonymous",
    question_if_new: str | None = None,
) -> tuple[Optional[dict[str, Any]], str]:
    """Attach a new answer (with one initial version) to a conversation.

    When *question_if_new* is given and the conversation identified by
    *conversation_id* is missing or has a different question, a new
    conversation is created for that question and the answer goes there, so
    the answer does not land on a merely similar conversation.

    Returns:
        ``(conversation, "ok")`` on success, or ``(None, reason)`` where
        reason is ``"empty answer"`` or ``"conversation not found"``.
    """
    body = text.strip()
    if not body:
        return None, "empty answer"

    conversation = load_conversation(conversation_id)
    if question_if_new:
        same_question = (
            conversation is not None
            and conversation.get("question") == question_if_new
        )
        if not same_question:
            conversation = create_conversation(question_if_new, author)
    elif conversation is None:
        return None, "conversation not found"

    stamp = now_iso()
    first_version = normalize_version({
        "text": body,
        "author": author,
        "created_at": stamp,
        "votes": 0,
        "votes_by_client": {},
    })
    new_answer = normalize_answer({
        "id": uuid.uuid4().hex,
        "versions": [first_version],
        "active_version": first_version["id"],
        "created_at": stamp,
        "updated_at": stamp,
    })
    assistant_turn = {
        "id": uuid.uuid4().hex,
        "role": "assistant",
        "text": body,
        "author": author,
        "answer_id": new_answer["id"],
        "version_id": first_version["id"],
        "ts": stamp,
    }

    conversation["answers"].append(new_answer)
    conversation["turns"].append(assistant_turn)
    save_conversation(conversation)
    return conversation, "ok"
def propose_version(
    conversation_id: str,
    answer_id: str,
    text: str,
    author: str = "Anonymous",
) -> tuple[Optional[dict[str, Any]], str]:
    """Append an alternative version to an existing answer.

    The new version starts with zero votes. The answer's ``active_version``
    is left untouched here.

    Returns:
        ``(conversation, "ok")`` on success, or ``(None, reason)`` where
        reason is ``"empty proposal"``, ``"conversation not found"`` or
        ``"answer not found"``.
    """
    body = text.strip()
    if not body:
        return None, "empty proposal"

    conversation = load_conversation(conversation_id)
    if conversation is None:
        return None, "conversation not found"

    target = next(
        (
            candidate
            for candidate in conversation["answers"]
            if str(candidate.get("id")) == answer_id
        ),
        None,
    )
    if target is None:
        return None, "answer not found"

    stamp = now_iso()
    proposal = normalize_version({
        "text": body,
        "author": author,
        "created_at": stamp,
        "votes": 0,
        "votes_by_client": {},
    })
    target["versions"].append(proposal)
    target["updated_at"] = stamp
    save_conversation(conversation)
    return conversation, "ok"
def vote_version(
    conversation_id: str,
    answer_id: str,
    version_id: str,
    client_id: str,
    delta: int,
) -> tuple[Optional[dict[str, Any]], str]:
    """Record one client's up- or down-vote on a version of an answer.

    *delta* is reduced to +1 (any value >= 0) or -1. Each client holds a
    single vote per version: repeating the same vote is reported as
    ``"already_voted"``, and the opposite vote replaces the earlier one. The
    version's total is recomputed from the per-client map, and the answer's
    active version becomes the one with the most votes (greatest
    ``created_at`` string on ties).

    Returns:
        ``(conversation, "ok")`` after a recorded vote,
        ``(conversation, "already_voted")`` for a repeated vote, or
        ``(None, reason)`` where reason is ``"conversation not found"`` or
        ``"version not found"`` (also used when the answer id is unknown).
    """
    conversation = load_conversation(conversation_id)
    if conversation is None:
        return None, "conversation not found"

    step = 1 if int(delta) >= 0 else -1

    # Lazily walk (answer, version) pairs matching both ids; take the first.
    matches = (
        (answer, version)
        for answer in conversation["answers"]
        if str(answer.get("id")) == answer_id
        for version in answer.get("versions", [])
        if str(version.get("id")) == version_id
    )
    hit = next(matches, None)
    if hit is None:
        return None, "version not found"
    answer, version = hit

    ledger = version.setdefault("votes_by_client", {})
    if not isinstance(ledger, dict):
        ledger = {}
        version["votes_by_client"] = ledger

    if int(ledger.get(client_id, 0)) == step:
        return conversation, "already_voted"

    ledger[client_id] = step
    version["votes"] = int(sum(int(value) for value in ledger.values()))

    if answer.get("versions"):
        leader = max(
            answer["versions"],
            key=lambda entry: (
                int(entry.get("votes", 0)),
                str(entry.get("created_at", "")),
            ),
        )
        answer["active_version"] = leader["id"]

    conversation["updated_at"] = now_iso()
    save_conversation(conversation)
    return conversation, "ok"
# ────────────────────── Routes ──────────────────────
@app.get("/", response_class=HTMLResponse)
def home(request: Request):
    """Render index.html with the app title and a JSON bootstrap blob.

    The blob carries the resolved client id and no preloaded conversation.
    """
    bootstrap = {
        "ok": True,
        "client_id": get_client_id(request),
        "conversation": None,
    }
    context = {
        "request": request,
        "app_title": APP_TITLE,
        "init_json": json.dumps(bootstrap, ensure_ascii=False),
    }
    return templates.TemplateResponse("index.html", context)
@app.get("/logo.png")
def logo():
    """Serve logo.png from the directory containing this module.

    Responds with a 404 JSON error when the file does not exist.
    """
    candidate = Path(__file__).with_name("logo.png")
    if not candidate.exists():
        return JSONResponse(
            {"ok": False, "error": "logo not found"},
            status_code=404,
        )
    return FileResponse(candidate)
@app.get("/health")
def health():
    """Liveness probe: always answers ``{"ok": true}``."""
    status = {"ok": True}
    return status
def _payload_text(payload: dict[str, Any], key: str) -> str:
    """Return ``payload[key]`` as a stripped string.

    Missing keys and JSON ``null`` both yield ``""`` so that a null field is
    never turned into the literal text "None".
    """
    value = payload.get(key)
    return "" if value is None else str(value).strip()


@app.post("/api")
async def api(request: Request):
    """Single JSON endpoint that dispatches on the payload's ``action`` field.

    Supported actions: ``init``, ``get_conversation``, ``ask``, ``answer``,
    ``propose`` and ``vote``. Every response is a JSON object with an ``ok``
    flag. Failures carry an ``error`` message and are sent with the default
    status code (no explicit status is set).

    Input handling:
    - A body that is not valid JSON, or is valid JSON but not an object, is
      answered with ``"bad payload"``.
    - Text fields that are missing or ``null`` are treated as empty.
    - A ``delta`` that cannot be converted to an integer is answered with
      ``"bad delta"``.
    """
    try:
        payload = await request.json()
    except Exception:
        return JSONResponse({"ok": False, "error": "bad payload"})
    # Valid JSON that is not an object (list, string, number, null) has no
    # fields to dispatch on.
    if not isinstance(payload, dict):
        return JSONResponse({"ok": False, "error": "bad payload"})

    raw_action = payload.get("action")
    action = "" if raw_action is None else str(raw_action)
    client_id = get_client_id(request, payload)
    author = anon_label(client_id)

    # ── init ──
    if action == "init":
        conversation_id = _payload_text(payload, "conversation_id")
        conversation = load_conversation(conversation_id) if conversation_id else None
        return JSONResponse({
            "ok": True,
            "client_id": client_id,
            "conversation": conversation,
        })

    # ── get_conversation ──
    if action == "get_conversation":
        conversation_id = _payload_text(payload, "conversation_id")
        conversation = load_conversation(conversation_id)
        if conversation is None:
            return JSONResponse({"ok": False, "error": "not found"})
        return JSONResponse({"ok": True, "conversation": conversation})

    # ── ask ──
    if action == "ask":
        question = _payload_text(payload, "question")
        if not question:
            return JSONResponse({"ok": False, "error": "empty question"})

        # 1) Search FIRST — before creating anything
        match = find_similar_conversation(question)
        if match and match.get("conversation"):
            conversation = match["conversation"]
            best = best_answer_payload(conversation)
            related = find_top_k_similar(question, k=3)
            return JSONResponse({
                "ok": True,
                "matched": True,
                "similarity": match["score"],
                "conversation": conversation,
                "assistant_text": best["text"] if best else "No answer yet. You can write one.",
                "best_answer": best,
                "related": related,
            })

        # 2) No match — create new
        conversation = create_conversation(question, author)
        return JSONResponse({
            "ok": True,
            "matched": False,
            "conversation": conversation,
            "assistant_text": "No answer yet. You can write one.",
            "best_answer": None,
            "related": [],
        })

    # ── answer ──
    if action == "answer":
        conversation_id = _payload_text(payload, "conversation_id")
        text = _payload_text(payload, "text")
        # An empty or null question means "answer the existing conversation".
        question = _payload_text(payload, "question") or None
        conversation, msg = add_answer(
            conversation_id=conversation_id,
            text=text,
            author=author,
            question_if_new=question,
        )
        if conversation is None:
            return JSONResponse({"ok": False, "error": msg})
        return JSONResponse({"ok": True, "conversation": conversation})

    # ── propose ──
    if action == "propose":
        conversation_id = _payload_text(payload, "conversation_id")
        answer_id = _payload_text(payload, "answer_id")
        text = _payload_text(payload, "text")
        conversation, msg = propose_version(
            conversation_id=conversation_id,
            answer_id=answer_id,
            text=text,
            author=author,
        )
        if conversation is None:
            return JSONResponse({"ok": False, "error": msg})
        return JSONResponse({"ok": True, "conversation": conversation})

    # ── vote ──
    if action == "vote":
        conversation_id = _payload_text(payload, "conversation_id")
        answer_id = _payload_text(payload, "answer_id")
        version_id = _payload_text(payload, "version_id")
        try:
            delta = int(payload.get("delta", 1))
        except (TypeError, ValueError, OverflowError):
            return JSONResponse({"ok": False, "error": "bad delta"})
        conversation, msg = vote_version(
            conversation_id=conversation_id,
            answer_id=answer_id,
            version_id=version_id,
            client_id=client_id,
            delta=delta,
        )
        if conversation is None:
            return JSONResponse({"ok": False, "error": msg})
        if msg == "already_voted":
            return JSONResponse({"ok": False, "error": "already voted"})
        return JSONResponse({"ok": True, "conversation": conversation})

    return JSONResponse({"ok": False, "error": f"unknown action: {action}"})
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 7860.
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=7860,
        reload=False,
    )