#!/usr/bin/env python3
"""
Client library for MCP Speech-to-Text service
"""

import asyncio
import aiohttp
import json
import base64
import logging
from pathlib import Path
from typing import Dict, Any, Optional

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MCPSpeechClient:
    """Client for MCP Speech-to-Text service.

    The client talks to three HTTP endpoints relative to ``service_url``:
    ``/auth`` (POST), ``/transcribe`` (POST) and ``/status`` (GET).

    It can be used as an async context manager, which closes the underlying
    HTTP session on exit, or standalone, in which case the session is created
    on first use and should be released with :meth:`close`.
    """

    def __init__(self, service_url: str = "http://localhost:8081"):
        """Store the service base URL; no network activity happens here.

        Args:
            service_url: Base URL of the service. Trailing slashes are
                stripped so endpoint paths can be appended directly.
        """
        self.service_url = service_url.rstrip('/')
        # Both values are filled in from the /auth response by authenticate().
        self.token = None
        # Stored for callers; nothing in this module reads it back.
        self.encryption_key = None
        # Created lazily by _ensure_session(); reset to None by close().
        self.client_session = None

    async def __aenter__(self):
        """Make sure an HTTP session exists and return the client itself."""
        # Reuse a session that was already created lazily instead of
        # overwriting (and thereby leaking) it.
        self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session when leaving the ``async with`` block."""
        await self.close()

    async def close(self) -> None:
        """Close the HTTP session, if any, and forget it.

        Safe to call when no session exists. Dropping the reference means a
        later request creates a fresh session rather than reusing a closed one.
        """
        session, self.client_session = self.client_session, None
        if session is not None:
            await session.close()

    def _ensure_session(self) -> "aiohttp.ClientSession":
        """Return the current HTTP session, creating one if none exists."""
        if self.client_session is None:
            self.client_session = aiohttp.ClientSession()
        return self.client_session

    async def authenticate(self, client_id: str = "test_client") -> bool:
        """Authenticate with the MCP service.

        POSTs ``{"client_id": client_id}`` to ``/auth``. On HTTP 200 the
        ``token`` and ``encryption_key`` fields of the JSON response are
        stored on the client.

        Args:
            client_id: Identifier sent to the service.

        Returns:
            True on HTTP 200, False on any other status or on any exception
            (which is logged, not raised).
        """
        try:
            session = self._ensure_session()
            async with session.post(
                f"{self.service_url}/auth",
                json={"client_id": client_id},
            ) as response:
                if response.status != 200:
                    error = await response.text()
                    logger.error("Authentication failed: %s", error)
                    return False

                data = await response.json()
                self.token = data.get("token")
                self.encryption_key = data.get("encryption_key")
                logger.info("Authentication successful")
                return True
        except Exception as e:
            # Boundary handler: this method reports failure via its return
            # value, so network/JSON errors are logged instead of raised.
            logger.error("Authentication error: %s", e)
            return False

    async def transcribe_file(self, audio_file_path: str,
                              session_id: Optional[str] = None) -> Dict[str, Any]:
        """Transcribe an audio file.

        Authenticates first (with the default client id) when no token is
        stored, then uploads the file as the multipart field ``audio`` to
        ``/transcribe`` with a bearer ``Authorization`` header.

        Args:
            audio_file_path: Path of the audio file to upload.
            session_id: Optional value sent as the ``X-Session-ID`` header.

        Returns:
            The decoded JSON body on HTTP 200. Otherwise a dict of the form
            ``{"status": "error", "error": <message>}`` — for authentication
            failure, a missing file, a non-200 response, or any exception.
        """
        if not self.token:
            if not await self.authenticate():
                return {"status": "error", "error": "Authentication failed"}

        try:
            audio_path = Path(audio_file_path)
            if not audio_path.exists():
                raise FileNotFoundError(f"Audio file not found: {audio_file_path}")

            headers = {"Authorization": f"Bearer {self.token}"}
            if session_id:
                headers["X-Session-ID"] = session_id

            # A token may already be present without any request having been
            # made through this client, so the session cannot be assumed.
            session = self._ensure_session()

            # The file must stay open for the whole request because the form
            # data holds the file object, not its contents.
            with open(audio_file_path, 'rb') as f:
                form_data = aiohttp.FormData()
                form_data.add_field('audio', f, filename=audio_path.name)

                async with session.post(
                    f"{self.service_url}/transcribe",
                    data=form_data,
                    headers=headers,
                ) as response:
                    if response.status == 200:
                        return await response.json()

                    error_text = await response.text()
                    return {
                        "status": "error",
                        "error": f"HTTP {response.status}: {error_text}",
                    }
        except Exception as e:
            # Boundary handler: failures are reported in the returned dict.
            logger.error("Transcription error: %s", e)
            return {"status": "error", "error": str(e)}

    async def get_status(self) -> Dict[str, Any]:
        """Get service status.

        Returns:
            The decoded JSON body of ``GET /status`` on HTTP 200. Otherwise a
            dict of the form ``{"status": "error", "error": <message>}``.
        """
        try:
            session = self._ensure_session()
            async with session.get(f"{self.service_url}/status") as response:
                if response.status == 200:
                    return await response.json()

                return {
                    "status": "error",
                    "error": f"HTTP {response.status}: {await response.text()}",
                }
        except Exception as e:
            # Boundary handler: failures are reported in the returned dict.
            logger.error("Status check error: %s", e)
            return {"status": "error", "error": str(e)}


# Convenience functions for backwards compatibility
async def check_service_status(service_url: str = "http://localhost:8081") -> Dict[str, Any]:
    """Check if the MCP service is running.

    Opens a short-lived client, queries ``/status`` and returns the result of
    :meth:`MCPSpeechClient.get_status`.
    """
    async with MCPSpeechClient(service_url) as client:
        return await client.get_status()


async def transcribe_audio_file(audio_file_path: str,
                                service_url: str = "http://localhost:8081") -> Dict[str, Any]:
    """Transcribe an audio file using the MCP service.

    Opens a short-lived client and calls
    :meth:`MCPSpeechClient.transcribe_file` with the fixed session id
    ``"single_request"``.
    """
    async with MCPSpeechClient(service_url) as client:
        return await client.transcribe_file(audio_file_path, session_id="single_request")