File size: 4,983 Bytes
c3b34bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import asyncio
import logging
import os
from typing import Any, Dict, List, Optional

import aiohttp
import html2text
from griffe import json_decoder
from llama_index.core import Settings, VectorStoreIndex
from llama_index.core.schema import Document
from llama_index.core.tools import FunctionTool
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.tools.tavily_research import TavilyToolSpec
from llama_index.tools.wikipedia import WikipediaToolSpec
from llama_index.readers.youtube_transcript import YoutubeTranscriptReader
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)  # module-level logger, preferred over root `logging` calls

# Initialize OpenAI settings
# NOTE(review): these are process-wide llama_index globals — every index/query
# in this process inherits them unless overridden locally.
Settings.llm = OpenAI(model="gpt-4o", temperature=0.1)
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")



async def fetch_and_process(
    urls: List[str], session: aiohttp.ClientSession, timeout: int = 10
) -> List[Document]:
    """Fetch and convert webpages to Document objects concurrently.

    Args:
        urls: Web page URLs to download.
        session: Shared aiohttp client session used for all requests.
        timeout: Total per-request timeout in seconds.

    Returns:
        One Document per successfully fetched, non-empty page; unreachable
        or empty pages are dropped (a warning is logged for failures).
    """
    # aiohttp expects a ClientTimeout object; passing a bare int is deprecated.
    request_timeout = aiohttp.ClientTimeout(total=timeout)

    async def fetch(url: str) -> Dict[str, str]:
        """Download one URL; on failure return empty text instead of raising."""
        try:
            async with session.get(url, timeout=request_timeout) as response:
                # Use the final (post-redirect) URL as the document id.
                return {"text": await response.text(), "url": str(response.url)}
        except (asyncio.TimeoutError, aiohttp.ClientError) as e:
            # Best-effort: skip unreachable pages rather than failing the batch.
            logger.warning("Could not fetch %s: %r", url, e)
            return {"text": "", "url": url}

    responses = await asyncio.gather(*(fetch(url) for url in urls))

    # Convert HTML to plain text; keep only pages that returned content.
    return [
        Document(text=html2text.html2text(resp["text"]), id_=resp["url"])
        for resp in responses
        if resp["text"]
    ]


async def summarize_websites(urls: List[str], query: str) -> List[str]:
    """Summarize a query from content across multiple websites. Even if there is only one website, it will still be used.

    Args:
        urls: A list of URLs to summarize.
        query: The query to summarize.
    Returns:
        A list of summaries, one per successfully fetched URL, each prefixed
        with its source URL.
    """
    logger.info("Summarizing %d websites for query: %s", len(urls), query)

    # Use a cheaper model for per-page summarization, passed locally to the
    # query engine. The previous version overwrote the global Settings.llm
    # (downgrading the module-wide gpt-4o for the whole process) as a hidden
    # side effect; embeddings still come from the module-level Settings.
    summarizer_llm = OpenAI(model="gpt-4o-mini")

    summaries: List[str] = []

    async with aiohttp.ClientSession() as session:
        documents = await fetch_and_process(urls, session)
        for doc in documents:
            # One index per document so each summary is attributable to its source.
            index = VectorStoreIndex.from_documents([doc])
            result = index.as_query_engine(llm=summarizer_llm).query(
                f"Summarize in very meticulous detail. {query}"
            )
            summaries.append(f"Source: {doc.id_} \nContent: {result.response}")

    return summaries


def tavily_search(query: str, max_results: int = 10) -> List[Dict]:
    """
    Tavily search with result formatting.
    Args:
        query: The query to search for.
        max_results: The maximum number of results to return.
    Returns:
        results: A list of dictionaries containing the results as URLS. Need to be used with the summarize_websites tool.
    """
    # Lazy %-style args avoid formatting when INFO is disabled.
    logger.info("Called tavily_search for: %s", query)

    try:
        # API key is read from the environment; TavilyToolSpec raises if missing.
        search_engine = TavilyToolSpec(api_key=os.getenv("TAVILY_API_KEY"))
        search_results = search_engine.search(query, max_results=max_results)

        # Flatten each result Document into a url + titled-content dict.
        return [
            {
                "url": document.metadata.get("url", ""),
                "content": f"Title: {document.metadata.get('title', '')}\nContent: {document.text}",
            }
            for document in search_results
        ]
    except Exception as e:
        # Boundary handler: surface the failure to the agent as data, not a crash.
        logger.error(f"Tavily search failed: {str(e)}")
        return [{"error": f"Search failed: {str(e)}"}]


def search_wikipedia(query: str, language: str = "en") -> str:
    """
    Search Wikipedia for specific information. This is a more efficient way to search Wikipedia than the tavily_search tool.  Need to be used with the summarize_websites tool.
    Args:
        query: The search query
        language: Wikipedia language code (default: "en")
    Returns:
        str: Wikipedia content summary
    """
    logger.info(f"Searching Wikipedia for: {query}")

    try:
        # Delegate the lookup to the llama_index Wikipedia tool spec.
        return WikipediaToolSpec().search_data(query, language=language)
    except Exception as e:
        # Boundary handler: report the failure as the tool's string result.
        logger.error(f"Wikipedia search failed: {str(e)}")
        return f"Wikipedia search failed: {str(e)}"
    
def transcribe_youtube_video(video_url: str) -> str:
    """Transcribe a YouTube video.

    Args:
        video_url: Full URL of the YouTube video.

    Returns:
        The transcript text, with chunks joined by newlines. Empty string if
        no transcript documents were returned.
    """
    reader = YoutubeTranscriptReader()
    # load_data expects a *list* of links (ytlinks); the previous version
    # passed a bare string, which the reader would iterate char-by-char.
    documents = reader.load_data(ytlinks=[video_url])
    # load_data returns Document objects; join their text so the declared
    # `-> str` return type actually holds.
    return "\n".join(doc.text for doc in documents)

def get_web_tools():
    """Return all available tools for the agent."""
    # Wrap each tool function with FunctionTool so the agent can invoke it.
    tool_functions = (
        summarize_websites,
        tavily_search,
        search_wikipedia,
        transcribe_youtube_video,
    )
    return [FunctionTool.from_defaults(fn) for fn in tool_functions]