Shekarss committed on
Commit
aa9134d
·
verified ·
1 Parent(s): 2bd4b8d

Upload 10 files

Browse files
deep_research.py ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gradio as gr
2
+ import asyncio
3
+ from research_manager import ResearchManager
4
+ from dotenv import load_dotenv
5
+
6
+ load_dotenv(override=True)
7
+
8
+ manager = ResearchManager()
9
+
10
+ async def run(query: str):
11
+ async for chunk in ResearchManager().run(query):
12
+ yield chunk
13
+
14
+
15
+ with gr.Blocks(theme=gr.themes.Default(primary_hue="sky")) as ui:
16
+ gr.Markdown("# Deep Research")
17
+ query_textbox = gr.Textbox(label="What topic would you like to research?")
18
+ run_button = gr.Button("Run", variant="primary")
19
+ report = gr.Markdown(label="Report")
20
+
21
+ run_button.click(fn=run, inputs=query_textbox, outputs=report)
22
+ query_textbox.submit(fn=run, inputs=query_textbox, outputs=report)
23
+
24
+ ui.launch(inbrowser=True)
25
+
guardrail_agent.py ADDED
@@ -0,0 +1,20 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
from agents import Agent, output_guardrail, GuardrailFunctionOutput, Runner
from pydantic import BaseModel, Field
from model import model

class NoCode(BaseModel):
    # True when the inspected report contains (hidden) code.
    is_code: bool = Field(description='Checks for code in the report')

# Classifier agent that inspects a report for embedded code.
# (Fixed typo in the instructions: "whther" -> "whether".)
guard_agent = Agent(
    name='guard_rail',
    instructions='Checks whether the report has hidden code in it',
    model=model,
    output_type=NoCode
)

@output_guardrail
async def check_no_code(ctx, agent, report):
    """Output guardrail: trip the wire when the report contains embedded code.

    Args:
        ctx: Guardrail run context; its .context is forwarded to the sub-run.
        agent: The agent whose output is being checked (unused here).
        report: The report text to inspect.
    """
    result = await Runner.run(guard_agent, report, context=ctx.context)
    is_code = result.final_output.is_code
    return GuardrailFunctionOutput(output_info={'found code': 'An embedded code was found'}, tripwire_triggered=is_code)
model.py ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
import os
from agents import OpenAIChatCompletionsModel
from openai import AsyncOpenAI
from dotenv import load_dotenv

# Load .env so the API key is available, overriding any shell-set values.
load_dotenv(override=True)

# Groq's OpenAI-compatible endpoint, driven through the AsyncOpenAI client.
# NOTE(review): the env var is named 'grok_key' but the endpoint is Groq —
# presumably a naming slip; confirm the variable name used in .env matches.
client = AsyncOpenAI(base_url="https://api.groq.com/openai/v1", api_key=os.getenv('grok_key'))
# Shared chat-completions model used by every agent in this project.
model = OpenAIChatCompletionsModel(model='meta-llama/llama-4-scout-17b-16e-instruct', openai_client=client)
planner_agent.py ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
from model import model
from agents import Agent
from pydantic import BaseModel, Field

# Number of search terms the planner is asked to produce per query.
HOW_MANY_SEARCHES = 3

INSTRUCTIONS = f"You are a helpful research assistant. Given a query, come up with a set of web searches \
to perform to best answer the query. Output {HOW_MANY_SEARCHES} terms to query for."

class WebSearchItem(BaseModel):
    # Why this particular search helps answer the user's query.
    reason: str = Field(description="Your reasoning for why this search is important to the query.")

    # The literal term to feed to the web-search tool.
    query: str = Field(description="The search term to use for the web search.")


class WebSearchPlan(BaseModel):
    # Ordered collection of searches the planner proposes.
    searches: list[WebSearchItem] = Field(description="A list of web searches to perform to best answer the query.")

# Agent that turns a (query + user answers) prompt into a WebSearchPlan.
planner_agent = Agent(
    name='Planner Agent',
    instructions=INSTRUCTIONS,
    model=model,
    output_type=WebSearchPlan
)
question_agent.py ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
from agents import Agent, Runner, trace, output_guardrail, GuardrailFunctionOutput
from pydantic import BaseModel, Field
from typing import Dict, List
from model import model

INSTRUCTIONS = (
    "You are a helpful research assistant. "
    "Given a user query, generate a set of 3 insightful questions "
    "to ask the user in order to facilitate detailed and in-depth planning."
)

class QuestionItem(BaseModel):
    # 1-based position of the question in the generated set.
    number: int = Field(description='Question Number')
    # The question to put to the user.
    question: str = Field(description='The question text based on user query')
    # Filled in later by ResearchManager once the user responds.
    answer: str | None = None

class QuestionPlan(BaseModel):
    # The clarifying questions generated for the user's query.
    questions: list[QuestionItem] = Field(description='List of question')

# Agent that produces clarifying questions before planning begins.
question_agent = Agent(
    name='question_agent',
    instructions=INSTRUCTIONS,
    model=model,
    output_type=QuestionPlan
)
refiner_agent.py ADDED
@@ -0,0 +1,21 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
from model import model
from agents import Agent
from pydantic import BaseModel, Field

# Fixed: the original adjacent string literals concatenated without a
# separator, yielding "...'False' if notProvide proper feedback..." in the
# prompt the model actually saw.
instructions = (
    "Your role is to evaluate the research plan based on the user's query. "
    "If the existing plan is incomplete, unclear, or insufficient, "
    "you return is_valid as a bool value: 'True' if accepted, 'False' if not. "
    "Provide proper feedback and suggestions why it was not a valid plan."
)

class ValidPlan(BaseModel):
    # Verdict on the plan; drives the re-planning loop in ResearchManager.
    is_valid: bool = Field(..., description="Whether the plan is valid. Return True if valid, False otherwise.")
    # Human-readable explanation used in the "Refining..." status message.
    feedback: str = Field("", description="If invalid, describe why the plan is insufficient and suggest improvements.")

# Agent that accepts or rejects a proposed web-search plan.
refiner_agent = Agent(
    name='refiner_agent',
    instructions=instructions,
    model=model,
    output_type=ValidPlan
)
research_manager.py ADDED
@@ -0,0 +1,96 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
from agents import Runner, trace, gen_trace_id
from model import model
from question_agent import question_agent, QuestionItem, QuestionPlan
from search_agent import search_agent
from planner_agent import planner_agent, WebSearchItem, WebSearchPlan
from writer_agent import writer_agent, ReportData
from refiner_agent import refiner_agent, ValidPlan
import asyncio

class ResearchManager:
    """Orchestrates the deep-research pipeline:
    clarifying questions -> search plan -> plan validation -> web search -> report.
    """

    # Maximum number of re-planning rounds when the refiner rejects a plan.
    MAX_REPLANS = 4

    async def run(self, query: str, answers: dict | None = None):
        """Run the deep research process, yielding status updates and the final report.

        Args:
            query: The user's research topic.
            answers: Optional pre-collected answers, keyed by question number
                (int or str). When omitted, answers are gathered interactively
                via input(). NOTE: input() blocks the event loop, so UI callers
                (e.g. the Gradio app, or test_gradio.py) should pass answers.
        """
        trace_id = gen_trace_id()
        with trace('Research trace', trace_id=trace_id):
            yield "Starting research"

            question_plan: QuestionPlan = await self.ask_questions(query)

            user_answers = {}
            for q_item in question_plan.questions:
                if answers is not None:
                    # Accept either int or str keys for the question number.
                    answer = answers.get(q_item.number, answers.get(str(q_item.number), ""))
                else:
                    answer = input(f"{q_item.number}. {q_item.question}: ")
                user_answers[q_item.number] = answer
                q_item.answer = answer
            yield f"Collected answers for {len(user_answers)} questions."

            search_plan: WebSearchPlan = await self.plan_next(query, question_plan)
            yield "Initial web search plan generated."

            valid_plan: ValidPlan = await self.refine_plan(query, search_plan)
            plan_num = 0
            # Re-plan until accepted, capped at MAX_REPLANS extra rounds to
            # avoid looping forever on a stubborn refiner.
            while not valid_plan.is_valid:
                yield f"Plan not valid: {valid_plan.feedback}. Refining..."
                search_plan = await self.plan_next(query, question_plan)
                valid_plan = await self.refine_plan(query, search_plan)

                if plan_num >= self.MAX_REPLANS:
                    break
                plan_num += 1
            yield "Plan validated and refined."

            search_results = await self.web_search(search_plan)
            yield "Web searches completed."

            report: ReportData = await self.write_report(query, search_results)
            yield report.markdown_report

    async def ask_questions(self, query: str) -> QuestionPlan:
        """Generate 3 clarifying questions for the user's query."""
        result = await Runner.run(question_agent, query)
        return result.final_output_as(QuestionPlan)

    async def plan_next(self, query: str, que_depth: QuestionPlan) -> WebSearchPlan:
        """Produce a web-search plan from the query plus the user's answers."""
        # Guard against unanswered questions (answer defaults to None).
        answers_str = "; ".join([q.answer or "" for q in que_depth.questions])
        input_query = f'User query {query}. User answered questions {answers_str}'
        result = await Runner.run(planner_agent, input_query)
        return result.final_output_as(WebSearchPlan)

    async def refine_plan(self, query: str, ex_plan: WebSearchPlan) -> ValidPlan:
        """Evaluate the research plan based on the user's query."""
        all_plans = "; ".join([p.reason for p in ex_plan.searches])
        input_plan = f'Validate the plan and provide whether its valid or not along with feedback and a bool of whether its valid or not. The plan {all_plans}'
        result = await Runner.run(refiner_agent, input_plan)
        return result.final_output_as(ValidPlan)

    async def web_search(self, search_plan: WebSearchPlan) -> list[str]:
        """Run all planned searches concurrently, dropping failed ones."""
        num_completed = 0
        tasks = [asyncio.create_task(self.search(item)) for item in search_plan.searches]
        results = []
        for task in asyncio.as_completed(tasks):
            result = await task
            if result is not None:
                results.append(result)
            num_completed += 1
            print(f"Searching... {num_completed}/{len(tasks)} completed")
        print("Finished searching")
        return results

    async def search(self, item: WebSearchItem) -> str | None:
        """Run a single search; return its summary, or None on failure."""
        input = f"Search term: {item.query}\nReason for searching: {item.reason}"
        try:
            result = await Runner.run(search_agent, input)
            return str(result.final_output)
        except Exception:
            # Best-effort: a failed search is skipped, not fatal.
            return None

    async def write_report(self, query: str, search_results: list[str]) -> ReportData:
        """Synthesize the final markdown report from the search summaries."""
        print("Thinking about report...")
        input = f"Original query: {query}\nSummarized search results: {search_results}"
        result = await Runner.run(writer_agent, input)
        print("Finished writing report")
        return result.final_output_as(ReportData)
search_agent.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
from agents import Agent, ModelSettings, function_tool
from duckduckgo_search import DDGS
from model import model
from writer_agent import writer_agent

INSTRUCTIONS = (
    "You are a research assistant. Given a search term, you search the web for that term and "
    "produce a concise summary of the results. The summary must 2-3 paragraphs and less than 300 "
    "words. Capture the main points. Write succintly, no need to have complete sentences or good "
    "grammar. This will be consumed by someone synthesizing a report, so its vital you capture the "
    "essence and ignore any fluff. Do not include any additional commentary other than the summary itself."
)

@function_tool
def duckduckgo_search(query: str, max_results: int = 5) -> list[dict]:
    """Search the web using DuckDuckGo and return results.

    Args:
        query: The search term.
        max_results: Maximum number of results to return (default 5).
            Fixed the annotation: the default is an int and DDGS expects one;
            the original annotated it as str.
    """
    # Cast defensively in case the model passes a numeric string anyway.
    max_results = int(max_results)
    with DDGS() as ddgs:
        return [r for r in ddgs.text(query, max_results=max_results)]

# Agent that must call the search tool, then hand off to the writer.
search_agent = Agent(
    name="Search agent",
    instructions=INSTRUCTIONS,
    tools=[duckduckgo_search],
    model=model,
    handoff_description='After searching, handoff to writer_agent to generate a report',
    handoffs=[writer_agent],
    model_settings=ModelSettings(tool_choice="required"),
)
test_gradio.py ADDED
@@ -0,0 +1,45 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
#!/usr/bin/env python3
"""
Test script to verify the Gradio interface works correctly
"""

import asyncio
from research_manager import ResearchManager

async def test_research_flow():
    """Test the research flow without Gradio to ensure it works"""
    manager = ResearchManager()

    # Test query
    query = "The impact of AI on healthcare"

    print("Testing research flow...")
    print(f"Query: {query}")
    print("-" * 50)

    # First run - should return questions
    # NOTE(review): this expects run() to yield a dict chunk of
    # {"type": "questions", "questions": [...]}, but ResearchManager.run
    # as written only yields status strings and the report — confirm the
    # intended streaming protocol.
    print("Step 1: Getting questions...")
    async for chunk in manager.run(query):
        if isinstance(chunk, dict) and chunk.get("type") == "questions":
            print(f"Generated {len(chunk['questions'])} questions:")
            for q in chunk['questions']:
                print(f"  {q.number}. {q.question}")
            break
        else:
            print(f"Status: {chunk}")

    print("\n" + "-" * 50)

    # Simulate user answers (keys are question numbers as strings)
    sample_answers = {
        "1": "I'm interested in diagnostic accuracy improvements",
        "2": "Looking at the last 5 years of developments",
        "3": "Focus on both benefits and challenges"
    }

    # NOTE(review): passes a second positional argument to run();
    # ResearchManager.run must accept an optional answers mapping for
    # this call to work — verify the signature.
    print("Step 2: Running research with sample answers...")
    async for chunk in manager.run(query, sample_answers):
        print(f"Status: {chunk}")

if __name__ == "__main__":
    asyncio.run(test_research_flow())
writer_agent.py ADDED
@@ -0,0 +1,27 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
from model import model
from agents import Agent
from pydantic import BaseModel, Field
from guardrail_agent import check_no_code

INSTRUCTIONS = (
    "You are a senior researcher tasked with writing a cohesive report for a research query. "
    "You will be provided with the original query, and some initial research done by a research assistant.\n"
    "You should first come up with an outline for the report that describes the structure and "
    "flow of the report. Then, generate the report and return that as your final output.\n"
    "The final output should be in markdown format, and it should be lengthy and detailed. Aim "
    "for 5-10 pages of content, at least 1000 words."
)

class ReportData(BaseModel):
    # Quick abstract surfaced alongside the full report.
    short_summary: str = Field(description="A short 2-3 sentence summary of the findings.")

    # The full report body; streamed to the UI by ResearchManager.
    markdown_report: str = Field(description="The final report atleast 1000 words")

    # Suggested next research directions.
    follow_up_questions: list[str] = Field(description="Suggested topics to research further")

# Fixed: check_no_code was imported but never attached, so the no-code
# guardrail never ran. Wire it up as an output guardrail.
writer_agent = Agent(
    name="WriterAgent",
    instructions=INSTRUCTIONS,
    model=model,
    output_type=ReportData,
    output_guardrails=[check_no_code],
)