|
|
"""Resource manager for lazy-loaded application resources. |
|
|
|
|
|
This module provides a ResourceManager class that handles lazy initialization |
|
|
and caching of heavy application resources like the retriever and settings. |
|
|
Resources are NOT loaded on application startup - they are loaded on the first |
|
|
request that needs them, then cached for subsequent requests. |
|
|
|
|
|
Key Features: |
|
|
- **Deferred Loading**: Resources load on first request, not startup |
|
|
- **Caching**: Once loaded, resources are cached for fast access |
|
|
- **Concurrent Load Protection**: Prevents multiple simultaneous loads |
|
|
- **Artifact Download**: Downloads RAG artifacts from HuggingFace if needed |
|
|
- **Metrics Tracking**: Tracks startup time and memory usage |
|
|
- **Ready State**: Exposes ready state for health check endpoints |
|
|
|
|
|
Performance Targets: |
|
|
- Cold start (first request): < 30 seconds |
|
|
- Warm start (subsequent requests): < 5ms (cached) |
|
|
- Memory usage: Tracked and logged after loading |
|
|
|
|
|
Architecture: |
|
|
The ResourceManager is a singleton accessed via get_resource_manager(). |
|
|
It stores: |
|
|
- Settings: Application configuration from environment |
|
|
- Retriever: HybridRetriever wrapped with optional reranking |
|
|
|
|
|
The loading pipeline includes artifact download from HuggingFace: |
|
|
1. Load Settings from environment variables |
|
|
2. Download/verify artifacts via ArtifactDownloader (Step 7.7) |
|
|
3. Create retriever using factory function |
|
|
4. Record metrics (duration, memory) |
|
|
|
|
|
Lazy Loading Strategy: |
|
|
All heavy dependencies (torch, faiss, sentence-transformers) are imported |
|
|
inside methods rather than at module level. This ensures: |
|
|
- Fast module import time |
|
|
- Minimal memory usage until resources are needed |
|
|
- Clean separation between import and initialization |
|
|
|
|
|
Usage: |
|
|
The ResourceManager is used by route handlers to access shared resources: |
|
|
|
|
|
>>> from rag_chatbot.api.resources import get_resource_manager |
|
|
>>> |
|
|
>>> async def query_handler(query: str): |
|
|
... manager = get_resource_manager() |
|
|
... await manager.ensure_loaded() # Lazy load if needed |
|
|
... retriever = manager.get_retriever() |
|
|
... results = retriever.retrieve(query) |
|
|
... return results |
|
|
|
|
|
Integration with Health Checks: |
|
|
The /health/ready endpoint uses is_ready() to report whether the |
|
|
application is ready to serve requests: |
|
|
|
|
|
>>> manager = get_resource_manager() |
|
|
>>> if manager.is_ready(): |
|
|
... return {"ready": True} |
|
|
... else: |
|
|
... return {"ready": False} |
|
|
|
|
|
See Also |
|
|
-------- |
|
|
Settings : Configuration module |
|
|
Application configuration (src/rag_chatbot/config/settings.py) |
|
|
RetrieverWithReranker : Retriever wrapper
    Retriever with optional reranking (src/rag_chatbot/retrieval/factory.py)
|
|
ArtifactDownloader : Artifact downloader |
|
|
Downloads artifacts from HuggingFace (artifact_downloader.py) |
|
|
_lifespan : Lifecycle manager |
|
|
Application lifecycle (src/rag_chatbot/api/main.py) |
|
|
|
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import asyncio |
|
|
import logging |
|
|
import time |
|
|
from typing import TYPE_CHECKING |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from rag_chatbot.config.settings import Settings |
|
|
from rag_chatbot.retrieval.factory import RetrieverWithReranker |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Public API of this module.
__all__: list[str] = ["ResourceManager", "get_resource_manager"]

# Module-level logger; configuration is left to the application.
logger = logging.getLogger(__name__)

# Cold starts slower than this (30 s) are logged at WARNING level
# by ensure_loaded() instead of INFO.
_COLD_START_WARNING_THRESHOLD_MS: int = 30000

# Singleton instance; created lazily by get_resource_manager().
_resource_manager: ResourceManager | None = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ResourceManager:
    """Lazy loader and cache for heavy application resources.

    Holds the application ``Settings`` and the retriever, loading them on
    the first call to :meth:`ensure_loaded` rather than at startup. Once
    loaded, the instances are cached so later requests take the fast path.

    Behavior:
        * Lazy: heavy modules (torch, faiss, sentence-transformers) are
          imported inside methods, never at module import time.
        * Concurrency-safe: an ``asyncio.Lock`` guarantees a single load
          even under concurrent first requests.
        * Observable: load duration and memory usage are recorded, and
          :meth:`is_ready` reports readiness for health checks.

    Lifecycle: instantiated empty -> first request calls ensure_loaded()
    -> resources cached -> subsequent requests hit the fast path ->
    shutdown() releases everything on application exit.

    Do not instantiate directly; use :func:`get_resource_manager` to obtain
    the process-wide singleton.
    """

    def __init__(self) -> None:
        """Set up empty state; no resources are loaded here.

        Construction is cheap by design -- all heavy imports and work are
        deferred until :meth:`ensure_loaded` runs.
        """
        # Cached resources -- populated by ensure_loaded().
        self._retriever: RetrieverWithReranker | None = None
        self._settings: Settings | None = None

        # Load-state flags plus the lock that serializes concurrent loads.
        self._loaded: bool = False
        self._loading: bool = False
        self._load_lock: asyncio.Lock = asyncio.Lock()

        # Metrics recorded around the load for monitoring endpoints.
        self._load_start_time: float | None = None
        self._load_duration_ms: int | None = None
        self._memory_mb: float | None = None

        logger.debug("ResourceManager initialized (resources not yet loaded)")

    def _get_memory_usage_mb(self) -> float:
        """Return the current process RSS (resident set size) in megabytes.

        Relies on psutil; if psutil is missing or the probe fails for any
        reason, a warning is logged and 0.0 is returned instead of raising,
        so metrics collection can never break resource loading.
        """
        try:
            import psutil

            rss_bytes: int = psutil.Process().memory_info().rss
        except ImportError:
            logger.warning(
                "psutil not installed - cannot measure memory usage. "
                "Install with: pip install psutil"
            )
            return 0.0
        except Exception:
            # Best-effort probe: swallow and log anything unexpected.
            logger.warning("Failed to get memory usage", exc_info=True)
            return 0.0
        return float(rss_bytes) / (1024 * 1024)

    async def _load_resources(self) -> None:
        """Perform the actual resource load (settings, artifacts, retriever).

        Pipeline:
            1. Build ``Settings`` from environment variables.
            2. Download/verify artifacts via ``ArtifactDownloader``.
            3. Validate dataset freshness via ``FreshnessValidator``.
            4. Build the retriever through the factory function.

        Heavy project modules are imported locally so that importing this
        module stays fast.

        Raises:
            RuntimeError: When artifact download or freshness validation
                fails; the original exception is chained as the cause.

        Note:
            Callers must hold ``_load_lock``; use :meth:`ensure_loaded`
            rather than calling this directly. The retriever itself lazily
            loads its components (indexes, encoder) on first retrieve().
        """
        logger.info("Loading application resources...")

        # -- Step 1: configuration -------------------------------------
        logger.debug("Loading settings from environment")

        from rag_chatbot.config.settings import Settings

        self._settings = Settings()

        logger.debug(
            "Settings loaded: use_hybrid=%s, use_reranker=%s, top_k=%d",
            self._settings.use_hybrid,
            self._settings.use_reranker,
            self._settings.top_k,
        )

        # -- Step 2: artifacts from HuggingFace ------------------------
        logger.debug(
            "Ensuring artifacts are available (repo=%s, force_refresh=%s)",
            self._settings.hf_index_repo,
            self._settings.force_artifact_refresh,
        )

        from rag_chatbot.api.artifact_downloader import (
            ArtifactDownloader,
            ArtifactDownloadError,
        )

        download_started = time.perf_counter()

        try:
            artifact_path = await ArtifactDownloader(
                self._settings
            ).ensure_artifacts_available()
        except ArtifactDownloadError as e:
            logger.exception(
                "Failed to download artifacts from HuggingFace (repo=%s)",
                self._settings.hf_index_repo,
            )
            msg = (
                f"Failed to download RAG artifacts from HuggingFace: {e}. "
                f"Check HF_TOKEN is valid, repo '{self._settings.hf_index_repo}' "
                "exists, and network connectivity to HuggingFace."
            )
            raise RuntimeError(msg) from e

        logger.info(
            "Artifact download/verification completed in %d ms, path: %s",
            int((time.perf_counter() - download_started) * 1000),
            artifact_path,
        )

        # -- Step 3: freshness validation ------------------------------
        logger.debug("Validating dataset freshness...")

        from rag_chatbot.api.freshness import (
            FreshnessValidationError,
            FreshnessValidator,
        )

        try:
            manifest = FreshnessValidator(artifact_path, self._settings).validate()
        except FreshnessValidationError as e:
            logger.exception(
                "Dataset freshness validation failed - server cannot start"
            )
            msg = (
                f"Dataset freshness validation failed: {e}. "
                f"The server cannot start with incompatible or corrupt artifacts. "
                f"Repository: {self._settings.hf_index_repo}"
            )
            raise RuntimeError(msg) from e

        if manifest is None:
            # Legacy artifacts predate the manifest schema; accept them.
            logger.info(
                "Dataset loaded with legacy manifest format (validation skipped)"
            )
        else:
            logger.info(
                "Dataset validated: index_version=%s, schema_version=%s",
                manifest.index_version,
                manifest.schema_version,
            )

        # -- Step 4: retriever -----------------------------------------
        logger.debug("Creating retriever from factory")

        from rag_chatbot.retrieval.factory import get_default_retriever

        self._retriever = get_default_retriever(
            index_path=artifact_path,
            settings=self._settings,
        )

        logger.debug(
            "Retriever created: type=%s, use_reranker=%s",
            type(self._retriever.retriever).__name__,
            self._retriever.use_reranker,
        )

    async def ensure_loaded(self) -> None:
        """Load resources if needed; idempotent and concurrency-safe.

        Fast path: returns in well under a millisecond once resources are
        cached. Cold path: acquires the load lock, loads everything, and
        records duration/memory metrics. Concurrent callers block on the
        lock and return as soon as the winning coroutine finishes.

        Raises:
            RuntimeError: If loading fails. The manager stays unloaded so
                the next call retries automatically.
        """
        # Fast path: no lock needed once loading has completed.
        if self._loaded:
            logger.debug("Resources already loaded (fast path)")
            return

        async with self._load_lock:
            # Re-check under the lock: another coroutine may have won the
            # race and finished loading while we waited.
            if self._loaded:
                logger.debug("Resources loaded by another coroutine (waited)")
                return

            self._loading = True
            self._load_start_time = time.perf_counter()

            try:
                await self._load_resources()

                self._load_duration_ms = int(
                    (time.perf_counter() - self._load_start_time) * 1000
                )
                self._memory_mb = self._get_memory_usage_mb()

                # Flag cold starts that blow past the 30s performance target.
                if self._load_duration_ms > _COLD_START_WARNING_THRESHOLD_MS:
                    logger.warning(
                        "Resources loaded in %d ms (exceeds 30s target), "
                        "memory: %.2f MB",
                        self._load_duration_ms,
                        self._memory_mb,
                    )
                else:
                    logger.info(
                        "Resources loaded in %d ms, memory: %.2f MB",
                        self._load_duration_ms,
                        self._memory_mb,
                    )

                self._loaded = True
            except Exception as e:
                elapsed_ms = int(
                    (time.perf_counter() - self._load_start_time) * 1000
                )
                logger.exception(
                    "Failed to load resources after %d ms",
                    elapsed_ms,
                )
                msg = f"Failed to load resources: {e}"
                raise RuntimeError(msg) from e
            finally:
                # Always clear the in-progress flag, success or failure.
                self._loading = False

    def is_ready(self) -> bool:
        """Report whether resources are loaded and the app can serve traffic.

        Purely a state check -- it never triggers loading. Used by the
        /health/ready endpoint (e.g. Kubernetes readiness probes).

        Returns:
            True once ensure_loaded() has completed successfully.
        """
        return self._loaded

    def get_retriever(self) -> RetrieverWithReranker:
        """Return the cached retriever instance.

        Does not trigger loading -- call :meth:`ensure_loaded` first.

        Returns:
            The cached RetrieverWithReranker.

        Raises:
            RuntimeError: If resources have not been loaded yet.
        """
        if self._retriever is not None:
            return self._retriever

        msg = (
            "Retriever not loaded. Call ensure_loaded() first. "
            "This error indicates a programming bug - ensure_loaded() "
            "should be called before accessing resources."
        )
        raise RuntimeError(msg)

    def get_settings(self) -> Settings:
        """Return the cached application settings.

        Does not trigger loading -- call :meth:`ensure_loaded` first.

        Returns:
            The cached Settings instance.

        Raises:
            RuntimeError: If resources have not been loaded yet.
        """
        if self._settings is not None:
            return self._settings

        msg = (
            "Settings not loaded. Call ensure_loaded() first. "
            "This error indicates a programming bug - ensure_loaded() "
            "should be called before accessing resources."
        )
        raise RuntimeError(msg)

    def get_load_stats(self) -> dict[str, int | float | bool | None]:
        """Return load metrics for monitoring and debugging.

        Keys: ``loaded`` / ``loading`` (state flags), ``load_duration_ms``
        and ``memory_mb`` (both None until a load completes).
        """
        return {
            "loaded": self._loaded,
            "loading": self._loading,
            "load_duration_ms": self._load_duration_ms,
            "memory_mb": self._memory_mb,
        }

    async def shutdown(self) -> None:
        """Release cached resources during application shutdown.

        Drops the retriever and settings references so heavy objects can be
        garbage collected, and resets the loaded/loading flags. The manager
        can be reloaded afterwards via ensure_loaded(); this is intended to
        be called from the application lifespan's shutdown phase.
        """
        logger.info("Shutting down ResourceManager...")

        if self._loaded:
            logger.info(
                "Final resource stats: load_duration=%s ms, memory=%s MB",
                self._load_duration_ms,
                self._memory_mb,
            )

        self._retriever = None
        self._settings = None
        self._loaded = False
        self._loading = False

        logger.info("ResourceManager shutdown complete")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_resource_manager() -> ResourceManager:
    """Return the process-wide ResourceManager singleton.

    The instance is created lazily on the first call; every later call
    returns the same object, so route handlers share state and resources
    are loaded at most once. Loading itself is serialized inside the
    manager via its asyncio.Lock.

    Returns:
        The singleton ResourceManager instance.
    """
    global _resource_manager

    # Early return on the common path: singleton already exists.
    if _resource_manager is not None:
        return _resource_manager

    _resource_manager = ResourceManager()
    logger.debug("Created ResourceManager singleton")
    return _resource_manager
|
|
|