""" Resource manager to handle loading and persisting data across requests. """ import pickle import faiss import os import sys import traceback class ResourceManager: _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(ResourceManager, cls).__new__(cls) cls._instance.faiss_index = None cls._instance.doc_chunks = None cls._instance.embedding_vectors = None cls._instance.initialized = False cls._instance.director = None return cls._instance def load_resources(self): """Load all required resources""" if self.initialized: print("Resources already loaded, skipping...") return True success = True # Load FAISS index try: print("Loading FAISS index...") self.faiss_index = faiss.read_index("embeddings/faiss_index.index") print("FAISS index loaded successfully") except Exception as e: print(f"Error loading FAISS index: {e}") success = False # Load document chunks try: print("Loading document chunks...") with open("data/doc_chunks.pkl", "rb") as f: self.doc_chunks = pickle.load(f) print(f"Loaded {len(self.doc_chunks)} document chunks") except Exception as e: print(f"Error loading document chunks: {e}") success = False # Load embeddings if available try: print("Loading embeddings...") with open("embeddings/embeddings.pkl", "rb") as f: self.embedding_vectors = pickle.load(f) print("Embeddings loaded successfully") except Exception as e: print(f"Error loading embeddings: {e}") # This is not critical, so don't set success to False if success: self.initialized = True print("All critical resources loaded successfully!") return success def get_faiss_index(self): """Get the FAISS index, loading if necessary""" if not self.initialized: self.load_resources() return self.faiss_index def get_doc_chunks(self): """Get the document chunks, loading if necessary""" if not self.initialized: self.load_resources() return self.doc_chunks def get_embedding_vectors(self): """Get the embedding vectors, loading if necessary""" if not self.initialized: self.load_resources() return self.embedding_vectors def get_director(self): """Get the agent director, initializing if necessary""" return self.director def set_director(self, director): """Set the agent director""" self.director = director def check_data_files(): """Check if required data files exist and download if needed.""" data_files = [ "embeddings/faiss_index.index", "data/doc_chunks.pkl", "embeddings/embeddings.pkl" ] missing_files = [f for f in data_files if not os.path.exists(f)] data_ready = True if missing_files: print(f"Missing data files: {missing_files}") print("Downloading or creating required data files...") try: import subprocess result = subprocess.run( [sys.executable, "download_from_hub.py"], check=False, capture_output=True, text=True ) print(result.stdout) if result.returncode != 0: print(f"Warning: Data preparation finished with return code {result.returncode}") print(f"Error output: {result.stderr}") data_ready = False except Exception as e: print(f"Error preparing data: {e}") traceback.print_exc() data_ready = False # Verify files exist before importing modules that need them if not all(os.path.exists(f) for f in data_files): print("Warning: Some required data files are still missing.") print("The application may not function correctly.") data_ready = False else: # Load resources into the resource manager resource_manager = ResourceManager() data_ready = resource_manager.load_resources() return data_ready