#!/usr/bin/env python3 """ MCP Speech-to-Text Service Exposes realtime-whisper-macbook service through a secure MCP interface """ import asyncio import json import logging import os import subprocess import tempfile import time from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any import hashlib import hmac import base64 import aiohttp from aiohttp import web, WSMsgType import aiofiles import numpy as np import soundfile as sf from cryptography.fernet import Fernet # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class MCPSpeechService: """MCP Service for Speech-to-Text processing""" def __init__(self, whisper_service_url: str = "http://localhost:8000", encryption_key: Optional[bytes] = None): self.whisper_service_url = whisper_service_url self.encryption_key = encryption_key or Fernet.generate_key() self.cipher = Fernet(self.encryption_key) self.active_sessions: Dict[str, Dict] = {} self.auth_tokens: Dict[str, Dict] = {} def generate_session_id(self) -> str: """Generate a unique session ID""" return hashlib.sha256(f"{time.time()}{os.urandom(16)}".encode()).hexdigest()[:16] def generate_auth_token(self, client_id: str) -> str: """Generate JWT-like auth token for client""" payload = { "client_id": client_id, "issued_at": int(time.time()), "expires_at": int(time.time()) + 3600 # 1 hour expiry } token_data = json.dumps(payload).encode() signature = hmac.new(self.encryption_key, token_data, hashlib.sha256).hexdigest() token = base64.b64encode(token_data).decode() + "." + signature self.auth_tokens[token] = payload return token def verify_auth_token(self, token: str) -> Optional[Dict]: """Verify and decode auth token""" try: if "." not in token: return None token_b64, signature = token.rsplit(".", 1) token_data = base64.b64decode(token_b64.encode()) # Verify signature expected_sig = hmac.new(self.encryption_key, token_data, hashlib.sha256).hexdigest() if not hmac.compare_digest(signature, expected_sig): return None payload = json.loads(token_data.decode()) # Check expiry if payload.get("expires_at", 0) < int(time.time()): return None return payload except Exception as e: logger.error(f"Token verification failed: {e}") return None async def encrypt_data(self, data: bytes) -> bytes: """Encrypt sensitive data""" return self.cipher.encrypt(data) async def decrypt_data(self, encrypted_data: bytes) -> bytes: """Decrypt sensitive data""" return self.cipher.decrypt(encrypted_data) async def process_audio_chunk(self, audio_data: bytes, session_id: str, format: str = "wav") -> Dict[str, Any]: """Process audio chunk through whisper service""" try: # Create temporary file for audio data with tempfile.NamedTemporaryFile(suffix=f".{format}", delete=False) as temp_file: temp_file.write(audio_data) temp_file_path = temp_file.name try: # Call the whisper service async with aiohttp.ClientSession() as session: with open(temp_file_path, 'rb') as f: form_data = aiohttp.FormData() form_data.add_field('audio', f, filename=f'audio.{format}') async with session.post( f"{self.whisper_service_url}/transcribe", data=form_data ) as response: if response.status == 200: result = await response.json() return { "session_id": session_id, "timestamp": datetime.utcnow().isoformat(), "transcription": result, "status": "success" } else: error_text = await response.text() logger.error(f"Whisper service error: {error_text}") return { "session_id": session_id, "timestamp": datetime.utcnow().isoformat(), "error": f"Service error: {response.status}", "status": "error" } finally: # Clean up temporary file os.unlink(temp_file_path) except Exception as e: logger.error(f"Audio processing error: {e}") return { "session_id": session_id, "timestamp": datetime.utcnow().isoformat(), "error": str(e), "status": "error" } async def handle_auth(self, request): """Handle authentication endpoint""" try: data = await request.json() client_id = data.get('client_id') if not client_id: return web.Response( text=json.dumps({"error": "client_id required"}), status=400, content_type='application/json' ) token = self.generate_auth_token(client_id) return web.Response( text=json.dumps({ "token": token, "encryption_key": base64.b64encode(self.encryption_key).decode(), "expires_in": 3600 }), content_type='application/json' ) except Exception as e: logger.error(f"Auth error: {e}") return web.Response( text=json.dumps({"error": "Authentication failed"}), status=500, content_type='application/json' ) async def handle_transcribe(self, request): """Handle transcription endpoint""" try: # Verify authentication auth_header = request.headers.get('Authorization', '') if not auth_header.startswith('Bearer '): return web.Response( text=json.dumps({"error": "Missing or invalid authorization"}), status=401, content_type='application/json' ) token = auth_header[7:] # Remove 'Bearer ' prefix auth_payload = self.verify_auth_token(token) if not auth_payload: return web.Response( text=json.dumps({"error": "Invalid or expired token"}), status=401, content_type='application/json' ) # Get session ID session_id = request.headers.get('X-Session-ID') if not session_id: session_id = self.generate_session_id() # Handle multipart form data (audio file) reader = await request.multipart() audio_data = None async for part in reader: if part.name == 'audio': audio_data = await part.read() break if not audio_data: return web.Response( text=json.dumps({"error": "No audio data provided"}), status=400, content_type='application/json' ) # Process audio result = await self.process_audio_chunk(audio_data, session_id) # Encrypt sensitive data if requested if request.headers.get('X-Encrypt-Response') == 'true': encrypted_result = await self.encrypt_data(json.dumps(result).encode()) return web.Response( body=encrypted_result, content_type='application/octet-stream', headers={'X-Encrypted': 'true'} ) return web.Response( text=json.dumps(result), content_type='application/json' ) except Exception as e: logger.error(f"Transcription error: {e}") return web.Response( text=json.dumps({"error": "Transcription failed"}), status=500, content_type='application/json' ) async def handle_websocket(self, request): """Handle WebSocket connections for real-time transcription""" ws = web.WebSocketResponse() await ws.prepare(request) session_id = None auth_payload = None try: async for msg in ws: if msg.type == WSMsgType.TEXT: try: data = json.loads(msg.data) if data.get('type') == 'auth': # Handle authentication token = data.get('token') auth_payload = self.verify_auth_token(token) if auth_payload: session_id = self.generate_session_id() self.active_sessions[session_id] = { 'client_id': auth_payload['client_id'], 'connected_at': time.time(), 'ws': ws } await ws.send_text(json.dumps({ 'type': 'auth_success', 'session_id': session_id })) else: await ws.send_text(json.dumps({ 'type': 'auth_error', 'message': 'Invalid token' })) elif data.get('type') == 'audio_chunk': # Handle audio chunk if not auth_payload or not session_id: await ws.send_text(json.dumps({ 'type': 'error', 'message': 'Not authenticated' })) continue # Decode base64 audio data audio_b64 = data.get('audio_data', '') audio_data = base64.b64decode(audio_b64) # Process audio result = await self.process_audio_chunk(audio_data, session_id) await ws.send_text(json.dumps({ 'type': 'transcription_result', 'data': result })) except json.JSONDecodeError: await ws.send_text(json.dumps({ 'type': 'error', 'message': 'Invalid JSON' })) elif msg.type == WSMsgType.BINARY: # Handle binary audio data if not auth_payload or not session_id: await ws.send_text(json.dumps({ 'type': 'error', 'message': 'Not authenticated' })) continue # Process binary audio data result = await self.process_audio_chunk(msg.data, session_id) await ws.send_text(json.dumps({ 'type': 'transcription_result', 'data': result })) elif msg.type == WSMsgType.ERROR: logger.error(f'WebSocket error: {ws.exception()}') break except Exception as e: logger.error(f"WebSocket error: {e}") finally: # Clean up session if session_id and session_id in self.active_sessions: del self.active_sessions[session_id] return ws async def handle_status(self, request): """Handle status endpoint""" return web.Response( text=json.dumps({ "service": "MCP Speech-to-Text", "status": "running", "active_sessions": len(self.active_sessions), "whisper_service": self.whisper_service_url, "timestamp": datetime.utcnow().isoformat() }), content_type='application/json' ) def create_app(self) -> web.Application: """Create the web application""" app = web.Application() # Add routes app.router.add_post('/auth', self.handle_auth) app.router.add_post('/transcribe', self.handle_transcribe) app.router.add_get('/ws', self.handle_websocket) app.router.add_get('/status', self.handle_status) # Fixed CORS middleware @web.middleware async def cors_middleware(request, handler): if request.method == 'OPTIONS': # Handle preflight requests response = web.Response() else: response = await handler(request) response.headers['Access-Control-Allow-Origin'] = '*' response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS' response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization, X-Session-ID, X-Encrypt-Response' return response app.middlewares.append(cors_middleware) return app async def main(): """Main function to run the MCP service""" # Configuration host = os.getenv('MCP_HOST', '0.0.0.0') port = int(os.getenv('MCP_PORT', '7860')) whisper_url = os.getenv('WHISPER_SERVICE_URL', 'http://localhost:8000') # Create service service = MCPSpeechService(whisper_service_url=whisper_url) app = service.create_app() logger.info(f"Starting MCP Speech-to-Text service on {host}:{port}") logger.info(f"Whisper service URL: {whisper_url}") logger.info(f"Encryption key: {base64.b64encode(service.encryption_key).decode()}") # Run the service runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, host, port) await site.start() logger.info("MCP Service is running...") try: # Keep the service running while True: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("Shutting down MCP service...") finally: await runner.cleanup() if __name__ == '__main__': asyncio.run(main())