|
|
|
|
|
import os |
|
|
import asyncio |
|
|
import uuid |
|
|
from pathlib import Path |
|
|
from typing import Optional, List, Tuple |
|
|
import time |
|
|
|
|
|
import gradio as gr |
|
|
from agents import ( |
|
|
AnalysisAgent, |
|
|
CollaborationAgent, |
|
|
ConversationAgent, |
|
|
MasterOrchestrator, |
|
|
) |
|
|
from utils import load_pdf_text |
|
|
from utils.session import make_user_session |
|
|
from utils.validation import validate_file_size |
|
|
from utils.prompts import PromptManager |
|
|
from utils.export import ExportManager |
|
|
from config import Config |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Ask Config to create its directories up front. This is best-effort: any
# failure is reported on stdout and module import continues.
try:
    Config.ensure_directories()
except Exception as e:
    print(f"Warning: Could not ensure directories: {e}")
|
|
|
|
|
|
|
|
# One agent instance per orchestrator target key; every agent is configured
# with the model named by Config.OPENAI_MODEL and starts with a zero task count.
AGENTS = dict(
    analysis=AnalysisAgent(
        name="AnalysisAgent",
        model=Config.OPENAI_MODEL,
        tasks_completed=0,
    ),
    collab=CollaborationAgent(
        name="CollaborationAgent",
        model=Config.OPENAI_MODEL,
        tasks_completed=0,
    ),
    conversation=ConversationAgent(
        name="ConversationAgent",
        model=Config.OPENAI_MODEL,
        tasks_completed=0,
    ),
)

# The orchestrator receives the full agent mapping.
ORCHESTRATOR = MasterOrchestrator(agents=AGENTS)
|
|
|
|
|
|
|
|
# Prompt and export managers are optional. If constructing either one fails,
# both names are set to None and the functions below that use them fall back
# to "not available" behaviour.
try:
    PROMPT_MANAGER = PromptManager()
    EXPORT_MANAGER = ExportManager()
except Exception as e:
    print(f"Warning: Could not initialize managers: {e}")
    PROMPT_MANAGER = EXPORT_MANAGER = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_uploaded_file(uploaded, username: str = "anonymous", session_dir: Optional[str] = None) -> str:
    """Copy an uploaded file into a session directory and return the new path.

    Args:
        uploaded: One of
            * a string path to an existing file,
            * a file-like object exposing ``read()``,
            * a dict whose ``"name"`` entry is a path to an existing file.
        username: Passed to ``make_user_session`` when ``session_dir`` is
            not supplied.
        session_dir: Directory to store the copy in; created if missing.

    Returns:
        The path of the stored copy, named ``upload_<uuid hex>.pdf``.

    Raises:
        RuntimeError: If ``uploaded`` matches none of the supported shapes.
    """
    import shutil

    if session_dir is None:
        session_dir = make_user_session(username)
    target_dir = Path(session_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    dst = target_dir / f"upload_{uuid.uuid4().hex}.pdf"

    # Plain filesystem path. isfile() (rather than exists()) keeps directory
    # paths from reaching copyfile(); they fall through to the RuntimeError.
    if isinstance(uploaded, str) and os.path.isfile(uploaded):
        shutil.copyfile(uploaded, dst)
        return str(dst)

    # File-like object: stream it in chunks instead of loading the whole
    # upload into memory, and do not leave a partial file behind on failure.
    if hasattr(uploaded, "read"):
        try:
            with open(dst, "wb") as f:
                shutil.copyfileobj(uploaded, f)
        except Exception:
            dst.unlink(missing_ok=True)
            raise
        return str(dst)

    # Dict payload carrying the temporary file's path under "name".
    if isinstance(uploaded, dict) and "name" in uploaded and os.path.isfile(uploaded["name"]):
        shutil.copyfile(uploaded["name"], dst)
        return str(dst)

    raise RuntimeError("Unable to save uploaded file.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_async(func, *args, **kwargs):
    """Run an async callable to completion from synchronous code.

    Args:
        func: Callable returning an awaitable when invoked with
            ``*args`` / ``**kwargs``.

    Returns:
        Whatever the awaitable resolves to.

    A fresh event loop is created for every call. It is closed afterwards
    and removed as the thread's current loop, so repeated calls do not
    accumulate open loops.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(func(*args, **kwargs))
    finally:
        try:
            # Give pending async generators a chance to finalise before closing.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_analysis(file, prompt, username="anonymous", use_streaming=False):
    """Analyse a single uploaded PDF and return the result for the UI.

    Args:
        file: The uploaded file, or ``None`` when nothing was uploaded.
        prompt: Instructions forwarded to the orchestrator.
        username: User identifier; blank or ``None`` is treated as
            ``"anonymous"``.
        use_streaming: Accepted for interface compatibility; not used here.

    Returns:
        A 3-tuple ``(text, None, None)``. ``text`` is the analysis result, a
        request to upload a file, or an ``"Error during analysis: ..."``
        message. Exceptions are reported in ``text`` and never raised.
    """
    if file is None:
        return "Please upload a PDF.", None, None

    # The username field is optional; when it is left blank the caller passes
    # an empty value, which would otherwise bypass the parameter default.
    username = (username or "").strip() or "anonymous"

    try:
        validate_file_size(file)
        path = save_uploaded_file(file, username)
        result = run_async(
            ORCHESTRATOR.handle_user_prompt,
            user_id=username,
            prompt=prompt,
            file_path=path,
            targets=["analysis"],
        )
        return result.get("analysis", "No analysis result."), None, None
    except Exception as e:
        return f"Error during analysis: {str(e)}", None, None
|
|
|
|
|
def _format_batch_report(result):
    """Render the orchestrator's batch result dict as the text shown in the UI."""
    batch_summary = result.get("summary") or {}
    batch_results = result.get("batch_results") or []
    stats = batch_summary.get("processing_stats") or {}

    # NOTE(review): the leading "π" in the three headings below is the visible
    # remnant of a mis-decoded emoji whose code point cannot be recovered from
    # this file; restore the intended glyphs from the upstream source.
    parts = [
        "π Batch Analysis Results\n",
        f"Total files: {stats.get('total_files', 0)}\n",
        f"Successful: {stats.get('successful', 0)}\n",
        f"Failed: {stats.get('failed', 0)}\n",
        f"Success rate: {stats.get('success_rate', '0%')}\n\n",
    ]

    if batch_summary.get("batch_analysis"):
        parts.append(f"π Batch Summary:\n{batch_summary['batch_analysis']}\n\n")

    parts.append("π Individual Results:\n")
    for index, file_result in enumerate(batch_results, start=1):
        file_name = Path(file_result.get("file_path", "Unknown")).name
        parts.append(f"\n--- File {index}: {file_name} ---\n")
        if "error" in file_result:
            parts.append(f"❌ Error: {file_result['error']}\n")
        else:
            parts.append(f"✅ {file_result.get('analysis', 'No analysis')}\n")

    return "".join(parts)


def handle_batch_analysis(files, prompt, username="anonymous"):
    """Handle batch analysis of multiple PDFs.

    Args:
        files: Sequence of uploaded files; empty or ``None`` yields a prompt
            to upload something.
        prompt: Instructions applied to every document.
        username: User identifier; blank or ``None`` is treated as
            ``"anonymous"``.

    Returns:
        A 3-tuple ``(text, None, None)``. ``text`` is the formatted batch
        report or an ``"Error during batch analysis: ..."`` message.
        Exceptions are reported in ``text`` and never raised.
    """
    if not files:
        return "Please upload at least one PDF.", None, None

    # The username field is optional; when it is left blank the caller passes
    # an empty value, which would otherwise bypass the parameter default.
    username = (username or "").strip() or "anonymous"

    try:
        # Validate and store every upload before any analysis starts; the
        # first failure aborts the whole batch.
        file_paths = []
        for file in files:
            validate_file_size(file)
            file_paths.append(save_uploaded_file(file, username))

        result = run_async(
            ORCHESTRATOR.handle_batch_analysis,
            user_id=username,
            prompt=prompt,
            file_paths=file_paths,
            targets=["analysis"],
        )
        return _format_batch_report(result), None, None
    except Exception as e:
        return f"Error during batch analysis: {str(e)}", None, None
|
|
|
|
|
def handle_export(result_text, export_format, username="anonymous"):
    """Handle export of analysis results.

    Args:
        result_text: The text to export; empty, whitespace-only or ``None``
            is rejected.
        export_format: ``"txt"``, ``"json"`` or ``"pdf"``.
        username: User identifier; blank or ``None`` is treated as
            ``"anonymous"``.

    Returns:
        A 2-tuple ``(status_message, filepath)``. ``filepath`` is ``None``
        unless the export succeeded. Exceptions raised by the export manager
        are reported in ``status_message`` and never propagated.
    """
    if not result_text or not result_text.strip():
        return "No content to export.", None

    if not EXPORT_MANAGER:
        return "Export functionality not available.", None

    # The username field is optional; when it is left blank the caller passes
    # an empty value, which would otherwise bypass the parameter default.
    username = (username or "").strip() or "anonymous"

    try:
        if export_format == "txt":
            filepath = EXPORT_MANAGER.export_text(result_text, username=username)
        elif export_format == "json":
            data = {
                "analysis": result_text,
                "exported_by": username,
                "timestamp": time.time(),
            }
            filepath = EXPORT_MANAGER.export_json(data, username=username)
        elif export_format == "pdf":
            filepath = EXPORT_MANAGER.export_pdf(result_text, username=username)
        else:
            return f"Unsupported export format: {export_format}", None

        return f"✅ Export successful! File saved to: {filepath}", filepath
    except Exception as e:
        return f"❌ Export failed: {str(e)}", None
|
|
|
|
|
def get_custom_prompts():
    """Return the keys of every prompt held by PROMPT_MANAGER.

    Yields an empty list when no prompt manager is available.
    """
    if PROMPT_MANAGER:
        available = PROMPT_MANAGER.get_all_prompts()
        return list(available.keys())
    return []
|
|
|
|
|
def load_custom_prompt(prompt_id):
    """Look up a custom prompt template by id.

    Returns an empty string when no prompt manager is available or when the
    manager returns a falsy value for ``prompt_id``.
    """
    if not PROMPT_MANAGER:
        return ""
    template = PROMPT_MANAGER.get_prompt(prompt_id)
    return template if template else ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Gradio UI: one tab for single-document analysis (with prompt loading and
# export) and one tab for batch processing, followed by the event wiring.
#
# NOTE(review): several labels below begin with a bare "π". That is the visible
# remnant of an emoji that was mis-decoded somewhere upstream; the intended
# code point cannot be recovered from this file, so those strings are left as
# they are. Restore the intended glyphs from the upstream source.
with gr.Blocks(title="PDF Analysis & Orchestrator", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# π PDF Analysis & Orchestrator - Intelligent Document Processing")
    gr.Markdown("Upload PDFs and provide instructions for analysis, summarization, or explanation.")

    with gr.Tabs():
        # ---- Single-document tab ------------------------------------------
        with gr.Tab("π Single Document Analysis"):
            with gr.Row():
                with gr.Column(scale=1):
                    pdf_in = gr.File(label="Upload PDF", file_types=[".pdf"], elem_id="file_upload")
                    username_input = gr.Textbox(label="Username (optional)", placeholder="anonymous", elem_id="username")

                    # Saved prompt templates that can be loaded into the
                    # instructions box.
                    with gr.Accordion("🎯 Custom Prompts", open=False):
                        prompt_dropdown = gr.Dropdown(
                            choices=get_custom_prompts(),
                            label="Select Custom Prompt",
                            value=None,
                        )
                        load_prompt_btn = gr.Button("Load Prompt", size="sm")

                with gr.Column(scale=2):
                    gr.Markdown("### Analysis Instructions")
                    prompt_input = gr.Textbox(
                        lines=4,
                        placeholder="Describe what you want to do with the document...\nExamples:\n- Summarize this document in 3 key points\n- Explain this technical paper for a 10-year-old\n- Segment this document by themes\n- Analyze the key findings",
                        label="Instructions",
                    )

                    with gr.Row():
                        submit_btn = gr.Button("π Analyze & Orchestrate", variant="primary", size="lg")
                        clear_btn = gr.Button("🗑️ Clear", size="sm")

            # Results and export controls.
            with gr.Row():
                with gr.Column(scale=2):
                    output_box = gr.Textbox(label="Analysis Result", lines=15, max_lines=25, show_copy_button=True)
                    status_box = gr.Textbox(label="Status", value="Ready to analyze documents", interactive=False)

                with gr.Column(scale=1):
                    with gr.Accordion("💾 Export Results", open=False):
                        export_format = gr.Dropdown(
                            choices=["txt", "json", "pdf"],
                            label="Export Format",
                            value="txt",
                        )
                        export_btn = gr.Button("📥 Export", variant="secondary")
                        export_status = gr.Textbox(label="Export Status", interactive=False)

        # ---- Batch tab ----------------------------------------------------
        with gr.Tab("π Batch Processing"):
            with gr.Row():
                with gr.Column(scale=1):
                    batch_files = gr.File(
                        label="Upload Multiple PDFs",
                        file_count="multiple",
                        file_types=[".pdf"],
                    )
                    batch_username = gr.Textbox(label="Username (optional)", placeholder="anonymous")

                with gr.Column(scale=2):
                    batch_prompt = gr.Textbox(
                        lines=3,
                        placeholder="Enter analysis instructions for all documents...",
                        label="Batch Analysis Instructions",
                    )
                    batch_submit = gr.Button("π Process Batch", variant="primary", size="lg")

            batch_output = gr.Textbox(label="Batch Results", lines=20, max_lines=30, show_copy_button=True)
            batch_status = gr.Textbox(label="Batch Status", interactive=False)

    # ---- Event wiring -----------------------------------------------------

    # handle_analysis takes a fourth argument (use_streaming) and returns three
    # values; the inline State components supply the former and absorb the
    # third return value.
    submit_btn.click(
        fn=handle_analysis,
        inputs=[pdf_in, prompt_input, username_input, gr.State(False)],
        outputs=[output_box, status_box, gr.State()],
    )

    load_prompt_btn.click(
        fn=load_custom_prompt,
        inputs=[prompt_dropdown],
        outputs=[prompt_input],
    )

    # The second return value of handle_export (the file path) goes to a
    # throwaway State.
    export_btn.click(
        fn=handle_export,
        inputs=[output_box, export_format, username_input],
        outputs=[export_status, gr.State()],
    )

    # A File component is cleared with None; an empty string would be treated
    # as a file path rather than as "no file".
    clear_btn.click(
        fn=lambda: (None, "", "", "Ready"),
        inputs=[],
        outputs=[pdf_in, prompt_input, output_box, status_box],
    )

    batch_submit.click(
        fn=handle_batch_analysis,
        inputs=[batch_files, batch_prompt, batch_username],
        outputs=[batch_output, batch_status, gr.State()],
    )

    # Clickable example instructions that fill the single-document prompt box.
    gr.Examples(
        examples=[
            ["Summarize this document in 3 key points"],
            ["Explain this technical content for a general audience"],
            ["Segment this document by main themes or topics"],
            ["Analyze the key findings and recommendations"],
            ["Create an executive summary of this document"],
        ],
        inputs=prompt_input,
        label="Example Instructions",
    )
|
|
|
|
|
if __name__ == "__main__":
    # Listen on all interfaces; the port is taken from the PORT environment
    # variable and falls back to 7860 when it is not set.
    port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=port)
|
|
|