File size: 6,661 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import requests
import os
from typing import Dict, Any, Optional, List
from .search_base import SearchBase
from .tool import Tool,Toolkit
from evoagentx.core.logging import logger
import dotenv
dotenv.load_dotenv()

class SearchGoogle(SearchBase):
    """Search tool backed by the Google Custom Search JSON API.

    Credentials are read from the ``GOOGLE_API_KEY`` and
    ``GOOGLE_SEARCH_ENGINE_ID`` environment variables each time
    :meth:`search` is called.
    """

    def __init__(
        self, 
        name: str = 'SearchGoogle',
        num_search_pages: Optional[int] = 5, 
        max_content_words: Optional[int] = None,
        **kwargs
    ):
        """
        Initialize the Google Search tool.
        
        Args:
            name (str): The name of the search tool
            num_search_pages (int): Number of search results to retrieve
            max_content_words (int, optional): Maximum number of words to include in content, None means no limit
            **kwargs: Additional data to pass to the parent class
        """
        # Pass these to the parent class initialization
        super().__init__(name=name, num_search_pages=num_search_pages, max_content_words=max_content_words, **kwargs)
    
    def search(self, query: str, num_search_pages: int = None, max_content_words: int = None) -> Dict[str, Any]:
        """
        Search Google using the Custom Search API and retrieve page content for each hit.
        
        Args:
            query (str): The search query to execute on Google
            num_search_pages (int): Number of search results to retrieve. Falls back to the
                instance default when omitted. The Custom Search API accepts at most 10
                results per request, so larger values are clamped to 10.
            max_content_words (int): Maximum number of words to include in content, None means no limit.
                Falls back to the instance default when omitted.
            
        Returns:
            Dict[str, Any]: ``{"results": [...], "error": <message or None>}``. Each result is a
            dict with ``title``, ``content`` and ``url`` keys. This method does not raise for
            request or API failures; they are reported through the ``error`` entry.
        """
        # Per-request limits: the API rejects ``num`` outside 1..10, and without a
        # timeout ``requests`` would wait indefinitely on a stalled connection.
        max_api_results = 10
        request_timeout = 10  # seconds

        num_search_pages = num_search_pages or self.num_search_pages
        max_content_words = max_content_words or self.max_content_words
        results = []
        
        # Get API credentials from environment variables
        api_key = os.getenv('GOOGLE_API_KEY', '')
        search_engine_id = os.getenv('GOOGLE_SEARCH_ENGINE_ID', '')
            
        if not api_key or not search_engine_id:
            error_msg = (
                "API key and search engine ID are required. "
                "Please set GOOGLE_API_KEY and GOOGLE_SEARCH_ENGINE_ID environment variables. "
                "You can get these from the Google Cloud Console: https://console.cloud.google.com/apis/"
            )
            logger.error(error_msg)
            return {"results": [], "error": error_msg}
        
        try:
            # Keep the requested count inside the range the API accepts.
            if num_search_pages is not None and num_search_pages > max_api_results:
                logger.warning(
                    f"Requested {num_search_pages} results but the Custom Search API allows at most "
                    f"{max_api_results} per request; using {max_api_results}."
                )
                num_search_pages = max_api_results

            # Step 1: Query Google Custom Search API
            logger.info(f"Searching Google for: {query}, num_results={num_search_pages}, max_content_words={max_content_words}")
            search_url = "https://www.googleapis.com/customsearch/v1"
            params = {
                "key": api_key,
                "cx": search_engine_id,
                "q": query,
                "num": num_search_pages,
            }
            response = requests.get(search_url, params=params, timeout=request_timeout)
            data = response.json()

            # Surface API-level failures (bad key, quota, invalid parameter) instead of
            # reporting them as an empty result set.
            if "error" in data:
                api_error = data["error"]
                detail = api_error.get("message") if isinstance(api_error, dict) else api_error
                error_msg = f"Google Custom Search API error: {detail}"
                logger.error(error_msg)
                return {"results": [], "error": error_msg}

            if "items" not in data:
                return {"results": [], "error": "No search results found."}

            search_results = data["items"]
            logger.info(f"Found {len(search_results)} search results")

            # Step 2: Fetch content from each valid search result
            for item in search_results:
                url = item.get("link")
                if not url:
                    # A single malformed item must not abort the whole search.
                    logger.warning("Skipping search result without a link")
                    continue
                try:
                    scraped_title, content = self._scrape_page(url)
                    if content:  # Ensure valid content exists
                        # Use the base class's content truncation method
                        display_content = self._truncate_content(content, max_content_words)
                        
                        results.append({
                            # Prefer the scraped title; fall back to the title reported by the API.
                            "title": scraped_title or item.get("title", ""),
                            "content": display_content,
                            "url": url,
                        })
                except Exception as e:
                    logger.warning(f"Error processing URL {url}: {str(e)}")
                    continue  # Skip pages that cannot be processed

            return {"results": results, "error": None}

        except Exception as e:
            # Transport errors can embed the full request URL, which carries the API key
            # as a query parameter; redact it before logging or returning the message.
            error_msg = str(e).replace(api_key, "***")
            logger.error(f"Error searching Google: {error_msg}")
            return {"results": [], "error": error_msg}


class GoogleSearchTool(Tool):
    # Tool wrapper exposing a SearchGoogle instance under the name "google_search".
    # The class attributes below describe the tool's name, purpose and accepted
    # inputs; only "query" is mandatory.
    name: str = "google_search"
    description: str = "Search Google using the Custom Search API and retrieve content from search results"
    inputs: Dict[str, Dict[str, str]] = {
        "query": {
            "type": "string",
            "description": "The search query to execute on Google",
        },
        "num_search_pages": {
            "type": "integer",
            "description": "Number of search results to retrieve. Default: 5",
        },
        "max_content_words": {
            "type": "integer",
            "description": "Maximum number of words to include in content per result. None means no limit. Default: None",
        },
    }
    required: Optional[List[str]] = ["query"]

    def __init__(self, search_google: SearchGoogle = None):
        """Store the search backend that calls to this tool are delegated to.

        Args:
            search_google: The SearchGoogle instance to use; may be None, in
                which case calling the tool raises RuntimeError.
        """
        super().__init__()
        self.search_google = search_google

    def __call__(self, query: str, num_search_pages: int = None, max_content_words: int = None) -> Dict[str, Any]:
        """Run a Google search through the stored SearchGoogle instance.

        Args:
            query: The search query.
            num_search_pages: Number of results to retrieve, forwarded as-is.
            max_content_words: Per-result word limit, forwarded as-is.

        Returns:
            Whatever the backend's ``search`` returns, or a dict with an empty
            ``results`` list and an ``error`` message if that call raises.

        Raises:
            RuntimeError: If no search backend is set on this tool.
        """
        backend = self.search_google
        if not backend:
            raise RuntimeError("Google search instance not initialized")

        try:
            outcome = backend.search(query, num_search_pages, max_content_words)
        except Exception as exc:
            # Report the failure in the same dict shape as a normal response.
            return {"results": [], "error": f"Error executing Google search: {str(exc)}"}
        return outcome


class GoogleSearchToolkit(Toolkit):
    # Toolkit bundling a single GoogleSearchTool around one shared SearchGoogle instance.

    def __init__(
        self,
        name: str = "GoogleSearchToolkit",
        num_search_pages: Optional[int] = 5,
        max_content_words: Optional[int] = None,
        **kwargs
    ):
        """Build the search backend and register the tool that uses it.

        Args:
            name: Name given to the toolkit.
            num_search_pages: Default number of results, passed to SearchGoogle.
            max_content_words: Default per-result word limit, passed to SearchGoogle.
            **kwargs: Extra keyword arguments forwarded to SearchGoogle.
        """
        # One backend instance, shared by the tool and kept on the toolkit.
        shared_engine = SearchGoogle(
            name="SearchGoogle",
            num_search_pages=num_search_pages,
            max_content_words=max_content_words,
            **kwargs
        )

        # The parent is initialized before the backend is attached as an attribute.
        super().__init__(
            name=name,
            tools=[GoogleSearchTool(search_google=shared_engine)],
        )

        self.search_google = shared_engine