File size: 7,248 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from typing import Dict, List, Optional
import os
import requests
from ...tool import Tool
from ...storage_handler import FileStorageHandler, LocalStorageHandler


class OpenRouterImageGenerationEditTool(Tool):
    """Generate or edit images through the OpenRouter chat-completions endpoint.

    With only a prompt the request is a plain text-to-image generation. When
    image URLs and/or local image paths are supplied they are attached to the
    user message as ``image_url`` parts so the model can edit or compose them.
    Images that come back as ``data:`` URLs are decoded and written through the
    configured storage handler.
    """

    name: str = "openrouter_image_generation_edit"
    description: str = (
        "Text-to-image and image-editing via OpenRouter models (e.g., google/gemini-2.5-flash-image-preview). "
        "No images → generate; with images (URLs or local paths) → edit/compose."
    )

    inputs: Dict[str, Dict] = {
        "prompt": {"type": "string", "description": "Text prompt."},
        "image_urls": {"type": "array", "description": "Remote image URLs (optional)."},
        "image_paths": {"type": "array", "description": "Local image paths (optional)."},
        "model": {"type": "string", "description": "OpenRouter model id.", "default": "google/gemini-2.5-flash-image-preview"},
        "api_key": {"type": "string", "description": "OpenRouter API key (fallback to env OPENROUTER_API_KEY)."},
        "save_path": {"type": "string", "description": "Directory to save images (when data URLs).", "default": "./openrouter_images"},
        "output_basename": {"type": "string", "description": "Base filename for outputs.", "default": "or_gen"}
    }
    required: List[str] = ["prompt"]

    def __init__(self, api_key: Optional[str] = None, storage_handler: Optional[FileStorageHandler] = None,
                 base_path: str = "./openrouter_images"):
        """Create the tool.

        Args:
            api_key: OpenRouter API key; falls back to the ``OPENROUTER_API_KEY``
                environment variable when omitted.
            storage_handler: Handler used to read input images and save outputs.
                When omitted a ``LocalStorageHandler`` rooted at ``base_path``
                is created.
            base_path: Base path for the default ``LocalStorageHandler``; unused
                when ``storage_handler`` is given.
        """
        super().__init__()
        self.api_key = api_key or os.getenv("OPENROUTER_API_KEY")
        self.storage_handler = storage_handler or LocalStorageHandler(base_path=base_path)

    def __call__(
        self,
        prompt: str,
        image_urls: list = None,
        image_paths: list = None,
        model: str = "google/gemini-2.5-flash-image-preview",
        api_key: str = None,
        save_path: str = "./openrouter_images",
        output_basename: str = "or_gen",
    ):
        """Send the prompt (plus optional images) to OpenRouter and save the results.

        Args:
            prompt: Text prompt sent as the user message.
            image_urls: Optional remote image URLs attached to the message.
            image_paths: Optional image paths read through the storage handler
                and attached as base64 data URLs. Unreadable paths are skipped.
            model: OpenRouter model id.
            api_key: Per-call API key; overrides the key set at construction.
            save_path: Not referenced by this method; outputs are written via
                the storage handler configured at construction.
                NOTE(review): confirm whether this should be honoured.
            output_basename: Base filename (without extension) for saved images.

        Returns:
            ``{"saved_paths": [...]}`` with the saved filenames on success,
            ``{"error": ...}`` (plus ``"status_code"`` for HTTP errors) on
            failure, or ``{"warning": ..., "raw": <response>}`` when the
            response contained no data-URL image.
        """
        key = api_key or self.api_key
        if not key:
            return {"error": "OPENROUTER_API_KEY not provided."}

        # Build content parts from the prompt plus URLs and/or local paths.
        content_parts = [{"type": "text", "text": prompt}]
        if image_urls:
            content_parts.extend(self._urls_to_image_parts(image_urls))
        if image_paths:
            content_parts.extend(self._paths_to_image_parts(image_paths))

        # Text-only requests keep the plain-string content form; the structured
        # parts list is used only when at least one image is attached.
        content = content_parts if len(content_parts) > 1 else prompt
        payload = {
            "model": model,
            "messages": [{"role": "user", "content": content}],
            "modalities": ["image", "text"],
        }

        headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json"}
        url = "https://openrouter.ai/api/v1/chat/completions"

        try:
            # Without a timeout a stalled connection would block the caller
            # forever. Image generation can be slow, so the read timeout is
            # generous: (connect seconds, read seconds).
            resp = requests.post(url, headers=headers, json=payload, timeout=(10, 300))
            resp.raise_for_status()
            data = resp.json()
        except requests.exceptions.HTTPError as e:
            # Prefer the structured error body when the server sent one.
            try:
                error_data = resp.json()
                return {"error": f"OpenRouter API error: {error_data}", "status_code": resp.status_code}
            except Exception:
                return {"error": f"OpenRouter API error: {e}", "status_code": resp.status_code}
        except Exception as e:
            # Tool boundary: report any transport/decoding failure as an error
            # result instead of raising into the caller.
            return {"error": f"Request failed: {e}"}

        saved_paths: List[str] = []
        for image_url in self._extract_image_urls(data):
            # Only inline data URLs are saved; anything else is ignored.
            if not (image_url.startswith("data:") and "," in image_url):
                continue

            try:
                image_content, ext = self._decode_data_url(image_url)
            except ValueError as e:
                # binascii.Error (malformed base64) is a ValueError subclass.
                return {"error": f"Failed to decode image data: {e}"}

            # Generate a unique filename, then save using the storage handler.
            filename = self._get_unique_filename(output_basename or "or_gen", ext)
            result = self.storage_handler.save(filename, image_content)

            if result.get("success"):
                saved_paths.append(filename)
            else:
                return {"error": f"Failed to save image: {result.get('error', 'Unknown error')}"}

        if saved_paths:
            return {"saved_paths": saved_paths}
        return {"warning": "No image returned or saved.", "raw": data}

    # --- helpers (replacing prior utils usage) ---
    def _extract_image_urls(self, data) -> List[str]:
        """Return the image URLs found on the first choice's message.

        Tolerates missing or ``null`` fields so an unexpected response shape
        yields an empty list rather than an exception.
        """
        if not isinstance(data, dict):
            return []
        choices = data.get("choices")
        if not choices or not isinstance(choices[0], dict):
            return []
        message = choices[0].get("message") or {}

        urls: List[str] = []
        for image in message.get("images") or []:
            if not isinstance(image, dict):
                continue
            image_url = (image.get("image_url") or {}).get("url")
            if image_url:
                urls.append(image_url)
        return urls

    def _decode_data_url(self, data_url: str) -> tuple:
        """Decode a base64 ``data:`` URL into ``(raw_bytes, file_extension)``.

        Unknown or missing MIME types fall back to ``.png``.

        Raises:
            ValueError: If the payload is not valid base64.
        """
        import base64

        header, b64data = data_url.split(",", 1)
        # Header looks like "data:<mime>[;param...]"; the MIME type is whatever
        # sits between the "data:" prefix and the first ";" (if any).
        mime = header[len("data:"):].split(";", 1)[0].strip().lower() or "image/png"
        extensions = {
            "image/jpeg": ".jpg",
            "image/webp": ".webp",
            "image/heic": ".heic",
            "image/heif": ".heif",
        }
        return base64.b64decode(b64data), extensions.get(mime, ".png")

    def _url_to_image_part(self, url: str) -> Dict:
        """Wrap a URL (remote or data URL) as an ``image_url`` content part."""
        return {"type": "image_url", "image_url": {"url": url}}

    def _guess_mime_from_name(self, name: str, default: str = "image/png") -> str:
        """Guess a MIME type from a filename, returning ``default`` if unknown."""
        import mimetypes
        guess, _ = mimetypes.guess_type(name)
        return guess or default

    def _path_to_data_url(self, path: str) -> str:
        """Read ``path`` through the storage handler and return a base64 data URL.

        Raises:
            FileNotFoundError: If the storage handler fails to read the file;
                the underlying exception is chained as the cause.
        """
        import base64
        mime = self._guess_mime_from_name(path)

        # Use the storage handler to read raw bytes directly.
        # This bypasses the high-level read() method that processes images.
        try:
            # Translate the user path to a system path first.
            system_path = self.storage_handler.translate_in(path)
            content = self.storage_handler._read_raw(system_path)
        except Exception as e:
            raise FileNotFoundError(f"Could not read file {path}: {str(e)}") from e

        b64 = base64.b64encode(content).decode("utf-8")
        return f"data:{mime};base64,{b64}"

    def _get_unique_filename(self, base_name: str, extension: str) -> str:
        """Return ``base_name + extension``, or a ``_N``-suffixed variant not yet in storage."""
        filename = f"{base_name}{extension}"
        counter = 1

        # Probe the storage handler until an unused name is found.
        while self.storage_handler.exists(filename):
            filename = f"{base_name}_{counter}{extension}"
            counter += 1

        return filename

    def _paths_to_image_parts(self, paths: list) -> List[Dict]:
        """Convert image paths to ``image_url`` parts, skipping unreadable ones.

        Skipping is deliberate best-effort behaviour, but each skipped path is
        logged so a silently missing input image can be diagnosed.
        """
        import logging

        parts: List[Dict] = []
        for p in paths:
            try:
                parts.append(self._url_to_image_part(self._path_to_data_url(p)))
            except Exception as e:
                logging.getLogger(__name__).warning("Skipping unreadable image path %r: %s", p, e)
        return parts

    def _urls_to_image_parts(self, urls: list) -> List[Dict]:
        """Convert remote image URLs to ``image_url`` parts, ignoring empty entries."""
        return [self._url_to_image_part(u) for u in urls if u]