# Source snapshot from GitHub: user anushkap01patidar, commit "Update commit" (1561d5f)
# main.py
import argparse
import asyncio
import os
import warnings
from datetime import datetime
from typing import AsyncGenerator, Generator
from orchestrator.research_orchestrator import ResearchOrchestrator, StreamingManager
from save_to_pdf import save_draft_to_pdf
from streaming_config import (
get_astream_config,
get_streaming_config,
is_astream_enabled,
print_streaming_help,
)
warnings.filterwarnings("ignore")  # silence library warnings so streamed console output stays clean
# Workaround for Windows platform detection issue
import platform
if platform.system() == "Windows":
    # NOTE(review): presumably tells the OpenAI client to skip platform-header
    # detection that misbehaves on Windows — confirm against the client docs.
    os.environ["OPENAI_SKIP_PLATFORM_HEADERS"] = "1"
def validate_environment():
    """Check that OPENAI_API_KEY is set in the environment/.env.

    Returns:
        bool: True if the key is present and non-empty, False otherwise.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    return api_key is not None and api_key != ""
def sanitize_filename(name: str) -> str:
    """Sanitize string to be a valid filename on all OSes.

    Removes characters that are invalid on Windows/POSIX filesystems,
    flattens newlines, caps the length, strips Windows-illegal trailing
    dots/spaces, and falls back to "untitled" when nothing survives, so
    callers never build a broken path like "data/.pdf".

    Args:
        name: Arbitrary string (e.g. a research topic).

    Returns:
        str: A non-empty, filesystem-safe filename (at most 100 chars).
    """
    import re
    name = name.strip().replace('\n', ' ')
    name = re.sub(r'[\\/:*?"<>|]', '', name)  # Remove invalid filename chars
    name = name.replace("'", "")
    name = name[:100]  # Limit length to keep paths short on all OSes
    name = name.rstrip('. ')  # Windows rejects trailing dots and spaces
    return name or "untitled"  # never return an empty filename
def yield_progress_updates(total_steps: int) -> Generator[str, None, None]:
    """Generate human-readable progress messages for the workflow.

    Args:
        total_steps: Number of workflow steps to report on (capped at the
            number of known step names).

    Yields:
        str: One "πŸ”„ Progress" line per step, then a final completion line.
    """
    step_names = (
        "Topic Analysis",
        "Research Retrieval",
        "Outline Building",
        "Human Feedback",
        "Draft Writing",
        "Bibliography Generation",
    )
    for index, name in enumerate(step_names[:total_steps]):
        percent = (index / total_steps) * 100
        yield f"πŸ”„ Progress: {percent:.1f}% - {name}"
    yield "βœ… Progress: 100% - Workflow Complete"
async def astream_progress_updates(total_steps: int) -> AsyncGenerator[str, None]:
    """Asynchronously generate progress messages for the workflow steps.

    Args:
        total_steps: Number of workflow steps to report on (capped at the
            number of known step names).

    Yields:
        str: One "πŸš€ AStream Progress" line per step (each followed by a
            short pause), then a final completion line.
    """
    step_names = (
        "Topic Analysis",
        "Research Retrieval",
        "Outline Building",
        "Human Feedback",
        "Draft Writing",
        "Bibliography Generation",
    )
    for index, name in enumerate(step_names[:total_steps]):
        percent = (index / total_steps) * 100
        yield f"πŸš€ AStream Progress: {percent:.1f}% - {name}"
        # Brief pause so consumers can interleave other async work.
        await asyncio.sleep(0.1)
    yield "βœ… AStream Progress: 100% - Workflow Complete"
def yield_workflow_status(status_data: dict) -> Generator[str, None, None]:
    """Yield formatted workflow-status lines.

    Only keys present in status_data produce output, in a fixed order.

    Args:
        status_data: Dictionary with optional "topic", "refined_topic",
            "current_step" and "workflow_status" keys.

    Yields:
        str: One formatted line per key present in status_data.
    """
    field_formats = (
        ("topic", "πŸ“ Topic: {}"),
        ("refined_topic", "🎯 Refined Topic: {}"),
        ("current_step", "πŸ”„ Current Step: {}"),
        ("workflow_status", "πŸ“Š Workflow Status: {}"),
    )
    for key, template in field_formats:
        if key in status_data:
            yield template.format(status_data[key])
async def astream_workflow_status(status_data: dict) -> AsyncGenerator[str, None]:
    """Asynchronously yield formatted workflow-status lines.

    Only keys present in status_data produce output; each emitted line is
    followed by a tiny pause to cooperate with other async tasks.

    Args:
        status_data: Dictionary with optional "topic", "refined_topic",
            "current_step" and "workflow_status" keys.

    Yields:
        str: One formatted line per key present in status_data.
    """
    field_formats = (
        ("topic", "πŸ“ AStream Topic: {}"),
        ("refined_topic", "🎯 AStream Refined Topic: {}"),
        ("current_step", "πŸ”„ AStream Current Step: {}"),
        ("workflow_status", "πŸ“Š AStream Workflow Status: {}"),
    )
    for key, template in field_formats:
        if key in status_data:
            yield template.format(status_data[key])
            await asyncio.sleep(0.01)
def yield_error_details(error_info: dict) -> Generator[str, None, None]:
    """Yield formatted lines describing an error.

    Only keys present in error_info produce output, in a fixed order.

    Args:
        error_info: Dictionary with optional "error", "step" and
            "timestamp" keys.

    Yields:
        str: One formatted line per key present in error_info.
    """
    field_formats = (
        ("error", "❌ Error: {}"),
        ("step", "πŸ“ Failed Step: {}"),
        ("timestamp", "⏰ Error Time: {}"),
    )
    for key, template in field_formats:
        if key in error_info:
            yield template.format(error_info[key])
async def astream_error_details(error_info: dict) -> AsyncGenerator[str, None]:
    """Asynchronously yield formatted lines describing an error.

    Only keys present in error_info produce output; each emitted line is
    followed by a tiny pause to cooperate with other async tasks.

    Args:
        error_info: Dictionary with optional "error", "step" and
            "timestamp" keys.

    Yields:
        str: One formatted line per key present in error_info.
    """
    field_formats = (
        ("error", "❌ AStream Error: {}"),
        ("step", "πŸ“ AStream Failed Step: {}"),
        ("timestamp", "⏰ AStream Error Time: {}"),
    )
    for key, template in field_formats:
        if key in error_info:
            yield template.format(error_info[key])
            await asyncio.sleep(0.01)
def handle_human_feedback(interrupt_data):
    """Collect a human decision on a native LangGraph interrupt.

    Displays the interrupt payload (generated outline, prompt message and
    option descriptions) on stdout, then loops on stdin until the user
    gives a recognised answer.

    Args:
        interrupt_data: Dict with "type" and "payload" keys; the payload may
            contain "outline", "message" and "options".

    Returns:
        dict: {"response": "approve" | "revise" | "reject", "feedback": str}.
            "feedback" is non-empty only for a "revise" response.
    """
    payload = interrupt_data.get("payload", {})
    print(f"πŸ“‹ Interrupt Type: {interrupt_data.get('type', 'unknown')}")
    # Show the generated outline, framed for readability.
    if "outline" in payload:
        print("\nπŸ“‹ GENERATED OUTLINE:")
        print("-" * 40)
        print(payload["outline"])
        print("-" * 40)
    # Show the prompt message and the available choices.
    if "message" in payload:
        print(f"\n❓ {payload['message']}")
    if "options" in payload:
        print("\nπŸ“ Available options:")
        for key, description in payload["options"].items():
            print(f" β€’ {key}: {description}")
    # Prompt until a valid answer arrives; each branch either returns or
    # falls through to the next loop iteration.
    while True:
        print("\n" + "-" * 40)
        choice = input("Your response (approve/revise/reject): ").strip().lower()
        if not choice.strip():
            print("⚠️ No response provided. Please enter 'approve', 'revise', or 'reject'.")
            print(" You can also use shortcuts: 'a' for approve, 'v' for revise, 'r' for reject")
            print(" Or type 'quit' to abort the workflow.")
        elif choice in ("approve", "a"):
            return {"response": "approve", "feedback": ""}
        elif choice in ("reject", "r", "abort"):
            return {"response": "reject", "feedback": ""}
        elif choice in ("revise", "v"):
            print("\nπŸ“ Please provide specific feedback for revision:")
            feedback = input("Feedback: ").strip()
            if feedback:
                return {"response": "revise", "feedback": feedback}
            # Revision without feedback would be meaningless — ask again.
            print("⚠️ No feedback provided. Please provide specific feedback or choose 'approve' to proceed.")
        elif choice in ("quit", "exit"):
            # Treat an explicit quit as a rejection so the caller aborts.
            print("❌ Workflow aborted by user.")
            return {"response": "reject", "feedback": ""}
        else:
            print(f"⚠️ Invalid response: '{choice}'. Please enter 'approve', 'revise', or 'reject'.")
            print(" You can also use shortcuts: 'a' for approve, 'v' for revise, 'r' for reject")
            print(" Or type 'quit' to abort the workflow.")
async def process_workflow_with_astream(topic: str, streaming_config: dict) -> AsyncGenerator[str, None]:
    """
    Process the research workflow using AStream for enhanced async processing

    Runs the orchestrator end-to-end: validates the environment, reports the
    AStream configuration, pauses for human outline approval whenever the
    workflow interrupts, and on success writes the draft + bibliography to a
    PDF under data/.

    Args:
        topic: Research topic
        streaming_config: Streaming configuration; must contain
            "stream_delay" and may contain "preset"
    Yields:
        str: Progress and status updates asynchronously
    """
    try:
        # Bail out immediately if OPENAI_API_KEY is missing.
        if not validate_environment():
            yield "❌ Environment validation failed"
            return
        yield f"πŸš€ Starting AStream research workflow for topic: {topic}"
        # Check if AStream is enabled for the chosen preset.
        astream_enabled = is_astream_enabled(streaming_config.get("preset"))
        if astream_enabled:
            yield "⚑ AStream processing enabled"
            astream_config = get_astream_config(streaming_config.get("preset"))
            yield f"πŸ“Š AStream config: delay={astream_config['delay']}s, buffer_size={astream_config['buffer_size']}"
        else:
            yield "ℹ️ AStream processing disabled, using standard streaming"
        # Create streaming manager for real-time display of LLM output.
        streaming_manager = StreamingManager(
            stream_delay=streaming_config["stream_delay"],
            config=streaming_config
        )
        # Create orchestrator with streaming support wired to the manager.
        orchestrator = ResearchOrchestrator(stream_callback=streaming_manager.handle_stream_event)
        # Start the async workflow.
        result = await orchestrator.run(topic)
        # Loop while the workflow keeps pausing for human feedback: each
        # iteration asks the user about the current outline and resumes
        # the orchestrator with their answer.
        while result.get("status") == "interrupted":
            yield "⏸️ AStream workflow paused for human feedback"
            # Extract interrupt context from the orchestrator result.
            interrupt_data = result.get("interrupt_data", {})
            current_state = result.get("current_state", {})
            # Build the interrupt structure expected by handle_human_feedback.
            interrupt_payload = {
                "type": "outline_approval",
                "payload": {
                    "outline": current_state.get("outline", ""),
                    "topic": current_state.get("refined_topic", ""),
                    "message": "Please review the generated outline and provide feedback",
                    "options": {
                        "approve": "Approve the outline and proceed to draft writing",
                        "revise": "Request revisions to the outline",
                        "reject": "Reject and abort the workflow"
                    }
                }
            }
            human_response = handle_human_feedback(interrupt_payload)
            # Resume the workflow with the user's decision.
            yield "πŸ”„ Resuming AStream workflow with user feedback..."
            # Pass the full response string that includes feedback if needed;
            # the orchestrator parses "revise <feedback>" as a revision request.
            if human_response["response"] == "revise" and human_response["feedback"]:
                human_input = f"revise {human_response['feedback']}"
            else:
                human_input = human_response["response"]
            result = await orchestrator.resume(result["thread_id"], human_input)
        # Check if workflow completed successfully.
        if result.get("status") == "completed":
            # Get the result data from the new orchestrator structure.
            result_data = result.get("result", {})
            # A user rejection completes the run but marks it "aborted".
            if result_data.get("workflow_status") == "aborted":
                yield "❌ AStream workflow was rejected by user. No PDF will be generated."
                return
            yield "βœ… AStream workflow completed successfully!"
            # Generate the PDF with the draft and bibliography.
            safe_topic = sanitize_filename(result_data.get('refined_topic', topic))
            draft_pdf_filename = f"data/{safe_topic}.pdf"
            draft_text = result_data.get("draft") or ""
            bibliography = result_data.get("bibliography") or ""
            if not draft_text.strip():
                yield "⚠️ Warning: Draft is empty. PDF will be blank."
            # Ensure the output directory exists before writing.
            os.makedirs("data", exist_ok=True)
            try:
                save_draft_to_pdf(
                    result_data.get('refined_topic', topic),
                    draft_text,
                    bibliography,
                    draft_pdf_filename
                )
                yield f"βœ… AStream research paper saved successfully!"
                yield f"πŸ“„ File: {draft_pdf_filename}"
                yield f"πŸ“ Draft length: {len(draft_text)} characters"
                yield f"πŸ“š Bibliography length: {len(bibliography)} characters"
                # NOTE(review): counts '[' as a proxy for references like
                # "[1]" — approximate if the text contains other brackets.
                yield f"πŸ“š Number of references: {bibliography.count('[')}"
            except Exception as e:
                yield f"❌ Error saving PDF: {e}"
        elif result.get("status") == "error":
            yield f"❌ AStream workflow error: {result.get('error', 'Unknown error')}"
        else:
            yield f"❌ Unexpected AStream workflow status: {result.get('status', 'unknown')}"
    except ValueError as e:
        # Configuration helpers raise ValueError for bad presets.
        yield f"❌ AStream configuration error: {e}"
    except Exception as e:
        yield f"❌ Unexpected AStream error: {e}"
        import traceback
        traceback.print_exc()
async def process_workflow_with_yield(topic: str, streaming_config: dict) -> AsyncGenerator[str, None]:
    """
    Process the research workflow using yield generators for progressive updates

    Runs the orchestrator end-to-end: validates the environment, pauses for
    human outline approval whenever the workflow interrupts, and on success
    writes the draft + bibliography to a PDF under data/.

    Args:
        topic: Research topic
        streaming_config: Streaming configuration; must contain "stream_delay"
    Yields:
        str: Progress and status updates
    """
    try:
        # Bail out immediately if OPENAI_API_KEY is missing.
        if not validate_environment():
            yield "❌ Environment validation failed"
            return
        yield f"πŸš€ Starting research workflow for topic: {topic}"
        # Create streaming manager for real-time display of LLM output.
        streaming_manager = StreamingManager(
            stream_delay=streaming_config["stream_delay"],
            config=streaming_config
        )
        # Create orchestrator with streaming support wired to the manager.
        orchestrator = ResearchOrchestrator(stream_callback=streaming_manager.handle_stream_event)
        # Start the async workflow.
        result = await orchestrator.run(topic)
        # Loop while the workflow keeps pausing for human feedback: each
        # iteration asks the user about the current outline and resumes
        # the orchestrator with their answer.
        while result.get("status") == "interrupted":
            yield "⏸️ Workflow paused for human feedback"
            # Extract interrupt context from the orchestrator result.
            interrupt_data = result.get("interrupt_data", {})
            current_state = result.get("current_state", {})
            # Build the interrupt structure expected by handle_human_feedback.
            interrupt_payload = {
                "type": "outline_approval",
                "payload": {
                    "outline": current_state.get("outline", ""),
                    "topic": current_state.get("refined_topic", ""),
                    "message": "Please review the generated outline and provide feedback",
                    "options": {
                        "approve": "Approve the outline and proceed to draft writing",
                        "revise": "Request revisions to the outline",
                        "reject": "Reject and abort the workflow"
                    }
                }
            }
            human_response = handle_human_feedback(interrupt_payload)
            # Resume the workflow with the user's decision.
            yield "πŸ”„ Resuming workflow with user feedback..."
            # Pass the full response string that includes feedback if needed;
            # the orchestrator parses "revise <feedback>" as a revision request.
            if human_response["response"] == "revise" and human_response["feedback"]:
                human_input = f"revise {human_response['feedback']}"
            else:
                human_input = human_response["response"]
            result = await orchestrator.resume(result["thread_id"], human_input)
        # Check if workflow completed successfully.
        if result.get("status") == "completed":
            # Get the result data from the new orchestrator structure.
            result_data = result.get("result", {})
            # A user rejection completes the run but marks it "aborted".
            if result_data.get("workflow_status") == "aborted":
                yield "❌ Workflow was rejected by user. No PDF will be generated."
                return
            yield "βœ… Workflow completed successfully!"
            # Generate the PDF with the draft and bibliography.
            safe_topic = sanitize_filename(result_data.get('refined_topic', topic))
            draft_pdf_filename = f"data/{safe_topic}.pdf"
            draft_text = result_data.get("draft") or ""
            bibliography = result_data.get("bibliography") or ""
            if not draft_text.strip():
                yield "⚠️ Warning: Draft is empty. PDF will be blank."
            # Ensure the output directory exists before writing.
            os.makedirs("data", exist_ok=True)
            try:
                save_draft_to_pdf(
                    result_data.get('refined_topic', topic),
                    draft_text,
                    bibliography,
                    draft_pdf_filename
                )
                yield f"βœ… Research paper saved successfully!"
                yield f"πŸ“„ File: {draft_pdf_filename}"
                yield f"πŸ“ Draft length: {len(draft_text)} characters"
                yield f"πŸ“š Bibliography length: {len(bibliography)} characters"
                # NOTE(review): counts '[' as a proxy for references like
                # "[1]" — approximate if the text contains other brackets.
                yield f"πŸ“š Number of references: {bibliography.count('[')}"
            except Exception as e:
                yield f"❌ Error saving PDF: {e}"
        elif result.get("status") == "error":
            yield f"❌ Workflow error: {result.get('error', 'Unknown error')}"
        else:
            yield f"❌ Unexpected workflow status: {result.get('status', 'unknown')}"
    except ValueError as e:
        # Configuration helpers raise ValueError for bad presets.
        yield f"❌ Configuration error: {e}"
    except Exception as e:
        yield f"❌ Unexpected error: {e}"
        import traceback
        traceback.print_exc()
async def main(streaming_preset=None):
    """Main async entry point: prompt for a topic and run the workflow.

    Selects AStream or standard yield processing based on the preset and
    prints every progress line the chosen workflow generator yields.

    Args:
        streaming_preset: Optional preset name ("fast", "slow", "none",
            "yield", "astream") or None for the default configuration.
    """
    try:
        # Fail fast if the API key is missing. (The original called
        # validate_environment() but discarded its bool result, making
        # the check a no-op.)
        if not validate_environment():
            print("❌ OPENAI_API_KEY is not set. Add it to your environment or .env file.")
            return
        topic = input("Enter your research topic: ").strip()
        if not topic:
            print("No topic provided.")
            return
        # Get streaming configuration and record the preset so the
        # workflow functions can detect whether AStream mode is requested.
        streaming_config = get_streaming_config(streaming_preset)
        streaming_config["preset"] = streaming_preset
        print(f"\n🎯 Research Topic: {topic}")
        print("=" * 60)
        # Choose processing method based on AStream availability.
        if is_astream_enabled(streaming_preset):
            async for update in process_workflow_with_astream(topic, streaming_config):
                print(update)
        else:
            async for update in process_workflow_with_yield(topic, streaming_config):
                print(update)
    except KeyboardInterrupt:
        print("\n❌ Workflow interrupted by user.")
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        import traceback
        traceback.print_exc()
def run_main():
    """CLI wrapper: parse command-line arguments and run the async main()."""
    parser = argparse.ArgumentParser(
        description="AI Research Paper Generator with Yield and AStream Support"
    )
    parser.add_argument(
        "--streaming",
        choices=["fast", "slow", "none", "yield", "astream", "default"],
        default="default",
        help="Streaming speed preset: fast, slow, none, yield, astream, or default",
    )
    parser.add_argument(
        "--help-streaming",
        action="store_true",
        help="Show detailed streaming configuration help",
    )
    args = parser.parse_args()
    if args.help_streaming:
        # Print the streaming docs and exit without running the workflow.
        print_streaming_help()
        return
    # "default" maps to None so downstream config falls back to defaults.
    preset = args.streaming if args.streaming != "default" else None
    asyncio.run(main(streaming_preset=preset))
# Script entry point: parse CLI arguments and launch the async workflow.
if __name__ == "__main__":
    run_main()