Technical Architecture - HuggingFace Edition
System Overview
This document describes the technical architecture of the HuggingFace-powered RAG application for corporate policy analysis.
Architecture Diagram
┌───────────────────────────────────────────────────────────────────┐
│                    PolicyWise RAG Application                     │
├───────────────────────────────────────────────────────────────────┤
│                       Web Interface Layer                         │
├───────────────────────────────────────────────────────────────────┤
│ Flask App Factory │  Chat API  │  Search API  │  Health API       │
├───────────────────────────────────────────────────────────────────┤
│                       RAG Pipeline Layer                          │
├───────────────────────────────────────────────────────────────────┤
│ Query Processing  │  Context Assembly  │  Response Generation     │
├───────────────────────────────────────────────────────────────────┤
│                   HuggingFace Services Layer                      │
├───────────────────────────────────────────────────────────────────┤
│ HF Embedding API  │  HF Dataset Store  │  HF Inference API        │
├───────────────────────────────────────────────────────────────────┤
│                   Document Processing Layer                       │
├───────────────────────────────────────────────────────────────────┤
│ Document Parser   │  Text Chunker   │  Metadata Manager           │
└───────────────────────────────────────────────────────────────────┘
Core Components
1. Application Layer
Flask App Factory (src/app_factory.py)
- Purpose: Flask application factory with lazy service loading
- Key Features:
- Triple-layer HuggingFace service override system
- Cached service initialization for performance
- Memory-optimized startup with on-demand loading
- Comprehensive health monitoring
import os

from flask import Flask

def create_app():
    """Create Flask app with HuggingFace services."""
    app = Flask(__name__)

    # Force HuggingFace services when HF_TOKEN is available.
    # Initialized eagerly inside the factory: Flask 2.3+ removed the
    # before_first_request hook, so startup work belongs here.
    if os.getenv("HF_TOKEN"):
        ensure_hf_services()

    return app
Configuration Management (src/config.py)
- Purpose: Centralized configuration with HF override logic
- Key Features:
- Automatic HF_TOKEN detection
- Configuration precedence management
- Environment-specific settings
- Debug logging for configuration decisions
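The precedence logic can be sketched as a single resolver function. This is an illustrative sketch, not the project's exact code; the function name `resolve_embedding_backend` and the `EMBEDDING_BACKEND` variable are assumptions:

```python
import os

def resolve_embedding_backend() -> str:
    """Resolve which embedding backend to use (illustrative sketch).

    Precedence: HF_TOKEN overrides everything, then an explicit
    EMBEDDING_BACKEND setting, then the local default.
    """
    if os.getenv("HF_TOKEN"):
        return "huggingface"  # HF token always forces HuggingFace services
    return os.getenv("EMBEDDING_BACKEND", "local_onnx")
```

Keeping the decision in one function makes the "configuration precedence" easy to unit test and to log.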
2. RAG Pipeline Layer
RAG Pipeline (src/rag/rag_pipeline.py)
- Purpose: Orchestrates the complete retrieval-augmented generation workflow
- Key Components:
- Query processing and expansion
- Vector similarity search coordination
- Context assembly and optimization
- Response generation and citation formatting
- Source attribution with metadata lookup
class RAGPipeline:
    def __init__(self, embedding_service, vector_store, llm_service):
        self.embedding_service = embedding_service  # HF Embedding API
        self.vector_store = vector_store            # HF Dataset Store
        self.llm_service = llm_service              # HF Inference API

    def process_query(self, query: str) -> dict:
        """Complete RAG workflow with HF services"""
        # 1. Generate query embedding
        query_embedding = self.embedding_service.embed_text(query)
        # 2. Search vector store
        results = self.vector_store.search(query_embedding, top_k=5)
        # 3. Assemble context
        context = self._assemble_context(results)
        # 4. Generate response
        response = self.llm_service.generate_response(query, context)
        # 5. Format with citations
        return self._format_response_with_citations(response, results)
3. HuggingFace Services Layer
HuggingFace Embedding Service (src/embedding/hf_embedding_with_fallback.py)
- Model: intfloat/multilingual-e5-large
- Dimensions: 1024
- Features:
- HuggingFace Inference API integration
- Automatic batching for efficiency
- Local ONNX fallback for development
- Memory-optimized processing
from typing import List

import requests

class HuggingFaceEmbeddingServiceWithFallback:
    def __init__(self, hf_token: str):
        self.hf_token = hf_token
        self.model_name = "intfloat/multilingual-e5-large"
        self.api_url = f"https://router.huggingface.co/hf-inference/models/{self.model_name}"

    def embed_text(self, text: str) -> List[float]:
        """Generate embedding using HF Inference API"""
        response = requests.post(
            self.api_url,
            headers={"Authorization": f"Bearer {self.hf_token}"},
            json={"inputs": text},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()
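The "automatic batching" feature above boils down to grouping texts into fixed-size API payloads. A minimal sketch of that helper (the name `batch_texts` and the batch size are illustrative assumptions):

```python
from typing import List

def batch_texts(texts: List[str], batch_size: int = 32) -> List[List[str]]:
    """Split texts into fixed-size batches so a single HF Inference API
    request embeds several chunks at once instead of one call per chunk."""
    return [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
```

An `embed_batch` method would then iterate these groups, sending each batch as the `inputs` field of one request.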
HuggingFace Dataset Vector Store (src/vector_store/hf_dataset_store.py)
- Purpose: Persistent vector storage using HuggingFace Datasets
- Features:
- JSON string serialization for complex metadata
- Cosine similarity search with native operations
- Parquet and JSON fallback storage
- Complete interface compatibility
import json
from typing import List

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class HFDatasetVectorStore:
    def __init__(self, dataset_name: str = "policy-vectors"):
        self.dataset_name = dataset_name
        self.dataset = None

    def search(self, query_embedding: List[float], top_k: int = 5) -> List[dict]:
        """Cosine similarity search using HF Dataset operations"""
        # Calculate cosine similarity against every stored embedding
        embeddings = np.array(self.dataset["embedding"])
        similarities = cosine_similarity([query_embedding], embeddings)[0]
        # Get top-k results, highest similarity first
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        return [
            {
                "content": self.dataset[int(idx)]["content"],
                "metadata": json.loads(self.dataset[int(idx)]["metadata"]),
                "similarity_score": float(similarities[idx]),
            }
            for idx in top_indices
        ]
HuggingFace LLM Service (src/llm/hf_llm_service.py)
- Model: meta-llama/Meta-Llama-3-8B-Instruct
- Features:
- HuggingFace Inference API integration
- Automatic prompt formatting
- Response parsing and validation
- Built-in safety filtering
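A minimal sketch of what the LLM service's generation call could look like against the HF router's OpenAI-compatible chat endpoint. The URL shape, payload fields, and prompt template here are assumptions for illustration, not the project's exact implementation:

```python
import requests

class HuggingFaceLLMService:
    """Illustrative chat-completion wrapper for the HF Inference API."""

    def __init__(self, hf_token: str,
                 model: str = "meta-llama/Meta-Llama-3-8B-Instruct"):
        self.hf_token = hf_token
        # Assumed endpoint shape; verify against current HF router docs.
        self.url = (f"https://router.huggingface.co/hf-inference/models/"
                    f"{model}/v1/chat/completions")

    def generate_response(self, query: str, context: str) -> str:
        # Simple grounding prompt: answer only from retrieved context
        prompt = f"Answer using only this context:\n{context}\n\nQuestion: {query}"
        resp = requests.post(
            self.url,
            headers={"Authorization": f"Bearer {self.hf_token}"},
            json={"messages": [{"role": "user", "content": prompt}],
                  "max_tokens": 512},
            timeout=60,
        )
        resp.raise_for_status()
        return resp.json()["choices"][0]["message"]["content"]
```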
4. Document Processing Layer
Document Processing Pipeline (scripts/hf_process_documents.py)
- Purpose: Automated document ingestion and embedding generation
- Workflow:
  - Read policy documents from synthetic_policies/
  - Split into semantic chunks with overlap
  - Generate embeddings via HF Inference API
  - Store in HuggingFace Dataset with metadata
  - Validate processing and report statistics
def process_documents():
    """Process all policy documents using HF services"""
    # Initialize HF services
    embedding_service = HuggingFaceEmbeddingServiceWithFallback(hf_token)
    vector_store = HFDatasetVectorStore()

    # Process each document
    for file_path in policy_files:
        # Read and chunk document
        chunks = chunk_document(read_file(file_path))
        # Generate embeddings
        embeddings = embedding_service.embed_batch([chunk.content for chunk in chunks])
        # Store with metadata
        for chunk, embedding in zip(chunks, embeddings):
            vector_store.add_embedding(
                embedding=embedding,
                content=chunk.content,
                metadata={
                    "source_file": os.path.basename(file_path),
                    "chunk_index": chunk.index,
                    "category": chunk.category,
                },
            )
Configuration Override System
Triple-Layer Override Architecture
To ensure HuggingFace services are used even when OpenAI environment variables exist, we implement a comprehensive override system:
Layer 1: Configuration Override
# src/config.py
if os.getenv("HF_TOKEN"):
    USE_OPENAI_EMBEDDING = False
    print("🤗 HF_TOKEN detected - forcing HuggingFace services")
Layer 2: App Factory Override
# src/app_factory.py
def get_rag_pipeline():
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        # Force HF services regardless of other configuration
        return create_hf_rag_pipeline(hf_token)
    else:
        # Fall back to configured services
        return create_default_rag_pipeline()
Layer 3: Startup Override
# src/app_factory.py
def ensure_embeddings_on_startup():
    if os.getenv("HF_TOKEN"):
        # HF services don't need startup embedding checks
        print("🤗 HF services detected - skipping startup checks")
        return
    # Continue with standard startup checks
Data Flow Architecture
Query Processing Flow
User Query → Query Expansion → Embedding Generation → Vector Search →
Context Assembly → LLM Generation → Response Formatting → Citation Extraction
- Query Reception: User submits question via web interface or API
- Query Expansion: Enhance query with synonyms and domain terms
- Embedding Generation: Generate 1024-dimensional embedding via HF API
- Vector Search: Cosine similarity search in HF Dataset
- Context Assembly: Combine relevant chunks with metadata
- LLM Generation: Generate response via HF Inference API
- Response Formatting: Format with citations and confidence scores
- Citation Extraction: Extract and validate source attributions
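The query-expansion step can be sketched as a synonym lookup over domain terms. The synonym table below is a hypothetical example for the policy domain, not the project's actual vocabulary:

```python
# Hypothetical synonym table for the corporate-policy domain
SYNONYMS = {
    "pto": ["paid time off", "vacation", "leave"],
    "wfh": ["work from home", "remote work"],
}

def expand_query(query: str) -> str:
    """Append known synonyms so retrieval also matches paraphrased chunks."""
    extra = []
    for term, alternatives in SYNONYMS.items():
        if term in query.lower():
            extra.extend(alternatives)
    return query if not extra else f"{query} ({' '.join(extra)})"
```

The expanded string is what gets embedded in the next step, pulling in chunks that use different wording for the same concept.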
Document Processing Flow
Policy Documents → Text Extraction → Chunking → Embedding Generation →
Metadata Creation → Dataset Storage → Index Building
- Document Discovery: Scan the synthetic_policies/ directory
- Text Extraction: Read markdown content with metadata preservation
- Intelligent Chunking: Split into semantic chunks with overlap
- Embedding Generation: Batch process via HF Inference API
- Metadata Creation: Preserve source, category, and structural information
- Dataset Storage: Store in HuggingFace Dataset with JSON serialization
- Index Building: Build search indices for efficient retrieval
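The chunking-with-overlap step above can be sketched as a sliding character window. This is a simplified illustration; the real chunker may split on semantic boundaries (headings, paragraphs) rather than raw character offsets:

```python
from typing import List

def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """Split text into overlapping windows so context that straddles a
    chunk boundary still appears intact in at least one chunk."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    step = chunk_size - overlap
    return [text[i:i + chunk_size]
            for i in range(0, max(len(text) - overlap, 1), step)]
```

The overlap trades a little storage and embedding cost for better recall at chunk edges.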
Service Integration Patterns
HuggingFace Service Discovery
def detect_hf_environment():
    """Detect HuggingFace environment and configure services"""
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        return {
            "embedding_service": "huggingface_inference_api",
            "vector_store": "huggingface_dataset",
            "llm_service": "huggingface_inference_api",
            "deployment": "huggingface_spaces",
        }
    else:
        return {
            "embedding_service": "local_onnx",
            "vector_store": "chromadb",
            "llm_service": "openrouter",
            "deployment": "local",
        }
Error Handling and Resilience
import time

class HFServiceWithFallback:
    """Base class for HF services with fallback support"""

    def __init__(self, hf_token: str):
        self.hf_token = hf_token
        self.fallback_service = None

    def call_with_retry(self, func, max_retries=3):
        """Call HF API with exponential backoff"""
        for attempt in range(max_retries):
            try:
                return func()
            except Exception as e:
                if attempt == max_retries - 1:
                    # Retries exhausted: use fallback service if available
                    if self.fallback_service:
                        return self.fallback_service.call(func)
                    raise e
                time.sleep(2 ** attempt)
Performance Optimization
Caching Strategy
- Service Caching: Cache initialized services for request reuse
- Embedding Caching: Cache frequently requested embeddings
- Search Result Caching: Cache popular queries and results
- Model Caching: Cache downloaded models for faster startup
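The embedding-caching item above can be sketched as a thin wrapper around any embedding service. A minimal illustration using `functools.lru_cache`; the class name is hypothetical:

```python
from functools import lru_cache

class CachedEmbeddingService:
    """Wrap an embedding service with an in-memory LRU cache so repeated
    queries skip the HF API round trip (illustrative sketch)."""

    def __init__(self, inner, maxsize: int = 1024):
        self.inner = inner
        # Cache keyed on the text argument; bounded to avoid memory growth
        self._cached = lru_cache(maxsize=maxsize)(self.inner.embed_text)

    def embed_text(self, text: str):
        return self._cached(text)
```

Because the wrapper exposes the same `embed_text` interface, it can be dropped in front of the HF embedding service without changing the pipeline.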
Memory Management
- Batch Processing: Process documents in memory-efficient batches
- Lazy Loading: Load services only when needed
- Garbage Collection: Explicit cleanup after processing operations
- Resource Monitoring: Track memory usage and trigger cleanup
API Optimization
- Request Batching: Batch multiple embedding requests
- Connection Pooling: Reuse HTTP connections to HF APIs
- Response Caching: Cache API responses for duplicate requests
- Rate Limiting: Respect HF API rate limits with backoff
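Connection pooling and backoff can be combined in one shared `requests.Session`. A sketch assuming the `requests`/`urllib3` retry machinery; the pool size and retry counts are illustrative choices:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_hf_session(hf_token: str) -> requests.Session:
    """Session with pooled HTTPS connections and automatic backoff on
    rate-limit (429) and transient server errors (sketch)."""
    session = requests.Session()
    session.headers["Authorization"] = f"Bearer {hf_token}"
    retry = Retry(total=3, backoff_factor=1.0,
                  status_forcelist=[429, 500, 502, 503])
    session.mount("https://", HTTPAdapter(max_retries=retry, pool_maxsize=10))
    return session
```

Reusing one session across embedding and inference calls avoids a TLS handshake per request and centralizes the rate-limit handling.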
Security and Privacy
API Security
- Token Management: Secure HF_TOKEN handling and rotation
- Request Validation: Validate all inputs before processing
- Rate Limiting: Prevent abuse with request throttling
- CORS Configuration: Secure cross-origin request handling
Data Privacy
- Local Processing: No sensitive data sent to external APIs
- Metadata Sanitization: Remove PII from document metadata
- Query Logging: Optional query logging with privacy controls
- Secure Storage: Encrypt sensitive configuration data
Deployment Architecture
HuggingFace Spaces Deployment
# HuggingFace Spaces configuration (README.md front matter)
title: "MSSE AI Engineering - HuggingFace Edition"
emoji: "🧠"
sdk: "docker"
python_version: "3.11"
suggested_hardware: "cpu-basic"
app_port: 8080
Local Development Setup
# Environment Configuration
export HF_TOKEN="your_hf_token"
export FLASK_ENV="development"
export LOG_LEVEL="DEBUG"
# Service Initialization
python app.py # Automatic HF service detection and setup
Production Considerations
- Resource Scaling: Monitor HF API usage and scale accordingly
- Backup Strategy: Regular backup of HF Dataset storage
- Monitoring: Comprehensive health monitoring and alerting
- Update Strategy: Automated updates for models and dependencies
Monitoring and Observability
Health Monitoring
def get_system_health():
    """Comprehensive system health check"""
    return {
        "services": {
            "hf_embedding_api": check_hf_embedding_api(),
            "hf_inference_api": check_hf_inference_api(),
            "hf_dataset_store": check_hf_dataset_store(),
        },
        "configuration": {
            "use_openai_embedding": False,
            "hf_token_configured": bool(os.getenv("HF_TOKEN")),
            "embedding_model": "intfloat/multilingual-e5-large",
            "embedding_dimensions": 1024,
        },
        "statistics": {
            "total_documents": get_document_count(),
            "vector_store_size": get_vector_count(),
            "average_response_time": get_avg_response_time(),
        },
    }
Performance Metrics
- Response Time: Track API response times and latency
- Throughput: Monitor requests per second and processing capacity
- Error Rate: Track API errors and failure rates
- Resource Usage: Monitor memory, CPU, and network usage
Logging Strategy
import logging
# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)
# Component-specific loggers
embedding_logger = logging.getLogger('embedding_service')
vector_store_logger = logging.getLogger('vector_store')
rag_pipeline_logger = logging.getLogger('rag_pipeline')
Testing Architecture
Test Strategy
- Unit Tests: Individual component testing with mocks
- Integration Tests: Service interaction testing
- End-to-End Tests: Complete workflow testing
- Performance Tests: Load and stress testing
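A unit test "with mocks" for the pipeline could look like the sketch below. The minimal `RAGPipeline` stand-in mirrors the interface shown earlier; service internals are replaced with `MagicMock` so only the wiring is tested:

```python
from unittest.mock import MagicMock

class RAGPipeline:
    """Minimal stand-in matching the pipeline interface shown earlier."""
    def __init__(self, embedding_service, vector_store, llm_service):
        self.embedding_service = embedding_service
        self.vector_store = vector_store
        self.llm_service = llm_service

    def process_query(self, query):
        emb = self.embedding_service.embed_text(query)
        results = self.vector_store.search(emb, top_k=5)
        return self.llm_service.generate_response(query, results)

def test_process_query_wires_services():
    # Mock each HF service so no network calls happen
    embedding = MagicMock(embed_text=MagicMock(return_value=[0.1] * 1024))
    store = MagicMock(search=MagicMock(return_value=[{"content": "c"}]))
    llm = MagicMock(generate_response=MagicMock(return_value="answer"))

    pipeline = RAGPipeline(embedding, store, llm)
    assert pipeline.process_query("q") == "answer"
    # Verify the embedding flowed into the vector search with top_k=5
    store.search.assert_called_once_with([0.1] * 1024, top_k=5)
```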
Test Structure
tests/
├── unit/
│   ├── test_embedding_service.py
│   ├── test_vector_store.py
│   └── test_rag_pipeline.py
├── integration/
│   ├── test_hf_services_integration.py
│   └── test_document_processing.py
└── e2e/
    ├── test_chat_workflow.py
    └── test_search_workflow.py
Future Architecture Considerations
Scalability Enhancements
- Microservices: Split into independent services
- Load Balancing: Distribute requests across multiple instances
- Caching Layer: Add Redis for distributed caching
- Database Sharding: Partition large document collections
Feature Extensions
- Multi-modal Support: Add support for images and PDFs
- Real-time Updates: Live document updates and reprocessing
- Custom Models: Support for fine-tuned domain-specific models
- Advanced Analytics: Query analytics and usage insights
This architecture provides a robust, scalable, and cost-effective foundation for the PolicyWise RAG application using HuggingFace's free-tier services.