Spaces:
Sleeping
Sleeping
| import json | |
| import time | |
| from datetime import datetime | |
| from langchain_groq import ChatGroq | |
| from langchain.tools import tool | |
| from langgraph.graph import StateGraph, END | |
| from langchain_core.runnables import RunnableLambda | |
| from typing import TypedDict | |
| from arxiv import Search, Client, SortCriterion | |
| from duckduckgo_search import DDGS | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import re | |
| import os | |
| from langchain.tools import WikipediaQueryRun | |
| from langchain.utilities import WikipediaAPIWrapper | |
def wikipedia_search(query: str, language: str = "en") -> str:
    """Search Wikipedia and return the text produced by the LangChain tool.

    Note: this module defines ``wikipedia_search`` a second time further
    down; Python binds the name to the later definition at import time.

    Args:
        query: Search query.
        language: Wikipedia language code (defaults to ``"en"``).

    Returns:
        The output of ``WikipediaQueryRun.run``, or a string starting with
        ``"Erreur Wikipedia: "`` if anything raises.
    """
    try:
        # The wrapper's language field is named ``lang``; passing
        # ``language=`` is either rejected or ignored by the model.
        api_wrapper = WikipediaAPIWrapper(lang=language, top_k_results=3)
        # Local name deliberately avoids shadowing the imported ``tool``.
        wiki_runner = WikipediaQueryRun(api_wrapper=api_wrapper)
        return wiki_runner.run(query)
    except Exception as e:
        return f"Erreur Wikipedia: {str(e)}"
| import pandas as pd | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # === VOTRE CODE AGENT (copié tel quel) === | |
def math_tool(expression: str) -> str:
    """Evaluate a simple arithmetic expression given as a string.

    Example: ``"2 + 5 * 3"`` gives ``"Résultat : 17"``.

    The expression is parsed with ``ast`` and evaluated by a small
    interpreter instead of ``eval``. Only numeric literals, parentheses,
    unary ``+``/``-`` and the binary operators ``+ - * / // **`` are
    accepted, and integer powers whose result would be enormous are
    rejected so that input such as ``9**9**9**9`` cannot hang the process.

    Args:
        expression: Arithmetic expression made of digits, ``+-*/.()`` and
            spaces.

    Returns:
        ``"Résultat : <value>"`` on success, otherwise a string starting
        with ``"Erreur : "``.
    """
    # Function-scope imports keep this block self-contained.
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Upper bound on the bit size of an integer power result.
    max_result_bits = 100_000

    def evaluate(node):
        """Recursively evaluate a whitelisted arithmetic AST node."""
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = evaluate(node.left)
            right = evaluate(node.right)
            if (
                isinstance(node.op, ast.Pow)
                and isinstance(left, int)
                and isinstance(right, int)
                and right > 0
                and left.bit_length() * right > max_result_bits
            ):
                raise ValueError("résultat trop grand")
            return binary_ops[type(node.op)](left, right)
        raise ValueError("expression non supportée")

    try:
        allowed_chars = set('0123456789+-*/.() ')
        if not all(c in allowed_chars for c in expression):
            return "Erreur : Expression contient des caractères non autorisés"
        # eval() ignored surrounding whitespace; strip() keeps that behavior.
        tree = ast.parse(expression.strip(), mode="eval")
        result = evaluate(tree)
        return f"Résultat : {result}"
    except Exception as e:
        return f"Erreur : {str(e)}"
def search_arxiv(query: str, max_results: int = 3) -> str:
    """Search arXiv for scientific papers.

    Args:
        query: Search query; must contain at least one non-blank character.
        max_results: Maximum number of papers requested from arXiv.

    Returns:
        One entry per paper (title, authors, first 300 characters of the
        summary, PDF link), a "no result" message, or a string starting
        with ``"Erreur"`` when the query is blank or the lookup raises.
    """
    try:
        if not query.strip():
            return "Erreur : la requête de recherche arXiv est vide."

        arxiv_client = Client(num_retries=3)
        arxiv_query = Search(
            query=query,
            max_results=max_results,
            sort_by=SortCriterion.Relevance,
        )

        entries = []
        for paper in arxiv_client.results(arxiv_query):
            title_line = f" Titre : {paper.title.strip()}\n"
            authors_line = f" Auteurs : {', '.join([a.name for a in paper.authors])}\n"
            summary_line = f" Résumé : {paper.summary.strip()[:300]}...\n"
            link_line = f" Lien : {paper.pdf_url}\n\n"
            entries.append(title_line + authors_line + summary_line + link_line)

        return "".join(entries) or "Aucun résultat trouvé sur arXiv."
    except Exception as exc:
        return f"Erreur lors de la recherche sur arXiv : {str(exc)}"
def search_web(query: str, max_results: int = 3) -> str:
    """Search the web with DuckDuckGo.

    Args:
        query: Search query; must contain at least one non-blank character.
        max_results: Maximum number of results requested.

    Returns:
        One ``"- <title>: <url>"`` line per result, a "no result" message,
        or a string starting with ``"Erreur"`` when the query is blank or
        the search fails.
    """
    if not query.strip():
        return "Erreur : La requête de recherche est vide."
    try:
        with DDGS() as ddgs:
            # Materialize inside the ``with`` block in case the client
            # returns a lazy iterator tied to the open session.
            hits = ddgs.text(query, max_results=max_results) or []
            lines = [f"- {hit['title']}: {hit['href']}\n" for hit in hits]
    except Exception as e:
        # Like the other tools, report failures as text instead of letting
        # a network error propagate out of the tool.
        return f"Erreur lors de la recherche web : {str(e)}"
    return "".join(lines) or "Aucun résultat trouvé."
def html_scraper_tool(prompt: str) -> str:
    """Extract a URL from a free-text prompt and return that page's title.

    Example prompt: ``"Scrappe-moi le site : www.google.com"``.

    Both explicit ``http(s)://`` URLs (with or without ``www.``) and bare
    ``www.`` hosts are recognized; bare hosts are fetched over HTTPS.

    Args:
        prompt: Text containing the URL to fetch.

    Returns:
        ``" Page title : <title>"`` on success, a message saying no URL was
        found, or an error message if the request or parsing fails.
    """
    # The previous pattern required "www.", so "https://example.com" was
    # reported as "no valid URL".
    match = re.search(r'https?://[^\s]+|www\.[^\s]+', prompt)
    if not match:
        return " Aucune URL valide trouvée dans le prompt."

    # Drop sentence punctuation that directly follows the URL in prose.
    url = match.group(0).rstrip('.,;:!?')
    if not url.startswith("http"):
        url = "https://" + url

    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # ``.string`` is None for an empty <title> or one with nested tags;
        # ``get_text`` handles both without raising.
        title = soup.title.get_text(strip=True) if soup.title else ""
        if not title:
            title = "Aucun titre trouvé"
        return f" Page title : {title}"
    except Exception as e:
        return f" Erreur lors du scraping de '{url}' : {str(e)}"
def wikipedia_search(query: str, language: str = "en") -> str:
    """Search Wikipedia and return the text produced by the LangChain tool.

    Progress messages are printed at each step.

    Args:
        query: Search query.
        language: Wikipedia language code (defaults to ``"en"``).

    Returns:
        The output of ``WikipediaQueryRun.run``, or a string starting with
        ``"Erreur Wikipedia: "`` if anything raises.
    """
    try:
        print(f"Langue utilisée : {language}")
        print(f"Recherche Wikipedia pour : {query}")
        # The wrapper's language field is named ``lang``; passing
        # ``language=`` is either rejected or ignored by the model.
        api_wrapper = WikipediaAPIWrapper(lang=language, top_k_results=3)
        print("WikipediaAPIWrapper instancié.")
        # Local name deliberately avoids shadowing the imported ``tool``.
        wiki_runner = WikipediaQueryRun(api_wrapper=api_wrapper)
        print("WikipediaQueryRun instancié.")
        result = wiki_runner.run(query)
        print("Résultat obtenu.")
        return result
    except Exception as e:
        print(f"Erreur Wikipedia : {str(e)}")
        return f"Erreur Wikipedia: {str(e)}"
def reverse_text(text: str) -> str:
    """Return ``text`` with its characters in reverse order."""
    return "".join(reversed(text))
def process_excel(file_path: str) -> str:
    """Load an Excel file with pandas and return it rendered as text.

    Args:
        file_path: Path of the workbook to read.

    Returns:
        ``DataFrame.to_string()`` of the loaded sheet, or a string starting
        with ``"Erreur Excel: "`` if reading or rendering raises.
    """
    try:
        frame = pd.read_excel(file_path)
    except Exception as exc:
        return f"Erreur Excel: {str(exc)}"
    try:
        return frame.to_string()
    except Exception as exc:
        return f"Erreur Excel: {str(exc)}"
| from youtube_transcript_api import YouTubeTranscriptApi | |
def get_youtube_transcript(video_url: str) -> str:
    """Fetch the transcript of a YouTube video.

    The video ID is located with a regular expression, so the input may be
    a ``watch?v=`` URL with extra query parameters, a ``youtu.be`` short
    link, an ``/embed/`` or ``/shorts/`` URL, or a sentence that contains
    one of these.

    Args:
        video_url: URL of the video, or text containing it.

    Returns:
        The transcript entries joined with spaces, or a string starting
        with ``"Erreur YouTube: "`` when no ID is found or the fetch fails.
    """
    try:
        # Splitting on "watch?v=" failed for short links and kept trailing
        # "&t=..." parameters or surrounding prose as part of the ID.
        id_match = re.search(
            r'(?:[?&]v=|youtu\.be/|/embed/|/shorts/)([A-Za-z0-9_-]+)',
            video_url,
        )
        if id_match is None:
            raise ValueError("aucun identifiant de vidéo trouvé dans l'entrée")
        video_id = id_match.group(1)
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        return " ".join(entry['text'] for entry in transcript)
    except Exception as e:
        return f"Erreur YouTube: {str(e)}"
# Registry mapping each tool name the routing LLM may answer with to the
# plain Python callable that implements it. "wikipedia" resolves to the
# last definition of wikipedia_search in this module.
tools = {
    "math": math_tool,
    "search": search_web,
    "arxiv": search_arxiv,
    "html_scraper": html_scraper_tool,
    "wikipedia": wikipedia_search,
    "reverse_text": reverse_text,
    "process_excel": process_excel,
    "get_youtube_transcript": get_youtube_transcript
}
# Read the Groq API key from the environment and fail at import time if it
# is missing or empty.
api_key = os.getenv("API_KEY_GROQ")
if not api_key:
    raise ValueError("La variable d'environnement 'API_KEY_GROQ' n'est pas définie.")
# Module-level chat model shared by call_llm, extract_math_expression and
# generate_response.
llm = ChatGroq(
    model="llama-3.1-8b-instant",
    temperature=0.7,
    max_tokens=1024,
    api_key=api_key
)
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    # Raw user request.
    input: str
    # Tool name selected by call_llm (a key of the module-level ``tools``).
    tool: str
    # Text handed to the tool: the extracted expression for "math",
    # otherwise the raw input (set by extract_math_expression).
    processed_input: str
    # String returned by the selected tool.
    tool_output: str
    # Final reply produced by generate_response.
    final_answer: str
def call_llm(state: AgentState) -> AgentState:
    """Ask the LLM which tool should handle the user request.

    The model's answer is normalized (case, quotes, backticks, trailing
    punctuation) and matched against the known tool names. If the answer
    is wrapped in a sentence, the first known tool name found in it is
    used. When the call fails or no known name is found, and the state
    does not already hold a valid tool, ``'search'`` is used so the graph
    router always has a branch to follow.

    Args:
        state: Agent state; ``state['input']`` is read.

    Returns:
        The same state object with ``state['tool']`` set to a valid name.
    """
    valid_tools = (
        'math', 'search', 'arxiv', 'html_scraper', 'wikipedia',
        'reverse_text', 'process_excel', 'get_youtube_transcript',
    )
    fallback_tool = 'search'

    system_prompt = f"""
    Analyze this user request: "{state['input']}"
    Available tools:
    - 'math': for calculations and mathematical expressions
    - 'search': to perform web searches
    - 'arxiv': to search for scientific papers on arXiv
    - 'html_scraper': to scrape HTML content
    - 'wikipedia': to search for information on Wikipedia
    - 'reverse_text': to reverse a given text
    - 'process_excel': to process Excel files
    - 'get_youtube_transcript': to retrieve YouTube video transcripts
    Respond **only** with the appropriate tool name.
    NO SENTENCES, just the tool name.
    """

    selected = ""
    try:
        response = llm.invoke(system_prompt)
        raw_answer = response.content.strip().lower()
        candidate = raw_answer.strip("'\"`*. \n")
        if candidate in valid_tools:
            selected = candidate
        else:
            # Tolerate answers such as "Tool: math" by scanning the words.
            for token in re.findall(r"[a-z_]+", raw_answer):
                if token in valid_tools:
                    selected = token
                    break
    except Exception as e:
        print(f"Erreur lors de l'appel LLM : {e}")

    if selected:
        state['tool'] = selected
    elif state.get('tool') not in valid_tools:
        # An empty or unknown tool would leave the router without a branch.
        state['tool'] = fallback_tool
    return state
def extract_math_expression(state: AgentState) -> AgentState:
    """Fill ``state['processed_input']`` with the text to hand to the tool.

    For the ``'math'`` tool the LLM is asked to extract the bare arithmetic
    expression from the question. For every other tool the raw input is
    passed through unchanged.

    Args:
        state: Agent state; ``state['tool']`` and ``state['input']`` are read.

    Returns:
        The same state object with ``state['processed_input']`` set.
    """
    if state['tool'] != 'math':
        state['processed_input'] = state['input']
        return state

    system_prompt = f"""
    Extrayez uniquement l'expression mathématique de cette question : "{state['input']}"
    Exemples :
    - "Quel est le résultat de 100 / 4 ?" → "100 / 4"
    - "Calcule 15 + 27 * 3" → "15 + 27 * 3"
    - "Quelle est la racine carrée de 144?" → "144 ** 0.5"
    - "2 plus 3 fois 5" → "2 + 3 * 5"
    - "Combien font 12 * 8 ?" → "12 * 8"
    - "Quel est le résultat de 23 * (5 + 7) ?" → "23 * (5 + 7)"
    Répondez uniquement par l'expression mathématique, sans explication.
    """
    try:
        response = llm.invoke(system_prompt)
        # The examples above are quoted, so the model tends to quote its
        # answer too; quotes are not in the math tool's allowed characters.
        math_expression = response.content.strip().strip("'\"`").strip()
        # An empty extraction is useless; fall back to the raw question.
        state['processed_input'] = math_expression or state['input']
    except Exception as e:
        print(f"Erreur extraction math : {e}")
        # Fall back to the raw input rather than leaving the field empty.
        state['processed_input'] = state['input']
    return state
def generate_response(state: AgentState) -> AgentState:
    """Turn the raw tool output into the final user-facing answer.

    The LLM is asked to phrase ``state['tool_output']`` naturally. If the
    call fails, the error is printed and the raw tool output is returned
    inside a fixed sentence.

    Args:
        state: Agent state; ``state['tool']`` and ``state['tool_output']``
            are read.

    Returns:
        The same state object with ``state['final_answer']`` set.
    """
    system_prompt = f"""
    The tool '{state['tool']}' returned: "{state['tool_output']}"
    Formulate a clear and natural response for the user .
    Integrate the result smoothly into your reply.
    """
    try:
        response = llm.invoke(system_prompt)
        state['final_answer'] = response.content
    except Exception as e:
        # Report the failure like the other LLM-backed nodes do instead of
        # swallowing it silently, then fall back to the raw tool output.
        print(f"Erreur génération réponse : {e}")
        state['final_answer'] = f"Réponse générée avec succès : {state['tool_output']}"
    return state
def create_agent_graph():
    """Build and compile the LangGraph workflow of the agent.

    Flow: ``llm_decision`` (choose a tool) -> ``process`` (prepare the tool
    input) -> one tool node -> ``response_generation`` -> ``END``.

    Returns:
        The compiled graph, ready for ``.invoke(state)``.
    """
    # Tool name (key of the module-level ``tools``) -> graph node name.
    tool_nodes = {
        "math": "math_tool",
        "search": "search_tool",
        "arxiv": "arxiv_tool",
        "html_scraper": "html_scraper_tool",
        "wikipedia": "wikipedia_search",
        "reverse_text": "reverse_text",
        "process_excel": "process_excel",
        "get_youtube_transcript": "get_youtube_transcript",
    }
    # Node used when the state holds an empty or unknown tool name.
    default_node = "search_tool"

    def make_tool_node(tool_key):
        """Wrap one registered tool as a graph node that fills tool_output."""
        def run_tool(state):
            tool_fn = tools[tool_key]
            # The registry holds plain functions (calling ``.invoke`` on
            # them raised AttributeError for "math"); objects exposing
            # ``.invoke`` are still supported.
            call = getattr(tool_fn, "invoke", tool_fn)
            return {**state, "tool_output": call(state["processed_input"])}
        return RunnableLambda(run_tool)

    def router(state: AgentState) -> str:
        """Return the node name for the tool chosen earlier in the flow."""
        return tool_nodes.get(state["tool"], default_node)

    workflow = StateGraph(AgentState)
    workflow.add_node("llm_decision", call_llm)
    workflow.add_node("process", extract_math_expression)
    workflow.add_node("response_generation", generate_response)
    for tool_key, node_name in tool_nodes.items():
        workflow.add_node(node_name, make_tool_node(tool_key))

    workflow.set_entry_point("llm_decision")
    workflow.add_edge("llm_decision", "process")
    workflow.add_conditional_edges(
        "process",
        router,
        {node_name: node_name for node_name in tool_nodes.values()},
    )
    for node_name in tool_nodes.values():
        workflow.add_edge(node_name, "response_generation")
    workflow.add_edge("response_generation", END)
    return workflow.compile()
def run_agent(user_input: str) -> str:
    """Run the agent graph on one user request.

    Args:
        user_input: The question or instruction to process.

    Returns:
        The ``final_answer`` field of the resulting state, or a string
        starting with ``"Erreur lors de l'exécution"`` if the graph raises.
    """
    graph = create_agent_graph()
    # Every field starts empty except the request itself.
    starting_state: AgentState = {
        "input": user_input,
        "tool": "",
        "processed_input": "",
        "tool_output": "",
        "final_answer": "",
    }
    try:
        final_state = graph.invoke(starting_state)
    except Exception as exc:
        return f"Erreur lors de l'exécution : {str(exc)}"
    try:
        return final_state['final_answer']
    except Exception as exc:
        return f"Erreur lors de l'exécution : {str(exc)}"
| # === SCRIPT D'ÉVALUATION === | |
def evaluate_agent_on_dataset(input_file_path, output_file_path,
                              max_questions=None, pause_seconds=1):
    """Run the agent on a JSON dataset of questions and save its answers.

    Each input item is a dict read with the keys ``task_id``, ``question``,
    ``Level`` and ``file_name``. One result dict per question is written to
    the output file, and a summary is printed afterwards.

    Args:
        input_file_path (str): Path of the JSON file holding the questions.
        output_file_path (str): Path of the JSON file to write answers to.
        max_questions (int | None): If given, only the first
            ``max_questions`` questions are evaluated. By default every
            question is evaluated.
        pause_seconds (float): Delay after each question, to avoid
            overloading the backing services.

    Returns:
        None. Progress and errors are reported on stdout.
    """
    # Load the questions from the JSON file.
    try:
        with open(input_file_path, 'r', encoding='utf-8') as f:
            questions_data = json.load(f)
    except FileNotFoundError:
        print(f" Erreur: Le fichier {input_file_path} n'a pas été trouvé")
        return
    except json.JSONDecodeError as e:
        print(f" Erreur lors du parsing JSON: {e}")
        return

    # The previous ``[:-1]`` slice silently dropped the last question even
    # though the comments spoke of limiting the run to the first few.
    if max_questions is not None:
        questions_data = questions_data[:max_questions]
    print(f" Fichier chargé avec succès: {len(questions_data)} questions trouvées")

    results = []
    # Statistics are tracked separately so the saved result dicts keep
    # exactly the keys they had before.
    level_counts = {}
    error_count = 0
    start_time = datetime.now()
    print(f"\n Début de l'évaluation - {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)

    # Process each question.
    for i, item in enumerate(questions_data, 1):
        task_id = item.get('task_id', 'unknown')
        question = item.get('question', '')
        level = item.get('Level', 'unknown')
        file_name = item.get('file_name', '')
        print(f"\n Question {i}/{len(questions_data)}")
        print(f"Task ID: {task_id}")
        print(f"Level: {level}")
        print(f"Question: {question[:100]}{'...' if len(question) > 100 else ''}")
        if file_name:
            print(f" Note: Cette question fait référence au fichier: {file_name}")
            print(" L'agent ne peut pas traiter les fichiers joints actuellement.")

        # Run the agent.
        try:
            print("🤖 Traitement en cours...")
            answer = run_agent(question)
            error_message = None
        except Exception as e:
            print(f" Erreur lors du traitement: {str(e)}")
            answer = f"Erreur: {str(e)}"
            error_message = str(e)

        # Record the result.
        result = {
            "username": "Bachir00",
            "code_agent": "https://huggingface.co/spaces/Bachir00/Final_Assignment_Template/tree/main",
            "task_id": task_id,
            "submitted_answer": answer,
        }
        if error_message:
            result["error_message"] = error_message
            error_count += 1
        results.append(result)
        level_counts[level] = level_counts.get(level, 0) + 1
        print(f"Réponse: {answer[:150]}{'...' if len(answer) > 150 else ''}")

        # Pause between requests to avoid overloading the services.
        if pause_seconds and pause_seconds > 0:
            time.sleep(pause_seconds)

    # Save all results.
    try:
        with open(output_file_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f" Erreur lors de la sauvegarde: {str(e)}")
        return

    # The summary runs outside the save ``try`` so a reporting problem is
    # never misreported as a save failure.
    duration = datetime.now() - start_time
    success_count = len(results) - error_count
    print("\n" + "=" * 60)
    print("RÉSUMÉ DE L'ÉVALUATION")
    print("=" * 60)
    print(f" Total des questions traitées: {len(results)}")
    print(f"⏱ Temps total: {duration}")
    print(f" Résultats sauvegardés dans: {output_file_path}")
    print(f" Succès: {success_count}")
    print(f" Erreurs: {error_count}")
    if results:
        print(f" Taux de réussite: {success_count/len(results)*100:.1f}%")

    # Levels may mix ints and strings (e.g. 1 and 'unknown'); sort by text.
    print("\n Répartition par niveau:")
    for level, count in sorted(level_counts.items(), key=lambda kv: str(kv[0])):
        print(f" Niveau {level}: {count} questions")
    print("\n Évaluation terminée avec succès!")
| # === UTILISATION === | |
if __name__ == "__main__":
    # Input dataset; replace with the path of your own JSON file.
    input_file = "response_1748862846167.json"
    # Timestamped output name so successive runs never overwrite each other.
    run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_file = "agent_results_" + run_stamp + ".json"
    # Start the evaluation.
    evaluate_agent_on_dataset(input_file, output_file)