"""Runtime initialization for API mode.

Builds the RAG pipeline objects (Pinecone index connection, hybrid
retriever, RAG generator, chat models, and a chunk-metadata lookup table)
and stores them in the shared application state dict. Per-phase timings
are printed once startup completes.
"""

import os
import time
from typing import Any

from dotenv import load_dotenv
from huggingface_hub import InferenceClient

from config_loader import cfg
from data.vector_db import get_index_by_name
from retriever.generator import RAGGenerator
from retriever.processor import ChunkProcessor
from retriever.retriever import HybridRetriever
from backend.services.cache import get_cache_settings, load_cached_chunks
from backend.services.models import build_models
from backend.services.title import parse_title_model_candidates


def initialize_runtime_state(state: dict[str, Any]) -> None:
    """Populate *state* with every object the API needs at request time.

    Loads environment variables, connects to the Pinecone index, loads (or
    refreshes) the cached chunks, and constructs the chunk processor,
    hybrid retriever, RAG generator, chat models, and a text -> metadata
    lookup table. Each phase is individually timed and reported at the end.

    Args:
        state: Mutable mapping (e.g. an app-state dict) that receives the
            initialized pipeline objects under fixed keys such as
            ``"index"``, ``"retriever"``, ``"rag_engine"``, and
            ``"chunk_lookup"``.

    Raises:
        RuntimeError: If ``PINECONE_API_KEY`` or ``HF_TOKEN`` is missing
            from the environment, or if no chunks are found in the Pinecone
            index metadata.
    """
    startup_start = time.perf_counter()

    dotenv_start = time.perf_counter()
    load_dotenv()
    dotenv_time = time.perf_counter() - dotenv_start

    env_start = time.perf_counter()
    hf_token = os.getenv("HF_TOKEN")
    pinecone_api_key = os.getenv("PINECONE_API_KEY")
    env_time = time.perf_counter() - env_start

    # Fail fast with explicit messages instead of letting the downstream
    # clients raise opaque authentication errors.
    if not pinecone_api_key:
        raise RuntimeError("PINECONE_API_KEY not found in environment variables")
    if not hf_token:
        raise RuntimeError("HF_TOKEN not found in environment variables")

    index_name = "cbt-book-recursive"
    embed_model_name = cfg.processing.get("embedding_model", "all-MiniLM-L6-v2")
    # The environment variable overrides the config-file value for the
    # reranker model.
    rerank_model_name = os.getenv(
        "RERANK_MODEL_NAME",
        cfg.retrieval.get("rerank_model", "mixedbread-ai/mxbai-rerank-base-v1"),
    )
    cache_dir, force_cache_refresh = get_cache_settings()

    index_start = time.perf_counter()
    index = get_index_by_name(api_key=pinecone_api_key, index_name=index_name)
    index_time = time.perf_counter() - index_start

    chunks_start = time.perf_counter()
    final_chunks, chunk_source = load_cached_chunks(
        index=index,
        index_name=index_name,
        cache_dir=cache_dir,
        force_cache_refresh=force_cache_refresh,
    )
    chunk_load_time = time.perf_counter() - chunks_start
    if not final_chunks:
        raise RuntimeError(
            "No chunks found in Pinecone metadata. Run indexing once before API mode."
        )

    processor_start = time.perf_counter()
    # Embeddings are not loaded here; the processor only supplies its encoder
    # to the retriever below.
    proc = ChunkProcessor(model_name=embed_model_name, verbose=False, load_hf_embeddings=False)
    processor_time = time.perf_counter() - processor_start

    retriever_start = time.perf_counter()
    retriever = HybridRetriever(
        proc.encoder,
        rerank_model_name=rerank_model_name,
        verbose=False,
    )
    retriever_time = time.perf_counter() - retriever_start

    rag_start = time.perf_counter()
    rag_engine = RAGGenerator()
    rag_time = time.perf_counter() - rag_start

    models_start = time.perf_counter()
    models = build_models(hf_token)
    models_time = time.perf_counter() - models_start

    state_start = time.perf_counter()
    # Map chunk text -> metadata (minus the text itself) for fast source
    # attribution at query time. The first occurrence of a given text wins;
    # chunks with missing/empty text are skipped.
    chunk_lookup: dict[str, dict[str, Any]] = {}
    for chunk in final_chunks:
        metadata = chunk.get("metadata", {})
        text = metadata.get("text")
        if not text or text in chunk_lookup:
            continue
        meta_without_text = {k: v for k, v in metadata.items() if k != "text"}
        meta_without_text["title"] = metadata.get("title", "Untitled")
        meta_without_text["url"] = metadata.get("url", "")
        meta_without_text["chunk_index"] = metadata.get("chunk_index")
        chunk_lookup[text] = meta_without_text

    state["index"] = index
    state["retriever"] = retriever
    state["rag_engine"] = rag_engine
    state["models"] = models
    state["chunk_lookup"] = chunk_lookup
    state["title_model_ids"] = parse_title_model_candidates()
    state["title_client"] = InferenceClient(token=hf_token)
    state_time = time.perf_counter() - state_start

    startup_time = time.perf_counter() - startup_start
    print(
        f"API startup complete | chunks={len(final_chunks)} | "
        f"dotenv={dotenv_time:.3f}s | "
        f"env={env_time:.3f}s | "
        f"index={index_time:.3f}s | "
        f"cache_dir={cache_dir} | "
        f"force_cache_refresh={force_cache_refresh} | "
        f"chunk_source={chunk_source} | "
        f"chunk_load={chunk_load_time:.3f}s | "
        f"processor={processor_time:.3f}s | "
        f"rerank_model={rerank_model_name} | "
        f"retriever={retriever_time:.3f}s | "
        f"rag={rag_time:.3f}s | "
        f"models={models_time:.3f}s | "
        f"state={state_time:.3f}s | "
        f"total={startup_time:.3f}s"
    )