| """ |
| Utility functions for Universal MCP Client |
| """ |
| import re |
| import logging |
| from typing import List, Dict, Any, Optional |
| from pathlib import Path |
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
def validate_huggingface_space_name(space_name: str) -> bool:
    """
    Validate HuggingFace space name format.

    Expected format: ``username/space-name`` -- exactly one ``/`` separating
    two non-empty parts, each consisting only of ASCII letters, digits,
    ``-`` and ``_``.

    Args:
        space_name: Candidate space identifier.

    Returns:
        True if ``space_name`` is a string in the expected format,
        False otherwise (including for empty or non-string input).
    """
    if not space_name or not isinstance(space_name, str):
        return False

    # A missing "/" yields one part, extra slashes yield more than two.
    parts = space_name.split("/")
    if len(parts) != 2:
        return False

    username, space_name_part = parts

    # fullmatch is used instead of match + "$" because "$" also matches just
    # before a trailing newline, which would accept a name ending in "\n".
    part_pattern = r'[a-zA-Z0-9\-_]+'

    return bool(
        re.fullmatch(part_pattern, username)
        and re.fullmatch(part_pattern, space_name_part)
    )
|
|
def sanitize_server_name(name: str) -> str:
    """
    Turn an arbitrary name into an MCP server identifier.

    The name is lower-cased, every character outside ``[a-zA-Z0-9_]`` is
    replaced by an underscore, runs of underscores are collapsed to a single
    one, and leading/trailing underscores are removed. ``"unnamed_server"``
    is returned when the input is empty or nothing survives the clean-up.
    """
    fallback = "unnamed_server"
    if not name:
        return fallback

    replaced = re.sub(r'[^a-zA-Z0-9_]', '_', name.lower())
    collapsed = re.sub(r'_+', '_', replaced)
    trimmed = collapsed.strip('_')

    if trimmed:
        return trimmed
    return fallback
|
|
def format_file_size(size_bytes: int) -> str:
    """
    Format a byte count in human readable form.

    The value is scaled by powers of 1024 through B, KB, MB, GB and TB and
    rendered with one decimal place, e.g. ``1536`` -> ``"1.5 KB"``.
    Values beyond the TB range stay expressed in TB.

    Args:
        size_bytes: Size in bytes. Negative values are formatted by
            magnitude with a leading minus sign.

    Returns:
        The formatted size, or ``"0 B"`` for a size of zero.
    """
    if size_bytes == 0:
        return "0 B"

    size_names = ["B", "KB", "MB", "GB", "TB"]
    last_index = len(size_names) - 1

    # Scale the magnitude so negative sizes pick a unit the same way
    # positive ones do.
    sign = "-" if size_bytes < 0 else ""
    magnitude = float(abs(size_bytes))

    i = 0
    # Compare the value as it will be displayed (one decimal place), so a
    # size just below a unit boundary moves to the next unit instead of
    # being rendered as "1024.0 <unit>".
    while i < last_index and round(magnitude, 1) >= 1024:
        magnitude /= 1024.0
        i += 1

    return f"{sign}{magnitude:.1f} {size_names[i]}"
|
|
def get_file_info(file_path: str) -> Dict[str, Any]:
    """
    Collect basic metadata for the file at ``file_path``.

    Returns a dict with the keys ``name``, ``size``, ``size_formatted``,
    ``extension`` (lower-cased suffix) and ``exists`` when the path exists.
    Returns ``{"error": "File not found"}`` when it does not, and
    ``{"error": <message>}`` if any exception is raised along the way
    (the exception is also logged).
    """
    try:
        target = Path(file_path)
        if target.exists():
            file_stat = target.stat()
            return {
                "name": target.name,
                "size": file_stat.st_size,
                "size_formatted": format_file_size(file_stat.st_size),
                "extension": target.suffix.lower(),
                "exists": True,
            }
        return {"error": "File not found"}
    except Exception as e:
        logger.error(f"Error getting file info for {file_path}: {e}")
        return {"error": str(e)}
|
|
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """
    Truncate text to a maximum length with suffix.

    Args:
        text: Text to shorten. Falsy values are returned unchanged.
        max_length: Maximum length of the returned string, suffix included.
        suffix: Marker appended when the text is cut.

    Returns:
        ``text`` itself when it already fits; otherwise a string of at most
        ``max_length`` characters ending in ``suffix``. When ``max_length``
        leaves no room for any text, the suffix itself is cut to fit.
    """
    if not text or len(text) <= max_length:
        return text

    keep = max_length - len(suffix)
    if keep <= 0:
        # A non-positive slice bound would count from the end of the text
        # and produce a result longer than max_length, so return as much of
        # the suffix as fits instead.
        return suffix[:max(max_length, 0)]

    return text[:keep] + suffix
|
|
def format_tool_description(tool_name: str, description: str, max_desc_length: int = 150) -> str:
    """
    Build a one-line Markdown label for a tool.

    Underscores in ``tool_name`` become spaces and the result is title-cased;
    ``description`` is shortened with ``truncate_text`` to at most
    ``max_desc_length`` characters. The two are joined as
    ``**Name**: description``.
    """
    spaced_name = tool_name.replace("_", " ")
    formatted_name = spaced_name.title()
    truncated_desc = truncate_text(description, max_length=max_desc_length)

    label = f"**{formatted_name}**: {truncated_desc}"
    return label
|
|
def extract_media_type_from_url(url: str) -> Optional[str]:
    """
    Extract media type from a URL.

    For ``data:`` URLs the declared MIME type is inspected. For all other
    URLs the decision is based on a known file extension that is followed by
    the end of the URL or by a query/fragment delimiter (``?``, ``#``, ``&``).

    Args:
        url: URL or data URL to classify. Matching is case-insensitive and
            surrounding whitespace is ignored.

    Returns:
        ``'image'``, ``'audio'`` or ``'video'``, or None when the type cannot
        be determined (including for empty input).
    """
    if not url:
        return None

    url_lower = url.strip().lower()
    media_types = ('image', 'audio', 'video')

    if url_lower.startswith('data:'):
        # Only look at the media-type header (everything before the first
        # ";" or ","); the payload can contain "image/" etc. by coincidence.
        mime = re.split(r'[;,]', url_lower[len('data:'):], maxsplit=1)[0].strip()
        for media_type in media_types:
            if mime.startswith(media_type + '/'):
                return media_type
        return None

    # Checked in priority order: image, then audio, then video.
    extension_groups = (
        ('image', r'png|jpg|jpeg|gif|webp|bmp|svg'),
        ('audio', r'mp3|wav|ogg|m4a|flac|aac'),
        ('video', r'mp4|avi|mov|mkv|webm'),
    )
    for media_type, extensions in extension_groups:
        # The lookahead keeps ".mov" from matching inside ".movies.com" or
        # ".png" inside "photo.png.html".
        if re.search(rf'\.(?:{extensions})(?=\Z|[?#&])', url_lower):
            return media_type

    return None
|
|
def clean_html_for_display(html_text: str) -> str:
    """
    Clean HTML text for safe display in Gradio.

    Removes ``<script>...</script>`` elements and inline ``on*=`` event
    handler attributes (double-quoted, single-quoted or unquoted values).

    NOTE: this is a best-effort regex filter, not a complete HTML sanitizer;
    it does not cover other script vectors such as ``javascript:`` URLs.

    Args:
        html_text: HTML fragment to clean.

    Returns:
        The cleaned HTML, or ``""`` for empty/None input.
    """
    if not html_text:
        return ""

    # "\b" avoids matching tags that merely start with "script"; "\s*" in
    # the end tag also covers a closing tag written as "</script >".
    html_text = re.sub(
        r'<script\b[^>]*>.*?</script\s*>',
        '',
        html_text,
        flags=re.IGNORECASE | re.DOTALL,
    )

    # The lookbehind requires the attribute name to really start with "on",
    # so ordinary attributes that contain "on" (content=, data-onfoo=) are
    # left alone. Each quoting style is matched as a whole so a value with
    # nested quotes of the other kind is removed completely.
    handler_pattern = (
        r'(?<![\w-])on\w+\s*=\s*'
        r'(?:"[^"]*"|\'[^\']*\'|[^\s>"\']+)'
    )
    html_text = re.sub(handler_pattern, '', html_text, flags=re.IGNORECASE)

    return html_text
|
|
def generate_accordion_html(title: str, content: str, is_open: bool = False) -> str:
    """
    Build the HTML for a collapsible ``<details>`` section.

    ``title`` is placed in bold inside the ``<summary>`` and ``content`` in
    the body ``<div>``; both are inserted as given, without escaping.
    When ``is_open`` is true the ``open`` attribute is added to ``<details>``.
    """
    if is_open:
        open_attr = "open"
    else:
        open_attr = ""

    markup = f"""
<details {open_attr} style="margin-bottom: 10px;">
<summary style="cursor: pointer; padding: 8px; background: #e9ecef; border-radius: 4px;">
<strong>{title}</strong>
</summary>
<div style="padding: 10px; border-left: 3px solid #007bff; margin-left: 10px; margin-top: 5px;">
{content}
</div>
</details>
"""
    return markup
|
|
class EventTracker:
    """Simple in-memory event tracking for debugging and monitoring.

    Events are kept oldest-first in ``events``; once more than
    ``max_events`` are stored, the oldest entries are dropped.
    """

    def __init__(self):
        # Recorded events, oldest first.
        self.events: List[Dict[str, Any]] = []
        # Maximum number of events retained.
        self.max_events = 100

    def track_event(self, event_type: str, data: Optional[Dict[str, Any]] = None):
        """Record an event with the current local time.

        Args:
            event_type: Label stored under the ``"type"`` key.
            data: Optional payload stored under ``"data"``; an empty dict is
                stored when it is omitted or empty.
        """
        import datetime

        event = {
            "timestamp": datetime.datetime.now().isoformat(),
            "type": event_type,
            "data": data or {}
        }

        self.events.append(event)

        # Trim by the number of excess entries rather than with a negative
        # slice, so a max_events of 0 really keeps nothing ([-0:] would keep
        # the whole list).
        excess = len(self.events) - self.max_events
        if excess > 0:
            self.events = self.events[excess:]

        logger.debug(f"Event tracked: {event_type}")

    def get_recent_events(self, count: int = 10) -> List[Dict[str, Any]]:
        """Return up to ``count`` most recent events, oldest first.

        A ``count`` of zero or less returns an empty list.
        """
        # events[-0:] is the whole list, so non-positive counts are handled
        # explicitly.
        if count <= 0:
            return []
        return self.events[-count:]

    def clear_events(self):
        """Clear all tracked events."""
        self.events.clear()
|
|
| |
# Shared module-level EventTracker instance, created when this module is imported.
event_tracker = EventTracker()
|
|
|
|