# Page chrome captured along with the file ("Spaces: Sleeping"); not part of the script.
| import os | |
| import sys | |
| import uuid | |
| import hashlib | |
| import re | |
| from pathlib import Path | |
| from qdrant_client.http import models | |
| from dotenv import load_dotenv | |
# Make the project's engine directory importable so that `qdrant_store` resolves.

# Directory that holds this script (backend/api).
SCRIPTS_DIR = Path(__file__).resolve().parent
# Project root (darkmedia-x_studio): two levels above the scripts directory.
ROOT_DIR = SCRIPTS_DIR.parents[1]
# Engine directory, located directly under the project root.
ENGINE_DIR = ROOT_DIR.joinpath("engine")
sys.path.insert(0, str(ENGINE_DIR))

# Load environment variables from the .env file at the project root.
# This runs before the qdrant_store import below, as in the original order.
load_dotenv(ROOT_DIR.joinpath(".env"))

try:
    from qdrant_store import _get_client, _get_embedding, init_collections
except ImportError:
    # Without qdrant_store nothing below can work: report and abort.
    print("❌ Erreur: Impossible d'importer qdrant_store. Vérifiez les chemins.")
    sys.exit(1)
# Qdrant collection dedicated to the codebase.
COLLECTION_NAME = "codebase"

# Directory names that are always skipped, whether or not a .ragignore exists.
IGNORE_DIRS = {
    ".git",
    "node_modules",
    "vendor",
    "brain",
    "assets",
    "tmp",
    "__pycache__",
    "dist",
    "build",
    ".claude",
    ".mcp",
    ".next",
    "venv",
    ".venv",
    "target",
}

# File names that are always skipped.
IGNORE_FILES = {
    "package-lock.json",
    "yarn.lock",
    "pnpm-lock.yaml",
    ".env",
    # The indexer's own hash cache is written at the project root with a
    # .json extension; without this entry it would be embedded and indexed
    # on every run after the first.
    ".indexing_cache.json",
}

# Only files with one of these (lower-cased) extensions are indexed.
SUPPORTED_EXTENSIONS = {
    ".py",
    ".js",
    ".ts",
    ".md",
    ".css",
    ".html",
    ".json",
    ".yml",
    ".yaml",
    ".ps1",
    ".bat",
    ".rs",
}
def load_ragignore():
    """Read exclusion patterns from the ``.ragignore`` file at the project root.

    Blank lines and lines starting with ``#`` are skipped. Trailing path
    separators are removed from each pattern (``target/`` -> ``target``).

    Returns:
        A list of non-empty pattern strings; an empty list when the file
        does not exist.
    """
    ragignore_path = ROOT_DIR / ".ragignore"
    if not ragignore_path.exists():
        return []

    print(f"📖 Chargement des exclusions depuis {ragignore_path}...")
    patterns = []
    # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise become part of the first pattern or hide a first-line
    # comment marker.
    with open(ragignore_path, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            entry = raw_line.strip()
            if not entry or entry.startswith("#"):
                continue
            pattern = entry.rstrip("/\\")
            # A line made only of separators (e.g. "/") collapses to "".
            # An empty pattern is a prefix of every path and would make the
            # matcher ignore the whole project, so it is dropped here.
            if pattern:
                patterns.append(pattern)
    return patterns
def is_ignored(path_str, rag_patterns):
    """Tell whether a file or directory must be excluded from indexing.

    Args:
        path_str: Path of the file or directory to test.
        rag_patterns: Patterns loaded from ``.ragignore``.

    Returns:
        True when any component of the path (relative to ``ROOT_DIR``) is in
        ``IGNORE_DIRS``, when the base name is in ``IGNORE_FILES``, or when a
        ``.ragignore`` pattern matches; False otherwise.

    A pattern matches when it equals one path component, or when it equals
    the relative path or one of its leading directory prefixes. Prefixes are
    compared on whole components, so ``build`` matches ``build/x.py`` but not
    ``builder/x.py``.
    """
    # Work on the path relative to the project root to simplify matching.
    rel_path = os.path.relpath(path_str, ROOT_DIR)
    parts = rel_path.split(os.sep)

    # 1. Built-in exclusions, kept for compatibility.
    if any(part in IGNORE_DIRS for part in parts):
        return True
    if os.path.basename(path_str) in IGNORE_FILES:
        return True

    # 2. .ragignore patterns. Both sides are compared with "/" separators so
    #    a pattern written as "src/gen" also works where os.sep is "\\".
    rel_posix = "/".join(parts)
    for raw_pattern in rag_patterns:
        pattern = raw_pattern.replace("\\", "/").strip("/")
        if not pattern:
            # An empty pattern would be a prefix of every path.
            continue
        if pattern in parts:
            return True
        if rel_posix == pattern or rel_posix.startswith(pattern + "/"):
            return True
    return False
def init_codebase_collection():
    """Make sure the codebase collection exists, creating it when the lookup fails.

    Any exception raised by the lookup is treated as "collection missing";
    the collection is then created with 384-dimensional cosine vectors.
    """
    qdrant = _get_client()
    try:
        qdrant.get_collection(COLLECTION_NAME)
    except Exception:
        print(f"📦 Création de la collection '{COLLECTION_NAME}'...")
        vector_params = models.VectorParams(
            size=384,
            distance=models.Distance.COSINE,
        )
        qdrant.create_collection(
            collection_name=COLLECTION_NAME,
            vectors_config=vector_params,
        )
def chunk_text(text, filename, chunk_size=1200, overlap=150):
    """Split *text* into chunks, each introduced by a ``File: <filename>`` header.

    The text is first cut on blank lines so chunks follow logical blocks.
    Blocks are packed into a chunk until adding the next one would reach
    ``chunk_size`` characters. Any resulting chunk longer than
    ``1.5 * chunk_size`` is then cut into fixed windows of ``chunk_size``
    characters that advance by ``chunk_size - overlap``.

    Args:
        text: Full file content.
        filename: Name written in the header of every packed chunk.
        chunk_size: Target chunk length in characters.
        overlap: Characters shared by consecutive windows of a hard split.

    Returns:
        A list of chunk strings. The list is empty when *text* holds nothing
        but whitespace; header-only chunks are never produced.
    """
    header = f"File: {filename}\n\n"
    # Split on blank lines to keep logical blocks together.
    blocks = re.split(r'\n\s*\n', text)

    chunks = []
    current_chunk = header
    has_content = False  # True once current_chunk holds more than the header.
    for block in blocks:
        if len(current_chunk) + len(block) >= chunk_size:
            # Flush only real content: when the very first block is already
            # too large, the pending chunk is just the header and emitting it
            # would index a useless "File: ..." entry.
            if has_content:
                chunks.append(current_chunk.strip())
            current_chunk = header
            has_content = False
        current_chunk += block + "\n\n"
        if block.strip():
            has_content = True
    if has_content:
        chunks.append(current_chunk.strip())

    # Fallback for very long blocks: hard split into overlapping windows.
    # The step is clamped so overlap >= chunk_size cannot yield a zero or
    # negative range step.
    step = max(1, chunk_size - overlap)
    final_chunks = []
    for chunk in chunks:
        if len(chunk) <= chunk_size * 1.5:
            final_chunks.append(chunk)
            continue
        for start in range(0, len(chunk), step):
            final_chunks.append(chunk[start:start + chunk_size])
            # This window already reaches the end; a further window would
            # only repeat text fully contained in this one.
            if start + chunk_size >= len(chunk):
                break
    return final_chunks
import json

# JSON file at the project root holding the content hashes used by the
# incremental indexing cache.
CACHE_FILE = ROOT_DIR.joinpath(".indexing_cache.json")
def load_cache():
    """Load the incremental-indexing cache from ``CACHE_FILE``.

    Returns:
        The dict stored in the cache file. An empty dict is returned when the
        file is missing, unreadable, not valid JSON, or does not contain a
        JSON object.
    """
    if not CACHE_FILE.exists():
        return {}
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and decoding errors; a
        # damaged cache only costs a full re-index.
        return {}
    # Callers use dict methods on the result, so valid JSON that is not an
    # object (a list, a string, null, ...) is treated as an empty cache.
    return data if isinstance(data, dict) else {}
def save_cache(cache):
    """Write *cache* to ``CACHE_FILE`` as indented JSON.

    Failures are reported on standard output and otherwise ignored, so a
    cache that cannot be saved never aborts the run.
    """
    try:
        with CACHE_FILE.open("w") as handle:
            json.dump(cache, handle, indent=2)
    except Exception as exc:
        print(f"⚠️ Erreur sauvegarde cache: {exc}")
def get_file_hash(content):
    """Return the hexadecimal SHA-256 digest of *content* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(content.encode("utf-8"))
    return digest.hexdigest()
def _delete_file_points(client, rel_path):
    """Delete every point of the codebase collection whose payload ``path`` is *rel_path*."""
    client.delete(
        collection_name=COLLECTION_NAME,
        points_selector=models.FilterSelector(
            filter=models.Filter(
                must=[
                    models.FieldCondition(
                        key="path",
                        match=models.MatchValue(value=rel_path),
                    )
                ]
            )
        ),
    )


def index_codebase():
    """Incrementally index the project's source files into the codebase collection.

    Walks ``ROOT_DIR``, skips ignored paths and unsupported extensions, and
    compares each file's content hash with the cache from the previous run.
    New or modified files are chunked, embedded and upserted in batches of
    50 points. The new hash table is saved at the end of the run.

    Points left over from earlier runs are removed: the old chunks of a
    modified file are deleted before its new chunks are queued, and files
    present in the previous cache but not seen in this run (deleted, emptied,
    unreadable or now ignored) have their points deleted as well.
    """
    client = _get_client()
    init_codebase_collection()
    rag_patterns = load_ragignore()
    cache = load_cache()
    new_cache = {}

    print(f"🔍 Indexation incrémentale du codebase ({len(rag_patterns)} exclusions)")

    batch_size = 50
    points = []
    file_count = 0
    skipped_count = 0
    chunk_count = 0

    for root, dirs, files in os.walk(ROOT_DIR):
        # Prune ignored directories in place so os.walk does not descend into them.
        dirs[:] = [d for d in dirs if not is_ignored(os.path.join(root, d), rag_patterns)]

        for file in files:
            file_path = os.path.join(root, file)
            if is_ignored(file_path, rag_patterns):
                continue

            ext = os.path.splitext(file)[1].lower()
            if ext not in SUPPORTED_EXTENSIONS:
                continue

            # The key format is kept as is: cache keys and point ids derive from it.
            rel_path = str(Path(root).relative_to(ROOT_DIR))
            full_rel_path = os.path.join(rel_path, file)

            try:
                content = Path(file_path).read_text(encoding="utf-8", errors="ignore")
            except OSError:
                # Unreadable file: skip it, the run continues.
                continue

            if not content.strip():
                continue

            # Hash check for incremental indexing.
            file_hash = get_file_hash(content)
            if cache.get(full_rel_path) == file_hash:
                new_cache[full_rel_path] = file_hash
                skipped_count += 1
                continue

            file_count += 1
            new_cache[full_rel_path] = file_hash

            # Point ids are derived from "<path>_<chunk index>", so a file
            # that now yields fewer chunks would keep its old high-index
            # points. Clear the previous version first.
            if full_rel_path in cache:
                _delete_file_points(client, full_rel_path)

            chunks = chunk_text(content, full_rel_path)
            for i, chunk in enumerate(chunks):
                chunk_id = f"{full_rel_path}_{i}"
                # Deterministic id: the same path and index map to the same point.
                q_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, chunk_id))
                vector = _get_embedding(chunk)
                payload = {
                    "path": full_rel_path,
                    "filename": file,
                    "extension": ext,
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    "content": chunk,
                    "type": "codebase"
                }
                points.append(models.PointStruct(id=q_id, vector=vector, payload=payload))
                chunk_count += 1

                if len(points) >= batch_size:
                    client.upsert(collection_name=COLLECTION_NAME, points=points)
                    points = []

            print(f" 🚀 [{file_count:3d}] {full_rel_path} ({chunk_count} chunks)")

    # Flush the last partial batch.
    if points:
        client.upsert(collection_name=COLLECTION_NAME, points=points)

    # Drop the points of files that were indexed before but not seen this run.
    removed_paths = [path for path in cache if path not in new_cache]
    for stale_path in removed_paths:
        _delete_file_points(client, stale_path)

    save_cache(new_cache)

    print("\n✨ Indexation terminée !")
    print(f" 📂 Fichiers nouveaux/modifiés : {file_count}")
    print(f" ⏭️ Fichiers déjà à jour : {skipped_count}")
    print(f" 🧩 Total chunks indexés : {chunk_count}")
    print(f" 🗑️ Fichiers retirés de l'index : {len(removed_paths)}")
# Script entry point: run one indexing pass when executed directly.
if __name__ == "__main__":
    index_codebase()