FilestoreMCP / app.py
TNTFLO's picture
Update app.py (#1)
b5fdcfe verified
import gradio as gr
import sqlite3
import json
import os
import uuid
from datetime import datetime
from typing import Optional, List, Dict, Any
from datasets import load_dataset
from huggingface_hub import HfApi
import hashlib
from pathlib import Path
import requests
from io import BytesIO
from flask import Flask, request, jsonify
from flask_cors import CORS
import threading
class HFUUIDSSEFileStorageMCP:
def __init__(self):
# Lade Secrets aus Space Environment
self.hf_token = os.environ.get("HF_TOKEN", "")
self.dataset_name = os.environ.get("DATASET_NAME", "mcp-filestorage/default-files")
self.org_name = os.environ.get("ORG_NAME", "mcp-filestorage")
self.db_path = "user_management.db"
self.api = HfApi(token=self.hf_token if self.hf_token else None)
self.init_database()
self.init_storage()
self.user_sessions = {} # Speichert aktive Benutzer-Sessions
def init_database(self):
"""Initialisiert SQLite-Datenbank für Benutzerverwaltung"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Benutzer-Tabelle mit SSE-Endpunkt
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_uuid TEXT UNIQUE NOT NULL,
sse_endpoint TEXT UNIQUE,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_access DATETIME DEFAULT CURRENT_TIMESTAMP,
file_count INTEGER DEFAULT 0,
is_active BOOLEAN DEFAULT 1,
storage_type TEXT DEFAULT 'hf_dataset'
)
''')
# Datei-Metadaten
cursor.execute('''
CREATE TABLE IF NOT EXISTS file_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_uuid TEXT NOT NULL,
file_uuid TEXT UNIQUE NOT NULL,
filename TEXT NOT NULL,
file_path TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
modified_at DATETIME DEFAULT CURRENT_TIMESTAMP,
file_size INTEGER,
file_hash TEXT,
version INTEGER DEFAULT 1,
is_deleted BOOLEAN DEFAULT 0,
metadata TEXT,
storage_path TEXT,
FOREIGN KEY (user_uuid) REFERENCES users(user_uuid)
)
''')
conn.commit()
conn.close()
def init_storage(self):
"""Initialisiert Speicher (Dataset oder lokal)"""
try:
if self.dataset_name and self.dataset_name != "mcp-filestorage/default-files":
self.dataset = load_dataset(self.dataset_name, split='train', token=self.hf_token)
print(f"✅ Dataset geladen: {self.dataset_name}")
self.storage_mode = 'dataset'
else:
self.storage_mode = 'local'
self.local_storage_path = Path("user_storage")
self.local_storage_path.mkdir(exist_ok=True)
print("ℹ️ Verwende lokalen Speicher als Fallback")
except Exception as e:
print(f"⚠️ Dataset nicht verfügbar, verwende lokalen Speicher: {e}")
self.storage_mode = 'local'
self.local_storage_path = Path("user_storage")
self.local_storage_path.mkdir(exist_ok=True)
def register_user(self) -> Dict[str, Any]:
"""Registriert neuen Benutzer mit UUID und erstellt SSE-Endpunkt"""
user_uuid = str(uuid.uuid4())
sse_endpoint = f"user-{user_uuid[:8]}" # Kurze UUID für Endpoint
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute('''
INSERT INTO users (user_uuid, sse_endpoint) VALUES (?, ?)
''', (user_uuid, sse_endpoint))
conn.commit()
# Erstelle Benutzerordner
self._create_user_folder(user_uuid)
# Registriere Benutzer-Session
self.user_sessions[user_uuid] = {
'sse_endpoint': sse_endpoint,
'created_at': datetime.now(),
'last_access': datetime.now()
}
return {
'success': True,
'user_uuid': user_uuid,
'sse_endpoint': sse_endpoint,
'full_sse_url': f"/gradio_api/mcp/user/{user_uuid}/sse",
'message': f'Benutzer {user_uuid} registriert'
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
finally:
conn.close()
def get_user_by_uuid(self, user_uuid: str) -> Optional[Dict[str, Any]]:
"""Holt Benutzer-Informationen anhand der UUID"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute('''
SELECT user_uuid, sse_endpoint, created_at, file_count
FROM users WHERE user_uuid = ? AND is_active = 1
''', (user_uuid,))
result = cursor.fetchone()
if result:
return {
'user_uuid': result[0],
'sse_endpoint': result[1],
'created_at': result[2],
'file_count': result[3]
}
return None
finally:
conn.close()
def _create_user_folder(self, user_uuid: str):
"""Erstellt Benutzerordner je nach Speicher-Modus"""
if self.storage_mode == 'dataset':
try:
folder_path = f"users/{user_uuid}"
readme_content = f"# User Folder: {user_uuid}\nCreated: {datetime.now().isoformat()}\n"
self.api.upload_file(
path_or_fileobj=readme_content.encode(),
path_in_repo=f"{folder_path}/README.md",
repo_id=self.dataset_name,
repo_type="dataset"
)
except Exception as e:
print(f"⚠️ Konnte Benutzerordner nicht im Dataset erstellen: {e}")
else:
# Lokaler Speicher
user_path = self.local_storage_path / user_uuid
user_path.mkdir(exist_ok=True)
(user_path / "README.md").write_text(f"# User Folder: {user_uuid}\nCreated: {datetime.now().isoformat()}\n")
def create_file(self, user_uuid: str, file_path: str, content: bytes,
metadata: Optional[Dict] = None) -> Dict[str, Any]:
"""Erstellt neue Datei für Benutzer"""
# Verifiziere Benutzer
user = self.get_user_by_uuid(user_uuid)
if not user:
return {'success': False, 'error': 'Ungültige Benutzer-UUID'}
file_uuid = str(uuid.uuid4())
file_hash = hashlib.sha256(content).hexdigest()
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
storage_path = f"users/{user_uuid}/{file_path}"
# Speichere Datei
if self.storage_mode == 'dataset':
try:
self.api.upload_file(
path_or_fileobj=content,
path_in_repo=storage_path,
repo_id=self.dataset_name,
repo_type="dataset"
)
except Exception as e:
return {'success': False, 'error': f'Dataset-Upload fehlgeschlagen: {e}'}
else:
# Lokaler Speicher
local_file_path = self.local_storage_path / user_uuid / file_path
local_file_path.parent.mkdir(parents=True, exist_ok=True)
local_file_path.write_bytes(content)
# Speichere Metadaten
cursor.execute('''
INSERT INTO file_metadata
(user_uuid, file_uuid, filename, file_path, file_size, file_hash, metadata, storage_path)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (user_uuid, file_uuid, os.path.basename(file_path), file_path,
len(content), file_hash, json.dumps(metadata or {}), storage_path))
# Aktualisiere Benutzer-Statistik
cursor.execute('''
UPDATE users SET file_count = file_count + 1, last_access = ?
WHERE user_uuid = ?
''', (datetime.now(), user_uuid))
conn.commit()
# Aktualisiere Session
if user_uuid in self.user_sessions:
self.user_sessions[user_uuid]['last_access'] = datetime.now()
return {
'success': True,
'file_uuid': file_uuid,
'file_path': file_path,
'user_uuid': user_uuid
}
except Exception as e:
return {'success': False, 'error': str(e)}
finally:
conn.close()
def read_file(self, user_uuid: str, file_path: str) -> Dict[str, Any]:
"""Liest Datei eines Benutzers"""
# Verifiziere Benutzer
user = self.get_user_by_uuid(user_uuid)
if not user:
return {'success': False, 'error': 'Ungültige Benutzer-UUID'}
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute('''
SELECT file_uuid, filename, storage_path FROM file_metadata
WHERE user_uuid = ? AND file_path = ? AND is_deleted = 0
''', (user_uuid, file_path))
result = cursor.fetchone()
if not result:
return {'success': False, 'error': 'Datei nicht gefunden'}
file_uuid, filename, storage_path = result
# Erstelle Download-URL
if self.storage_mode == 'dataset':
file_url = f"https://huggingface.co/datasets/{self.dataset_name}/resolve/main/{storage_path}"
else:
# Lokaler Speicher
local_file_path = self.local_storage_path / user_uuid / file_path
if local_file_path.exists():
import base64
content = local_file_path.read_bytes()
file_url = f"data:application/octet-stream;base64,{base64.b64encode(content).decode()}"
else:
return {'success': False, 'error': 'Datei nicht im lokalen Speicher gefunden'}
# Aktualisiere Session
if user_uuid in self.user_sessions:
self.user_sessions[user_uuid]['last_access'] = datetime.now()
return {
'success': True,
'file_uuid': file_uuid,
'file_path': file_path,
'file_url': file_url,
'filename': filename,
'user_uuid': user_uuid
}
except Exception as e:
return {'success': False, 'error': str(e)}
finally:
conn.close()
def update_file(self, user_uuid: str, file_path: str, content: bytes) -> Dict[str, Any]:
"""Aktualisiert vorhandene Datei"""
user = self.get_user_by_uuid(user_uuid)
if not user:
return {'success': False, 'error': 'Ungültige Benutzer-UUID'}
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute('''
SELECT file_uuid, version FROM file_metadata
WHERE user_uuid = ? AND file_path = ? AND is_deleted = 0
''', (user_uuid, file_path))
result = cursor.fetchone()
if not result:
return {'success': False, 'error': 'Datei nicht gefunden'}
file_uuid, current_version = result
# Speichere neue Version
storage_path = f"users/{user_uuid}/{file_path}"
if self.storage_mode == 'dataset':
try:
self.api.upload_file(
path_or_fileobj=content,
path_in_repo=storage_path,
repo_id=self.dataset_name,
repo_type="dataset"
)
except Exception as e:
return {'success': False, 'error': f'Dataset-Update fehlgeschlagen: {e}'}
else:
# Lokaler Speicher
local_file_path = self.local_storage_path / user_uuid / file_path
local_file_path.parent.mkdir(parents=True, exist_ok=True)
local_file_path.write_bytes(content)
# Aktualisiere Metadaten
file_hash = hashlib.sha256(content).hexdigest()
cursor.execute('''
UPDATE file_metadata
SET modified_at = ?, file_size = ?, file_hash = ?, version = ?
WHERE file_uuid = ?
''', (datetime.now(), len(content), file_hash, current_version + 1, file_uuid))
conn.commit()
# Aktualisiere Session
if user_uuid in self.user_sessions:
self.user_sessions[user_uuid]['last_access'] = datetime.now()
return {
'success': True,
'file_uuid': file_uuid,
'version': current_version + 1,
'file_path': file_path,
'user_uuid': user_uuid
}
except Exception as e:
return {'success': False, 'error': str(e)}
finally:
conn.close()
def delete_file(self, user_uuid: str, file_path: str) -> Dict[str, Any]:
"""Löscht Datei (Soft Delete)"""
user = self.get_user_by_uuid(user_uuid)
if not user:
return {'success': False, 'error': 'Ungültige Benutzer-UUID'}
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE file_metadata
SET is_deleted = 1, modified_at = ?
WHERE user_uuid = ? AND file_path = ? AND is_deleted = 0
''', (datetime.now(), user_uuid, file_path))
if cursor.rowcount == 0:
return {'success': False, 'error': 'Datei nicht gefunden'}
# Aktualisiere Benutzer-Statistik
cursor.execute('''
UPDATE users SET file_count = file_count - 1, last_access = ?
WHERE user_uuid = ?
''', (datetime.now(), user_uuid))
conn.commit()
# Aktualisiere Session
if user_uuid in self.user_sessions:
self.user_sessions[user_uuid]['last_access'] = datetime.now()
return {
'success': True,
'message': f'Datei {file_path} gelöscht',
'file_path': file_path,
'user_uuid': user_uuid
}
except Exception as e:
return {'success': False, 'error': str(e)}
finally:
conn.close()
def list_files(self, user_uuid: str, folder_path: str = "") -> List[Dict[str, Any]]:
"""Listet alle Dateien eines Benutzers auf"""
user = self.get_user_by_uuid(user_uuid)
if not user:
return []
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
if folder_path:
cursor.execute('''
SELECT file_uuid, filename, file_path, created_at, modified_at, version, file_size
FROM file_metadata
WHERE user_uuid = ? AND file_path LIKE ? AND is_deleted = 0
ORDER BY file_path
''', (user_uuid, f"{folder_path}%"))
else:
cursor.execute('''
SELECT file_uuid, filename, file_path, created_at, modified_at, version, file_size
FROM file_metadata
WHERE user_uuid = ? AND is_deleted = 0
ORDER BY file_path
''', (user_uuid,))
results = cursor.fetchall()
files = []
for row in results:
files.append({
'file_uuid': row[0],
'filename': row[1],
'file_path': row[2],
'created_at': row[3],
'modified_at': row[4],
'version': row[5],
'file_size': row[6]
})
# Aktualisiere Session
if user_uuid in self.user_sessions:
self.user_sessions[user_uuid]['last_access'] = datetime.now()
return files
finally:
conn.close()
def get_user_sse_endpoint(self, user_uuid: str) -> Optional[str]:
"""Gibt den SSE-Endpunkt für einen Benutzer zurück"""
user = self.get_user_by_uuid(user_uuid)
if user:
return f"/gradio_api/mcp/user/{user_uuid}/sse"
return None
# Globale Instanz
hf_uuid_sse_storage = HFUUIDSSEFileStorageMCP()
# ===== BENUTZERSPEZIFISCHE MCP TOOLS =====
@gr.mcp.tool()
def register_user_uuid_sse() -> str:
"""
Registriert einen neuen Benutzer und erstellt einen persönlichen SSE-Endpunkt.
Returns:
Benutzer-UUID und persönlicher SSE-Endpunkt
"""
result = hf_uuid_sse_storage.register_user()
if result['success']:
return f"""✅ Benutzer erfolgreich registriert!
🆔 Ihre UUID: {result['user_uuid']}
🌐 Ihr persönlicher SSE-Endpunkt: {result['full_sse_url']}
📁 Ihr Ordner: users/{result['user_uuid']}/
🔐 Speichern Sie diese UUID! Sie ist Ihr persönlicher Zugangsschlüssel!
🚀 Verwenden Sie Ihren SSE-Endpunkt für MCP-Integration!"""
else:
return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}"
@gr.mcp.tool()
def create_user_file_uuid(user_uuid: str, file_path: str, content: str,
metadata: Optional[str] = None) -> str:
"""
Erstellt eine neue Datei für einen Benutzer.
Args:
user_uuid: Die UUID des Benutzers
file_path: Pfad zur Datei (z.B. "dokumente/meindatei.txt")
content: Inhalt der Datei als String
metadata: Optionale Metadaten als JSON-String
Returns:
Erfolgsmeldung mit Datei-Details
"""
content_bytes = content.encode('utf-8')
meta_dict = json.loads(metadata) if metadata else None
result = hf_uuid_sse_storage.create_file(user_uuid, file_path, content_bytes, meta_dict)
if result['success']:
return f"""✅ Datei erfolgreich erstellt!
📁 Datei-UUID: {result['file_uuid']}
📄 Pfad: {result['file_path']}
👤 Benutzer: {result['user_uuid']}
🌐 Ihr SSE-Endpunkt: /gradio_api/mcp/user/{user_uuid}/sse"""
else:
return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}"
@gr.mcp.tool()
def read_user_file_uuid(user_uuid: str, file_path: str) -> str:
"""
Liest eine Datei eines Benutzers.
Args:
user_uuid: Die UUID des Benutzers
file_path: Pfad zur Datei
Returns:
Datei-Details und Zugriffs-URL
"""
result = hf_uuid_sse_storage.read_file(user_uuid, file_path)
if result['success']:
return f"""📄 Datei erfolgreich gelesen!
🆔 Datei-UUID: {result['file_uuid']}
🔗 Direkt-Link: {result['file_url']}
📁 Dateiname: {result['filename']}
👤 Benutzer: {result['user_uuid']}
🌐 Ihr SSE-Endpunkt: /gradio_api/mcp/user/{user_uuid}/sse"""
else:
return f"❌ Fehler: {result.get('error', 'Datei nicht gefunden')}"
@gr.mcp.tool()
def update_user_file_uuid(user_uuid: str, file_path: str, content: str) -> str:
"""
Aktualisiert eine vorhandene Datei.
Args:
user_uuid: Die UUID des Benutzers
file_path: Pfad zur Datei
content: Neuer Inhalt der Datei
Returns:
Erfolgsmeldung mit Versions-Info
"""
content_bytes = content.encode('utf-8')
result = hf_uuid_sse_storage.update_file(user_uuid, file_path, content_bytes)
if result['success']:
return f"""✅ Datei erfolgreich aktualisiert!
🆔 Datei-UUID: {result['file_uuid']}
🔢 Neue Version: {result['version']}
📁 Pfad: {result['file_path']}
👤 Benutzer: {result['user_uuid']}"""
else:
return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}"
@gr.mcp.tool()
def delete_user_file_uuid(user_uuid: str, file_path: str) -> str:
"""
Löscht eine Datei eines Benutzers.
Args:
user_uuid: Die UUID des Benutzers
file_path: Pfad zur zu löschenden Datei
Returns:
Erfolgsmeldung
"""
result = hf_uuid_sse_storage.delete_file(user_uuid, file_path)
if result['success']:
return f"""🗑️ Datei erfolgreich gelöscht!
📁 Pfad: {result['file_path']}
👤 Benutzer: {result['user_uuid']}
🌐 Ihr SSE-Endpunkt bleibt: /gradio_api/mcp/user/{user_uuid}/sse"""
else:
return f"❌ Fehler: {result.get('error', 'Unbekannter Fehler')}"
@gr.mcp.tool()
def list_user_files_uuid(user_uuid: str, folder_path: str = "") -> str:
"""
Listet alle Dateien eines Benutzers auf.
Args:
user_uuid: Die UUID des Benutzers
folder_path: Optionaler Ordnerpfad zum Filtern
Returns:
Liste aller Dateien des Benutzers
"""
files = hf_uuid_sse_storage.list_files(user_uuid, folder_path)
if not files:
return "📁 Keine Dateien gefunden!"
output = f"📊 Gefunden: {len(files)} Dateien für Benutzer {user_uuid}\n"
output += f"🌐 Ihr SSE-Endpunkt: /gradio_api/mcp/user/{user_uuid}/sse\n\n"
for file in files:
output += f"""📄 {file['filename']} (Version {file['version']})
📁 Pfad: {file['file_path']}
🆔 UUID: {file['file_uuid']}
📅 Erstellt: {file['created_at']}
📝 Geändert: {file['modified_at']}
📊 Größe: {file['file_size']} Bytes
{'-' * 40}
"""
return output
# ===== GRADIO INTERFACE MIT UUID-SSE INTEGRATION =====
# Benutzerdefinierte MCP-Routen erstellen
def create_user_mcp_server(user_uuid: str):
"""Erstellt einen benutzerspezifischen MCP-Server"""
@gr.mcp.tool()
def user_create_file(file_path: str, content: str, metadata: Optional[str] = None) -> str:
"""Erstelle Datei für aktuellen Benutzer"""
return create_user_file_uuid(user_uuid, file_path, content, metadata)
@gr.mcp.tool()
def user_read_file(file_path: str) -> str:
"""Lese Datei für aktuellen Benutzer"""
return read_user_file_uuid(user_uuid, file_path)
@gr.mcp.tool()
def user_update_file(file_path: str, content: str) -> str:
"""Aktualisiere Datei für aktuellen Benutzer"""
return update_user_file_uuid(user_uuid, file_path, content)
@gr.mcp.tool()
def user_delete_file(file_path: str) -> str:
"""Lösche Datei für aktuellen Benutzer"""
return delete_user_file_uuid(user_uuid, file_path)
@gr.mcp.tool()
def user_list_files(folder_path: str = "") -> str:
"""Liste Dateien für aktuellen Benutzer auf"""
return list_user_files_uuid(user_uuid, folder_path)
@gr.mcp.tool()
def user_get_stats() -> str:
"""Hole Statistiken für aktuellen Benutzer"""
return get_user_stats_uuid(user_uuid)
# Haupt-Gradio-Interface
with gr.Blocks(title="🤗 HF UUID-SSE Filestorage MCP", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# 🗃️ Hugging Face Dataset Filestorage MCP Server
### 🔗 UUID-basierte persönliche SSE-Endpunkte
Jeder Benutzer bekommt einen eigenen SSE-Endpunkt mit seiner UUID - z.B. `/gradio_api/mcp/user/123e4567-e89b-12d3-a456-426614174000/sse`
""")
with gr.Tab("📝 Registrieren"):
with gr.Row():
with gr.Column():
gr.Markdown("### 🎯 Neuen Benutzer mit persönlichem SSE-Endpunkt erstellen")
register_btn = gr.Button("🎯 Neuen Benutzer erstellen", variant="primary", size="lg")
with gr.Column():
register_output = gr.Textbox(label="Registrierungsergebnis", lines=8, max_lines=10)
register_btn.click(register_user_uuid_sse, outputs=register_output)
with gr.Tab("📤 Datei erstellen"):
with gr.Row():
with gr.Column():
gr.Markdown("### 📄 Neue Datei erstellen")
create_user_uuid = gr.Textbox(label="🆔 Ihre UUID", placeholder="z.B. 123e4567-e89b-12d3-a456-426614174000")
create_file_path = gr.Textbox(label="📁 Dateipfad", placeholder="z.B. meine-dokumente/test.txt")
create_content = gr.TextArea(label="📝 Inhalt", placeholder="Datei-Inhalt hier eingeben...", lines=6)
create_metadata = gr.Textbox(label="🏷️ Metadaten (JSON, optional)", placeholder='{"autor": "Max", "projekt": "Test"}')
create_btn = gr.Button("📄 Datei erstellen", variant="primary")
with gr.Column():
create_output = gr.Textbox(label="Ergebnis", lines=8, max_lines=10)
create_btn.click(
create_user_file_uuid,
inputs=[create_user_uuid, create_file_path, create_content, create_metadata],
outputs=create_output
)
with gr.Tab("📋 Meine Dateien"):
with gr.Row():
with gr.Column():
gr.Markdown("### 📋 Alle meine Dateien anzeigen")
list_user_uuid = gr.Textbox(label="🆔 Ihre UUID", placeholder="Ihre UUID eingeben")
list_folder = gr.Textbox(label="📁 Ordner (optional)", placeholder="z.B. meine-dokumente/")
list_btn = gr.Button("📋 Dateien anzeigen", variant="secondary")
with gr.Column():
list_output = gr.Textbox(label="Datei-Liste", lines=15, max_lines=20)
list_btn.click(list_user_files_uuid, inputs=[list_user_uuid, list_folder], outputs=list_output)
with gr.Tab("ℹ️ Mein SSE-Endpunkt"):
with gr.Row():
with gr.Column():
gr.Markdown("### 🔗 Meinen persönlichen SSE-Endpunkt finden")
sse_user_uuid = gr.Textbox(label="🆔 Ihre UUID", placeholder="Ihre UUID eingeben")
sse_btn = gr.Button("🔗 SSE-Endpunkt anzeigen", variant="secondary")
with gr.Column():
sse_output = gr.Textbox(label="SSE-Endpunkt-Details", lines=8)
def get_sse_endpoint(user_uuid):
if not user_uuid:
return "❌ Bitte geben Sie Ihre UUID ein!"
endpoint = hf_uuid_sse_storage.get_user_sse_endpoint(user_uuid)
if endpoint:
full_url = f"https://dein-space-name.hf.space{endpoint}"
return f"""🔗 Ihr persönlicher SSE-Endpunkt:
🌐 Komplett: {full_url}
📍 Kurz: {endpoint}
📝 MCP-Konfiguration:
```json
{{
"mcpServers": {{
"mein-filestorage-{user_uuid[:8]}": {{
"url": "{full_url}"
}}
}}
}}
```"""
else:
return "❌ Kein SSE-Endpunkt für diese UUID gefunden. Bitte registrieren Sie sich zuerst."
sse_btn.click(get_sse_endpoint, inputs=[sse_user_uuid], outputs=sse_output)
gr.Markdown("""
---
### 🚀 UUID-basierte MCP Server Integration
**Persönliche SSE-Endpunkte:** `/gradio_api/mcp/user/{Ihre-UUID}/sse`
**Verfügbare MCP Tools:**
- 📝 `register_user_uuid_sse` - Neuen Benutzer mit SSE-Endpunkt registrieren
- 📤 `create_user_file_uuid` - Datei für Benutzer erstellen
- 📖 `read_user_file_uuid` - Datei lesen und Download-Link erhalten
- ✏️ `update_user_file_uuid` - Datei-Inhalt aktualisieren
- 🗑️ `delete_user_file_uuid` - Datei löschen
- 📋 `list_user_files_uuid` - Alle Dateien eines Benutzers auflisten
**Beispiel MCP Konfiguration für Benutzer:**
```json
{
"mcpServers": {
"mein-filestorage-123e4567": {
"url": "https://dein-space-name.hf.space/gradio_api/mcp/user/123e4567-e89b-12d3-a456-426614174000/sse"
}
}
}
```
🔐 **Jeder Benutzer hat seinen eigenen MCP-Server-Endpunkt!**
""")
# Benutzerdefinierte SSE-Routen handler
def user_sse_handler(user_uuid: str):
"""Handler für benutzerspezifische SSE-Endpunkte"""
user = hf_uuid_sse_storage.get_user_by_uuid(user_uuid)
if not user:
return {"error": "Ungültige Benutzer-UUID"}, 404
# Erstelle benutzerspezifischen MCP-Server
create_user_mcp_server(user_uuid)
# Hier würde die SSE-Verbindung etabliert
return {"status": "connected", "user_uuid": user_uuid, "endpoint": f"/user/{user_uuid}/sse"}
# MCP Server mit benutzerdefinierten Routen
if __name__ == "__main__":
# Standard-MCP-Server für allgemeine Funktionen
demo.launch(
mcp_server=True,
server_name="0.0.0.0",
server_port=7860,
share=False,
# Benutzerdefinierte Routen werden über Gradio's MCP-Integration gehandhabt
routes=[
{
"path": "/gradio_api/mcp/user/{user_uuid}/sse",
"method": "GET",
"handler": user_sse_handler
}
]
)