Technical Architecture - HuggingFace Edition

System Overview

This document describes the technical architecture of the HuggingFace-powered RAG application for corporate policy analysis.

Architecture Diagram

┌─────────────────────────────────────────────────────────────────┐
│                   PolicyWise RAG Application                    │
├─────────────────────────────────────────────────────────────────┤
│                      Web Interface Layer                        │
├─────────────────────────────────────────────────────────────────┤
│  Flask App Factory  │  Chat API  │  Search API  │  Health API   │
├─────────────────────────────────────────────────────────────────┤
│                      RAG Pipeline Layer                         │
├─────────────────────────────────────────────────────────────────┤
│   Query Processing  │  Context Assembly  │  Response Generation │
├─────────────────────────────────────────────────────────────────┤
│                    HuggingFace Services Layer                   │
├─────────────────────────────────────────────────────────────────┤
│  HF Embedding API   │  HF Dataset Store  │  HF Inference API    │
├─────────────────────────────────────────────────────────────────┤
│                     Document Processing Layer                   │
├─────────────────────────────────────────────────────────────────┤
│   Document Parser   │   Text Chunker    │   Metadata Manager    │
└─────────────────────────────────────────────────────────────────┘

Core Components

1. Application Layer

Flask App Factory (src/app_factory.py)

  • Purpose: Flask application factory with lazy service loading
  • Key Features:
    • Triple-layer HuggingFace service override system
    • Cached service initialization for performance
    • Memory-optimized startup with on-demand loading
    • Comprehensive health monitoring
import os

from flask import Flask


def create_app():
    """Create Flask app with HuggingFace services"""
    app = Flask(__name__)

    # Force HuggingFace services when HF_TOKEN is available
    # (before_first_request requires Flask < 2.3; newer versions can
    # run the same check directly inside create_app)
    @app.before_first_request
    def init_services():
        if os.getenv("HF_TOKEN"):
            ensure_hf_services()

    return app

Configuration Management (src/config.py)

  • Purpose: Centralized configuration with HF override logic
  • Key Features:
    • Automatic HF_TOKEN detection
    • Configuration precedence management
    • Environment-specific settings
    • Debug logging for configuration decisions

2. RAG Pipeline Layer

RAG Pipeline (src/rag/rag_pipeline.py)

  • Purpose: Orchestrates the complete retrieval-augmented generation workflow
  • Key Components:
    • Query processing and expansion
    • Vector similarity search coordination
    • Context assembly and optimization
    • Response generation and citation formatting
    • Source attribution with metadata lookup
class RAGPipeline:
    def __init__(self, embedding_service, vector_store, llm_service):
        self.embedding_service = embedding_service  # HF Embedding API
        self.vector_store = vector_store            # HF Dataset Store
        self.llm_service = llm_service              # HF Inference API

    def process_query(self, query: str) -> dict:
        """Complete RAG workflow with HF services"""
        # 1. Generate query embedding
        query_embedding = self.embedding_service.embed_text(query)

        # 2. Search vector store
        results = self.vector_store.search(query_embedding, top_k=5)

        # 3. Assemble context
        context = self._assemble_context(results)

        # 4. Generate response
        response = self.llm_service.generate_response(query, context)

        # 5. Format with citations
        return self._format_response_with_citations(response, results)

3. HuggingFace Services Layer

HuggingFace Embedding Service (src/embedding/hf_embedding_with_fallback.py)

  • Model: intfloat/multilingual-e5-large
  • Dimensions: 1024
  • Features:
    • HuggingFace Inference API integration
    • Automatic batching for efficiency
    • Local ONNX fallback for development
    • Memory-optimized processing
from typing import List

import requests


class HuggingFaceEmbeddingServiceWithFallback:
    def __init__(self, hf_token: str):
        self.hf_token = hf_token
        self.model_name = "intfloat/multilingual-e5-large"
        self.api_url = f"https://router.huggingface.co/hf-inference/models/{self.model_name}"

    def embed_text(self, text: str) -> List[float]:
        """Generate embedding using HF Inference API"""
        response = requests.post(
            self.api_url,
            headers={"Authorization": f"Bearer {self.hf_token}"},
            json={"inputs": text},
        )
        response.raise_for_status()  # surface HTTP errors instead of returning them
        return response.json()
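
The processing script later in this document also calls an embed_batch method on this service. A minimal sketch of how batching could be layered on the same endpoint, assuming the API accepts a list of inputs per request (the batch size of 32 is an illustrative default, not a value from the codebase):

# Inside HuggingFaceEmbeddingServiceWithFallback (sketch)
    def embed_batch(self, texts: List[str], batch_size: int = 32) -> List[List[float]]:
        """Embed many texts in fixed-size batches to reduce API round trips"""
        embeddings: List[List[float]] = []
        for start in range(0, len(texts), batch_size):
            batch = texts[start:start + batch_size]
            response = requests.post(
                self.api_url,
                headers={"Authorization": f"Bearer {self.hf_token}"},
                json={"inputs": batch},  # assumes the endpoint accepts a list of inputs
            )
            response.raise_for_status()
            embeddings.extend(response.json())
        return embeddings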

HuggingFace Dataset Vector Store (src/vector_store/hf_dataset_store.py)

  • Purpose: Persistent vector storage using HuggingFace Datasets
  • Features:
    • JSON string serialization for complex metadata
    • Cosine similarity search with native operations
    • Parquet and JSON fallback storage
    • Complete interface compatibility with the ChromaDB store used in local development
import json
from typing import List

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


class HFDatasetVectorStore:
    def __init__(self, dataset_name: str = "policy-vectors"):
        self.dataset_name = dataset_name
        self.dataset = None

    def search(self, query_embedding: List[float], top_k: int = 5) -> List[dict]:
        """Cosine similarity search using HF Dataset operations"""
        # Pull the stored vectors from the loaded dataset's "embedding" column
        embeddings = np.array(self.dataset["embedding"])

        # Calculate cosine similarities against the query
        similarities = cosine_similarity([query_embedding], embeddings)[0]

        # Get top-k results
        top_indices = np.argsort(similarities)[-top_k:][::-1]

        return [
            {
                "content": self.dataset[int(idx)]["content"],
                "metadata": json.loads(self.dataset[int(idx)]["metadata"]),
                "similarity_score": float(similarities[idx])
            }
            for idx in top_indices
        ]
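
The write path used during ingestion, add_embedding, is not shown above. A minimal sketch, assuming the store buffers rows in memory and flattens metadata to a JSON string as described in the feature list (the _rows buffer and save() method are assumptions, not the actual implementation):

# Inside HFDatasetVectorStore (sketch)
    def add_embedding(self, embedding: List[float], content: str, metadata: dict) -> None:
        """Buffer one row; complex metadata is serialized to a JSON string"""
        if not hasattr(self, "_rows"):
            self._rows = []
        self._rows.append({
            "embedding": embedding,
            "content": content,
            "metadata": json.dumps(metadata),
        })

    def save(self) -> None:
        """Materialize the buffered rows as a HuggingFace Dataset"""
        from datasets import Dataset
        self.dataset = Dataset.from_list(self._rows)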

HuggingFace LLM Service (src/llm/hf_llm_service.py)

  • Model: meta-llama/Meta-Llama-3-8B-Instruct
  • Features:
    • HuggingFace Inference API integration
    • Automatic prompt formatting
    • Response parsing and validation
    • Built-in safety filtering
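No snippet is included for this service; a minimal sketch using huggingface_hub's InferenceClient, where the system prompt, message template, and max_tokens value are illustrative assumptions rather than the actual implementation:

from huggingface_hub import InferenceClient


class HFLLMService:
    def __init__(self, hf_token: str):
        self.client = InferenceClient(
            model="meta-llama/Meta-Llama-3-8B-Instruct", token=hf_token
        )

    def generate_response(self, query: str, context: str) -> str:
        """Generate an answer grounded in the retrieved policy context"""
        messages = [
            {"role": "system",
             "content": "Answer using only the provided policy excerpts."},
            {"role": "user",
             "content": f"Context:\n{context}\n\nQuestion: {query}"},
        ]
        completion = self.client.chat_completion(messages, max_tokens=512)
        return completion.choices[0].message.content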

4. Document Processing Layer

Document Processing Pipeline (scripts/hf_process_documents.py)

  • Purpose: Automated document ingestion and embedding generation
  • Workflow:
    1. Read policy documents from synthetic_policies/
    2. Split into semantic chunks with overlap
    3. Generate embeddings via HF Inference API
    4. Store in HuggingFace Dataset with metadata
    5. Validate processing and report statistics
import glob
import os


def read_file(path: str) -> str:
    """Read a policy document as UTF-8 text"""
    with open(path, encoding="utf-8") as f:
        return f.read()


def process_documents():
    """Process all policy documents using HF services"""
    # Initialize HF services
    hf_token = os.getenv("HF_TOKEN")
    embedding_service = HuggingFaceEmbeddingServiceWithFallback(hf_token)
    vector_store = HFDatasetVectorStore()

    # Discover policy documents
    policy_files = sorted(glob.glob("synthetic_policies/*.md"))

    # Process each document
    for file_path in policy_files:
        # Read and chunk document (chunk_document is sketched under
        # "Document Processing Flow" below)
        chunks = chunk_document(read_file(file_path))

        # Generate embeddings
        embeddings = embedding_service.embed_batch([chunk.content for chunk in chunks])

        # Store with metadata
        for chunk, embedding in zip(chunks, embeddings):
            vector_store.add_embedding(
                embedding=embedding,
                content=chunk.content,
                metadata={
                    "source_file": os.path.basename(file_path),
                    "chunk_index": chunk.index,
                    "category": chunk.category
                }
            )

Configuration Override System

Triple-Layer Override Architecture

To ensure HuggingFace services are used even when OpenAI environment variables exist, we implement a comprehensive override system:

Layer 1: Configuration Override

# src/config.py
if os.getenv("HF_TOKEN"):
    USE_OPENAI_EMBEDDING = False
    print("πŸ€— HF_TOKEN detected - forcing HuggingFace services")

Layer 2: App Factory Override

# src/app_factory.py
def get_rag_pipeline():
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        # Force HF services regardless of other configuration
        return create_hf_rag_pipeline(hf_token)
    else:
        # Fall back to configured services
        return create_default_rag_pipeline()

Layer 3: Startup Override

# src/app_factory.py
def ensure_embeddings_on_startup():
    if os.getenv("HF_TOKEN"):
        # HF services don't need startup embedding checks
        print("πŸ€— HF services detected - skipping startup checks")
        return
    # Continue with standard startup checks

Data Flow Architecture

Query Processing Flow

User Query → Query Expansion → Embedding Generation → Vector Search →
Context Assembly → LLM Generation → Response Formatting → Citation Extraction
  1. Query Reception: User submits question via web interface or API
  2. Query Expansion: Enhance query with synonyms and domain terms (see the sketch after this list)
  3. Embedding Generation: Generate 1024-dimensional embedding via HF API
  4. Vector Search: Cosine similarity search in HF Dataset
  5. Context Assembly: Combine relevant chunks with metadata
  6. LLM Generation: Generate response via HF Inference API
  7. Response Formatting: Format with citations and confidence scores
  8. Citation Extraction: Extract and validate source attributions
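
Step 2 can be as simple as a static synonym lookup; a minimal sketch, where the synonym table is illustrative rather than taken from the codebase:

# Illustrative domain synonym map (not from the actual codebase)
SYNONYMS = {
    "pto": ["paid time off", "vacation", "leave"],
    "wfh": ["work from home", "remote work"],
}


def expand_query(query: str) -> str:
    """Append known synonyms so the query embedding covers equivalent phrasings"""
    extra = [
        term
        for keyword, terms in SYNONYMS.items()
        if keyword in query.lower()
        for term in terms
    ]
    return f"{query} {' '.join(extra)}" if extra else query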

Document Processing Flow

Policy Documents → Text Extraction → Chunking → Embedding Generation →
Metadata Creation → Dataset Storage → Index Building
  1. Document Discovery: Scan synthetic_policies/ directory
  2. Text Extraction: Read markdown content with metadata preservation
  3. Intelligent Chunking: Split into semantic chunks with overlap (see the sketch after this list)
  4. Embedding Generation: Batch process via HF Inference API
  5. Metadata Creation: Preserve source, category, and structural information
  6. Dataset Storage: Store in HuggingFace Dataset with JSON serialization
  7. Index Building: Build search indices for efficient retrieval
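
A minimal sketch of step 3's overlapping chunker, shaped to produce the chunk objects consumed by process_documents above; the 500-word chunk size, 50-word overlap, and per-document category default are assumptions, not values from the codebase:

from dataclasses import dataclass
from typing import List


@dataclass
class Chunk:
    content: str
    index: int
    category: str


def chunk_document(text: str, category: str = "general",
                   chunk_size: int = 500, overlap: int = 50) -> List[Chunk]:
    """Split text into word-based chunks that overlap to preserve context"""
    words = text.split()
    if not words:
        return []
    chunks: List[Chunk] = []
    step = chunk_size - overlap
    for index, start in enumerate(range(0, len(words), step)):
        chunks.append(Chunk(" ".join(words[start:start + chunk_size]), index, category))
        if start + chunk_size >= len(words):
            break
    return chunks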

Service Integration Patterns

HuggingFace Service Discovery

import os


def detect_hf_environment():
    """Detect HuggingFace environment and configure services"""
    hf_token = os.getenv("HF_TOKEN")

    if hf_token:
        return {
            "embedding_service": "huggingface_inference_api",
            "vector_store": "huggingface_dataset",
            "llm_service": "huggingface_inference_api",
            "deployment": "huggingface_spaces"
        }
    else:
        return {
            "embedding_service": "local_onnx",
            "vector_store": "chromadb",
            "llm_service": "openrouter",
            "deployment": "local"
        }

Error Handling and Resilience

import time


class HFServiceWithFallback:
    """Base class for HF services with fallback support"""

    def __init__(self, hf_token: str):
        self.hf_token = hf_token
        self.fallback_service = None

    def call_with_retry(self, func, max_retries=3):
        """Call HF API with exponential backoff"""
        for attempt in range(max_retries):
            try:
                return func()
            except Exception as e:
                if attempt == max_retries - 1:
                    # Final attempt failed: delegate to the fallback service if set
                    if self.fallback_service:
                        return self.fallback_service.call(func)
                    raise e
                time.sleep(2 ** attempt)  # exponential backoff between attempts

Performance Optimization

Caching Strategy

  1. Service Caching: Cache initialized services for request reuse (see the sketch after this list)
  2. Embedding Caching: Cache frequently requested embeddings
  3. Search Result Caching: Cache popular queries and results
  4. Model Caching: Cache downloaded models for faster startup
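
Item 1 can be implemented with functools caching so that each service is constructed once per process and reused across requests; a minimal sketch:

import os
from functools import lru_cache


@lru_cache(maxsize=1)
def get_embedding_service():
    """Construct the embedding service on first use, then reuse the instance"""
    return HuggingFaceEmbeddingServiceWithFallback(os.getenv("HF_TOKEN"))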

Memory Management

  1. Batch Processing: Process documents in memory-efficient batches
  2. Lazy Loading: Load services only when needed
  3. Garbage Collection: Explicit cleanup after processing operations
  4. Resource Monitoring: Track memory usage and trigger cleanup

API Optimization

  1. Request Batching: Batch multiple embedding requests
  2. Connection Pooling: Reuse HTTP connections to HF APIs (see the sketch after this list)
  3. Response Caching: Cache API responses for duplicate requests
  4. Rate Limiting: Respect HF API rate limits with backoff
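
A minimal sketch covering items 2 and 4 with a shared requests.Session: connections to the HF endpoints are pooled and reused, and rate-limited requests are retried with backoff (the retry budget and status codes are illustrative):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# One session per process so TCP connections are pooled and reused
session = requests.Session()
retries = Retry(
    total=3,
    backoff_factor=1.0,                   # exponential backoff between attempts
    status_forcelist=[429, 502, 503],     # retry on rate limits and gateway errors
    allowed_methods=frozenset({"POST"}),  # POST is not retried by default
)
session.mount("https://", HTTPAdapter(max_retries=retries, pool_maxsize=10))


def post_with_pooling(url: str, token: str, payload: dict) -> dict:
    """POST through the pooled session instead of opening a new connection"""
    response = session.post(
        url, headers={"Authorization": f"Bearer {token}"}, json=payload
    )
    response.raise_for_status()
    return response.json()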

Security and Privacy

API Security

  1. Token Management: Secure HF_TOKEN handling and rotation
  2. Request Validation: Validate all inputs before processing
  3. Rate Limiting: Prevent abuse with request throttling
  4. CORS Configuration: Secure cross-origin request handling

Data Privacy

  1. Local Processing: No sensitive data sent to external APIs
  2. Metadata Sanitization: Remove PII from document metadata
  3. Query Logging: Optional query logging with privacy controls
  4. Secure Storage: Encrypt sensitive configuration data

Deployment Architecture

HuggingFace Spaces Deployment

# HuggingFace Spaces Configuration
title: "MSSE AI Engineering - HuggingFace Edition"
emoji: "🧠"
sdk: "docker"
python_version: "3.11"
suggested_hardware: "cpu-basic"
app_port: 8080

Local Development Setup

# Environment Configuration
export HF_TOKEN="your_hf_token"
export FLASK_ENV="development"
export LOG_LEVEL="DEBUG"

# Service Initialization
python app.py  # Automatic HF service detection and setup

Production Considerations

  1. Resource Scaling: Monitor HF API usage and scale accordingly
  2. Backup Strategy: Regular backup of HF Dataset storage
  3. Monitoring: Comprehensive health monitoring and alerting
  4. Update Strategy: Automated updates for models and dependencies

Monitoring and Observability

Health Monitoring

import os


def get_system_health():
    """Comprehensive system health check"""
    return {
        "services": {
            "hf_embedding_api": check_hf_embedding_api(),
            "hf_inference_api": check_hf_inference_api(),
            "hf_dataset_store": check_hf_dataset_store()
        },
        "configuration": {
            "use_openai_embedding": False,
            "hf_token_configured": bool(os.getenv("HF_TOKEN")),
            "embedding_model": "intfloat/multilingual-e5-large",
            "embedding_dimensions": 1024
        },
        "statistics": {
            "total_documents": get_document_count(),
            "vector_store_size": get_vector_count(),
            "average_response_time": get_avg_response_time()
        }
    }

Performance Metrics

  1. Response Time: Track API response times and latency (see the sketch after this list)
  2. Throughput: Monitor requests per second and processing capacity
  3. Error Rate: Track API errors and failure rates
  4. Resource Usage: Monitor memory, CPU, and network usage
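
A minimal sketch of how the get_avg_response_time() helper referenced by the health check could be backed, using a rolling window of recent call latencies (the window size and decorator approach are assumptions):

import time
from collections import deque
from functools import wraps

_latencies = deque(maxlen=1000)  # rolling window of recent response times (seconds)


def track_latency(func):
    """Record the wall-clock latency of each decorated call"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            _latencies.append(time.perf_counter() - start)
    return wrapper


def get_avg_response_time() -> float:
    """Average latency over the rolling window"""
    return sum(_latencies) / len(_latencies) if _latencies else 0.0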

Logging Strategy

import logging

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)

# Component-specific loggers
embedding_logger = logging.getLogger('embedding_service')
vector_store_logger = logging.getLogger('vector_store')
rag_pipeline_logger = logging.getLogger('rag_pipeline')

Testing Architecture

Test Strategy

  1. Unit Tests: Individual component testing with mocks (see the example after this list)
  2. Integration Tests: Service interaction testing
  3. End-to-End Tests: Complete workflow testing
  4. Performance Tests: Load and stress testing
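
As an example of item 1, a unit test can exercise RAGPipeline with mocked services so that no HF API calls are made; the import path follows the file layout above, and the assertions cover only the calls shown in the pipeline snippet:

# tests/unit/test_rag_pipeline.py (illustrative)
from unittest.mock import MagicMock

from src.rag.rag_pipeline import RAGPipeline


def test_process_query_calls_all_services():
    # Stub out the three HF-backed services
    embedding_service = MagicMock()
    embedding_service.embed_text.return_value = [0.1] * 1024
    vector_store = MagicMock()
    vector_store.search.return_value = [
        {"content": "Employees accrue 15 PTO days.",
         "metadata": {"source_file": "pto_policy.md"},
         "similarity_score": 0.9}
    ]
    llm_service = MagicMock()
    llm_service.generate_response.return_value = "15 days per year."

    pipeline = RAGPipeline(embedding_service, vector_store, llm_service)
    pipeline.process_query("How many PTO days do I get?")

    # Each stage of the RAG workflow should run exactly once
    embedding_service.embed_text.assert_called_once_with("How many PTO days do I get?")
    vector_store.search.assert_called_once()
    llm_service.generate_response.assert_called_once()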

Test Structure

tests/
├── unit/
│   ├── test_embedding_service.py
│   ├── test_vector_store.py
│   └── test_rag_pipeline.py
├── integration/
│   ├── test_hf_services_integration.py
│   └── test_document_processing.py
└── e2e/
    ├── test_chat_workflow.py
    └── test_search_workflow.py

Future Architecture Considerations

Scalability Enhancements

  1. Microservices: Split into independent services
  2. Load Balancing: Distribute requests across multiple instances
  3. Caching Layer: Add Redis for distributed caching
  4. Database Sharding: Partition large document collections

Feature Extensions

  1. Multi-modal Support: Add support for images and PDFs
  2. Real-time Updates: Live document updates and reprocessing
  3. Custom Models: Support for fine-tuned domain-specific models
  4. Advanced Analytics: Query analytics and usage insights

This architecture provides a robust, scalable, and cost-effective foundation for the PolicyWise RAG application using HuggingFace's free-tier services.