import os
from typing import List, Dict, Tuple

import numpy as np
import pypdf
import faiss
import torch
from sentence_transformers import SentenceTransformer, CrossEncoder


class DocumentProcessor:
    """Static helpers for loading raw text from files and splitting it into chunks."""

    @staticmethod
    def extract_text(file_path: str) -> str:
        """Extract plain text from a .pdf or .txt file.

        Unknown extensions return "" rather than raising.
        """
        ext = os.path.splitext(file_path)[1].lower()
        if ext == '.pdf':
            with open(file_path, 'rb') as f:
                reader = pypdf.PdfReader(f)
                text = ""
                for page in reader.pages:
                    page_text = page.extract_text()
                    # extract_text() may return None/"" for image-only pages.
                    if page_text:
                        text += page_text + "\n"
                return text
        elif ext == '.txt':
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        else:
            return ""

    @staticmethod
    def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[Dict]:
        """Split *text* into overlapping character-based chunks.

        A simple sliding window over characters (ideally this would be
        token- or sentence-based). Each chunk dict has keys
        'id', 'text', 'start_char', 'end_char'.

        Returns an empty list for empty input.
        """
        chunks: List[Dict] = []
        text_len = len(text)
        start = 0
        chunk_id = 0
        while start < text_len:
            end = min(start + chunk_size, text_len)
            piece = text[start:end]
            # Prefer to cut at the last period/newline, but only if that
            # keeps more than half the window (avoids tiny fragments).
            if end < text_len:
                last_period = piece.rfind('.')
                last_newline = piece.rfind('\n')
                break_point = max(last_period, last_newline)
                if break_point != -1 and break_point > chunk_size * 0.5:
                    end = start + break_point + 1
                    piece = text[start:end]
            chunks.append({
                'id': chunk_id,
                'text': piece.strip(),
                'start_char': start,
                'end_char': end,
            })
            chunk_id += 1
            # BUG FIX: the original advanced `start = end - overlap`
            # unconditionally. Once `end` reached text_len, `start` was
            # reset to text_len - overlap on every iteration, so the loop
            # re-emitted the tail chunk forever (the old
            # `if start >= text_len: break` guard could never fire with
            # overlap > 0). Stop as soon as the text is fully consumed,
            # and always move forward even for pathological parameters
            # (e.g. overlap >= chunk_size).
            if end >= text_len:
                break
            start = max(end - overlap, start + 1)
        return chunks


class EmbeddingEngine:
    """Wraps a SentenceTransformer model and produces L2-normalized embeddings."""

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        # SentenceTransformer auto-detects the device, but being explicit
        # keeps behavior predictable (CPU fallback when no CUDA).
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = SentenceTransformer(model_name, device=device)

    def encode(self, texts: List[str]) -> np.ndarray:
        """Encode *texts* into unit-norm vectors (one row per text).

        Vectors are normalized in place so that inner product equals
        cosine similarity in the FAISS index.
        """
        embeddings = self.model.encode(texts, convert_to_numpy=True)
        faiss.normalize_L2(embeddings)
        return embeddings


class VectorStore:
    """Thin wrapper around a flat FAISS inner-product index."""

    def __init__(self, dimension: int):
        self.dimension = dimension
        # Inner product on normalized vectors == cosine similarity.
        self.index = faiss.IndexFlatIP(dimension)

    def add(self, embeddings: np.ndarray) -> None:
        """Append embedding rows to the index."""
        self.index.add(embeddings)

    def search(self, query_embeddings: np.ndarray, k: int = 5) -> Tuple[np.ndarray, np.ndarray]:
        """Return (scores, indices) of the k nearest neighbors per query row."""
        return self.index.search(query_embeddings, k)


class SemanticAnalyzer:
    """Detects near-duplicate and contradictory chunk pairs across documents."""

    # Similarity gates for the pairwise analysis (same values as before,
    # named for readability).
    MIN_SIMILARITY = 0.5        # below this, neighbors are ignored entirely
    NLI_SIMILARITY = 0.65       # similar enough to be about the same topic
    DUPLICATE_THRESHOLD = 0.95  # near-identical text

    def __init__(self):
        self.embedding_engine = EmbeddingEngine()
        # NLI cross-encoder for contradiction detection. Per the model card
        # for cross-encoder/nli-distilroberta-base, the label mapping is
        # {'contradiction': 0, 'entailment': 1, 'neutral': 2}.
        self.nli_model = CrossEncoder('cross-encoder/nli-distilroberta-base')

    def analyze_documents(self, file_paths: List[str]) -> Dict:
        """Run the full pipeline over *file_paths*.

        Steps: load & chunk each file, embed all chunks, index them,
        then scan nearest neighbors for duplicates and contradictions.

        Returns a dict with 'duplicates', 'contradictions' and 'stats',
        or {'error': ...} when no text could be extracted.
        """
        # 1. Load and chunk every document, tagging each chunk with a
        #    globally unique id and its source filename.
        all_chunks: List[Dict] = []
        global_chunk_id = 0
        for fpath in file_paths:
            fname = os.path.basename(fpath)
            raw_text = DocumentProcessor.extract_text(fpath)
            for c in DocumentProcessor.chunk_text(raw_text):
                c['global_id'] = global_chunk_id
                c['source'] = fname
                all_chunks.append(c)
                global_chunk_id += 1

        if not all_chunks:
            return {"error": "No text extracted"}

        # 2. Embed all chunk texts in one batch.
        texts = [c['text'] for c in all_chunks]
        embeddings = self.embedding_engine.encode(texts)

        # 3. Build the vector index.
        vector_store = VectorStore(embeddings.shape[1])
        vector_store.add(embeddings)

        results: Dict = {
            "duplicates": [],
            "contradictions": [],
            "stats": {
                "total_docs": len(file_paths),
                "total_chunks": len(all_chunks),
            },
        }

        # 4. For every chunk, inspect its nearest neighbors.
        k = min(10, len(all_chunks))
        D, I = vector_store.search(embeddings, k)

        checked_pairs = set()
        # Candidate pairs for NLI, collected first so the cross-encoder
        # runs ONCE over all pairs (batched) instead of once per pair.
        nli_candidates: List[Tuple[float, Dict, Dict]] = []

        for i in range(len(all_chunks)):
            for rank, j in enumerate(I[i]):
                j = int(j)
                # Skip self-matches and FAISS padding (-1 when fewer
                # than k results are available).
                if j == i or j < 0:
                    continue
                sim_score = float(D[i][rank])
                if sim_score < self.MIN_SIMILARITY:
                    continue  # optimization: ignore low similarity
                # Canonical ordering avoids checking (i, j) and (j, i) twice.
                pair = (min(i, j), max(i, j))
                if pair in checked_pairs:
                    continue
                checked_pairs.add(pair)

                chunk_a = all_chunks[i]
                chunk_b = all_chunks[j]

                if sim_score > self.DUPLICATE_THRESHOLD:
                    # Near duplicate; no need to NLI-check it as well.
                    results["duplicates"].append({
                        "score": sim_score,
                        "chunk_a": chunk_a,
                        "chunk_b": chunk_b,
                    })
                    continue

                if sim_score > self.NLI_SIMILARITY:
                    # Same topic but not identical — worth an NLI check.
                    nli_candidates.append((sim_score, chunk_a, chunk_b))

        if nli_candidates:
            logits = self.nli_model.predict(
                [(a['text'], b['text']) for _, a, b in nli_candidates]
            )
            for (sim_score, chunk_a, chunk_b), row in zip(nli_candidates, logits):
                # Label 0 == contradiction for this model (see __init__).
                if int(np.argmax(row)) == 0:
                    # Softmax the logits so 'confidence' is an actual
                    # probability rather than a raw logit (the original
                    # code flagged this as a TODO).
                    probs = np.exp(row - np.max(row))
                    probs /= probs.sum()
                    results["contradictions"].append({
                        "similarity": sim_score,
                        "confidence": float(probs[0]),
                        "chunk_a": chunk_a,
                        "chunk_b": chunk_b,
                    })

        return results