File size: 7,899 Bytes
253246d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import os
from typing import List, Dict, Tuple
import pypdf
import numpy as np
import faiss
import torch
from sentence_transformers import SentenceTransformer, CrossEncoder

class DocumentProcessor:
    """Static helpers for loading raw text from files and splitting it into chunks."""

    @staticmethod
    def extract_text(file_path: str) -> str:
        """Extract plain text from a ``.pdf`` or ``.txt`` file.

        Returns an empty string for unsupported extensions. PDF pages whose
        ``extract_text()`` yields ``None``/empty are skipped.
        """
        ext = os.path.splitext(file_path)[1].lower()
        if ext == '.pdf':
            with open(file_path, 'rb') as f:
                reader = pypdf.PdfReader(f)
                text = ""
                for page in reader.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
            return text
        elif ext == '.txt':
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        else:
            return ""

    @staticmethod
    def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[Dict]:
        """Split *text* into overlapping character-window chunks.

        Uses a simple character-based sliding window (ideally this would be
        token- or sentence-based). Each chunk prefers to end at the last
        period or newline in the second half of the window, for cleaner cuts.

        Args:
            text: The input text.
            chunk_size: Maximum characters per chunk.
            overlap: Characters shared between consecutive chunks.

        Returns:
            A list of dicts with 'id', 'text' (stripped), 'start_char', 'end_char'.

        Raises:
            ValueError: If ``overlap >= chunk_size`` (the window could never advance).
        """
        if overlap >= chunk_size:
            raise ValueError("overlap must be smaller than chunk_size")

        chunks = []
        text_len = len(text)
        start = 0
        chunk_id = 0

        while start < text_len:
            end = min(start + chunk_size, text_len)
            piece = text[start:end]

            # Try to cut at the last newline or period to be cleaner, but only
            # if the break point falls in the second half of the window.
            if end < text_len:
                break_point = max(piece.rfind('.'), piece.rfind('\n'))
                if break_point > chunk_size * 0.5:
                    end = start + break_point + 1
                    piece = text[start:end]

            chunks.append({
                'id': chunk_id,
                'text': piece.strip(),
                'start_char': start,
                'end_char': end
            })

            # BUG FIX: the original computed start = end - overlap unconditionally,
            # so once end == text_len the window never advanced (infinite loop for
            # any text shorter than chunk_size, or on the final chunk).
            if end >= text_len:
                break
            start = end - overlap
            chunk_id += 1

        return chunks

class EmbeddingEngine:
    """Thin wrapper around SentenceTransformer producing L2-normalized embeddings."""

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        # SentenceTransformer would usually auto-detect the device, but we
        # pass it explicitly: CUDA when present, CPU otherwise.
        use_cuda = torch.cuda.is_available()
        self.model = SentenceTransformer(model_name, device='cuda' if use_cuda else 'cpu')

    def encode(self, texts: List[str]) -> np.ndarray:
        """Embed *texts* and L2-normalize the rows in place.

        With unit-norm vectors, FAISS inner-product search is equivalent
        to cosine similarity.
        """
        vectors = self.model.encode(texts, convert_to_numpy=True)
        faiss.normalize_L2(vectors)
        return vectors

class VectorStore:
    """Exact (flat) FAISS index over inner products.

    Callers are expected to add L2-normalized vectors, in which case the
    inner-product scores returned by :meth:`search` are cosine similarities.
    """

    def __init__(self, dimension: int):
        self.dimension = dimension
        # Flat index: brute-force, exact nearest-neighbor search.
        self.index = faiss.IndexFlatIP(dimension)

    def add(self, embeddings: np.ndarray):
        """Append the rows of *embeddings* to the index."""
        self.index.add(embeddings)

    def search(self, query_embeddings: np.ndarray, k: int = 5) -> Tuple[np.ndarray, np.ndarray]:
        """Return ``(scores, indices)`` of the *k* best matches per query row."""
        scores, indices = self.index.search(query_embeddings, k)
        return scores, indices

class SemanticAnalyzer:
    """Detects near-duplicate and contradictory chunk pairs across documents.

    Pipeline: extract text -> chunk -> embed (normalized) -> FAISS k-NN
    search -> classify each high-similarity pair as duplicate or, via an
    NLI cross-encoder, as a contradiction.
    """

    # Cosine-similarity thresholds (embeddings are L2-normalized upstream).
    MIN_SIMILARITY = 0.5            # below this, skip the pair entirely
    CONTRADICTION_CANDIDATE = 0.65  # above this, run the (expensive) NLI model
    DUPLICATE_THRESHOLD = 0.95      # above this, treat as near-duplicate

    # cross-encoder/nli-distilroberta-base label order per its model card:
    # {'contradiction': 0, 'entailment': 1, 'neutral': 2}
    CONTRADICTION_LABEL = 0

    def __init__(self):
        self.embedding_engine = EmbeddingEngine()
        # NLI model for contradiction detection, loaded eagerly.
        self.nli_model = CrossEncoder('cross-encoder/nli-distilroberta-base')

    def analyze_documents(self, file_paths: List[str]) -> Dict:
        """Run the full analysis pipeline over *file_paths*.

        Returns a dict with 'duplicates', 'contradictions' and 'stats' keys,
        or ``{"error": ...}`` when no text could be extracted.
        """
        all_chunks = self._load_chunks(file_paths)
        if not all_chunks:
            return {"error": "No text extracted"}

        # Embed all chunks and build an exact inner-product index.
        embeddings = self.embedding_engine.encode([c['text'] for c in all_chunks])
        vector_store = VectorStore(embeddings.shape[1])
        vector_store.add(embeddings)

        results = {
            "duplicates": [],
            "contradictions": [],
            "stats": {
                "total_docs": len(file_paths),
                "total_chunks": len(all_chunks)
            }
        }

        # For every chunk, examine its nearest neighbors (k capped by corpus size).
        D, I = vector_store.search(embeddings, k=min(10, len(all_chunks)))

        checked_pairs = set()
        for i in range(len(all_chunks)):
            for rank, j in enumerate(I[i]):
                if i == j:
                    continue  # skip self-match

                sim_score = float(D[i][rank])
                if sim_score < self.MIN_SIMILARITY:
                    continue  # optimization: ignore low similarity

                # Canonicalize (i, j) so each unordered pair is checked once.
                pair = tuple(sorted((i, int(j))))
                if pair in checked_pairs:
                    continue
                checked_pairs.add(pair)

                self._classify_pair(all_chunks[i], all_chunks[j], sim_score, results)

        return results

    def _load_chunks(self, file_paths: List[str]) -> List[Dict]:
        """Extract and chunk every file, tagging chunks with a global id and source name."""
        all_chunks = []
        global_chunk_id = 0
        for fpath in file_paths:
            fname = os.path.basename(fpath)
            raw_text = DocumentProcessor.extract_text(fpath)
            for c in DocumentProcessor.chunk_text(raw_text):
                c['global_id'] = global_chunk_id
                c['source'] = fname
                all_chunks.append(c)
                global_chunk_id += 1
        return all_chunks

    def _classify_pair(self, chunk_a: Dict, chunk_b: Dict, sim_score: float,
                       results: Dict) -> None:
        """Classify one chunk pair as duplicate or contradiction, appending to *results*."""
        # Near-duplicates: almost identical text; no point running NLI on them.
        if sim_score > self.DUPLICATE_THRESHOLD:
            results["duplicates"].append({
                "score": sim_score,
                "chunk_a": chunk_a,
                "chunk_b": chunk_b
            })
            return

        # High similarity but not identical: same topic, possibly conflicting claims.
        if sim_score > self.CONTRADICTION_CANDIDATE:
            logits = self.nli_model.predict([(chunk_a['text'], chunk_b['text'])])[0]
            if int(np.argmax(logits)) == self.CONTRADICTION_LABEL:
                # BUG FIX: the original reported the raw contradiction logit as
                # "confidence"; convert to a proper probability via softmax.
                probs = np.exp(logits - np.max(logits))
                probs /= probs.sum()
                results["contradictions"].append({
                    "similarity": sim_score,
                    "confidence": float(probs[self.CONTRADICTION_LABEL]),
                    "chunk_a": chunk_a,
                    "chunk_b": chunk_b
                })