import gradio as gr
import os
import csv
from datetime import datetime, timedelta
from huggingface_hub import Repository
import threading
import time
import json
# Configuration
# Repository that DataManager.initialize() clones (as a "space" repo) and the
# branch/revision it checks out.
DATA_STORAGE_REPO = "CIV3283/Data_Storage"
DATA_BRANCH_NAME = "data_branch"
# Local directory the repository is cloned into; the per-space
# "*_query_log.csv" files are read from here.
LOCAL_DATA_DIR = "temp_data_storage"
MIN_IDLE_MINUTES = 20 # Minimum idle time required for space assignment
ALLOCATION_LOCK_DURATION = 10 # Lock duration in minutes
# Cache configuration - everything is kept in memory
CACHE_UPDATE_INTERVAL = 300 # Seconds between scheduled activity-cache refreshes (300 s = 5 minutes)
DATA_SYNC_INTERVAL = 300 # Seconds between scheduled data syncs (300 s = 5 minutes)
# Guards SpaceActivityCache._cache_data.
CACHE_LOCK = threading.Lock()
# NOTE(review): not referenced anywhere in the visible part of this file.
ALLOCATION_LOCK = threading.Lock()
# Global in-memory storage
class MemoryAllocationStore:
    """In-memory store of space allocation records.

    Each record maps a space name to the student it was handed to, the time
    of the allocation and the time at which the allocation expires.  Every
    access to the records goes through an internal lock, and a daemon thread
    started by the constructor purges expired records periodically.
    """

    def __init__(self):
        # {space_name: {'student_id': str, 'allocated_time': datetime, 'expires_at': datetime}}
        self._allocations = {}
        self._lock = threading.Lock()
        self._cleanup_interval = 60  # seconds between purges of expired records
        # Start the background cleanup thread.
        self._start_cleanup_thread()

    def _start_cleanup_thread(self):
        """Start the daemon thread that periodically purges expired records."""
        def cleanup_worker():
            while True:
                time.sleep(self._cleanup_interval)
                self._cleanup_expired_allocations()

        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
        print("[MemoryAllocationStore] Background cleanup thread started")

    def _cleanup_expired_allocations(self):
        """Delete every record whose expiry time has been reached."""
        with self._lock:
            current_time = datetime.now()
            # Collect the names first: the dict cannot be mutated while it is
            # being iterated.
            expired_spaces = [
                space_name
                for space_name, alloc_info in self._allocations.items()
                if alloc_info['expires_at'] <= current_time
            ]
            for space_name in expired_spaces:
                del self._allocations[space_name]
                # Log each purged space individually so that none of them is
                # silently dropped from the log.
                print(f"[MemoryAllocationStore] Cleaned up expired allocation: {space_name}")

    def add_allocation(self, space_name, student_id):
        """Record that ``space_name`` was allocated to ``student_id``.

        The record expires ``ALLOCATION_LOCK_DURATION`` minutes from now and
        replaces any existing record for the same space.
        """
        with self._lock:
            current_time = datetime.now()
            expires_at = current_time + timedelta(minutes=ALLOCATION_LOCK_DURATION)
            self._allocations[space_name] = {
                'student_id': student_id,
                'allocated_time': current_time,
                'expires_at': expires_at
            }
            print(f"[MemoryAllocationStore] Added allocation: {space_name} -> {student_id} (expires at {expires_at.strftime('%H:%M:%S')})")

    def get_active_allocations(self):
        """Return ``{space_name: record}`` for all records that have not expired.

        Each record is a shallow copy, so callers cannot mutate the store.
        """
        with self._lock:
            current_time = datetime.now()
            return {
                space_name: alloc_info.copy()
                for space_name, alloc_info in self._allocations.items()
                if alloc_info['expires_at'] > current_time
            }

    def is_allocated(self, space_name):
        """Tell whether ``space_name`` currently holds an unexpired allocation.

        Returns:
            ``(True, student_id)`` when an unexpired record exists, otherwise
            ``(False, None)``.  An expired record found here is deleted.
        """
        with self._lock:
            alloc_info = self._allocations.get(space_name)
            if alloc_info is None:
                return False, None
            if alloc_info['expires_at'] > datetime.now():
                return True, alloc_info['student_id']
            # Expired: drop the record.
            del self._allocations[space_name]
            return False, None

    def get_status_summary(self):
        """Summarize the store.

        Returns:
            A dict with ``active_count`` (number of unexpired records),
            ``total_stored`` (number of records held, expired ones included)
            and ``summary`` (one human-readable line per unexpired record).
        """
        with self._lock:
            current_time = datetime.now()
            summary = []
            for space_name, alloc_info in self._allocations.items():
                if alloc_info['expires_at'] > current_time:
                    remaining_minutes = (alloc_info['expires_at'] - current_time).total_seconds() / 60
                    summary.append(f"{space_name} -> {alloc_info['student_id']} ({remaining_minutes:.1f}min left)")
            return {
                'active_count': len(summary),
                'total_stored': len(self._allocations),
                'summary': summary
            }
# Global data manager
class DataManager:
    """Owns the local clone of the data repository and the list of spaces.

    ``initialize()`` clones the repository, performs a first sync and starts
    a daemon thread that re-syncs every ``DATA_SYNC_INTERVAL`` seconds.  The
    list of available spaces is derived from the ``*_query_log.csv`` file
    names found in the local clone.
    """

    def __init__(self):
        self.repo = None
        self.available_spaces = []
        self.last_data_sync = None
        self._sync_lock = threading.Lock()
        self._sync_thread = None
        self._stop_event = threading.Event()
        self.initialized = False

    def initialize(self):
        """Initialize the data manager; meant to be called once at startup.

        Returns:
            ``True`` when the manager is (or already was) initialized,
            ``False`` when any step raised; the error is printed.
        """
        if self.initialized:
            return True
        try:
            print("[DataManager] Initializing data manager...")
            # Set up the repository connection.
            self.repo = Repository(
                local_dir=LOCAL_DATA_DIR,
                clone_from=DATA_STORAGE_REPO,
                revision=DATA_BRANCH_NAME,
                repo_type="space",
                use_auth_token=os.environ.get("HF_HUB_TOKEN")
            )
            # Configure the git identity.  The method takes (git_user,
            # git_email); passing the option *names* positionally, as two
            # separate calls, would store "git_email" as the user name.
            self.repo.git_config_username_and_email(
                git_user="load_distributor",
                git_email="loaddistributor@takeiteasy.space",
            )
            # Initial data sync.
            self._sync_data()
            # Start the background sync thread.
            self._start_background_sync()
            self.initialized = True
            print("[DataManager] Data manager initialized successfully")
            return True
        except Exception as e:
            print(f"[DataManager] Initialization failed: {e}")
            return False

    def _start_background_sync(self):
        """Start the background sync thread unless one is already running."""
        if self._sync_thread and self._sync_thread.is_alive():
            return
        self._stop_event.clear()
        self._sync_thread = threading.Thread(target=self._background_sync_worker, daemon=True)
        self._sync_thread.start()
        print("[DataManager] Background sync thread started")

    def _background_sync_worker(self):
        """Body of the background thread: sync on a fixed interval until stopped."""
        while not self._stop_event.is_set():
            try:
                # Wait for the interval to elapse or for the stop signal.
                if self._stop_event.wait(timeout=DATA_SYNC_INTERVAL):
                    break
                print("[DataManager] Starting scheduled data sync...")
                self._sync_data()
                print("[DataManager] Scheduled data sync completed")
            except Exception as e:
                print(f"[DataManager] Error in background sync: {e}")
                # Back off for one minute before retrying; a stop signal
                # during the wait ends the loop through its condition.
                self._stop_event.wait(timeout=60)

    def _sync_data(self):
        """Pull the latest data and refresh the list of available spaces.

        Errors are printed and swallowed; on failure the previous list and
        sync time are left untouched.
        """
        with self._sync_lock:
            try:
                start_time = time.time()
                print("[DataManager] Syncing data from remote repository...")
                # Pull the latest data.
                self.repo.git_pull(rebase=True)
                # Refresh the list of available spaces.
                self.available_spaces = self._get_available_spaces()
                self.last_data_sync = datetime.now()
                elapsed_time = time.time() - start_time
                print(f"[DataManager] Data sync completed in {elapsed_time:.2f}s, found {len(self.available_spaces)} spaces")
            except Exception as e:
                print(f"[DataManager] Error syncing data: {e}")

    def _get_available_spaces(self):
        """Return the sorted space names found in the local clone.

        A space is any file named ``<space>_query_log.csv`` whose name
        contains ``_Student_``.  Returns an empty list when the directory
        cannot be listed.
        """
        try:
            available_spaces = {
                filename.replace('_query_log.csv', '')
                for filename in os.listdir(LOCAL_DATA_DIR)
                if filename.endswith('_query_log.csv') and '_Student_' in filename
            }
            return sorted(available_spaces)
        except Exception as e:
            print(f"[DataManager] Error getting available spaces: {e}")
            return []

    def get_available_spaces(self):
        """Return a copy of the list of available spaces without triggering a sync.

        Raises:
            Exception: if the manager has not been initialized.
        """
        if not self.initialized:
            raise Exception("DataManager not initialized")
        return self.available_spaces.copy()

    def get_repo(self):
        """Return the repository object.

        Raises:
            Exception: if the manager has not been initialized.
        """
        if not self.initialized:
            raise Exception("DataManager not initialized")
        return self.repo

    def get_repo_dir(self):
        """Return the local directory of the repository clone.

        Raises:
            Exception: if the manager has not been initialized.
        """
        if not self.initialized:
            raise Exception("DataManager not initialized")
        return LOCAL_DATA_DIR

    def stop(self):
        """Signal the background sync thread to stop."""
        if self._sync_thread:
            self._stop_event.set()
            print("[DataManager] Background sync thread stopping...")
class SpaceActivityCache:
    """In-memory cache of the last recorded activity of every space.

    The cache is filled once by the constructor and then refreshed by a
    daemon thread every ``CACHE_UPDATE_INTERVAL`` seconds.  ``_cache_data``
    is only ever replaced as a whole, under ``CACHE_LOCK``; readers take a
    reference under the lock and process it afterwards.
    """

    def __init__(self, data_manager):
        self.data_manager = data_manager
        self._cache_data = {}
        self._last_update = None
        self._update_thread = None
        self._stop_event = threading.Event()
        # Fill the cache once right away.
        self._update_cache()
        # Start the background refresh thread.
        self.start_background_updates()

    def start_background_updates(self):
        """Start the background refresh thread unless one is already running."""
        if self._update_thread and self._update_thread.is_alive():
            return
        self._stop_event.clear()
        self._update_thread = threading.Thread(target=self._background_update_worker, daemon=True)
        self._update_thread.start()
        print("[SpaceActivityCache] Background update thread started")

    def stop_background_updates(self):
        """Signal the background refresh thread to stop."""
        if self._update_thread:
            self._stop_event.set()
            print("[SpaceActivityCache] Background update thread stopping...")

    def _background_update_worker(self):
        """Body of the background thread: refresh on a fixed interval until stopped."""
        while not self._stop_event.is_set():
            try:
                # Wait for the interval to elapse or for the stop signal.
                if self._stop_event.wait(timeout=CACHE_UPDATE_INTERVAL):
                    break
                print("[SpaceActivityCache] Starting scheduled cache update...")
                self._update_cache()
                print("[SpaceActivityCache] Scheduled cache update completed")
            except Exception as e:
                print(f"[SpaceActivityCache] Error in background update: {e}")
                # Back off for one minute before retrying; a stop signal
                # during the wait ends the loop through its condition.
                self._stop_event.wait(timeout=60)

    def _update_cache(self):
        """Rebuild the cache from the CSV logs and swap it in.

        Acquires ``CACHE_LOCK`` for the swap, so it must not be called while
        that lock is held.  Errors are printed and swallowed; the previous
        cache content is then kept.
        """
        try:
            print("[SpaceActivityCache] Updating activity cache...")
            start_time = time.time()
            # Ask the data manager for the spaces (does not trigger a sync).
            available_spaces = self.data_manager.get_available_spaces()
            repo_dir = self.data_manager.get_repo_dir()
            new_cache_data = {
                'last_updated': datetime.now().isoformat(),
                'update_interval_seconds': CACHE_UPDATE_INTERVAL,
                'spaces': {}
            }
            # Read the last activity time of every space.
            for space_name in available_spaces:
                csv_file = os.path.join(repo_dir, f"{space_name}_query_log.csv")
                last_activity, status = self._get_last_activity_from_file(csv_file)
                new_cache_data['spaces'][space_name] = {
                    'last_activity': last_activity.isoformat() if last_activity != datetime.min else None,
                    'status': status,
                    'cache_update_time': datetime.now().isoformat()
                }
                print(f"[SpaceActivityCache] {space_name}: {status}")
            # Swap the new data in under the lock.
            with CACHE_LOCK:
                self._cache_data = new_cache_data
                self._last_update = datetime.now()
            elapsed_time = time.time() - start_time
            print(f"[SpaceActivityCache] Cache updated in {elapsed_time:.2f}s, {len(available_spaces)} spaces processed")
        except Exception as e:
            print(f"[SpaceActivityCache] Error updating cache: {e}")
            import traceback
            print(f"[SpaceActivityCache] Traceback: {traceback.format_exc()}")

    def _get_last_activity_from_file(self, csv_file_path):
        """Read the timestamp of the last data row of a query-log CSV.

        The timestamp is taken from the third column of the last non-empty
        row and must have the form ``YYYY-MM-DD HH:MM:SS``.

        Returns:
            ``(timestamp, status)``.  On success ``status`` starts with
            ``active_last_at_``; in every other case the timestamp is
            ``datetime.min`` and ``status`` names the reason.
        """
        try:
            if not os.path.exists(csv_file_path):
                return datetime.min, "file_not_found"
            # Files of at most 100 bytes are treated as holding no data.
            if os.path.getsize(csv_file_path) <= 100:
                return datetime.min, "empty_or_header_only"
            with open(csv_file_path, 'r', encoding='utf-8') as f:
                csv_reader = csv.reader(f)
                # Skip the header.
                try:
                    next(csv_reader)
                except StopIteration:
                    return datetime.min, "empty_file"
                # Only the last non-empty row matters, so do not keep the
                # others in memory.
                last_row = None
                try:
                    for row in csv_reader:
                        if row:
                            last_row = row
                except Exception as csv_error:
                    print(f"[SpaceActivityCache] CSV parsing error for {csv_file_path}: {csv_error}")
                    return datetime.min, "csv_parse_error"
            if last_row is None:
                return datetime.min, "no_data_rows"
            if len(last_row) < 3:
                return datetime.min, "invalid_row_format"
            timestamp_str = last_row[2].strip()  # timestamp column
            try:
                parsed_time = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
            except ValueError as ve:
                print(f"[SpaceActivityCache] Date parsing error for '{timestamp_str}': {ve}")
                return datetime.min, "date_parse_error"
            return parsed_time, f"active_last_at_{parsed_time.strftime('%Y-%m-%d_%H:%M:%S')}"
        except Exception as e:
            print(f"[SpaceActivityCache] Error reading {csv_file_path}: {e}")
            return datetime.min, "read_error"

    @staticmethod
    def _parse_cached_timestamp(value):
        """Turn a cached ISO timestamp into a datetime; ``datetime.min`` if absent or invalid."""
        if not value:
            return datetime.min
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            return datetime.min

    def _ensure_cache_loaded(self):
        """Fill the cache when it is empty.

        The emptiness check is made under ``CACHE_LOCK`` but the refresh runs
        after the lock is released: ``_update_cache`` acquires the same
        non-reentrant lock, so calling it with the lock held would deadlock.
        """
        with CACHE_LOCK:
            has_cache = bool(self._cache_data) and 'spaces' in self._cache_data
        if not has_cache:
            print("[SpaceActivityCache] No cache available, updating immediately...")
            self._update_cache()

    def get_space_activity(self, space_name):
        """Return ``(last_activity, status)`` for one space.

        ``last_activity`` is ``datetime.min`` when the space has no recorded
        activity; a space missing from the cache yields
        ``(datetime.min, "not_in_cache")``.
        """
        self._ensure_cache_loaded()
        with CACHE_LOCK:
            space_info = self._cache_data.get('spaces', {}).get(space_name, {})
        if not space_info:
            return datetime.min, "not_in_cache"
        last_activity = self._parse_cached_timestamp(space_info.get('last_activity'))
        return last_activity, space_info.get('status', 'unknown')

    def get_all_spaces_activity(self):
        """Return ``{space_name: {'last_activity': datetime, 'status': str}}`` for all cached spaces."""
        self._ensure_cache_loaded()
        with CACHE_LOCK:
            spaces_data = self._cache_data.get('spaces', {})
        return {
            space_name: {
                'last_activity': self._parse_cached_timestamp(space_info.get('last_activity')),
                'status': space_info.get('status', 'unknown')
            }
            for space_name, space_info in spaces_data.items()
        }

    def get_cache_info(self):
        """Describe the state of the cache.

        Returns:
            A dict with ``last_updated``, ``age_minutes``, ``spaces_count``
            and ``is_fresh`` (age below 1.5 refresh intervals).  When the
            cache is empty or its timestamp is unreadable, the values are
            ``None``, infinity, ``0`` and ``False``.
        """
        with CACHE_LOCK:
            cache_data = self._cache_data
        if cache_data and 'last_updated' in cache_data:
            try:
                last_updated = datetime.fromisoformat(cache_data['last_updated'])
            except (TypeError, ValueError):
                last_updated = None
            if last_updated is not None:
                age_minutes = (datetime.now() - last_updated).total_seconds() / 60
                return {
                    'last_updated': last_updated,
                    'age_minutes': age_minutes,
                    'spaces_count': len(cache_data.get('spaces', {})),
                    'is_fresh': age_minutes < (CACHE_UPDATE_INTERVAL / 60) * 1.5
                }
        return {
            'last_updated': None,
            'age_minutes': float('inf'),
            'spaces_count': 0,
            'is_fresh': False
        }

    def force_update(self):
        """Refresh the cache immediately."""
        print("[SpaceActivityCache] Force updating cache...")
        self._update_cache()
# New: local collision-avoidance mechanism
class LocalAllocationTracker:
    """Short-lived record of the allocations made by this process.

    A record stays valid for ``_allocation_ttl`` seconds.  Every access goes
    through an internal lock, and a daemon thread started by the constructor
    purges expired records every ``_cleanup_interval`` seconds.
    """

    def __init__(self):
        # {space_name: {'student_id': str, 'timestamp': datetime}}
        self._recent_allocations = {}
        self._lock = threading.Lock()
        self._cleanup_interval = 600  # seconds between purges
        self._allocation_ttl = 60  # lifetime of a local allocation record, in seconds
        # Start the background cleanup thread.
        self._start_cleanup_thread()

    def _start_cleanup_thread(self):
        """Start the daemon thread that periodically purges expired records."""
        def cleanup_worker():
            while True:
                time.sleep(self._cleanup_interval)
                self._cleanup_expired_allocations()

        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
        print("[LocalAllocationTracker] Background cleanup thread started")

    def _cleanup_expired_allocations(self):
        """Delete every record older than the time-to-live."""
        with self._lock:
            current_time = datetime.now()
            # Collect the names first: the dict cannot be mutated while it is
            # being iterated.
            expired_spaces = [
                space_name
                for space_name, alloc_info in self._recent_allocations.items()
                if (current_time - alloc_info['timestamp']).total_seconds() > self._allocation_ttl
            ]
            for space_name in expired_spaces:
                del self._recent_allocations[space_name]
                # Log each purged space individually so that none of them is
                # silently dropped from the log.
                print(f"[LocalAllocationTracker] Cleaned up expired allocation: {space_name}")

    def is_recently_allocated_locally(self, space_name):
        """Tell whether ``space_name`` was allocated locally within the time-to-live.

        Returns:
            ``(True, student_id)`` when a live record exists, otherwise
            ``(False, None)``.  An expired record found here is deleted.
        """
        with self._lock:
            alloc_info = self._recent_allocations.get(space_name)
            if alloc_info is None:
                return False, None
            elapsed_seconds = (datetime.now() - alloc_info['timestamp']).total_seconds()
            if elapsed_seconds > self._allocation_ttl:
                # Expired: drop the record.
                del self._recent_allocations[space_name]
                print(f"[LocalAllocationTracker] Expired local allocation removed: {space_name}")
                return False, None
            print(f"[LocalAllocationTracker] Space {space_name} recently allocated locally to {alloc_info['student_id']} ({elapsed_seconds:.1f}s ago)")
            return True, alloc_info['student_id']

    def record_local_allocation(self, space_name, student_id):
        """Record that ``space_name`` was just allocated to ``student_id``."""
        with self._lock:
            self._recent_allocations[space_name] = {
                'student_id': student_id,
                'timestamp': datetime.now()
            }
            print(f"[LocalAllocationTracker] Locally recorded allocation: {space_name} -> {student_id}")

    def get_recent_allocations_summary(self):
        """Return one human-readable line per stored record (for debugging).

        Records past their time-to-live that have not been purged yet are
        included.
        """
        with self._lock:
            current_time = datetime.now()
            summary = []
            for space_name, alloc_info in self._recent_allocations.items():
                elapsed = (current_time - alloc_info['timestamp']).total_seconds()
                summary.append(f"{space_name} -> {alloc_info['student_id']} ({elapsed:.1f}s ago)")
            return summary
# Global instances
data_manager = DataManager()
# Stays None until init_system() creates the SpaceActivityCache.
activity_cache = None
# Constructing the two objects below starts their background cleanup threads.
local_tracker = LocalAllocationTracker()
memory_allocation_store = MemoryAllocationStore()
def init_system():
    """Bring the whole system up; meant to run exactly once at startup.

    Initializes the global data manager and then builds the global activity
    cache on top of it.

    Raises:
        Exception: if the data manager reports that it failed to initialize.
    """
    global activity_cache
    print("[init_system] Initializing load distributor system...")

    # The data manager comes first: the activity cache reads from it.
    manager_ready = data_manager.initialize()
    if not manager_ready:
        raise Exception("Failed to initialize data manager")

    # Building the cache also fills it and starts its refresh thread.
    activity_cache = SpaceActivityCache(data_manager)
    print("[init_system] System initialization completed")
def analyze_space_activity_cached():
    """Build a per-space activity report purely from in-memory data.

    Combines the cached last-activity times, the in-memory allocation
    records and the local allocation tracker.

    Returns:
        A list with one dict per available space, ordered from the most idle
        to the least idle.  Each dict carries ``space_name``,
        ``last_activity``, ``last_activity_str``, ``idle_minutes``,
        ``status``, ``is_recently_allocated_memory``,
        ``is_recently_allocated_local`` and ``cached_status``.

    Raises:
        Exception: if the activity cache has not been created yet.
    """
    if activity_cache is None:
        raise Exception("Activity cache not initialized")

    # List of spaces from the data manager (does not trigger a data sync).
    available_spaces = data_manager.get_available_spaces()
    current_time = datetime.now()

    cache_info = activity_cache.get_cache_info()
    print(f"[analyze_space_activity_cached] Using cache (age: {cache_info['age_minutes']:.1f} min, fresh: {cache_info['is_fresh']})")

    # Cached activity of all spaces and the allocation records held in memory.
    activity_by_space = activity_cache.get_all_spaces_activity()
    active_allocations = memory_allocation_store.get_active_allocations()
    print(f"[analyze_space_activity_cached] Analyzing {len(available_spaces)} spaces using cached data...")
    print(f"[analyze_space_activity_cached] Memory allocations: {len(active_allocations)} active")

    report = []
    for space_name in available_spaces:
        cached_entry = activity_by_space.get(space_name)
        if cached_entry:
            last_activity, cached_status = cached_entry['last_activity'], cached_entry['status']
        else:
            # Not in the cache: possibly a newly added space.
            last_activity, cached_status = datetime.min, "not_in_cache"

        # Idle time in minutes; a space without usable activity counts as
        # never used.
        never_used = (
            last_activity == datetime.min
            or 'empty' in cached_status
            or 'not_found' in cached_status
        )
        if never_used:
            idle_minutes = float('inf')
            status = "Never used"
            last_activity_str = "Never"
        else:
            idle_minutes = (current_time - last_activity).total_seconds() / 60
            status = f"Idle for {idle_minutes:.1f} minutes (cached)"
            last_activity_str = last_activity.strftime('%Y-%m-%d %H:%M:%S')

        # Allocation record held in memory, if any.
        alloc_info = active_allocations.get(space_name)
        allocated_in_memory = alloc_info is not None
        if allocated_in_memory:
            minutes_until_free = (alloc_info['expires_at'] - current_time).total_seconds() / 60
            status += f" (Allocated in memory to {alloc_info['student_id']}, free in {minutes_until_free:.1f} min)"

        # Recent allocation made by this process, if any.
        allocated_locally, local_student = local_tracker.is_recently_allocated_locally(space_name)
        if allocated_locally:
            status += f" (Recently allocated locally to {local_student})"

        report.append({
            'space_name': space_name,
            'last_activity': last_activity,
            'last_activity_str': last_activity_str,
            'idle_minutes': idle_minutes,
            'status': status,
            'is_recently_allocated_memory': allocated_in_memory,
            'is_recently_allocated_local': allocated_locally,
            'cached_status': cached_status
        })
        print(f"[analyze_space_activity_cached] {space_name}: {status}")

    # Most idle first.
    report.sort(key=lambda entry: entry['idle_minutes'], reverse=True)
    return report
def create_status_display(space_activity):
    """Create formatted status display for all spaces with proper line breaks.

    Args:
        space_activity: list of per-space dicts carrying at least
            ``space_name``, ``status`` and ``last_activity_str``, in the
            order in which they are to be listed.

    Returns:
        A single string: a heading, an optional section for the in-memory
        allocations, an optional section for the recent local allocations,
        and three lines per space.
    """
    # Collect the pieces and join once instead of growing a string with +=.
    # Every line break is an explicit "\n" escape, so no string literal spans
    # more than one source line.
    parts = ["📊 **Current Space Status (sorted by availability):**\n"]

    # Summary of the allocation records held in memory.
    memory_summary = memory_allocation_store.get_status_summary()
    if memory_summary['active_count'] > 0:
        parts.append(f"🧠 **Memory Allocations ({memory_summary['active_count']} active):**\n")
        parts.extend(f" • {alloc}\n" for alloc in memory_summary['summary'])
        parts.append("\n")

    # Summary of the recent local allocation records.
    local_summary = local_tracker.get_recent_allocations_summary()
    if local_summary:
        parts.append("🔒 **Recent Local Allocations:**\n")
        parts.extend(f" • {alloc}\n" for alloc in local_summary)
        parts.append("\n")

    for i, space in enumerate(space_activity, 1):
        parts.append(f"{i}. **{space['space_name']}**\n")
        parts.append(f" • Status: {space['status']}\n")
        parts.append(f" • Last activity: {space['last_activity_str']}\n")

    return "".join(parts)
def select_space_with_enhanced_collision_avoidance_cached(space_activity, student_id):
    """Pick a space for ``student_id`` and record the allocation in memory.

    Args:
        space_activity: per-space dicts ordered from most to least idle, each
            carrying ``space_name``, ``idle_minutes``,
            ``is_recently_allocated_memory`` and
            ``is_recently_allocated_local``.
        student_id: identifier of the student requesting a space; it is
            placed in the ``check`` query parameter of the redirect URL.

    Returns:
        Whatever ``redirect_to_space`` returns for the chosen space.

    Raises:
        gr.Error: when no space satisfies the basic conditions, or when the
            chosen space turns out to be locally allocated and no alternative
            is left.
    """
    # Local import: percent-encoding is needed only for the redirect URL.
    from urllib.parse import quote

    print(f"[select_space_with_enhanced_collision_avoidance_cached] Starting selection for student: {student_id}")

    # Step 1: keep the spaces that are idle long enough and allocated neither
    # in memory nor locally.
    basic_available_spaces = [
        space for space in space_activity
        if (space['idle_minutes'] >= MIN_IDLE_MINUTES and
            not space['is_recently_allocated_memory'] and
            not space['is_recently_allocated_local'])
    ]
    print(f"[select_space_with_enhanced_collision_avoidance_cached] Basic available spaces: {len(basic_available_spaces)}")

    if not basic_available_spaces:
        # Build a detailed error message.
        idle_spaces = [s for s in space_activity if s['idle_minutes'] >= MIN_IDLE_MINUTES]
        memory_allocated = [s for s in space_activity if s['is_recently_allocated_memory']]
        local_allocated = [s for s in space_activity if s['is_recently_allocated_local']]
        error_parts = []
        if not idle_spaces:
            error_parts.append(f"all spaces used within {MIN_IDLE_MINUTES} minutes")
        if memory_allocated:
            error_parts.append(f"{len(memory_allocated)} spaces allocated in memory")
        if local_allocated:
            error_parts.append(f"{len(local_allocated)} spaces locally allocated")
        error_msg = (
            f"🚫 **All learning assistants are currently busy**\n\n"
            f"Blocking conditions: {', '.join(error_parts)}\n\n"
            f"**Please try again in 1-2 minutes.**"
        )
        print(f"[select_space_with_enhanced_collision_avoidance_cached] No available spaces: {error_msg}")
        raise gr.Error(error_msg, duration=10)

    # Step 2: take the best candidate (the list is already ordered by idle time).
    selected_space = basic_available_spaces[0]
    space_name = selected_space['space_name']
    print(f"[select_space_with_enhanced_collision_avoidance_cached] Preliminary selection: {space_name}")

    # Step 3: final collision check - verify the local allocation state again,
    # since it may have changed after space_activity was computed.
    # NOTE(review): this check and the recording in step 4 are not atomic, and
    # the module-level ALLOCATION_LOCK is not taken here - confirm whether the
    # caller serializes calls to this function.
    is_local_conflict, conflicting_student = local_tracker.is_recently_allocated_locally(space_name)
    if is_local_conflict:
        print(f"[select_space_with_enhanced_collision_avoidance_cached] COLLISION DETECTED! {space_name} recently allocated to {conflicting_student}")
        # Look for an alternative space.
        alternative_spaces = [s for s in basic_available_spaces[1:]
                              if not local_tracker.is_recently_allocated_locally(s['space_name'])[0]]
        if alternative_spaces:
            selected_space = alternative_spaces[0]
            space_name = selected_space['space_name']
            print(f"[select_space_with_enhanced_collision_avoidance_cached] Using alternative space: {space_name}")
        else:
            error_msg = (
                f"🚫 **Collision detected and no alternatives available**\n\n"
                f"The system detected a potential conflict with another student's allocation.\n\n"
                f"**Please try again in 10-15 seconds.**"
            )
            print(f"[select_space_with_enhanced_collision_avoidance_cached] No alternatives available")
            raise gr.Error(error_msg, duration=8)

    # Step 4: record the local allocation right away (before the memory write).
    local_tracker.record_local_allocation(space_name, student_id)
    print(f"[select_space_with_enhanced_collision_avoidance_cached] Local allocation recorded BEFORE memory write")

    # Step 5: record in the memory store - no file write, no push to the remote.
    memory_allocation_store.add_allocation(space_name, student_id)
    print(f"[select_space_with_enhanced_collision_avoidance_cached] Allocation recorded in memory only")

    # Step 6: build the result (status display including cache information).
    status_display = create_status_display_with_cache_info(space_activity)
    # Percent-encode the student id so that characters such as "&", "#" or
    # spaces cannot break the URL or smuggle in extra query parameters.
    encoded_student_id = quote(str(student_id), safe='')
    redirect_url = f"https://huggingface.co/spaces/CIV3283/{space_name}/?check={encoded_student_id}"
    print(f"[select_space_with_enhanced_collision_avoidance_cached] Final allocation: {space_name} -> {student_id}")
    return redirect_to_space(redirect_url, selected_space, status_display)
def redirect_to_space(redirect_url, selected_space, status_display):
"""Display redirect information with manual click option"""
if selected_space['idle_minutes'] == float('inf'):
idle_info = "Never used (completely fresh)"
else:
idle_info = f"{selected_space['idle_minutes']:.1f} minutes"
# Modified HTML structure - Only Access section, removed analysis section
redirect_html = f"""
✨ This space was idle for: {idle_info}
Click the button below to access your assigned learning assistant.
➤ Open Learning Assistant💡 Left-click the button above or right-click it and select "Open in new tab"
🔄 Need a different assistant? Refresh this page to get reassigned.
This load distributor requires a valid student ID parameter.
Please access this system through the official link provided in Moodle.
{str(e)}
Please try again in a few moments or contact your instructor if the problem persists.