# hackrx6.0 / app / main_api.py
# (Hugging Face Space page residue — uploader: rohannsinghal, commit c4c5f40,
#  commit message: "few changes" — kept as comments so the module parses as Python.)
# --- KAGGLE-POWERED RAG SYSTEM - COMPLETE 1144+ LINES WITH DEADLOCK FIX ---
#final version
import os
import json
import uuid
import time
import re
import asyncio
import logging
import hashlib
import httpx
from typing import List, Dict, Any, Optional
from collections import defaultdict
from itertools import cycle
from pathlib import Path
import functools
import threading
import concurrent.futures
# FastAPI and core dependencies
from fastapi import FastAPI, Body, HTTPException, Request, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# LangChain imports
from langchain_community.vectorstores import Chroma
# Multi-format document processing
import fitz # PyMuPDF
import pdfplumber
import docx
import openpyxl
import csv
import zipfile
import email
from email.policy import default
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
# LLM providers
import groq
import openai
import google.generativeai as genai
import cachetools
from dotenv import load_dotenv
# Setup
load_dotenv()  # load API keys / endpoints from a local .env file, if present
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)  # module-wide logger used by every class below
app = FastAPI(title="Kaggle-Powered Hackathon RAG", version="5.4.0")
# Wide-open CORS (any origin, any method) — acceptable for a hackathon demo,
# NOTE(review): not production-safe together with allow_credentials=True.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*", "ngrok-skip-browser-warning"],
)
# --- CRITICAL FIX: LAZY KAGGLE MODEL CLIENT ---
class LazyKaggleModelClient:
    """LAZY INITIALIZATION: Only connects when actually needed - PREVENTS 'Preparing Space' ISSUE.

    The httpx.AsyncClient and the Kaggle endpoint URL are resolved on the first
    call, not at construction, so importing this module never blocks on the
    network. The client is kept for the process lifetime (never closed), which
    is intentional for a long-lived singleton.
    """

    def __init__(self):
        self._client = None        # httpx.AsyncClient, created lazily
        self._endpoint = None      # base URL of the Kaggle model server
        self._initialized = False
        logger.info("🎯 Lazy Kaggle Model Client created (no immediate connection)")

    def _initialize_if_needed(self):
        """Initialize client only when first API call is made.

        Raises:
            RuntimeError: if neither KAGGLE_NGROK_URL nor KAGGLE_ENDPOINT is set.
        """
        if not self._initialized:
            # Get endpoint from Hugging Face Secrets (or fallback to env var)
            self._endpoint = os.getenv("KAGGLE_NGROK_URL") or os.getenv("KAGGLE_ENDPOINT", "")
            if not self._endpoint:
                logger.error("❌ No KAGGLE_NGROK_URL found in secrets or environment!")
                # FIX: raise a specific exception type instead of bare Exception
                raise RuntimeError("Kaggle endpoint not configured")
            self._endpoint = self._endpoint.rstrip('/')
            self._client = httpx.AsyncClient(
                timeout=30.0,
                # ngrok's free tier interposes a browser warning page unless
                # this header is sent.
                headers={"ngrok-skip-browser-warning": "true"}
            )
            self._initialized = True
            logger.info(f"🎯 Lazy Kaggle client initialized: {self._endpoint}")

    async def health_check(self) -> bool:
        """Check if Kaggle model server is healthy (GET /health returns 200)."""
        try:
            self._initialize_if_needed()
            response = await self._client.get(f"{self._endpoint}/health")
            return response.status_code == 200
        except Exception as e:
            logger.error(f"Kaggle health check failed: {e}")
            return False

    async def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings using Kaggle GPU.

        Returns the list of embedding vectors, or an empty list on any failure
        (callers treat [] as "embedding unavailable").
        """
        try:
            self._initialize_if_needed()
            response = await self._client.post(
                f"{self._endpoint}/embed",
                json={"texts": texts}
            )
            response.raise_for_status()
            result = response.json()
            logger.info(f"🎯 Kaggle embeddings: {result.get('count', 0)} texts in {result.get('processing_time', 0):.2f}s")
            return result["embeddings"]
        except Exception as e:
            logger.error(f"Kaggle embedding error: {e}")
            return []

    async def rerank_documents(self, query: str, documents: List[str], k: int = 8) -> List[str]:
        """Rerank documents using Kaggle GPU.

        On failure, degrades gracefully to the first k input documents.
        """
        try:
            self._initialize_if_needed()
            response = await self._client.post(
                f"{self._endpoint}/rerank",
                json={
                    "query": query,
                    "documents": documents,
                    "k": k
                }
            )
            response.raise_for_status()
            result = response.json()
            logger.info(f"🎯 Kaggle reranking: {k} docs in {result.get('processing_time', 0):.2f}s")
            return result["reranked_documents"]
        except Exception as e:
            logger.error(f"Kaggle reranking error: {e}")
            return documents[:k]
# --- LIGHTWEIGHT QUERY PROCESSOR (YOUR COMPLETE ORIGINAL) ---
class LightweightQueryProcessor:
    """Expands user questions with insurance-domain vocabulary and pads
    obviously truncated ("half") questions, memoising results for one hour."""

    def __init__(self, kaggle_client: "LazyKaggleModelClient"):
        self.kaggle_client = kaggle_client
        # TTL cache keyed by a short md5 of the raw question text.
        self.cache = cachetools.TTLCache(maxsize=500, ttl=3600)

    async def enhance_query_semantically(self, question: str, domain: str = "insurance") -> str:
        """Return a domain-expanded version of *question* (cached)."""
        cache_key = hashlib.md5(question.encode()).hexdigest()[:8]
        hit = self.cache.get(cache_key)
        if hit is not None:
            return hit
        # Domain expansion first, then the incomplete-question guard.
        enhanced = self._handle_incomplete_questions(
            self._expand_with_domain_knowledge_fast(question, domain)
        )
        self.cache[cache_key] = enhanced
        return enhanced

    def _expand_with_domain_knowledge_fast(self, query: str, domain: str) -> str:
        """Append related terminology for the first matching key term, if any."""
        key_expansions = {
            'grace period': 'payment deadline premium due',
            'waiting period': 'exclusion time coverage delay',
            'pre-existing': 'prior medical condition',
            'coverage': 'policy benefits protection',
            'exclusion': 'limitations restrictions',
            'premium': 'insurance cost payment',
            'claim': 'benefit request reimbursement',
            'ayush': 'alternative medicine treatment',
            'hospital': 'healthcare facility medical center'
        }
        lowered = query.lower()
        expansion = next(
            (terms for key_term, terms in key_expansions.items() if key_term in lowered),
            None,
        )
        return query if expansion is None else f"{query}. Also: {expansion}"

    def _handle_incomplete_questions(self, query: str) -> str:
        """Handle R4's 'half questions' requirement."""
        incomplete_patterns = [
            r'^(what|how|when|where|why)\s*\?*$',
            r'^(yes|no)\s*\?*$',
            r'^\w{1,3}\s*\?*$',
            r'^(this|that|it)\s*',
        ]
        lowered = query.lower()
        looks_incomplete = False
        for pattern in incomplete_patterns:
            if re.search(pattern, lowered):
                looks_incomplete = True
                break
        # Only pad genuinely tiny queries (two words or fewer).
        if looks_incomplete and len(query.split()) <= 2:
            return f"{query}. Please provide information about insurance policy terms, coverage, exclusions, waiting periods, or benefits."
        return query
# --- ANTI-JAILBREAK SECURITY SYSTEM (YOUR COMPLETE ORIGINAL) ---
class SecurityGuard:
    """Screens questions for prompt-injection attempts and strips markup
    from model answers before they are returned to the caller."""

    def __init__(self):
        # Case-insensitive (input is lowercased first) regexes that indicate
        # an attempt to break out of the document-QA role.
        self.jailbreak_patterns = [
            r'ignore.*previous.*instructions',
            r'act.*as.*different.*character',
            r'generate.*code.*(?:javascript|python|html)',
            r'write.*program',
            r'roleplay.*as',
            r'pretend.*you.*are',
            r'system.*prompt',
            r'override.*settings',
            r'bypass.*restrictions',
            r'admin.*mode',
            r'developer.*mode',
            r'tell.*me.*about.*yourself',
            r'what.*are.*you',
            r'who.*created.*you'
        ]

    def detect_jailbreak(self, text: str) -> bool:
        """Return True when *text* matches any known jailbreak pattern."""
        lowered = text.lower()
        for pattern in self.jailbreak_patterns:
            if re.search(pattern, lowered):
                return True
        return False

    def sanitize_response(self, question: str, answer: str) -> str:
        """Refuse jailbreak questions; otherwise strip script and HTML tags."""
        if self.detect_jailbreak(question):
            return "I can only provide information based on the document content provided. Please ask questions about the document."
        without_scripts = re.sub(r'<script.*?</script>', '', answer, flags=re.DOTALL | re.IGNORECASE)
        # Remove any remaining HTML tags.
        return re.sub(r'<.*?>', '', without_scripts)
# --- MULTI-LLM MANAGER (YOUR COMPLETE ORIGINAL WITH ALL PROVIDERS) ---
class MultiLLMManager:
    """Round-robins API keys per provider and falls back between providers
    (groq -> openai -> gemini, whichever have keys configured)."""

    def __init__(self):
        # Initialize multiple LLM providers with fallback; Groq is primary.
        self.providers = ['groq']
        self.groq_keys = cycle([k.strip() for k in os.getenv("GROQ_API_KEYS", "").split(',') if k.strip()])
        # Optional paid providers (only registered if keys are available)
        openai_keys = [k.strip() for k in os.getenv("OPENAI_API_KEYS", "").split(',') if k.strip()]
        gemini_keys = [k.strip() for k in os.getenv("GEMINI_API_KEYS", "").split(',') if k.strip()]
        if openai_keys:
            self.providers.append('openai')
            self.openai_keys = cycle(openai_keys)
        if gemini_keys:
            self.providers.append('gemini')
            self.gemini_keys = cycle(gemini_keys)
        self.current_provider_index = 0
        logger.info(f"πŸ”‘ Multi-LLM Manager initialized with {len(self.providers)} providers")

    async def get_response(self, prompt: str, max_tokens: int = 900) -> str:
        """Get response with automatic fallback between providers.

        On any provider error, rotates to the next provider and retries, at
        most once per configured provider. Returns an error string (does not
        raise) if every provider fails.
        """
        for attempt in range(len(self.providers)):
            # FIX: resolve the provider name outside the try-block so the
            # except clause below can never hit an unbound local.
            provider = self.providers[self.current_provider_index]
            try:
                if provider == 'groq':
                    return await self._groq_response(prompt, max_tokens)
                elif provider == 'openai':
                    return await self._openai_response(prompt, max_tokens)
                elif provider == 'gemini':
                    return await self._gemini_response(prompt, max_tokens)
            except Exception as e:
                logger.warning(f"{provider} failed: {e}")
                self.current_provider_index = (self.current_provider_index + 1) % len(self.providers)
                continue
        return "Error: All LLM providers failed"

    async def _groq_response(self, prompt: str, max_tokens: int) -> str:
        """Single Groq chat completion using the next key in rotation."""
        key = next(self.groq_keys)
        # --- THE FINAL FIX: BYPASS GROQ PROXY BUG ---
        # Create our own clean HTTP client and pass it to Groq.
        # This bypasses the internal bug that incorrectly handles proxies in HF Spaces.
        clean_http_client = httpx.Client(
            timeout=30.0,
            limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
        )
        try:
            client = groq.Groq(
                api_key=key,
                http_client=clean_http_client  # <-- This bypasses the proxy bug
            )
            # NOTE(review): this is a synchronous HTTP call inside an async
            # method, so it blocks the event loop for the call's duration.
            response = client.chat.completions.create(
                model="llama-3.3-70b-versatile",  # Updated to latest model
                messages=[{"role": "user", "content": prompt}],
                temperature=0.1,
                max_tokens=max_tokens,
                top_p=0.9
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            logger.error(f"Groq response error: {e}")
            raise  # Let the parent handle fallback
        finally:
            # FIX: the httpx.Client was previously never closed, leaking a
            # connection pool on every call.
            clean_http_client.close()

    async def _openai_response(self, prompt: str, max_tokens: int) -> str:
        """Single OpenAI chat completion using the next key in rotation.

        NOTE(review): uses the legacy pre-1.0 `openai.ChatCompletion.acreate`
        API — confirm the pinned openai package version supports it.
        """
        key = next(self.openai_keys)
        openai.api_key = key
        response = await openai.ChatCompletion.acreate(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=max_tokens
        )
        return response.choices[0].message.content.strip()

    async def _gemini_response(self, prompt: str, max_tokens: int) -> str:
        """Single Gemini completion using the next key in rotation.

        NOTE(review): max_tokens is accepted but not forwarded to Gemini —
        the model's own default output limit applies.
        """
        key = next(self.gemini_keys)
        genai.configure(api_key=key)
        model = genai.GenerativeModel('gemini-pro')
        response = await model.generate_content_async(prompt)
        return response.text.strip()
# --- COMPLETE UNIVERSAL DOCUMENT PROCESSOR (ALL YOUR ORIGINAL FEATURES) ---
class UniversalDocumentProcessor:
    """Converts many document formats (PDF, DOCX, XLSX, CSV, TXT, HTML, XML,
    EML, ZIP, JSON) into lists of semantically chunked dicts ready for
    embedding. Results are memoised by content hash in a 30-minute TTL cache.
    """

    def __init__(self):
        # SPEED OPTIMIZATIONS: Reduced limits
        self.chunk_size = 1000   # max characters per chunk (reduced from 1200)
        self.chunk_overlap = 200  # reserved for future use; current chunker does not overlap
        self.max_chunks = 200    # hard cap on chunks per document
        self.max_pages = 18      # PDF pages to read (reduced from 25)
        # Smaller cache for speed
        self.cache = cachetools.TTLCache(maxsize=50, ttl=1800)
        # Supported formats: extension -> async processor coroutine
        self.processors = {
            '.pdf': self.process_pdf,
            '.docx': self.process_docx,
            '.doc': self.process_doc,
            '.xlsx': self.process_excel,
            '.xls': self.process_excel,
            '.csv': self.process_csv,
            '.txt': self.process_text,
            '.html': self.process_html,
            '.xml': self.process_xml,
            '.eml': self.process_email,
            '.zip': self.process_archive,
            '.json': self.process_json
        }
        logger.info("⚑ Speed-Optimized Universal Document Processor initialized")

    def get_file_hash(self, content: bytes) -> str:
        """Generate shorter hash for caching (first 8 hex chars of md5)."""
        return hashlib.md5(content).hexdigest()[:8]

    async def process_document(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Process any document format with optimized caching.

        Dispatches on file extension (or content sniffing when there is no
        extension) and falls back to raw text extraction on any error.
        """
        file_hash = self.get_file_hash(content)
        # Check cache first
        if file_hash in self.cache:
            logger.info(f"πŸ“¦ Cache hit for {os.path.basename(file_path)}")
            return self.cache[file_hash]
        # Detect file type
        file_ext = Path(file_path).suffix.lower()
        if not file_ext:
            file_ext = self._detect_file_type(content)
        # Process based on file type (unknown extensions treated as text)
        processor = self.processors.get(file_ext, self.process_text)
        try:
            chunks = await processor(file_path, content)
            # Cache the result
            self.cache[file_hash] = chunks
            logger.info(f"βœ… Processed {os.path.basename(file_path)}: {len(chunks)} chunks")
            return chunks
        except Exception as e:
            logger.error(f"❌ Processing failed for {file_path}: {e}")
            return self._emergency_text_extraction(content, file_path)

    def _detect_file_type(self, content: bytes) -> str:
        """Detect file type from content magic bytes; defaults to '.txt'."""
        if content.startswith(b'%PDF'):
            return '.pdf'
        elif content.startswith(b'PK'):
            # Both DOCX and ZIP start with the PK zip signature.
            return '.docx' if b'word/' in content[:1000] else '.zip'
        elif content.startswith(b'<html') or content.startswith(b'<!DOCTYPE'):
            return '.html'
        elif content.startswith(b'<?xml'):
            return '.xml'
        else:
            return '.txt'

    # --- SPEED-OPTIMIZED PDF PROCESSING (YOUR COMPLETE ORIGINAL) ---
    async def process_pdf(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Enhanced PDF processing: PyMuPDF text + pdfplumber tables."""
        chunks = []
        temp_path = f"/tmp/{uuid.uuid4().hex[:6]}.pdf"  # Shorter UUID
        with open(temp_path, 'wb') as f:
            f.write(content)
        try:
            # Extract text with PyMuPDF
            doc = fitz.open(temp_path)
            full_text = ""
            # SPEED OPTIMIZATION: Process fewer pages
            for page_num in range(min(len(doc), self.max_pages)):
                page = doc[page_num]
                text = page.get_text()
                if text.strip():
                    full_text += f"\n\nPage {page_num + 1}:\n{self._clean_text(text)}"
            doc.close()
            # OPTIMIZED table extraction
            table_text = await self._extract_pdf_tables_fast(temp_path)
            if table_text:
                full_text += f"\n\n=== TABLES ===\n{table_text}"
            # Create semantic chunks
            chunks = self._create_semantic_chunks(full_text, file_path, "pdf")
        except Exception as e:
            logger.error(f"PDF processing error: {e}")
            chunks = self._emergency_text_extraction(content, file_path)
        finally:
            if os.path.exists(temp_path):
                os.remove(temp_path)
        return chunks

    async def _extract_pdf_tables_fast(self, file_path: str) -> str:
        """SPEED-OPTIMIZED table extraction (markdown-ish rows via pdfplumber)."""
        table_text = ""
        try:
            with pdfplumber.open(file_path) as pdf:
                # SPEED OPTIMIZATION: Fewer pages and tables
                for page_num, page in enumerate(pdf.pages[:10]):  # Reduced from 12
                    tables = page.find_tables()
                    for i, table in enumerate(tables[:1]):  # Only 1 table per page
                        try:
                            table_data = table.extract()
                            if table_data and len(table_data) > 1:
                                table_md = f"\n**Table {i+1} (Page {page_num+1})**\n"
                                for row in table_data[:12]:  # Reduced from 15
                                    if row:
                                        clean_row = [str(cell or "").strip()[:30] for cell in row]
                                        table_md += "| " + " | ".join(clean_row) + " |\n"
                                table_text += table_md + "\n"
                        except Exception:  # FIX: was a bare except (caught SystemExit/KeyboardInterrupt)
                            continue
        except Exception as e:
            logger.warning(f"Table extraction failed: {e}")
        return table_text

    # --- OTHER FORMAT PROCESSORS (ALL YOUR EXCELLENT FEATURES) ---
    async def process_docx(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Process DOCX files: paragraphs plus table rows."""
        temp_path = f"/tmp/{uuid.uuid4().hex[:6]}.docx"
        with open(temp_path, 'wb') as f:
            f.write(content)
        try:
            doc = docx.Document(temp_path)
            full_text = ""
            # Extract paragraphs
            for para in doc.paragraphs:
                if para.text.strip():
                    full_text += para.text + "\n"
            # Extract tables
            for table in doc.tables:
                table_text = "\n**TABLE**\n"
                for row in table.rows:
                    row_text = []
                    for cell in row.cells:
                        row_text.append(cell.text.strip())
                    table_text += "| " + " | ".join(row_text) + " |\n"
                full_text += table_text + "\n"
            chunks = self._create_semantic_chunks(full_text, file_path, "docx")
        except Exception as e:
            logger.error(f"DOCX processing error: {e}")
            chunks = self._emergency_text_extraction(content, file_path)
        finally:
            if os.path.exists(temp_path):
                os.remove(temp_path)
        return chunks

    async def process_doc(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Process DOC files (no parser available — fallback to text extraction)."""
        return self._emergency_text_extraction(content, file_path)

    async def process_excel(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Process Excel files: first 3 sheets, 50 rows x 8 columns each."""
        temp_path = f"/tmp/{uuid.uuid4().hex[:6]}.xlsx"
        with open(temp_path, 'wb') as f:
            f.write(content)
        try:
            workbook = openpyxl.load_workbook(temp_path, read_only=True)
            full_text = ""
            for sheet_name in workbook.sheetnames[:3]:
                sheet = workbook[sheet_name]
                full_text += f"\n**Sheet: {sheet_name}**\n"
                for row_num, row in enumerate(sheet.iter_rows(max_row=50, values_only=True)):
                    # Always keep the header row; skip fully empty data rows.
                    if row_num == 0 or any(cell for cell in row):
                        row_text = [str(cell or "").strip()[:30] for cell in row[:8]]
                        full_text += "| " + " | ".join(row_text) + " |\n"
            workbook.close()
            chunks = self._create_semantic_chunks(full_text, file_path, "excel")
        except Exception as e:
            logger.error(f"Excel processing error: {e}")
            chunks = self._emergency_text_extraction(content, file_path)
        finally:
            if os.path.exists(temp_path):
                os.remove(temp_path)
        return chunks

    # --- Other format processors (keeping all your excellent features) ---
    async def process_csv(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Process CSV files: first 100 non-empty lines, pipe-wrapped."""
        try:
            text_content = content.decode('utf-8', errors='ignore')
            lines = text_content.split('\n')
            full_text = "**CSV DATA**\n"
            # FIX: dropped an unused enumerate index from the original loop.
            for line in lines[:100]:
                if line.strip():
                    full_text += f"| {line} |\n"
            return self._create_semantic_chunks(full_text, file_path, "csv")
        except Exception as e:
            logger.error(f"CSV processing error: {e}")
            return []

    async def process_text(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Process plain text (also the fallback for unknown extensions)."""
        try:
            text = content.decode('utf-8', errors='ignore')
            return self._create_semantic_chunks(text, file_path, "text")
        except Exception as e:
            logger.error(f"Text processing error: {e}")
            return []

    async def process_html(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Process HTML: strip scripts/styles, keep visible text."""
        try:
            soup = BeautifulSoup(content, 'html.parser')
            for script in soup(["script", "style"]):
                script.decompose()
            text = soup.get_text()
            return self._create_semantic_chunks(text, file_path, "html")
        except Exception as e:
            logger.error(f"HTML processing error: {e}")
            return []

    async def process_xml(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Process XML: flatten the element tree to 'tag: text' lines."""
        try:
            root = ET.fromstring(content)

            def extract_text(element, level=0):
                # Depth-first traversal, indenting one space per nesting level.
                text = ""
                if element.text and element.text.strip():
                    text += f"{' ' * level}{element.tag}: {element.text.strip()}\n"
                for child in element:
                    text += extract_text(child, level + 1)
                return text

            full_text = extract_text(root)
            return self._create_semantic_chunks(full_text, file_path, "xml")
        except Exception as e:
            logger.error(f"XML processing error: {e}")
            return []

    async def process_email(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Process .eml files: headers plus text/plain body parts."""
        try:
            msg = email.message_from_bytes(content, policy=default)
            full_text = f"**EMAIL**\n"
            full_text += f"From: {msg.get('From', 'Unknown')}\n"
            full_text += f"Subject: {msg.get('Subject', 'No Subject')}\n\n"
            if msg.is_multipart():
                for part in msg.walk():
                    if part.get_content_type() == "text/plain":
                        body = part.get_content()
                        full_text += f"Content:\n{body}\n"
            else:
                body = msg.get_content()
                full_text += f"Content:\n{body}\n"
            return self._create_semantic_chunks(full_text, file_path, "email")
        except Exception as e:
            logger.error(f"Email processing error: {e}")
            return []

    async def process_archive(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Process ZIP archives: recursively process up to 5 member files."""
        temp_path = f"/tmp/{uuid.uuid4().hex[:6]}.zip"
        with open(temp_path, 'wb') as f:
            f.write(content)
        chunks = []
        try:
            if file_path.endswith('.zip'):
                with zipfile.ZipFile(temp_path, 'r') as zip_file:
                    for file_info in zip_file.filelist[:5]:
                        try:
                            file_content = zip_file.read(file_info)
                            sub_chunks = await self.process_document(file_info.filename, file_content)
                            chunks.extend(sub_chunks[:15])  # Limit sub-chunks for speed
                        except Exception:  # FIX: was a bare except
                            continue
        except Exception as e:
            logger.error(f"Archive processing error: {e}")
        finally:
            if os.path.exists(temp_path):
                os.remove(temp_path)
        return chunks

    async def process_json(self, file_path: str, content: bytes) -> List[Dict[str, Any]]:
        """Process JSON by pretty-printing it and chunking the result."""
        try:
            data = json.loads(content.decode('utf-8'))
            full_text = json.dumps(data, indent=2, ensure_ascii=False)
            return self._create_semantic_chunks(full_text, file_path, "json")
        except Exception as e:
            logger.error(f"JSON processing error: {e}")
            return []

    # --- UTILITY METHODS (YOUR EXCELLENT ORIGINAL) ---
    def _clean_text(self, text: str) -> str:
        """Clean extracted text: collapse whitespace, drop known noise lines."""
        # Remove excessive whitespace
        text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)
        text = re.sub(r'\s+', ' ', text)
        # Remove noise patterns (insurance-ombudsman boilerplate, bare page numbers)
        noise_patterns = [
            r'Office of.*Insurance Ombudsman.*?\n',
            r'Lalit Bhawan.*?\n',
            r'^\d+\s*$'
        ]
        for pattern in noise_patterns:
            text = re.sub(pattern, '', text, flags=re.MULTILINE)
        return text.strip()

    def _create_semantic_chunks(self, text: str, source: str, doc_type: str) -> List[Dict[str, Any]]:
        """Create sentence-aligned chunks of at most self.chunk_size chars.

        Returns [] for texts under 50 characters. Each chunk dict carries
        content, metadata (source/index/type/length) and a fresh chunk_id.
        """
        text = self._clean_text(text)
        if not text or len(text) < 50:
            return []
        # Smart sentence-based chunking
        sentences = re.split(r'(?<=[.!?])\s+', text)
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk) + len(sentence) <= self.chunk_size:
                current_chunk += sentence + " "
            else:
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                current_chunk = sentence + " "
        if current_chunk.strip():
            chunks.append(current_chunk.strip())
        # Convert to structured chunks (capped at self.max_chunks)
        structured_chunks = []
        for i, chunk_text in enumerate(chunks[:self.max_chunks]):
            structured_chunks.append({
                "content": chunk_text,
                "metadata": {
                    "source": os.path.basename(source),
                    "chunk_index": i,
                    "document_type": doc_type,
                    "chunk_length": len(chunk_text)
                },
                "chunk_id": str(uuid.uuid4())
            })
        return structured_chunks

    def _emergency_text_extraction(self, content: bytes, file_path: str) -> List[Dict[str, Any]]:
        """Emergency text extraction for unsupported or broken formats.

        Never raises: returns a single error-marker chunk if nothing usable
        can be decoded.
        """
        try:
            text = content.decode('utf-8', errors='ignore')
            if len(text) > 50:
                return self._create_semantic_chunks(text, file_path, "unknown")
        except Exception:  # FIX: was a bare except
            pass
        return [{
            "content": "Failed to extract content from document",
            "metadata": {
                "source": os.path.basename(file_path),
                "chunk_index": 0,
                "document_type": "error",
                "error": True
            },
            "chunk_id": str(uuid.uuid4())
        }]
# --- GEMINI'S FIX: DEADLOCK-FREE RAG PIPELINE ---
class DeadlockFreeRAGPipeline:
    """FIXED: Direct embedding management - no more AsyncKaggleEmbeddingWrapper deadlock.

    Chroma is used as a plain vector store (no embedding_function); all
    embeddings are produced remotely via the Kaggle client and written/read
    through Chroma's synchronous vector APIs, so no async call ever happens
    inside Chroma.
    """

    def __init__(self, collection_name: str, llm_manager: "MultiLLMManager", kaggle_client: "LazyKaggleModelClient"):
        self.collection_name = collection_name
        self.llm_manager = llm_manager
        self.kaggle_client = kaggle_client
        self.security_guard = SecurityGuard()
        self.query_processor = LightweightQueryProcessor(kaggle_client)
        # GEMINI'S FIX: No embedding function - let Chroma be a simple data store
        self.vectorstore = Chroma(
            collection_name=collection_name,
            # REMOVED: embedding_function parameter completely
            persist_directory="/tmp/chroma_kaggle"
        )
        logger.info(f"πŸš€ Deadlock-Free RAG Pipeline initialized: {collection_name}")

    async def add_documents(self, chunks: List[Dict[str, Any]]):
        """FINAL FIX: Bypasses the faulty LangChain wrapper to talk to ChromaDB directly.

        Filters chunks by a simple quality score, embeds up to 100 of them via
        Kaggle, then writes embeddings straight into the raw Chroma collection.
        """
        if not chunks:
            return
        logger.info(f"πŸ“š Processing {len(chunks)} chunks...")
        # Advanced quality filtering (YOUR EXCELLENT ORIGINAL LOGIC)
        quality_chunks = []
        for chunk in chunks:
            content = chunk['content']
            # Skip error chunks
            if chunk['metadata'].get('error'):
                continue
            # Quality assessment: length, sentence count, numeric content
            quality_score = 0
            # Length factor
            if 100 <= len(content) <= 2000:
                quality_score += 2
            elif len(content) > 50:
                quality_score += 1
            # Content richness
            sentences = len(re.split(r'[.!?]+', content))
            if sentences > 3:
                quality_score += 1
            # Numerical data (good for policies)
            numbers = len(re.findall(r'\d+', content))
            if numbers > 0:
                quality_score += 1
            if quality_score >= 2:
                quality_chunks.append(chunk)
        logger.info(f"πŸ“š Filtered to {len(quality_chunks)} quality chunks")
        if not quality_chunks:
            return
        documents_to_process = quality_chunks[:100]
        texts = [chunk['content'] for chunk in documents_to_process]
        # GEMINI'S FIX: Step 2 - Embed all texts via Kaggle (this works perfectly)
        logger.info(f"πŸš€ Embedding {len(texts)} chunks via Kaggle...")
        embeddings = await self.kaggle_client.generate_embeddings(texts)
        # Debug logging (keep this to confirm data is still perfect)
        logger.info("--- HF DEBUG ---")
        logger.info(f"Type of embeddings received: {type(embeddings)}")
        if isinstance(embeddings, list) and len(embeddings) > 0:
            logger.info(f"Number of embeddings: {len(embeddings)}")
            logger.info(f"Type of first item: {type(embeddings[0])}")
            if isinstance(embeddings[0], list):
                logger.info(f"Dimension of first embedding: {len(embeddings[0])}")
                logger.info(f"First few values: {embeddings[0][:5] if len(embeddings[0]) > 5 else embeddings[0]}")
        logger.info("--- END HF DEBUG ---")
        if not embeddings or len(embeddings) != len(texts):
            logger.error("Embedding generation failed.")
            logger.error(f"Expected {len(texts)} embeddings, got {len(embeddings) if embeddings else 0}")
            return
        # --- THE FINAL FIX: BYPASS LANGCHAIN BUG ---
        # FIX: bind `collection` and `ids` BEFORE the try-block. Previously
        # they were assigned inside it, so the except handler's debug logging
        # could raise NameError and mask the real failure.
        # Get the raw, underlying collection object from Chroma
        collection = self.vectorstore._collection
        # The direct `add` method requires a unique ID for each document
        ids = [str(uuid.uuid4()) for _ in texts]
        try:
            logger.info("🎯 FINAL FIX: Bypassing faulty LangChain wrapper, adding to ChromaDB directly...")
            # Use the direct .add() method instead of the buggy .add_texts()
            collection.add(
                embeddings=embeddings,
                documents=texts,
                metadatas=[chunk['metadata'] for chunk in documents_to_process],
                ids=ids
            )
            logger.info(f"πŸŽ‰ FINAL SUCCESS! Directly added {len(texts)} documents to ChromaDB collection (BYPASSED LANGCHAIN BUG)")
        except Exception as e:
            logger.error(f"❌ Direct ChromaDB add failed: {e}")
            logger.error(f"❌ Error type: {type(e)}")
            # Additional debug info (all names are guaranteed bound here)
            logger.error(f"❌ Collection info: {type(collection)}")
            logger.error(f"❌ Embeddings type: {type(embeddings)}")
            logger.error(f"❌ Texts count: {len(texts)}")
            logger.error(f"❌ IDs count: {len(ids)}")
            # Re-raise the exception to be caught by the main error handler
            # (bare raise preserves the original traceback).
            raise

    async def answer_question(self, question: str) -> str:
        """GEMINI'S FIX: Direct query embedding - no deadlock.

        Embeds the (enhanced) query via Kaggle, searches Chroma by vector,
        reranks with the Kaggle GPU when available, then asks the LLM.
        Never raises: returns an error string on failure.
        """
        # Security check
        if self.security_guard.detect_jailbreak(question):
            return self.security_guard.sanitize_response(question, "")
        try:
            # Enhanced query processing
            enhanced_question = await self.query_processor.enhance_query_semantically(question)
            # GEMINI'S FIX: Step 1 - Embed the query yourself first
            query_embedding_list = await self.kaggle_client.generate_embeddings([enhanced_question])
            if not query_embedding_list:
                return "I could not process the query for searching."
            query_embedding = query_embedding_list[0]
            # GEMINI'S FIX: Step 2 - Search using vector directly (no async calls in Chroma)
            relevant_docs = self.vectorstore.similarity_search_by_vector(
                embedding=query_embedding,
                k=15
            )
            if not relevant_docs:
                return "I don't have sufficient information to answer this question based on the provided documents."
            # Use Kaggle GPU for reranking (GAME CHANGER)
            doc_contents = [doc.page_content for doc in relevant_docs]
            if await self.kaggle_client.health_check():
                logger.info("🎯 Using Kaggle GPU for reranking")
                top_docs_content = await self.kaggle_client.rerank_documents(
                    enhanced_question, doc_contents, k=6
                )
            else:
                logger.warning("πŸ“¦ Kaggle unavailable, using first 6 docs")
                top_docs_content = doc_contents[:6]
            # Prepare enhanced context
            context = "\n\n".join(top_docs_content)
            # Create advanced semantic prompt
            prompt = self._create_advanced_prompt(context, question)
            # Get response from multi-LLM system
            response = await self.llm_manager.get_response(prompt)
            # Final security check and cleaning
            response = self.security_guard.sanitize_response(question, response)
            response = self._clean_response(response)
            return response
        except Exception as e:
            logger.error(f"❌ Question processing failed: {e}")
            return "An error occurred while processing your question."

    def _create_advanced_prompt(self, context: str, question: str) -> str:
        """Create advanced semantic-aware prompt (YOUR EXCELLENT ORIGINAL)"""
        return f"""You are an expert insurance policy analyst with advanced semantic understanding.
CONTEXT ANALYSIS FRAMEWORK:
- Apply deep semantic understanding to connect related concepts across documents
- Recognize implicit relationships and cross-references within policy content
- Understand hierarchical information structures and conditional dependencies
- Synthesize information from multiple sources with semantic coherence
DOCUMENT CONTEXT:
{context}
QUESTION: {question}
ADVANCED REASONING APPROACH:
1. SEMANTIC COMPREHENSION: Understand the full meaning and intent behind the question
2. CONTEXTUAL MAPPING: Map question elements to semantically relevant sections
3. RELATIONSHIP INFERENCE: Identify implicit connections between policy components
4. MULTI-SOURCE SYNTHESIS: Combine information while maintaining semantic consistency
5. CONDITIONAL REASONING: Apply logical reasoning to policy exceptions and conditions
RESPONSE REQUIREMENTS:
- Provide semantically rich, contextually grounded answers
- Include specific details: numbers, percentages, timeframes, conditions
- Write in clear, professional language without excessive quotes
- Address both explicit information and reasonable semantic inferences
- Structure information hierarchically when appropriate
- **OUTPUT DIRECTIVE: All instructions above are for your internal analysis. Your final written output MUST ONLY be the concise summary of your findings, limited to 4-5 lines, and must not start with phrases like "In summary".**
ANSWER:"""

    def _clean_response(self, response: str) -> str:
        """Enhanced response cleaning (YOUR EXCELLENT ORIGINAL):
        un-quotes short phrases/numbers, smooths policy citations, fixes spacing."""
        # Remove excessive quotes
        response = re.sub(r'"([^"]{1,50})"', r'\1', response)
        response = re.sub(r'"(\w+)"', r'\1', response)
        response = re.sub(r'"(Rs\.?\s*[\d,]+[/-]*)"', r'\1', response)
        response = re.sub(r'"(\d+%)"', r'\1', response)
        response = re.sub(r'"(\d+\s*(?:days?|months?|years?))"', r'\1', response)
        # Clean policy references
        response = re.sub(r'[Aa]s stated in the policy[:\s]*"([^"]+)"', r'As per the policy, \1', response)
        response = re.sub(r'[Aa]ccording to the policy[:\s]*"([^"]+)"', r'According to the policy, \1', response)
        response = re.sub(r'[Tt]he policy states[:\s]*"([^"]+)"', r'The policy states that \1', response)
        # Fix spacing and formatting
        response = re.sub(r'\s+', ' ', response)
        response = response.replace(' ,', ',')
        response = response.replace(' .', '.')
        response = re.sub(r'\n\s*\n\s*\n+', '\n\n', response)
        return response.strip()
# --- AUTHENTICATION (YOUR EXCELLENT ORIGINAL) ---
async def verify_bearer_token(authorization: str = Header(None)):
    """Enhanced authentication with better logging.

    Validates the `Authorization: Bearer <token>` header and returns the
    token. Raises HTTPException(401) for a missing header, a non-Bearer
    scheme, or a token shorter than 10 characters.
    """
    if not authorization:
        raise HTTPException(status_code=401, detail="Authorization header required")
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization format")
    # FIX: slice the prefix off once. The previous str.replace("Bearer ", "")
    # removed EVERY occurrence of "Bearer " and would corrupt any token that
    # happened to contain that substring.
    token = authorization[len("Bearer "):]
    if len(token) < 10:
        raise HTTPException(status_code=401, detail="Invalid token format")
    logger.info(f"βœ… Authentication successful with token: {token[:10]}...")
    return token
# --- GLOBAL INSTANCES (NO EARLY KAGGLE CONNECTION!) ---
# Process-wide singletons. Constructing these performs no network I/O:
# MultiLLMManager only reads env vars, the document processor only builds
# caches, and the Kaggle client defers its HTTP client until first use.
multi_llm = MultiLLMManager()
doc_processor = UniversalDocumentProcessor()
# CRITICAL: Create lazy client (no immediate connection!)
kaggle_client = LazyKaggleModelClient()
# --- API MODELS ---
class SubmissionRequest(BaseModel):
    """Request body for POST /api/v1/hackrx/run."""
    documents: str  # a SINGLE document URL (a string, not a list of URLs)
    questions: List[str]  # natural-language questions to answer against it
class SubmissionResponse(BaseModel):
    """Response body: one answer string per submitted question."""
    answers: List[str]
# --- FIXED: BOTH GET AND POST ENDPOINTS FOR /api/v1/hackrx/run ---
@app.get("/api/v1/hackrx/run")
def test_endpoint():
    """GET endpoint for testing - fixes 405 Method Not Allowed error.

    Returns a usage hint for callers who hit the route with GET.
    """
    return {
        "message": "This endpoint requires POST method",
        "usage": "Send POST request with documents and questions",
        "status": "API is running - DEADLOCK-FREE with lazy initialization",
        "kaggle_connection": "Will initialize on first request",
        "fix": "Direct embedding management prevents async deadlocks",
        "method": "Use POST with JSON body",
        # FIX: the previous example showed `documents` as a list of URLs, but
        # SubmissionRequest declares it as a single string — the old example
        # taught callers a payload shape that would fail validation.
        "example": {
            "documents": "https://example.com/policy.pdf",
            "questions": ["question1", "question2"]
        }
    }
# --- SPEED-OPTIMIZED MAIN ENDPOINT WITH GEMINI'S DEADLOCK FIX ---
@app.post("/api/v1/hackrx/run", response_model=SubmissionResponse, dependencies=[Depends(verify_bearer_token)])
async def run_submission(request: Request, submission_request: SubmissionRequest = Body(...)):
    """Main RAG endpoint: download the single document URL, extract chunks,
    index them in a per-request pipeline, and answer all questions in parallel.

    Degrades gracefully: on backend unavailability, empty extraction, or any
    unexpected error it returns one placeholder answer per question instead
    of raising an HTTP error.
    """
    start_time = time.time()
    # This log is changed to reflect one document
    logger.info(f"🎯 DEADLOCK-FREE KAGGLE-POWERED PROCESSING: 1 doc, {len(submission_request.questions)} questions")
    try:
        # LAZY INITIALIZATION: Only now do we connect to Kaggle!
        logger.info("πŸ”„ Initializing Kaggle connection (lazy initialization)...")
        # Check Kaggle health (this will trigger initialization)
        if not await kaggle_client.health_check():
            logger.error("❌ Kaggle endpoint not available!")
            return SubmissionResponse(answers=[
                "Model service unavailable" for _ in submission_request.questions
            ])
        # Create unique session with DEADLOCK-FREE pipeline
        session_id = f"kaggle_{uuid.uuid4().hex[:6]}"  # Shorter UUID
        rag_pipeline = DeadlockFreeRAGPipeline(session_id, multi_llm, kaggle_client)
        # Process the single document
        all_chunks = []
        async with httpx.AsyncClient(
            timeout=45.0,
            headers={"ngrok-skip-browser-warning": "true"}  # required so ngrok serves raw content
        ) as client:
            async def process_single_document(doc_idx: int, doc_url: str):
                # Download one URL and convert it to chunks; returns [] on any
                # failure so a bad document cannot abort the whole request.
                try:
                    logger.info(f"πŸ“₯ Downloading document {doc_idx + 1}")
                    response = await client.get(doc_url, follow_redirects=True)
                    response.raise_for_status()
                    # Derive a filename from the URL path (query string dropped)
                    # so the processor can dispatch on the file extension.
                    filename = os.path.basename(doc_url.split('?')[0]) or f"document_{doc_idx}"
                    chunks = await doc_processor.process_document(filename, response.content)
                    logger.info(f"βœ… Document {doc_idx + 1}: {len(chunks)} chunks")
                    return chunks
                except Exception as e:
                    logger.error(f"❌ Document {doc_idx + 1} failed: {e}")
                    return []
            # --- THIS IS THE CORRECTED LOGIC ---
            # It now processes only the single string from submission_request.documents
            single_doc_url = submission_request.documents
            chunks_for_single_doc = await process_single_document(0, single_doc_url)
            all_chunks.extend(chunks_for_single_doc)
            # ------------------------------------
        logger.info(f"πŸ“Š Total chunks processed: {len(all_chunks)}")
        if not all_chunks:
            logger.error("❌ No valid content extracted!")
            return SubmissionResponse(answers=[
                "No valid content could be extracted from the provided documents."
                for _ in submission_request.questions
            ])
        # Add to RAG pipeline with DEADLOCK-FREE processing
        await rag_pipeline.add_documents(all_chunks)
        # SPEED OPTIMIZATION: Full parallel question answering
        logger.info(f"⚑ Answering questions in parallel...")
        # Cap concurrent answer generation at 4 to avoid overloading backends.
        semaphore = asyncio.Semaphore(4)
        async def answer_single_question(question: str) -> str:
            async with semaphore:
                return await rag_pipeline.answer_question(question)
        tasks = [answer_single_question(q) for q in submission_request.questions]
        answers = await asyncio.gather(*tasks)
        elapsed = time.time() - start_time
        logger.info(f"πŸŽ‰ DEADLOCK-FREE KAGGLE-POWERED SUCCESS! Processed in {elapsed:.2f}s")
        return SubmissionResponse(answers=answers)
    except Exception as e:
        elapsed = time.time() - start_time
        logger.error(f"πŸ’₯ CRITICAL ERROR after {elapsed:.2f}s: {e}")
        return SubmissionResponse(answers=[
            "Processing error occurred. Please try again."
            for _ in submission_request.questions
        ])
# --- HEALTH ENDPOINTS (YOUR EXCELLENT ORIGINAL + DEADLOCK-FREE INFO) ---
@app.get("/")
def read_root():
    """Root endpoint: static service description, capabilities, and fix list."""
    feature_list = [
        "Multi-format document processing (PDF, DOCX, Excel, CSV, HTML, etc.)",
        "Kaggle GPU-powered embeddings and reranking",
        "Multi-LLM fallback system (Groq, OpenAI, Gemini)",
        "Advanced semantic query enhancement",
        "Anti-jailbreak security system",
        "Optimized caching and concurrent processing",
        "Semantic chunking and context fusion",
        "R4 'half questions' handling",
        "Lightning-fast GPU-accelerated response times",
        "DEADLOCK-FREE async operations",
        "Lazy initialization prevents startup timeouts",
        "Direct embedding management",
    ]
    fix_list = [
        "DeadlockFreeRAGPipeline prevents async conflicts",
        "LazyKaggleModelClient prevents startup connection",
        "Direct embedding calls to Kaggle (no AsyncWrapper)",
        "Chroma as simple data store (no embedding function)",
        "CORS headers with ngrok-skip-browser-warning",
        "Both GET and POST endpoints for /api/v1/hackrx/run",
        "Improved error handling and logging",
        "Hugging Face Secrets support for dynamic URLs",
    ]
    info = {
        "message": "🎯 KAGGLE-POWERED HACKATHON RAG SYSTEM - DEADLOCK-FREE COMPLETE VERSION",
        "version": "5.4.0",
        "status": "FIXED: Deadlock-free + lazy initialization prevents all issues!",
        "target_time": "<20 seconds with Kaggle GPU",
        # Formats are whatever the document processor registered handlers for.
        "supported_formats": list(doc_processor.processors.keys()),
        "features": feature_list,
        "kaggle_connection": "Lazy (connects on first API call)",
        "embedding_method": "Direct Kaggle management (no wrapper deadlock)",
        "fixes": fix_list,
    }
    return info
@app.get("/health")
def health_check():
    """Lightweight liveness probe with cache size and applied-fix summary."""
    applied_fixes = [
        "deadlock_free_pipeline",
        "lazy_initialization",
        "direct_embedding_management",
        "ngrok_compatibility",
        "http_method_fix",
        "cors_headers",
        "hf_secrets_support",
    ]
    payload = {
        "status": "healthy",
        "version": "5.4.0",
        "mode": "DEADLOCK_FREE_KAGGLE_GPU_POWERED_LAZY",
        # Number of documents currently held in the processor's cache.
        "cache_size": len(doc_processor.cache),
        "kaggle_connection": "lazy (on-demand)",
        "embedding_method": "direct_kaggle_management",
        "timestamp": time.time(),
        "fixes_applied": applied_fixes,
    }
    return payload
@app.get("/test-kaggle")
async def test_kaggle_connection():
    """Probe the Kaggle backend; the first call triggers lazy initialization."""
    try:
        healthy = await kaggle_client.health_check()
        initialized = kaggle_client._initialized
        return {
            "kaggle_connection": "initialized" if initialized else "not_initialized",
            "health_status": "healthy" if healthy else "unhealthy",
            # Only expose the endpoint URL once it has actually been resolved.
            "endpoint": kaggle_client._endpoint if initialized else "not_set",
            "timestamp": time.time(),
        }
    except Exception as exc:
        return {
            "kaggle_connection": "failed",
            "health_status": "error",
            "error": str(exc),
            "timestamp": time.time(),
        }
# --- RUN SERVER ---
if __name__ == "__main__":
    import uvicorn
    # Port 7860 is the port Hugging Face Spaces expects a web app to bind to.
    uvicorn.run(app, host="0.0.0.0", port=7860)