| from typing import Dict, List, TypedDict |
| from langgraph.graph import StateGraph, END |
| from base_agent import BaseAgent |
| from models import AgentState |
| import os |
| from datetime import datetime |
| import json |
|
|
class ResearchState(TypedDict):
    """State dictionary passed between the nodes of the research workflow."""

    # Subject of the research, as supplied by the caller.
    subject: str
    # Index entries, one string per line of the generated index.
    index: List[str]
    # Generated text keyed by the index entry it was generated for.
    content: Dict[str, str]
    # Position in ``index`` of the next entry to generate content for.
    current_step: int
|
|
class ResearchGenerator(BaseAgent):
    """Generate a multi-topic research document with a LangGraph workflow.

    The workflow first asks the model for an index of topics, then loops
    over that index generating content for one entry per iteration. Every
    step is written to its own markdown file inside a timestamped directory
    under ``research_output``; ``generate_research`` additionally writes the
    assembled document once the workflow has finished.

    The output directory name is fixed when the instance is created, so
    running the same instance twice overwrites the files of the first run.
    """

    # Upper bound on graph steps for one run. Each topic takes two steps
    # (``generate_content`` + ``check_completion``); LangGraph's documented
    # default limit of 25 would stop the loop after roughly a dozen topics.
    DEFAULT_RECURSION_LIMIT = 500

    def __init__(self):
        """Compile the workflow and create the per-instance output directory."""
        super().__init__()
        self.workflow = self._create_workflow()
        self.output_dir = "research_output"
        os.makedirs(self.output_dir, exist_ok=True)
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.research_dir = os.path.join(
            self.output_dir, f"research_{self.timestamp}"
        )
        os.makedirs(self.research_dir, exist_ok=True)

    def _create_workflow(self):
        """Build and compile the state graph.

        Flow: ``generate_index`` -> ``generate_content`` ->
        ``check_completion``, which loops back to ``generate_content`` while
        ``_should_continue`` is true and otherwise ends the run.
        """
        workflow = StateGraph(ResearchState)

        workflow.add_node("generate_index", self._generate_index)
        workflow.add_node("generate_content", self._generate_content)
        workflow.add_node("check_completion", self._check_completion)

        workflow.add_edge("generate_index", "generate_content")
        workflow.add_edge("generate_content", "check_completion")
        workflow.add_conditional_edges(
            "check_completion",
            self._should_continue,
            {
                True: "generate_content",
                False: END,
            },
        )

        workflow.set_entry_point("generate_index")

        return workflow.compile()

    def _save_step(self, state: ResearchState, step_name: str, content: str):
        """Write one step's markdown to ``<research_dir>/<step_name>.md``.

        Args:
            state: Current workflow state. Not used here; kept so existing
                callers do not have to change.
            step_name: File name without the ``.md`` extension.
            content: Markdown text to write.
        """
        filename = os.path.join(self.research_dir, f"{step_name}.md")
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
        print(f"Saved {step_name} to: {filename}")

    def _save_final_markdown(self, state: ResearchState):
        """Write the whole research document to ``complete_research.md``.

        The file holds the subject as title, the index entries, and then one
        section per generated topic separated by horizontal rules.

        Returns:
            Path of the file that was written.
        """
        filename = os.path.join(self.research_dir, "complete_research.md")

        with open(filename, "w", encoding="utf-8") as f:
            f.write(f"# {state['subject']}\n\n")

            # "Index" heading (the document headings are written in Arabic).
            f.write("## الفهرس\n\n")
            f.writelines(f"{item}\n" for item in state["index"])
            f.write("\n")

            # "Content" heading.
            f.write("## المحتوى\n\n")
            for topic, content in state["content"].items():
                f.write(f"### {topic}\n\n")
                f.write(f"{content}\n\n")
                f.write("---\n\n")

        return filename

    async def _generate_index(self, state: ResearchState) -> Dict:
        """Ask the model for a research index and split it into entries.

        The raw index text is saved as ``index.md``. Blank lines in the
        model output are dropped so that they are not later treated as
        topics.

        Returns:
            State update with the list of index entries and the step
            counter reset to 0.
        """
        prompt_template = """
        Create a detailed research index for the following subject:

        Subject: {subject}

        Instructions:
        1. Create a comprehensive list of topics and subtopics
        2. Organize them in a logical order
        3. Include main sections and subsections
        4. Format as a numbered list

        Example format:
        1. Main Topic 1
           1.1 Subtopic 1.1
           1.2 Subtopic 1.2
        2. Main Topic 2
           2.1 Subtopic 2.1
           2.2 Subtopic 2.2
        """

        # NOTE(review): _process comes from BaseAgent (not visible here);
        # this assumes it returns a mapping with the generated text stored
        # under ``output_key`` -- confirm against BaseAgent.
        result = await self._process(
            state=state,
            prompt_template=prompt_template,
            output_key="index",
            step_name="research index",
            subject=state["subject"],
        )
        raw_index = result["index"]

        index_content = f"# {state['subject']}\n\n## الفهرس\n\n{raw_index}"
        self._save_step(state, "index", index_content)

        # Keep each entry as written (including any leading indentation that
        # marks a subtopic) but skip empty / whitespace-only lines.
        entries = [line for line in raw_index.splitlines() if line.strip()]

        return {"index": entries, "current_step": 0}

    async def _generate_content(self, state: ResearchState) -> Dict:
        """Generate and save the content for the current index entry.

        Returns:
            State update with the extended content mapping and the step
            counter advanced by one. When there is no entry at the current
            step (for example the index came back empty) the state is left
            as it is so the completion check can end the run.
        """
        step = state["current_step"]
        index = state["index"]

        # The graph always enters this node after the index step, so guard
        # against an index with nothing left to process.
        if step >= len(index):
            return {"current_step": step}

        current_topic = index[step]

        prompt_template = """
        Generate detailed content for the following research topic:

        Subject: {subject}
        Topic: {current_topic}

        Instructions:
        1. Provide comprehensive information about the topic
        2. Include relevant examples and explanations
        3. Use clear and academic language
        4. Structure the content with proper paragraphs
        5. Include key points and supporting details
        """

        result = await self._process(
            state=state,
            prompt_template=prompt_template,
            output_key="content",
            step_name=f"content for {current_topic}",
            subject=state["subject"],
            current_topic=current_topic,
        )

        # Copy before adding the new entry so the mapping held by the
        # incoming state is not modified in place.
        content = dict(state.get("content") or {})
        content[current_topic] = result["content"]

        topic_content = f"### {current_topic}\n\n{result['content']}"
        self._save_step(state, f"topic_{step}", topic_content)

        return {
            "content": content,
            "current_step": step + 1,
        }

    def _check_completion(self, state: ResearchState) -> Dict:
        """Pass the state through unchanged.

        This node only exists as the point the conditional edge hangs off;
        the decision itself is made by ``_should_continue``.
        """
        return state

    def _should_continue(self, state: ResearchState) -> bool:
        """Return True while there are index entries left to generate."""
        return state["current_step"] < len(state["index"])

    async def generate_research(
        self, subject: str, recursion_limit: int = DEFAULT_RECURSION_LIMIT
    ) -> Dict:
        """Run the full workflow for ``subject`` and save the final document.

        Args:
            subject: Subject to research.
            recursion_limit: Maximum number of graph steps for this run,
                passed to LangGraph through the run config.

        Returns:
            The final workflow state.
        """
        initial_state = {
            "subject": subject,
            "index": [],
            "content": {},
            "current_step": 0,
        }

        result = await self.workflow.ainvoke(
            initial_state,
            config={"recursion_limit": recursion_limit},
        )

        filename = self._save_final_markdown(result)
        print(f"\nComplete research saved to: {filename}")

        return result

    async def astream(
        self,
        initial_state: ResearchState,
        recursion_limit: int = DEFAULT_RECURSION_LIMIT,
    ):
        """Stream the workflow execution state.

        Yields whatever the compiled workflow's ``astream`` yields for
        ``initial_state``. Unlike ``generate_research`` this does not write
        the assembled document.

        Args:
            initial_state: State to start the workflow from.
            recursion_limit: Maximum number of graph steps for this run.
        """
        async for state in self.workflow.astream(
            initial_state,
            config={"recursion_limit": recursion_limit},
        ):
            yield state
|
|
| |
if __name__ == "__main__":
    import asyncio

    async def main():
        """Generate research for a sample subject and print the result."""
        agent = ResearchGenerator()
        report = await agent.generate_research("الذكاء الاصطناعي والاستثمار")

        # Banner, then the index entries one per line.
        print("\n=== البحث المولد ===\n")
        print("\n=== الفهرس ===")
        for entry in report["index"]:
            print(entry.strip())

        # Each topic followed by its text and a separator rule.
        print("\n=== المحتوى ===")
        for heading, body in report["content"].items():
            print(f"\n{heading.strip()}:")
            print(body.strip())
            print("-" * 50)

    asyncio.run(main())