| | from operator import itemgetter |
| | import os |
| | from typing import Any, Optional, Tuple, Dict, TypedDict |
| | from urllib import parse |
| | from uuid import uuid4 |
| | import colorlog |
| | import io |
| | import logging |
| | from pprint import pformat |
| | import socketio |
| | import sys |
| | import time |
| | import random |
| | import string |
| | from starlette.applications import Starlette |
| | from starlette.routing import Mount, Route |
| | from starlette.staticfiles import StaticFiles |
| |
|
| |
|
| | from src.room import Room, Member |
| | from src.simuleval_agent_directory import NoAvailableAgentException |
| | from src.simuleval_agent_directory import SimulevalAgentDirectory |
| | from src.simuleval_transcoder import SimulevalTranscoder |
| | from src.transcoder_helpers import get_transcoder_output_events |
| |
|
| | |
| | |
| | |
| |
|
# Module-level debug flag.
DEBUG = True

# socket.io room that clients are added to alongside their own room; server-wide
# state updates are emitted to it.
ALL_ROOM_ID = "ALL"

# Random room IDs are ROOM_ID_LENGTH characters drawn from
# ROOM_ID_USABLE_CHARACTERS.
ROOM_ID_USABLE_CHARACTERS = string.ascii_uppercase
ROOM_ID_LENGTH = 4

# Suffixes appended to a room ID to name the per-role socket.io rooms.
ROOM_LISTENERS_SUFFIX = "_listeners"
ROOM_SPEAKERS_SUFFIX = "_speakers"

# A speaker who joins with this value as `lockServerName` clears an existing
# server lock (unless LOCK_SERVER_COMPLETELY is set in the environment).
ESCAPE_HATCH_SERVER_LOCK_RELEASE_NAME = "remove_server_lock"
| |
|
| | |
| | |
| | |
| |
|
# Dedicated logger for this server. Propagation is disabled, so records are
# only handled by the colorized stdout handler attached below.
logger = logging.getLogger("socketio_server_pubsub")
logger.propagate = False

# Only WARNING and above are emitted; the logger.info/debug calls in this
# module stay silent unless this level is lowered.
logger.setLevel(logging.WARNING)

formatter = colorlog.ColoredFormatter(
    "%(log_color)s[%(asctime)s][%(levelname)s][%(module)s]:%(reset)s %(message)s",
    reset=True,
    log_colors={
        "DEBUG": "cyan",
        "INFO": "green",
        "WARNING": "yellow",
        "ERROR": "red",
        "CRITICAL": "red,bg_white",
    },
)

handler = colorlog.StreamHandler(stream=sys.stdout)
handler.setFormatter(formatter)
logger.addHandler(handler)

# Startup banner, preceded by two blank lines.
print()
print()
print("=" * 20 + " ⭐️ Starting Server... ⭐️ " + "=" * 20)
| |
|
| | |
| | |
| | |
| |
|
# Location of the built client bundle, relative to the working directory.
CLIENT_BUILD_PATH = "../streaming-react-app/dist/"

# Static file mapping; it is defined here but not passed to the socket.io
# server below.
static_files = {
    "/": CLIENT_BUILD_PATH,
    "/assets/seamless-db6a2555.svg": {
        "filename": f"{CLIENT_BUILD_PATH}assets/seamless-db6a2555.svg",
        "content_type": "image/svg+xml",
    },
}

# Async socket.io server running in ASGI mode, accepting any origin and logging
# through this module's logger.
sio = socketio.AsyncServer(
    async_mode="asgi",
    cors_allowed_origins="*",
    logger=logger,
)
socketio_app = socketio.ASGIApp(sio)

# The socket.io app is mounted under /ws; everything else is served from the
# client build directory.
app_routes = [
    Mount("/ws", app=socketio_app),
    Mount("/", app=StaticFiles(directory=CLIENT_BUILD_PATH, html=True)),
]
app = Starlette(debug=True, routes=app_routes)
| |
|
| | |
# Live rooms keyed by room ID; entries are created on demand in join_room.
rooms: Dict[str, Room] = {}


class MemberDirectoryObject(TypedDict):
    """Entry stored per client ID in ``member_directory``."""

    # Room the client last joined via join_room.
    room: Room
    # Member record of the client inside that room.
    member_object: Member


# Maps client ID -> room/member pair. join_room writes it; connect reads it to
# restore a client that reconnects with the same clientID.
member_directory: Dict[str, MemberDirectoryObject] = {}
| |
|
| |
|
class ServerLock(TypedDict):
    """Describes which client currently holds the server lock."""

    # Value of `lockServerName` supplied by the locking client.
    name: str
    # Client ID of the lock holder.
    client_id: str
    # Member record of the lock holder; the lock is treated as active while
    # this member's `transcoder` attribute is not None.
    member_object: Member
| |
|
# Optional cap on concurrently active transcoders. Kept as the raw string from
# the environment; it is converted with int() where it is used.
MAX_SPEAKERS = os.environ.get("MAX_SPEAKERS")

if os.environ.get("LOCK_SERVER_COMPLETELY", "0") == "1":
    logger.info("LOCK_SERVER_COMPLETELY is set. Server will be locked on startup.")
if MAX_SPEAKERS is not None and int(MAX_SPEAKERS):
    logger.info(f"MAX_SPEAKERS is set to: {MAX_SPEAKERS}")

# Placeholder lock holder used when the server is locked from startup.
dummy_server_lock_member_object = Member(
    client_id="seamless_user", session_id="dummy", name="Seamless User"
)
# Lock checks only test `transcoder is not None`, so any non-None value marks
# the dummy holder as active.
dummy_server_lock_member_object.transcoder = True

server_lock: Optional[ServerLock]
if os.environ.get("LOCK_SERVER_COMPLETELY", "0") == "1":
    server_lock = {
        "name": "Seamless User",
        "client_id": "seamless_user",
        "member_object": dummy_server_lock_member_object,
    }
else:
    server_lock = None

# Unique ID for this server process; sent to each client on connect.
server_id = str(uuid4())

# Optional model selection from the environment, handed to the agent directory.
models_override = os.environ.get("MODELS_OVERRIDE")

available_agents = SimulevalAgentDirectory()
logger.info("Building and adding agents...")
if models_override is not None:
    logger.info(f"MODELS_OVERRIDE supplied from env vars: {models_override}")
available_agents.build_and_add_agents(models_override)

# Computed once at startup and included in every server_state_update.
agents_capabilities_for_json = available_agents.get_agents_capabilities_list_for_json()
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def catch_and_log_exceptions_for_sio_event_handlers(func):
    """Decorate an async socket.io event handler with exception reporting.

    When ``func`` raises, the exception is logged, a ``server_exception``
    event describing it is emitted, and the original exception is re-raised.
    If the handler's first positional argument is a non-empty string it is
    treated as the session ID and the room/member/client stored in that
    session are added to the emitted payload.

    Args:
        func: The coroutine function implementing the event handler.

    Returns:
        A coroutine function with the same name and metadata as ``func``.
    """
    # Function-scope import so this block needs no change to the file-level
    # import list.
    from functools import wraps

    # wraps() copies __name__ (and other metadata) onto the wrapper, so the
    # wrapper is registered under the original handler's name.
    @wraps(func)
    async def catch_exception_wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            message = f"[app_pubsub] Caught exception in '{func.__name__}' event handler:\n\n{e}"
            logger.exception(message, stack_info=True)

            try:
                exception_data = {
                    "message": message,
                    "timeEpochMs": int(time.time() * 1000),
                }

                try:
                    # Best effort: enrich the payload with session details.
                    sid = args[0] if args else None
                    if isinstance(sid, str) and len(sid) > 0:
                        session_data = await get_session_data(sid)
                        if session_data:
                            client_id = session_data.get("client_id")
                            member = session_data.get("member_object")
                            room = session_data.get("room_object")

                            exception_data["room"] = str(room)
                            exception_data["member"] = str(member)
                            exception_data["clientID"] = str(client_id)
                except Exception as inner_e:
                    # Missing details must not prevent the emit below.
                    logger.warning(
                        f"[app_pubsub] Caught exception while trying add additional_data to server_exception:\n\n{inner_e}"
                    )

                # No `room`/`to` is given, so this goes to all connected clients.
                await sio.emit("server_exception", exception_data)
            except Exception as inner_e:
                logger.exception(
                    f"[app_pubsub] Caught exception while trying to emit server_exception event:\n{inner_e}"
                )

            # Bare raise re-raises the handler's exception unchanged.
            raise

    return catch_exception_wrapper
| |
|
| |
|
async def emit_room_state_update(room):
    """Emit ``room``'s serialized state to the socket.io room named after its ID."""
    payload = room.to_json()
    await sio.emit("room_state_update", payload, room=room.room_id)
| |
|
| |
|
async def emit_server_state_update():
    """Emit a ``server_state_update`` event to the ALL room.

    The payload holds the status dict of every room, the connection and
    transcoder totals summed over those dicts, the agents' capabilities and a
    JSON-friendly view of the current server lock (or None when unlocked).
    """
    room_statuses = {}
    for room_id, room in rooms.items():
        room_statuses[room_id] = room.get_room_status_dict()

    total_active_connections = sum(
        status["activeConnections"] for status in room_statuses.values()
    )
    total_active_transcoders = sum(
        status["activeTranscoders"] for status in room_statuses.values()
    )
    logger.info(
        f"[Server Status]: {total_active_connections} active connections (in rooms); {total_active_transcoders} active transcoders"
    )
    logger.info(f"[Server Status]: server_lock={server_lock}")

    if server_lock:
        lock_member = server_lock.get("member_object")
        server_lock_object_for_js = {
            "name": server_lock.get("name"),
            "clientID": server_lock.get("client_id"),
            # Falsy member object is passed through as-is; otherwise the lock
            # is active exactly when the holder has a transcoder.
            "isActive": lock_member and lock_member.transcoder is not None,
        }
    else:
        server_lock_object_for_js = None

    payload = {
        "statusByRoom": room_statuses,
        "totalActiveConnections": total_active_connections,
        "totalActiveTranscoders": total_active_transcoders,
        "agentsCapabilities": agents_capabilities_for_json,
        "serverLock": server_lock_object_for_js,
    }
    await sio.emit("server_state_update", payload, room=ALL_ROOM_ID)
| |
|
| |
|
async def get_session_data(sid):
    """Return the session stored for ``sid``, or an empty dict if it is falsy."""
    stored = await sio.get_session(sid)
    if stored:
        return stored
    return {}
| |
|
| |
|
async def set_session_data(sid, client_id, room_id, room_object, member_object):
    """Save the client, room and member details as the session for ``sid``."""
    session = {
        "client_id": client_id,
        "room_id": room_id,
        "room_object": room_object,
        "member_object": member_object,
    }
    await sio.save_session(sid, session)
| |
|
| |
|
def get_random_room_id():
    """Return a random room ID of ROOM_ID_LENGTH characters from the allowed alphabet."""
    picked = random.choices(ROOM_ID_USABLE_CHARACTERS, k=ROOM_ID_LENGTH)
    return "".join(picked)
| |
|
| |
|
def get_random_unused_room_id():
    """Return a random room ID that is not currently a key of ``rooms``."""
    while True:
        candidate = get_random_room_id()
        if candidate not in rooms:
            return candidate
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
@sio.on("connect")
@catch_and_log_exceptions_for_sio_event_handlers
async def connect(sid, environ):
    """Handle a new socket.io connection.

    The client must pass ``clientID`` in the query string; connections without
    one are disconnected. If the client ID is known in ``member_directory``
    the client is treated as reconnecting: its member record is marked
    connected, it is re-added to its socket.io rooms and its session is
    restored. Otherwise a session with only the client ID is saved. Finally
    the server ID is sent to the client and a server state update is emitted.

    Args:
        sid: socket.io session ID of the new connection.
        environ: Request environment; only ``QUERY_STRING`` is read.
    """
    logger.info(f"📥 [event: connected] sid={sid}")

    # A missing query string is treated like an empty one (no clientID).
    query_params = dict(parse.parse_qsl(environ.get("QUERY_STRING", "")))
    client_id = query_params.get("clientID")

    logger.debug(f"query_params:\n{pformat(query_params)}")

    if client_id is None:
        logger.info("No clientID provided. Disconnecting...")
        await sio.disconnect(sid)
        return

    directory_entry = member_directory.get(client_id)
    if directory_entry:
        room = directory_entry.get("room")
        member = directory_entry.get("member_object")

        # Validate before touching either object; previously the attributes
        # were dereferenced first, so this guard could never be reached.
        if room is None or member is None:
            logger.error(
                f"[event: connect] {client_id} is reconnecting, but room or member is None. This should not happen."
            )
            await sio.disconnect(sid)
            return

        room_id = room.room_id

        member.connection_status = "connected"
        member.session_id = sid

        logger.info(
            f"[event: connect] {member} reconnected. Attempting to re-add them to socketio rooms and reset session data."
        )

        # NOTE(review): enter_room is called without await, as in the original;
        # confirm this matches the installed python-socketio version.
        sio.enter_room(sid, room_id)
        sio.enter_room(sid, ALL_ROOM_ID)

        if client_id in room.listeners:
            sio.enter_room(sid, f"{room_id}{ROOM_LISTENERS_SUFFIX}")
        if client_id in room.speakers:
            sio.enter_room(sid, f"{room_id}{ROOM_SPEAKERS_SUFFIX}")

        # The new sid has a fresh session, so restore the full session data.
        await set_session_data(
            sid,
            client_id=client_id,
            room_id=room.room_id,
            room_object=room,
            member_object=member,
        )
        await emit_room_state_update(room)
    else:
        # Unknown client: remember only the client ID until it joins a room.
        await set_session_data(
            sid, client_id=client_id, room_id=None, room_object=None, member_object=None
        )

    await sio.emit("server_id", server_id, to=sid)
    await emit_server_state_update()
| |
|
| |
|
@sio.event
@catch_and_log_exceptions_for_sio_event_handlers
async def disconnect(sid):
    """Handle a client disconnecting.

    Releases the server lock if this client holds it, marks the member as
    disconnected, closes and clears its transcoder, and emits room and server
    state updates.

    Args:
        sid: socket.io session ID of the disconnecting client.
    """
    global server_lock
    session_data = await get_session_data(sid)

    client_id = member = room = None
    if session_data:
        client_id = session_data.get("client_id")
        member = session_data.get("member_object")
        room = session_data.get("room_object")

    logger.info(
        f"[event: disconnect][{room or 'NOT_IN_ROOM'}] member: {member or 'NO_MEMBER_OBJECT'} disconnected"
    )

    # The lock does not outlive the connection of the client that holds it.
    if server_lock and server_lock.get("client_id") == client_id:
        server_lock = None

    if not member:
        logger.info(
            f"[event: disconnect] client_id {client_id or 'NO_CLIENT_ID'} with sid {sid} in rooms {str(sio.rooms(sid))} disconnected"
        )
    else:
        member.connection_status = "disconnected"

        if member.transcoder:
            # Flag the transcoder as closed, then drop the reference to it.
            member.transcoder.close = True
            member.transcoder = None
            member.requested_output_type = None

        if not room:
            logger.info(
                f"[event: disconnect] {member} disconnected, but no room object present. This should not happen."
            )
        else:
            logger.info(
                f"[event: disconnect] {member} disconnected from room {room.room_id}"
            )
            await emit_room_state_update(room)

    await emit_server_state_update()
| |
|
| |
|
@sio.on("*")
async def catch_all(event, sid, data=None):
    """Log any event that has no dedicated handler.

    Args:
        event: Name of the unhandled event.
        sid: socket.io session ID of the sender.
        data: Event payload; defaults to None so that events emitted without
            a payload are logged instead of failing on a missing argument.
    """
    logger.info(f"[unhandled event: {event}] sid={sid} data={data}")
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
@sio.on("join_room")
@catch_and_log_exceptions_for_sio_event_handlers
async def join_room(sid, client_id, room_id_from_client, config_dict):
    """Add a client to a room with the roles given in ``config_dict``.

    Creates the room and the member record when they do not exist yet,
    registers the client in ``member_directory``, enters or leaves the
    listener and speaker socket.io rooms according to ``config_dict["roles"]``,
    applies server-lock changes requested through
    ``config_dict["lockServerName"]``, saves the session and emits room and
    server state updates.

    Args:
        sid: socket.io session ID of the caller.
        client_id: Client ID supplied by the caller.
        room_id_from_client: Room to join, or None to get a random unused ID.
        config_dict: Must contain ``roles``; may contain ``lockServerName``.

    Returns:
        Dict with ``roomsJoined`` (socket.io rooms of ``sid``) and ``roomID``.

    Raises:
        KeyError: If ``config_dict`` has no ``roles`` entry.
    """
    global server_lock

    args = {
        "sid": sid,
        "client_id": client_id,
        "room_id": room_id_from_client,
        "config_dict": config_dict,
    }
    logger.info(f"[event: join_room] {args}")
    session_data = await get_session_data(sid)
    logger.info(f"session_data: {session_data}")

    room_id = room_id_from_client
    if room_id is None:
        room_id = get_random_unused_room_id()
        logger.info(
            f"No room_id provided. Generating a random, unused room_id: {room_id}"
        )

    # Create the room on first use.
    if room_id not in rooms:
        rooms[room_id] = Room(room_id)

    room = rooms[room_id]

    # Reuse the existing member record when this client is already in the room.
    if client_id in room.members:
        member = room.members[client_id]
        logger.info(f"{member} is rejoining room {room_id}.")
    else:
        member_number = len(room.members) + 1
        name = f"Member {member_number}"
        member = Member(
            client_id=client_id,
            session_id=sid,
            name=name,
        )
        logger.info(f"Created a new Member object: {member}")
        logger.info(f"Adding {member} to room {room_id}")
        room.members[client_id] = member

    # Remember the pairing so `connect` can restore it on reconnect.
    member_directory[client_id] = {"room": room, "member_object": member}

    # NOTE(review): enter_room/leave_room are called without await, as in the
    # original; confirm this matches the installed python-socketio version.
    sio.enter_room(sid, room_id)
    sio.enter_room(sid, ALL_ROOM_ID)

    # Read once here (same point as before) and reuse for every roles check.
    roles = config_dict["roles"]
    lock_server_name = config_dict.get("lockServerName")

    if "listener" in roles:
        sio.enter_room(sid, f"{room_id}{ROOM_LISTENERS_SUFFIX}")
        if client_id not in room.listeners:
            room.listeners.append(client_id)
    else:
        sio.leave_room(sid, f"{room_id}{ROOM_LISTENERS_SUFFIX}")
        room.listeners = [
            listener_id for listener_id in room.listeners if listener_id != client_id
        ]

    if "speaker" in roles:
        sio.enter_room(sid, f"{room_id}{ROOM_SPEAKERS_SUFFIX}")
        if client_id not in room.speakers:
            room.speakers.append(client_id)
    else:
        sio.leave_room(sid, f"{room_id}{ROOM_SPEAKERS_SUFFIX}")
        # A client that stops being a speaker gives up the server lock.
        if server_lock and server_lock.get("client_id") == client_id:
            logger.info(
                f"🔓 Server is now unlocked from client {server_lock.get('client_id')} with name/info: {server_lock.get('name')}"
            )
            server_lock = None
        if member.transcoder:
            member.transcoder.close = True
            member.transcoder = None
        room.speakers = [
            speaker_id for speaker_id in room.speakers if speaker_id != client_id
        ]

    # The lock holder rejoining without a lock name releases the lock.
    if (
        server_lock is not None
        and server_lock["client_id"] == client_id
        and lock_server_name is None
    ):
        logger.info(f"[join_room] Releasing server lock: {pformat(server_lock)}")
        server_lock = None

    # Only speakers may lock the server.
    if lock_server_name is not None and "speaker" in roles:
        # Escape hatch: the special name clears a stuck lock, unless the
        # server is configured to stay locked completely.
        if (
            server_lock is not None
            and lock_server_name == ESCAPE_HATCH_SERVER_LOCK_RELEASE_NAME
            and not os.environ.get("LOCK_SERVER_COMPLETELY", "0") == "1"
        ):
            server_lock = None
            logger.info(
                f"🔓 Server lock has been reset by {client_id} using the escape hatch name {ESCAPE_HATCH_SERVER_LOCK_RELEASE_NAME}"
            )

        # Take the lock when it is free, or refresh it when already ours.
        if server_lock is None or server_lock.get("client_id") == client_id:
            server_lock = {
                "name": lock_server_name,
                "client_id": client_id,
                "member_object": member,
            }
            logger.info(
                f"🔒 Server is now locked to client {server_lock.get('client_id')} with name/info: {server_lock.get('name')}\nThis client will have priority over all others until they disconnect."
            )
        else:
            # Locked to a different client: leave the lock untouched.
            logger.warning(
                f"⚠️ Server is already locked to client {server_lock.get('client_id')}. Ignoring request to lock to client {client_id}."
            )

    await set_session_data(
        sid,
        client_id=client_id,
        room_id=room_id,
        room_object=room,
        member_object=member,
    )

    await emit_room_state_update(room)
    await emit_server_state_update()

    return {"roomsJoined": sio.rooms(sid), "roomID": room_id}
| |
|
def allow_speaker(room, client_id):
    """Return whether ``client_id`` may start streaming under MAX_SPEAKERS.

    Always True when MAX_SPEAKERS is unset or the client is not a speaker of
    ``room``; otherwise True only while the number of active transcoders
    summed over all rooms is below ``int(MAX_SPEAKERS)``.
    """
    if MAX_SPEAKERS is None or client_id not in room.speakers:
        return True

    active_transcoders = 0
    for existing_room in rooms.values():
        active_transcoders += existing_room.get_room_status_dict()["activeTranscoders"]
    return active_transcoders < int(MAX_SPEAKERS)
| |
|
| | |
@sio.event
@catch_and_log_exceptions_for_sio_event_handlers
async def configure_stream(sid, config):
    """Create a transcoder for the calling member from ``config``.

    Args:
        sid: socket.io session ID of the caller.
        config: Stream configuration. ``rate`` and ``buffer_limit`` are
            required; ``debug``, ``async_processing``, ``model_type`` and
            ``model_name`` are read when present.

    Returns:
        ``{"status": "ok", "message": "server_ready"}`` on success, otherwise
        a dict with ``status`` "error" and a ``message`` naming the reason.
        When no agent is available or the transcoder cannot be built, the
        client is also disconnected.
    """
    session_data = await get_session_data(sid)
    # .get() rather than itemgetter: an empty session must reach the None
    # check below instead of raising KeyError.
    client_id = session_data.get("client_id")
    member = session_data.get("member_object")
    room = session_data.get("room_object")

    logger.debug(
        f"[event: configure_stream][{room}] Received stream config from {member}\n{pformat(config)}"
    )

    if member is None or room is None:
        logger.error(
            f"Received stream config from {member}, but member or room is None. This should not happen."
        )
        return {"status": "error", "message": "member_or_room_is_none"}

    if not allow_speaker(room, client_id):
        logger.error(
            f"MAX_SPEAKERS limit ({MAX_SPEAKERS}) has been reached. Ignoring request to configure stream from client {client_id}."
        )
        return {"status": "error", "message": "max_speakers"}

    # While another client holds an active server lock, refuse new streams.
    if (
        server_lock is not None
        and server_lock.get("client_id") != client_id
        and server_lock.get("member_object")
        and server_lock.get("member_object").transcoder is not None
    ):
        logger.warning(
            f"Server is locked to client {server_lock.get('client_id')}. Ignoring request to configure stream from client {client_id}."
        )
        return {"status": "error", "message": "server_locked"}

    debug = config.get("debug")
    async_processing = config.get("async_processing")

    # The requested model type decides which output events incoming_audio
    # forwards for this member.
    model_type = config.get("model_type")
    member.requested_output_type = model_type

    model_name = config.get("model_name")

    try:
        agent = available_agents.get_agent_or_throw(model_name)
    except NoAvailableAgentException as e:
        logger.warning(f"Error while getting agent: {e}")
        await sio.disconnect(sid)
        return {"status": "error", "message": str(e)}

    if member.transcoder:
        logger.warning(
            "Member already has a transcoder configured. Closing it, and overwriting with a new transcoder..."
        )
        member.transcoder.close = True

    t0 = time.time()
    try:
        member.transcoder = SimulevalTranscoder(
            agent,
            config["rate"],
            debug=debug,
            buffer_limit=int(config["buffer_limit"]),
        )
    except Exception as e:
        # Also covers a missing/invalid `rate` or `buffer_limit` in config.
        logger.warning(f"Got exception while initializing agents: {e}")
        await sio.disconnect(sid)
        return {"status": "error", "message": str(e)}

    t1 = time.time()
    logger.debug(f"Booting up VAD and transcoder took {t1-t0} sec")

    if async_processing:
        member.transcoder.start()

    # The number of active transcoders changed, so refresh both views.
    await emit_room_state_update(room)
    await emit_server_state_update()

    return {"status": "ok", "message": "server_ready"}
| |
|
| |
|
| | |
| | |
@sio.on("set_dynamic_config")
@catch_and_log_exceptions_for_sio_event_handlers
async def set_dynamic_config(
    sid,
    partial_config,
):
    """Merge ``partial_config`` into the calling member's dynamic transcoder config.

    Keys in ``partial_config`` override existing ones. When the session has no
    member object nothing is changed; the "ok" response is returned either way.
    """
    session_data = await get_session_data(sid)
    member = session_data.get("member_object") if session_data else None

    if member:
        current_config = member.transcoder_dynamic_config or {}
        new_dynamic_config = {**current_config, **partial_config}
        logger.info(
            f"[set_dynamic_config] Setting new dynamic config:\n\n{pformat(new_dynamic_config)}\n"
        )
        member.transcoder_dynamic_config = new_dynamic_config

    return {"status": "ok", "message": "dynamic_config_set"}
| |
|
| |
|
@sio.event
@catch_and_log_exceptions_for_sio_event_handlers
async def incoming_audio(sid, blob):
    """Feed an audio chunk to the member's transcoder and forward its output.

    Audio is ignored (and the member's transcoder closed) while another client
    holds an active server lock. Otherwise the bytes are passed to the
    transcoder and any resulting ``translation_speech`` / ``translation_text``
    events that match the member's requested output type are emitted to the
    room's listeners.

    Args:
        sid: socket.io session ID of the sender.
        blob: Raw audio; anything other than ``bytes`` is rejected.
    """
    session_data = await get_session_data(sid)

    client_id = None
    member = None
    room = None

    if session_data:
        client_id = session_data.get("client_id")
        member = session_data.get("member_object")
        room = session_data.get("room_object")

    logger.debug(f"[event: incoming_audio] from member {member}")

    # While another client holds an active server lock, drop the audio.
    if (
        server_lock is not None
        and server_lock.get("client_id") != client_id
        and server_lock.get("member_object")
        and server_lock.get("member_object").transcoder is not None
    ):
        # `member` may be None here (no session data), so guard the access.
        if member is not None and member.transcoder:
            member.transcoder.close = True
            member.transcoder = None
            # The number of active transcoders changed: refresh both views.
            if room:
                await emit_room_state_update(room)
            await emit_server_state_update()
        logger.warning(
            f"[incoming_audio] Server is locked to client {server_lock.get('client_id')}. Ignoring incoming audio from client {client_id}."
        )
        return

    if member is None or room is None:
        logger.error(
            f"[incoming_audio] Received incoming_audio from {member}, but member or room is None. This should not happen."
        )
        return

    if not isinstance(blob, bytes):
        logger.error(
            f"[incoming_audio] Received audio from {member}, but it was not of type `bytes`. type(blob) = {type(blob)}"
        )
        return

    if member.transcoder is None:
        logger.error(
            f"[incoming_audio] Received audio from {member}, but no transcoder configured to process it (member.transcoder is None). This should not happen."
        )
        return

    member.transcoder.process_incoming_bytes(
        blob, dynamic_config=member.transcoder_dynamic_config
    )

    events = get_transcoder_output_events(member.transcoder)
    logger.debug(f"[incoming_audio] transcoder output events: {len(events)}")

    if len(events) == 0:
        logger.debug("[incoming_audio] No transcoder output to send")
        return

    # Same room name that join_room/connect use for listeners.
    listeners_room = f"{room.room_id}{ROOM_LISTENERS_SUFFIX}"
    for e in events:
        if e["event"] == "translation_speech" and member.requested_output_type in [
            "s2s",
            "s2s&t",
        ]:
            logger.debug("[incoming_audio] Sending translation_speech event")
            await sio.emit("translation_speech", e, room=listeners_room)
        elif e["event"] == "translation_text" and member.requested_output_type in [
            "s2t",
            "s2s&t",
        ]:
            logger.debug("[incoming_audio] Sending translation_text event")
            await sio.emit("translation_text", e, room=listeners_room)
        else:
            # Also reached for a known event type that the member's requested
            # output type does not include.
            logger.error(f"[incoming_audio] Unexpected event type: {e['event']}")

    return
| |
|
| |
|
@sio.event
@catch_and_log_exceptions_for_sio_event_handlers
async def stop_stream(sid):
    """Close the calling member's transcoder and broadcast the new state.

    Args:
        sid: socket.io session ID of the caller.

    Returns:
        ``{"status": "ok", "message": "Stream stopped"}``, or an error dict
        when the session has no member or room.
    """
    session_data = await get_session_data(sid)
    # .get() rather than itemgetter: an empty session must reach the None
    # check below instead of raising KeyError.
    member = session_data.get("member_object")
    room = session_data.get("room_object")

    logger.debug(f"[event: stop_stream][{room}] Attempting to stop stream for {member}")

    if member is None or room is None:
        message = f"Received stop_stream from {member}, but member or room is None. This should not happen."
        logger.error(message)
        return {"status": "error", "message": message}

    if member.transcoder:
        # Flag the transcoder as closed, then drop the reference to it.
        member.transcoder.close = True
        member.transcoder = None
    else:
        message = f"Received stop_stream from {member}, but member.transcoder is None. This should not happen."
        logger.warning(message)

    # Emitted even when there was no transcoder, as before.
    await emit_room_state_update(room)
    await emit_server_state_update()

    return {"status": "ok", "message": "Stream stopped"}
| |
|
| |
|
@sio.on("clear_transcript_for_all")
@catch_and_log_exceptions_for_sio_event_handlers
async def clear_transcript_for_all(sid):
    """Emit ``clear_transcript`` to everyone in the caller's room."""
    session_data = await get_session_data(sid)
    room = session_data.get("room_object")

    if not room:
        logger.error("[clear_transcript] room is None. This should not happen.")
        return

    await sio.emit("clear_transcript", room=f"{room.room_id}")
| |
|
| |
|
@sio.event
@catch_and_log_exceptions_for_sio_event_handlers
async def set_name(sid, name):
    """Store ``name`` in the caller's session.

    Args:
        sid: socket.io session ID of the caller.
        name: Display name to remember for this session.
    """
    logger.info(f"[Event: set_name] name={name}")
    # save_session stores the given dict as the session, so merge the name
    # into the existing data instead of discarding client_id, room and member.
    session_data = await get_session_data(sid)
    await sio.save_session(sid, {**session_data, "name": name})
| |
|