|
|
|
|
|
""" |
|
|
RAG memory build script.
|
|
|
|
|
Usage example:
|
|
python scripts/build_rag_memory.py \ |
|
|
--source-dir rag_memory/source_docs \ |
|
|
--chunks-path rag_memory/chunks.jsonl \ |
|
|
--index-dir rag_memory/index \ |
|
|
--embedding-model sentence-transformers/all-MiniLM-L6-v2 \ |
|
|
--chunk-size 500 \ |
|
|
--chunk-overlap 100 \ |
|
|
--batch-size 16 \ |
|
|
--rebuild |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
import datetime |
|
|
import hashlib |
|
|
import json |
|
|
import logging |
|
|
import os |
|
|
import re |
|
|
import shutil |
|
|
import sys |
|
|
import uuid |
|
|
from dataclasses import dataclass |
|
|
from pathlib import Path |
|
|
from typing import Dict, Iterable, List, Optional, Sequence, Tuple |
|
|
|
|
|
try: |
|
|
import pdfplumber |
|
|
except ImportError: |
|
|
pdfplumber = None |
|
|
|
|
|
try: |
|
|
import docx |
|
|
except ImportError: |
|
|
docx = None |
|
|
|
|
|
try: |
|
|
from sentence_transformers import SentenceTransformer |
|
|
except ImportError: |
|
|
SentenceTransformer = None |
|
|
|
|
|
try: |
|
|
import chromadb |
|
|
from chromadb.utils import embedding_functions |
|
|
except ImportError: |
|
|
chromadb = None |
|
|
embedding_functions = None |
|
|
|
|
|
|
|
|
LOGGER = logging.getLogger("build_rag_memory") |
|
|
|
|
|
|
|
|
@dataclass
class Document:
    """One loaded source file: extracted text plus file-level metadata."""

    # Path of the originating file under the source directory.
    source_path: Path
    # Full extracted text (whitespace-normalized by load_documents()).
    text: str
    # String-valued file facts: filename, extension,
    # modified_at (ISO timestamp), filesize (byte count as str).
    metadata: Dict[str, str]
|
|
|
|
|
|
|
|
@dataclass
class Chunk:
    """A slice of a Document's text, addressable by a stable id."""

    # "<12-hex sha1 of source path>-<4-digit chunk index>" (see split_into_chunks).
    id: str
    # The stripped text slice for this chunk.
    text: str
    # String form of the originating file's path.
    source_path: str
    # Copy of the document metadata plus chunk_index and source_path.
    metadata: Dict[str, str]
|
|
|
|
|
|
|
|
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Build the command-line interface and parse *argv* (sys.argv when None)."""
    cli = argparse.ArgumentParser(description="Build local RAG memory from source documents.")
    cli.add_argument("--source-dir", default="rag_memory/source_docs",
                     help="Directory containing raw documents.")
    cli.add_argument("--chunks-path", default="rag_memory/chunks.jsonl",
                     help="Output JSONL file for text chunks.")
    cli.add_argument("--index-dir", default="rag_memory/index",
                     help="Persistent directory for vector index.")
    cli.add_argument("--embedding-model", default="sentence-transformers/all-MiniLM-L6-v2",
                     help="SentenceTransformer model name.")
    cli.add_argument("--chunk-size", type=int, default=500,
                     help="Character length of each chunk.")
    cli.add_argument("--chunk-overlap", type=int, default=100,
                     help="Character overlap between chunks.")
    cli.add_argument("--batch-size", type=int, default=16,
                     help="Batch size for embedding generation.")
    cli.add_argument("--rebuild", action="store_true",
                     help="Rebuild chunks and index from scratch (existing files will be removed).")
    cli.add_argument("--collection-name", default="echolalia_rag",
                     help="ChromaDB collection name.")
    return cli.parse_args(argv)
|
|
|
|
|
|
|
|
def ensure_dependencies() -> None:
    """Verify required libraries are importable; warn about optional ones.

    Raises:
        RuntimeError: if sentence-transformers or chromadb is missing.
    """
    required_missing = []
    if SentenceTransformer is None:
        required_missing.append("sentence-transformers")
    if chromadb is None:
        required_missing.append("chromadb")

    # PDF/DOCX extraction degrades gracefully: warn, don't fail.
    if pdfplumber is None:
        LOGGER.warning("pdfplumber not installed: PDF extraction will be skipped.")
    if docx is None:
        LOGGER.warning("python-docx not installed: DOCX extraction will be skipped.")

    if required_missing:
        listed = ", ".join(required_missing)
        installable = " ".join(required_missing)
        raise RuntimeError(
            f"Missing required dependencies: {listed}. Please install them via `pip install {installable}`"
        )
|
|
|
|
|
|
|
|
def normalize_text(text: str) -> str:
    """Drop zero-width spaces, collapse all runs of whitespace to one space, and strip."""
    without_zwsp = text.replace("\u200b", " ")
    return re.sub(r"\s+", " ", without_zwsp).strip()
|
|
|
|
|
|
|
|
def extract_text_from_pdf(path: Path) -> Optional[str]:
    """Extract page texts from a PDF, newline-joined.

    Returns None when pdfplumber is unavailable or extraction fails.
    """
    if pdfplumber is None:
        return None
    try:
        with pdfplumber.open(str(path)) as pdf:
            # extract_text() may return None for image-only pages.
            page_texts = [page.extract_text() or "" for page in pdf.pages]
        return "\n".join(page_texts)
    except Exception as exc:
        LOGGER.error("Failed to extract PDF %s: %s", path, exc)
        return None
|
|
|
|
|
|
|
|
def extract_text_from_docx(path: Path) -> Optional[str]:
    """Extract paragraph texts from a DOCX file, newline-joined.

    Returns None when python-docx is unavailable or extraction fails.
    """
    if docx is None:
        return None
    try:
        parsed = docx.Document(str(path))
        return "\n".join(paragraph.text for paragraph in parsed.paragraphs)
    except Exception as exc:
        LOGGER.error("Failed to extract DOCX %s: %s", path, exc)
        return None
|
|
|
|
|
|
|
|
def extract_text_from_txt(path: Path) -> Optional[str]:
    """Read a plain-text file as UTF-8, falling back to latin-1.

    Returns None if the fallback read also fails.
    """
    try:
        return path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        pass
    LOGGER.warning("UTF-8 decoding failed for %s, trying latin-1.", path)
    try:
        # latin-1 maps every byte, so only I/O errors can fail here.
        return path.read_text(encoding="latin-1")
    except Exception as exc:
        LOGGER.error("Failed to read text file %s: %s", path, exc)
        return None
|
|
|
|
|
|
|
|
def load_documents(source_dir: Path) -> List[Document]:
    """Recursively load and extract text from all supported files in *source_dir*.

    Supported extensions: .pdf (pdfplumber), .docx (python-docx),
    .txt / .md (plain text). Files of other types, and files that yield
    no text, are skipped with a log message.

    Returns:
        One Document (normalized text + metadata) per extracted file,
        in sorted path order.
    """
    documents: List[Document] = []
    if not source_dir.exists():
        LOGGER.warning("Source directory %s does not exist. No documents to process.", source_dir)
        return documents

    for file_path in sorted(source_dir.rglob("*")):
        if not file_path.is_file():
            continue
        ext = file_path.suffix.lower()
        text: Optional[str] = None
        if ext == ".pdf":
            text = extract_text_from_pdf(file_path)
        elif ext == ".docx":
            text = extract_text_from_docx(file_path)
        elif ext in {".txt", ".md"}:
            text = extract_text_from_txt(file_path)
        else:
            LOGGER.info("Skipping unsupported file type: %s", file_path)
            continue

        if not text:
            LOGGER.warning("No text extracted from %s", file_path)
            continue

        text = normalize_text(text)
        # stat() once per file instead of twice (mtime and size from one call).
        stat_result = file_path.stat()
        metadata = {
            "filename": file_path.name,
            "extension": ext,
            "modified_at": datetime.datetime.fromtimestamp(stat_result.st_mtime).isoformat(),
            "filesize": str(stat_result.st_size),
        }
        documents.append(Document(source_path=file_path, text=text, metadata=metadata))
        LOGGER.info("Loaded document %s (%s chars)", file_path, len(text))

    return documents
|
|
|
|
|
|
|
|
def split_into_chunks(doc: Document, chunk_size: int, chunk_overlap: int) -> List[Chunk]:
    """Slice a document's text into overlapping fixed-size character windows.

    Each non-empty (after strip) window becomes a Chunk whose id combines a
    12-hex sha1 prefix of the source path with a zero-padded running index.

    Raises:
        ValueError: on non-positive chunk_size, negative overlap, or
            overlap >= chunk_size (which would loop forever).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive.")
    if chunk_overlap < 0:
        raise ValueError("chunk_overlap must be non-negative.")
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size.")

    body = doc.text
    total = len(body)
    id_prefix = hashlib.sha1(str(doc.source_path).encode("utf-8")).hexdigest()[:12]
    produced: List[Chunk] = []
    position = 0
    counter = 0

    while position < total:
        stop = min(position + chunk_size, total)
        piece = body[position:stop].strip()
        if piece:
            piece_meta = dict(doc.metadata)
            piece_meta["chunk_index"] = str(counter)
            piece_meta["source_path"] = str(doc.source_path)
            produced.append(
                Chunk(
                    id=f"{id_prefix}-{counter:04d}",
                    text=piece,
                    source_path=str(doc.source_path),
                    metadata=piece_meta,
                )
            )
            counter += 1
        if stop == total:
            break
        # Overlap the tail of this window with the head of the next.
        position = stop - chunk_overlap

    return produced
|
|
|
|
|
|
|
|
def serialize_chunks(chunks: Sequence[Chunk], path: Path, rebuild: bool) -> None:
    """Write chunks as JSON Lines to *path*.

    When rebuild is True the file is rewritten from scratch; otherwise new
    records are appended. Parent directories are created as needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Mode "w" truncates on open, so the old explicit unlink on rebuild
    # was redundant and has been removed.
    mode = "w" if rebuild else "a"
    with path.open(mode, encoding="utf-8") as f:
        for chunk in chunks:
            record = {
                "id": chunk.id,
                "text": chunk.text,
                "source_path": chunk.source_path,
                "metadata": chunk.metadata,
            }
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
|
|
|
|
|
|
|
|
def load_existing_chunk_ids(path: Path) -> set:
    """Collect the chunk ids already recorded in the JSONL file at *path*.

    A missing file yields an empty set; lines that are not valid JSON are
    silently skipped.
    """
    seen: set = set()
    if not path.exists():
        return seen
    with path.open("r", encoding="utf-8") as handle:
        for raw_line in handle:
            try:
                record = json.loads(raw_line)
            except json.JSONDecodeError:
                continue
            if "id" in record:
                seen.add(record["id"])
    return seen
|
|
|
|
|
|
|
|
def embed_chunks(chunks: Sequence[Chunk], model_name: str, batch_size: int) -> Tuple[List[str], List[str], List[Dict[str, str]], List[List[float]]]:
    """Encode chunk texts with a SentenceTransformer model.

    Returns:
        Parallel lists (ids, texts, metadatas, embeddings) ready to be
        upserted into the vector index.

    Raises:
        RuntimeError: if sentence-transformers is not installed.
    """
    if SentenceTransformer is None:
        raise RuntimeError("sentence-transformers is not installed.")

    ids = [c.id for c in chunks]
    texts = [c.text for c in chunks]
    metadatas = [c.metadata for c in chunks]

    LOGGER.info("Loading embedding model %s", model_name)
    encoder = SentenceTransformer(model_name)
    LOGGER.info("Embedding %d chunks (batch_size=%d)", len(texts), batch_size)
    vectors = encoder.encode(texts, batch_size=batch_size, convert_to_numpy=True, show_progress_bar=True)
    return ids, texts, metadatas, vectors.tolist()
|
|
|
|
|
|
|
|
def build_index(
    ids: Sequence[str],
    texts: Sequence[str],
    metadatas: Sequence[Dict[str, str]],
    embeddings: Sequence[Sequence[float]],
    index_dir: Path,
    collection_name: str,
    rebuild: bool,
) -> None:
    """Upsert precomputed embeddings into a persistent ChromaDB collection.

    The first four arguments are the parallel lists produced by
    embed_chunks(). When rebuild is True, the whole index directory is wiped
    before indexing.

    Raises:
        RuntimeError: if chromadb is not installed.
    """
    if chromadb is None:
        raise RuntimeError("chromadb is not installed.")

    # Wipe first, then create. (Previously the directory was created only to
    # be immediately rmtree'd again on rebuild.)
    if rebuild and index_dir.exists():
        shutil.rmtree(index_dir)
    index_dir.mkdir(parents=True, exist_ok=True)

    client = chromadb.PersistentClient(path=str(index_dir))

    # No delete_collection call is needed on rebuild: the persistent store
    # was just wiped above, so the client starts with no collections.
    collection = client.get_or_create_collection(name=collection_name)

    if ids:
        LOGGER.info("Adding %d embeddings to collection %s", len(ids), collection_name)
        collection.upsert(
            ids=list(ids),
            documents=list(texts),
            metadatas=list(metadatas),
            embeddings=list(embeddings),
        )
    else:
        LOGGER.info("No embeddings to add.")
|
|
|
|
|
|
|
|
def process_documents(args: argparse.Namespace) -> None:
    """Run the full pipeline: load, chunk, persist, embed, and index.

    When --rebuild is not set, chunk ids already present in the JSONL file
    are skipped so repeated runs only add new material.
    """
    ensure_dependencies()

    source_dir = Path(args.source_dir)
    chunks_path = Path(args.chunks_path)
    index_dir = Path(args.index_dir)

    documents = load_documents(source_dir)
    if not documents:
        LOGGER.warning("No documents processed. Exiting.")
        return

    known_ids: set = set()
    if not args.rebuild:
        known_ids = load_existing_chunk_ids(chunks_path)
        LOGGER.info("Loaded %d existing chunk ids.", len(known_ids))

    fresh_chunks: List[Chunk] = []
    for document in documents:
        for piece in split_into_chunks(document, args.chunk_size, args.chunk_overlap):
            if piece.id in known_ids:
                LOGGER.info("Skipping existing chunk id %s", piece.id)
            else:
                fresh_chunks.append(piece)

    if not fresh_chunks:
        LOGGER.info("No new chunks generated. Nothing to update.")
        return

    LOGGER.info("Generated %d new chunks.", len(fresh_chunks))
    serialize_chunks(fresh_chunks, chunks_path, args.rebuild)

    ids, texts, metadatas, embeddings = embed_chunks(fresh_chunks, args.embedding_model, args.batch_size)
    build_index(ids, texts, metadatas, embeddings, index_dir, args.collection_name, args.rebuild)
    LOGGER.info("RAG memory build completed successfully.")
|
|
|
|
|
|
|
|
def setup_logging() -> None:
    """Configure the root logger once: INFO level with timestamped lines."""
    line_format = "%(asctime)s | %(levelname)s | %(message)s"
    logging.basicConfig(
        level=logging.INFO,
        format=line_format,
        datefmt="%Y-%m-%d %H:%M:%S",
    )
|
|
|
|
|
|
|
|
def main(argv: Optional[Sequence[str]] = None) -> None:
    """Script entry point: configure logging, parse args, run the pipeline.

    Exits with status 1 (after logging the traceback) on any failure.
    """
    setup_logging()
    namespace = parse_args(argv)
    LOGGER.info("Starting RAG memory build with args: %s", namespace)
    try:
        process_documents(namespace)
    except Exception as exc:
        LOGGER.exception("RAG memory build failed: %s", exc)
        sys.exit(1)
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|