| |
| """ |
| Eurus - ERA5 Climate Analysis Agent |
| ====================================== |
| An intelligent oceanography and climate data analysis assistant. |
| |
| Features: |
| - Persistent memory across sessions |
| - Cloud-optimized ERA5 data retrieval |
| - Interactive Python analysis with visualization |
| - Conversation history and context awareness |
| |
| Usage: |
| python main.py |
| |
| Commands: |
| q, quit, exit - Exit the agent |
| /clear - Clear conversation history |
| /cache - List cached datasets |
| /memory - Show memory summary |
| /cleardata - Clear all downloaded ERA5 datasets |
| /help - Show help message |
| """ |
|
|
import os
import sys
import logging
import warnings
from pathlib import Path
from datetime import datetime  # NOTE(review): not referenced anywhere else in this file — confirm before removing

# Ignore all FutureWarnings and the "Consolidated metadata" UserWarning process-wide.
# Installed before the third-party imports below so warnings emitted at import time are hidden too.
# (The latter presumably comes from the zarr/xarray stack when opening stores — TODO confirm.)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", message="Consolidated metadata", category=UserWarning)

from dotenv import load_dotenv

# Populate os.environ from a .env file; main() later requires ARRAYLAKE_API_KEY and
# OPENAI_API_KEY to be present in the environment.
load_dotenv()

# Put <project>/src on the import path so the `eurus` package resolves when this script
# is run directly (e.g. `python main.py`) without being installed.
PROJECT_ROOT = Path(__file__).parent
sys.path.insert(0, str(PROJECT_ROOT / "src"))

# Configure CLI logging before importing the rest of the stack.
# cleanup_old_logs(keep=20) presumably prunes older log files, keeping 20 — confirm in eurus.logging_config.
from eurus.logging_config import setup_logging, cleanup_old_logs
setup_logging(mode="cli")
cleanup_old_logs(keep=20)

# Module-level logger; handlers/levels are presumably configured by setup_logging above.
logger = logging.getLogger(__name__)

# LLM client and agent factory.
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent

# Project modules; they live under src/, so these imports must stay below the sys.path tweak.
from eurus.config import CONFIG, AGENT_SYSTEM_PROMPT, DATA_DIR, PLOTS_DIR  # NOTE(review): PLOTS_DIR looks unused in this file
from eurus.memory import get_memory, MemoryManager
from eurus.tools import get_all_tools
|
|
|
|
| |
| |
| |
|
|
# Rows shown inside the startup banner. The surrounding double-line frame is generated
# below (interior width 75) so the right-hand border always lines up, whatever the row text.
_BANNER_ROWS = (
    "",
    # "EURUS" in block letters, centred in the frame.
    *(logo_row.center(75) for logo_row in (
        "███████╗██╗   ██╗██████╗ ██╗   ██╗███████╗",
        "██╔════╝██║   ██║██╔══██╗██║   ██║██╔════╝",
        "█████╗  ██║   ██║██████╔╝██║   ██║███████╗",
        "██╔══╝  ██║   ██║██╔══██╗██║   ██║╚════██║",
        "███████╗╚██████╔╝██║  ██║╚██████╔╝███████║",
        "╚══════╝ ╚═════╝ ╚═╝  ╚═╝ ╚═════╝ ╚══════╝",
    )),
    "",
    "AI Climate Physicist v2.0".center(75),
    ("─" * 41).center(75),
    "",
    "   Scientific Capabilities:",
    "     • ERA5 reanalysis data retrieval (SST, wind, temperature, pressure)",
    "     • Climate Diagnostics: Anomalies, Z-Scores, Statistical Significance",
    "     • Pattern Discovery: EOF/PCA analysis for climate modes",
    '     • Compound Extremes: "Ocean Oven" detection (Heat + Stagnation)',
    "     • Trend Analysis: Decadal trends with p-value significance",
    "     • Teleconnections: Correlation and lead-lag analysis",
    "     • Maritime Routing & Lagrangian Risk Assessment",
    "",
    "   Commands: /help, /clear, /cache, /memory, /quit",
    "",
)

# Startup banner printed by main(); starts and ends with a newline like the original literal.
BANNER = "\n".join([
    "",
    "╔" + "═" * 75 + "╗",
    *(f"║{row:<75}║" for row in _BANNER_ROWS),
    "╚" + "═" * 75 + "╝",
    "",
])
|
|
# Rows shown below the title of the /help box. Tables are built with format specs so
# their columns line up, and the frame (interior width 75) is generated so borders align.
_HELP_ROWS = (
    "",
    "   COMMANDS:",
    "   " + "─" * 69,
    *(f"   {command:<12}- {description}" for command, description in (
        ("/help", "Show this help message"),
        ("/clear", "Clear conversation history (fresh start)"),
        ("/cache", "List all cached ERA5 datasets"),
        ("/memory", "Show memory summary (datasets, analyses)"),
        ("/cleardata", "Clear all downloaded ERA5 datasets"),
        ("/quit", "Exit the agent (also: q, quit, exit)"),
    )),
    "",
    "   SCIENTIFIC ANALYSIS (Publication-Grade):",
    "   " + "─" * 69,
    '   "Analyze marine heatwaves in the North Atlantic summer 2023"',
    '   "Find compound extremes where high SST coincides with low wind"',
    '   "Perform EOF analysis on SST anomalies to find climate modes"',
    '   "Calculate SST trends with statistical significance"',
    '   "Detect Ocean Ovens in the Mediterranean"',
    "",
    '   SCIENCE TOOLS (The "Physics Brain"):',
    "   " + "─" * 69,
    *(f"   {tool:<28}- {description}" for tool, description in (
        ("analyze_climate_modes_eof", "Pattern discovery via EOF/PCA"),
        ("detect_compound_extremes", '"Ocean Oven" detection'),
        ("calculate_climate_trends", "Trends with p-value significance"),
        ("detrend_climate_data", "Remove warming trend for analysis"),
        ("detect_percentile_extremes", "Percentile-based extreme detection"),
        ("fetch_climate_index", "NOAA indices (Nino3.4, NAO, PDO, AMO)"),
        ("calculate_return_periods", "GEV/EVT (1-in-100 year events)"),
        ("analyze_granger_causality", "Prove X causes Y (not just correlated)"),
    )),
    "",
    "   AVAILABLE VARIABLES:",
    "   " + "─" * 69,
    *(f"   {name:<6}- {description}" for name, description in (
        ("sst", "Sea Surface Temperature (K)"),
        ("t2", "2m Air Temperature (K)"),
        ("u10", "10m U-Wind Component (m/s)"),
        ("v10", "10m V-Wind Component (m/s)"),
        ("mslp", "Mean Sea Level Pressure (Pa)"),
        ("tcc", "Total Cloud Cover (0-1)"),
        ("tp", "Total Precipitation (m)"),
    )),
    "",
    "   PREDEFINED REGIONS:",
    "   " + "─" * 69,
    "   north_atlantic, north_pacific, california_coast, mediterranean",
    "   gulf_of_mexico, caribbean, nino34, nino3, nino4, arctic, antarctic",
    "",
    "   SCIENTIFIC WORKFLOW:",
    "   " + "─" * 69,
    "   1. RETRIEVE data → 2. DIAGNOSE (Z-scores) → 3. DISCOVER (EOF)",
    "   4. DETECT (extremes) → 5. ATTRIBUTE (correlation) → 6. VISUALIZE",
    "",
    "   TIPS:",
    "   " + "─" * 69,
    "   • Always report in anomalies/Z-scores, not raw values",
    "   • Z > 2σ means statistically significant extreme",
    "   • Use diverging colormaps (RdBu_r) centered at 0 for anomalies",
    "   • Add stippling for p < 0.05 significance",
    "",
)

# Text returned by the /help command: title row, separator, then the rows above.
# Starts and ends with a newline like the original literal.
HELP_TEXT = "\n".join([
    "",
    "╔" + "═" * 75 + "╗",
    "║" + "EURUS HELP - AI Climate Physicist".center(75) + "║",
    "╠" + "═" * 75 + "╣",
    *(f"║{row:<75}║" for row in _HELP_ROWS),
    "╚" + "═" * 75 + "╝",
    "",
])
|
|
|
|
|
|
| def clear_data_directory(data_dir: Path = None) -> tuple[int, float]: |
| """ |
| Remove all downloaded ERA5 datasets (zarr directories) from the data folder. |
| |
| Args: |
| data_dir: Data directory path. Defaults to DATA_DIR from config. |
| |
| Returns: |
| Tuple of (datasets_removed, total_size_mb_freed) |
| """ |
| import shutil |
| |
| if data_dir is None: |
| data_dir = DATA_DIR |
| |
| datasets_removed = 0 |
| total_bytes = 0 |
| |
| if not data_dir.exists(): |
| return 0, 0.0 |
| |
| |
| for zarr_dir in data_dir.glob('*.zarr'): |
| if zarr_dir.is_dir(): |
| |
| dir_size = sum(f.stat().st_size for f in zarr_dir.rglob('*') if f.is_file()) |
| total_bytes += dir_size |
| shutil.rmtree(zarr_dir) |
| datasets_removed += 1 |
| logger.debug(f"Removed dataset: {zarr_dir}") |
| |
| total_mb = total_bytes / (1024 * 1024) |
| return datasets_removed, total_mb |
|
|
|
|
| |
| |
| |
|
|
def _framed_box(title: str, rows: list[str], width: int = 75) -> str:
    """Render *title* over *rows* in a double-line box whose interior is *width* columns wide."""
    return "\n".join([
        "╔" + "═" * width + "╗",
        "║" + title.center(width) + "║",
        "╠" + "═" * width + "╣",
        *("║" + row.ljust(width) + "║" for row in rows),
        "╚" + "═" * width + "╝",
    ])


def handle_command(command: str, memory: MemoryManager) -> tuple[bool, str | None]:
    """
    Handle slash commands (and the bare quit words) typed at the prompt.

    Args:
        command: Raw user input; matching is case-insensitive and ignores surrounding whitespace.
        memory: The session's MemoryManager, used by /clear, /cache, /memory and /cleardata.

    Returns:
        (should_continue, response_message). ``should_continue`` is False only for the
        quit commands. ``response_message`` is None when the input is not a command
        and should be forwarded to the agent instead.
    """
    cmd = command.lower().strip()

    if cmd in ('/quit', '/exit', '/q', 'quit', 'exit', 'q'):
        return False, "Goodbye! Your conversation has been saved."

    if cmd == '/help':
        return True, HELP_TEXT

    if cmd == '/clear':
        memory.clear_conversation()
        return True, "Conversation history cleared. Starting fresh!"

    if cmd == '/cache':
        cache_info = memory.list_datasets()
        return True, f"\n{cache_info}\n"

    if cmd == '/memory':
        summary = memory.get_context_summary()
        # Count only dataset records whose files still exist on disk.
        datasets = sum(1 for path in memory.datasets if os.path.exists(path))
        box = _framed_box("MEMORY SUMMARY", [
            f"  Conversation messages: {len(memory.conversations):<5}",
            f"  Cached datasets:       {datasets:<5}",
            f"  Recorded analyses:     {len(memory.analyses):<5}",
        ])
        return True, f"\n{box}\n\n{summary}\n"

    if cmd == '/cleardata':
        try:
            datasets_removed, size_freed = clear_data_directory(DATA_DIR)
        except OSError as exc:
            # Report the failure instead of letting it escape the prompt loop and end the session.
            logger.error(f"Failed to clear ERA5 data: {exc}", exc_info=True)
            return True, f"Failed to clear ERA5 data: {exc}"
        # Drop every dataset record and persist that, so /cache and /memory stop listing them.
        memory.datasets.clear()
        memory._save_datasets()
        box = _framed_box("ERA5 DATA CLEARED", [
            f"  Datasets removed: {datasets_removed:<5}",
            f"  Space freed:      {size_freed:>8.2f} MB",
        ])
        return True, f"\n{box}\n"

    if cmd.startswith('/'):
        return True, f"Unknown command: {cmd}\nType /help for available commands."

    # Not a command: the caller sends the input to the agent.
    return True, None
|
|
|
|
| |
| |
| |
|
|
| from langchain_core.callbacks import BaseCallbackHandler |
|
|
|
|
class ToolProgressCallback(BaseCallbackHandler):
    """LangChain callback handler that prints a line when each tool call starts and ends."""

    def on_tool_start(self, serialized, input_str, **kwargs):
        """Announce a tool call, naming it from *serialized* or, failing that, the ``name`` kwarg."""
        # Tolerate a missing serialized payload rather than failing inside the callback.
        tool_name = (serialized or {}).get('name', kwargs.get('name', 'unknown'))
        print(f"🔧 Calling: {tool_name}...", flush=True)

    def on_tool_end(self, output, name=None, **kwargs):
        """Report completion of the tool called *name* (falls back to "tool" when not supplied)."""
        display_name = name or "tool"
        print(f"   ✓ {display_name} done", flush=True)
|
|
|
|
| |
| |
| |
|
|
def _extract_response_text(message) -> str:
    """
    Return the printable text of the agent's final message.

    Accepts a LangChain message object (anything with a ``content`` attribute) or a
    plain ``{"role": ..., "content": ...}`` dict. When the content is a list of content
    blocks, only the text parts are kept. Falls back to ``str(message)`` when no
    non-empty content is found.
    """
    content = getattr(message, "content", None)
    if content is None and isinstance(message, dict):
        content = message.get("content")

    if isinstance(content, list):
        # Message content may be a list of blocks instead of a single string; join the
        # text parts so the user (and saved memory) gets prose rather than a list repr.
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and isinstance(block.get("text"), str):
                parts.append(block["text"])
        content = "".join(parts)

    if content:
        return content if isinstance(content, str) else str(content)
    return str(message)


def main():
    """
    Run the interactive Eurus CLI.

    Flow: check the required API keys, load memory and recent history, build the
    tools/LLM/agent, then read prompts in a loop. Slash commands are handled locally;
    everything else is sent to the agent. Dataset records are cleaned up on exit.
    """
    print(BANNER)

    # Both keys are mandatory; fail fast before any service is initialised.
    for env_var in ("ARRAYLAKE_API_KEY", "OPENAI_API_KEY"):
        if not os.environ.get(env_var):
            print(f"ERROR: {env_var} not found in environment.")
            print("Please add it to your .env file:")
            print(f"  {env_var}=your_api_key_here")
            sys.exit(1)

    print("Initializing memory system...")
    memory = get_memory()

    recent_messages = memory.get_langchain_messages(n_messages=10)
    logger.info(f"Loaded {len(recent_messages)} messages from history")

    print("Starting Python kernel...")
    tools = get_all_tools(enable_routing=True, enable_guide=True)
    logger.info(f"Loaded {len(tools)} tools")

    print("Connecting to LLM...")
    llm = ChatOpenAI(
        model=CONFIG.model_name,
        temperature=CONFIG.temperature,
        streaming=True,
    )

    # Append the persisted context summary to the system prompt when there is one.
    context_summary = memory.get_context_summary()
    enhanced_prompt = AGENT_SYSTEM_PROMPT
    if context_summary and context_summary != "No context available.":
        enhanced_prompt += f"\n\n## CURRENT CONTEXT\n{context_summary}"

    print("Creating agent...")
    agent = create_agent(
        model=llm,
        tools=tools,
        system_prompt=enhanced_prompt,
        debug=False,
    )

    # In-session conversation sent to the agent on every turn (seeded from saved history).
    messages = recent_messages.copy()

    print("\n" + "=" * 75)
    print("READY! Type your question or /help for commands.")
    print("=" * 75 + "\n")

    try:
        while True:
            try:
                user_input = input(">> You: ").strip()
            except EOFError:
                break

            if not user_input:
                continue

            try:
                should_continue, response = handle_command(user_input, memory)
            except Exception as e:
                # A failing slash command must not terminate the whole session.
                logger.error(f"Command failed: {e}", exc_info=True)
                print(f"\nCommand failed: {e}\n")
                continue

            if response:
                print(response)

            if not should_continue:
                break

            if response:
                if user_input.lower() == '/clear':
                    # /clear only wipes persisted history; also drop the in-session copy,
                    # otherwise the agent keeps receiving the "cleared" conversation.
                    messages = []
                continue

            memory.add_message("user", user_input)
            messages.append({"role": "user", "content": user_input})

            print("\nThinking...\n")

            try:
                print("\n" + "─" * 75)

                config = {"recursion_limit": 35, "callbacks": [ToolProgressCallback()]}
                result = agent.invoke({"messages": messages}, config=config)

                # The agent returns the full updated conversation (including tool turns).
                messages = list(result["messages"])
                response_text = _extract_response_text(messages[-1])

                print(f"\n🌊 Eurus:\n{response_text}", flush=True)
                print("─" * 75 + "\n")
                memory.add_message("assistant", response_text)

            except KeyboardInterrupt:
                print("\n\nInterrupted. Type /quit to exit or continue with a new question.")

            except Exception as e:
                error_msg = f"Error: {str(e)}"
                logger.error(error_msg, exc_info=True)
                print(f"\nError during processing: {error_msg}")
                print("Please try again or rephrase your question.\n")

    except KeyboardInterrupt:
        print("\n\nReceived interrupt signal.")

    finally:
        print("\nShutting down...")

        removed = memory.cleanup_missing_datasets()
        if removed:
            logger.info(f"Cleaned up {removed} missing dataset records")

        print("Session saved. Goodbye!")
|
|
|
| |
| |
| |
|
|
# Start the interactive CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|