# verifacts-backend/app/services/orchestrator.py
# feat: adding verifact services with backend code and multi-agent workflow
# (commit 55086fb, author: Testys)
import logging
import asyncio
from typing import Dict, TypedDict, Annotated, List
from langchain_core.runnables import Runnable
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver # For state persistence
from redis import Redis # pip install redis
from langchain_community.cache import RedisCache
from app.services.identify.agent import SourceCredibilityAgent
from app.services.claims.agent import ClaimExtractionAgent
from app.services.fact_checker.agent import FactCheckAgent
from app.core.config import config
from app.services.shared_tools import tavily_search
from app.services.llm_wrapper import llm_wrapper
from langchain_core.prompts import ChatPromptTemplate
from app.core.models import FinalReport
from langchain_core.output_parsers import JsonOutputParser
from langgraph.checkpoint.memory import MemorySaver
# Module-level logger; set to INFO so per-node progress messages are emitted
# (a handler must still be configured by the hosting app for them to appear).
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class WorkflowState(TypedDict, total=False):
    """Shared state threaded through every node of the fact-check graph.

    Declared ``total=False`` because the state is built up incrementally:
    ``run_orchestrator`` seeds only the input keys, and later nodes add their
    outputs (previously the schema also omitted the report keys that
    ``compile_report_node`` writes, which this fixes).
    """

    # --- Inputs ---
    url: str
    selection: str
    # --- Per-node outputs ---
    credibility: Annotated[Dict, "Source credibility report"]
    claims: Annotated[List[Dict], "Extracted claims"]
    fact_checks: Annotated[List[Dict], "Fact check verdicts"]
    search_insights: Annotated[List[Dict], "Tavily search results with snippets for enrichment"]
    error: Annotated[str, "Error message, if any"]
    # --- Final report (written by compile_report_node; previously undeclared) ---
    overall_verdict: Annotated[str, "verified | debunked | mixture | unverified"]
    summary: Annotated[str, "Human-readable summary of the report"]
    sources: Annotated[List[str], "Flattened list of source URLs from search insights"]
# === Agent Nodes ===
async def credibility_node(state: WorkflowState) -> WorkflowState:
    """Assess the credibility of the source URL and gate the workflow on it.

    Stores the agent's report under ``state["credibility"]``; records an error
    in the state (rather than raising) when the URL is missing, the agent
    fails, or the reported trust level is too low to proceed.
    """
    agent = SourceCredibilityAgent()
    try:
        target_url = state.get("url")
        if not target_url:
            state["error"] = "No URL provided for credibility check"
            return state
        # agent.run() is expected to accept the URL as a plain string.
        report = await agent.run(target_url)
        state["credibility"] = report
        logger.info(f"Credibility report: {report}")
        # Low-trust sources short-circuit the rest of the pipeline.
        if report.get("trust_level", "unknown") in ("low", "very_low"):
            state["error"] = "Source credibility too low to proceed"
    except Exception as e:
        logger.error(f"Credibility check error: {str(e)}")
        state["error"] = f"Credibility check failed: {str(e)}"
    return state
async def extraction_node(state: WorkflowState) -> WorkflowState:
    """Extract claims via ClaimExtractionAgent, keeping only factual ones.

    No-op when an upstream node already recorded an error. The agent receives
    a verdict dict combining the input URL/selection with the credibility
    report's trust level and score.
    """
    if state.get("error"):
        # Propagate the failed state untouched.
        return state
    agent = ClaimExtractionAgent()
    credibility = state.get("credibility", {})
    try:
        # Assemble the verdict payload the agent expects.
        payload = {
            "url": state.get("url"),
            "selection": state.get("selection"),
            "trust_level": credibility.get("trust_level"),
            "score": credibility.get("score"),
        }
        extracted = await agent.run(payload)
        logger.info(f"Extracted {len(extracted)} claims")
        # Only factual claims are worth fact-checking downstream.
        state["claims"] = [claim.text for claim in extracted if claim.claim_type == "factual"]
    except Exception as e:
        logger.error(f"Claim extraction error: {str(e)}")
        state["error"] = f"Claim extraction failed: {str(e)}"
    return state
async def factcheck_node(state: WorkflowState) -> WorkflowState:
    """Run FactCheckAgent over each extracted claim, collecting verdicts.

    Skips entirely when an upstream error occurred or no claims were
    extracted. On failure the error is logged and recorded in the state,
    consistent with the other nodes.
    """
    if state.get("error") or not state.get("claims"):
        return state  # Skip if previous error or no claims
    agent = FactCheckAgent()
    try:
        fact_checks = []
        # Claims are checked sequentially; agent.run may not be safe to fan
        # out with asyncio.gather (rate limits / shared resources) — TODO
        # confirm before parallelizing.
        for claim in state["claims"]:
            result = await agent.run(claim)
            logger.info(f"Fact-check result for claim '{claim[:30]}...': {result}")
            fact_checks.append(result)
        state["fact_checks"] = fact_checks
    except Exception as e:
        # Bug fix: the failure was previously swallowed without any logging,
        # unlike every other node in this workflow.
        logger.error(f"Fact-checking error: {str(e)}")
        state["error"] = f"Fact-checking failed: {str(e)}"
    return state
# === NEW: Tavily Enrichment (Always runs after extraction) ===
async def search_enrichment_node(state: WorkflowState) -> WorkflowState:
    """Enrich each claim with Tavily search results (snippets and sources).

    Best-effort: a failed search for one claim is logged and skipped rather
    than aborting the workflow; ``state["search_insights"]`` always ends up
    set (possibly empty).
    """
    if state.get("error") or not state.get("claims"):
        return state
    insights = []
    for claim in state["claims"]:
        try:
            # NOTE(review): "site:reputable" is not a real domain filter —
            # confirm the intended search-operator syntax.
            query = f"fact check: {claim} site:reputable"
            # Bug fix: Runnable/BaseTool.ainvoke takes the tool input as its
            # first positional argument — ainvoke(query=..., max_results=...)
            # raises TypeError. Pass a dict so the tool's args schema maps
            # "query"/"max_results" onto its parameters.
            results = await tavily_search.ainvoke({"query": query, "max_results": 3})
            insights.append({
                "claim": claim,
                "results": results,  # Includes snippets, answers, sources
                # .get() guards against result entries without a "url" key.
                "sources": [r.get("url") for r in results if r.get("url")],
            })
        except Exception as e:
            logger.warning(f"Tavily failed for claim '{claim}': {e}")
    state["search_insights"] = insights
    return state
# === NEW: Compile Final Report ===
async def compile_report_node(state: WorkflowState) -> WorkflowState:
    """Compile the final fact-check report from the accumulated state.

    Asks the LLM to produce an overall verdict and summary (parsed against the
    FinalReport schema). On any failure, falls back to a basic non-LLM report
    so the workflow always terminates with usable output. Writes
    ``overall_verdict``, ``summary`` and ``sources`` into the state.
    """
    # LLM summarizes overall
    prompt = ChatPromptTemplate.from_template("""
You are a fact-check report compiler. Analyze the following state and generate a final report.
State:
- URL: {url}
- Source Credibility: {credibility}
- Claims Extracted: {claims}
- Fact Check Results: {fact_checks}
- Search Insights: {search_insights}
Rules for verdict:
- If most claims are verified → "verified"
- If most claims are debunked → "debunked"
- If mixed results → "mixture"
- If insufficient evidence → "unverified"
{format_instructions}
Respond ONLY with valid JSON. Do not include any markdown formatting, explanations, or text outside the JSON object.
""")
    llm = llm_wrapper.get_llm()
    output_parser = JsonOutputParser(pydantic_object=FinalReport)
    chain = prompt | llm | output_parser
    try:
        compiled = await chain.ainvoke({
            "url": state.get("url", ""),
            "credibility": state.get("credibility", {}),
            "claims": state.get("claims", []),
            "fact_checks": state.get("fact_checks", []),
            "search_insights": state.get("search_insights", []),
            "format_instructions": output_parser.get_format_instructions(),
        })
        logger.info(f"Compiled report: {compiled}")
        state["overall_verdict"] = compiled.get("overall_verdict", "unverified")
        state["summary"] = compiled.get("summary", "No summary generated")
        # Bug fix: use .get("sources", []) here as the fallback path already
        # did, so a malformed insight cannot raise KeyError on the success path.
        state["sources"] = [
            src
            for insight in state.get("search_insights", [])
            for src in insight.get("sources", [])
        ]
    except Exception as e:
        # logger.exception records the traceback, unlike plain logger.error.
        logger.exception(f"Report compilation error: {str(e)}")
        # Fallback: Create a basic report without LLM
        state["overall_verdict"] = "unverified"
        state["summary"] = f"Report compilation failed. {len(state.get('claims', []))} claims extracted, {len(state.get('fact_checks', []))} fact-checks completed."
        state["sources"] = [s for insight in state.get("search_insights", []) for s in insight.get("sources", [])]
    return state
def decide_next_step(state: WorkflowState) -> str:
    """Route after the credibility node: stop on error/low trust, else extract.

    Bug fix: the credibility report is stored flat in ``state["credibility"]``
    (``credibility_node`` reads ``report["trust_level"]`` directly), so the
    previous nested ``.get("verdict")`` lookup always yielded "unknown" and
    this gate never stopped low-trust sources.
    """
    # credibility_node records failures and low-trust verdicts in "error".
    if state.get("error"):
        return END
    trust_level = state.get("credibility", {}).get("trust_level", "unknown")
    if trust_level in ("low", "very_low"):
        return END  # Still skip if very low
    return "extraction_node"
# === Orchestrator ===
# Wire the LangGraph state machine:
#   credibility_node --(decide_next_step)--> extraction_node | END
#   extraction_node -> search_enrichment_node -> factcheck_node
#                   -> compile_report_node -> END
workflow = StateGraph(state_schema=WorkflowState)
workflow.add_node("credibility_node", credibility_node)
workflow.add_node("extraction_node", extraction_node)
workflow.add_node("search_enrichment_node", search_enrichment_node)
workflow.add_node("factcheck_node", factcheck_node)
workflow.add_node("compile_report_node", compile_report_node)
workflow.set_entry_point("credibility_node")
# decide_next_step returns END or the literal node name "extraction_node",
# so no explicit path map is required here.
workflow.add_conditional_edges(
    "credibility_node", decide_next_step
)
workflow.add_edge("extraction_node", "search_enrichment_node")
workflow.add_edge("search_enrichment_node", "factcheck_node")
workflow.add_edge("factcheck_node", "compile_report_node")
workflow.add_edge("compile_report_node", END)
# In-memory checkpointer: state is lost on process restart — presumably a
# Redis-backed checkpointer is planned (Redis is imported above); TODO confirm.
memory = MemorySaver()
graph = workflow.compile(checkpointer=memory)
async def run_orchestrator(url: str, selection: str, thread_id: str = "main") -> WorkflowState:
    """Run the full fact-checking workflow for a URL and selected text.

    Args:
        url: Page whose source credibility is assessed.
        selection: User-selected text from which claims are extracted.
        thread_id: Checkpointer thread id. Defaults to "main" for backward
            compatibility; callers needing isolated state across concurrent
            runs should pass a unique value (previously this was hard-coded,
            so all runs shared one checkpoint thread).

    Returns:
        The final WorkflowState after the graph completes.
    """
    initial_state: WorkflowState = {
        "url": url,
        "selection": selection,
        "credibility": {},
        "claims": [],
        "fact_checks": [],
        # Bug fix: search_insights was missing from the initial state even
        # though the schema declares it and compile_report_node reads it.
        "search_insights": [],
        "error": "",
    }
    final_state = await graph.ainvoke(
        initial_state, config={"configurable": {"thread_id": thread_id}}
    )
    return final_state
# Example usage
if __name__ == "__main__":
    # Bug fix: no handler was ever configured, so the logger.info/error calls
    # below produced no visible output when running this module directly.
    logging.basicConfig(level=logging.INFO)
    test_url = "https://www.nbcnews.com/politics/donald-trump/trump-cnn-warner-bros-discovery-netflix-paramount-rcna248518"
    test_selection = "Paramount initiated a hostile bid, offering shareholders $30 per share."
    # Drive the async workflow to completion from this synchronous entry point.
    result_state = asyncio.run(run_orchestrator(test_url, test_selection))
    if result_state.get("error"):
        logger.error(f"Orchestration failed: {result_state['error']}")
    else:
        logger.info(f"Orchestration completed successfully. Fact-checks: {result_state['fact_checks']}")