"""
agent/nodes/executor.py
Executes validated SQL or Pandas code.
Caches results in Upstash Redis (TTL 1 hour).

OPTIMIZATION: A singleton Redis client removes the per-call instantiation overhead.
"""
|
|
import hashlib
import json
import logging
import os
import time
from functools import lru_cache
from typing import Any, Dict, List, Optional

from agent.state import AgentState
from connectors.base import get_connector
from sandbox.python_sandbox import run_pandas
|
|
|
|
@lru_cache(maxsize=1)
def _get_redis():
    """Return the process-wide Upstash Redis client, or ``None`` if unavailable.

    The client is built on first use from the ``UPSTASH_REDIS_REST_URL`` and
    ``UPSTASH_REDIS_REST_TOKEN`` environment variables and memoized by
    ``lru_cache``, so every later call reuses the same object.

    Returns:
        The ``upstash_redis.Redis`` client, or ``None`` when the package cannot
        be imported, an environment variable is missing, or construction
        fails.  The ``None`` outcome is memoized too, so caching stays
        disabled until ``_get_redis.cache_clear()`` is called.
    """
    try:
        # Imported inside the try so a missing package degrades to "no cache"
        # instead of failing at module import time.
        from upstash_redis import Redis

        return Redis(
            url=os.environ["UPSTASH_REDIS_REST_URL"],
            token=os.environ["UPSTASH_REDIS_REST_TOKEN"],
        )
    except Exception as exc:
        # Best-effort: an unavailable cache must never break execution, but the
        # reason is logged because this result is memoized and will not be
        # retried on later calls.
        logging.getLogger(__name__).warning(
            "Redis result cache disabled (%s: %s)", type(exc).__name__, exc
        )
        return None
|
|
|
|
def _cache_key(connector_id: str, code: str, code_type: str) -> str:
    """Build the Redis key under which an execution result is stored.

    The connector id, code type and code text are joined with ``:`` (in that
    order), hashed with SHA-256, and the first 32 hex digits of the digest are
    prefixed with ``exec:``.
    """
    material = "{}:{}:{}".format(connector_id, code_type, code)
    digest = hashlib.sha256(material.encode()).hexdigest()
    return "exec:" + digest[:32]
|
|
|
|
# How long a cached execution result lives in Redis (1 hour).
_CACHE_TTL_SECONDS = 3600
# Upper bound on the number of SQL rows kept in the result.
_MAX_SQL_ROWS = 500


def executor(state: AgentState) -> AgentState:
    """Run the generated SQL or Pandas code and record the outcome in the state.

    Reads ``generated_code``, ``code_type`` and ``connector_id`` from *state*.
    A cached result is returned when Redis has one; otherwise the code is
    executed through the connector and the result is cached for
    ``_CACHE_TTL_SECONDS``.  Cache access is best-effort: a failure there is
    logged and never fails the execution.

    Args:
        state: Agent state mapping.  If ``execution_error`` starts with
            ``"SAFETY_BLOCK"`` the state is returned untouched and nothing is
            executed (presumably set by an upstream validator; confirm
            against the graph definition).

    Returns:
        A new state dict with ``execution_result``, ``execution_error`` and
        ``from_cache`` set.  ``latency_ms`` is set only when the code was
        actually executed successfully.  On failure ``execution_result`` is
        ``None`` and ``execution_error`` holds a non-empty description.

    Raises:
        KeyError: If *state* lacks ``generated_code``, ``code_type`` or
            ``connector_id``.
    """
    if (state.get("execution_error") or "").startswith("SAFETY_BLOCK"):
        return state

    log = logging.getLogger(__name__)

    code = state["generated_code"]
    code_type = state["code_type"]
    connector_id = state["connector_id"]

    redis = _get_redis()
    cache_key = _cache_key(connector_id, code, code_type)

    # --- cache lookup (best-effort) -------------------------------------
    if redis:
        try:
            cached = redis.get(cache_key)
            if cached:
                return {
                    **state,
                    "execution_result": json.loads(cached),
                    "from_cache": True,
                    "execution_error": None,
                }
        except Exception:
            # Unreachable cache or corrupt payload: fall through and execute.
            log.warning("Cache read failed for %s; executing uncached", cache_key, exc_info=True)

    # --- execution ------------------------------------------------------
    # perf_counter is monotonic, so the latency is immune to wall-clock jumps.
    start = time.perf_counter()
    try:
        # Resolved inside the try so a bad connector id is reported through
        # ``execution_error`` like any other execution failure.
        connector = get_connector(connector_id)
        if code_type == "sql":
            result = connector.execute_sql(code)[:_MAX_SQL_ROWS]
        else:
            frame = connector.load_dataframe()
            result_df = run_pandas(code, frame)
            # Round-trip through JSON to obtain plain, serializable records.
            result = json.loads(result_df.to_json(orient="records", date_format="iso"))
    except Exception as exc:
        return {
            **state,
            "execution_result": None,
            # Some exceptions carry no message; never report an empty (falsy)
            # error string, which would be indistinguishable from success.
            "execution_error": str(exc) or type(exc).__name__,
            "from_cache": False,
        }

    latency_ms = int((time.perf_counter() - start) * 1000)

    # --- cache store (best-effort) --------------------------------------
    if redis:
        try:
            redis.setex(cache_key, _CACHE_TTL_SECONDS, json.dumps(result))
        except Exception:
            # E.g. a non-JSON-serializable value in the rows, or Redis is down.
            log.warning("Cache write failed for %s; result not cached", cache_key, exc_info=True)

    return {
        **state,
        "execution_result": result,
        "execution_error": None,
        "from_cache": False,
        "latency_ms": latency_ms,
    }
|
|