|
|
|
|
|
|
|
|
import argparse |
|
|
import asyncio |
|
|
import os |
|
|
import warnings |
|
|
from datetime import datetime |
|
|
from typing import AsyncGenerator, Generator |
|
|
|
|
|
from orchestrator.research_orchestrator import ResearchOrchestrator, StreamingManager |
|
|
from save_to_pdf import save_draft_to_pdf |
|
|
from streaming_config import ( |
|
|
get_astream_config, |
|
|
get_streaming_config, |
|
|
is_astream_enabled, |
|
|
print_streaming_help, |
|
|
) |
|
|
|
|
|
# Suppress noisy third-party warnings so the CLI output stays readable.
warnings.filterwarnings("ignore")


import platform

# On Windows, tell the OpenAI client to skip adding platform headers —
# presumably a workaround for platform-detection issues there; TODO confirm.
if platform.system() == "Windows":
    os.environ["OPENAI_SKIP_PLATFORM_HEADERS"] = "1"
|
|
|
|
|
def validate_environment():
    """Validate that OPENAI_API_KEY is set in the environment/.env.

    Returns:
        bool: True when a non-empty OPENAI_API_KEY is present.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    return api_key is not None and api_key != ""
|
|
|
|
|
def sanitize_filename(name: str) -> str:
    """Sanitize string to be a valid filename on all OSes.

    Strips surrounding whitespace, flattens newlines to spaces, drops
    characters that are illegal on Windows/POSIX filesystems plus
    apostrophes, and caps the result at 100 characters.
    """
    import re

    cleaned = name.strip().replace('\n', ' ')
    cleaned = re.sub(r'[\\/:*?"<>|]', '', cleaned).replace("'", "")
    # Keep filenames comfortably under common path-length limits.
    return cleaned[:100]
|
|
|
|
|
def yield_progress_updates(total_steps: int) -> Generator[str, None, None]:
    """
    Yield progress update messages for workflow steps.

    Args:
        total_steps: Total number of workflow steps

    Yields:
        str: Progress update messages, one per step, followed by a final
            completion message.
    """
    steps = [
        "Topic Analysis",
        "Research Retrieval",
        "Outline Building",
        "Human Feedback",
        "Draft Writing",
        "Bibliography Generation"
    ]

    for i, step in enumerate(steps[:total_steps]):
        # Progress is the fraction of steps already completed, so the
        # first message reports 0.0%.
        progress = (i / total_steps) * 100
        yield f"π Progress: {progress:.1f}% - {step}"

    # Fix: this literal was split across two source lines (a SyntaxError);
    # it is a single completion message.
    yield "β Progress: 100% - Workflow Complete"
|
|
|
|
|
async def astream_progress_updates(total_steps: int) -> AsyncGenerator[str, None]:
    """
    AStream progress update messages for workflow steps.

    Args:
        total_steps: Total number of workflow steps

    Yields:
        str: Progress update messages asynchronously, one per step (with a
            short pacing delay), followed by a final completion message.
    """
    steps = [
        "Topic Analysis",
        "Research Retrieval",
        "Outline Building",
        "Human Feedback",
        "Draft Writing",
        "Bibliography Generation"
    ]

    for i, step in enumerate(steps[:total_steps]):
        # Progress is the fraction of steps already completed.
        progress = (i / total_steps) * 100
        yield f"π AStream Progress: {progress:.1f}% - {step}"
        # Small delay paces the stream for consumers.
        await asyncio.sleep(0.1)

    # Fix: this literal was split across two source lines (a SyntaxError);
    # it is a single completion message.
    yield "β AStream Progress: 100% - Workflow Complete"
|
|
|
|
|
def yield_workflow_status(status_data: dict) -> Generator[str, None, None]:
    """
    Yield workflow status information.

    Args:
        status_data: Dictionary containing workflow status

    Yields:
        str: One formatted line per recognized key present in status_data,
            in a fixed display order.
    """
    # (key, display label) pairs — order matters for the output.
    field_labels = (
        ("topic", "π Topic"),
        ("refined_topic", "π― Refined Topic"),
        ("current_step", "π Current Step"),
        ("workflow_status", "π Workflow Status"),
    )
    for key, label in field_labels:
        if key in status_data:
            yield f"{label}: {status_data[key]}"
|
|
|
|
|
async def astream_workflow_status(status_data: dict) -> AsyncGenerator[str, None]:
    """
    AStream workflow status information.

    Args:
        status_data: Dictionary containing workflow status

    Yields:
        str: One formatted line per recognized key present in status_data,
            asynchronously, with a short pacing delay after each line.
    """
    # (key, display label) pairs — order matters for the output.
    field_labels = (
        ("topic", "π AStream Topic"),
        ("refined_topic", "π― AStream Refined Topic"),
        ("current_step", "π AStream Current Step"),
        ("workflow_status", "π AStream Workflow Status"),
    )
    for key, label in field_labels:
        if key in status_data:
            yield f"{label}: {status_data[key]}"
            await asyncio.sleep(0.01)
|
|
|
|
|
def yield_error_details(error_info: dict) -> Generator[str, None, None]:
    """
    Yield detailed error information.

    Args:
        error_info: Dictionary containing error details

    Yields:
        str: One formatted line per recognized key present in error_info,
            in a fixed display order.
    """
    # (key, display label) pairs — order matters for the output.
    for key, label in (
        ("error", "β Error"),
        ("step", "π Failed Step"),
        ("timestamp", "β° Error Time"),
    ):
        if key in error_info:
            yield f"{label}: {error_info[key]}"
|
|
|
|
|
async def astream_error_details(error_info: dict) -> AsyncGenerator[str, None]:
    """
    AStream detailed error information.

    Args:
        error_info: Dictionary containing error details

    Yields:
        str: One formatted line per recognized key present in error_info,
            asynchronously, with a short pacing delay after each line.
    """
    # (key, display label) pairs — order matters for the output.
    for key, label in (
        ("error", "β AStream Error"),
        ("step", "π AStream Failed Step"),
        ("timestamp", "β° AStream Error Time"),
    ):
        if key in error_info:
            yield f"{label}: {error_info[key]}"
            await asyncio.sleep(0.01)
|
|
|
|
|
def handle_human_feedback(interrupt_data):
    """Handle human feedback from native LangGraph interrupt.

    Displays the interrupt payload (outline, message, options) and prompts
    the user in a loop until a valid choice is entered.

    Args:
        interrupt_data: Dict with "type" and "payload" keys describing the
            interrupt.

    Returns:
        dict: {"response": "approve" | "revise" | "reject",
               "feedback": str}  (feedback is non-empty only for "revise")
    """
    interrupt_type = interrupt_data.get("type", "unknown")
    interrupt_payload = interrupt_data.get("payload", {})

    print(f"π Interrupt Type: {interrupt_type}")

    if "outline" in interrupt_payload:
        print("\nπ GENERATED OUTLINE:")
        print("-" * 40)
        print(interrupt_payload["outline"])
        print("-" * 40)

    if "message" in interrupt_payload:
        print(f"\nβ {interrupt_payload['message']}")

    if "options" in interrupt_payload:
        print("\nπ Available options:")
        for key, description in interrupt_payload["options"].items():
            print(f" β’ {key}: {description}")

    # Shared help text for the empty-input and invalid-input branches.
    def _print_hints():
        print(" You can also use shortcuts: 'a' for approve, 'v' for revise, 'r' for reject")
        print(" Or type 'quit' to abort the workflow.")

    while True:
        print("\n" + "-" * 40)
        choice = input("Your response (approve/revise/reject): ").strip().lower()

        # Fix: choice is already stripped above; the second .strip() was
        # redundant.
        if not choice:
            print("β οΈ No response provided. Please enter 'approve', 'revise', or 'reject'.")
            _print_hints()
            continue

        if choice in ("approve", "a"):
            return {"response": "approve", "feedback": ""}

        if choice in ("reject", "r", "abort"):
            return {"response": "reject", "feedback": ""}

        if choice in ("revise", "v"):
            print("\nπ Please provide specific feedback for revision:")
            feedback = input("Feedback: ").strip()
            if not feedback:
                # Revision without feedback is meaningless; re-prompt.
                print("β οΈ No feedback provided. Please provide specific feedback or choose 'approve' to proceed.")
                continue
            return {"response": "revise", "feedback": feedback}

        if choice in ("quit", "exit"):
            print("β Workflow aborted by user.")
            # Aborting is reported to the caller as a rejection.
            return {"response": "reject", "feedback": ""}

        print(f"β οΈ Invalid response: '{choice}'. Please enter 'approve', 'revise', or 'reject'.")
        _print_hints()
|
|
|
|
|
async def process_workflow_with_astream(topic: str, streaming_config: dict) -> AsyncGenerator[str, None]:
    """
    Process the research workflow using AStream for enhanced async processing.

    Args:
        topic: Research topic
        streaming_config: Streaming configuration; must contain
            "stream_delay", and its "preset" key selects AStream behavior.

    Yields:
        str: Progress and status updates asynchronously
    """
    try:
        if not validate_environment():
            yield "β Environment validation failed"
            return

        yield f"π Starting AStream research workflow for topic: {topic}"

        # Report whether the selected preset enables AStream processing.
        astream_enabled = is_astream_enabled(streaming_config.get("preset"))
        if astream_enabled:
            yield "β‘ AStream processing enabled"
            astream_config = get_astream_config(streaming_config.get("preset"))
            yield f"π AStream config: delay={astream_config['delay']}s, buffer_size={astream_config['buffer_size']}"
        else:
            yield "βΉοΈ AStream processing disabled, using standard streaming"

        streaming_manager = StreamingManager(
            stream_delay=streaming_config["stream_delay"],
            config=streaming_config
        )

        orchestrator = ResearchOrchestrator(stream_callback=streaming_manager.handle_stream_event)

        result = await orchestrator.run(topic)

        # The graph interrupts for outline approval; keep resuming until the
        # run reaches a terminal status. (Fix: the previously-read
        # "interrupt_data" local was unused and has been removed.)
        while result.get("status") == "interrupted":
            yield "βΈοΈ AStream workflow paused for human feedback"

            current_state = result.get("current_state", {})

            interrupt_payload = {
                "type": "outline_approval",
                "payload": {
                    "outline": current_state.get("outline", ""),
                    "topic": current_state.get("refined_topic", ""),
                    "message": "Please review the generated outline and provide feedback",
                    "options": {
                        "approve": "Approve the outline and proceed to draft writing",
                        "revise": "Request revisions to the outline",
                        "reject": "Reject and abort the workflow"
                    }
                }
            }

            human_response = handle_human_feedback(interrupt_payload)

            yield "π Resuming AStream workflow with user feedback..."

            # Encode the revision feedback into the single resume string the
            # orchestrator expects.
            if human_response["response"] == "revise" and human_response["feedback"]:
                human_input = f"revise {human_response['feedback']}"
            else:
                human_input = human_response["response"]

            result = await orchestrator.resume(result["thread_id"], human_input)

        if result.get("status") == "completed":
            result_data = result.get("result", {})

            if result_data.get("workflow_status") == "aborted":
                yield "β AStream workflow was rejected by user. No PDF will be generated."
                return

            # Fix: this literal was split across two source lines (a
            # SyntaxError); rejoined as a single message.
            yield "β AStream workflow completed successfully!"

            safe_topic = sanitize_filename(result_data.get('refined_topic', topic))
            draft_pdf_filename = f"data/{safe_topic}.pdf"
            draft_text = result_data.get("draft") or ""
            bibliography = result_data.get("bibliography") or ""

            if not draft_text.strip():
                yield "β οΈ Warning: Draft is empty. PDF will be blank."

            os.makedirs("data", exist_ok=True)

            try:
                save_draft_to_pdf(
                    result_data.get('refined_topic', topic),
                    draft_text,
                    bibliography,
                    draft_pdf_filename
                )
                # Fix: rejoined broken literal; also dropped the spurious
                # f-prefix (no placeholders).
                yield "β AStream research paper saved successfully!"
                yield f"π File: {draft_pdf_filename}"
                yield f"π Draft length: {len(draft_text)} characters"
                yield f"π Bibliography length: {len(bibliography)} characters"
                yield f"π Number of references: {bibliography.count('[')}"
            except Exception as e:
                yield f"β Error saving PDF: {e}"

        elif result.get("status") == "error":
            yield f"β AStream workflow error: {result.get('error', 'Unknown error')}"

        else:
            yield f"β Unexpected AStream workflow status: {result.get('status', 'unknown')}"

    except ValueError as e:
        yield f"β AStream configuration error: {e}"
    except Exception as e:
        yield f"β Unexpected AStream error: {e}"
        import traceback
        traceback.print_exc()
|
|
|
|
|
async def process_workflow_with_yield(topic: str, streaming_config: dict) -> AsyncGenerator[str, None]:
    """
    Process the research workflow using yield generators for progressive updates.

    Args:
        topic: Research topic
        streaming_config: Streaming configuration; must contain
            "stream_delay".

    Yields:
        str: Progress and status updates
    """
    try:
        if not validate_environment():
            yield "β Environment validation failed"
            return

        yield f"π Starting research workflow for topic: {topic}"

        streaming_manager = StreamingManager(
            stream_delay=streaming_config["stream_delay"],
            config=streaming_config
        )

        orchestrator = ResearchOrchestrator(stream_callback=streaming_manager.handle_stream_event)

        result = await orchestrator.run(topic)

        # The graph interrupts for outline approval; keep resuming until the
        # run reaches a terminal status. (Fix: the previously-read
        # "interrupt_data" local was unused and has been removed.)
        while result.get("status") == "interrupted":
            yield "βΈοΈ Workflow paused for human feedback"

            current_state = result.get("current_state", {})

            interrupt_payload = {
                "type": "outline_approval",
                "payload": {
                    "outline": current_state.get("outline", ""),
                    "topic": current_state.get("refined_topic", ""),
                    "message": "Please review the generated outline and provide feedback",
                    "options": {
                        "approve": "Approve the outline and proceed to draft writing",
                        "revise": "Request revisions to the outline",
                        "reject": "Reject and abort the workflow"
                    }
                }
            }

            human_response = handle_human_feedback(interrupt_payload)

            yield "π Resuming workflow with user feedback..."

            # Encode the revision feedback into the single resume string the
            # orchestrator expects.
            if human_response["response"] == "revise" and human_response["feedback"]:
                human_input = f"revise {human_response['feedback']}"
            else:
                human_input = human_response["response"]

            result = await orchestrator.resume(result["thread_id"], human_input)

        if result.get("status") == "completed":
            result_data = result.get("result", {})

            if result_data.get("workflow_status") == "aborted":
                yield "β Workflow was rejected by user. No PDF will be generated."
                return

            # Fix: this literal was split across two source lines (a
            # SyntaxError); rejoined as a single message.
            yield "β Workflow completed successfully!"

            safe_topic = sanitize_filename(result_data.get('refined_topic', topic))
            draft_pdf_filename = f"data/{safe_topic}.pdf"
            draft_text = result_data.get("draft") or ""
            bibliography = result_data.get("bibliography") or ""

            if not draft_text.strip():
                yield "β οΈ Warning: Draft is empty. PDF will be blank."

            os.makedirs("data", exist_ok=True)

            try:
                save_draft_to_pdf(
                    result_data.get('refined_topic', topic),
                    draft_text,
                    bibliography,
                    draft_pdf_filename
                )
                # Fix: rejoined broken literal; also dropped the spurious
                # f-prefix (no placeholders).
                yield "β Research paper saved successfully!"
                yield f"π File: {draft_pdf_filename}"
                yield f"π Draft length: {len(draft_text)} characters"
                yield f"π Bibliography length: {len(bibliography)} characters"
                yield f"π Number of references: {bibliography.count('[')}"
            except Exception as e:
                yield f"β Error saving PDF: {e}"

        elif result.get("status") == "error":
            yield f"β Workflow error: {result.get('error', 'Unknown error')}"

        else:
            yield f"β Unexpected workflow status: {result.get('status', 'unknown')}"

    except ValueError as e:
        yield f"β Configuration error: {e}"
    except Exception as e:
        yield f"β Unexpected error: {e}"
        import traceback
        traceback.print_exc()
|
|
|
|
|
async def main(streaming_preset=None):
    """Main async function to run the research workflow with AStream support.

    Args:
        streaming_preset: Optional streaming preset name; None selects the
            default configuration.
    """
    try:
        # Fix: the original called validate_environment() but ignored its
        # return value, so a missing API key was only discovered later.
        # Fail fast before prompting the user for a topic.
        if not validate_environment():
            print("β Environment validation failed")
            return

        topic = input("Enter your research topic: ").strip()

        if not topic:
            print("No topic provided.")
            return

        streaming_config = get_streaming_config(streaming_preset)

        # Record the preset so downstream helpers can query AStream settings.
        streaming_config["preset"] = streaming_preset

        print(f"\nπ― Research Topic: {topic}")

        print("=" * 60)

        # Route to the AStream pipeline when the preset enables it;
        # otherwise use the standard yield-based pipeline.
        if is_astream_enabled(streaming_preset):
            async for update in process_workflow_with_astream(topic, streaming_config):
                print(update)
        else:
            async for update in process_workflow_with_yield(topic, streaming_config):
                print(update)

    except KeyboardInterrupt:
        print("\nβ Workflow interrupted by user.")
    except Exception as e:
        print(f"β Unexpected error: {e}")
        import traceback
        traceback.print_exc()
|
|
|
|
|
def run_main():
    """Wrapper function to run the async main function.

    Parses CLI arguments, optionally prints streaming help, then drives
    main() with asyncio.run().
    """
    arg_parser = argparse.ArgumentParser(
        description="AI Research Paper Generator with Yield and AStream Support"
    )
    arg_parser.add_argument(
        "--streaming",
        choices=["fast", "slow", "none", "yield", "astream", "default"],
        default="default",
        help="Streaming speed preset: fast, slow, none, yield, astream, or default",
    )
    arg_parser.add_argument(
        "--help-streaming",
        action="store_true",
        help="Show detailed streaming configuration help",
    )

    cli_args = arg_parser.parse_args()

    # Help-only invocation: print and exit without running the workflow.
    if cli_args.help_streaming:
        print_streaming_help()
        return

    # "default" means "no explicit preset" for get_streaming_config().
    preset = cli_args.streaming if cli_args.streaming != "default" else None

    asyncio.run(main(streaming_preset=preset))
|
|
|
|
|
# Script entry point: parse CLI arguments and launch the async workflow.
if __name__ == "__main__":
    run_main()
|
|
|
|
|
|