multi-doc / app.py
datafreak's picture
Update app.py
c41ebc7 verified
raw
history blame
20.2 kB
import gradio as gr
import os
from pathlib import Path
from pinecone import Pinecone
from typing import List, Tuple
import tempfile
import shutil
from dotenv import load_dotenv
import time
from datetime import datetime
import json

# Load environment variables from a local .env file (no-op if absent)
load_dotenv()

# Fail fast at import time if required configuration is missing
required_env_vars = ["PINECONE_API_KEY"]
missing_vars = [var for var in required_env_vars if not os.getenv(var)]
if missing_vars:
    raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

# Initialize the Pinecone client shared by all handlers below
pinecone_api_key = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_api_key)

# Local staging directory: files are copied here before being pushed
# to the Pinecone Assistant
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
def _format_file_size(num_bytes):
    """Return a human-readable string (MB / KB / bytes) for *num_bytes*."""
    num_bytes = num_bytes or 0  # tolerate None from missing attributes
    if num_bytes > 1024 * 1024:
        return f"{num_bytes / (1024 * 1024):.2f} MB"
    if num_bytes > 1024:
        return f"{num_bytes / 1024:.2f} KB"
    return f"{num_bytes} bytes"


def _format_timestamp(raw):
    """Format an ISO-8601 timestamp (trailing 'Z' allowed) as UTC.

    Falls back to ``str(raw)`` when the value cannot be parsed.
    """
    try:
        parsed = datetime.fromisoformat(str(raw).replace('Z', '+00:00'))
        return parsed.strftime('%Y-%m-%d %H:%M:%S UTC')
    except (ValueError, TypeError):
        return str(raw)


def list_uploaded_files(progress=gr.Progress()):
    """List all files uploaded to the Pinecone Assistant.

    Returns a ``(summary_markdown, details_markdown)`` tuple for the two
    Markdown outputs on the "View Uploaded Files" tab.  On failure the
    error is rendered into the summary pane instead of raising.
    """
    try:
        progress(0.1, desc="πŸ” Connecting to Pinecone Assistant...")
        assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        assistant = pc.assistant.Assistant(assistant_name=assistant_name)

        progress(0.3, desc="πŸ“‹ Fetching file list...")
        time.sleep(0.5)  # brief pause so the progress message is visible
        files_response = assistant.list_files()

        progress(0.7, desc="πŸ“Š Processing file information...")
        time.sleep(0.3)
        if not files_response or not hasattr(files_response, 'files') or not files_response.files:
            progress(1.0, desc="βœ… Complete - No files found")
            return "πŸ“‹ **No files found in Pinecone Assistant**", ""

        files_list = files_response.files
        total_files = len(files_list)
        # Most recently created files first
        sorted_files = sorted(files_list, key=lambda x: getattr(x, 'created_on', ''), reverse=True)

        progress(0.9, desc="πŸ“ Formatting results...")
        summary = f"πŸ“Š **Files Summary**\n\n"
        summary += f"πŸ“ **Total files:** {total_files}\n"
        summary += f"πŸ• **Last updated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"

        detailed_info = "## πŸ“‹ **File Details**\n\n"
        for i, file_obj in enumerate(sorted_files, 1):
            try:
                # Attribute access is defensive: the Assistant SDK's file
                # objects may omit fields.
                file_name = getattr(file_obj, 'name', 'Unknown')
                file_id = getattr(file_obj, 'id', 'Unknown')
                file_size = getattr(file_obj, 'size', 0)
                created_on = getattr(file_obj, 'created_on', 'Unknown')
                updated_on = getattr(file_obj, 'updated_on', created_on)

                size_str = _format_file_size(file_size)

                # Format timestamps independently so a bad "updated" value
                # cannot clobber a successfully formatted "created" value.
                if created_on != 'Unknown':
                    created_formatted = _format_timestamp(created_on)
                else:
                    created_formatted = 'Unknown'
                if updated_on != 'Unknown' and updated_on != created_on:
                    updated_formatted = _format_timestamp(updated_on)
                else:
                    updated_formatted = created_formatted

                detailed_info += f"### {i}. πŸ“„ **{file_name}**\n"
                detailed_info += f"- **πŸ†” File ID:** `{file_id}`\n"
                detailed_info += f"- **πŸ“ Size:** {size_str}\n"
                detailed_info += f"- **πŸ“… Uploaded:** {created_formatted}\n"
                detailed_info += f"- **πŸ”„ Last Updated:** {updated_formatted}\n"

                # Metadata requires a per-file describe call; best-effort only.
                try:
                    file_details = assistant.describe_file(file_id=file_id)
                    if hasattr(file_details, 'metadata') and file_details.metadata:
                        metadata = file_details.metadata
                        detailed_info += f"- **🏷️ Metadata:**\n"
                        if isinstance(metadata, dict):
                            for key, value in metadata.items():
                                if isinstance(value, list):
                                    detailed_info += f"  - **{key.title()}:** {', '.join(map(str, value))}\n"
                                else:
                                    detailed_info += f"  - **{key.title()}:** {value}\n"
                        else:
                            detailed_info += f"  - {metadata}\n"
                except Exception:
                    detailed_info += f"- **🏷️ Metadata:** Could not retrieve metadata\n"
                detailed_info += "\n---\n\n"
            except Exception as file_error:
                # One malformed file entry must not abort the whole listing.
                detailed_info += f"### {i}. ❌ **Error processing file**\n"
                detailed_info += f"- **Error:** {str(file_error)}\n\n---\n\n"

        progress(1.0, desc="βœ… File list retrieved successfully!")
        time.sleep(0.3)
        return summary, detailed_info
    except Exception as e:
        error_msg = f"❌ **Error retrieving file list:** {str(e)}"
        return error_msg, ""
def refresh_file_list():
    """Return the interim status message shown while the list reloads."""
    busy_message = "πŸ”„ **Refreshing file list... Please wait**"
    return busy_message
def process_files_with_progress(files, *metadata_inputs, progress=gr.Progress()):
    """Upload up to 10 files to the Pinecone Assistant with per-file metadata.

    ``metadata_inputs`` carries three textbox values per file, in order:
    sections, keywords, description.  Always returns a 3-tuple
    ``(status_markdown, details_markdown, processing_status)`` matching the
    three outputs wired to the upload button.
    """
    # Early-exit validation.  NOTE: these must return THREE values — the
    # button's outputs are (status_output, results_output, processing_status);
    # the previous 2-tuple returns broke the callback.
    if not files:
        return "❌ Error: No files selected", "", "❌ **Processing failed**"
    if len(files) > 10:
        return "❌ Error: Maximum 10 files allowed at a time", "", "❌ **Processing failed**"

    try:
        results = []
        errors = []
        total_files = len(files)

        progress(0, desc="πŸ”§ Initializing Pinecone Assistant...")
        time.sleep(0.5)  # brief pause so the status is visible
        assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        assistant = pc.assistant.Assistant(assistant_name=assistant_name)

        for i, file_path in enumerate(files):
            try:
                filename = os.path.basename(file_path)
                progress(i / total_files, desc=f"πŸ“„ Processing {filename}... ({i+1}/{total_files})")

                # Three metadata fields per file: sections, keywords, description.
                sections_idx = i * 3
                keywords_idx = i * 3 + 1
                description_idx = i * 3 + 2
                # Guard on the HIGHEST index actually read, not just the first.
                if description_idx < len(metadata_inputs):
                    sections = metadata_inputs[sections_idx] or ""
                    keywords = metadata_inputs[keywords_idx] or ""
                    description = metadata_inputs[description_idx] or ""
                else:
                    sections = keywords = description = ""

                # Require at least one non-blank metadata field per file.
                if not sections.strip() and not keywords.strip() and not description.strip():
                    errors.append({
                        "filename": filename,
                        "error": "❌ Error: No metadata provided"
                    })
                    continue

                progress(i / total_files, desc=f"🏷️ Preparing metadata for {filename}...")
                # Comma-separated textbox values become lists; blanks dropped.
                metadata = {
                    "sections": [s.strip() for s in sections.split(",") if s.strip()],
                    "keywords": [k.strip() for k in keywords.split(",") if k.strip()],
                    "description": description.strip()
                }

                # Stage the file locally, then push it to the assistant.
                progress(i / total_files, desc=f"πŸ“ Copying {filename} to uploads...")
                destination_path = os.path.join(UPLOAD_FOLDER, filename)
                shutil.copy2(file_path, destination_path)

                progress(i / total_files, desc=f"☁️ Uploading {filename} to Pinecone...")
                response = assistant.upload_file(
                    file_path=destination_path,
                    metadata=metadata,
                    timeout=None
                )

                results.append({
                    "filename": filename,
                    "status": "βœ… Success",
                    "metadata": metadata,
                    "response": str(response)
                })
            except Exception as file_error:
                # One bad file must not abort the whole batch.
                errors.append({
                    "filename": os.path.basename(file_path),
                    "error": f"❌ Error: {str(file_error)}"
                })

        progress(1.0, desc="βœ… Processing complete!")
        time.sleep(0.5)

        # Summary counts for the status pane.
        success_count = len(results)
        error_count = len(errors)
        status_message = f"πŸ“Š **Processing Complete**\n\n"
        status_message += f"βœ… **Successful uploads:** {success_count}\n"
        status_message += f"❌ **Failed uploads:** {error_count}\n"
        status_message += f"πŸ“ **Total files processed:** {len(files)}\n\n"

        # Per-file breakdown for the details pane.
        detailed_results = "## πŸ“‹ **Detailed Results**\n\n"
        if results:
            detailed_results += "### βœ… **Successful Uploads:**\n"
            for result in results:
                detailed_results += f"- **{result['filename']}**\n"
                detailed_results += f"  - Sections: {', '.join(result['metadata']['sections'])}\n"
                detailed_results += f"  - Keywords: {', '.join(result['metadata']['keywords'])}\n"
                detailed_results += f"  - Description: {result['metadata']['description']}\n\n"
        if errors:
            detailed_results += "### ❌ **Failed Uploads:**\n"
            for error in errors:
                detailed_results += f"- **{error['filename']}** - {error['error']}\n"

        return status_message, detailed_results, "βœ… **Processing completed successfully!**"
    except Exception as e:
        error_msg = f"❌ **Critical Error:** {str(e)}"
        return error_msg, "", "❌ **Processing failed with error**"
def update_metadata_fields(files):
    """Show one (sections, keywords, description) textbox trio per file.

    Always returns exactly 30 ``gr.update`` objects (10 files x 3 fields);
    fields beyond the selected files are hidden.  Labels include the file's
    basename so the user can tell which metadata belongs to which document.
    """
    # No selection, or over the 10-file limit: hide every field.
    if not files or len(files) > 10:
        return [gr.update(visible=False)] * 30

    updates = []
    for file_path in files:
        filename = os.path.basename(file_path)
        # Three visible fields per file, labelled with its name.
        updates.extend([
            gr.update(visible=True, label=f"πŸ“‘ Sections for {filename}", placeholder="e.g., Introduction, Financial Data, Compliance"),
            gr.update(visible=True, label=f"πŸ” Keywords for {filename}", placeholder="e.g., GST, tax, compliance, revenue"),
            gr.update(visible=True, label=f"πŸ“ Description for {filename}", placeholder="Brief description of this document"),
        ])
    # Hide the remaining unused fields so the total is always 30.
    updates.extend(gr.update(visible=False) for _ in range(30 - len(updates)))
    return updates
def clear_form():
    """Reset every upload-tab component.

    Order matches the wired outputs: file picker, 30 metadata textboxes,
    status output, results output, and the processing indicator (34 total).
    """
    cleared = [None]            # files_input
    cleared += [""] * 30        # metadata textboxes
    cleared += ["", ""]         # status_output, results_output
    cleared.append("🟒 **Ready to process documents**")
    return cleared
def start_processing():
    """Return the busy indicator shown the moment processing begins."""
    indicator = "πŸ”„ **Processing documents... Please wait**"
    return indicator
def finish_processing():
    """Return the success indicator shown after processing finishes."""
    indicator = "βœ… **Processing completed successfully!**"
    return indicator
# ---------------------------------------------------------------------------
# Gradio UI: two tabs (upload documents / view uploaded files) plus the
# event wiring that connects the buttons to the handler functions above.
# ---------------------------------------------------------------------------
with gr.Blocks(
    title="πŸ“„ Tax Document Ingestion System",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1400px !important;
        margin: auto;
    }
    .upload-container {
        border: 2px dashed #4CAF50;
        border-radius: 10px;
        padding: 20px;
        text-align: center;
        background-color: #f8f9fa;
    }
    .tab-nav {
        margin-bottom: 20px;
    }
    """
) as app:
    # Page header / feature overview
    gr.Markdown(
        """
# πŸ“„ Tax Document Ingestion System
Upload and manage documents in the Pinecone Assistant for GST Minutes processing.
## πŸš€ Features:
- βœ… **Multiple file upload** - Select and upload multiple documents at once
- 🏷️ **Metadata tagging** - Add sections, keywords, and descriptions
- πŸ”„ **Batch processing** - All files processed with individual metadata
- πŸ“Š **File management** - View uploaded files with timestamps and metadata
- πŸ“‹ **Detailed reporting** - See success/failure status for each operation
---
        """
    )

    # Create tabs for different functionalities
    with gr.Tabs() as tabs:
        # --- Tab 1: upload documents with per-file metadata ---
        with gr.TabItem("πŸ“€ Upload Documents", id="upload_tab"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### πŸ“ **File Upload**")
                    files_input = gr.File(
                        label="Select Documents (Max 10 files)",
                        file_count="multiple",
                        file_types=[".pdf", ".doc", ".docx", ".txt"],
                        elem_classes=["upload-container"]
                    )
                with gr.Column(scale=1):
                    gr.Markdown("### 🏷️ **Document Metadata (Individual for Each File)**")
                    gr.Markdown("*Upload files first, then metadata fields will appear for each document*")
                    # Dynamic metadata fields container: 30 initially-hidden
                    # textboxes (10 files x sections/keywords/description),
                    # toggled by update_metadata_fields on file selection.
                    with gr.Column() as metadata_container:
                        metadata_fields = []
                        for i in range(30):
                            field = gr.Textbox(
                                label=f"Field {i}",
                                placeholder="",
                                visible=False,
                                lines=2
                            )
                            metadata_fields.append(field)
            with gr.Row():
                with gr.Column(scale=1):
                    upload_btn = gr.Button(
                        "πŸš€ Upload Documents to Pinecone Assistant",
                        variant="primary",
                        size="lg"
                    )
                with gr.Column(scale=1):
                    clear_btn = gr.Button(
                        "πŸ—‘οΈ Clear Form",
                        variant="secondary",
                        size="lg"
                    )
            # Processing status indicator
            with gr.Row():
                processing_status = gr.Markdown(
                    value="🟒 **Ready to process documents**",
                    visible=True
                )
            gr.Markdown("---")
            # Results section
            with gr.Row():
                with gr.Column():
                    status_output = gr.Markdown(
                        label="πŸ“Š Upload Status",
                        value="*Ready to upload documents...*"
                    )
            with gr.Row():
                with gr.Column():
                    results_output = gr.Markdown(
                        label="πŸ“‹ Detailed Results",
                        value=""
                    )

        # --- Tab 2: view files already uploaded to the assistant ---
        with gr.TabItem("πŸ“‹ View Uploaded Files", id="view_tab"):
            gr.Markdown("### πŸ“‹ **Uploaded Files Management**")
            gr.Markdown("View all files currently uploaded to the Pinecone Assistant with their metadata and timestamps.")
            with gr.Row():
                refresh_btn = gr.Button(
                    "πŸ”„ Refresh File List",
                    variant="primary",
                    size="lg"
                )
                auto_refresh_btn = gr.Button(
                    "πŸ“‹ Load Files on Startup",
                    variant="secondary",
                    size="lg"
                )
            # File list status
            with gr.Row():
                file_list_status = gr.Markdown(
                    value="🟑 **Click 'Refresh File List' to load uploaded files**",
                    visible=True
                )
            gr.Markdown("---")
            # File list results
            with gr.Row():
                with gr.Column(scale=1):
                    file_summary = gr.Markdown(
                        label="πŸ“Š Files Summary",
                        value="*Click refresh to load file summary...*"
                    )
            with gr.Row():
                with gr.Column():
                    file_details = gr.Markdown(
                        label="πŸ“‹ File Details",
                        value="*Click refresh to load file details...*"
                    )

    # Event handlers for Upload tab
    # Reveal/relabel the metadata fields whenever the file selection changes.
    files_input.change(
        fn=update_metadata_fields,
        inputs=[files_input],
        outputs=metadata_fields
    )
    # Show a busy indicator first, then run the upload pipeline.
    upload_btn.click(
        fn=start_processing,
        outputs=[processing_status]
    ).then(
        fn=process_files_with_progress,
        inputs=[files_input] + metadata_fields,
        outputs=[status_output, results_output, processing_status]
    )
    clear_btn.click(
        fn=clear_form,
        outputs=[files_input] + metadata_fields + [status_output, results_output, processing_status]
    )

    # Event handlers for View Files tab
    # Both buttons show a transient status, then reload the file list.
    # NOTE(review): despite its label, "Load Files on Startup" only fires on
    # click — there is no app.load() hook; confirm whether auto-load on
    # startup was intended.
    refresh_btn.click(
        fn=refresh_file_list,
        outputs=[file_list_status]
    ).then(
        fn=list_uploaded_files,
        outputs=[file_summary, file_details]
    )
    auto_refresh_btn.click(
        fn=refresh_file_list,
        outputs=[file_list_status]
    ).then(
        fn=list_uploaded_files,
        outputs=[file_summary, file_details]
    )

    # Footer / usage tips
    gr.Markdown(
        """
---
### πŸ’‘ **Usage Tips:**
**Upload Documents:**
- Select up to 10 PDF, DOC, DOCX, or TXT files at once
- Upload files first, then fill individual metadata for each document
- Each file gets its own sections, keywords, and description
- Check the results section for upload status
**View Uploaded Files:**
- Click 'Refresh File List' to see all uploaded files
- View file details including upload timestamps and metadata
- Files are sorted by most recent first
### πŸ“ž **Support:**
For issues or questions, contact the development team.
        """
    )
# Script entry point: start the Gradio server when run directly.
if __name__ == "__main__":
    app.launch(
        server_name="0.0.0.0",  # listen on all interfaces — NOTE(review): confirm this exposure is intended
        server_port=7860,       # Gradio's standard default port
        share=False,            # no public share link
        debug=True,
        show_error=True         # surface handler exceptions in the UI
    )