File size: 4,983 Bytes
c3b34bf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
import asyncio
import logging
import os
from typing import Any, Dict, List, Optional
import aiohttp
import html2text
from griffe import json_decoder
from llama_index.core import Settings, VectorStoreIndex
from llama_index.core.schema import Document
from llama_index.core.tools import FunctionTool
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.tools.tavily_research import TavilyToolSpec
from llama_index.tools.wikipedia import WikipediaToolSpec
from llama_index.readers.youtube_transcript import YoutubeTranscriptReader
# Configure logging
# Root-level basicConfig runs at import time; the module logger below is what
# the tool functions should use for their own messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize OpenAI settings
# Global llama_index defaults: gpt-4o at low temperature, with the small
# OpenAI embedding model for vector indexing.
# NOTE(review): summarize_websites() reassigns Settings.llm to gpt-4o-mini at
# call time, overriding this module-level default — confirm that is intentional.
Settings.llm = OpenAI(model="gpt-4o", temperature=0.1)
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
async def fetch_and_process(
    urls: List[str], session: aiohttp.ClientSession, timeout: int = 10
) -> List[Document]:
    """Fetch webpages concurrently and convert each to a markdown Document.

    Args:
        urls: Web page URLs to download.
        session: Shared aiohttp client session used for all requests.
        timeout: Per-request timeout in seconds.

    Returns:
        One Document per successfully fetched URL — failures are logged and
        skipped — with the HTML converted to markdown via html2text and the
        final (post-redirect) URL used as the document id.
    """

    async def fetch(url: str) -> Dict[str, str]:
        # Return empty text on failure so the caller can filter it out,
        # instead of one bad URL failing the whole gather().
        try:
            async with session.get(url, timeout=timeout) as response:
                return {"text": await response.text(), "url": str(response.url)}
        except (asyncio.TimeoutError, aiohttp.ClientError) as e:
            # Fix: use the module logger (not the root logger) with lazy
            # %-style args, consistent with the rest of the file.
            logger.warning("Could not fetch %s: %r", url, e)
            return {"text": "", "url": url}

    responses = await asyncio.gather(*(fetch(url) for url in urls))
    return [
        Document(text=html2text.html2text(resp["text"]), id_=resp["url"])
        for resp in responses
        if resp["text"]
    ]
async def summarize_websites(urls: List[str], query: str) -> List[str]:
    """Summarize a query from content across multiple websites. Even if there
    is only one website, it will still be used.

    Args:
        urls: A list of URLs to summarize.
        query: The query to summarize.

    Returns:
        A list of "Source: <url> \nContent: <summary>" strings, one per URL
        that was fetched successfully.
    """
    # Fix: use the module logger with lazy %-args (original called the root
    # logger via logging.info with an f-string).
    logger.info("Summarizing %d websites for query: %s", len(urls), query)
    # NOTE(review): this mutates the *global* Settings, silently downgrading
    # the module-level gpt-4o default to gpt-4o-mini for all later callers.
    # Kept for backward compatibility — confirm this is intentional.
    Settings.llm = OpenAI(model="gpt-4o-mini")
    Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
    summaries: List[str] = []
    async with aiohttp.ClientSession() as session:
        documents = await fetch_and_process(urls, session)
        for doc in documents:
            # One single-document index per page keeps each summary
            # attributable to its source URL.
            index = VectorStoreIndex.from_documents([doc])
            result = index.as_query_engine().query(
                f"Summarize in very meticulous detail. {query}"
            )
            summaries.append(f"Source: {doc.id_} \nContent: {result.response}")
    return summaries
def tavily_search(query: str, max_results: int = 10) -> List[Dict]:
    """
    Tavily search with result formatting.

    Args:
        query: The query to search for.
        max_results: The maximum number of results to return.

    Returns:
        results: A list of dictionaries containing the results as URLS. Need to be used with the summarize_websites tool.
    """
    logger.info("Called tavily_search for: %s", query)
    try:
        search_engine = TavilyToolSpec(api_key=os.getenv("TAVILY_API_KEY"))
        search_results = search_engine.search(query, max_results=max_results)
        # Fix: annotation was Optional[int] although the default is 10, never
        # None. Build the formatted result list in one comprehension.
        return [
            {
                "url": document.metadata.get("url", ""),
                "content": f"Title: {document.metadata.get('title', '')}\nContent: {document.text}",
            }
            for document in search_results
        ]
    except Exception as e:
        # Tool boundary: report the failure to the agent instead of raising.
        logger.error("Tavily search failed: %s", e)
        return [{"error": f"Search failed: {str(e)}"}]
def search_wikipedia(query: str, language: str = "en") -> str:
    """
    Search Wikipedia for specific information. This is a more efficient way to search Wikipedia than the tavily_search tool. Need to be used with the summarize_websites tool.

    Args:
        query: The search query
        language: Wikipedia language code (default: "en")

    Returns:
        str: Wikipedia content summary
    """
    # Fix: lazy %-args instead of f-strings in logging calls.
    logger.info("Searching Wikipedia for: %s", query)
    try:
        wikipedia_tool = WikipediaToolSpec()
        # NOTE(review): current llama_index WikipediaToolSpec.search_data takes
        # the language code as `lang`, not `language` — if that is the installed
        # version, this call raises TypeError and falls into the except branch.
        # Verify against the installed package before changing the kwarg.
        return wikipedia_tool.search_data(query, language=language)
    except Exception as e:
        # Tool boundary: return the error text to the agent instead of raising.
        logger.error("Wikipedia search failed: %s", e)
        return f"Wikipedia search failed: {str(e)}"
def transcribe_youtube_video(video_url: str) -> str:
    """Transcribe a YouTube video and return the transcript text.

    Args:
        video_url: Full URL of the YouTube video.

    Returns:
        The transcript text (documents joined with newlines), or an error
        message string if transcription fails — matching the error-reporting
        style of the other tools in this module.
    """
    logger.info("Transcribing YouTube video: %s", video_url)
    try:
        reader = YoutubeTranscriptReader()
        # Fix: load_data expects a *list* of links and returns a list of
        # Document objects; the original passed a bare string and returned the
        # raw list despite the declared `-> str` return type.
        documents = reader.load_data(ytlinks=[video_url])
        return "\n".join(doc.text for doc in documents)
    except Exception as e:
        # Tool boundary: report the failure to the agent instead of raising,
        # consistent with search_wikipedia / tavily_search.
        logger.error("YouTube transcription failed: %s", e)
        return f"YouTube transcription failed: {str(e)}"
def get_web_tools():
    """Return all available tools for the agent."""
    # Wrap each tool function in a FunctionTool; order is preserved.
    tool_fns = (
        summarize_websites,
        tavily_search,
        search_wikipedia,
        transcribe_youtube_video,
    )
    return [FunctionTool.from_defaults(fn) for fn in tool_fns]