File size: 3,367 Bytes
3f6526a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Public toolbox APIs exposed to auxiliary metric code.

Design goal:
- keep provider details hidden behind internal adapters
- expose simple, controlled text APIs to the agent
"""

from __future__ import annotations

import json
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional


def _usage_file(results_dir: Optional[str]) -> Optional[Path]:
    """Locate the tool-usage JSON file associated with ``results_dir``.

    Returns ``None`` when ``results_dir`` is ``None`` or empty. Otherwise the
    result is ``<root>/eval_agent_memory/tool_usage.json`` where ``<root>`` is:

    - the experiment directory, when the resolved ``results_dir`` has the
      shape ``<experiment>/gen_*/results``;
    - the resolved ``results_dir`` itself in every other case.
    """
    if not results_dir:
        return None

    resolved = Path(results_dir).resolve()
    generation_dir = resolved.parent
    follows_layout = resolved.name == "results" and generation_dir.name.startswith("gen_")

    if follows_layout:
        # results -> gen_x -> experiment: usage is shared across generations.
        root = generation_dir.parent
    else:
        root = resolved
    return root / "eval_agent_memory" / "tool_usage.json"


def _load_usage(path: Path) -> Dict[str, Any]:
    """Read the usage record stored at ``path``.

    This is best-effort: an empty dict is returned when the file does not
    exist, cannot be opened or parsed as JSON, or when its top-level JSON
    value is not an object.
    """
    try:
        if not path.exists():
            return {}
        with open(path) as handle:
            loaded = json.load(handle)
    except Exception:
        # Any read/parse failure is treated the same as "nothing recorded".
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _save_usage(path: Path, data: Dict[str, Any]) -> None:
    """Write ``data`` to ``path`` as JSON indented by two spaces.

    Missing parent directories are created first. The write is best-effort:
    every exception raised while creating directories, opening the file or
    serializing ``data`` is suppressed.
    """
    try:
        target_dir = path.parent
        target_dir.mkdir(parents=True, exist_ok=True)
        with path.open("w") as sink:
            json.dump(data, sink, indent=2)
    except Exception:
        # Usage tracking is auxiliary; a failed write must not abort evaluation.
        return


def _check_and_record_quota(tool: str, results_dir: Optional[str], limit: int) -> tuple[bool, str]:
    """Enforce and record a per-tool call quota persisted in the usage file.

    Args:
        tool: Tool name; its counter is stored under ``"<tool>_calls"`` inside
            the ``"counters"`` mapping of the usage file.
        results_dir: Directory used to locate the usage file. When it is
            ``None`` or empty no quota is tracked and the call is allowed.
        limit: Number of recorded calls at which further calls are denied.

    Returns:
        ``(allowed, note)`` where ``note`` is one of:

        - ``"no_results_dir"`` when no usage file could be located;
        - ``"quota_exceeded:<tool>:<count>/<limit>"`` when the call is denied
          (nothing is written in this case);
        - ``"ok:<tool>:<new_count>/<limit>"`` when the call is allowed and the
          incremented counter has been handed to ``_save_usage``.
    """
    usage_path = _usage_file(results_dir)
    if usage_path is None:
        return True, "no_results_dir"

    usage = _load_usage(usage_path)

    # _load_usage only guarantees a top-level dict. The nested state can still
    # be malformed (hand-edited file, older writer), so treat a non-dict
    # "counters" value as empty instead of raising AttributeError on .get().
    counters = usage.get("counters")
    if not isinstance(counters, dict):
        counters = {}
        usage["counters"] = counters

    key = f"{tool}_calls"
    try:
        count = int(counters.get(key, 0))
    except (TypeError, ValueError, OverflowError):
        # null, non-numeric strings, containers, or Infinity/NaN: an unusable
        # counter is handled like an unreadable file, i.e. no calls recorded.
        count = 0

    if count >= limit:
        return False, f"quota_exceeded:{tool}:{count}/{limit}"

    counters[key] = count + 1
    usage["last_update"] = time.time()
    _save_usage(usage_path, usage)
    return True, f"ok:{tool}:{count + 1}/{limit}"


def call_vision(
    text: str,
    image_paths: List[str],
    *,
    results_dir: Optional[str] = None,
) -> str:
    """Send a prompt plus images to the hidden vision backend; return its text.

    Problems are reported as a string starting with ``TOOL_ERROR:`` instead of
    an exception: the call quota is exhausted, no images were supplied, none
    of the considered images exist on disk, or the backend raised.

    Only the first ``EVAL_TOOLBOX_VISION_MAX_IMAGES`` entries of
    ``image_paths`` are considered, and entries that do not exist on disk are
    dropped before the backend is called.

    Controls (environment variables):
    - EVAL_TOOLBOX_VISION_MAX_CALLS (default: 2)
    - EVAL_TOOLBOX_VISION_MAX_IMAGES (default: 2)
    """
    call_limit = int(os.environ.get("EVAL_TOOLBOX_VISION_MAX_CALLS", "2"))
    image_limit = int(os.environ.get("EVAL_TOOLBOX_VISION_MAX_IMAGES", "2"))

    # The call is counted against the quota before the inputs are validated.
    allowed, quota_note = _check_and_record_quota("vision", results_dir, call_limit)
    if not allowed:
        return f"TOOL_ERROR: {quota_note}"

    candidates = image_paths[:image_limit]
    if not candidates:
        return "TOOL_ERROR: no_images"

    present = []
    for candidate in candidates:
        if Path(candidate).exists():
            present.append(candidate)
    if not present:
        return "TOOL_ERROR: image_not_found"

    try:
        # Imported on demand so the toolbox stays importable without the
        # vision dependencies installed.
        from ._internal.vision_gemini import gemini_vision_chat

        reply = gemini_vision_chat(prompt=text, image_paths=present)
    except Exception as exc:
        return f"TOOL_ERROR: vision_exception:{exc}"
    return reply


def call_tool(name: str, payload: Dict[str, Any]) -> str:
    """Dispatch a named tool request; generic entrypoint for future tools.

    ``name`` is matched after stripping surrounding whitespace and
    lower-casing; ``None`` is treated as an empty name. The only tool currently
    handled is ``"vision"``, which forwards the ``"text"``, ``"image_paths"``
    and ``"results_dir"`` entries of ``payload`` to ``call_vision``. Any other
    name yields ``"TOOL_ERROR: unknown_tool:<normalized name>"``.
    """
    tool = (name or "").strip().lower()
    if tool != "vision":
        return f"TOOL_ERROR: unknown_tool:{tool}"

    prompt = str(payload.get("text", ""))
    images = list(payload.get("image_paths", []))
    return call_vision(
        text=prompt,
        image_paths=images,
        results_dir=payload.get("results_dir"),
    )