docker-speech2text / mcp_speech_client.py
petergits
2nd checking push speech2text
c08e928
#!/usr/bin/env python3
"""
Client library for MCP Speech-to-Text service
"""
import asyncio
import aiohttp
import json
import base64
import logging
from pathlib import Path
from typing import Dict, Any, Optional
# Configure logging
# NOTE(review): basicConfig() runs at import time, so importing this module
# configures the root logger at INFO level for the importing program too --
# confirm that this is intended for a client library.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by MCPSpeechClient for info and error messages.
logger = logging.getLogger(__name__)
class MCPSpeechClient:
    """Async HTTP client for the MCP Speech-to-Text service.

    The client can be used as an async context manager, which opens an HTTP
    session on entry and closes it on exit, or standalone, in which case the
    session is created lazily on first use and the caller should
    ``await client.close()`` when finished.

    All request methods report failures through their return value (``False``
    or a ``{"status": "error", ...}`` dict) instead of raising.
    """

    def __init__(self, service_url: str = "http://localhost:8081"):
        """Remember the service base URL; no network activity happens here.

        Args:
            service_url: Base URL of the service. Trailing slashes are
                stripped so endpoint paths can be appended with a single "/".
        """
        self.service_url = service_url.rstrip('/')
        self.token = None
        self.encryption_key = None
        # aiohttp.ClientSession, created on demand by _ensure_session().
        self.client_session = None

    def _ensure_session(self) -> "aiohttp.ClientSession":
        """Return an open HTTP session, creating one if none is usable."""
        # A session that was already closed cannot be reused, so it is
        # replaced rather than handed back to the caller.
        if self.client_session is None or self.client_session.closed:
            self.client_session = aiohttp.ClientSession()
        return self.client_session

    async def __aenter__(self):
        """Open the HTTP session and return the client itself."""
        self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session when leaving the ``async with`` block."""
        await self.close()

    async def close(self) -> None:
        """Close the HTTP session, if any. Safe to call more than once."""
        # Drop the reference first so a later call lazily opens a fresh
        # session instead of reusing a closed one.
        session, self.client_session = self.client_session, None
        if session is not None:
            await session.close()

    async def authenticate(self, client_id: str = "test_client") -> bool:
        """Authenticate with the MCP service and store the credentials.

        Posts ``client_id`` to ``/auth`` and, on HTTP 200, stores the
        ``token`` and ``encryption_key`` fields of the JSON reply.

        Args:
            client_id: Identifier sent to the service.

        Returns:
            True when the service answered 200 and supplied a token;
            False on any HTTP error, transport error, or missing token.
        """
        try:
            session = self._ensure_session()
            async with session.post(
                f"{self.service_url}/auth",
                json={"client_id": client_id},
            ) as response:
                if response.status != 200:
                    error = await response.text()
                    logger.error("Authentication failed: %s", error)
                    return False
                data = await response.json()
                self.token = data.get("token")
                self.encryption_key = data.get("encryption_key")
        except Exception as e:
            # Broad catch is deliberate: callers rely on a boolean result.
            logger.error("Authentication error: %s", e)
            return False

        # A 200 reply without a token would make later requests send
        # "Bearer None"; treat it as a failed authentication instead.
        if not self.token:
            logger.error("Authentication failed: response did not include a token")
            return False

        logger.info("Authentication successful")
        return True

    async def transcribe_file(self, audio_file_path: str, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Upload an audio file to ``/transcribe`` and return the reply.

        Authenticates first (with the default client id) when no token is
        stored yet.

        Args:
            audio_file_path: Path of the audio file to upload.
            session_id: Optional value sent as the ``X-Session-ID`` header.

        Returns:
            The decoded JSON body on HTTP 200; otherwise a dict with
            ``"status": "error"`` and an ``"error"`` message (covers failed
            authentication, missing file, HTTP and transport errors).
        """
        if not self.token and not await self.authenticate():
            return {"status": "error", "error": "Authentication failed"}

        try:
            audio_path = Path(audio_file_path)
            if not audio_path.exists():
                raise FileNotFoundError(f"Audio file not found: {audio_file_path}")

            headers = {"Authorization": f"Bearer {self.token}"}
            if session_id:
                headers["X-Session-ID"] = session_id

            # The token may have been set without authenticate() ever
            # running, so make sure a session exists before posting.
            session = self._ensure_session()

            # Keep the file open for the whole request: the form data
            # streams from the file object while the POST is in flight.
            with open(audio_path, 'rb') as f:
                form_data = aiohttp.FormData()
                form_data.add_field('audio', f, filename=audio_path.name)
                async with session.post(
                    f"{self.service_url}/transcribe",
                    data=form_data,
                    headers=headers,
                ) as response:
                    if response.status == 200:
                        return await response.json()
                    error_text = await response.text()
                    return {
                        "status": "error",
                        "error": f"HTTP {response.status}: {error_text}",
                    }
        except Exception as e:
            # Broad catch is deliberate: errors are reported in the result.
            logger.error("Transcription error: %s", e)
            return {"status": "error", "error": str(e)}

    async def get_status(self) -> Dict[str, Any]:
        """Query ``/status`` and return the service's reply.

        Returns:
            The decoded JSON body on HTTP 200; otherwise a dict with
            ``"status": "error"`` and an ``"error"`` message.
        """
        try:
            session = self._ensure_session()
            async with session.get(f"{self.service_url}/status") as response:
                if response.status == 200:
                    return await response.json()
                error_text = await response.text()
                return {
                    "status": "error",
                    "error": f"HTTP {response.status}: {error_text}",
                }
        except Exception as e:
            # Broad catch is deliberate: errors are reported in the result.
            logger.error("Status check error: %s", e)
            return {"status": "error", "error": str(e)}
# Convenience functions for backwards compatibility
async def check_service_status(service_url: str = "http://localhost:8081") -> Dict[str, Any]:
    """Return the status reported by the MCP service at ``service_url``.

    Opens a short-lived MCPSpeechClient, asks it for the service status and
    hands back whatever dict the client produced.
    """
    async with MCPSpeechClient(service_url) as speech_client:
        status_report = await speech_client.get_status()
        return status_report
async def transcribe_audio_file(audio_file_path: str, service_url: str = "http://localhost:8081") -> Dict[str, Any]:
    """Transcribe one audio file through a short-lived MCPSpeechClient.

    The request is sent with the fixed session id "single_request"; the
    client's result dict is returned unchanged.
    """
    async with MCPSpeechClient(service_url) as speech_client:
        return await speech_client.transcribe_file(
            audio_file_path,
            session_id="single_request",
        )