#!/usr/bin/env python """ Knowledge Base Manager Manages multiple knowledge bases and provides utilities for accessing them. """ from contextlib import contextmanager from datetime import datetime import hashlib import json import os from pathlib import Path import shutil import sys from deeptutor.logging import get_logger from deeptutor.services.rag.components.routing import FileTypeRouter from deeptutor.services.rag.factory import DEFAULT_PROVIDER, LEGACY_PROVIDER_ALIASES, normalize_provider_name logger = get_logger("KnowledgeBaseManager") # Cross-platform file locking @contextmanager def file_lock_shared(file_handle): """Acquire a shared (read) lock on a file - cross-platform.""" if sys.platform == "win32": import msvcrt msvcrt.locking(file_handle.fileno(), msvcrt.LK_NBLCK, 1) try: yield finally: file_handle.seek(0) msvcrt.locking(file_handle.fileno(), msvcrt.LK_UNLCK, 1) else: import fcntl fcntl.flock(file_handle.fileno(), fcntl.LOCK_SH) try: yield finally: fcntl.flock(file_handle.fileno(), fcntl.LOCK_UN) @contextmanager def file_lock_exclusive(file_handle): """Acquire an exclusive (write) lock on a file - cross-platform.""" if sys.platform == "win32": import msvcrt msvcrt.locking(file_handle.fileno(), msvcrt.LK_NBLCK, 1) try: yield finally: file_handle.seek(0) msvcrt.locking(file_handle.fileno(), msvcrt.LK_UNLCK, 1) else: import fcntl fcntl.flock(file_handle.fileno(), fcntl.LOCK_EX) try: yield finally: fcntl.flock(file_handle.fileno(), fcntl.LOCK_UN) from deeptutor.services.path_service import PathService class KnowledgeBaseManager: """Manager for knowledge bases""" def __init__(self, base_dir=str(PathService.get_instance().get_knowledge_bases_dir())): self.base_dir = Path(base_dir) self.base_dir.mkdir(parents=True, exist_ok=True) # Config file to track knowledge bases self.config_file = self.base_dir / "kb_config.json" self.config = self._load_config() def _load_config(self) -> dict: """Load knowledge base configuration from the canonical kb_config.json file.""" if self.config_file.exists(): try: with open(self.config_file, encoding="utf-8") as f: with file_lock_shared(f): content = f.read() if not content.strip(): # Empty file, return default return {"knowledge_bases": {}} config = json.loads(content) # Ensure knowledge_bases key exists if "knowledge_bases" not in config: config["knowledge_bases"] = {} # Migration: remove old "default" field if present if "default" in config: del config["default"] # Note: Don't save during load to avoid recursion issues # The next _save_config() call will persist this change # Migration: normalize legacy providers to llamaindex and # mark legacy index-only KBs as needs_reindex. knowledge_bases = config.get("knowledge_bases", {}) config_changed = False for kb_name, kb_entry in knowledge_bases.items(): if not isinstance(kb_entry, dict): continue raw_provider = kb_entry.get("rag_provider") normalized_provider = normalize_provider_name(raw_provider or DEFAULT_PROVIDER) if kb_entry.get("rag_provider") != normalized_provider: kb_entry["rag_provider"] = normalized_provider config_changed = True if ( isinstance(raw_provider, str) and raw_provider.strip().lower() in LEGACY_PROVIDER_ALIASES ): if not kb_entry.get("needs_reindex", False): kb_entry["needs_reindex"] = True config_changed = True kb_dir = self.base_dir / kb_name legacy_storage = kb_dir / "rag_storage" llamaindex_storage = kb_dir / "llamaindex_storage" if legacy_storage.exists() and legacy_storage.is_dir() and not ( llamaindex_storage.exists() and llamaindex_storage.is_dir() ): if not kb_entry.get("needs_reindex", False): kb_entry["needs_reindex"] = True config_changed = True if config_changed: try: with open(self.config_file, "w", encoding="utf-8") as f: with file_lock_exclusive(f): json.dump(config, f, indent=2, ensure_ascii=False) f.flush() os.fsync(f.fileno()) except Exception as save_err: logger.warning(f"Failed to persist normalized KB config: {save_err}") return config except (json.JSONDecodeError, Exception) as e: logger.warning(f"Error loading config: {e}") return {"knowledge_bases": {}} return {"knowledge_bases": {}} def _save_config(self): """Save knowledge base configuration (thread-safe with file locking)""" # Use exclusive lock for writing with open(self.config_file, "w", encoding="utf-8") as f: with file_lock_exclusive(f): json.dump(self.config, f, indent=2, ensure_ascii=False) f.flush() os.fsync(f.fileno()) # Ensure data is written to disk def update_kb_status( self, name: str, status: str, progress: dict | None = None, ): """ Update knowledge base status and progress in kb_config.json. Args: name: Knowledge base name status: Status string ("initializing", "processing", "ready", "error") progress: Optional progress dict with keys like: - stage: Current stage name - message: Human-readable message - percent: Progress percentage (0-100) - current: Current item number - total: Total items - file_name: Current file being processed - error: Error message (if status is "error") """ # Reload config to get latest state self.config = self._load_config() if "knowledge_bases" not in self.config: self.config["knowledge_bases"] = {} if name not in self.config["knowledge_bases"]: # Auto-register if not exists self.config["knowledge_bases"][name] = { "path": name, "description": f"Knowledge base: {name}", } kb_config = self.config["knowledge_bases"][name] kb_config["status"] = status kb_config["updated_at"] = datetime.now().isoformat() if progress is not None: kb_config["progress"] = progress elif status == "ready": # Clear progress when ready kb_config["progress"] = { "stage": "completed", "message": "Ready", "percent": 100, } self._save_config() def get_kb_status(self, name: str) -> dict | None: """Get status and progress for a knowledge base.""" self.config = self._load_config() kb_config = self.config.get("knowledge_bases", {}).get(name) if not kb_config: return None return { "status": kb_config.get("status", "unknown"), "progress": kb_config.get("progress"), "updated_at": kb_config.get("updated_at"), } def list_knowledge_bases(self) -> list[str]: """List all available knowledge bases. This method: 1. Loads registered KBs from kb_config.json 2. Scans the directory for existing KBs not yet registered 3. Auto-registers any discovered KBs with valid structure (rag_storage or llamaindex_storage) """ # Always reload config from file to ensure we have the latest data self.config = self._load_config() # Read knowledge base list from config file config_kbs = self.config.get("knowledge_bases", {}) kb_list = set(config_kbs.keys()) # Also scan directory for KBs that may not be registered yet # This ensures backward compatibility and auto-discovery if self.base_dir.exists(): config_changed = False for item in self.base_dir.iterdir(): if not item.is_dir() or item.name.startswith(("__", ".")): continue # Skip if already in config if item.name in kb_list: continue # Check if this is a valid KB directory (legacy rag_storage or llamaindex_storage) rag_storage = item / "rag_storage" llamaindex_storage = item / "llamaindex_storage" is_valid_kb = ( (rag_storage.exists() and rag_storage.is_dir()) or (llamaindex_storage.exists() and llamaindex_storage.is_dir()) ) if is_valid_kb: # Auto-register this KB to kb_config.json kb_list.add(item.name) self._auto_register_kb(item.name) config_changed = True # Save config if we registered new KBs if config_changed: self._save_config() return sorted(kb_list) def _auto_register_kb(self, name: str): """Auto-register an existing KB to kb_config.json. Reads info from metadata.json (if exists) for backward compatibility. """ kb_dir = self.base_dir / name # Default values kb_entry = { "path": name, "description": f"Knowledge base: {name}", "status": "ready", # Existing KB with storage is considered ready "updated_at": datetime.now().isoformat(), } # Try to read metadata.json for existing info (backward compatibility) metadata_file = kb_dir / "metadata.json" if metadata_file.exists(): try: with open(metadata_file, encoding="utf-8") as f: metadata = json.load(f) # Migrate relevant fields if metadata.get("description"): kb_entry["description"] = metadata["description"] if metadata.get("rag_provider"): raw_provider = str(metadata["rag_provider"]).strip().lower() kb_entry["rag_provider"] = normalize_provider_name(raw_provider) if str(raw_provider).strip().lower() in LEGACY_PROVIDER_ALIASES: kb_entry["needs_reindex"] = True if metadata.get("created_at"): kb_entry["created_at"] = metadata["created_at"] if metadata.get("last_updated"): kb_entry["updated_at"] = metadata["last_updated"] except Exception as e: logger.warning(f"Failed to read metadata.json for '{name}': {e}") # Detect rag_provider from storage type if not set if "rag_provider" not in kb_entry: rag_storage = kb_dir / "rag_storage" llamaindex_storage = kb_dir / "llamaindex_storage" if llamaindex_storage.exists(): kb_entry["rag_provider"] = DEFAULT_PROVIDER elif rag_storage.exists(): kb_entry["rag_provider"] = DEFAULT_PROVIDER kb_entry["needs_reindex"] = True # Add to config if "knowledge_bases" not in self.config: self.config["knowledge_bases"] = {} self.config["knowledge_bases"][name] = kb_entry logger.info(f"Auto-registered KB '{name}' to kb_config.json") def register_knowledge_base(self, name: str, description: str = "", set_default: bool = False): """Register a knowledge base""" kb_dir = self.base_dir / name if not kb_dir.exists(): raise ValueError(f"Knowledge base directory does not exist: {kb_dir}") if "knowledge_bases" not in self.config: self.config["knowledge_bases"] = {} self.config["knowledge_bases"][name] = {"path": name, "description": description} # Only set default if explicitly requested if set_default: self.set_default(name) self._save_config() def get_knowledge_base_path(self, name: str | None = None) -> Path: """Get path to a knowledge base""" if name is None: name = self.config.get("default") if name is None: raise ValueError("No default knowledge base set") kb_dir = self.base_dir / name if not kb_dir.exists(): raise ValueError(f"Knowledge base not found: {name}") return kb_dir def get_rag_storage_path(self, name: str | None = None) -> Path: """Get active index storage path for a knowledge base.""" kb_dir = self.get_knowledge_base_path(name) llamaindex_storage = kb_dir / "llamaindex_storage" legacy_storage = kb_dir / "rag_storage" if llamaindex_storage.exists(): return llamaindex_storage if legacy_storage.exists(): return legacy_storage raise ValueError(f"Index storage not found for knowledge base: {name or 'default'}") def get_images_path(self, name: str | None = None) -> Path: """Get images path for a knowledge base""" kb_dir = self.get_knowledge_base_path(name) return kb_dir / "images" def get_content_list_path(self, name: str | None = None) -> Path: """Get content list path for a knowledge base""" kb_dir = self.get_knowledge_base_path(name) return kb_dir / "content_list" def get_raw_path(self, name: str | None = None) -> Path: """Get raw documents path for a knowledge base""" kb_dir = self.get_knowledge_base_path(name) return kb_dir / "raw" def set_default(self, name: str): """Set default knowledge base using centralized config service.""" if name not in self.list_knowledge_bases(): raise ValueError(f"Knowledge base not found: {name}") # Persist default KB selection via the canonical KB config service. try: from deeptutor.services.config import get_kb_config_service kb_config_service = get_kb_config_service() kb_config_service.set_default_kb(name) except Exception as e: logger.warning(f"Failed to save default to centralized config: {e}") def get_default(self) -> str | None: """ Get default knowledge base name. Priority: 1. Canonical KB config service (`PathService.get_instance().get_knowledge_bases_dir() / kb_config.json`) 2. First knowledge base in the list (auto-fallback) """ # Try centralized config first try: from deeptutor.services.config import get_kb_config_service kb_config_service = get_kb_config_service() default_kb = kb_config_service.get_default_kb() if default_kb and default_kb in self.list_knowledge_bases(): return default_kb except Exception: pass # Fallback to first knowledge base in sorted list kb_list = self.list_knowledge_bases() if kb_list: return kb_list[0] return None def get_metadata(self, name: str | None = None) -> dict: """Get knowledge base metadata. Source: 1. kb_config.json (authoritative source) """ kb_name = name if kb_name is None: kb_name = self.get_default() if kb_name is None: return {} # First, try kb_config.json (authoritative source) self.config = self._load_config() kb_config = self.config.get("knowledge_bases", {}).get(kb_name, {}) if kb_config: # Build metadata from config metadata = { "name": kb_name, "description": kb_config.get("description", f"Knowledge base: {kb_name}"), "rag_provider": normalize_provider_name(kb_config.get("rag_provider")), "needs_reindex": bool(kb_config.get("needs_reindex", False)), "created_at": kb_config.get("created_at"), "last_updated": kb_config.get("updated_at"), } # Remove None values metadata = {k: v for k, v in metadata.items() if v is not None} return metadata return {} def get_info(self, name: str | None = None) -> dict: """Get detailed information about a knowledge base. This method: 1. Gets the KB name (from parameter or default) 2. Reads all config from kb_config.json (authoritative source) 3. Falls back to metadata.json for legacy KBs 4. Collects statistics about files and RAG status """ # Reload config to get latest status self.config = self._load_config() kb_name = name or self.get_default() if kb_name is None: raise ValueError("No knowledge base name provided and no default set") # Get knowledge base path kb_dir = self.base_dir / kb_name # Get config from kb_config.json (authoritative source) kb_config = self.config.get("knowledge_bases", {}).get(kb_name, {}) status = kb_config.get("status") progress = kb_config.get("progress") description = kb_config.get("description", f"Knowledge base: {kb_name}") rag_provider = normalize_provider_name(kb_config.get("rag_provider")) needs_reindex = bool(kb_config.get("needs_reindex", False)) created_at = kb_config.get("created_at") updated_at = kb_config.get("updated_at") # KB might not have a directory yet if still initializing dir_exists = kb_dir.exists() # For old KBs without status field, determine status from rag_storage if needs_reindex: status = "needs_reindex" elif not status and dir_exists: rag_storage_dir = kb_dir / "rag_storage" llamaindex_storage_dir = kb_dir / "llamaindex_storage" if llamaindex_storage_dir.exists() and any(llamaindex_storage_dir.iterdir()): status = "ready" elif rag_storage_dir.exists() and any(rag_storage_dir.iterdir()): status = "needs_reindex" needs_reindex = True else: status = "unknown" elif not status: status = "unknown" # Build metadata from kb_config.json (authoritative source) metadata = { "name": kb_name, "description": description, "rag_provider": rag_provider, "needs_reindex": needs_reindex, } if created_at: metadata["created_at"] = created_at if updated_at: metadata["last_updated"] = updated_at # Remove None values metadata = {k: v for k, v in metadata.items() if v is not None} info = { "name": kb_name, "path": str(kb_dir), "is_default": kb_name == self.get_default(), "metadata": metadata, "status": status, "progress": progress, } # Count files - handle errors gracefully raw_dir = kb_dir / "raw" if dir_exists else None images_dir = kb_dir / "images" if dir_exists else None content_list_dir = kb_dir / "content_list" if dir_exists else None rag_storage_dir = kb_dir / "rag_storage" if dir_exists else None llamaindex_storage_dir = kb_dir / "llamaindex_storage" if dir_exists else None raw_count = 0 images_count = 0 content_lists_count = 0 if dir_exists: try: raw_count = ( len([f for f in raw_dir.iterdir() if f.is_file()]) if raw_dir.exists() else 0 ) except Exception: pass try: images_count = ( len([f for f in images_dir.iterdir() if f.is_file()]) if images_dir.exists() else 0 ) except Exception: pass try: content_lists_count = ( len(list(content_list_dir.glob("*.json"))) if content_list_dir.exists() else 0 ) except Exception: pass # Check rag_initialized (llamaindex storage only) rag_initialized = ( (dir_exists and llamaindex_storage_dir and llamaindex_storage_dir.exists() and llamaindex_storage_dir.is_dir()) ) info["statistics"] = { "raw_documents": raw_count, "images": images_count, "content_lists": content_lists_count, "rag_initialized": rag_initialized, "rag_provider": rag_provider, "needs_reindex": needs_reindex, # Include status and progress in statistics for backward compatibility "status": status, "progress": progress, } return info def delete_knowledge_base(self, name: str, confirm: bool = False) -> bool: """ Delete a knowledge base Args: name: Knowledge base name confirm: If True, skip confirmation (use with caution!) Returns: True if deleted successfully """ if name not in self.list_knowledge_bases(): raise ValueError(f"Knowledge base not found: {name}") kb_dir = self.get_knowledge_base_path(name) if not confirm: # Ask for confirmation in CLI print(f"⚠️ Warning: This will permanently delete the knowledge base '{name}'") print(f" Path: {kb_dir}") response = input("Are you sure? Type 'yes' to confirm: ") if response.lower() != "yes": print("Deletion cancelled.") return False # Delete the directory shutil.rmtree(kb_dir) # Remove from config if name in self.config.get("knowledge_bases", {}): del self.config["knowledge_bases"][name] # Update default if this was the default if self.config.get("default") == name: remaining = self.list_knowledge_bases() self.config["default"] = remaining[0] if remaining else None self._save_config() return True def clean_rag_storage(self, name: str | None = None, backup: bool = True) -> bool: """ Clean (delete) index storage for a knowledge base. Args: name: Knowledge base name (default if not specified) backup: If True, backup storage before deleting Returns: True if cleaned successfully """ kb_name = name or self.get_default() kb_dir = self.get_knowledge_base_path(kb_name) llamaindex_storage_dir = kb_dir / "llamaindex_storage" legacy_storage_dir = kb_dir / "rag_storage" if not llamaindex_storage_dir.exists() and not legacy_storage_dir.exists(): logger.info(f"Index storage does not exist for '{kb_name}'") return False targets = [] if llamaindex_storage_dir.exists(): targets.append(("llamaindex_storage", llamaindex_storage_dir)) if legacy_storage_dir.exists(): targets.append(("rag_storage", legacy_storage_dir)) for label, target in targets: if backup: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_dir = kb_dir / f"{label}_backup_{timestamp}" shutil.copytree(target, backup_dir) logger.info(f"Backed up {label} to: {backup_dir}") shutil.rmtree(target) target.mkdir(parents=True, exist_ok=True) logger.info(f"Cleaned {label} for '{kb_name}'") return True def link_folder(self, kb_name: str, folder_path: str) -> dict: """ Link a local folder to a knowledge base. Args: kb_name: Knowledge base name folder_path: Path to local folder (supports ~, relative paths) Returns: Dict with folder info including id, path, and file count Raises: ValueError: If KB not found or folder doesn't exist """ if kb_name not in self.list_knowledge_bases(): raise ValueError(f"Knowledge base not found: {kb_name}") # Normalize path (cross-platform: handles ~, relative paths, etc.) folder = Path(folder_path).expanduser().resolve() if not folder.exists(): raise ValueError(f"Folder does not exist: {folder}") if not folder.is_dir(): raise ValueError(f"Path is not a directory: {folder}") # Get RAG provider from kb_config.json to determine supported extensions self.config = self._load_config() kb_config = self.config.get("knowledge_bases", {}).get(kb_name, {}) provider = normalize_provider_name(kb_config.get("rag_provider") or DEFAULT_PROVIDER) # Get supported files in folder based on provider supported_extensions = FileTypeRouter.get_extensions_for_provider(provider) files: list[Path] = [] for ext in supported_extensions: files.extend(folder.glob(f"**/*{ext}")) # Generate folder ID folder_id = hashlib.md5( # noqa: S324 str(folder).encode(), usedforsecurity=False ).hexdigest()[:8] # Load existing linked folders from metadata kb_dir = self.base_dir / kb_name metadata_file = kb_dir / "metadata.json" metadata: dict = {} if metadata_file.exists(): try: with open(metadata_file, encoding="utf-8") as fp: metadata = json.load(fp) except Exception: metadata = {} if "linked_folders" not in metadata: metadata["linked_folders"] = [] # Check if already linked existing_ids = [item["id"] for item in metadata.get("linked_folders", [])] if folder_id in existing_ids: # If already linked, treat as success (idempotent) # Find and return existing info for item in metadata.get("linked_folders", []): if item["id"] == folder_id: return item # Add folder info folder_info = { "id": folder_id, "path": str(folder), "added_at": datetime.now().isoformat(), "file_count": len(files), } metadata["linked_folders"].append(folder_info) # Save metadata with open(metadata_file, "w", encoding="utf-8") as fp: json.dump(metadata, fp, indent=2, ensure_ascii=False) return folder_info def get_linked_folders(self, kb_name: str) -> list[dict]: """ Get list of linked folders for a knowledge base. Args: kb_name: Knowledge base name Returns: List of linked folder info dicts """ if kb_name not in self.list_knowledge_bases(): raise ValueError(f"Knowledge base not found: {kb_name}") kb_dir = self.base_dir / kb_name metadata_file = kb_dir / "metadata.json" if not metadata_file.exists(): return [] try: with open(metadata_file, encoding="utf-8") as f: metadata = json.load(f) return metadata.get("linked_folders", []) except Exception: return [] def unlink_folder(self, kb_name: str, folder_id: str) -> bool: """ Unlink a folder from a knowledge base. Args: kb_name: Knowledge base name folder_id: Folder ID to unlink Returns: True if unlinked successfully, False if not found """ if kb_name not in self.list_knowledge_bases(): raise ValueError(f"Knowledge base not found: {kb_name}") kb_dir = self.base_dir / kb_name metadata_file = kb_dir / "metadata.json" if not metadata_file.exists(): return False try: with open(metadata_file, encoding="utf-8") as f: metadata = json.load(f) except Exception: return False linked = metadata.get("linked_folders", []) new_linked = [f for f in linked if f["id"] != folder_id] if len(new_linked) == len(linked): return False # Not found metadata["linked_folders"] = new_linked with open(metadata_file, "w", encoding="utf-8") as f: json.dump(metadata, f, indent=2, ensure_ascii=False) return True def scan_linked_folder(self, folder_path: str, provider: str = DEFAULT_PROVIDER) -> list[str]: """ Scan a linked folder and return list of supported file paths. Args: folder_path: Path to folder provider: RAG provider to determine supported extensions (default: llamaindex) Returns: List of file paths (as strings) """ folder = Path(folder_path).expanduser().resolve() if not folder.exists() or not folder.is_dir(): return [] supported_extensions = FileTypeRouter.get_extensions_for_provider(provider) files = [] for ext in supported_extensions: for file_path in folder.glob(f"**/*{ext}"): files.append(str(file_path)) return sorted(files) def detect_folder_changes(self, kb_name: str, folder_id: str) -> dict: """ Detect new and modified files in a linked folder since last sync. This enables automatic sync of changes from local folders that may be synced with cloud services like SharePoint, Google Drive, etc. Args: kb_name: Knowledge base name folder_id: Folder ID to check for changes Returns: Dict with 'new_files', 'modified_files', and 'has_changes' keys """ if kb_name not in self.list_knowledge_bases(): raise ValueError(f"Knowledge base not found: {kb_name}") # Get folder info folders = self.get_linked_folders(kb_name) folder_info = next((f for f in folders if f["id"] == folder_id), None) if not folder_info: raise ValueError(f"Linked folder not found: {folder_id}") folder_path = Path(folder_info["path"]).expanduser().resolve() last_sync = folder_info.get("last_sync") synced_files = folder_info.get("synced_files", {}) # Parse last sync timestamp last_sync_time = None if last_sync: try: last_sync_time = datetime.fromisoformat(last_sync) except Exception: pass # Get RAG provider from kb_config.json to determine supported extensions self.config = self._load_config() kb_config = self.config.get("knowledge_bases", {}).get(kb_name, {}) provider = normalize_provider_name(kb_config.get("rag_provider") or DEFAULT_PROVIDER) # Scan current files based on provider's supported extensions supported_extensions = FileTypeRouter.get_extensions_for_provider(provider) new_files = [] modified_files = [] for ext in supported_extensions: for file_path in folder_path.glob(f"**/*{ext}"): file_str = str(file_path) file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime) if file_str in synced_files: # Check if modified since last sync prev_mtime_str = synced_files[file_str] try: prev_mtime = datetime.fromisoformat(prev_mtime_str) if file_mtime > prev_mtime: modified_files.append(file_str) except Exception: modified_files.append(file_str) else: # New file (not in synced files) new_files.append(file_str) return { "new_files": sorted(new_files), "modified_files": sorted(modified_files), "has_changes": len(new_files) > 0 or len(modified_files) > 0, "new_count": len(new_files), "modified_count": len(modified_files), } def update_folder_sync_state(self, kb_name: str, folder_id: str, synced_files: list[str]): """ Update the sync state for a linked folder after successful sync. Records which files were synced and their modification times, enabling future change detection. Args: kb_name: Knowledge base name folder_id: Folder ID synced_files: List of file paths that were successfully synced """ if kb_name not in self.list_knowledge_bases(): raise ValueError(f"Knowledge base not found: {kb_name}") kb_dir = self.base_dir / kb_name metadata_file = kb_dir / "metadata.json" if not metadata_file.exists(): return try: with open(metadata_file, encoding="utf-8") as f: metadata = json.load(f) except Exception: return linked = metadata.get("linked_folders", []) for folder in linked: if folder["id"] == folder_id: # Record sync timestamp folder["last_sync"] = datetime.now().isoformat() # Record file modification times file_states = folder.get("synced_files", {}) for file_path in synced_files: try: p = Path(file_path) if p.exists(): mtime = datetime.fromtimestamp(p.stat().st_mtime) file_states[file_path] = mtime.isoformat() except Exception: pass folder["synced_files"] = file_states folder["file_count"] = len(file_states) break def main(): """Command-line interface for knowledge base manager""" import argparse parser = argparse.ArgumentParser(description="Knowledge Base Manager") parser.add_argument( "--base-dir", default="./knowledge_bases", help="Base directory for knowledge bases" ) subparsers = parser.add_subparsers(dest="command", help="Commands") # List command subparsers.add_parser("list", help="List all knowledge bases") # Info command info_parser = subparsers.add_parser("info", help="Show knowledge base information") info_parser.add_argument( "name", nargs="?", help="Knowledge base name (default if not specified)" ) # Set default command default_parser = subparsers.add_parser("set-default", help="Set default knowledge base") default_parser.add_argument("name", help="Knowledge base name") # Delete command delete_parser = subparsers.add_parser("delete", help="Delete a knowledge base") delete_parser.add_argument("name", help="Knowledge base name") delete_parser.add_argument("--force", action="store_true", help="Skip confirmation") # Clean RAG command clean_parser = subparsers.add_parser( "clean-rag", help="Clean RAG storage (useful for corrupted data)" ) clean_parser.add_argument( "name", nargs="?", help="Knowledge base name (default if not specified)" ) clean_parser.add_argument( "--no-backup", action="store_true", help="Don't backup before cleaning" ) args = parser.parse_args() manager = KnowledgeBaseManager(args.base_dir) if args.command == "list": kb_list = manager.list_knowledge_bases() default_kb = manager.get_default() print("\nAvailable Knowledge Bases:") print("=" * 60) if not kb_list: print("No knowledge bases found") else: for kb_name in kb_list: default_marker = " (default)" if kb_name == default_kb else "" print(f" • {kb_name}{default_marker}") print() elif args.command == "info": try: info = manager.get_info(args.name) print("\nKnowledge Base Information:") print("=" * 60) print(f"Name: {info['name']}") print(f"Path: {info['path']}") print(f"Default: {'Yes' if info['is_default'] else 'No'}") if info.get("metadata"): print("\nMetadata:") for key, value in info["metadata"].items(): print(f" {key}: {value}") print("\nStatistics:") stats = info["statistics"] print(f" Raw documents: {stats['raw_documents']}") print(f" Images: {stats['images']}") print(f" Content lists: {stats['content_lists']}") print(f" RAG initialized: {'Yes' if stats['rag_initialized'] else 'No'}") if "rag" in stats: print("\n RAG Statistics:") for key, value in stats["rag"].items(): print(f" {key}: {value}") print() except Exception as e: print(f"Error: {e!s}") elif args.command == "set-default": try: manager.set_default(args.name) print(f"✓ Set '{args.name}' as default knowledge base") except Exception as e: print(f"Error: {e!s}") elif args.command == "delete": try: success = manager.delete_knowledge_base(args.name, confirm=args.force) if success: print(f"✓ Deleted knowledge base '{args.name}'") except Exception as e: print(f"Error: {e!s}") elif args.command == "clean-rag": try: manager.clean_rag_storage(args.name, backup=not args.no_backup) except Exception as e: print(f"Error: {e!s}") else: parser.print_help() if __name__ == "__main__": main()