# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Core Architecture
This is a **multi-agent RAG system** for analyzing academic papers from arXiv. The system uses **LangGraph** for workflow orchestration and **LangFuse** for comprehensive observability.
### Agent Pipeline Flow
```
User Query → Retriever → Analyzer → Filter → Synthesis → Citation → Output
                 ↓           ↓         ↓          ↓           ↓
              [LangFuse Tracing for All Nodes]
```
**Orchestration**: The workflow is managed by LangGraph (`orchestration/workflow_graph.py`):
- Conditional routing (early termination if no papers found or all analyses fail)
- Automatic checkpointing with `MemorySaver`
- State management with type-safe `AgentState` TypedDict
- Node wrappers in `orchestration/nodes.py` with automatic tracing
**State Dictionary** (`utils/langgraph_state.py`): All agents operate on a shared state dictionary that flows through the pipeline:
- `query`: User's research question
- `category`: Optional arXiv category filter
- `num_papers`: Number of papers to analyze
- `papers`: List of Paper objects (populated by Retriever)
- `chunks`: List of PaperChunk objects (populated by Retriever)
- `analyses`: List of Analysis objects (populated by Analyzer)
- `synthesis`: SynthesisResult object (populated by Synthesis)
- `validated_output`: ValidatedOutput object (populated by Citation)
- `errors`: List of error messages accumulated across agents
- `token_usage`: Dict tracking input/output/embedding tokens
- `trace_id`: LangFuse trace identifier (for observability)
- `session_id`: User session tracking
- `user_id`: Optional user identifier
**IMPORTANT**: Only msgpack-serializable data should be stored in the state. Do NOT add complex objects like Gradio Progress, file handles, or callbacks to the state dictionary (see BUGFIX_MSGPACK_SERIALIZATION.md).
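For reference, the state shape can be sketched as a TypedDict (field names taken from the list above; the helper mirrors `create_initial_state` in `utils/langgraph_state.py`, with details assumed):

```python
from typing import Optional, TypedDict


class AgentState(TypedDict):
    """Shared state flowing through the LangGraph pipeline (sketch)."""
    query: str
    category: Optional[str]
    num_papers: int
    papers: list              # Paper objects (populated by Retriever)
    chunks: list              # PaperChunk objects (populated by Retriever)
    analyses: list            # Analysis objects (populated by Analyzer)
    synthesis: Optional[dict]         # SynthesisResult (populated by Synthesis)
    validated_output: Optional[dict]  # ValidatedOutput (populated by Citation)
    errors: list
    token_usage: dict
    trace_id: Optional[str]
    session_id: str
    user_id: Optional[str]


def create_initial_state(query: str, category: Optional[str] = None,
                         num_papers: int = 5, session_id: str = "") -> AgentState:
    # Every field starts out msgpack-serializable: primitives, lists, dicts only.
    return AgentState(
        query=query, category=category, num_papers=num_papers,
        papers=[], chunks=[], analyses=[], synthesis=None,
        validated_output=None, errors=[],
        token_usage={"input": 0, "output": 0, "embedding": 0},
        trace_id=None, session_id=session_id, user_id=None,
    )
```

Keeping every field a primitive, list, or dict is what keeps `MemorySaver` checkpointing msgpack-safe.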
### Agent Responsibilities
1. **RetrieverAgent** (`agents/retriever.py`):
- Decorated with `@observe` for LangFuse tracing
- Searches arXiv API using `ArxivClient`, `MCPArxivClient`, or `FastMCPArxivClient` (configurable via env)
- Downloads PDFs to `data/papers/` (direct API) or MCP server storage (MCP mode)
- **Intelligent Fallback**: Automatically falls back to direct API if primary MCP client fails
- Processes PDFs with `PDFProcessor` (500-token chunks, 50-token overlap)
- Generates embeddings via `EmbeddingGenerator` (Azure OpenAI text-embedding-3-small, traced)
- Stores chunks in ChromaDB via `VectorStore`
- **FastMCP Support**: Auto-start FastMCP server for standardized arXiv access
2. **AnalyzerAgent** (`agents/analyzer.py`):
- Decorated with `@observe(as_type="generation")` for LLM call tracing
- Analyzes each paper individually using RAG
- Uses 4 broad queries per paper: methodology, results, conclusions, limitations
- Deduplicates chunks by chunk_id
- Calls Azure OpenAI with **temperature=0** and JSON mode
- RAG retrieval automatically traced via `@observe` on `RAGRetriever.retrieve()`
- Returns structured `Analysis` objects with confidence scores
3. **SynthesisAgent** (`agents/synthesis.py`):
- Decorated with `@observe(as_type="generation")` for LLM call tracing
- Compares findings across all papers
- Identifies consensus points, contradictions, research gaps
- Creates executive summary addressing user's query
- Uses **temperature=0** for deterministic outputs
- Returns `SynthesisResult` with confidence scores
4. **CitationAgent** (`agents/citation.py`):
- Decorated with `@observe(as_type="span")` for data processing tracing
- Generates APA-formatted citations for all papers
- Validates synthesis claims against source papers
- Calculates cost estimates (GPT-4o-mini pricing)
- Creates final `ValidatedOutput` with all metadata
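A rough sketch of the APA formatting step (the helper name and author-handling rules here are assumptions, not the actual CitationAgent code):

```python
def format_apa_citation(authors: list[str], year: int, title: str, arxiv_id: str) -> str:
    """Format an arXiv reference in rough APA style (sketch)."""
    if len(authors) == 1:
        author_str = authors[0]
    elif len(authors) == 2:
        author_str = f"{authors[0]} & {authors[1]}"
    else:
        author_str = f"{authors[0]} et al."
    return f"{author_str} ({year}). {title}. arXiv:{arxiv_id}. https://arxiv.org/abs/{arxiv_id}"
```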
### Critical Architecture Patterns
**RAG Context Formatting**: `RAGRetriever.format_context()` creates structured context with:
```
[Chunk N] Paper: {title}
Authors: {authors}
Section: {section}
Page: {page_number}
Source: {arxiv_url}
--------------------------------------------------------------------------------
{content}
```
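The template above can be sketched as a formatter (chunk field names assumed from the template):

```python
def format_context(chunks: list[dict]) -> str:
    """Render retrieved chunks in the structured context format shown above."""
    blocks = []
    for i, chunk in enumerate(chunks, start=1):
        blocks.append(
            f"[Chunk {i}] Paper: {chunk['title']}\n"
            f"Authors: {chunk['authors']}\n"
            f"Section: {chunk['section']}\n"
            f"Page: {chunk['page_number']}\n"
            f"Source: {chunk['arxiv_url']}\n"
            + "-" * 80 + "\n"
            + chunk["content"]
        )
    return "\n\n".join(blocks)
```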
**Chunking Strategy**: PDFProcessor uses tiktoken encoding (cl100k_base) for precise token counting:
- Chunk size: 500 tokens
- Overlap: 50 tokens
- Page markers preserved: `[Page N]` tags in text
- Section detection via keyword matching (abstract, introduction, results, etc.)
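The sliding-window chunking can be sketched over a generic token list (the real PDFProcessor tokenizes with tiktoken's `cl100k_base`; the windowing logic shown here is the standard approach, offered as an assumption):

```python
def chunk_tokens(tokens: list, chunk_size: int = 500, overlap: int = 50) -> list[list]:
    """Split a token sequence into overlapping windows."""
    step = chunk_size - overlap  # each new chunk starts (size - overlap) tokens later
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # final window reached the end of the document
    return chunks
```

With the 500/50 defaults, consecutive chunks share a 50-token boundary so no sentence is lost at a cut point.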
**Vector Store Filtering**: ChromaDB searches support paper_id filtering:
- Single paper: `{"paper_id": "2401.00001"}`
- Multiple papers: `{"paper_id": {"$in": ["2401.00001", "2401.00002"]}}`
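A small helper showing both filter shapes (the helper itself is hypothetical; the dicts match ChromaDB's `where` syntax above):

```python
def build_paper_filter(paper_ids: list[str]) -> dict:
    """Build a ChromaDB where-filter for one or more paper IDs."""
    if len(paper_ids) == 1:
        return {"paper_id": paper_ids[0]}
    return {"paper_id": {"$in": paper_ids}}
```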
**Semantic Caching**: Cache hits when cosine similarity ≥ 0.95 between query embeddings. Cache key includes both query and category.
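The cache-hit test reduces to cosine similarity over query embeddings; a stdlib sketch (the real cache embeds queries via Azure OpenAI):

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def is_cache_hit(query_emb: list[float], cached_emb: list[float],
                 threshold: float = 0.95) -> bool:
    # Hit only when the two embeddings point in near-identical directions.
    return cosine_similarity(query_emb, cached_emb) >= threshold
```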
**Error Handling Philosophy**: Agents catch exceptions, log errors, append to `state["errors"]`, and return partial results rather than failing completely. For example, Analyzer returns confidence_score=0.0 on failure.
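That philosophy, in sketch form (a hypothetical wrapper, not the actual Analyzer internals):

```python
def analyze_with_fallback(state: dict, paper: dict, analyze_fn) -> dict:
    """Run one analysis; on failure, record the error and return a stub result."""
    try:
        return analyze_fn(paper)
    except Exception as exc:
        state["errors"].append(f"Analyzer failed on {paper.get('id', '?')}: {exc}")
        # Partial result instead of crashing the whole pipeline
        return {"paper_id": paper.get("id"), "confidence_score": 0.0}
```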
### LangGraph Orchestration (`orchestration/`)
**Workflow Graph** (`orchestration/workflow_graph.py`):
- `create_workflow_graph()`: Creates StateGraph with all nodes and conditional edges
- `run_workflow()`: Sync wrapper for Gradio compatibility (uses `nest-asyncio`)
- `run_workflow_async()`: Async streaming execution
- `get_workflow_state()`: Retrieve current state by thread ID
**Node Wrappers** (`orchestration/nodes.py`):
- `retriever_node()`: Executes RetrieverAgent with LangFuse tracing
- `analyzer_node()`: Executes AnalyzerAgent with LangFuse tracing
- `filter_node()`: Filters out low-confidence analyses (confidence_score < 0.7)
- `synthesis_node()`: Executes SynthesisAgent with LangFuse tracing
- `citation_node()`: Executes CitationAgent with LangFuse tracing
**Conditional Routing**:
- `should_continue_after_retriever()`: Returns "END" if no papers found, else "analyzer"
- `should_continue_after_filter()`: Returns "END" if all analyses filtered out, else "synthesis"
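These routing functions follow directly from their descriptions; a sketch:

```python
def should_continue_after_retriever(state: dict) -> str:
    """Route to END when retrieval produced no papers."""
    return "END" if not state.get("papers") else "analyzer"


def should_continue_after_filter(state: dict) -> str:
    """Route to END when every analysis was filtered out."""
    return "END" if not state.get("analyses") else "synthesis"
```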
**Workflow Execution Flow**:
```python
# In app.py
workflow_app = create_workflow_graph(
    retriever_agent=self.retriever_agent,
    analyzer_agent=self.analyzer_agent,
    synthesis_agent=self.synthesis_agent,
    citation_agent=self.citation_agent,
)

# Run workflow with checkpointing
config = {"configurable": {"thread_id": session_id}}
final_state = run_workflow(workflow_app, initial_state, config, progress)
```
**State Serialization**:
- LangGraph uses msgpack for state checkpointing
- **CRITICAL**: Only msgpack-serializable types allowed in state
- ✅ Primitives: str, int, float, bool, None
- ✅ Collections: list, dict
- ✅ Pydantic models (via `.model_dump()`)
- ❌ Complex objects: Gradio Progress, file handles, callbacks
- See BUGFIX_MSGPACK_SERIALIZATION.md for detailed fix documentation
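A quick recursive allowlist check can catch unsafe values before they reach the checkpointer (a debugging aid sketched here, not part of LangGraph):

```python
def is_msgpack_safe(value) -> bool:
    """Return True if a value uses only msgpack-friendly types."""
    if value is None or isinstance(value, (str, int, float, bool, bytes)):
        return True
    if isinstance(value, (list, tuple)):
        return all(is_msgpack_safe(v) for v in value)
    if isinstance(value, dict):
        return all(is_msgpack_safe(k) and is_msgpack_safe(v)
                   for k, v in value.items())
    return False  # callbacks, file handles, Gradio Progress, etc.
```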
## Development Commands
### Running the Application
```bash
# Start Gradio interface (http://localhost:7860)
python app.py
```
### Testing
```bash
# Run all tests with verbose output
pytest tests/ -v
# Run specific test file
pytest tests/test_analyzer.py -v
# Run single test
pytest tests/test_analyzer.py::TestAnalyzerAgent::test_analyze_paper_success -v
# Run with coverage
pytest tests/ --cov=agents --cov=rag --cov=utils -v
# Run tests matching pattern
pytest tests/ -k "analyzer" -v
```
### Environment Setup
```bash
# Copy environment template
cp .env.example .env
# Required variables in .env:
# AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
# AZURE_OPENAI_API_KEY=your-key
# AZURE_OPENAI_DEPLOYMENT_NAME=gpt-4o-mini
# AZURE_OPENAI_API_VERSION=2024-02-01 # optional
# Optional MCP (Model Context Protocol) variables:
# USE_MCP_ARXIV=false # Set to 'true' to use MCP (FastMCP by default)
# USE_LEGACY_MCP=false # Set to 'true' to use legacy MCP instead of FastMCP
# MCP_ARXIV_STORAGE_PATH=./data/mcp_papers/ # MCP server storage path
# FASTMCP_SERVER_PORT=5555 # Port for FastMCP server (auto-started)
# Optional LangFuse observability variables:
# LANGFUSE_ENABLED=true # Enable LangFuse tracing
# LANGFUSE_PUBLIC_KEY=pk-lf-... # LangFuse public key
# LANGFUSE_SECRET_KEY=sk-lf-... # LangFuse secret key
# LANGFUSE_HOST=https://cloud.langfuse.com # LangFuse host (cloud or self-hosted)
# LANGFUSE_TRACE_ALL_LLM=true # Auto-trace all Azure OpenAI calls
# LANGFUSE_TRACE_RAG=true # Trace RAG operations
# LANGFUSE_FLUSH_AT=15 # Batch size for flushing traces
# LANGFUSE_FLUSH_INTERVAL=10 # Flush interval in seconds
```
### Data Management
```bash
# Clear vector store (useful for testing)
rm -rf data/chroma_db/
# Clear cached papers
rm -rf data/papers/
# Clear semantic cache
rm -rf data/cache/
```
## Key Implementation Details
### Azure OpenAI Integration
All agents use **temperature=0** and **response_format={"type": "json_object"}** for deterministic, structured outputs. Initialize clients like:
```python
import os

from openai import AzureOpenAI

client = AzureOpenAI(
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
)
```
### Pydantic Schemas (`utils/schemas.py` and `utils/langgraph_state.py`)
All data structures use Pydantic for validation:
- `Paper`: arXiv paper metadata
- `PaperChunk`: Text chunk with metadata
- `Analysis`: Individual paper analysis results
- `SynthesisResult`: Cross-paper synthesis with ConsensusPoint and Contradiction
- `ValidatedOutput`: Final output with citations and cost tracking
- `AgentState`: TypedDict for LangGraph state management (used in workflow orchestration)
**Observability Models** (`observability/trace_reader.py`):
- `TraceInfo`: Trace metadata and performance metrics
- `SpanInfo`: Agent execution data with timings
- `GenerationInfo`: LLM call details (prompt, completion, tokens, cost)
**Analytics Models** (`observability/analytics.py`):
- `AgentStats`: Per-agent performance statistics (latency, tokens, cost, errors)
- `WorkflowStats`: Workflow-level aggregated metrics
- `AgentTrajectory`: Complete execution path with timings
### Retry Logic
ArxivClient uses tenacity for resilient API calls:
- 3 retry attempts
- Exponential backoff (4s min, 10s max)
- Applied to search_papers() and download_paper()
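The real client applies tenacity's `@retry` decorator; the equivalent control flow, hand-rolled for illustration:

```python
import time


def with_retries(fn, attempts: int = 3, min_wait: float = 4.0,
                 max_wait: float = 10.0, sleep=time.sleep):
    """Retry fn with capped exponential backoff, re-raising after the last attempt."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            sleep(min(min_wait * 2 ** (attempt - 1), max_wait))
```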
### MCP (Model Context Protocol) Integration
The system supports **optional** integration with arXiv MCP servers as an alternative to direct arXiv API access. **FastMCP is now the default MCP implementation** when `USE_MCP_ARXIV=true`.
**Architecture Overview**:
- Three client options: Direct ArxivClient, Legacy MCPArxivClient, FastMCPArxivClient
- All clients implement the same interface for drop-in compatibility
- RetrieverAgent includes intelligent fallback from MCP to direct API
- App selects client based on environment variables with cascading fallback
**Client Selection Logic** (`app.py` lines 75-135):
1. `USE_MCP_ARXIV=false` → Direct ArxivClient (default)
2. `USE_MCP_ARXIV=true` + `USE_LEGACY_MCP=true` → Legacy MCPArxivClient
3. `USE_MCP_ARXIV=true` (default) → FastMCPArxivClient with auto-start server
4. Fallback cascade: FastMCP → Legacy MCP → Direct API
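The cascade can be expressed as a pure function over the two flags (return values are labels standing in for the actual client classes):

```python
def select_arxiv_client(env: dict) -> str:
    """Mirror the env-driven selection: direct API, legacy MCP, or FastMCP."""
    use_mcp = env.get("USE_MCP_ARXIV", "false").lower() == "true"
    use_legacy = env.get("USE_LEGACY_MCP", "false").lower() == "true"
    if not use_mcp:
        return "ArxivClient"          # default: direct arXiv API
    if use_legacy:
        return "MCPArxivClient"       # legacy MCP, opt-in only
    return "FastMCPArxivClient"       # MCP enabled: FastMCP is the default
```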
**FastMCP Implementation** (Recommended):
**Server** (`utils/fastmcp_arxiv_server.py`):
- Auto-start FastMCP server in background thread
- Implements tools: `search_papers`, `download_paper`, `list_papers`
- Uses standard `arxiv` library for arXiv API access
- Configurable port (default: 5555) via `FASTMCP_SERVER_PORT`
- Singleton pattern for application-wide server instance
- Graceful shutdown on app exit
- Compatible with local and HuggingFace Spaces deployment
**Client** (`utils/fastmcp_arxiv_client.py`):
- Async-first design with sync wrappers for Gradio compatibility
- Connects to FastMCP server via HTTP
- Lazy client initialization on first use
- Reuses legacy MCP's robust `_parse_mcp_paper()` logic
- **Built-in fallback**: Direct arXiv download if MCP fails
- Same retry logic (3 attempts, exponential backoff)
- Uses `nest-asyncio` for event loop compatibility
**Retriever Fallback Logic** (`agents/retriever.py` lines 68-156):
- Two-tier fallback: Primary client → Fallback client
- `_search_with_fallback()`: Try primary MCP, then fallback to direct API
- `_download_with_fallback()`: Try primary MCP, then fallback to direct API
- Ensures paper retrieval never fails due to MCP issues
- Detailed logging of fallback events
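The two-tier pattern in sketch form (names match the bullets above; the bodies are assumptions):

```python
import logging

logger = logging.getLogger(__name__)


def search_with_fallback(query: str, primary, fallback) -> list:
    """Try the primary (MCP) client; on any failure, fall back to the direct API."""
    try:
        return primary(query)
    except Exception as exc:
        logger.warning("Primary client failed (%s); falling back to direct API", exc)
        return fallback(query)
```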
**Legacy MCP Client** (`utils/mcp_arxiv_client.py`):
- In-process handler calls (imports MCP server functions directly)
- Stdio protocol for external MCP servers
- Maintained for backward compatibility
- Enable via `USE_LEGACY_MCP=true` when `USE_MCP_ARXIV=true`
- All features from legacy implementation preserved
**Key Features Across All MCP Clients**:
- Async-first design with sync wrappers
- MCP tools: `search_papers`, `download_paper`, `list_papers`
- Transforms MCP responses to `Paper` Pydantic objects
- Same retry logic and caching behavior as ArxivClient
- Automatic direct download fallback if MCP storage inaccessible
**Zero Breaking Changes**:
- Downstream agents (Analyzer, Synthesis, Citation) unaffected
- Same state dictionary structure maintained
- PDF processing, chunking, and RAG unchanged
- Toggle via environment variables without code changes
- Legacy MCP remains available for compatibility
**Configuration** (`.env.example`):
```bash
# Enable MCP (FastMCP by default)
USE_MCP_ARXIV=true
# Force legacy MCP instead of FastMCP (optional)
USE_LEGACY_MCP=false
# Storage path for papers (used by all MCP clients)
MCP_ARXIV_STORAGE_PATH=./data/mcp_papers/
# FastMCP server port
FASTMCP_SERVER_PORT=5555
```
**Testing**:
- FastMCP: `pytest tests/test_fastmcp_arxiv.py -v` (38 tests)
- Legacy MCP: `pytest tests/test_mcp_arxiv_client.py -v` (21 tests)
- Both test suites cover: search, download, caching, error handling, fallback logic
### PDF Processing Edge Cases
- Some PDFs may be scanned images (extraction fails gracefully)
- Page markers `[Page N]` extracted during text extraction for chunk attribution
- Section detection is heuristic-based (checks first 5 lines of chunk)
- Empty pages or extraction failures logged as warnings, not errors
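The section-detection heuristic can be sketched as follows (the keyword list is an assumption based on the chunking notes):

```python
SECTION_KEYWORDS = ("abstract", "introduction", "methodology", "methods",
                    "results", "discussion", "conclusion", "limitations",
                    "references")


def detect_section(chunk_text: str, max_lines: int = 5) -> str:
    """Scan the first few lines of a chunk for a section keyword."""
    for line in chunk_text.splitlines()[:max_lines]:
        lowered = line.strip().lower()
        for keyword in SECTION_KEYWORDS:
            if keyword in lowered:
                return keyword
    return "unknown"
```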
### Gradio UI Structure (`app.py`)
ResearchPaperAnalyzer class orchestrates the workflow:
1. Initialize LangFuse client and instrument Azure OpenAI (if enabled)
2. Create LangGraph workflow with all agents
3. Check semantic cache first
4. Initialize state dictionary with `create_initial_state()`
5. Generate unique `session_id` for trace tracking
6. Run LangGraph workflow via `run_workflow()` from orchestration module
7. Flush LangFuse traces to ensure upload
8. Cache results on success
9. Format output for 5 tabs: Papers, Analysis, Synthesis, Citations, Stats
**LangGraph Workflow Execution**:
- Nodes execute in order: retriever → analyzer → filter → synthesis → citation
- Conditional edges for early termination (no papers found, all analyses failed)
- Checkpointing enabled via `MemorySaver` for workflow state persistence
- Progress updates still work via local variable (NOT in state to avoid msgpack serialization issues)
## Testing Patterns
Tests use mocks to avoid external dependencies:
```python
from unittest.mock import Mock, patch

# Mock RAG retriever
mock_retriever = Mock(spec=RAGRetriever)
mock_retriever.retrieve.return_value = {"chunks": [...], "chunk_ids": [...]}

# Mock Azure OpenAI
with patch("agents.analyzer.AzureOpenAI", return_value=mock_client):
    agent = AnalyzerAgent(rag_retriever=mock_retriever)
```
Current test coverage:
- **AnalyzerAgent** (18 tests): Core analysis workflow and error handling
- **MCPArxivClient** (21 tests): Legacy MCP tool integration, async/sync wrappers, response parsing
- **FastMCPArxiv** (38 tests): FastMCP server, client, integration, error handling, fallback logic
When adding tests for other agents, follow the same pattern:
- Fixtures for mock dependencies
- Test both success and error paths
- Verify state transformations
- Test edge cases (empty inputs, API failures)
- For async code, use `pytest-asyncio` with `@pytest.mark.asyncio`
## Observability and Analytics
### LangFuse Integration
The system automatically traces all agent executions and LLM calls when LangFuse is enabled:
**Configuration** (`utils/langfuse_client.py`):
- `initialize_langfuse()`: Initialize global LangFuse client at startup
- `instrument_openai()`: Auto-trace all Azure OpenAI API calls
- `@observe` decorator: Trace custom functions/spans
- `flush_langfuse()`: Ensure all traces uploaded before shutdown
**Automatic Tracing**:
- All agent `run()` methods decorated with `@observe`
- LLM calls automatically captured (prompt, completion, tokens, cost)
- RAG operations traced (embeddings, vector search)
- Workflow state transitions logged
### Trace Querying (`observability/trace_reader.py`)
```python
from observability import TraceReader
reader = TraceReader()
# Get recent traces
traces = reader.get_traces(limit=10)
# Filter by user/session
traces = reader.get_traces(user_id="user-123", session_id="session-abc")
# Filter by date range
from datetime import datetime, timedelta
start = datetime.now() - timedelta(days=7)
traces = reader.filter_by_date_range(traces, start_date=start)
# Get specific agent executions
analyzer_spans = reader.filter_by_agent(traces, agent_name="analyzer_agent")
# Export traces
reader.export_traces_to_json(traces, "traces.json")
reader.export_traces_to_csv(traces, "traces.csv")
```
### Performance Analytics (`observability/analytics.py`)
```python
from observability import AgentPerformanceAnalyzer, AgentTrajectoryAnalyzer
# Performance metrics
perf_analyzer = AgentPerformanceAnalyzer()
# Get agent latency statistics
stats = perf_analyzer.agent_latency_stats("analyzer_agent", days=7)
print(f"P95 latency: {stats.p95_latency_ms:.2f}ms")
# Token usage breakdown
token_usage = perf_analyzer.token_usage_breakdown(days=7)
print(f"Total tokens: {sum(token_usage.values())}")
# Cost per agent
costs = perf_analyzer.cost_per_agent(days=7)
print(f"Total cost: ${sum(costs.values()):.4f}")
# Error rates
error_rates = perf_analyzer.error_rates(days=7)
# Workflow summary
summary = perf_analyzer.workflow_performance_summary(days=7)
print(f"Success rate: {summary.success_rate:.1f}%")
print(f"Avg duration: {summary.avg_duration_ms/1000:.2f}s")
# Trajectory analysis
traj_analyzer = AgentTrajectoryAnalyzer()
analysis = traj_analyzer.analyze_execution_paths(days=7)
print(f"Most common path: {analysis['most_common_path']}")
```
See `observability/README.md` for comprehensive documentation.
## Common Modification Points
**Adding a new agent**:
1. Create agent class with `run(state) -> state` method
2. Decorate `run()` with `@observe` for tracing
3. Add node wrapper in `orchestration/nodes.py`
4. Add node to workflow graph in `orchestration/workflow_graph.py`
5. Update conditional routing if needed
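Steps 1-2 in sketch form (`SummaryAgent` is hypothetical; a no-op stand-in replaces LangFuse's `@observe` so the sketch is self-contained):

```python
def observe(fn=None, **kwargs):
    """No-op stand-in for langfuse's @observe decorator (sketch only)."""
    def wrap(f):
        return f
    return wrap(fn) if fn else wrap


class SummaryAgent:
    """Hypothetical new agent following the run(state) -> state contract."""

    @observe
    def run(self, state: dict) -> dict:
        try:
            state["summary"] = f"{len(state.get('analyses', []))} analyses summarized"
        except Exception as exc:
            # Follow the error-handling philosophy: record, don't crash
            state.setdefault("errors", []).append(str(exc))
        return state
```

The remaining steps wrap this agent in a node (`orchestration/nodes.py`) and register it in the graph (`orchestration/workflow_graph.py`).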
**Modifying chunking**:
- Adjust `chunk_size` and `chunk_overlap` in PDFProcessor initialization
- Affects retrieval quality vs. context size tradeoff
- Default 500/50 balances precision and coverage
**Changing LLM model**:
- Update `AZURE_OPENAI_DEPLOYMENT_NAME` in .env
- Cost estimates in CitationAgent may need adjustment
- Temperature must stay 0 for deterministic outputs
**Adding arXiv categories**:
- Extend `ARXIV_CATEGORIES` list in `app.py`
- Format: `"code - Description"` (e.g., `"cs.AI - Artificial Intelligence"`)
**Switching between arXiv clients**:
- Set `USE_MCP_ARXIV=false` (default) → Direct ArxivClient
- Set `USE_MCP_ARXIV=true` → FastMCPArxivClient (default MCP)
- Set `USE_MCP_ARXIV=true` + `USE_LEGACY_MCP=true` → Legacy MCPArxivClient
- Configure `MCP_ARXIV_STORAGE_PATH` for MCP server's storage location
- Configure `FASTMCP_SERVER_PORT` for FastMCP server port (default: 5555)
- No code changes required - client selected automatically in `app.py`
- All clients implement identical interface for seamless switching
- FastMCP server auto-starts when FastMCP client is selected
## Cost and Performance Considerations
- Target: <$0.50 per 5-paper analysis
- Semantic cache reduces repeated query costs
- ChromaDB persistence prevents re-embedding same papers
- Batch embedding generation in PDFProcessor for efficiency
- Token usage tracked per request for monitoring
- LangFuse observability enables cost optimization insights
- LangGraph overhead: <1% for state management
- Trace upload overhead: ~5-10ms per trace (async, negligible impact)
## Key Files and Modules
### Core Application
- `app.py`: Gradio UI and workflow orchestration entry point
- `utils/config.py`: Configuration management (Azure OpenAI, LangFuse, MCP)
- `utils/schemas.py`: Pydantic data models for validation
- `utils/langgraph_state.py`: LangGraph state TypedDict and helpers
### Agents
- `agents/retriever.py`: Paper retrieval, PDF processing, embeddings
- `agents/analyzer.py`: Individual paper analysis with RAG
- `agents/synthesis.py`: Cross-paper synthesis and insights
- `agents/citation.py`: Citation generation and validation
### RAG Components
- `rag/pdf_processor.py`: PDF text extraction and chunking
- `rag/embeddings.py`: Batch embedding generation (Azure OpenAI)
- `rag/vector_store.py`: ChromaDB vector store management
- `rag/retrieval.py`: RAG retrieval with formatted context
### Orchestration (LangGraph)
- `orchestration/__init__.py`: Module exports
- `orchestration/nodes.py`: Node wrappers with tracing
- `orchestration/workflow_graph.py`: LangGraph workflow builder
### Observability (LangFuse)
- `observability/__init__.py`: Module exports
- `observability/trace_reader.py`: Trace querying and export API
- `observability/analytics.py`: Performance analytics and trajectory analysis
- `observability/README.md`: Comprehensive observability documentation
- `utils/langfuse_client.py`: LangFuse client initialization and helpers
### Utilities
- `utils/arxiv_client.py`: Direct arXiv API client with retry logic
- `utils/mcp_arxiv_client.py`: Legacy MCP client implementation
- `utils/fastmcp_arxiv_client.py`: FastMCP client (recommended)
- `utils/fastmcp_arxiv_server.py`: FastMCP server with auto-start
- `utils/semantic_cache.py`: Query caching with embeddings
### Documentation
- `CLAUDE.md`: This file - comprehensive developer guide
- `README.md`: User-facing project documentation
- `REFACTORING_SUMMARY.md`: LangGraph + LangFuse refactoring details
- `BUGFIX_MSGPACK_SERIALIZATION.md`: msgpack serialization fix documentation
- `.env.example`: Environment variable template with all options
## Version History and Recent Changes
### Version 2.6: LangGraph Orchestration + LangFuse Observability
**Added:**
- LangGraph workflow orchestration with conditional routing
- LangFuse automatic tracing for all agents and LLM calls
- Observability Python API for trace querying and analytics
- Performance analytics (latency, tokens, cost, error rates)
- Agent trajectory analysis
- Checkpointing with `MemorySaver`
**Fixed:**
- msgpack serialization error (removed Gradio Progress from state)
**Dependencies Added:**
- `langgraph>=0.2.0`
- `langfuse>=2.0.0`
- `langfuse-openai>=1.0.0`
**Breaking Changes:**
- None! Fully backward compatible
**Documentation:**
- Created `observability/README.md`
- Created `REFACTORING_SUMMARY.md`
- Created `BUGFIX_MSGPACK_SERIALIZATION.md`
- Updated `CLAUDE.md` (this file)
- Updated `.env.example`
See `REFACTORING_SUMMARY.md` for detailed migration guide and architecture changes.