Spaces:
Running
Running
| """ | |
| Offline Document Processor | |
| Processes PDF documents and uploads to database without needing the server running. | |
| Perfect for admin testing and bulk document uploads. | |
| Usage: | |
| python process_document_offline.py "path/to/document.pdf" --user-id YOUR_USER_ID | |
| python process_document_offline.py "path/to/document.pdf" --user-id YOUR_USER_ID --feature chat | |
| """ | |
| import os | |
| import sys | |
| import argparse | |
| import asyncio | |
| import uuid | |
| import hashlib | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| import PyPDF2 | |
| from supabase import create_client | |
| # Load environment variables | |
| load_dotenv() | |
| # Color codes for terminal output | |
class Colors:
    """ANSI escape sequences used to style terminal output."""

    # Text attributes / reset
    BOLD = '\033[1m'
    END = '\033[0m'  # resets every attribute set by the codes in this class

    # Bright foreground colors
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    HEADER = '\033[95m'
    CYAN = '\033[96m'
def print_header(text):
    """Print ``text`` as a bold section header, preceded by a blank line."""
    styled = f"{Colors.HEADER}{Colors.BOLD}{text}{Colors.END}"
    print("\n" + styled)
def print_info(text):
    """Print an informational message in cyan with an info marker."""
    message = "ℹ️ " + f"{text}"
    print(Colors.CYAN + message + Colors.END)
def print_success(text):
    """Print a success message in green with a check-mark marker."""
    message = "✅ " + f"{text}"
    print(Colors.GREEN + message + Colors.END)
def print_error(text):
    """Print an error message in red with a cross marker."""
    message = "❌ " + f"{text}"
    print(Colors.RED + message + Colors.END)
def print_progress(text):
    """Print a progress message in yellow that the next print overwrites.

    The line ends with a carriage return instead of a newline, so the cursor
    goes back to the start of the same terminal line.
    """
    message = "⏳ " + f"{text}"
    print(Colors.YELLOW + message + Colors.END, end='\r')
class OfflineDocumentProcessor:
    """Process a PDF offline and upload its chunks and embeddings to Supabase.

    The pipeline, driven by :meth:`process_document`, is: extract text from
    the PDF, split it into overlapping chunks, create a row in the
    ``documents`` table, request one embedding per chunk from the Voyage
    provider, and insert one row per chunk into ``document_chunks``.

    Unrecoverable failures are reported with ``print_error`` and terminate
    the process with ``sys.exit(1)``.
    """

    def __init__(self):
        """Connect to Supabase and initialize the Voyage provider.

        Raises:
            Exception: if ``SUPABASE_URL`` or ``SUPABASE_SERVICE_KEY`` is not
                set in the environment.
        """
        supabase_url = os.getenv("SUPABASE_URL")
        supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
        if not supabase_url or not supabase_key:
            raise Exception("SUPABASE_URL and SUPABASE_SERVICE_KEY must be set in .env")
        self.supabase = create_client(supabase_url, supabase_key)
        self.voyage_provider = None
        self._init_voyage()

    def _init_voyage(self):
        """Load the project's Voyage AI provider; exit the process on failure."""
        try:
            # Imported lazily so an import problem is reported as a readable
            # error instead of failing at module import time.
            from services.providers.voyage import get_voyage_provider
            self.voyage_provider = get_voyage_provider()
            print_success("Voyage AI provider initialized")
        except Exception as e:
            print_error(f"Failed to initialize Voyage provider: {str(e)}")
            sys.exit(1)

    def extract_pdf_text(self, pdf_path: str) -> str:
        """Extract the text of every page of the PDF at ``pdf_path``.

        Pages whose extraction raises are reported and skipped. Each
        extracted page is followed by a blank line in the result.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            The concatenated page text with surrounding whitespace stripped.
            If the file cannot be opened or parsed, the process exits.
        """
        print_info("Extracting text from PDF...")
        try:
            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                total_pages = len(pdf_reader.pages)
                print_info(f"Total pages: {total_pages}")
                # Collect pieces and join once instead of repeated `+=`.
                parts = []
                for i, page in enumerate(pdf_reader.pages):
                    try:
                        page_text = page.extract_text()
                        if page_text:
                            parts.append(page_text + "\n\n")
                        if (i + 1) % 50 == 0:
                            print_progress(f"Extracted {i + 1}/{total_pages} pages...")
                    except Exception as e:
                        print_error(f"Failed to extract page {i + 1}: {str(e)}")
                        continue
                text = "".join(parts)
                print_success(f"Extracted {len(text)} characters from {total_pages} pages")
                return text.strip()
        except Exception as e:
            print_error(f"PDF extraction failed: {str(e)}")
            sys.exit(1)

    @staticmethod
    def _split_text(text: str, chunk_size: int, overlap: int) -> list:
        """Split ``text`` into overlapping chunks (no console output).

        A chunk that does not reach the end of the text is cut after its last
        ``.`` or newline when that cut lies past the middle of the window.
        Chunks of 50 characters or fewer (after stripping) are dropped.

        Raises:
            ValueError: if ``chunk_size`` is not positive or ``overlap`` is
                negative.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if overlap < 0:
            raise ValueError("overlap must not be negative")

        chunks = []
        start = 0
        text_length = len(text)
        while start < text_length:
            end = start + chunk_size
            chunk = text[start:end]
            # Try to break at a sentence boundary
            if end < text_length:
                break_point = max(chunk.rfind('.'), chunk.rfind('\n'))
                if break_point > chunk_size * 0.5:
                    chunk = chunk[:break_point + 1]
                    end = start + break_point + 1
            chunks.append(chunk.strip())
            if end >= text_length:
                # This chunk already reaches the end of the text; stepping
                # back by `overlap` would only re-emit its tail.
                break
            # Always advance by at least one character so a large overlap
            # cannot stall or reverse the scan.
            start = max(end - overlap, start + 1)
        # Filter out tiny chunks
        return [c for c in chunks if len(c) > 50]

    def chunk_text(self, text: str, chunk_size: int = 1000, overlap: int = 200) -> list:
        """Split text into overlapping chunks.

        Args:
            text: The text to split.
            chunk_size: Maximum number of characters per chunk.
            overlap: Number of characters shared between consecutive chunks.

        Returns:
            List of stripped chunk strings, each longer than 50 characters.

        Raises:
            ValueError: if ``chunk_size`` is not positive or ``overlap`` is
                negative.
        """
        print_info("Chunking text...")
        chunks = self._split_text(text, chunk_size, overlap)
        print_success(f"Created {len(chunks)} chunks")
        return chunks

    def pad_embedding_to_4096(self, embedding: list, original_dim: int) -> list:
        """Return ``embedding`` as exactly 4096 values.

        Longer vectors are truncated; shorter ones are right-padded with
        ``0.0``.

        Args:
            embedding: The embedding values.
            original_dim: Unused; kept so existing callers keep working.
        """
        if len(embedding) >= 4096:
            return embedding[:4096]
        return embedding + [0.0] * (4096 - len(embedding))

    async def generate_embeddings(self, chunks: list) -> list:
        """Generate one embedding per chunk using the Voyage provider.

        Requests are issued sequentially with a 20 second pause after each
        success (3 requests per minute), and a 60 second pause after a
        failure whose error text mentions "rate limit".

        Args:
            chunks: Chunk strings to embed.

        Returns:
            A list with exactly one entry per chunk, in order: the embedding
            padded to 4096 values, or ``None`` when that chunk failed.
        """
        print_header("Generating Embeddings with Voyage AI")
        print_info(f"Processing {len(chunks)} chunks (3 requests/minute rate limit)")
        print_info("This will take approximately {:.1f} minutes".format(len(chunks) / 3))
        api_key = os.getenv("VOYAGE_API_KEY")
        if not api_key:
            print_error("VOYAGE_API_KEY not found in .env")
            sys.exit(1)

        total = len(chunks)
        embeddings = []
        success_count = 0
        fail_count = 0
        for i, chunk in enumerate(chunks):
            embedding = None
            rate_limited = False
            try:
                # NOTE(review): the provider is assumed to return a dict with
                # "success", "embedding", "dimension" and "error" keys, as
                # this code reads them - confirm against the provider.
                result = await self.voyage_provider.generate_embedding(
                    chunk,
                    api_key=api_key,
                    model="voyage-large-2"
                )
                if result["success"]:
                    # Pad to 4096 dimensions
                    embedding = self.pad_embedding_to_4096(result["embedding"], result["dimension"])
                else:
                    error = result.get('error')
                    print_error(f"\nFailed chunk {i + 1}: {error}")
                    # `error` may be None or a non-string; normalize first.
                    rate_limited = "rate limit" in str(error or "").lower()
            except Exception as e:
                print_error(f"\nException on chunk {i + 1}: {str(e)}")

            # Append exactly once per chunk so the result stays aligned with
            # `chunks` on every code path.
            embeddings.append(embedding)

            if embedding is not None:
                success_count += 1
                # Calculate progress and ETA
                progress = ((i + 1) / total) * 100
                remaining = total - (i + 1)
                eta_minutes = (remaining * 20) / 60  # 20 seconds per request (3 RPM)
                print_progress(f"✓ Chunk {i + 1}/{total} ({progress:.1f}%) - ETA: {eta_minutes:.1f} min")
                # Rate limiting: 3 requests per minute = 20 seconds between requests
                if i < total - 1:  # Don't wait after last chunk
                    await asyncio.sleep(20)
            else:
                fail_count += 1
                # If rate limited, wait longer
                if rate_limited:
                    print_info("Rate limit hit, waiting 60 seconds...")
                    await asyncio.sleep(60)

        print()  # New line after progress
        print_success(f"Embeddings complete: {success_count} success, {fail_count} failed")
        return embeddings

    def create_document_record(self, user_id: str, filename: str, file_size: int, feature: str) -> dict:
        """Insert a ``documents`` row in the "processing" state.

        Args:
            user_id: Owner of the document.
            filename: Base name of the uploaded file.
            file_size: File size in bytes.
            feature: Feature the document is registered for.

        Returns:
            The inserted row as returned by Supabase. On any failure the
            process exits.
        """
        print_info("Creating document record...")
        try:
            # One timestamp for both fields so they are consistent.
            now = datetime.now()
            # Retention: 30 days (default for admin testing)
            expires_at = now + timedelta(days=30)
            document_data = {
                "user_id": user_id,
                "filename": filename,
                "file_type": "application/pdf",
                "file_size": file_size,
                "storage_path": f"offline/{uuid.uuid4()}.pdf",  # Placeholder
                "processing_status": "processing",
                "processing_progress": 0,
                "processing_stage": "Generating embeddings...",
                "feature": feature,
                "expires_at": expires_at.isoformat(),
                "created_at": now.isoformat()
            }
            result = self.supabase.table("documents").insert(document_data).execute()
            if not result.data:
                raise Exception("Failed to create document record")
            document = result.data[0]
            print_success(f"Document record created: {document['id']}")
            return document
        except Exception as e:
            print_error(f"Failed to create document record: {str(e)}")
            sys.exit(1)

    @staticmethod
    def _to_pgvector(values: list) -> str:
        """Format ``values`` as a PostgreSQL vector literal, e.g. ``[1.0,2.0]``."""
        return '[' + ','.join(str(x) for x in values) + ']'

    def store_chunks_with_embeddings(self, document_id: str, chunks: list, embeddings: list):
        """Insert every chunk into ``document_chunks`` and finalize the document.

        A chunk without an embedding is stored with null embedding columns.
        A failed insert is reported and counted, and processing continues.
        Afterwards the ``documents`` row is marked completed.

        Args:
            document_id: Id of the parent ``documents`` row.
            chunks: Chunk strings.
            embeddings: One entry per chunk: a list of floats or ``None``.
        """
        print_header("Storing Chunks in Database")
        total = len(chunks)
        stored_count = 0
        embedded_count = 0
        failed_count = 0
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            try:
                # Update progress every 10 chunks
                if i % 10 == 0:
                    progress = 50 + int((i / total) * 45)  # 50-95%
                    self.supabase.table("documents").update({
                        "processing_progress": progress,
                        "processing_stage": f"Storing chunks ({i + 1}/{total})..."
                    }).eq("id", document_id).execute()

                # Prepare chunk data
                embedding_str = None
                embedding_part1_str = None
                embedding_part2_str = None
                embedding_part3_str = None
                if embedding:
                    # Convert to PostgreSQL vector format
                    embedding_str = self._to_pgvector(embedding)
                    # Split into three parts for indexing
                    embedding_part1_str = self._to_pgvector(embedding[:1365])
                    embedding_part2_str = self._to_pgvector(embedding[1365:2730])
                    embedding_part3_str = self._to_pgvector(embedding[2730:])

                chunk_data = {
                    "document_id": document_id,
                    "chunk_index": i,
                    "content": chunk,
                    "embedding": embedding_str,
                    "embedding_part1": embedding_part1_str,
                    "embedding_part2": embedding_part2_str,
                    "embedding_part3": embedding_part3_str,
                    "created_at": datetime.now().isoformat()
                }
                self.supabase.table("document_chunks").insert(chunk_data).execute()
                stored_count += 1
                if embedding_str is not None:
                    embedded_count += 1
                print_progress(f"Stored {i + 1}/{total} chunks...")
            except Exception as e:
                print_error(f"\nFailed to store chunk {i}: {str(e)}")
                failed_count += 1

        print()  # New line
        print_success(f"Stored {stored_count} chunks, {failed_count} failed")

        # Update document status
        self.supabase.table("documents").update({
            "processing_status": "completed",
            "processing_progress": 100,
            "processing_stage": "Completed",
            "processed_at": datetime.now().isoformat(),
            "total_chunks": total,
            # Only rows that were stored WITH an embedding, not every stored row.
            "chunks_with_embeddings": embedded_count
        }).eq("id", document_id).execute()

    async def process_document(self, pdf_path: str, user_id: str, feature: str = "chat"):
        """Run the full pipeline for one PDF.

        Args:
            pdf_path: Path of the PDF to process.
            user_id: Owner to record on the document row.
            feature: Feature to register the document for.

        The process exits if the file does not exist or fewer than 100
        characters of text can be extracted.
        """
        print_header("📄 Offline Document Processor")
        print_info(f"File: {pdf_path}")
        print_info(f"User ID: {user_id}")
        print_info(f"Feature: {feature}")

        # Validate file
        if not os.path.exists(pdf_path):
            print_error(f"File not found: {pdf_path}")
            sys.exit(1)
        filename = os.path.basename(pdf_path)
        file_size = os.path.getsize(pdf_path)
        print_info(f"File size: {file_size / (1024 * 1024):.2f} MB")

        # Step 1: Extract text
        print_header("Step 1: Extract Text")
        text = self.extract_pdf_text(pdf_path)
        if not text or len(text) < 100:
            print_error("Insufficient text extracted from PDF")
            sys.exit(1)

        # Step 2: Chunk text
        print_header("Step 2: Chunk Text")
        chunks = self.chunk_text(text)

        # Step 3: Create document record
        print_header("Step 3: Create Document Record")
        document = self.create_document_record(user_id, filename, file_size, feature)
        document_id = document["id"]

        # Step 4: Generate embeddings
        embeddings = await self.generate_embeddings(chunks)

        # Step 5: Store in database
        self.store_chunks_with_embeddings(document_id, chunks, embeddings)

        # Final summary
        print_header("✅ Processing Complete!")
        print_success(f"Document ID: {document_id}")
        print_success(f"Total chunks: {len(chunks)}")
        print_success(f"Embeddings: {sum(1 for e in embeddings if e is not None)}")
        print_info("\n📱 Document is now available in the UI!")
        print_info(" - View at: /documents")
        print_info(" - Use in: Chat, MCQ, Explain, High-Yield")
        print_info(" - RAG search: Fully functional")
async def main():
    """Parse command-line arguments and process the requested PDF.

    Exits with status 1 on interruption or on any unhandled error. For
    unhandled errors the traceback is printed.
    """
    parser = argparse.ArgumentParser(
        description="Process PDF documents offline and upload to database",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python process_document_offline.py "document.pdf" --user-id abc-123
  python process_document_offline.py "C:\\Books\\Guyton.pdf" --user-id abc-123 --feature chat
"""
    )
    parser.add_argument("pdf_path", help="Path to PDF file")
    parser.add_argument("--user-id", required=True, help="User ID (UUID)")
    parser.add_argument("--feature", default="chat", choices=["chat", "mcq", "flashcard", "explain", "highyield"],
                        help="Feature to enable RAG for (default: chat)")
    args = parser.parse_args()
    try:
        processor = OfflineDocumentProcessor()
        await processor.process_document(args.pdf_path, args.user_id, args.feature)
    except KeyboardInterrupt:
        # Reached when the interrupt is raised while this coroutine's own
        # synchronous code is running (Python < 3.11).
        print_error("\n\nProcessing interrupted by user")
        sys.exit(1)
    except Exception as e:
        print_error(f"\n\nFatal error: {str(e)}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # On Python 3.11+ asyncio.run() answers Ctrl-C by cancelling the main
        # task and then raising KeyboardInterrupt from here, so the handler
        # inside main() never sees it.
        print_error("\n\nProcessing interrupted by user")
        sys.exit(1)