|
|
import os |
|
|
import openai |
|
|
import wikipedia |
|
|
from tavily import TavilyClient |
|
|
import arxiv |
|
|
import json |
|
|
import re |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY") |
|
|
|
|
|
if not os.getenv("OPENAI_API_KEY"): |
|
|
print("ATTENZIONE: La variabile d'ambiente OPENAI_API_KEY non è impostata.") |
|
|
print("L'agente potrebbe non funzionare correttamente senza una chiave API OpenAI valida.") |
|
|
|
|
|
if not TAVILY_API_KEY: |
|
|
print("ATTENZIONE: La variabile d'ambiente TAVILY_API_KEY non è impostata.") |
|
|
print("La ricerca Tavily non funzionerà senza una chiave API valida.") |
|
|
|
|
|
|
|
|
MAX_WIKIPEDIA_RESULTS = 3 |
|
|
MAX_TAVILY_RESULTS = 3 |
|
|
MAX_ARXIV_RESULTS = 2 |
|
|
MAX_TOOL_ITERATIONS = 6 |
|
|
|
|
|
class GaiaAgent: |
|
|
def __init__(self, model_name="o4-mini"): |
|
|
self.model_name = model_name |
|
|
self.openai_client = openai.OpenAI() |
|
|
if TAVILY_API_KEY: |
|
|
self.tavily_client = TavilyClient(api_key=TAVILY_API_KEY) |
|
|
else: |
|
|
self.tavily_client = None |
|
|
print("Cliente Tavily non inizializzato a causa della mancanza di TAVILY_API_KEY.") |
|
|
|
|
|
print(f"GaiaAgent initialized with model: {self.model_name}") |
|
|
if not self.openai_client.api_key: |
|
|
print("WARNING: OpenAI API key not found by the client. Ensure OPENAI_API_KEY is set.") |
|
|
|
|
|
|
|
|
def _call_openai_api(self, messages, tools=None, tool_choice=None): |
|
|
try: |
|
|
response = self.openai_client.chat.completions.create( |
|
|
model=self.model_name, |
|
|
messages=messages, |
|
|
tools=tools, |
|
|
tool_choice=tool_choice, |
|
|
|
|
|
) |
|
|
return response |
|
|
except openai.APIError as e: |
|
|
print(f"Errore API OpenAI: {e}") |
|
|
return f"Errore durante la chiamata all'API OpenAI: {str(e)}" |
|
|
except Exception as e: |
|
|
print(f"Errore imprevisto durante la chiamata OpenAI API: {e}") |
|
|
return f"Errore imprevisto durante la chiamata OpenAI API: {str(e)}" |
|
|
|
|
|
|
|
|
def _execute_python_code(self, code_string: str) -> str: |
|
|
print(f"Esecuzione codice Python (primi 200 caratteri): {code_string[:200]}...") |
|
|
try: |
|
|
code_string = re.sub(r"^```python\n", "", code_string) |
|
|
code_string = re.sub(r"\n```$", "", code_string) |
|
|
code_string = code_string.strip() |
|
|
import io |
|
|
from contextlib import redirect_stdout |
|
|
f = io.StringIO() |
|
|
with redirect_stdout(f): |
|
|
exec(code_string, {}) |
|
|
s = f.getvalue() |
|
|
if not s: |
|
|
s = "Codice eseguito con successo, nessun output esplicito (print)." |
|
|
print(f"Esecuzione codice riuscita. Output (primi 200 caratteri): {s[:200]}") |
|
|
return s |
|
|
except Exception as e: |
|
|
print(f"Errore durante l'esecuzione del codice Python: {e}") |
|
|
return f"Errore nell'esecuzione del codice: {str(e)}" |
|
|
|
|
|
def _search_wikipedia(self, query: str) -> str: |
|
|
print(f"Ricerca Wikipedia per: {query}") |
|
|
try: |
|
|
wikipedia.set_lang("en") |
|
|
results = wikipedia.search(query, results=MAX_WIKIPEDIA_RESULTS) |
|
|
if not results: |
|
|
return "Nessun risultato trovato su Wikipedia." |
|
|
summaries = [] |
|
|
for res_title in results: |
|
|
try: |
|
|
page = wikipedia.page(res_title, auto_suggest=False, preload=True) |
|
|
summary_text = page.summary.replace("\n", " ")[:700] |
|
|
summaries.append(f"Title: {page.title}\nSummary: {summary_text}...") |
|
|
except wikipedia.exceptions.DisambiguationError as e: |
|
|
options = ", ".join(e.options[:3]) |
|
|
summaries.append(f"Pagina di disambiguazione per '{res_title}': Le opzioni includono {options}. Prova una query più specifica.") |
|
|
except wikipedia.exceptions.PageError: |
|
|
summaries.append(f"Pagina '{res_title}' non trovata su Wikipedia.") |
|
|
except Exception as e_page: |
|
|
summaries.append(f"Errore nel recuperare la pagina '{res_title}': {str(e_page)}") |
|
|
return "\n\n".join(summaries) if summaries else "Nessun sommario trovato per i risultati di Wikipedia." |
|
|
except Exception as e: |
|
|
return f"Errore durante la ricerca su Wikipedia: {str(e)}" |
|
|
|
|
|
def _search_tavily(self, query: str) -> str: |
|
|
print(f"Ricerca Tavily per: {query}") |
|
|
if not self.tavily_client: |
|
|
return "Errore: Tavily API key non configurata. Impossibile eseguire la ricerca." |
|
|
try: |
|
|
|
|
|
|
|
|
response = self.tavily_client.search( |
|
|
query=query, |
|
|
search_depth="basic", |
|
|
max_results=MAX_TAVILY_RESULTS, |
|
|
include_answer=True |
|
|
) |
|
|
|
|
|
results_str = "" |
|
|
if response.get("answer"): |
|
|
results_str += f"Risposta diretta da Tavily: {response['answer']}\n\n" |
|
|
|
|
|
if response.get("results"): |
|
|
results_str += "Risultati della ricerca:\n" |
|
|
for r in response["results"]: |
|
|
results_str += f"Title: {r.get('title', 'N/A')}\nURL: {r.get('url', 'N/A')}\nContent Snippet: {r.get('content', 'N/A')[:500]}...\n\n" |
|
|
|
|
|
return results_str if results_str else "Nessun risultato significativo trovato da Tavily." |
|
|
|
|
|
except Exception as e: |
|
|
return f"Errore durante la ricerca su Tavily: {str(e)}" |
|
|
|
|
|
def _search_arxiv(self, query: str) -> str: |
|
|
print(f"Ricerca Arxiv per: {query}") |
|
|
try: |
|
|
search = arxiv.Search( |
|
|
query=query, |
|
|
max_results=MAX_ARXIV_RESULTS, |
|
|
sort_by=arxiv.SortCriterion.Relevance |
|
|
) |
|
|
client = arxiv.Client() |
|
|
results_data = [] |
|
|
for r in client.results(search): |
|
|
authors = ", ".join([str(a) for a in r.authors]) |
|
|
results_data.append(f"Title: {r.title}\nAuthors: {authors}\nPublished: {r.published.date()}\nSummary: {r.summary.replace_summary_newline_chars_with_spaces()[:700]}...\nLink: {r.pdf_url}") |
|
|
return "\n\n".join(results_data) if results_data else "Nessun risultato trovato su Arxiv." |
|
|
except Exception as e: |
|
|
return f"Errore durante la ricerca su Arxiv: {str(e)}" |
|
|
|
|
|
def __call__(self, question: str) -> str: |
|
|
print(f"GaiaAgent ha ricevuto la domanda (primi 100 caratteri): {question[:100]}...") |
|
|
if not self.openai_client.api_key: |
|
|
return "Errore: OPENAI_API_KEY non configurata o non valida per l'agente." |
|
|
if not self.tavily_client: |
|
|
print("Avviso: Tavily client non inizializzato. La ricerca web non sarà disponibile.") |
|
|
|
|
|
|
|
|
tools = [ |
|
|
{"type": "function", "function": { |
|
|
"name": "search_wikipedia", |
|
|
"description": "Cerca informazioni su Wikipedia. Utile per conoscenza generale, fatti, definizioni, storia.", |
|
|
"parameters": {"type": "object", "properties": {"query": {"type": "string", "description": "La query di ricerca per Wikipedia."}}, "required": ["query"]}}}, |
|
|
{"type": "function", "function": { |
|
|
"name": "search_tavily", |
|
|
"description": "Cerca sul web usando l'API di Tavily. Utile per eventi attuali, informazioni specifiche, siti web, o quando Wikipedia non è sufficiente. Può fornire risposte dirette e snippet di contenuto.", |
|
|
"parameters": {"type": "object", "properties": {"query": {"type": "string", "description": "La query di ricerca per Tavily."}}, "required": ["query"]}}}, |
|
|
{"type": "function", "function": { |
|
|
"name": "search_arxiv", |
|
|
"description": "Cerca su Arxiv articoli scientifici e preprint (fisica, matematica, informatica, ecc.).", |
|
|
"parameters": {"type": "object", "properties": {"query": {"type": "string", "description": "La query di ricerca per Arxiv (es. autore, titolo, parole chiave)."}}, "required": ["query"]}}}, |
|
|
{"type": "function", "function": { |
|
|
"name": "execute_python_code", |
|
|
"description": "Esegue una stringa di codice Python e restituisce il suo output. Usare per calcoli, manipolazione dati o qualsiasi task che richieda esecuzione di codice. Assicurarsi che il codice stampi il risultato su stdout. Il codice viene eseguito in un ambiente stateless.", |
|
|
"parameters": {"type": "object", "properties": {"code_string": {"type": "string", "description": "Il codice Python da eseguire. Esempio: 'print(1+1)'"}}, "required": ["code_string"]}}} |
|
|
] |
|
|
|
|
|
system_prompt = ( |
|
|
"You are a general AI assistant that uses the tools available. I will ask you a question. You must think and output only the exact answer to the question with no comments, so your final answer must be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string. If you are asked for a code number, give the code number nothing else. Do not add a dot at the end of the answer. Pay attention at the question and at the expected output." |
|
|
) |
|
|
|
|
|
messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": question}] |
|
|
|
|
|
for iteration in range(MAX_TOOL_ITERATIONS): |
|
|
print(f"Agente: Chiamata OpenAI. Iterazione {iteration + 1}. Ultimo messaggio: {messages[-1]['role']}") |
|
|
|
|
|
api_response_or_error = self._call_openai_api(messages, tools=tools, tool_choice="auto") |
|
|
|
|
|
if isinstance(api_response_or_error, str): |
|
|
return api_response_or_error |
|
|
|
|
|
response_message = api_response_or_error.choices[0].message |
|
|
|
|
|
if response_message.tool_calls: |
|
|
print(f"OpenAI suggerisce chiamate a strumenti: {[tc.function.name for tc in response_message.tool_calls]}") |
|
|
messages.append(response_message) |
|
|
|
|
|
for tool_call in response_message.tool_calls: |
|
|
function_name = tool_call.function.name |
|
|
try: |
|
|
function_args = json.loads(tool_call.function.arguments) |
|
|
except json.JSONDecodeError as e: |
|
|
print(f"Errore nel decodificare gli argomenti JSON per {function_name}: {e}") |
|
|
tool_output = f"Errore: argomenti JSON invalidi per {function_name}." |
|
|
messages.append({"tool_call_id": tool_call.id, "role": "tool", "name": function_name, "content": tool_output}) |
|
|
continue |
|
|
|
|
|
print(f"Esecuzione strumento: {function_name} con argomenti: {function_args}") |
|
|
tool_output = "" |
|
|
if function_name == "search_wikipedia": |
|
|
tool_output = self._search_wikipedia(query=function_args.get("query","")) |
|
|
elif function_name == "search_tavily": |
|
|
if not self.tavily_client: |
|
|
tool_output = "Errore: Tavily client non inizializzato. Impossibile eseguire la ricerca Tavily." |
|
|
else: |
|
|
tool_output = self._search_tavily(query=function_args.get("query","")) |
|
|
elif function_name == "search_arxiv": |
|
|
tool_output = self._search_arxiv(query=function_args.get("query","")) |
|
|
elif function_name == "execute_python_code": |
|
|
tool_output = self._execute_python_code(code_string=function_args.get("code_string","")) |
|
|
else: |
|
|
tool_output = f"Strumento sconosciuto: {function_name}" |
|
|
|
|
|
print(f"Output strumento {function_name} (primi 100 caratteri): {str(tool_output)[:100]}") |
|
|
messages.append({"tool_call_id": tool_call.id, "role": "tool", "name": function_name, "content": str(tool_output)}) |
|
|
|
|
|
else: |
|
|
final_answer = response_message.content |
|
|
if final_answer: |
|
|
print(f"Agente restituisce risposta finale (primi 200 caratteri): {final_answer[:200]}") |
|
|
return final_answer |
|
|
else: |
|
|
print("L'LLM non ha chiamato strumenti e non ha fornito contenuto. Tento di forzare una risposta.") |
|
|
messages.append({"role": "user", "content": "Please provide the best possible answer based on the information you have gathered so far, without using any other tools."}) |
|
|
final_attempt_response = self._call_openai_api(messages) |
|
|
if isinstance(final_attempt_response, str): return final_attempt_response |
|
|
if final_attempt_response and final_attempt_response.choices[0].message.content: |
|
|
return final_attempt_response.choices[0].message.content |
|
|
return "L'agente ha ricevuto una risposta finale vuota e non è riuscito a generarne una alternativa." |
|
|
|
|
|
print("L'agente ha raggiunto il numero massimo di iterazioni degli strumenti.") |
|
|
messages.append({"role": "user", "content": "You have reached your tool usage limit. Please provide the best possible answer based on the information you have gathered so far."}) |
|
|
final_summary_response = self._call_openai_api(messages) |
|
|
if isinstance(final_summary_response, str): return final_summary_response |
|
|
if final_summary_response and final_summary_response.choices[0].message.content: |
|
|
return final_summary_response.choices[0].message.content |
|
|
return "L'agente ha raggiunto il numero massimo di iterazioni e non è riuscito a formulare una risposta finale." |