# AGENTS.md

**A Technical Deep-Dive into Multi-Agent Architecture**

This document provides a comprehensive technical reference for understanding, building, and debugging agents in the Multi-Agent Research Paper Analysis System. It focuses on agent design patterns, state transformations, error handling, observability, and extensibility.

---
## Table of Contents

1. [Introduction](#1-introduction)
   - [The 4-Agent Sequential Pipeline](#the-4-agent-sequential-pipeline)
   - [Agent Design Philosophy](#agent-design-philosophy)
   - [How Agents Differ from Traditional Microservices](#how-agents-differ-from-traditional-microservices)
2. [Agent Architecture Fundamentals](#2-agent-architecture-fundamentals)
   - [The Common Agent Interface](#the-common-agent-interface)
   - [State Transformation Contract](#state-transformation-contract)
   - [Dependency Injection Pattern](#dependency-injection-pattern)
   - [LangGraph Integration Through Node Wrappers](#langgraph-integration-through-node-wrappers)
3. [Individual Agent Deep Dives](#3-individual-agent-deep-dives)
   - [RetrieverAgent](#retrieveragent)
   - [AnalyzerAgent](#analyzeragent)
   - [SynthesisAgent](#synthesisagent)
   - [CitationAgent](#citationagent)
4. [Cross-Cutting Patterns](#4-cross-cutting-patterns)
   - [State Management](#state-management)
   - [Error Handling Philosophy](#error-handling-philosophy)
   - [Observability Integration](#observability-integration)
   - [Performance Optimizations](#performance-optimizations)
5. [Workflow Orchestration](#5-workflow-orchestration)
   - [LangGraph Workflow Structure](#langgraph-workflow-structure)
   - [Node Wrapper Pattern](#node-wrapper-pattern)
   - [Conditional Routing](#conditional-routing)
   - [Checkpointing and State Persistence](#checkpointing-and-state-persistence)
6. [Building New Agents](#6-building-new-agents)
   - [Step-by-Step Development Guide](#step-by-step-development-guide)
   - [Minimal Agent Template](#minimal-agent-template)
   - [Testing Patterns](#testing-patterns)
   - [Best Practices Checklist](#best-practices-checklist)
7. [Agent Comparison Reference](#7-agent-comparison-reference)
8. [Troubleshooting and Debugging](#8-troubleshooting-and-debugging)
   - [Common Issues and Solutions](#common-issues-and-solutions)
   - [Reading LangFuse Traces](#reading-langfuse-traces)
   - [State Inspection Techniques](#state-inspection-techniques)
   - [Log Analysis Patterns](#log-analysis-patterns)

---
## 1. Introduction

### The 4-Agent Sequential Pipeline

The Multi-Agent Research Paper Analysis System implements a **sequential pipeline** of four specialized agents, plus a filter node (`filter_node`, which has no agent of its own) between analysis and synthesis, orchestrated by LangGraph:

```
User Query → Retriever → Analyzer → Filter → Synthesis → Citation → Output
                 ↓           ↓        ↓          ↓           ↓
                       [LangFuse Tracing for All Nodes]
```

Each agent:
- Operates on a **shared state dictionary** that flows through the pipeline
- Performs a **specialized task** (retrieval, analysis, synthesis, citation)
- Transforms the state by **reading inputs** and **writing outputs**
- **Never blocks the workflow**: returns partial results on failure
- Is **automatically traced** by LangFuse for observability
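
Conceptually, the pipeline is a left fold: each stage receives the state produced by the previous one and returns an updated state. The plain-Python sketch below illustrates only that data flow; it is not the LangGraph implementation, and `retrieve`, `analyze`, and `run_pipeline` are hypothetical stand-ins for the real agents and workflow graph.

```python
from functools import reduce
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Stage = Callable[[State], State]


def retrieve(state: State) -> State:
    # Hypothetical stand-in for RetrieverAgent: writes "papers"
    state["papers"] = [f"paper-{i}" for i in range(state["num_papers"])]
    return state


def analyze(state: State) -> State:
    # Hypothetical stand-in for AnalyzerAgent: reads "papers", writes "analyses"
    papers = state.get("papers", [])
    if not papers:
        state["errors"].append("Analyzer: no papers to analyze")  # degrade, don't raise
    state["analyses"] = [f"analysis of {p}" for p in papers]
    return state


def run_pipeline(stages: List[Stage], state: State) -> State:
    """Feed the state through each stage in order (a left fold)."""
    return reduce(lambda acc, stage: stage(acc), stages, state)


final = run_pipeline([retrieve, analyze], {"query": "transformers", "num_papers": 2, "errors": []})
print(final["analyses"])  # ['analysis of paper-0', 'analysis of paper-1']
```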
### Agent Design Philosophy

The architecture follows these core principles:

**1. Pure Functions, Not Stateful Services**
- Agents behave like pure functions: `run(state) -> state`
- No state is carried between invocations; per-run bookkeeping such as the Analyzer's circuit-breaker counter is reset at the start of each `run()`
- Near-deterministic outputs for the same inputs (temperature=0; hosted LLM APIs do not guarantee bit-identical results even at temperature 0)

**2. Resilience Through Graceful Degradation**
- Never raise exceptions from `run()`
- Return partial results with degraded confidence scores
- Append errors to `state["errors"]` for debugging
- Circuit breakers prevent cascading failures

**3. Observability by Design**
- All agents are decorated with `@observe` for automatic tracing
- Three-tier tracing: node-level, agent-level, LLM-level
- Session IDs track multi-turn conversations
- Token usage is accumulated for cost monitoring

**4. Separation of Concerns**
- Agent logic: domain-specific transformations
- Node wrappers: orchestration concerns (tracing, error handling, logging)
- Workflow graph: routing and conditional execution

**5. Explicit Contracts**
- Pydantic schemas validate all data structures
- The `AgentState` TypedDict defines the state shape
- msgpack serialization constraints are enforced
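
For orientation, here is a hedged sketch of what an `AgentState` TypedDict could look like, built only from the keys that appear in this document's state-flow examples. The real field types may differ: `Any` stands in for the project's Pydantic models (`Paper`, `PaperChunk`, `Analysis`, `SynthesisResult`), and `TokenUsage` is a hypothetical helper type.

```python
from typing import Any, List, Optional, TypedDict


class TokenUsage(TypedDict):
    input_tokens: int
    output_tokens: int
    embedding_tokens: int


class AgentState(TypedDict, total=False):
    # Inputs supplied by the user
    query: str
    category: Optional[str]
    num_papers: int
    # Accumulated across the whole pipeline
    errors: List[str]
    token_usage: TokenUsage
    # Written by individual agents (Any stands in for the Pydantic models)
    papers: List[Any]
    chunks: List[Any]
    analyses: List[Any]
    synthesis: Any


initial: AgentState = {
    "query": "What are recent advances in transformer architectures?",
    "category": "cs.AI",
    "num_papers": 5,
    "errors": [],
    "token_usage": {"input_tokens": 0, "output_tokens": 0, "embedding_tokens": 0},
}
```

`TypedDict` is checked only by static type checkers; at runtime `initial` is a plain `dict`. This is why the values themselves are validated with Pydantic.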
### How Agents Differ from Traditional Microservices

| Aspect | Traditional Microservices | Our Agents |
|--------|--------------------------|------------|
| **Communication** | HTTP/gRPC between services | Shared state dictionary |
| **State** | Each service has its own database | Stateless; state flows through the pipeline |
| **Failure Handling** | Retry with exponential backoff | Graceful degradation with partial results |
| **Orchestration** | Service mesh, API gateway | LangGraph with conditional routing |
| **Observability** | Distributed tracing (Jaeger, Zipkin) | LangFuse with automatic instrumentation |
| **Deployment** | Independent containers | Single process, modular architecture |
| **Scaling** | Horizontal scaling | Parallel processing within agents (`ThreadPoolExecutor`) |

**Key Insight**: Agents are lightweight, composable functions orchestrated by a workflow graph, not heavyweight network services.

---
## 2. Agent Architecture Fundamentals

### The Common Agent Interface

All agents implement a consistent interface:

```python
from typing import Any, Dict


class BaseAgent:
    """Base interface for all agents in the system."""

    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform the workflow state.

        Args:
            state: Current workflow state (AgentState TypedDict)

        Returns:
            Updated state with new keys/values added

        Raises:
            Never raises - catches all exceptions and appends to state["errors"]
        """
        raise NotImplementedError
```

**Critical Contract Rules:**

1. **Never Mutate State In-Place**: Always return a new/modified dictionary
2. **Never Raise Exceptions**: Catch all exceptions, append to `state["errors"]`
3. **Always Return State**: Even on failure, return state with partial results
4. **Use Pydantic Models**: Validate outputs before adding to state
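
Here is a minimal sketch of an agent that follows these rules. `KeywordCountAgent` and its `keyword_counts` output key are hypothetical. The agent copies the incoming dictionary (rule 1), catches every exception and records it in `errors` (rule 2), and returns the state on every path (rule 3). To stay dependency-free, it writes a plain dict where a real agent would write a Pydantic model (rule 4).

```python
from typing import Any, Dict


class KeywordCountAgent:
    """Hypothetical agent: counts how often each query word appears in paper titles."""

    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        # Shallow copy so the caller's dict is untouched; nested dicts
        # such as token_usage would still be shared
        new_state = dict(state)
        new_state["errors"] = list(state.get("errors", []))
        try:
            words = state["query"].lower().split()
            titles = [t.lower() for t in state.get("titles", [])]
            new_state["keyword_counts"] = {
                w: sum(title.split().count(w) for title in titles) for w in words
            }
        except Exception as e:  # never raise out of run()
            new_state["errors"].append(f"KeywordCountAgent failed: {e}")
        return new_state  # always return state, even on failure
```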
### State Transformation Contract

Agents follow a clear input/output pattern, shown here as the body of an agent's `run()` method:

```python
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    # Input: Read specific keys from state
    query = state.get("query")
    papers = state.get("papers", [])
    category = state.get("category")

    # Processing: Transform data using dependencies
    results = self.process(query, papers)

    # Output: Write new keys to state (never overwrite critical keys)
    state["analyses"] = results
    state["token_usage"]["input_tokens"] += prompt_tokens  # prompt_tokens: reported by the LLM call
    state["errors"].append(error_message)  # Only on error

    # Return: Modified state
    return state
```
**State Flow Example:**

```python
# Initial state (from user input)
{
    "query": "What are recent advances in transformer architectures?",
    "category": "cs.AI",
    "num_papers": 5,
    "errors": [],
    "token_usage": {"input_tokens": 0, "output_tokens": 0, "embedding_tokens": 0}
}

# After RetrieverAgent
{
    # ... original keys ...
    "papers": [Paper(...), Paper(...), ...],  # NEW
    "chunks": [PaperChunk(...), ...],  # NEW
    "token_usage": {
        "input_tokens": 0,
        "output_tokens": 0,
        "embedding_tokens": 15000  # UPDATED
    }
}

# After AnalyzerAgent
{
    # ... all previous keys ...
    "analyses": [Analysis(...), Analysis(...), ...],  # NEW
    "token_usage": {
        "input_tokens": 12000,  # UPDATED
        "output_tokens": 3000,  # UPDATED
        "embedding_tokens": 15000
    }
}

# And so on through the pipeline...
```
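
Each agent updates `token_usage` incrementally, so it helps to route every update through one helper that adds to the existing counters instead of replacing the dictionary. `add_token_usage` below is a hypothetical helper, not part of the codebase. The figures reproduce the state-flow example above.

```python
from typing import Any, Dict


def add_token_usage(state: Dict[str, Any], **deltas: int) -> Dict[str, Any]:
    """Add token counts to state["token_usage"] without dropping other counters."""
    usage = state.setdefault(
        "token_usage", {"input_tokens": 0, "output_tokens": 0, "embedding_tokens": 0}
    )
    for key, amount in deltas.items():
        usage[key] = usage.get(key, 0) + amount
    return state


state = {"token_usage": {"input_tokens": 0, "output_tokens": 0, "embedding_tokens": 0}}
add_token_usage(state, embedding_tokens=15000)  # RetrieverAgent
add_token_usage(state, input_tokens=12000, output_tokens=3000)  # AnalyzerAgent
print(state["token_usage"])
# {'input_tokens': 12000, 'output_tokens': 3000, 'embedding_tokens': 15000}
```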
### Dependency Injection Pattern

Agents receive their dependencies via constructor injection:

```python
# agents/analyzer.py
import threading
from typing import Dict


class AnalyzerAgent:
    """Analyzes individual papers using RAG."""

    def __init__(
        self,
        rag_retriever,  # Injected dependency
        azure_openai_config: Dict[str, str],
        max_workers: int = 4,
        timeout: int = 60
    ):
        self.rag_retriever = rag_retriever
        self.client = self._initialize_client(azure_openai_config)
        self.max_workers = max_workers
        self.timeout = timeout
        self.consecutive_failures = 0  # Circuit-breaker counter
        self.max_consecutive_failures = 2
        self.token_lock = threading.Lock()  # Guards token counters shared by worker threads
```

**Benefits:**
- **Testability**: Dependencies are easy to mock in tests
- **Flexibility**: Different implementations can be injected (e.g., `ArxivClient` vs `MCPArxivClient`)
- **Clarity**: Dependencies are explicit in the constructor signature

**Initialization in app.py:**

```python
# app.py:298-345
rag_retriever = RAGRetriever(vector_store=vector_store, embedding_generator=embedding_generator)
analyzer_agent = AnalyzerAgent(rag_retriever=rag_retriever, azure_openai_config=azure_config)
synthesis_agent = SynthesisAgent(rag_retriever=rag_retriever, azure_openai_config=azure_config)
citation_agent = CitationAgent(rag_retriever=rag_retriever)
```
### LangGraph Integration Through Node Wrappers

Agents integrate with LangGraph through a **node wrapper pattern**:

```python
# orchestration/nodes.py
import logging

from langfuse.decorators import observe

logger = logging.getLogger(__name__)
# AgentState (the workflow TypedDict) is imported from the project's state module (not shown)


@observe(name="analyzer_agent", as_type="span")
def analyzer_node(state: AgentState, analyzer_agent) -> AgentState:
    """
    Node wrapper for AnalyzerAgent.

    Responsibilities:
    - LangFuse tracing (via @observe decorator)
    - Structured logging
    - Error handling (catch exceptions)
    - State transformation delegation
    """
    logger.info("Starting analyzer agent...")
    try:
        # Delegate to agent's run() method
        updated_state = analyzer_agent.run(state)
        logger.info(f"Analyzer completed. Analyses: {len(updated_state.get('analyses', []))}")
        return updated_state
    except Exception as e:
        logger.error(f"Analyzer node failed: {str(e)}", exc_info=True)
        state["errors"].append(f"Analyzer failed: {str(e)}")
        return state
```
**Workflow Graph Definition:**

```python
# orchestration/workflow_graph.py:75-88
from langgraph.graph import StateGraph, END

workflow = StateGraph(AgentState)

# Add nodes (lambda binds agent instance to node wrapper)
workflow.add_node("retriever", lambda state: retriever_node(state, retriever_agent))
workflow.add_node("analyzer", lambda state: analyzer_node(state, analyzer_agent))
workflow.add_node("filter", filter_node)
workflow.add_node("synthesis", lambda state: synthesis_node(state, synthesis_agent))
workflow.add_node("citation", lambda state: citation_node(state, citation_agent))

# Define edges (execution flow)
workflow.set_entry_point("retriever")
# The edges out of "retriever" and "filter" (to "analyzer" and "synthesis" in the
# pipeline above) are not part of this excerpt; they may be conditional (see Section 5)
workflow.add_edge("analyzer", "filter")
workflow.add_edge("synthesis", "citation")
workflow.add_edge("citation", END)
```

**Why Node Wrappers?**

1. **Separation of Concerns**: Agent logic stays pure; orchestration concerns live in the wrapper
2. **Automatic Tracing**: The `@observe` decorator applies to all agents uniformly
3. **Centralized Error Handling**: Catch-all exception handling prevents workflow crashes
4. **Consistent Logging**: Structured logs with the same format across all agents
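
The lambdas above resolve `retriever_agent`, `analyzer_agent`, and the other agent names each time the node runs, not when the node is defined. Rebinding one of those names after the graph is built therefore changes which agent the node calls. One hedged alternative is to bind the agent eagerly with `functools.partial`. The sketch below also shows the wrapper's catch-all behaviour. `guarded_node`, `FlakyAgent`, and the plain-dict state are illustrative assumptions, and the code needs neither LangGraph nor LangFuse. The resulting callable takes only the state, so it should work wherever the lambdas do, though it has not been checked against a specific LangGraph version.

```python
import functools
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)


def guarded_node(state: Dict[str, Any], agent, name: str) -> Dict[str, Any]:
    """Generic node wrapper: delegate to agent.run(), never let an exception escape."""
    logger.info("Starting %s...", name)
    try:
        return agent.run(state)
    except Exception as e:
        logger.error("%s node failed: %s", name, e, exc_info=True)
        state.setdefault("errors", []).append(f"{name} failed: {e}")
        return state


class FlakyAgent:
    """Hypothetical agent that violates the contract by raising."""

    def run(self, state):
        raise RuntimeError("LLM timeout")


# partial() captures the agent object now; a lambda would look the name up at call time
node = functools.partial(guarded_node, agent=FlakyAgent(), name="analyzer")
result = node({"errors": []})
print(result["errors"])  # ['analyzer failed: LLM timeout']
```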
---

## 3. Individual Agent Deep Dives

### RetrieverAgent

**File**: `agents/retriever.py`

**Core Responsibilities:**

1. Search arXiv for papers matching the user query and category
2. Download PDFs via configurable clients (Direct API, Legacy MCP, FastMCP)
3. Process PDFs into 500-token chunks with 50-token overlap
4. Generate embeddings using Azure OpenAI text-embedding-3-small
5. Store chunks in the ChromaDB vector database

**State Transformations:**

```python
# Input Keys
query = state.get("query")  # str: "What are recent advances in transformers?"
category = state.get("category")  # Optional[str]: "cs.AI"
num_papers = state.get("num_papers", 5)  # int: 5

# Output Keys (added to state)
state["papers"] = [Paper(...), ...]  # List[Paper]: Paper metadata
state["chunks"] = [PaperChunk(...), ...]  # List[PaperChunk]: Text chunks
state["token_usage"]["embedding_tokens"] = 15000  # Estimated tokens
state["errors"].append("Failed to download paper X")  # On partial failure
```
**Dependencies:**

```python
def __init__(
    self,
    arxiv_client,  # ArxivClient | MCPArxivClient | FastMCPArxivClient
    pdf_processor,  # PDFProcessor
    embedding_generator,  # EmbeddingGenerator
    vector_store,  # VectorStore (ChromaDB)
    fallback_client=None  # Optional fallback client
):
    ...  # Body omitted in this excerpt
```
**Key Design Pattern: Two-Tier Fallback**

```python
# agents/retriever.py:69-97
def _search_with_fallback(
    self,
    query: str,
    max_results: int,
    category: Optional[str] = None
) -> List[Paper]:
    """Search with automatic fallback to secondary client."""
    # Try primary client (e.g., FastMCP)
    try:
        logger.info(f"Searching with primary client: {type(self.arxiv_client).__name__}")
        papers = self.arxiv_client.search_papers(
            query=query,
            max_results=max_results,
            category=category
        )
        if papers:
            return papers
        logger.warning("Primary client returned no results, trying fallback...")
    except Exception as e:
        logger.warning(f"Primary client failed: {str(e)}, trying fallback...")

    # Fallback to secondary client (e.g., Direct API)
    if self.fallback_client:
        try:
            logger.info(f"Searching with fallback client: {type(self.fallback_client).__name__}")
            return self.fallback_client.search_papers(
                query=query,
                max_results=max_results,
                category=category
            )
        except Exception as e:
            logger.error(f"Fallback client also failed: {str(e)}")
            return []
    return []
```

**Why This Pattern?**
- **Resilience**: MCP servers may be unavailable; the fallback client keeps retrieval working when the primary fails or returns nothing
- **Transparency**: Logs show which client succeeded
- **Zero User Impact**: Fallback is automatic and invisible

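
The two-tier idea can also be written as a standalone function over any two callables, which makes the control flow easy to unit-test. `search_with_fallback`, `down_primary`, and `backup` are hypothetical names. The real method calls `search_papers()` on the injected clients.

```python
import logging
from typing import Callable, List, Optional

logger = logging.getLogger(__name__)

SearchFn = Callable[[str], List[str]]


def search_with_fallback(query: str, primary: SearchFn, fallback: Optional[SearchFn] = None) -> List[str]:
    """Try primary; on an exception or an empty result, try fallback; never raise."""
    try:
        results = primary(query)
        if results:
            return results
        logger.warning("Primary returned no results, trying fallback...")
    except Exception as e:
        logger.warning("Primary failed: %s, trying fallback...", e)
    if fallback is None:
        return []
    try:
        return fallback(query)
    except Exception as e:
        logger.error("Fallback also failed: %s", e)
        return []


def down_primary(query: str) -> List[str]:
    raise ConnectionError("MCP server unavailable")  # simulated outage


def backup(query: str) -> List[str]:
    return [f"arxiv result for {query!r}"]


print(search_with_fallback("transformers", down_primary, backup))
# ["arxiv result for 'transformers'"]
```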
**Key Design Pattern: Data Validation Filtering**

```python
# agents/retriever.py:198-242
def _validate_papers(self, papers: List[Paper]) -> List[Paper]:
    """Validate and filter papers to ensure Pydantic compliance."""
    valid_papers = []
    for paper in papers:
        try:
            # Ensure all list fields are actually lists
            if not isinstance(paper.authors, list):
                paper.authors = [paper.authors] if paper.authors else []
            if not isinstance(paper.categories, list):
                paper.categories = [paper.categories] if paper.categories else []

            # Re-validate with Pydantic
            validated_paper = Paper(**paper.model_dump())
            valid_papers.append(validated_paper)
        except Exception as e:
            logger.warning(f"Skipping invalid paper {paper.arxiv_id}: {str(e)}")
            continue

    logger.info(f"Validated {len(valid_papers)}/{len(papers)} papers")
    return valid_papers
```

**Why This Pattern?**
- **Defensive Programming**: MCP servers may return malformed data
- **Partial Success**: Continue with valid papers instead of failing completely
- **Type Safety**: Ensures downstream agents can rely on Pydantic schemas

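
The repair-or-skip loop at the heart of `_validate_papers` can be isolated and tested without the `Paper` model. In this hedged sketch, `coerce_to_list` and `validate_records` are hypothetical names, plain dicts stand in for `Paper`, and a required-`arxiv_id` check stands in for Pydantic validation.

```python
from typing import Any, Dict, List

LIST_FIELDS = ("authors", "categories")


def coerce_to_list(value: Any) -> List[Any]:
    """Mirror the authors/categories fix-up: keep lists, wrap scalars, map falsy to []."""
    if isinstance(value, list):
        return value
    return [value] if value else []


def validate_records(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep records that can be repaired; skip the rest instead of failing the batch."""
    valid = []
    for record in records:
        try:
            fixed = dict(record)
            for field in LIST_FIELDS:
                fixed[field] = coerce_to_list(fixed.get(field))
            if not fixed.get("arxiv_id"):  # stand-in for a Pydantic required-field error
                raise ValueError("missing arxiv_id")
            valid.append(fixed)
        except Exception:
            continue  # the real method logs a warning here
    return valid
```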
**Error Handling Strategy:**

```python
# agents/retriever.py:249-302 (condensed)
@observe(name="retriever_agent_run", as_type="generation")
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    try:
        query = state.get("query")
        category = state.get("category")
        max_results = state.get("num_papers", 5)

        # Step 1: Search (with fallback)
        papers = self._search_with_fallback(query, max_results, category)
        if not papers:
            state["errors"].append("No papers found for query")
            return state  # Early return, no exception

        # Step 2: Download PDFs (continue on partial failures)
        chunks = []
        for paper in papers:
            try:
                pdf_path = self._download_with_fallback(paper)
                # Process the PDF at pdf_path and add its chunks to `chunks`...
            except Exception as e:
                logger.warning(f"Failed to process {paper.arxiv_id}: {str(e)}")
                continue  # Skip this paper, process others

        # Record results before embedding so they survive an embedding failure
        state["papers"] = papers
        state["chunks"] = chunks

        # Step 3: Generate embeddings (batch operation)
        try:
            embeddings = self.embedding_generator.generate_batch(chunks)
            # Store chunks and embeddings in the vector store...
        except Exception as e:
            logger.error(f"Embedding generation failed: {str(e)}")
            state["errors"].append("Embedding generation failed")
            return state  # Return papers/chunks without embeddings

        # Success: Return enriched state
        state["token_usage"]["embedding_tokens"] = len(chunks) * 300  # Rough estimate
        return state

    except Exception as e:
        logger.error(f"Retriever agent failed: {str(e)}", exc_info=True)
        state["errors"].append(f"Retriever failed: {str(e)}")
        return state  # Never raise
```
**Observability Integration:**

```python
@observe(name="retriever_agent_run", as_type="generation")
```

- **Type**: `"generation"` (includes embedding generation)
- **Trace Data**: Search query, paper count, chunk count, embedding tokens
- **LangFuse View**: Shows retrieval duration and embedding API calls

**Critical File Paths:**
- `agents/retriever.py:69-97` - Fallback search logic
- `agents/retriever.py:100-157` - Fallback download logic
- `agents/retriever.py:198-242` - Paper validation
- `agents/retriever.py:249-302` - Main `run()` method

---
### AnalyzerAgent

**File**: `agents/analyzer.py`

**Core Responsibilities:**

1. Analyze each paper individually using RAG context
2. Execute 4 broad queries per paper for comprehensive coverage
3. Call Azure OpenAI (GPT-4o-mini) with temperature=0 and JSON mode for consistent, structured output
4. Extract methodology, findings, conclusions, limitations, and contributions
5. Calculate confidence scores based on context completeness

**State Transformations:**

```python
# Input Keys
papers = state.get("papers", [])  # List[Paper] from RetrieverAgent

# Output Keys (added to state)
state["analyses"] = [Analysis(...), ...]  # List[Analysis]: One per successfully analyzed paper
state["token_usage"]["input_tokens"] += 12000  # Cumulative prompt tokens
state["token_usage"]["output_tokens"] += 3000  # Cumulative completion tokens
state["errors"].append("Failed to analyze paper X")  # On failure
```
**Dependencies:**

```python
def __init__(
    self,
    rag_retriever,  # RAGRetriever: Semantic search + context formatting
    azure_openai_config: Dict[str, str],
    max_workers: int = 4,  # Parallel analysis threads
    timeout: int = 60  # LLM call timeout
):
    ...  # Body omitted in this excerpt
```
**Key Design Pattern: Parallel Processing with Circuit Breaker**

```python
# agents/analyzer.py:333-359
from concurrent.futures import ThreadPoolExecutor, as_completed


def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    papers = state.get("papers", [])
    analyses = []

    # Reset circuit breaker
    self.consecutive_failures = 0

    # Parallel processing with ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_paper = {
            executor.submit(self.analyze_paper, paper): paper
            for paper in papers
        }
        for future in as_completed(future_to_paper):
            paper = future_to_paper[future]
            try:
                analysis = future.result()
                if analysis.confidence_score > 0:
                    analyses.append(analysis)
                    self.consecutive_failures = 0  # Reset on success
                else:
                    self.consecutive_failures += 1
            except Exception as e:
                logger.error(f"Analysis failed for {paper.arxiv_id}: {str(e)}")
                self.consecutive_failures += 1

            # Circuit breaker check
            if self.consecutive_failures >= self.max_consecutive_failures:
                logger.error(f"Circuit breaker triggered after {self.consecutive_failures} failures")
                # Exiting the `with` block waits for all submitted work, so cancel
                # futures that have not started yet (cancel() is a no-op for running ones)
                for pending in future_to_paper:
                    pending.cancel()
                break

    state["analyses"] = analyses
    return state
```

**Why This Pattern?**
- **Throughput**: Analyzes up to 4 papers concurrently (max_workers=4)
- **Circuit Breaker**: Stops after 2 consecutive failures (limits wasted API calls)
- **Thread Safety**: `self.token_lock` protects the shared token counters
- **Graceful Degradation**: Partial analyses are returned even if some papers fail

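
On its own, a `break` does not stop analyses that are still queued. Leaving a `with ThreadPoolExecutor(...)` block calls `shutdown(wait=True)`, which waits for every submitted task, including tasks that have not started. Cancelling not-yet-started futures with `Future.cancel()` before breaking closes that gap. The standalone sketch below shows the pattern. `run_with_breaker` and the task callables are hypothetical, and `max_failures=2` mirrors `max_consecutive_failures`.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, List, Tuple


def run_with_breaker(
    tasks: List[Callable[[], Any]], max_workers: int = 4, max_failures: int = 2
) -> Tuple[List[Any], bool]:
    """Run tasks in parallel; stop after max_failures consecutive failures.

    Returns (successful results, whether the breaker tripped).
    """
    results: List[Any] = []
    consecutive_failures = 0
    tripped = False
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(task) for task in tasks]
        for future in as_completed(futures):
            try:
                results.append(future.result())
                consecutive_failures = 0  # reset on success
            except Exception:
                consecutive_failures += 1
            if consecutive_failures >= max_failures:
                tripped = True
                for pending in futures:
                    pending.cancel()  # only affects futures that have not started
                break
    return results, tripped
```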
**Key Design Pattern: Comprehensive RAG Queries**

```python
# agents/analyzer.py:208-252
def _retrieve_comprehensive_context(self, paper: Paper, top_k: int = 10) -> Tuple[str, List[str]]:
    """
    Retrieve chunks using multiple broad queries to ensure full coverage.
    """
    # 4 broad queries to cover different aspects
    queries = [
        "methodology approach methods experimental setup techniques",
        "results findings data experiments performance evaluation",
        "conclusions contributions implications significance impact",
        "limitations future work challenges open problems directions"
    ]
    all_chunks = []
    all_chunk_ids = []

    # Retrieve top_k // 4 chunks per query: with the default top_k=10 that is
    # 2 per query, i.e. at most 8 chunks before deduplication
    chunks_per_query = max(1, top_k // len(queries))
    for query in queries:
        result = self.rag_retriever.retrieve(
            query=query,
            top_k=chunks_per_query,
            paper_ids=[paper.arxiv_id]  # Filter to this paper only
        )
        all_chunks.extend(result["chunks"])
        all_chunk_ids.extend(result["chunk_ids"])

    # Deduplicate by chunk_id
    seen = set()
    unique_chunks = []
    unique_ids = []
    for chunk, chunk_id in zip(all_chunks, all_chunk_ids):
        if chunk_id not in seen:
            seen.add(chunk_id)
            unique_chunks.append(chunk)
            unique_ids.append(chunk_id)

    # Format context with metadata
    context = self.rag_retriever.format_context(unique_chunks)
    return context, unique_ids
```

**Why This Pattern?**
- **Comprehensive Coverage**: A single query tends to miss sections (e.g., a "methods" query rarely surfaces the conclusions)
- **Semantic Diversity**: Broad queries capture different aspects of the paper
- **Deduplication**: Prevents redundant chunks from multiple queries
- **Filtered Search**: `paper_ids` ensures we only retrieve from the current paper

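
The chunk budget and the order-preserving deduplication are easy to check in isolation. In this hedged sketch, `split_budget` and `dedupe_by_id` are hypothetical helpers, and the chunk texts and IDs are made-up strings. With the default `top_k=10` and four queries, integer division gives 2 chunks per query, so at most 8 chunks reach the context before deduplication.

```python
from typing import List, Sequence, Tuple


def split_budget(top_k: int, num_queries: int) -> int:
    """Chunks requested per query, as in max(1, top_k // len(queries))."""
    return max(1, top_k // num_queries)


def dedupe_by_id(chunks: Sequence[str], chunk_ids: Sequence[str]) -> Tuple[List[str], List[str]]:
    """Drop repeated chunk IDs while keeping first-seen order."""
    seen = set()
    unique_chunks, unique_ids = [], []
    for chunk, chunk_id in zip(chunks, chunk_ids):
        if chunk_id not in seen:
            seen.add(chunk_id)
            unique_chunks.append(chunk)
            unique_ids.append(chunk_id)
    return unique_chunks, unique_ids


per_query = split_budget(10, 4)
print(per_query, per_query * 4)  # 2 8

# Two different queries both returned chunk c7
chunks = ["methods text", "results text", "shared text", "shared text"]
ids = ["c1", "c3", "c7", "c7"]
print(dedupe_by_id(chunks, ids))
# (['methods text', 'results text', 'shared text'], ['c1', 'c3', 'c7'])
```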
**Key Design Pattern: LLM Response Normalization**

```python
# agents/analyzer.py:107-178
def _normalize_analysis_response(self, data: dict) -> dict:
    """
    Normalize malformed LLM responses to match Pydantic schema.

    Common issues:
    - Nested lists: ["finding 1", ["finding 2", "finding 3"]]
    - None values in lists: [None, "valid finding"]
    - Mixed types: [123, "text", {"key": "value"}]
    """
    def flatten_and_clean(value):
        """Recursively flatten nested lists and convert scalars to strings."""
        if value is None:
            return ""
        if isinstance(value, list):
            flattened = []
            for item in value:
                cleaned = flatten_and_clean(item)
                if isinstance(cleaned, list):
                    flattened.extend(cleaned)
                elif cleaned:  # Skip empty strings
                    flattened.append(cleaned)
            return flattened
        return str(value)  # dicts, numbers, bools, and strings alike

    # Normalize all list fields (a missing field becomes [])
    normalized = {}
    list_fields = ["methodology", "key_findings", "conclusions", "limitations", "contributions"]
    for field in list_fields:
        cleaned = flatten_and_clean(data.get(field))
        if isinstance(cleaned, list):
            normalized[field] = cleaned
        else:
            normalized[field] = [cleaned] if cleaned else []  # Drop None/empty scalars

    # Preserve scalar fields (fall back to 0.0 if the score is missing or non-numeric)
    try:
        normalized["confidence_score"] = float(data.get("confidence_score", 0.0))
    except (TypeError, ValueError):
        normalized["confidence_score"] = 0.0
    normalized["arxiv_id"] = data.get("arxiv_id", "")
    normalized["title"] = data.get("title", "")
    return normalized
```

**Why This Pattern?**
- **Schema Drift**: Even in JSON mode, GPT-4o-mini occasionally returns JSON whose shape does not match the schema (nested lists, nulls, mixed types)
- **Defensive Parsing**: Prevents Pydantic validation errors
- **Data Salvage**: Extracts valid data even from malformed responses

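
To see what the normalizer does to a typical malformed payload, here is a compact, dependency-free restatement of its list-flattening rule, applied to a worked input. `flatten_to_strings` is a hypothetical name. It follows the same rules as `flatten_and_clean` (None dropped, nested lists flattened, non-strings stringified) but always returns a list.

```python
from typing import Any, List


def flatten_to_strings(value: Any) -> List[str]:
    """Flatten nested lists, drop None/empty values, stringify everything else."""
    if value is None:
        return []
    if isinstance(value, list):
        out: List[str] = []
        for item in value:
            out.extend(flatten_to_strings(item))
        return out
    text = str(value)
    return [text] if text else []


malformed = {
    "key_findings": ["finding 1", ["finding 2", "finding 3"]],
    "limitations": [None, "valid limitation"],
    "contributions": [123, "text", {"key": "value"}],
    "conclusions": "single string instead of a list",
}
for field, raw in malformed.items():
    print(field, flatten_to_strings(raw))
# key_findings ['finding 1', 'finding 2', 'finding 3']
# limitations ['valid limitation']
# contributions ['123', 'text', "{'key': 'value'}"]
# conclusions ['single string instead of a list']
```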
**Error Handling Strategy:**

```python
# agents/analyzer.py:260-325
import json


def analyze_paper(self, paper: Paper) -> Analysis:
    """Analyze a single paper (called by ThreadPoolExecutor)."""
    try:
        # Step 1: Retrieve context via RAG
        context, chunk_ids = self._retrieve_comprehensive_context(paper)

        # Step 2: Call LLM with structured prompt
        # (`prompt` is built from the paper metadata and `context`; construction omitted here)
        response = self.client.chat.completions.create(
            model=self.deployment_name,
            messages=[
                {"role": "system", "content": "You are a research paper analyzer..."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.0,  # Minimize sampling randomness
            response_format={"type": "json_object"},  # Force JSON
            max_tokens=2000,
            timeout=self.timeout
        )

        # Step 3: Parse and normalize response
        data = json.loads(response.choices[0].message.content)
        normalized = self._normalize_analysis_response(data)

        # Step 4: Create Pydantic model
        analysis = Analysis(**normalized)

        # Step 5: Track tokens (thread-safe)
        with self.token_lock:
            self.total_input_tokens += response.usage.prompt_tokens
            self.total_output_tokens += response.usage.completion_tokens

        return analysis

    except Exception as e:
        logger.error(f"Failed to analyze {paper.arxiv_id}: {str(e)}", exc_info=True)
        # Return minimal analysis with confidence=0.0 (run() counts it as a failure)
        return Analysis(
            arxiv_id=paper.arxiv_id,
            title=paper.title,
            methodology=[],
            key_findings=[],
            conclusions=[],
            limitations=[],
            contributions=[],
            confidence_score=0.0
        )
```
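
Step 5 needs `self.token_lock` because `+=` on a shared attribute is a read-modify-write: without the lock, worker threads can interleave their updates and lose counts. The sketch below shows the same pattern outside the agent. `TokenCounter` is hypothetical, and `fake_llm_call` simulates the `response.usage` values rather than calling an API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class TokenCounter:
    """Accumulates prompt/completion tokens from many threads under one lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.input_tokens = 0
        self.output_tokens = 0

    def add(self, prompt_tokens: int, completion_tokens: int) -> None:
        with self._lock:  # both counters updated together, never interleaved
            self.input_tokens += prompt_tokens
            self.output_tokens += completion_tokens


counter = TokenCounter()


def fake_llm_call(_: int) -> None:
    # Simulated response.usage values for one paper analysis
    counter.add(prompt_tokens=3000, completion_tokens=750)


with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(fake_llm_call, range(4)))

print(counter.input_tokens, counter.output_tokens)  # 12000 3000
```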
**Observability Integration:**

```python
@observe(name="analyzer_agent_run", as_type="generation")
```

- **Type**: `"generation"` (LLM-heavy workload)
- **Trace Data**: Paper count, analysis count, token usage, parallel execution
- **LangFuse View**: Shows individual LLM calls via `langfuse-openai` instrumentation

**Critical File Paths:**
- `agents/analyzer.py:107-178` - Response normalization
- `agents/analyzer.py:208-252` - Comprehensive RAG queries
- `agents/analyzer.py:260-325` - Single paper analysis
- `agents/analyzer.py:333-359` - Parallel processing with circuit breaker

---
### SynthesisAgent

**File**: `agents/synthesis.py`

**Core Responsibilities:**

1. Compare findings across all analyzed papers
2. Identify consensus points (where papers agree)
3. Identify contradictions (where papers disagree)
4. Identify research gaps (what's missing)
5. Generate an executive summary addressing the user's original query

**State Transformations:**

```python
# Input Keys
papers = state.get("papers", [])  # List[Paper]
analyses = state.get("analyses", [])  # List[Analysis] from AnalyzerAgent
query = state.get("query")  # Original user question

# Output Keys (added to state)
state["synthesis"] = SynthesisResult(
    consensus_points=[ConsensusPoint(...), ...],
    contradictions=[Contradiction(...), ...],
    research_gaps=["Gap 1", "Gap 2", ...],
    summary="Executive summary addressing user query...",
    papers_analyzed=["arxiv_id_1", "arxiv_id_2", ...],
    confidence_score=0.85
)
state["token_usage"]["input_tokens"] += 8000
state["token_usage"]["output_tokens"] += 2000
```
**Dependencies:**

```python
def __init__(
    self,
    rag_retriever,  # RAGRetriever (passed but not actively used)
    azure_openai_config: Dict[str, str],
    timeout: int = 90  # Longer timeout for synthesis (more complex task)
):
    ...  # Body omitted in this excerpt
```
**Key Design Pattern: Cross-Paper Synthesis Prompt**

```python
# agents/synthesis.py:54-133
def _create_synthesis_prompt(
    self,
    query: str,
    papers: List[Paper],
    analyses: List[Analysis]
) -> str:
    """
    Create structured prompt for cross-paper synthesis.
    """
    # Pair papers with analyses by arxiv_id: analyses arrive in completion order and
    # failed ones are dropped upstream, so a positional zip() could mismatch them
    analysis_by_id = {analysis.arxiv_id: analysis for analysis in analyses}

    # Format all analyses into structured summaries
    paper_summaries = []
    for paper in papers:
        analysis = analysis_by_id.get(paper.arxiv_id)
        if analysis is None:
            continue  # No usable analysis for this paper
        authors = ', '.join(paper.authors[:3]) + (' et al.' if len(paper.authors) > 3 else '')
        summary = f"""
[Paper {paper.arxiv_id}]
Title: {paper.title}
Authors: {authors}
Published: {paper.published}
Methodology: {' | '.join(analysis.methodology[:3])}
Key Findings: {' | '.join(analysis.key_findings[:3])}
Conclusions: {' | '.join(analysis.conclusions[:2])}
Limitations: {' | '.join(analysis.limitations[:2])}
Contributions: {' | '.join(analysis.contributions[:2])}
"""
        paper_summaries.append(summary)

    # Synthesis prompt
    prompt = f"""
You are synthesizing findings from {len(paper_summaries)} research papers to answer this question:
"{query}"

# Paper Summaries
{chr(10).join(paper_summaries)}

# Task
Analyze the papers above and provide:
1. **Consensus Points**: What do multiple papers agree on?
   - For each consensus point, list supporting papers (use arxiv_id)
   - Provide evidence from the papers
2. **Contradictions**: Where do papers disagree or present conflicting findings?
   - Describe the contradiction clearly
   - List papers on each side (papers_a, papers_b)
3. **Research Gaps**: What questions remain unanswered? What future directions are suggested?
4. **Summary**: A concise executive summary (2-3 paragraphs) answering the user's original question

Return as JSON:
{{
  "consensus_points": [
    {{
      "point": "Description of consensus",
      "supporting_papers": ["arxiv_id_1", "arxiv_id_2"],
      "evidence": "Evidence from papers"
    }}
  ],
  "contradictions": [
    {{
      "description": "Description of contradiction",
      "papers_a": ["arxiv_id_1"],
      "papers_b": ["arxiv_id_2"],
      "context": "Additional context"
    }}
  ],
  "research_gaps": ["Gap 1", "Gap 2", ...],
  "summary": "Executive summary here...",
  "confidence_score": 0.85
}}
"""
    return prompt
```

**Why This Pattern?**
- **Structured Input**: The LLM receives formatted summaries for all papers
- **Explicit Citations**: Requires grounding claims in specific papers
- **JSON Schema**: Forces structured output for Pydantic validation
- **Comprehensive Analysis**: Covers consensus, contradictions, gaps, and summary

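
Because the prompt asks the model to cite papers by `arxiv_id`, a cheap post-check can remove any cited ID that was not among the analyzed papers before the response becomes a `SynthesisResult`. The source does not show such a check. This is a hedged sketch of an optional safeguard: `drop_unknown_citations` is hypothetical, it is meant to run on the plain JSON dict after `_normalize_synthesis_response` (when the ID fields are lists), and the arXiv IDs in the example are made up.

```python
from typing import Any, Dict, Iterable, List, Optional


def drop_unknown_citations(data: Dict[str, Any], known_ids: Iterable[str]) -> Dict[str, Any]:
    """Remove arxiv_ids that the synthesis cites but that were never analyzed."""
    known = set(known_ids)

    def keep(ids: Optional[List[str]]) -> List[str]:
        return [i for i in (ids or []) if i in known]

    for cp in data.get("consensus_points") or []:
        if isinstance(cp, dict):
            cp["supporting_papers"] = keep(cp.get("supporting_papers"))
    for contr in data.get("contradictions") or []:
        if isinstance(contr, dict):
            contr["papers_a"] = keep(contr.get("papers_a"))
            contr["papers_b"] = keep(contr.get("papers_b"))
    return data


response = {
    "consensus_points": [
        {"point": "Attention scales well", "supporting_papers": ["2401.00001", "9999.99999"]}
    ],
    "contradictions": [
        {"description": "Data efficiency", "papers_a": ["2401.00001"], "papers_b": ["2401.00002"]}
    ],
}
cleaned = drop_unknown_citations(response, ["2401.00001", "2401.00002"])
print(cleaned["consensus_points"][0]["supporting_papers"])  # ['2401.00001']
```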
| **Key Design Pattern: Nested Data Normalization** | |
| ```python | |
| # agents/synthesis.py:135-196 | |
| def _normalize_synthesis_response(self, data: dict) -> dict: | |
|     """ | |
|     Normalize nested structures in synthesis response. | |
|     Handles: | |
|     - consensus_points[].supporting_papers (list) | |
|     - consensus_points[].citations (list) | |
|     - contradictions[].papers_a (list) | |
|     - contradictions[].papers_b (list) | |
|     - research_gaps (list) | |
|     """ | |
|     def ensure_list_of_strings(value): | |
|         if value is None: | |
|             return [] | |
|         if isinstance(value, str): | |
|             return [value] | |
|         if isinstance(value, list): | |
|             return [str(item) for item in value if item] | |
|         return [str(value)] | |
|     normalized = { | |
|         "consensus_points": [], | |
|         "contradictions": [], | |
|         "research_gaps": ensure_list_of_strings(data.get("research_gaps", [])), | |
|         "summary": str(data.get("summary", "")), | |
|         "confidence_score": float(data.get("confidence_score", 0.0)) | |
|     } | |
|     # Normalize consensus points | |
|     for cp in data.get("consensus_points", []): | |
|         normalized["consensus_points"].append({ | |
|             "point": str(cp.get("point", "")), | |
|             "supporting_papers": ensure_list_of_strings(cp.get("supporting_papers", [])), | |
|             "evidence": str(cp.get("evidence", "")), | |
|             "citations": ensure_list_of_strings(cp.get("citations", [])) | |
|         }) | |
|     # Normalize contradictions | |
|     for contr in data.get("contradictions", []): | |
|         normalized["contradictions"].append({ | |
|             "description": str(contr.get("description", "")), | |
|             "papers_a": ensure_list_of_strings(contr.get("papers_a", [])), | |
|             "papers_b": ensure_list_of_strings(contr.get("papers_b", [])), | |
|             "context": str(contr.get("context", "")), | |
|             "citations": ensure_list_of_strings(contr.get("citations", [])) | |
|         }) | |
|     return normalized | |
| ``` | |
| **Why This Pattern?** | |
| - **Nested Schema Complexity**: ConsensusPoint and Contradiction have nested lists | |
| - **LLM Inconsistency**: The model may return a bare string where a list is expected (e.g., a single supporting paper) | |
| - **Defensive Parsing**: Ensures Pydantic validation succeeds | |
| **Error Handling Strategy:** | |
| ```python | |
| # agents/synthesis.py:242-310 | |
| @observe(name="synthesis_agent_run", as_type="generation") | |
| def run(self, state: Dict[str, Any]) -> Dict[str, Any]: | |
|     try: | |
|         papers = state.get("papers", []) | |
|         analyses = state.get("analyses", []) | |
|         query = state.get("query", "") | |
|         # Handle paper count mismatch (defensive) | |
|         if len(papers) != len(analyses): | |
|             logger.warning(f"Paper count mismatch: {len(papers)} papers, {len(analyses)} analyses") | |
|             # Use minimum length to avoid index errors | |
|             min_len = min(len(papers), len(analyses)) | |
|             papers = papers[:min_len] | |
|             analyses = analyses[:min_len] | |
|         # Create synthesis prompt | |
|         prompt = self._create_synthesis_prompt(query, papers, analyses) | |
|         # Call LLM | |
|         response = self.client.chat.completions.create( | |
|             model=self.deployment_name, | |
|             messages=[ | |
|                 {"role": "system", "content": "You are a research synthesis expert..."}, | |
|                 {"role": "user", "content": prompt} | |
|             ], | |
|             temperature=0.0, | |
|             response_format={"type": "json_object"}, | |
|             max_tokens=3000, | |
|             timeout=self.timeout | |
|         ) | |
|         # Parse and normalize | |
|         data = json.loads(response.choices[0].message.content) | |
|         normalized = self._normalize_synthesis_response(data) | |
|         # Add paper IDs | |
|         normalized["papers_analyzed"] = [p.arxiv_id for p in papers] | |
|         # Create Pydantic model | |
|         synthesis = SynthesisResult(**normalized) | |
|         # Update state | |
|         state["synthesis"] = synthesis | |
|         state["token_usage"]["input_tokens"] += response.usage.prompt_tokens | |
|         state["token_usage"]["output_tokens"] += response.usage.completion_tokens | |
|         return state | |
|     except Exception as e: | |
|         logger.error(f"Synthesis failed: {str(e)}", exc_info=True) | |
|         # Return minimal synthesis with confidence=0.0 | |
|         papers_analyzed = [p.arxiv_id for p in state.get("papers", [])] | |
|         state["synthesis"] = SynthesisResult( | |
|             consensus_points=[], | |
|             contradictions=[], | |
|             research_gaps=[], | |
|             summary=f"Synthesis failed: {str(e)}", | |
|             papers_analyzed=papers_analyzed, | |
|             confidence_score=0.0 | |
|         ) | |
|         state["errors"].append(f"Synthesis failed: {str(e)}") | |
|         return state | |
| ``` | |
| **Observability Integration:** | |
| ```python | |
| @observe(name="synthesis_agent_run", as_type="generation") | |
| ``` | |
| - **Type**: `"generation"` (single LLM call for cross-paper analysis) | |
| - **Trace Data**: Paper count, synthesis complexity, token usage | |
| - **LangFuse View**: Shows synthesis LLM call with full prompt/completion | |
| **Critical File Paths:** | |
| - `agents/synthesis.py:54-133` - Synthesis prompt creation | |
| - `agents/synthesis.py:135-196` - Nested data normalization | |
| - `agents/synthesis.py:242-310` - Main `run()` method with error handling | |
| --- | |
| ### CitationAgent | |
| **File**: `agents/citation.py` | |
| **Core Responsibilities:** | |
| 1. Generate APA-formatted citations for all papers | |
| 2. Validate synthesis claims against source papers | |
| 3. Calculate cost estimates using dynamic pricing configuration | |
| 4. Create final ValidatedOutput with all metadata | |
| **State Transformations:** | |
| ```python | |
| # Input Keys | |
| synthesis = state.get("synthesis") # SynthesisResult from SynthesisAgent | |
| papers = state.get("papers", []) # List[Paper] | |
| token_usage = state.get("token_usage", {}) | |
| model_desc = state.get("model_desc", {}) | |
| # Output Keys (added to state) | |
| state["validated_output"] = ValidatedOutput( | |
|     synthesis=synthesis, | |
|     citations=[Citation(...), ...], | |
|     retrieved_chunks=[chunk_id_1, chunk_id_2, ...], | |
|     token_usage=token_usage, | |
|     cost_estimate=0.0234,  # USD | |
|     processing_time=12.5  # seconds | |
| ) | |
| ``` | |
| **Dependencies:** | |
| ```python | |
| def __init__( | |
|     self, | |
|     rag_retriever  # RAGRetriever (injected but not actively used) | |
| ): | |
| ``` | |
| **Key Design Pattern: APA Citation Formatting** | |
| ```python | |
| # agents/citation.py:31-61 | |
| def _format_apa_citation(self, paper: Paper) -> str: | |
|     """ | |
|     Format paper in APA style. | |
|     Format: Authors. (Year). Title. arXiv:ID. URL | |
|     """ | |
|     # Handle different author counts | |
|     if len(paper.authors) == 1: | |
|         author_str = paper.authors[0] | |
|     elif len(paper.authors) == 2: | |
|         author_str = f"{paper.authors[0]} & {paper.authors[1]}" | |
|     else: | |
|         # 3+ authors: List all with ampersand before last | |
|         author_str = ", ".join(paper.authors[:-1]) + f", & {paper.authors[-1]}" | |
|     # Extract year from published date (format: "2024-01-15T10:30:00Z") | |
|     year = paper.published.split("-")[0] if paper.published else "n.d." | |
|     # Format citation | |
|     citation = ( | |
|         f"{author_str}. ({year}). {paper.title}. " | |
|         f"arXiv:{paper.arxiv_id}. {paper.arxiv_url}" | |
|     ) | |
|     return citation | |
| ``` | |
| **Why This Pattern?** | |
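| - **Academic Standard**: Follows the APA reference pattern (authors, year, title, source), although author names are used as stored rather than converted to APA's "Surname, Initials" form | |
| - **Consistent Formatting**: Handles 1, 2, or 3+ authors; an empty `authors` list falls through to the `else` branch and raises `IndexError`, which `run()` catches and records in `errors` | |
| - **Traceability**: Includes arXiv ID and URL for easy lookup | |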
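The formatting rules above can be exercised in isolation. This is a minimal sketch that assumes a stripped-down `Paper` dataclass (hypothetical; the project's schema lives in `utils.schemas`). It also adds an explicit fallback for a paper with no authors, and the "Unknown" placeholder is our choice, not the project's:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Paper:
    """Hypothetical minimal stand-in for utils.schemas.Paper (only the fields used here)."""
    arxiv_id: str
    title: str
    authors: List[str] = field(default_factory=list)
    published: str = ""  # ISO 8601 string, e.g. "2024-01-15T10:30:00Z"
    arxiv_url: str = ""


def format_apa_citation(paper: Paper) -> str:
    """APA-style citation: Authors. (Year). Title. arXiv:ID. URL"""
    authors = paper.authors
    if not authors:
        author_str = "Unknown"  # placeholder for an empty author list (our choice)
    elif len(authors) == 1:
        author_str = authors[0]
    elif len(authors) == 2:
        author_str = f"{authors[0]} & {authors[1]}"
    else:
        # 3+ authors: comma-separated, with "&" before the last name
        author_str = ", ".join(authors[:-1]) + f", & {authors[-1]}"
    # The year is the text before the first "-" of the ISO date
    year = paper.published.split("-")[0] if paper.published else "n.d."
    return f"{author_str}. ({year}). {paper.title}. arXiv:{paper.arxiv_id}. {paper.arxiv_url}"


paper = Paper(
    arxiv_id="2401.00001",  # illustrative ID
    title="An Example Paper",
    authors=["Ada Lovelace", "Alan Turing", "Grace Hopper"],
    published="2024-01-15T10:30:00Z",
    arxiv_url="https://arxiv.org/abs/2401.00001",
)
print(format_apa_citation(paper))
```

Because the fallback branch is checked first, a paper with no authors produces a citation instead of an exception.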
| **Key Design Pattern: Synthesis Validation** | |
| ```python | |
| # agents/citation.py:90-134 | |
| def validate_synthesis( | |
|     self, | |
|     synthesis: SynthesisResult, | |
|     papers: List[Paper] | |
| ) -> Dict[str, Any]: | |
|     """ | |
|     Validate synthesis claims against source papers. | |
|     Returns: | |
|     - total_consensus_points: int | |
|     - total_contradictions: int | |
|     - referenced_papers: List[str] (arxiv IDs mentioned) | |
|     - chunk_ids: List[str] (chunks used for grounding) | |
|     """ | |
|     validation_data = { | |
|         "total_consensus_points": len(synthesis.consensus_points), | |
|         "total_contradictions": len(synthesis.contradictions), | |
|         "referenced_papers": [], | |
|         "chunk_ids": [] | |
|     } | |
|     # Collect all referenced paper IDs | |
|     for cp in synthesis.consensus_points: | |
|         validation_data["referenced_papers"].extend(cp.supporting_papers) | |
|         validation_data["chunk_ids"].extend(cp.citations) | |
|     for contr in synthesis.contradictions: | |
|         validation_data["referenced_papers"].extend(contr.papers_a) | |
|         validation_data["referenced_papers"].extend(contr.papers_b) | |
|         validation_data["chunk_ids"].extend(contr.citations) | |
|     # Deduplicate | |
|     validation_data["referenced_papers"] = list(set(validation_data["referenced_papers"])) | |
|     validation_data["chunk_ids"] = list(set(validation_data["chunk_ids"])) | |
|     logger.info( | |
|         f"Validation: {validation_data['total_consensus_points']} consensus points, " | |
|         f"{validation_data['total_contradictions']} contradictions, " | |
|         f"{len(validation_data['referenced_papers'])} papers referenced" | |
|     ) | |
|     return validation_data | |
| ``` | |
| **Why This Pattern?** | |
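| - **Traceability**: Tracks which papers the synthesis actually cites | |
| - **Metadata Extraction**: Collects chunk IDs for provenance tracking | |
| - **Quality Signal**: The number of distinct referenced papers is a rough proxy for how well grounded the synthesis is. Despite its name, this step counts and collects references; it does not check that the referenced IDs appear in `papers` (the parameter is unused in the excerpt) | |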
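A hypothetical helper can keep referenced IDs in a stable order and check them against the retrieved papers. `collect_references` is not part of `agents/citation.py`. It uses `dict.fromkeys` because, unlike `list(set(...))`, that keeps first-seen order:

```python
from typing import Dict, Iterable, List


def collect_references(
    cited_ids: Iterable[str], retrieved_ids: Iterable[str]
) -> Dict[str, List[str]]:
    """Hypothetical helper: dedupe cited arXiv IDs (first-seen order) and flag unknown ones."""
    # dict.fromkeys preserves insertion order while dropping duplicates
    referenced = list(dict.fromkeys(cited_ids))
    known = set(retrieved_ids)
    # IDs the LLM cited that were never retrieved are likely hallucinated or mistyped
    unknown = [arxiv_id for arxiv_id in referenced if arxiv_id not in known]
    return {"referenced_papers": referenced, "unknown_papers": unknown}


report = collect_references(
    cited_ids=["2401.00001", "2401.00002", "2401.00001", "9999.99999"],
    retrieved_ids=["2401.00001", "2401.00002", "2401.00003"],
)
print(report)
```

A non-empty `unknown_papers` list would be a natural trigger for lowering the synthesis confidence. That policy is left to the caller.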
| **Key Design Pattern: Dynamic Cost Calculation** | |
| ```python | |
| # agents/citation.py:164-183 | |
| def calculate_cost( | |
|     self, | |
|     token_usage: Dict[str, int], | |
|     model_desc: Dict[str, str] | |
| ) -> float: | |
|     """ | |
|     Calculate cost estimate using dynamic pricing from config. | |
|     """ | |
|     from utils.config import get_pricing_config | |
|     pricing_config = get_pricing_config() | |
|     # Get model-specific pricing | |
|     llm_model = model_desc.get("llm_model", "gpt-4o-mini") | |
|     embedding_model = model_desc.get("embedding_model", "text-embedding-3-small") | |
|     llm_pricing = pricing_config.get_model_pricing(llm_model) | |
|     embedding_pricing = pricing_config.get_embedding_pricing(embedding_model) | |
|     # Calculate costs (pricing is per 1M tokens) | |
|     input_cost = (token_usage.get("input_tokens", 0) / 1_000_000) * llm_pricing["input"] | |
|     output_cost = (token_usage.get("output_tokens", 0) / 1_000_000) * llm_pricing["output"] | |
|     embedding_cost = (token_usage.get("embedding_tokens", 0) / 1_000_000) * embedding_pricing | |
|     total_cost = input_cost + output_cost + embedding_cost | |
|     return round(total_cost, 4) | |
| ``` | |
| **Why This Pattern?** | |
| - **Centralized Pricing**: Single source of truth in `utils/config.py` | |
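| - **Model Flexibility**: Prices are looked up per model name; when `model_desc` omits a model, `gpt-4o-mini` and `text-embedding-3-small` are assumed | |
| - **Transparency**: Computes input, output, and embedding costs separately before summing (only the rounded total is returned) | |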
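The arithmetic in `calculate_cost()` divides each token count by one million and multiplies by the per-million price. This worked sketch uses placeholder prices that are illustrative only and do not come from any provider; real values come from `utils/config.py`:

```python
from typing import Dict

# Placeholder prices in USD per 1M tokens (illustrative, not real pricing)
LLM_PRICING = {"input": 0.15, "output": 0.60}
EMBEDDING_PRICE = 0.02


def estimate_cost(token_usage: Dict[str, int]) -> float:
    """Same formula as calculate_cost(): tokens / 1M * price per 1M tokens, summed."""
    input_cost = token_usage.get("input_tokens", 0) / 1_000_000 * LLM_PRICING["input"]
    output_cost = token_usage.get("output_tokens", 0) / 1_000_000 * LLM_PRICING["output"]
    embedding_cost = token_usage.get("embedding_tokens", 0) / 1_000_000 * EMBEDDING_PRICE
    return round(input_cost + output_cost + embedding_cost, 4)


usage = {"input_tokens": 40_000, "output_tokens": 8_000, "embedding_tokens": 30_000}
print(estimate_cost(usage))
```

Under these placeholder prices, the example works out to 0.006 + 0.0048 + 0.0006 = 0.0114 USD. Note that rounding to four decimals hides costs below 0.00005 USD.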
| **Error Handling Strategy:** | |
| ```python | |
| # agents/citation.py:200-254 | |
| @observe(name="citation_agent_run", as_type="span") | |
| def run(self, state: Dict[str, Any]) -> Dict[str, Any]: | |
|     """ | |
|     Note: Citation agent rarely fails (pure data transformation). | |
|     No complex error handling needed. | |
|     """ | |
|     try: | |
|         synthesis = state.get("synthesis") | |
|         papers = state.get("papers", []) | |
|         token_usage = state.get("token_usage", {}) | |
|         model_desc = state.get("model_desc", {}) | |
|         start_time = state.get("start_time", time.time()) | |
|         # Generate citations | |
|         citations = [] | |
|         for paper in papers: | |
|             citation_text = self._format_apa_citation(paper) | |
|             citations.append(Citation( | |
|                 arxiv_id=paper.arxiv_id, | |
|                 citation_text=citation_text | |
|             )) | |
|         # Validate synthesis | |
|         validation_data = self.validate_synthesis(synthesis, papers) | |
|         # Calculate cost and timing | |
|         cost_estimate = self.calculate_cost(token_usage, model_desc) | |
|         processing_time = time.time() - start_time | |
|         # Create final output | |
|         validated_output = ValidatedOutput( | |
|             synthesis=synthesis, | |
|             citations=citations, | |
|             retrieved_chunks=validation_data["chunk_ids"], | |
|             token_usage=token_usage, | |
|             cost_estimate=cost_estimate, | |
|             processing_time=round(processing_time, 2) | |
|         ) | |
|         state["validated_output"] = validated_output | |
|         logger.info( | |
|             f"Citation agent completed. Cost: ${cost_estimate:.4f}, " | |
|             f"Time: {processing_time:.2f}s" | |
|         ) | |
|         return state | |
|     except Exception as e: | |
|         logger.error(f"Citation agent failed: {str(e)}", exc_info=True) | |
|         state["errors"].append(f"Citation failed: {str(e)}") | |
|         return state | |
| ``` | |
| **Observability Integration:** | |
| ```python | |
| @observe(name="citation_agent_run", as_type="span") | |
| ``` | |
| - **Type**: `"span"` (data processing only, no LLM calls) | |
| - **Trace Data**: Citation count, cost estimate, processing time | |
| - **LangFuse View**: Shows data transformation duration | |
| **Critical File Paths:** | |
| - `agents/citation.py:31-61` - APA citation formatting | |
| - `agents/citation.py:90-134` - Synthesis validation | |
| - `agents/citation.py:164-183` - Dynamic cost calculation | |
| - `agents/citation.py:200-254` - Main `run()` method | |
| --- | |
| ## 4. Cross-Cutting Patterns | |
| ### State Management | |
| #### AgentState TypedDict | |
| All workflow state is managed through a typed dictionary (`TypedDict`) defined in `utils/langgraph_state.py`: | |
| ```python | |
| from typing import TypedDict, List, Dict, Optional, Any | |
| from utils.schemas import Paper, PaperChunk, Analysis, SynthesisResult, ValidatedOutput | |
| class AgentState(TypedDict, total=False): | |
|     # Input fields (from user) | |
|     query: str | |
|     category: Optional[str] | |
|     num_papers: int | |
|     # Agent outputs | |
|     papers: List[Paper] | |
|     chunks: List[PaperChunk] | |
|     analyses: List[Analysis] | |
|     filtered_analyses: List[Analysis]  # After filter node | |
|     synthesis: SynthesisResult | |
|     validated_output: ValidatedOutput | |
|     # Metadata | |
|     errors: List[str] | |
|     token_usage: Dict[str, int]  # {input_tokens, output_tokens, embedding_tokens} | |
|     start_time: float | |
|     processing_time: float | |
|     model_desc: Dict[str, str]  # {llm_model, embedding_model} | |
|     # Tracing | |
|     trace_id: Optional[str] | |
|     session_id: Optional[str] | |
|     user_id: Optional[str] | |
| ``` | |
| **Key Benefits:** | |
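| - **Type Safety**: IDEs provide autocomplete, and static type checkers (e.g., mypy) can flag misspelled keys and wrong value types | |
| - **Documentation**: State shape is self-documenting | |
| - **Channel Definition**: LangGraph reads the TypedDict keys to define the graph's state channels; the annotations themselves are not type-checked at runtime | |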
| #### Serialization Requirements (msgpack) | |
| **CRITICAL**: LangGraph uses msgpack to serialize state for checkpointing, so state may only contain values the serializer can encode: | |
| **✅ Allowed:** | |
| ```python | |
| # Primitives | |
| state["query"] = "transformer architectures" # str | |
| state["num_papers"] = 5 # int | |
| state["processing_time"] = 12.5 # float | |
| state["enabled"] = True # bool | |
| state["optional_field"] = None # None | |
| # Collections | |
| state["errors"] = ["Error 1", "Error 2"] # list | |
| state["token_usage"] = {"input": 1000} # dict | |
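| # Pydantic models: store the model objects directly (no .model_dump() needed) | |
| state["papers"] = [paper.model_dump() for paper in papers]  # WRONG: downstream agents use attribute access (p.arxiv_id) | |
| state["papers"] = papers  # CORRECT (LangGraph serializes Pydantic models automatically) | |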
| ``` | |
| **❌ Prohibited:** | |
| ```python | |
| # Complex objects | |
| state["progress"] = gr.Progress()  # ❌ Gradio components | |
| state["file"] = open("data.txt")  # ❌ File handles | |
| state["thread"] = threading.Thread()  # ❌ Thread objects | |
| state["callback"] = lambda x: x  # ❌ Functions/callbacks | |
| ``` | |
| **Real Bug Example** (from `BUGFIX_MSGPACK_SERIALIZATION.md`): | |
| ```python | |
| # BEFORE (broken) | |
| def run_workflow(workflow_app, initial_state, config, progress): | |
|     initial_state["progress"] = progress  # ❌ Non-serializable | |
|     final_state = workflow_app.invoke(initial_state, config) | |
|     # CRASH: TypeError: can't serialize gr.Progress | |
|     return final_state | |
| # AFTER (fixed) | |
| def run_workflow(workflow_app, initial_state, config, progress): | |
|     # Keep progress as a local variable, NOT in state | |
|     final_state = None | |
|     # stream_mode="values" yields the full state after each step | |
|     for final_state in workflow_app.stream(initial_state, config, stream_mode="values"): | |
|         # Update progress using the local variable | |
|         if progress: | |
|             progress(0.5, desc="Processing...") | |
|     return final_state | |
| ``` | |
| #### Token Usage Tracking Pattern | |
| The agents that call an LLM or embedding API update the shared `token_usage` dictionary, and CitationAgent reads the final totals: | |
| ```python | |
| # Initialize in create_initial_state() (utils/langgraph_state.py:46-91) | |
| initial_state["token_usage"] = { | |
| "input_tokens": 0, | |
| "output_tokens": 0, | |
| "embedding_tokens": 0 | |
| } | |
| # RetrieverAgent updates embedding tokens | |
| state["token_usage"]["embedding_tokens"] = len(chunks) * 300 # Estimate | |
| # AnalyzerAgent updates LLM tokens (thread-safe) | |
| with self.token_lock: | |
|     self.total_input_tokens += response.usage.prompt_tokens | |
|     self.total_output_tokens += response.usage.completion_tokens | |
| # After all analyses | |
| state["token_usage"]["input_tokens"] = self.total_input_tokens | |
| state["token_usage"]["output_tokens"] = self.total_output_tokens | |
| # SynthesisAgent accumulates (+=, not =) | |
| state["token_usage"]["input_tokens"] += response.usage.prompt_tokens | |
| state["token_usage"]["output_tokens"] += response.usage.completion_tokens | |
| # CitationAgent reads final totals | |
| cost_estimate = self.calculate_cost(state["token_usage"], model_desc) | |
| ``` | |
| **Why This Pattern?** | |
| - **Centralized Tracking**: Single source of truth for token usage | |
| - **Cost Transparency**: Users see token consumption per run (LLM tokens from the API's usage fields; embedding tokens estimated at ~300 per chunk) | |
| - **Performance Monitoring**: Track token usage trends over time | |
| --- | |
| ### Error Handling Philosophy | |
| #### The Golden Rule: Never Raise from run() | |
| **All agents follow this contract:** | |
| ```python | |
| def run(self, state: Dict[str, Any]) -> Dict[str, Any]: | |
|     try: | |
|         # Agent logic here | |
|         return state | |
|     except Exception as e: | |
|         logger.error(f"Agent failed: {str(e)}", exc_info=True) | |
|         state["errors"].append(f"Agent failed: {str(e)}") | |
|         return state  # NEVER raise | |
| ``` | |
| **Why?** | |
| - **Workflow Resilience**: One agent's failure doesn't crash entire pipeline | |
| - **Partial Results**: Downstream agents can work with available data | |
| - **Debugging**: Errors collected in state for tracing | |
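The try/except boilerplate in this contract can also be factored into a decorator. This is a minimal sketch in which `never_raise` and `FlakyAgent` are hypothetical names; the agents in this project write the try/except inline instead:

```python
import functools
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)
State = Dict[str, Any]


def never_raise(agent_name: str) -> Callable[[Callable[..., State]], Callable[..., State]]:
    """Hypothetical decorator enforcing the run() contract: log, record, return state."""
    def decorator(run: Callable[..., State]) -> Callable[..., State]:
        @functools.wraps(run)
        def wrapper(self: Any, state: State) -> State:
            try:
                return run(self, state)
            except Exception as e:
                logger.error(f"{agent_name} failed: {e}", exc_info=True)
                # setdefault guards against a state that has no "errors" list yet
                state.setdefault("errors", []).append(f"{agent_name} failed: {e}")
                return state  # never re-raise: downstream nodes keep running
        return wrapper
    return decorator


class FlakyAgent:
    @never_raise("FlakyAgent")
    def run(self, state: State) -> State:
        raise RuntimeError("upstream timeout")


result = FlakyAgent().run({"query": "transformer architectures"})
print(result["errors"])
```

The trade-off is that the decorator treats every failure the same way. The agents above add agent-specific fallbacks, such as a zero-confidence `Analysis`, inside their own `except` blocks.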
| #### Error Handling Strategies by Agent | |
| **RetrieverAgent**: Fallback + Partial Success | |
| ```python | |
| # Two-tier fallback for search | |
| papers = self._search_with_fallback(query, max_results, category) | |
| if not papers: | |
|     state["errors"].append("No papers found") | |
|     return state  # Early return, not exception | |
| # Continue with partial results on download failures | |
| for paper in papers: | |
|     try: | |
|         pdf_path = self._download_with_fallback(paper) | |
|     except Exception as e: | |
|         logger.warning(f"Skipping {paper.arxiv_id}: {str(e)}") | |
|         continue  # Process other papers | |
| ``` | |
| **AnalyzerAgent**: Circuit Breaker + Minimal Analysis | |
| ```python | |
| # In run(): circuit breaker stops after 2 consecutive failures | |
| if self.consecutive_failures >= self.max_consecutive_failures: | |
|     logger.error("Circuit breaker triggered") | |
|     break | |
| # In analyze_paper(): on failure, return a minimal analysis with confidence=0.0 | |
| try: | |
|     ...  # LLM call and response parsing | |
| except Exception as e: | |
|     return Analysis( | |
|         arxiv_id=paper.arxiv_id, | |
|         title=paper.title, | |
|         methodology=[], key_findings=[], conclusions=[], | |
|         limitations=[], contributions=[], | |
|         confidence_score=0.0  # Signal failure | |
|     ) | |
| ``` | |
| **SynthesisAgent**: Paper Count Mismatch Handling | |
| ```python | |
| # Defensive: Handle mismatched paper/analysis counts | |
| if len(papers) != len(analyses): | |
|     logger.warning(f"Count mismatch: {len(papers)} papers, {len(analyses)} analyses") | |
|     min_len = min(len(papers), len(analyses)) | |
|     papers = papers[:min_len] | |
|     analyses = analyses[:min_len] | |
| # On failure, return empty synthesis with confidence=0.0 | |
| try: | |
|     ...  # Prompt, LLM call, normalization | |
| except Exception as e: | |
|     state["synthesis"] = SynthesisResult( | |
|         consensus_points=[], contradictions=[], research_gaps=[], | |
|         summary=f"Synthesis failed: {str(e)}", | |
|         papers_analyzed=[p.arxiv_id for p in papers], | |
|         confidence_score=0.0 | |
|     ) | |
| ``` | |
| **CitationAgent**: Rare Failures (Data Transformation Only) | |
| ```python | |
| # Simpler error handling (no LLM, no external APIs) | |
| try: | |
|     # Pure data transformation | |
|     citations = [self._format_apa_citation(p) for p in papers] | |
|     cost = self.calculate_cost(token_usage, model_desc) | |
|     return state | |
| except Exception as e: | |
|     logger.error(f"Citation failed: {str(e)}") | |
|     state["errors"].append(f"Citation failed: {str(e)}") | |
|     return state | |
| ``` | |
| #### Confidence Score as Quality Signal | |
| The LLM-backed agents (AnalyzerAgent and SynthesisAgent) use `confidence_score` to indicate quality: | |
| ```python | |
| # High confidence: Successful analysis with good context | |
| Analysis(..., confidence_score=0.85) | |
| # Low confidence: Successful but limited context (below the filter's 0.7 threshold, so also removed) | |
| Analysis(..., confidence_score=0.45) | |
| # Zero confidence: Failure (filter node removes these) | |
| Analysis(..., confidence_score=0.0) | |
| ``` | |
| **Filter Node** uses confidence scores to remove bad analyses: | |
| ```python | |
| # orchestration/nodes.py:74-107 | |
| @observe(name="filter_low_confidence", as_type="span") | |
| def filter_node(state: AgentState) -> AgentState: | |
|     analyses = state.get("analyses", []) | |
|     threshold = 0.7  # Hardcoded here; change to tune filtering | |
|     filtered = [a for a in analyses if a.confidence_score >= threshold] | |
|     logger.info( | |
|         f"Filtered {len(filtered)}/{len(analyses)} analyses " | |
|         f"(threshold={threshold})" | |
|     ) | |
|     state["filtered_analyses"] = filtered | |
|     return state | |
| ``` | |
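Applying the same comprehension to the example scores shows what the 0.7 cutoff keeps. This sketch assumes a stand-in `Analysis` dataclass and exposes `threshold` as a parameter:

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Analysis:
    """Hypothetical stand-in for utils.schemas.Analysis (only the fields used here)."""
    arxiv_id: str
    confidence_score: float


def filter_low_confidence(state: Dict[str, Any], threshold: float = 0.7) -> Dict[str, Any]:
    """Keep analyses at or above the threshold, mirroring filter_node."""
    analyses: List[Analysis] = state.get("analyses", [])
    state["filtered_analyses"] = [a for a in analyses if a.confidence_score >= threshold]
    return state


state = {
    "analyses": [
        Analysis("paper-high", 0.85),
        Analysis("paper-low", 0.45),
        Analysis("paper-failed", 0.0),
    ]
}
kept = filter_low_confidence(state)["filtered_analyses"]
print([a.arxiv_id for a in kept])
```

Only the 0.85 analysis survives. The 0.45 analysis is dropped along with the failed 0.0 one, and a score of exactly 0.7 would be kept because the comparison is `>=`.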
| --- | |
| ### Observability Integration | |
| #### Three-Tier Tracing Architecture | |
| **Tier 1: Node-Level Tracing** (orchestration layer) | |
| ```python | |
| # orchestration/nodes.py | |
| @observe(name="analyzer_agent", as_type="span") | |
| def analyzer_node(state: AgentState, analyzer_agent) -> AgentState: | |
|     logger.info("Starting analyzer agent...") | |
|     updated_state = analyzer_agent.run(state) | |
|     logger.info(f"Analyzer completed. Analyses: {len(updated_state.get('analyses', []))}") | |
|     return updated_state | |
| ``` | |
| **What's Captured:** | |
| - Node execution duration | |
| - Input/output state snapshots | |
| - Errors caught by node wrapper | |
| **Tier 2: Agent-Level Tracing** (agent logic) | |
| ```python | |
| # agents/analyzer.py | |
| @observe(name="analyzer_agent_run", as_type="generation") | |
| def run(self, state: Dict[str, Any]) -> Dict[str, Any]: | |
|     # Agent logic... | |
|     return state | |
| ``` | |
| **What's Captured:** | |
| - Agent execution duration | |
| - State transformations | |
| - Agent-specific metadata (paper count, analysis count) | |
| **Tier 3: LLM-Level Tracing** (automatic instrumentation) | |
| ```python | |
| # utils/langfuse_client.py:74-94 | |
| from langfuse.openai import openai | |
| def instrument_openai(): | |
|     """ | |
|     Instrument Azure OpenAI client for automatic tracing. | |
|     All chat.completions.create() calls are automatically traced. | |
|     """ | |
|     langfuse_client = get_langfuse_client() | |
|     if langfuse_client: | |
|         openai.langfuse_auth(langfuse_client) | |
| ``` | |
| **What's Captured:** | |
| - Full prompt (system + user messages) | |
| - Full completion (response text) | |
| - Token usage (prompt_tokens, completion_tokens) | |
| - Model metadata (model, temperature, max_tokens) | |
| - Latency (total call duration; time to first token applies only to streamed responses) | |
| - Cost (calculated from token usage) | |
| #### @observe Decorator Patterns | |
| **Generation Type** (for LLM-heavy agents): | |
| ```python | |
| @observe(name="analyzer_agent_run", as_type="generation") | |
| def run(self, state): | |
|     # Marks this as an LLM generation task | |
|     # LangFuse shows token usage, cost, latency | |
|     pass | |
| ``` | |
| **Span Type** (for data processing): | |
| ```python | |
| @observe(name="filter_low_confidence", as_type="span") | |
| def filter_node(state): | |
|     # Marks this as a processing step | |
|     # LangFuse shows duration, input/output | |
|     pass | |
| ``` | |
| **Nested Tracing** (automatic): | |
| ``` | |
| retriever_node()                              # Creates span "retriever_agent" | |
| └─ retriever_agent.run()                      # Creates generation "retriever_agent_run" | |
|    └─ embedding_generator.generate_batch()    # Creates generation "embeddings" | |
|       └─ Azure OpenAI API call                # Automatic instrumentation | |
| ``` | |
| #### Session and Trace ID Tracking | |
| ```python | |
| # app.py:421-434 | |
| import uuid | |
| # Generate unique session ID per workflow execution | |
| session_id = f"session-{uuid.uuid4().hex[:8]}" | |
| initial_state = create_initial_state( | |
|     query=query, | |
|     category=category, | |
|     num_papers=num_papers, | |
|     model_desc=model_desc, | |
|     start_time=start_time, | |
|     session_id=session_id, | |
|     user_id=None  # Optional: for multi-user tracking | |
| ) | |
| ``` | |
| **Use Cases:** | |
| - **Session Grouping**: Group all traces from single workflow execution | |
| - **User Tracking**: Analyze behavior across multiple sessions (requires passing a non-None `user_id`; the call above passes `None`) | |
| - **Debugging**: Find all traces for failed session | |
| #### Graceful Degradation When LangFuse Unavailable | |
| ```python | |
| # utils/langfuse_client.py:97-138 | |
| def observe(name: str = None, as_type: str = "span", **kwargs): | |
|     """ | |
|     Wrapper for @observe decorator with graceful degradation. | |
|     If LangFuse not configured, returns identity decorator. | |
|     """ | |
|     langfuse_client = get_langfuse_client() | |
|     if langfuse_client is None: | |
|         # Return no-op decorator | |
|         def identity_decorator(func): | |
|             return func | |
|         return identity_decorator | |
|     # Return actual LangFuse decorator | |
|     from langfuse.decorators import observe as langfuse_observe | |
|     return langfuse_observe(name=name, as_type=as_type, **kwargs) | |
| ``` | |
| **Why This Pattern?** | |
| - **Optional Observability**: App works without LangFuse configured | |
| - **No Import Errors**: Doesn't fail if `langfuse` package missing | |
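| - **Zero Code Changes**: Same decorator usage regardless of config | |
| - **Decided Once**: `get_langfuse_client()` is consulted when the decorator is applied (typically at import time), so LangFuse must be configured before agent modules are imported | |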
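To exercise both branches without a LangFuse account, the client lookup and the real decorator can be injected. This sketch relies on that assumption, and `make_observe` and `fake_observe` are hypothetical names:

```python
from typing import Any, Callable, Optional

Decorator = Callable[[Callable[..., Any]], Callable[..., Any]]


def make_observe(
    get_client: Callable[[], Optional[Any]],
    real_observe: Callable[..., Decorator],
) -> Callable[..., Decorator]:
    """Build an observe() that degrades to a no-op when get_client() returns None."""

    def observe(name: Optional[str] = None, as_type: str = "span", **kwargs: Any) -> Decorator:
        if get_client() is None:
            # Identity decorator: the function comes back unchanged
            return lambda func: func
        return real_observe(name=name, as_type=as_type, **kwargs)

    return observe


def fake_observe(name: Optional[str] = None, as_type: str = "span", **kwargs: Any) -> Decorator:
    """Stand-in for the real decorator: tags the function instead of tracing it."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        func.traced_as = (name, as_type)  # type: ignore[attr-defined]
        return func
    return decorator


def double(x: int) -> int:
    return x * 2


disabled_observe = make_observe(lambda: None, fake_observe)      # "LangFuse not configured"
enabled_observe = make_observe(lambda: object(), fake_observe)   # "LangFuse configured"
```

Injecting the two dependencies keeps the degradation logic testable in isolation, which the module-level wrapper cannot offer without patching.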
| --- | |
| ### Performance Optimizations | |
| #### Parallel Processing in AnalyzerAgent | |
| ```python | |
| # agents/analyzer.py:333-359 | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| def run(self, state: Dict[str, Any]) -> Dict[str, Any]: | |
|     papers = state.get("papers", []) | |
|     analyses = [] | |
|     # Parallel analysis with ThreadPoolExecutor | |
|     with ThreadPoolExecutor(max_workers=self.max_workers) as executor: | |
|         # Submit all papers for analysis | |
|         future_to_paper = { | |
|             executor.submit(self.analyze_paper, paper): paper | |
|             for paper in papers | |
|         } | |
|         # Collect results as they complete (completion order, not paper order) | |
|         for future in as_completed(future_to_paper): | |
|             paper = future_to_paper[future] | |
|             try: | |
|                 analysis = future.result() | |
|                 analyses.append(analysis) | |
|             except Exception as e: | |
|                 logger.error(f"Failed to analyze {paper.arxiv_id}: {str(e)}") | |
|     state["analyses"] = analyses | |
|     return state | |
| ``` | |
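| **Performance Impact** (assuming ~60s per paper, the AnalyzerAgent timeout): | |
| - **Serial**: 5 papers × 60s = 300s (5 minutes) | |
| - **Parallel (4 workers)**: 5 papers need two waves (4 + 1), so ~120s (60% reduction); with 5 or more workers, ~60s (80% reduction) | |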
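The wave arithmetic can be written as a one-line estimate. This sketch assumes every call takes the same time, which real LLM calls do not, so treat the result as a rough bound:

```python
import math


def estimated_wall_clock(num_papers: int, max_workers: int, seconds_per_paper: float) -> float:
    """Papers run in waves of max_workers; each wave lasts about one call's duration."""
    waves = math.ceil(num_papers / max_workers)
    return waves * seconds_per_paper


for workers in (1, 4, 5):
    print(workers, estimated_wall_clock(5, workers, 60))
```

The output reproduces the figures above: 300s serially, 120s with 4 workers, and 60s once every paper has its own worker.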
| **Thread Safety:** | |
| ```python | |
| # agents/analyzer.py:48-51 | |
| import threading | |
| def __init__(self, ...): | |
|     self.token_lock = threading.Lock() | |
|     self.total_input_tokens = 0 | |
|     self.total_output_tokens = 0 | |
| # In analyze_paper() method | |
| with self.token_lock: | |
|     self.total_input_tokens += response.usage.prompt_tokens | |
|     self.total_output_tokens += response.usage.completion_tokens | |
| ``` | |
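Here is a self-contained check of the lock pattern, with 1,000 concurrent updates spread across 8 threads. `TokenCounter` is a hypothetical reduction of the analyzer's fields:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class TokenCounter:
    """Hypothetical minimal version of the analyzer's lock-protected token totals."""

    def __init__(self) -> None:
        self.token_lock = threading.Lock()
        self.total_input_tokens = 0
        self.total_output_tokens = 0

    def record(self, prompt_tokens: int, completion_tokens: int) -> None:
        # `+=` on an attribute is a read-modify-write; the lock prevents lost updates
        with self.token_lock:
            self.total_input_tokens += prompt_tokens
            self.total_output_tokens += completion_tokens


counter = TokenCounter()
with ThreadPoolExecutor(max_workers=8) as executor:
    for _ in range(1000):
        executor.submit(counter.record, 100, 20)
# Leaving the with-block waits for all submitted updates
print(counter.total_input_tokens, counter.total_output_tokens)
```

The totals live on the agent instance. A long-lived agent reused across workflow runs would therefore need them reset at the start of each `run()`, and the excerpt above does not show whether that happens.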
| #### Circuit Breaker Pattern | |
| ```python | |
| # agents/analyzer.py:54-57 | |
| def __init__(self, ...): | |
|     self.consecutive_failures = 0 | |
|     self.max_consecutive_failures = 2 | |
| # In run() method | |
| for future in as_completed(future_to_paper): | |
|     # Check circuit breaker BEFORE processing next result | |
|     if self.consecutive_failures >= self.max_consecutive_failures: | |
|         logger.error(f"Circuit breaker triggered after {self.consecutive_failures} failures") | |
|         break | |
|     # Process result | |
|     analysis = future.result() | |
|     if analysis.confidence_score > 0: | |
|         self.consecutive_failures = 0  # Reset on success | |
|     else: | |
|         self.consecutive_failures += 1 | |
| ``` | |
| **Why Circuit Breaker?** | |
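| - **Fail Fast**: Stops collecting results after 2 consecutive failed analyses (consecutive in completion order) | |
| - **Cost Savings**: Limits runaway API usage on systemic failures, but only if pending work is cancelled. In the excerpt above every paper is submitted up front, and leaving the `with ThreadPoolExecutor(...)` block waits for all submitted futures, queued ones included, so `break` alone does not prevent the remaining calls; not-yet-started futures must be cancelled with `future.cancel()` (or `executor.shutdown(cancel_futures=True)` on Python 3.9+) | |
| - **User Experience**: Faster failure feedback, again only when pending calls are cancelled rather than awaited | |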
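The following sketch shows a breaker that also cancels queued work. It is an adaptation, not the project's code: it counts raised exceptions as failures, whereas the analyzer treats `confidence_score == 0` as failure. `run_with_breaker` is a hypothetical name:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterable, List, Tuple


def run_with_breaker(
    items: Iterable[Any],
    work: Callable[[Any], Any],
    max_workers: int = 4,
    max_consecutive_failures: int = 2,
) -> Tuple[List[Any], int]:
    """Process items in parallel; stop and cancel queued work after N consecutive failures."""
    results: List[Any] = []
    consecutive_failures = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(work, item) for item in items]
        for future in as_completed(futures):
            try:
                results.append(future.result())
                consecutive_failures = 0  # reset on success
            except Exception:
                consecutive_failures += 1
            if consecutive_failures >= max_consecutive_failures:
                # cancel() only succeeds for futures that have not started yet;
                # calls already running still finish before the with-block exits
                for f in futures:
                    f.cancel()
                break
    return results, consecutive_failures
```

With 5 papers and 4 workers only one call is ever queued, so the savings grow with the ratio of papers to workers.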
| #### Batch Operations | |
| **Embedding Generation** (RetrieverAgent): | |
| ```python | |
| # rag/embeddings.py | |
| def generate_batch(self, chunks: List[PaperChunk]) -> List[List[float]]: | |
|     """ | |
|     Generate embeddings for multiple chunks in a single API call. | |
|     Azure OpenAI supports batch size up to 2048 inputs. | |
|     """ | |
|     texts = [chunk.content for chunk in chunks] | |
|     # Single API call for all chunks | |
|     response = self.client.embeddings.create( | |
|         model=self.deployment_name, | |
|         input=texts  # List of strings | |
|     ) | |
|     return [item.embedding for item in response.data] | |
| ``` | |
| **Performance Impact** (illustrative latencies): | |
| - **Serial**: 100 chunks × 50ms = 5000ms | |
| - **Batch**: 1 call × 200ms = 200ms (96% reduction) | |
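`generate_batch` sends every chunk in one request, so a run with more chunks than the per-request input limit would need splitting. The following sketch shows that splitting. It assumes one request per batch and results returned in input order, and `embed` stands in for a call such as `client.embeddings.create(...)`:

```python
from typing import Callable, List, Sequence

Embedder = Callable[[List[str]], List[List[float]]]


def embed_in_batches(
    texts: Sequence[str], embed: Embedder, batch_size: int = 2048
) -> List[List[float]]:
    """Split texts into request-sized batches and concatenate embeddings in input order."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    embeddings: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        batch = list(texts[start:start + batch_size])
        embeddings.extend(embed(batch))  # one API call per batch
    return embeddings


def fake_embed(batch: List[str]) -> List[List[float]]:
    """Fake embedder for illustration: one dimension holding the text length."""
    return [[float(len(text))] for text in batch]


vectors = embed_in_batches([str(i) for i in range(5000)], fake_embed)
print(len(vectors))
```

With the default batch size, 5,000 texts become three requests of 2,048, 2,048, and 904 inputs. That still keeps most of the batching gain shown above.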
| #### Timeout Configuration | |
| Different agents have different timeout needs: | |
| ```python | |
| # AnalyzerAgent (60s) - moderate timeout | |
| self.client.chat.completions.create( | |
|     timeout=60  # Analyzing single paper | |
| ) | |
| # SynthesisAgent (90s) - longer timeout | |
| self.client.chat.completions.create( | |
|     timeout=90  # Cross-paper synthesis more complex | |
| ) | |
| ``` | |
| **Why Different Timeouts?** | |
| - **Synthesis is slower**: Processes all papers simultaneously, larger context | |
| - **Prevents premature failures**: Allows complex reasoning to complete | |
| - **Still bounded**: Avoids infinite hangs | |
| --- | |
| ## 5. Workflow Orchestration | |
| ### LangGraph Workflow Structure | |
| The workflow is defined in `orchestration/workflow_graph.py`: | |
| ```python | |
| from typing import Any | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from utils.langgraph_state import AgentState | |
| from orchestration.nodes import ( | |
|     retriever_node, analyzer_node, filter_node, synthesis_node, | |
|     citation_node, finalize_node, | |
|     should_continue_after_retriever, should_continue_after_filter, | |
| ) | |
| def create_workflow_graph( | |
|     retriever_agent, | |
|     analyzer_agent, | |
|     synthesis_agent, | |
|     citation_agent, | |
|     use_checkpointing: bool = True | |
| ) -> Any: | |
|     """ | |
|     Create LangGraph workflow with all agents and conditional routing. | |
|     """ | |
|     # Create state graph | |
|     workflow = StateGraph(AgentState) | |
|     # Add nodes (lambda binds agent instances) | |
|     workflow.add_node("retriever", lambda state: retriever_node(state, retriever_agent)) | |
|     workflow.add_node("analyzer", lambda state: analyzer_node(state, analyzer_agent)) | |
|     workflow.add_node("filter", filter_node) | |
|     workflow.add_node("synthesis", lambda state: synthesis_node(state, synthesis_agent)) | |
|     workflow.add_node("citation", lambda state: citation_node(state, citation_agent)) | |
|     workflow.add_node("finalize", finalize_node) | |
|     # Set entry point | |
|     workflow.set_entry_point("retriever") | |
|     # Add conditional edges | |
|     workflow.add_conditional_edges( | |
|         "retriever", | |
|         should_continue_after_retriever, | |
|         { | |
|             "continue": "analyzer", | |
|             "end": END | |
|         } | |
|     ) | |
|     workflow.add_conditional_edges( | |
|         "filter", | |
|         should_continue_after_filter, | |
|         { | |
|             "continue": "synthesis", | |
|             "end": END | |
|         } | |
|     ) | |
|     # Add standard edges | |
|     workflow.add_edge("analyzer", "filter") | |
|     workflow.add_edge("synthesis", "citation") | |
|     workflow.add_edge("citation", "finalize") | |
|     workflow.add_edge("finalize", END) | |
|     # Compile with checkpointing | |
|     if use_checkpointing: | |
|         checkpointer = MemorySaver() | |
|         return workflow.compile(checkpointer=checkpointer) | |
|     else: | |
|         return workflow.compile() | |
| ``` | |
| **Complete Workflow Flow:** | |
| ``` | |
| START | |
|   ↓ | |
| retriever | |
|   ↓ | |
| [Check: papers found?] | |
|   ├─ No → END | |
|   └─ Yes → analyzer | |
|              ↓ | |
|            filter | |
|              ↓ | |
|      [Check: valid analyses?] | |
|        ├─ No → END | |
|        └─ Yes → synthesis | |
|                   ↓ | |
|                citation | |
|                   ↓ | |
|                finalize | |
|                   ↓ | |
|                  END | |
| ``` | |
### Node Wrapper Pattern

**Purpose of Node Wrappers:**

1. **Orchestration Concerns**: Tracing, logging, and error handling live in the wrapper
2. **Agent Logic Isolation**: Keeps agents pure and testable
3. **Consistent Interface**: All nodes follow the same pattern

**Standard Node Wrapper Template** (placeholders in `<angle brackets>` must be filled in before it runs):

```python
from langfuse.decorators import observe
from utils.langgraph_state import AgentState
import logging

logger = logging.getLogger(__name__)


@observe(name="<agent_name>", as_type="<span|generation>")
def <agent>_node(state: AgentState, agent_instance) -> AgentState:
    """
    Node wrapper for <AgentName>.

    Responsibilities:
    - LangFuse tracing (via @observe)
    - Structured logging
    - Error handling
    - State transformation delegation
    """
    logger.info("Starting <agent_name> agent...")
    try:
        # Delegate to agent's run() method
        updated_state = agent_instance.run(state)
        # Log completion with metrics
        logger.info(f"<Agent> completed. <Metric>: {len(updated_state.get('<key>', []))}")
        return updated_state
    except Exception as e:
        # Catch-all error handling
        logger.error(f"<Agent> node failed: {str(e)}", exc_info=True)
        # setdefault avoids a KeyError if "errors" was never initialized
        state.setdefault("errors", []).append(f"<Agent> failed: {str(e)}")
        return state  # Return the incoming state (plus the error) on failure
```
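Because every wrapper repeats the same try/log/append shape, the boilerplate can be factored into a closure, which also replaces the `lambda` binding used in the workflow graph. This is a sketch assuming only the `run(state) -> state` contract; `make_node` is a hypothetical helper, not part of `orchestration/nodes.py`, and LangFuse tracing is omitted here (real code would still apply `@observe`).

```python
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)
State = Dict[str, Any]


def make_node(name: str, agent: Any, metric_key: str) -> Callable[[State], State]:
    """Build a node that delegates to agent.run() with logging and a catch-all."""

    def node(state: State) -> State:
        logger.info("Starting %s agent...", name)
        try:
            updated = agent.run(state)
            logger.info("%s completed. %s: %d", name, metric_key,
                        len(updated.get(metric_key, [])))
            return updated
        except Exception as exc:  # the graph never sees an exception from a node
            logger.error("%s node failed: %s", name, exc, exc_info=True)
            state.setdefault("errors", []).append(f"{name} failed: {exc}")
            return state

    node.__name__ = f"{name}_node"  # readable name in logs and tracebacks
    return node
```

Usage would look like `workflow.add_node("retriever", make_node("retriever", retriever_agent, "papers"))`; the trade-off is one less layer of explicit code per agent in exchange for a slightly less obvious call site.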
**Example: FilterNode** (standalone logic, no agent instance)

```python
# orchestration/nodes.py:74-107
@observe(name="filter_low_confidence", as_type="span")
def filter_node(state: AgentState) -> AgentState:
    """
    Filter out low-confidence analyses.

    Note: This is NOT an agent wrapper - it's standalone logic.
    """
    analyses = state.get("analyses", [])
    threshold = 0.7

    # Filter logic
    filtered = [a for a in analyses if a.confidence_score >= threshold]

    logger.info(
        f"Filtered {len(filtered)}/{len(analyses)} analyses "
        f"(threshold={threshold})"
    )

    state["filtered_analyses"] = filtered
    return state
```
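With `threshold = 0.7` hard-coded, tuning it means editing the node (Issue 2 in the troubleshooting section does exactly that). One alternative, sketched below under the assumption that only `confidence_score` matters, is a factory that closes over the threshold. `make_filter_node` and `FakeAnalysis` are hypothetical. Note that the comparison is `>=`, so an analysis scoring exactly 0.7 survives.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class FakeAnalysis:
    """Hypothetical stand-in for the Analysis schema (only the fields used here)."""
    arxiv_id: str
    confidence_score: float


def make_filter_node(threshold: float = 0.7) -> Callable[[Dict[str, Any]], Dict[str, Any]]:
    """Return a filter node keeping analyses with confidence_score >= threshold."""
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be within [0.0, 1.0]")

    def filter_node(state: Dict[str, Any]) -> Dict[str, Any]:
        analyses: List[Any] = state.get("analyses", [])
        state["filtered_analyses"] = [a for a in analyses if a.confidence_score >= threshold]
        return state

    return filter_node
```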
### Conditional Routing

**Two Routing Decision Points:**

**1. After Retriever: Check if Papers Found**

```python
# orchestration/nodes.py:168-179
def should_continue_after_retriever(state: AgentState) -> str:
    """
    Route based on paper retrieval success.

    Returns:
        "continue": Papers found, proceed to analyzer
        "end": No papers found, terminate workflow
    """
    papers = state.get("papers", [])
    if len(papers) == 0:
        logger.warning("No papers retrieved. Ending workflow.")
        return "end"
    logger.info(f"Retrieved {len(papers)} papers. Continuing to analyzer.")
    return "continue"
```

**Why Early Termination?**

- **Cost Savings**: Skips the per-paper LLM analysis calls when there is nothing to analyze
- **User Experience**: Immediate feedback that the search failed
- **Error Clarity**: A specific "no papers retrieved" message instead of a generic "no results"
**2. After Filter: Check if Valid Analyses Remain**

```python
# orchestration/nodes.py:182-193
def should_continue_after_filter(state: AgentState) -> str:
    """
    Route based on filter results.

    Returns:
        "continue": Valid analyses exist, proceed to synthesis
        "end": All analyses filtered out, terminate workflow
    """
    filtered = state.get("filtered_analyses", [])
    if len(filtered) == 0:
        logger.warning("No valid analyses after filtering. Ending workflow.")
        return "end"
    logger.info(f"{len(filtered)} valid analyses. Continuing to synthesis.")
    return "continue"
```

**Why Filter Check?**

- **Quality Gate**: Prevents synthesis when every analysis failed
- **Confidence Threshold**: Only synthesizes analyses with `confidence_score >= 0.7`
- **Cost Savings**: Avoids a synthesis LLM call on low-quality data
### Checkpointing and State Persistence

**MemorySaver Checkpointer:**

```python
# orchestration/workflow_graph.py:120-126
from langgraph.checkpoint.memory import MemorySaver

if use_checkpointing:
    checkpointer = MemorySaver()
    return workflow.compile(checkpointer=checkpointer)
```

**What Gets Checkpointed:**

- **State after each node**: The full AgentState dictionary
- **Serialized to msgpack**: Efficient binary format (which is why every state value must be serializable)
- **Stored in memory**: The checkpointer holds the state history only for the lifetime of the process
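Conceptually, an in-memory checkpointer is a per-thread log of state snapshots. The toy class below illustrates that idea with the stdlib only; it is not LangGraph's `MemorySaver` (which stores serialized checkpoints with versioned channels), and `TinyMemoryCheckpointer` is hypothetical. The `deepcopy` matters in this codebase because agents update `state` in place, and, like `MemorySaver`, the history disappears when the process exits.

```python
import copy
from collections import defaultdict
from typing import Any, Dict, List, Tuple


class TinyMemoryCheckpointer:
    """Keeps a per-thread list of (node_name, state snapshot) in process memory."""

    def __init__(self) -> None:
        self._history: Dict[str, List[Tuple[str, Dict[str, Any]]]] = defaultdict(list)

    def save(self, thread_id: str, node: str, state: Dict[str, Any]) -> None:
        # Snapshot a deep copy so later in-place mutations don't rewrite history
        self._history[thread_id].append((node, copy.deepcopy(state)))

    def after(self, thread_id: str, node: str) -> Dict[str, Any]:
        """Return a copy of the most recent state saved after `node` ran."""
        for name, snapshot in reversed(self._history[thread_id]):
            if name == node:
                return copy.deepcopy(snapshot)
        raise KeyError(f"no checkpoint after {node!r} in thread {thread_id!r}")

    def history(self, thread_id: str) -> List[str]:
        return [name for name, _ in self._history[thread_id]]
```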
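**Use Cases:**

Checkpoints are read from the compiled graph (`workflow_app`, the result of `workflow.compile(...)`), and they are addressed through the `config` dict rather than positional arguments.

**1. Workflow Resumption:**

```python
thread_config = {"configurable": {"thread_id": "thread-abc123"}}

# Latest checkpoint: .values holds the state, .next the nodes still to run
snapshot = workflow_app.get_state(thread_config)
print(snapshot.next)

# Passing None as input resumes from the saved checkpoint instead of restarting
final_state = workflow_app.invoke(None, thread_config)
```

**2. Debugging:**

```python
# Walk the history (most recent first) to the checkpoint saved right after
# the analyzer node, i.e. the one whose next node is "filter"
for snapshot in workflow_app.get_state_history(thread_config):
    if snapshot.next == ("filter",):
        print(f"Analyses: {snapshot.values['analyses']}")
        break
```

**3. Time Travel (Replay):**

```python
# Fork from the checkpoint taken before the retriever ran, with a changed parameter
before_retriever = next(
    s for s in workflow_app.get_state_history(thread_config)
    if s.next == ("retriever",)
)
forked_config = workflow_app.update_state(before_retriever.config, {"num_papers": 10})
workflow_app.invoke(None, forked_config)
```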
**Configuration:**

```python
# app.py:464-470
config = {
    "configurable": {
        "thread_id": session_id  # Unique per execution
    }
}
final_state = run_workflow(workflow_app, initial_state, config, progress)
```

**Why Checkpointing?**

- **Resilience**: A run can be resumed from its last checkpoint within the same process (`MemorySaver` history does not survive a restart; that would require a persistent checkpointer)
- **Debugging**: Inspect intermediate state
- **Experimentation**: Replay from checkpoints with different configs

---
## 6. Building New Agents

### Step-by-Step Development Guide

Follow this workflow to create a new agent that integrates seamlessly with the system:

#### Step 1: Define Agent Responsibilities

**Questions to Answer:**

- What specific task does this agent perform?
- What are its inputs (which state keys)?
- What are its outputs (which state keys are added or modified)?
- Does it call external APIs or LLMs?
- Can it fail? How should it degrade gracefully?

**Example: SummarizerAgent**

- **Task**: Generate concise summaries for each paper
- **Inputs**: `papers`, `chunks`
- **Outputs**: `summaries` (List[PaperSummary])
- **External Calls**: Azure OpenAI (LLM)
- **Failure Mode**: Return an empty summary with confidence=0.0

#### Step 2: Create Pydantic Schemas

Add output schemas to `utils/schemas.py`:

```python
# utils/schemas.py
from pydantic import BaseModel, Field
from typing import List


class PaperSummary(BaseModel):
    """Summary of a single paper."""
    arxiv_id: str = Field(..., description="arXiv ID of the paper")
    title: str = Field(..., description="Paper title")
    summary: str = Field(..., description="3-4 sentence summary")
    key_points: List[str] = Field(default_factory=list, description="Bullet points")
    confidence_score: float = Field(..., ge=0.0, le=1.0, description="Summary quality")
```
#### Step 3: Implement Agent Class

Create `agents/summarizer.py`:

```python
from typing import Dict, Any, List
import json
import logging

from openai import AzureOpenAI
from langfuse.decorators import observe

from utils.schemas import Paper, PaperChunk, PaperSummary

logger = logging.getLogger(__name__)


class SummarizerAgent:
    """Generates concise summaries for each paper."""

    def __init__(
        self,
        azure_openai_config: Dict[str, str],
        max_summary_tokens: int = 500,
        timeout: int = 30
    ):
        """
        Initialize SummarizerAgent.

        Args:
            azure_openai_config: Azure OpenAI credentials
            max_summary_tokens: Max tokens for summary generation
            timeout: LLM call timeout in seconds
        """
        self.deployment_name = azure_openai_config["deployment_name"]
        self.max_summary_tokens = max_summary_tokens
        self.timeout = timeout

        # Initialize Azure OpenAI client
        self.client = AzureOpenAI(
            api_key=azure_openai_config["api_key"],
            api_version=azure_openai_config.get("api_version", "2024-02-01"),
            azure_endpoint=azure_openai_config["endpoint"]
        )

    def _create_summary_prompt(self, paper: Paper, chunks: List[PaperChunk]) -> str:
        """Create prompt for summarization."""
        # Use at most the first 5 abstract/introduction chunks of this paper
        relevant_chunks = [
            c for c in chunks
            if c.paper_id == paper.arxiv_id and c.section in ["abstract", "introduction"]
        ][:5]
        context = "\n\n".join([c.content for c in relevant_chunks])

        prompt = f"""
Summarize this research paper concisely.

Title: {paper.title}
Authors: {', '.join(paper.authors[:3])}

Paper Content:
{context}

Provide:
1. A 3-4 sentence summary
2. 3-5 key points (bullet list)

Return as JSON:
{{
    "summary": "3-4 sentence summary here...",
    "key_points": ["Point 1", "Point 2", ...],
    "confidence_score": 0.85
}}
"""
        return prompt

    def _normalize_summary_response(self, data: dict, paper: Paper) -> dict:
        """Normalize LLM response to match Pydantic schema."""
        def ensure_string(value):
            return str(value) if value else ""

        def ensure_list_of_strings(value):
            if isinstance(value, list):
                return [str(item) for item in value if item]
            return [str(value)] if value else []

        # Clamp to [0, 1] so an out-of-range score doesn't fail Pydantic validation
        confidence = float(data.get("confidence_score") or 0.0)
        confidence = min(max(confidence, 0.0), 1.0)

        return {
            "arxiv_id": paper.arxiv_id,
            "title": paper.title,
            "summary": ensure_string(data.get("summary", "")),
            "key_points": ensure_list_of_strings(data.get("key_points", [])),
            "confidence_score": confidence
        }

    def summarize_paper(self, paper: Paper, chunks: List[PaperChunk]) -> PaperSummary:
        """
        Summarize a single paper.

        Args:
            paper: Paper metadata
            chunks: All chunks (filtered to this paper in the prompt builder)

        Returns:
            PaperSummary with summary, key points, confidence
        """
        try:
            # Create prompt
            prompt = self._create_summary_prompt(paper, chunks)

            # Call LLM
            response = self.client.chat.completions.create(
                model=self.deployment_name,
                messages=[
                    {"role": "system", "content": "You are a research paper summarizer."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.0,  # Deterministic
                response_format={"type": "json_object"},
                max_tokens=self.max_summary_tokens,
                timeout=self.timeout
            )

            # Parse and normalize
            data = json.loads(response.choices[0].message.content)
            normalized = self._normalize_summary_response(data, paper)

            # Create Pydantic model
            summary = PaperSummary(**normalized)
            logger.info(f"Summarized {paper.arxiv_id} (confidence={summary.confidence_score:.2f})")
            return summary

        except Exception as e:
            logger.error(f"Failed to summarize {paper.arxiv_id}: {str(e)}", exc_info=True)
            # Return minimal summary with confidence=0.0
            return PaperSummary(
                arxiv_id=paper.arxiv_id,
                title=paper.title,
                summary="",
                key_points=[],
                confidence_score=0.0
            )

    @observe(name="summarizer_agent_run", as_type="generation")
    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run summarizer agent on all papers.

        Args:
            state: Workflow state (requires 'papers' and 'chunks' keys)

        Returns:
            Updated state with 'summaries' key added
        """
        try:
            papers = state.get("papers", [])
            chunks = state.get("chunks", [])

            if not papers:
                logger.warning("No papers to summarize")
                state["summaries"] = []
                return state

            # Summarize each paper (summarize_paper never raises)
            summaries = []
            for paper in papers:
                summary = self.summarize_paper(paper, chunks)
                summaries.append(summary)

            # Update state
            state["summaries"] = summaries
            logger.info(f"Summarized {len(summaries)} papers")
            return state

        except Exception as e:
            logger.error(f"Summarizer agent failed: {str(e)}", exc_info=True)
            state.setdefault("errors", []).append(f"Summarizer failed: {str(e)}")
            state["summaries"] = []
            return state  # Never raise
```
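The normalization step is what lets a malformed completion degrade to `confidence_score=0.0` instead of failing validation, and because it has no LLM dependency it is easy to unit-test on its own. The sketch below is a hypothetical, stricter variant (`parse_summary_json` is not in the codebase) that also covers a `None` completion, invalid JSON, a non-object reply, a non-numeric or NaN score, and an out-of-range score. It returns plain fields that could be passed to `PaperSummary` together with `arxiv_id` and `title`.

```python
import json
import math
from typing import Any, Dict, Optional


def parse_summary_json(raw: Optional[str]) -> Dict[str, Any]:
    """Turn a raw LLM completion into summary fields without ever raising."""
    try:
        data = json.loads(raw) if raw else {}
    except json.JSONDecodeError:
        data = {}
    if not isinstance(data, dict):  # e.g. the model returned a JSON list
        data = {}

    points = data.get("key_points") or []
    if not isinstance(points, list):
        points = [points]  # a single string becomes a one-item list

    try:
        score = float(data.get("confidence_score") or 0.0)
    except (TypeError, ValueError):
        score = 0.0
    if math.isnan(score):
        score = 0.0

    return {
        "summary": str(data.get("summary") or ""),
        "key_points": [str(p) for p in points if p],
        "confidence_score": min(max(score, 0.0), 1.0),  # clamp into [0, 1]
    }
```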
#### Step 4: Add Observability Decorators

Already added in Step 3:

```python
@observe(name="summarizer_agent_run", as_type="generation")
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    ...  # Agent logic
```

**Decorator Type Selection:**

- Use `as_type="generation"` if the agent calls an LLM
- Use `as_type="span"` if the agent only processes data
- The decorator is effectively a no-op if LangFuse is not configured
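If agents should also import cleanly where the `langfuse` package is not installed at all (for example, a minimal test runner), a guarded import with a pass-through fallback is one option. This is a sketch: the fallback `observe` is hypothetical and only mimics the decorator's call shape; it records nothing.

```python
import functools
from typing import Any, Callable

try:
    # v2-style import path, as used elsewhere in this document
    from langfuse.decorators import observe
except ImportError:
    # Hypothetical pass-through fallback: same call shape, records nothing
    def observe(*dargs: Any, **dkwargs: Any) -> Callable:
        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                return func(*args, **kwargs)
            return wrapper

        # Support bare @observe as well as @observe(name=..., as_type=...)
        if len(dargs) == 1 and callable(dargs[0]) and not dkwargs:
            return decorator(dargs[0])
        return decorator


@observe(name="toy_agent_run", as_type="span")
def toy_run(state: dict) -> dict:
    state["done"] = True
    return state
```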
#### Step 5: Create Node Wrapper

Add to `orchestration/nodes.py`:

```python
# orchestration/nodes.py
from langfuse.decorators import observe
from utils.langgraph_state import AgentState
import logging

logger = logging.getLogger(__name__)


@observe(name="summarizer_agent", as_type="span")
def summarizer_node(state: AgentState, summarizer_agent) -> AgentState:
    """
    Node wrapper for SummarizerAgent.

    Responsibilities:
    - LangFuse tracing
    - Structured logging
    - Error handling
    """
    logger.info("Starting summarizer agent...")
    try:
        updated_state = summarizer_agent.run(state)
        summaries = updated_state.get("summaries", [])
        logger.info(f"Summarizer completed. Summaries: {len(summaries)}")
        return updated_state
    except Exception as e:
        logger.error(f"Summarizer node failed: {str(e)}", exc_info=True)
        state.setdefault("errors", []).append(f"Summarizer node failed: {str(e)}")
        return state
```
#### Step 6: Add to Workflow Graph

Update `orchestration/workflow_graph.py`. Keep the existing conditional edges and re-point the retriever's `"continue"` branch at the new node, so early termination still works:

```python
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

from utils.langgraph_state import AgentState
from orchestration.nodes import (
    retriever_node, analyzer_node, summarizer_node, filter_node,
    synthesis_node, citation_node, finalize_node,
    should_continue_after_retriever, should_continue_after_filter,
)


def create_workflow_graph(
    retriever_agent,
    analyzer_agent,
    summarizer_agent,  # NEW: Add parameter
    synthesis_agent,
    citation_agent,
    use_checkpointing: bool = True
):
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("retriever", lambda state: retriever_node(state, retriever_agent))
    workflow.add_node("analyzer", lambda state: analyzer_node(state, analyzer_agent))
    workflow.add_node("summarizer", lambda state: summarizer_node(state, summarizer_agent))  # NEW
    workflow.add_node("filter", filter_node)
    workflow.add_node("synthesis", lambda state: synthesis_node(state, synthesis_agent))
    workflow.add_node("citation", lambda state: citation_node(state, citation_agent))
    workflow.add_node("finalize", finalize_node)

    # Set entry point
    workflow.set_entry_point("retriever")

    # Conditional edges (NEW: retriever continues to summarizer instead of analyzer)
    workflow.add_conditional_edges(
        "retriever",
        should_continue_after_retriever,
        {"continue": "summarizer", "end": END}
    )
    workflow.add_conditional_edges(
        "filter",
        should_continue_after_filter,
        {"continue": "synthesis", "end": END}
    )

    # Standard edges
    workflow.add_edge("summarizer", "analyzer")  # NEW
    workflow.add_edge("analyzer", "filter")
    workflow.add_edge("synthesis", "citation")
    workflow.add_edge("citation", "finalize")
    workflow.add_edge("finalize", END)

    # Compile with checkpointing
    if use_checkpointing:
        checkpointer = MemorySaver()
        return workflow.compile(checkpointer=checkpointer)
    else:
        return workflow.compile()
```
#### Step 7: Update Conditional Routing (if needed)

If your agent can fail in a way that should terminate the workflow, replace the plain `summarizer → analyzer` edge with a conditional edge. Do not keep both: the plain edge would still send control to the analyzer even when the router returns `"end"`.

```python
# orchestration/nodes.py
def should_continue_after_summarizer(state: AgentState) -> str:
    """
    Route based on summarizer success.

    Returns:
        "continue": Summaries generated, proceed
        "end": All summaries failed, terminate
    """
    summaries = state.get("summaries", [])
    # Count successful summaries (failed ones carry confidence_score == 0.0)
    valid_summaries = [s for s in summaries if s.confidence_score > 0]
    if len(valid_summaries) == 0:
        logger.warning("No valid summaries. Ending workflow.")
        return "end"
    logger.info(f"{len(valid_summaries)} valid summaries. Continuing.")
    return "continue"


# In the workflow graph (instead of workflow.add_edge("summarizer", "analyzer"))
workflow.add_conditional_edges(
    "summarizer",
    should_continue_after_summarizer,
    {
        "continue": "analyzer",
        "end": END
    }
)
```
#### Step 8: Initialize Agent in app.py

```python
# app.py
from agents.summarizer import SummarizerAgent


class ResearchPaperAnalyzer:
    def __init__(self):
        # ... existing initialization (assumed to build azure_config with
        # "deployment_name", "api_key", and "endpoint" keys) ...

        # Initialize new agent
        self.summarizer_agent = SummarizerAgent(
            azure_openai_config=azure_config,
            max_summary_tokens=500,
            timeout=30
        )

        # Create workflow with new agent
        self.workflow_app = create_workflow_graph(
            retriever_agent=self.retriever_agent,
            analyzer_agent=self.analyzer_agent,
            summarizer_agent=self.summarizer_agent,  # NEW
            synthesis_agent=self.synthesis_agent,
            citation_agent=self.citation_agent
        )
```
#### Step 9: Update AgentState TypedDict

Add new state keys to `utils/langgraph_state.py`:

```python
# utils/langgraph_state.py
from typing import TypedDict, List, Optional

# SynthesisResult and ValidatedOutput are assumed to live in utils.schemas as well
from utils.schemas import (
    Paper, PaperChunk, Analysis, SynthesisResult, ValidatedOutput,
    PaperSummary,  # NEW import
)


class AgentState(TypedDict, total=False):
    # ... existing fields ...

    # Agent outputs
    papers: List[Paper]
    chunks: List[PaperChunk]
    summaries: List[PaperSummary]  # NEW: Summaries from SummarizerAgent
    analyses: List[Analysis]
    filtered_analyses: List[Analysis]
    synthesis: SynthesisResult
    validated_output: ValidatedOutput

    # ... rest of fields ...
```
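`TypedDict` annotations are not enforced at runtime, so a typo such as `state["summary"]` instead of `state["summaries"]` passes silently. A small check like the one below can be called in tests after `run()` to catch keys that were never declared. `undeclared_keys` is a hypothetical helper, and `DemoState` is an illustrative subset standing in for `AgentState`.

```python
from typing import Any, Dict, List, TypedDict, get_type_hints


class DemoState(TypedDict, total=False):
    # Hypothetical subset of AgentState, for illustration only
    query: str
    papers: List[Any]
    summaries: List[Any]
    errors: List[str]


def undeclared_keys(state: Dict[str, Any], schema: type) -> List[str]:
    """Return the state keys that the TypedDict schema does not declare."""
    declared = set(get_type_hints(schema))
    return sorted(k for k in state if k not in declared)
```

In a test this becomes a one-liner such as `assert undeclared_keys(result, AgentState) == []`.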
---

### Minimal Agent Template

Use this template as a starting point for new agents:

```python
# agents/template_agent.py
from typing import Dict, Any
import logging

from langfuse.decorators import observe

logger = logging.getLogger(__name__)


class TemplateAgent:
    """
    [Description of what this agent does]
    """

    def __init__(self, dependency1, dependency2, **kwargs):
        """
        Initialize TemplateAgent.

        Args:
            dependency1: Description
            dependency2: Description
            **kwargs: Additional configuration
        """
        self.dependency1 = dependency1
        self.dependency2 = dependency2
        # Initialize any other state

    def _helper_method(self, input_data):
        """Helper method for internal processing (should return a list)."""
        raise NotImplementedError("Implement TemplateAgent._helper_method")

    @observe(name="template_agent_run", as_type="span")  # or "generation" if LLM
    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform workflow state.

        Args:
            state: Current workflow state

        Returns:
            Updated state with new keys added
        """
        try:
            # 1. Read inputs from state
            input_data = state.get("input_key", [])
            if not input_data:
                logger.warning("No input data found")
                state["output_key"] = []
                return state

            # 2. Process data
            results = self._helper_method(input_data)

            # 3. Update state with outputs
            state["output_key"] = results

            # 4. Log completion
            logger.info(f"TemplateAgent completed. Results: {len(results)}")
            return state

        except Exception as e:
            # 5. Error handling: NEVER raise, always return state
            logger.error(f"TemplateAgent failed: {str(e)}", exc_info=True)
            state.setdefault("errors", []).append(f"TemplateAgent failed: {str(e)}")
            state["output_key"] = []  # Provide default/empty output
            return state
```
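As an illustration of the template filled in, here is a small, fully runnable agent. `KeywordTaggerAgent` and its `titles`/`tags` state keys are hypothetical (they are not part of `AgentState`), and `@observe` is left out so the sketch depends only on the stdlib. The point is the contract: empty input yields an empty output, and any failure is logged and recorded in `errors` instead of being raised.

```python
import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


class KeywordTaggerAgent:
    """Hypothetical example: tag each paper title with the keywords it contains."""

    def __init__(self, keywords: List[str]):
        self.keywords = [k.lower() for k in keywords]

    def _tag(self, titles: List[str]) -> List[Dict[str, Any]]:
        return [
            {"title": t, "tags": [k for k in self.keywords if k in t.lower()]}
            for t in titles
        ]

    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        try:
            titles = state.get("titles", [])
            if not titles:
                logger.warning("No titles found")
                state["tags"] = []
                return state
            state["tags"] = self._tag(titles)
            logger.info("KeywordTaggerAgent completed. Results: %d", len(state["tags"]))
            return state
        except Exception as exc:  # never raise from run()
            logger.error("KeywordTaggerAgent failed: %s", exc, exc_info=True)
            state.setdefault("errors", []).append(f"KeywordTaggerAgent failed: {exc}")
            state["tags"] = []
            return state
```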
**Node Wrapper Template:**

```python
# orchestration/nodes.py
import logging

from langfuse.decorators import observe
from utils.langgraph_state import AgentState

logger = logging.getLogger(__name__)


@observe(name="template_agent", as_type="span")
def template_node(state: AgentState, template_agent) -> AgentState:
    """Node wrapper for TemplateAgent."""
    logger.info("Starting template agent...")
    try:
        updated_state = template_agent.run(state)
        logger.info("Template agent completed.")
        return updated_state
    except Exception as e:
        logger.error(f"Template node failed: {str(e)}", exc_info=True)
        state.setdefault("errors", []).append(f"Template node failed: {str(e)}")
        return state
```

---
### Testing Patterns

Create `tests/test_template_agent.py`. These tests assume `_helper_method` has been implemented to call `self.dependency1.some_method(...)` and return its list result; until then, the success test will fail.

```python
import pytest
from unittest.mock import Mock

from agents.template_agent import TemplateAgent


class TestTemplateAgent:
    """Test suite for TemplateAgent."""

    @pytest.fixture
    def mock_dependency(self):
        """Mock external dependencies."""
        mock_dep = Mock()
        mock_dep.some_method.return_value = ["result1", "result2"]
        return mock_dep

    @pytest.fixture
    def agent(self, mock_dependency):
        """Create TemplateAgent instance with mocked dependencies."""
        return TemplateAgent(
            dependency1=mock_dependency,
            dependency2=Mock()
        )

    def test_run_success(self, agent):
        """Test successful agent execution."""
        # Arrange
        state = {
            "input_key": ["data1", "data2"],
            "errors": []
        }
        # Act
        result = agent.run(state)
        # Assert
        assert "output_key" in result
        assert len(result["output_key"]) > 0
        assert len(result["errors"]) == 0

    def test_run_empty_input(self, agent):
        """Test agent handles empty input gracefully."""
        # Arrange
        state = {
            "input_key": [],
            "errors": []
        }
        # Act
        result = agent.run(state)
        # Assert
        assert result["output_key"] == []
        assert len(result["errors"]) == 0

    def test_run_missing_input_key(self, agent):
        """Test agent handles missing state keys."""
        # Arrange
        state = {"errors": []}
        # Act
        result = agent.run(state)
        # Assert
        assert result["output_key"] == []
        assert len(result["errors"]) == 0

    def test_run_dependency_failure(self, agent, mock_dependency):
        """Test agent handles dependency failures gracefully."""
        # Arrange
        mock_dependency.some_method.side_effect = Exception("API error")
        state = {
            "input_key": ["data1"],
            "errors": []
        }
        # Act
        result = agent.run(state)
        # Assert
        assert result["output_key"] == []  # Empty on failure
        assert len(result["errors"]) > 0  # Error recorded in state
        assert "TemplateAgent failed" in result["errors"][0]

    def test_existing_keys_preserved(self, agent):
        """Test agent keeps existing state keys and adds its output key."""
        # Arrange (agents update state in place, so check the returned dict)
        state = {
            "query": "transformers",
            "input_key": ["data1"],
            "errors": []
        }
        # Act
        result = agent.run(state)
        # Assert
        assert result["query"] == "transformers"  # Existing keys untouched
        assert result["input_key"] == ["data1"]
        assert "output_key" in result  # New key added
```
**Run Tests:**

```bash
# Run all tests for this agent
pytest tests/test_template_agent.py -v

# Run with coverage (requires the pytest-cov plugin)
pytest tests/test_template_agent.py --cov=agents.template_agent -v

# Run a single test
pytest tests/test_template_agent.py::TestTemplateAgent::test_run_success -v
```

---
### Best Practices Checklist

Use this checklist when building or reviewing agent code:

**Agent Design:**
- [ ] Agent has a single, clear responsibility
- [ ] Agent implements the `run(state) -> state` interface
- [ ] Dependencies injected via constructor
- [ ] No instance state between invocations (stateless)

**State Management:**
- [ ] Reads inputs using `state.get(key, default)`
- [ ] Adds new keys to state (doesn't overwrite critical keys)
- [ ] Returns the updated state from `run()` (the agents in this document update `state` in place, so callers must not assume their input dict is left unchanged)
- [ ] All state values are msgpack-serializable (no Gradio components, file handles, etc.)

**Error Handling:**
- [ ] Never raises exceptions from `run()`
- [ ] Catches all exceptions and logs with `exc_info=True`
- [ ] Appends errors to `state["errors"]` (using `setdefault` if the key may be missing)
- [ ] Returns partial/degraded results on failure
- [ ] Uses confidence scores to signal quality

**Pydantic Schemas:**
- [ ] Output data modeled with Pydantic classes
- [ ] Schema includes validation (`Field` with constraints)
- [ ] Normalization method handles malformed LLM responses
- [ ] Schema added to `utils/schemas.py` and imported in `AgentState`

**LLM Configuration (if applicable):**
- [ ] Uses `temperature=0.0` for deterministic outputs
- [ ] Uses `response_format={"type": "json_object"}` for structured data
- [ ] Sets an appropriate `timeout` (60s for analysis, 90s for synthesis)
- [ ] Sets an appropriate `max_tokens` limit
- [ ] Tracks token usage in `state["token_usage"]`

**Observability:**
- [ ] `run()` method decorated with `@observe`
- [ ] Uses `as_type="generation"` for LLM calls, `as_type="span"` for data processing
- [ ] Structured logging with INFO/WARNING/ERROR levels
- [ ] Logs start/completion with metrics (count, duration, etc.)

**Performance:**
- [ ] Uses parallel processing if applicable (ThreadPoolExecutor)
- [ ] Implements a circuit breaker if making repeated external calls
- [ ] Uses batch operations where possible (embeddings, database)
- [ ] Appropriate timeout configuration

**Testing:**
- [ ] Test suite in `tests/test_<agent_name>.py`
- [ ] Tests cover: success, empty input, missing keys, dependency failures
- [ ] Uses mocks for external dependencies
- [ ] Tests verify state transformations
- [ ] Tests verify error handling (no exceptions raised)

**Integration:**
- [ ] Node wrapper created in `orchestration/nodes.py`
- [ ] Agent added to workflow graph in `orchestration/workflow_graph.py`
- [ ] Conditional routing added if needed
- [ ] Agent initialized in `app.py`
- [ ] AgentState TypedDict updated with new state keys

**Documentation:**
- [ ] Docstrings for class and all public methods
- [ ] Type hints for all parameters and returns
- [ ] Comments for complex logic
- [ ] Example added to AGENTS.md (this document)

---
## 7. Agent Comparison Reference

Quick reference table comparing all agents:

| Aspect | RetrieverAgent | AnalyzerAgent | SynthesisAgent | CitationAgent |
|--------|---------------|---------------|----------------|---------------|
| **File** | `agents/retriever.py` | `agents/analyzer.py` | `agents/synthesis.py` | `agents/citation.py` |
| **Primary Task** | Search arXiv, download PDFs, chunk, embed | Analyze individual papers with RAG | Cross-paper synthesis | Generate citations, validate, calculate cost |
| **Input State Keys** | `query`, `category`, `num_papers` | `papers` | `papers`, `analyses`, `query` | `synthesis`, `papers`, `token_usage`, `model_desc` |
| **Output State Keys** | `papers`, `chunks`, `token_usage[embedding_tokens]` | `analyses`, `token_usage[input/output_tokens]` | `synthesis`, `token_usage[input/output_tokens]` | `validated_output` |
| **External APIs** | arXiv API (or MCP), Azure OpenAI (embeddings) | Azure OpenAI (LLM) | Azure OpenAI (LLM) | None |
| **LLM Calls** | No (only embeddings) | Yes (one per paper) | Yes (one for all papers) | No |
| **Model** | text-embedding-3-small | gpt-4o-mini (configurable) | gpt-4o-mini (configurable) | N/A |
| **Temperature** | N/A | 0.0 | 0.0 | N/A |
| **Timeout** | 30s (download), 60s (embedding) | 60s | 90s | N/A |
| **Parallel Processing** | No | Yes (ThreadPoolExecutor, 4 workers) | No | No |
| **Observability Type** | `generation` (includes embeddings) | `generation` (LLM-heavy) | `generation` (LLM) | `span` (data only) |
| **Error Handling** | Two-tier fallback, partial success | Circuit breaker, minimal analysis (confidence=0.0) | Paper count mismatch, empty synthesis (confidence=0.0) | Rare failures (pure data transformation) |
| **Confidence Scoring** | N/A | Based on RAG context quality | Based on synthesis completeness | N/A |
| **Main Dependencies** | ArxivClient, PDFProcessor, EmbeddingGenerator, VectorStore | RAGRetriever, AzureOpenAI | RAGRetriever, AzureOpenAI | PricingConfig |
| **Failure Mode** | Returns empty papers/chunks, appends errors | Returns confidence=0.0 analyses | Returns empty synthesis, confidence=0.0 | Returns original state, appends errors |
| **Cost Impact** | Embedding tokens (~$0.02 per 1M tokens at text-embedding-3-small list price) | Input/output tokens (~$0.15-$0.60 per 1M tokens) | Input/output tokens (~$0.15-$0.60 per 1M tokens) | None (calculates cost, doesn't incur it) |
| **Typical Duration** | 5-15s (download + embed) | 30-60s (parallel, 4 papers) | 10-20s (single synthesis) | <1s |
| **State Mutation** | Adds `papers`, `chunks` | Adds `analyses` | Adds `synthesis` | Adds `validated_output` |
| **Thread Safety** | N/A | Yes (`token_lock` for shared counter) | N/A | N/A |
| **Deterministic** | Mostly (search results change as new papers are published; same input gives the same embeddings) | Mostly (temperature=0 reduces, but does not eliminate, run-to-run variation) | Mostly (temperature=0 reduces, but does not eliminate, run-to-run variation) | Yes |
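The AnalyzerAgent row pairs a 4-worker `ThreadPoolExecutor` with a `token_lock` guarding a shared counter. The sketch below shows that pattern with the stdlib only. `TokenCounter`, `analyze_one`, and `analyze_all` are hypothetical names, and the fixed token counts stand in for the usage a real LLM response would report.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple


class TokenCounter:
    """Thread-safe accumulator for token usage shared by worker threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.input_tokens = 0
        self.output_tokens = 0

    def add(self, input_tokens: int, output_tokens: int) -> None:
        with self._lock:  # "+=" on shared attributes is not atomic across threads
            self.input_tokens += input_tokens
            self.output_tokens += output_tokens


def analyze_one(paper_id: str, counter: TokenCounter) -> Dict[str, object]:
    # Stand-in for one LLM call; real code would read usage from the API response
    input_tokens, output_tokens = 1000, 200
    counter.add(input_tokens, output_tokens)  # runs on a worker thread
    return {"arxiv_id": paper_id, "confidence_score": 0.8}


def analyze_all(paper_ids: List[str], max_workers: int = 4) -> Tuple[List[dict], TokenCounter]:
    counter = TokenCounter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order, even though calls finish out of order
        analyses = list(pool.map(lambda pid: analyze_one(pid, counter), paper_ids))
    return analyses, counter
```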
---

## 8. Troubleshooting and Debugging

### Common Issues and Solutions

#### Issue 1: msgpack Serialization Error

**Symptom:**

```
TypeError: can't serialize <class 'gradio.Progress'>
```

**Cause:** A non-serializable object was added to state (Gradio Progress, file handles, callbacks)

**Solution:**

1. **Never** add non-serializable objects to state
2. Keep them as local variables instead
3. See `BUGFIX_MSGPACK_SERIALIZATION.md` for the detailed fix

**Example Fix:**

```python
# WRONG: progress is stored in state, so checkpointing fails
def run_workflow(workflow_app, initial_state, config, progress):
    initial_state["progress"] = progress  # ❌
    return workflow_app.invoke(initial_state, config)


# CORRECT: progress stays a local variable
def run_workflow(workflow_app, initial_state, config, progress):
    final_state = None
    # stream_mode="values" yields the full state after each step
    for state in workflow_app.stream(initial_state, config, stream_mode="values"):
        final_state = state
        if progress:
            progress(0.5, desc="Processing...")  # ✅
    return final_state
```
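Before invoking the workflow, a quick scan of the initial state can flag the usual culprits. This is a heuristic sketch (hypothetical `find_suspect_values`, stdlib only): it flags callables such as a Gradio `Progress` object and open file objects, but it does not run the checkpointer's serializer, so it can miss other unsupported types.

```python
import io
from typing import Any, List


def find_suspect_values(value: Any, path: str = "state") -> List[str]:
    """Heuristically list paths to values that usually can't be checkpointed."""
    suspects: List[str] = []
    if isinstance(value, dict):
        for key, item in value.items():
            suspects += find_suspect_values(item, f"{path}[{key!r}]")
    elif isinstance(value, (list, tuple)):
        for i, item in enumerate(value):
            suspects += find_suspect_values(item, f"{path}[{i}]")
    elif callable(value) or isinstance(value, io.IOBase):
        # Callables (progress bars, hooks) and file handles are not data
        suspects.append(f"{path}: {type(value).__name__}")
    return suspects
```

A natural place to call it is right before `workflow_app.stream(...)`, logging a warning (or failing fast in tests) if the list is non-empty.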
---

#### Issue 2: All Analyses Filtered Out

**Symptom:**

```
WARNING: No valid analyses after filtering. Ending workflow.
```

**Cause:** Every analysis has `confidence_score < 0.7` (the filter threshold)

**Root Causes:**

- RAG retrieval failed (no chunks found)
- LLM returned malformed JSON repeatedly
- Circuit breaker triggered after 2 consecutive failures

**Debugging Steps:**

1. **Check LangFuse traces** to see which papers failed:

```python
from observability import TraceReader

reader = TraceReader()
traces = reader.get_traces(session_id="session-abc123")
analyzer_spans = reader.filter_by_agent(traces, "analyzer_agent")
for span in analyzer_spans:
    print(f"Paper: {span.metadata.get('arxiv_id')}")
    print(f"Confidence: {span.metadata.get('confidence_score')}")
```

2. **Check RAG retrieval** to verify chunks were found:

```python
# In agents/analyzer.py, add logging
logger.info(f"Retrieved {len(unique_chunks)} chunks for {paper.arxiv_id}")
```

3. **Lower the filter threshold temporarily:**

```python
# orchestration/nodes.py:77
threshold = 0.5  # Lowered from 0.7 to accept more analyses
```

4. **Check the circuit breaker:**

```python
# agents/analyzer.py
logger.error(f"Circuit breaker triggered after {self.consecutive_failures} failures")
# If you see this, investigate the first two failures logged before it
```
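Step 4 points at the earliest failures because a consecutive-failure breaker only opens after those failures have already happened; once it is open, later errors say nothing about the root cause. The sketch below is a hypothetical `ConsecutiveFailureBreaker` (not the code in `agents/analyzer.py`) that keeps a log of the failures so the cause is still visible after the breaker opens.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised instead of calling the function once the breaker is open."""


class ConsecutiveFailureBreaker:
    """Opens after `max_failures` consecutive failures; a success resets the count."""

    def __init__(self, max_failures: int = 2) -> None:
        self.max_failures = max_failures
        self.consecutive_failures = 0
        self.failures: List[str] = []  # log of every failure seen, for debugging

    def call(self, func: Callable[[], T]) -> T:
        if self.consecutive_failures >= self.max_failures:
            raise CircuitOpenError(
                f"Circuit breaker triggered after {self.consecutive_failures} failures"
            )
        try:
            result = func()
        except Exception as exc:
            self.consecutive_failures += 1
            self.failures.append(repr(exc))
            raise
        self.consecutive_failures = 0  # any success closes the streak
        return result
```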

---

#### Issue 3: Retriever Returns No Papers

**Symptom:**

```
WARNING: No papers retrieved. Ending workflow.
```

**Cause:** The arXiv search returned no results, or both the primary and fallback clients failed.
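
The primary/fallback arrangement behind this cause can be sketched as two attempts in sequence. In this sketch, a client is any callable that takes `(query, category, max_results)` and returns a list of papers. `search_with_fallback` and the two stand-in clients are hypothetical. Falling back only on an exception, and not on an empty result, is an assumption about the policy in `agents/retriever.py`.

```python
import logging
from typing import Callable, List

logger = logging.getLogger(__name__)

# Assumption: a client is any callable (query, category, max_results) -> list of papers
SearchFn = Callable[[str, str, int], List[dict]]

def search_with_fallback(primary: SearchFn, fallback: SearchFn,
                         query: str, category: str, max_results: int = 5) -> List[dict]:
    # An empty (non-exception) result from the primary is returned as-is
    try:
        return primary(query, category, max_results)
    except Exception as e:
        logger.warning(f"Primary client failed: {str(e)}, trying fallback...")
    try:
        return fallback(query, category, max_results)
    except Exception as e:
        logger.error(f"Fallback client failed: {str(e)}")
        return []  # both failed: the workflow will end with "No papers retrieved"

# Hypothetical stand-ins for the MCP client and the direct arXiv API client
def broken_mcp(query: str, category: str, n: int) -> List[dict]:
    raise ConnectionError("MCP server unreachable")

def direct_api(query: str, category: str, n: int) -> List[dict]:
    return [{"arxiv_id": "2401.00001"}][:n]

papers = search_with_fallback(broken_mcp, direct_api, "transformer", "cs.AI")
```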

**Debugging Steps:**

1. **Check the query and category:**

   ```python
   logger.info(f"Searching arXiv: query='{query}', category='{category}'")
   # Verify the query is reasonable and the category is valid (e.g., 'cs.AI', not 'AI')
   ```

2. **Test the arXiv search manually:**

   ```bash
   # In a terminal: max_results bounds the request, and Client.results()
   # replaces the deprecated Search.results()
   python -c "import arxiv; s = arxiv.Search(query='transformer', max_results=3); print([r.title for r in arxiv.Client().results(s)])"
   ```

3. **Check the fallback client:**

   ```python
   # agents/retriever.py:69-97
   logger.warning(f"Primary client failed: {str(e)}, trying fallback...")
   # If you see this, the primary (MCP) client is failing
   ```

4. **Temporarily disable MCP:**

   ```bash
   # .env
   # Force the direct arXiv API
   USE_MCP_ARXIV=false
   ```
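
The category check in step 1 can be automated with a format test. The regular expression below approximates arXiv's naming scheme: an archive such as `cs`, `hep-th`, or `q-bio`, optionally followed by `.` and a subject class such as `AI` or `acc-ph`. It catches mistakes like `'AI'`, but it does not validate against arXiv's official category list. `is_valid_arxiv_category` is a hypothetical helper.

```python
import re

# Approximate arXiv category format: archive[-suffix][.subject]
# e.g. "cs.AI", "math.CO", "hep-th", "astro-ph.GA", "physics.acc-ph"
_CATEGORY_RE = re.compile(r"[a-z]+(?:-[a-z]+)?(?:\.[A-Za-z-]+)?")

def is_valid_arxiv_category(category: str) -> bool:
    """Format check only; does not consult arXiv's category taxonomy."""
    return _CATEGORY_RE.fullmatch(category) is not None

for cat in ["cs.AI", "hep-th", "astro-ph.GA", "AI"]:
    print(cat, is_valid_arxiv_category(cat))
```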

---

#### Issue 4: Synthesis Returns Empty Results

**Symptom:**

```json
{
  "consensus_points": [],
  "contradictions": [],
  "research_gaps": [],
  "summary": ""
}
```

**Cause:** The LLM returned an empty synthesis, or normalization stripped all of the data.
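
One way to separate "the LLM returned nothing" from "normalization dropped it" is to normalize conservatively. Data in an unexpected shape is kept, not discarded. The sketch below is a hypothetical `normalize_synthesis`, and the real logic in `agents/synthesis.py` may differ. It assumes the four fields from the symptom above and wraps a bare string in a list rather than replacing it with `[]`.

```python
from typing import Any, Dict, List

LIST_FIELDS = ("consensus_points", "contradictions", "research_gaps")

def _as_list(value: Any) -> List[Any]:
    """Coerce a field to a list without discarding non-empty data."""
    if value is None or value == "":
        return []
    if isinstance(value, list):
        return value
    return [value]  # e.g. a bare string where a list was expected

def normalize_synthesis(data: Dict[str, Any]) -> Dict[str, Any]:
    normalized: Dict[str, Any] = {field: _as_list(data.get(field)) for field in LIST_FIELDS}
    summary = data.get("summary")
    normalized["summary"] = "" if summary is None else str(summary)
    return normalized

raw = {"consensus_points": "Attention scales well", "summary": "Two papers agree."}
print(normalize_synthesis(raw))
```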

**Debugging Steps:**

1. **Check the LangFuse trace for the synthesis LLM call:**
   - View the full prompt sent to the LLM
   - View the full completion received
   - Check whether the completion was actually empty or normalization failed

2. **Verify the paper summaries in the prompt:**

   ```python
   # agents/synthesis.py:54-133
   logger.debug(f"Synthesis prompt:\n{prompt}")
   # Check that the paper summaries are actually populated
   ```

3. **Check normalization:**

   ```python
   # agents/synthesis.py:135-196
   logger.debug(f"Raw LLM response: {data}")
   logger.debug(f"Normalized response: {normalized}")
   # Verify normalization isn't stripping valid data
   ```

4. **Increase max_tokens:**

   ```python
   # agents/synthesis.py:280
   max_tokens=3000  # Increase from the default if the synthesis is cut off
   ```
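
Before raising `max_tokens`, confirm that the completion was actually cut off. In the OpenAI chat completions API (Azure OpenAI returns the same field), a choice that stopped because it hit the token limit has `finish_reason == "length"`. The helper below is a minimal sketch that assumes a response shaped like `response.choices[0].finish_reason`. The `SimpleNamespace` objects stand in for real responses.

```python
import logging
from types import SimpleNamespace

logger = logging.getLogger(__name__)

def was_truncated(response) -> bool:
    """True if the first choice stopped because it reached max_tokens."""
    return response.choices[0].finish_reason == "length"

# Stand-ins for chat completion responses (only the field used here)
cut_off = SimpleNamespace(choices=[SimpleNamespace(finish_reason="length")])
complete = SimpleNamespace(choices=[SimpleNamespace(finish_reason="stop")])

for resp in (cut_off, complete):
    if was_truncated(resp):
        logger.warning("Synthesis hit max_tokens; consider raising it")
```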

---

#### Issue 5: Cost Estimate is $0.00

**Symptom:**

```
Cost: $0.0000
```

**Cause:** Token usage is not being tracked properly.

**Debugging Steps:**

1. **Check `token_usage` in state:**

   ```python
   logger.info(f"Token usage: {state['token_usage']}")
   # Should show non-zero input_tokens, output_tokens, and embedding_tokens
   ```

2. **Verify that the agents update `token_usage`:**

   ```python
   # AnalyzerAgent should do:
   state["token_usage"]["input_tokens"] = self.total_input_tokens
   # SynthesisAgent should do:
   state["token_usage"]["input_tokens"] += response.usage.prompt_tokens
   ```

3. **Check the pricing configuration:**

   ```python
   from utils.config import get_pricing_config

   pricing = get_pricing_config()
   print(pricing.get_model_pricing("gpt-4o-mini"))
   # Should return {"input": 0.15, "output": 0.60} (USD per 1M tokens)
   ```
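
With prices quoted per 1M tokens, the cost is easy to check by hand. 40,000 input tokens at $0.15/1M cost $0.0060, and 8,000 output tokens at $0.60/1M cost $0.0048, for $0.0108 in total. The sketch below reproduces that calculation. It assumes the `token_usage` keys from step 1 and the pricing shape from step 3. `embedding_price` is a hypothetical parameter because the embedding model's price is not listed here. A zero result with non-zero token counts points at pricing. Zero token counts point at the state updates in step 2.

```python
from typing import Dict

def estimate_cost(token_usage: Dict[str, int], pricing: Dict[str, float],
                  embedding_price: float = 0.0) -> float:
    """USD cost, with all prices given per 1M tokens."""
    per_million = 1_000_000
    return (
        token_usage.get("input_tokens", 0) * pricing["input"] / per_million
        + token_usage.get("output_tokens", 0) * pricing["output"] / per_million
        + token_usage.get("embedding_tokens", 0) * embedding_price / per_million
    )

usage = {"input_tokens": 40_000, "output_tokens": 8_000, "embedding_tokens": 0}
gpt_4o_mini = {"input": 0.15, "output": 0.60}
print(f"Cost: ${estimate_cost(usage, gpt_4o_mini):.4f}")  # prints "Cost: $0.0108"
```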

---

### Reading LangFuse Traces

**Accessing LangFuse:**

1. **Web UI:** https://cloud.langfuse.com (or your self-hosted URL)
2. **Python API:**

   ```python
   from observability import TraceReader

   reader = TraceReader()
   traces = reader.get_traces(limit=10)
   ```

**Trace Structure:**

```
Trace (session-abc123)
│
├─ Span: retriever_agent
│  ├─ Generation: retriever_agent_run
│  └─ Generation: embeddings (Azure OpenAI)
│
├─ Span: analyzer_agent
│  ├─ Generation: analyzer_agent_run
│  ├─ Generation: LLM Call 1 (paper 1)
│  ├─ Generation: LLM Call 2 (paper 2)
│  └─ Span: rag_retrieve
│
├─ Span: filter_low_confidence
│
├─ Span: synthesis_agent
│  ├─ Generation: synthesis_agent_run
│  └─ Generation: LLM Call (synthesis)
│
└─ Span: citation_agent
   └─ Span: citation_agent_run
```

**What to Look For:**

**1. Execution Duration:**

- Span duration is the total time, including child spans
- Generation duration is the time for a single LLM call
- Look for slow spans (>60s), which indicate bottlenecks

**2. Token Usage:**

- Generations show `usage.prompt_tokens` and `usage.completion_tokens`
- Higher token usage means higher cost
- Unusually low token counts may indicate truncation

**3. Errors:**

- Spans with `level: ERROR` indicate failures
- Check `metadata.error` for exception details
- Trace errors back to specific papers or operations

**4. LLM Prompts/Completions:**

- Click a Generation to see its full prompt and completion
- Verify that the prompt includes the expected context
- Check whether the completion is valid JSON
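
Once spans have been exported, the checks above can be applied mechanically. This sketch assumes each span has been flattened into a plain dict with `name`, `duration_s`, `level`, `completion_tokens`, and `metadata` keys. That shape is hypothetical and is not the `TraceReader` object model. The 60 s threshold comes from the list above, and the 20-token cutoff for "suspiciously short" is an illustrative guess.

```python
from typing import Any, Dict, List

def triage_spans(spans: List[Dict[str, Any]], slow_s: float = 60.0,
                 min_completion_tokens: int = 20) -> List[str]:
    """Return readable findings for slow, failed, or suspiciously short spans."""
    findings: List[str] = []
    for span in spans:
        name = span["name"]
        if span.get("duration_s", 0.0) > slow_s:
            findings.append(f"SLOW: {name} took {span['duration_s']:.1f}s")
        if span.get("level") == "ERROR":
            error = span.get("metadata", {}).get("error", "unknown error")
            findings.append(f"ERROR: {name}: {error}")
        tokens = span.get("completion_tokens")
        if tokens is not None and tokens < min_completion_tokens:
            findings.append(f"SHORT: {name} produced only {tokens} completion tokens")
    return findings

spans = [
    {"name": "analyzer_agent", "duration_s": 95.2, "level": "DEFAULT"},
    {"name": "LLM Call 1 (paper 1)", "duration_s": 4.1, "completion_tokens": 12},
    {"name": "synthesis_agent", "duration_s": 8.0, "level": "ERROR",
     "metadata": {"error": "JSONDecodeError"}},
]
for finding in triage_spans(spans):
    print(finding)
```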

**Example Query:**

```python
from observability import TraceReader, AgentPerformanceAnalyzer

reader = TraceReader()
analyzer = AgentPerformanceAnalyzer()

# Get failed traces
traces = reader.get_traces(limit=100)
failed_traces = [t for t in traces if t.status == "ERROR"]
print(f"Failed traces: {len(failed_traces)}/{len(traces)}")

# Analyze analyzer latency
stats = analyzer.agent_latency_stats("analyzer_agent", days=7)
print(f"Analyzer P95 latency: {stats.p95_latency_ms:.2f}ms")

# Check error rates
error_rates = analyzer.error_rates(days=7)
for agent, rate in error_rates.items():
    print(f"{agent}: {rate:.1%} error rate")
```
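
P95 latency is the value below which 95% of the observed latencies fall. To cross-check a reported `p95_latency_ms`, compute it from raw durations with the standard library's `statistics.quantiles`. Percentile definitions vary, and this sketch uses the `"inclusive"` interpolation method, so expect small differences from `AgentPerformanceAnalyzer`. The sample latencies are made up.

```python
import statistics
from typing import Sequence

def p95(latencies_ms: Sequence[float]) -> float:
    """95th percentile via linear interpolation (needs at least one value)."""
    data = sorted(latencies_ms)
    if len(data) == 1:
        return float(data[0])
    # quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile
    return statistics.quantiles(data, n=100, method="inclusive")[94]

# Illustrative sample: most calls are fast, a few are very slow
latencies = [1200, 1350, 1100, 1500, 1250, 1400, 1300, 9800, 1280, 12500]
print(f"P95 latency: {p95(latencies):.2f}ms")
```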

---

### State Inspection Techniques

**During Development (in agent code):**

```python
# agents/analyzer.py
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    # Log state keys at entry
    logger.debug(f"State keys: {list(state.keys())}")

    # Log specific values
    papers = state.get("papers", [])
    logger.debug(f"Received {len(papers)} papers: {[p.arxiv_id for p in papers]}")

    # ... agent logic ...

    # Log state changes before returning
    logger.debug(f"Returning {len(state.get('analyses', []))} analyses")
    return state
```

**In the Gradio UI (during workflow execution):**

```python
# app.py
final_state = run_workflow(workflow_app, initial_state, config, progress)

# Inspect the final state
print(f"Final state keys: {list(final_state.keys())}")
print(f"Papers: {len(final_state.get('papers', []))}")
print(f"Analyses: {len(final_state.get('analyses', []))}")
print(f"Errors: {final_state.get('errors', [])}")
print(f"Token usage: {final_state.get('token_usage', {})}")
```
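
If you inspect state often, a small helper keeps the output compact and consistent between agent code and the UI. `summarize_state` is a hypothetical convenience function and is not part of `utils/langgraph_state.py`. It replaces list-valued fields with their lengths, so large paper or chunk lists don't flood the log. It passes `errors` and non-list values through unchanged.

```python
from typing import Any, Dict, Iterable

def summarize_state(state: Dict[str, Any], keep: Iterable[str] = ("errors",)) -> Dict[str, Any]:
    """Compact view of workflow state: list fields (except `keep`) become item counts."""
    keep_set = set(keep)
    summary: Dict[str, Any] = {}
    for key, value in state.items():
        if isinstance(value, (list, tuple)) and key not in keep_set:
            summary[key] = f"<{len(value)} items>"
        else:
            summary[key] = value
    return summary

# Illustrative final state
final_state = {
    "papers": ["p1", "p2", "p3"],
    "analyses": ["a1", "a2"],
    "errors": [],
    "token_usage": {"input_tokens": 1200, "output_tokens": 300},
}
print(summarize_state(final_state))
```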

**Using Checkpointer (post-execution):**

```python
from orchestration.workflow_graph import get_workflow_state

# Get state at a specific checkpoint
thread_id = "session-abc123"
state_after_analyzer = get_workflow_state(workflow_app, thread_id, checkpoint_id="after-analyzer")
print(f"Analyses after analyzer: {len(state_after_analyzer.get('analyses', []))}")

# Compare with the state after the filter
state_after_filter = get_workflow_state(workflow_app, thread_id, checkpoint_id="after-filter")
print(f"Analyses after filter: {len(state_after_filter.get('filtered_analyses', []))}")
print(f"Filtered out: {len(state_after_analyzer['analyses']) - len(state_after_filter['filtered_analyses'])}")
```
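
Checkpoints can also be located directly through LangGraph. With a checkpointer configured, `workflow_app.get_state_history({"configurable": {"thread_id": thread_id}})` yields `StateSnapshot` objects, most recent first. Each snapshot's `next` field names the node(s) scheduled to run next. The helper below is a hypothetical sketch that returns the state saved just before a given node ran, which is the state right after the previous node finished. The node names and the namedtuple stand-ins are assumptions, so match them to the names registered in `orchestration/workflow_graph.py`.

```python
from collections import namedtuple
from typing import Any, Dict, Iterable, Optional

def state_before_node(history: Iterable[Any], node: str) -> Optional[Dict[str, Any]]:
    """Return `values` of the most recent snapshot whose next step is `node`."""
    for snapshot in history:  # assumed ordering: most recent first
        if node in snapshot.next:
            return snapshot.values
    return None

# Stand-ins for StateSnapshot (only `values` and `next` are used)
Snapshot = namedtuple("Snapshot", ["values", "next"])
history = [
    Snapshot({"analyses": [1, 2, 3], "filtered_analyses": [1, 2]}, ("synthesis",)),
    Snapshot({"analyses": [1, 2, 3]}, ("filter",)),
    Snapshot({"analyses": []}, ("analyzer",)),
]
# Real usage (assumption):
# history = workflow_app.get_state_history({"configurable": {"thread_id": thread_id}})
after_analyzer = state_before_node(history, "filter")
print(len(after_analyzer["analyses"]))
```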

---

### Log Analysis Patterns

**Log Levels:**

- **INFO**: Normal workflow progress (agent start/completion, counts)
- **WARNING**: Recoverable issues (fallback triggered, empty results, low confidence)
- **ERROR**: Failures (exceptions caught, agent failures, API errors)
- **DEBUG**: Detailed debugging (state contents, intermediate values)
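
The grep recipes that follow assume logs are written to `app.log` with the level name followed by a colon. Below is a minimal sketch of such a setup with the standard `logging` module. The file name and format string are assumptions. With this format a line reads `<date> <time>,<ms> ERROR: <message>`. The timestamp contains two colons, so the message starts at the fourth `:`-separated field, which is what `cut -d':' -f4-` relies on.

```python
import logging

def configure_logging(path: str = "app.log", level: int = logging.INFO) -> None:
    """Send log records to `path` as '<timestamp> LEVEL: message' lines."""
    logging.basicConfig(
        filename=path,
        level=level,
        format="%(asctime)s %(levelname)s: %(message)s",
        force=True,  # replace handlers from any earlier configuration (Python 3.8+)
    )

configure_logging()
logging.getLogger("retriever").info("Retriever completed. Papers: 5, Chunks: 237")
```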

**Useful Log Patterns:**

**1. Track Workflow Progress:**

```bash
# Follow the log and show agent completion lines
tail -f app.log | grep "completed"

# Output:
# INFO: Retriever completed. Papers: 5, Chunks: 237
# INFO: Analyzer completed. Analyses: 5
# INFO: Filter completed. Valid: 4/5
# INFO: Synthesis completed. Consensus: 3, Contradictions: 1
# INFO: Citation completed. Cost: $0.0234
```

**2. Identify Failures:**

```bash
# Show the 20 most recent ERROR lines
grep "ERROR" app.log | tail -20

# Count recurring failure messages, most frequent first
# (-f4- assumes a "<timestamp> LEVEL: message" line format)
grep "ERROR" app.log | cut -d':' -f4- | sort | uniq -c | sort -rn
```

**3. Track Fallback Usage:**

```bash
# Count how often the fallback client is used
grep -c "trying fallback" app.log
grep -c "Searching with fallback client" app.log
```

**4. Monitor Circuit Breaker:**

```bash
# Check whether the circuit breaker is triggering
grep "Circuit breaker triggered" app.log

# If it is, show the errors and warnings logged just before each trigger
grep -B 20 "Circuit breaker triggered" app.log | grep -E "ERROR|WARNING"
```

**5. Analyze Token Usage:**

```bash
# Extract token usage from logs
grep "Token usage" app.log | tail -10

# Calculate total cost (strip the "$" so awk can parse the number)
grep "Cost:" app.log | awk '{gsub(/\$/, "", $NF); sum += $NF} END {printf "Total: $%.4f\n", sum}'
```
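
When the shell pipelines get unwieldy, the error-frequency and total-cost summaries can be computed in Python instead. This sketch assumes the `LEVEL: message` line format and the `Cost: $X` message shown above. `summarize_log` is a hypothetical helper that accepts any iterable of lines, so it works on an open file (`with open("app.log") as f: summarize_log(f)`) as well as on a list.

```python
import re
from collections import Counter
from typing import Iterable, Tuple

COST_RE = re.compile(r"Cost: \$(\d+(?:\.\d+)?)")
ERROR_RE = re.compile(r"\bERROR: (.*)$")

def summarize_log(lines: Iterable[str]) -> Tuple[Counter, float]:
    """Return (counts of ERROR messages, total cost in USD)."""
    errors: Counter = Counter()
    total_cost = 0.0
    for line in lines:
        line = line.rstrip("\n")
        error = ERROR_RE.search(line)
        if error:
            errors[error.group(1)] += 1
        cost = COST_RE.search(line)
        if cost:
            total_cost += float(cost.group(1))
    return errors, total_cost

# Illustrative log lines
sample = [
    "INFO: Citation completed. Cost: $0.0234",
    "ERROR: Circuit breaker triggered after 2 failures",
    "INFO: Citation completed. Cost: $0.0100",
    "ERROR: Circuit breaker triggered after 2 failures",
]
errors, total = summarize_log(sample)
for message, count in errors.most_common():
    print(f"{count:>4} {message}")
print(f"Total: ${total:.4f}")
```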

---

## Appendix: File Reference

**Agent Implementations:**

- `agents/retriever.py` - RetrieverAgent with fallback mechanisms
- `agents/analyzer.py` - AnalyzerAgent with parallel processing and circuit breaker
- `agents/synthesis.py` - SynthesisAgent with cross-paper analysis
- `agents/citation.py` - CitationAgent with APA formatting and cost calculation

**Orchestration:**

- `orchestration/__init__.py` - Module exports
- `orchestration/nodes.py` - Node wrappers with tracing and error handling
- `orchestration/workflow_graph.py` - LangGraph workflow builder and execution

**State Management:**

- `utils/langgraph_state.py` - AgentState TypedDict and initialization helpers
- `utils/schemas.py` - Pydantic models for all data structures

**Observability:**

- `utils/langfuse_client.py` - LangFuse client initialization and `@observe` decorator
- `observability/trace_reader.py` - Trace querying and export API
- `observability/analytics.py` - Performance analytics and trajectory analysis

**Configuration:**

- `utils/config.py` - Pricing configuration and environment variables
- `.env.example` - Environment variable template

**Documentation:**

- `CLAUDE.md` - Comprehensive system-wide developer guide
- `AGENTS.md` - This document (agent architecture deep-dive)
- `REFACTORING_SUMMARY.md` - LangGraph + LangFuse refactoring details
- `BUGFIX_MSGPACK_SERIALIZATION.md` - msgpack serialization fix
- `observability/README.md` - Observability documentation

---

## Document Maintenance

**Last Updated:** 2025-12-20

**Version:** 1.0

**Authors:** Claude Sonnet 4.5 (auto-generated from codebase exploration)

**Changelog:**

- 2025-12-20: Initial creation with comprehensive agent documentation

**Contributing:**

- When adding new agents, update Section 3 (Individual Agent Deep Dives)
- When adding new patterns, update Section 4 (Cross-Cutting Patterns)
- When modifying the workflow, update Section 5 (Workflow Orchestration)
- Keep the Agent Comparison Reference (Section 7) in sync with agent changes

---

**End of AGENTS.md**