peekabook-api / app /simulation /multi_session_simulator.py
lael
feat: initial deploy
3960e2b
Raw
History Blame Contribute Delete
17.9 kB
"""
๋ฉ€ํ‹ฐ์„ธ์…˜ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ ์˜ค์ผ€์ŠคํŠธ๋ ˆ์ด์…˜
ํ•œ ํŽ˜๋ฅด์†Œ๋‚˜์˜ N์„ธ์…˜์„ ์ˆœ์„œ๋Œ€๋กœ ์‹คํ–‰ํ•˜๋ฉด์„œ:
- ์„ธ์…˜๋ณ„ DNA ์ถ”์ถœ โ†’ CRS โ†” PeekaReader ๋Œ€ํ™” โ†’ self/judge ํ‰๊ฐ€ โ†’ LTM ์—…๋ฐ์ดํŠธ
- ํŽ˜๋ฅด์†Œ๋‚˜ ๋‹จ์œ„ ChromaDB ๊ฒฉ๋ฆฌ (factory + unique path)
- W&B๋กœ ์„ธ์…˜๋ณ„ ๋ฉ”ํŠธ๋ฆญ ๋กœ๊น…
์ฃผ์š” ๋ณ€ํ™˜:
- Langfuse ์ œ๊ฑฐ, W&B ๋„์ž… (์˜ค์ผ€์ŠคํŠธ๋ ˆ์ด์…˜ ๋ ˆ์ด์–ด์—์„œ๋งŒ)
- ์ „์—ญ app โ†’ factory (create_app_fn ์ธ์ž)
- sync app.invoke + Command โ†’ async ainvoke + update_state (v4 ํŒจํ„ด)
- queue + threading + asyncio ํ•‘ํ
- PERSONA_BANK ์ „์—ญ ์˜์กด ์ œ๊ฑฐ โ†’ full_persona ๋ช…์‹œ ์ฃผ์ž…
"""
from __future__ import annotations
import asyncio
import copy
import queue
import threading
import time
from datetime import datetime
from typing import Any, Callable, Optional
import wandb
from langchain_core.messages import AIMessage, HumanMessage
from app.config import JUDGE_MODEL, KST, MAX_TURNS
from app.simulation.peeka_judge import (
judge_session,
update_long_term_memory,
)
from app.simulation.peeka_reader_agent import PeekaReaderAgent, extract_session_dna
# ----------------------------------------------
# CRS <-> PeekaReader ping-pong (based on the v4 pattern)
# ----------------------------------------------
def _extract_ai_responses(state: dict[str, Any]) -> list[str]:
    """Return the AI replies that follow the most recent user message.

    Scans ``state["messages"]`` backwards to find the last human message,
    then collects the ``content`` of every AI message after it, in
    chronological order. A message counts as human/AI either by class
    (``HumanMessage`` / ``AIMessage``) or by a ``type`` attribute equal to
    ``"human"`` / ``"ai"``. Messages of any other kind are skipped.

    Args:
        state: Graph state mapping; a missing ``"messages"`` key is treated
            as an empty history.

    Returns:
        The trailing AI message contents, oldest first (possibly empty).
    """
    history = state.get("messages", [])

    # Index of the first message after the last human turn (0 if none).
    tail_start = 0
    for idx in range(len(history) - 1, -1, -1):
        candidate = history[idx]
        if isinstance(candidate, HumanMessage) or getattr(candidate, "type", None) == "human":
            tail_start = idx + 1
            break

    return [
        entry.content
        for entry in history[tail_start:]
        if isinstance(entry, AIMessage) or getattr(entry, "type", None) == "ai"
    ]
async def _run_crs(app,
                   thread_id: str,
                   initial_state: dict,
                   u2c: queue.Queue,
                   c2u: queue.Queue) -> None:
    """Drive the CRS graph, feeding it user turns received over a queue.

    Flow:
      1. Invoke the graph with a deep copy of ``initial_state`` (plus
         ``session_id = thread_id``).
      2. After every invocation, read the graph snapshot. If nothing is
         scheduled next, publish ``{"__done__": True, "result": values}``
         on ``c2u`` and return.
      3. Otherwise publish the latest AI reply on ``c2u``, wait for the
         simulated user's answer on ``u2c``, inject it as a
         ``HumanMessage`` with ``update_state`` and resume the graph.
      4. A ``None`` on ``u2c`` is the stop signal (turn limit reached on
         the user side); the done message is published and the loop ends.

    Args:
        app: Object exposing ``ainvoke``, ``get_state`` and
            ``update_state`` (a compiled LangGraph, per ``run_session``).
        thread_id: Thread id used in the graph config and as session id.
        initial_state: Initial CRS state; never mutated.
        u2c: Queue carrying user utterances (or ``None``) to the CRS.
        c2u: Queue carrying CRS replies / the done message to the user.

    Raises:
        RuntimeError: If the graph pauses without having produced any AI
            reply. Nothing could be sent to the user side, so waiting for
            an answer would block both sides forever.
    """
    session_config = {"configurable": {"thread_id": thread_id}}
    state = copy.deepcopy(initial_state)
    state["session_id"] = thread_id
    loop = asyncio.get_running_loop()

    result = await app.ainvoke(state, config=session_config)
    while True:
        snapshot = app.get_state(session_config)
        if not snapshot.next:
            # No pending node: the graph has finished.
            c2u.put({"__done__": True, "result": snapshot.values})
            break

        ai_responses = _extract_ai_responses(result)
        if not ai_responses:
            # The user simulator only speaks in response to a CRS message;
            # without one, both threads would wait on each other forever.
            raise RuntimeError(
                "CRS graph paused without producing an AI response"
            )
        c2u.put(ai_responses[-1])

        # queue.Queue.get() blocks; run it in a worker thread so the event
        # loop stays responsive while waiting for the simulated user.
        user_input = await loop.run_in_executor(None, u2c.get)
        if user_input is None:  # stop signal from the user side
            c2u.put({"__done__": True, "result": snapshot.values})
            break

        app.update_state(session_config, {"messages": [HumanMessage(content=user_input)]})
        result = await app.ainvoke(None, config=session_config)
def _run_user_sim(persona_id: str,
                  persona_dna: dict,
                  collector: dict,
                  u2c: queue.Queue,
                  c2u: queue.Queue,
                  max_turns: int,
                  verbose: bool) -> None:
    """PeekaReader thread body: answer CRS messages arriving on a queue.

    Results are reported through ``collector``:
      * ``"agent"``: the ``PeekaReaderAgent`` instance
      * ``"conversation"``: list of per-turn dicts
        (``turn``, ``csr``, ``thought``, ``user``)
      * ``"crs_result"``: the result carried by the CRS done message
      * ``"status"``: ``"success"``, ``"timeout"`` or ``"error: ..."``

    Any exception raised while creating the agent or answering is caught:
    the status is set to ``"error: ..."`` and ``None`` is put on ``u2c``
    so the CRS side, which blocks waiting for an utterance, is released
    instead of hanging forever.

    Args:
        persona_id: Persona identifier passed to the agent.
        persona_dna: Session DNA passed to the agent.
        collector: Shared dict mutated in place with the outputs above.
        u2c: Queue for utterances sent to the CRS (``None`` = stop).
        c2u: Queue for messages received from the CRS.
        max_turns: Stop once the agent has answered this many times.
        verbose: Forwarded to the agent; also enables the error print.
    """
    collector["conversation"] = []
    try:
        agent = PeekaReaderAgent(persona_id, persona_dna, verbose=verbose)
        collector["agent"] = agent

        while True:
            message = c2u.get()

            # CRS finished.
            if isinstance(message, dict) and message.get("__done__"):
                collector["crs_result"] = message["result"]
                collector["status"] = "success"
                break

            # Turn limit reached: ask the CRS to stop and collect its
            # final done message (best effort, bounded wait).
            if agent.turn_count >= max_turns:
                collector["status"] = "timeout"
                u2c.put(None)
                try:
                    m = c2u.get(timeout=10)
                    if isinstance(m, dict) and m.get("__done__"):
                        collector["crs_result"] = m["result"]
                except queue.Empty:
                    collector["crs_result"] = None
                break

            # Regular turn.
            ans = agent.answer(str(message))
            collector["conversation"].append({
                "turn": agent.turn_count,
                "csr": str(message),
                "thought": ans["thought"],
                "user": ans["utterance"],
            })
            u2c.put(ans["utterance"])
    except Exception as e:
        # Without this the thread would die silently while the CRS side
        # stays blocked on u2c.get().
        collector["status"] = f"error: {e}"
        if verbose:
            print(f"\n[error] PeekaReader simulation failed: {e}")
        u2c.put(None)
# ----------------------------------------------
# Run a single session
# ----------------------------------------------
async def run_session(app,
                      initial_state: dict,
                      persona_id: str,
                      session_id: int,
                      persona_dna: dict,
                      *,
                      max_turns: int = MAX_TURNS,
                      verbose: bool = True) -> dict:
    """Run one persona session to completion (CRS <-> PeekaReader ping-pong).

    Starts the PeekaReader simulator in a daemon thread, drives the CRS
    graph in this coroutine, and, when the session ended successfully with
    a final message, asks the PeekaReader agent for a self-evaluation.

    Args:
        app: Compiled LangGraph created by ``create_app_fn(...)``.
        initial_state: Initial CRS state dict (imported from the graph module).
        persona_id: Persona identifier (used to build the thread id).
        session_id: Session identifier (used to build the thread id).
        persona_dna: Result of ``extract_session_dna()``.
        max_turns: Maximum number of PeekaReader answers (timeout safety net).
        verbose: Print progress / error messages.

    Returns:
        A dict with the session status, timing, turn count, conversation,
        final recommendation text, retrieved books, recommendations,
        self-evaluation and a timestamp.
    """
    u2c: queue.Queue = queue.Queue()
    c2u: queue.Queue = queue.Queue()
    collector: dict = {"status": "running"}
    thread_id = f"sim_{persona_id}_s{session_id}_{int(time.time())}"

    # Monotonic clock: elapsed time must not be affected by wall-clock jumps.
    start = time.monotonic()
    t = threading.Thread(
        target=_run_user_sim,
        args=(persona_id, persona_dna, collector, u2c, c2u, max_turns, verbose),
        daemon=True,
    )
    t.start()

    crs_error: Optional[Exception] = None
    try:
        await _run_crs(app, thread_id, initial_state, u2c, c2u)
    except Exception as e:
        crs_error = e
        if verbose:
            print(f"\n[์˜ค๋ฅ˜] CRS ์‹คํ–‰ ์‹คํŒจ: {e}")
        # The simulator thread is blocked waiting for a CRS message; wake it
        # with a done sentinel so join() returns promptly and the thread
        # does not leak.
        c2u.put({"__done__": True, "result": None})
    finally:
        t.join(timeout=30)

    if crs_error is not None:
        # Set after the join: the simulator marks the sentinel as "success".
        collector["status"] = f"error: {crs_error}"

    elapsed = round(time.monotonic() - start, 2)
    agent = collector.get("agent")
    crs_result = collector.get("crs_result")
    status = collector.get("status", "unknown")

    # PeekaReader self-evaluation
    self_evaluation = None
    book_intros: dict = {}
    recommendation_text = None
    if status == "success" and crs_result and agent is not None:
        messages = crs_result.get("messages", [])
        if messages:
            last = messages[-1]
            recommendation_text = last.content if hasattr(last, "content") else str(last)

        # Collect book_intro texts for the recommended titles (best effort).
        try:
            recommendations = crs_result.get("recommendations", [])
            retrieved = {
                b["isbn"]: b
                for b in crs_result.get("retrieved_books", [])
                if b.get("isbn")
            }
            for rec in recommendations:
                isbn = rec.get("isbn", "")
                title = rec.get("title", "")
                if isbn in retrieved and retrieved[isbn].get("book_intro"):
                    book_intros[title] = retrieved[isbn]["book_intro"]
        except Exception as e:
            if verbose:
                print(f" [book_intro ์ถ”์ถœ ์‹คํŒจ] {e}")

        if verbose:
            print(f" [book_intro {'๋กœ๋“œ' if book_intros else '์—†์Œ โ€” ํ‰๊ฐ€ ์Šคํ‚ต'}] "
                  f"{len(book_intros)}๊ถŒ")

        if recommendation_text:
            self_evaluation = agent.evaluate(recommendation_text, book_intros)

    return {
        "persona_id": persona_id,
        "session_id": session_id,
        "status": status,
        "response_time_sec": elapsed,
        "total_turns": agent.turn_count if agent else 0,
        "conversation": collector.get("conversation", []),
        "recommendation_text": recommendation_text,
        "retrieved_books": crs_result.get("retrieved_books", []) if crs_result else [],
        "recommendations": crs_result.get("recommendations", []) if crs_result else [],
        "self_evaluation": self_evaluation,
        "eval_mode": "evaluated" if book_intros else "skipped",
        "book_intro_loaded": len(book_intros),
        "simulated_at": datetime.now(tz=KST).isoformat(),
    }
# ----------------------------------------------
# Multi-session (one whole persona)
# ----------------------------------------------
# Numeric encoding of verdicts for W&B charts (higher = more satisfied).
# The three failure-reason verdicts all share the lowest score.
VERDICT_CODES = {
    "satisfied": 4,
    "partial": 3,
    "unsatisfied": 2,
    **dict.fromkeys(("too_hard", "genre_mismatch", "duplicate"), 1),
}
def _safe_wandb_log(data: dict, step: Optional[int] = None) -> None:
    """Log ``data`` to W&B, doing nothing when no run is active.

    Keeps callers safe when the entry point never called ``wandb.init``.

    Args:
        data: Metrics to log.
        step: Optional explicit step; omitted from the call when ``None``.
    """
    if wandb.run is None:
        return
    extra = {} if step is None else {"step": step}
    wandb.log(data, **extra)
def run_multi_session(persona_id: str,
                      full_persona: dict,
                      run_id: str,
                      create_app_fn: Callable,
                      initial_state: dict,
                      *,
                      chroma_base_dir: str = "backend/chroma_db_runs",
                      judge_model: str = JUDGE_MODEL,
                      use_judge: bool = False,
                      n_sessions: Optional[int] = None,
                      max_turns: int = MAX_TURNS,
                      verbose: bool = True) -> dict:
    """Run the sessions of one persona in order.

    For every session:
      1) ``extract_session_dna`` builds the session DNA
      2) ``run_session`` runs the CRS <-> PeekaReader ping-pong + self-eval
      3) ``judge_session`` runs the independent judge (only if ``use_judge``)
      4) ``update_long_term_memory`` updates the persona's long-term memory
      5) per-session metrics are logged to W&B (if a run is active)

    The app is created once per call with a ChromaDB path derived from
    ``run_id`` and ``persona_id``, so each persona/run gets its own store.

    Args:
        persona_id: Persona identifier.
        full_persona: Persona dict; must contain ``"sessions"`` and
            ``"long_term_memory"``. Its long-term memory is updated in place.
        run_id: Run identifier; part of the ChromaDB path.
        create_app_fn: Factory called as ``create_app_fn(chroma_db_path=...)``.
        initial_state: Initial CRS state passed to every session.
        chroma_base_dir: Base directory for per-run ChromaDB folders.
        judge_model: Model name forwarded to ``judge_session``.
        use_judge: Whether to run the judge on successful sessions.
        n_sessions: Run at most this many sessions; ``None`` runs them all.
        max_turns: Turn limit forwarded to ``run_session``.
        verbose: Print progress and the final summary.

    Returns:
        Dict with ``persona_id``, ``total_sessions``, the per-session log,
        a deep copy of the final long-term memory and a completion timestamp.
    """
    total = len(full_persona["sessions"])
    if n_sessions is not None:
        total = min(total, n_sessions)

    chroma_db_path = f"{chroma_base_dir}/{run_id}_{persona_id}"
    app = create_app_fn(chroma_db_path=chroma_db_path)

    if verbose:
        print(f"\n{'='*60}")
        print(f"๋ฉ€ํ‹ฐ์„ธ์…˜ ์‹œ์ž‘: {persona_id} ({total} ์„ธ์…˜)")
        print(f"ChromaDB: {chroma_db_path}")
        print(f"Judge model: {judge_model}")
        print(f"{'='*60}")

    sessions_log: list = []
    table_rows: list = []

    for session_spec in full_persona["sessions"][:total]:
        session_id = session_spec["session_id"]
        if verbose:
            memory = full_persona["long_term_memory"]
            print(f"\n{'โ”€'*60}")
            print(f"[์„ธ์…˜ {session_id}/{total}] {session_spec.get('preferred_genre', '')}")
            print(f" [๋ˆ„์  ์ทจํ–ฅ] {memory['derived_preferences']}")
            print(f" [์ด์ „ ์ถ”์ฒœ] {len(memory['previously_recommended'])}๊ถŒ")
            print(f"{'โ”€'*60}")

        # 1) Extract the session DNA.
        session_dna = extract_session_dna(full_persona, session_id)

        # 2) Run the session (async).
        try:
            session_result = asyncio.run(run_session(
                app=app,
                initial_state=initial_state,
                persona_id=persona_id,
                session_id=session_id,
                persona_dna=session_dna,
                max_turns=max_turns,
                verbose=verbose,
            ))
        except Exception as e:
            print(f" [์˜ค๋ฅ˜] run_session ์‹คํŒจ: {e}")
            sessions_log.append({"session_id": session_id, "status": f"error: {e}"})
            continue

        succeeded = session_result.get("status") == "success"

        # 3) Judge evaluation (optional, successful sessions only).
        judge_result: dict = {}
        if use_judge and succeeded:
            try:
                judge_result = judge_session(
                    session_result=session_result,
                    persona=session_dna,
                    stage="peekajudge",
                    model=judge_model,
                    verbose=verbose,
                )
            except Exception as e:
                print(f" [์˜ค๋ฅ˜] judge_session ์‹คํŒจ: {e}")

        # 4) Long-term-memory update.
        # latest_fb stays empty unless THIS session's update succeeded;
        # otherwise the last history entry belongs to an earlier session and
        # its counts would be logged against the wrong session.
        verdict = None
        latest_fb: dict = {}
        if succeeded:
            try:
                update_long_term_memory(
                    full_persona=full_persona,
                    session_id=session_id,
                    session_dna=session_dna,
                    session_result=session_result,
                    judge_result=judge_result,
                )
                history = full_persona["long_term_memory"]["feedback_history"]
                if history:
                    latest_fb = history[-1]
                    verdict = latest_fb["verdict"]
            except Exception as e:
                print(f" [์˜ค๋ฅ˜] update_long_term_memory ์‹คํŒจ: {e}")

        # Metrics.
        self_eval = session_result.get("self_evaluation") or {}
        self_books = self_eval.get("books_evaluated", [])
        self_match_rate = (
            sum(1 for b in self_books if b.get("match")) / len(self_books)
            if self_books else 0.0
        )
        judge_match_rate = judge_result.get("book_match_rate", 0.0)
        ltm = full_persona["long_term_memory"]

        # 5) W&B log per session.
        _safe_wandb_log({
            "match_rate/peekareader_self": self_match_rate,
            "match_rate/peekajudge": judge_match_rate,
            "verdict_code": VERDICT_CODES.get(verdict, 0),
            "verdict": verdict or "unknown",
            "n_difficulty_mismatch": len(latest_fb.get("difficulty_mismatch", [])),
            "n_genre_mismatch": len(latest_fb.get("genre_mismatch", [])),
            "n_duplicates": len(latest_fb.get("duplicates", [])),
            "derived_prefs_count": len(ltm["derived_preferences"]),
            "previously_recommended_count": len(ltm["previously_recommended"]),
            "turn_count": session_result.get("total_turns", 0),
            "response_time_sec": session_result.get("response_time_sec", 0.0),
            "status_success": 1 if succeeded else 0,
        }, step=session_id)

        # Accumulate the session log returned to the caller.
        sessions_log.append({
            "session_id": session_id,
            "preferred_genre": session_spec.get("preferred_genre", ""),
            "status": session_result.get("status", "unknown"),
            "self_match_rate": self_match_rate,
            "judge_match_rate": judge_match_rate,
            "verdict": verdict,
            "conversation": session_result.get("conversation", []),
            "recommendations": session_result.get("recommendations", []),
        })

        # Table row (logged in one go as a W&B Table after the loop).
        recs_titles = ", ".join(
            r.get("title", "") for r in session_result.get("recommendations", [])
        )
        table_rows.append([
            session_id,
            session_spec.get("preferred_genre", "")[:30],
            session_result.get("status", "unknown"),
            round(self_match_rate, 2),
            round(judge_match_rate, 2),
            verdict or "โ€”",
            recs_titles[:120],
        ])

    # Persona finished: log the W&B table once.
    if wandb.run is not None:
        table = wandb.Table(
            columns=["session_id", "preferred_genre", "status",
                     "self_match_rate", "judge_match_rate", "verdict", "recommendations"],
            data=table_rows,
        )
        wandb.log({"sessions_detail": table})

    # Summary output.
    if verbose:
        print(f"\n{'='*60}")
        print(f"๋ฉ€ํ‹ฐ์„ธ์…˜ ์™„๋ฃŒ: {persona_id}")
        print(f"{'='*60}")
        for s in sessions_log:
            # Entries for sessions where run_session raised have no match rate.
            jmr = s.get("judge_match_rate")
            rate = f"{jmr:.0%}" if jmr is not None else "N/A"
            print(f" ์„ธ์…˜ {s['session_id']:2d} | "
                  f"{s.get('preferred_genre', '')[:20]:20s} | "
                  f"judge: {rate:>5s} | verdict: {s.get('verdict') or 'โ€”'}")

    return {
        "persona_id": persona_id,
        "total_sessions": total,
        "sessions": sessions_log,
        "final_memory": copy.deepcopy(full_persona["long_term_memory"]),
        "completed_at": datetime.now(tz=KST).isoformat(),
    }