Spaces:
Running
Running
| # file_utils.py - Reusable file handling utilities | |
| """ | |
| Modular file handling utilities for tools. | |
| Provides abstracted, reusable functions for common file operations. | |
| """ | |
| import base64 | |
| import binascii | |
| import json | |
| import mimetypes | |
| import os | |
| from pathlib import Path | |
| import re | |
| import tempfile | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from pydantic import BaseModel, Field, field_validator | |
class FileInfo(BaseModel):
    """Pydantic model for file information.

    Produced by :meth:`FileUtils.get_file_info`; ``size`` is validated to be
    non-negative.
    """

    exists: bool = Field(description="Whether the file exists and is accessible")
    path: str | None = Field(None, description="Full file path")
    name: str | None = Field(None, description="File name with extension")
    size: int = Field(0, description="File size in bytes")
    extension: str = Field("", description="File extension (lowercase)")
    error: str | None = Field(None, description="Error message if file access failed")

    # Bug fix: without @field_validator the method was never registered as a
    # pydantic validator, so negative sizes were silently accepted.
    @field_validator("size")
    @classmethod
    def validate_size(cls, v: int) -> int:
        """Reject negative file sizes."""
        if v < 0:
            raise ValueError("File size cannot be negative")
        return v
class TextFileResult(BaseModel):
    """Pydantic model for text file reading results.

    On success ``content`` and ``encoding`` are populated; on failure
    ``success`` is False and ``error`` carries the reason. ``file_info`` is
    attached in both cases when available.
    """

    success: bool = Field(description="Whether the file was successfully read")
    content: str | None = Field(None, description="File content as text")
    encoding: str | None = Field(None, description="Encoding used to read the file")
    file_info: FileInfo | None = Field(None, description="File information")
    error: str | None = Field(None, description="Error message if reading failed")
class BinaryFileResult(BaseModel):
    """Pydantic model for binary file reading results.

    ``content`` holds the file bytes as a base64 string (see
    :meth:`FileUtils.read_binary_file`), never raw bytes.
    """

    success: bool = Field(description="Whether the file was successfully read")
    content: str | None = Field(None, description="Base64 encoded file content")
    file_info: FileInfo | None = Field(None, description="File information")
    error: str | None = Field(None, description="Error message if reading failed")
class ToolResponse(BaseModel):
    """Pydantic model for standardized tool responses.

    Serialized to JSON by :meth:`FileUtils.create_tool_response`; exactly one
    of ``result``/``error`` is normally set.
    """

    type: str = Field(default="tool_response", description="Response type identifier")
    tool_name: str = Field(description="Name of the tool that generated the response")
    result: str | None = Field(None, description="Tool result content")
    error: str | None = Field(None, description="Error message if tool failed")
    file_info: FileInfo | None = Field(
        None, description="File information if applicable"
    )
    extra: dict[str, Any] | None = Field(
        None, description="Optional structured payload for tool-specific data"
    )
class FileUtils:
    """Utility class for common file operations.

    All helpers are defined without ``self`` and are invoked directly on the
    class (``FileUtils.method(...)``); the class is a pure namespace.
    """
def file_exists(file_path: str) -> bool:
    """Return True when *file_path* names an existing regular file."""
    if not os.path.exists(file_path):
        return False
    return os.path.isfile(file_path)
def get_file_size(file_path: str) -> int:
    """Return the size of *file_path* in bytes, or 0 when it cannot be stat'ed."""
    try:
        size = os.stat(file_path).st_size
    except OSError:
        return 0
    return size
def get_file_info(file_path: str) -> FileInfo:
    """Collect path, basename, size and lowercase extension for *file_path*.

    Returns a ``FileInfo`` with ``exists=False`` and an ``error`` message when
    the file is missing or metadata collection fails.
    """
    if not FileUtils.file_exists(file_path):
        return FileInfo(exists=False, error=f"File not found: {file_path}")
    try:
        basename = os.path.basename(file_path)
        suffix = Path(file_path).suffix.lower()
        info = FileInfo(
            exists=True,
            path=file_path,
            name=basename,
            size=FileUtils.get_file_size(file_path),
            extension=suffix,
        )
    except Exception as e:
        return FileInfo(exists=False, error=f"Error getting file info: {str(e)}")
    return info
def file_info_for_tool_response(physical: FileInfo, logical_ref: str) -> FileInfo:
    """
    Build **FileInfo** for :meth:`create_tool_response` (``ToolResponse.file_info`` in JSON).
    The model must not see on-disk temp names: **name** is only the **logical** reference
    (upload name, path the user sent, or full **http(s)/ftp** URL) — **never** *physical* ``name``
    from :func:`get_file_info`. We still use the **FileInfo** type for schema consistency,
    not "legacy" — the wire format is a structured object, not a loose dict.
    If *logical_ref* is empty, **name** is **None** (we do not fall back to ``physical.name``).
    """
    reference = (logical_ref or "").strip()
    # path is dropped and name is the logical reference only -- physical
    # basename and path never reach the wire format.
    return FileInfo(
        exists=physical.exists,
        path=None,
        name=reference if reference else None,
        size=physical.size,
        extension=physical.extension,
        error=physical.error,
    )
| def upload_basename_from_reference(file_reference: str) -> str | None: | |
| """ | |
| **fileName** for CMW upload: basename from **file_reference** only (chat/registry | |
| name, absolute local path, or **http(s)/ftp** URL path) — not from the resolved | |
| temp path, so a **Gradio** cache name is never used as the product filename. | |
| """ | |
| ref = (file_reference or "").strip() | |
| if not ref: | |
| return None | |
| rlow = ref.lower() | |
| if rlow.startswith(("http://", "https://", "ftp://")): | |
| from urllib.parse import unquote, urlparse | |
| path = unquote(urlparse(ref).path) or "" | |
| b = os.path.basename(path) if path else "" | |
| return b or None | |
| if os.path.isabs(ref): | |
| b = os.path.basename(ref) | |
| return b or None | |
| return os.path.basename(ref) or None | |
def read_text_file(
    file_path: str, encodings: list[str] | None = None
) -> TextFileResult:
    """
    Read text file with multiple encoding fallback and Pydantic validation.
    Args:
        file_path: Path to the text file
        encodings: List of encodings to try (default: ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1'])
    Returns:
        TextFileResult with validated content, encoding used, and metadata
    """
    # None default avoids the mutable-default-argument pitfall.
    if encodings is None:
        # NOTE(review): latin-1 decodes every possible byte, so it never
        # raises UnicodeDecodeError -- the cp1252/iso-8859-1 entries after
        # it are effectively unreachable with this default order.
        encodings = ["utf-8", "latin-1", "cp1252", "iso-8859-1"]
    file_info = FileUtils.get_file_info(file_path)
    if not file_info.exists:
        # Propagate the lookup error without attempting to open the file.
        return TextFileResult(
            success=False, error=file_info.error, file_info=file_info
        )
    for encoding in encodings:
        try:
            with open(file_path, encoding=encoding) as f:
                content = f.read()
                return TextFileResult(
                    success=True,
                    content=content,
                    encoding=encoding,
                    file_info=file_info,
                )
        except UnicodeDecodeError:
            # Wrong encoding guess -- try the next candidate.
            continue
        except Exception as e:
            # Non-decoding failures (permissions, I/O errors) are terminal.
            return TextFileResult(
                success=False,
                error=f"Error reading file: {str(e)}",
                file_info=file_info,
            )
    # Every candidate encoding raised UnicodeDecodeError.
    return TextFileResult(
        success=False,
        error="File appears to be binary and cannot be read as text",
        file_info=file_info,
    )
def read_binary_file(file_path: str) -> BinaryFileResult:
    """Read a file's raw bytes and return them base64-encoded in a BinaryFileResult."""
    file_info = FileUtils.get_file_info(file_path)
    if not file_info.exists:
        return BinaryFileResult(
            success=False, error=file_info.error, file_info=file_info
        )
    try:
        raw = Path(file_path).read_bytes()
    except Exception as e:
        return BinaryFileResult(
            success=False,
            error=f"Error reading binary file: {str(e)}",
            file_info=file_info,
        )
    # base64 is imported at module level; the previous local import was redundant.
    return BinaryFileResult(
        success=True,
        content=base64.b64encode(raw).decode("utf-8"),
        file_info=file_info,
    )
def create_tool_response(
    tool_name: str,
    result: str | None = None,
    error: str | None = None,
    file_info: FileInfo | None = None,
    extra: dict[str, Any] | None = None,
) -> str:
    """Create standardized tool response JSON with Pydantic validation.
    For chat-uploaded files, pass *file_info* from
    :meth:`file_info_for_tool_response` so ``file_info.name`` in JSON is
    the **registered file reference**, not a temp *gradio* basename.

    Args:
        tool_name: Name of the tool producing the response.
        result: Full tool output (not truncated here).
        error: Error message when the tool failed.
        file_info: Optional file metadata; its ``path`` is stripped below.
        extra: Optional structured payload for tool-specific data.

    Returns:
        Pretty-printed (indent=2) JSON serialization of a ``ToolResponse``.
    """
    if file_info:
        # Create a sanitized copy without the full path
        sanitized_file_info = FileInfo(
            exists=file_info.exists,
            path=None,  # Remove full path for security
            name=file_info.name,
            size=file_info.size,
            extension=file_info.extension,
            error=file_info.error,
        )
    else:
        sanitized_file_info = None
    response = ToolResponse(
        tool_name=tool_name,
        result=result,  # Full result, no truncation
        error=error,
        file_info=sanitized_file_info,
        extra=extra,
    )
    return response.model_dump_json(indent=2)
def format_file_size(size_bytes: int) -> str:
    """Render a byte count as '0 bytes', 'N bytes', 'N KB' or 'N MB'.

    Uses integer (floor) division, so 2047 bytes renders as '1 KB'.
    """
    kb = 1024
    mb = kb * 1024
    if size_bytes == 0:
        return "0 bytes"
    if size_bytes < kb:
        return f"{size_bytes} bytes"
    if size_bytes < mb:
        return f"{size_bytes // kb} KB"
    return f"{size_bytes // mb} MB"
def file_to_base64(file_path: str) -> str:
    """
    Convert file to base64 encoded string.

    Args:
        file_path (str): Path to the file to convert

    Returns:
        str: Base64 encoded file content

    Raises:
        FileNotFoundError: If file doesn't exist
        OSError: If file can't be read (chained to the underlying error)
    """
    # base64 is imported at module level; the former local import was redundant.
    if not FileUtils.file_exists(file_path):
        msg = f"File not found: {file_path}"
        raise FileNotFoundError(msg)
    try:
        with open(file_path, "rb") as f:
            file_content = f.read()
        return base64.b64encode(file_content).decode("utf-8")
    except Exception as e:
        msg = f"Error reading file {file_path}: {str(e)}"
        # Bug fix: chain the original exception so the root cause survives.
        raise OSError(msg) from e
def download_file_to_path(url: str, target_path: str | None = None) -> str:
    """
    Download file from URL to local path.

    Args:
        url (str): URL to download from
        target_path (str, optional): Local path to save to. If None, creates temp file.

    Returns:
        str: Path to downloaded file

    Raises:
        OSError: If the download or write fails; the original exception is
            chained as the cause.
    """
    import logging
    import os
    import tempfile
    from urllib.parse import urlparse

    import requests

    logger = logging.getLogger(__name__)
    try:
        # Add polite bot identification headers
        headers = {
            "User-Agent": "CMW-Platform-Agent/1.0 (+https://github.com/arterm-sedov/cmw-platform-agent) Mozilla/5.0"
        }
        # First make a HEAD request to get Content-Type
        # NOTE(review): servers that reject HEAD will abort the whole download
        # even though a GET might succeed -- confirm this is intended.
        logger.info(f"Attempting to download from URL: {url}")
        head_response = requests.head(
            url, headers=headers, timeout=30, allow_redirects=True
        )
        head_response.raise_for_status()
        content_type = head_response.headers.get("content-type", "unknown")
        logger.info(f"HEAD request successful, Content-Type: {content_type}")
        if target_path is None:
            # Create temp file with proper extension
            parsed_url = urlparse(url)
            filename = os.path.basename(parsed_url.path) or "downloaded_file"
            # Extract extension from URL
            _, url_ext = os.path.splitext(filename)
            # Get Content-Type header (lowercased for the mapping lookup)
            content_type = head_response.headers.get("content-type", "").lower()
            # MIME type to extension mapping
            mime_to_ext = {
                # Documents
                "application/pdf": ".pdf",
                "application/msword": ".doc",
                "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
                "application/vnd.ms-excel": ".xls",
                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
                "application/vnd.ms-powerpoint": ".ppt",
                "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
                "application/rtf": ".rtf",
                "application/zip": ".zip",
                "application/x-zip-compressed": ".zip",
                # Text formats
                "text/plain": ".txt",
                "text/html": ".html",
                "text/css": ".css",
                "text/javascript": ".js",
                "text/csv": ".csv",
                "text/xml": ".xml",
                "application/json": ".json",
                "application/xml": ".xml",
                # Images
                "image/jpeg": ".jpg",
                "image/jpg": ".jpg",
                "image/png": ".png",
                "image/gif": ".gif",
                "image/webp": ".webp",
                "image/svg+xml": ".svg",
                "image/bmp": ".bmp",
                "image/tiff": ".tiff",
                # Audio
                "audio/mpeg": ".mp3",
                "audio/wav": ".wav",
                "audio/ogg": ".ogg",
                "audio/mp4": ".m4a",
                # Video
                "video/mp4": ".mp4",
                "video/avi": ".avi",
                "video/quicktime": ".mov",
                "video/x-msvideo": ".avi",
            }
            # Smart extension detection strategy:
            # 1. If Content-Type is specific and matches known types, use it
            # 2. If URL has a standard extension, use it
            # 3. Fallback to Content-Type if URL extension is non-standard
            ext = None
            content_type_ext = None
            url_ext_valid = False
            # Get extension from Content-Type (substring match handles
            # parameters such as "; charset=utf-8")
            for mime_type, extension in mime_to_ext.items():
                if mime_type in content_type:
                    content_type_ext = extension
                    break
            # Check if URL extension is valid (standard file extension)
            if url_ext:
                # Check if URL extension matches any known extension
                known_extensions = set(mime_to_ext.values())
                url_ext_valid = url_ext.lower() in known_extensions
            # Decision logic
            if content_type_ext and url_ext_valid:
                # Both are valid - prefer Content-Type for accuracy
                ext = content_type_ext
            elif content_type_ext and not url_ext_valid:
                # Content-Type is valid, URL extension is not standard
                ext = content_type_ext
            elif not content_type_ext and url_ext_valid:
                # Only URL extension is valid
                ext = url_ext
            elif url_ext:
                # URL extension exists but not standard - use it as fallback
                ext = url_ext
            else:
                # No extension found
                ext = ""
            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
            target_path = temp_file.name
            temp_file.close()
        # Now download the file
        logger.info(f"Starting download to: {target_path}")
        response = requests.get(
            url, headers=headers, stream=True, timeout=60, allow_redirects=True
        )
        response.raise_for_status()
        with open(target_path, "wb") as f:
            f.writelines(response.iter_content(chunk_size=8192))
        logger.info(f"Download completed successfully: {target_path}")
        return target_path
    except Exception as e:
        msg = f"Error downloading file from {url}: {str(e)}"
        # Bug fix: chain the original exception instead of discarding it.
        raise OSError(msg) from e
def generate_unique_filename(
    original_filename: str, session_id: str = "default"
) -> str:
    """
    Generate a unique filename with timestamp and hash (no session prefix since we use session folders).
    Args:
        original_filename (str): Original filename from user upload
        session_id (str): Session ID for isolation (used for folder organization)
    Returns:
        str: Unique filename with timestamp and hash
    """
    import hashlib
    from pathlib import Path
    import time

    source = Path(original_filename)
    stem, ext = source.stem, source.suffix
    # Millisecond timestamp plus an MD5-derived token keeps names unique even
    # for identical uploads within the same session.
    stamp = str(int(time.time() * 1000))
    token = hashlib.md5(
        f"{original_filename}{stamp}{session_id}".encode()
    ).hexdigest()[:8]
    return f"{session_id}_{stem}_{stamp}_{token}{ext}"
def get_gradio_cache_path() -> str:
    """
    Get the current Gradio cache directory path.
    Returns:
        str: Path to Gradio's cache directory
    """
    import os
    import tempfile

    # A non-empty GRADIO_TEMP_DIR wins; otherwise fall back to the system
    # temp directory.
    configured = os.environ.get("GRADIO_TEMP_DIR")
    return configured if configured else tempfile.gettempdir()
def resolve_file_reference(file_reference: str, agent=None) -> str:
    """
    Resolve file reference (filename or URL) to full file path.
    Args:
        file_reference (str): Original filename from user upload OR URL
        agent: Agent instance with file registry (optional)
    Returns:
        str: Full path to the file, or None if not found
    """
    # URLs are materialized into a temp file first.
    if file_reference.startswith(("http://", "https://", "ftp://")):
        try:
            return FileUtils.download_file_to_path(file_reference)
        except Exception as e:
            import logging

            log = logging.getLogger(__name__)
            log.exception(f"Failed to download URL {file_reference}: {e}")
            log.exception(f"Error type: {type(e).__name__}")
            # Re-raise the exception to get more details
            raise
    # An absolute path that exists on disk resolves to itself.
    if os.path.isabs(file_reference) and os.path.isfile(file_reference):
        return file_reference
    # Otherwise consult the agent's file registry, if one is available.
    if agent and hasattr(agent, "get_file_path"):
        return agent.get_file_path(file_reference)
    return None
def read_file_reference_bytes(
    file_reference: str, agent=None
) -> tuple[bytes | None, str | None]:
    """
    Resolve a file reference to a local path and read its bytes.
    Returns:
        ``(data, error)`` — on success *error* is ``None``; on failure *data* is
        ``None`` and *error* is a message. The resolved on-disk path is an
        implementation detail and is not returned.
    """
    resolved = FileUtils.resolve_file_reference(file_reference, agent)
    if not resolved:
        return None, f"Could not resolve file reference: {file_reference!r}"
    if not os.path.isfile(resolved):
        return None, f"Not a file: {resolved!r}"
    try:
        return Path(resolved).read_bytes(), None
    except OSError as e:
        return None, f"Failed to read file: {e!s}"
def resolve_file_path(original_filename: str, agent=None) -> str:
    """
    Resolve original filename to full file path using agent's file registry.
    Args:
        original_filename (str): Original filename from user upload
        agent: Agent instance with file registry (optional)
    Returns:
        str: Full path to the file, or None if not found
    """
    # Guard clauses: no agent, or no registry on the agent, means no lookup.
    if not agent:
        return None
    if not hasattr(agent, "get_file_path"):
        return None
    return agent.get_file_path(original_filename)
| def resolve_code_input(code_reference: str, agent=None) -> tuple[str, str]: | |
| """ | |
| Resolve code reference to actual code content and detected language. | |
| Args: | |
| code_reference (str): Code content, filename, or URL | |
| agent: Agent instance for file resolution (optional) | |
| Returns: | |
| tuple: (code_content, detected_language) | |
| """ | |
| # Check if it's a URL | |
| if code_reference.startswith(("http://", "https://", "ftp://")): | |
| try: | |
| file_path = FileUtils.download_file_to_path(code_reference) | |
| result = FileUtils.read_text_file(file_path) | |
| if not result.success: | |
| msg = f"Failed to read URL content: {result.error}" | |
| raise ValueError(msg) | |
| language = FileUtils.detect_language_from_extension(file_path) | |
| return result.content, language | |
| except Exception as e: | |
| msg = f"Failed to download URL {code_reference}: {str(e)}" | |
| raise ValueError(msg) | |
| # Check if it's a file path (try to resolve via agent first, then direct path) | |
| file_path = None | |
| if agent and hasattr(agent, "get_file_path"): | |
| file_path = agent.get_file_path(code_reference) | |
| if not file_path and os.path.exists(code_reference): | |
| file_path = code_reference | |
| if file_path and os.path.exists(file_path): | |
| result = FileUtils.read_text_file(file_path) | |
| if not result.success: | |
| msg = f"Failed to read file: {result.error}" | |
| raise ValueError(msg) | |
| language = FileUtils.detect_language_from_extension(file_path) | |
| return result.content, language | |
| # It's code content - return as-is with no language detection | |
| return code_reference, None | |
def detect_language_from_extension(file_path: str) -> str:
    """Map a file extension to a language name; unknown extensions default to 'python'."""
    language_by_suffix = {
        ".py": "python",
        ".sh": "bash",
        ".bash": "bash",
        ".sql": "sql",
        ".c": "c",
        ".h": "c",
        ".java": "java",
        ".js": "javascript",
        ".ts": "typescript",
        ".rb": "ruby",
        ".go": "go",
        ".rs": "rust",
        ".cpp": "cpp",
        ".cc": "cpp",
        ".cxx": "cpp",
        ".cs": "csharp",
        ".php": "php",
        ".r": "r",
        ".m": "matlab",
        ".scala": "scala",
        ".kt": "kotlin",
        ".swift": "swift",
    }
    suffix = Path(file_path).suffix.lower()
    return language_by_suffix.get(suffix, "python")
def is_text_file(file_path: str) -> bool:
    """Heuristic: True when the (case-insensitive) extension is a known text format."""
    text_suffixes = frozenset({
        ".txt", ".md", ".log", ".json", ".xml", ".yaml", ".yml",
        ".html", ".htm", ".css", ".js", ".py", ".sql", ".ini",
        ".cfg", ".conf", ".env", ".csv", ".tsv",
    })
    return Path(file_path).suffix.lower() in text_suffixes
def is_image_file(file_path: str) -> bool:
    """Heuristic: True when the (case-insensitive) extension is a known image format."""
    image_suffixes = frozenset(
        {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp", ".svg"}
    )
    return Path(file_path).suffix.lower() in image_suffixes
def is_audio_file(file_path: str) -> bool:
    """Heuristic: True when the (case-insensitive) extension is a known audio format."""
    audio_suffixes = frozenset(
        {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a", ".wma"}
    )
    return Path(file_path).suffix.lower() in audio_suffixes
def is_video_file(file_path: str) -> bool:
    """Heuristic: True when the (case-insensitive) extension is a known video format."""
    video_suffixes = frozenset(
        {".mp4", ".avi", ".mov", ".wmv", ".flv", ".webm", ".mkv"}
    )
    return Path(file_path).suffix.lower() in video_suffixes
def is_pdf_file(file_path: str) -> bool:
    """True when the (case-insensitive) extension is exactly '.pdf'."""
    return Path(file_path).suffix.lower() == ".pdf"
def get_mime_type(file_path: str) -> str:
    """Return the MIME type for *file_path*.

    Tries the stdlib ``mimetypes`` registry first, then a local fallback
    table, and finally defaults to ``application/octet-stream``.
    """
    guessed, _encoding = mimetypes.guess_type(file_path)
    if guessed:
        return guessed
    fallback = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".gif": "image/gif",
        ".webp": "image/webp",
        ".svg": "image/svg+xml",
        ".tiff": "image/tiff",
        ".bmp": "image/bmp",
        ".mp4": "video/mp4",
        ".webm": "video/webm",
        ".avi": "video/x-msvideo",
        ".mov": "video/quicktime",
        ".wav": "audio/wav",
        ".mp3": "audio/mpeg",
        ".ogg": "audio/ogg",
        ".flac": "audio/flac",
        ".aac": "audio/aac",
        ".m4a": "audio/mp4",
        ".html": "text/html",
        ".htm": "text/html",
        ".json": "application/json",
        ".xml": "application/xml",
        ".pdf": "application/pdf",
    }
    return fallback.get(Path(file_path).suffix.lower(), "application/octet-stream")
def detect_media_type(file_path: str) -> str:
    """Classify *file_path* as image/video/audio/html/plot/unknown.

    Checks run in order; image wins first, so png/svg files reach the
    'plot' branch only if they are not matched as images first.
    """
    if FileUtils.is_image_file(file_path):
        return "image"
    if FileUtils.is_video_file(file_path):
        return "video"
    if FileUtils.is_audio_file(file_path):
        return "audio"
    suffix = Path(file_path).suffix.lower()
    if suffix == ".html":
        return "html"
    if suffix in [".png", ".svg"] and "plot" in file_path.lower():
        return "plot"
    return "unknown"
def create_media_attachment(
    file_path: str,
    caption: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Create a media attachment dictionary for rich content.
    Args:
        file_path: Path to the media file
        caption: Optional caption for the media
        metadata: Optional metadata dictionary
    Returns:
        Dict with media attachment information
    """
    if not FileUtils.file_exists(file_path):
        return {"type": "error", "error": f"File not found: {file_path}"}
    file_info = FileUtils.get_file_info(file_path)
    media_type = FileUtils.detect_media_type(file_path)
    mime_type = FileUtils.get_mime_type(file_path)
    attachment = {
        "type": "media_attachment",
        "media_type": media_type,
        "file_path": file_path,
        "mime_type": mime_type,
        # Consistency fix: use the pydantic v2 API (model_dump) instead of the
        # deprecated v1 .dict(), matching model_dump_json used elsewhere.
        "file_info": file_info.model_dump() if file_info else None,
    }
    if caption:
        attachment["caption"] = caption
    if metadata:
        attachment["metadata"] = metadata
    return attachment
def add_media_to_response(
    tool_response: dict[str, Any],
    file_path: str,
    caption: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Add media attachment to an existing tool response.
    Args:
        tool_response: Existing tool response dictionary
        file_path: Path to the media file
        caption: Optional caption for the media
        metadata: Optional metadata dictionary
    Returns:
        Updated tool response with media attachment (mutated in place)
    """
    # setdefault creates the list on first use, then we append the attachment.
    tool_response.setdefault("media_attachments", []).append(
        FileUtils.create_media_attachment(file_path, caption, metadata)
    )
    return tool_response
def extract_media_from_response(
    tool_response: dict[str, Any],
) -> list[dict[str, Any]]:
    """
    Extract media attachments from a tool response.
    Args:
        tool_response: Tool response dictionary
    Returns:
        List of media attachment dictionaries
    """
    attachments: list[dict[str, Any]] = []
    attachments.extend(tool_response.get("media_attachments", []))
    # A dict result may reference files by path in its string values.
    result = tool_response.get("result")
    if isinstance(result, dict):
        for key, value in result.items():
            if isinstance(value, str) and FileUtils.file_exists(value):
                attachments.append(
                    FileUtils.create_media_attachment(value, f"File: {key}")
                )
    return attachments
def is_base64_image(data: str) -> bool:
    """Heuristically check whether *data* is base64-encoded image data.

    Returns True for data-URI images, and for bare base64 payloads longer
    than 100 characters whose decoded bytes start with a known image magic
    number (PNG, JPEG, GIF, RIFF/WebP, BMP).
    """
    if data.startswith("data:image/"):
        return True
    if len(data) <= 100:
        return False
    try:
        # Strip all whitespace before decoding (base64 is often wrapped).
        decoded = base64.b64decode("".join(data.split()))
    except (binascii.Error, ValueError):
        # Bug fix: was a bare ``except:`` that swallowed every exception,
        # including KeyboardInterrupt/SystemExit.
        return False
    image_magic = (
        b"\x89PNG\r\n\x1a\n",
        b"\xff\xd8\xff",
        b"GIF87a",
        b"GIF89a",
        b"RIFF",
        b"BM",
    )
    return any(decoded.startswith(magic) for magic in image_magic)
| def save_base64_to_file( | |
| base64_data: str, | |
| output_path: str | None = None, | |
| file_extension: str | None = None, | |
| session_id: str | None = None, | |
| ) -> str: | |
| """ | |
| Save base64 data to a file. | |
| Args: | |
| base64_data: Base64 encoded data (with or without data URI prefix) | |
| output_path: Optional output file path | |
| file_extension: Optional file extension for temp file | |
| session_id: Optional session ID to save in session-isolated directory | |
| Returns: | |
| Path to the saved file | |
| """ | |
| import base64 | |
| from datetime import datetime | |
| import tempfile | |
| import uuid | |
| if base64_data.startswith("data:"): | |
| header, data = base64_data.split(",", 1) | |
| mime_type = header.split(":")[1].split(";")[0] | |
| if not file_extension: | |
| file_extension = mimetypes.guess_extension(mime_type) or ".bin" | |
| else: | |
| data = base64_data | |
| if not file_extension: | |
| file_extension = ".bin" | |
| if not output_path: | |
| if session_id: | |
| session_dir = Path(f".gradio/sessions/{session_id}") | |
| session_dir.mkdir(parents=True, exist_ok=True) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| unique_id = str(uuid.uuid4())[:8] | |
| filename = f"llm_image_{timestamp}_{unique_id}{file_extension}" | |
| output_path = str(session_dir / filename) | |
| else: | |
| temp_fd, output_path = tempfile.mkstemp(suffix=file_extension) | |
| os.close(temp_fd) | |
| decoded_data = base64.b64decode(data) | |
| with open(output_path, "wb") as f: | |
| f.write(decoded_data) | |
| return output_path | |
| def b64_to_temp_file( | |
| b64: str, suffix: str, context: str = "" | |
| ) -> tuple[str, str | None]: | |
| """ | |
| Decode base64 string and write to a temporary file using mkstemp. | |
| Returns (temp_file_path, None) on success or ("", error_message) on failure. | |
| Matches the exact contract and behavior of the previous b64_to_temp_* functions | |
| from platform_record_document/image (DRY extraction). | |
| The 'context' param customizes the error message (e.g. "document", "image"). | |
| Used by media record tools and tests. Temp files are NOT auto-cleaned here; | |
| callers (tools) handle unlink on error or register with agent. | |
| """ | |
| if not isinstance(b64, str) or not b64.strip(): | |
| ctx = context or "content" | |
| return "", f"Invalid base64 {ctx}: empty or non-string input" | |
| try: | |
| data = base64.b64decode(b64, validate=False) | |
| except (binascii.Error, ValueError) as e: | |
| ctx = context or "content" | |
| return "", f"Invalid base64 {ctx}: {e!s}" | |
| suf = ( | |
| suffix | |
| if isinstance(suffix, str) and suffix.startswith(".") | |
| else f".{suffix or 'bin'}" | |
| ) | |
| try: | |
| fd, path = tempfile.mkstemp(suffix=suf) | |
| with open(fd, "wb") as f: # exact match to original platform impl | |
| f.write(data) | |
| return path, None | |
| except OSError as e: | |
| return "", str(e) | |
| def create_gallery_attachment( | |
| image_paths: list[str], captions: list[str] | None = None | |
| ) -> dict[str, Any]: | |
| """ | |
| Create a gallery attachment for multiple images. | |
| Args: | |
| image_paths: List of image file paths | |
| captions: Optional list of captions for each image | |
| Returns: | |
| Gallery attachment dictionary | |
| """ | |
| if not image_paths: | |
| return {"type": "error", "error": "No image paths provided"} | |
| valid_images = [] | |
| for i, path in enumerate(image_paths): | |
| if FileUtils.file_exists(path) and FileUtils.is_image_file(path): | |
| image_info = { | |
| "path": path, | |
| "caption": captions[i] if captions and i < len(captions) else None, | |
| } | |
| valid_images.append(image_info) | |
| if not valid_images: | |
| return {"type": "error", "error": "No valid image files found"} | |
| return { | |
| "type": "gallery_attachment", | |
| "media_type": "gallery", | |
| "images": valid_images, | |
| "count": len(valid_images), | |
| } | |