Spaces:
Sleeping
Sleeping
| """ | |
| LangGraph Workflow - Orchestrator kết nối 3 agents thành pipeline. | |
| Định nghĩa graph: START → Agent1 → Agent2 → Agent3 → END | |
| """ | |
| from langgraph.graph import StateGraph, START, END # type: ignore[import-untyped] | |
| from langgraph.graph.state import CompiledStateGraph # type: ignore[import-untyped] | |
| from graph.state import VerificationState | |
| from agents.query_agent import QueryAgent | |
| from agents.extractor_agent import ExtractorAgent | |
| from agents.reasoning_agent import ReasoningAgent | |
| from utils.logger import get_logger | |
| logger = get_logger("Graph.Workflow") | |
| def should_continue(state: VerificationState) -> str: | |
| """Quyết định có vòng lại QueryAgent hay kết thúc.""" | |
| if state.get("feedback_to_agent1"): | |
| logger.info("[Workflow] 🔄 Feedback Loop triggered. Routing back to query_agent...") | |
| return "query_agent" | |
| return END | |
| def create_workflow() -> CompiledStateGraph: | |
| """ | |
| Tạo LangGraph workflow với 3 nodes (3 agents). | |
| Flow: | |
| START → query_agent → extractor_agent → reasoning_agent | |
| reasoning_agent → (Nếu Feedback) → query_agent | |
| reasoning_agent → (Nếu OK) → END | |
| Returns: | |
| Compiled StateGraph sẵn sàng chạy | |
| """ | |
| # Khởi tạo agents | |
| query_agent = QueryAgent() | |
| extractor_agent = ExtractorAgent() | |
| reasoning_agent = ReasoningAgent() | |
| # Tạo graph | |
| workflow = StateGraph(VerificationState) | |
| # Thêm nodes (mỗi node là một agent) | |
| workflow.add_node("query_agent", query_agent.run) | |
| workflow.add_node("extractor_agent", extractor_agent.run) | |
| workflow.add_node("reasoning_agent", reasoning_agent.run) | |
| # Kết nối các nodes theo thứ tự | |
| workflow.add_edge(START, "query_agent") | |
| workflow.add_edge("query_agent", "extractor_agent") | |
| workflow.add_edge("extractor_agent", "reasoning_agent") | |
| # Kết nối có điều kiện (Conditional Edges) | |
| workflow.add_conditional_edges( | |
| "reasoning_agent", | |
| should_continue, | |
| { | |
| "query_agent": "query_agent", | |
| END: END | |
| } | |
| ) | |
| # Compile graph | |
| app = workflow.compile() | |
| logger.info("[Workflow] Graph compiled: START → Agent1 → Agent2 → Agent3 → END") | |
| return app | |
| def run_verification(claim: str) -> dict: | |
| """ | |
| Chạy toàn bộ verification pipeline cho một claim. | |
| Args: | |
| claim: Thông tin/tin tức cần kiểm chứng | |
| Returns: | |
| Final state dictionary chứa toàn bộ kết quả | |
| """ | |
| logger.info(f"[Workflow] Starting verification for: {claim[:100]}...") | |
| app = create_workflow() | |
| initial_state = { | |
| "user_input": claim, | |
| "agent_logs": [], | |
| } | |
| final_state = app.invoke(initial_state) | |
| logger.info("[Workflow] Verification complete!") | |
| return final_state | |
| def run_verification_with_cache(claim: str) -> dict: | |
| """ | |
| Chạy verification pipeline CÓ Two-Stage Semantic Cache. | |
| Flow: | |
| 1. Check cache (Vector Search + NER) → nếu HIT, trả kết quả ngay (~1-2s) | |
| 2. Nếu MISS → chạy full pipeline 3 agents (~200s) | |
| 3. Sau khi pipeline xong → lưu kết quả vào cache | |
| Args: | |
| claim: Thông tin/tin tức cần kiểm chứng | |
| Returns: | |
| Final state dictionary chứa toàn bộ kết quả | |
| """ | |
| import time | |
| from database.mongo_cache import get_cache | |
| cache = get_cache() | |
| # --- Stage 1 + 2: Check cache --- | |
| if cache: | |
| try: | |
| start_cache = time.time() | |
| cache_result = cache.check_cache(claim) | |
| cache_time = time.time() - start_cache | |
| if cache_result.get("hit"): | |
| logger.info( | |
| f"[Workflow] ⚡ CACHE HIT! " | |
| f"score={cache_result.get('score', 0):.4f} | " | |
| f"time={cache_time:.2f}s | " | |
| f"cached_query=\"{cache_result.get('cached_query', '')[:60]}...\"" | |
| ) | |
| verdict_data = cache_result["data"] | |
| # Quick Rewrite (Groq): Sửa ngữ cảnh summary cho khớp câu hỏi của user | |
| try: | |
| from agents.query_agent import QueryAgent | |
| fast_agent = QueryAgent() | |
| original_summary = verdict_data.get("summary", "") | |
| rewrite_prompt = ( | |
| f"Viết lại đoạn tóm tắt kết quả kiểm chứng dưới đây sao cho các từ ngữ/chủ ngữ giống trực tiếp với lời văn của tin đồn mà người dùng vừa hỏi.\n" | |
| f"TUYỆT ĐỐI GIỮ NGUYÊN ý nghĩa, kết luận và các bằng chứng.\n" | |
| f"KHÔNG giải thích thêm, chỉ trả về đoạn văn tóm tắt viết lại.\n\n" | |
| f"Tin đồn của người dùng: \"{claim}\"\n" | |
| f"Tóm tắt cũ (cần viết lại): \"{original_summary}\"\n\n" | |
| f"Tóm tắt mới:" | |
| ) | |
| new_summary = fast_agent.call_llm( | |
| "Bạn là một biên tập viên tin tức cần mẫn.", | |
| rewrite_prompt | |
| ) | |
| new_summary = new_summary.strip().strip('"') | |
| if new_summary: | |
| verdict_data["summary"] = new_summary | |
| logger.info( | |
| f"[Workflow] Dùng Groq viết lại summary thành công " | |
| f"({time.time() - start_cache:.2f}s)" | |
| ) | |
| except Exception as rewrite_err: | |
| logger.warning(f"[Workflow] Lỗi rewrite summary: {rewrite_err}") | |
| verdict_data["cached_query"] = claim | |
| return { | |
| "user_input": claim, | |
| "verdict": verdict_data, | |
| "agent_logs": [ | |
| f"[Cache] ⚡ Cache HIT trong {cache_time:.2f}s " | |
| f"(similarity={cache_result.get('score', 0):.4f})" | |
| ], | |
| "from_cache": True, | |
| } | |
| else: | |
| logger.info(f"[Workflow] Cache MISS ({cache_time:.2f}s) — running full pipeline") | |
| except Exception as e: | |
| logger.warning(f"[Workflow] Cache check error: {e}") | |
| # --- Cache MISS: chạy full pipeline --- | |
| final_state = run_verification(claim) | |
| # --- Lưu kết quả vào cache --- | |
| if cache and final_state.get("verdict"): | |
| try: | |
| cache.save_to_cache(claim, final_state["verdict"]) | |
| final_state.setdefault("agent_logs", []).append( | |
| "[Cache] 💾 Đã lưu kết quả vào MongoDB Cache" | |
| ) | |
| except Exception as e: | |
| logger.warning(f"[Workflow] Cache save error: {e}") | |
| return final_state |