Spaces:
Sleeping
Sleeping
| from typing import Dict | |
| from services.connection_manager import RepoConnection | |
| import asyncio | |
class ConnectionRegistry:
    """Track one ``RepoConnection`` per repository id and run its listener.

    ``register`` builds a connection for a repo and schedules the
    connection's ``_listen_events()`` on the running asyncio event loop.
    ``unregister`` removes the connection and calls its ``stop()``.
    """

    def __init__(self):
        """Create an empty registry."""
        # Connections keyed by repo id.
        self.connections: Dict[str, RepoConnection] = {}
        # Strong references to listener tasks. The event loop itself keeps
        # only weak references, so a task nobody else references could be
        # garbage-collected before it finishes.
        self._listener_tasks: set = set()

    def register(self, repo):
        """Create a connection for ``repo`` and start its event listener.

        Does nothing when ``repo.id`` is already registered.

        Args:
            repo: Object exposing ``id``, ``namespace`` and ``repo``
                attributes, which are passed to ``RepoConnection``.

        Raises:
            RuntimeError: If called while no asyncio event loop is running.
                The registry is left unchanged in that case.
        """
        if repo.id in self.connections:
            return

        # Resolve the loop first: if none is running this raises before a
        # connection is built or stored, so a failure cannot leave behind a
        # registered connection that has no listener.
        loop = asyncio.get_running_loop()

        conn = RepoConnection(repo.id, repo.namespace, repo.repo)
        self.connections[repo.id] = conn

        listener = loop.create_task(conn._listen_events())
        self._listener_tasks.add(listener)
        # Drop the strong reference once the listener finishes.
        listener.add_done_callback(self._listener_tasks.discard)

    def unregister(self, repo_id: str):
        """Remove the connection for ``repo_id`` and call its ``stop()``.

        Unknown ids are ignored.
        """
        # NOTE(review): the listener task is not cancelled here; presumably
        # conn.stop() makes _listen_events() return - confirm.
        if conn := self.connections.pop(repo_id, None):
            conn.stop()
# Module-level ConnectionRegistry instance, created when this module is imported.
| registry = ConnectionRegistry() |