# darkmedia-x-api/backend/api/index_codebase.py
# Author avatar: cybermedia
# Commit message: Upload folder using huggingface_hub
# Commit: 343eed9 (verified)
import os
import sys
import uuid
import hashlib
import re
from pathlib import Path
from qdrant_client.http import models
from dotenv import load_dotenv
# Put the engine directory on sys.path so that qdrant_store can be imported.
# Directory containing this script (backend/api).
SCRIPTS_DIR = Path(__file__).resolve().parent
# Project root (darkmedia-x_studio): two levels above the scripts directory.
ROOT_DIR = SCRIPTS_DIR.parent.parent
# Engine directory located directly under the project root.
ENGINE_DIR = ROOT_DIR.joinpath("engine")
sys.path.insert(0, os.fspath(ENGINE_DIR))
# Load environment variables from the .env file at the project root.
load_dotenv(ROOT_DIR.joinpath(".env"))
try:
    from qdrant_store import _get_client, _get_embedding, init_collections
except ImportError:
    # Without qdrant_store nothing below can work: report and exit with status 1.
    print("❌ Erreur: Impossible d'importer qdrant_store. Vérifiez les chemins.")
    raise SystemExit(1)
# Name of the Qdrant collection dedicated to the codebase.
COLLECTION_NAME = "codebase"

# Built-in exclusions: path components that cause a path to be skipped.
IGNORE_DIRS = {
    ".git",
    "node_modules",
    "vendor",
    "brain",
    "assets",
    "tmp",
    "__pycache__",
    "dist",
    "build",
    ".claude",
    ".mcp",
    ".next",
    "venv",
    ".venv",
    "target",
}

# Built-in exclusions: file names that are never indexed.
IGNORE_FILES = {
    "package-lock.json",
    "yarn.lock",
    "pnpm-lock.yaml",
    ".env",
}

# Only files whose (lower-cased) extension is listed here are indexed.
SUPPORTED_EXTENSIONS = {
    ".py",
    ".js",
    ".ts",
    ".md",
    ".css",
    ".html",
    ".json",
    ".yml",
    ".yaml",
    ".ps1",
    ".bat",
    ".rs",
}
def load_ragignore():
    """Read the exclusion patterns listed in ``<ROOT_DIR>/.ragignore``.

    Blank lines and lines starting with ``#`` are skipped. Trailing path
    separators (``/`` or ``\\``) are removed, so ``target/`` becomes
    ``target``. Lines that are empty once cleaned (for example a lone ``/``)
    are dropped: an empty pattern would otherwise match every path.

    Returns:
        list[str]: the cleaned patterns, in file order; an empty list when
        the file does not exist.
    """
    ragignore_path = ROOT_DIR / ".ragignore"
    if not ragignore_path.exists():
        return []

    print(f"📖 Chargement des exclusions depuis {ragignore_path}...")
    patterns = []
    # "utf-8-sig" transparently drops a leading BOM, which would otherwise be
    # glued to the first pattern (or hide a leading "#" comment marker).
    with open(ragignore_path, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            # Clean the pattern (e.g. "target/" -> "target"), whatever mix of
            # trailing separators was used.
            pattern = line.rstrip("/\\")
            if pattern:
                patterns.append(pattern)
    return patterns
def is_ignored(path_str, rag_patterns):
    """Tell whether a path must be excluded from indexing.

    A path is ignored when:
      1. any component of its path relative to ``ROOT_DIR`` is in
         ``IGNORE_DIRS``, or its base name is in ``IGNORE_FILES``;
      2. a ``.ragignore`` pattern equals one of its components, or the
         relative path starts with the pattern. This is a plain string
         prefix test, so ``doc`` also matches ``docs/`` and
         ``docker-compose.yml``.

    Args:
        path_str: path of a file or directory (absolute, or relative to the
            current working directory).
        rag_patterns: iterable of pattern strings, as returned by
            ``load_ragignore``.

    Returns:
        bool: True when the path must be skipped.
    """
    # Work on the path relative to the project root to simplify matching.
    rel_path = os.path.relpath(path_str, ROOT_DIR)
    parts = rel_path.split(os.sep)

    # 1. Built-in exclusions.
    if any(part in IGNORE_DIRS for part in parts):
        return True
    if os.path.basename(path_str) in IGNORE_FILES:
        return True

    # 2. .ragignore patterns. Both sides are compared with "/" separators so
    #    a pattern such as "backend/tmp" behaves the same on every platform.
    rel_posix = "/".join(parts)
    for raw_pattern in rag_patterns:
        pattern = raw_pattern.replace("\\", "/").rstrip("/")
        if not pattern:
            # An empty pattern would make startswith() match every path.
            continue
        if pattern in parts or rel_posix.startswith(pattern):
            return True
    return False
def init_codebase_collection():
    """Make sure the codebase collection exists.

    The collection is looked up first; if that lookup raises for any reason,
    the collection is created with 384-dimensional vectors compared by cosine
    distance.
    """
    qdrant = _get_client()
    try:
        qdrant.get_collection(COLLECTION_NAME)
    except Exception:
        # Any failure of the lookup is treated as "collection is missing".
        print(f"📦 Création de la collection '{COLLECTION_NAME}'...")
        vector_settings = models.VectorParams(
            size=384,
            distance=models.Distance.COSINE,
        )
        qdrant.create_collection(
            collection_name=COLLECTION_NAME,
            vectors_config=vector_settings,
        )
def chunk_text(text, filename, chunk_size=1200, overlap=150):
    """Split *text* into chunks, each prefixed with a ``File: <filename>`` header.

    The text is first cut on blank lines. Consecutive blocks are packed into
    one chunk while ``header + blocks`` stays below *chunk_size* characters.
    A chunk longer than ``1.5 * chunk_size`` (a single very long block) is
    then cut into windows of *chunk_size* characters that overlap by
    *overlap* characters; every window carries the header too, so each
    returned chunk names the file it comes from.

    Args:
        text: full content of the file.
        filename: name written in the header of every chunk.
        chunk_size: target chunk size in characters; must be positive.
        overlap: number of characters shared by two consecutive windows of a
            hard split. Values outside ``[0, chunk_size)`` are treated as 0
            so that no content is skipped or repeated endlessly.

    Returns:
        list[str]: the chunks in document order. Empty or whitespace-only
        text yields an empty list (no header-only chunk is produced).

    Raises:
        ValueError: if *chunk_size* is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    header = f"File: {filename}\n\n"

    # Split on blank lines to keep logical blocks together; whitespace-only
    # blocks (leading/trailing blank lines) carry no content.
    blocks = [block for block in re.split(r'\n\s*\n', text) if block.strip()]

    # Pack blocks into chunk bodies (header excluded, but accounted for).
    bodies = []
    current = ""
    for block in blocks:
        # Only flush when something was accumulated: flushing an empty body
        # would emit a useless header-only chunk.
        if current and len(header) + len(current) + len(block) >= chunk_size:
            bodies.append(current)
            current = ""
        current += block + "\n\n"
    if current:
        bodies.append(current)

    # An invalid overlap would give a zero/negative step (crash or dropped
    # content) or a step larger than the window (gaps): fall back to none.
    if not 0 <= overlap < chunk_size:
        overlap = 0
    step = chunk_size - overlap

    final_chunks = []
    for body in bodies:
        # rstrip only: leading indentation of the first line is meaningful.
        body = body.rstrip()
        chunk = header + body
        if len(chunk) <= chunk_size * 1.5:
            final_chunks.append(chunk)
            continue

        # Fallback for very long blocks: hard split into overlapping windows.
        start = 0
        while True:
            final_chunks.append(header + body[start:start + chunk_size])
            if start + chunk_size >= len(body):
                # This window reached the end; a further one would only
                # repeat text already covered.
                break
            start += step
    return final_chunks
import json
# JSON file at the project root storing the incremental-indexing cache.
CACHE_FILE = ROOT_DIR.joinpath(".indexing_cache.json")
def load_cache():
    """Load the incremental-indexing cache from ``CACHE_FILE``.

    Returns:
        dict: the cached mapping. An empty dict is returned when the file is
        missing, cannot be read, does not contain valid JSON, or contains
        JSON that is not an object (callers rely on ``dict.get``).
    """
    if not CACHE_FILE.exists():
        return {}
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            cached = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError: an
        # unreadable or corrupt cache simply means "index everything again".
        return {}
    return cached if isinstance(cached, dict) else {}
def save_cache(cache):
    """Write *cache* to ``CACHE_FILE`` as indented JSON.

    Saving is best effort: any error is printed and swallowed, never raised.
    """
    try:
        with CACHE_FILE.open("w") as handle:
            json.dump(cache, handle, indent=2)
    except Exception as err:
        print(f"⚠️ Erreur sauvegarde cache: {err}")
def get_file_hash(content):
    """Return the hexadecimal SHA-256 digest of *content* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(content.encode("utf-8"))
    return digest.hexdigest()
def _delete_file_points(client, rel_path):
    """Delete every point of the codebase collection whose payload ``path`` is *rel_path*."""
    client.delete(
        collection_name=COLLECTION_NAME,
        points_selector=models.FilterSelector(
            filter=models.Filter(
                must=[
                    models.FieldCondition(
                        key="path",
                        match=models.MatchValue(value=rel_path),
                    )
                ]
            )
        ),
    )


def index_codebase():
    """Incrementally index the project files into the codebase collection.

    Walks ``ROOT_DIR``, skipping everything rejected by ``is_ignored`` and
    every file whose extension is not in ``SUPPORTED_EXTENSIONS``. Each
    remaining non-empty file is hashed; files whose hash matches the cache
    are skipped. Changed or new files are chunked with ``chunk_text``, each
    chunk is embedded with ``_get_embedding`` and upserted in batches of 50
    points. The new cache is saved at the end.

    Stale data is cleaned up as well:
      * before a previously indexed file is re-indexed, its existing points
        are deleted, so a file that now produces fewer chunks does not keep
        outdated ones;
      * points of files present in the old cache but not seen during this
        run (deleted, emptied, newly excluded) are deleted.

    The cache file itself is never indexed.
    """
    client = _get_client()
    init_codebase_collection()
    rag_patterns = load_ragignore()
    cache = load_cache()
    new_cache = {}
    print(f"🔍 Indexation incrémentale du codebase ({len(rag_patterns)} exclusions)")

    points = []
    file_count = 0
    skipped_count = 0
    chunk_count = 0
    removed_count = 0

    for root, dirs, files in os.walk(ROOT_DIR):
        # Prune ignored directories in place so os.walk does not descend into them.
        dirs[:] = [d for d in dirs if not is_ignored(os.path.join(root, d), rag_patterns)]

        for file in files:
            file_path = os.path.join(root, file)
            # The cache is a .json file under ROOT_DIR: without this guard it
            # would be indexed, and re-indexed on every run since it changes.
            if Path(file_path) == CACHE_FILE:
                continue
            if is_ignored(file_path, rag_patterns):
                continue
            ext = os.path.splitext(file)[1].lower()
            if ext not in SUPPORTED_EXTENSIONS:
                continue

            # NOTE: this exact form is used as cache key and to derive point
            # ids; changing it would orphan already indexed points.
            rel_path = str(Path(root).relative_to(ROOT_DIR))
            full_rel_path = os.path.join(rel_path, file)

            try:
                content = Path(file_path).read_text(encoding="utf-8", errors="ignore")
            except OSError as err:
                print(f" ⚠️ Lecture impossible, fichier ignoré: {full_rel_path} ({err})")
                # Keep the previous cache entry: a read failure must not be
                # mistaken for a deleted file.
                if full_rel_path in cache:
                    new_cache[full_rel_path] = cache[full_rel_path]
                continue
            if not content.strip():
                continue

            # Compare hashes to skip unchanged files.
            file_hash = get_file_hash(content)
            new_cache[full_rel_path] = file_hash
            if cache.get(full_rel_path) == file_hash:
                skipped_count += 1
                continue

            file_count += 1
            if full_rel_path in cache:
                # The file changed: drop its old points first, otherwise
                # chunks beyond the new chunk count would stay in the index.
                _delete_file_points(client, full_rel_path)

            chunks = chunk_text(content, full_rel_path)
            for i, chunk in enumerate(chunks):
                # Deterministic id: re-indexing the same chunk overwrites it.
                chunk_id = f"{full_rel_path}_{i}"
                q_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, chunk_id))
                vector = _get_embedding(chunk)
                payload = {
                    "path": full_rel_path,
                    "filename": file,
                    "extension": ext,
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    "content": chunk,
                    "type": "codebase"
                }
                points.append(models.PointStruct(id=q_id, vector=vector, payload=payload))
                chunk_count += 1

                # Send points in batches of 50.
                if len(points) >= 50:
                    client.upsert(collection_name=COLLECTION_NAME, points=points)
                    points = []

            print(f" 🚀 [{file_count:3d}] {full_rel_path} ({chunk_count} chunks)")

    # Flush the last, partially filled batch.
    if points:
        client.upsert(collection_name=COLLECTION_NAME, points=points)

    # Remove points of files that were indexed before but not seen this run.
    for stale_path in cache:
        if stale_path not in new_cache:
            _delete_file_points(client, stale_path)
            removed_count += 1

    save_cache(new_cache)
    print("\n✨ Indexation terminée !")
    print(f" 📂 Fichiers nouveaux/modifiés : {file_count}")
    print(f" ⏭️ Fichiers déjà à jour : {skipped_count}")
    print(f" 🗑️ Fichiers retirés de l'index : {removed_count}")
    print(f" 🧩 Total chunks indexés : {chunk_count}")
# Script entry point: run one incremental indexing pass when executed directly.
if __name__ == "__main__":
    index_codebase()