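"""
build_openviking_db.py
----------------------
Migrates from ChromaDB to OpenViking, using a file-system paradigm for context
(viking://resources/iadc/ and viking://resources/volve/)
with tiered loading (L0/L1/L2) and hybrid retrieval.
Uses Google's `gemini-embedding-2-preview` model; API rate limits are handled by
embedding in small batches with a pause between batches.

Note: embeddings are currently persisted to a Chroma store under
data/viking_context/chroma_fallback, with each chunk tagged with its Viking
namespace in its metadata. The OpenViking workspace setup is stubbed when the
`openviking` package is not installed.
"""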
|
|
| import os |
| import time |
| import logging |
| from pathlib import Path |
| from dotenv import load_dotenv |
|
|
| |
# OpenViking is an optional dependency. When it is missing, the two names below are
# left undefined and build_database() takes its stub path instead.
try:
    from openviking import VikingContextManager, ResourceLoader
except ImportError:
    # Log through a named logger rather than logging.warning(): the module-level
    # helper implicitly calls basicConfig() when the root logger has no handlers,
    # which would turn the explicit logging.basicConfig(...) call further down in
    # this file into a no-op (losing the INFO level and the timestamped format).
    logging.getLogger(__name__).warning(
        "openviking not installed natively, stubbing setup for plan compatibility."
    )
|
|
| from langchain_community.document_loaders import DirectoryLoader, TextLoader |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings |
|
|
# Configure root logging for the script and create this module's logger.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)

# Load environment variables from a .env file, so that values such as
# GOOGLE_API_KEY (read in build_database) do not have to be exported by hand.
load_dotenv()

# Project root: two levels above the directory that contains this file.
BASE_DIR = Path(__file__).resolve().parents[2]

# Input: plain-text knowledge base files.
TXT_DIR = BASE_DIR.joinpath("data", "knowledge_base", "raw_text")

# Output: OpenViking workspace directory, created at import time if absent.
VIKING_DIR = BASE_DIR.joinpath("data", "viking_context")
VIKING_DIR.mkdir(exist_ok=True, parents=True)

# Google embedding model used for every chunk.
EMBEDDING_MODEL = "models/gemini-embedding-2-preview"
|
|
def _viking_namespace(source):
    """Return the Viking namespace for a chunk, given its ``source`` metadata path."""
    # Match only on the part of the path below TXT_DIR, so that a parent directory
    # whose name happens to contain "ddr" or "volve" cannot route every chunk to
    # the volve namespace.
    try:
        relative = Path(source).relative_to(TXT_DIR).as_posix()
    except ValueError:
        # Source does not live under TXT_DIR: classify on the raw value instead.
        relative = source
    lowered = relative.lower()
    if "ddr" in lowered or "volve" in lowered:
        return "resources/volve/"
    return "resources/iadc/"


def _embed_with_retry(vectorstore, batch, batch_start):
    """Add ``batch`` to ``vectorstore``, retrying once after 60s; return True on success."""
    # Broad excepts are deliberate: ingestion is best-effort and a failing batch
    # must not abort the whole run.
    try:
        vectorstore.add_documents(batch)
        return True
    except Exception as e:
        log.error(f"Google API Error embedding batch {batch_start}: {e}. Waiting 60s to cool down.")

    time.sleep(60)
    try:
        vectorstore.add_documents(batch)
        return True
    except Exception as e2:
        log.error(f"Failed again: {e2}. Skipping batch.")
        return False


def build_database(*, batch_size: int = 5, sleep_time: float = 3.5) -> None:
    """Chunk the raw-text knowledge base, embed it, and persist it to the vector store.

    Steps:
      1. Create the OpenViking namespaces ``resources/iadc`` and ``resources/volve``
         (only logged as a stub when ``openviking`` is not importable).
      2. Load every ``*.txt`` file found recursively under ``TXT_DIR``.
      3. Split the documents into 1000-character chunks with 200 characters of overlap.
      4. Tag each chunk with ``viking_namespace`` and ``embedding_model`` metadata.
      5. Embed the chunks in batches with ``EMBEDDING_MODEL`` and store them in a
         Chroma database under ``VIKING_DIR / "chroma_fallback"``.

    The function returns early (after logging) when ``TXT_DIR`` does not exist, when
    no documents are found, or when ``GOOGLE_API_KEY`` is not set.

    Args:
        batch_size: Number of chunks sent per ``add_documents`` call.
        sleep_time: Seconds to pause between batches (the log message describes this
            as respecting the API's RPM limits).

    Raises:
        ValueError: If ``batch_size`` is not a positive integer.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    if not TXT_DIR.exists():
        log.error(f"Text directory does not exist: {TXT_DIR}")
        return

    # --- 1. OpenViking workspace -------------------------------------------------
    log.info(f"Initializing OpenViking workspace at {VIKING_DIR}...")
    # Look the class up explicitly instead of catching NameError, so that a
    # NameError raised *inside* OpenViking is not mistaken for "not installed".
    manager_cls = globals().get("VikingContextManager")
    if manager_cls is None:
        log.info("[Stub] OpenViking initialized. Namespaces created: resources/iadc, resources/volve")
    else:
        manager = manager_cls(workspace_dir=str(VIKING_DIR))
        manager.create_namespace("resources/iadc")
        manager.create_namespace("resources/volve")

    # --- 2. Load documents -------------------------------------------------------
    log.info(f"Loading documents from {TXT_DIR}...")
    loader = DirectoryLoader(
        str(TXT_DIR),
        glob="**/*.txt",
        loader_cls=TextLoader,
        use_multithreading=True,
    )
    docs = loader.load()
    log.info(f"Loaded {len(docs)} documents.")

    if not docs:
        log.warning("No documents found. Please run scrape_knowledge.py first.")
        return

    # --- 3. Chunk ----------------------------------------------------------------
    log.info("Chunking documents for Tiered Loading...")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    chunks = text_splitter.split_documents(docs)
    log.info(f"Split {len(docs)} documents into {len(chunks)} chunks.")

    # --- 4. Embedding client -----------------------------------------------------
    log.info(f"Initializing Google Embeddings: {EMBEDDING_MODEL}")

    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        log.error("GOOGLE_API_KEY not found in environment variables.")
        return

    embeddings = GoogleGenerativeAIEmbeddings(
        model=EMBEDDING_MODEL,
        google_api_key=api_key,
    )

    # --- 5. Rate-limited ingestion -----------------------------------------------
    log.info("Building OpenViking Graph with controlled API ingestion...")

    # Imported here because only this step needs the Chroma integration.
    from langchain_chroma import Chroma

    vectorstore = Chroma(
        persist_directory=str(VIKING_DIR / "chroma_fallback"),
        embedding_function=embeddings,
    )

    total = len(chunks)
    embedded = 0
    skipped = 0

    for start in range(0, total, batch_size):
        batch = chunks[start:start + batch_size]

        for doc in batch:
            # Guard against a missing or None source so the lookup cannot crash.
            source = str(doc.metadata.get("source") or "")
            doc.metadata["viking_namespace"] = _viking_namespace(source)
            doc.metadata["embedding_model"] = EMBEDDING_MODEL

        processed = min(start + batch_size, total)
        more_batches = processed < total

        if _embed_with_retry(vectorstore, batch, start):
            embedded += len(batch)
            if more_batches:
                log.info(f"Embedded {processed}/{total} chunks (Batch Size: {batch_size}). Sleeping {sleep_time}s to respect RPM limits...")
            else:
                log.info(f"Embedded {processed}/{total} chunks (Batch Size: {batch_size}).")
        else:
            skipped += len(batch)

        # Pace every request (including retried ones), but do not wait after the
        # final batch since nothing follows it.
        if more_batches:
            time.sleep(sleep_time)

    # Report what actually happened instead of unconditionally claiming success.
    if embedded == 0:
        log.error(f"No chunks could be embedded ({skipped}/{total} skipped). Database was not built.")
        return

    if skipped:
        log.warning(
            f"Migrated {embedded}/{total} chunks into OpenViking structure; "
            f"{skipped} chunks were skipped after repeated embedding failures."
        )
    else:
        log.info(f"Successfully migrated {total} chunks into OpenViking structure.")
    log.info("Database is ready for Agentic querying via Hybrid Retrieval.")


# Run the ingestion only when this file is executed as a script.
if __name__ == "__main__":
    build_database()
|
|