# app.py - revision 404bee7 ("Update app.py"), committed by linxinhua
import gradio as gr
import os
import csv
from datetime import datetime, timedelta
from huggingface_hub import Repository
import threading
import time
import json
# Configuration
DATA_STORAGE_REPO = "CIV3283/Data_Storage"
DATA_BRANCH_NAME = "data_branch"
LOCAL_DATA_DIR = "temp_data_storage"
MIN_IDLE_MINUTES = 20 # Minimum idle time required for space assignment
ALLOCATION_LOCK_DURATION = 10 # Lock duration in minutes
# Cache configuration - everything is kept in memory
CACHE_UPDATE_INTERVAL = 300 # Activity-cache refresh interval in seconds (300 s = 5 minutes)
DATA_SYNC_INTERVAL = 300 # Data sync interval in seconds (300 s = 5 minutes)
# Guards reads and writes of SpaceActivityCache._cache_data
CACHE_LOCK = threading.Lock()
ALLOCATION_LOCK = threading.Lock()
# 全局内存存储
class MemoryAllocationStore:
    """Thread-safe, in-memory registry of short-lived space allocations.

    Each entry maps a space name to the student it was handed to, together
    with the allocation time and the moment the reservation lapses
    (``ALLOCATION_LOCK_DURATION`` minutes after it was made).
    """

    def __init__(self):
        # {space_name: {'student_id': str, 'allocated_time': datetime, 'expires_at': datetime}}
        self._allocations = {}
        self._lock = threading.Lock()
        self._cleanup_interval = 60  # seconds between background purges
        self._start_cleanup_thread()

    def _start_cleanup_thread(self):
        """Spawn the daemon thread that periodically purges expired entries."""

        def purge_forever():
            while True:
                time.sleep(self._cleanup_interval)
                self._cleanup_expired_allocations()

        worker = threading.Thread(target=purge_forever, daemon=True)
        worker.start()
        print("[MemoryAllocationStore] Background cleanup thread started")

    def _cleanup_expired_allocations(self):
        """Drop every allocation whose expiry time has been reached."""
        with self._lock:
            now = datetime.now()
            # Collect first, delete afterwards: the dict must not change
            # size while it is being iterated.
            stale_names = [
                name
                for name, info in self._allocations.items()
                if info['expires_at'] <= now
            ]
            for name in stale_names:
                del self._allocations[name]
                print(f"[MemoryAllocationStore] Cleaned up expired allocation: {name}")

    def add_allocation(self, space_name, student_id):
        """Record (or overwrite) the allocation of ``space_name`` to ``student_id``."""
        with self._lock:
            allocated_at = datetime.now()
            lapses_at = allocated_at + timedelta(minutes=ALLOCATION_LOCK_DURATION)
            record = {
                'student_id': student_id,
                'allocated_time': allocated_at,
                'expires_at': lapses_at
            }
            self._allocations[space_name] = record
            print(f"[MemoryAllocationStore] Added allocation: {space_name} -> {student_id} (expires at {lapses_at.strftime('%H:%M:%S')})")

    def get_active_allocations(self):
        """Return a ``{space_name: info}`` dict of the unexpired allocations.

        Every ``info`` value is a shallow copy, so callers may modify the
        result without touching the store.
        """
        with self._lock:
            now = datetime.now()
            return {
                name: info.copy()
                for name, info in self._allocations.items()
                if info['expires_at'] > now
            }

    def is_allocated(self, space_name):
        """Tell whether ``space_name`` currently holds an unexpired allocation.

        Returns:
            ``(True, student_id)`` when an unexpired allocation exists,
            otherwise ``(False, None)``. An expired record found along the
            way is removed.
        """
        with self._lock:
            info = self._allocations.get(space_name)
            if info is None:
                return False, None
            if info['expires_at'] > datetime.now():
                return True, info['student_id']
            # Expired: remove the record on the spot.
            del self._allocations[space_name]
            return False, None

    def get_status_summary(self):
        """Summarise the store.

        Returns:
            A dict with ``active_count`` (unexpired entries), ``total_stored``
            (all entries still held, expired or not) and ``summary`` (one
            human-readable line per unexpired entry).
        """
        with self._lock:
            now = datetime.now()
            lines = []
            for name, info in self._allocations.items():
                if info['expires_at'] <= now:
                    continue
                remaining_minutes = (info['expires_at'] - now).total_seconds() / 60
                lines.append(f"{name} -> {info['student_id']} ({remaining_minutes:.1f}min left)")
            return {
                'active_count': len(lines),
                'total_stored': len(self._allocations),
                'summary': lines
            }
# 全局数据管理器
class DataManager:
    """Owns the local clone of the data repository and the list of spaces.

    After ``initialize()`` succeeds, a daemon thread pulls the repository
    every ``DATA_SYNC_INTERVAL`` seconds and refreshes ``available_spaces``
    from the ``*_query_log.csv`` files found in ``LOCAL_DATA_DIR``.
    """

    def __init__(self):
        self.repo = None
        self.available_spaces = []
        self.last_data_sync = None  # datetime of the last successful sync
        self._sync_lock = threading.Lock()
        self._sync_thread = None
        self._stop_event = threading.Event()
        self.initialized = False

    def initialize(self):
        """Clone the repository, run a first sync and start background syncing.

        Intended to be called once at startup; repeated calls are no-ops.

        Returns:
            True on success (or when already initialised), False if anything
            raised during setup.
        """
        if self.initialized:
            return True
        try:
            print("[DataManager] Initializing data manager...")
            self.repo = Repository(
                local_dir=LOCAL_DATA_DIR,
                clone_from=DATA_STORAGE_REPO,
                revision=DATA_BRANCH_NAME,
                repo_type="space",
                use_auth_token=os.environ.get("HF_HUB_TOKEN")
            )
            # git_config_username_and_email takes (git_user, git_email).
            # The previous code called it twice with positional pairs, which
            # left user.name set to the literal string "git_email". Set both
            # values in a single keyword call instead.
            self.repo.git_config_username_and_email(
                git_user="load_distributor",
                git_email="loaddistributor@takeiteasy.space",
            )
            self._sync_data()
            self._start_background_sync()
            self.initialized = True
            print("[DataManager] Data manager initialized successfully")
            return True
        except Exception as e:
            print(f"[DataManager] Initialization failed: {e}")
            return False

    def _start_background_sync(self):
        """Start the background sync thread unless one is already running."""
        if self._sync_thread and self._sync_thread.is_alive():
            return
        self._stop_event.clear()
        self._sync_thread = threading.Thread(target=self._background_sync_worker, daemon=True)
        self._sync_thread.start()
        print("[DataManager] Background sync thread started")

    def _background_sync_worker(self):
        """Thread body: sync every ``DATA_SYNC_INTERVAL`` seconds until stopped."""
        while not self._stop_event.is_set():
            try:
                # wait() returns True only when the stop event was set.
                if self._stop_event.wait(timeout=DATA_SYNC_INTERVAL):
                    break
                print("[DataManager] Starting scheduled data sync...")
                self._sync_data()
                print("[DataManager] Scheduled data sync completed")
            except Exception as e:
                print(f"[DataManager] Error in background sync: {e}")
                # Back off for a minute before the next cycle; the loop
                # condition handles a stop request raised meanwhile.
                self._stop_event.wait(timeout=60)

    def _sync_data(self):
        """Pull the latest data and rebuild the available-space list.

        Failures are logged and swallowed; ``last_data_sync`` is only
        advanced when the whole sync succeeds.
        """
        with self._sync_lock:
            try:
                start_time = time.time()
                print("[DataManager] Syncing data from remote repository...")
                self.repo.git_pull(rebase=True)
                self.available_spaces = self._get_available_spaces()
                self.last_data_sync = datetime.now()
                elapsed_time = time.time() - start_time
                print(f"[DataManager] Data sync completed in {elapsed_time:.2f}s, found {len(self.available_spaces)} spaces")
            except Exception as e:
                print(f"[DataManager] Error syncing data: {e}")

    def _get_available_spaces(self):
        """Return the sorted space names derived from the query-log files.

        A file counts when its name ends in ``_query_log.csv`` and contains
        ``_Student_``; the space name is the file name without that suffix.
        Returns an empty list if the directory cannot be listed.
        """
        suffix = '_query_log.csv'
        try:
            names = {
                filename.replace(suffix, '')
                for filename in os.listdir(LOCAL_DATA_DIR)
                if filename.endswith(suffix) and '_Student_' in filename
            }
        except OSError as e:
            print(f"[DataManager] Error getting available spaces: {e}")
            return []
        return sorted(names)

    def get_available_spaces(self):
        """Return a copy of the space list without triggering a sync.

        Raises:
            Exception: if the manager has not been initialised.
        """
        if not self.initialized:
            raise Exception("DataManager not initialized")
        return self.available_spaces.copy()

    def get_repo(self):
        """Return the repository object.

        Raises:
            Exception: if the manager has not been initialised.
        """
        if not self.initialized:
            raise Exception("DataManager not initialized")
        return self.repo

    def get_repo_dir(self):
        """Return the local directory of the repository clone.

        Raises:
            Exception: if the manager has not been initialised.
        """
        if not self.initialized:
            raise Exception("DataManager not initialized")
        return LOCAL_DATA_DIR

    def stop(self):
        """Signal the background sync thread (if any) to stop."""
        if self._sync_thread:
            self._stop_event.set()
            print("[DataManager] Background sync thread stopping...")
class SpaceActivityCache:
    """In-memory cache of the last recorded activity of every space.

    The cache is rebuilt from the ``<space>_query_log.csv`` files in the data
    manager's repository directory, once at construction and then every
    ``CACHE_UPDATE_INTERVAL`` seconds by a daemon thread. ``_cache_data`` is
    always replaced wholesale (never mutated in place) under ``CACHE_LOCK``.
    """

    def __init__(self, data_manager):
        self.data_manager = data_manager
        self._cache_data = {}
        self._last_update = None
        self._update_thread = None
        self._stop_event = threading.Event()
        # Fill the cache immediately, then keep it fresh in the background.
        self._update_cache()
        self.start_background_updates()

    def start_background_updates(self):
        """Start the background refresh thread unless one is already running."""
        if self._update_thread and self._update_thread.is_alive():
            return
        self._stop_event.clear()
        self._update_thread = threading.Thread(target=self._background_update_worker, daemon=True)
        self._update_thread.start()
        print("[SpaceActivityCache] Background update thread started")

    def stop_background_updates(self):
        """Signal the background refresh thread (if any) to stop."""
        if self._update_thread:
            self._stop_event.set()
            print("[SpaceActivityCache] Background update thread stopping...")

    def _background_update_worker(self):
        """Thread body: refresh every ``CACHE_UPDATE_INTERVAL`` seconds until stopped."""
        while not self._stop_event.is_set():
            try:
                # wait() returns True only when the stop event was set.
                if self._stop_event.wait(timeout=CACHE_UPDATE_INTERVAL):
                    break
                print("[SpaceActivityCache] Starting scheduled cache update...")
                self._update_cache()
                print("[SpaceActivityCache] Scheduled cache update completed")
            except Exception as e:
                print(f"[SpaceActivityCache] Error in background update: {e}")
                # Back off for a minute before the next cycle.
                self._stop_event.wait(timeout=60)

    def _update_cache(self):
        """Rebuild the cache from the query-log files.

        Acquires ``CACHE_LOCK`` itself, so it must never be called while the
        caller already holds that (non-reentrant) lock. Errors are logged and
        swallowed, leaving the previous cache contents in place.
        """
        try:
            print("[SpaceActivityCache] Updating activity cache...")
            start_time = time.time()
            # Reads the data manager's current list; does not trigger a sync.
            available_spaces = self.data_manager.get_available_spaces()
            repo_dir = self.data_manager.get_repo_dir()
            new_cache_data = {
                'last_updated': datetime.now().isoformat(),
                'update_interval_seconds': CACHE_UPDATE_INTERVAL,
                'spaces': {}
            }
            for space_name in available_spaces:
                csv_file = os.path.join(repo_dir, f"{space_name}_query_log.csv")
                last_activity, status = self._get_last_activity_from_file(csv_file)
                new_cache_data['spaces'][space_name] = {
                    # datetime.min is the "no activity known" sentinel.
                    'last_activity': last_activity.isoformat() if last_activity != datetime.min else None,
                    'status': status,
                    'cache_update_time': datetime.now().isoformat()
                }
                print(f"[SpaceActivityCache] {space_name}: {status}")
            with CACHE_LOCK:
                self._cache_data = new_cache_data
                self._last_update = datetime.now()
            elapsed_time = time.time() - start_time
            print(f"[SpaceActivityCache] Cache updated in {elapsed_time:.2f}s, {len(available_spaces)} spaces processed")
        except Exception as e:
            print(f"[SpaceActivityCache] Error updating cache: {e}")
            import traceback
            print(f"[SpaceActivityCache] Traceback: {traceback.format_exc()}")

    def _get_last_activity_from_file(self, csv_file_path):
        """Read the timestamp of the last non-empty data row of a query log.

        The timestamp is taken from the third column and must have the form
        ``YYYY-MM-DD HH:MM:SS``.

        Returns:
            ``(timestamp, status)``. On any problem the timestamp is
            ``datetime.min`` and ``status`` names the reason
            (``file_not_found``, ``empty_or_header_only``, ``empty_file``,
            ``csv_parse_error``, ``no_data_rows``, ``date_parse_error``,
            ``invalid_row_format`` or ``read_error``).
        """
        try:
            if not os.path.exists(csv_file_path):
                return datetime.min, "file_not_found"
            # Files of at most 100 bytes are treated as holding no data rows.
            if os.path.getsize(csv_file_path) <= 100:
                return datetime.min, "empty_or_header_only"
            with open(csv_file_path, 'r', encoding='utf-8') as f:
                csv_reader = csv.reader(f)
                try:
                    next(csv_reader)  # skip the header row
                except StopIteration:
                    return datetime.min, "empty_file"
                # Only the final non-empty row matters, so keep just that
                # one instead of accumulating the whole file in memory.
                last_row = None
                try:
                    for row in csv_reader:
                        if row:
                            last_row = row
                except Exception as csv_error:
                    print(f"[SpaceActivityCache] CSV parsing error for {csv_file_path}: {csv_error}")
                    return datetime.min, "csv_parse_error"
            if last_row is None:
                return datetime.min, "no_data_rows"
            if len(last_row) < 3:
                return datetime.min, "invalid_row_format"
            timestamp_str = last_row[2].strip()
            try:
                parsed_time = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
            except ValueError as ve:
                print(f"[SpaceActivityCache] Date parsing error for '{timestamp_str}': {ve}")
                return datetime.min, "date_parse_error"
            return parsed_time, f"active_last_at_{parsed_time.strftime('%Y-%m-%d_%H:%M:%S')}"
        except Exception as e:
            print(f"[SpaceActivityCache] Error reading {csv_file_path}: {e}")
            return datetime.min, "read_error"

    def _ensure_cache_loaded(self):
        """Rebuild the cache if it is empty.

        The emptiness check is done under ``CACHE_LOCK`` but the rebuild runs
        after the lock is released: ``_update_cache`` takes the same
        non-reentrant lock, so calling it while holding the lock deadlocks.
        """
        with CACHE_LOCK:
            cache_ready = bool(self._cache_data) and 'spaces' in self._cache_data
        if not cache_ready:
            print("[SpaceActivityCache] No cache available, updating immediately...")
            self._update_cache()

    @staticmethod
    def _parse_cached_timestamp(value):
        """Turn a cached ISO timestamp (or None) into a datetime; datetime.min if unusable."""
        if not value:
            return datetime.min
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            return datetime.min

    def get_space_activity(self, space_name):
        """Return ``(last_activity, status)`` for one space.

        ``last_activity`` is ``datetime.min`` when nothing is known; a space
        absent from the cache yields ``(datetime.min, "not_in_cache")``.
        """
        self._ensure_cache_loaded()
        with CACHE_LOCK:
            space_info = self._cache_data.get('spaces', {}).get(space_name, {})
        if not space_info:
            return datetime.min, "not_in_cache"
        last_activity = self._parse_cached_timestamp(space_info.get('last_activity'))
        return last_activity, space_info.get('status', 'unknown')

    def get_all_spaces_activity(self):
        """Return ``{space_name: {'last_activity': datetime, 'status': str}}`` for all cached spaces."""
        self._ensure_cache_loaded()
        with CACHE_LOCK:
            spaces_data = self._cache_data.get('spaces', {})
        return {
            space_name: {
                'last_activity': self._parse_cached_timestamp(space_info.get('last_activity')),
                'status': space_info.get('status', 'unknown')
            }
            for space_name, space_info in spaces_data.items()
        }

    def get_cache_info(self):
        """Describe the cache: last update time, age in minutes, size and freshness.

        The cache counts as fresh while its age is below 1.5 refresh
        intervals. When no usable cache exists, the result has
        ``last_updated=None``, an infinite age and ``is_fresh=False``.
        """
        with CACHE_LOCK:
            if self._cache_data and 'last_updated' in self._cache_data:
                try:
                    last_updated = datetime.fromisoformat(self._cache_data['last_updated'])
                except (TypeError, ValueError):
                    last_updated = None
                if last_updated is not None:
                    age_minutes = (datetime.now() - last_updated).total_seconds() / 60
                    return {
                        'last_updated': last_updated,
                        'age_minutes': age_minutes,
                        'spaces_count': len(self._cache_data.get('spaces', {})),
                        'is_fresh': age_minutes < (CACHE_UPDATE_INTERVAL / 60) * 1.5
                    }
            return {
                'last_updated': None,
                'age_minutes': float('inf'),
                'spaces_count': 0,
                'is_fresh': False
            }

    def force_update(self):
        """Rebuild the cache right now."""
        print("[SpaceActivityCache] Force updating cache...")
        self._update_cache()
# 新增:本地防撞车机制
class LocalAllocationTracker:
    """Short-lived local record of which space was just handed to whom.

    Entries live for ``_allocation_ttl`` seconds. Expired entries are removed
    lazily by ``is_recently_allocated_locally`` and periodically by a daemon
    thread that runs every ``_cleanup_interval`` seconds.
    """

    def __init__(self):
        # {space_name: {'student_id': str, 'timestamp': datetime}}
        self._recent_allocations = {}
        self._lock = threading.Lock()
        self._cleanup_interval = 600  # seconds between background purges
        self._allocation_ttl = 60  # seconds a local allocation stays valid
        self._start_cleanup_thread()

    def _start_cleanup_thread(self):
        """Spawn the daemon thread that periodically purges expired entries."""

        def cleanup_worker():
            while True:
                time.sleep(self._cleanup_interval)
                self._cleanup_expired_allocations()

        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
        print("[LocalAllocationTracker] Background cleanup thread started")

    def _is_expired(self, alloc_info, current_time):
        """Return True when ``alloc_info`` is older than the TTL at ``current_time``."""
        age_seconds = (current_time - alloc_info['timestamp']).total_seconds()
        return age_seconds > self._allocation_ttl

    def _cleanup_expired_allocations(self):
        """Remove every entry older than the TTL."""
        with self._lock:
            current_time = datetime.now()
            # Collect first, delete afterwards: the dict must not change
            # size while it is being iterated.
            expired_spaces = [
                space_name
                for space_name, alloc_info in self._recent_allocations.items()
                if self._is_expired(alloc_info, current_time)
            ]
            for space_name in expired_spaces:
                del self._recent_allocations[space_name]
                print(f"[LocalAllocationTracker] Cleaned up expired allocation: {space_name}")

    def is_recently_allocated_locally(self, space_name):
        """Tell whether ``space_name`` was allocated locally within the TTL.

        Returns:
            ``(True, student_id)`` for a live entry, otherwise
            ``(False, None)``. An expired entry found along the way is removed.
        """
        with self._lock:
            alloc_info = self._recent_allocations.get(space_name)
            if alloc_info is None:
                return False, None
            elapsed_seconds = (datetime.now() - alloc_info['timestamp']).total_seconds()
            if elapsed_seconds > self._allocation_ttl:
                del self._recent_allocations[space_name]
                print(f"[LocalAllocationTracker] Expired local allocation removed: {space_name}")
                return False, None
            print(f"[LocalAllocationTracker] Space {space_name} recently allocated locally to {alloc_info['student_id']} ({elapsed_seconds:.1f}s ago)")
            return True, alloc_info['student_id']

    def record_local_allocation(self, space_name, student_id):
        """Record (or overwrite) a local allocation stamped with the current time."""
        with self._lock:
            self._recent_allocations[space_name] = {
                'student_id': student_id,
                'timestamp': datetime.now()
            }
            print(f"[LocalAllocationTracker] Locally recorded allocation: {space_name} -> {student_id}")

    def get_recent_allocations_summary(self):
        """Return one human-readable line per live local allocation (for debugging).

        Entries past the TTL are skipped: the background purge runs far less
        often than the TTL, so expired records may still be stored and would
        otherwise be reported as "recent".
        """
        with self._lock:
            current_time = datetime.now()
            summary = []
            for space_name, alloc_info in self._recent_allocations.items():
                if self._is_expired(alloc_info, current_time):
                    continue
                elapsed = (current_time - alloc_info['timestamp']).total_seconds()
                summary.append(f"{space_name} -> {alloc_info['student_id']} ({elapsed:.1f}s ago)")
            return summary
# 全局实例
# Module-level singletons shared by every request handler.
data_manager = DataManager()
# Stays None until init_system() builds the SpaceActivityCache.
activity_cache = None
# Both constructors below start a daemon cleanup thread as a side effect.
local_tracker = LocalAllocationTracker()
memory_allocation_store = MemoryAllocationStore()
def init_system():
    """Bring up the data manager and the activity cache; call once at startup.

    Raises:
        Exception: if the data manager fails to initialise.
    """
    global activity_cache

    print("[init_system] Initializing load distributor system...")

    # The activity cache reads from the data manager, so that comes up first.
    manager_ready = data_manager.initialize()
    if not manager_ready:
        raise Exception("Failed to initialize data manager")

    activity_cache = SpaceActivityCache(data_manager)
    print("[init_system] System initialization completed")
def analyze_space_activity_cached():
    """Build a per-space activity report from in-memory data only.

    Combines the activity cache, the in-memory allocation store and the local
    allocation tracker; no file is read and no data sync is triggered.

    Returns:
        A list with one dict per available space (keys ``space_name``,
        ``last_activity``, ``last_activity_str``, ``idle_minutes``,
        ``status``, ``is_recently_allocated_memory``,
        ``is_recently_allocated_local``, ``cached_status``), ordered from
        most idle to least idle.

    Raises:
        Exception: if the activity cache has not been created yet.
    """
    if activity_cache is None:
        raise Exception("Activity cache not initialized")

    space_names = data_manager.get_available_spaces()
    now = datetime.now()

    cache_info = activity_cache.get_cache_info()
    print(f"[analyze_space_activity_cached] Using cache (age: {cache_info['age_minutes']:.1f} min, fresh: {cache_info['is_fresh']})")

    cached_activity = activity_cache.get_all_spaces_activity()
    active_allocations = memory_allocation_store.get_active_allocations()
    print(f"[analyze_space_activity_cached] Analyzing {len(space_names)} spaces using cached data...")
    print(f"[analyze_space_activity_cached] Memory allocations: {len(active_allocations)} active")

    report = []
    for space_name in space_names:
        entry = cached_activity.get(space_name)
        if entry:
            last_activity, cached_status = entry['last_activity'], entry['status']
        else:
            # Not cached (yet), e.g. a space that appeared after the last refresh.
            last_activity, cached_status = datetime.min, "not_in_cache"

        never_used = (
            last_activity == datetime.min
            or 'empty' in cached_status
            or 'not_found' in cached_status
        )
        if never_used:
            idle_minutes = float('inf')
            status = "Never used"
            last_activity_str = "Never"
        else:
            idle_minutes = (now - last_activity).total_seconds() / 60
            status = f"Idle for {idle_minutes:.1f} minutes (cached)"
            last_activity_str = last_activity.strftime('%Y-%m-%d %H:%M:%S')

        # Allocation recorded in the in-memory store?
        held_in_memory = space_name in active_allocations
        if held_in_memory:
            holder = active_allocations[space_name]
            minutes_until_free = (holder['expires_at'] - now).total_seconds() / 60
            status += f" (Allocated in memory to {holder['student_id']}, free in {minutes_until_free:.1f} min)"

        # Allocation recorded by the local tracker?
        held_locally, local_student = local_tracker.is_recently_allocated_locally(space_name)
        if held_locally:
            status += f" (Recently allocated locally to {local_student})"

        report.append({
            'space_name': space_name,
            'last_activity': last_activity,
            'last_activity_str': last_activity_str,
            'idle_minutes': idle_minutes,
            'status': status,
            'is_recently_allocated_memory': held_in_memory,
            'is_recently_allocated_local': held_locally,
            'cached_status': cached_status
        })
        print(f"[analyze_space_activity_cached] {space_name}: {status}")

    # Most idle first.
    return sorted(report, key=lambda item: item['idle_minutes'], reverse=True)
def create_status_display(space_activity):
    """Render the status of all spaces as ``<br>``-separated markup.

    The text starts with the active in-memory allocations and the local
    allocations (each section only when non-empty), followed by one numbered
    entry per item of ``space_activity``.
    """
    indent = "&nbsp;&nbsp;&nbsp;• "
    parts = ["📊 **Current Space Status (sorted by availability):**<br><br>"]

    # In-memory allocation summary.
    memory_summary = memory_allocation_store.get_status_summary()
    if memory_summary['active_count'] > 0:
        parts.append(f"🧠 **Memory Allocations ({memory_summary['active_count']} active):**<br>")
        parts.extend(f"{indent}{line}<br>" for line in memory_summary['summary'])
        parts.append("<br>")

    # Local allocation summary.
    local_summary = local_tracker.get_recent_allocations_summary()
    if local_summary:
        parts.append("🔒 **Recent Local Allocations:**<br>")
        parts.extend(f"{indent}{line}<br>" for line in local_summary)
        parts.append("<br>")

    for position, space in enumerate(space_activity, 1):
        parts.append(f"{position}. **{space['space_name']}**<br>")
        parts.append(f"{indent}Status: {space['status']}<br>")
        parts.append(f"{indent}Last activity: {space['last_activity_str']}<br><br>")

    return "".join(parts)
def select_space_with_enhanced_collision_avoidance_cached(space_activity, student_id):
    """Pick the most idle free space for ``student_id`` and reserve it.

    Args:
        space_activity: report produced by ``analyze_space_activity_cached``,
            already ordered from most idle to least idle.
        student_id: identifier taken from the request URL (untrusted).

    Returns:
        The component returned by ``redirect_to_space`` for the chosen space.

    Raises:
        gr.Error: when no space passes the basic filter, or when every
            candidate turns out to be taken at reservation time.
    """
    from urllib.parse import quote

    log_prefix = "[select_space_with_enhanced_collision_avoidance_cached]"
    print(f"{log_prefix} Starting selection for student: {student_id}")

    # Step 1: keep only spaces that are idle long enough and unreserved
    # according to the (possibly slightly stale) activity snapshot.
    basic_available_spaces = [
        space for space in space_activity
        if space['idle_minutes'] >= MIN_IDLE_MINUTES
        and not space['is_recently_allocated_memory']
        and not space['is_recently_allocated_local']
    ]
    print(f"{log_prefix} Basic available spaces: {len(basic_available_spaces)}")

    if not basic_available_spaces:
        # Explain which conditions blocked the assignment.
        idle_spaces = [s for s in space_activity if s['idle_minutes'] >= MIN_IDLE_MINUTES]
        memory_allocated = [s for s in space_activity if s['is_recently_allocated_memory']]
        local_allocated = [s for s in space_activity if s['is_recently_allocated_local']]
        error_parts = []
        if not idle_spaces:
            error_parts.append(f"all spaces used within {MIN_IDLE_MINUTES} minutes")
        if memory_allocated:
            error_parts.append(f"{len(memory_allocated)} spaces allocated in memory")
        if local_allocated:
            error_parts.append(f"{len(local_allocated)} spaces locally allocated")
        error_msg = (
            f"🚫 **All learning assistants are currently busy**\n\n"
            f"Blocking conditions: {', '.join(error_parts)}\n\n"
            f"**Please try again in 1-2 minutes.**"
        )
        print(f"{log_prefix} No available spaces: {error_msg}")
        raise gr.Error(error_msg, duration=10)

    # Steps 2-5 run under ALLOCATION_LOCK so that re-checking a candidate and
    # recording its reservation form one atomic step. Without the lock, two
    # concurrent requests could both see the same space as free and both
    # reserve it.
    with ALLOCATION_LOCK:
        selected_space = None
        collision_seen = False
        for candidate in basic_available_spaces:
            candidate_name = candidate['space_name']
            if not collision_seen:
                print(f"{log_prefix} Preliminary selection: {candidate_name}")
            # Re-check against live state: the snapshot may predate a
            # reservation made by another request.
            taken, holder = memory_allocation_store.is_allocated(candidate_name)
            if not taken:
                taken, holder = local_tracker.is_recently_allocated_locally(candidate_name)
            if taken:
                collision_seen = True
                print(f"{log_prefix} COLLISION DETECTED! {candidate_name} recently allocated to {holder}")
                continue
            selected_space = candidate
            break

        if selected_space is None:
            error_msg = (
                f"🚫 **Collision detected and no alternatives available**\n\n"
                f"The system detected a potential conflict with another student's allocation.\n\n"
                f"**Please try again in 10-15 seconds.**"
            )
            print(f"{log_prefix} No alternatives available")
            raise gr.Error(error_msg, duration=8)

        space_name = selected_space['space_name']
        if collision_seen:
            print(f"{log_prefix} Using alternative space: {space_name}")

        # Record locally first, then in the in-memory store.
        local_tracker.record_local_allocation(space_name, student_id)
        print(f"{log_prefix} Local allocation recorded BEFORE memory write")
        memory_allocation_store.add_allocation(space_name, student_id)
        print(f"{log_prefix} Allocation recorded in memory only")

    # Step 6: build the result.
    status_display = create_status_display_with_cache_info(space_activity)
    # student_id comes straight from the query string, so percent-encode it
    # before placing it into the redirect URL.
    encoded_student_id = quote(str(student_id), safe='')
    redirect_url = f"https://huggingface.co/spaces/CIV3283/{space_name}/?check={encoded_student_id}"
    print(f"{log_prefix} Final allocation: {space_name} -> {student_id}")
    return redirect_to_space(redirect_url, selected_space, status_display)
def redirect_to_space(redirect_url, selected_space, status_display):
    """Build the "assistant assigned" page with a manual link to the space.

    Args:
        redirect_url: URL of the assigned space; may contain request-derived
            text, so it is HTML-escaped before being placed in the ``href``.
        selected_space: report entry of the chosen space; ``space_name`` and
            ``idle_minutes`` are read.
        status_display: accepted for interface compatibility; not rendered.

    Returns:
        A ``gr.HTML`` component holding the page.
    """
    from html import escape

    if selected_space['idle_minutes'] == float('inf'):
        idle_info = "Never used (completely fresh)"
    else:
        idle_info = f"{selected_space['idle_minutes']:.1f} minutes"

    # Escape everything interpolated into markup so that a crafted value
    # cannot break out of the attribute or inject elements.
    safe_url = escape(str(redirect_url), quote=True)
    safe_space_name = escape(str(selected_space['space_name']), quote=True)

    # Only the access section is shown; the analysis section was removed.
    redirect_html = f"""
<div style="max-width: 900px; margin: 0 auto; padding: 20px; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;">
<div style="text-align: center; margin-bottom: 30px; padding: 25px; background: linear-gradient(135deg, #28a745, #20c997); color: white; border-radius: 12px; box-shadow: 0 4px 6px rgba(0,0,0,0.1);">
<h1 style="margin: 0 0 15px 0; font-size: 28px;">🎯 Learning Assistant Assigned</h1>
<h2 style="margin: 0 0 10px 0; font-weight: normal; font-size: 24px;">{safe_space_name}</h2>
<p style="margin: 0; font-size: 18px; opacity: 0.9;">
✨ This space was idle for: <strong>{idle_info}</strong>
</p>
</div>
<div style="text-align: center; margin-bottom: 30px; padding: 25px; background: linear-gradient(135deg, #2196f3, #1976d2); color: white; border-radius: 12px; box-shadow: 0 4px 6px rgba(33,150,243,0.3);">
<h2 style="margin-top: 0; color: white;">🚀 Access Your Learning Assistant</h2>
<p style="color: rgba(255,255,255,0.9); font-size: 16px; margin-bottom: 25px;">
Click the button below to access your assigned learning assistant.
</p>
<a href="{safe_url}"
target="_blank"
style="display: inline-block; background: rgba(255,255,255,0.15); color: white;
padding: 15px 30px; font-size: 18px; font-weight: bold; text-decoration: none;
border-radius: 25px; border: 2px solid rgba(255,255,255,0.3);
transition: all 0.3s ease; margin-bottom: 20px;"
onmouseover="this.style.background='rgba(255,255,255,0.25)'; this.style.transform='translateY(-2px)'"
onmouseout="this.style.background='rgba(255,255,255,0.15)'; this.style.transform='translateY(0px)'">
➤ Open Learning Assistant
</a>
<p style="margin: 15px 0 0 0; color: rgba(255,255,255,0.8); font-size: 14px;">
💡 Left-click the button above or right-click it and select "Open in new tab"
</p>
</div>
<div style="text-align: center; padding: 20px; background: #f1f3f4; border-radius: 8px; margin-top: 20px;">
<p style="margin: 0; color: #5f6368; font-size: 14px;">
🔄 Need a different assistant? <a href="javascript:location.reload()" style="color: #1976d2; text-decoration: none;">Refresh this page</a> to get reassigned.
</p>
</div>
</div>
"""
    return gr.HTML(redirect_html)
def load_balance_user_cached(student_id):
    """Assign a space to ``student_id`` using cached, in-memory data only.

    Raises:
        gr.Error: if the data manager has not been initialised, or if the
            selection step cannot find a free space.
    """
    print(f"[load_balance_user_cached] Starting cached load balancing for student ID: {student_id}")

    # Nothing can be analysed before the system has been initialised.
    if not data_manager.initialized:
        raise gr.Error("🚫 System not properly initialized. Please contact administrator.", duration=8)

    # Cached analysis (no data sync), then collision-aware selection.
    activity_report = analyze_space_activity_cached()
    return select_space_with_enhanced_collision_avoidance_cached(activity_report, student_id)
def get_url_params(request: gr.Request):
    """Return ``(title, check_id)`` extracted from the request's query string.

    ``check_id`` is the value of the ``check`` parameter, or None when the
    parameter is missing or empty. The title is always "Load Distributor".
    """
    params = dict(request.query_params)
    student_id = params.get('check', None)
    # A missing or empty value is reported as None.
    return "Load Distributor", (student_id if student_id else None)
def create_status_display_with_cache_info(space_activity):
    """Render the status of all spaces, including cache details, as ``<br>``-separated markup.

    Sections, in order: cache freshness (only when the activity cache
    exists), active in-memory allocations and local allocations (each only
    when non-empty), then one numbered entry per item of ``space_activity``.
    """
    indent = "&nbsp;&nbsp;&nbsp;• "
    parts = ["📊 **Current Space Status (sorted by availability):**<br><br>"]

    # Cache freshness.
    cache_info = activity_cache.get_cache_info() if activity_cache else None
    if cache_info:
        marker = "✅ Fresh" if cache_info['is_fresh'] else "⚠️ Stale"
        cache_status = f"{marker} (updated {cache_info['age_minutes']:.1f} min ago)"
        parts.append(f"🔄 **Cache Status:** {cache_status}<br>")
        parts.append(f"📋 **Cached Spaces:** {cache_info['spaces_count']}<br><br>")

    # In-memory allocation summary.
    memory_summary = memory_allocation_store.get_status_summary()
    if memory_summary['active_count'] > 0:
        parts.append(f"🧠 **Memory Allocations ({memory_summary['active_count']} active):**<br>")
        parts.extend(f"{indent}{line}<br>" for line in memory_summary['summary'])
        parts.append("<br>")

    # Local allocation summary.
    local_summary = local_tracker.get_recent_allocations_summary()
    if local_summary:
        parts.append("🔒 **Recent Local Allocations:**<br>")
        parts.extend(f"{indent}{line}<br>" for line in local_summary)
        parts.append("<br>")

    for position, space in enumerate(space_activity, 1):
        parts.append(f"{position}. **{space['space_name']}**<br>")
        parts.append(f"{indent}Status: {space['status']}<br>")
        parts.append(f"{indent}Last activity: {space['last_activity_str']}<br>")
        # The raw cache status is optional.
        if 'cached_status' in space:
            parts.append(f"{indent}Cache: {space['cached_status']}<br>")
        parts.append("<br>")

    return "".join(parts)
def handle_user_access_cached(request: gr.Request):
    """Handle a page load: assign a space or show an explanatory page.

    Returns:
        ``(title, component)`` where ``component`` is a ``gr.HTML`` page:
        the assignment page on success, an "Invalid Access" page when the
        ``check`` parameter is missing, or an error page when load balancing
        raised.
    """
    from html import escape

    title, check_id = get_url_params(request)
    if not check_id:
        # No student ID provided.
        error_html = """
<div style="max-width: 600px; margin: 50px auto; padding: 30px; text-align: center;
background: #fff3cd; border: 1px solid #ffeaa7; border-radius: 12px;">
<h2 style="color: #856404;">⚠️ Invalid Access</h2>
<p style="color: #856404; font-size: 16px; line-height: 1.6;">
This load distributor requires a valid student ID parameter.<br><br>
<strong>Please access this system through the official link provided in Moodle.</strong>
</p>
</div>
"""
        return title, gr.HTML(error_html)

    # Valid student ID: perform cached load balancing.
    try:
        result = load_balance_user_cached(check_id)
        return title, result
    except Exception as e:
        # The message is rendered as raw HTML, so escape it: exception text
        # may carry request-derived or otherwise unexpected characters.
        safe_message = escape(str(e))
        error_html = f"""
<div style="max-width: 600px; margin: 50px auto; padding: 30px; text-align: center;
background: #f8d7da; border: 1px solid #f5c6cb; border-radius: 12px;">
<h2 style="color: #721c24;">🚫 Load Balancing Error</h2>
<p style="color: #721c24; font-size: 16px; line-height: 1.6;">
{safe_message}<br><br>
Please try again in a few moments or contact your instructor if the problem persists.
</p>
</div>
"""
        return title, gr.HTML(error_html)
# 管理功能
def get_cache_status():
    """Return the activity cache's info dict, or an error dict when no cache exists."""
    if not activity_cache:
        return {"error": "Cache not initialized"}
    return activity_cache.get_cache_info()
def force_cache_update():
    """Rebuild the activity cache now; returns a status dict, or an error dict when no cache exists."""
    if not activity_cache:
        return {"error": "Cache not initialized"}
    activity_cache.force_update()
    return {"status": "Cache updated"}
def get_data_manager_status():
    """Return a snapshot of the data manager's state.

    ``last_sync`` is an ISO-8601 string, or None when no sync has been
    recorded.
    """
    last_sync = data_manager.last_data_sync
    if last_sync:
        last_sync_text = last_sync.isoformat()
    else:
        last_sync_text = None
    return {
        "initialized": data_manager.initialized,
        "last_sync": last_sync_text,
        "spaces_count": len(data_manager.available_spaces),
        "repo_connected": data_manager.repo is not None
    }
def force_data_sync():
    """Run a data sync now and report whether it actually succeeded.

    ``DataManager._sync_data`` logs and swallows its own errors, so an
    exception never reaches this function. Success is therefore detected by
    checking that ``last_data_sync``, which is only advanced by a successful
    sync, has changed.

    Returns:
        ``{"status": ...}`` on success, ``{"error": ...}`` otherwise.
    """
    try:
        previous_sync = data_manager.last_data_sync
        data_manager._sync_data()
        if data_manager.last_data_sync is None or data_manager.last_data_sync == previous_sync:
            return {"error": "Data sync failed (see server log for details)"}
        return {"status": "Data sync completed"}
    except Exception as e:
        return {"error": str(e)}
def get_memory_allocation_status():
    """Return the in-memory allocation store's status summary."""
    summary = memory_allocation_store.get_status_summary()
    return summary
def clear_memory_allocations():
    """Remove every in-memory allocation record (admin function).

    Returns:
        ``{"status": ...}`` stating how many records were cleared, or
        ``{"error": ...}`` if something raised.
    """
    store = memory_allocation_store
    try:
        # Take the store's own lock so the count and the clear happen together.
        with store._lock:
            cleared_count = len(store._allocations)
            store._allocations.clear()
    except Exception as e:
        return {"error": str(e)}
    return {"status": f"Cleared {cleared_count} memory allocations"}
# 添加可选的管理界面(用于调试)
def create_admin_interface():
    """Build the optional admin/debug panel.

    The panel has three tabs (cache, data manager, memory allocations). Each
    tab has a "check" and an "act" button whose handlers return
    ``(json_payload, markdown_message)`` for that tab's JSON and Markdown
    components.

    Returns:
        The ``gr.Blocks`` admin interface.
    """

    # --- handlers: each returns (json_payload, markdown_message) ---

    def check_cache_status():
        try:
            status = get_cache_status()
            if 'error' in status:
                return status, "❌ Cache not available"
            age_min = status.get('age_minutes', 0)
            spaces_count = status.get('spaces_count', 0)
            if status.get('is_fresh', False):
                return status, f"✅ Cache is fresh ({age_min:.1f} min old, {spaces_count} spaces)"
            return status, f"⚠️ Cache is stale ({age_min:.1f} min old, {spaces_count} spaces)"
        except Exception as e:
            return {"error": str(e)}, f"❌ Error checking cache: {str(e)}"

    def force_update():
        try:
            result = force_cache_update()
            if 'error' in result:
                return result, "❌ Failed to update cache"
            # Show the state after the update.
            return get_cache_status(), "✅ Cache updated successfully"
        except Exception as e:
            return {"error": str(e)}, f"❌ Error updating cache: {str(e)}"

    def check_data_status():
        try:
            status = get_data_manager_status()
            if not status['initialized']:
                message = "❌ Data manager not initialized"
            elif not status['last_sync']:
                message = "⚠️ Data manager initialized but no sync recorded"
            else:
                sync_time = datetime.fromisoformat(status['last_sync'])
                age_minutes = (datetime.now() - sync_time).total_seconds() / 60
                message = f"✅ Data manager active (last sync: {age_minutes:.1f} min ago, {status['spaces_count']} spaces)"
            return status, message
        except Exception as e:
            return {"error": str(e)}, f"❌ Error checking data manager: {str(e)}"

    def force_sync():
        try:
            result = force_data_sync()
            if 'error' in result:
                return result, "❌ Failed to sync data"
            # Show the state after the sync.
            return get_data_manager_status(), "✅ Data sync completed successfully"
        except Exception as e:
            return {"error": str(e)}, f"❌ Error syncing data: {str(e)}"

    def check_memory_status():
        try:
            status = get_memory_allocation_status()
            active_count = status['active_count']
            total_stored = status['total_stored']
            if active_count > 0:
                return status, f"🧠 {active_count} active allocations ({total_stored} total in memory)"
            return status, "✅ No active memory allocations"
        except Exception as e:
            return {"error": str(e)}, f"❌ Error checking memory allocations: {str(e)}"

    def clear_memory():
        try:
            result = clear_memory_allocations()
            if 'error' in result:
                return result, "❌ Failed to clear memory allocations"
            # Show the state after clearing.
            return get_memory_allocation_status(), f"✅ {result['status']}"
        except Exception as e:
            return {"error": str(e)}, f"❌ Error clearing memory: {str(e)}"

    # --- layout and wiring ---

    with gr.Blocks(title="CIV3283 Load Distributor - Admin") as admin_interface:
        gr.Markdown("# 🔧 CIV3283 Load Distributor - Admin Panel")

        with gr.Tab("Cache Management"):
            with gr.Row():
                cache_status_btn = gr.Button("📊 Check Cache Status", variant="secondary")
                force_cache_update_btn = gr.Button("🔄 Force Cache Update", variant="primary")
            cache_info_display = gr.JSON(label="Cache Information")
            cache_status_message = gr.Markdown("")

        with gr.Tab("Data Management"):
            with gr.Row():
                data_status_btn = gr.Button("📊 Check Data Manager Status", variant="secondary")
                force_data_sync_btn = gr.Button("🔄 Force Data Sync", variant="primary")
            data_info_display = gr.JSON(label="Data Manager Information")
            data_status_message = gr.Markdown("")

        with gr.Tab("Memory Allocation"):
            with gr.Row():
                memory_status_btn = gr.Button("🧠 Check Memory Allocations", variant="secondary")
                clear_memory_btn = gr.Button("🗑️ Clear All Memory Allocations", variant="stop")
            memory_info_display = gr.JSON(label="Memory Allocation Information")
            memory_status_message = gr.Markdown("")

        cache_outputs = [cache_info_display, cache_status_message]
        data_outputs = [data_info_display, data_status_message]
        memory_outputs = [memory_info_display, memory_status_message]

        # Cache tab
        cache_status_btn.click(fn=check_cache_status, outputs=cache_outputs)
        force_cache_update_btn.click(fn=force_update, outputs=cache_outputs)
        # Data tab
        data_status_btn.click(fn=check_data_status, outputs=data_outputs)
        force_data_sync_btn.click(fn=force_sync, outputs=data_outputs)
        # Memory tab
        memory_status_btn.click(fn=check_memory_status, outputs=memory_outputs)
        clear_memory_btn.click(fn=clear_memory, outputs=memory_outputs)

    return admin_interface
# 修改后的主Gradio界面
def create_main_interface():
    """Build the student-facing interface.

    The page holds a title and an HTML area; load balancing runs on page
    load through ``handle_user_access_cached``, which fills both.

    Returns:
        The ``gr.Blocks`` main interface.
    """
    with gr.Blocks(title="CIV3283 Load Distributor") as interface:
        title_display = gr.Markdown("# 🔄 CIV3283 Learning Assistant Load Distributor", elem_id="title")
        content_display = gr.HTML("")
        page_outputs = [title_display, content_display]
        # Runs once per page load.
        interface.load(fn=handle_user_access_cached, outputs=page_outputs)
    return interface
# 修改后的main函数
if __name__ == "__main__":
    import sys
    import traceback

    try:
        # One-time system initialisation.
        print("🚀 Initializing CIV3283 Load Distributor System...")
        init_system()
        print("✅ System initialization completed successfully")
        print("🧠 Using pure memory storage for allocations (no file I/O during user access)")

        # The admin panel is opt-in: --admin on the command line, or
        # ENABLE_ADMIN=true in the environment.
        admin_flag_given = "--admin" in sys.argv
        enable_admin = admin_flag_given or os.environ.get("ENABLE_ADMIN", "false").lower() == "true"

        if not enable_admin:
            print("🎯 Starting main interface only")
            main_interface = create_main_interface()
            main_interface.launch()
        else:
            print("🔧 Starting with admin interface enabled")
            main_interface = create_main_interface()
            admin_interface = create_admin_interface()
            # Present both interfaces as tabs of a single app.
            combined_interface = gr.TabbedInterface(
                [main_interface, admin_interface],
                ["🎯 Load Distributor", "🔧 Admin Panel"],
                title="CIV3283 Load Distributor System"
            )
            combined_interface.launch()
    except Exception as e:
        print(f"❌ System initialization failed: {e}")
        print(f"Traceback: {traceback.format_exc()}")
        raise