import gradio as gr
from transformers import AutoTokenizer, pipeline
import torch
import faiss
import numpy as np
import json
import requests
import io
import PyPDF2
import docx
import re
from typing import List, Dict, Any, Optional
import logging
from sentence_transformers import SentenceTransformer
import time
from dataclasses import dataclass
import hashlib
from fastapi import FastAPI, Request, Header
from fastapi.responses import JSONResponse
import warnings
from urllib.parse import urlparse
import os
import uvicorn

warnings.filterwarnings('ignore')

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create FastAPI app for API endpoints
app = FastAPI(title="Enhanced Single Document QA API", description="Single document AI query system")

# API endpoint for the hackathon webhook. `enhanced_system` is instantiated further
# down in this module; the route path below is assumed from the function name.
@app.post("/hackrx/run")
async def hackrx_run(
    request: Request,
    authorization: Optional[str] = Header(default=None),
    x_webhook_secret: Optional[str] = Header(default=None)
):
    try:
        data = await request.json()
        documents = data.get("documents")
        questions = data.get("questions")

        if not documents or not questions:
            return JSONResponse(status_code=400, content={"error": "Missing 'documents' or 'questions'"})
        if not isinstance(questions, list) or not all(isinstance(q, str) for q in questions):
            return JSONResponse(status_code=400, content={"error": "'questions' must be a list of strings"})

        # Accept either a single URL string or a list of URLs (only the first is used)
        if isinstance(documents, list):
            document_url = documents[0]
        else:
            document_url = documents

        # Step 1: Process the document
        doc_result = enhanced_system.process_document_optimized(document_url)
        if not doc_result.get("success"):
            return JSONResponse(content={"error": doc_result.get("error")}, status_code=500)

        # Step 2: Answer the questions
        batch_result = enhanced_system.process_batch_queries_optimized(questions)
        answers = batch_result.get("answers", [])

        return JSONResponse(content={"answers": answers}, status_code=200)

    except Exception as e:
        logger.error(f"API Error: {str(e)}")
        return JSONResponse(content={"error": str(e)}, status_code=500)

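# Illustrative request/response shape for the endpoint above (an assumption based on
# the handler code, not part of the original source): the caller posts a document URL
# plus a list of questions and receives a JSON body with an "answers" list in the
# same order as the questions.
#
#   POST /hackrx/run
#   {
#     "documents": "https://example.com/insurance-policy.pdf",
#     "questions": ["What is the grace period?", "Is maternity covered?"]
#   }
#
#   Response: {"answers": ["The grace period for premium payment is 30 days.", "..."]}
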
@dataclass
class DocumentChunk:
    """Document chunk structure with source tracking"""
    text: str
    section: str
    page: int
    chunk_id: int
    word_count: int
    has_numbers: bool
    has_dates: bool
    importance_score: float
    context_window: str = ""

class EnhancedDocumentProcessor:
    """Enhanced document processor for single document processing"""

    def __init__(self):
        self.cache = {}
        self.max_cache_size = 5

    def _get_cache_key(self, content: bytes) -> str:
        return hashlib.md5(content[:1000]).hexdigest()

    def extract_pdf_optimized(self, file_content: bytes, source_url: str = "") -> Dict[str, Any]:
        """Optimized PDF extraction with better text cleaning"""
        cache_key = self._get_cache_key(file_content)
        if cache_key in self.cache:
            return self.cache[cache_key].copy()

        try:
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_content))
            pages_content = []
            all_text = ""

            for page_num, page in enumerate(pdf_reader.pages):
                try:
                    page_text = page.extract_text()
                    if page_text:
                        cleaned_text = self._clean_text_comprehensive(page_text)
                        if len(cleaned_text.strip()) > 30:
                            pages_content.append({
                                'page_num': page_num + 1,
                                'text': cleaned_text,
                                'word_count': len(cleaned_text.split())
                            })
                            all_text += " " + cleaned_text
                except Exception as e:
                    logger.warning(f"Error extracting page {page_num}: {e}")
                    continue

            result = {
                'pages': pages_content,
                'full_text': all_text.strip(),
                'total_pages': len(pages_content),
                'total_words': len(all_text.split()),
                'source_url': source_url
            }

            # Cache management: evict the oldest entry when the cache is full
            if len(self.cache) >= self.max_cache_size:
                self.cache.pop(next(iter(self.cache)))
            self.cache[cache_key] = result

            logger.info(f"PDF extracted: {len(pages_content)} pages, {len(all_text.split())} words")
            return result

        except Exception as e:
            logger.error(f"PDF extraction error: {e}")
            return {'pages': [], 'full_text': '', 'total_pages': 0, 'total_words': 0, 'source_url': source_url}

    def extract_docx_optimized(self, file_content: bytes, source_url: str = "") -> Dict[str, Any]:
        """Optimized DOCX extraction"""
        try:
            doc = docx.Document(io.BytesIO(file_content))
            full_text = ""
            paragraphs = []

            for para in doc.paragraphs:
                if para.text.strip():
                    cleaned_text = self._clean_text_comprehensive(para.text)
                    if len(cleaned_text.strip()) > 10:
                        paragraphs.append(cleaned_text)
                        full_text += " " + cleaned_text

            result = {
                'pages': [{'page_num': 1, 'text': full_text, 'word_count': len(full_text.split())}],
                'full_text': full_text.strip(),
                'total_pages': 1,
                'total_words': len(full_text.split()),
                'paragraphs': paragraphs,
                'source_url': source_url
            }

            logger.info(f"DOCX extracted: {len(paragraphs)} paragraphs, {len(full_text.split())} words")
            return result

        except Exception as e:
            logger.error(f"DOCX extraction error: {e}")
            return {'pages': [], 'full_text': '', 'total_pages': 0, 'total_words': 0, 'source_url': source_url}

    def _clean_text_comprehensive(self, text: str) -> str:
        """Comprehensive text cleaning for better processing"""
        if not text:
            return ""

        # Basic cleaning - preserve more content
        text = re.sub(r'\s+', ' ', text.strip())

        # Fix spacing around punctuation
        text = re.sub(r'\s+([.,:;!?])', r'\1', text)
        text = re.sub(r'([.!?])\s*([A-Z])', r'\1 \2', text)

        # Preserve insurance terminology
        text = re.sub(r'(\d+)\s*months?', r'\1 months', text, flags=re.IGNORECASE)
        text = re.sub(r'(\d+)\s*days?', r'\1 days', text, flags=re.IGNORECASE)
        text = re.sub(r'(\d+)\s*years?', r'\1 years', text, flags=re.IGNORECASE)

        # Normalize common insurance terms
        text = re.sub(r'Rs\.?\s*(\d+)', r'Rs. \1', text, flags=re.IGNORECASE)
        text = re.sub(r'grace\s+period', 'grace period', text, flags=re.IGNORECASE)
        text = re.sub(r'waiting\s+period', 'waiting period', text, flags=re.IGNORECASE)

        return text.strip()

class EnhancedChunker:
    """Enhanced chunking with better context preservation"""

    def __init__(self, chunk_size: int = 300, overlap: int = 75, min_chunk_size: int = 80):
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.min_chunk_size = min_chunk_size

    def create_smart_chunks(self, structured_content: Dict[str, Any]) -> List[DocumentChunk]:
        """Create optimized chunks with better context preservation"""
        chunks = []
        chunk_id = 0
        full_text = structured_content.get('full_text', '')

        if not full_text:
            return chunks

        logger.info(f"Creating chunks from text of length: {len(full_text)}")

        # Split by sentences first for better coherence
        sentences = re.split(r'(?<=[.!?])\s+', full_text)
        sentences = [s.strip() for s in sentences if s.strip()]
        logger.info(f"Split into {len(sentences)} sentences")

        current_chunk = ""
        current_words = 0

        for i, sentence in enumerate(sentences):
            sentence_words = len(sentence.split())

            # If adding this sentence would exceed the chunk size and we have content
            if current_words + sentence_words > self.chunk_size and current_chunk:
                if current_words >= self.min_chunk_size:
                    chunk = self._create_chunk(current_chunk.strip(), chunk_id, 1, "Document")
                    chunks.append(chunk)
                    chunk_id += 1

                # Start the new chunk with overlap taken from the preceding sentences
                overlap_sentences = []
                temp_words = 0
                j = 0
                while j < min(3, len(sentences) - i) and temp_words < self.overlap:
                    if i - j - 1 >= 0:
                        prev_sentence = sentences[i - j - 1]
                        sentence_len = len(prev_sentence.split())
                        if temp_words + sentence_len <= self.overlap:
                            overlap_sentences.insert(0, prev_sentence)
                            temp_words += sentence_len
                        j += 1
                    else:
                        break

                current_chunk = " ".join(overlap_sentences) + " " + sentence if overlap_sentences else sentence
                current_words = len(current_chunk.split())
            else:
                if current_chunk:
                    current_chunk += " " + sentence
                else:
                    current_chunk = sentence
                current_words += sentence_words

        # Add the final chunk
        if current_chunk.strip() and current_words >= self.min_chunk_size:
            chunk = self._create_chunk(current_chunk.strip(), chunk_id, 1, "Document")
            chunks.append(chunk)

        logger.info(f"Created {len(chunks)} chunks")

        # If no chunks were created, fall back to a single chunk from the full text
        if not chunks and full_text.strip():
            chunk = self._create_chunk(full_text.strip(), 0, 1, "Document")
            chunks.append(chunk)
            logger.info("Created fallback chunk from full text")

        return chunks

    def _create_chunk(self, text: str, chunk_id: int, page_num: int, section: str) -> DocumentChunk:
        """Create a document chunk with enhanced metadata"""
        return DocumentChunk(
            text=text,
            section=section,
            page=page_num,
            chunk_id=chunk_id,
            word_count=len(text.split()),
            has_numbers=bool(re.search(r'\d', text)),
            has_dates=bool(re.search(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b', text)),
            importance_score=self._calculate_importance(text)
        )

    def _calculate_importance(self, text: str) -> float:
        """Calculate an importance score for a chunk"""
        score = 1.0
        text_lower = text.lower()

        # Enhanced keyword matching for insurance documents
        high_value_terms = [
            'grace period', 'waiting period', 'premium payment', 'sum insured',
            'coverage amount', 'maternity', 'co-payment', 'deductible', 'exclusion',
            'benefit', 'claim', 'policy', 'thirty days', '30 days', 'months', 'years'
        ]
        insurance_terms = [
            'premium', 'coverage', 'policy', 'benefit', 'exclusion', 'inclusion',
            'hospital', 'treatment', 'medical', 'health', 'cashless', 'reimbursement'
        ]

        # Calculate keyword-based scores
        high_value_count = sum(1 for term in high_value_terms if term in text_lower)
        insurance_count = sum(1 for term in insurance_terms if term in text_lower)
        score += high_value_count * 0.5
        score += insurance_count * 0.2

        # Boost for numerical information
        if re.search(r'\d+\s*(days?|months?|years?)', text_lower):
            score += 0.4
        if re.search(r'grace\s*period', text_lower):
            score += 0.6
        if re.search(r'waiting\s*period', text_lower):
            score += 0.5

        return min(score, 5.0)

class DeploymentReadyQASystem:
    """Deployment-ready QA system using only CPU-friendly models"""

    def __init__(self):
        self.qa_pipeline = None
        self.tokenizer = None
        self.initialize_models()

    def initialize_models(self):
        """Initialize only lightweight, deployment-friendly models"""
        try:
            logger.info("Loading deployment-ready QA model...")
            self.qa_pipeline = pipeline(
                "question-answering",
                model="deepset/minilm-uncased-squad2",
                tokenizer="deepset/minilm-uncased-squad2",
                device=-1,  # Force CPU
                framework="pt",
                max_answer_len=100,
                max_question_len=64,
                max_seq_len=384,
                doc_stride=128
            )
            self.tokenizer = self.qa_pipeline.tokenizer
            logger.info("QA model loaded successfully for deployment")
        except Exception as e:
            logger.error(f"Failed to load QA model: {e}")
            # Complete fallback - pattern-based extraction only
            self.qa_pipeline = None
            self.tokenizer = None

    def generate_answer(self, question: str, context: str, top_chunks: List[DocumentChunk]) -> Dict[str, Any]:
        """Generate an answer with comprehensive fallback strategies"""
        start_time = time.time()

        try:
            logger.info(f"Processing question: {question[:50]}...")

            # Enhanced pattern-based extraction (primary method)
            direct_answer = self._extract_comprehensive_answer(question, context)
            if direct_answer and len(direct_answer.strip()) > 3:
                logger.info(f"Pattern-based answer: {direct_answer[:50]}...")
                return {
                    'answer': direct_answer,
                    'confidence': 0.95,
                    'reasoning': "Direct pattern extraction from document",
                    'processing_time': time.time() - start_time,
                    'source_chunks': len(top_chunks)
                }

            # Try the QA model if available and the context is reasonable
            if self.qa_pipeline and len(context.strip()) > 10:
                try:
                    # Limit context and question length for better performance
                    limited_context = context[:2000]
                    limited_question = question[:100]

                    logger.info("Trying QA model...")
                    result = self.qa_pipeline(
                        question=limited_question,
                        context=limited_context
                    )

                    if result and result.get('answer') and result.get('score', 0) > 0.1:
                        answer = result['answer'].strip()
                        if len(answer) > 3 and not answer.lower().startswith('the answer is'):
                            logger.info(f"QA model answer: {answer[:50]}...")
                            return {
                                'answer': answer,
                                'confidence': min(0.9, result['score'] + 0.2),
                                'reasoning': f"QA model extraction (confidence: {result['score']:.2f})",
                                'processing_time': time.time() - start_time,
                                'source_chunks': len(top_chunks)
                            }
                except Exception as e:
                    logger.warning(f"QA model failed: {e}")

            # Enhanced fuzzy matching
            fuzzy_answer = self._fuzzy_answer_extraction(question, context)
            if fuzzy_answer:
                logger.info(f"Fuzzy answer: {fuzzy_answer[:50]}...")
                return {
                    'answer': fuzzy_answer,
                    'confidence': 0.75,
                    'reasoning': "Fuzzy pattern matching",
                    'processing_time': time.time() - start_time,
                    'source_chunks': len(top_chunks)
                }

            # Context search with better sentence selection
            context_answer = self._advanced_context_search(question, context)
            if context_answer:
                return {
                    'answer': context_answer,
                    'confidence': 0.6,
                    'reasoning': "Advanced context search",
                    'processing_time': time.time() - start_time,
                    'source_chunks': len(top_chunks)
                }

            # Final fallback - best matching sentence from the most important chunk
            if top_chunks:
                best_chunk = max(top_chunks, key=lambda x: x.importance_score)
                sentences = re.split(r'[.!?]+', best_chunk.text)
                for sentence in sentences:
                    if len(sentence.strip()) > 20 and any(word in sentence.lower() for word in question.lower().split()):
                        return {
                            'answer': sentence.strip() + ".",
                            'confidence': 0.4,
                            'reasoning': "Best matching content from document",
                            'processing_time': time.time() - start_time,
                            'source_chunks': len(top_chunks)
                        }

            return {
                'answer': "I could not find specific information about this in the document.",
                'confidence': 0.0,
                'reasoning': "No relevant information found",
                'processing_time': time.time() - start_time,
                'source_chunks': len(top_chunks)
            }

        except Exception as e:
            logger.error(f"Answer generation error: {e}")
            return {
                'answer': "There was an error processing your question. Please try rephrasing it.",
                'confidence': 0.0,
                'reasoning': f"Processing error: {str(e)}",
                'processing_time': time.time() - start_time,
                'source_chunks': len(top_chunks)
            }

    def _extract_comprehensive_answer(self, question: str, context: str) -> Optional[str]:
        """Enhanced pattern-based extraction with more comprehensive patterns"""
        if not context or not question:
            return None

        question_lower = question.lower().strip()
        context_lower = context.lower()
        logger.info(f"Pattern extraction for: {question_lower}")

        # Grace period patterns - most comprehensive
        if any(term in question_lower for term in ['grace period', 'grace', 'premium payment delay']):
            grace_patterns = [
                # Direct patterns
                r'grace period[^.]*?(\d+)\s*days?',
                r'(\d+)\s*days?[^.]*?grace period',
                r'grace period[^.]*?thirty\s*\(?30\)?\s*days?',
                r'thirty\s*\(?30\)?\s*days?[^.]*?grace',
                # Premium-related patterns
                r'premium.*?(\d+)\s*days?.*?grace',
                r'premium.*?grace.*?(\d+)\s*days?',
                r'payment.*?grace.*?(\d+)\s*days?',
                # More flexible patterns
                r'(\d+)\s*days?.*?premium.*?payment',
                r'pay.*?within.*?(\d+)\s*days?',
                r'(\d+)\s*days?.*?after.*?due',
            ]
            for pattern in grace_patterns:
                matches = re.finditer(pattern, context_lower, re.IGNORECASE)
                for match in matches:
                    groups = match.groups()
                    for group in groups:
                        if group and (group.isdigit() or group in ['thirty', 'fifteen']):
                            number = group if group.isdigit() else ('30' if group == 'thirty' else '15')
                            return f"The grace period for premium payment is {number} days."

            # Special case for "thirty days" spelled out without a digit
            if 'thirty' in context_lower and 'days' in context_lower:
                return "The grace period for premium payment is 30 days."

        # Waiting period patterns
        if any(term in question_lower for term in ['waiting period', 'waiting', 'wait']):
            waiting_patterns = [
                r'waiting period[^.]*?(\d+)\s*(days?|months?|years?)',
                r'(\d+)\s*(months?|years?)[^.]*?waiting period',
                r'wait[^.]*?(\d+)\s*(months?|years?)',
                r'(\d+)\s*(months?|years?)[^.]*?wait',
                r'coverage.*?after.*?(\d+)\s*(months?|years?)',
                r'(\d+)\s*(months?|years?).*?before.*?cover',
            ]
            for pattern in waiting_patterns:
                matches = re.finditer(pattern, context_lower, re.IGNORECASE)
                for match in matches:
                    if len(match.groups()) >= 2:
                        number = match.group(1)
                        unit = match.group(2)
                        if number and number.isdigit():
                            return f"The waiting period is {number} {unit}."

        # Maternity coverage
        if 'maternity' in question_lower:
            maternity_context = self._extract_sentence_with_term(context, 'maternity')
            if maternity_context:
                if any(word in maternity_context.lower() for word in ['covered', 'included', 'benefit', 'eligible']):
                    return "Yes, maternity benefits are covered under this policy."
                elif any(word in maternity_context.lower() for word in ['excluded', 'not covered', 'not eligible']):
                    return "No, maternity benefits are not covered under this policy."

        # Coverage/benefit questions
        if any(word in question_lower for word in ['covered', 'cover', 'include', 'benefit']):
            # Extract the main subject from the question
            question_terms = re.findall(r'\b\w{4,}\b', question_lower)
            for term in question_terms:
                if term not in ['what', 'does', 'this', 'policy', 'cover', 'include', 'benefit']:
                    sentence = self._extract_sentence_with_term(context, term)
                    if sentence:
                        if any(word in sentence.lower() for word in ['covered', 'included', 'benefit']):
                            return f"Yes, {term} is covered under this policy."
                        elif any(word in sentence.lower() for word in ['excluded', 'not covered']):
                            return f"No, {term} is not covered under this policy."

        return None

    def _extract_sentence_with_term(self, context: str, term: str) -> Optional[str]:
        """Extract the first sentence containing a specific term"""
        sentences = re.split(r'[.!?]+', context)
        for sentence in sentences:
            if term.lower() in sentence.lower() and len(sentence.strip()) > 20:
                return sentence.strip()
        return None

    def _fuzzy_answer_extraction(self, question: str, context: str) -> Optional[str]:
        """Enhanced fuzzy matching with better accuracy"""
        question_lower = question.lower()
        context_lower = context.lower()

        # Grace period fuzzy matching
        if any(word in question_lower for word in ['grace', 'payment delay', 'premium due']):
            # Look for a number + "days" combination
            day_patterns = [
                r'(\d+)\s*days?',
                r'thirty\s*days?',
                r'fifteen\s*days?'
            ]
            for pattern in day_patterns:
                matches = re.finditer(pattern, context_lower)
                for match in matches:
                    # Check the context around the match
                    start = max(0, match.start() - 50)
                    end = min(len(context_lower), match.end() + 50)
                    surrounding = context_lower[start:end]
                    if any(word in surrounding for word in ['grace', 'premium', 'payment', 'due']):
                        # Guard against patterns without a capture group before reading group(1)
                        if match.groups() and match.group(1) and match.group(1).isdigit():
                            return f"The grace period is {match.group(1)} days."
                        elif 'thirty' in match.group(0):
                            return "The grace period is 30 days."
                        elif 'fifteen' in match.group(0):
                            return "The grace period is 15 days."

        # Yes/No questions with better context
        if question_lower.startswith(('is', 'does', 'are', 'will')):
            # Extract key terms from the question
            question_words = set(re.findall(r'\b\w{4,}\b', question_lower))
            question_words.discard('this')
            question_words.discard('policy')
            question_words.discard('coverage')

            # Find sentences containing these terms
            sentences = re.split(r'[.!?]+', context)
            for sentence in sentences:
                sentence_lower = sentence.lower()
                sentence_words = set(re.findall(r'\b\w{4,}\b', sentence_lower))

                # Check overlap
                overlap = question_words.intersection(sentence_words)
                if len(overlap) >= 1:  # At least one significant word overlap
                    if any(word in sentence_lower for word in ['yes', 'covered', 'included', 'eligible', 'benefit']):
                        return "Yes, this is covered under the policy."
                    elif any(word in sentence_lower for word in ['no', 'not covered', 'excluded', 'not eligible']):
                        return "No, this is not covered under the policy."

        return None

    def _advanced_context_search(self, question: str, context: str) -> Optional[str]:
        """Advanced context search with better sentence ranking"""
        if not context or not question:
            return None

        question_lower = question.lower()
        context_sentences = [s.strip() for s in re.split(r'[.!?]+', context) if len(s.strip()) > 15]

        # Extract meaningful keywords from the question
        question_keywords = set()
        words = re.findall(r'\b\w+\b', question_lower)
        stop_words = {'what', 'is', 'the', 'are', 'does', 'do', 'how', 'when', 'where', 'why', 'which', 'who', 'a', 'an', 'for', 'under', 'this'}
        for word in words:
            if len(word) > 2 and word not in stop_words:
                question_keywords.add(word)

        if not question_keywords:
            return None

        # Score sentences by keyword overlap plus pattern bonuses
        scored_sentences = []
        for sentence in context_sentences:
            sentence_lower = sentence.lower()
            sentence_words = set(re.findall(r'\b\w+\b', sentence_lower))

            # Calculate the overlap score
            overlap = question_keywords.intersection(sentence_words)
            score = len(overlap)

            # Bonus for specific patterns
            if re.search(r'\d+\s*(days?|months?|years?)', sentence_lower):
                score += 2
            if any(term in sentence_lower for term in ['grace period', 'waiting period', 'coverage', 'benefit']):
                score += 1.5
            if any(term in sentence_lower for term in ['premium', 'policy', 'insurance']):
                score += 0.5

            if score > 0:
                scored_sentences.append((score, sentence))

        # Return the best sentence if it scores highly enough
        if scored_sentences:
            scored_sentences.sort(key=lambda x: x[0], reverse=True)
            best_score, best_sentence = scored_sentences[0]
            if best_score >= 2:  # Require at least 2 points
                cleaned = best_sentence.strip()
                if not cleaned.endswith('.'):
                    cleaned += '.'
                return cleaned

        return None

class EnhancedSingleDocumentSystem:
    """Enhanced system optimized for deployment"""

    def __init__(self):
        self.doc_processor = EnhancedDocumentProcessor()
        self.chunker = EnhancedChunker()
        self.qa_system = DeploymentReadyQASystem()
        self.embedding_model = None
        self.index = None
        self.document_chunks = []
        self.chunk_embeddings = None
        self.document_processed = False
        self.initialize_embeddings()

    def initialize_embeddings(self):
        """Initialize the embedding model with better error handling"""
        try:
            # Use a small, reliable embedding model
            self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
            self.embedding_model.max_seq_length = 256
            logger.info("Embedding model loaded: all-MiniLM-L6-v2")
        except Exception as e:
            logger.error(f"Embedding model error: {e}")
            try:
                # Even smaller fallback
                self.embedding_model = SentenceTransformer('paraphrase-MiniLM-L3-v2')
                logger.info("Loaded smaller embedding model")
            except Exception as e2:
                logger.error(f"All embedding models failed: {e2}")
                raise RuntimeError(f"No embedding model could be loaded: {str(e2)}")

    def process_document_optimized(self, url: str) -> Dict[str, Any]:
        """Process a single document with better error handling"""
        start_time = time.time()

        try:
            logger.info(f"Processing document: {url}")

            # Download the document with retries
            response = self._download_with_retry(url)
            if not response:
                return {'success': False, 'error': f'Failed to download document from {url}'}

            logger.info(f"Downloaded document, size: {len(response.content)} bytes")

            # Determine the document type and extract text
            content_type = response.headers.get('content-type', '').lower()
            logger.info(f"Content type: {content_type}")

            if 'pdf' in content_type or url.lower().endswith('.pdf'):
                structured_content = self.doc_processor.extract_pdf_optimized(response.content, url)
            elif 'docx' in content_type or url.lower().endswith('.docx'):
                structured_content = self.doc_processor.extract_docx_optimized(response.content, url)
            else:
                # Fall back to treating the payload as plain text
                try:
                    text_content = response.content.decode('utf-8', errors='ignore')
                    structured_content = {
                        'pages': [{'page_num': 1, 'text': text_content, 'word_count': len(text_content.split())}],
                        'full_text': text_content,
                        'total_pages': 1,
                        'total_words': len(text_content.split()),
                        'source_url': url
                    }
                    logger.info("Processed as text document")
                except Exception as e:
                    return {'success': False, 'error': f'Unsupported document type or encoding error: {str(e)}'}

            full_text = structured_content.get('full_text', '')
            logger.info(f"Extracted text length: {len(full_text)}")

            if not full_text or len(full_text.strip()) < 50:
                return {'success': False, 'error': 'No meaningful text content could be extracted from the document'}

            # Create optimized chunks
            self.document_chunks = self.chunker.create_smart_chunks(structured_content)
            if not self.document_chunks:
                return {'success': False, 'error': 'No meaningful content chunks could be created from the document'}

            # Create embeddings for the chunks
            chunk_texts = [chunk.text for chunk in self.document_chunks]
            try:
                logger.info("Creating embeddings...")
                self.chunk_embeddings = self.embedding_model.encode(
                    chunk_texts,
                    batch_size=4,
                    show_progress_bar=False,
                    convert_to_numpy=True,
                    normalize_embeddings=True
                )

                # Build a FAISS inner-product index over the normalized embeddings
                dimension = self.chunk_embeddings.shape[1]
                self.index = faiss.IndexFlatIP(dimension)
                self.index.add(self.chunk_embeddings.astype('float32'))
                logger.info(f"Created FAISS index with {len(self.document_chunks)} chunks")
            except Exception as e:
                logger.error(f"Embedding creation failed: {e}")
                return {'success': False, 'error': f'Embedding creation failed: {str(e)}'}

            self.document_processed = True
            processing_time = time.time() - start_time
            logger.info(f"Document processed successfully: {len(self.document_chunks)} chunks in {processing_time:.2f}s")

            return {
                'success': True,
                'total_chunks': len(self.document_chunks),
                'total_words': structured_content.get('total_words', 0),
                'total_pages': structured_content.get('total_pages', 0),
                'processing_time': processing_time
            }

        except Exception as e:
            logger.error(f"Document processing error: {e}")
            return {'success': False, 'error': str(e)}

    def _download_with_retry(self, url: str, max_retries: int = 3) -> Optional[requests.Response]:
        """Download a document with retry logic and exponential backoff"""
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        for attempt in range(max_retries):
            try:
                logger.info(f"Download attempt {attempt + 1} for {url}")
                response = requests.get(url, headers=headers, timeout=30, stream=True)
                response.raise_for_status()
                return response
            except Exception as e:
                logger.warning(f"Download attempt {attempt + 1} failed for {url}: {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)
        return None

    def semantic_search_optimized(self, query: str, top_k: int = 8) -> List[DocumentChunk]:
        """Enhanced semantic search with better relevance scoring"""
        if not self.index or not self.document_chunks or not self.document_processed:
            logger.warning("Document not processed or index not available")
            return []

        try:
            logger.info(f"Searching for: {query}")

            # Create the query embedding
            query_embedding = self.embedding_model.encode([query], normalize_embeddings=True)

            # Search for candidates (twice top_k, capped at the number of chunks)
            search_k = min(top_k * 2, len(self.document_chunks))
            scores, indices = self.index.search(query_embedding.astype('float32'), search_k)

            # Enhanced scoring with keyword matching
            query_lower = query.lower()
            boosted_results = []
            query_keywords = self._extract_query_keywords(query_lower)
            logger.info(f"Query keywords: {query_keywords}")

            for score, idx in zip(scores[0], indices[0]):
                if 0 <= idx < len(self.document_chunks):
                    chunk = self.document_chunks[idx]
                    chunk_text_lower = chunk.text.lower()

                    # Base semantic score
                    boosted_score = float(score)

                    # Keyword matching boost
                    keyword_matches = sum(1 for keyword in query_keywords if keyword in chunk_text_lower)
                    boosted_score += keyword_matches * 0.3

                    # Importance score boost
                    boosted_score += chunk.importance_score * 0.1

                    # Exact phrase matching boost
                    if 'grace period' in query_lower and 'grace period' in chunk_text_lower:
                        boosted_score += 0.5
                    if 'waiting period' in query_lower and 'waiting period' in chunk_text_lower:
                        boosted_score += 0.5

                    # Number matching boost
                    query_numbers = re.findall(r'\d+', query_lower)
                    chunk_numbers = re.findall(r'\d+', chunk_text_lower)
                    number_matches = len(set(query_numbers).intersection(set(chunk_numbers)))
                    boosted_score += number_matches * 0.2

                    logger.info(f"Chunk {idx}: base_score={score:.3f}, boosted={boosted_score:.3f}, keywords={keyword_matches}")
                    boosted_results.append((boosted_score, idx, chunk))

            # Sort by boosted score and select the top results
            boosted_results.sort(key=lambda x: x[0], reverse=True)
            top_chunks = []
            for score, idx, chunk in boosted_results[:top_k]:
                logger.info(f"Selected chunk {idx}: score={score:.3f}, text preview: {chunk.text[:100]}...")
                top_chunks.append(chunk)

            return top_chunks

        except Exception as e:
            logger.error(f"Semantic search error: {e}")
            return []

    def _extract_query_keywords(self, query_lower: str) -> List[str]:
        """Extract relevant keywords from the query for boosting"""
        stop_words = {'what', 'is', 'are', 'the', 'a', 'an', 'how', 'when', 'where', 'why', 'which', 'who', 'for', 'under'}
        words = re.findall(r'\b\w+\b', query_lower)
        keywords = [word for word in words if word not in stop_words and len(word) > 2]

        # Add compound terms
        compound_terms = []
        if 'grace' in keywords and 'period' in keywords:
            compound_terms.append('grace period')
        if 'waiting' in keywords and 'period' in keywords:
            compound_terms.append('waiting period')
        if 'premium' in keywords and 'payment' in keywords:
            compound_terms.append('premium payment')
        if 'sum' in keywords and 'insured' in keywords:
            compound_terms.append('sum insured')

        return keywords + compound_terms

    def _build_optimized_context(self, question: str, chunks: List[DocumentChunk], max_length: int = 1500) -> str:
        """Build an optimized context string from the top chunks"""
        if not chunks:
            return ""

        context_parts = []
        current_length = 0

        # Prioritize chunks with higher importance scores
        sorted_chunks = sorted(chunks, key=lambda x: x.importance_score, reverse=True)

        for chunk in sorted_chunks:
            chunk_text = chunk.text
            chunk_length = len(chunk_text)

            if current_length + chunk_length <= max_length:
                context_parts.append(chunk_text)
                current_length += chunk_length
            else:
                # Add a partial chunk if there is meaningful space left
                remaining_space = max_length - current_length
                if remaining_space > 100:
                    truncated = chunk_text[:remaining_space - 3] + "..."
                    context_parts.append(truncated)
                break

        context = " ".join(context_parts)
        logger.info(f"Built context of length: {len(context)}")
        return context

    def process_single_query_optimized(self, question: str) -> Dict[str, Any]:
        """Process a single query with enhanced accuracy"""
        if not self.document_processed or not self.index or not self.document_chunks:
            return {
                'answer': 'No document has been processed yet. Please upload a document first.',
                'confidence': 0.0,
                'reasoning': 'System requires document processing before answering queries.',
                'processing_time': 0,
                'source_chunks': 0
            }

        start_time = time.time()

        try:
            logger.info(f"Processing query: {question}")

            # Get the relevant chunks
            top_chunks = self.semantic_search_optimized(question, top_k=6)
            if not top_chunks:
                logger.warning("No relevant chunks found")
                return {
                    'answer': 'No relevant information found in the document for this question.',
                    'confidence': 0.0,
                    'reasoning': 'No semantically similar content found.',
                    'processing_time': time.time() - start_time,
                    'source_chunks': 0
                }

            # Build a comprehensive context
            context = self._build_optimized_context(question, top_chunks)
            logger.info(f"Context preview: {context[:200]}...")

            # Generate the answer
            result = self.qa_system.generate_answer(question, context, top_chunks)
            logger.info(f"Generated answer: {result['answer']}")
            return result

        except Exception as e:
            logger.error(f"Query processing error: {e}")
            return {
                'answer': f'Error processing question: {str(e)}',
                'confidence': 0.0,
                'reasoning': f'Processing error occurred: {str(e)}',
                'processing_time': time.time() - start_time,
                'source_chunks': 0
            }

    def process_batch_queries_optimized(self, questions: List[str]) -> Dict[str, Any]:
        """Process multiple questions efficiently"""
        start_time = time.time()
        answers = []

        if not self.document_processed:
            return {
                'answers': ['No document has been processed yet. Please upload a document first.'] * len(questions),
                'processing_time': time.time() - start_time
            }

        for i, question in enumerate(questions):
            logger.info(f"Processing question {i + 1}/{len(questions)}: {question}")
            result = self.process_single_query_optimized(question)
            answers.append(result['answer'])

        total_time = time.time() - start_time
        logger.info(f"Batch processing completed: {len(questions)} questions in {total_time:.2f}s")

        return {
            'answers': answers,
            'processing_time': total_time
        }

# Initialize the enhanced system (loads the embedding and QA models once at startup)
enhanced_system = EnhancedSingleDocumentSystem()

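# Minimal programmatic usage sketch (illustrative only; the URL below is a placeholder):
# process a document once, then query it directly or in a batch.
#
#   doc_result = enhanced_system.process_document_optimized("https://example.com/insurance-policy.pdf")
#   if doc_result.get("success"):
#       single = enhanced_system.process_single_query_optimized("What is the grace period for premium payment?")
#       batch = enhanced_system.process_batch_queries_optimized(["What is the grace period?", "Is maternity covered?"])
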
def process_hackathon_submission(url_text, questions_text):
    """Process a hackathon submission - deployment ready"""
    if not url_text or not questions_text:
        return "Please provide both document URL and questions."

    try:
        # Parse the URL (single document; a JSON array is accepted but only the first entry is used)
        url = url_text.strip()
        if url.startswith('[') and url.endswith(']'):
            urls = json.loads(url)
            url = urls[0] if urls else ""

        if not url:
            return "No valid URL found. Please provide a document URL."

        # Parse the questions (JSON array or one per line)
        if questions_text.strip().startswith('[') and questions_text.strip().endswith(']'):
            questions = json.loads(questions_text)
        else:
            questions = [q.strip() for q in questions_text.split('\n') if q.strip()]

        if not questions:
            return "No valid questions found. Please provide questions as a JSON array or one per line."

        logger.info(f"Processing URL: {url}")
        logger.info(f"Processing questions: {questions}")

        # Process the document
        doc_result = enhanced_system.process_document_optimized(url)
        if not doc_result.get("success"):
            error_msg = f"Document processing failed: {doc_result.get('error')}"
            logger.error(error_msg)
            return json.dumps({"error": error_msg}, indent=2)

        logger.info("Document processed successfully")

        # Process the questions
        batch_result = enhanced_system.process_batch_queries_optimized(questions)

        # Format the response in the hackathon format
        hackathon_response = {
            "answers": batch_result['answers']
        }
        return json.dumps(hackathon_response, indent=2)

    except json.JSONDecodeError as e:
        return f"JSON parsing error: {str(e)}. Please provide valid JSON or line-separated input."
    except Exception as e:
        logger.error(f"Hackathon submission error: {e}")
        return json.dumps({"error": f"Error processing submission: {str(e)}"}, indent=2)

def process_single_question(url_text, question):
    """Process a single question with a detailed response"""
    if not url_text or not question:
        return "Please provide both document URL and question."

    try:
        url = url_text.strip()
        if not url:
            return "No valid URL found. Please provide a document URL."

        logger.info(f"Processing single question - URL: {url}, Question: {question}")

        # Process the document
        doc_result = enhanced_system.process_document_optimized(url)
        if not doc_result.get("success"):
            error_msg = f"Document processing failed: {doc_result.get('error')}"
            logger.error(error_msg)
            return error_msg

        # Process the single question
        result = enhanced_system.process_single_query_optimized(question)

        # Format a detailed response
        detailed_response = {
            "question": question,
            "answer": result['answer'],
            "confidence": result['confidence'],
            "reasoning": result['reasoning'],
            "metadata": {
                "processing_time": f"{result['processing_time']:.2f}s",
                "source_chunks": result['source_chunks'],
                "total_chunks": doc_result.get('total_chunks', 0),
                "document_pages": doc_result.get('total_pages', 0),
                "document_words": doc_result.get('total_words', 0)
            }
        }
        return json.dumps(detailed_response, indent=2)

    except Exception as e:
        logger.error(f"Single question processing error: {e}")
        return f"Error processing question: {str(e)}"

# Wrapper functions for Gradio
def hackathon_wrapper(url_text, questions_text):
    return process_hackathon_submission(url_text, questions_text)

def single_query_wrapper(url_text, question):
    return process_single_question(url_text, question)

# Create the Gradio interface with the default theme for better compatibility
with gr.Blocks(
    theme=gr.themes.Default(),
    title="Enhanced Document QA System"
) as demo:
    gr.Markdown("""
# 🎯 Enhanced Single Document QA System
**Deployment-Ready Insurance Document Analysis**

This system processes PDF and DOCX documents to answer questions accurately.
""")

    with gr.Tab("🚀 Hackathon Mode"):
        gr.Markdown("### Process multiple questions in hackathon format")
        with gr.Row():
            with gr.Column():
                hack_url = gr.Textbox(
                    label="📄 Document URL",
                    placeholder="https://example.com/insurance-policy.pdf",
                    lines=2
                )
                hack_questions = gr.Textbox(
                    label="❓ Questions (JSON format)",
                    placeholder='["What is the grace period?", "Is maternity covered?"]',
                    lines=8
                )
                hack_submit_btn = gr.Button("🚀 Process Questions", variant="primary", size="lg")
            with gr.Column():
                hack_output = gr.Textbox(
                    label="📊 Results",
                    lines=20,
                    interactive=False
                )
        hack_submit_btn.click(
            fn=hackathon_wrapper,
            inputs=[hack_url, hack_questions],
            outputs=[hack_output]
        )

    with gr.Tab("🔍 Single Query"):
        gr.Markdown("### Ask detailed questions about the document")
        with gr.Row():
            with gr.Column():
                single_url = gr.Textbox(
                    label="📄 Document URL",
                    placeholder="https://example.com/insurance-policy.pdf",
                    lines=2
                )
                single_question = gr.Textbox(
                    label="❓ Your Question",
                    placeholder="What is the grace period for premium payment?",
                    lines=3
                )
                single_submit_btn = gr.Button("🔍 Get Answer", variant="primary", size="lg")
            with gr.Column():
                single_output = gr.Textbox(
                    label="📋 Detailed Response",
                    lines=20,
                    interactive=False
                )
        single_submit_btn.click(
            fn=single_query_wrapper,
            inputs=[single_url, single_question],
            outputs=[single_output]
        )

# Mount the Gradio UI onto the FastAPI app so the UI and the API share one server
gradio_app = gr.mount_gradio_app(app, demo, path="/")

if __name__ == "__main__":
    uvicorn.run(
        gradio_app,
        host="0.0.0.0",
        port=7860
    )