# PDF Analysis & Orchestrator # Extracted core functionality from Sharmaji ka PDF Blaster V1 import os import asyncio import uuid from pathlib import Path from typing import Optional, List, Tuple import time import gradio as gr from agents import ( AnalysisAgent, CollaborationAgent, ConversationAgent, ResearchAnalystAgent, MasterOrchestrator, ) from utils import load_pdf_text from utils.session import make_user_session from utils.validation import validate_file_size from utils.prompts import PromptManager from utils.export import ExportManager from config import Config # ------------------------ # Initialize Components # ------------------------ Config.ensure_directories() # Agent Roster - Focused on Analysis & Orchestration AGENTS = { "analysis": AnalysisAgent(name="AnalysisAgent", model=Config.OPENAI_MODEL, tasks_completed=0), "collab": CollaborationAgent(name="CollaborationAgent", model=Config.OPENAI_MODEL, tasks_completed=0), "conversation": ConversationAgent(name="ConversationAgent", model=Config.OPENAI_MODEL, tasks_completed=0), "research": ResearchAnalystAgent(name="ResearchAnalystAgent", model=Config.OPENAI_MODEL, tasks_completed=0), } ORCHESTRATOR = MasterOrchestrator(agents=AGENTS) # Initialize managers PROMPT_MANAGER = PromptManager() EXPORT_MANAGER = ExportManager() # ------------------------ # File Handling # ------------------------ def save_uploaded_file(uploaded, username: str = "anonymous", session_dir: Optional[str] = None) -> str: if session_dir is None: session_dir = make_user_session(username) Path(session_dir).mkdir(parents=True, exist_ok=True) dst = Path(session_dir) / f"upload_{uuid.uuid4().hex}.pdf" if isinstance(uploaded, str) and os.path.exists(uploaded): from shutil import copyfile copyfile(uploaded, dst) return str(dst) if hasattr(uploaded, "read"): with open(dst, "wb") as f: f.write(uploaded.read()) return str(dst) if isinstance(uploaded, dict) and "name" in uploaded and os.path.exists(uploaded["name"]): from shutil import copyfile copyfile(uploaded["name"], dst) return str(dst) raise RuntimeError("Unable to save uploaded file.") # ------------------------ # Async wrapper # ------------------------ def run_async(func, *args, **kwargs): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete(func(*args, **kwargs)) # ------------------------ # Analysis Handlers - Core Features # ------------------------ def handle_analysis(file, prompt, username="anonymous", use_streaming=False): if file is None: return "Please upload a PDF.", None, None validate_file_size(file) path = save_uploaded_file(file, username) if use_streaming: return handle_analysis_streaming(path, prompt, username) else: result = run_async( ORCHESTRATOR.handle_user_prompt, user_id=username, prompt=prompt, file_path=path, targets=["analysis"] ) return result.get("analysis", "No analysis result."), None, None def handle_analysis_streaming(file_path, prompt, username="anonymous"): """Handle analysis with streaming output""" def stream_generator(): async def async_stream(): async for chunk in ORCHESTRATOR.handle_user_prompt_streaming( user_id=username, prompt=prompt, file_path=file_path, targets=["analysis"] ): yield chunk # Convert async generator to sync generator loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: async_gen = async_stream() while True: try: chunk = loop.run_until_complete(async_gen.__anext__()) yield chunk except StopAsyncIteration: break finally: loop.close() return stream_generator(), None, None def handle_batch_analysis(files, prompt, username="anonymous"): """Handle batch analysis of multiple PDFs""" if not files or len(files) == 0: return "Please upload at least one PDF.", None, None # Validate all files file_paths = [] for file in files: try: validate_file_size(file) path = save_uploaded_file(file, username) file_paths.append(path) except Exception as e: return f"Error with file {file}: {str(e)}", None, None result = run_async( ORCHESTRATOR.handle_batch_analysis, user_id=username, prompt=prompt, file_paths=file_paths, targets=["analysis"] ) # Format batch results batch_summary = result.get("summary", {}) batch_results = result.get("batch_results", []) formatted_output = f"📊 Batch Analysis Results\n" formatted_output += f"Total files: {batch_summary.get('processing_stats', {}).get('total_files', 0)}\n" formatted_output += f"Successful: {batch_summary.get('processing_stats', {}).get('successful', 0)}\n" formatted_output += f"Failed: {batch_summary.get('processing_stats', {}).get('failed', 0)}\n" formatted_output += f"Success rate: {batch_summary.get('processing_stats', {}).get('success_rate', '0%')}\n\n" if batch_summary.get("batch_analysis"): formatted_output += f"📋 Batch Summary:\n{batch_summary['batch_analysis']}\n\n" formatted_output += "📄 Individual Results:\n" for i, file_result in enumerate(batch_results): formatted_output += f"\n--- File {i+1}: {Path(file_result.get('file_path', 'Unknown')).name} ---\n" if "error" in file_result: formatted_output += f"❌ Error: {file_result['error']}\n" else: formatted_output += f"✅ {file_result.get('analysis', 'No analysis')}\n" return formatted_output, None, None def handle_research_analysis(file, prompt, username="anonymous", use_streaming=False): """Handle research analysis with R&D pipeline focus""" if file is None: return "Please upload a PDF.", None, None validate_file_size(file) path = save_uploaded_file(file, username) # For now, always use non-streaming approach for research analysis # Streaming can be added later with proper Gradio integration result = run_async( ORCHESTRATOR.handle_user_prompt, user_id=username, prompt=prompt, file_path=path, targets=["research"] ) return result.get("research_analysis", "No research analysis result."), None, None def handle_export(result_text, export_format, username="anonymous"): """Handle export of analysis results""" if not result_text or result_text.strip() == "": return "No content to export.", None try: if export_format == "txt": filepath = EXPORT_MANAGER.export_text(result_text, username=username) elif export_format == "json": data = {"analysis": result_text, "exported_by": username, "timestamp": time.time()} filepath = EXPORT_MANAGER.export_json(data, username=username) elif export_format == "pdf": filepath = EXPORT_MANAGER.export_pdf(result_text, username=username) else: return f"Unsupported export format: {export_format}", None return f"✅ Export successful! File saved to: {filepath}", filepath except Exception as e: return f"❌ Export failed: {str(e)}", None def get_custom_prompts(): """Get available custom prompts""" prompts = PROMPT_MANAGER.get_all_prompts() return list(prompts.keys()) def load_custom_prompt(prompt_id): """Load a custom prompt template""" return PROMPT_MANAGER.get_prompt(prompt_id) or "" # ------------------------ # Gradio UI - Enhanced Interface # ------------------------ with gr.Blocks(title="PDF Analysis & Orchestrator", theme=gr.themes.Soft()) as demo: gr.Markdown("# 📄 PDF Analysis & Orchestrator - Intelligent Document Processing") gr.Markdown("Upload PDFs and provide instructions for analysis, summarization, or explanation. Now with enhanced features!") with gr.Tabs(): # Single Document Analysis Tab with gr.Tab("📄 Single Document Analysis"): with gr.Row(): with gr.Column(scale=1): pdf_in = gr.File(label="Upload PDF", file_types=[".pdf"], elem_id="file_upload") username_input = gr.Textbox(label="Username (optional)", placeholder="anonymous", elem_id="username") # Custom Prompts Section with gr.Accordion("🎯 Custom Prompts", open=False): prompt_dropdown = gr.Dropdown( choices=get_custom_prompts(), label="Select Custom Prompt", value=None ) load_prompt_btn = gr.Button("Load Prompt", size="sm") # Analysis Options with gr.Accordion("⚙️ Analysis Options", open=False): use_streaming = gr.Checkbox(label="Enable Streaming Output", value=False) chunk_size = gr.Slider( minimum=5000, maximum=30000, value=15000, step=1000, label="Chunk Size (for large documents)" ) with gr.Column(scale=2): gr.Markdown("### Analysis Instructions") prompt_input = gr.Textbox( lines=4, placeholder="Describe what you want to do with the document...\nExamples:\n- Summarize this document in 3 key points\n- Explain this technical paper for a 10-year-old\n- Segment this document by themes\n- Analyze the key findings", label="Instructions" ) with gr.Row(): submit_btn = gr.Button("🔍 Analyze & Orchestrate", variant="primary", size="lg") clear_btn = gr.Button("🗑️ Clear", size="sm") # Results Section with gr.Row(): with gr.Column(scale=2): output_box = gr.Textbox(label="Analysis Result", lines=15, max_lines=25, show_copy_button=True) status_box = gr.Textbox(label="Status", value="Ready to analyze documents", interactive=False) with gr.Column(scale=1): # Export Section with gr.Accordion("💾 Export Results", open=False): export_format = gr.Dropdown( choices=["txt", "json", "pdf"], label="Export Format", value="txt" ) export_btn = gr.Button("📥 Export", variant="secondary") export_status = gr.Textbox(label="Export Status", interactive=False) # Document Info with gr.Accordion("📊 Document Info", open=False): doc_info = gr.Textbox(label="Document Information", interactive=False, lines=6) # Senior Research Analyst Tab with gr.Tab("🔬 Senior Research Analyst"): gr.Markdown("### 🎯 R&D Pipeline Analysis") gr.Markdown("Act as a senior research analyst: extract high-value, novel ideas and convert them into concrete R&D pipeline outcomes (experiments → prototypes → product decisions)") with gr.Row(): with gr.Column(scale=1): research_pdf_in = gr.File(label="Upload Research Document", file_types=[".pdf"], elem_id="research_file_upload") research_username_input = gr.Textbox(label="Username (optional)", placeholder="anonymous", elem_id="research_username") # Research-Specific Prompts Section with gr.Accordion("🎯 Research Prompts", open=False): research_prompt_dropdown = gr.Dropdown( choices=[pid for pid, prompt in PROMPT_MANAGER.get_all_prompts().items() if prompt.get("category") == "research"], label="Select Research Prompt", value="research_pipeline" ) load_research_prompt_btn = gr.Button("Load Research Prompt", size="sm") # Research Analysis Options with gr.Accordion("⚙️ Research Options", open=False): gr.Markdown("Research analysis uses comprehensive processing for detailed R&D pipeline insights.") with gr.Column(scale=2): gr.Markdown("### Research Analysis Instructions") research_prompt_input = gr.Textbox( lines=4, placeholder="Focus on extracting novel ideas with high product/engineering impact...\nExamples:\n- Identify breakthrough concepts for R&D pipeline\n- Assess commercial viability of technical innovations\n- Design experimental frameworks for validation\n- Create prototype development roadmaps", label="Research Instructions" ) with gr.Row(): research_submit_btn = gr.Button("🔬 Research Analysis", variant="primary", size="lg") research_clear_btn = gr.Button("🗑️ Clear", size="sm") # Research Results Section with gr.Row(): with gr.Column(scale=2): research_output_box = gr.Textbox(label="Research Analysis Result", lines=20, max_lines=30, show_copy_button=True) research_status_box = gr.Textbox(label="Research Status", value="Ready for research analysis", interactive=False) with gr.Column(scale=1): # Research Export Section with gr.Accordion("💾 Export Research Results", open=False): research_export_format = gr.Dropdown( choices=["txt", "json", "pdf"], label="Export Format", value="txt" ) research_export_btn = gr.Button("📥 Export Research", variant="secondary") research_export_status = gr.Textbox(label="Export Status", interactive=False) # Research Insights Summary with gr.Accordion("📊 Research Insights", open=False): research_insights = gr.Textbox(label="Key Insights Summary", interactive=False, lines=8) # Batch Processing Tab with gr.Tab("📚 Batch Processing"): with gr.Row(): with gr.Column(scale=1): batch_files = gr.File( label="Upload Multiple PDFs", file_count="multiple", file_types=[".pdf"] ) batch_username = gr.Textbox(label="Username (optional)", placeholder="anonymous") with gr.Column(scale=2): batch_prompt = gr.Textbox( lines=3, placeholder="Enter analysis instructions for all documents...", label="Batch Analysis Instructions" ) batch_submit = gr.Button("🚀 Process Batch", variant="primary", size="lg") batch_output = gr.Textbox(label="Batch Results", lines=20, max_lines=30, show_copy_button=True) batch_status = gr.Textbox(label="Batch Status", interactive=False) # Custom Prompts Management Tab with gr.Tab("🎯 Manage Prompts"): with gr.Row(): with gr.Column(scale=1): gr.Markdown("### Add New Prompt") new_prompt_id = gr.Textbox(label="Prompt ID", placeholder="my_custom_prompt") new_prompt_name = gr.Textbox(label="Prompt Name", placeholder="My Custom Analysis") new_prompt_desc = gr.Textbox(label="Description", placeholder="What this prompt does") new_prompt_template = gr.Textbox( lines=4, label="Prompt Template", placeholder="Enter your custom prompt template..." ) new_prompt_category = gr.Dropdown( choices=["custom", "business", "technical", "explanation", "analysis"], label="Category", value="custom" ) add_prompt_btn = gr.Button("➕ Add Prompt", variant="primary") with gr.Column(scale=1): gr.Markdown("### Existing Prompts") prompt_list = gr.Dataframe( headers=["ID", "Name", "Category", "Description"], datatype=["str", "str", "str", "str"], interactive=False, label="Available Prompts" ) refresh_prompts_btn = gr.Button("🔄 Refresh List") delete_prompt_id = gr.Textbox(label="Prompt ID to Delete", placeholder="prompt_id") delete_prompt_btn = gr.Button("🗑️ Delete Prompt", variant="stop") # Event Handlers # Single document analysis submit_btn.click( fn=handle_analysis, inputs=[pdf_in, prompt_input, username_input, use_streaming], outputs=[output_box, status_box, doc_info] ) # Load custom prompt load_prompt_btn.click( fn=load_custom_prompt, inputs=[prompt_dropdown], outputs=[prompt_input] ) # Export functionality export_btn.click( fn=handle_export, inputs=[output_box, export_format, username_input], outputs=[export_status, gr.State()] ) # Clear functionality clear_btn.click( fn=lambda: ("", "", "", "Ready"), inputs=[], outputs=[pdf_in, prompt_input, output_box, status_box] ) # Research analysis event handlers research_submit_btn.click( fn=handle_research_analysis, inputs=[research_pdf_in, research_prompt_input, research_username_input], outputs=[research_output_box, research_status_box, research_insights] ) # Load research prompt load_research_prompt_btn.click( fn=load_custom_prompt, inputs=[research_prompt_dropdown], outputs=[research_prompt_input] ) # Research export functionality research_export_btn.click( fn=handle_export, inputs=[research_output_box, research_export_format, research_username_input], outputs=[research_export_status, gr.State()] ) # Research clear functionality research_clear_btn.click( fn=lambda: ("", "", "", "Ready for research analysis", ""), inputs=[], outputs=[research_pdf_in, research_prompt_input, research_output_box, research_status_box, research_insights] ) # Batch processing batch_submit.click( fn=handle_batch_analysis, inputs=[batch_files, batch_prompt, batch_username], outputs=[batch_output, batch_status, gr.State()] ) # Prompt management add_prompt_btn.click( fn=lambda id, name, desc, template, cat: PROMPT_MANAGER.add_prompt(id, name, desc, template, cat), inputs=[new_prompt_id, new_prompt_name, new_prompt_desc, new_prompt_template, new_prompt_category], outputs=[] ) refresh_prompts_btn.click( fn=lambda: [[pid, prompt["name"], prompt["category"], prompt["description"]] for pid, prompt in PROMPT_MANAGER.get_all_prompts().items()], inputs=[], outputs=[prompt_list] ) delete_prompt_btn.click( fn=lambda pid: PROMPT_MANAGER.delete_prompt(pid), inputs=[delete_prompt_id], outputs=[] ) # Examples gr.Examples( examples=[ ["Summarize this document in 3 key points"], ["Explain this technical content for a general audience"], ["Segment this document by main themes or topics"], ["Analyze the key findings and recommendations"], ["Create an executive summary of this document"], ], inputs=prompt_input, label="Example Instructions" ) # Research Examples gr.Examples( examples=[ ["Identify breakthrough concepts with high product/engineering impact and design specific experiments to validate them"], ["Assess the commercial viability of technical innovations and create prototype development roadmaps"], ["Extract novel methodologies and convert them into concrete R&D pipeline outcomes"], ["Analyze technical concepts for transformative potential and generate strategic product decisions"], ["Design experimental frameworks to validate key hypotheses with measurable success criteria"], ], inputs=research_prompt_input, label="Research Analysis Examples" ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))