File size: 8,663 Bytes
52c7696
 
1eb9c9d
52c7696
 
 
 
 
 
 
 
 
1eb9c9d
52c7696
dd7fa38
6c0aeb9
0159aaf
52c7696
 
dd7fa38
52c7696
 
 
 
 
 
 
1eb9c9d
 
52c7696
 
6c0aeb9
395c3d4
 
 
 
 
 
 
 
 
 
 
0159aaf
 
395c3d4
 
 
0159aaf
395c3d4
 
 
 
dd7fa38
395c3d4
 
 
 
 
 
1eb9c9d
 
 
dd7fa38
52c7696
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1eb9c9d
395c3d4
1eb9c9d
 
52c7696
1eb9c9d
 
 
 
 
 
52c7696
1eb9c9d
 
 
52c7696
1eb9c9d
395c3d4
52c7696
 
 
 
1eb9c9d
52c7696
1eb9c9d
 
 
 
52c7696
 
 
dd7fa38
1eb9c9d
 
 
 
 
 
 
 
 
395c3d4
 
 
 
 
 
1eb9c9d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6c0aeb9
395c3d4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0159aaf
395c3d4
 
dd7fa38
 
 
395c3d4
 
 
 
 
 
1eb9c9d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
from __future__ import annotations

from pathlib import Path
from io import BytesIO
from time import sleep

import matplotlib

matplotlib.use("Agg")  # headless backend
import matplotlib.pyplot as plt
from PIL import Image

from smolagents import CodeAgent, LiteLLMModel
from smolagents.agents import ActionStep
from deepengineer.webcrawler.crawl_database import DataBase
from deepengineer.logging_tools import LoggingTool
import queue


def _find_and_save_matplotlib_figure(image_path: Path = Path("figure.png")) -> str:
    """Write the current matplotlib figure to *image_path*.

    The figure is saved through ``plt.savefig`` with ``bbox_inches="tight"``.

    Args:
        image_path: Destination file. Defaults to ``figure.png`` (relative path).

    Returns:
        A short confirmation message that names the destination.

    Raises:
        RuntimeError: If pyplot reports no open figure numbers.
    """
    open_figures = plt.get_fignums()
    if len(open_figures) == 0:
        message = "No active figure to save; create one before calling save_fig()."
        raise RuntimeError(message)

    plt.savefig(image_path, bbox_inches="tight")
    return "Figure saved to " + str(image_path) + "."


class SaveMatplotlibFigTool(LoggingTool):
    """Agent tool that saves the current matplotlib figure as a PNG in ``output_dir``.

    The tool returns a markdown image reference (``![](<name>.png)``) that the
    agent is expected to embed in its final answer.
    """

    name = "save_matplotlib_fig"
    description = """Save the current matplotlib figure to the current directory. Then plt.close() is called to clear the figure. The image is returned as a markdown string, use this markdown inside the final answer to include the image.
    """
    inputs = {
        "image_name": {
            "type": "string",
            "description": "The name of the image to save.",
        },
    }
    output_type = "string"

    def __init__(self, output_dir: Path, log_queue: queue.Queue | None = None):
        """Store the output directory and forward *log_queue* to ``LoggingTool``."""
        super().__init__(log_queue=log_queue)
        self.output_dir: Path = output_dir

    def forward(self, image_name: str) -> str:
        """Save the current figure as ``output_dir / image_name`` and close it.

        Args:
            image_name: File name for the image; ``.png`` is appended when the
                name does not already end with it.

        Returns:
            A markdown image reference on success, otherwise an error message.

        Raises:
            RuntimeError: Propagated from the save helper when no figure is open.
        """
        self.push_log(f"🖼️ Saving matplotlib figure to {image_name}")
        if not image_name.endswith(".png"):
            image_name = image_name + ".png"
        output_path = self.output_dir / image_name
        # Remove any stale file so the existence check below reflects this call.
        output_path.unlink(missing_ok=True)
        _find_and_save_matplotlib_figure(output_path)
        # The tool description promises the figure is cleared after saving;
        # without this, figures pile up and a later call may save an old one.
        plt.close()
        if output_path.exists():
            return f"![]({image_name})"
        return f"Error: The image {image_name} was not saved."


def _capture_snapshot(
    memory_step: ActionStep, agent: CodeAgent, image_path: Path = Path("figure.png")
) -> None:
    """Step callback: persist the current figure and attach it to *memory_step*.

    When a matplotlib figure is open, it is written to *image_path* and also
    rendered to an in-memory PNG that is stored in
    ``memory_step.observations_images``. Images on action steps that are two or
    more steps older than *memory_step* are dropped. A hint line is appended to
    ``memory_step.observations``.

    When no figure is open the function returns without touching anything.

    Args:
        memory_step: The step that has just been executed.
        agent: The agent whose ``memory.steps`` are pruned of old images.
        image_path: Where the PNG snapshot is written on disk.
    """
    # Guard first: the save helper raises RuntimeError when no figure exists,
    # which would otherwise crash every step that does not draw anything.
    if not plt.get_fignums():
        return

    _find_and_save_matplotlib_figure(image_path)

    with BytesIO() as buf:
        plt.savefig(buf, format="png", bbox_inches="tight")
        buf.seek(0)
        # Image.open is lazy; copy() loads the pixel data while the buffer is
        # still open so the image stays usable after the buffer is closed.
        snapshot = Image.open(buf).copy()

    # Keep only the most recent images in memory.
    for prev in agent.memory.steps:
        if (
            isinstance(prev, ActionStep)
            and prev.step_number <= memory_step.step_number - 2
        ):
            prev.observations_images = None

    memory_step.observations_images = [snapshot]

    hint = "[snapshot: matplotlib figure captured]"
    memory_step.observations = (
        hint
        if memory_step.observations is None
        else memory_step.observations + "\n" + hint
    )


# Task template for the iterative drawing mode. It tells the agent to build the
# figure, inspect the captured image on the following step, and only then call
# final_answer(). `{user_instructions}` is the single placeholder; it is filled
# with str.format in draw_matplotlib_image_from_prompt when multiple_steps is
# true, so any literal braces added to this text must be doubled.
matplotlib_instructions_multiple_steps = r"""
You may use the entire **matplotlib** and **numpy** and **pandas** and **seaborn** API. Do not worry about saving the image, it is done automatically and you can't access the os library.

Between each step, the image is provided in memory. From step 2, you can use it to pass additional instructions to the model to improve the image.

Workflow
--------
1. Construct your figure with ordinary matplotlib calls.
2. Wait another iteration, watch the image. If the image is correct call `final_answer() directly`. Otherwise, just do it again.
3. Do **not** call `plt.show()`; a callback captures a PNG automatically.
4. Keep code blocks concise and avoid GUI back‑end imports (TkAgg, Qt, etc.).

User instructions:
{user_instructions}
"""

# Task template for the default drawing mode. It allows the agent to call
# final_answer() right away when it is confident in the figure.
# `{user_instructions}` is the single placeholder; it is filled with str.format
# in draw_matplotlib_image_from_prompt when multiple_steps is false, so any
# literal braces added to this text must be doubled.
matplotlib_instructions_single_step = r"""
You may use the entire **matplotlib** and **numpy** and **pandas** and **seaborn** API. Do not worry about saving the image, it is done automatically and you can't access the os library.

Workflow
--------
1. Construct your figure with ordinary matplotlib calls.
2. If the task is easy and you are confident that the image is correct, call `final_answer() directly`. Otherwise, wait another iteration to watch the image.
3. Do **not** call `plt.show()`; a callback captures a PNG automatically.
4. Keep code blocks concise and avoid GUI back‑end imports (TkAgg, Qt, etc.).  

User instructions:
{user_instructions}
"""


def draw_matplotlib_image_from_prompt(
    prompt: str,
    image_path: str | Path = Path("figure.png"),
    model_id: str = "mistral/mistral-medium-latest",
    multiple_steps: bool = False,
) -> Path:
    """Run a code agent that draws a matplotlib figure described by *prompt*.

    The agent may import matplotlib, numpy, pandas and seaborn. After each step
    a callback snapshots the current figure to *image_path*.

    Args:
        prompt: User instructions inserted into the task template.
        image_path: Destination of the PNG snapshot; a ``str`` is accepted and
            converted to ``Path``.
        model_id: Model identifier handed to ``LiteLLMModel``.
        multiple_steps: Use the iterative template (inspect the image before
            answering) instead of the single-step one.

    Returns:
        *image_path* as a ``Path``. This function does not check that the file
        was actually written.
    """
    # Normalise so the declared ``Path`` return type holds even for str input.
    image_path = Path(image_path)

    template = (
        matplotlib_instructions_multiple_steps
        if multiple_steps
        else matplotlib_instructions_single_step
    )

    model = LiteLLMModel(model_id=model_id)
    agent = CodeAgent(
        tools=[],
        model=model,
        additional_authorized_imports=[
            "matplotlib.*",
            "numpy.*",
            "pandas.*",
            "seaborn.*",
        ],
        step_callbacks=[
            lambda memory_step, agent: _capture_snapshot(memory_step, agent, image_path)
        ],
        max_steps=20,
        verbosity_level=2,
    )
    agent.run(template.format(user_instructions=prompt))
    return image_path


class DrawImageTool(LoggingTool):
    """Agent tool that delegates drawing to a matplotlib code agent.

    The image is written to ``output_dir / image_name`` and the tool returns a
    markdown image reference for the caller to embed in its final answer.
    """

    name = "draw_image"
    description = "Draw an image based on a prompt. The image is saved in the current directory. The image is returned as a markdown image, use this markdown inside the final answer to include the image. You must be very specific in your prompt."
    inputs = {
        "prompt": {
            "type": "string",
            "description": """
    Draw an image based on a prompt. The image is saved in the current directory. The image is returned as a markdown image, use this markdown inside the final answer to include the image. 
    
    You must be very specific in your prompt. This tool has access to matplotlib, numpy, pandas, seaborn.
                   
                   """,
        },
        "image_name": {
            "type": "string",
            "description": "The name of the image to save.",
        },
    }
    output_type = "string"

    def __init__(self, output_dir: Path, log_queue: queue.Queue | None = None):
        """Store the output directory; optionally attach a log queue.

        Args:
            output_dir: Directory in which generated images are written.
            log_queue: Optional queue forwarded to ``LoggingTool``, mirroring
                ``SaveMatplotlibFigTool``.
        """
        # Forward the queue only when one is given, so that constructing the
        # tool without it behaves exactly as before.
        if log_queue is None:
            super().__init__()
        else:
            super().__init__(log_queue=log_queue)
        self.output_dir: Path = output_dir

    def forward(self, prompt: str, image_name: str) -> str:
        """Draw the image described by *prompt* and save it as *image_name*.

        Args:
            prompt: Description of the image to draw.
            image_name: File name for the image; ``.png`` is appended when the
                name does not already end with it.

        Returns:
            A markdown image reference on success, otherwise an error message.
        """
        self.push_log(f"🖊️ Drawing image from prompt: {prompt}")
        if not image_name.endswith(".png"):
            image_name = image_name + ".png"
        target = self.output_dir / image_name
        # Remove any stale file from an earlier run; otherwise the existence
        # check below would report success even if nothing was drawn this time.
        target.unlink(missing_ok=True)
        output_path = draw_matplotlib_image_from_prompt(prompt, target)
        if output_path.exists():
            return f"![]({image_name})"
        return f"Error: The image {image_name} was not saved."


def multiple_steps_draw_image_agent(
    prompt: str,
    image_path: str | Path = Path("figure.png"),
    model_id: str = "mistral/mistral-medium-latest",
) -> Path:
    """Drive a drawing agent one step at a time, snapshotting after each step.

    The idea behind this function is to give a multimodal agent the code and
    the image of the previous step so that it can adapt them. Unlike
    ``agent.run``, the loop here is manual, which leaves room to edit the
    agent's memory between steps.

    Args:
        prompt: Task text given to the agent.
        image_path: Destination of the PNG snapshot; a ``str`` is accepted and
            converted to ``Path``.
        model_id: Model identifier handed to ``LiteLLMModel``.

    Returns:
        *image_path* as a ``Path``. This function does not check that the file
        was actually written.
    """
    from smolagents import CodeAgent, ActionStep, TaskStep, Timing
    import time

    # Normalise so the declared ``Path`` return type holds even for str input.
    image_path = Path(image_path)

    model = LiteLLMModel(model_id=model_id)
    agent = CodeAgent(
        tools=[],
        model=model,
        additional_authorized_imports=["matplotlib.*", "numpy.*"],
        step_callbacks=[
            lambda memory_step, agent: _capture_snapshot(memory_step, agent, image_path)
        ],
        max_steps=20,
        verbosity_level=2,
    )

    # Send the tools to the agent (no tools here)
    agent.python_executor.send_tools({**agent.tools})

    # Print the system prompt
    print(agent.memory.system_prompt)

    # Set the task
    task = prompt

    # You could modify the memory as needed here by inputting the memory of another agent.
    # agent.memory.steps = previous_agent.memory.steps

    # Let's start a new task!
    agent.memory.steps.append(TaskStep(task=task, task_images=[]))

    final_answer = None
    step_number = 1
    # NOTE: this manual loop stops after 10 steps, independently of the
    # max_steps=20 passed to the agent above.
    while final_answer is None and step_number <= 10:
        memory_step = ActionStep(
            step_number=step_number,
            observations_images=[],
            timing=Timing(start_time=time.time(), end_time=time.time()),
        )
        # Run one step.
        final_answer = agent.step(memory_step)
        agent.memory.steps.append(memory_step)
        step_number += 1
        # Snapshot explicitly after each manually driven step.
        _capture_snapshot(memory_step, agent, image_path)
        # Change the memory as you please!
        # For instance to update the latest step:
        # agent.memory.steps[-1] = ...

    print("The final answer is:", final_answer)

    return image_path