# main.py
"""CLI entry point for the AI research paper generator.

Drives the research workflow (topic analysis -> retrieval -> outline ->
human feedback -> draft -> bibliography), streams progress messages to
stdout, and saves the final draft plus bibliography as a PDF under
``data/``.
"""
import argparse
import asyncio
import os
import platform
import re
import traceback
import warnings
from datetime import datetime
from typing import AsyncGenerator, Generator

from orchestrator.research_orchestrator import ResearchOrchestrator, StreamingManager
from save_to_pdf import save_draft_to_pdf
from streaming_config import (
    get_astream_config,
    get_streaming_config,
    is_astream_enabled,
    print_streaming_help,
)

warnings.filterwarnings("ignore")

# Workaround for Windows platform detection issue
if platform.system() == "Windows":
    os.environ["OPENAI_SKIP_PLATFORM_HEADERS"] = "1"

# Canonical ordered list of workflow steps, shared by the progress generators.
_WORKFLOW_STEPS = [
    "Topic Analysis",
    "Research Retrieval",
    "Outline Building",
    "Human Feedback",
    "Draft Writing",
    "Bibliography Generation",
]


def validate_environment() -> bool:
    """Validate that OPENAI_API_KEY is set in the environment/.env."""
    return bool(os.getenv("OPENAI_API_KEY"))


def sanitize_filename(name: str) -> str:
    """Sanitize a string so it is a valid filename on all OSes.

    Collapses newlines, strips characters that are invalid on
    Windows/POSIX filesystems, removes apostrophes, and caps the length
    at 100 characters.

    Args:
        name: Raw candidate filename (typically the refined topic).

    Returns:
        str: A safe, non-empty filename stem. Falls back to
        ``"research_paper"`` when sanitization removes every character
        (previously such input produced the hidden file ``data/.pdf``).
    """
    name = name.strip().replace('\n', ' ')
    name = re.sub(r'[\\/:*?"<>|]', '', name)  # Remove invalid filename chars
    name = name.replace("'", "")
    name = name[:100]  # Limit length
    return name or "research_paper"


def yield_progress_updates(total_steps: int) -> Generator[str, None, None]:
    """Yield progress update messages for workflow steps.

    Args:
        total_steps: Total number of workflow steps.

    Yields:
        str: Progress update messages, one per step, then a completion line.
    """
    for index, step in enumerate(_WORKFLOW_STEPS[:total_steps]):
        percent = (index / total_steps) * 100
        yield f"πŸ”„ Progress: {percent:.1f}% - {step}"
    yield "βœ… Progress: 100% - Workflow Complete"


async def astream_progress_updates(total_steps: int) -> AsyncGenerator[str, None]:
    """AStream progress update messages for workflow steps.

    Args:
        total_steps: Total number of workflow steps.

    Yields:
        str: Progress update messages asynchronously, one per step,
        then a completion line.
    """
    for index, step in enumerate(_WORKFLOW_STEPS[:total_steps]):
        percent = (index / total_steps) * 100
        yield f"πŸš€ AStream Progress: {percent:.1f}% - {step}"
        await asyncio.sleep(0.1)  # Small delay for async processing
    yield "βœ… AStream Progress: 100% - Workflow Complete"


def yield_workflow_status(status_data: dict) -> Generator[str, None, None]:
    """Yield workflow status information.

    Args:
        status_data: Dictionary containing workflow status.

    Yields:
        str: One status line per recognized key present in *status_data*.
    """
    fields = [
        ("topic", "πŸ“ Topic"),
        ("refined_topic", "🎯 Refined Topic"),
        ("current_step", "πŸ”„ Current Step"),
        ("workflow_status", "πŸ“Š Workflow Status"),
    ]
    for key, label in fields:
        if key in status_data:
            yield f"{label}: {status_data[key]}"


async def astream_workflow_status(status_data: dict) -> AsyncGenerator[str, None]:
    """AStream workflow status information.

    Args:
        status_data: Dictionary containing workflow status.

    Yields:
        str: One status line per recognized key, asynchronously.
    """
    fields = [
        ("topic", "πŸ“ AStream Topic"),
        ("refined_topic", "🎯 AStream Refined Topic"),
        ("current_step", "πŸ”„ AStream Current Step"),
        ("workflow_status", "πŸ“Š AStream Workflow Status"),
    ]
    for key, label in fields:
        if key in status_data:
            yield f"{label}: {status_data[key]}"
            await asyncio.sleep(0.01)


def yield_error_details(error_info: dict) -> Generator[str, None, None]:
    """Yield detailed error information.

    Args:
        error_info: Dictionary containing error details.

    Yields:
        str: One detail line per recognized key present in *error_info*.
    """
    fields = [
        ("error", "❌ Error"),
        ("step", "πŸ“ Failed Step"),
        ("timestamp", "⏰ Error Time"),
    ]
    for key, label in fields:
        if key in error_info:
            yield f"{label}: {error_info[key]}"


async def astream_error_details(error_info: dict) -> AsyncGenerator[str, None]:
    """AStream detailed error information.

    Args:
        error_info: Dictionary containing error details.

    Yields:
        str: One detail line per recognized key, asynchronously.
    """
    fields = [
        ("error", "❌ AStream Error"),
        ("step", "πŸ“ AStream Failed Step"),
        ("timestamp", "⏰ AStream Error Time"),
    ]
    for key, label in fields:
        if key in error_info:
            yield f"{label}: {error_info[key]}"
            await asyncio.sleep(0.01)


def handle_human_feedback(interrupt_data):
    """Handle human feedback from a native LangGraph interrupt.

    Displays the generated outline and the available options, then
    prompts on stdin until a valid response is entered.

    Args:
        interrupt_data: Dict with ``type`` and ``payload`` keys; the
            payload may contain ``outline``, ``message``, and ``options``.

    Returns:
        dict: ``{"response": "approve"|"revise"|"reject", "feedback": str}``.
        ``feedback`` is non-empty only for a revision request.
    """
    interrupt_type = interrupt_data.get("type", "unknown")
    interrupt_payload = interrupt_data.get("payload", {})

    print(f"πŸ“‹ Interrupt Type: {interrupt_type}")

    # Display the outline, prompt message, and option menu when provided.
    if "outline" in interrupt_payload:
        print("\nπŸ“‹ GENERATED OUTLINE:")
        print("-" * 40)
        print(interrupt_payload["outline"])
        print("-" * 40)

    if "message" in interrupt_payload:
        print(f"\n❓ {interrupt_payload['message']}")

    if "options" in interrupt_payload:
        print("\nπŸ“ Available options:")
        for key, description in interrupt_payload["options"].items():
            print(f" β€’ {key}: {description}")

    # Keep asking until we get a valid response.
    while True:
        print("\n" + "-" * 40)
        choice = input("Your response (approve/revise/reject): ").strip().lower()

        if not choice:  # already stripped above; empty means no response
            print("⚠️ No response provided. Please enter 'approve', 'revise', or 'reject'.")
            print(" You can also use shortcuts: 'a' for approve, 'v' for revise, 'r' for reject")
            print(" Or type 'quit' to abort the workflow.")
            continue

        if choice in ("approve", "a"):
            return {"response": "approve", "feedback": ""}

        if choice in ("reject", "r", "abort"):
            return {"response": "reject", "feedback": ""}

        if choice in ("revise", "v"):
            print("\nπŸ“ Please provide specific feedback for revision:")
            feedback = input("Feedback: ").strip()
            if not feedback:
                # Revision without concrete feedback would loop the graph
                # pointlessly, so insist on some text.
                print("⚠️ No feedback provided. Please provide specific feedback or choose 'approve' to proceed.")
                continue
            return {"response": "revise", "feedback": feedback}

        if choice in ("quit", "exit"):
            print("❌ Workflow aborted by user.")
            return {"response": "reject", "feedback": ""}

        print(f"⚠️ Invalid response: '{choice}'. Please enter 'approve', 'revise', or 'reject'.")
        print(" You can also use shortcuts: 'a' for approve, 'v' for revise, 'r' for reject")
        print(" Or type 'quit' to abort the workflow.")


def _build_interrupt_payload(current_state: dict) -> dict:
    """Build the outline-approval interrupt structure shown to the user."""
    return {
        "type": "outline_approval",
        "payload": {
            "outline": current_state.get("outline", ""),
            "topic": current_state.get("refined_topic", ""),
            "message": "Please review the generated outline and provide feedback",
            "options": {
                "approve": "Approve the outline and proceed to draft writing",
                "revise": "Request revisions to the outline",
                "reject": "Reject and abort the workflow",
            },
        },
    }


def _resume_input(human_response: dict) -> str:
    """Convert a structured human response into the orchestrator resume string."""
    if human_response["response"] == "revise" and human_response["feedback"]:
        return f"revise {human_response['feedback']}"
    return human_response["response"]


async def _run_research_workflow(
    topic: str, streaming_config: dict, use_astream: bool
) -> AsyncGenerator[str, None]:
    """Shared driver behind the AStream and standard workflow generators.

    The two public wrappers previously duplicated ~60 lines differing
    only in message wording; the wording fragments below reproduce the
    historical output of both variants byte-for-byte.

    Args:
        topic: Research topic entered by the user.
        streaming_config: Streaming configuration (must contain
            ``stream_delay``; ``preset`` is read for AStream detection).
        use_astream: True for the AStream message variant and the
            AStream configuration banner.

    Yields:
        str: Progress and status updates.
    """
    tag = "AStream " if use_astream else ""
    wf_title = "AStream workflow" if use_astream else "Workflow"
    wf_noun = "AStream workflow" if use_astream else "workflow"
    try:
        if not validate_environment():
            yield "❌ Environment validation failed"
            return

        yield f"πŸš€ Starting {tag}research workflow for topic: {topic}"

        if use_astream:
            # Announce whether AStream is actually active for this preset.
            if is_astream_enabled(streaming_config.get("preset")):
                yield "⚑ AStream processing enabled"
                astream_config = get_astream_config(streaming_config.get("preset"))
                yield (
                    f"πŸ“Š AStream config: delay={astream_config['delay']}s, "
                    f"buffer_size={astream_config['buffer_size']}"
                )
            else:
                yield "ℹ️ AStream processing disabled, using standard streaming"

        # Streaming manager relays orchestrator events to stdout in real time.
        streaming_manager = StreamingManager(
            stream_delay=streaming_config["stream_delay"],
            config=streaming_config,
        )
        orchestrator = ResearchOrchestrator(
            stream_callback=streaming_manager.handle_stream_event
        )

        result = await orchestrator.run(topic)

        # The graph interrupts for human approval of the outline; loop until
        # the user's input lets the orchestrator run to completion.
        while result.get("status") == "interrupted":
            yield f"⏸️ {wf_title} paused for human feedback"
            current_state = result.get("current_state", {})
            human_response = handle_human_feedback(_build_interrupt_payload(current_state))
            yield f"πŸ”„ Resuming {wf_noun} with user feedback..."
            result = await orchestrator.resume(
                result["thread_id"], _resume_input(human_response)
            )

        if result.get("status") == "completed":
            result_data = result.get("result", {})

            # "aborted" means the user rejected the outline.
            if result_data.get("workflow_status") == "aborted":
                yield f"❌ {wf_title} was rejected by user. No PDF will be generated."
                return

            yield f"βœ… {wf_title} completed successfully!"

            # Generate PDF with draft and bibliography.
            safe_topic = sanitize_filename(result_data.get('refined_topic', topic))
            draft_pdf_filename = f"data/{safe_topic}.pdf"
            draft_text = result_data.get("draft") or ""
            bibliography = result_data.get("bibliography") or ""
            if not draft_text.strip():
                yield "⚠️ Warning: Draft is empty. PDF will be blank."

            os.makedirs("data", exist_ok=True)  # Ensure data directory exists
            try:
                save_draft_to_pdf(
                    result_data.get('refined_topic', topic),
                    draft_text,
                    bibliography,
                    draft_pdf_filename,
                )
                saved_subject = "AStream research" if use_astream else "Research"
                yield f"βœ… {saved_subject} paper saved successfully!"
                yield f"πŸ“„ File: {draft_pdf_filename}"
                yield f"πŸ“ Draft length: {len(draft_text)} characters"
                yield f"πŸ“š Bibliography length: {len(bibliography)} characters"
                # Heuristic count: references are assumed to look like "[1] ...".
                yield f"πŸ“š Number of references: {bibliography.count('[')}"
            except Exception as e:
                yield f"❌ Error saving PDF: {e}"
        elif result.get("status") == "error":
            yield f"❌ {wf_title} error: {result.get('error', 'Unknown error')}"
        else:
            yield f"❌ Unexpected {wf_noun} status: {result.get('status', 'unknown')}"
    except ValueError as e:
        config_subject = "AStream configuration" if use_astream else "Configuration"
        yield f"❌ {config_subject} error: {e}"
    except Exception as e:
        yield f"❌ Unexpected {tag}error: {e}"
        traceback.print_exc()


async def process_workflow_with_astream(
    topic: str, streaming_config: dict
) -> AsyncGenerator[str, None]:
    """Process the research workflow using AStream for enhanced async processing.

    Args:
        topic: Research topic.
        streaming_config: Streaming configuration.

    Yields:
        str: Progress and status updates asynchronously.
    """
    async for update in _run_research_workflow(topic, streaming_config, use_astream=True):
        yield update


async def process_workflow_with_yield(
    topic: str, streaming_config: dict
) -> AsyncGenerator[str, None]:
    """Process the research workflow using yield generators for progressive updates.

    Args:
        topic: Research topic.
        streaming_config: Streaming configuration.

    Yields:
        str: Progress and status updates.
    """
    async for update in _run_research_workflow(topic, streaming_config, use_astream=False):
        yield update


async def main(streaming_preset=None):
    """Main async function to run the research workflow with AStream support.

    Args:
        streaming_preset: Optional streaming preset name
            ("fast", "slow", "none", "yield", "astream") or None for default.
    """
    try:
        # Fail fast with a clear message instead of discarding the result
        # (the old code called validate_environment() and ignored the bool).
        if not validate_environment():
            print("❌ OPENAI_API_KEY is not set. Add it to your environment or .env file.")
            return

        topic = input("Enter your research topic: ").strip()
        if not topic:
            print("No topic provided.")
            return

        # Get streaming configuration; stash the preset for AStream detection.
        streaming_config = get_streaming_config(streaming_preset)
        streaming_config["preset"] = streaming_preset

        print(f"\n🎯 Research Topic: {topic}")
        print("=" * 60)

        # Choose processing method based on AStream availability.
        if is_astream_enabled(streaming_preset):
            async for update in process_workflow_with_astream(topic, streaming_config):
                print(update)
        else:
            async for update in process_workflow_with_yield(topic, streaming_config):
                print(update)
    except KeyboardInterrupt:
        print("\n❌ Workflow interrupted by user.")
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        traceback.print_exc()


def run_main():
    """Wrapper function to parse CLI arguments and run the async main function."""
    parser = argparse.ArgumentParser(
        description="AI Research Paper Generator with Yield and AStream Support"
    )
    parser.add_argument(
        "--streaming",
        choices=["fast", "slow", "none", "yield", "astream", "default"],
        default="default",
        help="Streaming speed preset: fast, slow, none, yield, astream, or default",
    )
    parser.add_argument(
        "--help-streaming",
        action="store_true",
        help="Show detailed streaming configuration help",
    )
    args = parser.parse_args()

    # Show streaming help if requested.
    if args.help_streaming:
        print_streaming_help()
        return

    # Convert "default" to None for the function.
    preset = None if args.streaming == "default" else args.streaming

    # Run the main function with the specified streaming preset.
    asyncio.run(main(streaming_preset=preset))


if __name__ == "__main__":
    run_main()