|
|
import asyncio |
|
|
import logging |
|
|
import os |
|
|
from typing import Any, Dict, List, Optional |
|
|
|
|
|
import aiohttp |
|
|
import html2text |
|
|
from griffe import json_decoder |
|
|
from llama_index.core import Settings, VectorStoreIndex |
|
|
from llama_index.core.schema import Document |
|
|
from llama_index.core.tools import FunctionTool |
|
|
from llama_index.embeddings.openai import OpenAIEmbedding |
|
|
from llama_index.llms.openai import OpenAI |
|
|
from llama_index.tools.tavily_research import TavilyToolSpec |
|
|
from llama_index.tools.wikipedia import WikipediaToolSpec |
|
|
from llama_index.readers.youtube_transcript import YoutubeTranscriptReader |
|
|
|
|
|
# Configure the root logger once at import time; INFO level keeps tool-call
# traces visible without debug noise.
logging.basicConfig(level=logging.INFO)


# Module-level logger, per stdlib convention.
logger = logging.getLogger(__name__)




# Global llama_index defaults used by every index/query engine in this module:
# gpt-4o at low temperature for focused answers, plus a small OpenAI embedding
# model for vector indexing. NOTE(review): these are process-wide globals and
# are re-assigned elsewhere in this file as a side effect — confirm that is
# intentional.
Settings.llm = OpenAI(model="gpt-4o", temperature=0.1)


Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
|
|
|
|
|
|
|
|
|
|
|
async def fetch_and_process(
    urls: List[str], session: aiohttp.ClientSession, timeout: int = 10
) -> List[Document]:
    """Fetch webpages concurrently and convert them to Document objects.

    Args:
        urls: Web page URLs to download.
        session: An open aiohttp client session shared by all requests.
        timeout: Total per-request timeout in seconds.

    Returns:
        One Document per successfully fetched URL (failed fetches are
        skipped), with the HTML converted to text via html2text and the
        final (post-redirect) URL used as the document id.
    """

    # Build the ClientTimeout once; passing a bare number to session.get()
    # is deprecated in aiohttp.
    request_timeout = aiohttp.ClientTimeout(total=timeout)

    async def fetch(url: str) -> Dict[str, str]:
        # Best-effort fetch: on timeout or client error, log a warning and
        # return an empty payload so one bad URL cannot fail the whole batch.
        try:
            async with session.get(url, timeout=request_timeout) as response:
                return {"text": await response.text(), "url": str(response.url)}
        except (asyncio.TimeoutError, aiohttp.ClientError) as e:
            # Use the module logger (not the root logger) for consistency
            # with the rest of the file.
            logger.warning(f"Could not fetch {url}: {repr(e)}")
            return {"text": "", "url": url}

    responses = await asyncio.gather(*(fetch(url) for url in urls))

    # Drop failed fetches (empty text) and wrap the rest as Documents.
    return [
        Document(text=html2text.html2text(resp["text"]), id_=resp["url"])
        for resp in responses
        if resp["text"]
    ]
|
|
|
|
|
|
|
|
async def summarize_websites(urls: List[str], query: str) -> List[str]:
    """Summarize a query from content across multiple websites. Even if there is only one website, it will still be used.

    Args:
        urls: A list of URLs to summarize.
        query: The query to summarize.
    Returns:
        A list of summaries, one per successfully fetched URL, each prefixed
        with its source URL.
    """

    # Use the module logger rather than the root logger for consistency.
    logger.info(f"Summarizing {len(urls)} websites for query: {query}")

    # NOTE(review): this mutates the process-wide llama_index Settings as a
    # side effect (switching to the cheaper gpt-4o-mini for summarization)
    # and never restores the previous model — confirm callers expect the
    # global LLM to remain changed afterwards.
    Settings.llm = OpenAI(model="gpt-4o-mini")
    Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")

    summaries: List[str] = []

    async with aiohttp.ClientSession() as session:
        documents = await fetch_and_process(urls, session)
        for doc in documents:
            # One single-document index per page keeps each summary scoped
            # to its own source.
            index = VectorStoreIndex.from_documents([doc])
            result = index.as_query_engine().query(
                f"Summarize in very meticulous detail. {query}"
            )
            summaries.append(f"Source: {doc.id_} \nContent: {result.response}")

    return summaries
|
|
|
|
|
|
|
|
def tavily_search(query: str, max_results: Optional[int] = 10) -> List[Dict]:
    """
    Tavily search with result formatting.
    Args:
        query: The query to search for.
        max_results: The maximum number of results to return.
    Returns:
        results: A list of dictionaries containing the results as URLS. Need to be used with the summarize_websites tool.
    """
    logger.info(f"Called tavily_search for: {query}")

    try:
        # API key comes from the environment; TavilyToolSpec handles auth.
        engine = TavilyToolSpec(api_key=os.getenv("TAVILY_API_KEY"))
        hits = engine.search(query, max_results=max_results)

        # Reshape each hit into a url/content pair for the downstream
        # summarize_websites tool.
        return [
            {
                "url": hit.metadata.get("url", ""),
                "content": f"Title: {hit.metadata.get('title', '')}\nContent: {hit.text}",
            }
            for hit in hits
        ]
    except Exception as e:
        # Tool boundary: log and surface the failure as data instead of
        # raising into the agent loop.
        logger.error(f"Tavily search failed: {str(e)}")
        return [{"error": f"Search failed: {str(e)}"}]
|
|
|
|
|
|
|
|
def search_wikipedia(query: str, language: str = "en") -> str:
    """
    Search Wikipedia for specific information. This is a more efficient way to search Wikipedia than the tavily_search tool. Need to be used with the summarize_websites tool.
    Args:
        query: The search query
        language: Wikipedia language code (default: "en")
    Returns:
        str: Wikipedia content summary
    """
    logger.info(f"Searching Wikipedia for: {query}")

    try:
        wikipedia_tool = WikipediaToolSpec()
        # Bug fix: WikipediaToolSpec.search_data's keyword is `lang`, not
        # `language`; the old call raised TypeError, which the broad except
        # swallowed, so the tool always returned a failure string.
        return wikipedia_tool.search_data(query, lang=language)
    except Exception as e:
        # Tool boundary: log and return the error as text instead of raising
        # into the agent loop.
        logger.error(f"Wikipedia search failed: {str(e)}")
        return f"Wikipedia search failed: {str(e)}"
|
|
|
|
|
def transcribe_youtube_video(video_url: str) -> str:
    """Transcribe a YouTube video.

    Args:
        video_url: The URL of the YouTube video to transcribe.

    Returns:
        The transcript text of the video.
    """

    reader = YoutubeTranscriptReader()
    # Bug fix: load_data expects a *list* of links (ytlinks=[...]), not a
    # bare string, and returns a list of Documents.
    documents = reader.load_data(ytlinks=[video_url])

    # Join the per-document transcripts so the annotated return type (str)
    # actually holds; the old code returned the raw Document list.
    return "\n".join(doc.text for doc in documents)
|
|
|
|
|
def get_web_tools():
    """Return all available tools for the agent."""
    tool_fns = (
        summarize_websites,
        tavily_search,
        search_wikipedia,
        transcribe_youtube_video,
    )
    # Wrap each plain function as a llama_index FunctionTool.
    return [FunctionTool.from_defaults(fn) for fn in tool_fns]