| import gradio as gr |
| import os |
| from pathlib import Path |
| from pinecone import Pinecone |
| from typing import List, Tuple |
| import tempfile |
| import shutil |
| from dotenv import load_dotenv |
| import time |
| from datetime import datetime |
| import json |
|
|
| |
# Load settings from a .env file (if present) into the process environment.
load_dotenv()

# Fail fast at import time when a mandatory setting is absent.
required_env_vars = ["PINECONE_API_KEY"]
missing_vars = [name for name in required_env_vars if not os.getenv(name)]
if missing_vars:
    raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

# Shared Pinecone client used by the handler functions in this file.
pinecone_api_key = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_api_key)

# Local directory that receives a copy of each document before it is uploaded.
UPLOAD_FOLDER = "uploads"
Path(UPLOAD_FOLDER).mkdir(parents=True, exist_ok=True)
|
|
def parse_pinecone_timestamp(iso_string):
    """
    Parse an ISO 8601 timestamp string, tolerating nanosecond precision.

    The result is always timezone-aware so that values returned by this
    function can be compared and sorted against each other without raising
    ``TypeError`` (naive and aware datetimes cannot be compared).

    Args:
        iso_string (str): The ISO-formatted timestamp string. A trailing
            ``Z`` is accepted as UTC. Fractional seconds may have any number
            of digits; they are truncated or zero-padded to microseconds.

    Returns:
        datetime: The parsed, timezone-aware datetime. Timestamps without an
        explicit offset are assumed to be UTC. Missing or non-string input
        yields the smallest representable datetime (in UTC) so such entries
        sort last in a newest-first ordering.

    Raises:
        ValueError: If a non-empty string is not a valid ISO 8601 timestamp.
    """
    # Local import: the module only imports `datetime` from the datetime package.
    from datetime import timezone

    if not isinstance(iso_string, str) or not iso_string:
        return datetime.min.replace(tzinfo=timezone.utc)

    # Older `fromisoformat` versions do not understand the "Z" suffix.
    if iso_string.endswith('Z'):
        iso_string = iso_string[:-1] + '+00:00'

    head, dot, tail = iso_string.partition('.')
    if dot:
        # Separate the fractional digits from whatever follows them (the UTC
        # offset, if any) instead of searching for '+'/'-', which misfires on
        # the date's own dashes when no offset is present.
        digit_count = len(tail) - len(tail.lstrip('0123456789'))
        fraction, suffix = tail[:digit_count], tail[digit_count:]
        if fraction:
            # Exactly six digits is accepted by every `fromisoformat` version.
            iso_string = f"{head}.{fraction[:6].ljust(6, '0')}{suffix}"

    parsed = datetime.fromisoformat(iso_string)
    if parsed.tzinfo is None:
        # Assumption: offset-less timestamps are UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
|
|
def get_all_files():
    """
    Fetch every file known to the Pinecone Assistant, newest first.

    The assistant name is read from ``PINECONE_ASSISTANT_NAME`` (default
    ``"gstminutes"``). Files are ordered by their ``created_on`` attribute;
    files whose timestamp is missing or unparsable sort last instead of
    aborting the whole listing.

    Returns:
        list: The file objects returned by the assistant, sorted by creation
        time in descending order. An empty list when there are no files or
        when the request fails (the failure is logged).
    """
    # Local imports: neither name is imported at module level.
    import logging
    from datetime import timezone

    oldest = datetime.min.replace(tzinfo=timezone.utc)

    def _created_at(file_obj):
        # Sort key that never raises and always yields an aware datetime, so
        # one bad record cannot make `sorted` fail for every file.
        try:
            stamp = parse_pinecone_timestamp(getattr(file_obj, 'created_on', ''))
        except (TypeError, ValueError):
            return oldest
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)
        return stamp

    try:
        assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        assistant = pc.assistant.Assistant(assistant_name=assistant_name)

        files_response = assistant.list_files()

        # The response may be a wrapper exposing `.files` or the list itself.
        files_list = getattr(files_response, 'files', files_response)
        if not files_list:
            return []

        return sorted(files_list, key=_created_at, reverse=True)

    except Exception:
        # Best effort for the UI: report "no files" but leave a trace of why.
        logging.getLogger(__name__).exception("Failed to list files from Pinecone Assistant")
        return []
|
|
def get_file_choices():
    """
    Build the dropdown choices for the delete tab.

    Returns:
        list[tuple]: One ``(display_name, file_id)`` pair per file returned by
        ``get_all_files()``, in the same order. The display name is the file
        name, followed by ``(uploaded: YYYY-MM-DD HH:MM)`` when the file's
        ``created_on`` value can be parsed. An empty list when there are no
        files or when building the choices fails (the failure is logged).
    """
    import logging  # local import: not imported at module level

    try:
        choices = []
        for file_obj in get_all_files():
            file_name = getattr(file_obj, 'name', 'Unknown File')
            file_id = getattr(file_obj, 'id', 'unknown')
            created_on = getattr(file_obj, 'created_on', '')

            display_name = file_name
            if created_on:
                try:
                    created_formatted = parse_pinecone_timestamp(created_on).strftime('%Y-%m-%d %H:%M')
                except Exception:
                    # Unreadable timestamp: fall back to the bare file name.
                    pass
                else:
                    display_name = f"{file_name} (uploaded: {created_formatted})"

            choices.append((display_name, file_id))

        return choices
    except Exception:
        logging.getLogger(__name__).exception("Failed to build file choices")
        return []
|
|
def refresh_delete_dropdown():
    """
    Reload the delete dropdown from the current file list.

    Returns:
        A ``gr.update`` that replaces the choices, clears the selection, and
        enables the dropdown only when at least one file is available.
    """
    available = get_file_choices()
    has_files = bool(available)
    return gr.update(
        choices=available if has_files else [],
        value=None,
        interactive=has_files,
    )
|
|
def delete_selected_files(selected_file_ids, progress=gr.Progress()):
    """
    Delete the selected files from the Pinecone Assistant one by one.

    A failure on one file is recorded and does not stop the remaining
    deletions.

    Args:
        selected_file_ids: Iterable of file IDs chosen in the dropdown.
        progress: Gradio progress tracker used to report each step.

    Returns:
        tuple[str, str]: ``(status_message, detailed_results)`` as Markdown.
        The second element is empty when nothing was selected or when the
        whole operation failed before any per-file work.
    """
    # NOTE(review): the leading "β"/"π" glyphs in these messages look like
    # UTF-8 emoji that were mis-decoded (e.g. "β\x85" was probably a check
    # mark). The text is kept as found, with the U+0085 control character
    # written as an explicit escape — confirm and restore the intended glyphs.
    if not selected_file_ids:
        return "β **Error:** No files selected for deletion", ""

    try:
        progress(0.1, desc="π§ Initializing Pinecone Assistant...")
        assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        assistant = pc.assistant.Assistant(assistant_name=assistant_name)

        # Map IDs to names so the report can show readable titles.
        file_id_to_name = {
            getattr(f, 'id', ''): getattr(f, 'name', 'Unknown')
            for f in get_all_files()
        }

        total_files = len(selected_file_ids)
        deleted_files = []
        failed_files = []

        progress(0.2, desc=f"ποΈ Starting deletion of {total_files} files...")

        for i, file_id in enumerate(selected_file_ids):
            try:
                file_name = file_id_to_name.get(file_id, f"File ID: {file_id}")
                # Deletions occupy the 20%-90% span of the progress bar.
                progress(0.2 + (i / total_files) * 0.7, desc=f"ποΈ Deleting: {file_name}...")

                assistant.delete_file(file_id=file_id)
                deleted_files.append({
                    'name': file_name,
                    'id': file_id,
                    'status': 'success',
                })

                # Short pause between successive delete requests.
                time.sleep(0.2)

            except Exception as delete_error:
                failed_files.append({
                    'name': file_id_to_name.get(file_id, f"File ID: {file_id}"),
                    'id': file_id,
                    'error': str(delete_error),
                })

        progress(1.0, desc="β\x85 Deletion process completed!")

        success_count = len(deleted_files)
        error_count = len(failed_files)

        status_message = (
            "π **Deletion Complete**\n\n"
            f"β\x85 **Successfully deleted:** {success_count} files\n"
            f"β **Failed to delete:** {error_count} files\n"
            f"π **Total processed:** {total_files} files\n\n"
        )

        report = ["## ποΈ **Deletion Results**\n\n"]

        if deleted_files:
            report.append("### β\x85 **Successfully Deleted Files:**\n")
            report.extend(
                f"- **{info['name']}** (`{info['id']}`)\n" for info in deleted_files
            )
            report.append("\n")

        if failed_files:
            report.append("### β **Failed Deletions:**\n")
            for info in failed_files:
                report.append(f"- **{info['name']}** (`{info['id']}`)\n")
                report.append(f" - Error: {info['error']}\n")
            report.append("\n")

        return status_message, "".join(report)

    except Exception as e:
        error_msg = f"β **Critical Error during deletion:** {str(e)}"
        return error_msg, ""
|
|
def list_uploaded_files_paginated(page_num=0, progress=gr.Progress()):
    """
    Render one page (100 files) of the uploaded-file listing.

    Args:
        page_num (int): Zero-based page index. Values outside the available
            range are clamped to the first/last page, so a stale page number
            (for example after files were deleted) still shows a valid page.
        progress: Gradio progress tracker used to report each step.

    Returns:
        tuple: ``(summary, detailed_info, pagination_info, prev_update,
        next_update)`` — three Markdown strings followed by two ``gr.update``
        objects that control the visibility of the previous/next buttons.
    """
    # NOTE(review): the leading "β"/"π" glyphs look like mis-decoded UTF-8
    # emoji; text is kept as found with U+0085 written as an explicit escape.
    try:
        progress(0.1, desc="π Getting files...")

        all_files = get_all_files()

        if not all_files:
            progress(1.0, desc="β\x85 Complete - No files found")
            return "π **No files found in Pinecone Assistant**", "", "No files available", gr.update(visible=False), gr.update(visible=False)

        progress(0.5, desc="π Processing page...")

        files_per_page = 100
        total_files = len(all_files)
        total_pages = (total_files + files_per_page - 1) // files_per_page

        # Keep the requested page inside [0, total_pages - 1]; a negative or
        # too-large index would otherwise slice the wrong (or no) files.
        page_num = min(max(page_num, 0), total_pages - 1)

        start_idx = page_num * files_per_page
        end_idx = start_idx + files_per_page
        page_files = all_files[start_idx:end_idx]

        summary = (
            f"π **Files Summary (Page {page_num + 1} of {total_pages})**\n\n"
            f"π **Total files:** {total_files}\n"
            f"π **Showing:** {start_idx + 1}-{min(end_idx, total_files)} of {total_files}\n\n"
        )

        entries = [f"## π **Latest Uploaded Files - Page {page_num + 1}**\n\n"]

        progress(0.8, desc="π Formatting file titles...")

        for i, file_obj in enumerate(page_files, 1):
            try:
                file_name = getattr(file_obj, 'name', 'Unknown File')
                file_id = getattr(file_obj, 'id', 'Unknown ID')
                created_on = getattr(file_obj, 'created_on', '')

                # Position of this file in the full (un-paged) listing.
                global_index = start_idx + i

                created_formatted = 'Unknown'
                if created_on:
                    try:
                        created_formatted = parse_pinecone_timestamp(created_on).strftime('%Y-%m-%d %H:%M')
                    except Exception:
                        # Unreadable timestamp: keep the 'Unknown' placeholder.
                        pass

                entries.append(
                    f"{global_index}. **{file_name}**\n"
                    f" π\x85 Uploaded: {created_formatted} | π ID: `{file_id}`\n\n"
                )

            except Exception:
                entries.append(f"{start_idx + i}. β **Error loading file**\n\n")

        detailed_info = "".join(entries)

        pagination_info = f"π Page {page_num + 1} of {total_pages} | Total: {total_files} files"

        show_prev = page_num > 0
        show_next = page_num < total_pages - 1

        progress(1.0, desc="β\x85 Page loaded successfully!")

        return summary, detailed_info, pagination_info, gr.update(visible=show_prev), gr.update(visible=show_next)

    except Exception as e:
        error_msg = f"β **Error retrieving file list:** {str(e)}"
        return error_msg, "", "Error", gr.update(visible=False), gr.update(visible=False)
|
|
def _page_index_from_info(current_page_info):
    """Return the zero-based page index encoded in a "... Page N of M ..." label, or None if it cannot be read."""
    try:
        return int(current_page_info.split("Page ")[1].split(" of")[0]) - 1
    except (AttributeError, IndexError, TypeError, ValueError):
        # Not a string, no "Page " marker, or a non-numeric page number.
        return None


def load_next_page(current_page_info):
    """
    Load the page after the one currently displayed.

    Args:
        current_page_info (str): Text of the pagination label, expected to
            contain ``"Page N of M"``.

    Returns:
        The tuple produced by ``list_uploaded_files_paginated`` for the next
        page, or for the first page when the label cannot be parsed.
    """
    current_page = _page_index_from_info(current_page_info)
    if current_page is None:
        return list_uploaded_files_paginated(0)
    return list_uploaded_files_paginated(current_page + 1)


def load_prev_page(current_page_info):
    """
    Load the page before the one currently displayed.

    Args:
        current_page_info (str): Text of the pagination label, expected to
            contain ``"Page N of M"``.

    Returns:
        The tuple produced by ``list_uploaded_files_paginated`` for the
        previous page (never below the first), or for the first page when the
        label cannot be parsed.
    """
    current_page = _page_index_from_info(current_page_info)
    if current_page is None:
        return list_uploaded_files_paginated(0)
    return list_uploaded_files_paginated(max(0, current_page - 1))
|
|
def refresh_file_list():
    """Return the interim status text shown while the file list is being fetched."""
    interim_status = "π **Refreshing file list... Please wait**"
    return interim_status
|
|
def process_files_with_progress(files, *metadata_inputs, progress=gr.Progress()):
    """
    Upload up to 10 files to the Pinecone Assistant, each with its own metadata.

    Each file is copied into ``UPLOAD_FOLDER`` and then uploaded. A failure on
    one file is recorded and does not stop the remaining uploads. Files with
    no metadata at all are rejected.

    Args:
        files: Paths of the selected files.
        *metadata_inputs: Flat sequence of text values, three per file in the
            order sections, keywords, description. Sections and keywords are
            comma-separated lists.
        progress: Gradio progress tracker used to report each step.

    Returns:
        tuple[str, str, str]: ``(status_message, detailed_results,
        processing_status)`` as Markdown. All return paths yield three values
        because the handler is wired to three output components.
    """
    # NOTE(review): the leading "β"/"π" glyphs in these messages look like
    # UTF-8 emoji that were mis-decoded; text is kept as found with U+0085
    # written as an explicit escape — confirm and restore the intended glyphs.
    if not files:
        return "β Error: No files selected", "", "β **Processing failed with error**"

    if len(files) > 10:
        return "β Error: Maximum 10 files allowed at a time", "", "β **Processing failed with error**"

    try:
        results = []
        errors = []
        total_files = len(files)

        progress(0, desc="π§ Initializing Pinecone Assistant...")
        time.sleep(0.5)
        assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        assistant = pc.assistant.Assistant(assistant_name=assistant_name)

        for i, file_path in enumerate(files):
            try:
                filename = os.path.basename(file_path)
                progress((i / total_files), desc=f"π Processing {filename}... ({i+1}/{total_files})")

                # Three consecutive slots per file. Slicing (rather than
                # indexing) treats any missing slot as empty instead of raising.
                slots = list(metadata_inputs[i * 3:i * 3 + 3])
                slots += [None] * (3 - len(slots))
                sections, keywords, description = (value or "" for value in slots)

                if not sections.strip() and not keywords.strip() and not description.strip():
                    errors.append({
                        "filename": filename,
                        "error": "β Error: No metadata provided"
                    })
                    continue

                progress((i / total_files), desc=f"π·οΈ Preparing metadata for {filename}...")
                metadata = {
                    "sections": [s.strip() for s in sections.split(",") if s.strip()],
                    "keywords": [k.strip() for k in keywords.split(",") if k.strip()],
                    "description": description.strip()
                }

                progress((i / total_files), desc=f"π Copying {filename} to uploads...")
                destination_path = os.path.join(UPLOAD_FOLDER, filename)
                shutil.copy2(file_path, destination_path)

                progress((i / total_files), desc=f"βοΈ Uploading {filename} to Pinecone...")
                response = assistant.upload_file(
                    file_path=destination_path,
                    metadata=metadata,
                    timeout=None
                )

                results.append({
                    "filename": filename,
                    "status": "β\x85 Success",
                    "metadata": metadata,
                    "response": str(response)
                })

            except Exception as file_error:
                errors.append({
                    "filename": os.path.basename(file_path),
                    "error": f"β Error: {str(file_error)}"
                })

        progress(1.0, desc="β\x85 Processing complete!")
        time.sleep(0.5)

        success_count = len(results)
        error_count = len(errors)

        status_message = (
            "π **Processing Complete**\n\n"
            f"β\x85 **Successful uploads:** {success_count}\n"
            f"β **Failed uploads:** {error_count}\n"
            f"π **Total files processed:** {len(files)}\n\n"
        )

        report = ["## π **Detailed Results**\n\n"]

        if results:
            report.append("### β\x85 **Successful Uploads:**\n")
            for result in results:
                report.append(f"- **{result['filename']}**\n")
                report.append(f" - Sections: {', '.join(result['metadata']['sections'])}\n")
                report.append(f" - Keywords: {', '.join(result['metadata']['keywords'])}\n")
                report.append(f" - Description: {result['metadata']['description']}\n\n")

        if errors:
            report.append("### β **Failed Uploads:**\n")
            report.extend(
                f"- **{error['filename']}** - {error['error']}\n" for error in errors
            )

        return status_message, "".join(report), "β\x85 **Processing completed successfully!**"

    except Exception as e:
        error_msg = f"β **Critical Error:** {str(e)}"
        return error_msg, "", "β **Processing failed with error**"
|
|
def update_metadata_fields(files):
    """
    Show and label the metadata textboxes that belong to the selected files.

    There are 30 textboxes in total: three (sections, keywords, description)
    for each of up to 10 files.

    Args:
        files: Paths of the currently selected files, or a falsy value when
            nothing is selected.

    Returns:
        list: Exactly 30 ``gr.update`` objects. All are hidden when no files
        are selected or more than 10 are selected; otherwise the first
        ``3 * len(files)`` are visible and labelled with the file name.
    """
    slot_count = 30

    # Nothing selected, or too many files: hide every textbox.
    if not files or len(files) > 10:
        return [gr.update(visible=False)] * slot_count

    updates = []
    for path in files:
        filename = os.path.basename(path)
        updates.append(gr.update(visible=True, label=f"π Sections for {filename}", placeholder="e.g., Introduction, Financial Data, Compliance"))
        updates.append(gr.update(visible=True, label=f"π Keywords for {filename}", placeholder="e.g., GST, tax, compliance, revenue"))
        updates.append(gr.update(visible=True, label=f"π Description for {filename}", placeholder="Brief description of this document"))

    # Hide whatever textboxes are left over.
    updates.extend(gr.update(visible=False) for _ in range(slot_count - len(updates)))

    return updates[:slot_count]
|
|
def clear_form():
    """
    Produce the reset values for the upload form.

    Returns:
        list: 34 values — ``None`` for the file input, 30 empty strings for
        the metadata textboxes, two empty strings for the status and results
        panels, and the "ready" text for the processing status.
    """
    reset_values = [None]
    reset_values.extend("" for _ in range(30))
    reset_values.extend(["", "", "π’ **Ready to process documents**"])
    return reset_values
|
|
def clear_delete_form():
    """
    Produce the reset values for the delete tab.

    Returns:
        tuple: A ``gr.update`` that empties the dropdown selection, followed
        by two empty strings for the status and results panels.
    """
    empty_selection = gr.update(value=[])
    return empty_selection, "", ""
|
|
def start_processing():
    """Return the interim status text shown while documents are being processed."""
    interim_status = "π **Processing documents... Please wait**"
    return interim_status
|
|
def finish_processing():
    """Return the status text shown once processing has finished."""
    # NOTE(review): "β\x85" looks like a mis-decoded UTF-8 check-mark emoji.
    # The U+0085 control character is written as an explicit escape so the
    # literal stays on one physical line — confirm and restore the glyph.
    return "β\x85 **Processing completed successfully!**"
|
|
| |
# Gradio UI: three tabs (upload, delete, view) followed by the event wiring
# that connects the widgets to the handler functions defined in this file.
# NOTE(review): the container nesting below was rebuilt from a source whose
# indentation was lost — confirm the layout renders as intended.
# NOTE(review): the "π"/"β" glyphs in labels and Markdown look like UTF-8
# emoji that were mis-decoded; text is kept as found, with the U+0085 control
# character written as an explicit "\x85" escape — restore the intended glyphs.
with gr.Blocks(
    title="π Tax Document Ingestion System",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1400px !important;
        margin: auto;
    }
    .upload-container {
        border: 2px dashed #4CAF50;
        border-radius: 10px;
        padding: 20px;
        text-align: center;
        background-color: #f8f9fa;
    }
    .delete-container {
        border: 2px dashed #f44336;
        border-radius: 10px;
        padding: 20px;
        background-color: #ffebee;
    }
    .tab-nav {
        margin-bottom: 20px;
    }
    """
) as app:

    gr.Markdown(
        """
        # π Tax Document Ingestion System

        Upload, manage, and delete documents in the Pinecone Assistant for GST Minutes processing.

        ## π Features:
        - β\x85 **Multiple file upload** - Select and upload multiple documents at once
        - π·οΈ **Metadata tagging** - Add sections, keywords, and descriptions
        - π **Batch processing** - All files processed with individual metadata
        - ποΈ **File deletion** - Delete multiple files by selecting from dropdown
        - π **File management** - View uploaded files with timestamps and metadata
        - π **Detailed reporting** - See success/failure status for each operation

        ---
        """
    )

    with gr.Tabs() as tabs:

        # ---- Tab 1: upload -------------------------------------------------
        with gr.TabItem("π€ Upload Documents", id="upload_tab"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### π **File Upload**")
                    files_input = gr.File(
                        label="Select Documents (Max 10 files)",
                        file_count="multiple",
                        file_types=[".pdf", ".doc", ".docx", ".txt"],
                        elem_classes=["upload-container"]
                    )

                with gr.Column(scale=1):
                    gr.Markdown("### π·οΈ **Document Metadata (Individual for Each File)**")
                    gr.Markdown("*Upload files first, then metadata fields will appear for each document*")

                    with gr.Column() as metadata_container:
                        # 30 hidden textboxes = 3 metadata fields x 10 files;
                        # update_metadata_fields() reveals and labels them.
                        metadata_fields = [
                            gr.Textbox(
                                label=f"Field {i}",
                                placeholder="",
                                visible=False,
                                lines=2
                            )
                            for i in range(30)
                        ]

            with gr.Row():
                with gr.Column(scale=1):
                    upload_btn = gr.Button(
                        "π Upload Documents to Pinecone Assistant",
                        variant="primary",
                        size="lg"
                    )

                with gr.Column(scale=1):
                    clear_btn = gr.Button(
                        "ποΈ Clear Form",
                        variant="secondary",
                        size="lg"
                    )

            with gr.Row():
                processing_status = gr.Markdown(
                    value="π’ **Ready to process documents**",
                    visible=True
                )

            gr.Markdown("---")

            with gr.Row():
                with gr.Column():
                    status_output = gr.Markdown(
                        label="π Upload Status",
                        value="*Ready to upload documents...*"
                    )

            with gr.Row():
                with gr.Column():
                    results_output = gr.Markdown(
                        label="π Detailed Results",
                        value="",
                        max_height=400
                    )

        # ---- Tab 2: delete -------------------------------------------------
        with gr.TabItem("ποΈ Delete Documents", id="delete_tab"):
            gr.Markdown("### ποΈ **Delete Multiple Documents**")
            gr.Markdown("Select multiple files from the dropdown to delete them from the Pinecone Assistant.")

            with gr.Row():
                with gr.Column(scale=2):
                    file_dropdown = gr.Dropdown(
                        label="π Select Files to Delete (Multiple Selection)",
                        choices=[],
                        multiselect=True,
                        interactive=False,
                        elem_classes=["delete-container"]
                    )

                with gr.Column(scale=1):
                    refresh_dropdown_btn = gr.Button(
                        "π Refresh File List",
                        variant="secondary",
                        size="lg"
                    )

            with gr.Row():
                with gr.Column(scale=1):
                    delete_btn = gr.Button(
                        "ποΈ Delete Selected Files",
                        variant="stop",
                        size="lg"
                    )

                with gr.Column(scale=1):
                    clear_delete_btn = gr.Button(
                        "βΊ Clear Selection",
                        variant="secondary",
                        size="lg"
                    )

            gr.Markdown("---")

            with gr.Row():
                with gr.Column():
                    delete_status_output = gr.Markdown(
                        label="π Deletion Status",
                        value="*Select files to delete...*"
                    )

            with gr.Row():
                with gr.Column():
                    delete_results_output = gr.Markdown(
                        label="ποΈ Deletion Results",
                        value="",
                        max_height=400
                    )

        # ---- Tab 3: view ---------------------------------------------------
        with gr.TabItem("π View Uploaded Files", id="view_tab"):
            gr.Markdown("### π **Uploaded Files Management**")
            gr.Markdown("View all files currently uploaded to the Pinecone Assistant with their metadata and timestamps.")

            with gr.Row():
                refresh_btn = gr.Button(
                    "π Fetch Files",
                    variant="primary",
                    size="lg"
                )

            with gr.Row():
                file_list_status = gr.Markdown(
                    value="π‘ **Click 'Fetch Files' to load uploaded files**",
                    visible=True
                )

            gr.Markdown("---")

            with gr.Row():
                prev_btn = gr.Button(
                    "β¬\x85οΈ Previous 100",
                    variant="secondary",
                    visible=False
                )
                # The text of this label is also parsed by load_next_page /
                # load_prev_page to find the current page number.
                pagination_info = gr.Markdown(
                    value="π Page 1 of 1 | Total: 0 files",
                    elem_classes=["pagination-info"]
                )
                next_btn = gr.Button(
                    "Next 100 β‘οΈ",
                    variant="secondary",
                    visible=False
                )

            with gr.Row():
                with gr.Column(scale=1):
                    file_summary = gr.Markdown(
                        label="π Files Summary",
                        value="*Click refresh to load file summary...*"
                    )

            with gr.Row():
                with gr.Column():
                    file_details = gr.Markdown(
                        label="π File Details",
                        value="*Click refresh to load file details...*",
                        max_height=600
                    )

    # ---- Event wiring: upload tab -------------------------------------------
    files_input.change(
        fn=update_metadata_fields,
        inputs=[files_input],
        outputs=metadata_fields
    )

    # Show the interim status first, then run the (slow) upload.
    upload_btn.click(
        fn=start_processing,
        outputs=[processing_status]
    ).then(
        fn=process_files_with_progress,
        inputs=[files_input] + metadata_fields,
        outputs=[status_output, results_output, processing_status]
    )

    clear_btn.click(
        fn=clear_form,
        outputs=[files_input] + metadata_fields + [status_output, results_output, processing_status]
    )

    # ---- Event wiring: delete tab -------------------------------------------
    refresh_dropdown_btn.click(
        fn=refresh_delete_dropdown,
        outputs=[file_dropdown]
    )

    # After deleting, reload the dropdown so removed files can no longer be
    # selected and the stale selection is cleared.
    delete_btn.click(
        fn=delete_selected_files,
        inputs=[file_dropdown],
        outputs=[delete_status_output, delete_results_output]
    ).then(
        fn=refresh_delete_dropdown,
        outputs=[file_dropdown]
    )

    clear_delete_btn.click(
        fn=clear_delete_form,
        outputs=[file_dropdown, delete_status_output, delete_results_output]
    )

    # ---- Event wiring: view tab ---------------------------------------------
    refresh_btn.click(
        fn=refresh_file_list,
        outputs=[file_list_status]
    ).then(
        fn=list_uploaded_files_paginated,
        inputs=[],
        outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn]
    )

    next_btn.click(
        fn=load_next_page,
        inputs=[pagination_info],
        outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn]
    )

    prev_btn.click(
        fn=load_prev_page,
        inputs=[pagination_info],
        outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn]
    )

    gr.Markdown(
        """
        ---

        ### π‘ **Usage Tips:**

        **Upload Documents:**
        - Select up to 10 PDF, DOC, DOCX, or TXT files at once
        - Upload files first, then fill individual metadata for each document
        - Each file gets its own sections, keywords, and description
        - Check the results section for upload status

        **Delete Documents:**
        - Click 'Refresh File List' to load current files in dropdown
        - Select multiple files using the dropdown (supports multi-select)
        - Click 'Delete Selected Files' to remove them permanently
        - View deletion results for success/failure status

        **View Uploaded Files:**
        - Click 'Fetch Files' to see all uploaded files
        - View file details including upload timestamps and metadata
        - Files are sorted by most recent first
        - Use pagination to navigate through large file lists

        ### β οΈ **Important Notes:**
        - File deletion is **permanent** and cannot be undone
        - Always verify your selection before deleting files
        - The system maps file titles to IDs internally for deletion

        ### π **Support:**
        For issues or questions, contact the development team.
        """
    )
|
|
if __name__ == "__main__":
    # Launch settings, passed unchanged to the Gradio app when run as a script.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": True,
        "show_error": True,
    }
    app.launch(**launch_options)