uml / research_generator.py
Mohammed Foud
Add application file
5a2d62e
from typing import Dict, List, TypedDict
from langgraph.graph import StateGraph, END
from base_agent import BaseAgent
from models import AgentState
import os
from datetime import datetime
import json
class ResearchState(TypedDict):
subject: str
index: List[str]
content: Dict[str, str]
current_step: int
class ResearchGenerator(BaseAgent):
def __init__(self):
super().__init__()
self.workflow = self._create_workflow()
self.output_dir = "research_output"
os.makedirs(self.output_dir, exist_ok=True)
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.research_dir = f"{self.output_dir}/research_{self.timestamp}"
os.makedirs(self.research_dir, exist_ok=True)
def _create_workflow(self):
# Create the graph
workflow = StateGraph(ResearchState)
# Add nodes
workflow.add_node("generate_index", self._generate_index)
workflow.add_node("generate_content", self._generate_content)
workflow.add_node("check_completion", self._check_completion)
# Add edges
workflow.add_edge("generate_index", "generate_content")
workflow.add_edge("generate_content", "check_completion")
workflow.add_conditional_edges(
"check_completion",
self._should_continue,
{
True: "generate_content",
False: END
}
)
# Set entry point
workflow.set_entry_point("generate_index")
return workflow.compile()
def _save_step(self, state: ResearchState, step_name: str, content: str):
"""Save individual step content to a file"""
filename = f"{self.research_dir}/{step_name}.md"
with open(filename, 'w', encoding='utf-8') as f:
f.write(content)
print(f"Saved {step_name} to: {filename}")
def _save_final_markdown(self, state: ResearchState):
"""Save the complete research in a single markdown file"""
filename = f"{self.research_dir}/complete_research.md"
with open(filename, 'w', encoding='utf-8') as f:
# Write title
f.write(f"# {state['subject']}\n\n")
# Write index
f.write("## الفهرس\n\n")
for item in state['index']:
f.write(f"{item}\n")
f.write("\n")
# Write content
f.write("## المحتوى\n\n")
for topic, content in state['content'].items():
f.write(f"### {topic}\n\n")
f.write(f"{content}\n\n")
f.write("---\n\n")
return filename
async def _generate_index(self, state: ResearchState) -> Dict:
prompt_template = """
Create a detailed research index for the following subject:
Subject: {subject}
Instructions:
1. Create a comprehensive list of topics and subtopics
2. Organize them in a logical order
3. Include main sections and subsections
4. Format as a numbered list
Example format:
1. Main Topic 1
1.1 Subtopic 1.1
1.2 Subtopic 1.2
2. Main Topic 2
2.1 Subtopic 2.1
2.2 Subtopic 2.2
"""
result = await self._process(
state=state,
prompt_template=prompt_template,
output_key="index",
step_name="research index",
subject=state["subject"]
)
# Save index immediately
index_content = f"# {state['subject']}\n\n## الفهرس\n\n{result['index']}"
self._save_step(state, "index", index_content)
return {"index": result["index"].split("\n"), "current_step": 0}
async def _generate_content(self, state: ResearchState) -> Dict:
current_topic = state["index"][state["current_step"]]
prompt_template = """
Generate detailed content for the following research topic:
Subject: {subject}
Topic: {current_topic}
Instructions:
1. Provide comprehensive information about the topic
2. Include relevant examples and explanations
3. Use clear and academic language
4. Structure the content with proper paragraphs
5. Include key points and supporting details
"""
result = await self._process(
state=state,
prompt_template=prompt_template,
output_key="content",
step_name=f"content for {current_topic}",
subject=state["subject"],
current_topic=current_topic
)
# Update content dictionary
content = state.get("content", {})
content[current_topic] = result["content"]
# Save this topic's content immediately
topic_content = f"### {current_topic}\n\n{result['content']}"
self._save_step(state, f"topic_{state['current_step']}", topic_content)
return {
"content": content,
"current_step": state["current_step"] + 1
}
def _check_completion(self, state: ResearchState) -> Dict:
return state
def _should_continue(self, state: ResearchState) -> bool:
return state["current_step"] < len(state["index"])
async def generate_research(self, subject: str) -> Dict:
initial_state = {
"subject": subject,
"index": [],
"content": {},
"current_step": 0
}
result = await self.workflow.ainvoke(initial_state)
# Save final complete research
filename = self._save_final_markdown(result)
print(f"\nComplete research saved to: {filename}")
return result
async def astream(self, initial_state: ResearchState):
"""Stream the workflow execution state."""
async for state in self.workflow.astream(initial_state):
yield state
# Example usage
if __name__ == "__main__":
import asyncio
async def main():
generator = ResearchGenerator()
research = await generator.generate_research("الذكاء الاصطناعي والاستثمار")
# Print summary
print("\n=== البحث المولد ===\n")
print("\n=== الفهرس ===")
for item in research["index"]:
print(item.strip())
print("\n=== المحتوى ===")
for topic, content in research["content"].items():
print(f"\n{topic.strip()}:")
print(content.strip())
print("-" * 50)
asyncio.run(main())