Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Client library for MCP Speech-to-Text service | |
| """ | |
| import asyncio | |
| import aiohttp | |
| import json | |
| import base64 | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, Any, Optional | |
| # Configure logging at import time: request INFO level on the root logger, then create this module's logger | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class MCPSpeechClient:
    """Async HTTP client for the MCP Speech-to-Text service.

    The client owns a single ``aiohttp.ClientSession``. It can be used as an
    async context manager (the session is released on exit) or standalone, in
    which case the session is created lazily on the first request and should
    be released with :meth:`close`.

    Request methods never raise on transport or HTTP failures: they log the
    problem and return ``False`` (``authenticate``) or an error dict of the
    form ``{"status": "error", "error": "<message>"}``.
    """

    def __init__(self, service_url: str = "http://localhost:8081"):
        """Remember the service base URL (trailing slashes are stripped)."""
        self.service_url = service_url.rstrip('/')
        # Values returned by the /auth endpoint; None until authenticated.
        self.token = None
        self.encryption_key = None
        # Created by __aenter__ or lazily by the first request.
        self.client_session = None

    def _ensure_session(self):
        """Return a usable HTTP session, creating one when needed.

        A session that has already been closed (for example after leaving an
        ``async with`` block) is replaced rather than reused, because every
        request on a closed session fails.
        """
        session = self.client_session
        if session is None or getattr(session, "closed", False):
            session = aiohttp.ClientSession()
            self.client_session = session
        return session

    async def close(self) -> None:
        """Close the underlying session, if any. Safe to call repeatedly."""
        # Drop the reference first so a closed session is never reused.
        session, self.client_session = self.client_session, None
        if session is not None and not getattr(session, "closed", False):
            await session.close()

    async def __aenter__(self):
        """Open the session (reusing an already-open one) and return self."""
        self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release the session; exceptions from the body are not suppressed."""
        await self.close()

    async def authenticate(self, client_id: str = "test_client") -> bool:
        """Authenticate against ``POST {service_url}/auth``.

        On success the returned ``token`` and ``encryption_key`` are stored
        on the instance.

        Args:
            client_id: Identifier sent to the service as ``client_id``.

        Returns:
            ``True`` when the service answered 200 with a token, ``False``
            on any HTTP, transport or payload error (the error is logged).
        """
        try:
            session = self._ensure_session()
            async with session.post(
                f"{self.service_url}/auth",
                json={"client_id": client_id},
            ) as response:
                if response.status != 200:
                    error = await response.text()
                    logger.error("Authentication failed: %s", error)
                    return False

                data = await response.json()
                self.token = data.get("token")
                self.encryption_key = data.get("encryption_key")
                if not self.token:
                    # Without a token later requests would send
                    # "Bearer None", so treat this as a failed login.
                    logger.error(
                        "Authentication failed: response did not include a token"
                    )
                    return False

                logger.info("Authentication successful")
                return True
        except Exception as e:
            # Deliberately broad: this method reports failure via its return
            # value instead of raising.
            logger.error("Authentication error: %s", e)
            return False

    async def transcribe_file(self, audio_file_path: str, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Upload an audio file to ``POST {service_url}/transcribe``.

        Authenticates first (with the default client id) when no token is
        held yet.

        Args:
            audio_file_path: Path of the audio file to upload.
            session_id: Optional value for the ``X-Session-ID`` header; the
                header is omitted when this is empty or ``None``.

        Returns:
            The decoded JSON body on HTTP 200, otherwise an error dict
            ``{"status": "error", "error": "<message>"}``. A missing file,
            failed authentication and transport errors are reported the
            same way rather than raised.
        """
        if not self.token:
            if not await self.authenticate():
                return {"status": "error", "error": "Authentication failed"}

        try:
            audio_path = Path(audio_file_path)
            if not audio_path.exists():
                raise FileNotFoundError(f"Audio file not found: {audio_file_path}")

            headers = {"Authorization": f"Bearer {self.token}"}
            if session_id:
                headers["X-Session-ID"] = session_id

            # A token may be present while the session is missing or closed
            # (e.g. the client is reused after an ``async with`` block).
            session = self._ensure_session()

            # The file must stay open until the upload has completed.
            with open(audio_path, 'rb') as f:
                form_data = aiohttp.FormData()
                form_data.add_field('audio', f, filename=audio_path.name)
                async with session.post(
                    f"{self.service_url}/transcribe",
                    data=form_data,
                    headers=headers,
                ) as response:
                    if response.status == 200:
                        return await response.json()
                    error_text = await response.text()
                    return {
                        "status": "error",
                        "error": f"HTTP {response.status}: {error_text}",
                    }
        except Exception as e:
            logger.error("Transcription error: %s", e)
            return {"status": "error", "error": str(e)}

    async def get_status(self) -> Dict[str, Any]:
        """Fetch ``GET {service_url}/status``.

        Returns:
            The decoded JSON body on HTTP 200, otherwise an error dict
            ``{"status": "error", "error": "<message>"}``.
        """
        try:
            session = self._ensure_session()
            async with session.get(f"{self.service_url}/status") as response:
                if response.status == 200:
                    return await response.json()
                return {
                    "status": "error",
                    "error": f"HTTP {response.status}: {await response.text()}",
                }
        except Exception as e:
            logger.error("Status check error: %s", e)
            return {"status": "error", "error": str(e)}
| # Convenience functions for backwards compatibility | |
async def check_service_status(service_url: str = "http://localhost:8081") -> Dict[str, Any]:
    """Query the MCP service's status endpoint with a short-lived client.

    Args:
        service_url: Base URL of the MCP service.

    Returns:
        Whatever ``MCPSpeechClient.get_status`` returns for that service.
    """
    async with MCPSpeechClient(service_url) as status_client:
        status_report = await status_client.get_status()
    return status_report
async def transcribe_audio_file(
    audio_file_path: str,
    service_url: str = "http://localhost:8081",
    session_id: Optional[str] = "single_request",
) -> Dict[str, Any]:
    """Transcribe one audio file using a short-lived MCP client.

    Args:
        audio_file_path: Path of the audio file to upload.
        service_url: Base URL of the MCP service.
        session_id: Session identifier forwarded to
            ``MCPSpeechClient.transcribe_file``. Defaults to
            ``"single_request"``, the value this helper has always sent.

    Returns:
        Whatever ``MCPSpeechClient.transcribe_file`` returns for the file.
    """
    async with MCPSpeechClient(service_url) as client:
        return await client.transcribe_file(audio_file_path, session_id=session_id)