import gradio as gr import os import csv from datetime import datetime, timedelta from huggingface_hub import Repository import threading import time import json # Configuration DATA_STORAGE_REPO = "CIV3283/Data_Storage" DATA_BRANCH_NAME = "data_branch" LOCAL_DATA_DIR = "temp_data_storage" MIN_IDLE_MINUTES = 20 # Minimum idle time required for space assignment ALLOCATION_LOCK_DURATION = 10 # Lock duration in minutes # 缓存配置 - 全部改为内存存储 CACHE_UPDATE_INTERVAL = 300 # 10分钟 = 600秒 DATA_SYNC_INTERVAL = 300 # 数据同步间隔,10分钟 = 600秒 CACHE_LOCK = threading.Lock() ALLOCATION_LOCK = threading.Lock() # 全局内存存储 class MemoryAllocationStore: """内存中的分配记录存储""" def __init__(self): self._allocations = {} # {space_name: {'student_id': str, 'allocated_time': datetime, 'expires_at': datetime}} self._lock = threading.Lock() self._cleanup_interval = 60 # 每分钟清理一次过期记录 # 启动后台清理线程 self._start_cleanup_thread() def _start_cleanup_thread(self): """启动后台清理过期分配记录的线程""" def cleanup_worker(): while True: time.sleep(self._cleanup_interval) self._cleanup_expired_allocations() cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True) cleanup_thread.start() print("[MemoryAllocationStore] Background cleanup thread started") def _cleanup_expired_allocations(self): """清理过期的分配记录""" with self._lock: current_time = datetime.now() expired_spaces = [] for space_name, alloc_info in self._allocations.items(): if alloc_info['expires_at'] <= current_time: expired_spaces.append(space_name) for space_name in expired_spaces: del self._allocations[space_name] if expired_spaces: # 只在有过期记录时打印 print(f"[MemoryAllocationStore] Cleaned up expired allocation: {space_name}") def add_allocation(self, space_name, student_id): """添加新的分配记录""" with self._lock: current_time = datetime.now() expires_at = current_time + timedelta(minutes=ALLOCATION_LOCK_DURATION) self._allocations[space_name] = { 'student_id': student_id, 'allocated_time': current_time, 'expires_at': expires_at } print(f"[MemoryAllocationStore] Added allocation: {space_name} -> {student_id} (expires at {expires_at.strftime('%H:%M:%S')})") def get_active_allocations(self): """��取所有有效的分配记录""" with self._lock: current_time = datetime.now() active_allocations = {} for space_name, alloc_info in self._allocations.items(): if alloc_info['expires_at'] > current_time: active_allocations[space_name] = alloc_info.copy() return active_allocations def is_allocated(self, space_name): """检查指定空间是否被分配""" with self._lock: if space_name not in self._allocations: return False, None alloc_info = self._allocations[space_name] current_time = datetime.now() if alloc_info['expires_at'] > current_time: return True, alloc_info['student_id'] else: # 过期了,删除记录 del self._allocations[space_name] return False, None def get_status_summary(self): """获取分配状态摘要""" with self._lock: current_time = datetime.now() active_count = 0 summary = [] for space_name, alloc_info in self._allocations.items(): if alloc_info['expires_at'] > current_time: active_count += 1 remaining_minutes = (alloc_info['expires_at'] - current_time).total_seconds() / 60 summary.append(f"{space_name} -> {alloc_info['student_id']} ({remaining_minutes:.1f}min left)") return { 'active_count': active_count, 'total_stored': len(self._allocations), 'summary': summary } # 全局数据管理器 class DataManager: def __init__(self): self.repo = None self.available_spaces = [] self.last_data_sync = None self._sync_lock = threading.Lock() self._sync_thread = None self._stop_event = threading.Event() self.initialized = False def initialize(self): """初始化数据管理器 - 只在启动时调用一次""" if self.initialized: return True try: print("[DataManager] Initializing data manager...") # 初始化仓库连接 self.repo = Repository( local_dir=LOCAL_DATA_DIR, clone_from=DATA_STORAGE_REPO, revision=DATA_BRANCH_NAME, repo_type="space", use_auth_token=os.environ.get("HF_HUB_TOKEN") ) # 配置git用户 self.repo.git_config_username_and_email("git_user", f"load_distributor") self.repo.git_config_username_and_email("git_email", f"loaddistributor@takeiteasy.space") # 初始数据同步 self._sync_data() # 启动后台同步线程 self._start_background_sync() self.initialized = True print("[DataManager] Data manager initialized successfully") return True except Exception as e: print(f"[DataManager] Initialization failed: {e}") return False def _start_background_sync(self): """启动后台数据同步线程""" if self._sync_thread and self._sync_thread.is_alive(): return self._stop_event.clear() self._sync_thread = threading.Thread(target=self._background_sync_worker, daemon=True) self._sync_thread.start() print("[DataManager] Background sync thread started") def _background_sync_worker(self): """后台同步工作线程""" while not self._stop_event.is_set(): try: # 等待指定间隔或停止信号 if self._stop_event.wait(timeout=DATA_SYNC_INTERVAL): break print("[DataManager] Starting scheduled data sync...") self._sync_data() print("[DataManager] Scheduled data sync completed") except Exception as e: print(f"[DataManager] Error in background sync: {e}") if not self._stop_event.wait(timeout=60): # 1分钟后重试 continue def _sync_data(self): """同步数据 - 拉取最新数据并更新可用空间列表""" with self._sync_lock: try: start_time = time.time() print("[DataManager] Syncing data from remote repository...") # 拉取最新数据 self.repo.git_pull(rebase=True) # 更新可用空间列表 self.available_spaces = self._get_available_spaces() self.last_data_sync = datetime.now() elapsed_time = time.time() - start_time print(f"[DataManager] Data sync completed in {elapsed_time:.2f}s, found {len(self.available_spaces)} spaces") except Exception as e: print(f"[DataManager] Error syncing data: {e}") def _get_available_spaces(self): """获取可用空间列表""" available_spaces = set() try: for filename in os.listdir(LOCAL_DATA_DIR): if filename.endswith('_query_log.csv') and '_Student_' in filename: space_name = filename.replace('_query_log.csv', '') available_spaces.add(space_name) return sorted(list(available_spaces)) except Exception as e: print(f"[DataManager] Error getting available spaces: {e}") return [] def get_available_spaces(self): """获取可用空间列表 - 不触发数据同步""" if not self.initialized: raise Exception("DataManager not initialized") return self.available_spaces.copy() def get_repo(self): """获取仓库对象""" if not self.initialized: raise Exception("DataManager not initialized") return self.repo def get_repo_dir(self): """获取仓库本地目录""" if not self.initialized: raise Exception("DataManager not initialized") return LOCAL_DATA_DIR def stop(self): """停止后台同步""" if self._sync_thread: self._stop_event.set() print("[DataManager] Background sync thread stopping...") class SpaceActivityCache: """内存中的空间活动缓存""" def __init__(self, data_manager): self.data_manager = data_manager self._cache_data = {} self._last_update = None self._update_thread = None self._stop_event = threading.Event() # 启动时立即更新一次缓存 self._update_cache() # 启动后台更新线程 self.start_background_updates() def start_background_updates(self): """启动后台缓存更新线程""" if self._update_thread and self._update_thread.is_alive(): return self._stop_event.clear() self._update_thread = threading.Thread(target=self._background_update_worker, daemon=True) self._update_thread.start() print("[SpaceActivityCache] Background update thread started") def stop_background_updates(self): """停止后台更新线程""" if self._update_thread: self._stop_event.set() print("[SpaceActivityCache] Background update thread stopping...") def _background_update_worker(self): """后台更新工作线程""" while not self._stop_event.is_set(): try: # 等待指定间隔或停止信号 if self._stop_event.wait(timeout=CACHE_UPDATE_INTERVAL): break print("[SpaceActivityCache] Starting scheduled cache update...") self._update_cache() print("[SpaceActivityCache] Scheduled cache update completed") except Exception as e: print(f"[SpaceActivityCache] Error in background update: {e}") if not self._stop_event.wait(timeout=60): # 1分钟后重试 continue def _update_cache(self): """更新缓存数据""" try: print("[SpaceActivityCache] Updating activity cache...") start_time = time.time() # 从数据管理器获取可用空间(不触发数据同步) available_spaces = self.data_manager.get_available_spaces() repo_dir = self.data_manager.get_repo_dir() new_cache_data = { 'last_updated': datetime.now().isoformat(), 'update_interval_seconds': CACHE_UPDATE_INTERVAL, 'spaces': {} } # 为每个空间读取最后活动时间 for space_name in available_spaces: csv_file = os.path.join(repo_dir, f"{space_name}_query_log.csv") last_activity, status = self._get_last_activity_from_file(csv_file) new_cache_data['spaces'][space_name] = { 'last_activity': last_activity.isoformat() if last_activity != datetime.min else None, 'status': status, 'cache_update_time': datetime.now().isoformat() } print(f"[SpaceActivityCache] {space_name}: {status}") # 线程安全地更新缓存 with CACHE_LOCK: self._cache_data = new_cache_data self._last_update = datetime.now() elapsed_time = time.time() - start_time print(f"[SpaceActivityCache] Cache updated in {elapsed_time:.2f}s, {len(available_spaces)} spaces processed") except Exception as e: print(f"[SpaceActivityCache] Error updating cache: {e}") import traceback print(f"[SpaceActivityCache] Traceback: {traceback.format_exc()}") def _get_last_activity_from_file(self, csv_file_path): """从文件读取最后活动时间""" try: if not os.path.exists(csv_file_path): return datetime.min, "file_not_found" # 检查文件大小 file_size = os.path.getsize(csv_file_path) if file_size <= 100: return datetime.min, "empty_or_header_only" # 使用CSV reader读取最后一行 with open(csv_file_path, 'r', encoding='utf-8') as f: csv_reader = csv.reader(f) # 跳过header try: header = next(csv_reader) except StopIteration: return datetime.min, "empty_file" # 读取所有行,获取最后一行 rows = [] try: for row in csv_reader: if row: # 跳过空行 rows.append(row) except Exception as csv_error: print(f"[SpaceActivityCache] CSV parsing error for {csv_file_path}: {csv_error}") return datetime.min, "csv_parse_error" if not rows: return datetime.min, "no_data_rows" # 解析最后一行的时间戳 last_row = rows[-1] if len(last_row) >= 3: timestamp_str = last_row[2].strip() # timestamp column try: parsed_time = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S') return parsed_time, f"active_last_at_{parsed_time.strftime('%Y-%m-%d_%H:%M:%S')}" except ValueError as ve: print(f"[SpaceActivityCache] Date parsing error for '{timestamp_str}': {ve}") return datetime.min, "date_parse_error" else: return datetime.min, "invalid_row_format" except Exception as e: print(f"[SpaceActivityCache] Error reading {csv_file_path}: {e}") return datetime.min, "read_error" def get_space_activity(self, space_name): """获取指定空间的活动信息""" with CACHE_LOCK: if not self._cache_data or 'spaces' not in self._cache_data: print("[SpaceActivityCache] No cache available, updating immediately...") self._update_cache() spaces_data = self._cache_data.get('spaces', {}) space_info = spaces_data.get(space_name, {}) if not space_info: return datetime.min, "not_in_cache" # 解析时间戳 last_activity_str = space_info.get('last_activity') if last_activity_str: try: last_activity = datetime.fromisoformat(last_activity_str) except: last_activity = datetime.min else: last_activity = datetime.min status = space_info.get('status', 'unknown') return last_activity, status def get_all_spaces_activity(self): """获取所有空间的活动信息""" with CACHE_LOCK: if not self._cache_data or 'spaces' not in self._cache_data: print("[SpaceActivityCache] No cache available, updating immediately...") self._update_cache() result = {} spaces_data = self._cache_data.get('spaces', {}) for space_name, space_info in spaces_data.items(): last_activity_str = space_info.get('last_activity') if last_activity_str: try: last_activity = datetime.fromisoformat(last_activity_str) except: last_activity = datetime.min else: last_activity = datetime.min result[space_name] = { 'last_activity': last_activity, 'status': space_info.get('status', 'unknown') } return result def get_cache_info(self): """获取缓存状态信息""" with CACHE_LOCK: if self._cache_data and 'last_updated' in self._cache_data: last_updated_str = self._cache_data['last_updated'] try: last_updated = datetime.fromisoformat(last_updated_str) age_minutes = (datetime.now() - last_updated).total_seconds() / 60 return { 'last_updated': last_updated, 'age_minutes': age_minutes, 'spaces_count': len(self._cache_data.get('spaces', {})), 'is_fresh': age_minutes < (CACHE_UPDATE_INTERVAL / 60) * 1.5 } except: pass return { 'last_updated': None, 'age_minutes': float('inf'), 'spaces_count': 0, 'is_fresh': False } def force_update(self): """强制立即更新缓存""" print("[SpaceActivityCache] Force updating cache...") self._update_cache() # 新增:本地防撞车机制 class LocalAllocationTracker: def __init__(self): self._recent_allocations = {} # {space_name: {'student_id': str, 'timestamp': datetime}} self._lock = threading.Lock() self._cleanup_interval = 600 # 清理间隔(秒) self._allocation_ttl = 60 # 本地分配记录的生存时间(秒) # 启动后台清理线程 self._start_cleanup_thread() def _start_cleanup_thread(self): """启动后台线程定期清理过期的本地分配记录""" def cleanup_worker(): while True: time.sleep(self._cleanup_interval) self._cleanup_expired_allocations() cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True) cleanup_thread.start() print("[LocalAllocationTracker] Background cleanup thread started") def _cleanup_expired_allocations(self): """清理过期的本地分配记录""" with self._lock: current_time = datetime.now() expired_spaces = [] for space_name, alloc_info in self._recent_allocations.items(): if (current_time - alloc_info['timestamp']).total_seconds() > self._allocation_ttl: expired_spaces.append(space_name) for space_name in expired_spaces: del self._recent_allocations[space_name] if expired_spaces: # 只在有过期记录时打印 print(f"[LocalAllocationTracker] Cleaned up expired allocation: {space_name}") def is_recently_allocated_locally(self, space_name): """检查空间是否在本地被最近分配过""" with self._lock: if space_name not in self._recent_allocations: return False, None alloc_info = self._recent_allocations[space_name] current_time = datetime.now() elapsed_seconds = (current_time - alloc_info['timestamp']).total_seconds() if elapsed_seconds > self._allocation_ttl: # 过期了,删除记录 del self._recent_allocations[space_name] print(f"[LocalAllocationTracker] Expired local allocation removed: {space_name}") return False, None print(f"[LocalAllocationTracker] Space {space_name} recently allocated locally to {alloc_info['student_id']} ({elapsed_seconds:.1f}s ago)") return True, alloc_info['student_id'] def record_local_allocation(self, space_name, student_id): """记录本地分配""" with self._lock: self._recent_allocations[space_name] = { 'student_id': student_id, 'timestamp': datetime.now() } print(f"[LocalAllocationTracker] Locally recorded allocation: {space_name} -> {student_id}") def get_recent_allocations_summary(self): """获取最近本地分配的摘要(用于调试)""" with self._lock: current_time = datetime.now() summary = [] for space_name, alloc_info in self._recent_allocations.items(): elapsed = (current_time - alloc_info['timestamp']).total_seconds() summary.append(f"{space_name} -> {alloc_info['student_id']} ({elapsed:.1f}s ago)") return summary # 全局实例 data_manager = DataManager() activity_cache = None local_tracker = LocalAllocationTracker() memory_allocation_store = MemoryAllocationStore() def init_system(): """初始化整个系统 - 只在启动时调用一次""" global activity_cache print("[init_system] Initializing load distributor system...") # 初始化数据管理器 if not data_manager.initialize(): raise Exception("Failed to initialize data manager") # 初始化活动缓存 activity_cache = SpaceActivityCache(data_manager) print("[init_system] System initialization completed") def analyze_space_activity_cached(): """使用缓存的空间活动分析 - 完全使用内存数据""" global activity_cache if activity_cache is None: raise Exception("Activity cache not initialized") # 从数据管理器获取可用空间列表(不触发数据同步) available_spaces = data_manager.get_available_spaces() space_activity = [] current_time = datetime.now() # 获取缓存信息 cache_info = activity_cache.get_cache_info() print(f"[analyze_space_activity_cached] Using cache (age: {cache_info['age_minutes']:.1f} min, fresh: {cache_info['is_fresh']})") # 获取所有空间的缓存活动数据 all_spaces_activity = activity_cache.get_all_spaces_activity() # 从内存获取分配记录 - 不再读取文件 active_allocations = memory_allocation_store.get_active_allocations() print(f"[analyze_space_activity_cached] Analyzing {len(available_spaces)} spaces using cached data...") print(f"[analyze_space_activity_cached] Memory allocations: {len(active_allocations)} active") for space_name in available_spaces: # 从缓存获取活动信息 cached_info = all_spaces_activity.get(space_name) if cached_info: last_activity = cached_info['last_activity'] cached_status = cached_info['status'] else: # 缓存中没有这个空间,可能是新空间 last_activity = datetime.min cached_status = "not_in_cache" # Calculate idle time in minutes if last_activity == datetime.min or 'empty' in cached_status or 'not_found' in cached_status: idle_minutes = float('inf') # Never used status = "Never used" last_activity_str = "Never" else: idle_minutes = (current_time - last_activity).total_seconds() / 60 status = f"Idle for {idle_minutes:.1f} minutes (cached)" last_activity_str = last_activity.strftime('%Y-%m-%d %H:%M:%S') # 检查内存中的分配记录 is_recently_allocated_memory = space_name in active_allocations if is_recently_allocated_memory: alloc_info = active_allocations[space_name] minutes_until_free = (alloc_info['expires_at'] - current_time).total_seconds() / 60 status += f" (Allocated in memory to {alloc_info['student_id']}, free in {minutes_until_free:.1f} min)" # Check if space is recently allocated (local) is_recently_allocated_local, local_student = local_tracker.is_recently_allocated_locally(space_name) if is_recently_allocated_local: status += f" (Recently allocated locally to {local_student})" space_activity.append({ 'space_name': space_name, 'last_activity': last_activity, 'last_activity_str': last_activity_str, 'idle_minutes': idle_minutes, 'status': status, 'is_recently_allocated_memory': is_recently_allocated_memory, 'is_recently_allocated_local': is_recently_allocated_local, 'cached_status': cached_status }) print(f"[analyze_space_activity_cached] {space_name}: {status}") # Sort by idle time (most idle first) space_activity.sort(key=lambda x: x['idle_minutes'], reverse=True) return space_activity def create_status_display(space_activity): """Create formatted status display for all spaces with proper line breaks""" status_display = "📊 **Current Space Status (sorted by availability):**

" # 显示内存分配记录摘要 memory_summary = memory_allocation_store.get_status_summary() if memory_summary['active_count'] > 0: status_display += f"🧠 **Memory Allocations ({memory_summary['active_count']} active):**
" for alloc in memory_summary['summary']: status_display += f"   • {alloc}
" status_display += "
" # 显示本地分配记录摘要 local_summary = local_tracker.get_recent_allocations_summary() if local_summary: status_display += "🔒 **Recent Local Allocations:**
" for alloc in local_summary: status_display += f"   • {alloc}
" status_display += "
" for i, space in enumerate(space_activity, 1): status_display += f"{i}. **{space['space_name']}**
" status_display += f"   • Status: {space['status']}
" status_display += f"   • Last activity: {space['last_activity_str']}

" return status_display def select_space_with_enhanced_collision_avoidance_cached(space_activity, student_id): """使用缓存的增强防撞空间选择函数 - 完全使用内存存储""" print(f"[select_space_with_enhanced_collision_avoidance_cached] Starting selection for student: {student_id}") # 第一步:过滤掉不符合基本条件的空间 basic_available_spaces = [] for space in space_activity: # 检查基本条件 - 使用内存分配记录 if (space['idle_minutes'] >= MIN_IDLE_MINUTES and not space['is_recently_allocated_memory'] and not space['is_recently_allocated_local']): basic_available_spaces.append(space) print(f"[select_space_with_enhanced_collision_avoidance_cached] Basic available spaces: {len(basic_available_spaces)}") if not basic_available_spaces: # 生成详细的错误信息 idle_spaces = [s for s in space_activity if s['idle_minutes'] >= MIN_IDLE_MINUTES] memory_allocated = [s for s in space_activity if s['is_recently_allocated_memory']] local_allocated = [s for s in space_activity if s['is_recently_allocated_local']] error_parts = [] if not idle_spaces: error_parts.append(f"all spaces used within {MIN_IDLE_MINUTES} minutes") if memory_allocated: error_parts.append(f"{len(memory_allocated)} spaces allocated in memory") if local_allocated: error_parts.append(f"{len(local_allocated)} spaces locally allocated") error_msg = ( f"🚫 **All learning assistants are currently busy**\n\n" f"Blocking conditions: {', '.join(error_parts)}\n\n" f"**Please try again in 1-2 minutes.**" ) print(f"[select_space_with_enhanced_collision_avoidance_cached] No available spaces: {error_msg}") raise gr.Error(error_msg, duration=10) # 第二步:选择最优空间并进行最终验证 selected_space = basic_available_spaces[0] # 已经按idle_time排序 space_name = selected_space['space_name'] print(f"[select_space_with_enhanced_collision_avoidance_cached] Preliminary selection: {space_name}") # 第三步:最终防撞车检查 - 再次验证本地分配状态 is_local_conflict, conflicting_student = local_tracker.is_recently_allocated_locally(space_name) if is_local_conflict: print(f"[select_space_with_enhanced_collision_avoidance_cached] COLLISION DETECTED! {space_name} recently allocated to {conflicting_student}") # 寻找替代空间 alternative_spaces = [s for s in basic_available_spaces[1:] if not local_tracker.is_recently_allocated_locally(s['space_name'])[0]] if alternative_spaces: selected_space = alternative_spaces[0] space_name = selected_space['space_name'] print(f"[select_space_with_enhanced_collision_avoidance_cached] Using alternative space: {space_name}") else: error_msg = ( f"🚫 **Collision detected and no alternatives available**\n\n" f"The system detected a potential conflict with another student's allocation.\n\n" f"**Please try again in 10-15 seconds.**" ) print(f"[select_space_with_enhanced_collision_avoidance_cached] No alternatives available") raise gr.Error(error_msg, duration=8) # 第四步:立即记录本地分配(在写入内存之前) local_tracker.record_local_allocation(space_name, student_id) print(f"[select_space_with_enhanced_collision_avoidance_cached] Local allocation recorded BEFORE memory write") # 第五步:记录到内存存储 - 不再写入文件或推送到远程 memory_allocation_store.add_allocation(space_name, student_id) print(f"[select_space_with_enhanced_collision_avoidance_cached] Allocation recorded in memory only") # 第六步:生成结果(使用带缓存信息的状态显示) status_display = create_status_display_with_cache_info(space_activity) redirect_url = f"https://huggingface.co/spaces/CIV3283/{space_name}/?check={student_id}" print(f"[select_space_with_enhanced_collision_avoidance_cached] Final allocation: {space_name} -> {student_id}") return redirect_to_space(redirect_url, selected_space, status_display) def redirect_to_space(redirect_url, selected_space, status_display): """Display redirect information with manual click option""" if selected_space['idle_minutes'] == float('inf'): idle_info = "Never used (completely fresh)" else: idle_info = f"{selected_space['idle_minutes']:.1f} minutes" # Modified HTML structure - Only Access section, removed analysis section redirect_html = f"""

🎯 Learning Assistant Assigned

{selected_space['space_name']}

✨ This space was idle for: {idle_info}

🚀 Access Your Learning Assistant

Click the button below to access your assigned learning assistant.

➤ Open Learning Assistant

💡 Left-click the button above or right-click it and select "Open in new tab"

🔄 Need a different assistant? Refresh this page to get reassigned.

""" return gr.HTML(redirect_html) def load_balance_user_cached(student_id): """使用缓存的负载均衡函数 - 完全使用内存存储""" print(f"[load_balance_user_cached] Starting cached load balancing for student ID: {student_id}") # 检查系统是否已初始化 if not data_manager.initialized: raise gr.Error("🚫 System not properly initialized. Please contact administrator.", duration=8) # 使用缓存版本的分析函数(不触发数据同步) space_activity = analyze_space_activity_cached() # Select space with enhanced collision avoidance return select_space_with_enhanced_collision_avoidance_cached(space_activity, student_id) def get_url_params(request: gr.Request): """Extract URL parameters from request""" query_params = dict(request.query_params) check_id = query_params.get('check', None) if check_id: return f"Load Distributor", check_id else: return "Load Distributor", None def create_status_display_with_cache_info(space_activity): """创建包含缓存信息的状态显示""" global activity_cache # 获取缓存状态 cache_info = activity_cache.get_cache_info() if activity_cache else None status_display = "📊 **Current Space Status (sorted by availability):**

" # 显示缓存信息 if cache_info: if cache_info['is_fresh']: cache_status = f"✅ Fresh (updated {cache_info['age_minutes']:.1f} min ago)" else: cache_status = f"⚠️ Stale (updated {cache_info['age_minutes']:.1f} min ago)" status_display += f"🔄 **Cache Status:** {cache_status}
" status_display += f"📋 **Cached Spaces:** {cache_info['spaces_count']}

" # 显示内存分配记录摘要 memory_summary = memory_allocation_store.get_status_summary() if memory_summary['active_count'] > 0: status_display += f"🧠 **Memory Allocations ({memory_summary['active_count']} active):**
" for alloc in memory_summary['summary']: status_display += f"   • {alloc}
" status_display += "
" # 显示本地分配记录摘要 local_summary = local_tracker.get_recent_allocations_summary() if local_summary: status_display += "🔒 **Recent Local Allocations:**
" for alloc in local_summary: status_display += f"   • {alloc}
" status_display += "
" for i, space in enumerate(space_activity, 1): status_display += f"{i}. **{space['space_name']}**
" status_display += f"   • Status: {space['status']}
" status_display += f"   • Last activity: {space['last_activity_str']}
" # 显示缓存状态(可选) if 'cached_status' in space: status_display += f"   • Cache: {space['cached_status']}
" status_display += "
" return status_display def handle_user_access_cached(request: gr.Request): """使用缓存的用户访问处理""" title, check_id = get_url_params(request) if not check_id: # No student ID provided error_html = """

⚠️ Invalid Access

This load distributor requires a valid student ID parameter.

Please access this system through the official link provided in Moodle.

""" return title, gr.HTML(error_html) # Valid student ID - perform cached load balancing try: result = load_balance_user_cached(check_id) return title, result except Exception as e: # Handle any errors during load balancing error_html = f"""

🚫 Load Balancing Error

{str(e)}

Please try again in a few moments or contact your instructor if the problem persists.

""" return title, gr.HTML(error_html) # 管理功能 def get_cache_status(): """获取缓存状态(用于调试或管理)""" global activity_cache if activity_cache: return activity_cache.get_cache_info() return {"error": "Cache not initialized"} def force_cache_update(): """强制更新缓存(用于调试或管理)""" global activity_cache if activity_cache: activity_cache.force_update() return {"status": "Cache updated"} return {"error": "Cache not initialized"} def get_data_manager_status(): """获取数据管理器状态""" return { "initialized": data_manager.initialized, "last_sync": data_manager.last_data_sync.isoformat() if data_manager.last_data_sync else None, "spaces_count": len(data_manager.available_spaces), "repo_connected": data_manager.repo is not None } def force_data_sync(): """强制数据同步""" try: data_manager._sync_data() return {"status": "Data sync completed"} except Exception as e: return {"error": str(e)} def get_memory_allocation_status(): """获取内存分配状态""" return memory_allocation_store.get_status_summary() def clear_memory_allocations(): """清除所有内存分配记录(管理功能)""" try: with memory_allocation_store._lock: cleared_count = len(memory_allocation_store._allocations) memory_allocation_store._allocations.clear() return {"status": f"Cleared {cleared_count} memory allocations"} except Exception as e: return {"error": str(e)} # 添加可选的管理界面(用于调试) def create_admin_interface(): """创建管理界面(可选)- 增强版本""" with gr.Blocks(title="CIV3283 Load Distributor - Admin") as admin_interface: gr.Markdown("# 🔧 CIV3283 Load Distributor - Admin Panel") with gr.Tab("Cache Management"): with gr.Row(): cache_status_btn = gr.Button("📊 Check Cache Status", variant="secondary") force_cache_update_btn = gr.Button("🔄 Force Cache Update", variant="primary") cache_info_display = gr.JSON(label="Cache Information") cache_status_message = gr.Markdown("") with gr.Tab("Data Management"): with gr.Row(): data_status_btn = gr.Button("📊 Check Data Manager Status", variant="secondary") force_data_sync_btn = gr.Button("🔄 Force Data Sync", variant="primary") data_info_display = gr.JSON(label="Data Manager Information") data_status_message = gr.Markdown("") with gr.Tab("Memory Allocation"): with gr.Row(): memory_status_btn = gr.Button("🧠 Check Memory Allocations", variant="secondary") clear_memory_btn = gr.Button("🗑️ Clear All Memory Allocations", variant="stop") memory_info_display = gr.JSON(label="Memory Allocation Information") memory_status_message = gr.Markdown("") def check_cache_status(): try: status = get_cache_status() if 'error' in status: return status, "❌ Cache not available" else: age_min = status.get('age_minutes', 0) spaces_count = status.get('spaces_count', 0) is_fresh = status.get('is_fresh', False) if is_fresh: message = f"✅ Cache is fresh ({age_min:.1f} min old, {spaces_count} spaces)" else: message = f"⚠️ Cache is stale ({age_min:.1f} min old, {spaces_count} spaces)" return status, message except Exception as e: return {"error": str(e)}, f"❌ Error checking cache: {str(e)}" def force_update(): try: result = force_cache_update() if 'error' in result: return result, "❌ Failed to update cache" else: # 获取更新后的状态 status = get_cache_status() return status, "✅ Cache updated successfully" except Exception as e: return {"error": str(e)}, f"❌ Error updating cache: {str(e)}" def check_data_status(): try: status = get_data_manager_status() if status['initialized']: last_sync = status['last_sync'] if last_sync: sync_time = datetime.fromisoformat(last_sync) age_minutes = (datetime.now() - sync_time).total_seconds() / 60 message = f"✅ Data manager active (last sync: {age_minutes:.1f} min ago, {status['spaces_count']} spaces)" else: message = "⚠️ Data manager initialized but no sync recorded" else: message = "❌ Data manager not initialized" return status, message except Exception as e: return {"error": str(e)}, f"❌ Error checking data manager: {str(e)}" def force_sync(): try: result = force_data_sync() if 'error' in result: return result, "❌ Failed to sync data" else: # 获取更新后的状态 status = get_data_manager_status() return status, "✅ Data sync completed successfully" except Exception as e: return {"error": str(e)}, f"❌ Error syncing data: {str(e)}" def check_memory_status(): try: status = get_memory_allocation_status() active_count = status['active_count'] total_stored = status['total_stored'] if active_count > 0: message = f"🧠 {active_count} active allocations ({total_stored} total in memory)" else: message = "✅ No active memory allocations" return status, message except Exception as e: return {"error": str(e)}, f"❌ Error checking memory allocations: {str(e)}" def clear_memory(): try: result = clear_memory_allocations() if 'error' in result: return result, "❌ Failed to clear memory allocations" else: # 获取更新后的状态 status = get_memory_allocation_status() return status, f"✅ {result['status']}" except Exception as e: return {"error": str(e)}, f"❌ Error clearing memory: {str(e)}" # Cache tab event handlers cache_status_btn.click( fn=check_cache_status, outputs=[cache_info_display, cache_status_message] ) force_cache_update_btn.click( fn=force_update, outputs=[cache_info_display, cache_status_message] ) # Data tab event handlers data_status_btn.click( fn=check_data_status, outputs=[data_info_display, data_status_message] ) force_data_sync_btn.click( fn=force_sync, outputs=[data_info_display, data_status_message] ) # Memory tab event handlers memory_status_btn.click( fn=check_memory_status, outputs=[memory_info_display, memory_status_message] ) clear_memory_btn.click( fn=clear_memory, outputs=[memory_info_display, memory_status_message] ) return admin_interface # 修改后的主Gradio界面 def create_main_interface(): """创建主要的用户界面""" with gr.Blocks(title="CIV3283 Load Distributor") as interface: title_display = gr.Markdown("# 🔄 CIV3283 Learning Assistant Load Distributor", elem_id="title") content_display = gr.HTML("") # 主要的负载均衡逻辑 - 页面加载时执行 interface.load( fn=handle_user_access_cached, outputs=[title_display, content_display] ) return interface # 修改后的main函数 if __name__ == "__main__": import sys try: # 系统初始化 - 只在启动时执行一次 print("🚀 Initializing CIV3283 Load Distributor System...") init_system() print("✅ System initialization completed successfully") print("🧠 Using pure memory storage for allocations (no file I/O during user access)") # 检查是否需要管理界面 enable_admin = "--admin" in sys.argv or os.environ.get("ENABLE_ADMIN", "false").lower() == "true" if enable_admin: print("🔧 Starting with admin interface enabled") # 创建带管理界面的组合界面 main_interface = create_main_interface() admin_interface = create_admin_interface() # 使用TabbedInterface组合两个界面 combined_interface = gr.TabbedInterface( [main_interface, admin_interface], ["🎯 Load Distributor", "🔧 Admin Panel"], title="CIV3283 Load Distributor System" ) combined_interface.launch() else: print("🎯 Starting main interface only") # 只启动主界面 main_interface = create_main_interface() main_interface.launch() except Exception as e: print(f"❌ System initialization failed: {e}") import traceback print(f"Traceback: {traceback.format_exc()}") raise