Spaces:
Sleeping
Sleeping
import csv
import html
import json
import os
import threading
import time
from datetime import datetime, timedelta
from urllib.parse import quote

import gradio as gr
from huggingface_hub import Repository
# Configuration
DATA_STORAGE_REPO = "CIV3283/Data_Storage"
DATA_BRANCH_NAME = "data_branch"
LOCAL_DATA_DIR = "temp_data_storage"
MIN_IDLE_MINUTES = 20  # Minimum idle time required for space assignment
ALLOCATION_LOCK_DURATION = 10  # Lock duration in minutes

# Cache configuration - all state is kept in memory.
# NOTE: earlier comments described these intervals as "10 minutes = 600 seconds";
# the values actually in effect are 300 seconds (5 minutes).
CACHE_UPDATE_INTERVAL = 300  # Seconds between activity-cache refreshes (5 minutes)
DATA_SYNC_INTERVAL = 300  # Seconds between data repository syncs (5 minutes)
CACHE_LOCK = threading.Lock()
ALLOCATION_LOCK = threading.Lock()
# Global in-memory storage
class MemoryAllocationStore:
    """In-memory store of space allocations that expire after a fixed time.

    Each record maps a space name to the student it was allocated to, the
    allocation time and the expiry time. Constructing the store starts a
    daemon thread that periodically purges expired records. All access to
    the records goes through an internal lock.
    """

    def __init__(self):
        # {space_name: {'student_id': str, 'allocated_time': datetime, 'expires_at': datetime}}
        self._allocations = {}
        self._lock = threading.Lock()
        self._cleanup_interval = 60  # seconds between purges of expired records
        # Start the background cleanup thread.
        self._start_cleanup_thread()

    def _start_cleanup_thread(self):
        """Start the daemon thread that purges expired allocation records."""

        def cleanup_worker():
            while True:
                time.sleep(self._cleanup_interval)
                self._cleanup_expired_allocations()

        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
        print("[MemoryAllocationStore] Background cleanup thread started")

    def _cleanup_expired_allocations(self):
        """Remove every record whose expiry time has passed."""
        with self._lock:
            current_time = datetime.now()
            # Collect first: a dict must not be resized while it is iterated.
            expired_spaces = [
                space_name
                for space_name, alloc_info in self._allocations.items()
                if alloc_info['expires_at'] <= current_time
            ]
            for space_name in expired_spaces:
                del self._allocations[space_name]
                # Log every purged record, not just the last one.
                print(f"[MemoryAllocationStore] Cleaned up expired allocation: {space_name}")

    def add_allocation(self, space_name, student_id):
        """Record that ``space_name`` was allocated to ``student_id``.

        The record expires ALLOCATION_LOCK_DURATION minutes from now and
        replaces any existing record for the same space.
        """
        with self._lock:
            current_time = datetime.now()
            expires_at = current_time + timedelta(minutes=ALLOCATION_LOCK_DURATION)
            self._allocations[space_name] = {
                'student_id': student_id,
                'allocated_time': current_time,
                'expires_at': expires_at,
            }
            print(f"[MemoryAllocationStore] Added allocation: {space_name} -> {student_id} (expires at {expires_at.strftime('%H:%M:%S')})")

    def get_active_allocations(self):
        """Return a ``{space_name: record_copy}`` dict of unexpired records."""
        with self._lock:
            current_time = datetime.now()
            return {
                space_name: alloc_info.copy()
                for space_name, alloc_info in self._allocations.items()
                if alloc_info['expires_at'] > current_time
            }

    def is_allocated(self, space_name):
        """Return ``(True, student_id)`` if the space has a live record, else ``(False, None)``.

        An expired record found during the check is deleted.
        """
        with self._lock:
            alloc_info = self._allocations.get(space_name)
            if alloc_info is None:
                return False, None
            if alloc_info['expires_at'] > datetime.now():
                return True, alloc_info['student_id']
            # Expired: drop the record.
            del self._allocations[space_name]
            return False, None

    def get_status_summary(self):
        """Return counts and one human-readable line per active allocation.

        Returns:
            dict with ``active_count`` (unexpired records), ``total_stored``
            (all records, including expired ones not yet purged) and
            ``summary`` (list of description strings).
        """
        with self._lock:
            current_time = datetime.now()
            summary = []
            for space_name, alloc_info in self._allocations.items():
                if alloc_info['expires_at'] > current_time:
                    remaining_minutes = (alloc_info['expires_at'] - current_time).total_seconds() / 60
                    summary.append(f"{space_name} -> {alloc_info['student_id']} ({remaining_minutes:.1f}min left)")
            return {
                'active_count': len(summary),
                'total_stored': len(self._allocations),
                'summary': summary,
            }
# Global data manager
class DataManager:
    """Owns the local clone of the data repository and the list of spaces found in it.

    ``initialize()`` clones the repository, performs a first sync and starts a
    daemon thread that re-syncs every DATA_SYNC_INTERVAL seconds. The list of
    available spaces is derived from the ``*_query_log.csv`` files present in
    the local clone.
    """

    def __init__(self):
        self.repo = None
        self.available_spaces = []
        self.last_data_sync = None  # datetime of the last successful sync
        self._sync_lock = threading.Lock()
        self._sync_thread = None
        self._stop_event = threading.Event()
        self.initialized = False

    def initialize(self):
        """Initialize the manager; intended to be called once at startup.

        Returns:
            True on success (or if already initialized), False if cloning or
            configuring the repository raised.
        """
        if self.initialized:
            return True
        try:
            print("[DataManager] Initializing data manager...")
            # Connect to (clone) the data repository.
            self.repo = Repository(
                local_dir=LOCAL_DATA_DIR,
                clone_from=DATA_STORAGE_REPO,
                revision=DATA_BRANCH_NAME,
                repo_type="space",
                use_auth_token=os.environ.get("HF_HUB_TOKEN"),
            )
            # Configure the git identity. The method takes (git_user, git_email);
            # the previous two positional calls set the user to "git_email" and
            # the e-mail to the address, never the intended user name.
            self.repo.git_config_username_and_email(
                git_user="load_distributor",
                git_email="loaddistributor@takeiteasy.space",
            )
            # Initial sync. A failure here is logged but, as before, does not
            # abort initialization; the background thread will retry.
            if not self._sync_data():
                print("[DataManager] Warning: initial data sync failed; will retry in background")
            # Start the periodic background sync.
            self._start_background_sync()
            self.initialized = True
            print("[DataManager] Data manager initialized successfully")
            return True
        except Exception as e:
            print(f"[DataManager] Initialization failed: {e}")
            return False

    def _start_background_sync(self):
        """Start the background sync thread unless one is already running."""
        if self._sync_thread and self._sync_thread.is_alive():
            return
        self._stop_event.clear()
        self._sync_thread = threading.Thread(target=self._background_sync_worker, daemon=True)
        self._sync_thread.start()
        print("[DataManager] Background sync thread started")

    def _background_sync_worker(self):
        """Sync every DATA_SYNC_INTERVAL seconds until the stop event is set."""
        while not self._stop_event.is_set():
            try:
                # Wait for the interval, or leave early on a stop request.
                if self._stop_event.wait(timeout=DATA_SYNC_INTERVAL):
                    break
                print("[DataManager] Starting scheduled data sync...")
                if self._sync_data():
                    print("[DataManager] Scheduled data sync completed")
            except Exception as e:
                print(f"[DataManager] Error in background sync: {e}")
                # Back off for a minute before the next cycle.
                if self._stop_event.wait(timeout=60):
                    break

    def _sync_data(self):
        """Pull the latest data and refresh the available-spaces list.

        Errors are logged and swallowed so callers are never interrupted.

        Returns:
            True if the pull and refresh succeeded, False otherwise.
        """
        with self._sync_lock:
            try:
                start_time = time.time()
                print("[DataManager] Syncing data from remote repository...")
                # Pull the latest data.
                self.repo.git_pull(rebase=True)
                # Refresh the list of available spaces.
                self.available_spaces = self._get_available_spaces()
                self.last_data_sync = datetime.now()
                elapsed_time = time.time() - start_time
                print(f"[DataManager] Data sync completed in {elapsed_time:.2f}s, found {len(self.available_spaces)} spaces")
                return True
            except Exception as e:
                print(f"[DataManager] Error syncing data: {e}")
                return False

    def _get_available_spaces(self):
        """Return the sorted space names derived from query-log files in the local clone."""
        suffix = '_query_log.csv'
        try:
            # A set removes duplicates before sorting.
            available_spaces = {
                filename.replace(suffix, '')
                for filename in os.listdir(LOCAL_DATA_DIR)
                if filename.endswith(suffix) and '_Student_' in filename
            }
            return sorted(available_spaces)
        except Exception as e:
            print(f"[DataManager] Error getting available spaces: {e}")
            return []

    def get_available_spaces(self):
        """Return a copy of the available-spaces list without triggering a sync.

        Raises:
            RuntimeError: if the manager has not been initialized.
        """
        if not self.initialized:
            raise RuntimeError("DataManager not initialized")
        return self.available_spaces.copy()

    def get_repo(self):
        """Return the repository object.

        Raises:
            RuntimeError: if the manager has not been initialized.
        """
        if not self.initialized:
            raise RuntimeError("DataManager not initialized")
        return self.repo

    def get_repo_dir(self):
        """Return the local directory of the repository clone.

        Raises:
            RuntimeError: if the manager has not been initialized.
        """
        if not self.initialized:
            raise RuntimeError("DataManager not initialized")
        return LOCAL_DATA_DIR

    def stop(self):
        """Ask the background sync thread, if any, to stop."""
        if self._sync_thread:
            self._stop_event.set()
            print("[DataManager] Background sync thread stopping...")
class SpaceActivityCache:
    """In-memory cache of the last recorded activity of every space.

    The cache is built from the ``<space>_query_log.csv`` files in the data
    manager's local repository directory. It is filled once in the
    constructor and then refreshed by a daemon thread every
    CACHE_UPDATE_INTERVAL seconds. ``_cache_data`` is always replaced
    wholesale under CACHE_LOCK, never mutated in place.
    """

    def __init__(self, data_manager):
        self.data_manager = data_manager
        self._cache_data = {}
        self._last_update = None
        self._update_thread = None
        self._stop_event = threading.Event()
        # Fill the cache once right away, then keep it fresh in the background.
        self._update_cache()
        self.start_background_updates()

    def start_background_updates(self):
        """Start the background refresh thread unless one is already running."""
        if self._update_thread and self._update_thread.is_alive():
            return
        self._stop_event.clear()
        self._update_thread = threading.Thread(target=self._background_update_worker, daemon=True)
        self._update_thread.start()
        print("[SpaceActivityCache] Background update thread started")

    def stop_background_updates(self):
        """Ask the background refresh thread, if any, to stop."""
        if self._update_thread:
            self._stop_event.set()
            print("[SpaceActivityCache] Background update thread stopping...")

    def _background_update_worker(self):
        """Refresh the cache every CACHE_UPDATE_INTERVAL seconds until stopped."""
        while not self._stop_event.is_set():
            try:
                # Wait for the interval, or leave early on a stop request.
                if self._stop_event.wait(timeout=CACHE_UPDATE_INTERVAL):
                    break
                print("[SpaceActivityCache] Starting scheduled cache update...")
                self._update_cache()
                print("[SpaceActivityCache] Scheduled cache update completed")
            except Exception as e:
                print(f"[SpaceActivityCache] Error in background update: {e}")
                # Back off for a minute before the next cycle.
                if self._stop_event.wait(timeout=60):
                    break

    def _update_cache(self):
        """Rebuild the cache from the log files; errors are logged, not raised."""
        try:
            print("[SpaceActivityCache] Updating activity cache...")
            start_time = time.time()
            # Ask the data manager for the spaces (this does not trigger a sync).
            available_spaces = self.data_manager.get_available_spaces()
            repo_dir = self.data_manager.get_repo_dir()
            new_cache_data = {
                'last_updated': datetime.now().isoformat(),
                'update_interval_seconds': CACHE_UPDATE_INTERVAL,
                'spaces': {},
            }
            # Read the last activity time of every space.
            for space_name in available_spaces:
                csv_file = os.path.join(repo_dir, f"{space_name}_query_log.csv")
                last_activity, status = self._get_last_activity_from_file(csv_file)
                new_cache_data['spaces'][space_name] = {
                    'last_activity': last_activity.isoformat() if last_activity != datetime.min else None,
                    'status': status,
                    'cache_update_time': datetime.now().isoformat(),
                }
                print(f"[SpaceActivityCache] {space_name}: {status}")
            # Swap the new snapshot in under the lock.
            with CACHE_LOCK:
                self._cache_data = new_cache_data
                self._last_update = datetime.now()
            elapsed_time = time.time() - start_time
            print(f"[SpaceActivityCache] Cache updated in {elapsed_time:.2f}s, {len(available_spaces)} spaces processed")
        except Exception as e:
            print(f"[SpaceActivityCache] Error updating cache: {e}")
            import traceback
            print(f"[SpaceActivityCache] Traceback: {traceback.format_exc()}")

    def _get_last_activity_from_file(self, csv_file_path):
        """Read the timestamp of the last data row of a query-log CSV.

        The timestamp is taken from the third column of the last non-empty
        row and must have the form ``YYYY-MM-DD HH:MM:SS``.

        Returns:
            ``(timestamp, status)``. On any problem the timestamp is
            ``datetime.min`` and ``status`` names the reason.
        """
        try:
            if not os.path.exists(csv_file_path):
                return datetime.min, "file_not_found"
            # Files of at most 100 bytes are treated as empty / header only.
            if os.path.getsize(csv_file_path) <= 100:
                return datetime.min, "empty_or_header_only"
            with open(csv_file_path, 'r', encoding='utf-8') as f:
                csv_reader = csv.reader(f)
                # Skip the header row.
                try:
                    next(csv_reader)
                except StopIteration:
                    return datetime.min, "empty_file"
                # Only the last non-empty row matters, so keep just that one
                # instead of buffering the whole file.
                last_row = None
                try:
                    for row in csv_reader:
                        if row:
                            last_row = row
                except Exception as csv_error:
                    print(f"[SpaceActivityCache] CSV parsing error for {csv_file_path}: {csv_error}")
                    return datetime.min, "csv_parse_error"
            if last_row is None:
                return datetime.min, "no_data_rows"
            if len(last_row) < 3:
                return datetime.min, "invalid_row_format"
            timestamp_str = last_row[2].strip()  # timestamp column
            try:
                parsed_time = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
            except ValueError as ve:
                print(f"[SpaceActivityCache] Date parsing error for '{timestamp_str}': {ve}")
                return datetime.min, "date_parse_error"
            return parsed_time, f"active_last_at_{parsed_time.strftime('%Y-%m-%d_%H:%M:%S')}"
        except Exception as e:
            print(f"[SpaceActivityCache] Error reading {csv_file_path}: {e}")
            return datetime.min, "read_error"

    @staticmethod
    def _parse_cached_timestamp(timestamp_str):
        """Parse a cached ISO timestamp; return ``datetime.min`` if missing or invalid."""
        if not timestamp_str:
            return datetime.min
        try:
            return datetime.fromisoformat(timestamp_str)
        except (TypeError, ValueError):
            return datetime.min

    def _spaces_snapshot(self):
        """Return the cached per-space dict, filling the cache first if it is empty.

        The refresh is done while CACHE_LOCK is NOT held: CACHE_LOCK is a
        plain (non-reentrant) lock that ``_update_cache`` acquires itself, so
        refreshing from inside a ``with CACHE_LOCK`` block would deadlock.
        """
        with CACHE_LOCK:
            cache_ready = bool(self._cache_data) and 'spaces' in self._cache_data
        if not cache_ready:
            print("[SpaceActivityCache] No cache available, updating immediately...")
            self._update_cache()
        with CACHE_LOCK:
            # Safe to hand out: snapshots are replaced, never mutated in place.
            return self._cache_data.get('spaces', {})

    def get_space_activity(self, space_name):
        """Return ``(last_activity, status)`` for one space.

        Returns ``(datetime.min, "not_in_cache")`` for an unknown space.
        """
        space_info = self._spaces_snapshot().get(space_name, {})
        if not space_info:
            return datetime.min, "not_in_cache"
        last_activity = self._parse_cached_timestamp(space_info.get('last_activity'))
        return last_activity, space_info.get('status', 'unknown')

    def get_all_spaces_activity(self):
        """Return ``{space_name: {'last_activity': datetime, 'status': str}}`` for all cached spaces."""
        return {
            space_name: {
                'last_activity': self._parse_cached_timestamp(space_info.get('last_activity')),
                'status': space_info.get('status', 'unknown'),
            }
            for space_name, space_info in self._spaces_snapshot().items()
        }

    def get_cache_info(self):
        """Return the cache age and size.

        Returns:
            dict with ``last_updated``, ``age_minutes``, ``spaces_count`` and
            ``is_fresh`` (age below 1.5 refresh intervals). When the cache is
            empty or its timestamp is unreadable, placeholder values with
            ``is_fresh`` False are returned.
        """
        with CACHE_LOCK:
            if self._cache_data and 'last_updated' in self._cache_data:
                try:
                    last_updated = datetime.fromisoformat(self._cache_data['last_updated'])
                except (TypeError, ValueError):
                    last_updated = None
                if last_updated is not None:
                    age_minutes = (datetime.now() - last_updated).total_seconds() / 60
                    return {
                        'last_updated': last_updated,
                        'age_minutes': age_minutes,
                        'spaces_count': len(self._cache_data.get('spaces', {})),
                        'is_fresh': age_minutes < (CACHE_UPDATE_INTERVAL / 60) * 1.5,
                    }
            return {
                'last_updated': None,
                'age_minutes': float('inf'),
                'spaces_count': 0,
                'is_fresh': False,
            }

    def force_update(self):
        """Refresh the cache immediately."""
        print("[SpaceActivityCache] Force updating cache...")
        self._update_cache()
# Added: local collision-avoidance mechanism
class LocalAllocationTracker:
    """Short-lived, in-process record of which space was just handed to whom.

    A record is considered live for ``_allocation_ttl`` seconds. Expired
    records are dropped lazily when queried and periodically by a daemon
    thread started in the constructor.
    """

    def __init__(self):
        # {space_name: {'student_id': str, 'timestamp': datetime}}
        self._recent_allocations = {}
        self._lock = threading.Lock()
        self._cleanup_interval = 600  # seconds between background purges
        self._allocation_ttl = 60  # seconds a local allocation record stays live
        # Start the background cleanup thread.
        self._start_cleanup_thread()

    def _start_cleanup_thread(self):
        """Start the daemon thread that periodically purges expired records."""

        def cleanup_worker():
            while True:
                time.sleep(self._cleanup_interval)
                self._cleanup_expired_allocations()

        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
        print("[LocalAllocationTracker] Background cleanup thread started")

    def _is_expired(self, alloc_info, current_time):
        """Return True if the record is older than the TTL at ``current_time``."""
        return (current_time - alloc_info['timestamp']).total_seconds() > self._allocation_ttl

    def _cleanup_expired_allocations(self):
        """Remove every record older than the TTL."""
        with self._lock:
            current_time = datetime.now()
            # Collect first: a dict must not be resized while it is iterated.
            expired_spaces = [
                space_name
                for space_name, alloc_info in self._recent_allocations.items()
                if self._is_expired(alloc_info, current_time)
            ]
            for space_name in expired_spaces:
                del self._recent_allocations[space_name]
                # Log every purged record, not just the last one.
                print(f"[LocalAllocationTracker] Cleaned up expired allocation: {space_name}")

    def is_recently_allocated_locally(self, space_name):
        """Return ``(True, student_id)`` if the space has a live local record, else ``(False, None)``.

        An expired record found during the check is deleted.
        """
        with self._lock:
            alloc_info = self._recent_allocations.get(space_name)
            if alloc_info is None:
                return False, None
            elapsed_seconds = (datetime.now() - alloc_info['timestamp']).total_seconds()
            if elapsed_seconds > self._allocation_ttl:
                # Expired: drop the record.
                del self._recent_allocations[space_name]
                print(f"[LocalAllocationTracker] Expired local allocation removed: {space_name}")
                return False, None
            print(f"[LocalAllocationTracker] Space {space_name} recently allocated locally to {alloc_info['student_id']} ({elapsed_seconds:.1f}s ago)")
            return True, alloc_info['student_id']

    def record_local_allocation(self, space_name, student_id):
        """Record that ``space_name`` was just allocated to ``student_id``."""
        with self._lock:
            self._recent_allocations[space_name] = {
                'student_id': student_id,
                'timestamp': datetime.now(),
            }
            print(f"[LocalAllocationTracker] Locally recorded allocation: {space_name} -> {student_id}")

    def get_recent_allocations_summary(self):
        """Return one description line per live local allocation (for debugging).

        Records past the TTL are skipped: the background purge runs far less
        often than the TTL, so stale records may still be stored and must not
        be reported as recent.
        """
        with self._lock:
            current_time = datetime.now()
            summary = []
            for space_name, alloc_info in self._recent_allocations.items():
                if self._is_expired(alloc_info, current_time):
                    continue
                elapsed = (current_time - alloc_info['timestamp']).total_seconds()
                summary.append(f"{space_name} -> {alloc_info['student_id']} ({elapsed:.1f}s ago)")
            return summary
# Module-level singletons shared by the request handlers.
activity_cache = None  # assigned by init_system()
data_manager = DataManager()
local_tracker = LocalAllocationTracker()
memory_allocation_store = MemoryAllocationStore()
def init_system():
    """Bring up the data manager and then the activity cache.

    Meant to be called once at startup.

    Raises:
        Exception: if the data manager reports that initialization failed.
    """
    global activity_cache

    print("[init_system] Initializing load distributor system...")

    # The data manager has to be ready before the cache can read from it.
    manager_ready = data_manager.initialize()
    if not manager_ready:
        raise Exception("Failed to initialize data manager")

    activity_cache = SpaceActivityCache(data_manager)

    print("[init_system] System initialization completed")
def analyze_space_activity_cached():
    """Build the per-space activity report from cached, in-memory data only.

    Returns:
        A list of dicts (one per available space) sorted with the most idle
        space first. Each dict carries the space name, last activity (as
        datetime and as text), idle minutes (``inf`` when never used), a
        status text, the two "recently allocated" flags and the cached status.

    Raises:
        Exception: if the activity cache has not been created yet.
    """
    global activity_cache
    if activity_cache is None:
        raise Exception("Activity cache not initialized")

    # Space list comes from the data manager; no sync is triggered here.
    available_spaces = data_manager.get_available_spaces()
    now = datetime.now()

    cache_info = activity_cache.get_cache_info()
    print(f"[analyze_space_activity_cached] Using cache (age: {cache_info['age_minutes']:.1f} min, fresh: {cache_info['is_fresh']})")

    cached_activity = activity_cache.get_all_spaces_activity()
    # Allocation records live in memory; nothing is read from disk.
    active_allocations = memory_allocation_store.get_active_allocations()

    print(f"[analyze_space_activity_cached] Analyzing {len(available_spaces)} spaces using cached data...")
    print(f"[analyze_space_activity_cached] Memory allocations: {len(active_allocations)} active")

    never_used_markers = ('empty', 'not_found')
    report = []
    for space_name in available_spaces:
        # A space missing from the cache (possibly a new one) counts as never used.
        entry = cached_activity.get(space_name) or {
            'last_activity': datetime.min,
            'status': "not_in_cache",
        }
        last_activity = entry['last_activity']
        cached_status = entry['status']

        never_used = last_activity == datetime.min or any(
            marker in cached_status for marker in never_used_markers
        )
        if never_used:
            idle_minutes = float('inf')
            status = "Never used"
            last_activity_str = "Never"
        else:
            idle_minutes = (now - last_activity).total_seconds() / 60
            status = f"Idle for {idle_minutes:.1f} minutes (cached)"
            last_activity_str = last_activity.strftime('%Y-%m-%d %H:%M:%S')

        # Allocation recorded in the in-memory store?
        allocated_in_memory = space_name in active_allocations
        if allocated_in_memory:
            record = active_allocations[space_name]
            minutes_until_free = (record['expires_at'] - now).total_seconds() / 60
            status += f" (Allocated in memory to {record['student_id']}, free in {minutes_until_free:.1f} min)"

        # Allocation recorded by the local tracker?
        allocated_locally, local_student = local_tracker.is_recently_allocated_locally(space_name)
        if allocated_locally:
            status += f" (Recently allocated locally to {local_student})"

        report.append({
            'space_name': space_name,
            'last_activity': last_activity,
            'last_activity_str': last_activity_str,
            'idle_minutes': idle_minutes,
            'status': status,
            'is_recently_allocated_memory': allocated_in_memory,
            'is_recently_allocated_local': allocated_locally,
            'cached_status': cached_status,
        })
        print(f"[analyze_space_activity_cached] {space_name}: {status}")

    # Most idle first; the sort is stable, so ties keep their original order.
    return sorted(report, key=lambda item: item['idle_minutes'], reverse=True)
def create_status_display(space_activity):
    """Render the status of all spaces as text with ``<br>`` line breaks.

    Args:
        space_activity: list of per-space dicts providing ``space_name``,
            ``status`` and ``last_activity_str``.

    Returns:
        The assembled display string.
    """
    parts = ["📊 **Current Space Status (sorted by availability):**<br><br>"]

    # Summary of the in-memory allocation records.
    memory_summary = memory_allocation_store.get_status_summary()
    if memory_summary['active_count'] > 0:
        parts.append(f"🧠 **Memory Allocations ({memory_summary['active_count']} active):**<br>")
        parts.extend(f" • {alloc}<br>" for alloc in memory_summary['summary'])
        parts.append("<br>")

    # Summary of the local allocation records.
    local_summary = local_tracker.get_recent_allocations_summary()
    if local_summary:
        parts.append("🔒 **Recent Local Allocations:**<br>")
        parts.extend(f" • {alloc}<br>" for alloc in local_summary)
        parts.append("<br>")

    # One numbered entry per space.
    for position, space in enumerate(space_activity, 1):
        parts.append(f"{position}. **{space['space_name']}**<br>")
        parts.append(f" • Status: {space['status']}<br>")
        parts.append(f" • Last activity: {space['last_activity_str']}<br><br>")

    return "".join(parts)
def select_space_with_enhanced_collision_avoidance_cached(space_activity, student_id):
    """Pick the most idle free space for a student and record the allocation in memory.

    Args:
        space_activity: per-space dicts as produced by
            ``analyze_space_activity_cached`` (sorted most idle first).
        student_id: identifier taken from the request URL; treated as
            untrusted text.

    Returns:
        The component returned by ``redirect_to_space`` for the chosen space.

    Raises:
        gr.Error: if no space is eligible, or a local collision is detected
            and no alternative space exists.
    """
    print(f"[select_space_with_enhanced_collision_avoidance_cached] Starting selection for student: {student_id}")

    # Step 1: keep only spaces that are idle long enough and not allocated.
    basic_available_spaces = [
        space for space in space_activity
        if space['idle_minutes'] >= MIN_IDLE_MINUTES
        and not space['is_recently_allocated_memory']
        and not space['is_recently_allocated_local']
    ]
    print(f"[select_space_with_enhanced_collision_avoidance_cached] Basic available spaces: {len(basic_available_spaces)}")

    if not basic_available_spaces:
        # Build a detailed explanation of what is blocking the allocation.
        idle_spaces = [s for s in space_activity if s['idle_minutes'] >= MIN_IDLE_MINUTES]
        memory_allocated = [s for s in space_activity if s['is_recently_allocated_memory']]
        local_allocated = [s for s in space_activity if s['is_recently_allocated_local']]
        error_parts = []
        if not idle_spaces:
            error_parts.append(f"all spaces used within {MIN_IDLE_MINUTES} minutes")
        if memory_allocated:
            error_parts.append(f"{len(memory_allocated)} spaces allocated in memory")
        if local_allocated:
            error_parts.append(f"{len(local_allocated)} spaces locally allocated")
        error_msg = (
            f"🚫 **All learning assistants are currently busy**\n\n"
            f"Blocking conditions: {', '.join(error_parts)}\n\n"
            f"**Please try again in 1-2 minutes.**"
        )
        print(f"[select_space_with_enhanced_collision_avoidance_cached] No available spaces: {error_msg}")
        raise gr.Error(error_msg, duration=10)

    # Step 2: take the best candidate (the list is already sorted by idle time).
    selected_space = basic_available_spaces[0]
    space_name = selected_space['space_name']
    print(f"[select_space_with_enhanced_collision_avoidance_cached] Preliminary selection: {space_name}")

    # Step 3: final collision check - the local tracker may have changed
    # since the activity report was built.
    is_local_conflict, conflicting_student = local_tracker.is_recently_allocated_locally(space_name)
    if is_local_conflict:
        print(f"[select_space_with_enhanced_collision_avoidance_cached] COLLISION DETECTED! {space_name} recently allocated to {conflicting_student}")
        # Look for an alternative among the remaining candidates.
        alternative_spaces = [
            s for s in basic_available_spaces[1:]
            if not local_tracker.is_recently_allocated_locally(s['space_name'])[0]
        ]
        if alternative_spaces:
            selected_space = alternative_spaces[0]
            space_name = selected_space['space_name']
            print(f"[select_space_with_enhanced_collision_avoidance_cached] Using alternative space: {space_name}")
        else:
            error_msg = (
                f"🚫 **Collision detected and no alternatives available**\n\n"
                f"The system detected a potential conflict with another student's allocation.\n\n"
                f"**Please try again in 10-15 seconds.**"
            )
            print(f"[select_space_with_enhanced_collision_avoidance_cached] No alternatives available")
            raise gr.Error(error_msg, duration=8)

    # Step 4: record the local allocation first (before the memory store).
    local_tracker.record_local_allocation(space_name, student_id)
    print(f"[select_space_with_enhanced_collision_avoidance_cached] Local allocation recorded BEFORE memory write")

    # Step 5: record in the in-memory store; nothing is written to file or pushed.
    memory_allocation_store.add_allocation(space_name, student_id)
    print(f"[select_space_with_enhanced_collision_avoidance_cached] Allocation recorded in memory only")

    # Step 6: build the result.
    status_display = create_status_display_with_cache_info(space_activity)
    # student_id comes straight from the request URL, so percent-encode it
    # before placing it in the query string of the redirect link.
    encoded_student_id = quote(str(student_id), safe='')
    redirect_url = f"https://huggingface.co/spaces/CIV3283/{space_name}/?check={encoded_student_id}"
    print(f"[select_space_with_enhanced_collision_avoidance_cached] Final allocation: {space_name} -> {student_id}")
    return redirect_to_space(redirect_url, selected_space, status_display)
def redirect_to_space(redirect_url, selected_space, status_display):
    """Build the "assistant assigned" page with a manual link to the space.

    Args:
        redirect_url: URL of the assigned space.
        selected_space: dict providing ``space_name`` and ``idle_minutes``.
        status_display: accepted for interface compatibility; it is not
            rendered (the analysis section was removed from the page).

    Returns:
        A ``gr.HTML`` component containing the page.
    """
    if selected_space['idle_minutes'] == float('inf'):
        idle_info = "Never used (completely fresh)"
    else:
        idle_info = f"{selected_space['idle_minutes']:.1f} minutes"

    # Escape everything interpolated into markup: the URL carries a value
    # taken from the request, and it is placed inside an HTML attribute.
    safe_url = html.escape(str(redirect_url), quote=True)
    safe_space_name = html.escape(str(selected_space['space_name']))

    # Only the access section is rendered; the analysis section was removed.
    redirect_html = f"""
<div style="max-width: 900px; margin: 0 auto; padding: 20px; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;">
<div style="text-align: center; margin-bottom: 30px; padding: 25px; background: linear-gradient(135deg, #28a745, #20c997); color: white; border-radius: 12px; box-shadow: 0 4px 6px rgba(0,0,0,0.1);">
<h1 style="margin: 0 0 15px 0; font-size: 28px;">🎯 Learning Assistant Assigned</h1>
<h2 style="margin: 0 0 10px 0; font-weight: normal; font-size: 24px;">{safe_space_name}</h2>
<p style="margin: 0; font-size: 18px; opacity: 0.9;">
✨ This space was idle for: <strong>{idle_info}</strong>
</p>
</div>
<div style="text-align: center; margin-bottom: 30px; padding: 25px; background: linear-gradient(135deg, #2196f3, #1976d2); color: white; border-radius: 12px; box-shadow: 0 4px 6px rgba(33,150,243,0.3);">
<h2 style="margin-top: 0; color: white;">🚀 Access Your Learning Assistant</h2>
<p style="color: rgba(255,255,255,0.9); font-size: 16px; margin-bottom: 25px;">
Click the button below to access your assigned learning assistant.
</p>
<a href="{safe_url}"
target="_blank"
style="display: inline-block; background: rgba(255,255,255,0.15); color: white;
padding: 15px 30px; font-size: 18px; font-weight: bold; text-decoration: none;
border-radius: 25px; border: 2px solid rgba(255,255,255,0.3);
transition: all 0.3s ease; margin-bottom: 20px;"
onmouseover="this.style.background='rgba(255,255,255,0.25)'; this.style.transform='translateY(-2px)'"
onmouseout="this.style.background='rgba(255,255,255,0.15)'; this.style.transform='translateY(0px)'">
➤ Open Learning Assistant
</a>
<p style="margin: 15px 0 0 0; color: rgba(255,255,255,0.8); font-size: 14px;">
💡 Left-click the button above or right-click it and select "Open in new tab"
</p>
</div>
<div style="text-align: center; padding: 20px; background: #f1f3f4; border-radius: 8px; margin-top: 20px;">
<p style="margin: 0; color: #5f6368; font-size: 14px;">
🔄 Need a different assistant? <a href="javascript:location.reload()" style="color: #1976d2; text-decoration: none;">Refresh this page</a> to get reassigned.
</p>
</div>
</div>
"""
    return gr.HTML(redirect_html)
def load_balance_user_cached(student_id):
    """Assign a space to ``student_id`` using cached, in-memory data only.

    The activity analysis and the selection/recording step run as one unit
    under ALLOCATION_LOCK. Without it, two simultaneous requests could both
    see the same space as free and both be sent to it, because the check and
    the recording of an allocation are separate steps.

    Returns:
        The component produced by the selection step.

    Raises:
        gr.Error: if the system is not initialized or no space can be assigned.
    """
    print(f"[load_balance_user_cached] Starting cached load balancing for student ID: {student_id}")

    # Refuse to work before startup initialization has completed.
    if not data_manager.initialized:
        raise gr.Error("🚫 System not properly initialized. Please contact administrator.", duration=8)

    # Serialize "look for a free space" + "mark it as taken" across requests.
    # The lock is released automatically if either step raises.
    with ALLOCATION_LOCK:
        # Cached analysis; this does not trigger a data sync.
        space_activity = analyze_space_activity_cached()
        return select_space_with_enhanced_collision_avoidance_cached(space_activity, student_id)
def get_url_params(request: gr.Request):
    """Return ``(title, check_id)`` read from the request's query string.

    ``check_id`` is the value of the ``check`` parameter, or ``None`` when the
    parameter is missing or empty. The title is always "Load Distributor".
    """
    params = dict(request.query_params)
    student_id = params.get('check', None)
    # Missing and empty values are both reported as None.
    return "Load Distributor", (student_id if student_id else None)
def create_status_display_with_cache_info(space_activity):
    """Render the status of all spaces, prefixed with cache freshness details.

    Args:
        space_activity: list of per-space dicts providing ``space_name``,
            ``status``, ``last_activity_str`` and optionally ``cached_status``.

    Returns:
        The assembled display string (lines separated by ``<br>``).
    """
    global activity_cache

    cache_info = activity_cache.get_cache_info() if activity_cache else None

    parts = ["📊 **Current Space Status (sorted by availability):**<br><br>"]

    # Cache freshness header.
    if cache_info:
        age_text = f"(updated {cache_info['age_minutes']:.1f} min ago)"
        cache_status = f"✅ Fresh {age_text}" if cache_info['is_fresh'] else f"⚠️ Stale {age_text}"
        parts.append(f"🔄 **Cache Status:** {cache_status}<br>")
        parts.append(f"📋 **Cached Spaces:** {cache_info['spaces_count']}<br><br>")

    # Summary of the in-memory allocation records.
    memory_summary = memory_allocation_store.get_status_summary()
    if memory_summary['active_count'] > 0:
        parts.append(f"🧠 **Memory Allocations ({memory_summary['active_count']} active):**<br>")
        parts.extend(f" • {alloc}<br>" for alloc in memory_summary['summary'])
        parts.append("<br>")

    # Summary of the local allocation records.
    local_summary = local_tracker.get_recent_allocations_summary()
    if local_summary:
        parts.append("🔒 **Recent Local Allocations:**<br>")
        parts.extend(f" • {alloc}<br>" for alloc in local_summary)
        parts.append("<br>")

    # One numbered entry per space.
    for position, space in enumerate(space_activity, 1):
        parts.append(f"{position}. **{space['space_name']}**<br>")
        parts.append(f" • Status: {space['status']}<br>")
        parts.append(f" • Last activity: {space['last_activity_str']}<br>")
        # The cached status line is optional.
        if 'cached_status' in space:
            parts.append(f" • Cache: {space['cached_status']}<br>")
        parts.append("<br>")

    return "".join(parts)
def handle_user_access_cached(request: gr.Request):
    """Handle a page visit: validate the student ID and run cached load balancing.

    Returns:
        ``(title, component)``. The component is the assignment page on
        success, or an HTML notice when the student ID is missing or load
        balancing raised.
    """
    title, check_id = get_url_params(request)

    if not check_id:
        # No student ID provided.
        error_html = """
<div style="max-width: 600px; margin: 50px auto; padding: 30px; text-align: center;
background: #fff3cd; border: 1px solid #ffeaa7; border-radius: 12px;">
<h2 style="color: #856404;">⚠️ Invalid Access</h2>
<p style="color: #856404; font-size: 16px; line-height: 1.6;">
This load distributor requires a valid student ID parameter.<br><br>
<strong>Please access this system through the official link provided in Moodle.</strong>
</p>
</div>
"""
        return title, gr.HTML(error_html)

    # Valid student ID - perform cached load balancing.
    try:
        result = load_balance_user_cached(check_id)
        return title, result
    except Exception as e:
        # Top-level boundary: log the failure, then show it to the user.
        print(f"[handle_user_access_cached] Load balancing failed: {e}")
        # Escape the message so exception text can never inject markup.
        safe_message = html.escape(str(e))
        error_html = f"""
<div style="max-width: 600px; margin: 50px auto; padding: 30px; text-align: center;
background: #f8d7da; border: 1px solid #f5c6cb; border-radius: 12px;">
<h2 style="color: #721c24;">🚫 Load Balancing Error</h2>
<p style="color: #721c24; font-size: 16px; line-height: 1.6;">
{safe_message}<br><br>
Please try again in a few moments or contact your instructor if the problem persists.
</p>
</div>
"""
        return title, gr.HTML(error_html)
| # 管理功能 | |
def get_cache_status():
    """Return the activity cache's info dict (for debugging or admin use).

    Returns:
        The result of ``activity_cache.get_cache_info()``, or an
        ``{"error": ...}`` dict when the module-level cache is falsy
        (e.g. not yet created).
    """
    # Reading a module-level name needs no ``global`` declaration.
    if not activity_cache:
        return {"error": "Cache not initialized"}
    return activity_cache.get_cache_info()
def force_cache_update():
    """Force an immediate refresh of the activity cache (debugging or admin use).

    Returns:
        ``{"status": "Cache updated"}`` after calling
        ``activity_cache.force_update()``, or an ``{"error": ...}`` dict when
        the module-level cache is falsy (e.g. not yet created).
    """
    if not activity_cache:
        return {"error": "Cache not initialized"}

    activity_cache.force_update()
    return {"status": "Cache updated"}
def get_data_manager_status():
    """Return a snapshot of the data manager's state as a plain dict.

    Returns:
        A dict with the keys ``initialized``, ``last_sync`` (ISO-format string,
        or ``None`` when no sync time is recorded), ``spaces_count`` and
        ``repo_connected``.
    """
    snapshot = {"initialized": data_manager.initialized}

    # presumably a datetime set by the sync routine — confirm against data_manager
    synced_at = data_manager.last_data_sync
    snapshot["last_sync"] = synced_at.isoformat() if synced_at else None

    snapshot["spaces_count"] = len(data_manager.available_spaces)
    snapshot["repo_connected"] = data_manager.repo is not None
    return snapshot
def force_data_sync():
    """Trigger a data sync right away via ``data_manager._sync_data()``.

    Returns:
        ``{"status": "Data sync completed"}`` on success, or
        ``{"error": <message>}`` if the sync raised.
    """
    try:
        data_manager._sync_data()
    except Exception as exc:
        # Report the failure to the caller instead of propagating it.
        return {"error": str(exc)}
    else:
        return {"status": "Data sync completed"}
def get_memory_allocation_status():
    """Return the in-memory allocation store's status summary."""
    summary = memory_allocation_store.get_status_summary()
    return summary
def clear_memory_allocations():
    """Remove every in-memory allocation record (admin function).

    Returns:
        ``{"status": "Cleared N memory allocations"}`` on success, or
        ``{"error": <message>}`` if anything raised.
    """
    store = memory_allocation_store
    try:
        # Hold the store's own lock so the count and the clear are consistent.
        with store._lock:
            records = store._allocations
            removed = len(records)
            records.clear()
        return {"status": f"Cleared {removed} memory allocations"}
    except Exception as exc:
        return {"error": str(exc)}
| # 添加可选的管理界面(用于调试) | |
def create_admin_interface():
    """Build the optional admin panel (enhanced version).

    The panel has three tabs — cache, data manager and memory allocations —
    each with a status button, an action button, a JSON view and a Markdown
    message line. Every handler returns a ``(json_payload, message)`` pair.

    Returns:
        The assembled ``gr.Blocks`` admin interface.
    """
    # NOTE(review): component nesting (buttons in a Row, displays below it)
    # was reconstructed from flattened source — confirm against the live app.
    with gr.Blocks(title="CIV3283 Load Distributor - Admin") as admin_interface:
        gr.Markdown("# 🔧 CIV3283 Load Distributor - Admin Panel")

        with gr.Tab("Cache Management"):
            with gr.Row():
                btn_cache_status = gr.Button("📊 Check Cache Status", variant="secondary")
                btn_cache_refresh = gr.Button("🔄 Force Cache Update", variant="primary")
            cache_json = gr.JSON(label="Cache Information")
            cache_msg = gr.Markdown("")

        with gr.Tab("Data Management"):
            with gr.Row():
                btn_data_status = gr.Button("📊 Check Data Manager Status", variant="secondary")
                btn_data_sync = gr.Button("🔄 Force Data Sync", variant="primary")
            data_json = gr.JSON(label="Data Manager Information")
            data_msg = gr.Markdown("")

        with gr.Tab("Memory Allocation"):
            with gr.Row():
                btn_mem_status = gr.Button("🧠 Check Memory Allocations", variant="secondary")
                btn_mem_clear = gr.Button("🗑️ Clear All Memory Allocations", variant="stop")
            mem_json = gr.JSON(label="Memory Allocation Information")
            mem_msg = gr.Markdown("")

        def check_cache_status():
            """Report cache age and freshness."""
            try:
                info = get_cache_status()
                if 'error' in info:
                    return info, "❌ Cache not available"

                age_min = info.get('age_minutes', 0)
                spaces_count = info.get('spaces_count', 0)
                if info.get('is_fresh', False):
                    text = f"✅ Cache is fresh ({age_min:.1f} min old, {spaces_count} spaces)"
                else:
                    text = f"⚠️ Cache is stale ({age_min:.1f} min old, {spaces_count} spaces)"
                return info, text
            except Exception as exc:
                reason = str(exc)
                return {"error": reason}, f"❌ Error checking cache: {reason}"

        def force_update():
            """Refresh the cache, then show its updated status."""
            try:
                outcome = force_cache_update()
                if 'error' in outcome:
                    return outcome, "❌ Failed to update cache"
                # Fetch the post-update status for display.
                return get_cache_status(), "✅ Cache updated successfully"
            except Exception as exc:
                reason = str(exc)
                return {"error": reason}, f"❌ Error updating cache: {reason}"

        def check_data_status():
            """Report whether the data manager is initialized and when it last synced."""
            try:
                info = get_data_manager_status()
                if not info['initialized']:
                    return info, "❌ Data manager not initialized"

                last_sync = info['last_sync']
                if not last_sync:
                    return info, "⚠️ Data manager initialized but no sync recorded"

                elapsed = datetime.now() - datetime.fromisoformat(last_sync)
                age_minutes = elapsed.total_seconds() / 60
                text = f"✅ Data manager active (last sync: {age_minutes:.1f} min ago, {info['spaces_count']} spaces)"
                return info, text
            except Exception as exc:
                reason = str(exc)
                return {"error": reason}, f"❌ Error checking data manager: {reason}"

        def force_sync():
            """Run a data sync, then show the data manager's updated status."""
            try:
                outcome = force_data_sync()
                if 'error' in outcome:
                    return outcome, "❌ Failed to sync data"
                # Fetch the post-sync status for display.
                return get_data_manager_status(), "✅ Data sync completed successfully"
            except Exception as exc:
                reason = str(exc)
                return {"error": reason}, f"❌ Error syncing data: {reason}"

        def check_memory_status():
            """Summarize the in-memory allocation records."""
            try:
                info = get_memory_allocation_status()
                active_count = info['active_count']
                total_stored = info['total_stored']
                if active_count > 0:
                    text = f"🧠 {active_count} active allocations ({total_stored} total in memory)"
                else:
                    text = "✅ No active memory allocations"
                return info, text
            except Exception as exc:
                reason = str(exc)
                return {"error": reason}, f"❌ Error checking memory allocations: {reason}"

        def clear_memory():
            """Clear all allocation records, then show the updated summary."""
            try:
                outcome = clear_memory_allocations()
                if 'error' in outcome:
                    return outcome, "❌ Failed to clear memory allocations"
                # Fetch the post-clear status for display.
                return get_memory_allocation_status(), f"✅ {outcome['status']}"
            except Exception as exc:
                reason = str(exc)
                return {"error": reason}, f"❌ Error clearing memory: {reason}"

        # Wire each button to its handler and its tab's (JSON, message) outputs.
        bindings = (
            # Cache tab
            (btn_cache_status, check_cache_status, cache_json, cache_msg),
            (btn_cache_refresh, force_update, cache_json, cache_msg),
            # Data tab
            (btn_data_status, check_data_status, data_json, data_msg),
            (btn_data_sync, force_sync, data_json, data_msg),
            # Memory tab
            (btn_mem_status, check_memory_status, mem_json, mem_msg),
            (btn_mem_clear, clear_memory, mem_json, mem_msg),
        )
        for button, handler, json_view, message_view in bindings:
            button.click(fn=handler, outputs=[json_view, message_view])

    return admin_interface
| # 修改后的主Gradio界面 | |
def create_main_interface():
    """Build the main user-facing interface.

    The page holds a Markdown title and an HTML content area; both are filled
    by ``handle_user_access_cached`` when the page loads.

    Returns:
        The assembled ``gr.Blocks`` interface.
    """
    with gr.Blocks(title="CIV3283 Load Distributor") as page:
        heading = gr.Markdown(
            "# 🔄 CIV3283 Learning Assistant Load Distributor",
            elem_id="title",
        )
        body = gr.HTML("")

        # Main load-balancing logic: runs on page load.
        page.load(fn=handle_user_access_cached, outputs=[heading, body])

    return page
| # 修改后的main函数 | |
if __name__ == "__main__":
    import sys
    import traceback

    try:
        # System initialization - performed once at startup.
        print("🚀 Initializing CIV3283 Load Distributor System...")
        init_system()
        print("✅ System initialization completed successfully")
        print("🧠 Using pure memory storage for allocations (no file I/O during user access)")

        # Admin panel is opt-in: the --admin flag or ENABLE_ADMIN=true.
        enable_admin = "--admin" in sys.argv
        if not enable_admin:
            enable_admin = os.environ.get("ENABLE_ADMIN", "false").lower() == "true"

        if not enable_admin:
            print("🎯 Starting main interface only")
            # Launch the user-facing interface alone.
            create_main_interface().launch()
        else:
            print("🔧 Starting with admin interface enabled")
            # Build both interfaces and present them as tabs.
            user_ui = create_main_interface()
            admin_ui = create_admin_interface()
            gr.TabbedInterface(
                [user_ui, admin_ui],
                ["🎯 Load Distributor", "🔧 Admin Panel"],
                title="CIV3283 Load Distributor System",
            ).launch()
    except Exception as exc:
        print(f"❌ System initialization failed: {exc}")
        print(f"Traceback: {traceback.format_exc()}")
        raise