|
|
from pydantic_graph import BaseNode, End, GraphRunContext, Graph |
|
|
from pydantic_ai import Agent |
|
|
from pydantic_ai.common_tools.tavily import tavily_search_tool |
|
|
from dataclasses import dataclass |
|
|
from pydantic import Field, BaseModel |
|
|
from typing import List, Dict, Optional, Any |
|
|
from pydantic_ai.models.gemini import GeminiModel |
|
|
from pydantic_ai.providers.google_gla import GoogleGLAProvider |
|
|
from dotenv import load_dotenv |
|
|
import os |
|
|
from tavily import TavilyClient |
|
|
from IPython.display import Image, display |
|
|
import requests |
|
|
import time |
|
|
|
|
|
load_dotenv() |
|
|
google_api_key=os.getenv('google_api_key') |
|
|
tavily_key=os.getenv('tavily_key') |
|
|
tavily_client = TavilyClient(api_key=tavily_key) |
|
|
llm=GeminiModel('gemini-2.0-flash', provider=GoogleGLAProvider(api_key=google_api_key)) |
|
|
pse=os.getenv('pse') |
|
|
|
|
|
@dataclass |
|
|
class State: |
|
|
query:str |
|
|
preliminary_research: str |
|
|
research_plan: Dict |
|
|
research_results: Dict |
|
|
validation : str |
|
|
final: Dict |
|
|
class paragraph_content(BaseModel): |
|
|
title: str = Field(description='the title of the paragraph') |
|
|
content: str = Field(description='the content of the paragraph') |
|
|
|
|
|
class paragraph(BaseModel): |
|
|
title: str = Field(description='the title of the paragraph') |
|
|
should_include: str = Field(description='a description of what the paragraph should include') |
|
|
class Paper_layout(BaseModel): |
|
|
title: str = Field(description='the title of the paper') |
|
|
paragraphs: List[paragraph]= Field(description='the list of paragraphs of the paper') |
|
|
|
|
|
paper_layout_agent=Agent(llm, result_type=Paper_layout, system_prompt="generate a paper layout based on the query, preliminary_search, search_results,include a Title for the paper, for the paragraphs only include the title, no content, no image, no table, start with introduction and end with conclusion") |
|
|
paragraph_gen_agent=Agent(llm, result_type=paragraph_content, system_prompt="generate a paragraph synthesizing the research_results based on the title,what the paragraph should include, and what has already been written to avoid repetition") |
|
|
class PaperGen_node(BaseNode[State]): |
|
|
async def run(self, ctx: GraphRunContext[State])->End: |
|
|
prompt=(f'query:{ctx.state.query}, preliminary_search:{ctx.state.preliminary_research},search_results:{ctx.state.research_results.research_results}') |
|
|
result=await paper_layout_agent.run(prompt) |
|
|
paragraphs=[] |
|
|
for i in result.data.paragraphs: |
|
|
time.sleep(2) |
|
|
paragraph_data=await paragraph_gen_agent.run(f'title:{i.title}, should_include:{i.should_include}, research_results:{ctx.state.research_results.research_results}, already_written:{paragraphs}') |
|
|
paragraphs.append(paragraph_data.data.model_dump()) |
|
|
|
|
|
paper={'title':result.data.title, |
|
|
'image_url':ctx.state.research_results.image_url if ctx.state.research_results.image_url else None, |
|
|
'paragraphs':paragraphs, |
|
|
'table':ctx.state.research_results.table if ctx.state.research_results.table else None, |
|
|
'references':ctx.state.research_results.references if ctx.state.research_results.references else None} |
|
|
|
|
|
ctx.state.final=paper |
|
|
|
|
|
return End(ctx.state.final) |
|
|
|
|
|
def google_image_search(query:str): |
|
|
"""Search for images using Google Custom Search API |
|
|
args: query |
|
|
return: image url |
|
|
""" |
|
|
|
|
|
url = "https://www.googleapis.com/customsearch/v1" |
|
|
|
|
|
params = { |
|
|
"q": query, |
|
|
"cx": pse, |
|
|
"key": google_api_key, |
|
|
"searchType": "image", |
|
|
"num": 1 |
|
|
} |
|
|
|
|
|
|
|
|
response = requests.get(url, params=params) |
|
|
data = response.json() |
|
|
|
|
|
|
|
|
if 'items' in data: |
|
|
|
|
|
image_url = data['items'][0]['link'] |
|
|
return image_url |
|
|
|
|
|
class Table_row(BaseModel): |
|
|
data: List[str] = Field(description='the data of the row') |
|
|
class Table(BaseModel): |
|
|
rows: List[Table_row] = Field(description='the rows of the table') |
|
|
columns: List[str] = Field(description='the columns of the table') |
|
|
|
|
|
class Research_results(BaseModel): |
|
|
research_results: List[str] = Field(default_factory=None,description='the research results') |
|
|
image_url: str = Field(default_factory=None,description='the image url if needed else return None') |
|
|
table: dict = Field(default_factory=None,description='the table dataframe in a dictionary format') |
|
|
references: str = Field(default_factory=None,description='the references (urls) of the research_results') |
|
|
|
|
|
table_agent=Agent(llm, result_type=Table, system_prompt="generate a detailed table in dictionary format based on the research and the query") |
|
|
|
|
|
class Research_node(BaseNode[State]): |
|
|
async def run(self, ctx: GraphRunContext[State])->PaperGen_node: |
|
|
research_results=Research_results(research_results=[], image_url='', table={}, references='') |
|
|
|
|
|
for i in ctx.state.research_plan.search_queries: |
|
|
response = tavily_client.search(i.search_query) |
|
|
data=[] |
|
|
for i in response.get('results'): |
|
|
if i.get('score')>0.50: |
|
|
|
|
|
|
|
|
data.append(i.get('url')) |
|
|
research_results.research_results.append(i.get('content')) |
|
|
research_results.research_results=list(set(research_results.research_results)) |
|
|
research_results.references=list(set(data)) |
|
|
research_results.references=', '.join(research_results.references) |
|
|
ctx.state.research_results=research_results |
|
|
if ctx.state.research_plan.image_search_query: |
|
|
image_url=google_image_search(ctx.state.research_plan.image_search_query) |
|
|
ctx.state.research_results.image_url=image_url |
|
|
|
|
|
if ctx.state.research_plan.table: |
|
|
result=await table_agent.run(f'research_results:{ctx.state.research_results.research_results},query:{ctx.state.query}') |
|
|
ctx.state.research_results.table={'data':[row.data for row in result.data.rows], 'columns':result.data.columns} |
|
|
|
|
|
|
|
|
return PaperGen_node() |
|
|
|
|
|
class search_query(BaseModel): |
|
|
search_query: str = Field(description='the detailed web search query for the research') |
|
|
|
|
|
class Research_plan(BaseModel): |
|
|
search_queries: List[search_query] = Field(description='the detailed web search queries for the research') |
|
|
table: Optional[str] = Field(default_factory=None,description='if a table is needed, return yes else return None') |
|
|
image_search_query: Optional[str] = Field(default_factory=None,description='if image is needed, generate a image search query, optional') |
|
|
|
|
|
research_plan_agent=Agent(llm, result_type=Research_plan, system_prompt='generate a detailed research plan breaking down the research into smaller parts based on the query and the preliminary search, include a table and image search query if the user wants it') |
|
|
|
|
|
class Research_plan_node(BaseNode[State]): |
|
|
async def run(self, ctx: GraphRunContext[State])->Research_node: |
|
|
|
|
|
prompt=(f'query:{ctx.state.query}, preliminary_search:{ctx.state.preliminary_research}') |
|
|
result=await research_plan_agent.run(prompt) |
|
|
ctx.state.research_plan=result.data |
|
|
return Research_node() |
|
|
|
|
|
|
|
|
search_agent=Agent(llm, tools=[tavily_search_tool(tavily_key)], system_prompt="do a websearch based on the query") |
|
|
|
|
|
class preliminary_search_node(BaseNode[State]): |
|
|
async def run(self, ctx: GraphRunContext[State]) -> Research_plan_node: |
|
|
prompt = (' Do a preliminary search to get a global idea of the subject that the user wants to do reseach on as well as the necessary informations to do a search on.\n' |
|
|
f'The subject is based on the query: {ctx.state.query}, return the results of the search.') |
|
|
result=await search_agent.run(prompt) |
|
|
ctx.state.preliminary_research=result.data |
|
|
return Research_plan_node() |
|
|
|
|
|
class Deep_research_engine: |
|
|
def __init__(self): |
|
|
self.graph=Graph(nodes=[preliminary_search_node, Research_plan_node, Research_node, PaperGen_node]) |
|
|
self.state=State(query='', preliminary_research='', research_plan=[], research_results=[], validation='', final='') |
|
|
|
|
|
async def chat(self,query:str): |
|
|
"""Chat with the deep research engine, |
|
|
Args: |
|
|
query (str): The query to search for |
|
|
Returns: |
|
|
str: The response from the deep research engine |
|
|
""" |
|
|
self.state.query=query |
|
|
response=await self.graph.run(preliminary_search_node(),state=self.state) |
|
|
return response.output |
|
|
|
|
|
|
|
|
def display_graph(self): |
|
|
"""Display the graph of the deep research engine |
|
|
Returns: |
|
|
Image: The image of the graph |
|
|
""" |
|
|
image=self.graph.mermaid_image() |
|
|
return display(Image(image)) |
|
|
|
|
|
|