Bachir00's picture
Update agent.py
e4bcc27 verified
import json
import time
from datetime import datetime
from langchain_groq import ChatGroq
from langchain.tools import tool
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda
from typing import TypedDict
from arxiv import Search, Client, SortCriterion
from duckduckgo_search import DDGS
import requests
from bs4 import BeautifulSoup
import re
import os
from langchain.tools import WikipediaQueryRun
from langchain.utilities import WikipediaAPIWrapper
@tool
def wikipedia_search(query: str, language: str = "en") -> str:
"""
Recherche des informations sur Wikipedia.
Args:
query: La requête de recherche
language: Code de langue (par défaut 'en')
"""
try:
wikipedia = WikipediaAPIWrapper(language=language, top_k_results=3)
tool = WikipediaQueryRun(api_wrapper=wikipedia)
return tool.run(query)
except Exception as e:
return f"Erreur Wikipedia: {str(e)}"
import pandas as pd
from dotenv import load_dotenv
load_dotenv()
# === VOTRE CODE AGENT (copié tel quel) ===
@tool
def math_tool(expression: str) -> str:
"""
Évalue une expression mathématique simple donnée sous forme de chaîne de caractères.
Exemple : "2 + 5 * 3"
"""
try:
allowed_chars = set('0123456789+-*/.() ')
if not all(c in allowed_chars for c in expression):
return "Erreur : Expression contient des caractères non autorisés"
result = eval(expression)
return f"Résultat : {result}"
except Exception as e:
return f"Erreur : {str(e)}"
@tool
def search_arxiv(query: str, max_results: int = 3) -> str:
"""
Recherche d'articles scientifiques sur arXiv.
Retourne les titres, auteurs, résumés et liens PDF.
"""
try:
if not query.strip():
return "Erreur : la requête de recherche arXiv est vide."
client = Client(num_retries=3)
search = Search(
query=query,
max_results=max_results,
sort_by=SortCriterion.Relevance
)
response = ""
for result in client.results(search):
response += f" Titre : {result.title.strip()}\n"
response += f" Auteurs : {', '.join([a.name for a in result.authors])}\n"
response += f" Résumé : {result.summary.strip()[:300]}...\n"
response += f" Lien : {result.pdf_url}\n\n"
return response or "Aucun résultat trouvé sur arXiv."
except Exception as e:
return f"Erreur lors de la recherche sur arXiv : {str(e)}"
@tool
def search_web(query: str, max_results: int = 3) -> str:
""" Recherche sur le web en utilisant DuckDuckGo.
Retourne les titres et liens des résultats.
Si la requête est vide, retourne une erreur.
"""
if not query.strip():
return "Erreur : La requête de recherche est vide."
with DDGS() as ddgs:
results = ddgs.text(query, max_results=max_results)
response = ""
for res in results:
response += f"- {res['title']}: {res['href']}\n"
return response or "Aucun résultat trouvé."
@tool
def html_scraper_tool(prompt: str) -> str:
"""
Extrait une URL depuis un prompt texte et scrappe la page correspondante.
Ex : "Scrappe-moi le site : www.google.com"
"""
match = re.search(r'(https?://)?(www\.[^\s]+)', prompt)
if not match:
return " Aucune URL valide trouvée dans le prompt."
url = match.group(0)
if not url.startswith("http"):
url = "https://" + url
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.title.string.strip() if soup.title else "Aucun titre trouvé"
return f" Page title : {title}"
except Exception as e:
return f" Erreur lors du scraping de '{url}' : {str(e)}"
@tool
def wikipedia_search(query: str, language: str = "en") -> str:
"""
Recherche des informations sur Wikipedia.
Args:
query: La requête de recherche
language: Code de langue (par défaut 'en')
"""
try:
print(f"Langue utilisée : {language}")
print(f"Recherche Wikipedia pour : {query}")
wikipedia = WikipediaAPIWrapper(language=language, top_k_results=3)
print("WikipediaAPIWrapper instancié.")
tool = WikipediaQueryRun(api_wrapper=wikipedia)
print("WikipediaQueryRun instancié.")
result = tool.run(query)
print("Résultat obtenu.")
return result
except Exception as e:
print(f"Erreur Wikipedia : {str(e)}")
return f"Erreur Wikipedia: {str(e)}"
@tool
def reverse_text(text: str) -> str:
"""
Inverse une chaîne de caractères.
"""
return text[::-1]
@tool
def process_excel(file_path: str) -> str:
"""
Traite un fichier Excel et extrait des informations.
"""
try:
df = pd.read_excel(file_path)
return df.to_string()
except Exception as e:
return f"Erreur Excel: {str(e)}"
from youtube_transcript_api import YouTubeTranscriptApi
@tool
def get_youtube_transcript(video_url: str) -> str:
"""
Récupère la transcription d'une vidéo YouTube.
"""
try:
video_id = video_url.split("watch?v=")[1]
transcript = YouTubeTranscriptApi.get_transcript(video_id)
return " ".join([entry['text'] for entry in transcript])
except Exception as e:
return f"Erreur YouTube: {str(e)}"
tools = {
"math": math_tool,
"search": search_web,
"arxiv": search_arxiv,
"html_scraper": html_scraper_tool,
"wikipedia": wikipedia_search,
"reverse_text": reverse_text,
"process_excel": process_excel,
"get_youtube_transcript": get_youtube_transcript
}
api_key = os.getenv("API_KEY_GROQ")
if not api_key:
raise ValueError("La variable d'environnement 'API_KEY_GROQ' n'est pas définie.")
llm = ChatGroq(
model="llama-3.1-8b-instant",
temperature=0.7,
max_tokens=1024,
api_key=api_key
)
class AgentState(TypedDict):
input: str
tool: str
processed_input: str
tool_output: str
final_answer: str
def call_llm(state: AgentState) -> AgentState:
"""
Appelle le LLM pour déterminer l'outil approprié.
"""
system_prompt = f"""
Analyze this user request: "{state['input']}"
Available tools:
- 'math': for calculations and mathematical expressions
- 'search': to perform web searches
- 'arxiv': to search for scientific papers on arXiv
- 'html_scraper': to scrape HTML content
- 'wikipedia': to search for information on Wikipedia
- 'reverse_text': to reverse a given text
- 'process_excel': to process Excel files
- 'get_youtube_transcript': to retrieve YouTube video transcripts
Respond **only** with the appropriate tool name.
NO SENTENCES, just the tool name.
"""
try:
response = llm.invoke(system_prompt)
tool_name = response.content.strip().lower()
tool_name = tool_name.strip("'\"")
if tool_name in ['math', 'search', 'arxiv', 'html_scraper', 'wikipedia', 'reverse_text', 'process_excel', 'get_youtube_transcript']:
state['tool'] = tool_name
return state
except Exception as e:
print(f"Erreur lors de l'appel LLM : {e}")
return state
def extract_math_expression(state: AgentState) -> AgentState:
"""
Extrait l'expression mathématique de l'input pour les demandes de type math.
"""
if state['tool'] == 'math':
system_prompt = f"""
Extrayez uniquement l'expression mathématique de cette question : "{state['input']}"
Exemples :
- "Quel est le résultat de 100 / 4 ?" → "100 / 4"
- "Calcule 15 + 27 * 3" → "15 + 27 * 3"
- "Quelle est la racine carrée de 144?" → "144 ** 0.5"
- "2 plus 3 fois 5" → "2 + 3 * 5"
- "Combien font 12 * 8 ?" → "12 * 8"
- "Quel est le résultat de 23 * (5 + 7) ?" → "23 * (5 + 7)"
Répondez uniquement par l'expression mathématique, sans explication.
"""
try:
response = llm.invoke(system_prompt)
math_expression = response.content.strip()
state['processed_input'] = math_expression
except Exception as e:
print(f"Erreur extraction math : {e}")
else:
state['processed_input'] = state['input']
return state
def generate_response(state: AgentState) -> AgentState:
"""
Génère la réponse finale pour l'utilisateur.
"""
system_prompt = f"""
The tool '{state['tool']}' returned: "{state['tool_output']}"
Formulate a clear and natural response for the user .
Integrate the result smoothly into your reply.
"""
try:
response = llm.invoke(system_prompt)
state['final_answer'] = response.content
return state
except Exception as e:
state['final_answer'] = f"Réponse générée avec succès : {state['tool_output']}"
return state
def create_agent_graph():
workflow = StateGraph(AgentState)
workflow.add_node("llm_decision", call_llm)
workflow.add_node("process", extract_math_expression)
workflow.add_node("response_generation", generate_response)
workflow.add_node("math_tool", RunnableLambda(lambda state: {
**state,
"tool_output": tools["math"].invoke(state["processed_input"])
}))
workflow.add_node("search_tool", RunnableLambda(lambda state: {
**state,
"tool_output": tools["search"](state["processed_input"])
}))
workflow.add_node("arxiv_tool", RunnableLambda(lambda state: {
**state,
"tool_output": tools["arxiv"](state["processed_input"])
}))
workflow.add_node("html_scraper_tool", RunnableLambda(lambda state: {
**state,
"tool_output": tools["html_scraper"](state["processed_input"])
}))
workflow.add_node("wikipedia_search", RunnableLambda(lambda state: {
**state,
"tool_output": tools["wikipedia"](state["processed_input"])
}))
workflow.add_node("reverse_text", RunnableLambda(lambda state: {
**state,
"tool_output": tools["reverse_text"](state["processed_input"])
}))
workflow.add_node("process_excel", RunnableLambda(lambda state: {
**state,
"tool_output": tools["process_excel"](state["processed_input"])
}))
workflow.add_node("get_youtube_transcript", RunnableLambda(lambda state: {
**state,
"tool_output": tools["get_youtube_transcript"](state["processed_input"])
}))
workflow.set_entry_point("llm_decision")
workflow.add_edge("llm_decision", "process")
def router(state: AgentState) -> str:
if state["tool"] == "math":
return "math_tool"
elif state["tool"] == "search":
return "search_tool"
elif state["tool"] == "arxiv":
return "arxiv_tool"
elif state["tool"] == "html_scraper":
return "html_scraper_tool"
elif state["tool"] == "wikipedia":
return "wikipedia_search"
elif state["tool"] == "reverse_text":
return "reverse_text"
elif state["tool"] == "process_excel":
return "process_excel"
elif state["tool"] == "get_youtube_transcript":
return "get_youtube_transcript"
workflow.add_conditional_edges("process", router, {
"math_tool": "math_tool",
"search_tool": "search_tool",
"arxiv_tool": "arxiv_tool",
"html_scraper_tool": "html_scraper_tool",
"wikipedia_search": "wikipedia_search",
"reverse_text": "reverse_text",
"process_excel": "process_excel",
"get_youtube_transcript": "get_youtube_transcript"
})
workflow.add_edge("math_tool", "response_generation")
workflow.add_edge("search_tool", "response_generation")
workflow.add_edge("arxiv_tool", "response_generation")
workflow.add_edge("html_scraper_tool", "response_generation")
workflow.add_edge("wikipedia_search", "response_generation")
workflow.add_edge("reverse_text", "response_generation")
workflow.add_edge("process_excel", "response_generation")
workflow.add_edge("get_youtube_transcript", "response_generation")
workflow.add_edge("response_generation", END)
return workflow.compile()
def run_agent(user_input: str) -> str:
"""
Exécute l'agent avec une entrée utilisateur.
"""
agent = create_agent_graph()
initial_state = AgentState(
input=user_input,
tool="",
processed_input="",
tool_output="",
final_answer=""
)
try:
result = agent.invoke(initial_state)
return result['final_answer']
except Exception as e:
return f"Erreur lors de l'exécution : {str(e)}"
# === SCRIPT D'ÉVALUATION ===
def evaluate_agent_on_dataset(input_file_path, output_file_path):
"""
Évalue l'agent sur un dataset de questions et sauvegarde les réponses.
Args:
input_file_path (str): Chemin vers le fichier JSON contenant les questions
output_file_path (str): Chemin vers le fichier de sortie pour les réponses
"""
# Charger les questions depuis le fichier JSON
try:
with open(input_file_path, 'r', encoding='utf-8') as f:
questions_data = json.load(f)
# récupèrer les 5 premières questions
questions_data = questions_data[:-1] # Limiter à 5 questions pour l'évaluation
print(f" Fichier chargé avec succès: {len(questions_data)} questions trouvées")
except FileNotFoundError:
print(f" Erreur: Le fichier {input_file_path} n'a pas été trouvé")
return
except json.JSONDecodeError as e:
print(f" Erreur lors du parsing JSON: {e}")
return
# Préparer la structure des résultats
results = []
start_time = datetime.now()
print(f"\n Début de l'évaluation - {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
# Traiter chaque question
for i, item in enumerate(questions_data, 1):
task_id = item.get('task_id', 'unknown')
question = item.get('question', '')
level = item.get('Level', 'unknown')
file_name = item.get('file_name', '')
print(f"\n Question {i}/{len(questions_data)}")
print(f"Task ID: {task_id}")
print(f"Level: {level}")
print(f"Question: {question[:100]}{'...' if len(question) > 100 else ''}")
if file_name:
print(f" Note: Cette question fait référence au fichier: {file_name}")
print(" L'agent ne peut pas traiter les fichiers joints actuellement.")
# Exécuter l'agent
try:
print("🤖 Traitement en cours...")
answer = run_agent(question)
error_message = None
except Exception as e:
print(f" Erreur lors du traitement: {str(e)}")
answer = f"Erreur: {str(e)}"
error_message = str(e)
# Sauvegarder le résultat
result = {
"username": "Bachir00",
"code_agent": "https://huggingface.co/spaces/Bachir00/Final_Assignment_Template/tree/main",
"task_id": task_id,
"submitted_answer": answer,
}
if error_message:
result["error_message"] = error_message
results.append(result)
print(f"Réponse: {answer[:150]}{'...' if len(answer) > 150 else ''}")
# Pause entre les requêtes pour éviter la surcharge
time.sleep(1)
# Sauvegarder tous les résultats
try:
with open(output_file_path, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
end_time = datetime.now()
duration = end_time - start_time
print("\n" + "=" * 60)
print("RÉSUMÉ DE L'ÉVALUATION")
print("=" * 60)
print(f" Total des questions traitées: {len(results)}")
print(f"⏱ Temps total: {duration}")
print(f" Résultats sauvegardés dans: {output_file_path}")
# Statistiques par status
success_count = sum(1 for r in results if r['status'] == 'success')
error_count = len(results) - success_count
print(f" Succès: {success_count}")
print(f" Erreurs: {error_count}")
if error_count > 0:
print(f" Taux de réussite: {success_count/len(results)*100:.1f}%")
# Statistiques par niveau
levels = {}
for result in results:
level = result['level']
if level not in levels:
levels[level] = 0
levels[level] += 1
print(f"\ Répartition par niveau:")
for level, count in sorted(levels.items()):
print(f" Niveau {level}: {count} questions")
print("\n Évaluation terminée avec succès!")
except Exception as e:
print(f" Erreur lors de la sauvegarde: {str(e)}")
# === UTILISATION ===
if __name__ == "__main__":
# Chemins des fichiers
input_file = "response_1748862846167.json" # Remplacez par le chemin de votre fichier JSON
output_file = f"agent_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
# Lancer l'évaluation
evaluate_agent_on_dataset(input_file, output_file)