"""Incrementally index the project's source files into a Qdrant collection.

The script walks ROOT_DIR, splits every supported text file into chunks,
embeds each chunk through ``qdrant_store._get_embedding`` and upserts the
result into the ``codebase`` collection. A JSON cache of per-file SHA-256
hashes lets unchanged files be skipped on later runs.
"""
import fnmatch
import hashlib
import json
import os
import re
import sys
import uuid
from pathlib import Path

from dotenv import load_dotenv
from qdrant_client.http import models

# Add the Engine folder to sys.path so that qdrant_store can be imported.
# Scripts folder (backend/api)
SCRIPTS_DIR = Path(__file__).resolve().parent
# Project root (darkmedia-x_studio)
ROOT_DIR = SCRIPTS_DIR.parent.parent
# Engine folder at the project root
ENGINE_DIR = ROOT_DIR / "engine"
sys.path.insert(0, str(ENGINE_DIR))

# Load the .env file from the project root
load_dotenv(ROOT_DIR / ".env")

try:
    from qdrant_store import _get_client, _get_embedding, init_collections
except ImportError:
    print("❌ Erreur: Impossible d'importer qdrant_store. Vérifiez les chemins.")
    sys.exit(1)

# Collection dedicated to the codebase
COLLECTION_NAME = "codebase"

# Size of the vectors stored in the collection.
# NOTE(review): presumably the dimension returned by _get_embedding - confirm.
VECTOR_SIZE = 384

# On-disk cache mapping each indexed relative path to its content hash.
CACHE_FILE = ROOT_DIR / ".indexing_cache.json"

# Files and folders that are always ignored, with or without a .ragignore.
IGNORE_DIRS = {".git", "node_modules", "vendor", "brain", "assets", "tmp", "__pycache__", "dist", "build", ".claude", ".mcp", ".next", "venv", ".venv", "target"}
# The cache file has a supported extension and lives under ROOT_DIR, so it
# must be excluded explicitly or the indexer would index its own cache.
IGNORE_FILES = {"package-lock.json", "yarn.lock", "pnpm-lock.yaml", ".env", CACHE_FILE.name}
SUPPORTED_EXTENSIONS = {".py", ".js", ".ts", ".md", ".css", ".html", ".json", ".yml", ".yaml", ".ps1", ".bat", ".rs"}

# Characters that mark a .ragignore entry as a shell-style wildcard pattern.
_GLOB_CHARS = frozenset("*?[")


def load_ragignore():
    """Read the exclusion patterns from ``ROOT_DIR/.ragignore``.

    Blank lines and lines starting with ``#`` are skipped. Trailing path
    separators are removed (``target/`` becomes ``target``).

    Returns:
        A list of pattern strings; empty when the file does not exist.
    """
    ragignore_path = ROOT_DIR / ".ragignore"
    patterns = []
    if not ragignore_path.exists():
        return patterns

    print(f"📖 Chargement des exclusions depuis {ragignore_path}...")
    # utf-8-sig transparently drops a BOM, which would otherwise end up
    # glued to the first pattern.
    with open(ragignore_path, "r", encoding="utf-8-sig") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            # Clean the pattern (e.g. target/ -> target)
            cleaned = line.rstrip("/").rstrip("\\")
            # A line such as "/" cleans down to "", and an empty pattern
            # would match every path, so it is dropped.
            if cleaned:
                patterns.append(cleaned)
    return patterns


def is_ignored(path_str, rag_patterns):
    """Tell whether a file or directory must be excluded from indexing.

    Args:
        path_str: Path of the file or directory to test.
        rag_patterns: Patterns as returned by ``load_ragignore``.

    Returns:
        True when a path component is in IGNORE_DIRS, when the base name is
        in IGNORE_FILES, or when a .ragignore pattern matches.
    """
    # Work on a "/"-separated path relative to the root so that patterns
    # behave the same way on every platform.
    rel_path = os.path.relpath(path_str, ROOT_DIR).replace(os.sep, "/")
    parts = rel_path.split("/")

    # 1. Hard-coded IGNORE_DIRS / IGNORE_FILES (kept for compatibility)
    if any(part in IGNORE_DIRS for part in parts):
        return True
    if os.path.basename(path_str) in IGNORE_FILES:
        return True

    # 2. .ragignore patterns
    for raw_pattern in rag_patterns:
        pattern = raw_pattern.replace("\\", "/").strip("/")
        if not pattern:
            continue
        # A pattern matches a whole path component, the whole relative path,
        # or a leading directory. The trailing "/" keeps "build" from
        # matching "builder.py".
        if pattern in parts or rel_path == pattern or rel_path.startswith(pattern + "/"):
            return True
        # Wildcard patterns (e.g. "*.log") are tried against the relative
        # path and against each component.
        if _GLOB_CHARS.intersection(pattern):
            if fnmatch.fnmatchcase(rel_path, pattern):
                return True
            if any(fnmatch.fnmatchcase(part, pattern) for part in parts):
                return True

    return False


def init_codebase_collection():
    """Create the codebase collection when it cannot be fetched."""
    client = _get_client()
    try:
        client.get_collection(COLLECTION_NAME)
    except Exception:
        # Any failure to fetch the collection is treated as "missing"; a
        # real connectivity problem then surfaces from create_collection.
        print(f"📦 Création de la collection '{COLLECTION_NAME}'...")
        client.create_collection(
            collection_name=COLLECTION_NAME,
            vectors_config=models.VectorParams(size=VECTOR_SIZE, distance=models.Distance.COSINE),
        )


def chunk_text(text, filename, chunk_size=1200, overlap=150):
    """Split a file's text into chunks prefixed with a ``File:`` header.

    Blocks separated by blank lines are packed together until the chunk
    would reach ``chunk_size`` characters. A chunk longer than
    ``1.5 * chunk_size`` is hard-split into overlapping windows.

    Args:
        text: Full content of the file.
        filename: Name written in the header of every chunk.
        chunk_size: Target chunk length, in characters.
        overlap: Characters shared by consecutive hard-split windows.

    Returns:
        A list of chunk strings; empty when ``text`` has no content.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
            in ``[0, chunk_size)``.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    header = f"File: {filename}\n\n"

    # Split on blank lines to keep logical blocks together.
    blocks = re.split(r'\n\s*\n', text)

    # Bodies are accumulated without the header so that a header-only chunk
    # can never be produced.
    bodies = []
    current = ""
    for block in blocks:
        if not block.strip():
            continue
        if current and len(header) + len(current) + len(block) >= chunk_size:
            bodies.append(current)
            current = ""
        current += block + "\n\n"
    if current:
        bodies.append(current)

    step = chunk_size - overlap
    final_chunks = []
    for body in bodies:
        # rstrip only: leading indentation of the first line is content.
        content = body.rstrip()
        chunk = header + content
        if len(chunk) <= chunk_size * 1.5:
            final_chunks.append(chunk)
            continue
        # Fallback for very long blocks: hard split, repeating the header on
        # every window so each piece still names its source file.
        for start in range(0, len(content), step):
            # The previous window already reaches start + overlap; stop when
            # nothing new is left to emit.
            if start and start + overlap >= len(content):
                break
            final_chunks.append(header + content[start:start + chunk_size])

    return final_chunks


def load_cache():
    """Load the path -> hash cache, or an empty dict when unusable."""
    if not CACHE_FILE.exists():
        return {}
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # Unreadable or corrupt cache: fall back to a full re-index.
        return {}
    # Anything other than a JSON object cannot be used as a cache.
    return data if isinstance(data, dict) else {}


def save_cache(cache):
    """Write the path -> hash cache to CACHE_FILE (best effort)."""
    try:
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(cache, f, indent=2)
    except (OSError, TypeError, ValueError) as e:
        print(f"⚠️ Erreur sauvegarde cache: {e}")


def get_file_hash(content):
    """Return the SHA-256 hex digest of ``content`` encoded as UTF-8."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def _delete_points(client, rel_path, min_chunk_index=None):
    """Delete the points stored for ``rel_path``.

    When ``min_chunk_index`` is given, only points whose ``chunk_index``
    payload is greater than or equal to it are removed.
    """
    conditions = [
        models.FieldCondition(key="path", match=models.MatchValue(value=rel_path)),
    ]
    if min_chunk_index is not None:
        conditions.append(
            models.FieldCondition(key="chunk_index", range=models.Range(gte=min_chunk_index))
        )
    client.delete(
        collection_name=COLLECTION_NAME,
        points_selector=models.FilterSelector(filter=models.Filter(must=conditions)),
    )


def index_codebase():
    """Index new or modified files and drop points that became stale.

    Files whose content hash matches the cached one are skipped. Points are
    upserted in batches of 50. Paths that were in the previous cache but
    were not indexed this run have their points deleted. The cache is
    rewritten once the run has completed.
    """
    client = _get_client()
    init_codebase_collection()

    rag_patterns = load_ragignore()
    cache = load_cache()
    new_cache = {}

    print(f"🔍 Indexation incrémentale du codebase ({len(rag_patterns)} exclusions)")

    points = []
    file_count = 0
    skipped_count = 0
    chunk_count = 0

    for root, dirs, files in os.walk(ROOT_DIR):
        # 1. Prune ignored directories in place so os.walk skips them.
        dirs[:] = [d for d in dirs if not is_ignored(os.path.join(root, d), rag_patterns)]

        # The relative path format is kept exactly as it always was: it
        # feeds both the cache keys and the point ids.
        rel_dir = str(Path(root).relative_to(ROOT_DIR))

        for filename in files:
            file_path = os.path.join(root, filename)
            if is_ignored(file_path, rag_patterns):
                continue

            ext = os.path.splitext(filename)[1].lower()
            if ext not in SUPPORTED_EXTENSIONS:
                continue

            full_rel_path = os.path.join(rel_dir, filename)

            try:
                content = Path(file_path).read_text(encoding="utf-8", errors="ignore")
            except OSError:
                # Possibly transient: keep the previous entry so that the
                # file's points are not purged as stale below.
                if full_rel_path in cache:
                    new_cache[full_rel_path] = cache[full_rel_path]
                continue

            if not content.strip():
                continue

            # Compare hashes to decide whether the file needs re-indexing.
            file_hash = get_file_hash(content)
            new_cache[full_rel_path] = file_hash
            if cache.get(full_rel_path) == file_hash:
                skipped_count += 1
                continue

            file_count += 1
            chunks = chunk_text(content, full_rel_path)

            # The upserts below overwrite chunks 0..len(chunks)-1; chunks
            # with a higher index left by an earlier, longer version of the
            # file would otherwise stay in the collection.
            _delete_points(client, full_rel_path, min_chunk_index=len(chunks))

            for i, chunk in enumerate(chunks):
                chunk_id = f"{full_rel_path}_{i}"
                # Deterministic id: re-indexing a chunk overwrites its point.
                q_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, chunk_id))

                vector = _get_embedding(chunk)

                payload = {
                    "path": full_rel_path,
                    "filename": filename,
                    "extension": ext,
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    "content": chunk,
                    "type": "codebase"
                }

                points.append(models.PointStruct(id=q_id, vector=vector, payload=payload))
                chunk_count += 1

                if len(points) >= 50:
                    client.upsert(collection_name=COLLECTION_NAME, points=points)
                    points = []

            print(f" 🚀 [{file_count:3d}] {full_rel_path} ({chunk_count} chunks)")

    if points:
        client.upsert(collection_name=COLLECTION_NAME, points=points)

    # Purge files that were indexed before but are now deleted, empty,
    # ignored or no longer supported.
    for stale_path in set(cache) - set(new_cache):
        _delete_points(client, stale_path)

    save_cache(new_cache)

    print(f"\n✨ Indexation terminée !")
    print(f" 📂 Fichiers nouveaux/modifiés : {file_count}")
    print(f" ⏭️ Fichiers déjà à jour : {skipped_count}")
    print(f" 🧩 Total chunks indexés : {chunk_count}")


if __name__ == "__main__":
    index_codebase()