File size: 4,323 Bytes
1804a7a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import chromadb
from sentence_transformers import SentenceTransformer, CrossEncoder
import os
import glob
import logging
import uuid
from pypdf import PdfReader
import docx

class KnowledgeBase:
    """Local RAG store.

    Ingests documents from a folder into a persistent ChromaDB collection
    (embedded with a SentenceTransformer) and serves semantic search results
    re-ranked by a CrossEncoder.
    """

    def __init__(self, persist_dir="./data/vector_db", doc_dir="./src/data/docs"):
        """Load models, open (or create) the vector DB, and ingest the doc folder.

        Args:
            persist_dir: Directory for the on-disk Chroma database.
            doc_dir: Directory scanned for documents to ingest.
        """
        print("📚 [RAG] Initializing Knowledge Base 2.5 (Verbose Mode)...")

        self.doc_dir = doc_dir

        # Models (pinned to CPU so the pipeline runs without a GPU).
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
        self.reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2', device='cpu')

        # DB setup — create both directories up front so first run succeeds.
        os.makedirs(persist_dir, exist_ok=True)
        os.makedirs(doc_dir, exist_ok=True)

        self.client = chromadb.PersistentClient(path=persist_dir)
        self.collection = self.client.get_or_create_collection(name="project_a_docs")

        # Run ingestion on startup.
        self.ingest_folder()

    def ingest_folder(self):
        """Scan ``self.doc_dir`` and ingest files not already in the DB.

        Supported formats: .pdf, .docx, and plain text (.txt/.md/.json/.py).
        Files already ingested (matched by ``source`` metadata) are skipped.
        Per-file errors are logged and do not abort the rest of the scan.
        """
        files = glob.glob(os.path.join(self.doc_dir, "*.*"))

        print(f"📂 [RAG] Scanning {self.doc_dir}... Found {len(files)} files.")

        for file_path in files:
            filename = os.path.basename(file_path)

            # 1. Skip duplicates (dedup key is the 'source' metadata field).
            existing = self.collection.get(where={"source": filename})
            if existing['ids']:
                # FIX: these log messages printed the literal "(unknown)"
                # instead of the actual file name, making them useless.
                print(f"   ℹ️  [Cache] Already in DB: {filename}")
                continue

            # 2. Extract text based on extension (case-insensitive).
            text = ""
            ext = os.path.splitext(filename)[1].lower()

            try:
                if ext == ".pdf":
                    reader = PdfReader(file_path)
                    # extract_text() may return None for image-only pages.
                    text = "\n".join([page.extract_text() or "" for page in reader.pages])

                elif ext == ".docx":
                    doc = docx.Document(file_path)
                    text = "\n".join([para.text for para in doc.paragraphs])

                elif ext in [".txt", ".md", ".json", ".py"]:
                    with open(file_path, "r", encoding="utf-8", errors='ignore') as f:
                        text = f.read()

                else:
                    print(f"   ⚠️  Unsupported Format: {filename} ({ext})")
                    continue

                # 3. Persist only if we actually extracted some text.
                if text.strip():
                    self.add_document(text, source=filename)
                    print(f"   ✅ Learned: {filename}")
                else:
                    print(f"   ⚠️  Empty File (No selectable text): {filename}")

            except Exception as e:
                # Best-effort ingestion: report the failure and move on.
                print(f"   ❌ Error reading {filename}: {e}")

    def add_document(self, text: str, source: str = "manual_entry",
                     *, chunk_size: int = 800, overlap: int = 100):
        """Split *text* into overlapping chunks, embed them, and store them.

        Args:
            text: Raw document text to index.
            source: Label stored as ``source`` metadata on every chunk
                (used by :meth:`ingest_folder` for dedup).
            chunk_size: Characters per chunk (keyword-only; default 800 for
                better context than smaller windows).
            overlap: Characters shared between consecutive chunks.

        Raises:
            ValueError: If ``chunk_size <= overlap`` (the original while-loop
                would have spun forever on that input).
        """
        if chunk_size <= overlap:
            raise ValueError("chunk_size must be greater than overlap")

        step = chunk_size - overlap
        raw_chunks = [text[start:start + chunk_size]
                      for start in range(0, len(text), step)]

        if not raw_chunks:
            return

        # FIX: deterministic f"{source}_{i}" ids collided in Chroma when the
        # same source was added twice via a direct call. A uuid suffix keeps
        # every chunk id unique; dedup still works via 'source' metadata.
        ids = [f"{source}_{uuid.uuid4().hex}" for _ in raw_chunks]
        embeddings = self.embedder.encode(raw_chunks).tolist()
        metadatas = [{"source": source} for _ in raw_chunks]

        self.collection.add(
            documents=raw_chunks,
            embeddings=embeddings,
            metadatas=metadatas,
            ids=ids,
        )

    def search(self, query: str, top_k=3):
        """Two-stage retrieval: vector search, then cross-encoder re-ranking.

        Args:
            query: Natural-language search query.
            top_k: Maximum number of chunks to return.

        Returns:
            The top chunks (score > 0) joined by "\\n---\\n", or ``None``
            when nothing relevant is found or the collection is empty.
        """
        # FIX: guard the empty-collection case (some chromadb versions raise
        # when n_results exceeds the element count) and stop silently capping
        # results when callers ask for top_k > 10.
        count = self.collection.count()
        if count == 0:
            return None
        pool_size = min(max(10, top_k), count)

        query_vec = self.embedder.encode([query]).tolist()
        results = self.collection.query(query_embeddings=query_vec, n_results=pool_size)

        candidates = results['documents'][0]
        if not candidates:
            return None

        # Re-rank the candidate pool with the cross-encoder.
        pairs = [[query, doc] for doc in candidates]
        scores = self.reranker.predict(pairs)
        scored_docs = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)

        # Keep only positively-scored chunks, best first.
        final_docs = [doc for doc, score in scored_docs if score > 0][:top_k]
        return "\n---\n".join(final_docs) if final_docs else None