// Spaces: Sleeping (page-status text captured with the file; not part of the program)
| import { Client } from '@gradio/client'; | |
| import { gradioLogger } from './gradio-logger'; | |
/**
 * Gradio client connection options.
 *
 * Declared locally because, according to the original note, `@gradio/client`
 * does not export this type directly.
 * NOTE(review): field meanings follow the upstream client — confirm against
 * the installed `@gradio/client` version.
 */
interface ClientOptions {
  /** Access token; the type only admits strings starting with `hf_`. */
  hf_token?: `hf_${string}`;
  /** Optional status listener; may be explicitly `null`. */
  status_callback?: null | ((status: unknown) => void);
  /** Optional two-string credential pair; may be explicitly `null`. */
  auth?: null | [string, string];
  with_null_state?: boolean;
  /** Event kinds the client should emit. */
  events?: Array<'data' | 'status' | 'log' | 'render'>;
  /** Extra headers; this file passes `user_email` here when connecting. */
  headers?: { [name: string]: string };
  query_params?: { [name: string]: string };
  session_hash?: string;
}
/**
 * A `data` event produced by a Gradio submit job.
 * `runSubmit` resolves with the first event of this kind.
 */
export interface PayloadMessage {
  /** Discriminator: always the literal `'data'`. */
  type: 'data';
  /** Endpoint the event belongs to. */
  endpoint: string;
  fn_index: number;
  /** Result payload; shape depends on the endpoint, hence `unknown`. */
  data: unknown;
}
/**
 * A `status` event produced by a Gradio submit job.
 * `runSubmit` treats `stage === 'error'` as a failure and uses `code`
 * (when present) as the error message.
 */
interface StatusMessage {
  /** Discriminator: always the literal `'status'`. */
  type: 'status';
  /** Endpoint the event belongs to. */
  endpoint: string;
  fn_index: number;
  /** Lifecycle stage reported for the job. */
  stage:
    | 'pending'
    | 'error'
    | 'complete'
    | 'generating'
    | 'streaming';
  /** Optional code accompanying the status. */
  code?: string;
}
/**
 * A Gradio client together with the bookkeeping the pool keeps for it.
 */
interface PooledClient {
  /** Identifier built as `<repository>#<userEmail or 'anonymous'>#<index>`; used in logs. */
  connectionId: string;
  /** Repository (Space) this client is connected to. */
  repository: string;
  /** The connected Gradio client. */
  client: Client;
  /** Cleared when the connection is destroyed; unhealthy connections are never handed out. */
  isHealthy: boolean;
  /** Number of submits currently running on this connection. */
  connectionCount: number;
  /** `Date.now()` timestamp of the most recent use; drives eviction and idle expiry. */
  lastUsed: number;
}
/**
 * Records which repository a user has been pinned to.
 */
interface UserAssignment {
  /** `Date.now()` timestamp; refreshed whenever the assignment is looked up. */
  assignedAt: number;
  /** Repository the user is routed to. */
  repository: string;
}
/**
 * A pending request for a client, expressed as the two settle callbacks of a promise.
 * NOTE(review): the per-repository queues holding these are created and cleared,
 * but no visible code enqueues items — confirm whether this is still needed.
 */
interface QueueItem {
  /** Fails the waiting request. */
  reject: (error: Error) => void;
  /** Hands a client to the waiting request. */
  resolve: (client: Client) => void;
}
/**
 * Connection counters for a single user within one repository.
 */
interface UserStats {
  /** Connections currently running a request (`connectionCount > 0`). */
  active: number;
  /** Connections flagged healthy. */
  healthy: number;
  /** All connections held for the user. */
  connections: number;
}
/**
 * Aggregated counters for one repository, plus a per-user breakdown.
 */
interface RepoStats {
  /** Number of users that currently have an entry in this repository's pool. */
  users: number;
  /** Total connections across all users. */
  connections: number;
  /** Connections flagged healthy. */
  healthyConnections: number;
  /** Connections currently running a request. */
  activeConnections: number;
  /** Per-user counters keyed by user identifier. */
  userDetails: { [userIdentifier: string]: UserStats };
}
/**
 * Snapshot of the whole pool as returned by `getStats()`.
 */
interface PoolStats {
  /** Repositories the pool was configured with. */
  repositories: string[];
  /** Per-repository counters keyed by repository name. */
  pools: { [repository: string]: RepoStats };
  /** Sum of `connections` over all repositories. */
  totalConnections: number;
  /** Sum of `users` over all repositories. */
  totalUsers: number;
  /** Number of stored user-to-repository assignments. */
  userAssignments: number;
}
/**
 * Per-user connection pool for Gradio Spaces.
 *
 * Connections are grouped by repository and then by user identifier. Each
 * connection serves one request at a time, users are pinned to a repository,
 * and a periodic health check expires idle connections and stale assignments.
 */
export class GradioConnectionPool {
  /** repository -> user identifier -> that user's connections. */
  private userPools: Map<string, Map<string, PooledClient[]>> = new Map();
  /** user identifier -> repository the user is pinned to. */
  private userAssignments: Map<string, UserAssignment> = new Map();
  private repositories: string[];
  private initPromise: Promise<void> | null = null;
  private connectionQueue: Map<string, QueueItem[]> = new Map();

  // Tunables
  private readonly MIN_CONNECTIONS = 0; // connections are per user, so none are created up front
  private readonly MAX_CONNECTIONS_PER_USER = 3; // upper bound of connections per user
  private readonly MAX_TOTAL_CONNECTIONS = 100; // upper bound of connections overall
  private readonly CONNECTION_TIMEOUT = 30000; // 30 s to establish a connection
  private readonly MAX_CONCURRENT_PER_CONNECTION = 1; // one request per connection (Gradio constraint)
  private readonly QUEUE_WAIT_TIMEOUT = 60000; // 60 s waiting for one of the user's connections to free up
  private readonly MAX_RETRIES = 3; // attempts when establishing a connection
  private readonly RETRY_DELAY = 2000; // ms between attempts
  private readonly MAX_CONNECTION_RETRIES = 1; // one retry on connection errors (two attempts in total)
  private readonly USER_ASSIGNMENT_TTL = 7 * 24 * 60 * 60 * 1000; // 7 days
  private readonly USER_CONNECTION_TTL = 30 * 60 * 1000; // 30 min idle lifetime of a user connection
  private readonly HEALTH_CHECK_INTERVAL = 5 * 60 * 1000; // 5 min
  private healthCheckTimer: NodeJS.Timeout | null = null;

  /**
   * Reads configuration from the environment and starts the periodic health check.
   *
   * @throws Error when `GRADIO_TOKEN` is missing or does not start with `hf_`,
   *         or when the repository list contains no usable entry.
   */
  constructor() {
    // Validate the environment.
    const gradioToken = process.env.GRADIO_TOKEN;
    if (!gradioToken || !gradioToken.startsWith('hf_')) {
      throw new Error('[Pool] GRADIO_TOKEN is missing or invalid (must start with "hf_")');
    }

    // Debug: raw environment values.
    console.log(
      JSON.stringify({
        timestamp: new Date().toISOString(),
        component: 'Pool',
        event: 'ConstructorInit',
        DEV_REPOSITORY: process.env.DEV_REPOSITORY,
        NODE_ENV: process.env.NODE_ENV,
      }),
    );

    const repoString = process.env.DEV_REPOSITORY || 'dentsudigital/mugenAILP_1,dentsudigital/mugenAILP_dev,dentsudigital/mugenAILP_stage';
    // Drop blank entries (e.g. from a trailing comma) and duplicates so they never become routing targets.
    this.repositories = [
      ...new Set(
        repoString
          .split(',')
          .map((r) => r.trim())
          .filter((r) => r.length > 0),
      ),
    ];
    if (this.repositories.length === 0) {
      throw new Error('[Pool] DEV_REPOSITORY does not contain any repository name');
    }

    // Debug: parsed repository list.
    console.log(
      JSON.stringify({
        timestamp: new Date().toISOString(),
        component: 'Pool',
        event: 'RepositoriesLoaded',
        repositories: this.repositories,
      }),
    );

    // One queue and one user map per repository.
    for (const repo of this.repositories) {
      this.connectionQueue.set(repo, []);
      this.userPools.set(repo, new Map());
    }

    // Periodic health check (only where timers exist).
    if (typeof setInterval !== 'undefined') {
      this.healthCheckTimer = setInterval(() => {
        // Surface failures in the log instead of leaving an unhandled rejection.
        this.performHealthCheck().catch((error: unknown) => {
          const errorMessage = error instanceof Error ? error.message : String(error);
          gradioLogger.logPoolError('Health check failed', errorMessage, undefined, undefined, undefined);
        });
      }, this.HEALTH_CHECK_INTERVAL);
      // Housekeeping alone must not keep the process alive.
      this.healthCheckTimer.unref?.();
    }
  }

  /**
   * Idempotent initialization. No connections are created up front because
   * connections are per user; concurrent callers share one promise.
   */
  async initialize(): Promise<void> {
    if (!this.initPromise) {
      this.initPromise = this.doInitialize();
    }
    return this.initPromise;
  }

  /** Logs that the pool is ready; there is nothing else to set up. */
  private async doInitialize(): Promise<void> {
    gradioLogger.logPool('Initializing connection pool (user-based mode)');
    gradioLogger.logPool(`Pool initialization complete for ${this.repositories.length} repositories`);
  }

  /**
   * Opens one connection dedicated to a user.
   *
   * @param repository Space to connect to.
   * @param connectionIndex Index used only to build the connection id.
   * @param userEmail Sent as the `user_email` header (empty string when absent).
   * @returns The pooled connection, or `null` when all `MAX_RETRIES` attempts failed.
   */
  private async createConnection(repository: string, connectionIndex: number = 0, userEmail?: string): Promise<PooledClient | null> {
    for (let attempt = 1; attempt <= this.MAX_RETRIES; attempt++) {
      try {
        const headers = {
          user_email: userEmail || '',
        };
        const options: ClientOptions = {
          hf_token: process.env.GRADIO_TOKEN as `hf_${string}`,
          headers, // headers are fixed at connect time
        };
        // `timeout` is not an official ClientOptions property, but the original note says it is used internally.
        const client = await Client.connect(repository, {
          ...options,
          timeout: this.CONNECTION_TIMEOUT,
        } as ClientOptions & { timeout?: number });
        return {
          client,
          repository,
          lastUsed: Date.now(),
          isHealthy: true,
          connectionCount: 0,
          connectionId: `${repository}#${userEmail || 'anonymous'}#${connectionIndex}`,
        };
      } catch (error: unknown) {
        const errorMessage = error instanceof Error ? error.message : String(error);
        gradioLogger.logPoolError(`Failed attempt ${attempt}`, errorMessage, undefined, userEmail || undefined, repository);
        if (attempt < this.MAX_RETRIES) {
          await new Promise((resolve) => setTimeout(resolve, this.RETRY_DELAY));
        }
      }
    }
    return null;
  }

  /**
   * Closes a connection's client on a best-effort basis; close failures are logged, never thrown.
   */
  private closeClient(connection: PooledClient, userIdentifier: string, repository: string, context = 'Error closing connection'): void {
    try {
      if (connection.client && typeof connection.client.close === 'function') {
        connection.client.close();
      }
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      gradioLogger.logPoolError(context, errorMessage, undefined, userIdentifier, repository);
    }
  }

  /**
   * Returns a JSON-safe copy of a response while leaving file/Blob-like structures untouched.
   * If cloning fails the original response is returned as-is.
   */
  private sanitizeResponse(response: unknown): unknown {
    try {
      // File/Blob-like values must be passed through unchanged.
      if (response && typeof response === 'object') {
        // Typical Gradio file response shapes.
        if ('name' in response || ('data' in response && response.data instanceof Blob) || 'url' in response) {
          return response;
        }
        // Also look one level into a `data` array.
        if ('data' in response && Array.isArray(response.data)) {
          const hasFileObjects = response.data.some(
            (item: unknown) => item && typeof item === 'object' && ('name' in item || item instanceof Blob || 'url' in item),
          );
          if (hasFileObjects) {
            return response;
          }
        }
      }
      // Primitives and plain JSON objects are deep-copied.
      return JSON.parse(JSON.stringify(response));
    } catch {
      // Cloning failed: return the original (not null).
      return response;
    }
  }

  /**
   * Tells whether an error happened before the connection was established
   * (the backend API was never reached), in which case the connection should
   * be discarded and the request may be retried.
   */
  private isPreConnectionError(error: unknown): boolean {
    if (!error || typeof error !== 'object') return false;
    const preConnectionErrorCodes = [
      'ECONNREFUSED', // connection refused
      'ETIMEDOUT', // connect timeout (may be indistinguishable from a request timeout)
      'ENOTFOUND', // DNS lookup failed
      'ENETUNREACH', // network unreachable
      'ENOTCONN', // socket not connected
    ];
    if ('code' in error && typeof error.code === 'string' && preConnectionErrorCodes.includes(error.code)) {
      return true;
    }
    // Inspect `cause` one level deep only.
    if ('cause' in error && error.cause && typeof error.cause === 'object') {
      const cause = error.cause;
      if ('code' in cause && typeof cause.code === 'string' && preConnectionErrorCodes.includes(cause.code)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Marks a connection unhealthy, removes it from the pool and closes its client.
   */
  private async destroyConnection(connection: PooledClient, repository: string, userIdentifier: string): Promise<void> {
    connection.isHealthy = false;
    const repoPool = this.userPools.get(repository);
    if (repoPool) {
      const userConnections = repoPool.get(userIdentifier);
      if (userConnections) {
        const index = userConnections.indexOf(connection);
        if (index >= 0) {
          userConnections.splice(index, 1);
        }
        // Drop the user entry once it holds no connections.
        if (userConnections.length === 0) {
          repoPool.delete(userIdentifier);
        }
      }
    }
    this.closeClient(connection, userIdentifier, repository);
    gradioLogger.logPool(`Connection destroyed: ${connection.connectionId}`, userIdentifier, repository);
  }

  /**
   * Length of the JSON form of a payload, used for logging only.
   * Returns 0 when the payload has no JSON form (`undefined`) or cannot be
   * serialized, so that measuring can never fail the request itself.
   */
  private measurePayloadSize(payload: unknown): number {
    try {
      const serialized: unknown = JSON.stringify(payload);
      return typeof serialized === 'string' ? serialized.length : 0;
    } catch {
      return 0;
    }
  }

  /**
   * Runs one submit on the given connection and settles with the first `data`
   * event, an error status, an iteration error, or the timeout — whichever
   * comes first.
   */
  private submitOnce(
    connection: PooledClient,
    endpoint: string,
    payload: unknown[] | Record<string, unknown> | undefined,
    timeoutMs: number,
    userIdentifier: string,
    repository: string,
  ): Promise<PayloadMessage> {
    // Wrapped in a promise so the timeout can win against the async iterator.
    return new Promise<PayloadMessage>((resolve, reject) => {
      let timeoutHandle: NodeJS.Timeout | null = null;
      let settled = false;
      let iterationCount = 0;
      let cancelJob: (() => void) | null = null;

      // Guarantees exactly one resolve/reject and always clears the timer.
      const settle = (action: () => void) => {
        if (settled) return;
        settled = true;
        if (timeoutHandle) {
          clearTimeout(timeoutHandle);
          timeoutHandle = null;
        }
        action();
      };

      timeoutHandle = setTimeout(() => {
        settle(() => {
          gradioLogger.logPool(`[TIMEOUT] Request timed out after ${timeoutMs}ms - endpoint: ${endpoint}`, userIdentifier, repository);
          // Stop the abandoned job so it does not keep running on a connection the pool now considers idle.
          cancelJob?.();
          reject(new Error(`[Pool] Gradio API call timed out after ${timeoutMs}ms`));
        });
      }, timeoutMs);

      void (async () => {
        try {
          const job = connection.client.submit(endpoint, payload);
          cancelJob = () => {
            // Best effort: only call `cancel` when the job object actually offers it.
            const cancellable = job as { cancel?: () => unknown };
            if (typeof cancellable.cancel !== 'function') return;
            try {
              void Promise.resolve(cancellable.cancel()).catch(() => undefined);
            } catch {
              // Cancelling is advisory; the request has already been rejected.
            }
          };
          for await (const event of job) {
            if (settled) break;
            iterationCount++;
            gradioLogger.logPool(
              `[ITERATION ${iterationCount}] Event type: ${event.type}${event.type === 'status' ? `, stage: ${(event as StatusMessage).stage}` : ''}`,
              userIdentifier,
              repository,
            );
            if (event.type === 'data') {
              settle(() => {
                const sanitizedResult = this.sanitizeResponse(event as PayloadMessage) as PayloadMessage;
                gradioLogger.logPool(`[SUCCESS] Request succeeded - endpoint: ${endpoint}`, userIdentifier, repository);
                resolve(sanitizedResult);
              });
              break;
            } else if (event.type === 'status' && (event as StatusMessage).stage === 'error') {
              settle(() => {
                reject(new Error((event as StatusMessage).code || 'Gradio API error'));
              });
              break;
            }
          }
          // The stream ended without data (no-op when already settled).
          settle(() => {
            reject(new Error('[Pool] No data received from Gradio API'));
          });
        } catch (error) {
          settle(() => {
            reject(error);
          });
        }
      })();
    });
  }

  /**
   * Calls a Gradio endpoint through the pool and returns the first data event.
   *
   * Pre-connection errors (see `isPreConnectionError`) discard the connection
   * and are retried up to `MAX_CONNECTION_RETRIES` times on a fresh one; any
   * other error, or the last connection error, is rethrown unchanged.
   *
   * @param params.userEmail Forwarded as the `user_email` header when a connection is created.
   * @param params.userIdentifier Key used for repository assignment and connection ownership.
   * @param params.endpoint Endpoint to submit to.
   * @param params.payload Endpoint arguments; may be `undefined`.
   * @param params.timeoutMs Overall timeout per attempt (default 12 minutes).
   * @throws The underlying error when the call fails or times out.
   */
  public async runSubmit(params: {
    userEmail: string | null;
    userIdentifier: string;
    endpoint: string;
    payload: unknown[] | Record<string, unknown> | undefined;
    timeoutMs?: number;
  }): Promise<PayloadMessage> {
    const { userEmail, userIdentifier, endpoint, payload, timeoutMs = 720000 } = params; // default: 12 minutes
    const selectedRepo = this.selectRepository(userIdentifier, this.repositories);
    // First attempt plus the configured number of retries.
    const maxAttempts = this.MAX_CONNECTION_RETRIES + 1;

    for (let connAttempt = 0; connAttempt < maxAttempts; connAttempt++) {
      const connection = await this.getUserConnection(selectedRepo, userIdentifier, userEmail ?? undefined);
      // Mark the connection busy; `finally` always undoes this.
      connection.connectionCount++;
      connection.lastUsed = Date.now();
      try {
        const payloadSize = this.measurePayloadSize(payload);
        const payloadSizeKB = (payloadSize / 1024).toFixed(2);
        const payloadSizeMB = (payloadSize / 1024 / 1024).toFixed(2);
        gradioLogger.logPool(
          `[SUBMIT START] Starting submit - endpoint: ${endpoint}, attempt: ${connAttempt + 1}/${maxAttempts}, payload size: ${payloadSizeKB}KB (${payloadSizeMB}MB)`,
          userIdentifier,
          selectedRepo,
        );
        // Warn when the payload exceeds 1 MB.
        if (payloadSize > 1024 * 1024) {
          gradioLogger.logPool(
            `[WARNING] Large payload detected: ${payloadSizeMB}MB - this may cause timeout or rejection by Gradio server`,
            userIdentifier,
            selectedRepo,
          );
        }

        const result = await this.submitOnce(connection, endpoint, payload, timeoutMs, userIdentifier, selectedRepo);
        gradioLogger.logPool(`[SUBMIT COMPLETE] Submit completed - endpoint: ${endpoint}`, userIdentifier, selectedRepo);
        return result;
      } catch (error: unknown) {
        gradioLogger.logPool(`[CATCH BLOCK] Caught error in outer try-catch - endpoint: ${endpoint}`, userIdentifier, selectedRepo);
        const isConnError = this.isPreConnectionError(error);
        const errorType = isConnError ? 'CONNECTION' : 'PERMANENT';
        gradioLogger.logPoolError('Submit failed', error, errorType, userIdentifier, selectedRepo);

        if (isConnError) {
          // A connection that failed before reaching the backend is never reused.
          await this.destroyConnection(connection, selectedRepo, userIdentifier);
          if (connAttempt < maxAttempts - 1) {
            gradioLogger.logPool(`[RETRY] Connection destroyed, will retry`, userIdentifier, selectedRepo);
            await new Promise((resolve) => setTimeout(resolve, this.RETRY_DELAY));
            continue; // next attempt on a fresh connection
          }
        }
        // Not retryable, or out of attempts: surface the real error.
        throw error;
      } finally {
        // Always release the connection.
        connection.connectionCount = Math.max(0, connection.connectionCount - 1);
        connection.lastUsed = Date.now();
      }
    }
    // Unreachable in practice: every iteration returns, throws or continues.
    throw new Error('[Pool] All retry attempts exhausted');
  }

  /**
   * Returns a connection owned by the user, reusing a free healthy one when
   * possible and otherwise creating a new one within the global and per-user limits.
   *
   * @throws Error when the repository is unknown or a connection cannot be created.
   */
  private async getUserConnection(repository: string, userIdentifier: string, userEmail?: string): Promise<PooledClient> {
    const repoPool = this.userPools.get(repository);
    if (!repoPool) {
      throw new Error(`[Pool] Repository ${repository} not found`);
    }

    let userConnections = repoPool.get(userIdentifier);
    if (!userConnections) {
      userConnections = [];
      repoPool.set(userIdentifier, userConnections);
    }

    // Reuse a healthy connection with spare capacity.
    const availableConnection = userConnections.find((conn) => conn.isHealthy && conn.connectionCount < this.MAX_CONCURRENT_PER_CONNECTION);
    if (availableConnection) {
      availableConnection.lastUsed = Date.now();
      return availableConnection;
    }

    // Global limit: evict one connection to make room.
    if (this.getTotalConnectionCount() >= this.MAX_TOTAL_CONNECTIONS) {
      this.removeOldestConnection();
    }

    // Per-user limit: free a slot, or reuse a connection that frees up while waiting.
    if (userConnections.length >= this.MAX_CONNECTIONS_PER_USER) {
      const reusable = await this.makeRoomForUser(userConnections, userIdentifier, repository);
      if (reusable) {
        return reusable;
      }
    }

    const newConnection = await this.createConnection(repository, userConnections.length, userEmail);
    if (!newConnection) {
      throw new Error(`[Pool] Failed to create connection for user ${userIdentifier}`);
    }

    // The user's list may have been detached from the pool while we awaited
    // (eviction, destroy or health check remove empty entries), so register
    // the connection with whatever list the pool holds now.
    const livePool = this.userPools.get(repository);
    if (!livePool) {
      // The pool was cleaned up meanwhile; do not leak the new client.
      this.closeClient(newConnection, userIdentifier, repository);
      throw new Error(`[Pool] Repository ${repository} not found`);
    }
    const liveConnections = livePool.get(userIdentifier);
    if (liveConnections) {
      liveConnections.push(newConnection);
    } else {
      livePool.set(userIdentifier, [newConnection]);
    }
    return newConnection;
  }

  /**
   * Frees a slot for a user who is at `MAX_CONNECTIONS_PER_USER`.
   *
   * @returns A healthy connection that became idle while waiting (to be reused
   *          directly), or `null` when a slot was freed and the caller should
   *          open a new connection.
   */
  private async makeRoomForUser(userConnections: PooledClient[], userIdentifier: string, repository: string): Promise<PooledClient | null> {
    // Connections that are not running anything are evicted first.
    const idleConnections = userConnections.filter((conn) => conn.connectionCount === 0);
    if (idleConnections.length > 0) {
      const oldestIdle = idleConnections.reduce((oldest, current) => (current.lastUsed < oldest.lastUsed ? current : oldest));
      const position = userConnections.indexOf(oldestIdle);
      if (position >= 0) {
        userConnections.splice(position, 1);
      }
      gradioLogger.logPool(`Removed idle connection ${oldestIdle.connectionId}`, userIdentifier, repository);
      this.closeClient(oldestIdle, userIdentifier, repository);
      return null;
    }

    // Everything is busy: poll until a connection frees up or the wait times out.
    gradioLogger.logPool(`All ${userConnections.length} connections are active, waiting for idle connection...`, userIdentifier, repository);
    const waitStartTime = Date.now();
    const checkInterval = 500; // poll every 500 ms
    while (Date.now() - waitStartTime < this.QUEUE_WAIT_TIMEOUT) {
      // Another code path may already have removed a connection.
      if (userConnections.length < this.MAX_CONNECTIONS_PER_USER) {
        return null;
      }
      const nowIdle = userConnections.find((conn) => conn.connectionCount === 0);
      if (nowIdle) {
        if (nowIdle.isHealthy) {
          // A healthy idle connection is exactly what the caller needs; reuse it instead of reconnecting.
          gradioLogger.logPool(
            `Connection became idle after ${Date.now() - waitStartTime}ms, reusing ${nowIdle.connectionId}`,
            userIdentifier,
            repository,
          );
          nowIdle.lastUsed = Date.now();
          return nowIdle;
        }
        const position = userConnections.indexOf(nowIdle);
        if (position >= 0) {
          userConnections.splice(position, 1);
        }
        gradioLogger.logPool(`Removed idle connection ${nowIdle.connectionId}`, userIdentifier, repository);
        this.closeClient(nowIdle, userIdentifier, repository);
        return null;
      }
      await new Promise((resolve) => setTimeout(resolve, checkInterval));
    }

    // Timed out: as a last resort drop the least recently used connection even though it is busy.
    gradioLogger.logPoolError(
      'Timeout waiting for idle connection, forcing removal of oldest',
      `User has ${userConnections.length} active connections for ${this.QUEUE_WAIT_TIMEOUT}ms`,
      'CONNECTION',
      userIdentifier,
      repository,
    );
    if (userConnections.length > 0) {
      const oldest = userConnections.reduce((candidate, current) => (current.lastUsed < candidate.lastUsed ? current : candidate));
      userConnections.splice(userConnections.indexOf(oldest), 1);
      this.closeClient(oldest, userIdentifier, repository);
    }
    return null;
  }

  /** Counts every connection across all repositories and users. */
  private getTotalConnectionCount(): number {
    let total = 0;
    for (const repoPool of this.userPools.values()) {
      for (const userConnections of repoPool.values()) {
        total += userConnections.length;
      }
    }
    return total;
  }

  /**
   * Evicts one connection to honour the global limit: the least recently used
   * idle connection if there is one, otherwise the least recently used overall.
   */
  private removeOldestConnection(): void {
    let victim: { connection: PooledClient; repo: string; user: string } | null = null;
    for (const [repo, repoPool] of this.userPools) {
      for (const [user, userConnections] of repoPool) {
        for (const connection of userConnections) {
          if (victim === null) {
            victim = { connection, repo, user };
            continue;
          }
          const candidateIdle = connection.connectionCount === 0;
          const victimIdle = victim.connection.connectionCount === 0;
          // Idle beats busy; within the same state the older one is evicted.
          const better = candidateIdle !== victimIdle ? candidateIdle : connection.lastUsed < victim.connection.lastUsed;
          if (better) {
            victim = { connection, repo, user };
          }
        }
      }
    }
    if (victim === null) {
      return;
    }

    const repoPool = this.userPools.get(victim.repo);
    const userConnections = repoPool?.get(victim.user);
    if (!userConnections) {
      return;
    }
    const position = userConnections.indexOf(victim.connection);
    if (position >= 0) {
      userConnections.splice(position, 1);
    }
    if (userConnections.length === 0) {
      repoPool?.delete(victim.user);
    }
    gradioLogger.logPool(`Removed oldest global connection ${victim.connection.connectionId}`, victim.user, victim.repo);
    this.closeClient(victim.connection, victim.user, victim.repo);
  }

  /**
   * Chooses the repository for a user.
   *
   * A user keeps the previously assigned repository as long as it is among
   * `availableRepos`; otherwise the least loaded repository is assigned.
   * Without an identifier nothing is stored and the least loaded one is returned.
   *
   * @throws Error when `availableRepos` is empty.
   */
  public selectRepository(userIdentifier: string | null, availableRepos: string[]): string {
    if (!userIdentifier) {
      return this.selectLeastLoaded(availableRepos);
    }
    const existing = this.userAssignments.get(userIdentifier);
    if (existing && availableRepos.includes(existing.repository)) {
      // Refresh the timestamp so active users do not expire.
      existing.assignedAt = Date.now();
      return existing.repository;
    }
    const selected = this.selectLeastLoaded(availableRepos);
    this.userAssignments.set(userIdentifier, {
      repository: selected,
      assignedAt: Date.now(),
    });
    gradioLogger.logPool('Assigned', userIdentifier, selected);
    return selected;
  }

  /**
   * Picks the repository with the fewest users that currently hold connections;
   * ties are broken uniformly at random.
   *
   * @throws Error when `availableRepos` is empty.
   */
  private selectLeastLoaded(availableRepos: string[]): string {
    let minLoad = Infinity;
    let selected: string | undefined;
    let tied = 0;
    for (const repo of availableRepos) {
      const load = this.userPools.get(repo)?.size ?? 0;
      if (load < minLoad) {
        minLoad = load;
        selected = repo;
        tied = 1;
      } else if (load === minLoad) {
        // Replacing with probability 1/k gives every tied repository the same chance.
        tied++;
        if (Math.random() < 1 / tied) {
          selected = repo;
        }
      }
    }
    if (selected === undefined) {
      throw new Error('[Pool] No repositories available');
    }
    return selected;
  }

  /**
   * Periodic housekeeping: closes connections idle for longer than
   * `USER_CONNECTION_TTL`, drops users without connections, logs per-repository
   * counters and expires stale user assignments.
   */
  private async performHealthCheck(): Promise<void> {
    gradioLogger.logPool('Starting health check');
    const now = Date.now();
    for (const [repo, repoPool] of this.userPools) {
      let totalConnections = 0;
      let healthyConnections = 0;
      let removedConnections = 0;
      for (const [user, userConnections] of repoPool) {
        const expired: PooledClient[] = [];
        for (const conn of userConnections) {
          totalConnections++;
          // Expire connections unused for the TTL, but never one that is running a request.
          if (now - conn.lastUsed > this.USER_CONNECTION_TTL && conn.connectionCount === 0) {
            expired.push(conn);
            gradioLogger.logPool(`Marking ${conn.connectionId} for removal (idle for >30 min)`, user, repo);
          } else if (conn.isHealthy) {
            healthyConnections++;
          }
        }
        for (const conn of expired) {
          const position = userConnections.indexOf(conn);
          if (position >= 0) {
            userConnections.splice(position, 1);
          }
          removedConnections++;
          this.closeClient(conn, user, repo);
        }
        if (userConnections.length === 0) {
          repoPool.delete(user);
        }
      }
      if (totalConnections > 0) {
        gradioLogger.logPool(
          `${repo}: ${totalConnections} total, ${healthyConnections} healthy, ${removedConnections} removed, ${repoPool.size} active users`,
        );
      }
    }
    // Without this the assignment map would only ever grow.
    this.cleanupOldAssignments();
  }

  /** Removes user assignments that have not been refreshed within `USER_ASSIGNMENT_TTL`. */
  private cleanupOldAssignments(): void {
    const now = Date.now();
    let cleanedCount = 0;
    for (const [email, assignment] of this.userAssignments.entries()) {
      if (now - assignment.assignedAt > this.USER_ASSIGNMENT_TTL) {
        this.userAssignments.delete(email);
        cleanedCount++;
      }
    }
    if (cleanedCount > 0) {
      gradioLogger.logPool(`Cleaned up ${cleanedCount} old user assignments`);
    }
  }

  /**
   * Returns a snapshot of pool counters per repository and per user.
   */
  getStats(): PoolStats {
    const stats: PoolStats = {
      pools: {},
      totalConnections: 0,
      totalUsers: 0,
      userAssignments: this.userAssignments.size,
      // Copy so callers cannot mutate the pool's configuration.
      repositories: [...this.repositories],
    };
    for (const [repo, repoPool] of this.userPools) {
      const repoStats: RepoStats = {
        users: repoPool.size,
        connections: 0,
        healthyConnections: 0,
        activeConnections: 0,
        userDetails: {},
      };
      for (const [user, userConnections] of repoPool) {
        const healthyCount = userConnections.filter((conn) => conn.isHealthy).length;
        const activeCount = userConnections.filter((conn) => conn.connectionCount > 0).length;
        repoStats.connections += userConnections.length;
        repoStats.healthyConnections += healthyCount;
        repoStats.activeConnections += activeCount;
        repoStats.userDetails[user] = {
          connections: userConnections.length,
          healthy: healthyCount,
          active: activeCount,
        };
      }
      stats.pools[repo] = repoStats;
      stats.totalConnections += repoStats.connections;
      stats.totalUsers += repoStats.users;
    }
    return stats;
  }

  /**
   * Shuts the pool down (intended for process exit): stops the health check,
   * closes every client and clears all internal maps.
   */
  cleanup(): void {
    if (this.healthCheckTimer) {
      clearInterval(this.healthCheckTimer);
      this.healthCheckTimer = null;
    }
    for (const [repo, repoPool] of this.userPools) {
      for (const [user, userConnections] of repoPool) {
        for (const conn of userConnections) {
          this.closeClient(conn, user, repo, `Error closing connection ${conn.connectionId}`);
        }
      }
    }
    this.userPools.clear();
    this.userAssignments.clear();
    this.connectionQueue.clear();
    gradioLogger.logPool('Connection pool cleaned up');
  }
}
/** Module-wide pool instance; stays `null` until the first request for it. */
let connectionPoolInstance: GradioConnectionPool | null = null;

/**
 * Returns the shared connection pool, constructing it on first use.
 * If construction throws, nothing is cached and the next call tries again.
 */
export function getConnectionPool(): GradioConnectionPool {
  connectionPoolInstance ??= new GradioConnectionPool();
  return connectionPoolInstance;
}