import json import time from datetime import datetime from langchain_groq import ChatGroq from langchain.tools import tool from langgraph.graph import StateGraph, END from langchain_core.runnables import RunnableLambda from typing import TypedDict from arxiv import Search, Client, SortCriterion from duckduckgo_search import DDGS import requests from bs4 import BeautifulSoup import re import os from langchain.tools import WikipediaQueryRun from langchain.utilities import WikipediaAPIWrapper @tool def wikipedia_search(query: str, language: str = "en") -> str: """ Recherche des informations sur Wikipedia. Args: query: La requête de recherche language: Code de langue (par défaut 'en') """ try: wikipedia = WikipediaAPIWrapper(language=language, top_k_results=3) tool = WikipediaQueryRun(api_wrapper=wikipedia) return tool.run(query) except Exception as e: return f"Erreur Wikipedia: {str(e)}" import pandas as pd from dotenv import load_dotenv load_dotenv() # === VOTRE CODE AGENT (copié tel quel) === @tool def math_tool(expression: str) -> str: """ Évalue une expression mathématique simple donnée sous forme de chaîne de caractères. Exemple : "2 + 5 * 3" """ try: allowed_chars = set('0123456789+-*/.() ') if not all(c in allowed_chars for c in expression): return "Erreur : Expression contient des caractères non autorisés" result = eval(expression) return f"Résultat : {result}" except Exception as e: return f"Erreur : {str(e)}" @tool def search_arxiv(query: str, max_results: int = 3) -> str: """ Recherche d'articles scientifiques sur arXiv. Retourne les titres, auteurs, résumés et liens PDF. """ try: if not query.strip(): return "Erreur : la requête de recherche arXiv est vide." client = Client(num_retries=3) search = Search( query=query, max_results=max_results, sort_by=SortCriterion.Relevance ) response = "" for result in client.results(search): response += f" Titre : {result.title.strip()}\n" response += f" Auteurs : {', '.join([a.name for a in result.authors])}\n" response += f" Résumé : {result.summary.strip()[:300]}...\n" response += f" Lien : {result.pdf_url}\n\n" return response or "Aucun résultat trouvé sur arXiv." except Exception as e: return f"Erreur lors de la recherche sur arXiv : {str(e)}" @tool def search_web(query: str, max_results: int = 3) -> str: """ Recherche sur le web en utilisant DuckDuckGo. Retourne les titres et liens des résultats. Si la requête est vide, retourne une erreur. """ if not query.strip(): return "Erreur : La requête de recherche est vide." with DDGS() as ddgs: results = ddgs.text(query, max_results=max_results) response = "" for res in results: response += f"- {res['title']}: {res['href']}\n" return response or "Aucun résultat trouvé." @tool def html_scraper_tool(prompt: str) -> str: """ Extrait une URL depuis un prompt texte et scrappe la page correspondante. Ex : "Scrappe-moi le site : www.google.com" """ match = re.search(r'(https?://)?(www\.[^\s]+)', prompt) if not match: return " Aucune URL valide trouvée dans le prompt." url = match.group(0) if not url.startswith("http"): url = "https://" + url try: response = requests.get(url, timeout=5) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') title = soup.title.string.strip() if soup.title else "Aucun titre trouvé" return f" Page title : {title}" except Exception as e: return f" Erreur lors du scraping de '{url}' : {str(e)}" @tool def wikipedia_search(query: str, language: str = "en") -> str: """ Recherche des informations sur Wikipedia. Args: query: La requête de recherche language: Code de langue (par défaut 'en') """ try: print(f"Langue utilisée : {language}") print(f"Recherche Wikipedia pour : {query}") wikipedia = WikipediaAPIWrapper(language=language, top_k_results=3) print("WikipediaAPIWrapper instancié.") tool = WikipediaQueryRun(api_wrapper=wikipedia) print("WikipediaQueryRun instancié.") result = tool.run(query) print("Résultat obtenu.") return result except Exception as e: print(f"Erreur Wikipedia : {str(e)}") return f"Erreur Wikipedia: {str(e)}" @tool def reverse_text(text: str) -> str: """ Inverse une chaîne de caractères. """ return text[::-1] @tool def process_excel(file_path: str) -> str: """ Traite un fichier Excel et extrait des informations. """ try: df = pd.read_excel(file_path) return df.to_string() except Exception as e: return f"Erreur Excel: {str(e)}" from youtube_transcript_api import YouTubeTranscriptApi @tool def get_youtube_transcript(video_url: str) -> str: """ Récupère la transcription d'une vidéo YouTube. """ try: video_id = video_url.split("watch?v=")[1] transcript = YouTubeTranscriptApi.get_transcript(video_id) return " ".join([entry['text'] for entry in transcript]) except Exception as e: return f"Erreur YouTube: {str(e)}" tools = { "math": math_tool, "search": search_web, "arxiv": search_arxiv, "html_scraper": html_scraper_tool, "wikipedia": wikipedia_search, "reverse_text": reverse_text, "process_excel": process_excel, "get_youtube_transcript": get_youtube_transcript } api_key = os.getenv("API_KEY_GROQ") if not api_key: raise ValueError("La variable d'environnement 'API_KEY_GROQ' n'est pas définie.") llm = ChatGroq( model="llama-3.1-8b-instant", temperature=0.7, max_tokens=1024, api_key=api_key ) class AgentState(TypedDict): input: str tool: str processed_input: str tool_output: str final_answer: str def call_llm(state: AgentState) -> AgentState: """ Appelle le LLM pour déterminer l'outil approprié. """ system_prompt = f""" Analyze this user request: "{state['input']}" Available tools: - 'math': for calculations and mathematical expressions - 'search': to perform web searches - 'arxiv': to search for scientific papers on arXiv - 'html_scraper': to scrape HTML content - 'wikipedia': to search for information on Wikipedia - 'reverse_text': to reverse a given text - 'process_excel': to process Excel files - 'get_youtube_transcript': to retrieve YouTube video transcripts Respond **only** with the appropriate tool name. NO SENTENCES, just the tool name. """ try: response = llm.invoke(system_prompt) tool_name = response.content.strip().lower() tool_name = tool_name.strip("'\"") if tool_name in ['math', 'search', 'arxiv', 'html_scraper', 'wikipedia', 'reverse_text', 'process_excel', 'get_youtube_transcript']: state['tool'] = tool_name return state except Exception as e: print(f"Erreur lors de l'appel LLM : {e}") return state def extract_math_expression(state: AgentState) -> AgentState: """ Extrait l'expression mathématique de l'input pour les demandes de type math. """ if state['tool'] == 'math': system_prompt = f""" Extrayez uniquement l'expression mathématique de cette question : "{state['input']}" Exemples : - "Quel est le résultat de 100 / 4 ?" → "100 / 4" - "Calcule 15 + 27 * 3" → "15 + 27 * 3" - "Quelle est la racine carrée de 144?" → "144 ** 0.5" - "2 plus 3 fois 5" → "2 + 3 * 5" - "Combien font 12 * 8 ?" → "12 * 8" - "Quel est le résultat de 23 * (5 + 7) ?" → "23 * (5 + 7)" Répondez uniquement par l'expression mathématique, sans explication. """ try: response = llm.invoke(system_prompt) math_expression = response.content.strip() state['processed_input'] = math_expression except Exception as e: print(f"Erreur extraction math : {e}") else: state['processed_input'] = state['input'] return state def generate_response(state: AgentState) -> AgentState: """ Génère la réponse finale pour l'utilisateur. """ system_prompt = f""" The tool '{state['tool']}' returned: "{state['tool_output']}" Formulate a clear and natural response for the user . Integrate the result smoothly into your reply. """ try: response = llm.invoke(system_prompt) state['final_answer'] = response.content return state except Exception as e: state['final_answer'] = f"Réponse générée avec succès : {state['tool_output']}" return state def create_agent_graph(): workflow = StateGraph(AgentState) workflow.add_node("llm_decision", call_llm) workflow.add_node("process", extract_math_expression) workflow.add_node("response_generation", generate_response) workflow.add_node("math_tool", RunnableLambda(lambda state: { **state, "tool_output": tools["math"].invoke(state["processed_input"]) })) workflow.add_node("search_tool", RunnableLambda(lambda state: { **state, "tool_output": tools["search"](state["processed_input"]) })) workflow.add_node("arxiv_tool", RunnableLambda(lambda state: { **state, "tool_output": tools["arxiv"](state["processed_input"]) })) workflow.add_node("html_scraper_tool", RunnableLambda(lambda state: { **state, "tool_output": tools["html_scraper"](state["processed_input"]) })) workflow.add_node("wikipedia_search", RunnableLambda(lambda state: { **state, "tool_output": tools["wikipedia"](state["processed_input"]) })) workflow.add_node("reverse_text", RunnableLambda(lambda state: { **state, "tool_output": tools["reverse_text"](state["processed_input"]) })) workflow.add_node("process_excel", RunnableLambda(lambda state: { **state, "tool_output": tools["process_excel"](state["processed_input"]) })) workflow.add_node("get_youtube_transcript", RunnableLambda(lambda state: { **state, "tool_output": tools["get_youtube_transcript"](state["processed_input"]) })) workflow.set_entry_point("llm_decision") workflow.add_edge("llm_decision", "process") def router(state: AgentState) -> str: if state["tool"] == "math": return "math_tool" elif state["tool"] == "search": return "search_tool" elif state["tool"] == "arxiv": return "arxiv_tool" elif state["tool"] == "html_scraper": return "html_scraper_tool" elif state["tool"] == "wikipedia": return "wikipedia_search" elif state["tool"] == "reverse_text": return "reverse_text" elif state["tool"] == "process_excel": return "process_excel" elif state["tool"] == "get_youtube_transcript": return "get_youtube_transcript" workflow.add_conditional_edges("process", router, { "math_tool": "math_tool", "search_tool": "search_tool", "arxiv_tool": "arxiv_tool", "html_scraper_tool": "html_scraper_tool", "wikipedia_search": "wikipedia_search", "reverse_text": "reverse_text", "process_excel": "process_excel", "get_youtube_transcript": "get_youtube_transcript" }) workflow.add_edge("math_tool", "response_generation") workflow.add_edge("search_tool", "response_generation") workflow.add_edge("arxiv_tool", "response_generation") workflow.add_edge("html_scraper_tool", "response_generation") workflow.add_edge("wikipedia_search", "response_generation") workflow.add_edge("reverse_text", "response_generation") workflow.add_edge("process_excel", "response_generation") workflow.add_edge("get_youtube_transcript", "response_generation") workflow.add_edge("response_generation", END) return workflow.compile() def run_agent(user_input: str) -> str: """ Exécute l'agent avec une entrée utilisateur. """ agent = create_agent_graph() initial_state = AgentState( input=user_input, tool="", processed_input="", tool_output="", final_answer="" ) try: result = agent.invoke(initial_state) return result['final_answer'] except Exception as e: return f"Erreur lors de l'exécution : {str(e)}" # === SCRIPT D'ÉVALUATION === def evaluate_agent_on_dataset(input_file_path, output_file_path): """ Évalue l'agent sur un dataset de questions et sauvegarde les réponses. Args: input_file_path (str): Chemin vers le fichier JSON contenant les questions output_file_path (str): Chemin vers le fichier de sortie pour les réponses """ # Charger les questions depuis le fichier JSON try: with open(input_file_path, 'r', encoding='utf-8') as f: questions_data = json.load(f) # récupèrer les 5 premières questions questions_data = questions_data[:-1] # Limiter à 5 questions pour l'évaluation print(f" Fichier chargé avec succès: {len(questions_data)} questions trouvées") except FileNotFoundError: print(f" Erreur: Le fichier {input_file_path} n'a pas été trouvé") return except json.JSONDecodeError as e: print(f" Erreur lors du parsing JSON: {e}") return # Préparer la structure des résultats results = [] start_time = datetime.now() print(f"\n Début de l'évaluation - {start_time.strftime('%Y-%m-%d %H:%M:%S')}") print("=" * 60) # Traiter chaque question for i, item in enumerate(questions_data, 1): task_id = item.get('task_id', 'unknown') question = item.get('question', '') level = item.get('Level', 'unknown') file_name = item.get('file_name', '') print(f"\n Question {i}/{len(questions_data)}") print(f"Task ID: {task_id}") print(f"Level: {level}") print(f"Question: {question[:100]}{'...' if len(question) > 100 else ''}") if file_name: print(f" Note: Cette question fait référence au fichier: {file_name}") print(" L'agent ne peut pas traiter les fichiers joints actuellement.") # Exécuter l'agent try: print("🤖 Traitement en cours...") answer = run_agent(question) error_message = None except Exception as e: print(f" Erreur lors du traitement: {str(e)}") answer = f"Erreur: {str(e)}" error_message = str(e) # Sauvegarder le résultat result = { "username": "Bachir00", "code_agent": "https://huggingface.co/spaces/Bachir00/Final_Assignment_Template/tree/main", "task_id": task_id, "submitted_answer": answer, } if error_message: result["error_message"] = error_message results.append(result) print(f"Réponse: {answer[:150]}{'...' if len(answer) > 150 else ''}") # Pause entre les requêtes pour éviter la surcharge time.sleep(1) # Sauvegarder tous les résultats try: with open(output_file_path, 'w', encoding='utf-8') as f: json.dump(results, f, ensure_ascii=False, indent=2) end_time = datetime.now() duration = end_time - start_time print("\n" + "=" * 60) print("RÉSUMÉ DE L'ÉVALUATION") print("=" * 60) print(f" Total des questions traitées: {len(results)}") print(f"⏱ Temps total: {duration}") print(f" Résultats sauvegardés dans: {output_file_path}") # Statistiques par status success_count = sum(1 for r in results if r['status'] == 'success') error_count = len(results) - success_count print(f" Succès: {success_count}") print(f" Erreurs: {error_count}") if error_count > 0: print(f" Taux de réussite: {success_count/len(results)*100:.1f}%") # Statistiques par niveau levels = {} for result in results: level = result['level'] if level not in levels: levels[level] = 0 levels[level] += 1 print(f"\ Répartition par niveau:") for level, count in sorted(levels.items()): print(f" Niveau {level}: {count} questions") print("\n Évaluation terminée avec succès!") except Exception as e: print(f" Erreur lors de la sauvegarde: {str(e)}") # === UTILISATION === if __name__ == "__main__": # Chemins des fichiers input_file = "response_1748862846167.json" # Remplacez par le chemin de votre fichier JSON output_file = f"agent_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" # Lancer l'évaluation evaluate_agent_on_dataset(input_file, output_file)