# NextQuest.ai / src /agents.py
# ajeet9843
# Deploy to Hugging Face Spaces
# f10fe83
import asyncio
import logging
import re
import time
from typing import List, Optional, Dict, Any
from datetime import datetime, timezone
from .prompts import SYSTEM_PROMPTS
from .models import AgentState, SearchResult, ScrapedContent, ExtractedFact, Citation, get_agent_settings
from .search import search_manager
from .scraper import scraper
from .llm import llm_factory
from .config import config as default_config
from .resilience import retry_with_backoff, search_cache
logger = logging.getLogger(__name__)
def get_llm_config_from_state(state: AgentState) -> dict:
    """Return the LLM configuration stored under ``state["metadata"]``.

    When the metadata has no ``"llm_config"`` entry (or the state has no
    metadata at all), a built-in default pointing at the NVIDIA provider
    is returned instead.
    """
    fallback_config = {
        "provider": "nvidia",
        "model": "mistralai/mistral-nemotron",
        "api_key": None,
    }
    metadata = state.get("metadata", {})
    return metadata.get("llm_config", fallback_config)
def get_deep_research_flag(state: AgentState) -> bool:
    """Return whether deep-research mode is requested for this run.

    ``state["metadata"]["deep_research"]`` takes precedence when present;
    otherwise the top-level ``state["deep_research"]`` is used, defaulting
    to ``False``.

    Returns:
        A real ``bool`` on every path. The top-level fallback is coerced
        as well, so a stored ``None`` or ``1`` no longer leaks through the
        ``-> bool`` contract.
    """
    # ``or {}`` also covers a metadata key that is present but set to None.
    metadata = state.get("metadata") or {}
    if "deep_research" in metadata:
        return bool(metadata["deep_research"])
    return bool(state.get("deep_research", False))
async def call_llm_with_retry(
    messages, llm_config: dict, temperature=0.5, max_tokens=500
):
    """Run one LLM generation through ``retry_with_backoff``.

    The client is built by ``llm_factory.create`` inside the retried
    callable, so client construction is part of every attempt (assuming
    ``retry_with_backoff`` re-invokes the callable per attempt).

    Args:
        messages: Chat messages handed to ``llm.generate``.
        llm_config: Mapping with optional ``provider``, ``model``,
            ``api_key`` and ``base_url`` entries.
        temperature: Sampling temperature forwarded to ``generate``.
        max_tokens: Token limit forwarded to ``generate``.

    Returns:
        Whatever ``llm.generate`` returns.
    """

    async def _attempt():
        # A custom endpoint is only forwarded for these two providers.
        if llm_config.get("provider") in ("huggingface", "openrouter"):
            endpoint = llm_config.get("base_url")
        else:
            endpoint = None
        client = llm_factory.create(
            llm_config.get("provider", "nvidia"),
            llm_config.get("model"),
            api_key=llm_config.get("api_key"),
            base_url=endpoint,
            timeout=120.0,
        )
        reply = await client.generate(
            messages, temperature=temperature, max_tokens=max_tokens
        )
        return reply

    outcome = await retry_with_backoff(
        _attempt,
        max_retries=2,
        base_delay=1.0,
        exponential_base=2.0,
    )
    return outcome
def extract_refined_query(response_content: str, fallback_query: str) -> str:
    """Pull a search query out of a free-form LLM response.

    Lookup order:
      1. The first quoted span, then the labelled forms
         ("**Refined Search Query:**", "Refined Query:", "Search query:"),
         matched case-insensitively. A capture shorter than 3 characters
         is ignored and the next pattern is tried.
      2. The first line of at least 10 characters that does not start with
         "=", "-", "*" or "#", with a leading "1." / "1)" numbering removed.
      3. ``fallback_query``.

    Every returned value is capped at 500 characters.
    """
    search_flags = re.IGNORECASE | re.MULTILINE
    candidate_patterns = (
        r'[""]([^""]+)[""]',
        r'\*\*Refined Search Query:\*\*\s*(.+?)(?:\n|$)',
        r'Refined Query:\s*(.+?)(?:\n|$)',
        r'Search query:\s*(.+?)(?:\n|$)',
    )
    for regex in candidate_patterns:
        found = re.search(regex, response_content, search_flags)
        if found is None:
            continue
        candidate = found.group(1).strip()[:500]
        if len(candidate) >= 3:
            return candidate

    # No labelled/quoted query: use the first substantial, non-markup line.
    for raw_line in response_content.strip().split("\n"):
        text = raw_line.strip()
        if len(text) < 10 or text.startswith(("=", "-", "*", "#")):
            continue
        without_numbering = re.sub(r'^\d+[.)]\s*', '', text)[:500]
        if without_numbering:
            return without_numbering

    return fallback_query[:500]
async def router_node(state: AgentState) -> AgentState:
    """Decide whether the query needs web research, a direct answer or clarification.

    The original query is sent to the LLM with the "router" system prompt.
    The reply is expected to contain a JSON object with ``route`` and
    ``intent`` keys; when no usable JSON object is found (missing,
    malformed, or with non-string fields) the raw reply text is scanned
    for the routing keywords instead.

    Writes ``state["next_step"]`` ("clarify", "direct" or "research") and
    ``state["query_intent"]``, and appends one line to
    ``state["reasoning_trace"]``. If the LLM call itself fails, the node
    defaults to "research" with intent "general".
    """
    # json is not imported at module level, so it stays a local import.
    # ``re`` comes from the module-level import and is no longer shadowed here.
    import json

    step_start = time.perf_counter()
    query = state["original_query"]
    llm_config = get_llm_config_from_state(state)
    logger.info(f"[ROUTER] Analyzing query: {query[:50]}...")
    messages = [
        {"role": "system", "content": SYSTEM_PROMPTS["router"]},
        {"role": "user", "content": query},
    ]
    try:
        response = await call_llm_with_retry(
            messages, llm_config, temperature=0.0, max_tokens=50
        )
        content = response.content.strip()

        # Plain-text defaults; overridden below only by a well-formed JSON object.
        decision = content.upper()
        intent = "general"
        json_match = re.search(r'\{.*\}', content.replace('\n', ''), re.DOTALL)
        if json_match:
            try:
                parsed = json.loads(json_match.group(0))
            except ValueError:
                # Malformed JSON: keep the keyword scan over the raw text
                # rather than discarding a reply that may still name a route.
                parsed = None
            if isinstance(parsed, dict):
                decision = str(parsed.get("route") or "RESEARCH").upper()
                intent = str(parsed.get("intent") or "general").lower()

        if "CLARIFY" in decision:
            state["next_step"] = "clarify"
        elif "DIRECT" in decision:
            state["next_step"] = "direct"
        else:
            state["next_step"] = "research"
        state["query_intent"] = intent
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Router: Decision - {state['next_step']}, Intent - {intent} (took {time.perf_counter() - step_start:.2f}s)"
        )
    except Exception as e:
        logger.error(f"[ROUTER] Error: {e}")
        state["next_step"] = "research"
        state["query_intent"] = "general"
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Router: Defaulting to research due to error: {e} (took {time.perf_counter() - step_start:.2f}s)"
        )
    return state
async def planner_node(state: AgentState) -> AgentState:
    """Analyze the query and create the research strategy.

    Resolves the research mode, stores the matching agent settings under
    ``state["metadata"]["agent_settings"]``, then runs two LLM calls
    concurrently: a HyDE document (``SYSTEM_PROMPTS["hyde"]``) and the
    planner prompt, whose reply is expected to contain a JSON list of
    sub-queries.

    Writes ``state["deep_research"]``, ``state["hyde_document"]`` (only
    when the HyDE call succeeds), ``state["sub_queries"]`` and
    ``state["refined_query"]``. A failure of the planner call falls back
    to the original query; a failure of the HyDE call alone is logged and
    does not discard the planner result.
    """
    # json is not imported at module level, so it stays a local import.
    import json

    step_start = time.perf_counter()
    query = state["original_query"]
    llm_config = get_llm_config_from_state(state)
    deep_research = get_deep_research_flag(state)
    state["deep_research"] = deep_research
    settings = get_agent_settings(deep_research)

    # The helpers above tolerate a missing metadata entry; create it here
    # instead of raising KeyError on the first write.
    metadata = state.get("metadata")
    if metadata is None:
        metadata = {}
        state["metadata"] = metadata
    metadata["agent_settings"] = settings

    mode_label = "Deep" if deep_research else "Quick"
    logger.info(
        f"[PLANNER] Mode: {mode_label}, "
        f"provider={llm_config.get('provider')}, model={llm_config.get('model')}"
    )
    state["reasoning_trace"].append(
        f"[{datetime.now(timezone.utc).isoformat()}] Planner: {mode_label} research mode"
    )
    messages = [
        {"role": "system", "content": SYSTEM_PROMPTS["planner"]},
        {
            "role": "user",
            "content": f"User question: {query}\n\nCreate a refined search query and brief plan.",
        },
    ]
    try:
        logger.info("[PLANNER] Generating HyDE document and calling Planner LLM concurrently...")
        # Both calls start together, so one timer covers both trace entries.
        llm_start = time.perf_counter()
        hyde_msg = [
            {"role": "system", "content": SYSTEM_PROMPTS.get("hyde", "")},
            {"role": "user", "content": query},
        ]
        # return_exceptions=True keeps one call's failure from hiding the
        # other call's result.
        hyde_res, response = await asyncio.gather(
            call_llm_with_retry(hyde_msg, llm_config, temperature=0.7, max_tokens=250),
            call_llm_with_retry(messages, llm_config, temperature=0.5, max_tokens=500),
            return_exceptions=True,
        )

        if isinstance(hyde_res, Exception):
            # HyDE is only an extra search query; continue without it.
            logger.warning(f"[PLANNER] HyDE generation failed, continuing without it: {hyde_res}")
            state["reasoning_trace"].append(
                f"[{datetime.now(timezone.utc).isoformat()}] Planner: HyDE generation failed, continuing without it"
            )
        elif isinstance(hyde_res, BaseException):
            # Cancellation and similar must not be swallowed.
            raise hyde_res
        else:
            state["hyde_document"] = hyde_res.content.strip()
            state["reasoning_trace"].append(
                f"[{datetime.now(timezone.utc).isoformat()}] Planner: Generated HyDE document for semantic expansion (took {time.perf_counter() - llm_start:.2f}s)"
            )

        if isinstance(response, BaseException):
            # Planner failure is handled by the fallback below.
            raise response

        logger.debug(f"[PLANNER] LLM response: {response.content[:200]}...")
        parsed = None
        json_match = re.search(r'\[.*\]', response.content.replace('\n', ''), re.DOTALL)
        if json_match:
            try:
                parsed = json.loads(json_match.group(0))
            except ValueError:
                # Malformed list: treated the same as "no list found".
                parsed = None
        if isinstance(parsed, list):
            # Keep string entries only, skip blank ones, cap each at 150 chars.
            state["sub_queries"] = [
                q[:150] for q in parsed if isinstance(q, str) and q.strip()
            ]
        else:
            state["sub_queries"] = [query]

        state["refined_query"] = state["sub_queries"][0] if state.get("sub_queries") else query
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Planner: Generated {len(state.get('sub_queries', []))} sub-queries (took {time.perf_counter() - llm_start:.2f}s)"
        )
    except Exception as e:
        logger.error(f"[PLANNER] Error: {e}")
        state["refined_query"] = query
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Planner: Using original query due to error: {e} (took {time.perf_counter() - step_start:.2f}s)"
        )
    return state
async def search_node(state: AgentState) -> AgentState:
    """Execute the web search with the configured provider, with caching.

    The refined query (or the original query), every planner sub-query and
    a 150-character prefix of the HyDE document are searched concurrently.
    Results are interleaved across queries and de-duplicated by URL, then
    stored as plain dicts in ``state["search_results"]`` and written to
    ``search_cache``. On a retry (``retry_count > 0``) the main query is
    extended and the cache is not consulted.

    In deep-research mode each query gets up to two attempts: the first
    result set is judged by the "search_evaluator" prompt, and a reply
    without "PASS" is used as the rewritten query for the second attempt.

    Any failure of the combined search sets ``state["error"]``.
    """
    step_start = time.perf_counter()
    query = state.get("refined_query") or state["original_query"]
    llm_config = get_llm_config_from_state(state)
    deep_research = state.get("deep_research", False)
    settings = state.get("metadata", {}).get("agent_settings", get_agent_settings(deep_research))
    agent_config = state.get("metadata", {}).get("agent_config", {})
    search_provider = agent_config.get("search_provider", "duckduckgo")
    max_results = settings.get("max_search_results", 10)
    is_retry = state.get("retry_count", 0) > 0

    if is_retry:
        # Search Query Evolution for fallback
        query = f"{query} deeper analysis and recent facts"
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Search: Evolved query for retry -> '{query}'"
        )

    logger.info(f"[SEARCH] Query: {query}, provider: {search_provider}, deep={deep_research}")
    state["reasoning_trace"].append(
        f"[{datetime.now(timezone.utc).isoformat()}] Search: Searching for '{query[:50]}...'"
    )

    cache_key = f"{search_provider}:{query}:{max_results}:{deep_research}"
    if not is_retry:
        cached = search_cache.get_sync(cache_key)
        if cached:
            state["search_results"] = cached
            state["reasoning_trace"].append(
                f"[{datetime.now(timezone.utc).isoformat()}] Search: Using cached results ({len(cached)} items) (took {time.perf_counter() - step_start:.2f}s)"
            )
            return state

    # Compile queries to run in parallel
    queries_to_run = [query]
    queries_to_run.extend(state.get("sub_queries") or [])
    hyde_document = state.get("hyde_document")
    if hyde_document:
        # Truncate HyDE document to not exceed search engine limits
        queries_to_run.append(hyde_document[:150])
    # Order-preserving dedup: set() would shuffle the queries between runs,
    # which changes how results are interleaved below.
    queries_to_run = list(dict.fromkeys(queries_to_run))

    async def _search_and_eval(current_query: str) -> list:
        """Search one query; in deep mode optionally rewrite it and search once more."""
        max_attempts = 2 if deep_research else 1
        for attempt in range(max_attempts):
            try:
                async def _do_search():
                    return await search_manager.search(
                        query=current_query, provider=search_provider, max_results=max_results
                    )

                results = await retry_with_backoff(
                    _do_search,
                    max_retries=2, base_delay=1.0, exponential_base=2.0
                )
                valid = [r for r in results if r.title and r.url and len(r.url) > 5]
            except Exception as e:
                logger.error(f"[SEARCH] Query search error for {current_query}: {e}")
                return []

            if not valid or attempt == max_attempts - 1:
                return valid

            # The evaluation step is an optional refinement: if it fails,
            # keep the results that were already fetched.
            try:
                snippet_text = "\n".join(
                    f"Source: {r.url}\nSnippet: {r.content}" for r in valid[:5]
                )
                eval_messages = [
                    {"role": "system", "content": SYSTEM_PROMPTS.get("search_evaluator", "Reply PASS if good.")},
                    {"role": "user", "content": f"Query: {current_query}\n\nResults:\n{snippet_text}"},
                ]
                eval_response = await call_llm_with_retry(
                    eval_messages, llm_config, temperature=0.2, max_tokens=50
                )
                verdict = eval_response.content.strip()
            except Exception as e:
                logger.warning(f"[SEARCH] Result evaluation failed for {current_query}, keeping results: {e}")
                return valid

            if "PASS" in verdict.upper():
                return valid
            rewritten = verdict.strip('"\'').strip()
            if not rewritten:
                # Nothing usable to search for; keep what we have.
                return valid
            current_query = rewritten
        return []

    try:
        tasks = [_search_and_eval(q) for q in queries_to_run]
        all_results_lists = await asyncio.gather(*tasks)

        # Flatten and deduplicate by URL (Interleaving for fairness across sub-queries)
        seen_urls = set()
        valid_results = []
        max_len = max((len(result_list) for result_list in all_results_lists), default=0)
        for i in range(max_len):
            for res_list in all_results_lists:
                if i < len(res_list):
                    r = res_list[i]
                    if r.url not in seen_urls:
                        seen_urls.add(r.url)
                        valid_results.append(r)

        if not valid_results:
            logger.warning("[SEARCH] No results found after attempts!")

        state["search_results"] = [
            {
                "title": r.title,
                "url": r.url,
                "content": r.content,
                "score": r.score,
                "published_date": r.published_date,
            }
            for r in valid_results
        ]
        search_cache.set_sync(cache_key, state["search_results"])
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Search: Found {len(valid_results)} valid results (took {time.perf_counter() - step_start:.2f}s)"
        )
        for r in state["search_results"][:5]:
            state["reasoning_trace"].append(f"  🔗 {r['title'][:50]}... ({r['url']})")
    except Exception as e:
        logger.error(f"[SEARCH] Error: {e}")
        state["error"] = f"Search failed: {str(e)}"
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Search: Error - {str(e)} (took {time.perf_counter() - step_start:.2f}s)"
        )
    return state
async def scrape_node(state: AgentState) -> AgentState:
    """Fetch page content for the top search results.

    The first ``max_sources + 3`` search results are handed to
    ``scraper.fetch_multiple``. For each of them, in search order, the
    scraped page is used when it has content; otherwise the search snippet
    is used as a stand-in. Collection stops once ``max_sources`` entries
    exist. The result is stored in ``state["scraped_content"]``.

    With no search results the node only records a trace line. A scraping
    error is logged and traced; ``scraped_content`` is left untouched.
    """
    started_at = time.perf_counter()
    results = state.get("search_results", [])
    is_deep = state.get("deep_research", False)
    settings = state.get("metadata", {}).get("agent_settings", get_agent_settings(is_deep))
    max_sources = settings.get("max_sources", 5)
    max_concurrent = settings.get("analyzer_concurrency", 3)
    logger.info(f"[SCRAPE] search_results count: {len(results)}")

    if not results:
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Scrape: No results to scrape (took {time.perf_counter() - started_at:.2f}s)"
        )
        return state

    # A few extra candidates beyond max_sources leave room for fetches that fail.
    candidates = results[:max_sources + 3]
    urls = [entry["url"] for entry in candidates]
    logger.info(f"[SCRAPE] Racing to fetch {max_sources} sources from top {len(urls)} URLs (max_concurrent={max_concurrent})")
    state["reasoning_trace"].append(
        f"[{datetime.now(timezone.utc).isoformat()}] Scrape: Racing to fetch {max_sources} sources from top {len(urls)} URLs"
    )

    try:
        scraped = await scraper.fetch_multiple(urls, max_concurrent=max_concurrent, min_results=max_sources)
        logger.info(f"[SCRAPE] Scraped {len(scraped)} items")
        pages_by_url = {page.url: page for page in scraped if page}
        content_cap = settings.get("max_content_for_scraping", 5000)

        collected = []
        for entry in candidates:
            url = entry["url"]
            page = pages_by_url.get(url)
            if page is not None and page.content:
                record = {
                    "url": page.url,
                    "title": page.title,
                    "content": page.content[:content_cap],
                    "excerpt": page.excerpt,
                    "author": page.author,
                    "published_date": page.published_date,
                    "fetched_at": page.fetched_at,
                    "chunks": getattr(page, "chunks", []),
                }
                collected.append(record)
            elif entry.get("content"):
                # No usable page: fall back to the search engine's snippet.
                logger.info(f"[SCRAPE] Using search snippet fallback for: {url}")
                snippet = entry["content"]
                if len(snippet) > 300:
                    excerpt = snippet[:300] + "..."
                else:
                    excerpt = snippet
                record = {
                    "url": url,
                    "title": entry.get("title", "Unknown"),
                    "content": snippet,
                    "excerpt": excerpt,
                    "author": None,
                    "published_date": entry.get("published_date"),
                    "fetched_at": datetime.now(timezone.utc).isoformat(),
                    "chunks": [snippet],
                }
                collected.append(record)
            if len(collected) >= max_sources:
                break

        state["scraped_content"] = collected
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Scrape: Successfully extracted content from {len(scraped)} sources (took {time.perf_counter() - started_at:.2f}s)"
        )
    except Exception as e:
        logger.error(f"[SCRAPE] Error: {e}")
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Scrape: Error - {str(e)} (took {time.perf_counter() - started_at:.2f}s)"
        )
    return state
async def analyze_source(
    source: dict, query: str, llm, semaphore: asyncio.Semaphore, settings: dict
) -> List[dict]:
    """Analyze a single source and extract facts from its content chunks.

    Up to four chunks are wrapped in ``<chunk>`` tags and sent to ``llm``
    with the "analyzer" system prompt. The reply is parsed line by line:

    * ``FACT | CATEGORY | CONFIDENCE`` rows (also when written as Markdown
      table rows with leading/trailing pipes) become facts with the parsed
      category and confidence (0.8 when missing or not a number);
    * other lines longer than 20 characters that do not look like a
      preamble become "general" facts with confidence 0.7;
    * if nothing was captured, the first 500 characters of the cleaned
      reply are stored as a single "general" fact.

    ``<think>...</think>`` blocks are removed before parsing so that a
    reasoning model's scratch text is not turned into facts.

    Args:
        source: Scraped-source dict; reads ``chunks``, ``content``,
            ``url`` and ``title``.
        query: The user's question.
        llm: Client exposing an async ``generate(messages, ...)``.
        semaphore: Limits how many sources are analyzed at once.
        settings: Accepted for interface compatibility; not read here.

    Returns:
        A list of fact dicts; empty when the source has no content, the
        reply is empty, or the LLM call fails.
    """
    async with semaphore:
        chunks = source.get("chunks") or []
        if not chunks and source.get("content"):
            # Sources without pre-split chunks still carry their page text.
            chunks = [source["content"]]
        if not chunks:
            logger.warning(f"[ANALYZER] No content for source: {source.get('url', 'unknown')}")
            return []

        # Format chunks with XML tags to drastically improve LLM attention ("Lost in the Middle" prevention)
        max_chunks = 4  # Increased to ensure we capture the main article body, not just headers/cookie banners
        processed_content = "\n\n".join(
            f"<chunk id={i}>\n{c}\n</chunk>" for i, c in enumerate(chunks[:max_chunks], 1)
        )
        messages = [
            {"role": "system", "content": SYSTEM_PROMPTS["analyzer"]},
            {
                "role": "user",
                "content": f"User question: {query}\n\nSource title: {source.get('title', 'Unknown')}\nSource URL: {source.get('url', '')}\n\nContent:\n{processed_content}\n\nExtract key facts highly relevant to the question. Format EXACTLY as: FACT | CATEGORY | CONFIDENCE",
            },
        ]

        source_url = source.get("url", "")
        source_title = source.get("title", "")
        preambles = (
            "here are", "sure", "these are", "extracted facts",
            "note:", "source:", "the key facts", "below are",
        )

        def _fact(text: str, category: str, confidence: float, sentence: str) -> dict:
            # Single place that defines the shape of a fact record.
            return {
                "source_url": source_url,
                "source_title": source_title,
                "fact": text,
                "category": category,
                "confidence": confidence,
                "source_sentence": sentence,
            }

        async def _generate_facts():
            return await llm.generate(messages, temperature=0.3, max_tokens=2500)

        try:
            logger.info(f"[ANALYZER] Calling LLM for {source.get('url', 'unknown')[:30]}...")
            response = await retry_with_backoff(
                _generate_facts, max_retries=3, base_delay=2.0, exponential_base=2.0
            )
            logger.debug(
                f"[ANALYZER] LLM response: {response.content[:200] if response.content else 'EMPTY'}"
            )

            # Drop reasoning-model scratch text before looking for facts.
            content = re.sub(
                r'<think>.*?</think>', '', response.content or "", flags=re.DOTALL
            ).strip()
            if not content:
                logger.warning("[ANALYZER] Empty response from LLM")
                return []

            # Remove a surrounding Markdown code fence, if any.
            content = re.sub(r'^```[\w]*\n', '', content)
            content = re.sub(r'\n```$', '', content)

            facts = []
            for raw_line in content.split("\n"):
                line = raw_line.strip().strip("-").strip("*").strip()
                # Markdown table rows start/end with a pipe; without this the
                # first cell is empty and the whole row would be discarded.
                line = line.strip("|").strip()
                if not line or "---" in line or line.upper().startswith("FACT |"):
                    continue
                if "|" in line:
                    parts = line.split("|")
                    fact_text = parts[0].strip()
                    if not fact_text or fact_text.upper() == "FACT":
                        continue
                    try:
                        confidence = float(parts[2].strip()) if len(parts) > 2 else 0.8
                    except ValueError:
                        confidence = 0.8
                    facts.append(_fact(fact_text, parts[1].strip(), confidence, fact_text))
                elif len(line) > 20 and not line.lower().startswith(preambles):
                    facts.append(_fact(line, "general", 0.7, line[:200]))

            if not facts:
                # Nothing matched either format: keep the reply as one fact.
                facts.append(_fact(content[:500], "general", 0.7, content[:200]))

            logger.info(
                f"[ANALYZER] Extracted {len(facts)} facts from {source.get('url', 'unknown')[:30]}"
            )
            return facts
        except Exception as e:
            logger.error(f"[ANALYZER] Error analyzing source: {e}")
            return []
async def analyzer_node(state: AgentState) -> AgentState:
    """Extract facts from the scraped sources, analyzing them concurrently.

    Each of the first ``max_sources_to_analyze`` sources is passed to
    ``analyze_source`` under a shared semaphore. New facts are appended to
    ``state["extracted_facts"]``. When the first pass yields fewer than
    ``min_facts_threshold`` facts (and no retry has happened yet), the node
    sets ``retry_count`` to 1 and clears the facts, scraped content and
    search results.
    """
    started_at = time.perf_counter()
    scraped_sources = state.get("scraped_content", [])
    question = state["original_query"]
    is_deep = state.get("deep_research", False)
    settings = state.get("metadata", {}).get("agent_settings", get_agent_settings(is_deep))
    previous_round = state.get("analysis_round", 0)

    if not scraped_sources:
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Analyzer: No content to analyze (took {time.perf_counter() - started_at:.2f}s)"
        )
        return state

    state["analysis_round"] = previous_round + 1
    llm_config = get_llm_config_from_state(state)
    source_limit = settings.get("max_sources_to_analyze", 5)
    batch = scraped_sources[:source_limit]
    state["reasoning_trace"].append(
        f"[{datetime.now(timezone.utc).isoformat()}] Analyzer: Round {state['analysis_round']}, analyzing {len(batch)} sources"
    )

    model = llm_config.get("model") or "deepseek-ai/deepseek-r1"
    logger.info(f"[ANALYZER] Using model: {model}, round={state['analysis_round']}")
    # A custom endpoint is only forwarded for these two providers.
    if llm_config.get("provider") in ("huggingface", "openrouter"):
        endpoint = llm_config.get("base_url")
    else:
        endpoint = None
    llm = llm_factory.create(
        llm_config.get("provider", "nvidia"),
        model,
        api_key=llm_config.get("api_key"),
        base_url=endpoint,
        timeout=120.0,
    )

    limiter = asyncio.Semaphore(settings.get("analyzer_concurrency", 3))
    pending = [
        analyze_source(item, question, llm, limiter, settings) for item in batch
    ]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    new_facts = []
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            logger.error(f"[ANALYZER] Exception in results: {outcome}")
            state["reasoning_trace"].append(f"[{datetime.now(timezone.utc).isoformat()}] Analyzer: Partial failure during extraction.")
        elif isinstance(outcome, list):
            new_facts.extend(outcome)

    combined = state.get("extracted_facts", []) + new_facts
    state["extracted_facts"] = combined
    state["reasoning_trace"].append(
        f"[{datetime.now(timezone.utc).isoformat()}] Analyzer: Extracted {len(new_facts)} facts (total: {len(combined)}) (took {time.perf_counter() - started_at:.2f}s)"
    )

    threshold = settings.get("min_facts_threshold", 3)
    already_retried = state.get("retry_count", 0) != 0
    is_first_round = previous_round == 0
    if len(new_facts) < threshold and not already_retried and is_first_round:
        logger.warning(f"[ANALYZER] Insufficient facts ({len(new_facts)} < {threshold}), triggering retry")
        # Reset the pipeline state so the search/scrape/analyze pass starts clean.
        state["retry_count"] = 1
        state["analysis_round"] = 0
        state["extracted_facts"] = []
        state["scraped_content"] = []
        state["search_results"] = []
    return state
async def ranker_node(state: AgentState) -> AgentState:
    """Rank and filter the extracted facts by relevance to the query.

    Does nothing when fewer than 10 facts exist. Otherwise the first 50
    facts are numbered and sent to the LLM with the "ranker" prompt; every
    number found in the reply selects the fact with that 1-based index.
    The selected facts replace ``state["extracted_facts"]`` in the order
    the LLM listed them (duplicates removed), so later truncation of the
    list keeps the top-ranked facts. If the reply selects nothing, or the
    call fails, the facts are left unchanged.
    """
    step_start = time.perf_counter()
    query = state["original_query"]
    facts = state.get("extracted_facts", [])
    llm_config = get_llm_config_from_state(state)

    if not facts or len(facts) < 10:
        return state

    logger.info(f"[RANKER] Ranking {len(facts)} facts...")
    state["reasoning_trace"].append(
        f"[{datetime.now(timezone.utc).isoformat()}] Ranker: Filtering {len(facts)} facts for relevance"
    )

    # Only the first 50 facts are offered to the ranker to save tokens.
    candidates = facts[:50]
    fact_texts = [f"{i}. {f['fact']}" for i, f in enumerate(candidates, 1)]
    messages = [
        {"role": "system", "content": SYSTEM_PROMPTS["ranker"]},
        {
            "role": "user",
            "content": f"Query: {query}\n\nFacts:\n" + "\n".join(fact_texts),
        },
    ]
    try:
        response = await call_llm_with_retry(
            messages, llm_config, temperature=0.0, max_tokens=50
        )
        ranked_text = response.content.strip()

        # dict.fromkeys removes duplicates while keeping the order in which
        # the model listed the indices; a set would throw the ranking away.
        ordered_indices = dict.fromkeys(
            int(token) for token in re.findall(r'\d+', ranked_text)
        )
        ranked_facts = [
            candidates[idx - 1]
            for idx in ordered_indices
            if 1 <= idx <= len(candidates)
        ]

        if ranked_facts:
            logger.info(f"[RANKER] Kept {len(ranked_facts)} relevant facts")
            state["extracted_facts"] = ranked_facts
            state["reasoning_trace"].append(
                f"[{datetime.now(timezone.utc).isoformat()}] Ranker: Reduced to {len(state['extracted_facts'])} high-relevance facts (took {time.perf_counter() - step_start:.2f}s)"
            )
    except Exception as e:
        logger.error(f"[RANKER] Error: {e}")
    return state
async def synthesizer_node(state: AgentState) -> AgentState:
    """Generate the final answer and its citation list.

    Behaviour by situation:

    * ``state["error"]`` set: the answer is an error summary and the status
      becomes "complete".
    * ``next_step == "clarify"``: the answer is a fixed request for more
      detail and the status becomes "complete".
    * ``next_step == "direct"``: the LLM answers from its own knowledge.
    * otherwise: the LLM receives the extracted facts (up to 30) and the
      scraped sources (up to ``max_sources``) and is told to cite sources
      by index.

    The system prompt is ``SYSTEM_PROMPTS["synthesizer_<intent>"]`` when it
    exists, else ``SYSTEM_PROMPTS["synthesizer"]``, extended with a strict
    or creative RAG-mode section. During a reflexion pass the latest
    critique is appended to the user message.

    Writes ``state["final_answer"]`` and ``state["citations"]``. A failed
    generation stores the error text as the answer instead of raising.
    """
    query = state["original_query"]
    facts = state.get("extracted_facts", [])
    scraped = state.get("scraped_content", [])
    error = state.get("error")
    search_results = state.get("search_results", [])
    deep_research = state.get("deep_research", False)

    if error:
        logger.error(f"[SYNTHESIZER] Error in pipeline: {error}")
        state["final_answer"] = (
            f"I encountered an error during research: {error}\n\n"
            f"Search returned {len(search_results)} results. "
            f"Scraper found {len(scraped)} articles."
        )
        state["status"] = "complete"
        return state

    if not scraped and state.get("next_step") != "direct":
        logger.warning("[SYNTHESIZER] No content to synthesize. Proceeding with internal knowledge.")
        state["reasoning_trace"].append(
            f"[{datetime.now(timezone.utc).isoformat()}] Synthesizer: No web content extracted. Falling back to internal knowledge."
        )
    state["reasoning_trace"].append(
        f"[{datetime.now(timezone.utc).isoformat()}] Synthesizer: Generating answer with {len(facts)} facts"
    )

    llm_config = get_llm_config_from_state(state)
    agent_config = state.get("metadata", {}).get("agent_config", {})
    settings = state.get("metadata", {}).get("agent_settings", get_agent_settings(deep_research))
    max_sources = settings.get("max_sources", 5)
    max_content_for_synth = settings.get("max_content_for_synthesizer", 1500)

    # ``or`` rather than a .get default: scraped fields may be present but None.
    context_parts = [
        f"<source index=\"{i}\">\nTitle: {source.get('title') or 'Unknown'}\nURL: {source.get('url', '')}\nSnippet: {(source.get('content') or '')[:max_content_for_synth]}\n</source>"
        for i, source in enumerate(scraped[:max_sources], 1)
    ]
    context = "<raw_sources>\n" + "\n".join(context_parts) + "\n</raw_sources>"

    facts_context = ""
    if facts:
        facts_list = [
            f"- {fact.get('fact', '')} (Source: {fact.get('source_title', '')})"
            for fact in facts[:30]
        ]
        facts_context = "<highly_relevant_facts>\n" + "\n".join(facts_list) + "\n</highly_relevant_facts>\n\n"

    intent = state.get("query_intent", "general")
    prompt_key = f"synthesizer_{intent}"
    base_prompt = SYSTEM_PROMPTS.get(prompt_key, SYSTEM_PROMPTS["synthesizer"])
    if agent_config.get("rag_mode") == "strict":
        base_prompt += (
            "\n\n🛡️ STRICT RAG MODE ENABLED:\n"
            "1. You MUST rely EXCLUSIVELY on the provided sources.\n"
            "2. DO NOT use your internal baseline knowledge to add new facts.\n"
            "3. Adopt an objective, clinical, and strictly data-driven tone. Act as a neutral intelligence reporter.\n"
        )
    else:
        base_prompt += (
            "\n\n🧠 CREATIVE RAG MODE ENABLED:\n"
            "1. You are actively encouraged to blend the scraped sources with your own deep expert knowledge.\n"
            "2. Add historical context, future predictions, thought leadership, and broader industry insights.\n"
            "3. Adopt an engaging, visionary, and analytical tone."
        )

    if state.get("next_step") == "clarify":
        state["final_answer"] = "Your query was too vague or ambiguous. Could you please provide more specific details about what you would like me to research?"
        state["status"] = "complete"
        state["reasoning_trace"].append(f"[{datetime.now(timezone.utc).isoformat()}] Synthesizer: Requested clarification from user")
        return state

    if state.get("next_step") == "direct":
        messages = [
            {"role": "system", "content": f"{base_prompt}\n\nAnswer the user's question directly and concisely based on your internal knowledge. Do not use citations since no web research was performed."},
            {"role": "user", "content": query},
        ]
    else:
        user_payload = (
            f"Here is the research material:\n\n"
            f"{facts_context}"
            f"{context}\n\n"
            f"User question: {query}\n\n"
            f"CRITICAL INSTRUCTIONS:\n"
            f"1. Generate a comprehensive answer to the user's question.\n"
            f"2. You MUST use the [index] from the <raw_sources> to cite your claims (e.g., [1], [2]).\n"
            f"3. Pay special attention to the <highly_relevant_facts> as they contain the highest-signal information."
        )
        messages = [
            {"role": "system", "content": base_prompt},
            {
                "role": "user",
                "content": user_payload,
            },
        ]

    if state.get("critiques") and state.get("reflexion_steps", 0) > 0:
        latest_critique = state["critiques"][-1]
        messages[1]["content"] += f"\n\nIMPORTANT: YOUR PREVIOUS ANSWER HAD THE FOLLOWING ISSUES. PLEASE FIX THEM IN THIS REVISION:\n{latest_critique}"
        logger.info("[SYNTHESIZER] Injecting critique for reflexion loop.")

    async def _generate_with_retry():
        llm = llm_factory.create(
            llm_config.get("provider", "nvidia"),
            llm_config.get("model"),
            api_key=llm_config.get("api_key"),
            base_url=llm_config.get("base_url") if llm_config.get("provider") in ("huggingface", "openrouter") else None,
            timeout=120.0,
        )
        temperature = agent_config.get("temperature", 0.7)
        max_tokens = agent_config.get("max_tokens", 4000)

        if not agent_config.get("streaming", True):
            response = await llm.generate(
                messages, temperature=temperature, max_tokens=max_tokens
            )
            return response.content

        # NOTE(review): if a streamed attempt fails midway and is retried,
        # the callback receives the already-sent chunks a second time.
        stream_cb = agent_config.get("stream_callback")
        callback_is_async = asyncio.iscoroutinefunction(stream_cb)
        pieces = []
        async for chunk in llm.stream_generate(
            messages, temperature=temperature, max_tokens=max_tokens
        ):
            pieces.append(chunk)
            if stream_cb:
                if callback_is_async:
                    await stream_cb(chunk)
                else:
                    stream_cb(chunk)
        return "".join(pieces)

    try:
        state["final_answer"] = await retry_with_backoff(
            _generate_with_retry,
            max_retries=2,
            base_delay=1.0,
            exponential_base=2.0,
        )
    except Exception as e:
        logger.error(f"[SYNTHESIZER] Error generating answer: {e}")
        state["final_answer"] = f"Error generating answer: {str(e)}"

    # Citation indices match the <source index="N"> numbering in the prompt.
    # This runs outside the try block, so None-valued fields must not raise.
    citations = [
        {
            "index": i,
            "url": source.get("url", ""),
            "title": source.get("title") or "Unknown",
            "context": (source.get("excerpt") or "")[:200],
        }
        for i, source in enumerate(scraped[:max_sources], 1)
    ]
    state["citations"] = citations
    state["reasoning_trace"].append(
        f"[{datetime.now(timezone.utc).isoformat()}] Synthesizer: Complete with {len(citations)} citations"
    )
    return state
async def verifier_node(state: AgentState) -> AgentState:
    """Verify the final answer against the extracted facts.

    With no answer or no facts the node just marks the run "complete".
    Otherwise up to 30 facts and the answer are sent to the LLM with the
    verifier prompt (the strict variant in strict RAG mode, falling back
    to the standard prompt when the strict one is not defined).

    The reply, with any ``<think>...</think>`` block removed, counts as a
    critique when it starts with "CRITIQUE" or has "CRITIQUE:" in its first
    50 characters - unless "PASS" appears in the first 30 characters and
    the reply does not start with "CRITIQUE". A critique is appended to
    ``state["critiques"]``, ``reflexion_steps`` is incremented and the
    status becomes "pending_reflexion"; anything else, including a failed
    LLM call, sets the status to "complete".
    """
    step_start = time.perf_counter()
    answer = state.get("final_answer", "")
    facts = state.get("extracted_facts", [])
    llm_config = get_llm_config_from_state(state)
    agent_config = state.get("metadata", {}).get("agent_config", {})

    if not answer or not facts:
        state["status"] = "complete"
        return state

    logger.info("[VERIFIER] Verifying answer accuracy...")
    state["reasoning_trace"].append(
        f"[{datetime.now(timezone.utc).isoformat()}] Verifier: Checking for hallucinations or inaccuracies"
    )

    fact_texts = [f"- {f['fact']}" for f in facts[:30]]
    verifier_prompt = SYSTEM_PROMPTS["verifier"]
    if agent_config.get("rag_mode") == "strict":
        # A missing strict prompt must not put None into the system message.
        verifier_prompt = SYSTEM_PROMPTS.get("verifier_strict") or verifier_prompt
    messages = [
        {"role": "system", "content": verifier_prompt},
        {
            "role": "user",
            "content": "Facts:\n" + "\n".join(fact_texts) + f"\n\nAnswer:\n{answer}",
        },
    ]
    try:
        response = await call_llm_with_retry(
            messages, llm_config, temperature=0.1, max_tokens=4000
        )
        content = response.content.strip()

        # Clean <think> tags for reasoning models that output thoughts before the final answer
        clean_content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL).strip()
        clean_upper = clean_content.upper()

        starts_with_critique = clean_upper.startswith("CRITIQUE")
        is_critique = starts_with_critique or "CRITIQUE:" in clean_upper[:50]
        # An early PASS wins over a "CRITIQUE:" that merely appears nearby.
        if "PASS" in clean_upper[:30] and not starts_with_critique:
            is_critique = False

        if is_critique:
            logger.info("[VERIFIER] Hallucinations detected. Triggering reflexion.")
            state["critiques"] = state.get("critiques", []) + [clean_content]
            state["reflexion_steps"] = state.get("reflexion_steps", 0) + 1
            state["status"] = "pending_reflexion"
            state["reasoning_trace"].append(f"[{datetime.now(timezone.utc).isoformat()}] Verifier: Critique generated. Triggering reflexion loop. (took {time.perf_counter() - step_start:.2f}s)")
        else:
            logger.info("[VERIFIER] Answer verified as accurate")
            state["status"] = "complete"
            state["reasoning_trace"].append(f"[{datetime.now(timezone.utc).isoformat()}] Verifier: Answer verified against source facts (took {time.perf_counter() - step_start:.2f}s)")
    except Exception as e:
        # Verification is best-effort: on failure the answer is kept as is.
        logger.error(f"[VERIFIER] Error: {e}")
        state["status"] = "complete"
    return state