# Commit e20e6e8 (awellis): Implement proper async parallelism for multi-agent pipeline
"""Configuration management for the RAG Email Assistant."""
import os
from dataclasses import dataclass
from typing import Optional
from dotenv import load_dotenv
# Load environment variables from a local .env file into os.environ at import
# time, so the *Config.from_env() constructors below can read them.
load_dotenv()
@dataclass
class OpenSearchConfig:
    """Connection settings for an OpenSearch cluster.

    Each attribute maps to an OPENSEARCH_* / INDEX_NAME environment
    variable; see :meth:`from_env` for the exact names and defaults.
    """

    host: str
    port: int
    user: str
    password: str
    index_name: str
    use_ssl: bool = True
    verify_certs: bool = False

    @classmethod
    def from_env(cls) -> "OpenSearchConfig":
        """Build the configuration from environment variables.

        Missing variables fall back to local-development defaults
        (localhost:9200, admin user, empty password).
        """
        def _flag(name: str, default: str) -> bool:
            # Environment flags are strings; treat (case-insensitive) "true" as on.
            return os.getenv(name, default).lower() == "true"

        return cls(
            host=os.getenv("OPENSEARCH_HOST", "localhost"),
            port=int(os.getenv("OPENSEARCH_PORT", "9200")),
            user=os.getenv("OPENSEARCH_USER", "admin"),
            password=os.getenv("OPENSEARCH_PASSWORD", ""),
            index_name=os.getenv("INDEX_NAME", "bfh_admin_docs"),
            use_ssl=_flag("OPENSEARCH_USE_SSL", "true"),
            verify_certs=_flag("OPENSEARCH_VERIFY_CERTS", "false"),
        )
@dataclass
class LLMConfig:
    """Settings for the chat model and the embedding model."""

    api_key: str
    model_name: str = "gpt-4o"
    embedding_model: str = "text-embedding-3-small"
    temperature: float = 0.7
    max_tokens: int = 2000

    @classmethod
    def from_env(cls) -> "LLMConfig":
        """Build the configuration from environment variables.

        Raises:
            ValueError: if OPENAI_API_KEY is unset or empty — there is
                no sensible default for a credential.
        """
        key = os.getenv("OPENAI_API_KEY", "")
        if not key:
            raise ValueError("OPENAI_API_KEY environment variable is required")
        return cls(
            api_key=key,
            model_name=os.getenv("LLM_MODEL", "gpt-4o"),
            embedding_model=os.getenv("EMBEDDING_MODEL", "text-embedding-3-small"),
            temperature=float(os.getenv("LLM_TEMPERATURE", "0.7")),
            max_tokens=int(os.getenv("LLM_MAX_TOKENS", "2000")),
        )
@dataclass
class DocumentProcessingConfig:
    """Settings controlling how source documents are split into chunks."""

    documents_path: str = "assets/markdown"
    chunk_size: int = 300  # target words per chunk
    chunk_overlap: int = 50  # words shared between consecutive chunks
    min_chunk_size: int = 100  # smallest chunk emitted, in words

    @classmethod
    def from_env(cls) -> "DocumentProcessingConfig":
        """Build the chunking configuration from environment variables."""
        env = os.getenv
        return cls(
            documents_path=env("DOCUMENTS_PATH", "assets/markdown"),
            chunk_size=int(env("CHUNK_SIZE", "300")),
            chunk_overlap=int(env("CHUNK_OVERLAP", "50")),
            min_chunk_size=int(env("MIN_CHUNK_SIZE", "100")),
        )
@dataclass
class RetrievalConfig:
    """Tuning parameters for hybrid (BM25 + vector) retrieval."""

    top_k: int = 5  # number of documents to retrieve
    bm25_weight: float = 0.5  # contribution of the BM25 score
    vector_weight: float = 0.5  # contribution of the vector-similarity score
    min_score: float = 0.1  # relevance cutoff (deliberately permissive)

    @classmethod
    def from_env(cls) -> "RetrievalConfig":
        """Build the retrieval configuration from environment variables."""
        env = os.getenv
        return cls(
            top_k=int(env("RETRIEVAL_TOP_K", "5")),
            bm25_weight=float(env("BM25_WEIGHT", "0.5")),
            vector_weight=float(env("VECTOR_WEIGHT", "0.5")),
            min_score=float(env("MIN_RELEVANCE_SCORE", "0.1")),
        )
@dataclass
class AppConfig:
    """Top-level application configuration aggregating the sub-configs."""

    llm: LLMConfig
    document_processing: DocumentProcessingConfig
    retrieval: RetrievalConfig
    debug: bool = False
    skip_fact_check: bool = False  # skip fact-checking to trade accuracy for speed
    # Async performance settings
    use_parallel_processing: bool = True  # run agents concurrently
    agent_timeout: int = 30  # per-agent timeout, in seconds

    @classmethod
    def from_env(cls) -> "AppConfig":
        """Assemble the complete configuration from environment variables."""
        def _flag(name: str, default: str) -> bool:
            # Environment flags are strings; treat (case-insensitive) "true" as on.
            return os.getenv(name, default).lower() == "true"

        return cls(
            llm=LLMConfig.from_env(),
            document_processing=DocumentProcessingConfig.from_env(),
            retrieval=RetrievalConfig.from_env(),
            debug=_flag("DEBUG", "false"),
            skip_fact_check=_flag("SKIP_FACT_CHECK", "false"),
            use_parallel_processing=_flag("USE_PARALLEL", "true"),
            agent_timeout=int(os.getenv("AGENT_TIMEOUT", "30")),
        )
# Global configuration instance; lazily created by get_config() and
# cleared by reset_config().
_config: Optional[AppConfig] = None
def get_config() -> AppConfig:
    """Return the shared AppConfig, constructing it on first access."""
    global _config
    if _config is not None:
        return _config
    _config = AppConfig.from_env()
    return _config
def reset_config():
    """Discard the cached AppConfig so the next get_config() call rebuilds it.

    Primarily intended for test isolation.
    """
    global _config
    _config = None