"""Semantic document analysis pipeline.

Loads PDF/TXT files, chunks their text, embeds the chunks, and uses a FAISS
index plus an NLI cross-encoder to surface near-duplicate and contradictory
passages across documents.
"""
| import os | |
| from typing import List, Dict, Tuple | |
| import pypdf | |
| import numpy as np | |
| import faiss | |
| import torch | |
| from sentence_transformers import SentenceTransformer, CrossEncoder | |
class DocumentProcessor:
    """Stateless helpers: load raw text from files and split it into overlapping chunks."""

    @staticmethod
    def extract_text(file_path: str) -> str:
        """Return the full text of a ``.pdf`` or ``.txt`` file.

        Unsupported extensions yield an empty string rather than raising, so
        callers can feed mixed file lists without pre-filtering.
        """
        ext = os.path.splitext(file_path)[1].lower()
        if ext == '.pdf':
            with open(file_path, 'rb') as f:
                reader = pypdf.PdfReader(f)
                text = ""
                for page in reader.pages:
                    page_text = page.extract_text()
                    # extract_text() may return None/"" for image-only pages.
                    if page_text:
                        text += page_text + "\n"
                return text
        elif ext == '.txt':
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        else:
            return ""

    @staticmethod
    def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[Dict]:
        """Split *text* into character-based chunks with *overlap* characters of context.

        Returns a list of dicts with 'id', 'text', 'start_char', 'end_char'.
        Chunking is a simple sliding window over characters; ideally it would
        be token- or sentence-based.
        """
        chunks = []
        text_len = len(text)
        start = 0
        chunk_id = 0
        while start < text_len:
            end = min(start + chunk_size, text_len)
            piece = text[start:end]
            # Prefer to cut at the last sentence/line boundary, but only if that
            # keeps the chunk reasonably large (> half the target size).
            if end < text_len:
                break_point = max(piece.rfind('.'), piece.rfind('\n'))
                if break_point != -1 and break_point > chunk_size * 0.5:
                    end = start + break_point + 1
                    piece = text[start:end]
            chunks.append({
                'id': chunk_id,
                'text': piece.strip(),
                'start_char': start,
                'end_char': end,
            })
            chunk_id += 1
            # BUG FIX: the original recomputed start = end - overlap even after the
            # window reached end-of-text, so start never hit text_len and the loop
            # re-emitted the final window forever. Stop once the text is consumed.
            if end >= text_len:
                break
            # Guarantee forward progress even for degenerate overlap >= chunk_size.
            start = max(end - overlap, start + 1)
        return chunks
class EmbeddingEngine:
    """Thin wrapper around a SentenceTransformer that emits L2-normalized vectors."""

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        # Prefer the GPU when available; SentenceTransformer usually auto-detects,
        # but we pin the device explicitly.
        self.model = SentenceTransformer(
            model_name,
            device='cuda' if torch.cuda.is_available() else 'cpu',
        )

    def encode(self, texts: List[str]) -> np.ndarray:
        """Embed *texts*; vectors are normalized in place so that inner product
        in FAISS equals cosine similarity."""
        vectors = self.model.encode(texts, convert_to_numpy=True)
        faiss.normalize_L2(vectors)
        return vectors
class VectorStore:
    """Flat FAISS index; with L2-normalized inputs, inner product == cosine similarity."""

    def __init__(self, dimension: int):
        self.dimension = dimension
        self.index = faiss.IndexFlatIP(dimension)

    def add(self, embeddings: np.ndarray):
        """Append the rows of *embeddings* to the index."""
        self.index.add(embeddings)

    def search(self, query_embeddings: np.ndarray, k: int = 5) -> Tuple[np.ndarray, np.ndarray]:
        """Return (similarities, indices) for the top-*k* neighbors of each query row."""
        return self.index.search(query_embeddings, k)
class SemanticAnalyzer:
    """End-to-end pipeline: load -> chunk -> embed -> flag duplicates and contradictions."""

    def __init__(self):
        self.embedding_engine = EmbeddingEngine()
        # NLI cross-encoder for contradiction detection.
        # NOTE(review): label order assumed to be 0: contradiction, 1: entailment,
        # 2: neutral per the cross-encoder/nli-distilroberta-base model card —
        # verify against the model's config.label2id before relying on it.
        self.nli_model = CrossEncoder('cross-encoder/nli-distilroberta-base')

    def analyze_documents(self, file_paths: List[str]) -> Dict:
        """Analyze *file_paths* and return duplicates, contradictions, and stats.

        Returns ``{"error": ...}`` when no text could be extracted from any file.
        """
        # 1. Load and chunk every document, tagging each chunk with its source.
        all_chunks = []
        global_chunk_id = 0
        for fpath in file_paths:
            fname = os.path.basename(fpath)
            raw_text = DocumentProcessor.extract_text(fpath)
            for c in DocumentProcessor.chunk_text(raw_text):
                c['global_id'] = global_chunk_id
                c['source'] = fname
                all_chunks.append(c)
                global_chunk_id += 1
        if not all_chunks:
            return {"error": "No text extracted"}

        # 2. Embed all chunk texts (L2-normalized by EmbeddingEngine).
        texts = [c['text'] for c in all_chunks]
        embeddings = self.embedding_engine.encode(texts)

        # 3. Build the similarity index.
        vector_store = VectorStore(embeddings.shape[1])
        vector_store.add(embeddings)

        results = {
            "duplicates": [],
            "contradictions": [],
            "stats": {
                "total_docs": len(file_paths),
                "total_chunks": len(all_chunks),
            },
        }

        # 4. For every chunk, inspect its nearest neighbors (k capped at corpus size).
        D, I = vector_store.search(embeddings, k=min(10, len(all_chunks)))
        checked_pairs = set()
        nli_candidates = []  # (similarity, chunk_a, chunk_b) pending one batched NLI pass
        for i in range(len(all_chunks)):
            for rank, j in enumerate(I[i]):
                if i == j:
                    continue  # skip self-match
                sim_score = D[i][rank]
                if sim_score < 0.5:
                    continue  # optimization: ignore low similarity
                # Canonical ordering avoids re-checking (i, j) as (j, i).
                pair = tuple(sorted((i, j)))
                if pair in checked_pairs:
                    continue
                checked_pairs.add(pair)
                chunk_a = all_chunks[i]
                chunk_b = all_chunks[j]
                # Near-duplicate: > 0.95 cosine similarity. A duplicate pair is
                # not also checked for contradiction — identical text can't contradict.
                if sim_score > 0.95:
                    results["duplicates"].append({
                        "score": float(sim_score),
                        "chunk_a": chunk_a,
                        "chunk_b": chunk_b,
                    })
                    continue
                # Same topic (high similarity) but not identical -> NLI candidate.
                if sim_score > 0.65:
                    nli_candidates.append((sim_score, chunk_a, chunk_b))

        # PERF FIX: the original called nli_model.predict() once per pair inside
        # the loop; one batched call scores all candidates in a single forward pass.
        if nli_candidates:
            scores = self.nli_model.predict(
                [(a['text'], b['text']) for _, a, b in nli_candidates]
            )
            for (sim_score, chunk_a, chunk_b), logits in zip(nli_candidates, scores):
                if logits.argmax() == 0:  # assumed contradiction label — see __init__ note
                    results["contradictions"].append({
                        "similarity": float(sim_score),
                        # Raw contradiction logit; apply softmax if a probability is needed.
                        "confidence": float(logits[0]),
                        "chunk_a": chunk_a,
                        "chunk_b": chunk_b,
                    })
        return results