| | import json |
| | import os |
| | import requests |
| | import sys |
| | import time |
| | from datetime import datetime |
| | from dotenv import load_dotenv |
| | from typing import Dict, List, Any |
| | from smolagents import DuckDuckGoSearchTool, OpenAIServerModel, CodeAgent, ActionStep, TaskStep |
| | from blablador import Models |
| |
|
# Load API keys (Blablador_API_KEY, Gemini_API_KEY2) from a local .env file, if present.
load_dotenv()
| |
|
| |
|
class BasicAgent:
    """Question-answering agent backed by a smolagents ``CodeAgent``.

    Supports two model providers ("Blablador" and "Gemini") and persists the
    agent's step-by-step memory to a JSON file after every question.
    """

    def __init__(self,
                 model_provider: str = "Blablador",
                 memory_file: str = "agent_memory.json"):
        """Build the underlying CodeAgent for the requested provider.

        Args:
            model_provider: Either "Blablador" or "Gemini"; any other value
                prints an error and terminates the process.
            memory_file: Path of the JSON file the memory export appends to.
        """
        self.model_provider = model_provider
        self.memory_file = memory_file

        if model_provider == "Blablador":
            # Resolve the concrete model id from the Blablador catalogue.
            models = Models(
                api_key=os.getenv("Blablador_API_KEY")).get_model_ids()
            model_id_blablador = 5  # index of the catalogue entry to use
            # Catalogue entries look like "<id> - <name> ..."; keep the first
            # two words of the name for the log line.
            model_name = " ".join(
                models[model_id_blablador].split(" - ")[1].split()[:2])
            print("The agent uses the following model:", model_name)

            answer_llm = OpenAIServerModel(
                model_id=models[model_id_blablador],
                api_base="https://helmholtz-blablador.fz-juelich.de:8000/v1",
                api_key=os.getenv("Blablador_API_KEY"),
                # Blablador endpoints expect plain-text message payloads.
                flatten_messages_as_text=True,
                temperature=0.2)

        elif model_provider == "Gemini":
            model_name = "gemini-2.0-flash"
            print("The agent uses the following model:", model_name)

            answer_llm = OpenAIServerModel(
                model_id=model_name,
                api_base=
                "https://generativelanguage.googleapis.com/v1beta/openai/",
                api_key=os.getenv("Gemini_API_KEY2"),
                temperature=0.2)
        else:
            print(
                f"Error: Unsupported model provider '{model_provider}'. Only 'Blablador' and 'Gemini' are supported."
            )
            sys.exit(1)

        self.agent = CodeAgent(
            tools=[DuckDuckGoSearchTool()],
            model=answer_llm,
            planning_interval=3,
            max_steps=10,
        )

    def __call__(self,
                 question: str,
                 task_id: str = "",
                 file_url: str = "",
                 file_ext: str = "") -> str:
        """Answer one question and persist the run's memory.

        Args:
            question: The question text.
            task_id: Identifier used as the record key in the memory export.
            file_url: Optional URL of an attachment relevant to the question.
            file_ext: Extension of the attachment (e.g. "csv").

        Returns:
            The agent's answer, or an ``"Error: ..."`` string when the run fails.
        """
        print(f"Agent received question (first 50 chars): {question[:50]}...")

        SYSTEM_PROMPT = """You are a general AI assistant. I will ask you a question.
Report your thoughts, and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER].
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings.
If you are asked for a number, don't use comma to write your number
neither use units such as $ or percent sign unless specified otherwise.
If you are asked for a string, don't use articles, neither abbreviations, (e.g. for cities),
and write the digits in plain text unless specified otherwise.
If you are asked for a comma separated list,
apply the above rules depending of whether the element to be put in the list is a number or a string.
"""

        additional_args: Dict[str, Any] = {}

        if file_url:
            # BUGFIX: the original overwrote additional_args with the unused
            # string f"{file_url}_{file_ext}"; the file reference is now
            # passed to the agent as structured run state instead.
            additional_args = {"file_url": file_url, "file_ext": file_ext}
            full_prompt = f"{SYSTEM_PROMPT}\n\nQuestion: {question}\n\nNote: A {file_ext} file has been provided and is available for your analysis."
        else:
            full_prompt = f"{SYSTEM_PROMPT}\n\nQuestion: {question}"

        try:
            if additional_args:
                answer = self.agent.run(full_prompt,
                                        additional_args=additional_args)
            else:
                answer = self.agent.run(full_prompt)
            print(f"Agent returning answer: {answer}")

            self.export_memory_to_json(task_id=task_id,
                                       question=question,
                                       answer=answer)

            # Crude rate limiting for the Gemini free tier.
            if self.model_provider == "Gemini":
                time.sleep(10)
            return answer
        except Exception as e:
            print(f"Error running agent: {e}")
            # BUGFIX: failed runs are now exported too, using the previously
            # unused `error` parameter of export_memory_to_json.
            self.export_memory_to_json(task_id=task_id,
                                       question=question,
                                       answer="",
                                       error=str(e))
            return f"Error: {e}"

    def export_memory_to_json(self,
                              task_id: str = "",
                              question: str = "",
                              answer: str = "",
                              error: str = ""):
        """Export agent's memory to JSON file for each question.

        Args:
            task_id: Stable question id; when given, an existing record with
                the same id is replaced instead of duplicated.
            question: The question text that was asked.
            answer: The agent's final answer (empty when the run failed).
            error: Error description when the run failed, else "".
        """
        memory_data = self.extract_memory_data()

        # Load previously exported questions; a missing or corrupted file
        # starts a fresh structure instead of crashing the export.
        existing_data: Dict[str, Any] = {"questions": [], "batch_info": {}}
        if os.path.exists(self.memory_file):
            try:
                with open(self.memory_file, 'r', encoding='utf-8') as f:
                    existing_data = json.load(f)
            except (json.JSONDecodeError, OSError) as exc:
                print(f"Could not read {self.memory_file} ({exc}); "
                      "starting a fresh memory file.")

        question_data = {
            "question_id": task_id or len(existing_data["questions"]) + 1,
            "timestamp": datetime.now().isoformat(),
            "model_provider": self.model_provider,
            "task": question,
            "result": answer,
            "error": error,
            "memory": memory_data,
            "memory_stats": self.get_memory_stats()
        }

        if task_id:
            # Replace an existing record with the same id, otherwise append.
            for i, existing_question in enumerate(existing_data["questions"]):
                if existing_question["question_id"] == task_id:
                    existing_data["questions"][i] = question_data
                    break
            else:
                existing_data["questions"].append(question_data)
        else:
            existing_data["questions"].append(question_data)

        existing_data["batch_info"] = {
            "total_questions": len(existing_data["questions"]),
            "last_updated": datetime.now().isoformat(),
            "model_provider": self.model_provider
        }

        with open(self.memory_file, 'w', encoding='utf-8') as f:
            # default=str stringifies objects json can't serialize
            # (e.g. raw smolagents step objects in full_steps).
            json.dump(existing_data,
                      f,
                      indent=2,
                      ensure_ascii=False,
                      default=str)

        print(f"Memory for question {task_id} exported to {self.memory_file}")

    def extract_memory_data(self) -> Dict[str, Any]:
        """Serialize the agent's memory into a JSON-friendly dict."""
        memory_data: Dict[str, Any] = {
            "system_prompt": None,
            "steps": [],
            "full_steps": []
        }

        # System prompt, when the smolagents memory exposes one.
        if hasattr(self.agent.memory,
                   'system_prompt') and self.agent.memory.system_prompt:
            memory_data["system_prompt"] = {
                "content": str(self.agent.memory.system_prompt.system_prompt),
                "type": "system_prompt"
            }

        # Compact per-step summaries.
        for i, step in enumerate(self.agent.memory.steps):
            step_data: Dict[str, Any] = {
                "step_index": i,
                "step_type": type(step).__name__,
                "timestamp": datetime.now().isoformat()
            }

            if isinstance(step, TaskStep):
                step_data.update({
                    "task":
                    step.task,
                    "task_images":
                    len(step.task_images) if step.task_images else 0
                })
            elif isinstance(step, ActionStep):
                step_data.update({
                    "step_number":
                    step.step_number,
                    "llm_output":
                    getattr(step, 'action', None),
                    "observations":
                    step.observations,
                    "error":
                    str(step.error) if step.error else None,
                    "has_images":
                    len(step.observations_images) > 0
                    if step.observations_images else False
                })

            memory_data["steps"].append(step_data)

        # Raw step dump as provided by smolagents; best-effort.
        try:
            memory_data["full_steps"] = self.agent.memory.get_full_steps()
        except Exception as e:
            print(f"Could not get full steps: {e}")
            memory_data["full_steps"] = []

        return memory_data

    def get_memory_stats(self) -> Dict[str, int]:
        """Get statistics about the agent's memory."""
        stats = {
            "total_steps": len(self.agent.memory.steps),
            "task_steps": 0,
            "action_steps": 0,
            "error_steps": 0,
            "successful_steps": 0
        }

        for step in self.agent.memory.steps:
            if isinstance(step, TaskStep):
                stats["task_steps"] += 1
            elif isinstance(step, ActionStep):
                stats["action_steps"] += 1
                if step.error:
                    stats["error_steps"] += 1
                else:
                    stats["successful_steps"] += 1

        return stats

    def _download_file(self, file_url: str, file_ext: str = "") -> Any:
        """Download file content from URL.

        Returns:
            Decoded text for known text extensions, raw bytes otherwise, or
            None when the download fails. (Annotation fixed: the original
            claimed ``-> str`` but also returned bytes and None.)
        """
        try:
            response = requests.get(file_url, timeout=30)
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Error downloading file from {file_url}: {e}")
            return None

        if file_ext.lower() in {
                'txt', 'csv', 'json', 'md', 'py', 'js', 'html', 'xml'
        }:
            return response.text
        # Binary payloads (pdf, xlsx, images, ...) stay as raw bytes.
        return response.content
| |
|