lore-demo / lore /evolve /daemon.py
AsadIsmail's picture
Update Lore demo Space
5cb0758 verified
"""
Daemon process: keeps the LoRA model in memory for instant suggestions.
Run once in a terminal:
uv run lore-train serve
Then all suggestion/training calls go through HTTP (~100ms) instead of
loading the model from scratch (~17s) each time.
Endpoints:
GET /suggest?n=3 — generate follow-up questions
POST /train/curiosity — trigger curiosity training
GET /status — model + training stats
GET /health — liveness check
"""
from __future__ import annotations
import threading
from pathlib import Path
import torch
from fastapi import FastAPI
from pydantic import BaseModel
from lore.config import LORA_CHECKPOINTS_DIR, LORA_BASE_MODEL_ID, HF_CACHE_DIR, get_device_map, get_torch_dtype
# FastAPI application on which the daemon's HTTP endpoints are registered.
app = FastAPI(title="Lore Daemon", version="0.1.0")
# Process-wide model and tokenizer; both stay None until _load_model() has run.
_model = None
_tokenizer = None
# Held around _load_model() calls so that model loads do not overlap.
_lock = threading.Lock()
# Checkpoint path given to the most recent _load_model() call (None = no checkpoint given).
_current_checkpoint: Path | None = None
def _get_latest_checkpoint() -> Path | None:
    """Return the ``step-*`` checkpoint with the highest step number.

    Checkpoints are ranked by the integer that follows ``step-`` rather than
    by plain string order, so ``step-1000`` is newer than ``step-200`` even
    when the names are not zero-padded. Entries whose suffix is not an
    integer rank below every numbered checkpoint and are ordered by name
    among themselves.

    Returns:
        The newest checkpoint path, or ``None`` when the checkpoints
        directory does not exist or contains no ``step-*`` entries.
    """
    if not LORA_CHECKPOINTS_DIR.exists():
        return None

    def _step_key(path: Path):
        # The glob below guarantees the "step-" prefix, so everything after
        # the first dash is the step suffix.
        suffix = path.name.partition("-")[2]
        try:
            return (int(suffix), path.name)
        except ValueError:
            return (-1, path.name)

    return max(LORA_CHECKPOINTS_DIR.glob("step-*"), key=_step_key, default=None)
def _load_model(checkpoint: Path | None = None):
    """Load the base model and tokenizer and publish them as module state.

    When ``checkpoint`` is given and exists on disk, the base model is wrapped
    with the LoRA adapter stored there; otherwise the plain base model is
    used. The resulting model is switched to eval mode and stored, together
    with the tokenizer and the ``checkpoint`` argument, in the module-level
    ``_model`` / ``_tokenizer`` / ``_current_checkpoint`` globals.

    Args:
        checkpoint: Path of a LoRA checkpoint to apply, or ``None`` for the
            base model only.
    """
    global _model, _tokenizer, _current_checkpoint

    # Imported here rather than at module level, so importing this module
    # does not itself import transformers / peft.
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from peft import PeftModel

    print(f"[daemon] Loading model: {LORA_BASE_MODEL_ID}")

    tok = AutoTokenizer.from_pretrained(
        LORA_BASE_MODEL_ID, cache_dir=str(HF_CACHE_DIR), trust_remote_code=True
    )
    # Fall back to EOS as the padding token when the tokenizer defines none.
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token

    base_model = AutoModelForCausalLM.from_pretrained(
        LORA_BASE_MODEL_ID,
        cache_dir=str(HF_CACHE_DIR),
        torch_dtype=get_torch_dtype(),
        device_map=get_device_map(),
        trust_remote_code=True,
    )

    if not checkpoint or not checkpoint.exists():
        loaded = base_model
    else:
        print(f"[daemon] Loading LoRA checkpoint: {checkpoint.name}")
        loaded = PeftModel.from_pretrained(base_model, str(checkpoint))
    loaded.eval()

    _model, _tokenizer, _current_checkpoint = loaded, tok, checkpoint
    print(f"[daemon] Model ready (checkpoint: {checkpoint.name if checkpoint else 'base'})")
def _ensure_model():
    """Return the shared ``(model, tokenizer)`` pair, loading it on first use.

    The "is it loaded yet?" check and the load itself happen while holding
    ``_lock``. The first load uses the latest checkpoint reported by
    ``_get_latest_checkpoint()``.
    """
    with _lock:
        not_loaded_yet = _model is None
        if not_loaded_yet:
            _load_model(_get_latest_checkpoint())
    return _model, _tokenizer
def _reload_if_new_checkpoint():
    """Reload the model when a checkpoint newer than the loaded one exists.

    Does nothing when there is no checkpoint on disk or when the latest
    checkpoint is the one already loaded.

    This function is called both from request handlers and from the
    background training thread. The decision to reload is therefore made
    again while holding ``_lock``, so two callers that notice the same new
    checkpoint do not both load it, and a caller that waited on the lock
    does not replace a newer checkpoint with the older one it saw earlier.
    """
    # Cheap check without the lock: the common case is "nothing new".
    latest = _get_latest_checkpoint()
    if latest is None or latest == _current_checkpoint:
        return

    with _lock:
        # Re-evaluate under the lock; another caller may have reloaded (or a
        # newer checkpoint may have appeared) while this one was waiting.
        latest = _get_latest_checkpoint()
        if latest is None or latest == _current_checkpoint:
            return
        print(f"[daemon] New checkpoint detected: {latest.name}")
        _load_model(latest)
@app.on_event("startup")
async def startup():
    """Load the model when the application starts, so it is in memory before requests arrive."""
    _ensure_model()
# ── Endpoints ─────────────────────────────────────────────────────────────────
@app.get("/health")
async def health():
    """Liveness check: always reports ``"ok"`` plus whether a model is loaded."""
    loaded = _model is not None
    return {"status": "ok", "model_loaded": loaded}
@app.get("/status")
async def status():
    """Report the base model id, the loaded checkpoint name and question-trace stats.

    The checkpoint is reported as ``"base"`` when no checkpoint is recorded
    as loaded.
    """
    from lore.evolve.trajectory import get_question_trace_stats

    if _current_checkpoint:
        checkpoint_label = _current_checkpoint.name
    else:
        checkpoint_label = "base"

    payload = {
        "model": LORA_BASE_MODEL_ID,
        "checkpoint": checkpoint_label,
        "question_traces": get_question_trace_stats(),
    }
    return payload
class SuggestResponse(BaseModel):
    """Response body of ``GET /suggest``."""
    # Each entry carries the generated "question" plus the fields returned by question_reward().
    suggestions: list[dict]
@app.get("/suggest", response_model=SuggestResponse)
async def suggest(n: int = 3):
    """Generate up to ``n`` follow-up questions, ranked by reward.

    Samples ``2 * n`` candidate questions from the model, keeps the first
    line of each, scores them with ``question_reward`` and returns the
    highest-scoring ones after removing duplicates (compared
    case-insensitively and ignoring punctuation).

    Args:
        n: Maximum number of suggestions to return. Values ``<= 0`` yield an
            empty list.

    Returns:
        A ``SuggestResponse`` whose entries contain the ``"question"`` text
        plus the reward fields. The list is empty when the wiki is empty.
    """
    import re

    _reload_if_new_checkpoint()
    from lore.evolve.curiosity import (
        build_wiki_state_summary, question_reward, CURIOSITY_SYSTEM_PROMPT,
    )
    from lore.evolve.trajectory import get_all_past_questions

    model, tokenizer = _ensure_model()
    wiki_state = build_wiki_state_summary()
    past_questions = get_all_past_questions()
    if wiki_state == "[Empty wiki]" or n <= 0:
        return SuggestResponse(suggestions=[])

    prompt = (
        f"Wiki state:\n{wiki_state}\n\n"
        f"Recent questions asked:\n"
        + "\n".join(f"- {q}" for q in past_questions[-10:])
        + "\n\nGenerate a follow-up question this researcher should explore:"
    )

    # The prompt is the same for every sample, so render the chat template
    # and tokenize it once instead of once per generated candidate.
    messages = [
        {"role": "system", "content": CURIOSITY_SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]
    text = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True,
        enable_thinking=False,
    )
    inputs = tokenizer([text], return_tensors="pt", truncation=True, max_length=1024)
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    prompt_len = inputs["input_ids"].shape[1]

    suggestions = []
    # Oversample (2x) so that filtering and de-duplication still leave n items.
    for _ in range(n * 2):
        with torch.no_grad():
            output = model.generate(
                **inputs,
                max_new_tokens=100,
                do_sample=True,
                temperature=1.2,
                top_p=0.95,
                pad_token_id=tokenizer.eos_token_id,
            )
        # Only the tokens after the prompt are newly generated text.
        candidate = tokenizer.decode(output[0][prompt_len:], skip_special_tokens=True).strip()
        # Keep the first line only, and apply the length filter to that final
        # text so a short first line (e.g. "Q:") is rejected as well.
        candidate = candidate.split("\n")[0].strip()
        if len(candidate) < 10:
            continue
        reward = question_reward(candidate, wiki_state, past_questions)
        suggestions.append({"question": candidate, **reward})

    suggestions.sort(key=lambda s: s["combined"], reverse=True)

    # Walk best-first and keep the first occurrence of each normalized question.
    punctuation = re.compile(r"[^\w\s]")
    seen = set()
    unique = []
    for s in suggestions:
        q_norm = punctuation.sub("", s["question"].lower()).strip()
        if q_norm in seen:
            continue
        seen.add(q_norm)
        unique.append(s)
        if len(unique) >= n:
            break
    return SuggestResponse(suggestions=unique)
@app.post("/train/curiosity")
async def train_curiosity():
    """Start curiosity training on a background daemon thread and return at once.

    The worker picks up any new checkpoint, runs the training, then checks
    again for a new checkpoint so a newly written one gets loaded. The HTTP
    response only confirms that the thread was started.
    """
    def _training_job():
        _reload_if_new_checkpoint()
        from lore.evolve.trainer import run_curiosity_training
        run_curiosity_training()
        _reload_if_new_checkpoint()

    worker = threading.Thread(target=_training_job, daemon=True)
    worker.start()
    return {"status": "training_started"}