Spaces:
Running
Running
| """ | |
| Multi-session simulation orchestration. | |
| Runs the N sessions of one persona in order, and for each of them: | |
| - per-session DNA extraction -> CRS <-> PeekaReader dialogue -> self/judge evaluation -> LTM update | |
| - per-persona ChromaDB isolation (factory + unique path) | |
| - per-session metric logging to W&B | |
| Main changes: | |
| - Langfuse removed, W&B introduced (orchestration layer only) | |
| - global app -> factory (create_app_fn argument) | |
| - sync app.invoke + Command -> async ainvoke + update_state (v4 pattern) | |
| - queue + threading + asyncio ping-pong | |
| - PERSONA_BANK global dependency removed -> full_persona injected explicitly | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import copy | |
| import queue | |
| import threading | |
| import time | |
| from datetime import datetime | |
| from typing import Any, Callable, Optional | |
| import wandb | |
| from langchain_core.messages import AIMessage, HumanMessage | |
| from app.config import JUDGE_MODEL, KST, MAX_TURNS | |
| from app.simulation.peeka_judge import ( | |
| judge_session, | |
| update_long_term_memory, | |
| ) | |
| from app.simulation.peeka_reader_agent import PeekaReaderAgent, extract_session_dna | |
| # ────────────────────────────────────────────── | |
| # CRS <-> PeekaReader ping-pong (based on the v4 pattern) | |
| # ────────────────────────────────────────────── | |
| def _extract_ai_responses(state: dict[str, Any]) -> list[str]: | |
| """๊ฐ์ฅ ๋ง์ง๋ง user ๋ฉ์์ง ์ดํ์ AI ์๋ต๋ค์ ์๊ฐ ์์ผ๋ก ๋ฐํํจ""" | |
| messages = state.get("messages", []) | |
| responses = [] | |
| for msg in reversed(messages): | |
| if isinstance(msg, HumanMessage) or getattr(msg, "type", None) == "human": | |
| break | |
| if isinstance(msg, AIMessage) or getattr(msg, "type", None) == "ai": | |
| responses.append(msg.content) | |
| responses.reverse() | |
| return responses | |
| async def _run_crs(app, | |
| thread_id: str, | |
| initial_state: dict, | |
| u2c: queue.Queue, | |
| c2u: queue.Queue) -> None: | |
| """CRS ๊ทธ๋ํ๋ฅผ ๋๋ฆฌ๋ฉด์ user input์ queue์์ ๋ฐ์ ์ฃผ์ ํจ""" | |
| session_config = {"configurable": {"thread_id": thread_id}} | |
| state = copy.deepcopy(initial_state) | |
| state["session_id"] = thread_id | |
| result = await app.ainvoke(state, config=session_config) | |
| while True: | |
| snapshot = app.get_state(session_config) | |
| if snapshot.next == (): | |
| c2u.put({"__done__": True, "result": snapshot.values}) | |
| break | |
| ai_responses = _extract_ai_responses(result) | |
| if ai_responses: | |
| c2u.put(ai_responses[-1]) | |
| user_input = u2c.get() | |
| if user_input is None: # timeout ์ ํธ โ CRS๋ ์ข ๋ฃ | |
| c2u.put({"__done__": True, "result": snapshot.values}) | |
| break | |
| app.update_state(session_config, {"messages": [HumanMessage(content=user_input)]}) | |
| result = await app.ainvoke(None, config=session_config) | |
def _run_user_sim(persona_id: str,
                  persona_dna: dict,
                  collector: dict,
                  u2c: queue.Queue,
                  c2u: queue.Queue,
                  max_turns: int,
                  verbose: bool) -> None:
    """PeekaReader thread body: read CRS messages from a queue and answer them.

    Results are published through ``collector`` (shared with the caller):
      * ``agent``        - the ``PeekaReaderAgent`` instance,
      * ``conversation`` - one dict per turn (``turn``, ``csr``, ``thought``, ``user``),
      * ``crs_result``   - payload of the CRS done-signal (``None`` if it never arrived),
      * ``status``       - ``"success"``, ``"timeout"`` or ``"error: <message>"``.

    Args:
        persona_id: Passed to ``PeekaReaderAgent``.
        persona_dna: Passed to ``PeekaReaderAgent``.
        collector: Mutable dict used to hand results back to the caller.
        u2c: Queue for utterances sent to the CRS; ``None`` asks the CRS to stop.
        c2u: Queue on which CRS messages and the done-signal arrive.
        max_turns: Once ``agent.turn_count`` reaches this value the session is
            stopped with status ``"timeout"``.
        verbose: Passed to ``PeekaReaderAgent``.
    """
    collector["conversation"] = []
    try:
        agent = PeekaReaderAgent(persona_id, persona_dna, verbose=verbose)
        collector["agent"] = agent

        while True:
            message = c2u.get()

            # CRS termination signal
            if isinstance(message, dict) and message.get("__done__"):
                collector["crs_result"] = message["result"]
                collector["status"] = "success"
                break

            # Turn limit exceeded
            if agent.turn_count >= max_turns:
                collector["status"] = "timeout"
                u2c.put(None)  # ask the CRS to stop
                # Collect the done-signal the CRS sends in response.
                try:
                    m = c2u.get(timeout=10)
                    if isinstance(m, dict) and m.get("__done__"):
                        collector["crs_result"] = m["result"]
                except queue.Empty:
                    collector["crs_result"] = None
                break

            # Normal reply
            ans = agent.answer(str(message))
            collector["conversation"].append({
                "turn": agent.turn_count,
                "csr": str(message),
                "thought": ans["thought"],
                "user": ans["utterance"],
            })
            u2c.put(ans["utterance"])
    except Exception as e:
        # If this thread died silently, the CRS side would wait on u2c forever.
        # Record the failure and send the stop request so the CRS can terminate.
        collector["status"] = f"error: {e}"
        u2c.put(None)
| # ────────────────────────────────────────────── | |
| # Run a single session | |
| # ────────────────────────────────────────────── | |
async def run_session(app,
                      initial_state: dict,
                      persona_id: str,
                      session_id: int,
                      persona_dna: dict,
                      *,
                      max_turns: int = MAX_TURNS,
                      verbose: bool = True) -> dict:
    """Run one persona session to completion (CRS <-> PeekaReader ping-pong).

    The PeekaReader simulator runs in a daemon thread while the CRS graph is
    driven from this coroutine; the two exchange messages over a pair of
    queues. When the session ends with status ``"success"`` and a final
    message exists, the PeekaReader self-evaluation is run as well.

    Args:
        app: Compiled LangGraph created by ``create_app_fn(...)``.
        initial_state: Initial CRS state dict (imported from the graph module).
        persona_id: Identifier, used to build the thread id.
        session_id: Identifier, used to build the thread id.
        persona_dna: Result of ``extract_session_dna()``.
        max_turns: Maximum number of PeekaReader answers (timeout safety net).
        verbose: Print progress and error messages.

    Returns:
        A dict describing the session: ids, ``status``, ``response_time_sec``,
        ``total_turns``, ``conversation``, ``recommendation_text``,
        ``retrieved_books``, ``recommendations``, ``self_evaluation``,
        ``eval_mode``, ``book_intro_loaded`` and ``simulated_at``.
    """
    u2c: queue.Queue = queue.Queue()
    c2u: queue.Queue = queue.Queue()
    collector: dict = {"status": "running"}
    thread_id = f"sim_{persona_id}_s{session_id}_{int(time.time())}"
    # Monotonic clock: the measured duration is immune to wall-clock adjustments.
    start = time.monotonic()

    t = threading.Thread(
        target=_run_user_sim,
        args=(persona_id, persona_dna, collector, u2c, c2u, max_turns, verbose),
        daemon=True,
    )
    t.start()

    crs_error = None
    try:
        await _run_crs(app, thread_id, initial_state, u2c, c2u)
    except Exception as e:
        crs_error = e
        if verbose:
            print(f"\n[์ค๋ฅ] CRS ์คํ ์คํจ: {e}")
        # The simulator thread is waiting on c2u; without a done-signal it
        # would stay blocked, the join below would burn its full timeout and
        # the thread would leak.
        c2u.put({"__done__": True, "result": None})
    finally:
        t.join(timeout=30)

    if crs_error is not None:
        # Set after the join so the simulator thread, which reports "success"
        # on any done-signal, cannot overwrite the error.
        collector["status"] = f"error: {crs_error}"

    elapsed = round(time.monotonic() - start, 2)
    agent = collector.get("agent")
    crs_result = collector.get("crs_result")
    status = collector.get("status", "unknown")

    # PeekaReader self-evaluation
    self_evaluation = None
    book_intros: dict = {}
    recommendation_text = None

    if status == "success" and crs_result and agent is not None:
        messages = crs_result.get("messages", [])
        if messages:
            # The final message of the run is taken as the recommendation text.
            last = messages[-1]
            recommendation_text = last.content if hasattr(last, "content") else str(last)

        # Map each recommended title to its book_intro, joined on ISBN.
        try:
            recommendations = crs_result.get("recommendations", [])
            retrieved = {
                b["isbn"]: b
                for b in crs_result.get("retrieved_books", [])
                if b.get("isbn")
            }
            for rec in recommendations:
                isbn = rec.get("isbn", "")
                title = rec.get("title", "")
                if isbn in retrieved and retrieved[isbn].get("book_intro"):
                    book_intros[title] = retrieved[isbn]["book_intro"]
        except Exception as e:
            # Best effort: a malformed result must not abort the session.
            if verbose:
                print(f" [book_intro ์ถ์ถ ์คํจ] {e}")

        if verbose:
            print(f" [book_intro {'๋ก๋' if book_intros else '์์ โ ํ๊ฐ ์คํต'}] "
                  f"{len(book_intros)}๊ถ")

        if recommendation_text:
            self_evaluation = agent.evaluate(recommendation_text, book_intros)

    return {
        "persona_id": persona_id,
        "session_id": session_id,
        "status": status,
        "response_time_sec": elapsed,
        "total_turns": agent.turn_count if agent else 0,
        "conversation": collector.get("conversation", []),
        "recommendation_text": recommendation_text,
        "retrieved_books": crs_result.get("retrieved_books", []) if crs_result else [],
        "recommendations": crs_result.get("recommendations", []) if crs_result else [],
        "self_evaluation": self_evaluation,
        "eval_mode": "evaluated" if book_intros else "skipped",
        "book_intro_loaded": len(book_intros),
        "simulated_at": datetime.now(tz=KST).isoformat(),
    }
| # ────────────────────────────────────────────── | |
| # Multi-session (one whole persona) | |
| # ────────────────────────────────────────────── | |
# Numeric encoding of each verdict for W&B numeric charts
# (a higher code means a more satisfied reader).
VERDICT_CODES = {
    "satisfied": 4,
    "partial": 3,
    "unsatisfied": 2,
    # The three specific failure reasons all share the lowest code.
    **dict.fromkeys(("too_hard", "genre_mismatch", "duplicate"), 1),
}
def _safe_wandb_log(data: dict, step: Optional[int] = None) -> None:
    """Forward ``data`` to ``wandb.log`` only when a W&B run is active.

    When ``wandb.run`` is ``None`` (no ``wandb.init`` was done before the
    call) this is a no-op, so the caller does not need to guard every log
    call itself. ``step`` is passed through only when it was given.
    """
    if wandb.run is not None:
        if step is None:
            wandb.log(data)
        else:
            wandb.log(data, step=step)
def run_multi_session(persona_id: str,
                      full_persona: dict,
                      run_id: str,
                      create_app_fn: Callable,
                      initial_state: dict,
                      *,
                      chroma_base_dir: str = "backend/chroma_db_runs",
                      judge_model: str = JUDGE_MODEL,
                      use_judge: bool = False,
                      n_sessions: Optional[int] = None,
                      max_turns: int = MAX_TURNS,
                      verbose: bool = True) -> dict:
    """Run the sessions of one persona in order.

    For every session:
      1) extract_session_dna      (fixed attributes + session DNA + accumulated taste)
      2) run_session              (CRS <-> PeekaReader ping-pong + self-eval)
      3) judge_session            (independent PeekaJudge evaluation, only if ``use_judge``)
      4) update_long_term_memory  (rule-based verdict + LTM accumulation)
      5) wandb.log                (per-session metrics)

    A single app is built through ``create_app_fn`` with a ChromaDB path that
    is unique to this run and persona, so ChromaDB is isolated per persona.
    ``full_persona["long_term_memory"]`` is updated in place across sessions.

    Args:
        persona_id: Persona identifier; also part of the ChromaDB path.
        full_persona: Persona dict with at least ``"sessions"`` and
            ``"long_term_memory"`` (``PERSONA_BANK[persona_id]``).
        run_id: Run identifier (usually a timestamp); keeps ChromaDB isolated
            per run even for the same persona.
        create_app_fn: Factory called as ``create_app_fn(chroma_db_path=...)``.
        initial_state: Initial CRS state imported from the graph module.
        chroma_base_dir: Directory under which the per-run ChromaDB path is built.
        judge_model: Judge model name; defaults to ``JUDGE_MODEL``.
        use_judge: Run ``judge_session`` for successful sessions.
        n_sessions: Upper bound on the number of sessions; ``None`` runs all
            of them. Negative values are treated as 0.
        max_turns: Passed to ``run_session``.
        verbose: Print progress and the final summary.

    Returns:
        Dict with ``persona_id``, ``total_sessions``, ``sessions`` (one log
        entry per session), ``final_memory`` (deep copy of the long-term
        memory) and ``completed_at``.
    """
    total = len(full_persona["sessions"])
    if n_sessions is not None:
        # Clamp at 0 so a negative limit cannot turn into a negative slice bound.
        total = max(0, min(total, n_sessions))

    chroma_db_path = f"{chroma_base_dir}/{run_id}_{persona_id}"
    app = create_app_fn(chroma_db_path=chroma_db_path)

    if verbose:
        print(f"\n{'='*60}")
        print(f"๋ฉํฐ์ธ์ ์์: {persona_id} ({total} ์ธ์ )")
        print(f"ChromaDB: {chroma_db_path}")
        print(f"Judge model: {judge_model}")
        print(f"{'='*60}")

    sessions_log: list = []
    table_rows: list = []

    for session_spec in full_persona["sessions"][:total]:
        session_id = session_spec["session_id"]

        if verbose:
            memory = full_persona["long_term_memory"]
            print(f"\n{'โ'*60}")
            print(f"[์ธ์ {session_id}/{total}] {session_spec.get('preferred_genre', '')}")
            print(f" [๋์ ์ทจํฅ] {memory['derived_preferences']}")
            print(f" [์ด์ ์ถ์ฒ] {len(memory['previously_recommended'])}๊ถ")
            print(f"{'โ'*60}")

        # 1) Extract the session DNA
        session_dna = extract_session_dna(full_persona, session_id)

        # 2) Run one session (async)
        try:
            session_result = asyncio.run(run_session(
                app=app,
                initial_state=initial_state,
                persona_id=persona_id,
                session_id=session_id,
                persona_dna=session_dna,
                max_turns=max_turns,
                verbose=verbose,
            ))
        except Exception as e:
            print(f" [์ค๋ฅ] run_session ์คํจ: {e}")
            sessions_log.append({"session_id": session_id, "status": f"error: {e}"})
            continue

        # 3) PeekaJudge evaluation
        judge_result: dict = {}
        if use_judge and session_result.get("status") == "success":
            try:
                # Guard against a judge that returns None: later code calls .get on it.
                judge_result = judge_session(
                    session_result=session_result,
                    persona=session_dna,
                    stage="peekajudge",
                    model=judge_model,
                    verbose=verbose,
                ) or {}
            except Exception as e:
                print(f" [์ค๋ฅ] judge_session ์คํจ: {e}")

        # 4) LTM update
        verdict = None
        # Feedback entry for THIS session. It stays empty unless the LTM was
        # updated just now; otherwise the previous session's feedback would be
        # logged under the current step.
        latest_fb: dict = {}
        if session_result.get("status") == "success":
            try:
                update_long_term_memory(
                    full_persona=full_persona,
                    session_id=session_id,
                    session_dna=session_dna,
                    session_result=session_result,
                    judge_result=judge_result,
                )
                history = full_persona["long_term_memory"]["feedback_history"]
                if history:
                    latest_fb = history[-1]
                    verdict = latest_fb["verdict"]
            except Exception as e:
                print(f" [์ค๋ฅ] update_long_term_memory ์คํจ: {e}")

        # Metric computation
        self_eval = session_result.get("self_evaluation") or {}
        self_books = self_eval.get("books_evaluated", [])
        self_match_rate = (
            sum(1 for b in self_books if b.get("match")) / len(self_books)
            if self_books else 0.0
        )
        judge_match_rate = judge_result.get("book_match_rate", 0.0)
        ltm = full_persona["long_term_memory"]

        # 5) W&B log per session
        _safe_wandb_log({
            "match_rate/peekareader_self": self_match_rate,
            "match_rate/peekajudge": judge_match_rate,
            "verdict_code": VERDICT_CODES.get(verdict, 0),
            "verdict": verdict or "unknown",
            "n_difficulty_mismatch": len(latest_fb.get("difficulty_mismatch", [])),
            "n_genre_mismatch": len(latest_fb.get("genre_mismatch", [])),
            "n_duplicates": len(latest_fb.get("duplicates", [])),
            "derived_prefs_count": len(ltm["derived_preferences"]),
            "previously_recommended_count": len(ltm["previously_recommended"]),
            "turn_count": session_result.get("total_turns", 0),
            "response_time_sec": session_result.get("response_time_sec", 0.0),
            "status_success": 1 if session_result.get("status") == "success" else 0,
        }, step=session_id)

        # Accumulate the session log (persisted by multi_sim_logger.py)
        sessions_log.append({
            "session_id": session_id,
            "preferred_genre": session_spec.get("preferred_genre", ""),
            "status": session_result.get("status", "unknown"),
            "self_match_rate": self_match_rate,
            "judge_match_rate": judge_match_rate,
            "verdict": verdict,
            "conversation": session_result.get("conversation", []),
            "recommendations": session_result.get("recommendations", []),
        })

        # Table row (logged to a W&B Table in one go at the end)
        recs_titles = ", ".join(
            r.get("title", "") for r in session_result.get("recommendations", [])
        )
        table_rows.append([
            session_id,
            session_spec.get("preferred_genre", "")[:30],
            session_result.get("status", "unknown"),
            round(self_match_rate, 2),
            round(judge_match_rate, 2),
            verdict or "โ",
            recs_titles[:120],
        ])

    # Persona finished -> log the W&B Table once
    if wandb.run is not None:
        table = wandb.Table(
            columns=["session_id", "preferred_genre", "status",
                     "self_match_rate", "judge_match_rate", "verdict", "recommendations"],
            data=table_rows,
        )
        wandb.log({"sessions_detail": table})

    # Summary output
    if verbose:
        print(f"\n{'='*60}")
        print(f"๋ฉํฐ์ธ์ ์๋ฃ: {persona_id}")
        print(f"{'='*60}")
        for s in sessions_log:
            # Entries of sessions that crashed in run_session carry no judge rate.
            jmr = s.get("judge_match_rate")
            rate = f"{jmr:.0%}" if jmr is not None else "N/A"
            print(f" ์ธ์ {s['session_id']:2d} | "
                  f"{s.get('preferred_genre', '')[:20]:20s} | "
                  f"judge: {rate:>5s} | verdict: {s.get('verdict') or 'โ'}")

    return {
        "persona_id": persona_id,
        "total_sessions": total,
        "sessions": sessions_log,
        "final_memory": copy.deepcopy(full_persona["long_term_memory"]),
        "completed_at": datetime.now(tz=KST).isoformat(),
    }