"""Node coroutines for an LLM-driven web research pipeline.

Each ``*_node`` coroutine receives the shared ``AgentState`` mapping, mutates
it in place and returns it.  The nodes defined here are: router, planner,
search, scrape, analyzer, ranker, synthesizer and verifier.  How they are
wired together is decided elsewhere; this module only reads and writes the
state keys each node needs.
"""

import asyncio
import inspect
import json
import logging
import re
import time
from typing import List, Optional, Dict, Any, Tuple
from datetime import datetime, timezone

from .prompts import SYSTEM_PROMPTS
from .models import (
    AgentState,
    SearchResult,
    ScrapedContent,
    ExtractedFact,
    Citation,
    get_agent_settings,
)
from .search import search_manager
from .scraper import scraper
from .llm import llm_factory
from .config import config as default_config
from .resilience import retry_with_backoff, search_cache

logger = logging.getLogger(__name__)

# Providers for which a caller-supplied ``base_url`` is forwarded to the factory.
_BASE_URL_PROVIDERS = ("huggingface", "openrouter")

# Timeout (seconds) passed to every LLM client created by this module.
_LLM_TIMEOUT = 120.0

# Number of leading content chunks of a source that are sent to the analyzer.
_MAX_CHUNKS_PER_SOURCE = 4

# Matches a complete reasoning block emitted by "thinking" models.
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE)

# Lower-cased line prefixes that mark LLM chit-chat rather than a fact.
_FACT_PREAMBLE_PREFIXES = (
    "here are",
    "sure",
    "these are",
    "extracted facts",
    "note:",
    "source:",
    "the key facts",
    "below are",
)


def _trace(state: AgentState, message: str, since: Optional[float] = None) -> None:
    """Append a UTC-timestamped entry to ``state["reasoning_trace"]``.

    Args:
        state: Agent state; its ``reasoning_trace`` list must already exist.
        message: Text of the entry (without the timestamp prefix).
        since: Optional ``time.perf_counter()`` reading.  When given, the
            elapsed time is appended as `` (took N.NNs)``.
    """
    if since is not None:
        message = f"{message} (took {time.perf_counter() - since:.2f}s)"
    state["reasoning_trace"].append(
        f"[{datetime.now(timezone.utc).isoformat()}] {message}"
    )


def _create_llm(llm_config: dict, model: Optional[str] = None):
    """Build an LLM client from *llm_config* via ``llm_factory.create``.

    Args:
        llm_config: Mapping with optional ``provider``, ``model``, ``api_key``
            and ``base_url`` keys.
        model: Model name overriding ``llm_config["model"]`` when not ``None``.

    Returns:
        Whatever ``llm_factory.create`` returns.
    """
    provider = llm_config.get("provider")
    # ``base_url`` is only honoured for providers that need a custom endpoint.
    base_url = llm_config.get("base_url") if provider in _BASE_URL_PROVIDERS else None
    return llm_factory.create(
        llm_config.get("provider", "nvidia"),
        model if model is not None else llm_config.get("model"),
        api_key=llm_config.get("api_key"),
        base_url=base_url,
        timeout=_LLM_TIMEOUT,
    )


def _strip_reasoning(text: str) -> str:
    """Remove complete ``<think>...</think>`` blocks from *text*."""
    return _THINK_BLOCK_RE.sub("", text)


def _settings_from_state(state: AgentState, deep_research: bool) -> dict:
    """Return ``metadata["agent_settings"]`` or the defaults for the mode."""
    return state.get("metadata", {}).get(
        "agent_settings", get_agent_settings(deep_research)
    )


def get_llm_config_from_state(state: AgentState) -> dict:
    """Return ``metadata["llm_config"]`` from *state*, or a default config.

    The default selects provider ``nvidia`` with model
    ``mistralai/mistral-nemotron`` and no API key.
    """
    return state.get("metadata", {}).get(
        "llm_config",
        {
            "provider": "nvidia",
            "model": "mistralai/mistral-nemotron",
            "api_key": None,
        },
    )


def get_deep_research_flag(state: AgentState) -> bool:
    """Return the deep-research flag.

    ``metadata["deep_research"]`` takes precedence when present; otherwise the
    top-level ``deep_research`` key is used (default ``False``).
    """
    metadata = state.get("metadata", {})
    if "deep_research" in metadata:
        return bool(metadata["deep_research"])
    return state.get("deep_research", False)


async def call_llm_with_retry(
    messages, llm_config: dict, temperature=0.5, max_tokens=500
):
    """Call ``llm.generate`` through ``retry_with_backoff``.

    A fresh client is created for every attempt.

    Args:
        messages: Chat messages passed to ``llm.generate``.
        llm_config: Provider/model/credentials mapping (see ``_create_llm``).
        temperature: Sampling temperature forwarded to the model.
        max_tokens: Completion token limit forwarded to the model.

    Returns:
        The value returned by ``llm.generate``.

    Raises:
        Whatever ``retry_with_backoff`` raises once its retries are exhausted.
    """

    async def _call():
        llm = _create_llm(llm_config)
        return await llm.generate(
            messages, temperature=temperature, max_tokens=max_tokens
        )

    return await retry_with_backoff(
        _call,
        max_retries=2,
        base_delay=1.0,
        exponential_base=2.0,
    )


def extract_refined_query(response_content: str, fallback_query: str) -> str:
    """Pull a search query out of free-form LLM output.

    Tried in order: the first quoted span (straight or curly double quotes),
    then a ``**Refined Search Query:**`` / ``Refined Query:`` /
    ``Search query:`` label, then the first line of at least 10 characters
    that does not start with ``=``, ``-``, ``*`` or ``#`` (leading list
    numbering is removed).  Every result is capped at 500 characters.

    Args:
        response_content: Raw text returned by the model.
        fallback_query: Returned (truncated to 500 chars) if nothing matches.

    Returns:
        The extracted query, or the truncated fallback.
    """
    patterns = [
        # Straight and typographic double quotes are both accepted.
        r'["\u201c\u201d]([^"\u201c\u201d]+)["\u201c\u201d]',
        r'\*\*Refined Search Query:\*\*\s*(.+?)(?:\n|$)',
        r'Refined Query:\s*(.+?)(?:\n|$)',
        r'Search query:\s*(.+?)(?:\n|$)',
    ]

    for pattern in patterns:
        match = re.search(pattern, response_content, re.IGNORECASE | re.MULTILINE)
        if match:
            query = match.group(1).strip()[:500]
            if len(query) >= 3:
                return query

    for raw_line in response_content.strip().split("\n"):
        line = raw_line.strip()
        if len(line) >= 10 and not line.startswith(("=", "-", "*", "#")):
            cleaned = re.sub(r'^\d+[.)]\s*', '', line)[:500]
            if cleaned:
                return cleaned

    return fallback_query[:500]


def _parse_router_decision(content: str) -> Tuple[str, str]:
    """Translate the router model's reply into ``(next_step, intent)``.

    A JSON object with ``route`` / ``intent`` keys is preferred.  If no object
    is found, or it cannot be decoded, the raw reply is scanned for the
    keywords instead and the intent defaults to ``"general"``.

    Returns:
        ``next_step`` is one of ``"clarify"``, ``"direct"`` or ``"research"``.
    """
    decision = content.upper()
    intent = "general"

    json_match = re.search(r'\{.*\}', content.replace('\n', ''), re.DOTALL)
    if json_match:
        try:
            parsed = json.loads(json_match.group(0))
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            # ``or`` guards against explicit nulls / non-string values.
            decision = str(parsed.get("route") or "RESEARCH").upper()
            intent = str(parsed.get("intent") or "general").lower()

    if "CLARIFY" in decision:
        return "clarify", intent
    if "DIRECT" in decision:
        return "direct", intent
    return "research", intent


async def router_node(state: AgentState) -> AgentState:
    """Decides if the query needs web research or a direct answer.

    Writes ``next_step`` (``clarify`` / ``direct`` / ``research``) and
    ``query_intent``.  Any failure defaults to ``research`` / ``general``.
    """
    step_start = time.perf_counter()
    query = state["original_query"]
    llm_config = get_llm_config_from_state(state)

    logger.info(f"[ROUTER] Analyzing query: {query[:50]}...")

    messages = [
        {"role": "system", "content": SYSTEM_PROMPTS["router"]},
        {"role": "user", "content": query},
    ]

    try:
        response = await call_llm_with_retry(
            messages, llm_config, temperature=0.0, max_tokens=50
        )
        next_step, intent = _parse_router_decision(response.content.strip())
        state["next_step"] = next_step
        state["query_intent"] = intent
        _trace(
            state,
            f"Router: Decision - {next_step}, Intent - {intent}",
            since=step_start,
        )
    except Exception as e:
        logger.error(f"[ROUTER] Error: {e}")
        state["next_step"] = "research"
        state["query_intent"] = "general"
        _trace(
            state,
            f"Router: Defaulting to research due to error: {e}",
            since=step_start,
        )

    return state


async def planner_node(state: AgentState) -> AgentState:
    """Analyzes query and creates research strategy.

    Resolves the research mode and its settings, then concurrently asks the
    model for a HyDE document and for a JSON list of sub-queries.  Writes
    ``deep_research``, ``metadata["agent_settings"]``, ``hyde_document``,
    ``sub_queries`` and ``refined_query``.  If the planner call fails the
    original query becomes the refined query.
    """
    step_start = time.perf_counter()
    query = state["original_query"]
    llm_config = get_llm_config_from_state(state)

    deep_research = get_deep_research_flag(state)
    state["deep_research"] = deep_research
    settings = get_agent_settings(deep_research)
    state["metadata"]["agent_settings"] = settings

    mode = "Deep" if deep_research else "Quick"
    logger.info(
        f"[PLANNER] Mode: {mode}, "
        f"provider={llm_config.get('provider')}, model={llm_config.get('model')}"
    )
    _trace(state, f"Planner: {mode} research mode")

    messages = [
        {"role": "system", "content": SYSTEM_PROMPTS["planner"]},
        {
            "role": "user",
            "content": f"User question: {query}\n\nCreate a refined search query and brief plan.",
        },
    ]
    hyde_msg = [
        {"role": "system", "content": SYSTEM_PROMPTS.get("hyde", "")},
        {"role": "user", "content": query},
    ]

    try:
        logger.info("[PLANNER] Generating HyDE document and calling Planner LLM concurrently...")
        llm_start = time.perf_counter()

        # return_exceptions=True keeps an optional HyDE failure from throwing
        # away a planner response that succeeded.
        hyde_res, response = await asyncio.gather(
            call_llm_with_retry(hyde_msg, llm_config, temperature=0.7, max_tokens=250),
            call_llm_with_retry(messages, llm_config, temperature=0.5, max_tokens=500),
            return_exceptions=True,
        )

        if isinstance(hyde_res, Exception):
            logger.warning(f"[PLANNER] HyDE generation failed, continuing without it: {hyde_res}")
        elif isinstance(hyde_res, BaseException):
            raise hyde_res
        else:
            state["hyde_document"] = hyde_res.content.strip()
            _trace(
                state,
                "Planner: Generated HyDE document for semantic expansion",
                since=llm_start,
            )

        if isinstance(response, BaseException):
            raise response

        logger.debug(f"[PLANNER] LLM response: {response.content[:200]}...")

        json_match = re.search(r'\[.*\]', response.content.replace('\n', ''), re.DOTALL)
        if json_match:
            parsed = json.loads(json_match.group(0))
            state["sub_queries"] = [
                q.strip()[:150] for q in parsed if isinstance(q, str) and q.strip()
            ]
        else:
            state["sub_queries"] = [query]

        sub_queries = state.get("sub_queries")
        state["refined_query"] = sub_queries[0] if sub_queries else query

        _trace(
            state,
            f"Planner: Generated {len(state.get('sub_queries', []))} sub-queries",
            since=llm_start,
        )
    except Exception as e:
        logger.error(f"[PLANNER] Error: {e}")
        state["refined_query"] = query
        _trace(
            state,
            f"Planner: Using original query due to error: {e}",
            since=step_start,
        )

    return state


async def search_node(state: AgentState) -> AgentState:
    """Executes web search using configured provider with caching.

    Runs the refined query, the planner sub-queries and a truncated HyDE
    document concurrently, interleaves and de-duplicates the hits by URL and
    stores them as plain dicts in ``search_results``.  On a retry
    (``retry_count > 0``) the main query is extended and the cache is not
    consulted.  An unexpected failure is recorded in ``state["error"]``.
    """
    step_start = time.perf_counter()
    query = state.get("refined_query") or state["original_query"]
    llm_config = get_llm_config_from_state(state)

    deep_research = state.get("deep_research", False)
    settings = _settings_from_state(state, deep_research)
    agent_config = state.get("metadata", {}).get("agent_config", {})
    search_provider = agent_config.get("search_provider", "duckduckgo")
    max_results = settings.get("max_search_results", 10)

    is_retry = state.get("retry_count", 0) > 0
    if is_retry:
        # Search Query Evolution for fallback
        query = f"{query} deeper analysis and recent facts"
        _trace(state, f"Search: Evolved query for retry -> '{query}'")

    logger.info(f"[SEARCH] Query: {query}, provider: {search_provider}, deep={deep_research}")
    _trace(state, f"Search: Searching for '{query[:50]}...'")

    cache_key = f"{search_provider}:{query}:{max_results}:{deep_research}"

    if not is_retry:
        cached = search_cache.get_sync(cache_key)
        if cached:
            state["search_results"] = cached
            _trace(
                state,
                f"Search: Using cached results ({len(cached)} items)",
                since=step_start,
            )
            return state

    # Compile queries to run in parallel
    queries_to_run = [query]
    if state.get("sub_queries"):
        queries_to_run.extend(state["sub_queries"])
    if state.get("hyde_document"):
        # Truncate HyDE document to not exceed search engine limits
        queries_to_run.append(state["hyde_document"][:150])

    # Order-preserving de-duplication: a set would shuffle the queries and
    # make the interleaved result order differ between runs.
    queries_to_run = list(dict.fromkeys(queries_to_run))

    async def _search_and_eval(current_query: str) -> list:
        """Search once (twice in deep mode, with an LLM-refined query)."""
        max_attempts = 2 if deep_research else 1
        best: list = []  # last non-empty result set, returned on any failure

        for attempt in range(max_attempts):
            try:
                async def _do_search():
                    return await search_manager.search(
                        query=current_query,
                        provider=search_provider,
                        max_results=max_results,
                    )

                results = await retry_with_backoff(
                    _do_search, max_retries=2, base_delay=1.0, exponential_base=2.0
                )
                valid = [r for r in results if r.title and r.url and len(r.url) > 5]
                if valid:
                    best = valid

                if not valid or attempt == max_attempts - 1:
                    return best

                snippet_text = "\n".join(
                    f"Source: {r.url}\nSnippet: {r.content}" for r in valid[:5]
                )
                eval_messages = [
                    {
                        "role": "system",
                        "content": SYSTEM_PROMPTS.get("search_evaluator", "Reply PASS if good."),
                    },
                    {
                        "role": "user",
                        "content": f"Query: {current_query}\n\nResults:\n{snippet_text}",
                    },
                ]
                eval_response = await call_llm_with_retry(
                    eval_messages, llm_config, temperature=0.2, max_tokens=50
                )
                if "PASS" in eval_response.content.upper():
                    return valid

                # Anything other than PASS is treated as a replacement query.
                refined = eval_response.content.strip().strip('"\'').strip()
                if not refined:
                    return best
                current_query = refined
            except Exception as e:
                logger.error(f"[SEARCH] Query search error for {current_query}: {e}")
                # Keep whatever an earlier attempt already found.
                return best

        return best

    try:
        all_results_lists = await asyncio.gather(
            *(_search_and_eval(q) for q in queries_to_run)
        )

        # Flatten and deduplicate by URL (Interleaving for fairness across sub-queries)
        seen_urls = set()
        valid_results = []
        max_len = max((len(hits) for hits in all_results_lists), default=0)
        for rank in range(max_len):
            for hits in all_results_lists:
                if rank >= len(hits):
                    continue
                hit = hits[rank]
                if hit.url not in seen_urls:
                    seen_urls.add(hit.url)
                    valid_results.append(hit)

        if not valid_results:
            logger.warning("[SEARCH] No results found after attempts!")

        state["search_results"] = [
            {
                "title": r.title,
                "url": r.url,
                "content": r.content,
                "score": r.score,
                "published_date": r.published_date,
            }
            for r in valid_results
        ]

        search_cache.set_sync(cache_key, state["search_results"])

        _trace(
            state,
            f"Search: Found {len(valid_results)} valid results",
            since=step_start,
        )
        for r in state["search_results"][:5]:
            state["reasoning_trace"].append(f" 🔗 {r['title'][:50]}... ({r['url']})")
    except Exception as e:
        logger.error(f"[SEARCH] Error: {e}")
        state["error"] = f"Search failed: {str(e)}"
        _trace(state, f"Search: Error - {str(e)}", since=step_start)

    return state


async def scrape_node(state: AgentState) -> AgentState:
    """Fetches and extracts content from top search results.

    Fetches the first ``max_sources + 3`` result URLs, then walks them in
    search order keeping scraped content where available and the search
    snippet otherwise, until ``max_sources`` entries are collected.  The
    result is stored in ``scraped_content``.
    """
    step_start = time.perf_counter()
    search_results = state.get("search_results", [])
    deep_research = state.get("deep_research", False)
    settings = _settings_from_state(state, deep_research)
    max_sources = settings.get("max_sources", 5)
    max_concurrent = settings.get("analyzer_concurrency", 3)

    logger.info(f"[SCRAPE] search_results count: {len(search_results)}")

    if not search_results:
        _trace(state, "Scrape: No results to scrape", since=step_start)
        return state

    # Only race the top (max_sources + 3) URLs to preserve relevance while allowing for some timeouts
    top_results = search_results[:max_sources + 3]
    urls = [r["url"] for r in top_results]

    logger.info(
        f"[SCRAPE] Racing to fetch {max_sources} sources from top {len(urls)} URLs "
        f"(max_concurrent={max_concurrent})"
    )
    _trace(
        state,
        f"Scrape: Racing to fetch {max_sources} sources from top {len(urls)} URLs",
    )

    try:
        scraped = await scraper.fetch_multiple(
            urls, max_concurrent=max_concurrent, min_results=max_sources
        )
        logger.info(f"[SCRAPE] Scraped {len(scraped)} items")

        scraped_dict = {s.url: s for s in scraped if s}
        max_scraped_content = settings.get("max_content_for_scraping", 5000)

        final_content = []
        for r in top_results:
            url = r["url"]
            page = scraped_dict.get(url)

            if page is not None and page.content:
                final_content.append(
                    {
                        "url": page.url,
                        "title": page.title,
                        "content": page.content[:max_scraped_content],
                        "excerpt": page.excerpt,
                        "author": page.author,
                        "published_date": page.published_date,
                        "fetched_at": page.fetched_at,
                        "chunks": getattr(page, "chunks", []),
                    }
                )
            elif r.get("content"):
                logger.info(f"[SCRAPE] Using search snippet fallback for: {url}")
                snippet = r["content"]
                final_content.append(
                    {
                        "url": url,
                        "title": r.get("title", "Unknown"),
                        "content": snippet,
                        "excerpt": snippet[:300] + "..." if len(snippet) > 300 else snippet,
                        "author": None,
                        "published_date": r.get("published_date"),
                        "fetched_at": datetime.now(timezone.utc).isoformat(),
                        "chunks": [snippet],
                    }
                )

            if len(final_content) >= max_sources:
                break

        state["scraped_content"] = final_content
        # Report what was actually stored; the raw fetch list may contain
        # failed entries and excludes snippet fallbacks.
        _trace(
            state,
            f"Scrape: Successfully extracted content from {len(final_content)} sources",
            since=step_start,
        )
    except Exception as e:
        logger.error(f"[SCRAPE] Error: {e}")
        _trace(state, f"Scrape: Error - {str(e)}", since=step_start)

    return state


def _parse_facts(content: str, source: dict) -> List[dict]:
    """Parse ``FACT | CATEGORY | CONFIDENCE`` lines into fact dicts.

    Args:
        content: Stripped model output (reasoning blocks already removed).
        source: Source dict; its ``url`` and ``title`` are copied to each fact.

    Returns:
        One dict per recognised line.  Pipe-less lines longer than 20
        characters that are not chit-chat become ``general`` facts with
        confidence 0.7.  If nothing is recognised the first 500 characters of
        *content* are returned as a single fact.
    """
    url = source.get("url", "")
    title = source.get("title", "")

    def _fact(text: str, category: str, confidence: float, sentence: str) -> dict:
        return {
            "source_url": url,
            "source_title": title,
            "fact": text,
            "category": category,
            "confidence": confidence,
            "source_sentence": sentence,
        }

    # Drop a surrounding Markdown code fence, if any.
    body = re.sub(r'^```[\w]*\n', '', content)
    body = re.sub(r'\n```$', '', body)

    facts = []
    for raw_line in body.split("\n"):
        line = raw_line.strip().strip("-").strip("*").strip()
        # Skip blanks, table rules and the header row.
        if not line or "---" in line or line.upper().startswith("FACT |"):
            continue

        if "|" in line:
            parts = line.split("|")
            fact_text = parts[0].strip()
            if not fact_text or fact_text.upper() == "FACT":
                continue
            try:
                confidence = float(parts[2].strip()) if len(parts) > 2 else 0.8
            except ValueError:
                confidence = 0.8
            facts.append(_fact(fact_text, parts[1].strip(), confidence, fact_text))
        elif len(line) > 20 and not line.lower().startswith(_FACT_PREAMBLE_PREFIXES):
            facts.append(_fact(line, "general", 0.7, line[:200]))

    if not facts and content:
        facts.append(_fact(content[:500], "general", 0.7, content[:200]))

    return facts


async def analyze_source(
    source: dict, query: str, llm, semaphore: asyncio.Semaphore, settings: dict
) -> List[dict]:
    """Analyze a single source and extract facts using dynamic chunks.

    Args:
        source: Scraped source dict; ``chunks``, ``title`` and ``url`` are read.
        query: The user's question.
        llm: Client exposing an awaitable ``generate(messages, ...)``.
        semaphore: Limits how many sources are analysed at the same time.
        settings: Agent settings (currently not read by this function).

    Returns:
        A list of fact dicts; empty when the source has no chunks, the model
        returns nothing usable, or any error occurs.
    """
    async with semaphore:
        chunks = source.get("chunks", [])

        if not chunks:
            logger.warning(f"[ANALYZER] No content for source: {source.get('url', 'unknown')}")
            return []

        # Format chunks with XML tags to drastically improve LLM attention
        # ("Lost in the Middle" prevention).  Only the leading chunks are sent
        # so the main article body is captured, not just headers/cookie banners.
        processed_content = "\n\n".join(
            f'<chunk index="{i}">\n{c}\n</chunk>'
            for i, c in enumerate(chunks[:_MAX_CHUNKS_PER_SOURCE], 1)
        )

        messages = [
            {"role": "system", "content": SYSTEM_PROMPTS["analyzer"]},
            {
                "role": "user",
                "content": f"User question: {query}\n\nSource title: {source.get('title', 'Unknown')}\nSource URL: {source.get('url', '')}\n\nContent:\n{processed_content}\n\nExtract key facts highly relevant to the question. Format EXACTLY as: FACT | CATEGORY | CONFIDENCE",
            },
        ]

        async def _generate_facts():
            return await llm.generate(messages, temperature=0.3, max_tokens=2500)

        try:
            logger.info(f"[ANALYZER] Calling LLM for {source.get('url', 'unknown')[:30]}...")
            response = await retry_with_backoff(
                _generate_facts, max_retries=3, base_delay=2.0, exponential_base=2.0
            )

            logger.debug(
                f"[ANALYZER] LLM response: {response.content[:200] if response.content else 'EMPTY'}"
            )

            # Reasoning models may prepend a thought block; it must not be
            # mistaken for facts.
            content = _strip_reasoning(response.content or "").strip()
            if not content:
                logger.warning(f"[ANALYZER] Empty response from LLM")
                return []

            facts = _parse_facts(content, source)

            logger.info(
                f"[ANALYZER] Extracted {len(facts)} facts from {source.get('url', 'unknown')[:30]}"
            )
            return facts
        except Exception as e:
            logger.error(f"[ANALYZER] Error analyzing source: {e}")
            return []


async def analyzer_node(state: AgentState) -> AgentState:
    """Extracts key facts from scraped content with parallel processing.

    Appends the new facts to ``extracted_facts`` and increments
    ``analysis_round``.  On the first round, if fewer than
    ``min_facts_threshold`` facts were found and no retry has happened yet,
    ``retry_count`` is set to 1 and the search/scrape/fact state is cleared.
    """
    step_start = time.perf_counter()
    scraped = state.get("scraped_content", [])
    query = state["original_query"]
    deep_research = state.get("deep_research", False)
    settings = _settings_from_state(state, deep_research)
    analysis_round = state.get("analysis_round", 0)

    if not scraped:
        _trace(state, "Analyzer: No content to analyze", since=step_start)
        return state

    state["analysis_round"] = analysis_round + 1

    llm_config = get_llm_config_from_state(state)
    max_sources_to_analyze = settings.get("max_sources_to_analyze", 5)
    sources_to_analyze = scraped[:max_sources_to_analyze]

    _trace(
        state,
        f"Analyzer: Round {state['analysis_round']}, analyzing {len(sources_to_analyze)} sources",
    )

    model = llm_config.get("model") or "deepseek-ai/deepseek-r1"
    logger.info(f"[ANALYZER] Using model: {model}, round={state['analysis_round']}")

    llm = _create_llm(llm_config, model=model)

    semaphore = asyncio.Semaphore(settings.get("analyzer_concurrency", 3))
    results = await asyncio.gather(
        *(
            analyze_source(source, query, llm, semaphore, settings)
            for source in sources_to_analyze
        ),
        return_exceptions=True,
    )

    facts = []
    for result in results:
        if isinstance(result, Exception):
            logger.error(f"[ANALYZER] Exception in results: {result}")
            _trace(state, "Analyzer: Partial failure during extraction.")
        elif isinstance(result, list):
            facts.extend(result)

    all_facts = state.get("extracted_facts", []) + facts
    state["extracted_facts"] = all_facts

    _trace(
        state,
        f"Analyzer: Extracted {len(facts)} facts (total: {len(all_facts)})",
        since=step_start,
    )

    min_facts_threshold = settings.get("min_facts_threshold", 3)
    retry_count = state.get("retry_count", 0)

    if len(facts) < min_facts_threshold and retry_count == 0 and analysis_round == 0:
        logger.warning(
            f"[ANALYZER] Insufficient facts ({len(facts)} < {min_facts_threshold}), triggering retry"
        )
        # Reset everything derived from the first search so it can be redone.
        state["retry_count"] = 1
        state["analysis_round"] = 0
        state["extracted_facts"] = []
        state["scraped_content"] = []
        state["search_results"] = []

    return state


async def ranker_node(state: AgentState) -> AgentState:
    """Ranks and filters extracted facts based on relevance to the query.

    Does nothing when fewer than 10 facts exist.  Otherwise the first 50
    facts are numbered and sent to the model; the numbers found in its reply
    select the facts to keep, in the order the model listed them.  If no
    valid number is returned the facts are left untouched.
    """
    step_start = time.perf_counter()
    query = state["original_query"]
    facts = state.get("extracted_facts", [])
    llm_config = get_llm_config_from_state(state)

    if not facts or len(facts) < 10:
        return state

    logger.info(f"[RANKER] Ranking {len(facts)} facts...")
    _trace(state, f"Ranker: Filtering {len(facts)} facts for relevance")

    # Group facts into blocks for ranking to save tokens
    candidates = facts[:50]
    fact_texts = [f"{i}. {f['fact']}" for i, f in enumerate(candidates, 1)]

    messages = [
        {"role": "system", "content": SYSTEM_PROMPTS["ranker"]},
        {
            "role": "user",
            "content": f"Query: {query}\n\nFacts:\n" + "\n".join(fact_texts),
        },
    ]

    try:
        response = await call_llm_with_retry(
            messages, llm_config, temperature=0.0, max_tokens=50
        )

        ranked_text = response.content.strip()
        indices = [int(token) for token in re.findall(r'\d+', ranked_text)]

        # dict.fromkeys removes repeats while keeping the model's ordering;
        # a set would discard the ranking.
        ranked_facts = [
            candidates[idx - 1]
            for idx in dict.fromkeys(indices)
            if 1 <= idx <= len(candidates)
        ]

        if ranked_facts:
            logger.info(f"[RANKER] Kept {len(ranked_facts)} relevant facts")
            state["extracted_facts"] = ranked_facts

        _trace(
            state,
            f"Ranker: Reduced to {len(state['extracted_facts'])} high-relevance facts",
            since=step_start,
        )
    except Exception as e:
        logger.error(f"[RANKER] Error: {e}")

    return state


async def synthesizer_node(state: AgentState) -> AgentState:
    """Generates final answer with citations.

    Short-circuits with a fixed message when the pipeline recorded an error or
    the router asked for clarification.  Otherwise builds a prompt from the
    extracted facts and scraped sources (or a direct-answer prompt), appends
    the latest verifier critique when in a reflexion pass, and generates the
    answer (streamed by default).  Writes ``final_answer`` and ``citations``.
    """
    query = state["original_query"]
    facts = state.get("extracted_facts", [])
    scraped = state.get("scraped_content", [])
    error = state.get("error")
    search_results = state.get("search_results", [])
    deep_research = state.get("deep_research", False)
    next_step = state.get("next_step")

    if error:
        logger.error(f"[SYNTHESIZER] Error in pipeline: {error}")
        state["final_answer"] = (
            f"I encountered an error during research: {error}\n\n"
            f"Search returned {len(search_results)} results. "
            f"Scraper found {len(scraped)} articles."
        )
        state["status"] = "complete"
        return state

    # Handled before any prompt building so no misleading "generating answer"
    # entries end up in the trace.
    if next_step == "clarify":
        state["final_answer"] = "Your query was too vague or ambiguous. Could you please provide more specific details about what you would like me to research?"
        state["status"] = "complete"
        _trace(state, "Synthesizer: Requested clarification from user")
        return state

    if not scraped and next_step != "direct":
        logger.warning("[SYNTHESIZER] No content to synthesize. Proceeding with internal knowledge.")
        _trace(
            state,
            "Synthesizer: No web content extracted. Falling back to internal knowledge.",
        )

    _trace(state, f"Synthesizer: Generating answer with {len(facts)} facts")

    llm_config = get_llm_config_from_state(state)
    agent_config = state.get("metadata", {}).get("agent_config", {})
    settings = _settings_from_state(state, deep_research)
    max_sources = settings.get("max_sources", 5)
    max_content_for_synth = settings.get("max_content_for_synthesizer", 1500)

    # Each source carries the same 1-based index that is later written to
    # ``citations`` so the model's [n] markers can be resolved.
    context_parts = [
        f'<source index="{i}">\n'
        f"Title: {source.get('title', 'Unknown')}\n"
        f"URL: {source.get('url', '')}\n"
        f"Snippet: {(source.get('content') or '')[:max_content_for_synth]}\n"
        f"</source>"
        for i, source in enumerate(scraped[:max_sources], 1)
    ]
    context = "<sources>\n" + "\n".join(context_parts) + "\n</sources>"

    facts_context = ""
    if facts:
        facts_list = [
            f"- {fact.get('fact', '')} (Source: {fact.get('source_title', '')})"
            for fact in facts[:30]
        ]
        facts_context = "<key_facts>\n" + "\n".join(facts_list) + "\n</key_facts>\n\n"

    intent = state.get("query_intent", "general")
    base_prompt = SYSTEM_PROMPTS.get(f"synthesizer_{intent}", SYSTEM_PROMPTS["synthesizer"])

    if agent_config.get("rag_mode") == "strict":
        base_prompt += (
            "\n\n🛡️ STRICT RAG MODE ENABLED:\n"
            "1. You MUST rely EXCLUSIVELY on the provided sources.\n"
            "2. DO NOT use your internal baseline knowledge to add new facts.\n"
            "3. Adopt an objective, clinical, and strictly data-driven tone. Act as a neutral intelligence reporter.\n"
        )
    else:
        base_prompt += (
            "\n\n🧠 CREATIVE RAG MODE ENABLED:\n"
            "1. You are actively encouraged to blend the scraped sources with your own deep expert knowledge.\n"
            "2. Add historical context, future predictions, thought leadership, and broader industry insights.\n"
            "3. Adopt an engaging, visionary, and analytical tone."
        )

    if next_step == "direct":
        messages = [
            {
                "role": "system",
                "content": f"{base_prompt}\n\nAnswer the user's question directly and concisely based on your internal knowledge. Do not use citations since no web research was performed.",
            },
            {"role": "user", "content": query},
        ]
    else:
        user_payload = (
            f"Here is the research material:\n\n"
            f"{facts_context}"
            f"{context}\n\n"
            f"User question: {query}\n\n"
            f"CRITICAL INSTRUCTIONS:\n"
            f"1. Generate a comprehensive answer to the user's question.\n"
            f"2. You MUST use the [index] from the <sources> to cite your claims (e.g., [1], [2]).\n"
            f"3. Pay special attention to the <key_facts> as they contain the highest-signal information."
        )
        messages = [
            {"role": "system", "content": base_prompt},
            {"role": "user", "content": user_payload},
        ]

    if state.get("critiques") and state.get("reflexion_steps", 0) > 0:
        latest_critique = state["critiques"][-1]
        messages[1]["content"] += f"\n\nIMPORTANT: YOUR PREVIOUS ANSWER HAD THE FOLLOWING ISSUES. PLEASE FIX THEM IN THIS REVISION:\n{latest_critique}"
        logger.info("[SYNTHESIZER] Injecting critique for reflexion loop.")

    temperature = agent_config.get("temperature", 0.7)
    max_tokens = agent_config.get("max_tokens", 4000)

    async def _generate_with_retry():
        llm = _create_llm(llm_config)

        if not agent_config.get("streaming", True):
            response = await llm.generate(
                messages, temperature=temperature, max_tokens=max_tokens
            )
            return response.content

        # NOTE(review): if a stream fails midway and is retried, the callback
        # receives the already-delivered chunks again.
        stream_cb = agent_config.get("stream_callback")
        pieces = []
        async for chunk in llm.stream_generate(
            messages, temperature=temperature, max_tokens=max_tokens
        ):
            pieces.append(chunk)
            if stream_cb:
                # Await whatever the callback returns if it is awaitable; this
                # also covers partials and callable objects.
                outcome = stream_cb(chunk)
                if inspect.isawaitable(outcome):
                    await outcome
        return "".join(pieces)

    try:
        state["final_answer"] = await retry_with_backoff(
            _generate_with_retry,
            max_retries=2,
            base_delay=1.0,
            exponential_base=2.0,
        )
    except Exception as e:
        logger.error(f"[SYNTHESIZER] Error generating answer: {e}")
        state["final_answer"] = f"Error generating answer: {str(e)}"

    citations = [
        {
            "index": i,
            "url": source.get("url", ""),
            "title": source.get("title", "Unknown"),
            # ``excerpt`` may be present but None.
            "context": (source.get("excerpt") or "")[:200],
        }
        for i, source in enumerate(scraped[:max_sources], 1)
    ]
    state["citations"] = citations

    _trace(state, f"Synthesizer: Complete with {len(citations)} citations")

    return state


async def verifier_node(state: AgentState) -> AgentState:
    """Verifies the final answer against extracted facts for accuracy.

    Sends up to 30 facts and the answer to the verifier prompt.  A reply that
    starts with (or contains within its first 50 characters) ``CRITIQUE`` is
    stored in ``critiques``, ``reflexion_steps`` is incremented and ``status``
    becomes ``pending_reflexion``; otherwise ``status`` becomes ``complete``.
    With no answer, no facts, or on error the status is ``complete``.
    """
    step_start = time.perf_counter()
    answer = state.get("final_answer", "")
    facts = state.get("extracted_facts", [])
    llm_config = get_llm_config_from_state(state)
    agent_config = state.get("metadata", {}).get("agent_config", {})

    if not answer or not facts:
        state["status"] = "complete"
        return state

    logger.info("[VERIFIER] Verifying answer accuracy...")
    _trace(state, "Verifier: Checking for hallucinations or inaccuracies")

    fact_texts = [f"- {f['fact']}" for f in facts[:30]]

    strict = agent_config.get("rag_mode") == "strict"
    # Fall back to the default prompt if no strict variant is defined.
    verifier_prompt = (
        SYSTEM_PROMPTS.get("verifier_strict") if strict else None
    ) or SYSTEM_PROMPTS["verifier"]

    messages = [
        {"role": "system", "content": verifier_prompt},
        {
            "role": "user",
            "content": f"Facts:\n" + "\n".join(fact_texts) + f"\n\nAnswer:\n{answer}",
        },
    ]

    try:
        response = await call_llm_with_retry(
            messages, llm_config, temperature=0.1, max_tokens=4000
        )

        # Clean tags for reasoning models that output thoughts before the final answer
        clean_content = _strip_reasoning(response.content.strip()).strip()
        clean_upper = clean_content.upper()

        starts_with_critique = clean_upper.startswith("CRITIQUE")
        is_critique = starts_with_critique or "CRITIQUE:" in clean_upper[:50]
        # An early PASS wins unless the reply literally opens with CRITIQUE.
        if "PASS" in clean_upper[:30] and not starts_with_critique:
            is_critique = False

        if is_critique:
            logger.info("[VERIFIER] Hallucinations detected. Triggering reflexion.")
            state["critiques"] = state.get("critiques", []) + [clean_content]
            state["reflexion_steps"] = state.get("reflexion_steps", 0) + 1
            state["status"] = "pending_reflexion"
            _trace(
                state,
                "Verifier: Critique generated. Triggering reflexion loop.",
                since=step_start,
            )
        else:
            logger.info("[VERIFIER] Answer verified as accurate")
            state["status"] = "complete"
            _trace(
                state,
                "Verifier: Answer verified against source facts",
                since=step_start,
            )
    except Exception as e:
        logger.error(f"[VERIFIER] Error: {e}")
        state["status"] = "complete"

    return state