# PDF_analyst / app.py
# JatsTheAIGen's picture
# Fix
# 0e92524
# PDF Analysis & Orchestrator
# Extracted core functionality from Sharmaji ka PDF Blaster V1
import os
import asyncio
import uuid
from pathlib import Path
from typing import Optional, List, Tuple
import time
import gradio as gr
from agents import (
AnalysisAgent,
CollaborationAgent,
ConversationAgent,
ResearchAnalystAgent,
MasterOrchestrator,
)
from utils import load_pdf_text
from utils.session import make_user_session
from utils.validation import validate_file_size
from utils.prompts import PromptManager
from utils.export import ExportManager
from config import Config
# ------------------------
# Initialize Components
# ------------------------
# Run Config's directory setup before any component below is constructed.
Config.ensure_directories()

# Agent roster - focused on analysis & orchestration.
# The keys look like the values the handlers pass as `targets`
# ("analysis", "research") - NOTE(review): confirm against MasterOrchestrator.
AGENTS = dict(
    analysis=AnalysisAgent(
        name="AnalysisAgent",
        model=Config.OPENAI_MODEL,
        tasks_completed=0,
    ),
    collab=CollaborationAgent(
        name="CollaborationAgent",
        model=Config.OPENAI_MODEL,
        tasks_completed=0,
    ),
    conversation=ConversationAgent(
        name="ConversationAgent",
        model=Config.OPENAI_MODEL,
        tasks_completed=0,
    ),
    research=ResearchAnalystAgent(
        name="ResearchAnalystAgent",
        model=Config.OPENAI_MODEL,
        tasks_completed=0,
    ),
)

# Single orchestrator instance shared by every request handler in this module.
ORCHESTRATOR = MasterOrchestrator(agents=AGENTS)

# Shared managers for prompt templates and result exports.
PROMPT_MANAGER = PromptManager()
EXPORT_MANAGER = ExportManager()
# ------------------------
# File Handling
# ------------------------
def save_uploaded_file(uploaded, username: str = "anonymous", session_dir: Optional[str] = None) -> str:
    """Copy an uploaded PDF into a session directory and return the new path.

    Args:
        uploaded: One of
            * a filesystem path (``str`` or ``os.PathLike``) to an existing file,
            * an object with a ``read()`` method (binary file-like), or
            * a dict whose ``"name"`` entry is a path to an existing file.
        username: Passed to ``make_user_session`` when ``session_dir`` is not
            supplied.
        session_dir: Directory to store the copy in. Created if missing. When
            ``None``, the directory returned by ``make_user_session(username)``
            is used.

    Returns:
        The path of the saved copy, named ``upload_<random hex>.pdf``.

    Raises:
        RuntimeError: If ``uploaded`` matches none of the supported shapes
            (including a path that does not exist).
    """
    # Local import, as in the original implementation (shutil is not imported
    # at module level).
    import shutil

    if session_dir is None:
        session_dir = make_user_session(username)
    target_dir = Path(session_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    dst = target_dir / f"upload_{uuid.uuid4().hex}.pdf"

    # Case 1: a path on disk. os.PathLike is accepted in addition to str so
    # pathlib.Path inputs are no longer rejected.
    if isinstance(uploaded, (str, os.PathLike)) and os.path.exists(uploaded):
        shutil.copyfile(uploaded, dst)
        return str(dst)

    # Case 2: an open file-like object.
    if hasattr(uploaded, "read"):
        # Rewind first: the handlers in this module hand the same object to
        # validate_file_size() before calling us, and a stream that has already
        # been consumed would otherwise be saved empty or truncated.
        if hasattr(uploaded, "seek"):
            try:
                uploaded.seek(0)
            except (OSError, ValueError):
                # Non-seekable stream: fall back to copying from the current
                # position, which is what the previous implementation did.
                pass
        with open(dst, "wb") as out:
            # Stream in chunks instead of loading the whole upload into memory.
            shutil.copyfileobj(uploaded, out)
        return str(dst)

    # Case 3: a dict describing the upload by its temp-file path.
    if isinstance(uploaded, dict) and "name" in uploaded and os.path.exists(uploaded["name"]):
        shutil.copyfile(uploaded["name"], dst)
        return str(dst)

    raise RuntimeError("Unable to save uploaded file.")
# ------------------------
# Async wrapper
# ------------------------
def run_async(func, *args, **kwargs):
    """Run an async callable to completion from synchronous code.

    A fresh event loop is created for the call, installed as the current loop
    while ``func(*args, **kwargs)`` runs, and then torn down again.

    Args:
        func: Callable returning an awaitable (typically an ``async def``).
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever the awaitable returned by ``func`` resolves to.

    Raises:
        Any exception raised by ``func`` or by the awaitable it returns.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(func(*args, **kwargs))
    finally:
        # Previously the loop was never closed, leaking one loop (and its
        # selector / self-pipe file descriptors) per call.
        try:
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Do not leave a closed loop installed as the thread's current loop.
            asyncio.set_event_loop(None)
            loop.close()
# ------------------------
# Analysis Handlers - Core Features
# ------------------------
def handle_analysis(file, prompt, username="anonymous", use_streaming=False):
    """Analyze a single uploaded PDF according to ``prompt``.

    Args:
        file: The uploaded file as delivered by the UI; ``None`` when nothing
            was uploaded.
        prompt: The user's analysis instructions.
        username: Used for the upload session and as the orchestrator's
            ``user_id``. ``None`` or a blank string falls back to
            ``"anonymous"``.
        use_streaming: When true, delegate to ``handle_analysis_streaming``.

    Returns:
        A 3-tuple whose first element is the analysis text (or a message asking
        for an upload) and whose remaining two elements are ``None``. In
        streaming mode the first element is the generator produced by
        ``handle_analysis_streaming``.

    Raises:
        Whatever ``validate_file_size`` or ``save_uploaded_file`` raise; these
        are not caught here.
    """
    if file is None:
        return "Please upload a PDF.", None, None

    # An empty Gradio textbox submits "" rather than omitting the argument, so
    # the parameter default alone never applies for blank UI input.
    if username is None or (isinstance(username, str) and not username.strip()):
        username = "anonymous"

    validate_file_size(file)
    path = save_uploaded_file(file, username)

    if use_streaming:
        # NOTE(review): this returns a generator object inside a tuple. Gradio
        # streams only when the event handler itself is a generator function,
        # so the UI probably shows the generator's repr - confirm and rework
        # the wiring if streaming is meant to be user-visible.
        return handle_analysis_streaming(path, prompt, username)

    result = run_async(
        ORCHESTRATOR.handle_user_prompt,
        user_id=username,
        prompt=prompt,
        file_path=path,
        targets=["analysis"],
    )
    return result.get("analysis", "No analysis result."), None, None
def handle_analysis_streaming(file_path, prompt, username="anonymous"):
    """Build a synchronous generator over the orchestrator's streaming analysis.

    Args:
        file_path: Path of the already-saved PDF.
        prompt: The user's analysis instructions.
        username: Forwarded to the orchestrator as ``user_id``.

    Returns:
        A 3-tuple ``(generator, None, None)``. Iterating the generator drives
        ``ORCHESTRATOR.handle_user_prompt_streaming`` on a private event loop
        and yields each chunk it produces. No work is done until the generator
        is first advanced.
    """
    def stream_generator():
        # Wrapping the orchestrator call in our own async generator guarantees
        # an object with __anext__() and aclose(), whatever kind of async
        # iterable the orchestrator returns.
        async def async_stream():
            async for chunk in ORCHESTRATOR.handle_user_prompt_streaming(
                user_id=username,
                prompt=prompt,
                file_path=file_path,
                targets=["analysis"],
            ):
                yield chunk

        # Convert the async generator into a sync generator by pumping it one
        # item at a time on a dedicated loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        async_gen = async_stream()
        try:
            while True:
                try:
                    chunk = loop.run_until_complete(async_gen.__anext__())
                except StopAsyncIteration:
                    break
                yield chunk
        finally:
            # Runs on normal exhaustion, on error, and when the consumer
            # abandons the generator early. Finalize the async generator
            # before closing the loop so its cleanup code can still run;
            # previously it was left un-closed on an already-closed loop.
            try:
                loop.run_until_complete(async_gen.aclose())
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                # Do not leave a closed loop installed as the current loop.
                asyncio.set_event_loop(None)
                loop.close()

    return stream_generator(), None, None
def handle_batch_analysis(files, prompt, username="anonymous"):
    """Analyze several uploaded PDFs with one prompt and format a text report.

    Args:
        files: Sequence of uploaded files as delivered by the UI; ``None`` or
            empty when nothing was uploaded.
        prompt: Analysis instructions applied to every document.
        username: Used for the upload session and as the orchestrator's
            ``user_id``. ``None`` or a blank string falls back to
            ``"anonymous"``.

    Returns:
        A 3-tuple ``(report_text, None, None)``. ``report_text`` is the
        formatted batch report, a request to upload files, or an error message
        naming the first file that failed validation/saving.
    """
    if not files:
        return "Please upload at least one PDF.", None, None

    # An empty Gradio textbox submits "" rather than omitting the argument.
    if username is None or (isinstance(username, str) and not username.strip()):
        username = "anonymous"

    # Validate and store every file up front; abort the whole batch on the
    # first failure so the orchestrator never sees a partial set.
    file_paths = []
    for file in files:
        try:
            validate_file_size(file)
            file_paths.append(save_uploaded_file(file, username))
        except Exception as e:
            return f"Error with file {file}: {str(e)}", None, None

    result = run_async(
        ORCHESTRATOR.handle_batch_analysis,
        user_id=username,
        prompt=prompt,
        file_paths=file_paths,
        targets=["analysis"],
    )

    # Format batch results. The emoji were previously stored as mis-decoded
    # UTF-8 (mojibake) and rendered as garbage.
    batch_summary = result.get("summary", {})
    batch_results = result.get("batch_results", [])
    stats = batch_summary.get("processing_stats", {})

    parts = [
        "📊 Batch Analysis Results\n",
        f"Total files: {stats.get('total_files', 0)}\n",
        f"Successful: {stats.get('successful', 0)}\n",
        f"Failed: {stats.get('failed', 0)}\n",
        f"Success rate: {stats.get('success_rate', '0%')}\n\n",
    ]

    if batch_summary.get("batch_analysis"):
        parts.append(f"📋 Batch Summary:\n{batch_summary['batch_analysis']}\n\n")

    parts.append("📄 Individual Results:\n")
    for index, file_result in enumerate(batch_results, start=1):
        # `or` also covers an explicit None, which Path() would reject.
        file_name = Path(file_result.get("file_path") or "Unknown").name
        parts.append(f"\n--- File {index}: {file_name} ---\n")
        if "error" in file_result:
            parts.append(f"❌ Error: {file_result['error']}\n")
        else:
            parts.append(f"✅ {file_result.get('analysis', 'No analysis')}\n")

    return "".join(parts), None, None
def handle_research_analysis(file, prompt, username="anonymous", use_streaming=False):
    """Run the research-analyst target on a single uploaded PDF.

    Args:
        file: The uploaded file as delivered by the UI; ``None`` when nothing
            was uploaded.
        prompt: The user's research instructions.
        username: Used for the upload session and as the orchestrator's
            ``user_id``. ``None`` or a blank string falls back to
            ``"anonymous"``.
        use_streaming: Accepted for signature parity with ``handle_analysis``
            but currently ignored; the non-streaming path is always used.

    Returns:
        A 3-tuple ``(text, None, None)`` where ``text`` is the orchestrator's
        ``"research_analysis"`` entry, a fallback message when that entry is
        absent, or a request to upload a PDF.

    Raises:
        Whatever ``validate_file_size`` or ``save_uploaded_file`` raise; these
        are not caught here.
    """
    if file is None:
        return "Please upload a PDF.", None, None

    # An empty Gradio textbox submits "" rather than omitting the argument.
    if username is None or (isinstance(username, str) and not username.strip()):
        username = "anonymous"

    validate_file_size(file)
    path = save_uploaded_file(file, username)

    # For now, always use the non-streaming approach for research analysis.
    # Streaming can be added later with proper Gradio integration.
    result = run_async(
        ORCHESTRATOR.handle_user_prompt,
        user_id=username,
        prompt=prompt,
        file_path=path,
        targets=["research"],
    )
    return result.get("research_analysis", "No research analysis result."), None, None
def handle_export(result_text, export_format, username="anonymous"):
    """Export analysis text through the export manager.

    Args:
        result_text: The text to export. ``None``, empty and whitespace-only
            values are rejected.
        export_format: One of ``"txt"``, ``"json"`` or ``"pdf"``.
        username: Forwarded to the export manager; also stored as
            ``exported_by`` in JSON exports. ``None`` or a blank string falls
            back to ``"anonymous"``.

    Returns:
        A 2-tuple ``(status_message, filepath)``. ``filepath`` is ``None``
        whenever nothing was written (no content, unsupported format, or the
        export raised).
    """
    if not result_text or result_text.strip() == "":
        return "No content to export.", None

    # An empty Gradio textbox submits "" rather than omitting the argument.
    if username is None or (isinstance(username, str) and not username.strip()):
        username = "anonymous"

    # The status emoji below were previously stored as mis-decoded UTF-8
    # (mojibake) and rendered as garbage.
    try:
        if export_format == "txt":
            filepath = EXPORT_MANAGER.export_text(result_text, username=username)
        elif export_format == "json":
            data = {"analysis": result_text, "exported_by": username, "timestamp": time.time()}
            filepath = EXPORT_MANAGER.export_json(data, username=username)
        elif export_format == "pdf":
            filepath = EXPORT_MANAGER.export_pdf(result_text, username=username)
        else:
            return f"Unsupported export format: {export_format}", None
        return f"✅ Export successful! File saved to: {filepath}", filepath
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        return f"❌ Export failed: {str(e)}", None
def get_custom_prompts():
    """Return the IDs of all prompts held by the prompt manager, as a list."""
    all_prompts = PROMPT_MANAGER.get_all_prompts()
    return [prompt_id for prompt_id in all_prompts.keys()]
def load_custom_prompt(prompt_id):
    """Look up a prompt by ID; return an empty string when the lookup is falsy."""
    template = PROMPT_MANAGER.get_prompt(prompt_id)
    if template:
        return template
    return ""
# ------------------------
# Gradio UI - Enhanced Interface
# ------------------------
# Builds the whole Gradio interface: four tabs (single document, research,
# batch, prompt management), the event wiring between components and the
# handler functions defined above, and two sets of example instructions.
#
# Changes relative to the previous version:
#   * Emoji in labels were stored as mis-decoded UTF-8 (mojibake); restored.
#   * The two "Clear" handlers sent "" to gr.File outputs; a File component is
#     cleared with None, an empty string is not a valid file value.
#   * The add-prompt lambda no longer shadows the builtin `id`.
with gr.Blocks(title="PDF Analysis & Orchestrator", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 📄 PDF Analysis & Orchestrator - Intelligent Document Processing")
    gr.Markdown("Upload PDFs and provide instructions for analysis, summarization, or explanation. Now with enhanced features!")

    with gr.Tabs():
        # Single Document Analysis Tab
        with gr.Tab("📄 Single Document Analysis"):
            with gr.Row():
                with gr.Column(scale=1):
                    pdf_in = gr.File(label="Upload PDF", file_types=[".pdf"], elem_id="file_upload")
                    username_input = gr.Textbox(label="Username (optional)", placeholder="anonymous", elem_id="username")

                    # Custom Prompts Section
                    with gr.Accordion("🎯 Custom Prompts", open=False):
                        prompt_dropdown = gr.Dropdown(
                            choices=get_custom_prompts(),
                            label="Select Custom Prompt",
                            value=None,
                        )
                        load_prompt_btn = gr.Button("Load Prompt", size="sm")

                    # Analysis Options
                    with gr.Accordion("⚙️ Analysis Options", open=False):
                        use_streaming = gr.Checkbox(label="Enable Streaming Output", value=False)
                        # NOTE(review): chunk_size is not passed to any event
                        # handler in this block, so moving the slider currently
                        # has no effect.
                        chunk_size = gr.Slider(
                            minimum=5000, maximum=30000, value=15000, step=1000,
                            label="Chunk Size (for large documents)",
                        )

                with gr.Column(scale=2):
                    gr.Markdown("### Analysis Instructions")
                    prompt_input = gr.Textbox(
                        lines=4,
                        placeholder="Describe what you want to do with the document...\nExamples:\n- Summarize this document in 3 key points\n- Explain this technical paper for a 10-year-old\n- Segment this document by themes\n- Analyze the key findings",
                        label="Instructions",
                    )
                    with gr.Row():
                        submit_btn = gr.Button("🔍 Analyze & Orchestrate", variant="primary", size="lg")
                        clear_btn = gr.Button("🗑️ Clear", size="sm")

            # Results Section
            with gr.Row():
                with gr.Column(scale=2):
                    output_box = gr.Textbox(label="Analysis Result", lines=15, max_lines=25, show_copy_button=True)
                    status_box = gr.Textbox(label="Status", value="Ready to analyze documents", interactive=False)

                with gr.Column(scale=1):
                    # Export Section
                    with gr.Accordion("💾 Export Results", open=False):
                        export_format = gr.Dropdown(
                            choices=["txt", "json", "pdf"],
                            label="Export Format",
                            value="txt",
                        )
                        export_btn = gr.Button("📥 Export", variant="secondary")
                        export_status = gr.Textbox(label="Export Status", interactive=False)

                    # Document Info
                    with gr.Accordion("📊 Document Info", open=False):
                        doc_info = gr.Textbox(label="Document Information", interactive=False, lines=6)

        # Senior Research Analyst Tab
        with gr.Tab("🔬 Senior Research Analyst"):
            gr.Markdown("### 🎯 R&D Pipeline Analysis")
            gr.Markdown("Act as a senior research analyst: extract high-value, novel ideas and convert them into concrete R&D pipeline outcomes (experiments → prototypes → product decisions)")

            with gr.Row():
                with gr.Column(scale=1):
                    research_pdf_in = gr.File(label="Upload Research Document", file_types=[".pdf"], elem_id="research_file_upload")
                    research_username_input = gr.Textbox(label="Username (optional)", placeholder="anonymous", elem_id="research_username")

                    # Research-Specific Prompts Section
                    with gr.Accordion("🎯 Research Prompts", open=False):
                        # NOTE(review): the default below assumes a prompt with
                        # ID "research_pipeline" and category "research"
                        # exists - confirm against PromptManager.
                        research_prompt_dropdown = gr.Dropdown(
                            choices=[
                                pid
                                for pid, prompt in PROMPT_MANAGER.get_all_prompts().items()
                                if prompt.get("category") == "research"
                            ],
                            label="Select Research Prompt",
                            value="research_pipeline",
                        )
                        load_research_prompt_btn = gr.Button("Load Research Prompt", size="sm")

                    # Research Analysis Options
                    with gr.Accordion("⚙️ Research Options", open=False):
                        gr.Markdown("Research analysis uses comprehensive processing for detailed R&D pipeline insights.")

                with gr.Column(scale=2):
                    gr.Markdown("### Research Analysis Instructions")
                    research_prompt_input = gr.Textbox(
                        lines=4,
                        placeholder="Focus on extracting novel ideas with high product/engineering impact...\nExamples:\n- Identify breakthrough concepts for R&D pipeline\n- Assess commercial viability of technical innovations\n- Design experimental frameworks for validation\n- Create prototype development roadmaps",
                        label="Research Instructions",
                    )
                    with gr.Row():
                        research_submit_btn = gr.Button("🔬 Research Analysis", variant="primary", size="lg")
                        research_clear_btn = gr.Button("🗑️ Clear", size="sm")

            # Research Results Section
            with gr.Row():
                with gr.Column(scale=2):
                    research_output_box = gr.Textbox(label="Research Analysis Result", lines=20, max_lines=30, show_copy_button=True)
                    research_status_box = gr.Textbox(label="Research Status", value="Ready for research analysis", interactive=False)

                with gr.Column(scale=1):
                    # Research Export Section
                    with gr.Accordion("💾 Export Research Results", open=False):
                        research_export_format = gr.Dropdown(
                            choices=["txt", "json", "pdf"],
                            label="Export Format",
                            value="txt",
                        )
                        research_export_btn = gr.Button("📥 Export Research", variant="secondary")
                        research_export_status = gr.Textbox(label="Export Status", interactive=False)

                    # Research Insights Summary
                    with gr.Accordion("📊 Research Insights", open=False):
                        research_insights = gr.Textbox(label="Key Insights Summary", interactive=False, lines=8)

        # Batch Processing Tab
        with gr.Tab("📚 Batch Processing"):
            with gr.Row():
                with gr.Column(scale=1):
                    batch_files = gr.File(
                        label="Upload Multiple PDFs",
                        file_count="multiple",
                        file_types=[".pdf"],
                    )
                    batch_username = gr.Textbox(label="Username (optional)", placeholder="anonymous")

                with gr.Column(scale=2):
                    batch_prompt = gr.Textbox(
                        lines=3,
                        placeholder="Enter analysis instructions for all documents...",
                        label="Batch Analysis Instructions",
                    )
                    batch_submit = gr.Button("🚀 Process Batch", variant="primary", size="lg")

            batch_output = gr.Textbox(label="Batch Results", lines=20, max_lines=30, show_copy_button=True)
            batch_status = gr.Textbox(label="Batch Status", interactive=False)

        # Custom Prompts Management Tab
        with gr.Tab("🎯 Manage Prompts"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Add New Prompt")
                    new_prompt_id = gr.Textbox(label="Prompt ID", placeholder="my_custom_prompt")
                    new_prompt_name = gr.Textbox(label="Prompt Name", placeholder="My Custom Analysis")
                    new_prompt_desc = gr.Textbox(label="Description", placeholder="What this prompt does")
                    new_prompt_template = gr.Textbox(
                        lines=4,
                        label="Prompt Template",
                        placeholder="Enter your custom prompt template...",
                    )
                    new_prompt_category = gr.Dropdown(
                        choices=["custom", "business", "technical", "explanation", "analysis"],
                        label="Category",
                        value="custom",
                    )
                    add_prompt_btn = gr.Button("➕ Add Prompt", variant="primary")

                with gr.Column(scale=1):
                    gr.Markdown("### Existing Prompts")
                    prompt_list = gr.Dataframe(
                        headers=["ID", "Name", "Category", "Description"],
                        datatype=["str", "str", "str", "str"],
                        interactive=False,
                        label="Available Prompts",
                    )
                    refresh_prompts_btn = gr.Button("🔄 Refresh List")
                    delete_prompt_id = gr.Textbox(label="Prompt ID to Delete", placeholder="prompt_id")
                    delete_prompt_btn = gr.Button("🗑️ Delete Prompt", variant="stop")

    # Event Handlers

    # Single document analysis
    # NOTE(review): with the streaming checkbox ticked, handle_analysis returns
    # a generator object as its first value rather than being a generator
    # function itself; Gradio will not stream that - confirm and rework.
    submit_btn.click(
        fn=handle_analysis,
        inputs=[pdf_in, prompt_input, username_input, use_streaming],
        outputs=[output_box, status_box, doc_info],
    )

    # Load custom prompt
    load_prompt_btn.click(
        fn=load_custom_prompt,
        inputs=[prompt_dropdown],
        outputs=[prompt_input],
    )

    # Export functionality. The second output receives the exported file path;
    # the throwaway gr.State() only exists so the handler's 2-tuple has a sink.
    export_btn.click(
        fn=handle_export,
        inputs=[output_box, export_format, username_input],
        outputs=[export_status, gr.State()],
    )

    # Clear functionality. The File component is cleared with None.
    clear_btn.click(
        fn=lambda: (None, "", "", "Ready"),
        inputs=[],
        outputs=[pdf_in, prompt_input, output_box, status_box],
    )

    # Research analysis event handlers
    research_submit_btn.click(
        fn=handle_research_analysis,
        inputs=[research_pdf_in, research_prompt_input, research_username_input],
        outputs=[research_output_box, research_status_box, research_insights],
    )

    # Load research prompt
    load_research_prompt_btn.click(
        fn=load_custom_prompt,
        inputs=[research_prompt_dropdown],
        outputs=[research_prompt_input],
    )

    # Research export functionality
    research_export_btn.click(
        fn=handle_export,
        inputs=[research_output_box, research_export_format, research_username_input],
        outputs=[research_export_status, gr.State()],
    )

    # Research clear functionality. The File component is cleared with None.
    research_clear_btn.click(
        fn=lambda: (None, "", "", "Ready for research analysis", ""),
        inputs=[],
        outputs=[research_pdf_in, research_prompt_input, research_output_box, research_status_box, research_insights],
    )

    # Batch processing
    batch_submit.click(
        fn=handle_batch_analysis,
        inputs=[batch_files, batch_prompt, batch_username],
        outputs=[batch_output, batch_status, gr.State()],
    )

    # Prompt management
    add_prompt_btn.click(
        fn=lambda pid, name, desc, template, cat: PROMPT_MANAGER.add_prompt(pid, name, desc, template, cat),
        inputs=[new_prompt_id, new_prompt_name, new_prompt_desc, new_prompt_template, new_prompt_category],
        outputs=[],
    )

    refresh_prompts_btn.click(
        fn=lambda: [
            [pid, prompt["name"], prompt["category"], prompt["description"]]
            for pid, prompt in PROMPT_MANAGER.get_all_prompts().items()
        ],
        inputs=[],
        outputs=[prompt_list],
    )

    delete_prompt_btn.click(
        fn=lambda pid: PROMPT_MANAGER.delete_prompt(pid),
        inputs=[delete_prompt_id],
        outputs=[],
    )

    # Examples
    gr.Examples(
        examples=[
            ["Summarize this document in 3 key points"],
            ["Explain this technical content for a general audience"],
            ["Segment this document by main themes or topics"],
            ["Analyze the key findings and recommendations"],
            ["Create an executive summary of this document"],
        ],
        inputs=prompt_input,
        label="Example Instructions",
    )

    # Research Examples
    gr.Examples(
        examples=[
            ["Identify breakthrough concepts with high product/engineering impact and design specific experiments to validate them"],
            ["Assess the commercial viability of technical innovations and create prototype development roadmaps"],
            ["Extract novel methodologies and convert them into concrete R&D pipeline outcomes"],
            ["Analyze technical concepts for transformative potential and generate strategic product decisions"],
            ["Design experimental frameworks to validate key hypotheses with measurable success criteria"],
        ],
        inputs=research_prompt_input,
        label="Research Analysis Examples",
    )
# Script entry point: serve the UI on all interfaces, on $PORT when that
# environment variable is set and on 7860 otherwise.
if __name__ == "__main__":
    listen_port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=listen_port)