# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Core Architecture
This is a multi-agent RAG system for analyzing academic papers from arXiv. The system uses LangGraph for workflow orchestration and LangFuse for comprehensive observability.
### Agent Pipeline Flow

```
User Query → Retriever → Analyzer → Filter → Synthesis → Citation → Output
                 ↓           ↓         ↓          ↓           ↓
               [LangFuse Tracing for All Nodes]
```
**Orchestration**: The workflow is managed by LangGraph (`orchestration/workflow_graph.py`):
- Conditional routing (early termination if no papers are found or all analyses fail)
- Automatic checkpointing with `MemorySaver`
- State management with the type-safe `AgentState` TypedDict
- Node wrappers in `orchestration/nodes.py` with automatic tracing
**State Dictionary** (`utils/langgraph_state.py`): All agents operate on a shared state dictionary that flows through the pipeline:
- `query`: User's research question
- `category`: Optional arXiv category filter
- `num_papers`: Number of papers to analyze
- `papers`: List of `Paper` objects (populated by Retriever)
- `chunks`: List of `PaperChunk` objects (populated by Retriever)
- `analyses`: List of `Analysis` objects (populated by Analyzer)
- `synthesis`: `SynthesisResult` object (populated by Synthesis)
- `validated_output`: `ValidatedOutput` object (populated by Citation)
- `errors`: List of error messages accumulated across agents
- `token_usage`: Dict tracking input/output/embedding tokens
- `trace_id`: LangFuse trace identifier (for observability)
- `session_id`: User session tracking
- `user_id`: Optional user identifier
IMPORTANT: Only msgpack-serializable data should be stored in the state. Do NOT add complex objects like Gradio Progress, file handles, or callbacks to the state dictionary (see BUGFIX_MSGPACK_SERIALIZATION.md).
## Agent Responsibilities
**RetrieverAgent** (`agents/retriever.py`):
- Decorated with `@observe` for LangFuse tracing
- Searches the arXiv API using `ArxivClient`, `MCPArxivClient`, or `FastMCPArxivClient` (configurable via env)
- Downloads PDFs to `data/papers/` (direct API) or MCP server storage (MCP mode)
- Intelligent fallback: automatically falls back to the direct API if the primary MCP client fails
- Processes PDFs with `PDFProcessor` (500-token chunks, 50-token overlap)
- Generates embeddings via `EmbeddingGenerator` (Azure OpenAI text-embedding-3-small, traced)
- Stores chunks in ChromaDB via `VectorStore`
- FastMCP support: auto-starts a FastMCP server for standardized arXiv access
**AnalyzerAgent** (`agents/analyzer.py`):
- Decorated with `@observe(as_type="generation")` for LLM call tracing
- Analyzes each paper individually using RAG
- Uses 4 broad queries per paper: methodology, results, conclusions, limitations
- Deduplicates chunks by `chunk_id`
- Calls Azure OpenAI with temperature=0 and JSON mode
- RAG retrieval automatically traced via `@observe` on `RAGRetriever.retrieve()`
- Returns structured `Analysis` objects with confidence scores
**SynthesisAgent** (`agents/synthesis.py`):
- Decorated with `@observe(as_type="generation")` for LLM call tracing
- Compares findings across all papers
- Identifies consensus points, contradictions, and research gaps
- Creates an executive summary addressing the user's query
- Uses temperature=0 for deterministic outputs
- Returns `SynthesisResult` with confidence scores
**CitationAgent** (`agents/citation.py`):
- Decorated with `@observe(as_type="span")` for data-processing tracing
- Generates APA-formatted citations for all papers
- Validates synthesis claims against source papers
- Calculates cost estimates (GPT-4o-mini pricing)
- Creates the final `ValidatedOutput` with all metadata
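The cost-estimate step can be illustrated with a small sketch. The per-token rates below are assumptions for illustration (commonly quoted GPT-4o-mini list prices), not constants taken from the repo's CitationAgent:

```python
# Assumed USD rates per 1M tokens; the repo's actual constants may differ.
RATES_PER_1M = {"input": 0.15, "output": 0.60}

def estimate_cost(token_usage: dict) -> float:
    """Estimate request cost from the state's token_usage dict."""
    cost = (token_usage.get("input", 0) / 1_000_000) * RATES_PER_1M["input"]
    cost += (token_usage.get("output", 0) / 1_000_000) * RATES_PER_1M["output"]
    return round(cost, 6)

print(estimate_cost({"input": 200_000, "output": 50_000}))  # 0.06
```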
## Critical Architecture Patterns
**RAG Context Formatting**: `RAGRetriever.format_context()` creates structured context with:
```
[Chunk N] Paper: {title}
Authors: {authors}
Section: {section}
Page: {page_number}
Source: {arxiv_url}
{content}
```
**Chunking Strategy**: PDFProcessor uses tiktoken encoding (cl100k_base) for precise token counting:
- Chunk size: 500 tokens
- Overlap: 50 tokens
- Page markers preserved: `[Page N]` tags in text
- Section detection via keyword matching (abstract, introduction, results, etc.)
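The 500/50 sliding window can be sketched as follows. This is a simplified illustration that chunks a pre-tokenized list to stay dependency-free; the real `PDFProcessor` counts tokens with tiktoken's cl100k_base encoding:

```python
# Simplified sketch of 500-token chunks with a 50-token overlap.
def chunk_tokens(tokens: list, chunk_size: int = 500, overlap: int = 50) -> list:
    step = chunk_size - overlap  # window advances 450 tokens at a time
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # last window already covers the tail
    return chunks

chunks = chunk_tokens(list(range(1200)))
# 3 chunks: tokens 0-499, 450-949, 900-1199
```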
**Vector Store Filtering**: ChromaDB searches support paper_id filtering:
- Single paper: `{"paper_id": "2401.00001"}`
- Multiple papers: `{"paper_id": {"$in": ["2401.00001", "2401.00002"]}}`
**Semantic Caching**: Cache hits occur when cosine similarity ≥ 0.95 between query embeddings. The cache key includes both query and category.
**Error Handling Philosophy**: Agents catch exceptions, log errors, append to `state["errors"]`, and return partial results rather than failing completely. For example, Analyzer returns confidence_score=0.0 on failure.
### LangGraph Orchestration (`orchestration/`)
**Workflow Graph** (`orchestration/workflow_graph.py`):
- `create_workflow_graph()`: Creates StateGraph with all nodes and conditional edges
- `run_workflow()`: Sync wrapper for Gradio compatibility (uses `nest-asyncio`)
- `run_workflow_async()`: Async streaming execution
- `get_workflow_state()`: Retrieve current state by thread ID
**Node Wrappers** (`orchestration/nodes.py`):
- `retriever_node()`: Executes RetrieverAgent with LangFuse tracing
- `analyzer_node()`: Executes AnalyzerAgent with LangFuse tracing
- `filter_node()`: Filters out low-confidence analyses (confidence_score < 0.7)
- `synthesis_node()`: Executes SynthesisAgent with LangFuse tracing
- `citation_node()`: Executes CitationAgent with LangFuse tracing
**Conditional Routing**:
- `should_continue_after_retriever()`: Returns "END" if no papers found, else "analyzer"
- `should_continue_after_filter()`: Returns "END" if all analyses filtered out, else "synthesis"
**Workflow Execution Flow**:
```python
# In app.py
workflow_app = create_workflow_graph(
retriever_agent=self.retriever_agent,
analyzer_agent=self.analyzer_agent,
synthesis_agent=self.synthesis_agent,
citation_agent=self.citation_agent
)
# Run workflow with checkpointing
config = {"configurable": {"thread_id": session_id}}
final_state = run_workflow(workflow_app, initial_state, config, progress)
State Serialization:
- LangGraph uses msgpack for state checkpointing
- CRITICAL: Only msgpack-serializable types allowed in state
- β Primitives: str, int, float, bool, None
- β Collections: list, dict
- β
Pydantic models (via
.model_dump()) - β Complex objects: Gradio Progress, file handles, callbacks
- See BUGFIX_MSGPACK_SERIALIZATION.md for detailed fix documentation
Development Commands
Running the Application
# Start Gradio interface (http://localhost:7860)
python app.py
Testing
# Run all tests with verbose output
pytest tests/ -v
# Run specific test file
pytest tests/test_analyzer.py -v
# Run single test
pytest tests/test_analyzer.py::TestAnalyzerAgent::test_analyze_paper_success -v
# Run with coverage
pytest tests/ --cov=agents --cov=rag --cov=utils -v
# Run tests matching pattern
pytest tests/ -k "analyzer" -v
Environment Setup
# Copy environment template
cp .env.example .env
# Required variables in .env:
# AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
# AZURE_OPENAI_API_KEY=your-key
# AZURE_OPENAI_DEPLOYMENT_NAME=gpt-4o-mini
# AZURE_OPENAI_API_VERSION=2024-02-01 # optional
# Optional MCP (Model Context Protocol) variables:
# USE_MCP_ARXIV=false # Set to 'true' to use MCP (FastMCP by default)
# USE_LEGACY_MCP=false # Set to 'true' to use legacy MCP instead of FastMCP
# MCP_ARXIV_STORAGE_PATH=./data/mcp_papers/ # MCP server storage path
# FASTMCP_SERVER_PORT=5555 # Port for FastMCP server (auto-started)
# Optional LangFuse observability variables:
# LANGFUSE_ENABLED=true # Enable LangFuse tracing
# LANGFUSE_PUBLIC_KEY=pk-lf-... # LangFuse public key
# LANGFUSE_SECRET_KEY=sk-lf-... # LangFuse secret key
# LANGFUSE_HOST=https://cloud.langfuse.com # LangFuse host (cloud or self-hosted)
# LANGFUSE_TRACE_ALL_LLM=true # Auto-trace all Azure OpenAI calls
# LANGFUSE_TRACE_RAG=true # Trace RAG operations
# LANGFUSE_FLUSH_AT=15 # Batch size for flushing traces
# LANGFUSE_FLUSH_INTERVAL=10 # Flush interval in seconds
Data Management
# Clear vector store (useful for testing)
rm -rf data/chroma_db/
# Clear cached papers
rm -rf data/papers/
# Clear semantic cache
rm -rf data/cache/
Key Implementation Details
Azure OpenAI Integration
All agents use temperature=0 and response_format={"type": "json_object"} for deterministic, structured outputs. Initialize clients like:
from openai import AzureOpenAI
client = AzureOpenAI(
api_key=os.getenv("AZURE_OPENAI_API_KEY"),
api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
)
Pydantic Schemas (utils/schemas.py and utils/langgraph_state.py)
All data structures use Pydantic for validation:
Paper: arXiv paper metadataPaperChunk: Text chunk with metadataAnalysis: Individual paper analysis resultsSynthesisResult: Cross-paper synthesis with ConsensusPoint and ContradictionValidatedOutput: Final output with citations and cost trackingAgentState: TypedDict for LangGraph state management (used in workflow orchestration)
Observability Models (observability/trace_reader.py):
TraceInfo: Trace metadata and performance metricsSpanInfo: Agent execution data with timingsGenerationInfo: LLM call details (prompt, completion, tokens, cost)
Analytics Models (observability/analytics.py):
AgentStats: Per-agent performance statistics (latency, tokens, cost, errors)WorkflowStats: Workflow-level aggregated metricsAgentTrajectory: Complete execution path with timings
Retry Logic
ArxivClient uses tenacity for resilient API calls:
- 3 retry attempts
- Exponential backoff (4s min, 10s max)
- Applied to search_papers() and download_paper()
MCP (Model Context Protocol) Integration
The system supports optional integration with arXiv MCP servers as an alternative to direct arXiv API access. FastMCP is now the default MCP implementation when USE_MCP_ARXIV=true.
Architecture Overview:
- Three client options: Direct ArxivClient, Legacy MCPArxivClient, FastMCPArxivClient
- All clients implement the same interface for drop-in compatibility
- RetrieverAgent includes intelligent fallback from MCP to direct API
- App selects client based on environment variables with cascading fallback
Client Selection Logic (app.py lines 75-135):
USE_MCP_ARXIV=falseβ Direct ArxivClient (default)USE_MCP_ARXIV=true+USE_LEGACY_MCP=trueβ Legacy MCPArxivClientUSE_MCP_ARXIV=true(default) β FastMCPArxivClient with auto-start server- Fallback cascade: FastMCP β Legacy MCP β Direct API
FastMCP Implementation (Recommended):
Server (utils/fastmcp_arxiv_server.py):
- Auto-start FastMCP server in background thread
- Implements tools:
search_papers,download_paper,list_papers - Uses standard
arxivlibrary for arXiv API access - Configurable port (default: 5555) via
FASTMCP_SERVER_PORT - Singleton pattern for application-wide server instance
- Graceful shutdown on app exit
- Compatible with local and HuggingFace Spaces deployment
Client (utils/fastmcp_arxiv_client.py):
- Async-first design with sync wrappers for Gradio compatibility
- Connects to FastMCP server via HTTP
- Lazy client initialization on first use
- Reuses legacy MCP's robust
_parse_mcp_paper()logic - Built-in fallback: Direct arXiv download if MCP fails
- Same retry logic (3 attempts, exponential backoff)
- Uses
nest-asynciofor event loop compatibility
Retriever Fallback Logic (agents/retriever.py lines 68-156):
- Two-tier fallback: Primary client β Fallback client
_search_with_fallback(): Try primary MCP, then fallback to direct API_download_with_fallback(): Try primary MCP, then fallback to direct API- Ensures paper retrieval never fails due to MCP issues
- Detailed logging of fallback events
Legacy MCP Client (utils/mcp_arxiv_client.py):
- In-process handler calls (imports MCP server functions directly)
- Stdio protocol for external MCP servers
- Maintained for backward compatibility
- Enable via
USE_LEGACY_MCP=truewhenUSE_MCP_ARXIV=true - All features from legacy implementation preserved
Key Features Across All MCP Clients:
- Async-first design with sync wrappers
- MCP tools:
search_papers,download_paper,list_papers - Transforms MCP responses to
PaperPydantic objects - Same retry logic and caching behavior as ArxivClient
- Automatic direct download fallback if MCP storage inaccessible
Zero Breaking Changes:
- Downstream agents (Analyzer, Synthesis, Citation) unaffected
- Same state dictionary structure maintained
- PDF processing, chunking, and RAG unchanged
- Toggle via environment variables without code changes
- Legacy MCP remains available for compatibility
Configuration (.env.example):
# Enable MCP (FastMCP by default)
USE_MCP_ARXIV=true
# Force legacy MCP instead of FastMCP (optional)
USE_LEGACY_MCP=false
# Storage path for papers (used by all MCP clients)
MCP_ARXIV_STORAGE_PATH=./data/mcp_papers/
# FastMCP server port
FASTMCP_SERVER_PORT=5555
Testing:
- FastMCP:
pytest tests/test_fastmcp_arxiv.py -v(38 tests) - Legacy MCP:
pytest tests/test_mcp_arxiv_client.py -v(21 tests) - Both test suites cover: search, download, caching, error handling, fallback logic
PDF Processing Edge Cases
- Some PDFs may be scanned images (extraction fails gracefully)
- Page markers
[Page N]extracted during text extraction for chunk attribution - Section detection is heuristic-based (checks first 5 lines of chunk)
- Empty pages or extraction failures logged as warnings, not errors
Gradio UI Structure (app.py)
ResearchPaperAnalyzer class orchestrates the workflow:
- Initialize LangFuse client and instrument Azure OpenAI (if enabled)
- Create LangGraph workflow with all agents
- Check semantic cache first
- Initialize state dictionary with
create_initial_state() - Generate unique
session_idfor trace tracking - Run LangGraph workflow via
run_workflow()from orchestration module - Flush LangFuse traces to ensure upload
- Cache results on success
- Format output for 5 tabs: Papers, Analysis, Synthesis, Citations, Stats
LangGraph Workflow Execution:
- Nodes execute in order: retriever β analyzer β filter β synthesis β citation
- Conditional edges for early termination (no papers found, all analyses failed)
- Checkpointing enabled via
MemorySaverfor workflow state persistence - Progress updates still work via local variable (NOT in state to avoid msgpack serialization issues)
Testing Patterns
Tests use mocks to avoid external dependencies:
# Mock RAG retriever
mock_retriever = Mock(spec=RAGRetriever)
mock_retriever.retrieve.return_value = {"chunks": [...], "chunk_ids": [...]}
# Mock Azure OpenAI
with patch('agents.analyzer.AzureOpenAI', return_value=mock_client):
agent = AnalyzerAgent(rag_retriever=mock_retriever)
Current test coverage:
- AnalyzerAgent (18 tests): Core analysis workflow and error handling
- MCPArxivClient (21 tests): Legacy MCP tool integration, async/sync wrappers, response parsing
- FastMCPArxiv (38 tests): FastMCP server, client, integration, error handling, fallback logic
When adding tests for other agents, follow the same pattern:
- Fixtures for mock dependencies
- Test both success and error paths
- Verify state transformations
- Test edge cases (empty inputs, API failures)
- For async code, use
pytest-asynciowith@pytest.mark.asyncio
Observability and Analytics
LangFuse Integration
The system automatically traces all agent executions and LLM calls when LangFuse is enabled:
Configuration (utils/langfuse_client.py):
initialize_langfuse(): Initialize global LangFuse client at startupinstrument_openai(): Auto-trace all Azure OpenAI API calls@observedecorator: Trace custom functions/spansflush_langfuse(): Ensure all traces uploaded before shutdown
Automatic Tracing:
- All agent
run()methods decorated with@observe - LLM calls automatically captured (prompt, completion, tokens, cost)
- RAG operations traced (embeddings, vector search)
- Workflow state transitions logged
Trace Querying (observability/trace_reader.py)
from observability import TraceReader
reader = TraceReader()
# Get recent traces
traces = reader.get_traces(limit=10)
# Filter by user/session
traces = reader.get_traces(user_id="user-123", session_id="session-abc")
# Filter by date range
from datetime import datetime, timedelta
start = datetime.now() - timedelta(days=7)
traces = reader.filter_by_date_range(traces, start_date=start)
# Get specific agent executions
analyzer_spans = reader.filter_by_agent(traces, agent_name="analyzer_agent")
# Export traces
reader.export_traces_to_json(traces, "traces.json")
reader.export_traces_to_csv(traces, "traces.csv")
Performance Analytics (observability/analytics.py)
from observability import AgentPerformanceAnalyzer, AgentTrajectoryAnalyzer
# Performance metrics
perf_analyzer = AgentPerformanceAnalyzer()
# Get agent latency statistics
stats = perf_analyzer.agent_latency_stats("analyzer_agent", days=7)
print(f"P95 latency: {stats.p95_latency_ms:.2f}ms")
# Token usage breakdown
token_usage = perf_analyzer.token_usage_breakdown(days=7)
print(f"Total tokens: {sum(token_usage.values())}")
# Cost per agent
costs = perf_analyzer.cost_per_agent(days=7)
print(f"Total cost: ${sum(costs.values()):.4f}")
# Error rates
error_rates = perf_analyzer.error_rates(days=7)
# Workflow summary
summary = perf_analyzer.workflow_performance_summary(days=7)
print(f"Success rate: {summary.success_rate:.1f}%")
print(f"Avg duration: {summary.avg_duration_ms/1000:.2f}s")
# Trajectory analysis
traj_analyzer = AgentTrajectoryAnalyzer()
analysis = traj_analyzer.analyze_execution_paths(days=7)
print(f"Most common path: {analysis['most_common_path']}")
See observability/README.md for comprehensive documentation.
Common Modification Points
Adding a new agent:
- Create agent class with
run(state) -> statemethod - Decorate
run()with@observefor tracing - Add node wrapper in
orchestration/nodes.py - Add node to workflow graph in
orchestration/workflow_graph.py - Update conditional routing if needed
Modifying chunking:
- Adjust
chunk_sizeandchunk_overlapin PDFProcessor initialization - Affects retrieval quality vs. context size tradeoff
- Default 500/50 balances precision and coverage
Changing LLM model:
- Update
AZURE_OPENAI_DEPLOYMENT_NAMEin .env - Cost estimates in CitationAgent may need adjustment
- Temperature must stay 0 for deterministic outputs
Adding arXiv categories:
- Extend
ARXIV_CATEGORIESlist inapp.py - Format:
"code - Description"(e.g.,"cs.AI - Artificial Intelligence")
Switching between arXiv clients:
- Set
USE_MCP_ARXIV=false(default) β Direct ArxivClient - Set
USE_MCP_ARXIV=trueβ FastMCPArxivClient (default MCP) - Set
USE_MCP_ARXIV=true+USE_LEGACY_MCP=trueβ Legacy MCPArxivClient - Configure
MCP_ARXIV_STORAGE_PATHfor MCP server's storage location - Configure
FASTMCP_SERVER_PORTfor FastMCP server port (default: 5555) - No code changes required - client selected automatically in
app.py - All clients implement identical interface for seamless switching
- FastMCP server auto-starts when FastMCP client is selected
Cost and Performance Considerations
- Target: <$0.50 per 5-paper analysis
- Semantic cache reduces repeated query costs
- ChromaDB persistence prevents re-embedding same papers
- Batch embedding generation in PDFProcessor for efficiency
- Token usage tracked per request for monitoring
- LangFuse observability enables cost optimization insights
- LangGraph overhead: <1% for state management
- Trace upload overhead: ~5-10ms per trace (async, negligible impact)
Key Files and Modules
Core Application
app.py: Gradio UI and workflow orchestration entry pointutils/config.py: Configuration management (Azure OpenAI, LangFuse, MCP)utils/schemas.py: Pydantic data models for validationutils/langgraph_state.py: LangGraph state TypedDict and helpers
Agents
agents/retriever.py: Paper retrieval, PDF processing, embeddingsagents/analyzer.py: Individual paper analysis with RAGagents/synthesis.py: Cross-paper synthesis and insightsagents/citation.py: Citation generation and validation
RAG Components
rag/pdf_processor.py: PDF text extraction and chunkingrag/embeddings.py: Batch embedding generation (Azure OpenAI)rag/vector_store.py: ChromaDB vector store managementrag/retrieval.py: RAG retrieval with formatted context
Orchestration (LangGraph)
orchestration/__init__.py: Module exportsorchestration/nodes.py: Node wrappers with tracingorchestration/workflow_graph.py: LangGraph workflow builder
Observability (LangFuse)
observability/__init__.py: Module exportsobservability/trace_reader.py: Trace querying and export APIobservability/analytics.py: Performance analytics and trajectory analysisobservability/README.md: Comprehensive observability documentationutils/langfuse_client.py: LangFuse client initialization and helpers
Utilities
utils/arxiv_client.py: Direct arXiv API client with retry logicutils/mcp_arxiv_client.py: Legacy MCP client implementationutils/fastmcp_arxiv_client.py: FastMCP client (recommended)utils/fastmcp_arxiv_server.py: FastMCP server with auto-startutils/semantic_cache.py: Query caching with embeddings
Documentation
CLAUDE.md: This file - comprehensive developer guideREADME.md: User-facing project documentationREFACTORING_SUMMARY.md: LangGraph + LangFuse refactoring detailsBUGFIX_MSGPACK_SERIALIZATION.md: msgpack serialization fix documentation.env.example: Environment variable template with all options
Version History and Recent Changes
Version 2.6: LangGraph Orchestration + LangFuse Observability
Added:
- LangGraph workflow orchestration with conditional routing
- LangFuse automatic tracing for all agents and LLM calls
- Observability Python API for trace querying and analytics
- Performance analytics (latency, tokens, cost, error rates)
- Agent trajectory analysis
- Checkpointing with
MemorySaver
Fixed:
- msgpack serialization error (removed Gradio Progress from state)
Dependencies Added:
langgraph>=0.2.0langfuse>=2.0.0langfuse-openai>=1.0.0
Breaking Changes:
- None! Fully backward compatible
Documentation:
- Created
observability/README.md - Created
REFACTORING_SUMMARY.md - Created
BUGFIX_MSGPACK_SERIALIZATION.md - Updated
CLAUDE.md(this file) - Updated
.env.example
See REFACTORING_SUMMARY.md for detailed migration guide and architecture changes.