import os, sys
from dotenv import load_dotenv
import requests
import pandas as pd
import base64

from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.tools import WikipediaQueryRun
from langchain_community.document_loaders import WikipediaLoader
import wikipedia
from langchain_tavily import TavilySearch
from langchain_community.document_loaders import ArxivLoader
from langchain_community.document_loaders import YoutubeLoader
from langchain_core.tools import tool
from langchain.tools import Tool
from langchain_core.messages import HumanMessage

# to handle execution of Python code in a subprocess
import subprocess


DATASET_API_URL = 'https://agents-course-unit4-scoring.hf.space'

load_dotenv()
# fall back to defaults if the variables are missing from .env (the default values here are editorial assumptions)
WIKIPEDIA_TOP_K_RESULTS = int(os.environ.get("WIKIPEDIA_TOP_K_RESULTS", "3"))
WIKIPEDIA_DOC_CONTENT_CHARS_MAX = int(os.environ.get("WIKIPEDIA_DOC_CONTENT_CHARS_MAX", "10000"))


def get_search_tool():

    search_tool = DuckDuckGoSearchRun()

    return search_tool


def get_tavily_search_tool():

    tavily_search_tool = TavilySearch(
        max_results=3,
        topic="general",
        # include_answer=False,
        # include_raw_content=False,
        # include_images=False,
        # include_image_descriptions=False,
        # search_depth="basic",
        # time_range="day",
        # include_domains=None,
        # exclude_domains=None
    )

    return tavily_search_tool
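

# Usage sketch (illustrative, not part of the original module). TavilySearch
# reads the TAVILY_API_KEY environment variable, so this assumes that key is
# set in .env; the query string is hypothetical.
def _example_tavily_usage():
    tavily_tool = get_tavily_search_tool()
    # LangChain tools are invoked via .invoke(); TavilySearch takes a query dict
    results = tavily_tool.invoke({"query": "LangChain ReAct agents"})
    print(results)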


# Wikipedia tool 1: uses WikipediaQueryRun from the 'langchain_community.tools' package
# issue: it seems to fetch only article summaries
def get_wikipedia_tool():

    #print("WIKIPEDIA_TOP_K_RESULTS:{}, WIKIPEDIA_DOC_CONTENT_CHARS_MAX:{}".format(WIKIPEDIA_TOP_K_RESULTS, WIKIPEDIA_DOC_CONTENT_CHARS_MAX))

    # creates an instance of the Wikipedia API wrapper. top_k_results=1 means it will only fetch the top result from Wikipedia
    wikipedia_api_wrapper = WikipediaAPIWrapper(top_k_results=WIKIPEDIA_TOP_K_RESULTS, doc_content_chars_max=WIKIPEDIA_DOC_CONTENT_CHARS_MAX)

    # converts the WikipediaAPIWrapper into a LangChain tool.
    wikipedia_tool = WikipediaQueryRun(api_wrapper=wikipedia_api_wrapper)

    return wikipedia_tool


# Wikipedia tool 2: uses the 'wikipedia' package directly
@tool
def wikipedia_search(query: str) -> str:
    """
    Search Wikipedia and return the full content of the most relevant article.
    """
    try:
        results = wikipedia.search(query)
        if not results:
            return f"No results found for '{query}'."

        page = wikipedia.page(results[0])
        content = page.content

        # Truncate content if it's too long
        if len(content) > WIKIPEDIA_DOC_CONTENT_CHARS_MAX:
            content = content[:WIKIPEDIA_DOC_CONTENT_CHARS_MAX] + "..."

        return content

    except wikipedia.exceptions.DisambiguationError as e:
        return f"Ambiguous query. Possible options: {', '.join(e.options[:5])}..."
    except wikipedia.exceptions.PageError:
        return f"Page not found for '{query}'."
    except Exception as e:
        return f"Error occurred: {str(e)}"


# Wikipedia tool 3: uses WikipediaLoader from the 'langchain_community.document_loaders' package
@tool
def wikipedia_search_3(query: str) -> dict:
    """
    Search Wikipedia and return the full content of the most relevant articles.
    Args:
        query: The search query.
    Returns:
        A dict with a single "wiki_results" key holding the formatted articles.
    """
    search_docs = WikipediaLoader(query=query,
                                  load_max_docs=WIKIPEDIA_TOP_K_RESULTS,
                                  doc_content_chars_max=WIKIPEDIA_DOC_CONTENT_CHARS_MAX,
                                  load_all_available_meta=True).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ])
    return {"wiki_results": formatted_search_docs}



@tool
def execute_python_code_from_file(file_path: str) -> str:
    """
    Reads a Python file from the given path, executes its code, and returns the combined stdout and stderr.
    WARNING: Executing arbitrary code from files is a significant security risk.
    Only use this tool with trusted code in a controlled environment.
    """
    if not os.path.exists(file_path):
        return f"Error: File not found at '{file_path}'."
    
    if not file_path.endswith(".py"):
        return f"Error: Provided file '{file_path}' is not a Python (.py) file."

    try:
        # Use subprocess to run the Python file in a new process.
        # This provides some isolation compared to 'exec()' but is still dangerous for untrusted code.
        result = subprocess.run(
            [sys.executable, file_path], # sys.executable ensures it uses the current Python interpreter
            capture_output=True,         # Capture stdout and stderr
            text=True,                   # Capture output as text (strings)
            check=False                  # Do not raise an exception for non-zero exit codes (handle errors manually)
        )

        stdout_output = result.stdout.strip()
        stderr_output = result.stderr.strip()
        
        output_lines = []
        if stdout_output:
            output_lines.append(f"STDOUT:\n{stdout_output}")
        if stderr_output:
            output_lines.append(f"STDERR:\n{stderr_output}")
        
        if result.returncode != 0:
            output_lines.append(f"Process exited with code {result.returncode}. This usually indicates an error.")
        
        if not output_lines:
            return "Execution completed with no output."

        return "\n".join(output_lines)

    except Exception as e:
        return f"An unexpected error occurred during code execution: {e}"


@tool
def download_taskid_file(task_id: str, file_name: str) -> str:
    """
    Downloads the file associated with the given task_id (if any), saving it locally as file_name.
    Returns the absolute path of the downloaded file, or an error message on failure.
    """
    try:
        response = requests.get(f"{DATASET_API_URL}/files/{task_id}", timeout=20)
        response.raise_for_status()
        with open(file_name, 'wb') as file:
            file.write(response.content)
        return os.path.abspath(file_name)
    except Exception as e:
        return "Error occurred: {}".format(e)


@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyzes an Excel (.xlsx) file using pandas.
    Loads the specified Excel file into a pandas DataFrame and executes a Python query against it.
    The query should be a valid pandas DataFrame operation (e.g., df.head(), df.describe(),
    df[df['column_name'] > 10], df.groupby('category')['value'].mean()).
    Returns the result of the query as a string (JSON or string representation).
    """
    if not os.path.exists(file_path):
        return f"Error: File not found at {file_path}"
    
    try:
        df = pd.read_excel(file_path)
        
        # Make the DataFrame accessible for the query
        local_vars = {"df": df}
        
        # Execute the query
        # IMPORTANT: Be extremely cautious with eval/exec for user-provided input in a production system.
        # For a ReAct agent, the LLM generates this query, so it's generally safer
        # if the LLM is well-constrained and reliable.
        # For sensitive applications, consider a safer parsing mechanism or a restricted set of operations.
        result = eval(query, {}, local_vars)
        
        return str(result) # Convert result to string for the LLM
    except Exception as e:
        return f"Error analyzing Excel file: {e}"


def get_analyze_mp3_tool(llm):

    @tool
    def analyze_mp3_file(audio_path: str) -> str:
        """
        Extract text from an mp3 audio file.
        """
        all_text = ""
        try:
            # Read audio and encode as base64
            with open(audio_path, "rb") as audio_file:
                audio_bytes = audio_file.read()

            audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")

            # Determine the MIME type for MP3
            audio_mime_type = "audio/mpeg" # Or "audio/mp3", "audio/wav" etc. for other formats

            # Prepare the prompt including the base64 audio data
            message = [
                HumanMessage(
                    content=[
                        {
                            "type": "text",
                            "text": (
                                "Extract all the text from this audio. "
                                "Return only the extracted text, no explanations."
                            ),
                        },
                        {
                            "type": "media",              # Gemini-style media content part
                            "data": audio_base64,         # base64-encoded audio payload
                            "mime_type": audio_mime_type, # tells the model how to decode the payload
                        }
                    ]
                )
            ]

            # Call the multimodal (audio-capable) model
            response = llm.invoke(message)

            # Append extracted text
            all_text += response.content + "\n\n"

            return all_text.strip()
        except Exception as e:
            print("Error extracting text from audio file:{} - {}".format(audio_path, e))
            return ""
    
    return analyze_mp3_file
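

# Usage sketch (illustrative): bind the tool to a multimodal chat model.
# ChatGoogleGenerativeAI and the model name are assumptions; any LangChain chat
# model that understands base64 "media" content parts should work here.
def _example_mp3_tool():
    from langchain_google_genai import ChatGoogleGenerativeAI
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
    mp3_tool = get_analyze_mp3_tool(llm)
    print(mp3_tool.invoke({"audio_path": "question.mp3"}))  # hypothetical file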


def get_analyze_image_tool(llm):
    @tool
    def analyze_png_image(image_path: str) -> str:
        """
        Analyzes a PNG image and returns a detailed description of its content.
        This tool requires an LLM capable of processing images, such as Gemini 1.5 Pro or Gemini 2.0 Flash.
        """
        try:
            # Read image and encode as base64
            with open(image_path, "rb") as image_file:
                image_bytes = image_file.read()

            image_base64 = base64.b64encode(image_bytes).decode("utf-8")

            # Prepare the prompt including the base64 image data
            message = [
                HumanMessage(
                    content=[
                        {
                            "type": "text",
                            "text": (
                                "Provide a very detailed description of the content of this image. "
                                "Focus on objects, people, actions, text, and overall scene context. "
                                "Be as comprehensive as possible."
                            ),
                        },
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{image_base64}"},
                        },
                    ]
                )
            ]

            # Call the vision-capable model
            response = llm.invoke(message)

            return response.content.strip()
        except Exception as e:
            print("Error analyzing image file:{} - {}".format(image_path, e))
            return ""
    
    return analyze_png_image
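

# Note (editorial): analyze_png_image uses the OpenAI-style "image_url" content
# part with a data URI, while analyze_mp3_file above uses the Gemini-style
# "media" part. Which format a given chat model accepts depends on the
# provider, so the two tools are not interchangeable across every LLM backend.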


@tool
def arxiv_search(query: str) -> dict:
    """Search Arxiv for a query and return at most 3 results.
    Args:
        query: The search query.
    Returns:
        A dict with a single "arxiv_results" key holding the formatted results."""
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata.get("source", doc.metadata.get("Title", ""))}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
            for doc in search_docs
        ]
    )
    return {"arxiv_results": formatted_search_docs}


@tool
def get_youtube_transcript(url: str) -> dict:
    """Fetches the transcript from a YouTube video URL.

    Args:
        url: The URL of the YouTube video.

    Returns:
        A dictionary with keys:
            "transcript": the video transcript, or an error message (string).
            "metadata": video title and other information when available, otherwise empty (dictionary).
    """
    try:
        loader = YoutubeLoader.from_youtube_url(url, add_video_info=True)
        docs = loader.load()

        # Combine all transcript chunks into a single string
        transcript = "\n".join(doc.page_content for doc in docs)
        metadata = docs[0].metadata if docs else {}

        return {"transcript": transcript, "metadata": metadata}
    except Exception as e:
        if "Could not retrieve transcript" in str(e):
            return {"transcript": "No transcript available for this video.", "metadata": {}}
        else:
            return {"transcript": f"Error fetching transcript: {e}", "metadata": {}}