CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

Core Architecture

This is a multi-agent RAG system for analyzing academic papers from arXiv. The system uses LangGraph for workflow orchestration and LangFuse for comprehensive observability.

Agent Pipeline Flow

User Query → Retriever → Analyzer → Filter → Synthesis → Citation → Output
                ↓          ↓         ↓         ↓           ↓
            [LangFuse Tracing for All Nodes]

Orchestration: The workflow is managed by LangGraph (orchestration/workflow_graph.py):

  • Conditional routing (early termination if no papers found or all analyses fail)
  • Automatic checkpointing with MemorySaver
  • State management with type-safe AgentState TypedDict
  • Node wrappers in orchestration/nodes.py with automatic tracing

State Dictionary (utils/langgraph_state.py): All agents operate on a shared state dictionary that flows through the pipeline:

  • query: User's research question
  • category: Optional arXiv category filter
  • num_papers: Number of papers to analyze
  • papers: List of Paper objects (populated by Retriever)
  • chunks: List of PaperChunk objects (populated by Retriever)
  • analyses: List of Analysis objects (populated by Analyzer)
  • synthesis: SynthesisResult object (populated by Synthesis)
  • validated_output: ValidatedOutput object (populated by Citation)
  • errors: List of error messages accumulated across agents
  • token_usage: Dict tracking input/output/embedding tokens
  • trace_id: LangFuse trace identifier (for observability)
  • session_id: User session tracking
  • user_id: Optional user identifier

IMPORTANT: Only msgpack-serializable data should be stored in the state. Do NOT add complex objects like Gradio Progress, file handles, or callbacks to the state dictionary (see BUGFIX_MSGPACK_SERIALIZATION.md).
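The state fields above can be sketched as a TypedDict (field types are inferred from the descriptions; the actual definition lives in `utils/langgraph_state.py`):

```python
# Sketch of AgentState inferred from the field list above; types are
# assumptions, and nested objects are shown as msgpack-safe dicts
# (e.g. Pydantic .model_dump() output).
from typing import Optional, TypedDict

class AgentState(TypedDict):
    query: str                       # user's research question
    category: Optional[str]          # optional arXiv category filter
    num_papers: int                  # number of papers to analyze
    papers: list[dict]               # Paper objects (populated by Retriever)
    chunks: list[dict]               # PaperChunk objects
    analyses: list[dict]             # Analysis objects
    synthesis: Optional[dict]        # SynthesisResult
    validated_output: Optional[dict] # ValidatedOutput
    errors: list[str]                # accumulated error messages
    token_usage: dict[str, int]      # input/output/embedding tokens
    trace_id: Optional[str]          # LangFuse trace identifier
    session_id: str                  # user session tracking
    user_id: Optional[str]           # optional user identifier
```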

Agent Responsibilities

  1. RetrieverAgent (agents/retriever.py):

    • Decorated with @observe for LangFuse tracing
    • Searches arXiv API using ArxivClient, MCPArxivClient, or FastMCPArxivClient (configurable via env)
    • Downloads PDFs to data/papers/ (direct API) or MCP server storage (MCP mode)
    • Intelligent Fallback: Automatically falls back to direct API if primary MCP client fails
    • Processes PDFs with PDFProcessor (500-token chunks, 50-token overlap)
    • Generates embeddings via EmbeddingGenerator (Azure OpenAI text-embedding-3-small, traced)
    • Stores chunks in ChromaDB via VectorStore
    • FastMCP Support: Auto-start FastMCP server for standardized arXiv access
  2. AnalyzerAgent (agents/analyzer.py):

    • Decorated with @observe(as_type="generation") for LLM call tracing
    • Analyzes each paper individually using RAG
    • Uses 4 broad queries per paper: methodology, results, conclusions, limitations
    • Deduplicates chunks by chunk_id
    • Calls Azure OpenAI with temperature=0 and JSON mode
    • RAG retrieval automatically traced via @observe on RAGRetriever.retrieve()
    • Returns structured Analysis objects with confidence scores
  3. SynthesisAgent (agents/synthesis.py):

    • Decorated with @observe(as_type="generation") for LLM call tracing
    • Compares findings across all papers
    • Identifies consensus points, contradictions, research gaps
    • Creates executive summary addressing user's query
    • Uses temperature=0 for deterministic outputs
    • Returns SynthesisResult with confidence scores
  4. CitationAgent (agents/citation.py):

    • Decorated with @observe(as_type="span") for data processing tracing
    • Generates APA-formatted citations for all papers
    • Validates synthesis claims against source papers
    • Calculates cost estimates (GPT-4o-mini pricing)
    • Creates final ValidatedOutput with all metadata

Critical Architecture Patterns

**RAG Context Formatting**: `RAGRetriever.format_context()` creates structured context with:

```
[Chunk N]
Paper: {title}
Authors: {authors}
Section: {section}
Page: {page_number}
Source: {arxiv_url}

{content}
```
**Chunking Strategy**: PDFProcessor uses tiktoken encoding (cl100k_base) for precise token counting:
- Chunk size: 500 tokens
- Overlap: 50 tokens
- Page markers preserved: `[Page N]` tags in text
- Section detection via keyword matching (abstract, introduction, results, etc.)
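The sliding-window scheme above can be sketched as follows (a plain token list stands in for tiktoken's `cl100k_base` encoding, which the real `PDFProcessor` uses for counting):

```python
# Sliding-window chunking sketch: 500-token chunks with a 50-token
# overlap, so consecutive windows share their boundary tokens.
def chunk_tokens(tokens: list[str], chunk_size: int = 500,
                 overlap: int = 50) -> list[list[str]]:
    """Split a token list into overlapping windows."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(tokens), step):
        window = tokens[start:start + chunk_size]
        if window:
            chunks.append(window)
        if start + chunk_size >= len(tokens):
            break
    return chunks

tokens = [f"tok{i}" for i in range(1200)]
chunks = chunk_tokens(tokens)  # 3 chunks; last 50 tokens of each
                               # window open the next one
```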

**Vector Store Filtering**: ChromaDB searches support paper_id filtering:
- Single paper: `{"paper_id": "2401.00001"}`
- Multiple papers: `{"paper_id": {"$in": ["2401.00001", "2401.00002"]}}`
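A small helper illustrating how these filter dicts might be built (the helper name is hypothetical; the real code passes such dicts to ChromaDB's `where` parameter):

```python
# Build a ChromaDB metadata filter for one or more paper IDs,
# matching the two filter shapes shown above.
def paper_filter(paper_ids: list[str]) -> dict:
    if not paper_ids:
        raise ValueError("paper_ids must be non-empty")
    if len(paper_ids) == 1:
        return {"paper_id": paper_ids[0]}
    return {"paper_id": {"$in": paper_ids}}
```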

**Semantic Caching**: Cache hits when cosine similarity ≥ 0.95 between query embeddings. Cache key includes both query and category.
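A minimal sketch of that hit test, assuming plain list-of-float embeddings:

```python
# Cosine-similarity cache check: a cached answer is reused only when
# the new query's embedding is nearly identical (>= 0.95) to a cached one.
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def is_cache_hit(query_emb: list[float], cached_emb: list[float],
                 threshold: float = 0.95) -> bool:
    return cosine_similarity(query_emb, cached_emb) >= threshold
```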

**Error Handling Philosophy**: Agents catch exceptions, log errors, append to `state["errors"]`, and return partial results rather than failing completely. For example, Analyzer returns confidence_score=0.0 on failure.
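The pattern might look like this (names are illustrative; the failing call is a stand-in for a real LLM request):

```python
# Graceful degradation: catch, record in state["errors"], and return a
# partial result with confidence_score=0.0 instead of raising.
def analyze_paper_safely(state: dict, paper: dict) -> dict:
    try:
        raise RuntimeError("LLM call failed")  # stand-in for the real analysis
    except Exception as exc:
        state["errors"].append(f"Analyzer failed on {paper['id']}: {exc}")
        return {"paper_id": paper["id"], "confidence_score": 0.0}

state = {"errors": []}
result = analyze_paper_safely(state, {"id": "2401.00001"})
```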

### LangGraph Orchestration (`orchestration/`)

**Workflow Graph** (`orchestration/workflow_graph.py`):
- `create_workflow_graph()`: Creates StateGraph with all nodes and conditional edges
- `run_workflow()`: Sync wrapper for Gradio compatibility (uses `nest-asyncio`)
- `run_workflow_async()`: Async streaming execution
- `get_workflow_state()`: Retrieve current state by thread ID

**Node Wrappers** (`orchestration/nodes.py`):
- `retriever_node()`: Executes RetrieverAgent with LangFuse tracing
- `analyzer_node()`: Executes AnalyzerAgent with LangFuse tracing
- `filter_node()`: Filters out low-confidence analyses (confidence_score < 0.7)
- `synthesis_node()`: Executes SynthesisAgent with LangFuse tracing
- `citation_node()`: Executes CitationAgent with LangFuse tracing

**Conditional Routing**:
- `should_continue_after_retriever()`: Returns "END" if no papers found, else "analyzer"
- `should_continue_after_filter()`: Returns "END" if all analyses filtered out, else "synthesis"
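Assumed shapes for these two routing functions, inferred from the descriptions above (the real versions live in `orchestration/workflow_graph.py` and inspect `AgentState`):

```python
# Conditional routing sketch: each function returns the name of the
# next node, or "END" to terminate the workflow early.
def should_continue_after_retriever(state: dict) -> str:
    return "END" if not state.get("papers") else "analyzer"

def should_continue_after_filter(state: dict) -> str:
    return "END" if not state.get("analyses") else "synthesis"
```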

**Workflow Execution Flow**:
```python
# In app.py
workflow_app = create_workflow_graph(
    retriever_agent=self.retriever_agent,
    analyzer_agent=self.analyzer_agent,
    synthesis_agent=self.synthesis_agent,
    citation_agent=self.citation_agent
)

# Run workflow with checkpointing
config = {"configurable": {"thread_id": session_id}}
final_state = run_workflow(workflow_app, initial_state, config, progress)
```

State Serialization:

  • LangGraph uses msgpack for state checkpointing
  • CRITICAL: Only msgpack-serializable types allowed in state
  • ✅ Primitives: str, int, float, bool, None
  • ✅ Collections: list, dict
  • ✅ Pydantic models (via .model_dump())
  • ❌ Complex objects: Gradio Progress, file handles, callbacks
  • See BUGFIX_MSGPACK_SERIALIZATION.md for detailed fix documentation

Development Commands

Running the Application

# Start Gradio interface (http://localhost:7860)
python app.py

Testing

# Run all tests with verbose output
pytest tests/ -v

# Run specific test file
pytest tests/test_analyzer.py -v

# Run single test
pytest tests/test_analyzer.py::TestAnalyzerAgent::test_analyze_paper_success -v

# Run with coverage
pytest tests/ --cov=agents --cov=rag --cov=utils -v

# Run tests matching pattern
pytest tests/ -k "analyzer" -v

Environment Setup

# Copy environment template
cp .env.example .env

# Required variables in .env:
# AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
# AZURE_OPENAI_API_KEY=your-key
# AZURE_OPENAI_DEPLOYMENT_NAME=gpt-4o-mini
# AZURE_OPENAI_API_VERSION=2024-02-01  # optional

# Optional MCP (Model Context Protocol) variables:
# USE_MCP_ARXIV=false              # Set to 'true' to use MCP (FastMCP by default)
# USE_LEGACY_MCP=false              # Set to 'true' to use legacy MCP instead of FastMCP
# MCP_ARXIV_STORAGE_PATH=./data/mcp_papers/  # MCP server storage path
# FASTMCP_SERVER_PORT=5555          # Port for FastMCP server (auto-started)

# Optional LangFuse observability variables:
# LANGFUSE_ENABLED=true            # Enable LangFuse tracing
# LANGFUSE_PUBLIC_KEY=pk-lf-...    # LangFuse public key
# LANGFUSE_SECRET_KEY=sk-lf-...    # LangFuse secret key
# LANGFUSE_HOST=https://cloud.langfuse.com  # LangFuse host (cloud or self-hosted)
# LANGFUSE_TRACE_ALL_LLM=true      # Auto-trace all Azure OpenAI calls
# LANGFUSE_TRACE_RAG=true          # Trace RAG operations
# LANGFUSE_FLUSH_AT=15             # Batch size for flushing traces
# LANGFUSE_FLUSH_INTERVAL=10       # Flush interval in seconds

Data Management

# Clear vector store (useful for testing)
rm -rf data/chroma_db/

# Clear cached papers
rm -rf data/papers/

# Clear semantic cache
rm -rf data/cache/

Key Implementation Details

Azure OpenAI Integration

All agents use temperature=0 and response_format={"type": "json_object"} for deterministic, structured outputs. Initialize clients like:

from openai import AzureOpenAI
client = AzureOpenAI(
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
)
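The per-call pattern then looks roughly like this (deployment name and prompt are illustrative; the commented lines show how the kwargs would be used):

```python
# Request parameters shared by all agents: temperature=0 plus JSON mode
# for deterministic, structured outputs.
import json

request_kwargs = {
    "model": "gpt-4o-mini",  # AZURE_OPENAI_DEPLOYMENT_NAME
    "temperature": 0,
    "response_format": {"type": "json_object"},
    "messages": [
        {"role": "system", "content": "Respond with a JSON object."},
        {"role": "user", "content": "Summarize the methodology."},
    ],
}
# response = client.chat.completions.create(**request_kwargs)
# analysis = json.loads(response.choices[0].message.content)
```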

Pydantic Schemas (utils/schemas.py and utils/langgraph_state.py)

All data structures use Pydantic for validation:

  • Paper: arXiv paper metadata
  • PaperChunk: Text chunk with metadata
  • Analysis: Individual paper analysis results
  • SynthesisResult: Cross-paper synthesis with ConsensusPoint and Contradiction
  • ValidatedOutput: Final output with citations and cost tracking
  • AgentState: TypedDict for LangGraph state management (used in workflow orchestration)

Observability Models (observability/trace_reader.py):

  • TraceInfo: Trace metadata and performance metrics
  • SpanInfo: Agent execution data with timings
  • GenerationInfo: LLM call details (prompt, completion, tokens, cost)

Analytics Models (observability/analytics.py):

  • AgentStats: Per-agent performance statistics (latency, tokens, cost, errors)
  • WorkflowStats: Workflow-level aggregated metrics
  • AgentTrajectory: Complete execution path with timings

Retry Logic

ArxivClient uses tenacity for resilient API calls:

  • 3 retry attempts
  • Exponential backoff (4s min, 10s max)
  • Applied to search_papers() and download_paper()
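A hand-rolled sketch of that policy (the real client uses tenacity's `@retry` with `stop_after_attempt(3)` and `wait_exponential(min=4, max=10)`; this stand-in mimics the same schedule):

```python
# Retry with exponential backoff: up to 3 attempts, waits doubling
# from min_wait and capped at max_wait, re-raising on final failure.
import time

def retry_with_backoff(fn, attempts: int = 3,
                       min_wait: float = 4.0, max_wait: float = 10.0):
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise
            wait = min(max_wait, min_wait * (2 ** (attempt - 1)))
            time.sleep(wait)
```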

MCP (Model Context Protocol) Integration

The system supports optional integration with arXiv MCP servers as an alternative to direct arXiv API access. FastMCP is now the default MCP implementation when USE_MCP_ARXIV=true.

Architecture Overview:

  • Three client options: Direct ArxivClient, Legacy MCPArxivClient, FastMCPArxivClient
  • All clients implement the same interface for drop-in compatibility
  • RetrieverAgent includes intelligent fallback from MCP to direct API
  • App selects client based on environment variables with cascading fallback

Client Selection Logic (app.py lines 75-135):

  1. USE_MCP_ARXIV=false → Direct ArxivClient (default)
  2. USE_MCP_ARXIV=true + USE_LEGACY_MCP=true → Legacy MCPArxivClient
  3. USE_MCP_ARXIV=true → FastMCPArxivClient with auto-start server (default MCP mode)
  4. Fallback cascade: FastMCP → Legacy MCP → Direct API
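The cascade can be sketched as an env-driven selection helper (hypothetical function name; the real logic lives in app.py and also handles runtime fallback):

```python
# Select an arXiv client name from environment variables, mirroring
# the three selection rules listed above.
import os

def select_arxiv_client_name() -> str:
    if os.getenv("USE_MCP_ARXIV", "false").lower() != "true":
        return "ArxivClient"          # direct API (default)
    if os.getenv("USE_LEGACY_MCP", "false").lower() == "true":
        return "MCPArxivClient"       # legacy MCP
    return "FastMCPArxivClient"       # default MCP mode
```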

FastMCP Implementation (Recommended):

Server (utils/fastmcp_arxiv_server.py):

  • Auto-start FastMCP server in background thread
  • Implements tools: search_papers, download_paper, list_papers
  • Uses standard arxiv library for arXiv API access
  • Configurable port (default: 5555) via FASTMCP_SERVER_PORT
  • Singleton pattern for application-wide server instance
  • Graceful shutdown on app exit
  • Compatible with local and HuggingFace Spaces deployment

Client (utils/fastmcp_arxiv_client.py):

  • Async-first design with sync wrappers for Gradio compatibility
  • Connects to FastMCP server via HTTP
  • Lazy client initialization on first use
  • Reuses legacy MCP's robust _parse_mcp_paper() logic
  • Built-in fallback: Direct arXiv download if MCP fails
  • Same retry logic (3 attempts, exponential backoff)
  • Uses nest-asyncio for event loop compatibility

Retriever Fallback Logic (agents/retriever.py lines 68-156):

  • Two-tier fallback: Primary client → Fallback client
  • _search_with_fallback(): Try primary MCP, then fall back to direct API
  • _download_with_fallback(): Try primary MCP, then fall back to direct API
  • Ensures paper retrieval never fails due to MCP issues
  • Detailed logging of fallback events
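A minimal sketch of the two-tier pattern (the real methods also handle downloads and richer logging):

```python
# Try the primary (MCP) client; on any failure, log a warning and
# retry the same call against the fallback (direct API) client.
import logging

logger = logging.getLogger(__name__)

def search_with_fallback(primary, fallback, query: str):
    try:
        return primary(query)
    except Exception as exc:
        logger.warning("Primary MCP client failed (%s); using direct API", exc)
        return fallback(query)
```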

Legacy MCP Client (utils/mcp_arxiv_client.py):

  • In-process handler calls (imports MCP server functions directly)
  • Stdio protocol for external MCP servers
  • Maintained for backward compatibility
  • Enable via USE_LEGACY_MCP=true when USE_MCP_ARXIV=true
  • All features from legacy implementation preserved

Key Features Across All MCP Clients:

  • Async-first design with sync wrappers
  • MCP tools: search_papers, download_paper, list_papers
  • Transforms MCP responses to Paper Pydantic objects
  • Same retry logic and caching behavior as ArxivClient
  • Automatic direct download fallback if MCP storage inaccessible

Zero Breaking Changes:

  • Downstream agents (Analyzer, Synthesis, Citation) unaffected
  • Same state dictionary structure maintained
  • PDF processing, chunking, and RAG unchanged
  • Toggle via environment variables without code changes
  • Legacy MCP remains available for compatibility

Configuration (.env.example):

# Enable MCP (FastMCP by default)
USE_MCP_ARXIV=true

# Force legacy MCP instead of FastMCP (optional)
USE_LEGACY_MCP=false

# Storage path for papers (used by all MCP clients)
MCP_ARXIV_STORAGE_PATH=./data/mcp_papers/

# FastMCP server port
FASTMCP_SERVER_PORT=5555

Testing:

  • FastMCP: pytest tests/test_fastmcp_arxiv.py -v (38 tests)
  • Legacy MCP: pytest tests/test_mcp_arxiv_client.py -v (21 tests)
  • Both test suites cover: search, download, caching, error handling, fallback logic

PDF Processing Edge Cases

  • Some PDFs may be scanned images (extraction fails gracefully)
  • Page markers [Page N] extracted during text extraction for chunk attribution
  • Section detection is heuristic-based (checks first 5 lines of chunk)
  • Empty pages or extraction failures logged as warnings, not errors

Gradio UI Structure (app.py)

ResearchPaperAnalyzer class orchestrates the workflow:

  1. Initialize LangFuse client and instrument Azure OpenAI (if enabled)
  2. Create LangGraph workflow with all agents
  3. Check semantic cache first
  4. Initialize state dictionary with create_initial_state()
  5. Generate unique session_id for trace tracking
  6. Run LangGraph workflow via run_workflow() from orchestration module
  7. Flush LangFuse traces to ensure upload
  8. Cache results on success
  9. Format output for 5 tabs: Papers, Analysis, Synthesis, Citations, Stats

LangGraph Workflow Execution:

  • Nodes execute in order: retriever → analyzer → filter → synthesis → citation
  • Conditional edges for early termination (no papers found, all analyses failed)
  • Checkpointing enabled via MemorySaver for workflow state persistence
  • Progress updates still work via local variable (NOT in state to avoid msgpack serialization issues)

Testing Patterns

Tests use mocks to avoid external dependencies:

# Mock RAG retriever
mock_retriever = Mock(spec=RAGRetriever)
mock_retriever.retrieve.return_value = {"chunks": [...], "chunk_ids": [...]}

# Mock Azure OpenAI
with patch('agents.analyzer.AzureOpenAI', return_value=mock_client):
    agent = AnalyzerAgent(rag_retriever=mock_retriever)

Current test coverage:

  • AnalyzerAgent (18 tests): Core analysis workflow and error handling
  • MCPArxivClient (21 tests): Legacy MCP tool integration, async/sync wrappers, response parsing
  • FastMCPArxiv (38 tests): FastMCP server, client, integration, error handling, fallback logic

When adding tests for other agents, follow the same pattern:

  • Fixtures for mock dependencies
  • Test both success and error paths
  • Verify state transformations
  • Test edge cases (empty inputs, API failures)
  • For async code, use pytest-asyncio with @pytest.mark.asyncio
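For async clients, a standalone equivalent of the pytest-asyncio pattern (under pytest the coroutine would instead be decorated with `@pytest.mark.asyncio`; method and field names here are illustrative):

```python
# Async test sketch: AsyncMock lets an awaited client method return a
# canned value; asyncio.run() drives the coroutine outside pytest.
import asyncio
from unittest.mock import AsyncMock

async def check_async_search():
    client = AsyncMock()
    client.search_papers.return_value = [{"id": "2401.00001"}]
    papers = await client.search_papers("diffusion models", max_results=1)
    assert papers[0]["id"] == "2401.00001"
    return papers

papers = asyncio.run(check_async_search())
```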

Observability and Analytics

LangFuse Integration

The system automatically traces all agent executions and LLM calls when LangFuse is enabled:

Configuration (utils/langfuse_client.py):

  • initialize_langfuse(): Initialize global LangFuse client at startup
  • instrument_openai(): Auto-trace all Azure OpenAI API calls
  • @observe decorator: Trace custom functions/spans
  • flush_langfuse(): Ensure all traces uploaded before shutdown

Automatic Tracing:

  • All agent run() methods decorated with @observe
  • LLM calls automatically captured (prompt, completion, tokens, cost)
  • RAG operations traced (embeddings, vector search)
  • Workflow state transitions logged

Trace Querying (observability/trace_reader.py)

from observability import TraceReader

reader = TraceReader()

# Get recent traces
traces = reader.get_traces(limit=10)

# Filter by user/session
traces = reader.get_traces(user_id="user-123", session_id="session-abc")

# Filter by date range
from datetime import datetime, timedelta
start = datetime.now() - timedelta(days=7)
traces = reader.filter_by_date_range(traces, start_date=start)

# Get specific agent executions
analyzer_spans = reader.filter_by_agent(traces, agent_name="analyzer_agent")

# Export traces
reader.export_traces_to_json(traces, "traces.json")
reader.export_traces_to_csv(traces, "traces.csv")

Performance Analytics (observability/analytics.py)

from observability import AgentPerformanceAnalyzer, AgentTrajectoryAnalyzer

# Performance metrics
perf_analyzer = AgentPerformanceAnalyzer()

# Get agent latency statistics
stats = perf_analyzer.agent_latency_stats("analyzer_agent", days=7)
print(f"P95 latency: {stats.p95_latency_ms:.2f}ms")

# Token usage breakdown
token_usage = perf_analyzer.token_usage_breakdown(days=7)
print(f"Total tokens: {sum(token_usage.values())}")

# Cost per agent
costs = perf_analyzer.cost_per_agent(days=7)
print(f"Total cost: ${sum(costs.values()):.4f}")

# Error rates
error_rates = perf_analyzer.error_rates(days=7)

# Workflow summary
summary = perf_analyzer.workflow_performance_summary(days=7)
print(f"Success rate: {summary.success_rate:.1f}%")
print(f"Avg duration: {summary.avg_duration_ms/1000:.2f}s")

# Trajectory analysis
traj_analyzer = AgentTrajectoryAnalyzer()
analysis = traj_analyzer.analyze_execution_paths(days=7)
print(f"Most common path: {analysis['most_common_path']}")

See observability/README.md for comprehensive documentation.

Common Modification Points

Adding a new agent:

  1. Create agent class with run(state) -> state method
  2. Decorate run() with @observe for tracing
  3. Add node wrapper in orchestration/nodes.py
  4. Add node to workflow graph in orchestration/workflow_graph.py
  5. Update conditional routing if needed
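A skeleton following steps 1-2 above, with `@observe` stubbed so the sketch runs without langfuse (the agent name and ranking logic are hypothetical examples):

```python
# New-agent skeleton: run(state) -> state, decorated for tracing, with
# the catch-log-continue error handling used by the other agents.
def observe(fn=None, **kwargs):  # stand-in for langfuse's @observe
    return fn if fn else (lambda f: f)

class RankingAgent:  # hypothetical example agent
    @observe
    def run(self, state: dict) -> dict:
        try:
            state["ranked_papers"] = sorted(
                state.get("papers", []),
                key=lambda p: p.get("score", 0),
                reverse=True,
            )
        except Exception as exc:
            state.setdefault("errors", []).append(f"RankingAgent: {exc}")
        return state
```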

Modifying chunking:

  • Adjust chunk_size and chunk_overlap in PDFProcessor initialization
  • Affects retrieval quality vs. context size tradeoff
  • Default 500/50 balances precision and coverage

Changing LLM model:

  • Update AZURE_OPENAI_DEPLOYMENT_NAME in .env
  • Cost estimates in CitationAgent may need adjustment
  • Temperature must stay 0 for deterministic outputs

Adding arXiv categories:

  • Extend ARXIV_CATEGORIES list in app.py
  • Format: "code - Description" (e.g., "cs.AI - Artificial Intelligence")

Switching between arXiv clients:

  • Set USE_MCP_ARXIV=false (default) → Direct ArxivClient
  • Set USE_MCP_ARXIV=true → FastMCPArxivClient (default MCP)
  • Set USE_MCP_ARXIV=true + USE_LEGACY_MCP=true → Legacy MCPArxivClient
  • Configure MCP_ARXIV_STORAGE_PATH for MCP server's storage location
  • Configure FASTMCP_SERVER_PORT for FastMCP server port (default: 5555)
  • No code changes required - client selected automatically in app.py
  • All clients implement identical interface for seamless switching
  • FastMCP server auto-starts when FastMCP client is selected

Cost and Performance Considerations

  • Target: <$0.50 per 5-paper analysis
  • Semantic cache reduces repeated query costs
  • ChromaDB persistence prevents re-embedding same papers
  • Batch embedding generation in PDFProcessor for efficiency
  • Token usage tracked per request for monitoring
  • LangFuse observability enables cost optimization insights
  • LangGraph overhead: <1% for state management
  • Trace upload overhead: ~5-10ms per trace (async, negligible impact)

Key Files and Modules

Core Application

  • app.py: Gradio UI and workflow orchestration entry point
  • utils/config.py: Configuration management (Azure OpenAI, LangFuse, MCP)
  • utils/schemas.py: Pydantic data models for validation
  • utils/langgraph_state.py: LangGraph state TypedDict and helpers

Agents

  • agents/retriever.py: Paper retrieval, PDF processing, embeddings
  • agents/analyzer.py: Individual paper analysis with RAG
  • agents/synthesis.py: Cross-paper synthesis and insights
  • agents/citation.py: Citation generation and validation

RAG Components

  • rag/pdf_processor.py: PDF text extraction and chunking
  • rag/embeddings.py: Batch embedding generation (Azure OpenAI)
  • rag/vector_store.py: ChromaDB vector store management
  • rag/retrieval.py: RAG retrieval with formatted context

Orchestration (LangGraph)

  • orchestration/__init__.py: Module exports
  • orchestration/nodes.py: Node wrappers with tracing
  • orchestration/workflow_graph.py: LangGraph workflow builder

Observability (LangFuse)

  • observability/__init__.py: Module exports
  • observability/trace_reader.py: Trace querying and export API
  • observability/analytics.py: Performance analytics and trajectory analysis
  • observability/README.md: Comprehensive observability documentation
  • utils/langfuse_client.py: LangFuse client initialization and helpers

Utilities

  • utils/arxiv_client.py: Direct arXiv API client with retry logic
  • utils/mcp_arxiv_client.py: Legacy MCP client implementation
  • utils/fastmcp_arxiv_client.py: FastMCP client (recommended)
  • utils/fastmcp_arxiv_server.py: FastMCP server with auto-start
  • utils/semantic_cache.py: Query caching with embeddings

Documentation

  • CLAUDE.md: This file - comprehensive developer guide
  • README.md: User-facing project documentation
  • REFACTORING_SUMMARY.md: LangGraph + LangFuse refactoring details
  • BUGFIX_MSGPACK_SERIALIZATION.md: msgpack serialization fix documentation
  • .env.example: Environment variable template with all options

Version History and Recent Changes

Version 2.6: LangGraph Orchestration + LangFuse Observability

Added:

  • LangGraph workflow orchestration with conditional routing
  • LangFuse automatic tracing for all agents and LLM calls
  • Observability Python API for trace querying and analytics
  • Performance analytics (latency, tokens, cost, error rates)
  • Agent trajectory analysis
  • Checkpointing with MemorySaver

Fixed:

  • msgpack serialization error (removed Gradio Progress from state)

Dependencies Added:

  • langgraph>=0.2.0
  • langfuse>=2.0.0
  • langfuse-openai>=1.0.0

Breaking Changes:

  • None! Fully backward compatible

Documentation:

  • Created observability/README.md
  • Created REFACTORING_SUMMARY.md
  • Created BUGFIX_MSGPACK_SERIALIZATION.md
  • Updated CLAUDE.md (this file)
  • Updated .env.example

See REFACTORING_SUMMARY.md for detailed migration guide and architecture changes.