Integration Guide - Complete Backend
Overview
This guide shows how the backend modules fit together to form a complete voice-to-voice translation system.
System Architecture
                   +-----------------------+
                   |    FastAPI Server     |
                   |       (main.py)       |
                   +-----------+-----------+
                               |
           +-------------------+-------------------+
           |                   |                   |
           v                   v                   v
    +---------------+   +---------------+   +---------------+
    | Security      |   | WebSocket     |   | Rooms         |
    |               |   | Server        |   | Manager       |
    | - Auth        |   | - Connection  |   | - Multi-room  |
    | - Rate Limit  |   | - Heartbeat   |   | - Users       |
    +---------------+   +------+--------+   +------+--------+
                               |                   |
                               +---------+---------+
                                         |
                                         v
                                 +---------------+
                                 | Message       |
                                 | Router        |
                                 +-------+-------+
                                         |
           +-------------------+---------+---------+
           |                   |                   |
           v                   v                   v
    +---------------+   +---------------+   +---------------+
    | Audio         |   | Pipeline      |   | Workers       |
    |               |   | Manager       |   |               |
    | - Buffer      |   |               |   | - Translation |
    | - Utils       |   | STT→Trans→TTS |   | - TTS Pool    |
    | - Validator   |   |               |   |               |
    +---------------+   +------+--------+   +---------------+
                               |
           +-------------------+-------------------+
           |                   |                   |
           v                   v                   v
    +---------------+   +---------------+   +---------------+
    | STT           |   | Translation   |   | TTS           |
    |               |   |               |   |               |
    | - Vosk        |   | - Argos       |   | - Coqui       |
    | - Factory     |   | - Translator  |   | - Factory     |
    +---------------+   +---------------+   +---------------+
Module Integration Flow
1. Client Connection Flow
# Client connects via WebSocket
WebSocket Connect
↓
Security Check (auth.py, rate_limiter.py)
↓
Connection Manager (connection_manager.py)
↓
Heartbeat Registration (heartbeat.py)
↓
Connection Active
Code Integration:
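# In websocket_server.py
import uuid
from fastapi import WebSocket
from app.security import get_connection_limiter
async def handle_connection(websocket: WebSocket):
    client_ip = websocket.client.host
    # Assumption: a random ID per connection; with auth enabled this
    # would come from the verified token instead
    user_id = str(uuid.uuid4())
    # Security check (raises if this IP is over its connection limit)
    limiter = get_connection_limiter()
    limiter.check_connection_limit(client_ip)
    limiter.register_connection(user_id, client_ip)
    # Register connection; connection_manager and heartbeat_manager are the
    # server's shared instances (their import path is not shown in this guide)
    await connection_manager.connect(user_id, websocket)
    # Start heartbeat
    heartbeat_manager.register(user_id)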
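The heartbeat step can be pictured as a last-seen table: each registered connection records when it was last heard from, and a periodic sweep flags connections that have gone quiet for too long. The sketch below illustrates that idea in isolation. `HeartbeatTracker`, `beat()`, `stale()` and the 30-second timeout are hypothetical choices for illustration, not the API of heartbeat.py; the injectable clock only exists so the example is deterministic.

```python
import time
from typing import Callable, Dict, List


class HeartbeatTracker:
    """Hypothetical last-seen tracker; not the project's heartbeat.py API."""

    def __init__(self, timeout_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.timeout_s = timeout_s
        self._clock = clock  # injectable so tests can control time
        self._last_seen: Dict[str, float] = {}

    def register(self, user_id: str) -> None:
        # A newly registered connection counts as alive right now
        self._last_seen[user_id] = self._clock()

    def beat(self, user_id: str) -> None:
        # Called whenever a ping/pong or any other message arrives
        if user_id in self._last_seen:
            self._last_seen[user_id] = self._clock()

    def unregister(self, user_id: str) -> None:
        self._last_seen.pop(user_id, None)

    def stale(self) -> List[str]:
        # Users silent for longer than timeout_s; the server would close these
        now = self._clock()
        return [uid for uid, seen in self._last_seen.items()
                if now - seen > self.timeout_s]


# Usage with a fake clock
fake_now = [0.0]
tracker = HeartbeatTracker(timeout_s=30.0, clock=lambda: fake_now[0])
tracker.register("alice")
tracker.register("bob")
fake_now[0] = 20.0
tracker.beat("alice")
fake_now[0] = 45.0
print(tracker.stale())  # ['bob']
```

In a server, a background task would call `stale()` every few seconds and close the connections it returns.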
2. Room Join Flow
Client sends JOIN_ROOM message
↓
Message Router (message_router.py)
↓
Room Manager (room_manager.py)
↓
User added to Room (room.py)
↓
Confirmation sent to all room users
Code Integration:
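# In message_router.py
from app.rooms import room_manager
async def _handle_join_room(self, message, user_id, websocket):
    # Extract join parameters from the incoming message (field names assumed)
    room_id = message["room_id"]
    name = message["name"]
    language = message["language"]
    # Add user to room
    user = await room_manager.add_user_to_room(
        room_id=room_id,
        user_id=user_id,
        name=name,
        language=language,
    )
    # Send confirmation to everyone in the room, including the new user
    await connection_manager.send_to_room(room_id, message)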
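A room manager of this kind mostly maintains a mapping from room ID to members, each with the language they want to hear. The in-memory sketch below assumes exactly that; `InMemoryRoomManager`, `RoomUser`, `target_languages()` and the removal of empty rooms are hypothetical details, chosen only to mirror the `add_user_to_room(room_id, user_id, name, language)` call above rather than the real room_manager.py.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class RoomUser:
    id: str
    name: str
    language: str


@dataclass
class Room:
    id: str
    users: Dict[str, RoomUser] = field(default_factory=dict)


class InMemoryRoomManager:
    """Hypothetical stand-in for room_manager; rooms are created on first join."""

    def __init__(self) -> None:
        self.rooms: Dict[str, Room] = {}
        self._lock = asyncio.Lock()  # serialize concurrent joins and leaves

    async def add_user_to_room(self, room_id: str, user_id: str,
                               name: str, language: str) -> RoomUser:
        async with self._lock:
            room = self.rooms.setdefault(room_id, Room(id=room_id))
            user = RoomUser(id=user_id, name=name, language=language)
            room.users[user_id] = user
            return user

    async def remove_user_from_room(self, room_id: str, user_id: str) -> None:
        async with self._lock:
            room = self.rooms.get(room_id)
            if room is None:
                return
            room.users.pop(user_id, None)
            if not room.users:  # drop empty rooms
                del self.rooms[room_id]

    def target_languages(self, room_id: str, speaker_id: str) -> Set[str]:
        # Languages the speaker's audio must be translated into
        room = self.rooms.get(room_id)
        if room is None:
            return set()
        return {u.language for u in room.users.values() if u.id != speaker_id}


async def demo() -> Set[str]:
    rm = InMemoryRoomManager()
    await rm.add_user_to_room("r1", "u1", "Ana", "es")
    await rm.add_user_to_room("r1", "u2", "Ben", "en")
    await rm.add_user_to_room("r1", "u3", "Cleo", "fr")
    return rm.target_languages("r1", speaker_id="u2")


print(sorted(asyncio.run(demo())))  # ['es', 'fr']
```

`target_languages()` shows why rooms feed the pipeline: a speaker's audio has to be translated once per distinct listener language, not once per listener.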
3. Audio Processing Flow
Client sends AUDIO_DATA (binary)
↓
Audio Buffer (audio_buffer.py)
↓
PCM Validator (pcm_validator.py)
↓
Pipeline Manager (pipeline_manager.py)
↓
Translation Pipeline:
  1. STT (vosk_engine.py) → text recognized
  2. Translation (argos_engine.py) → text translated
  3. TTS (coqui_engine.py) → audio synthesized
↓
Workers (translation_worker.py, tts_worker.py)
↓
Send translated audio to target users
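The three-stage pipeline boils down to function composition with timing. The sketch below uses stub stages in place of Vosk, Argos and Coqui (the real engines need loaded models) and a `PipelineResult` whose field names are borrowed from the assertions in the testing section. The stage signatures, `run_pipeline`, and the tiny phrase table are illustrative assumptions, not the pipeline_manager.py API.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class PipelineResult:
    recognized_text: str
    translated_text: str
    audio_data: bytes
    processing_time_ms: float


# Assumed stage signatures: each stage is an async callable
SttFn = Callable[[bytes, str], Awaitable[str]]
TranslateFn = Callable[[str, str, str], Awaitable[str]]
TtsFn = Callable[[str, str], Awaitable[bytes]]


async def run_pipeline(audio: bytes, source_lang: str, target_lang: str,
                       stt: SttFn, translate: TranslateFn,
                       tts: TtsFn) -> PipelineResult:
    start = time.perf_counter()
    text = await stt(audio, source_lang)                          # 1. speech -> text
    translated = await translate(text, source_lang, target_lang)  # 2. text -> text
    speech = await tts(translated, target_lang)                   # 3. text -> speech
    elapsed_ms = (time.perf_counter() - start) * 1000
    return PipelineResult(text, translated, speech, elapsed_ms)


# Stub engines standing in for Vosk / Argos / Coqui
async def fake_stt(audio: bytes, lang: str) -> str:
    return "hello world"

PHRASES = {("en", "es"): {"hello world": "hola mundo"}}

async def fake_translate(text: str, src: str, tgt: str) -> str:
    # Fall back to the untranslated text if the phrase is unknown
    return PHRASES.get((src, tgt), {}).get(text, text)

async def fake_tts(text: str, lang: str) -> bytes:
    # One silent 16-bit sample per character, just to return some bytes
    return b"\x00\x00" * len(text)


result = asyncio.run(run_pipeline(b"\x00\x00" * 160, "en", "es",
                                  fake_stt, fake_translate, fake_tts))
print(result.translated_text)  # hola mundo
```

Real STT, translation and TTS engines are CPU-bound and blocking, so in an asyncio server their calls would typically be moved off the event loop (for example with `asyncio.to_thread` or the worker pools described below) rather than awaited directly as these stubs are.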
Code Integration:
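# In message_router.py
from app.audio import AudioBuffer, PCMValidator
from app.pipeline import get_pipeline_manager
async def _handle_audio_data(self, audio_data, user_id):
    # Validate the incoming chunk before it is appended to any buffer
    validator = PCMValidator(sample_rate=16000)
    if not validator.is_valid(audio_data):
        return
    # Keep one buffer per user (self.buffers is a dict) so chunks
    # accumulate across messages instead of starting empty each time
    if user_id not in self.buffers:
        self.buffers[user_id] = AudioBuffer(chunk_size=4096)
    buffer = self.buffers[user_id]
    buffer.add(audio_data)
    # Look up the speaker and the other users in their room;
    # _get_room_users is a hypothetical helper
    user, target_users = self._get_room_users(user_id)
    # Process through pipeline, once per listener language
    pipeline = get_pipeline_manager()
    for target_user in target_users:
        result = await pipeline.process_complete_audio(
            audio_data=buffer.get_all(),
            user_id=user_id,
            source_lang=user.language,
            target_lang=target_user.language,
        )
        # Send translated audio
        await connection_manager.send_binary_to_user(
            target_user.id,
            result.audio_data,
        )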
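What the validator and buffer check depends on the audio format. Assuming 16 kHz, 16-bit little-endian mono PCM (the 16000 sample rate and 4096 chunk size match the settings listed under Configuration; the sample width, byte order and channel count are assumptions), a chunk is structurally valid when it is non-empty and a whole number of 2-byte samples, and a buffer can hand out fixed-size chunks while holding back the remainder. `is_valid_pcm16`, `rms_level` and `ChunkBuffer` are hypothetical names, not the app.audio classes.

```python
import array
import sys
from typing import List

SAMPLE_RATE = 16_000
BYTES_PER_SAMPLE = 2  # 16-bit signed PCM (assumed)


def is_valid_pcm16(data: bytes, max_seconds: float = 10.0) -> bool:
    """Structural check: non-empty, whole samples, not absurdly long."""
    if not data or len(data) % BYTES_PER_SAMPLE != 0:
        return False
    return len(data) <= SAMPLE_RATE * BYTES_PER_SAMPLE * max_seconds


def rms_level(data: bytes) -> float:
    """Root-mean-square amplitude; useful for skipping near-silent chunks."""
    samples = array.array("h")  # signed 16-bit integers
    samples.frombytes(data)
    if sys.byteorder == "big":  # wire format assumed little-endian
        samples.byteswap()
    if not samples:
        return 0.0
    return (sum(s * s for s in samples) / len(samples)) ** 0.5


class ChunkBuffer:
    """Accumulate bytes and release them in fixed-size chunks."""

    def __init__(self, chunk_size: int = 4096) -> None:
        self.chunk_size = chunk_size
        self._pending = bytearray()

    def add(self, data: bytes) -> List[bytes]:
        self._pending.extend(data)
        chunks = []
        while len(self._pending) >= self.chunk_size:
            chunks.append(bytes(self._pending[:self.chunk_size]))
            del self._pending[:self.chunk_size]
        return chunks

    def pending_bytes(self) -> int:
        return len(self._pending)


buf = ChunkBuffer(chunk_size=4096)
print(is_valid_pcm16(b"\x01"))         # False (half a sample)
print(len(buf.add(b"\x00" * 10_000)))  # 2 full chunks
print(buf.pending_bytes())             # 1808 bytes held back
```

`ChunkBuffer.add()` returns only complete chunks, so a partial trailing chunk waits for the next message instead of being sent to the recognizer half-filled.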
Security Integration
Rate Limiting
from app.security import get_rate_limiter, get_connection_limiter
# In WebSocket message handler
rate_limiter = get_rate_limiter()
connection_limiter = get_connection_limiter()
# Check connection limit (on connect)
connection_limiter.check_connection_limit(ip_address)
connection_limiter.register_connection(user_id, ip_address)
# Check message rate (on each message)
connection_limiter.check_message_rate(user_id)
# Check API rate limit (on API calls)
rate_limiter.check_rate_limit(user_id)
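The connection limiter guards two different things: connection counts per IP at connect time, and message rates per user on every message. A minimal sketch of both, assuming a sliding one-second window and the limits from the Configuration section (10 connections per IP, 10 messages per second). `SketchConnectionLimiter` and `RateLimitExceeded` are hypothetical and only mirror the method names used above; they are not the app.security implementation.

```python
import time
from collections import defaultdict, deque
from typing import Callable, DefaultDict, Deque, Set


class RateLimitExceeded(Exception):
    """Hypothetical error type; the project's own exception may differ."""


class SketchConnectionLimiter:
    def __init__(self, max_connections_per_ip: int = 10,
                 max_messages_per_second: int = 10,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_conn = max_connections_per_ip
        self.max_msgs = max_messages_per_second
        self._clock = clock
        self._conns: DefaultDict[str, Set[str]] = defaultdict(set)       # ip -> user ids
        self._msgs: DefaultDict[str, Deque[float]] = defaultdict(deque)  # user -> send times

    def check_connection_limit(self, ip: str) -> None:
        # Called at connect time, before the new connection is registered
        if len(self._conns[ip]) >= self.max_conn:
            raise RateLimitExceeded(f"too many connections from {ip}")

    def register_connection(self, user_id: str, ip: str) -> None:
        self._conns[ip].add(user_id)

    def unregister_connection(self, user_id: str, ip: str) -> None:
        self._conns[ip].discard(user_id)

    def check_message_rate(self, user_id: str) -> None:
        # Sliding 1-second window: forget timestamps at least one second old
        now = self._clock()
        window = self._msgs[user_id]
        while window and now - window[0] >= 1.0:
            window.popleft()
        if len(window) >= self.max_msgs:
            raise RateLimitExceeded(f"{user_id} exceeded {self.max_msgs} msg/s")
        window.append(now)  # only accepted messages count toward the limit


# Usage with a fake clock so the example is deterministic
t = [0.0]
limiter = SketchConnectionLimiter(max_messages_per_second=3, clock=lambda: t[0])
for _ in range(3):
    limiter.check_message_rate("u1")  # accepted
try:
    limiter.check_message_rate("u1")  # 4th message in the same second
except RateLimitExceeded as exc:
    print("rejected:", exc)           # rejected: u1 exceeded 3 msg/s
t[0] = 1.0
limiter.check_message_rate("u1")      # window has slid, accepted again
```

Rejected messages are not recorded, so a client that keeps retrying is not penalized beyond the window it already filled.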
Authentication
from app.security import get_auth_manager
auth_manager = get_auth_manager()
# Create token
token = auth_manager.create_access_token(user_id)
# Verify token
payload = auth_manager.verify_token(token)
user_id = auth_manager.get_user_id_from_token(token)
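Token creation and verification can be illustrated with the standard library alone: a JSON payload with a subject and an expiry, signed with HMAC-SHA256 and compared in constant time. This is a simplified JWT-like format for illustration only, not what `auth_manager` produces; a real deployment would more likely rely on an established JWT library. `SECRET_KEY`, the 15-minute default lifetime and the function names are assumptions.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

SECRET_KEY = b"change-me"  # hypothetical; load from settings in practice


def _b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _unb64(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(body: str) -> str:
    return _b64(hmac.new(SECRET_KEY, body.encode(), hashlib.sha256).digest())


def create_access_token(user_id: str, ttl_s: int = 900,
                        now: Optional[float] = None) -> str:
    issued = time.time() if now is None else now
    payload = {"sub": user_id, "exp": int(issued) + ttl_s}
    body = _b64(json.dumps(payload, separators=(",", ":")).encode())
    return f"{body}.{_sign(body)}"


def verify_token(token: str, now: Optional[float] = None) -> Optional[dict]:
    """Return the payload if the signature is valid and unexpired, else None."""
    try:
        body, sig = token.split(".")
    except ValueError:
        return None
    if not hmac.compare_digest(sig, _sign(body)):  # constant-time comparison
        return None
    payload = json.loads(_unb64(body))
    current = time.time() if now is None else now
    if payload["exp"] <= current:
        return None
    return payload


token = create_access_token("user_456", now=1_000)
print(verify_token(token, now=1_500)["sub"])  # user_456
print(verify_token(token, now=2_000))         # None (expired at 1900)
```

Passing `now` explicitly is only there to make the example deterministic; normal callers would omit it.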
Worker Pool Integration
Translation Workers
from app.workers import get_translation_pool, TranslationTask
# Callback receives the result; define it before passing it to a task
async def translation_callback(result):
    print(f"Translated: {result.translated_text}")
# Start pool (inside the app's lifespan, since this uses await)
pool = get_translation_pool()
await pool.start()
# Submit task
task = TranslationTask(
    task_id="task_123",
    text="Hello world",
    source_lang="en",
    target_lang="es",
    user_id="user_456",
    callback=translation_callback,
)
await pool.submit_task(task)
TTS Workers
from app.workers import get_tts_pool, TTSTask
# Callback receives the synthesized audio; define it before passing it to a task
async def tts_callback(result):
    audio_bytes = result.audio_data
    # send_audio_to_client is a placeholder for your delivery step
    await send_audio_to_client(audio_bytes)
# Start pool
tts_pool = get_tts_pool()
await tts_pool.start()
# Submit task
task = TTSTask(
    task_id="tts_123",
    text="Hola mundo",
    language="es",
    user_id="user_456",
    callback=tts_callback,
)
await tts_pool.submit_task(task)
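Both pools follow the same shape: a queue of tasks, a fixed number of worker coroutines draining it, and a callback invoked with each result. The asyncio sketch below assumes that shape; `WorkerPool`, `PoolTask` and the `handler` parameter are hypothetical, and the pool sizes from the Configuration section (4 translation workers, 2 TTS workers) would be passed as `num_workers`. Because real engines are CPU-bound, the handler runs through `asyncio.to_thread` (Python 3.9+) so it does not block the event loop.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List, Optional


@dataclass
class PoolTask:
    task_id: str
    payload: Any
    callback: Callable[[Any], Awaitable[None]]


class WorkerPool:
    def __init__(self, handler: Callable[[Any], Any], num_workers: int = 4) -> None:
        self._handler = handler  # blocking function, e.g. an engine call
        self._num_workers = num_workers
        self._queue: Optional[asyncio.Queue] = None
        self._workers: List[asyncio.Task] = []

    async def start(self) -> None:
        self._queue = asyncio.Queue()
        self._workers = [asyncio.create_task(self._run())
                         for _ in range(self._num_workers)]

    async def submit_task(self, task: PoolTask) -> None:
        await self._queue.put(task)

    async def stop(self) -> None:
        await self._queue.join()  # let queued tasks finish first
        for worker in self._workers:
            worker.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)

    async def _run(self) -> None:
        while True:
            task = await self._queue.get()
            try:
                # Run the blocking handler in a thread to keep the loop responsive
                result = await asyncio.to_thread(self._handler, task.payload)
                await task.callback(result)
            except Exception as exc:  # keep the worker alive on task errors
                print(f"task {task.task_id} failed: {exc}")
            finally:
                self._queue.task_done()


async def demo() -> List[str]:
    results: List[str] = []

    async def on_done(result: str) -> None:
        results.append(result)

    pool = WorkerPool(handler=str.upper, num_workers=2)
    await pool.start()
    for i, word in enumerate(["hola", "mundo"]):
        await pool.submit_task(PoolTask(f"t{i}", word, on_done))
    await pool.stop()
    return sorted(results)


print(asyncio.run(demo()))  # ['HOLA', 'MUNDO']
```

`stop()` drains the queue before cancelling workers, which matches the shutdown order described under Lifecycle Management.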
Complete Usage Example
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket
from app.rooms import room_manager
from app.security import get_connection_limiter
from app.security import RateLimitError  # assumed location of the limiter's exception
from app.server import WebSocketServer
from app.workers import get_translation_pool, get_tts_pool
# Lifespan replaces the deprecated @app.on_event("startup"/"shutdown") hooks
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: start worker pools
    translation_pool = get_translation_pool()
    tts_pool = get_tts_pool()
    await translation_pool.start()
    await tts_pool.start()
    yield
    # Shutdown: stop worker pools, then clean up rooms
    await translation_pool.stop()
    await tts_pool.stop()
    await room_manager.cleanup()
app = FastAPI(lifespan=lifespan)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # Security checks: reject the handshake if this IP has too many connections
    connection_limiter = get_connection_limiter()
    client_ip = websocket.client.host
    try:
        connection_limiter.check_connection_limit(client_ip)
    except RateLimitError:
        # 1008 = policy violation
        await websocket.close(code=1008, reason="Too many connections")
        return
    # Hand the connection to the WebSocket server
    ws_server = WebSocketServer()
    await ws_server.handle_connection(websocket)
Configuration
All modules use centralized settings from app/config/settings.py:
from app.config import get_settings
settings = get_settings()
# Audio settings
sample_rate = settings.audio_sample_rate # 16000
chunk_size = settings.audio_chunk_size # 4096
# Worker settings
translation_workers = settings.translation_workers # 4
tts_workers = settings.tts_workers # 2
# Security settings
max_connections = settings.max_connections_per_ip # 10
max_messages = settings.max_messages_per_second # 10
# Rate limiting
requests_per_min = settings.max_requests_per_minute # 100
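A common way to get a single, shared settings object is a function wrapped in `functools.lru_cache` that reads the environment once. The sketch below does this with a plain frozen dataclass; the field names and defaults are copied from the listing above, while the `APP_` environment-variable prefix and the override mechanism are assumptions (the project's settings.py may well use pydantic-settings instead).

```python
import os
from dataclasses import dataclass, fields
from functools import lru_cache
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    audio_sample_rate: int = 16000
    audio_chunk_size: int = 4096
    translation_workers: int = 4
    tts_workers: int = 2
    max_connections_per_ip: int = 10
    max_messages_per_second: int = 10
    max_requests_per_minute: int = 100


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    # Override any field with an env var such as APP_TTS_WORKERS=3 (prefix assumed)
    overrides = {}
    for f in fields(Settings):
        raw = env.get(f"APP_{f.name.upper()}")
        if raw is not None:
            overrides[f.name] = int(raw)  # every field in this sketch is an int
    return Settings(**overrides)


@lru_cache(maxsize=1)
def get_settings() -> Settings:
    # Cached: every module that calls get_settings() shares one instance
    return load_settings()


print(load_settings({"APP_TTS_WORKERS": "3"}).tts_workers)  # 3
print(get_settings() is get_settings())                      # True
```

Freezing the dataclass keeps one module from mutating settings that every other module reads.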
Testing Integration
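import pathlib
import pytest
from app.pipeline import get_pipeline_manager
from app.workers import get_translation_pool
# Hypothetical fixture file: 16 kHz, 16-bit mono PCM containing English speech
TEST_AUDIO_PATH = pathlib.Path("tests/fixtures/hello_en.pcm")
@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_complete_pipeline():
    # Initialize
    test_audio = TEST_AUDIO_PATH.read_bytes()
    pipeline = get_pipeline_manager()
    pool = get_translation_pool()
    await pool.start()
    try:
        # Process audio
        result = await pipeline.process_complete_audio(
            audio_data=test_audio,
            user_id="test_user",
            source_lang="en",
            target_lang="es",
        )
        # Verify every stage produced output within 5 seconds
        assert result.recognized_text
        assert result.translated_text
        assert result.audio_data
        assert result.processing_time_ms < 5000
    finally:
        # Cleanup runs even if an assertion fails
        await pool.stop()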
Lifecycle Management
Application Startup
- Load configuration (settings.py)
- Initialize logging (logging.py)
- Start worker pools (translation_worker.py, tts_worker.py)
- Initialize pipeline manager (pipeline_manager.py)
- Load AI models (Vosk, Argos, Coqui)
- Start FastAPI server (main.py)
Request Handling
- Accept WebSocket connection
- Authenticate (if enabled)
- Check rate limits
- Register connection
- Start heartbeat
- Route messages
- Process audio through pipeline
- Send results to target users
Application Shutdown
- Stop accepting new connections
- Close existing connections gracefully
- Stop worker pools
- Cleanup rooms
- Release model resources
- Flush logs
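The startup and shutdown lists are roughly mirror images: whatever starts last should stop first. `contextlib.AsyncExitStack` enforces that ordering automatically, including when a later startup step fails partway. The sketch uses a hypothetical `Component` class with `start()`/`stop()` in place of the real pools and model loaders.

```python
import asyncio
from contextlib import AsyncExitStack
from typing import List


class Component:
    """Hypothetical service with async start/stop; stands in for pools, models, etc."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    async def start(self) -> None:
        self.log.append(f"start {self.name}")

    async def stop(self) -> None:
        self.log.append(f"stop {self.name}")


async def run_app(log: List[str]) -> None:
    async with AsyncExitStack() as stack:
        for name in ["translation_pool", "tts_pool", "models"]:
            component = Component(name, log)
            await component.start()
            # Register the matching shutdown step; callbacks run in LIFO order
            stack.push_async_callback(component.stop)
        log.append("serving")
    # Leaving the block ran stop() for every started component, newest first


log: List[str] = []
asyncio.run(run_app(log))
print(log)
```

If a component's `start()` raised, only the components already started would be stopped, which is the behavior a graceful shutdown needs.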
Monitoring Points
# Connection stats
connection_manager.get_active_connections()
# Room stats
room_manager.get_room_count()
room_manager.get_all_rooms()
# Worker stats
translation_pool.get_stats()
tts_pool.get_stats()
# Rate limiter stats
rate_limiter.get_remaining_requests(user_id)
connection_limiter.get_busy_workers()
Key Integration Points
- WebSocket → Security: Authentication and rate limiting
- WebSocket → Rooms: User management and message routing
- Rooms → Pipeline: Audio processing and translation
- Pipeline → Workers: Parallel processing
- Pipeline → Audio: Buffering and validation
- All → Config: Centralized settings
- All → Logging: Structured logging
Integration Checklist
- WebSocket server handles connections
- Security middleware validates and rate limits
- Room manager orchestrates multi-room support
- Audio module buffers and validates
- Pipeline manager coordinates STT→Translation→TTS
- Worker pools enable parallel processing
- All modules use centralized configuration
- All modules use structured logging
- Graceful startup and shutdown
- Error handling throughout
Ready for Production
All modules are integrated. The system is ready for:
- Load testing
- Performance optimization
- Production deployment
Integration Status: COMPLETE