SMART-FC / backend /graph /workflow.py
Phuc-HugigFace's picture
Upload base web cloud XD
aedbe7e verified
"""
LangGraph Workflow - Orchestrator kết nối 3 agents thành pipeline.
Định nghĩa graph: START → Agent1 → Agent2 → Agent3 → END
"""
from langgraph.graph import StateGraph, START, END # type: ignore[import-untyped]
from langgraph.graph.state import CompiledStateGraph # type: ignore[import-untyped]
from graph.state import VerificationState
from agents.query_agent import QueryAgent
from agents.extractor_agent import ExtractorAgent
from agents.reasoning_agent import ReasoningAgent
from utils.logger import get_logger
logger = get_logger("Graph.Workflow")
def should_continue(state: VerificationState) -> str:
"""Quyết định có vòng lại QueryAgent hay kết thúc."""
if state.get("feedback_to_agent1"):
logger.info("[Workflow] 🔄 Feedback Loop triggered. Routing back to query_agent...")
return "query_agent"
return END
def create_workflow() -> CompiledStateGraph:
"""
Tạo LangGraph workflow với 3 nodes (3 agents).
Flow:
START → query_agent → extractor_agent → reasoning_agent
reasoning_agent → (Nếu Feedback) → query_agent
reasoning_agent → (Nếu OK) → END
Returns:
Compiled StateGraph sẵn sàng chạy
"""
# Khởi tạo agents
query_agent = QueryAgent()
extractor_agent = ExtractorAgent()
reasoning_agent = ReasoningAgent()
# Tạo graph
workflow = StateGraph(VerificationState)
# Thêm nodes (mỗi node là một agent)
workflow.add_node("query_agent", query_agent.run)
workflow.add_node("extractor_agent", extractor_agent.run)
workflow.add_node("reasoning_agent", reasoning_agent.run)
# Kết nối các nodes theo thứ tự
workflow.add_edge(START, "query_agent")
workflow.add_edge("query_agent", "extractor_agent")
workflow.add_edge("extractor_agent", "reasoning_agent")
# Kết nối có điều kiện (Conditional Edges)
workflow.add_conditional_edges(
"reasoning_agent",
should_continue,
{
"query_agent": "query_agent",
END: END
}
)
# Compile graph
app = workflow.compile()
logger.info("[Workflow] Graph compiled: START → Agent1 → Agent2 → Agent3 → END")
return app
def run_verification(claim: str) -> dict:
"""
Chạy toàn bộ verification pipeline cho một claim.
Args:
claim: Thông tin/tin tức cần kiểm chứng
Returns:
Final state dictionary chứa toàn bộ kết quả
"""
logger.info(f"[Workflow] Starting verification for: {claim[:100]}...")
app = create_workflow()
initial_state = {
"user_input": claim,
"agent_logs": [],
}
final_state = app.invoke(initial_state)
logger.info("[Workflow] Verification complete!")
return final_state
def run_verification_with_cache(claim: str) -> dict:
"""
Chạy verification pipeline CÓ Two-Stage Semantic Cache.
Flow:
1. Check cache (Vector Search + NER) → nếu HIT, trả kết quả ngay (~1-2s)
2. Nếu MISS → chạy full pipeline 3 agents (~200s)
3. Sau khi pipeline xong → lưu kết quả vào cache
Args:
claim: Thông tin/tin tức cần kiểm chứng
Returns:
Final state dictionary chứa toàn bộ kết quả
"""
import time
from database.mongo_cache import get_cache
cache = get_cache()
# --- Stage 1 + 2: Check cache ---
if cache:
try:
start_cache = time.time()
cache_result = cache.check_cache(claim)
cache_time = time.time() - start_cache
if cache_result.get("hit"):
logger.info(
f"[Workflow] ⚡ CACHE HIT! "
f"score={cache_result.get('score', 0):.4f} | "
f"time={cache_time:.2f}s | "
f"cached_query=\"{cache_result.get('cached_query', '')[:60]}...\""
)
verdict_data = cache_result["data"]
# Quick Rewrite (Groq): Sửa ngữ cảnh summary cho khớp câu hỏi của user
try:
from agents.query_agent import QueryAgent
fast_agent = QueryAgent()
original_summary = verdict_data.get("summary", "")
rewrite_prompt = (
f"Viết lại đoạn tóm tắt kết quả kiểm chứng dưới đây sao cho các từ ngữ/chủ ngữ giống trực tiếp với lời văn của tin đồn mà người dùng vừa hỏi.\n"
f"TUYỆT ĐỐI GIỮ NGUYÊN ý nghĩa, kết luận và các bằng chứng.\n"
f"KHÔNG giải thích thêm, chỉ trả về đoạn văn tóm tắt viết lại.\n\n"
f"Tin đồn của người dùng: \"{claim}\"\n"
f"Tóm tắt cũ (cần viết lại): \"{original_summary}\"\n\n"
f"Tóm tắt mới:"
)
new_summary = fast_agent.call_llm(
"Bạn là một biên tập viên tin tức cần mẫn.",
rewrite_prompt
)
new_summary = new_summary.strip().strip('"')
if new_summary:
verdict_data["summary"] = new_summary
logger.info(
f"[Workflow] Dùng Groq viết lại summary thành công "
f"({time.time() - start_cache:.2f}s)"
)
except Exception as rewrite_err:
logger.warning(f"[Workflow] Lỗi rewrite summary: {rewrite_err}")
verdict_data["cached_query"] = claim
return {
"user_input": claim,
"verdict": verdict_data,
"agent_logs": [
f"[Cache] ⚡ Cache HIT trong {cache_time:.2f}s "
f"(similarity={cache_result.get('score', 0):.4f})"
],
"from_cache": True,
}
else:
logger.info(f"[Workflow] Cache MISS ({cache_time:.2f}s) — running full pipeline")
except Exception as e:
logger.warning(f"[Workflow] Cache check error: {e}")
# --- Cache MISS: chạy full pipeline ---
final_state = run_verification(claim)
# --- Lưu kết quả vào cache ---
if cache and final_state.get("verdict"):
try:
cache.save_to_cache(claim, final_state["verdict"])
final_state.setdefault("agent_logs", []).append(
"[Cache] 💾 Đã lưu kết quả vào MongoDB Cache"
)
except Exception as e:
logger.warning(f"[Workflow] Cache save error: {e}")
return final_state