Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import sqlite3 | |
| import json | |
| import os | |
| import uuid | |
| from datetime import datetime | |
| from typing import Optional, List, Dict, Any | |
| from datasets import load_dataset | |
| from huggingface_hub import HfApi | |
| import hashlib | |
| from pathlib import Path | |
| import requests | |
| from io import BytesIO | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| import threading | |
| class HFUUIDSSEFileStorageMCP: | |
| def __init__(self): | |
| # Lade Secrets aus Space Environment | |
| self.hf_token = os.environ.get("HF_TOKEN", "") | |
| self.dataset_name = os.environ.get("DATASET_NAME", "mcp-filestorage/default-files") | |
| self.org_name = os.environ.get("ORG_NAME", "mcp-filestorage") | |
| self.db_path = "user_management.db" | |
| self.api = HfApi(token=self.hf_token if self.hf_token else None) | |
| self.init_database() | |
| self.init_storage() | |
| self.user_sessions = {} # Speichert aktive Benutzer-Sessions | |
| def init_database(self): | |
| """Initialisiert SQLite-Datenbank für Benutzerverwaltung""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| # Benutzer-Tabelle mit SSE-Endpunkt | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS users ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_uuid TEXT UNIQUE NOT NULL, | |
| sse_endpoint TEXT UNIQUE, | |
| created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
| last_access DATETIME DEFAULT CURRENT_TIMESTAMP, | |
| file_count INTEGER DEFAULT 0, | |
| is_active BOOLEAN DEFAULT 1, | |
| storage_type TEXT DEFAULT 'hf_dataset' | |
| ) | |
| ''') | |
| # Datei-Metadaten | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS file_metadata ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_uuid TEXT NOT NULL, | |
| file_uuid TEXT UNIQUE NOT NULL, | |
| filename TEXT NOT NULL, | |
| file_path TEXT NOT NULL, | |
| created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
| modified_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
| file_size INTEGER, | |
| file_hash TEXT, | |
| version INTEGER DEFAULT 1, | |
| is_deleted BOOLEAN DEFAULT 0, | |
| metadata TEXT, | |
| storage_path TEXT, | |
| FOREIGN KEY (user_uuid) REFERENCES users(user_uuid) | |
| ) | |
| ''') | |
| conn.commit() | |
| conn.close() | |
| def init_storage(self): | |
| """Initialisiert Speicher (Dataset oder lokal)""" | |
| try: | |
| if self.dataset_name and self.dataset_name != "mcp-filestorage/default-files": | |
| self.dataset = load_dataset(self.dataset_name, split='train', token=self.hf_token) | |
| print(f"✅ Dataset geladen: {self.dataset_name}") | |
| self.storage_mode = 'dataset' | |
| else: | |
| self.storage_mode = 'local' | |
| self.local_storage_path = Path("user_storage") | |
| self.local_storage_path.mkdir(exist_ok=True) | |
| print("ℹ️ Verwende lokalen Speicher als Fallback") | |
| except Exception as e: | |
| print(f"⚠️ Dataset nicht verfügbar, verwende lokalen Speicher: {e}") | |
| self.storage_mode = 'local' | |
| self.local_storage_path = Path("user_storage") | |
| self.local_storage_path.mkdir(exist_ok=True) | |
| def register_user(self) -> Dict[str, Any]: | |
| """Registriert neuen Benutzer mit UUID und erstellt SSE-Endpunkt""" | |
| user_uuid = str(uuid.uuid4()) | |
| sse_endpoint = f"user-{user_uuid[:8]}" # Kurze UUID für Endpoint | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO users (user_uuid, sse_endpoint) VALUES (?, ?) | |
| ''', (user_uuid, sse_endpoint)) | |
| conn.commit() | |
| # Erstelle Benutzerordner | |
| self._create_user_folder(user_uuid) | |
| # Registriere Benutzer-Session | |
| self.user_sessions[user_uuid] = { | |
| 'sse_endpoint': sse_endpoint, | |
| 'created_at': datetime.now(), | |
| 'last_access': datetime.now() | |
| } | |
| return { | |
| 'success': True, | |
| 'user_uuid': user_uuid, | |
| 'sse_endpoint': sse_endpoint, | |
| 'full_sse_url': f"/gradio_api/mcp/user/{user_uuid}/sse", | |
| 'message': f'Benutzer {user_uuid} registriert' | |
| } | |
| except Exception as e: | |
| return { | |
| 'success': False, | |
| 'error': str(e) | |
| } | |
| finally: | |
| conn.close() | |
| def get_user_by_uuid(self, user_uuid: str) -> Optional[Dict[str, Any]]: | |
| """Holt Benutzer-Informationen anhand der UUID""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| try: | |
| cursor.execute(''' | |
| SELECT user_uuid, sse_endpoint, created_at, file_count | |
| FROM users WHERE user_uuid = ? AND is_active = 1 | |
| ''', (user_uuid,)) | |
| result = cursor.fetchone() | |
| if result: | |
| return { | |
| 'user_uuid': result[0], | |
| 'sse_endpoint': result[1], | |
| 'created_at': result[2], | |
| 'file_count': result[3] | |
| } | |
| return None | |
| finally: | |
| conn.close() | |
| def _create_user_folder(self, user_uuid: str): | |
| """Erstellt Benutzerordner je nach Speicher-Modus""" | |
| if self.storage_mode == 'dataset': | |
| try: | |
| folder_path = f"users/{user_uuid}" | |
| readme_content = f"# User Folder: {user_uuid}\nCreated: {datetime.now().isoformat()}\n" | |
| self.api.upload_file( | |
| path_or_fileobj=readme_content.encode(), | |
| path_in_repo=f"{folder_path}/README.md", | |
| repo_id=self.dataset_name, | |
| repo_type="dataset" | |
| ) | |
| except Exception as e: | |
| print(f"⚠️ Konnte Benutzerordner nicht im Dataset erstellen: {e}") | |
| else: | |
| # Lokaler Speicher | |
| user_path = self.local_storage_path / user_uuid | |
| user_path.mkdir(exist_ok=True) | |
| (user_path / "README.md").write_text(f"# User Folder: {user_uuid}\nCreated: {datetime.now().isoformat()}\n") | |
| def create_file(self, user_uuid: str, file_path: str, content: bytes, | |
| metadata: Optional[Dict] = None) -> Dict[str, Any]: | |
| """Erstellt neue Datei für Benutzer""" | |
| # Verifiziere Benutzer | |
| user = self.get_user_by_uuid(user_uuid) | |
| if not user: | |
| return {'success': False, 'error': 'Ungültige Benutzer-UUID'} | |
| file_uuid = str(uuid.uuid4()) | |
| file_hash = hashlib.sha256(content).hexdigest() | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| try: | |
| storage_path = f"users/{user_uuid}/{file_path}" | |
| # Speichere Datei | |
| if self.storage_mode == 'dataset': | |
| try: | |
| self.api.upload_file( | |
| path_or_fileobj=content, | |
| path_in_repo=storage_path, | |
| repo_id=self.dataset_name, | |
| repo_type="dataset" | |
| ) | |
| except Exception as e: | |
| return {'success': False, 'error': f'Dataset-Upload fehlgeschlagen: {e}'} | |
| else: | |
| # Lokaler Speicher | |
| local_file_path = self.local_storage_path / user_uuid / file_path | |
| local_file_path.parent.mkdir(parents=True, exist_ok=True) | |
| local_file_path.write_bytes(content) | |
| # Speichere Metadaten | |
| cursor.execute(''' | |
| INSERT INTO file_metadata | |
| (user_uuid, file_uuid, filename, file_path, file_size, file_hash, metadata, storage_path) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', (user_uuid, file_uuid, os.path.basename(file_path), file_path, | |
| len(content), file_hash, json.dumps(metadata or {}), storage_path)) | |
| # Aktualisiere Benutzer-Statistik | |
| cursor.execute(''' | |
| UPDATE users SET file_count = file_count + 1, last_access = ? | |
| WHERE user_uuid = ? | |
| ''', (datetime.now(), user_uuid)) | |
| conn.commit() | |
| # Aktualisiere Session | |
| if user_uuid in self.user_sessions: | |
| self.user_sessions[user_uuid]['last_access'] = datetime.now() | |
| return { | |
| 'success': True, | |
| 'file_uuid': file_uuid, | |
| 'file_path': file_path, | |
| 'user_uuid': user_uuid | |
| } | |
| except Exception as e: | |
| return {'success': False, 'error': str(e)} | |
| finally: | |
| conn.close() | |
| def read_file(self, user_uuid: str, file_path: str) -> Dict[str, Any]: | |
| """Liest Datei eines Benutzers""" | |
| # Verifiziere Benutzer | |
| user = self.get_user_by_uuid(user_uuid) | |
| if not user: | |
| return {'success': False, 'error': 'Ungültige Benutzer-UUID'} | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| try: | |
| cursor.execute(''' | |
| SELECT file_uuid, filename, storage_path FROM file_metadata | |
| WHERE user_uuid = ? AND file_path = ? AND is_deleted = 0 | |
| ''', (user_uuid, file_path)) | |
| result = cursor.fetchone() | |
| if not result: | |
| return {'success': False, 'error': 'Datei nicht gefunden'} | |
| file_uuid, filename, storage_path = result | |
| # Erstelle Download-URL | |
| if self.storage_mode == 'dataset': | |
| file_url = f"https://huggingface.co/datasets/{self.dataset_name}/resolve/main/{storage_path}" | |
| else: | |
| # Lokaler Speicher | |
| local_file_path = self.local_storage_path / user_uuid / file_path | |
| if local_file_path.exists(): | |
| import base64 | |
| content = local_file_path.read_bytes() | |
| file_url = f"data:application/octet-stream;base64,{base64.b64encode(content).decode()}" | |
| else: | |
| return {'success': False, 'error': 'Datei nicht im lokalen Speicher gefunden'} | |
| # Aktualisiere Session | |
| if user_uuid in self.user_sessions: | |
| self.user_sessions[user_uuid]['last_access'] = datetime.now() | |
| return { | |
| 'success': True, | |
| 'file_uuid': file_uuid, | |
| 'file_path': file_path, | |
| 'file_url': file_url, | |
| 'filename': filename, | |
| 'user_uuid': user_uuid | |
| } | |
| except Exception as e: | |
| return {'success': False, 'error': str(e)} | |
| finally: | |
| conn.close() | |
| def update_file(self, user_uuid: str, file_path: str, content: bytes) -> Dict[str, Any]: | |
| """Aktualisiert vorhandene Datei""" | |
| user = self.get_user_by_uuid(user_uuid) | |
| if not user: | |
| return {'success': False, 'error': 'Ungültige Benutzer-UUID'} | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| try: | |
| cursor.execute(''' | |
| SELECT file_uuid, version FROM file_metadata | |
| WHERE user_uuid = ? AND file_path = ? AND is_deleted = 0 | |
| ''', (user_uuid, file_path)) | |
| result = cursor.fetchone() | |
| if not result: | |
| return {'success': False, 'error': 'Datei nicht gefunden'} | |
| file_uuid, current_version = result | |
| # Speichere neue Version | |
| storage_path = f"users/{user_uuid}/{file_path}" | |
| if self.storage_mode == 'dataset': | |
| try: | |
| self.api.upload_file( | |
| path_or_fileobj=content, | |
| path_in_repo=storage_path, | |
| repo_id=self.dataset_name, | |
| repo_type="dataset" | |
| ) | |
| except Exception as e: | |
| return {'success': False, 'error': f'Dataset-Update fehlgeschlagen: {e}'} | |
| else: | |
| # Lokaler Speicher | |
| local_file_path = self.local_storage_path / user_uuid / file_path | |
| local_file_path.parent.mkdir(parents=True, exist_ok=True) | |
| local_file_path.write_bytes(content) | |
| # Aktualisiere Metadaten | |
| file_hash = hashlib.sha256(content).hexdigest() | |
| cursor.execute(''' | |
| UPDATE file_metadata | |
| SET modified_at = ?, file_size = ?, file_hash = ?, version = ? | |
| WHERE file_uuid = ? | |
| ''', (datetime.now(), len(content), file_hash, current_version + 1, file_uuid)) | |
| conn.commit() | |
| # Aktualisiere Session | |
| if user_uuid in self.user_sessions: | |
| self.user_sessions[user_uuid]['last_access'] = datetime.now() | |
| return { | |
| 'success': True, | |
| 'file_uuid': file_uuid, | |
| 'version': current_version + 1, | |
| 'file_path': file_path, | |
| 'user_uuid': user_uuid | |
| } | |
| except Exception as e: | |
| return {'success': False, 'error': str(e)} | |
| finally: | |
| conn.close() | |
| def delete_file(self, user_uuid: str, file_path: str) -> Dict[str, Any]: | |
| """Löscht Datei (Soft Delete)""" | |
| user = self.get_user_by_uuid(user_uuid) | |
| if not user: | |
| return {'success': False, 'error': 'Ungültige Benutzer-UUID'} | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| try: | |
| cursor.execute(''' | |
| UPDATE file_metadata | |
| SET is_deleted = 1, modified_at = ? | |
| WHERE user_uuid = ? AND file_path = ? AND is_deleted = 0 | |
| ''', (datetime.now(), user_uuid, file_path)) | |
| if cursor.rowcount == 0: | |
| return {'success': False, 'error': 'Datei nicht gefunden'} | |
| # Aktualisiere Benutzer-Statistik | |
| cursor.execute(''' | |
| UPDATE users SET file_count = file_count - 1, last_access = ? | |
| WHERE user_uuid = ? | |
| ''', (datetime.now(), user_uuid)) | |
| conn.commit() | |
| # Aktualisiere Session | |
| if user_uuid in self.user_sessions: | |
| self.user_sessions[user_uuid]['last_access'] = datetime.now() | |
| return { | |
| 'success': True, | |
| 'message': f'Datei {file_path} gelöscht', | |
| 'file_path': file_path, | |
| 'user_uuid': user_uuid | |
| } | |
| except Exception as e: | |
| return {'success': False, 'error': str(e)} | |
| finally: | |
| conn.close() | |
| def list_files(self, user_uuid: str, folder_path: str = "") -> List[Dict[str, Any]]: | |
| """Listet alle Dateien eines Benutzers auf""" | |
| user = self.get_user_by_uuid(user_uuid) | |
| if not user: | |
| return [] | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| try: | |
| if folder_path: | |
| cursor.execute(''' | |
| SELECT file_uuid, filename, file_path, created_at, modified_at, version, file_size | |
| FROM file_metadata | |
| WHERE user_uuid = ? AND file_path LIKE ? AND is_deleted = 0 | |
| ORDER BY file_path | |
| ''', (user_uuid, f"{folder_path}%")) | |
| else: | |
| cursor.execute(''' | |
| SELECT file_uuid, filename, file_path, created_at, modified_at, version, file_size | |
| FROM file_metadata | |
| WHERE user_uuid = ? AND is_deleted = 0 | |
| ORDER BY file_path | |
| ''', (user_uuid,)) | |
| results = cursor.fetchall() | |
| files = [] | |
| for row in results: | |
| files.append({ | |
| 'file_uuid': row[0], | |
| 'filename': row[1], | |
| 'file_path': row[2], | |
| 'created_at': row[3], | |
| 'modified_at': row[4], | |
| 'version': row[5], | |
| 'file_size': row[6] | |
| }) | |
| # Aktualisiere Session | |
| if user_uuid in self.user_sessions: | |
| self.user_sessions[user_uuid]['last_access'] = datetime.now() | |
| return files | |
| finally: | |
| conn.close() | |
| def get_user_sse_endpoint(self, user_uuid: str) -> Optional[str]: | |
| """Gibt den SSE-Endpunkt für einen Benutzer zurück""" | |
| user = self.get_user_by_uuid(user_uuid) | |
| if user: | |
| return f"/gradio_api/mcp/user/{user_uuid}/sse" | |
| return None | |
| # Globale Instanz | |
| hf_uuid_sse_storage = HFUUIDSSEFileStorageMCP() | |
| # ===== BENUTZERSPEZIFISCHE MCP TOOLS ===== | |
| def register_user_uuid_sse() -> str: | |
| """ | |
| Registriert einen neuen Benutzer und erstellt einen persönlichen SSE-Endpunkt. | |
| Returns: | |
| Benutzer-UUID und persönlicher SSE-Endpunkt | |
| """ | |
| result = hf_uuid_sse_storage.register_user() | |
| if result['success']: | |
| return f"""✅ Benutzer erfolgreich registriert! | |
| 🆔 Ihre UUID: {result['user_uuid']} | |
| 🌐 Ihr persönlicher SSE-Endpunkt: {result['full_sse_url']} | |
| 📁 Ihr Ordner: users/{result['user_uuid']}/ | |
| 🔐 Speichern Sie diese UUID! Sie ist Ihr persönlicher Zugangsschlüssel! | |
| 🚀 Verwenden Sie Ihren SSE-Endpunkt für MCP-Integration!""" | |
| else: | |
| return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}" | |
| def create_user_file_uuid(user_uuid: str, file_path: str, content: str, | |
| metadata: Optional[str] = None) -> str: | |
| """ | |
| Erstellt eine neue Datei für einen Benutzer. | |
| Args: | |
| user_uuid: Die UUID des Benutzers | |
| file_path: Pfad zur Datei (z.B. "dokumente/meindatei.txt") | |
| content: Inhalt der Datei als String | |
| metadata: Optionale Metadaten als JSON-String | |
| Returns: | |
| Erfolgsmeldung mit Datei-Details | |
| """ | |
| content_bytes = content.encode('utf-8') | |
| meta_dict = json.loads(metadata) if metadata else None | |
| result = hf_uuid_sse_storage.create_file(user_uuid, file_path, content_bytes, meta_dict) | |
| if result['success']: | |
| return f"""✅ Datei erfolgreich erstellt! | |
| 📁 Datei-UUID: {result['file_uuid']} | |
| 📄 Pfad: {result['file_path']} | |
| 👤 Benutzer: {result['user_uuid']} | |
| 🌐 Ihr SSE-Endpunkt: /gradio_api/mcp/user/{user_uuid}/sse""" | |
| else: | |
| return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}" | |
| def read_user_file_uuid(user_uuid: str, file_path: str) -> str: | |
| """ | |
| Liest eine Datei eines Benutzers. | |
| Args: | |
| user_uuid: Die UUID des Benutzers | |
| file_path: Pfad zur Datei | |
| Returns: | |
| Datei-Details und Zugriffs-URL | |
| """ | |
| result = hf_uuid_sse_storage.read_file(user_uuid, file_path) | |
| if result['success']: | |
| return f"""📄 Datei erfolgreich gelesen! | |
| 🆔 Datei-UUID: {result['file_uuid']} | |
| 🔗 Direkt-Link: {result['file_url']} | |
| 📁 Dateiname: {result['filename']} | |
| 👤 Benutzer: {result['user_uuid']} | |
| 🌐 Ihr SSE-Endpunkt: /gradio_api/mcp/user/{user_uuid}/sse""" | |
| else: | |
| return f"❌ Fehler: {result.get('error', 'Datei nicht gefunden')}" | |
| def update_user_file_uuid(user_uuid: str, file_path: str, content: str) -> str: | |
| """ | |
| Aktualisiert eine vorhandene Datei. | |
| Args: | |
| user_uuid: Die UUID des Benutzers | |
| file_path: Pfad zur Datei | |
| content: Neuer Inhalt der Datei | |
| Returns: | |
| Erfolgsmeldung mit Versions-Info | |
| """ | |
| content_bytes = content.encode('utf-8') | |
| result = hf_uuid_sse_storage.update_file(user_uuid, file_path, content_bytes) | |
| if result['success']: | |
| return f"""✅ Datei erfolgreich aktualisiert! | |
| 🆔 Datei-UUID: {result['file_uuid']} | |
| 🔢 Neue Version: {result['version']} | |
| 📁 Pfad: {result['file_path']} | |
| 👤 Benutzer: {result['user_uuid']}""" | |
| else: | |
| return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}" | |
| def delete_user_file_uuid(user_uuid: str, file_path: str) -> str: | |
| """ | |
| Löscht eine Datei eines Benutzers. | |
| Args: | |
| user_uuid: Die UUID des Benutzers | |
| file_path: Pfad zur zu löschenden Datei | |
| Returns: | |
| Erfolgsmeldung | |
| """ | |
| result = hf_uuid_sse_storage.delete_file(user_uuid, file_path) | |
| if result['success']: | |
| return f"""🗑️ Datei erfolgreich gelöscht! | |
| 📁 Pfad: {result['file_path']} | |
| 👤 Benutzer: {result['user_uuid']} | |
| 🌐 Ihr SSE-Endpunkt bleibt: /gradio_api/mcp/user/{user_uuid}/sse""" | |
| else: | |
| return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}" | |
| def list_user_files_uuid(user_uuid: str, folder_path: str = "") -> str: | |
| """ | |
| Listet alle Dateien eines Benutzers auf. | |
| Args: | |
| user_uuid: Die UUID des Benutzers | |
| folder_path: Optionaler Ordnerpfad zum Filtern | |
| Returns: | |
| Liste aller Dateien des Benutzers | |
| """ | |
| files = hf_uuid_sse_storage.list_files(user_uuid, folder_path) | |
| if not files: | |
| return "📁 Keine Dateien gefunden!" | |
| output = f"📊 Gefunden: {len(files)} Dateien für Benutzer {user_uuid}\n" | |
| output += f"🌐 Ihr SSE-Endpunkt: /gradio_api/mcp/user/{user_uuid}/sse\n\n" | |
| for file in files: | |
| output += f"""📄 {file['filename']} (Version {file['version']}) | |
| 📁 Pfad: {file['file_path']} | |
| 🆔 UUID: {file['file_uuid']} | |
| 📅 Erstellt: {file['created_at']} | |
| 📝 Geändert: {file['modified_at']} | |
| 📊 Größe: {file['file_size']} Bytes | |
| {'-' * 40} | |
| """ | |
| return output | |
| # ===== GRADIO INTERFACE MIT UUID-SSE INTEGRATION ===== | |
| # Benutzerdefinierte MCP-Routen erstellen | |
| def create_user_mcp_server(user_uuid: str): | |
| """Erstellt einen benutzerspezifischen MCP-Server""" | |
| def user_create_file(file_path: str, content: str, metadata: Optional[str] = None) -> str: | |
| """Erstelle Datei für aktuellen Benutzer""" | |
| return create_user_file_uuid(user_uuid, file_path, content, metadata) | |
| def user_read_file(file_path: str) -> str: | |
| """Lese Datei für aktuellen Benutzer""" | |
| return read_user_file_uuid(user_uuid, file_path) | |
| def user_update_file(file_path: str, content: str) -> str: | |
| """Aktualisiere Datei für aktuellen Benutzer""" | |
| return update_user_file_uuid(user_uuid, file_path, content) | |
| def user_delete_file(file_path: str) -> str: | |
| """Lösche Datei für aktuellen Benutzer""" | |
| return delete_user_file_uuid(user_uuid, file_path) | |
| def user_list_files(folder_path: str = "") -> str: | |
| """Liste Dateien für aktuellen Benutzer auf""" | |
| return list_user_files_uuid(user_uuid, folder_path) | |
| def user_get_stats() -> str: | |
| """Hole Statistiken für aktuellen Benutzer""" | |
| return get_user_stats_uuid(user_uuid) | |
| # Haupt-Gradio-Interface | |
| with gr.Blocks(title="🤗 HF UUID-SSE Filestorage MCP", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown(""" | |
| # 🗃️ Hugging Face Dataset Filestorage MCP Server | |
| ### 🔗 UUID-basierte persönliche SSE-Endpunkte | |
| Jeder Benutzer bekommt einen eigenen SSE-Endpunkt mit seiner UUID - z.B. `/gradio_api/mcp/user/123e4567-e89b-12d3-a456-426614174000/sse` | |
| """) | |
| with gr.Tab("📝 Registrieren"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### 🎯 Neuen Benutzer mit persönlichem SSE-Endpunkt erstellen") | |
| register_btn = gr.Button("🎯 Neuen Benutzer erstellen", variant="primary", size="lg") | |
| with gr.Column(): | |
| register_output = gr.Textbox(label="Registrierungsergebnis", lines=8, max_lines=10) | |
| register_btn.click(register_user_uuid_sse, outputs=register_output) | |
| with gr.Tab("📤 Datei erstellen"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### 📄 Neue Datei erstellen") | |
| create_user_uuid = gr.Textbox(label="🆔 Ihre UUID", placeholder="z.B. 123e4567-e89b-12d3-a456-426614174000") | |
| create_file_path = gr.Textbox(label="📁 Dateipfad", placeholder="z.B. meine-dokumente/test.txt") | |
| create_content = gr.TextArea(label="📝 Inhalt", placeholder="Datei-Inhalt hier eingeben...", lines=6) | |
| create_metadata = gr.Textbox(label="🏷️ Metadaten (JSON, optional)", placeholder='{"autor": "Max", "projekt": "Test"}') | |
| create_btn = gr.Button("📄 Datei erstellen", variant="primary") | |
| with gr.Column(): | |
| create_output = gr.Textbox(label="Ergebnis", lines=8, max_lines=10) | |
| create_btn.click( | |
| create_user_file_uuid, | |
| inputs=[create_user_uuid, create_file_path, create_content, create_metadata], | |
| outputs=create_output | |
| ) | |
| with gr.Tab("📋 Meine Dateien"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### 📋 Alle meine Dateien anzeigen") | |
| list_user_uuid = gr.Textbox(label="🆔 Ihre UUID", placeholder="Ihre UUID eingeben") | |
| list_folder = gr.Textbox(label="📁 Ordner (optional)", placeholder="z.B. meine-dokumente/") | |
| list_btn = gr.Button("📋 Dateien anzeigen", variant="secondary") | |
| with gr.Column(): | |
| list_output = gr.Textbox(label="Datei-Liste", lines=15, max_lines=20) | |
| list_btn.click(list_user_files_uuid, inputs=[list_user_uuid, list_folder], outputs=list_output) | |
| with gr.Tab("ℹ️ Mein SSE-Endpunkt"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### 🔗 Meinen persönlichen SSE-Endpunkt finden") | |
| sse_user_uuid = gr.Textbox(label="🆔 Ihre UUID", placeholder="Ihre UUID eingeben") | |
| sse_btn = gr.Button("🔗 SSE-Endpunkt anzeigen", variant="secondary") | |
| with gr.Column(): | |
| sse_output = gr.Textbox(label="SSE-Endpunkt-Details", lines=8) | |
| def get_sse_endpoint(user_uuid): | |
| if not user_uuid: | |
| return "❌ Bitte geben Sie Ihre UUID ein!" | |
| endpoint = hf_uuid_sse_storage.get_user_sse_endpoint(user_uuid) | |
| if endpoint: | |
| full_url = f"https://dein-space-name.hf.space{endpoint}" | |
| return f"""🔗 Ihr persönlicher SSE-Endpunkt: | |
| 🌐 Komplett: {full_url} | |
| 📍 Kurz: {endpoint} | |
| 📝 MCP-Konfiguration: | |
| ```json | |
| {{ | |
| "mcpServers": {{ | |
| "mein-filestorage-{user_uuid[:8]}": {{ | |
| "url": "{full_url}" | |
| }} | |
| }} | |
| }} | |
| ```""" | |
| else: | |
| return "❌ Kein SSE-Endpunkt für diese UUID gefunden. Bitte registrieren Sie sich zuerst." | |
| sse_btn.click(get_sse_endpoint, inputs=[sse_user_uuid], outputs=sse_output) | |
| gr.Markdown(""" | |
| --- | |
| ### 🚀 UUID-basierte MCP Server Integration | |
| **Persönliche SSE-Endpunkte:** `/gradio_api/mcp/user/{Ihre-UUID}/sse` | |
| **Verfügbare MCP Tools:** | |
| - 📝 `register_user_uuid_sse` - Neuen Benutzer mit SSE-Endpunkt registrieren | |
| - 📤 `create_user_file_uuid` - Datei für Benutzer erstellen | |
| - 📖 `read_user_file_uuid` - Datei lesen und Download-Link erhalten | |
| - ✏️ `update_user_file_uuid` - Datei-Inhalt aktualisieren | |
| - 🗑️ `delete_user_file_uuid` - Datei löschen | |
| - 📋 `list_user_files_uuid` - Alle Dateien eines Benutzers auflisten | |
| **Beispiel MCP Konfiguration für Benutzer:** | |
| ```json | |
| { | |
| "mcpServers": { | |
| "mein-filestorage-123e4567": { | |
| "url": "https://dein-space-name.hf.space/gradio_api/mcp/user/123e4567-e89b-12d3-a456-426614174000/sse" | |
| } | |
| } | |
| } | |
| ``` | |
| 🔐 **Jeder Benutzer hat seinen eigenen MCP-Server-Endpunkt!** | |
| """) | |
| # Benutzerdefinierte SSE-Routen handler | |
| def user_sse_handler(user_uuid: str): | |
| """Handler für benutzerspezifische SSE-Endpunkte""" | |
| user = hf_uuid_sse_storage.get_user_by_uuid(user_uuid) | |
| if not user: | |
| return {"error": "Ungültige Benutzer-UUID"}, 404 | |
| # Erstelle benutzerspezifischen MCP-Server | |
| create_user_mcp_server(user_uuid) | |
| # Hier würde die SSE-Verbindung etabliert | |
| return {"status": "connected", "user_uuid": user_uuid, "endpoint": f"/user/{user_uuid}/sse"} | |
| # MCP Server mit benutzerdefinierten Routen | |
| if __name__ == "__main__": | |
| # Standard-MCP-Server für allgemeine Funktionen | |
| demo.launch( | |
| mcp_server=True, | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| # Benutzerdefinierte Routen werden über Gradio's MCP-Integration gehandhabt | |
| routes=[ | |
| { | |
| "path": "/gradio_api/mcp/user/{user_uuid}/sse", | |
| "method": "GET", | |
| "handler": user_sse_handler | |
| } | |
| ] | |
| ) |