Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| from pathlib import Path | |
| from pinecone import Pinecone | |
| from typing import List, Tuple | |
| import tempfile | |
| import shutil | |
| from dotenv import load_dotenv | |
| import time | |
| from datetime import datetime | |
| import json | |
| # Load environment variables | |
| load_dotenv() | |
| # Authentication configuration | |
| AUTH_PASSWORD = "gst_litigation@34$$&" | |
| def authenticate(password): | |
| """Simple authentication function""" | |
| return password == AUTH_PASSWORD | |
| # Validate required environment variables | |
| required_env_vars = ["PINECONE_API_KEY"] | |
| missing_vars = [var for var in required_env_vars if not os.getenv(var)] | |
| if missing_vars: | |
| raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}") | |
| # Initialize Pinecone | |
| pinecone_api_key = os.getenv("PINECONE_API_KEY") | |
| pc = Pinecone(api_key=pinecone_api_key) | |
| # Create uploads directory | |
| UPLOAD_FOLDER = "uploads" | |
| os.makedirs(UPLOAD_FOLDER, exist_ok=True) | |
| def parse_pinecone_timestamp(iso_string): | |
| """ | |
| Parses an ISO 8601 string from Pinecone, handling nanosecond precision. | |
| Args: | |
| iso_string (str): The ISO-formatted timestamp string. | |
| Returns: | |
| datetime: The parsed datetime object. | |
| """ | |
| if not isinstance(iso_string, str) or not iso_string: | |
| return datetime.min # Return a very old date for sorting purposes | |
| # Handle ISO strings ending with 'Z' (convert to +00:00) | |
| if iso_string.endswith('Z'): | |
| iso_string = iso_string[:-1] + '+00:00' | |
| # Find the decimal point for fractional seconds | |
| decimal_point = iso_string.find('.') | |
| if decimal_point != -1: | |
| # Find where the timezone info starts (+ or -) | |
| tz_start = max(iso_string.rfind('+'), iso_string.rfind('-')) | |
| if tz_start > decimal_point: | |
| # Extract the fractional seconds part | |
| fractional_part = iso_string[decimal_point+1:tz_start] | |
| # Truncate to 6 digits (microseconds) if longer | |
| if len(fractional_part) > 6: | |
| fractional_part = fractional_part[:6] | |
| # Reconstruct the ISO string with truncated fractional seconds | |
| iso_string = iso_string[:decimal_point+1] + fractional_part + iso_string[tz_start:] | |
| return datetime.fromisoformat(iso_string) | |
| def get_all_files(): | |
| """Get all files from Pinecone Assistant and sort them""" | |
| try: | |
| assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "freeplan") | |
| assistant = pc.assistant.Assistant(assistant_name=assistant_name) | |
| # List all files in the assistant | |
| files_response = assistant.list_files() | |
| # Check if files_response is directly a list or has a files attribute | |
| if hasattr(files_response, 'files'): | |
| files_list = files_response.files | |
| else: | |
| files_list = files_response | |
| if not files_list: | |
| return [] | |
| # Sort files by creation time (most recent first) using robust timestamp parsing | |
| sorted_files = sorted( | |
| files_list, | |
| key=lambda x: parse_pinecone_timestamp(getattr(x, 'created_on', '')), | |
| reverse=True | |
| ) | |
| return sorted_files | |
| except Exception as e: | |
| return [] | |
| def get_file_choices(): | |
| """Get file choices for the dropdown - returns list of (title, file_id) tuples""" | |
| try: | |
| all_files = get_all_files() | |
| if not all_files: | |
| return [] | |
| choices = [] | |
| for file_obj in all_files: | |
| file_name = getattr(file_obj, 'name', 'Unknown File') | |
| file_id = getattr(file_obj, 'id', 'unknown') | |
| created_on = getattr(file_obj, 'created_on', '') | |
| # Format timestamp for display | |
| try: | |
| if created_on: | |
| created_formatted = parse_pinecone_timestamp(created_on).strftime('%Y-%m-%d %H:%M') | |
| display_name = f"{file_name} (uploaded: {created_formatted})" | |
| else: | |
| display_name = file_name | |
| except: | |
| display_name = file_name | |
| choices.append((display_name, file_id)) | |
| return choices | |
| except Exception as e: | |
| return [] | |
| def refresh_delete_dropdown(): | |
| """Refresh the dropdown with current files""" | |
| choices = get_file_choices() | |
| if not choices: | |
| return gr.update(choices=[], value=None, interactive=False) | |
| return gr.update(choices=choices, value=None, interactive=True) | |
| def delete_selected_files(selected_file_ids, progress=gr.Progress()): | |
| """Delete multiple selected files by their IDs""" | |
| if not selected_file_ids: | |
| return "❌ **Error:** No files selected for deletion", "" | |
| try: | |
| progress(0.1, desc="🔧 Initializing Pinecone Assistant...") | |
| assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "freeplan") | |
| assistant = pc.assistant.Assistant(assistant_name=assistant_name) | |
| # Get current files to map IDs to names | |
| all_files = get_all_files() | |
| file_id_to_name = {getattr(f, 'id', ''): getattr(f, 'name', 'Unknown') for f in all_files} | |
| total_files = len(selected_file_ids) | |
| deleted_files = [] | |
| failed_files = [] | |
| progress(0.2, desc=f"🗑️ Starting deletion of {total_files} files...") | |
| for i, file_id in enumerate(selected_file_ids): | |
| try: | |
| file_name = file_id_to_name.get(file_id, f"File ID: {file_id}") | |
| progress((0.2 + (i / total_files) * 0.7), desc=f"🗑️ Deleting: {file_name}...") | |
| # Delete the file | |
| response = assistant.delete_file(file_id=file_id) | |
| deleted_files.append({ | |
| 'name': file_name, | |
| 'id': file_id, | |
| 'status': 'success' | |
| }) | |
| time.sleep(0.2) # Small delay between deletions | |
| except Exception as delete_error: | |
| failed_files.append({ | |
| 'name': file_id_to_name.get(file_id, f"File ID: {file_id}"), | |
| 'id': file_id, | |
| 'error': str(delete_error) | |
| }) | |
| progress(1.0, desc="✅ Deletion process completed!") | |
| # Format results | |
| success_count = len(deleted_files) | |
| error_count = len(failed_files) | |
| status_message = f"📊 **Deletion Complete**\n\n" | |
| status_message += f"✅ **Successfully deleted:** {success_count} files\n" | |
| status_message += f"❌ **Failed to delete:** {error_count} files\n" | |
| status_message += f"📁 **Total processed:** {total_files} files\n\n" | |
| # Detailed results | |
| detailed_results = "## 🗑️ **Deletion Results**\n\n" | |
| if deleted_files: | |
| detailed_results += "### ✅ **Successfully Deleted Files:**\n" | |
| for file_info in deleted_files: | |
| detailed_results += f"- **{file_info['name']}** (`{file_info['id']}`)\n" | |
| detailed_results += "\n" | |
| if failed_files: | |
| detailed_results += "### ❌ **Failed Deletions:**\n" | |
| for file_info in failed_files: | |
| detailed_results += f"- **{file_info['name']}** (`{file_info['id']}`)\n" | |
| detailed_results += f" - Error: {file_info['error']}\n" | |
| detailed_results += "\n" | |
| return status_message, detailed_results | |
| except Exception as e: | |
| error_msg = f"❌ **Critical Error during deletion:** {str(e)}" | |
| return error_msg, "" | |
| def list_uploaded_files_paginated(page_num=0, progress=gr.Progress()): | |
| """List files with pagination - 100 files per page""" | |
| try: | |
| progress(0.1, desc="🔍 Getting files...") | |
| # Get all files | |
| all_files = get_all_files() | |
| if not all_files: | |
| progress(1.0, desc="✅ Complete - No files found") | |
| return "📋 **No files found in Pinecone Assistant**", "", "No files available", gr.update(visible=False), gr.update(visible=False) | |
| progress(0.5, desc="📊 Processing page...") | |
| # Pagination settings | |
| files_per_page = 100 | |
| start_idx = page_num * files_per_page | |
| end_idx = start_idx + files_per_page | |
| # Get files for current page | |
| page_files = all_files[start_idx:end_idx] | |
| total_files = len(all_files) | |
| total_pages = (total_files + files_per_page - 1) // files_per_page | |
| # Create summary | |
| summary = f"📊 **Files Summary (Page {page_num + 1} of {total_pages})**\n\n" | |
| summary += f"📁 **Total files:** {total_files}\n" | |
| summary += f"📋 **Showing:** {start_idx + 1}-{min(end_idx, total_files)} of {total_files}\n\n" | |
| # Create file list - ONLY TITLES, VERTICALLY | |
| detailed_info = f"## 📋 **Latest Uploaded Files - Page {page_num + 1}**\n\n" | |
| progress(0.8, desc="📝 Formatting file titles...") | |
| for i, file_obj in enumerate(page_files, 1): | |
| try: | |
| # Get only the file name/title | |
| file_name = getattr(file_obj, 'name', 'Unknown File') | |
| file_id = getattr(file_obj, 'id', 'Unknown ID') | |
| created_on = getattr(file_obj, 'created_on', '') | |
| global_index = start_idx + i | |
| # Format timestamp | |
| try: | |
| if created_on: | |
| created_formatted = parse_pinecone_timestamp(created_on).strftime('%Y-%m-%d %H:%M') | |
| else: | |
| created_formatted = 'Unknown' | |
| except: | |
| created_formatted = 'Unknown' | |
| # Display file info | |
| detailed_info += f"{global_index}. **{file_name}**\n" | |
| detailed_info += f" 📅 Uploaded: {created_formatted} | 🆔 ID: `{file_id}`\n\n" | |
| except Exception as file_error: | |
| detailed_info += f"{start_idx + i}. ❌ **Error loading file**\n\n" | |
| # Pagination info | |
| pagination_info = f"📄 Page {page_num + 1} of {total_pages} | Total: {total_files} files" | |
| # Show/hide next/prev buttons | |
| show_prev = page_num > 0 | |
| show_next = page_num < total_pages - 1 | |
| progress(1.0, desc="✅ Page loaded successfully!") | |
| return summary, detailed_info, pagination_info, gr.update(visible=show_prev), gr.update(visible=show_next) | |
| except Exception as e: | |
| error_msg = f"❌ **Error retrieving file list:** {str(e)}" | |
| return error_msg, "", "Error", gr.update(visible=False), gr.update(visible=False) | |
| def load_next_page(current_page_info): | |
| """Load next page of files""" | |
| try: | |
| # Extract current page number from pagination info | |
| current_page = int(current_page_info.split("Page ")[1].split(" of")[0]) - 1 | |
| return list_uploaded_files_paginated(current_page + 1) | |
| except: | |
| return list_uploaded_files_paginated(0) | |
| def load_prev_page(current_page_info): | |
| """Load previous page of files""" | |
| try: | |
| # Extract current page number from pagination info | |
| current_page = int(current_page_info.split("Page ")[1].split(" of")[0]) - 1 | |
| return list_uploaded_files_paginated(max(0, current_page - 1)) | |
| except: | |
| return list_uploaded_files_paginated(0) | |
| def refresh_file_list(): | |
| """Refresh the file list""" | |
| return "🔄 **Refreshing file list... Please wait**" | |
| def process_files_with_progress(files, *metadata_inputs, progress=gr.Progress()): | |
| """Process multiple files with individual metadata and show progress""" | |
| if not files: | |
| return "❌ Error: No files selected", "" | |
| if len(files) > 10: | |
| return "❌ Error: Maximum 10 files allowed at a time", "" | |
| try: | |
| results = [] | |
| errors = [] | |
| total_files = len(files) | |
| # Initialize Pinecone Assistant | |
| progress(0, desc="🔧 Initializing Pinecone Assistant...") | |
| time.sleep(0.5) # Small delay to show the progress | |
| assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "freeplan") | |
| assistant = pc.assistant.Assistant(assistant_name=assistant_name) | |
| # Process each file with its individual metadata | |
| for i, file_path in enumerate(files): | |
| try: | |
| filename = os.path.basename(file_path) | |
| progress((i / total_files), desc=f"📄 Processing {filename}... ({i+1}/{total_files})") | |
| # Get metadata for this specific file (3 fields per file: sections, keywords, description) | |
| sections_idx = i * 3 | |
| keywords_idx = i * 3 + 1 | |
| description_idx = i * 3 + 2 | |
| if sections_idx < len(metadata_inputs): | |
| sections = metadata_inputs[sections_idx] or "" | |
| keywords = metadata_inputs[keywords_idx] or "" | |
| description = metadata_inputs[description_idx] or "" | |
| else: | |
| sections = keywords = description = "" | |
| # Skip if no metadata provided for this file | |
| if not sections.strip() and not keywords.strip() and not description.strip(): | |
| errors.append({ | |
| "filename": filename, | |
| "error": "❌ Error: No metadata provided" | |
| }) | |
| continue | |
| # Prepare metadata for this file | |
| progress((i / total_files), desc=f"🏷️ Preparing metadata for {filename}...") | |
| metadata = { | |
| "sections": [s.strip() for s in sections.split(",") if s.strip()], | |
| "keywords": [k.strip() for k in keywords.split(",") if k.strip()], | |
| "description": description.strip() | |
| } | |
| # Copy to uploads directory | |
| progress((i / total_files), desc=f"📁 Copying {filename} to uploads...") | |
| destination_path = os.path.join(UPLOAD_FOLDER, filename) | |
| shutil.copy2(file_path, destination_path) | |
| # Upload to Pinecone Assistant | |
| progress((i / total_files), desc=f"☁️ Uploading {filename} to Pinecone...") | |
| response = assistant.upload_file( | |
| file_path=destination_path, | |
| metadata=metadata, | |
| timeout=None | |
| ) | |
| results.append({ | |
| "filename": filename, | |
| "status": "✅ Success", | |
| "metadata": metadata, | |
| "response": str(response) | |
| }) | |
| except Exception as file_error: | |
| errors.append({ | |
| "filename": os.path.basename(file_path), | |
| "error": f"❌ Error: {str(file_error)}" | |
| }) | |
| # Final progress update | |
| progress(1.0, desc="✅ Processing complete!") | |
| time.sleep(0.5) | |
| # Format results for display | |
| success_count = len(results) | |
| error_count = len(errors) | |
| status_message = f"📊 **Processing Complete**\n\n" | |
| status_message += f"✅ **Successful uploads:** {success_count}\n" | |
| status_message += f"❌ **Failed uploads:** {error_count}\n" | |
| status_message += f"📁 **Total files processed:** {len(files)}\n\n" | |
| # Detailed results | |
| detailed_results = "## 📋 **Detailed Results**\n\n" | |
| if results: | |
| detailed_results += "### ✅ **Successful Uploads:**\n" | |
| for result in results: | |
| detailed_results += f"- **{result['filename']}**\n" | |
| detailed_results += f" - Sections: {', '.join(result['metadata']['sections'])}\n" | |
| detailed_results += f" - Keywords: {', '.join(result['metadata']['keywords'])}\n" | |
| detailed_results += f" - Description: {result['metadata']['description']}\n\n" | |
| if errors: | |
| detailed_results += "### ❌ **Failed Uploads:**\n" | |
| for error in errors: | |
| detailed_results += f"- **{error['filename']}** - {error['error']}\n" | |
| return status_message, detailed_results, "✅ **Processing completed successfully!**" | |
| except Exception as e: | |
| error_msg = f"❌ **Critical Error:** {str(e)}" | |
| return error_msg, "", "❌ **Processing failed with error**" | |
| def update_metadata_fields(files): | |
| """Update metadata fields based on uploaded files""" | |
| if not files: | |
| return [gr.update(visible=False)] * 30 # Hide all fields | |
| if len(files) > 10: | |
| # Show error and hide all fields | |
| return [gr.update(visible=False)] * 30 | |
| updates = [] | |
| for i in range(len(files)): | |
| if i < len(files): | |
| filename = os.path.basename(files[i]) | |
| # Show 3 fields per file (sections, keywords, description) | |
| updates.extend([ | |
| gr.update(visible=True, label=f"📑 Sections for {filename}", placeholder="e.g., Case Summary, Evidence, Legal Arguments"), | |
| gr.update(visible=True, label=f"🔍 Keywords for {filename}", placeholder="e.g., litigation, court, plaintiff, defendant"), | |
| gr.update(visible=True, label=f"📝 Description for {filename}", placeholder="Brief description of this document") | |
| ]) | |
| # Hide remaining fields | |
| while len(updates) < 30: | |
| updates.append(gr.update(visible=False)) | |
| return updates[:30] | |
| def clear_form(): | |
| """Clear all form fields""" | |
| return [None] + [""] * 30 + ["", "", "🟢 **Ready to process documents**"] | |
| def clear_delete_form(): | |
| """Clear delete form""" | |
| return gr.update(value=[]), "", "" | |
| def start_processing(): | |
| """Show processing started status""" | |
| return "🔄 **Processing documents... Please wait**" | |
| def finish_processing(): | |
| """Show processing finished status""" | |
| return "✅ **Processing completed successfully!**" | |
| def create_main_interface(): | |
| """Create the main application interface""" | |
| with gr.Blocks( | |
| title="📄 LITIGATION RESEARCH DOC SYSTEM", | |
| theme=gr.themes.Soft(), | |
| css=""" | |
| .gradio-container { | |
| max-width: 1400px !important; | |
| margin: auto; | |
| } | |
| .upload-container { | |
| border: 2px dashed #4CAF50; | |
| border-radius: 10px; | |
| padding: 20px; | |
| text-align: center; | |
| background-color: #f8f9fa; | |
| } | |
| .delete-container { | |
| border: 2px dashed #f44336; | |
| border-radius: 10px; | |
| padding: 20px; | |
| background-color: #ffebee; | |
| } | |
| .tab-nav { | |
| margin-bottom: 20px; | |
| } | |
| """ | |
| ) as app: | |
| gr.Markdown( | |
| """ | |
| # 📄 LITIGATION RESEARCH DOC SYSTEM | |
| Upload, manage, and delete documents in the Pinecone Assistant for litigation research processing. | |
| ## 🚀 Features: | |
| - ✅ **Multiple file upload** - Select and upload multiple documents at once | |
| - 🏷️ **Metadata tagging** - Add sections, keywords, and descriptions | |
| - 🔄 **Batch processing** - All files processed with individual metadata | |
| - 🗑️ **File deletion** - Delete multiple files by selecting from dropdown | |
| - 📊 **File management** - View uploaded files with timestamps and metadata | |
| - 📋 **Detailed reporting** - See success/failure status for each operation | |
| --- | |
| """ | |
| ) | |
| # Create tabs for different functionalities | |
| with gr.Tabs() as tabs: | |
| # Tab 1: Upload Documents | |
| with gr.TabItem("📤 Upload Documents", id="upload_tab"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### 📁 **File Upload**") | |
| files_input = gr.File( | |
| label="Select Documents (Max 10 files)", | |
| file_count="multiple", | |
| file_types=[".pdf", ".doc", ".docx", ".txt"], | |
| elem_classes=["upload-container"] | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("### 🏷️ **Document Metadata (Individual for Each File)**") | |
| gr.Markdown("*Upload files first, then metadata fields will appear for each document*") | |
| # Dynamic metadata fields container | |
| with gr.Column() as metadata_container: | |
| # Create 30 text fields (enough for 10 files with 3 fields each) | |
| metadata_fields = [] | |
| for i in range(30): | |
| field = gr.Textbox( | |
| label=f"Field {i}", | |
| placeholder="", | |
| visible=False, | |
| lines=2 | |
| ) | |
| metadata_fields.append(field) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| upload_btn = gr.Button( | |
| "🚀 Upload Documents to Pinecone Assistant", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| with gr.Column(scale=1): | |
| clear_btn = gr.Button( | |
| "🗑️ Clear Form", | |
| variant="secondary", | |
| size="lg" | |
| ) | |
| # Processing status indicator | |
| with gr.Row(): | |
| processing_status = gr.Markdown( | |
| value="🟢 **Ready to process documents**", | |
| visible=True | |
| ) | |
| gr.Markdown("---") | |
| # Results section | |
| with gr.Row(): | |
| with gr.Column(): | |
| status_output = gr.Markdown( | |
| label="📊 Upload Status", | |
| value="*Ready to upload documents...*" | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| results_output = gr.Markdown( | |
| label="📋 Detailed Results", | |
| value="", | |
| max_height=400 # Add height limit for scrolling | |
| ) | |
| # Tab 2: Delete Documents | |
| with gr.TabItem("🗑️ Delete Documents", id="delete_tab"): | |
| gr.Markdown("### 🗑️ **Delete Multiple Documents**") | |
| gr.Markdown("Select multiple files from the dropdown to delete them from the Pinecone Assistant.") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| file_dropdown = gr.Dropdown( | |
| label="📋 Select Files to Delete (Multiple Selection)", | |
| choices=[], | |
| multiselect=True, | |
| interactive=False, | |
| elem_classes=["delete-container"] | |
| ) | |
| with gr.Column(scale=1): | |
| refresh_dropdown_btn = gr.Button( | |
| "🔄 Refresh File List", | |
| variant="secondary", | |
| size="lg" | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| delete_btn = gr.Button( | |
| "🗑️ Delete Selected Files", | |
| variant="stop", | |
| size="lg" | |
| ) | |
| with gr.Column(scale=1): | |
| clear_delete_btn = gr.Button( | |
| "↺ Clear Selection", | |
| variant="secondary", | |
| size="lg" | |
| ) | |
| gr.Markdown("---") | |
| # Delete results section | |
| with gr.Row(): | |
| with gr.Column(): | |
| delete_status_output = gr.Markdown( | |
| label="📊 Deletion Status", | |
| value="*Select files to delete...*" | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| delete_results_output = gr.Markdown( | |
| label="🗑️ Deletion Results", | |
| value="", | |
| max_height=400 | |
| ) | |
| # Tab 3: View Uploaded Files | |
| with gr.TabItem("📋 View Uploaded Files", id="view_tab"): | |
| gr.Markdown("### 📋 **Uploaded Files Management**") | |
| gr.Markdown("View all files currently uploaded to the Pinecone Assistant with their metadata and timestamps.") | |
| with gr.Row(): | |
| refresh_btn = gr.Button( | |
| "🔄 Fetch Files", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| # File list status | |
| with gr.Row(): | |
| file_list_status = gr.Markdown( | |
| value="🟡 **Click 'Fetch Files' to load uploaded files**", | |
| visible=True | |
| ) | |
| gr.Markdown("---") | |
| # Pagination controls | |
| with gr.Row(): | |
| prev_btn = gr.Button( | |
| "⬅️ Previous 100", | |
| variant="secondary", | |
| visible=False | |
| ) | |
| pagination_info = gr.Markdown( | |
| value="📄 Page 1 of 1 | Total: 0 files", | |
| elem_classes=["pagination-info"] | |
| ) | |
| next_btn = gr.Button( | |
| "Next 100 ➡️", | |
| variant="secondary", | |
| visible=False | |
| ) | |
| # File list results | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| file_summary = gr.Markdown( | |
| label="📊 Files Summary", | |
| value="*Click refresh to load file summary...*" | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| file_details = gr.Markdown( | |
| label="📋 File Details", | |
| value="*Click refresh to load file details...*", | |
| max_height=600 # Add height limit for scrolling | |
| ) | |
| # Event handlers for Upload tab | |
| # Update metadata fields when files are uploaded | |
| files_input.change( | |
| fn=update_metadata_fields, | |
| inputs=[files_input], | |
| outputs=metadata_fields | |
| ) | |
| # Show processing status when upload starts | |
| upload_btn.click( | |
| fn=start_processing, | |
| outputs=[processing_status] | |
| ).then( | |
| fn=process_files_with_progress, | |
| inputs=[files_input] + metadata_fields, | |
| outputs=[status_output, results_output, processing_status] | |
| ) | |
| clear_btn.click( | |
| fn=clear_form, | |
| outputs=[files_input] + metadata_fields + [status_output, results_output, processing_status] | |
| ) | |
| # Event handlers for Delete tab | |
| # Refresh dropdown with current files | |
| refresh_dropdown_btn.click( | |
| fn=refresh_delete_dropdown, | |
| outputs=[file_dropdown] | |
| ) | |
| # Delete selected files | |
| delete_btn.click( | |
| fn=delete_selected_files, | |
| inputs=[file_dropdown], | |
| outputs=[delete_status_output, delete_results_output] | |
| ) | |
| # Clear delete form | |
| clear_delete_btn.click( | |
| fn=clear_delete_form, | |
| outputs=[file_dropdown, delete_status_output, delete_results_output] | |
| ) | |
| # Event handlers for View Files tab | |
| # Fetch files - load page 1 | |
| refresh_btn.click( | |
| fn=refresh_file_list, | |
| outputs=[file_list_status] | |
| ).then( | |
| fn=list_uploaded_files_paginated, | |
| inputs=[], | |
| outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn] | |
| ) | |
| # Next page button | |
| next_btn.click( | |
| fn=load_next_page, | |
| inputs=[pagination_info], | |
| outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn] | |
| ) | |
| # Previous page button | |
| prev_btn.click( | |
| fn=load_prev_page, | |
| inputs=[pagination_info], | |
| outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn] | |
| ) | |
| # Footer | |
| gr.Markdown( | |
| """ | |
| --- | |
| ### 💡 **Usage Tips:** | |
| **Upload Documents:** | |
| - Select up to 10 PDF, DOC, DOCX, or TXT files at once | |
| - Upload files first, then fill individual metadata for each document | |
| - Each file gets its own sections, keywords, and description | |
| - Check the results section for upload status | |
| **Delete Documents:** | |
| - Click 'Refresh File List' to load current files in dropdown | |
| - Select multiple files using the dropdown (supports multi-select) | |
| - Click 'Delete Selected Files' to remove them permanently | |
| - View deletion results for success/failure status | |
| **View Uploaded Files:** | |
| - Click 'Fetch Files' to see all uploaded files | |
| - View file details including upload timestamps and metadata | |
| - Files are sorted by most recent first | |
| - Use pagination to navigate through large file lists | |
| ### ⚠️ **Important Notes:** | |
| - File deletion is **permanent** and cannot be undone | |
| - Always verify your selection before deleting files | |
| - The system maps file titles to IDs internally for deletion | |
| ### 📞 **Support:** | |
| For issues or questions, contact the development team. | |
| """ | |
| ) | |
| return app | |
| def create_login_interface(): | |
| """Create the login interface""" | |
| with gr.Blocks( | |
| title="🔐 Authentication Required", | |
| theme=gr.themes.Soft(), | |
| css=""" | |
| .gradio-container { | |
| max-width: 500px !important; | |
| margin: auto; | |
| padding-top: 100px; | |
| } | |
| .login-container { | |
| border: 2px solid #2196F3; | |
| border-radius: 15px; | |
| padding: 30px; | |
| text-align: center; | |
| background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); | |
| box-shadow: 0 10px 20px rgba(0,0,0,0.1); | |
| } | |
| .password-input { | |
| margin: 20px 0; | |
| } | |
| .login-button { | |
| margin-top: 15px; | |
| } | |
| .error-message { | |
| color: #f44336; | |
| font-weight: bold; | |
| margin-top: 10px; | |
| } | |
| """ | |
| ) as login_app: | |
| with gr.Column(elem_classes=["login-container"]): | |
| gr.Markdown( | |
| """ | |
| # 🔐 **LITIGATION RESEARCH DOC SYSTEM** | |
| ## **Authentication Required** | |
| Please enter the password to access the application. | |
| --- | |
| """ | |
| ) | |
| password_input = gr.Textbox( | |
| label="🔑 Password", | |
| type="password", | |
| placeholder="Enter password to access the system", | |
| elem_classes=["password-input"] | |
| ) | |
| login_btn = gr.Button( | |
| "🚀 Login", | |
| variant="primary", | |
| size="lg", | |
| elem_classes=["login-button"] | |
| ) | |
| error_message = gr.Markdown( | |
| value="", | |
| visible=False, | |
| elem_classes=["error-message"] | |
| ) | |
| gr.Markdown( | |
| """ | |
| --- | |
| ### 🛡️ **Security Notice:** | |
| - This system contains sensitive litigation research documentation | |
| - Authorized access only | |
| - All activities are logged | |
| ### 📞 **Need Access?** | |
| Contact your system administrator for credentials. | |
| """ | |
| ) | |
| return login_app, password_input, login_btn, error_message | |
| def main(): | |
| """Main application with authentication — single Gradio Blocks app. | |
| This builds both the login UI and the main application UI inside one | |
| `gr.Blocks` so we only call `.launch()` once. The main UI is hidden | |
| until authentication succeeds. This avoids launching two separate | |
| Gradio servers which was causing the server to close unexpectedly. | |
| """ | |
| # Reuse same theme/CSS as main app for consistency | |
| app_css = """ | |
| .gradio-container { | |
| max-width: 1400px !important; | |
| margin: auto; | |
| } | |
| .upload-container { | |
| border: 2px dashed #4CAF50; | |
| border-radius: 10px; | |
| padding: 20px; | |
| text-align: center; | |
| background-color: #f8f9fa; | |
| } | |
| .delete-container { | |
| border: 2px dashed #f44336; | |
| border-radius: 10px; | |
| padding: 20px; | |
| background-color: #ffebee; | |
| } | |
| .login-container { | |
| border: 2px solid #2196F3; | |
| border-radius: 15px; | |
| padding: 30px; | |
| text-align: center; | |
| background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); | |
| box-shadow: 0 10px 20px rgba(0,0,0,0.1); | |
| } | |
| """ | |
| with gr.Blocks(title="LITIGATION RESEARCH DOC SYSTEM", theme=gr.themes.Soft(), css=app_css) as app: | |
| # --- LOGIN CONTAINER (visible initially) --- | |
| with gr.Column(elem_classes=["login-container"]) as login_container: | |
| gr.Markdown(""" | |
| # LITIGATION RESEARCH DOC SYSTEM | |
| Authentication required | |
| """) | |
| password_input = gr.Textbox( | |
| label="Password", | |
| type="password", | |
| placeholder="Enter password" | |
| ) | |
| login_btn = gr.Button("Login", variant="primary", size="lg") | |
| error_message = gr.Markdown(value="", visible=False) | |
| # --- MAIN APP CONTAINER (hidden until auth) --- | |
| with gr.Column(visible=False) as main_container: | |
| # Insert main UI contents (slimmed text, no emojis) | |
| gr.Markdown( | |
| """ | |
| # LITIGATION RESEARCH DOC SYSTEM | |
| Upload, manage, and delete documents in the Pinecone Assistant. | |
| """ | |
| ) | |
| with gr.Tabs() as tabs: | |
| # Upload Tab | |
| with gr.TabItem("Upload Documents", id="upload_tab"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### File Upload") | |
| files_input = gr.File( | |
| label="Select Documents (Max 10 files)", | |
| file_count="multiple", | |
| file_types=[".pdf", ".doc", ".docx", ".txt"], | |
| elem_classes=["upload-container"] | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("### Document Metadata") | |
| with gr.Column() as metadata_container: | |
| metadata_fields = [] | |
| for i in range(30): | |
| field = gr.Textbox(label=f"Field {i}", placeholder="", visible=False, lines=2) | |
| metadata_fields.append(field) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| upload_btn = gr.Button("Upload Documents to Pinecone Assistant", variant="primary") | |
| with gr.Column(scale=1): | |
| clear_btn = gr.Button("Clear Form", variant="secondary") | |
| processing_status = gr.Markdown(value="Ready to process documents", visible=True) | |
| status_output = gr.Markdown(label="Upload Status", value="Ready to upload documents...") | |
| results_output = gr.Markdown(label="Detailed Results", value="", max_height=400) | |
| # Delete Tab | |
| with gr.TabItem("Delete Documents", id="delete_tab"): | |
| gr.Markdown("### Delete Documents") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| file_dropdown = gr.Dropdown(label="Select Files to Delete", choices=[], multiselect=True, interactive=False, elem_classes=["delete-container"]) | |
| with gr.Column(scale=1): | |
| refresh_dropdown_btn = gr.Button("Refresh File List", variant="secondary") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| delete_btn = gr.Button("Delete Selected Files", variant="stop") | |
| with gr.Column(scale=1): | |
| clear_delete_btn = gr.Button("Clear Selection", variant="secondary") | |
| delete_status_output = gr.Markdown(label="Deletion Status", value="Select files to delete...") | |
| delete_results_output = gr.Markdown(label="Deletion Results", value="", max_height=400) | |
| # View Tab | |
| with gr.TabItem("View Uploaded Files", id="view_tab"): | |
| gr.Markdown("### Uploaded Files Management") | |
| with gr.Row(): | |
| refresh_btn = gr.Button("Fetch Files", variant="primary") | |
| file_list_status = gr.Markdown(value="Click 'Fetch Files' to load uploaded files", visible=True) | |
| with gr.Row(): | |
| prev_btn = gr.Button("Previous 100", variant="secondary", visible=False) | |
| pagination_info = gr.Markdown(value="Page 1 of 1 | Total: 0 files") | |
| next_btn = gr.Button("Next 100", variant="secondary", visible=False) | |
| file_summary = gr.Markdown(label="Files Summary", value="Click refresh to load file summary...") | |
| file_details = gr.Markdown(label="File Details", value="Click refresh to load file details...", max_height=600) | |
| # --- EVENT HANDLERS --- | |
| # Login handlers: show main_container on success, show error on failure | |
| def handle_login(password): | |
| if authenticate(password): | |
| return gr.update(visible=False), gr.update(value="", visible=False), gr.update(visible=True) | |
| else: | |
| return gr.update(visible=True), gr.update(value="Invalid password. Please try again.", visible=True), gr.update(visible=False) | |
| login_btn.click(fn=handle_login, inputs=[password_input], outputs=[login_container, error_message, main_container]) | |
| password_input.submit(fn=handle_login, inputs=[password_input], outputs=[login_container, error_message, main_container]) | |
| # Wiring existing handlers from earlier in the file | |
| files_input.change(fn=update_metadata_fields, inputs=[files_input], outputs=metadata_fields) | |
| upload_btn.click(fn=start_processing, outputs=[processing_status]).then( | |
| fn=process_files_with_progress, | |
| inputs=[files_input] + metadata_fields, | |
| outputs=[status_output, results_output, processing_status] | |
| ) | |
| clear_btn.click(fn=clear_form, outputs=[files_input] + metadata_fields + [status_output, results_output, processing_status]) | |
| refresh_dropdown_btn.click(fn=refresh_delete_dropdown, outputs=[file_dropdown]) | |
| delete_btn.click(fn=delete_selected_files, inputs=[file_dropdown], outputs=[delete_status_output, delete_results_output]) | |
| clear_delete_btn.click(fn=clear_delete_form, outputs=[file_dropdown, delete_status_output, delete_results_output]) | |
| refresh_btn.click(fn=refresh_file_list, outputs=[file_list_status]).then( | |
| fn=list_uploaded_files_paginated, | |
| inputs=[], | |
| outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn] | |
| ) | |
| next_btn.click(fn=load_next_page, inputs=[pagination_info], outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn]) | |
| prev_btn.click(fn=load_prev_page, inputs=[pagination_info], outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn]) | |
| # Launch single app | |
| app.launch(server_name="0.0.0.0", server_port=7860, share=False, debug=True, show_error=True) | |
| if __name__ == "__main__": | |
| main() |