Spaces:
Runtime error
Runtime error
| import os | |
| import torch | |
| import logging | |
| import requests | |
| import pytesseract | |
| import pandas as pd | |
| from PIL import Image | |
| from io import BytesIO | |
| import soundfile as sf | |
| from langchain import hub | |
| from pytube import YouTube | |
| from transformers import ( | |
| AutoModelForCausalLM, | |
| AutoTokenizer, | |
| BitsAndBytesConfig, | |
| pipeline, | |
| ) | |
| from duckduckgo_search import DDGS | |
| from whisper import load_model as load_whisper | |
| from langchain_huggingface import HuggingFacePipeline | |
| from langchain.memory import ConversationBufferMemory | |
| from langchain_experimental.utilities import PythonREPL | |
| from langchain.agents import initialize_agent, Tool, AgentType, AgentExecutor, create_react_agent | |
# Base URL of the remote service; task attachments are downloaded from
# "<DEFAULT_API_URL>/files/<task_id>".
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Lower-case file extensions (without the dot), grouped by media kind.
# NOTE(review): none of these lists is referenced in the visible code.
AUDIO_FILES = ["wav", "mp3", "aac", "ogg"]
IMAGE_FILES = ["png", "jpg", "tiff", "jpeg", "bmp"]
TABULAR_FILES = ["csv", "xlsx"]

# Root logging at INFO, plus one module-level logger shared by every tool.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Instruction text handed to the agent with every question. It asks the model
# to end its reply with "FINAL ANSWER: ..." so the answer can be cut out of the
# generated text afterwards. The literal is split at sentence boundaries only
# for readability; the concatenated string is what matters.
GAIA_SYSTEM_PROMPT = (
    "You are a general AI assistant. I will ask you a question. "
    "Report your thoughts, and finish your answer with the following template: "
    "FINAL ANSWER: [YOUR FINAL ANSWER]. "
    "YOUR FINAL ANSWER should be a number OR as few words as possible "
    "OR a comma separated list of numbers and/or strings. "
    "If you are asked for a number, don't use comma to write your number "
    "neither use units such as $ or percent sign unless specified otherwise. "
    "If you are asked for a string, don't use articles, neither abbreviations "
    "(e.g. for cities), and write the digits in plain text unless specified otherwise. "
    "If you are asked for a comma separated list, apply the above rules depending "
    "of whether the element to be put in the list is a number or a string."
)
def file_handler(task_id: str, file_name: str, timeout: float = 30.0):
    """Download the attachment of a task and derive its file extension.

    Args:
        task_id: Identifier appended to ``{DEFAULT_API_URL}/files/``.
        file_name: Name of the attachment; only the part after the last dot
            is used (lower-cased) as the extension.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled server.

    Returns:
        A ``(data, ext)`` tuple: the raw response body as bytes and the
        lower-cased extension string.

    Raises:
        Exception: Any error raised while downloading or parsing the name is
            logged and re-raised unchanged.
    """
    try:
        response = requests.get(
            f"{DEFAULT_API_URL}/files/{task_id}",
            timeout=timeout,
        )
        # Turn 4xx/5xx answers into exceptions instead of saving an error page.
        response.raise_for_status()
        data = response.content
        ext = file_name.split('.')[-1].lower()
        return data, ext
    except Exception as e:
        logger.error(f"Failed to fetch file: {e}")
        raise
# --- Model setup (executed once, at import time) ---

# Speech-to-text model used by the `transcribe` tool.
whisper_model = load_whisper("small")

# Causal language model that drives the agent, loaded with 8-bit quantisation
# and automatic device placement.
model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B"
bnb_config = BitsAndBytesConfig(load_in_8bit=True)
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto",
)

torch.backends.cuda.matmul.allow_tf32 = True

# Best-effort optimisation: any failure is downgraded to a warning.
# NOTE(review): this method name looks like a diffusers API; on a transformers
# model it presumably always ends up in the warning branch — confirm.
try:
    model.enable_xformers_memory_efficient_attention()
except Exception as e:
    logger.warning(f"Failed to enable xformers memory optimization: {e}")

# Wrap model + tokenizer in a text-generation pipeline and expose it to
# LangChain as `llm`.
# NOTE(review): no explicit generation length or `do_sample` is passed —
# confirm the defaults are long enough for multi-step agent traces.
pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    temperature=0.05,
    device_map="auto"
)
llm = HuggingFacePipeline(pipeline=pipe)
def fetch_file(args: str) -> str:
    """Download a task attachment and store it under ``./tmp``.

    Args:
        args: A single string of the form ``"task_id,file_name"``. Only the
            first comma separates the two parts, so file names that contain
            commas themselves are accepted.

    Returns:
        The local path ``./tmp/{task_id}.{ext}`` the file was written to.

    Raises:
        ValueError: If ``args`` contains no comma, or ``task_id`` would not be
            a plain file name (it is used as a path component).
        Exception: Any download or file-system error is logged and re-raised.
    """
    try:
        raw_id, comma, raw_name = args.partition(',')
        if not comma:
            raise ValueError("expected input of the form 'task_id,file_name'")
        task_id, file_name = raw_id.strip(), raw_name.strip()
        # The id comes from tool input and ends up in a path: refuse anything
        # that could point outside ./tmp.
        if os.path.basename(task_id) != task_id:
            raise ValueError(f"invalid task_id: {task_id!r}")
        data, ext = file_handler(task_id, file_name)
        local_path = f"./tmp/{task_id}.{ext}"
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        with open(local_path, 'wb') as f:
            f.write(data)
        logger.info(f"File fetched and saved at {local_path}")
        return local_path
    except Exception as e:
        logger.error(f"fetch_file failed: {e}")
        raise
def transcribe(path: str) -> str:
    """Transcribe an English audio file to text with the Whisper model.

    The file path is handed to Whisper directly so that Whisper performs its
    own decoding and resampling. Feeding it raw samples read at the file's
    native sample rate (and possibly with several channels) does not match
    the input format its ``transcribe`` method expects for arrays.

    Args:
        path: Path of a previously downloaded audio file.

    Returns:
        The transcribed text.

    Raises:
        Exception: Any decoding or transcription error is logged and re-raised.
    """
    try:
        res = whisper_model.transcribe(path, language='en')
        return res['text']
    except Exception as e:
        logger.error(f"transcribe failed: {e}")
        raise
def ocr(path: str) -> str:
    """Extract the text of an image file with Tesseract.

    Args:
        path: Path of a previously downloaded image.

    Returns:
        The text recognised in the image.

    Raises:
        Exception: Any error while opening or recognising the image is logged
            and re-raised.
    """
    try:
        # Context manager closes the underlying file handle once OCR is done.
        with Image.open(path) as img:
            return pytesseract.image_to_string(img)
    except Exception as e:
        logger.error(f"ocr failed: {e}")
        raise
def preview_table(path: str) -> str:
    """Summarise a CSV or Excel file: shape, column names and first rows.

    Args:
        path: Path of the table. A ``csv`` extension (any letter case) is
            read with ``pandas.read_csv``; everything else goes to
            ``pandas.read_excel``.

    Returns:
        A text block with ``Table Shape``, ``Columns`` and ``Head`` sections.
        The head is rendered as Markdown when that is available and as the
        plain pandas text representation otherwise.

    Raises:
        Exception: Any read or parse error is logged and re-raised.
    """
    try:
        # Compare case-insensitively so "DATA.CSV" is not sent to read_excel.
        ext = path.rsplit('.', 1)[-1].lower()
        df = pd.read_csv(path) if ext == 'csv' else pd.read_excel(path)
        head = df.head()
        try:
            head_text = head.to_markdown()
        except ImportError:
            # to_markdown needs an optional dependency; degrade gracefully.
            head_text = str(head)
        return (
            f"Table Shape: {df.shape}\n"
            f"Columns: {list(df.columns)}\n"
            f"Head:\n{head_text}"
        )
    except Exception as e:
        logger.error(f"preview_table failed: {e}")
        raise
def youtube_info(url: str) -> str:
    """Return title, description and (if present) English captions of a video.

    Args:
        url: URL of the YouTube video.

    Returns:
        A text block ``title: ...`` / ``description: ...`` followed by the
        captions in SRT format when an English track exists.

    Raises:
        Exception: Any error while querying the video is logged and re-raised.
    """
    try:
        yt = YouTube(url)
        output = f"title: {yt.title}\n\ndescription: {yt.description}\n\n"
        captions = yt.captions
        # Prefer the manually authored English track; fall back to the
        # auto-generated one, which pytube lists under the code "a.en".
        for code in ('en', 'a.en'):
            if code in captions:
                output += captions[code].generate_srt_captions()
                break
        return output
    except Exception as e:
        logger.error(f"youtube_info failed: {e}")
        raise
def web_search(query: str) -> str:
    """Search DuckDuckGo and list the top results.

    Args:
        query: Free-text search query.

    Returns:
        Up to five lines of the form ``"<title> — <url>"``; an empty string
        when the search yields nothing.

    Raises:
        Exception: Any search error is logged and re-raised, matching the
            behaviour of the other tools in this file.
    """
    try:
        with DDGS() as ddgs:
            # `or []` tolerates a search that yields no result object at all.
            hits = ddgs.text(query, max_results=5) or []
            # Join inside the `with` so a lazy result is consumed while the
            # session is still open.
            return '\n'.join(f"{r['title']} — {r['href']}" for r in hits)
    except Exception as e:
        logger.error(f"web_search failed: {e}")
        raise
def read_code_from_file(file_path: str) -> str:
    """Read Python source code from a file.

    The file is decoded as UTF-8 (the default source encoding of Python)
    instead of the platform's locale encoding, so the result does not depend
    on the machine the code runs on.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file content, or — instead of raising — the string
        ``"Error: File not found."`` / ``"Error reading file: <reason>"``.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except FileNotFoundError:
        return "Error: File not found."
    except Exception as e:
        return f"Error reading file: {e}"
# Interpreter used by `execute_python_from_file`. The name was referenced
# but never bound, so every call ended in a NameError message.
python_repl = PythonREPL()


def execute_python_from_file(file_path: str) -> str:
    """Read a Python file and execute its content in the shared REPL.

    NOTE: this executes whatever the file contains inside the current
    process. Only point it at files you trust.

    Args:
        file_path: Full path of the Python file to run.

    Returns:
        The REPL output, the read-error message produced by
        ``read_code_from_file``, or ``"Error executing code: <reason>"``.
    """
    code = read_code_from_file(file_path)
    # Match only the two exact messages the reader produces; a bare
    # startswith("Error") would also swallow legitimate source that happens to
    # begin with a name such as "ErrorHandler".
    if code.startswith(("Error: File not found.", "Error reading file: ")):
        return code
    try:
        return python_repl.run(code)
    except Exception as e:
        return f"Error executing code: {e}"
# --- Define toolset ---
# Each entry exposes one of the functions above to the agent; the description
# is the text the language model sees when choosing a tool.
tools = [
    Tool(
        name='fetch_file',
        func=fetch_file,
        description='Download file by task_id,file_name',
    ),
    Tool(
        name='transcribe',
        func=transcribe,
        description='Transcribe a downloaded audio file',
    ),
    Tool(
        name='ocr',
        func=ocr,
        description='Extract text from a downloaded image',
    ),
    Tool(
        name='preview_table',
        func=preview_table,
        description='Show summary and first rows of a CSV/XLSX',
    ),
    Tool(
        name='youtube_info',
        func=youtube_info,
        description='Get info & transcript from a YouTube URL',
    ),
    Tool(
        name='web_search',
        func=web_search,
        description='Return top 5 search results for a query',
    ),
    Tool(
        name="Execute Python File",
        func=execute_python_from_file,
        description="Executes Python code from a specified file path. Input should be the full path to the Python file.",
    ),
]
# --- Create agent using ReAct agent style ---
# Prompt template pulled from the LangChain hub (requires network access).
base_prompt = hub.pull("langchain-ai/react-agent-template")
tool_names = ", ".join([t.name for t in tools])
agent = create_react_agent(llm, tools, base_prompt)

# The executor is invoked with two keys ("input" and "instructions"), so the
# memory must be told which of them is the user turn to record.
memory = ConversationBufferMemory(
    memory_key="chat_history",
    input_key="input",
    return_messages=True,
)

# `verbose=True` used to be passed twice, which is a SyntaxError
# ("keyword argument repeated") and stopped the module from loading.
# NOTE(review): `return_only_outputs` is normally an argument of the call,
# not of the constructor — confirm it has any effect here.
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=memory,
    verbose=True,
    max_iterations=5,
    handle_parsing_errors=True,
    return_only_outputs=True
)
# --- 4) GAIAAgent class returning only the FINAL ANSWER ---
class GAIAAgent:
    """Callable wrapper around the module-level agent executor.

    Calling an instance sends one question to the executor and returns only
    the text that follows the last ``FINAL ANSWER:`` marker of the reply.
    """

    def __init__(self):
        # Both attributes refer to the same shared executor object.
        self.executor = agent_executor
        self.agent = self.executor

    def __call__(self, question: str, task_id: str = None, file_name: str = None) -> str:
        """Answer ``question`` and return the extracted final answer.

        Args:
            question: The question text.
            task_id: Optional task identifier of an attachment.
            file_name: Optional attachment name. The ``FILE:`` header line is
                added only when both ``task_id`` and ``file_name`` are truthy.

        Returns:
            The stripped text after the last ``FINAL ANSWER:`` marker, the
            full output when the marker is absent, or ``""`` when the
            executor produced no output.
        """
        header = f"FILE: {task_id},{file_name}\n" if task_id and file_name else ""
        prompt = header + question

        # Use executor to get full dict response
        response = self.executor.invoke({"input": prompt, "instructions": GAIA_SYSTEM_PROMPT})
        print("prompt : ", prompt)

        if isinstance(response, dict):
            output = response.get("output")
        else:
            output = str(response)

        if not output:
            return ""
        if 'FINAL ANSWER:' not in output:
            return output
        return output.split('FINAL ANSWER:')[-1].strip()
# Rebind `agent` to the callable wrapper and send one smoke-test question.
# Both statements run at import time; the returned answer is discarded.
agent = GAIAAgent()
agent("Hello how are u?", task_id="1", file_name=None)