import gradio as gr import sqlite3 import json import os import uuid from datetime import datetime from typing import Optional, List, Dict, Any from datasets import load_dataset from huggingface_hub import HfApi import hashlib from pathlib import Path import requests from io import BytesIO from flask import Flask, request, jsonify from flask_cors import CORS import threading class HFUUIDSSEFileStorageMCP: def __init__(self): # Lade Secrets aus Space Environment self.hf_token = os.environ.get("HF_TOKEN", "") self.dataset_name = os.environ.get("DATASET_NAME", "mcp-filestorage/default-files") self.org_name = os.environ.get("ORG_NAME", "mcp-filestorage") self.db_path = "user_management.db" self.api = HfApi(token=self.hf_token if self.hf_token else None) self.init_database() self.init_storage() self.user_sessions = {} # Speichert aktive Benutzer-Sessions def init_database(self): """Initialisiert SQLite-Datenbank für Benutzerverwaltung""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Benutzer-Tabelle mit SSE-Endpunkt cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_uuid TEXT UNIQUE NOT NULL, sse_endpoint TEXT UNIQUE, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_access DATETIME DEFAULT CURRENT_TIMESTAMP, file_count INTEGER DEFAULT 0, is_active BOOLEAN DEFAULT 1, storage_type TEXT DEFAULT 'hf_dataset' ) ''') # Datei-Metadaten cursor.execute(''' CREATE TABLE IF NOT EXISTS file_metadata ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_uuid TEXT NOT NULL, file_uuid TEXT UNIQUE NOT NULL, filename TEXT NOT NULL, file_path TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, modified_at DATETIME DEFAULT CURRENT_TIMESTAMP, file_size INTEGER, file_hash TEXT, version INTEGER DEFAULT 1, is_deleted BOOLEAN DEFAULT 0, metadata TEXT, storage_path TEXT, FOREIGN KEY (user_uuid) REFERENCES users(user_uuid) ) ''') conn.commit() conn.close() def init_storage(self): """Initialisiert Speicher (Dataset oder lokal)""" try: if self.dataset_name and self.dataset_name != "mcp-filestorage/default-files": self.dataset = load_dataset(self.dataset_name, split='train', token=self.hf_token) print(f"✅ Dataset geladen: {self.dataset_name}") self.storage_mode = 'dataset' else: self.storage_mode = 'local' self.local_storage_path = Path("user_storage") self.local_storage_path.mkdir(exist_ok=True) print("ℹ️ Verwende lokalen Speicher als Fallback") except Exception as e: print(f"⚠️ Dataset nicht verfügbar, verwende lokalen Speicher: {e}") self.storage_mode = 'local' self.local_storage_path = Path("user_storage") self.local_storage_path.mkdir(exist_ok=True) def register_user(self) -> Dict[str, Any]: """Registriert neuen Benutzer mit UUID und erstellt SSE-Endpunkt""" user_uuid = str(uuid.uuid4()) sse_endpoint = f"user-{user_uuid[:8]}" # Kurze UUID für Endpoint conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute(''' INSERT INTO users (user_uuid, sse_endpoint) VALUES (?, ?) ''', (user_uuid, sse_endpoint)) conn.commit() # Erstelle Benutzerordner self._create_user_folder(user_uuid) # Registriere Benutzer-Session self.user_sessions[user_uuid] = { 'sse_endpoint': sse_endpoint, 'created_at': datetime.now(), 'last_access': datetime.now() } return { 'success': True, 'user_uuid': user_uuid, 'sse_endpoint': sse_endpoint, 'full_sse_url': f"/gradio_api/mcp/user/{user_uuid}/sse", 'message': f'Benutzer {user_uuid} registriert' } except Exception as e: return { 'success': False, 'error': str(e) } finally: conn.close() def get_user_by_uuid(self, user_uuid: str) -> Optional[Dict[str, Any]]: """Holt Benutzer-Informationen anhand der UUID""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute(''' SELECT user_uuid, sse_endpoint, created_at, file_count FROM users WHERE user_uuid = ? AND is_active = 1 ''', (user_uuid,)) result = cursor.fetchone() if result: return { 'user_uuid': result[0], 'sse_endpoint': result[1], 'created_at': result[2], 'file_count': result[3] } return None finally: conn.close() def _create_user_folder(self, user_uuid: str): """Erstellt Benutzerordner je nach Speicher-Modus""" if self.storage_mode == 'dataset': try: folder_path = f"users/{user_uuid}" readme_content = f"# User Folder: {user_uuid}\nCreated: {datetime.now().isoformat()}\n" self.api.upload_file( path_or_fileobj=readme_content.encode(), path_in_repo=f"{folder_path}/README.md", repo_id=self.dataset_name, repo_type="dataset" ) except Exception as e: print(f"⚠️ Konnte Benutzerordner nicht im Dataset erstellen: {e}") else: # Lokaler Speicher user_path = self.local_storage_path / user_uuid user_path.mkdir(exist_ok=True) (user_path / "README.md").write_text(f"# User Folder: {user_uuid}\nCreated: {datetime.now().isoformat()}\n") def create_file(self, user_uuid: str, file_path: str, content: bytes, metadata: Optional[Dict] = None) -> Dict[str, Any]: """Erstellt neue Datei für Benutzer""" # Verifiziere Benutzer user = self.get_user_by_uuid(user_uuid) if not user: return {'success': False, 'error': 'Ungültige Benutzer-UUID'} file_uuid = str(uuid.uuid4()) file_hash = hashlib.sha256(content).hexdigest() conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: storage_path = f"users/{user_uuid}/{file_path}" # Speichere Datei if self.storage_mode == 'dataset': try: self.api.upload_file( path_or_fileobj=content, path_in_repo=storage_path, repo_id=self.dataset_name, repo_type="dataset" ) except Exception as e: return {'success': False, 'error': f'Dataset-Upload fehlgeschlagen: {e}'} else: # Lokaler Speicher local_file_path = self.local_storage_path / user_uuid / file_path local_file_path.parent.mkdir(parents=True, exist_ok=True) local_file_path.write_bytes(content) # Speichere Metadaten cursor.execute(''' INSERT INTO file_metadata (user_uuid, file_uuid, filename, file_path, file_size, file_hash, metadata, storage_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', (user_uuid, file_uuid, os.path.basename(file_path), file_path, len(content), file_hash, json.dumps(metadata or {}), storage_path)) # Aktualisiere Benutzer-Statistik cursor.execute(''' UPDATE users SET file_count = file_count + 1, last_access = ? WHERE user_uuid = ? ''', (datetime.now(), user_uuid)) conn.commit() # Aktualisiere Session if user_uuid in self.user_sessions: self.user_sessions[user_uuid]['last_access'] = datetime.now() return { 'success': True, 'file_uuid': file_uuid, 'file_path': file_path, 'user_uuid': user_uuid } except Exception as e: return {'success': False, 'error': str(e)} finally: conn.close() def read_file(self, user_uuid: str, file_path: str) -> Dict[str, Any]: """Liest Datei eines Benutzers""" # Verifiziere Benutzer user = self.get_user_by_uuid(user_uuid) if not user: return {'success': False, 'error': 'Ungültige Benutzer-UUID'} conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute(''' SELECT file_uuid, filename, storage_path FROM file_metadata WHERE user_uuid = ? AND file_path = ? AND is_deleted = 0 ''', (user_uuid, file_path)) result = cursor.fetchone() if not result: return {'success': False, 'error': 'Datei nicht gefunden'} file_uuid, filename, storage_path = result # Erstelle Download-URL if self.storage_mode == 'dataset': file_url = f"https://huggingface.co/datasets/{self.dataset_name}/resolve/main/{storage_path}" else: # Lokaler Speicher local_file_path = self.local_storage_path / user_uuid / file_path if local_file_path.exists(): import base64 content = local_file_path.read_bytes() file_url = f"data:application/octet-stream;base64,{base64.b64encode(content).decode()}" else: return {'success': False, 'error': 'Datei nicht im lokalen Speicher gefunden'} # Aktualisiere Session if user_uuid in self.user_sessions: self.user_sessions[user_uuid]['last_access'] = datetime.now() return { 'success': True, 'file_uuid': file_uuid, 'file_path': file_path, 'file_url': file_url, 'filename': filename, 'user_uuid': user_uuid } except Exception as e: return {'success': False, 'error': str(e)} finally: conn.close() def update_file(self, user_uuid: str, file_path: str, content: bytes) -> Dict[str, Any]: """Aktualisiert vorhandene Datei""" user = self.get_user_by_uuid(user_uuid) if not user: return {'success': False, 'error': 'Ungültige Benutzer-UUID'} conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute(''' SELECT file_uuid, version FROM file_metadata WHERE user_uuid = ? AND file_path = ? AND is_deleted = 0 ''', (user_uuid, file_path)) result = cursor.fetchone() if not result: return {'success': False, 'error': 'Datei nicht gefunden'} file_uuid, current_version = result # Speichere neue Version storage_path = f"users/{user_uuid}/{file_path}" if self.storage_mode == 'dataset': try: self.api.upload_file( path_or_fileobj=content, path_in_repo=storage_path, repo_id=self.dataset_name, repo_type="dataset" ) except Exception as e: return {'success': False, 'error': f'Dataset-Update fehlgeschlagen: {e}'} else: # Lokaler Speicher local_file_path = self.local_storage_path / user_uuid / file_path local_file_path.parent.mkdir(parents=True, exist_ok=True) local_file_path.write_bytes(content) # Aktualisiere Metadaten file_hash = hashlib.sha256(content).hexdigest() cursor.execute(''' UPDATE file_metadata SET modified_at = ?, file_size = ?, file_hash = ?, version = ? WHERE file_uuid = ? ''', (datetime.now(), len(content), file_hash, current_version + 1, file_uuid)) conn.commit() # Aktualisiere Session if user_uuid in self.user_sessions: self.user_sessions[user_uuid]['last_access'] = datetime.now() return { 'success': True, 'file_uuid': file_uuid, 'version': current_version + 1, 'file_path': file_path, 'user_uuid': user_uuid } except Exception as e: return {'success': False, 'error': str(e)} finally: conn.close() def delete_file(self, user_uuid: str, file_path: str) -> Dict[str, Any]: """Löscht Datei (Soft Delete)""" user = self.get_user_by_uuid(user_uuid) if not user: return {'success': False, 'error': 'Ungültige Benutzer-UUID'} conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute(''' UPDATE file_metadata SET is_deleted = 1, modified_at = ? WHERE user_uuid = ? AND file_path = ? AND is_deleted = 0 ''', (datetime.now(), user_uuid, file_path)) if cursor.rowcount == 0: return {'success': False, 'error': 'Datei nicht gefunden'} # Aktualisiere Benutzer-Statistik cursor.execute(''' UPDATE users SET file_count = file_count - 1, last_access = ? WHERE user_uuid = ? ''', (datetime.now(), user_uuid)) conn.commit() # Aktualisiere Session if user_uuid in self.user_sessions: self.user_sessions[user_uuid]['last_access'] = datetime.now() return { 'success': True, 'message': f'Datei {file_path} gelöscht', 'file_path': file_path, 'user_uuid': user_uuid } except Exception as e: return {'success': False, 'error': str(e)} finally: conn.close() def list_files(self, user_uuid: str, folder_path: str = "") -> List[Dict[str, Any]]: """Listet alle Dateien eines Benutzers auf""" user = self.get_user_by_uuid(user_uuid) if not user: return [] conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: if folder_path: cursor.execute(''' SELECT file_uuid, filename, file_path, created_at, modified_at, version, file_size FROM file_metadata WHERE user_uuid = ? AND file_path LIKE ? AND is_deleted = 0 ORDER BY file_path ''', (user_uuid, f"{folder_path}%")) else: cursor.execute(''' SELECT file_uuid, filename, file_path, created_at, modified_at, version, file_size FROM file_metadata WHERE user_uuid = ? AND is_deleted = 0 ORDER BY file_path ''', (user_uuid,)) results = cursor.fetchall() files = [] for row in results: files.append({ 'file_uuid': row[0], 'filename': row[1], 'file_path': row[2], 'created_at': row[3], 'modified_at': row[4], 'version': row[5], 'file_size': row[6] }) # Aktualisiere Session if user_uuid in self.user_sessions: self.user_sessions[user_uuid]['last_access'] = datetime.now() return files finally: conn.close() def get_user_sse_endpoint(self, user_uuid: str) -> Optional[str]: """Gibt den SSE-Endpunkt für einen Benutzer zurück""" user = self.get_user_by_uuid(user_uuid) if user: return f"/gradio_api/mcp/user/{user_uuid}/sse" return None # Globale Instanz hf_uuid_sse_storage = HFUUIDSSEFileStorageMCP() # ===== BENUTZERSPEZIFISCHE MCP TOOLS ===== @gr.mcp.tool() def register_user_uuid_sse() -> str: """ Registriert einen neuen Benutzer und erstellt einen persönlichen SSE-Endpunkt. Returns: Benutzer-UUID und persönlicher SSE-Endpunkt """ result = hf_uuid_sse_storage.register_user() if result['success']: return f"""✅ Benutzer erfolgreich registriert! 🆔 Ihre UUID: {result['user_uuid']} 🌐 Ihr persönlicher SSE-Endpunkt: {result['full_sse_url']} 📁 Ihr Ordner: users/{result['user_uuid']}/ 🔐 Speichern Sie diese UUID! Sie ist Ihr persönlicher Zugangsschlüssel! 🚀 Verwenden Sie Ihren SSE-Endpunkt für MCP-Integration!""" else: return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}" @gr.mcp.tool() def create_user_file_uuid(user_uuid: str, file_path: str, content: str, metadata: Optional[str] = None) -> str: """ Erstellt eine neue Datei für einen Benutzer. Args: user_uuid: Die UUID des Benutzers file_path: Pfad zur Datei (z.B. "dokumente/meindatei.txt") content: Inhalt der Datei als String metadata: Optionale Metadaten als JSON-String Returns: Erfolgsmeldung mit Datei-Details """ content_bytes = content.encode('utf-8') meta_dict = json.loads(metadata) if metadata else None result = hf_uuid_sse_storage.create_file(user_uuid, file_path, content_bytes, meta_dict) if result['success']: return f"""✅ Datei erfolgreich erstellt! 📁 Datei-UUID: {result['file_uuid']} 📄 Pfad: {result['file_path']} 👤 Benutzer: {result['user_uuid']} 🌐 Ihr SSE-Endpunkt: /gradio_api/mcp/user/{user_uuid}/sse""" else: return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}" @gr.mcp.tool() def read_user_file_uuid(user_uuid: str, file_path: str) -> str: """ Liest eine Datei eines Benutzers. Args: user_uuid: Die UUID des Benutzers file_path: Pfad zur Datei Returns: Datei-Details und Zugriffs-URL """ result = hf_uuid_sse_storage.read_file(user_uuid, file_path) if result['success']: return f"""📄 Datei erfolgreich gelesen! 🆔 Datei-UUID: {result['file_uuid']} 🔗 Direkt-Link: {result['file_url']} 📁 Dateiname: {result['filename']} 👤 Benutzer: {result['user_uuid']} 🌐 Ihr SSE-Endpunkt: /gradio_api/mcp/user/{user_uuid}/sse""" else: return f"❌ Fehler: {result.get('error', 'Datei nicht gefunden')}" @gr.mcp.tool() def update_user_file_uuid(user_uuid: str, file_path: str, content: str) -> str: """ Aktualisiert eine vorhandene Datei. Args: user_uuid: Die UUID des Benutzers file_path: Pfad zur Datei content: Neuer Inhalt der Datei Returns: Erfolgsmeldung mit Versions-Info """ content_bytes = content.encode('utf-8') result = hf_uuid_sse_storage.update_file(user_uuid, file_path, content_bytes) if result['success']: return f"""✅ Datei erfolgreich aktualisiert! 🆔 Datei-UUID: {result['file_uuid']} 🔢 Neue Version: {result['version']} 📁 Pfad: {result['file_path']} 👤 Benutzer: {result['user_uuid']}""" else: return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}" @gr.mcp.tool() def delete_user_file_uuid(user_uuid: str, file_path: str) -> str: """ Löscht eine Datei eines Benutzers. Args: user_uuid: Die UUID des Benutzers file_path: Pfad zur zu löschenden Datei Returns: Erfolgsmeldung """ result = hf_uuid_sse_storage.delete_file(user_uuid, file_path) if result['success']: return f"""🗑️ Datei erfolgreich gelöscht! 📁 Pfad: {result['file_path']} 👤 Benutzer: {result['user_uuid']} 🌐 Ihr SSE-Endpunkt bleibt: /gradio_api/mcp/user/{user_uuid}/sse""" else: return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}" @gr.mcp.tool() def list_user_files_uuid(user_uuid: str, folder_path: str = "") -> str: """ Listet alle Dateien eines Benutzers auf. Args: user_uuid: Die UUID des Benutzers folder_path: Optionaler Ordnerpfad zum Filtern Returns: Liste aller Dateien des Benutzers """ files = hf_uuid_sse_storage.list_files(user_uuid, folder_path) if not files: return "📁 Keine Dateien gefunden!" output = f"📊 Gefunden: {len(files)} Dateien für Benutzer {user_uuid}\n" output += f"🌐 Ihr SSE-Endpunkt: /gradio_api/mcp/user/{user_uuid}/sse\n\n" for file in files: output += f"""📄 {file['filename']} (Version {file['version']}) 📁 Pfad: {file['file_path']} 🆔 UUID: {file['file_uuid']} 📅 Erstellt: {file['created_at']} 📝 Geändert: {file['modified_at']} 📊 Größe: {file['file_size']} Bytes {'-' * 40} """ return output # ===== GRADIO INTERFACE MIT UUID-SSE INTEGRATION ===== # Benutzerdefinierte MCP-Routen erstellen def create_user_mcp_server(user_uuid: str): """Erstellt einen benutzerspezifischen MCP-Server""" @gr.mcp.tool() def user_create_file(file_path: str, content: str, metadata: Optional[str] = None) -> str: """Erstelle Datei für aktuellen Benutzer""" return create_user_file_uuid(user_uuid, file_path, content, metadata) @gr.mcp.tool() def user_read_file(file_path: str) -> str: """Lese Datei für aktuellen Benutzer""" return read_user_file_uuid(user_uuid, file_path) @gr.mcp.tool() def user_update_file(file_path: str, content: str) -> str: """Aktualisiere Datei für aktuellen Benutzer""" return update_user_file_uuid(user_uuid, file_path, content) @gr.mcp.tool() def user_delete_file(file_path: str) -> str: """Lösche Datei für aktuellen Benutzer""" return delete_user_file_uuid(user_uuid, file_path) @gr.mcp.tool() def user_list_files(folder_path: str = "") -> str: """Liste Dateien für aktuellen Benutzer auf""" return list_user_files_uuid(user_uuid, folder_path) @gr.mcp.tool() def user_get_stats() -> str: """Hole Statistiken für aktuellen Benutzer""" return get_user_stats_uuid(user_uuid) # Haupt-Gradio-Interface with gr.Blocks(title="🤗 HF UUID-SSE Filestorage MCP", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🗃️ Hugging Face Dataset Filestorage MCP Server ### 🔗 UUID-basierte persönliche SSE-Endpunkte Jeder Benutzer bekommt einen eigenen SSE-Endpunkt mit seiner UUID - z.B. `/gradio_api/mcp/user/123e4567-e89b-12d3-a456-426614174000/sse` """) with gr.Tab("📝 Registrieren"): with gr.Row(): with gr.Column(): gr.Markdown("### 🎯 Neuen Benutzer mit persönlichem SSE-Endpunkt erstellen") register_btn = gr.Button("🎯 Neuen Benutzer erstellen", variant="primary", size="lg") with gr.Column(): register_output = gr.Textbox(label="Registrierungsergebnis", lines=8, max_lines=10) register_btn.click(register_user_uuid_sse, outputs=register_output) with gr.Tab("📤 Datei erstellen"): with gr.Row(): with gr.Column(): gr.Markdown("### 📄 Neue Datei erstellen") create_user_uuid = gr.Textbox(label="🆔 Ihre UUID", placeholder="z.B. 123e4567-e89b-12d3-a456-426614174000") create_file_path = gr.Textbox(label="📁 Dateipfad", placeholder="z.B. meine-dokumente/test.txt") create_content = gr.TextArea(label="📝 Inhalt", placeholder="Datei-Inhalt hier eingeben...", lines=6) create_metadata = gr.Textbox(label="🏷️ Metadaten (JSON, optional)", placeholder='{"autor": "Max", "projekt": "Test"}') create_btn = gr.Button("📄 Datei erstellen", variant="primary") with gr.Column(): create_output = gr.Textbox(label="Ergebnis", lines=8, max_lines=10) create_btn.click( create_user_file_uuid, inputs=[create_user_uuid, create_file_path, create_content, create_metadata], outputs=create_output ) with gr.Tab("📋 Meine Dateien"): with gr.Row(): with gr.Column(): gr.Markdown("### 📋 Alle meine Dateien anzeigen") list_user_uuid = gr.Textbox(label="🆔 Ihre UUID", placeholder="Ihre UUID eingeben") list_folder = gr.Textbox(label="📁 Ordner (optional)", placeholder="z.B. meine-dokumente/") list_btn = gr.Button("📋 Dateien anzeigen", variant="secondary") with gr.Column(): list_output = gr.Textbox(label="Datei-Liste", lines=15, max_lines=20) list_btn.click(list_user_files_uuid, inputs=[list_user_uuid, list_folder], outputs=list_output) with gr.Tab("ℹ️ Mein SSE-Endpunkt"): with gr.Row(): with gr.Column(): gr.Markdown("### 🔗 Meinen persönlichen SSE-Endpunkt finden") sse_user_uuid = gr.Textbox(label="🆔 Ihre UUID", placeholder="Ihre UUID eingeben") sse_btn = gr.Button("🔗 SSE-Endpunkt anzeigen", variant="secondary") with gr.Column(): sse_output = gr.Textbox(label="SSE-Endpunkt-Details", lines=8) def get_sse_endpoint(user_uuid): if not user_uuid: return "❌ Bitte geben Sie Ihre UUID ein!" endpoint = hf_uuid_sse_storage.get_user_sse_endpoint(user_uuid) if endpoint: full_url = f"https://dein-space-name.hf.space{endpoint}" return f"""🔗 Ihr persönlicher SSE-Endpunkt: 🌐 Komplett: {full_url} 📍 Kurz: {endpoint} 📝 MCP-Konfiguration: ```json {{ "mcpServers": {{ "mein-filestorage-{user_uuid[:8]}": {{ "url": "{full_url}" }} }} }} ```""" else: return "❌ Kein SSE-Endpunkt für diese UUID gefunden. Bitte registrieren Sie sich zuerst." sse_btn.click(get_sse_endpoint, inputs=[sse_user_uuid], outputs=sse_output) gr.Markdown(""" --- ### 🚀 UUID-basierte MCP Server Integration **Persönliche SSE-Endpunkte:** `/gradio_api/mcp/user/{Ihre-UUID}/sse` **Verfügbare MCP Tools:** - 📝 `register_user_uuid_sse` - Neuen Benutzer mit SSE-Endpunkt registrieren - 📤 `create_user_file_uuid` - Datei für Benutzer erstellen - 📖 `read_user_file_uuid` - Datei lesen und Download-Link erhalten - ✏️ `update_user_file_uuid` - Datei-Inhalt aktualisieren - 🗑️ `delete_user_file_uuid` - Datei löschen - 📋 `list_user_files_uuid` - Alle Dateien eines Benutzers auflisten **Beispiel MCP Konfiguration für Benutzer:** ```json { "mcpServers": { "mein-filestorage-123e4567": { "url": "https://dein-space-name.hf.space/gradio_api/mcp/user/123e4567-e89b-12d3-a456-426614174000/sse" } } } ``` 🔐 **Jeder Benutzer hat seinen eigenen MCP-Server-Endpunkt!** """) # Benutzerdefinierte SSE-Routen handler def user_sse_handler(user_uuid: str): """Handler für benutzerspezifische SSE-Endpunkte""" user = hf_uuid_sse_storage.get_user_by_uuid(user_uuid) if not user: return {"error": "Ungültige Benutzer-UUID"}, 404 # Erstelle benutzerspezifischen MCP-Server create_user_mcp_server(user_uuid) # Hier würde die SSE-Verbindung etabliert return {"status": "connected", "user_uuid": user_uuid, "endpoint": f"/user/{user_uuid}/sse"} # MCP Server mit benutzerdefinierten Routen if __name__ == "__main__": # Standard-MCP-Server für allgemeine Funktionen demo.launch( mcp_server=True, server_name="0.0.0.0", server_port=7860, share=False, # Benutzerdefinierte Routen werden über Gradio's MCP-Integration gehandhabt routes=[ { "path": "/gradio_api/mcp/user/{user_uuid}/sse", "method": "GET", "handler": user_sse_handler } ] )