import json
import os
import shutil
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import List, Tuple

import gradio as gr
from dotenv import load_dotenv
from pinecone import Pinecone


# Read settings from a local .env file (if any) into the process environment.
load_dotenv()

# Abort at import time when a mandatory setting is absent or empty.
required_env_vars = ["PINECONE_API_KEY"]
missing_vars = [name for name in required_env_vars if not os.getenv(name)]
if missing_vars:
    raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

# Shared Pinecone client used by every handler below.
pinecone_api_key = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_api_key)

# Local staging directory; uploaded documents are copied here before upload.
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)


def _format_file_size(file_size):
    """Render a byte count as ``bytes``, ``KB`` or ``MB`` text."""
    if not isinstance(file_size, (int, float)):
        # A None/non-numeric size would raise on comparison and turn the
        # whole entry into an error block; report it as unknown instead.
        return "Unknown"
    if file_size > 1024 * 1024:
        return f"{file_size / (1024 * 1024):.2f} MB"
    if file_size > 1024:
        return f"{file_size / 1024:.2f} KB"
    return f"{file_size} bytes"


def _format_timestamp(raw_value):
    """Format an ISO-8601 timestamp for display, else return its raw text."""
    if raw_value is None or raw_value == 'Unknown':
        return 'Unknown'
    if isinstance(raw_value, datetime):
        parsed = raw_value
    else:
        try:
            # fromisoformat() rejects a trailing "Z" on older Python versions.
            parsed = datetime.fromisoformat(raw_value.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Unparseable value: show it as-is rather than dropping the entry.
            return str(raw_value)
    return parsed.strftime('%Y-%m-%d %H:%M:%S UTC')


def _format_metadata_lines(metadata):
    """Build the Markdown lines for one file's metadata."""
    lines = ["- **π·οΈ Metadata:**\n"]
    if isinstance(metadata, dict):
        for key, value in metadata.items():
            if isinstance(value, list):
                rendered = ', '.join(map(str, value))
            else:
                rendered = value
            lines.append(f" - **{str(key).title()}:** {rendered}\n")
    else:
        lines.append(f" - {metadata}\n")
    return lines


def _render_file_entry(assistant, index, file_obj):
    """Build the Markdown lines describing one uploaded file."""
    file_name = getattr(file_obj, 'name', 'Unknown')
    file_id = getattr(file_obj, 'id', 'Unknown')
    created_on = getattr(file_obj, 'created_on', 'Unknown')
    updated_on = getattr(file_obj, 'updated_on', created_on)

    created_formatted = _format_timestamp(created_on)
    if updated_on in (None, 'Unknown') or updated_on == created_on:
        updated_formatted = created_formatted
    else:
        updated_formatted = _format_timestamp(updated_on)

    lines = [
        f"### {index}. π **{file_name}**\n",
        f"- **π File ID:** `{file_id}`\n",
        f"- **π Size:** {_format_file_size(getattr(file_obj, 'size', 0))}\n",
        f"- **π Uploaded:** {created_formatted}\n",
        f"- **π Last Updated:** {updated_formatted}\n",
    ]

    # Metadata is best-effort: a failed lookup must not hide the file itself.
    # NOTE: this issues one describe_file request per listed file.
    try:
        file_details = assistant.describe_file(file_id=file_id)
        metadata = getattr(file_details, 'metadata', None)
        if metadata:
            lines.extend(_format_metadata_lines(metadata))
    except Exception:
        lines.append("- **π·οΈ Metadata:** Could not retrieve metadata\n")

    lines.append("\n---\n\n")
    return lines


def list_uploaded_files(progress=gr.Progress()):
    """List all files uploaded to the Pinecone Assistant as Markdown.

    The assistant name is read from ``PINECONE_ASSISTANT_NAME`` (default
    ``"gstminutes"``). Files are shown newest first by ``created_on``.

    Args:
        progress: Gradio progress tracker, called with a fraction and a
            description at each stage.

    Returns:
        A 2-tuple ``(summary, details)`` of Markdown strings. ``details`` is
        empty when there are no files. If the listing itself fails, the first
        element carries the error message and the second is empty.
    """
    # NOTE(review): the non-ASCII glyphs in the UI strings look like
    # mis-decoded emoji; they are kept exactly as found in the source.
    try:
        progress(0.1, desc="π Connecting to Pinecone Assistant...")
        assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        assistant = pc.assistant.Assistant(assistant_name=assistant_name)

        progress(0.3, desc="π Fetching file list...")
        time.sleep(0.5)
        files_response = assistant.list_files()

        progress(0.7, desc="π Processing file information...")
        time.sleep(0.3)

        # Accept either a response object exposing `.files` or a plain list of
        # file objects. NOTE(review): which shape applies depends on the
        # installed SDK - confirm against the Pinecone client in use.
        files_list = getattr(files_response, 'files', files_response)
        files_list = list(files_list) if files_list else []

        if not files_list:
            progress(1.0, desc="β Complete - No files found")
            return "π **No files found in Pinecone Assistant**", ""

        total_files = len(files_list)

        # Coerce the key to str so a missing/None timestamp cannot abort the
        # whole listing with a None-vs-str comparison error.
        sorted_files = sorted(
            files_list,
            key=lambda f: str(getattr(f, 'created_on', '') or ''),
            reverse=True,
        )

        progress(0.9, desc="π Formatting results...")

        summary = (
            "π **Files Summary**\n\n"
            f"π **Total files:** {total_files}\n"
            f"π **Last updated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
        )

        detail_parts = ["## π **File Details**\n\n"]
        for i, file_obj in enumerate(sorted_files, 1):
            try:
                detail_parts.extend(_render_file_entry(assistant, i, file_obj))
            except Exception as file_error:
                # One bad entry must not hide the remaining files.
                detail_parts.append(f"### {i}. β **Error processing file**\n")
                detail_parts.append(f"- **Error:** {str(file_error)}\n\n---\n\n")

        progress(1.0, desc="β File list retrieved successfully!")
        time.sleep(0.3)

        return summary, "".join(detail_parts)

    except Exception as e:
        # UI boundary: surface the failure as text instead of raising.
        error_msg = f"β **Error retrieving file list:** {str(e)}"
        return error_msg, ""


def refresh_file_list():
    """Return the interim status text shown while the file list reloads."""
    status_banner = "π **Refreshing file list... Please wait**"
    return status_banner


def process_files_with_progress(files, *metadata_inputs, progress=gr.Progress()):
    """Upload each selected file to the Pinecone Assistant with its metadata.

    Each file is copied into ``UPLOAD_FOLDER`` and then uploaded. A file whose
    three metadata fields are all blank is skipped and reported as failed.

    Args:
        files: Selected files from the file picker (at most 10).
        *metadata_inputs: Flat sequence of metadata values, three per file in
            file order: sections, keywords, description. Sections and keywords
            are comma-separated.
        progress: Gradio progress tracker.

    Returns:
        Always a 3-tuple ``(status_markdown, details_markdown, status_line)``,
        matching the three output components wired to the upload button.
    """
    # NOTE(review): the non-ASCII glyphs in the UI strings look like
    # mis-decoded emoji; they are kept exactly as found in the source.
    failure_status = "β **Processing failed with error**"

    # The guards must return three values like every other path; returning two
    # makes Gradio reject the result for the three wired outputs.
    if not files:
        return "β Error: No files selected", "", failure_status

    if len(files) > 10:
        return "β Error: Maximum 10 files allowed at a time", "", failure_status

    try:
        results = []
        errors = []
        total_files = len(files)

        progress(0, desc="π§ Initializing Pinecone Assistant...")
        time.sleep(0.5)
        assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        assistant = pc.assistant.Assistant(assistant_name=assistant_name)

        for i, file_entry in enumerate(files):
            # Fallback label so the error handler never has to re-derive it.
            filename = str(file_entry)
            try:
                # NOTE(review): some Gradio versions pass file wrappers that
                # expose the temp path as `.name` rather than plain path
                # strings - confirm against the installed version.
                if isinstance(file_entry, (str, os.PathLike)):
                    file_path = file_entry
                else:
                    file_path = getattr(file_entry, 'name', file_entry)
                filename = os.path.basename(file_path)
                progress((i / total_files), desc=f"π Processing {filename}... ({i+1}/{total_files})")

                # Slicing cannot raise, even if fewer than three values were
                # supplied for this file; missing ones default to "".
                fields = [value or "" for value in metadata_inputs[i * 3:i * 3 + 3]]
                fields += [""] * (3 - len(fields))
                sections, keywords, description = fields

                if not (sections.strip() or keywords.strip() or description.strip()):
                    errors.append({
                        "filename": filename,
                        "error": "β Error: No metadata provided"
                    })
                    continue

                progress((i / total_files), desc=f"π·οΈ Preparing metadata for {filename}...")
                metadata = {
                    "sections": [s.strip() for s in sections.split(",") if s.strip()],
                    "keywords": [k.strip() for k in keywords.split(",") if k.strip()],
                    "description": description.strip()
                }

                progress((i / total_files), desc=f"π Copying {filename} to uploads...")
                destination_path = os.path.join(UPLOAD_FOLDER, filename)
                shutil.copy2(file_path, destination_path)

                progress((i / total_files), desc=f"βοΈ Uploading {filename} to Pinecone...")
                response = assistant.upload_file(
                    file_path=destination_path,
                    metadata=metadata,
                    timeout=None
                )

                results.append({
                    "filename": filename,
                    "status": "β Success",
                    "metadata": metadata,
                    "response": str(response)
                })

            except Exception as file_error:
                # One failing file must not stop the rest of the batch.
                errors.append({
                    "filename": filename,
                    "error": f"β Error: {str(file_error)}"
                })

        progress(1.0, desc="β Processing complete!")
        time.sleep(0.5)

        success_count = len(results)
        error_count = len(errors)

        status_message = (
            "π **Processing Complete**\n\n"
            f"β **Successful uploads:** {success_count}\n"
            f"β **Failed uploads:** {error_count}\n"
            f"π **Total files processed:** {len(files)}\n\n"
        )

        detail_parts = ["## π **Detailed Results**\n\n"]

        if results:
            detail_parts.append("### β **Successful Uploads:**\n")
            for result in results:
                detail_parts.append(f"- **{result['filename']}**\n")
                detail_parts.append(f" - Sections: {', '.join(result['metadata']['sections'])}\n")
                detail_parts.append(f" - Keywords: {', '.join(result['metadata']['keywords'])}\n")
                detail_parts.append(f" - Description: {result['metadata']['description']}\n\n")

        if errors:
            detail_parts.append("### β **Failed Uploads:**\n")
            for error in errors:
                detail_parts.append(f"- **{error['filename']}** - {error['error']}\n")

        # Only claim success when nothing failed; the status line used to say
        # "completed successfully" even if every upload had been rejected.
        if errors:
            final_status = f"**Processing finished with {error_count} failed upload(s)**"
        else:
            final_status = "β **Processing completed successfully!**"

        return status_message, "".join(detail_parts), final_status

    except Exception as e:
        # UI boundary: surface unexpected failures as text instead of raising.
        error_msg = f"β **Critical Error:** {str(e)}"
        return error_msg, "", failure_status


def update_metadata_fields(files):
    """Show one labelled metadata triple per selected file and hide the rest.

    Args:
        files: Current selection from the file picker, or a falsy value when
            nothing is selected.

    Returns:
        A list of exactly 30 ``gr.update`` values, three per file slot
        (sections, keywords, description). Everything is hidden when there is
        no selection or more than 10 files are selected.
    """
    max_files = 10
    fields_per_file = 3
    total_fields = max_files * fields_per_file

    if not files or len(files) > max_files:
        return [gr.update(visible=False)] * total_fields

    updates = []
    for file_entry in files:
        # NOTE(review): some Gradio versions pass file wrappers that expose the
        # temp path as `.name` rather than plain path strings - confirm
        # against the installed version.
        if isinstance(file_entry, (str, os.PathLike)):
            file_path = file_entry
        else:
            file_path = getattr(file_entry, 'name', file_entry)
        filename = os.path.basename(file_path)

        updates.extend([
            gr.update(visible=True, label=f"π Sections for {filename}", placeholder="e.g., Introduction, Financial Data, Compliance"),
            gr.update(visible=True, label=f"π Keywords for {filename}", placeholder="e.g., GST, tax, compliance, revenue"),
            gr.update(visible=True, label=f"π Description for {filename}", placeholder="Brief description of this document")
        ])

    # Pad with hidden updates so every textbox slot receives a value.
    updates.extend(
        gr.update(visible=False) for _ in range(total_fields - len(updates))
    )

    return updates[:total_fields]


def clear_form():
    """Return reset values for every form component.

    The list holds, in order: ``None`` for the file picker, 30 empty strings
    for the metadata textboxes, two empty strings for the status and results
    panels, and the idle status line.
    """
    cleared = [None]
    cleared.extend("" for _ in range(30))
    cleared.extend(["", "", "π’ **Ready to process documents**"])
    return cleared


def start_processing():
    """Return the status text displayed while an upload batch is running."""
    in_progress_banner = "π **Processing documents... Please wait**"
    return in_progress_banner


def finish_processing():
    """Return the status text displayed once an upload batch has finished."""
    # The literal was split across two physical lines (a syntax error); it is
    # rejoined on one line. NOTE(review): the leading glyph looks like a
    # mis-decoded emoji and is kept exactly as found.
    return "β **Processing completed successfully!**"


# Gradio UI: an upload tab (file picker plus three metadata textboxes per file
# slot) and a tab listing the files already stored in the assistant.
# NOTE(review): the source had lost its indentation, so the nesting below was
# reconstructed from how the components relate to each other - confirm the
# rendered layout. The non-ASCII glyphs in the UI strings look like
# mis-decoded emoji and are kept exactly as found.
with gr.Blocks(
    title="π Tax Document Ingestion System",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1400px !important;
        margin: auto;
    }
    .upload-container {
        border: 2px dashed #4CAF50;
        border-radius: 10px;
        padding: 20px;
        text-align: center;
        background-color: #f8f9fa;
    }
    .tab-nav {
        margin-bottom: 20px;
    }
    """
) as app:

    gr.Markdown(
        """
        # π Tax Document Ingestion System

        Upload and manage documents in the Pinecone Assistant for GST Minutes processing.

        ## π Features:
        - β **Multiple file upload** - Select and upload multiple documents at once
        - π·οΈ **Metadata tagging** - Add sections, keywords, and descriptions
        - π **Batch processing** - All files processed with individual metadata
        - π **File management** - View uploaded files with timestamps and metadata
        - π **Detailed reporting** - See success/failure status for each operation

        ---
        """
    )

    with gr.Tabs() as tabs:

        # ---- Tab 1: upload documents -------------------------------------
        with gr.TabItem("π€ Upload Documents", id="upload_tab"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### π **File Upload**")
                    files_input = gr.File(
                        label="Select Documents (Max 10 files)",
                        file_count="multiple",
                        file_types=[".pdf", ".doc", ".docx", ".txt"],
                        elem_classes=["upload-container"]
                    )

                with gr.Column(scale=1):
                    gr.Markdown("### π·οΈ **Document Metadata (Individual for Each File)**")
                    gr.Markdown("*Upload files first, then metadata fields will appear for each document*")

                    with gr.Column() as metadata_container:
                        # 30 hidden textboxes = 10 file slots x 3 fields;
                        # update_metadata_fields reveals and labels them.
                        metadata_fields = []
                        for i in range(30):
                            field = gr.Textbox(
                                label=f"Field {i}",
                                placeholder="",
                                visible=False,
                                lines=2
                            )
                            metadata_fields.append(field)

            with gr.Row():
                with gr.Column(scale=1):
                    upload_btn = gr.Button(
                        "π Upload Documents to Pinecone Assistant",
                        variant="primary",
                        size="lg"
                    )

                with gr.Column(scale=1):
                    clear_btn = gr.Button(
                        "ποΈ Clear Form",
                        variant="secondary",
                        size="lg"
                    )

            with gr.Row():
                processing_status = gr.Markdown(
                    value="π’ **Ready to process documents**",
                    visible=True
                )

            gr.Markdown("---")

            with gr.Row():
                with gr.Column():
                    status_output = gr.Markdown(
                        label="π Upload Status",
                        value="*Ready to upload documents...*"
                    )

            with gr.Row():
                with gr.Column():
                    results_output = gr.Markdown(
                        label="π Detailed Results",
                        value=""
                    )

        # ---- Tab 2: view uploaded files ----------------------------------
        with gr.TabItem("π View Uploaded Files", id="view_tab"):
            gr.Markdown("### π **Uploaded Files Management**")
            gr.Markdown("View all files currently uploaded to the Pinecone Assistant with their metadata and timestamps.")

            with gr.Row():
                refresh_btn = gr.Button(
                    "π Refresh File List",
                    variant="primary",
                    size="lg"
                )
                auto_refresh_btn = gr.Button(
                    "π Load Files on Startup",
                    variant="secondary",
                    size="lg"
                )

            with gr.Row():
                file_list_status = gr.Markdown(
                    value="π‘ **Click 'Refresh File List' to load uploaded files**",
                    visible=True
                )

            gr.Markdown("---")

            with gr.Row():
                with gr.Column(scale=1):
                    file_summary = gr.Markdown(
                        label="π Files Summary",
                        value="*Click refresh to load file summary...*"
                    )

            with gr.Row():
                with gr.Column():
                    file_details = gr.Markdown(
                        label="π File Details",
                        value="*Click refresh to load file details...*"
                    )

    # ---- Event wiring: upload tab ----------------------------------------
    files_input.change(
        fn=update_metadata_fields,
        inputs=[files_input],
        outputs=metadata_fields
    )

    # Show the "processing" banner first, then run the batch.
    upload_btn.click(
        fn=start_processing,
        outputs=[processing_status]
    ).then(
        fn=process_files_with_progress,
        inputs=[files_input] + metadata_fields,
        outputs=[status_output, results_output, processing_status]
    )

    clear_btn.click(
        fn=clear_form,
        outputs=[files_input] + metadata_fields + [status_output, results_output, processing_status]
    )

    # ---- Event wiring: view tab ------------------------------------------
    # Both buttons run the same refresh chain. The final step resets the
    # status line, which otherwise stays on "Refreshing..." indefinitely
    # because list_uploaded_files only fills the summary and details panels.
    for _refresh_trigger in (refresh_btn, auto_refresh_btn):
        _refresh_trigger.click(
            fn=refresh_file_list,
            outputs=[file_list_status]
        ).then(
            fn=list_uploaded_files,
            outputs=[file_summary, file_details]
        ).then(
            fn=lambda: "**Refresh finished - see the summary below**",
            outputs=[file_list_status]
        )

    gr.Markdown(
        """
        ---

        ### π‘ **Usage Tips:**

        **Upload Documents:**
        - Select up to 10 PDF, DOC, DOCX, or TXT files at once
        - Upload files first, then fill individual metadata for each document
        - Each file gets its own sections, keywords, and description
        - Check the results section for upload status

        **View Uploaded Files:**
        - Click 'Refresh File List' to see all uploaded files
        - View file details including upload timestamps and metadata
        - Files are sorted by most recent first

        ### π **Support:**
        For issues or questions, contact the development team.
        """
    )


if __name__ == "__main__":
    # Launch settings, collected in one place: bind address and port, no
    # public share link, debug mode and error display switched on.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": True,
        "show_error": True,
    }
    app.launch(**launch_options)