Spaces:
Sleeping
Sleeping
File size: 50,085 Bytes
11f6214 b48f33d 80381f0 ddb0df6 11f6214 80381f0 11f6214 80381f0 404bee7 80381f0 404bee7 80381f0 a8521d6 80381f0 8c1efcb 80381f0 8c1efcb 80381f0 8c1efcb 80381f0 8c1efcb 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 55c78cd 98b541b 80381f0 98b541b 80381f0 b48f33d 98b541b 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 98b541b 80381f0 98b541b 80381f0 98b541b 80381f0 98b541b 80381f0 98b541b 80381f0 98b541b 80381f0 98b541b 80381f0 98b541b 80381f0 98b541b 80381f0 98b541b 80381f0 6b97fd2 80381f0 6b97fd2 80381f0 11f6214 80381f0 11f6214 80381f0 11f6214 80381f0 11f6214 80381f0 11f6214 80381f0 11f6214 80381f0 11f6214 80381f0 6b97fd2 80381f0 6b97fd2 80381f0 6b97fd2 80381f0 6b97fd2 80381f0 ca72c84 80381f0 b48f33d 80381f0 11f6214 80381f0 11f6214 80381f0 11f6214 4d5ff77 d40921f 80381f0 11f6214 9f24cf6 80381f0 9f24cf6 80381f0 b48f33d 80381f0 b48f33d 80381f0 cefcd1e 80381f0 8c1efcb 80381f0 b48f33d 80381f0 8c1efcb 80381f0 cefcd1e 80381f0 cefcd1e 80381f0 8c1efcb 585ad0e 80381f0 585ad0e 80381f0 95d7c47 80381f0 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 
715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 
1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 | import gradio as gr
import os
import csv
from datetime import datetime, timedelta
from huggingface_hub import Repository
import threading
import time
import json
# Configuration
DATA_STORAGE_REPO = "CIV3283/Data_Storage"
DATA_BRANCH_NAME = "data_branch"
LOCAL_DATA_DIR = "temp_data_storage"
MIN_IDLE_MINUTES = 20 # Minimum idle time required for space assignment
ALLOCATION_LOCK_DURATION = 10 # Lock duration in minutes
# Cache configuration - everything is kept in memory
CACHE_UPDATE_INTERVAL = 300 # Seconds between activity-cache refreshes (300 s = 5 minutes)
DATA_SYNC_INTERVAL = 300 # Seconds between background data syncs (300 s = 5 minutes)
# Guards reads and replacement of SpaceActivityCache's cached data
CACHE_LOCK = threading.Lock()
ALLOCATION_LOCK = threading.Lock()
# Global in-memory storage
class MemoryAllocationStore:
    """Thread-safe, in-memory registry of space allocations.

    Each record maps a space name to the student it was handed to and to the
    moment the reservation expires (``ALLOCATION_LOCK_DURATION`` minutes after
    it was created). Nothing is written to disk.
    """

    def __init__(self):
        # {space_name: {'student_id': str, 'allocated_time': datetime, 'expires_at': datetime}}
        self._allocations = {}
        self._lock = threading.Lock()
        self._cleanup_interval = 60  # seconds between background purges
        self._start_cleanup_thread()

    def _start_cleanup_thread(self):
        """Start the daemon thread that periodically purges expired records."""
        def cleanup_worker():
            while True:
                time.sleep(self._cleanup_interval)
                self._cleanup_expired_allocations()

        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
        print("[MemoryAllocationStore] Background cleanup thread started")

    def _cleanup_expired_allocations(self):
        """Remove every record whose expiry time has passed and log each one."""
        with self._lock:
            current_time = datetime.now()
            expired_spaces = [
                space_name
                for space_name, alloc_info in self._allocations.items()
                if alloc_info['expires_at'] <= current_time
            ]
            for space_name in expired_spaces:
                del self._allocations[space_name]
        # Log outside the lock, one line per purged space, so that no removal
        # goes unreported.
        for space_name in expired_spaces:
            print(f"[MemoryAllocationStore] Cleaned up expired allocation: {space_name}")

    def add_allocation(self, space_name, student_id):
        """Record that ``space_name`` is reserved for ``student_id``.

        An existing record for the same space is overwritten.
        """
        with self._lock:
            current_time = datetime.now()
            expires_at = current_time + timedelta(minutes=ALLOCATION_LOCK_DURATION)
            self._allocations[space_name] = {
                'student_id': student_id,
                'allocated_time': current_time,
                'expires_at': expires_at
            }
            print(f"[MemoryAllocationStore] Added allocation: {space_name} -> {student_id} (expires at {expires_at.strftime('%H:%M:%S')})")

    def get_active_allocations(self):
        """Return ``{space_name: record_copy}`` for all unexpired records."""
        with self._lock:
            current_time = datetime.now()
            return {
                space_name: alloc_info.copy()
                for space_name, alloc_info in self._allocations.items()
                if alloc_info['expires_at'] > current_time
            }

    def is_allocated(self, space_name):
        """Return ``(True, student_id)`` if the space holds a live reservation.

        Returns ``(False, None)`` otherwise. An expired record found during
        the lookup is deleted on the spot.
        """
        with self._lock:
            alloc_info = self._allocations.get(space_name)
            if alloc_info is None:
                return False, None
            if alloc_info['expires_at'] > datetime.now():
                return True, alloc_info['student_id']
            # Expired: drop the stale record.
            del self._allocations[space_name]
            return False, None

    def get_status_summary(self):
        """Return counts and one human-readable line per live reservation.

        Returns:
            dict with ``active_count`` (unexpired records), ``total_stored``
            (all records, including expired ones not yet purged) and
            ``summary`` (list of display strings).
        """
        with self._lock:
            current_time = datetime.now()
            summary = []
            for space_name, alloc_info in self._allocations.items():
                if alloc_info['expires_at'] > current_time:
                    remaining_minutes = (alloc_info['expires_at'] - current_time).total_seconds() / 60
                    summary.append(f"{space_name} -> {alloc_info['student_id']} ({remaining_minutes:.1f}min left)")
            return {
                'active_count': len(summary),
                'total_stored': len(self._allocations),
                'summary': summary
            }
# Global data manager
class DataManager:
    """Owns the local clone of the data repository and the list of known spaces.

    ``initialize()`` clones the repository, performs a first sync and starts a
    daemon thread that re-syncs every ``DATA_SYNC_INTERVAL`` seconds. Readers
    use ``get_available_spaces()``, which never triggers a sync itself.
    """

    def __init__(self):
        self.repo = None
        self.available_spaces = []
        self.last_data_sync = None
        self._sync_lock = threading.Lock()
        self._sync_thread = None
        self._stop_event = threading.Event()
        self.initialized = False

    def initialize(self):
        """Clone the repository, sync once and start background syncing.

        Meant to be called once at startup; later calls return ``True``
        immediately.

        Returns:
            bool: ``True`` on success (or if already initialized), ``False``
            if any step raised.
        """
        if self.initialized:
            return True
        try:
            print("[DataManager] Initializing data manager...")
            self.repo = Repository(
                local_dir=LOCAL_DATA_DIR,
                clone_from=DATA_STORAGE_REPO,
                revision=DATA_BRANCH_NAME,
                repo_type="space",
                use_auth_token=os.environ.get("HF_HUB_TOKEN")
            )
            # The method takes (git_user, git_email). Passing the literal
            # strings "git_user"/"git_email" as the first positional argument
            # would store those as the user name, so use keywords in one call.
            self.repo.git_config_username_and_email(
                git_user="load_distributor",
                git_email="loaddistributor@takeiteasy.space",
            )
            self._sync_data()
            self._start_background_sync()
            self.initialized = True
            print("[DataManager] Data manager initialized successfully")
            return True
        except Exception as e:
            print(f"[DataManager] Initialization failed: {e}")
            return False

    def _start_background_sync(self):
        """Start the background sync thread unless one is already running."""
        if self._sync_thread and self._sync_thread.is_alive():
            return
        self._stop_event.clear()
        self._sync_thread = threading.Thread(target=self._background_sync_worker, daemon=True)
        self._sync_thread.start()
        print("[DataManager] Background sync thread started")

    def _background_sync_worker(self):
        """Sync every ``DATA_SYNC_INTERVAL`` seconds until the stop event is set."""
        while not self._stop_event.is_set():
            try:
                # wait() returns True when stop was requested during the pause.
                if self._stop_event.wait(timeout=DATA_SYNC_INTERVAL):
                    break
                print("[DataManager] Starting scheduled data sync...")
                if self._sync_data():
                    print("[DataManager] Scheduled data sync completed")
            except Exception as e:
                print(f"[DataManager] Error in background sync: {e}")
                # Back off for a minute; the loop condition handles a stop
                # request that arrives meanwhile.
                self._stop_event.wait(timeout=60)

    def _sync_data(self):
        """Pull the latest data and rebuild the available-space list.

        Failures are logged rather than raised, so a failed pull leaves the
        previous list in place.

        Returns:
            bool: ``True`` if the pull and rescan succeeded, ``False`` otherwise.
        """
        with self._sync_lock:
            try:
                start_time = time.time()
                print("[DataManager] Syncing data from remote repository...")
                self.repo.git_pull(rebase=True)
                self.available_spaces = self._get_available_spaces()
                self.last_data_sync = datetime.now()
                elapsed_time = time.time() - start_time
                print(f"[DataManager] Data sync completed in {elapsed_time:.2f}s, found {len(self.available_spaces)} spaces")
                return True
            except Exception as e:
                print(f"[DataManager] Error syncing data: {e}")
                return False

    def _get_available_spaces(self):
        """Return the sorted space names found in the local data directory.

        A space is any file named ``<space>_query_log.csv`` whose name contains
        ``_Student_``. Returns an empty list if the directory cannot be read.
        """
        try:
            space_names = {
                filename.replace('_query_log.csv', '')
                for filename in os.listdir(LOCAL_DATA_DIR)
                if filename.endswith('_query_log.csv') and '_Student_' in filename
            }
        except OSError as e:
            print(f"[DataManager] Error getting available spaces: {e}")
            return []
        return sorted(space_names)

    def _require_initialized(self):
        """Raise if ``initialize()`` has not completed successfully."""
        if not self.initialized:
            raise RuntimeError("DataManager not initialized")

    def get_available_spaces(self):
        """Return a copy of the known space names without triggering a sync."""
        self._require_initialized()
        return self.available_spaces.copy()

    def get_repo(self):
        """Return the repository object."""
        self._require_initialized()
        return self.repo

    def get_repo_dir(self):
        """Return the path of the local repository clone."""
        self._require_initialized()
        return LOCAL_DATA_DIR

    def stop(self):
        """Ask the background sync thread to stop (no-op if it never started)."""
        if self._sync_thread:
            self._stop_event.set()
            print("[DataManager] Background sync thread stopping...")
class SpaceActivityCache:
    """In-memory cache of each space's most recent activity timestamp.

    The cache is rebuilt from the per-space ``*_query_log.csv`` files in the
    data manager's repository directory: once at construction and then every
    ``CACHE_UPDATE_INTERVAL`` seconds on a daemon thread. Each rebuild swaps in
    a brand-new dict under ``CACHE_LOCK``; a published dict is never mutated.
    """

    def __init__(self, data_manager):
        self.data_manager = data_manager
        self._cache_data = {}
        self._last_update = None
        self._update_thread = None
        self._stop_event = threading.Event()
        # Populate once right away, then keep refreshing in the background.
        self._update_cache()
        self.start_background_updates()

    def start_background_updates(self):
        """Start the background refresh thread unless one is already running."""
        if self._update_thread and self._update_thread.is_alive():
            return
        self._stop_event.clear()
        self._update_thread = threading.Thread(target=self._background_update_worker, daemon=True)
        self._update_thread.start()
        print("[SpaceActivityCache] Background update thread started")

    def stop_background_updates(self):
        """Ask the background refresh thread to stop."""
        if self._update_thread:
            self._stop_event.set()
            print("[SpaceActivityCache] Background update thread stopping...")

    def _background_update_worker(self):
        """Refresh every ``CACHE_UPDATE_INTERVAL`` seconds until stopped."""
        while not self._stop_event.is_set():
            try:
                # wait() returns True when stop was requested during the pause.
                if self._stop_event.wait(timeout=CACHE_UPDATE_INTERVAL):
                    break
                print("[SpaceActivityCache] Starting scheduled cache update...")
                self._update_cache()
                print("[SpaceActivityCache] Scheduled cache update completed")
            except Exception as e:
                print(f"[SpaceActivityCache] Error in background update: {e}")
                # Back off for a minute; the loop condition handles a stop
                # request that arrives meanwhile.
                self._stop_event.wait(timeout=60)

    def _update_cache(self):
        """Rebuild the cache from the CSV logs and publish it under ``CACHE_LOCK``.

        Errors are logged with a traceback and leave the previous cache intact.
        Must not be called while holding ``CACHE_LOCK`` (it is not reentrant).
        """
        try:
            print("[SpaceActivityCache] Updating activity cache...")
            start_time = time.time()
            # Uses the data manager's current list; does not trigger a sync.
            available_spaces = self.data_manager.get_available_spaces()
            repo_dir = self.data_manager.get_repo_dir()
            new_cache_data = {
                'last_updated': datetime.now().isoformat(),
                'update_interval_seconds': CACHE_UPDATE_INTERVAL,
                'spaces': {}
            }
            for space_name in available_spaces:
                csv_file = os.path.join(repo_dir, f"{space_name}_query_log.csv")
                last_activity, status = self._get_last_activity_from_file(csv_file)
                new_cache_data['spaces'][space_name] = {
                    'last_activity': last_activity.isoformat() if last_activity != datetime.min else None,
                    'status': status,
                    'cache_update_time': datetime.now().isoformat()
                }
                print(f"[SpaceActivityCache] {space_name}: {status}")
            with CACHE_LOCK:
                self._cache_data = new_cache_data
                self._last_update = datetime.now()
            elapsed_time = time.time() - start_time
            print(f"[SpaceActivityCache] Cache updated in {elapsed_time:.2f}s, {len(available_spaces)} spaces processed")
        except Exception as e:
            print(f"[SpaceActivityCache] Error updating cache: {e}")
            import traceback
            print(f"[SpaceActivityCache] Traceback: {traceback.format_exc()}")

    def _get_last_activity_from_file(self, csv_file_path):
        """Read the timestamp of the last non-empty data row of a query log.

        The timestamp is expected in the third column, formatted as
        ``%Y-%m-%d %H:%M:%S``.

        Returns:
            tuple: ``(datetime, status)``. On any problem the datetime is
            ``datetime.min`` and ``status`` names the cause
            (``file_not_found``, ``empty_or_header_only``, ``empty_file``,
            ``csv_parse_error``, ``no_data_rows``, ``date_parse_error``,
            ``invalid_row_format`` or ``read_error``).
        """
        try:
            if not os.path.exists(csv_file_path):
                return datetime.min, "file_not_found"
            # Files of at most 100 bytes are treated as having no data rows.
            if os.path.getsize(csv_file_path) <= 100:
                return datetime.min, "empty_or_header_only"
            # newline='' lets the csv module handle line endings itself.
            with open(csv_file_path, 'r', encoding='utf-8', newline='') as f:
                csv_reader = csv.reader(f)
                try:
                    next(csv_reader)  # skip the header
                except StopIteration:
                    return datetime.min, "empty_file"
                # Only the final non-empty row matters, so keep just that one
                # instead of buffering the whole file.
                last_row = None
                try:
                    for row in csv_reader:
                        if row:
                            last_row = row
                except (csv.Error, UnicodeDecodeError) as csv_error:
                    print(f"[SpaceActivityCache] CSV parsing error for {csv_file_path}: {csv_error}")
                    return datetime.min, "csv_parse_error"
            if last_row is None:
                return datetime.min, "no_data_rows"
            if len(last_row) < 3:
                return datetime.min, "invalid_row_format"
            timestamp_str = last_row[2].strip()  # timestamp column
            try:
                parsed_time = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
            except ValueError as ve:
                print(f"[SpaceActivityCache] Date parsing error for '{timestamp_str}': {ve}")
                return datetime.min, "date_parse_error"
            return parsed_time, f"active_last_at_{parsed_time.strftime('%Y-%m-%d_%H:%M:%S')}"
        except Exception as e:
            print(f"[SpaceActivityCache] Error reading {csv_file_path}: {e}")
            return datetime.min, "read_error"

    @staticmethod
    def _parse_cached_timestamp(value):
        """Convert a cached ISO-format string to a datetime.

        Returns ``datetime.min`` for a missing or unparsable value.
        """
        if not value:
            return datetime.min
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            return datetime.min

    def _ensure_cache_loaded(self):
        """Rebuild the cache if it is empty.

        The emptiness check happens under ``CACHE_LOCK``, but the rebuild runs
        after the lock is released: ``_update_cache`` acquires the same
        non-reentrant lock, so calling it with the lock held would deadlock.
        """
        with CACHE_LOCK:
            has_cache = bool(self._cache_data) and 'spaces' in self._cache_data
        if not has_cache:
            print("[SpaceActivityCache] No cache available, updating immediately...")
            self._update_cache()

    def get_space_activity(self, space_name):
        """Return ``(last_activity, status)`` for one space.

        Returns ``(datetime.min, "not_in_cache")`` when the space is unknown.
        """
        self._ensure_cache_loaded()
        with CACHE_LOCK:
            space_info = self._cache_data.get('spaces', {}).get(space_name, {})
        if not space_info:
            return datetime.min, "not_in_cache"
        last_activity = self._parse_cached_timestamp(space_info.get('last_activity'))
        return last_activity, space_info.get('status', 'unknown')

    def get_all_spaces_activity(self):
        """Return ``{space_name: {'last_activity': datetime, 'status': str}}``."""
        self._ensure_cache_loaded()
        with CACHE_LOCK:
            spaces_data = self._cache_data.get('spaces', {})
        return {
            space_name: {
                'last_activity': self._parse_cached_timestamp(space_info.get('last_activity')),
                'status': space_info.get('status', 'unknown')
            }
            for space_name, space_info in spaces_data.items()
        }

    def get_cache_info(self):
        """Describe the cache's age and size.

        Returns:
            dict with ``last_updated`` (datetime or ``None``), ``age_minutes``,
            ``spaces_count`` and ``is_fresh`` (younger than 1.5 update
            intervals). If nothing is cached or the stored timestamp cannot be
            parsed, the "empty" defaults are returned.
        """
        with CACHE_LOCK:
            cache = self._cache_data
        if cache and 'last_updated' in cache:
            try:
                last_updated = datetime.fromisoformat(cache['last_updated'])
            except (TypeError, ValueError):
                pass  # fall through to the defaults below
            else:
                age_minutes = (datetime.now() - last_updated).total_seconds() / 60
                return {
                    'last_updated': last_updated,
                    'age_minutes': age_minutes,
                    'spaces_count': len(cache.get('spaces', {})),
                    'is_fresh': age_minutes < (CACHE_UPDATE_INTERVAL / 60) * 1.5
                }
        return {
            'last_updated': None,
            'age_minutes': float('inf'),
            'spaces_count': 0,
            'is_fresh': False
        }

    def force_update(self):
        """Rebuild the cache immediately."""
        print("[SpaceActivityCache] Force updating cache...")
        self._update_cache()
# Added: local collision-avoidance mechanism
class LocalAllocationTracker:
    """Short-lived record of which space was just handed to which student.

    Entries are considered valid for ``_allocation_ttl`` seconds. Expired
    entries are ignored on lookup and purged by a background daemon thread.
    """

    def __init__(self):
        # {space_name: {'student_id': str, 'timestamp': datetime}}
        self._recent_allocations = {}
        self._lock = threading.Lock()
        self._cleanup_interval = 600  # seconds between background purges
        self._allocation_ttl = 60  # seconds an entry stays valid
        self._start_cleanup_thread()

    def _start_cleanup_thread(self):
        """Start the daemon thread that periodically purges expired entries."""
        def cleanup_worker():
            while True:
                time.sleep(self._cleanup_interval)
                self._cleanup_expired_allocations()

        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
        print("[LocalAllocationTracker] Background cleanup thread started")

    def _is_expired(self, alloc_info, current_time):
        """Return True if the entry is older than the TTL at ``current_time``."""
        return (current_time - alloc_info['timestamp']).total_seconds() > self._allocation_ttl

    def _cleanup_expired_allocations(self):
        """Remove every entry older than the TTL and log each removal."""
        with self._lock:
            current_time = datetime.now()
            expired_spaces = [
                space_name
                for space_name, alloc_info in self._recent_allocations.items()
                if self._is_expired(alloc_info, current_time)
            ]
            for space_name in expired_spaces:
                del self._recent_allocations[space_name]
        # Log outside the lock, one line per purged space, so that no removal
        # goes unreported.
        for space_name in expired_spaces:
            print(f"[LocalAllocationTracker] Cleaned up expired allocation: {space_name}")

    def is_recently_allocated_locally(self, space_name):
        """Return ``(True, student_id)`` if the space has a live entry.

        Returns ``(False, None)`` otherwise. An expired entry found during the
        lookup is deleted on the spot.
        """
        with self._lock:
            alloc_info = self._recent_allocations.get(space_name)
            if alloc_info is None:
                return False, None
            elapsed_seconds = (datetime.now() - alloc_info['timestamp']).total_seconds()
            if elapsed_seconds > self._allocation_ttl:
                del self._recent_allocations[space_name]
                print(f"[LocalAllocationTracker] Expired local allocation removed: {space_name}")
                return False, None
            print(f"[LocalAllocationTracker] Space {space_name} recently allocated locally to {alloc_info['student_id']} ({elapsed_seconds:.1f}s ago)")
            return True, alloc_info['student_id']

    def record_local_allocation(self, space_name, student_id):
        """Record (or overwrite) the entry for ``space_name`` with the current time."""
        with self._lock:
            self._recent_allocations[space_name] = {
                'student_id': student_id,
                'timestamp': datetime.now()
            }
            print(f"[LocalAllocationTracker] Locally recorded allocation: {space_name} -> {student_id}")

    def get_recent_allocations_summary(self):
        """Return one display line per live entry (for debugging).

        Entries past the TTL are left out even if the background purge has not
        removed them yet, so the summary agrees with
        ``is_recently_allocated_locally``.
        """
        with self._lock:
            current_time = datetime.now()
            summary = []
            for space_name, alloc_info in self._recent_allocations.items():
                if self._is_expired(alloc_info, current_time):
                    continue
                elapsed = (current_time - alloc_info['timestamp']).total_seconds()
                summary.append(f"{space_name} -> {alloc_info['student_id']} ({elapsed:.1f}s ago)")
            return summary
# Global instances
data_manager = DataManager()
# Assigned by init_system() after the data manager has been initialized
activity_cache = None
local_tracker = LocalAllocationTracker()
memory_allocation_store = MemoryAllocationStore()
def init_system():
    """Bring the load distributor up: data manager first, then the activity cache.

    Intended to be called once at startup.

    Raises:
        Exception: if the data manager reports that initialization failed.
    """
    global activity_cache

    print("[init_system] Initializing load distributor system...")

    data_manager_ready = data_manager.initialize()
    if not data_manager_ready:
        raise Exception("Failed to initialize data manager")

    # The cache reads its space list from the already-initialized manager.
    activity_cache = SpaceActivityCache(data_manager)

    print("[init_system] System initialization completed")
def analyze_space_activity_cached():
    """Build a per-space availability report from in-memory data only.

    Combines the data manager's space list, the activity cache, the in-memory
    allocation store and the local allocation tracker. No data sync or file
    read is triggered here.

    Returns:
        list[dict]: one entry per space, sorted by ``idle_minutes`` descending
        (never-used spaces, whose idle time is infinite, come first).

    Raises:
        Exception: if the activity cache has not been created yet.
    """
    if activity_cache is None:
        raise Exception("Activity cache not initialized")

    available_spaces = data_manager.get_available_spaces()
    now = datetime.now()

    cache_info = activity_cache.get_cache_info()
    print(f"[analyze_space_activity_cached] Using cache (age: {cache_info['age_minutes']:.1f} min, fresh: {cache_info['is_fresh']})")

    cached_activity = activity_cache.get_all_spaces_activity()
    active_allocations = memory_allocation_store.get_active_allocations()

    print(f"[analyze_space_activity_cached] Analyzing {len(available_spaces)} spaces using cached data...")
    print(f"[analyze_space_activity_cached] Memory allocations: {len(active_allocations)} active")

    report = []
    for space_name in available_spaces:
        cached_info = cached_activity.get(space_name)
        if not cached_info:
            # Not in the cache (possibly a newly added space).
            last_activity, cached_status = datetime.min, "not_in_cache"
        else:
            last_activity, cached_status = cached_info['last_activity'], cached_info['status']

        never_used = (
            last_activity == datetime.min
            or 'empty' in cached_status
            or 'not_found' in cached_status
        )
        if never_used:
            idle_minutes = float('inf')
            status = "Never used"
            last_activity_str = "Never"
        else:
            idle_minutes = (now - last_activity).total_seconds() / 60
            status = f"Idle for {idle_minutes:.1f} minutes (cached)"
            last_activity_str = last_activity.strftime('%Y-%m-%d %H:%M:%S')

        # Reservation held in the in-memory allocation store?
        in_memory = space_name in active_allocations
        if in_memory:
            alloc_info = active_allocations[space_name]
            minutes_until_free = (alloc_info['expires_at'] - now).total_seconds() / 60
            status += f" (Allocated in memory to {alloc_info['student_id']}, free in {minutes_until_free:.1f} min)"

        # Very recent hand-out recorded by the local tracker?
        in_local, local_student = local_tracker.is_recently_allocated_locally(space_name)
        if in_local:
            status += f" (Recently allocated locally to {local_student})"

        entry = {
            'space_name': space_name,
            'last_activity': last_activity,
            'last_activity_str': last_activity_str,
            'idle_minutes': idle_minutes,
            'status': status,
            'is_recently_allocated_memory': in_memory,
            'is_recently_allocated_local': in_local,
            'cached_status': cached_status
        }
        report.append(entry)
        print(f"[analyze_space_activity_cached] {space_name}: {status}")

    # Most idle first.
    return sorted(report, key=lambda entry: entry['idle_minutes'], reverse=True)
def create_status_display(space_activity):
    """Render the status of all spaces as a ``<br>``-separated markup string.

    Args:
        space_activity: entries carrying ``space_name``, ``status`` and
            ``last_activity_str``, already in display order.

    Returns:
        str: header, optional in-memory and local allocation sections, then
        one numbered entry per space.
    """
    pieces = ["📊 **Current Space Status (sorted by availability):**<br><br>"]

    # In-memory allocation section (only when something is active).
    memory_summary = memory_allocation_store.get_status_summary()
    if memory_summary['active_count'] > 0:
        pieces.append(f"🧠 **Memory Allocations ({memory_summary['active_count']} active):**<br>")
        pieces.extend(f" • {alloc}<br>" for alloc in memory_summary['summary'])
        pieces.append("<br>")

    # Local allocation section (only when non-empty).
    local_summary = local_tracker.get_recent_allocations_summary()
    if local_summary:
        pieces.append("🔒 **Recent Local Allocations:**<br>")
        pieces.extend(f" • {alloc}<br>" for alloc in local_summary)
        pieces.append("<br>")

    for position, space in enumerate(space_activity, 1):
        pieces.append(f"{position}. **{space['space_name']}**<br>")
        pieces.append(f" • Status: {space['status']}<br>")
        pieces.append(f" • Last activity: {space['last_activity_str']}<br><br>")

    return "".join(pieces)
def select_space_with_enhanced_collision_avoidance_cached(space_activity, student_id):
    """Pick the most idle free space for a student and reserve it in memory.

    Args:
        space_activity: entries from ``analyze_space_activity_cached()``,
            sorted most idle first.
        student_id: identifier taken from the request's ``check`` parameter.

    Returns:
        The component produced by ``redirect_to_space`` for the chosen space.

    Raises:
        gr.Error: if no space satisfies the idle/allocation conditions, or if
            every candidate turns out to have been handed out moments ago.
    """
    # Function-scope import so this block does not depend on a file-level one.
    from urllib.parse import quote

    log_prefix = "[select_space_with_enhanced_collision_avoidance_cached]"
    print(f"{log_prefix} Starting selection for student: {student_id}")

    # Step 1: keep only spaces that are idle long enough and not reserved.
    basic_available_spaces = [
        space for space in space_activity
        if space['idle_minutes'] >= MIN_IDLE_MINUTES
        and not space['is_recently_allocated_memory']
        and not space['is_recently_allocated_local']
    ]
    print(f"{log_prefix} Basic available spaces: {len(basic_available_spaces)}")

    if not basic_available_spaces:
        # Explain which conditions blocked the assignment.
        idle_spaces = [s for s in space_activity if s['idle_minutes'] >= MIN_IDLE_MINUTES]
        memory_allocated = [s for s in space_activity if s['is_recently_allocated_memory']]
        local_allocated = [s for s in space_activity if s['is_recently_allocated_local']]
        error_parts = []
        if not idle_spaces:
            error_parts.append(f"all spaces used within {MIN_IDLE_MINUTES} minutes")
        if memory_allocated:
            error_parts.append(f"{len(memory_allocated)} spaces allocated in memory")
        if local_allocated:
            error_parts.append(f"{len(local_allocated)} spaces locally allocated")
        error_msg = (
            f"🚫 **All learning assistants are currently busy**\n\n"
            f"Blocking conditions: {', '.join(error_parts)}\n\n"
            f"**Please try again in 1-2 minutes.**"
        )
        print(f"{log_prefix} No available spaces: {error_msg}")
        raise gr.Error(error_msg, duration=10)

    # Steps 2-5 run under ALLOCATION_LOCK so the final conflict check and the
    # recording of the allocation are atomic. Without it, two concurrent
    # requests could both pass the check and be given the same space.
    with ALLOCATION_LOCK:
        selected_space = None
        for position, candidate in enumerate(basic_available_spaces):
            candidate_name = candidate['space_name']
            if position == 0:
                print(f"{log_prefix} Preliminary selection: {candidate_name}")
            has_conflict, conflicting_student = local_tracker.is_recently_allocated_locally(candidate_name)
            if has_conflict:
                print(f"{log_prefix} COLLISION DETECTED! {candidate_name} recently allocated to {conflicting_student}")
                continue
            if position > 0:
                print(f"{log_prefix} Using alternative space: {candidate_name}")
            selected_space = candidate
            break

        if selected_space is None:
            error_msg = (
                f"🚫 **Collision detected and no alternatives available**\n\n"
                f"The system detected a potential conflict with another student's allocation.\n\n"
                f"**Please try again in 10-15 seconds.**"
            )
            print(f"{log_prefix} No alternatives available")
            raise gr.Error(error_msg, duration=8)

        space_name = selected_space['space_name']

        # Record locally first, then in the memory store.
        local_tracker.record_local_allocation(space_name, student_id)
        print(f"{log_prefix} Local allocation recorded BEFORE memory write")
        memory_allocation_store.add_allocation(space_name, student_id)
        print(f"{log_prefix} Allocation recorded in memory only")

    # Step 6: build the result.
    status_display = create_status_display_with_cache_info(space_activity)
    # student_id originates from the query string; percent-encode it so it
    # cannot inject extra URL syntax or markup into the redirect link.
    redirect_url = f"https://huggingface.co/spaces/CIV3283/{space_name}/?check={quote(str(student_id), safe='')}"
    print(f"{log_prefix} Final allocation: {space_name} -> {student_id}")
    return redirect_to_space(redirect_url, selected_space, status_display)
def redirect_to_space(redirect_url, selected_space, status_display):
    """Build the "assistant assigned" page with a manual link to the space.

    Args:
        redirect_url: URL of the assigned space (contains the student ID).
        selected_space: entry with at least ``space_name`` and ``idle_minutes``.
        status_display: accepted for interface compatibility; not rendered.

    Returns:
        gr.HTML: component holding the rendered page.
    """
    # Function-scope import so this block does not depend on a file-level one.
    from html import escape

    if selected_space['idle_minutes'] == float('inf'):
        idle_info = "Never used (completely fresh)"
    else:
        idle_info = f"{selected_space['idle_minutes']:.1f} minutes"

    # The URL carries a request-supplied value and the space name comes from a
    # file name; escape both before placing them in markup.
    safe_url = escape(str(redirect_url), quote=True)
    safe_space_name = escape(str(selected_space['space_name']), quote=True)

    # Only the access section is rendered; the analysis section was removed.
    redirect_html = f"""
<div style="max-width: 900px; margin: 0 auto; padding: 20px; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;">
<div style="text-align: center; margin-bottom: 30px; padding: 25px; background: linear-gradient(135deg, #28a745, #20c997); color: white; border-radius: 12px; box-shadow: 0 4px 6px rgba(0,0,0,0.1);">
<h1 style="margin: 0 0 15px 0; font-size: 28px;">🎯 Learning Assistant Assigned</h1>
<h2 style="margin: 0 0 10px 0; font-weight: normal; font-size: 24px;">{safe_space_name}</h2>
<p style="margin: 0; font-size: 18px; opacity: 0.9;">
✨ This space was idle for: <strong>{idle_info}</strong>
</p>
</div>
<div style="text-align: center; margin-bottom: 30px; padding: 25px; background: linear-gradient(135deg, #2196f3, #1976d2); color: white; border-radius: 12px; box-shadow: 0 4px 6px rgba(33,150,243,0.3);">
<h2 style="margin-top: 0; color: white;">🚀 Access Your Learning Assistant</h2>
<p style="color: rgba(255,255,255,0.9); font-size: 16px; margin-bottom: 25px;">
Click the button below to access your assigned learning assistant.
</p>
<a href="{safe_url}"
target="_blank"
style="display: inline-block; background: rgba(255,255,255,0.15); color: white;
padding: 15px 30px; font-size: 18px; font-weight: bold; text-decoration: none;
border-radius: 25px; border: 2px solid rgba(255,255,255,0.3);
transition: all 0.3s ease; margin-bottom: 20px;"
onmouseover="this.style.background='rgba(255,255,255,0.25)'; this.style.transform='translateY(-2px)'"
onmouseout="this.style.background='rgba(255,255,255,0.15)'; this.style.transform='translateY(0px)'">
➤ Open Learning Assistant
</a>
<p style="margin: 15px 0 0 0; color: rgba(255,255,255,0.8); font-size: 14px;">
💡 Left-click the button above or right-click it and select "Open in new tab"
</p>
</div>
<div style="text-align: center; padding: 20px; background: #f1f3f4; border-radius: 8px; margin-top: 20px;">
<p style="margin: 0; color: #5f6368; font-size: 14px;">
🔄 Need a different assistant? <a href="javascript:location.reload()" style="color: #1976d2; text-decoration: none;">Refresh this page</a> to get reassigned.
</p>
</div>
</div>
"""
    return gr.HTML(redirect_html)
def load_balance_user_cached(student_id):
    """Assign a space to ``student_id`` using only cached / in-memory data.

    Raises:
        gr.Error: if the data manager has not been initialized, or if the
            selection step cannot find a space.
    """
    print(f"[load_balance_user_cached] Starting cached load balancing for student ID: {student_id}")

    system_ready = data_manager.initialized
    if not system_ready:
        raise gr.Error("🚫 System not properly initialized. Please contact administrator.", duration=8)

    # Analyse from cache (no data sync), then hand the report to the selector.
    return select_space_with_enhanced_collision_avoidance_cached(
        analyze_space_activity_cached(),
        student_id,
    )
def get_url_params(request: gr.Request):
    """Read the ``check`` query parameter from the incoming request.

    Args:
        request: Gradio request whose ``query_params`` are inspected.

    Returns:
        A ``(title, check_id)`` tuple. The title is always
        ``"Load Distributor"``; ``check_id`` is the value of the ``check``
        parameter, or ``None`` when it is missing or empty.
    """
    params = dict(request.query_params)
    # A missing key and an empty value both collapse to None.
    student_check = params.get('check') or None
    return "Load Distributor", student_check
def create_status_display_with_cache_info(space_activity):
    """Render the space status overview, including cache and allocation info.

    Args:
        space_activity: Iterable of dicts, each providing ``space_name``,
            ``status`` and ``last_activity_str`` and, optionally,
            ``cached_status``.

    Returns:
        A single markup string (Markdown emphasis with ``<br>`` line breaks).
    """
    # Cache metadata is only available once the activity cache exists.
    cache_info = activity_cache.get_cache_info() if activity_cache else None

    fragments = ["📊 **Current Space Status (sorted by availability):**<br><br>"]

    # Cache freshness and size.
    if cache_info:
        cache_status = (
            f"✅ Fresh (updated {cache_info['age_minutes']:.1f} min ago)"
            if cache_info['is_fresh']
            else f"⚠️ Stale (updated {cache_info['age_minutes']:.1f} min ago)"
        )
        fragments.append(f"🔄 **Cache Status:** {cache_status}<br>")
        fragments.append(f"📋 **Cached Spaces:** {cache_info['spaces_count']}<br><br>")

    # Summary of in-memory allocation records.
    memory_summary = memory_allocation_store.get_status_summary()
    if memory_summary['active_count'] > 0:
        fragments.append(f"🧠 **Memory Allocations ({memory_summary['active_count']} active):**<br>")
        fragments.extend(f" • {alloc}<br>" for alloc in memory_summary['summary'])
        fragments.append("<br>")

    # Summary of recent local allocation records.
    local_summary = local_tracker.get_recent_allocations_summary()
    if local_summary:
        fragments.append("🔒 **Recent Local Allocations:**<br>")
        fragments.extend(f" • {alloc}<br>" for alloc in local_summary)
        fragments.append("<br>")

    # One numbered entry per space, in the order received.
    for i, space in enumerate(space_activity, start=1):
        fragments.append(f"{i}. **{space['space_name']}**<br>")
        fragments.append(f" • Status: {space['status']}<br>")
        fragments.append(f" • Last activity: {space['last_activity_str']}<br>")
        # Cache state line is optional.
        if 'cached_status' in space:
            fragments.append(f" • Cache: {space['cached_status']}<br>")
        fragments.append("<br>")

    return "".join(fragments)
def handle_user_access_cached(request: gr.Request):
    """Handle a page load: validate the request and run cached load balancing.

    Args:
        request: Gradio request; its ``check`` query parameter carries the
            student ID (read via ``get_url_params``).

    Returns:
        A ``(title, content)`` tuple. ``content`` is the result of
        ``load_balance_user_cached`` on success, or a ``gr.HTML`` error panel
        when the student ID is missing or load balancing raises.
    """
    # Local import so this block does not depend on the file's import header.
    import html

    title, check_id = get_url_params(request)

    if not check_id:
        # No student ID provided
        error_html = """
<div style="max-width: 600px; margin: 50px auto; padding: 30px; text-align: center;
background: #fff3cd; border: 1px solid #ffeaa7; border-radius: 12px;">
<h2 style="color: #856404;">⚠️ Invalid Access</h2>
<p style="color: #856404; font-size: 16px; line-height: 1.6;">
This load distributor requires a valid student ID parameter.<br><br>
<strong>Please access this system through the official link provided in Moodle.</strong>
</p>
</div>
"""
        return title, gr.HTML(error_html)

    # Valid student ID - perform cached load balancing
    try:
        result = load_balance_user_cached(check_id)
    except Exception as e:
        # The exception text may echo request-derived data (the student ID
        # comes straight from the URL), so escape it before embedding it in
        # the HTML panel to prevent markup/script injection.
        safe_message = html.escape(str(e))
        error_html = f"""
<div style="max-width: 600px; margin: 50px auto; padding: 30px; text-align: center;
background: #f8d7da; border: 1px solid #f5c6cb; border-radius: 12px;">
<h2 style="color: #721c24;">🚫 Load Balancing Error</h2>
<p style="color: #721c24; font-size: 16px; line-height: 1.6;">
{safe_message}<br><br>
Please try again in a few moments or contact your instructor if the problem persists.
</p>
</div>
"""
        return title, gr.HTML(error_html)

    return title, result
# Admin / management helpers
def get_cache_status():
    """Return the activity cache's info dict (for debugging or admin use).

    Returns:
        ``activity_cache.get_cache_info()`` when the cache is available,
        otherwise ``{"error": "Cache not initialized"}``.
    """
    if not activity_cache:
        return {"error": "Cache not initialized"}
    return activity_cache.get_cache_info()
def force_cache_update():
    """Force an activity-cache refresh (for debugging or admin use).

    Returns:
        ``{"status": "Cache updated"}`` after calling
        ``activity_cache.force_update()``, or
        ``{"error": "Cache not initialized"}`` when no cache is available.
    """
    if not activity_cache:
        return {"error": "Cache not initialized"}
    activity_cache.force_update()
    return {"status": "Cache updated"}
def get_data_manager_status():
    """Summarize the data manager's state as a plain dict.

    Returns:
        A dict with ``initialized``, ``last_sync`` (ISO-8601 string or
        ``None``), ``spaces_count`` and ``repo_connected``.
    """
    report = {"initialized": data_manager.initialized}

    synced_at = data_manager.last_data_sync
    report["last_sync"] = synced_at.isoformat() if synced_at else None

    report["spaces_count"] = len(data_manager.available_spaces)
    report["repo_connected"] = data_manager.repo is not None
    return report
def force_data_sync():
    """Trigger a data sync through the data manager.

    Returns:
        ``{"status": "Data sync completed"}`` on success, or
        ``{"error": <message>}`` if the sync raised.
    """
    try:
        data_manager._sync_data()
    except Exception as e:
        # Report the failure to the caller instead of propagating it.
        return {"error": str(e)}
    else:
        return {"status": "Data sync completed"}
def get_memory_allocation_status():
    """Return the status summary of the in-memory allocation store."""
    allocation_summary = memory_allocation_store.get_status_summary()
    return allocation_summary
def clear_memory_allocations():
    """Remove every record from the in-memory allocation store (admin action).

    Returns:
        ``{"status": "Cleared <n> memory allocations"}`` on success, or
        ``{"error": <message>}`` if anything raised.
    """
    try:
        store = memory_allocation_store
        # Count and clear under the store's lock so the reported number
        # matches what was actually removed.
        with store._lock:
            cleared_count = len(store._allocations)
            store._allocations.clear()
    except Exception as e:
        return {"error": str(e)}
    return {"status": f"Cleared {cleared_count} memory allocations"}
# Optional admin interface (for debugging)
def create_admin_interface():
    """Build the optional admin panel (enhanced version).

    The panel has three tabs (cache, data manager, memory allocations). Each
    tab has two buttons whose handlers return a ``(json_payload, message)``
    pair shown in that tab's JSON and Markdown components.

    Returns:
        The ``gr.Blocks`` instance containing the admin UI.
    """
    with gr.Blocks(title="CIV3283 Load Distributor - Admin") as admin_interface:
        gr.Markdown("# 🔧 CIV3283 Load Distributor - Admin Panel")

        # ---- Layout -------------------------------------------------------
        with gr.Tab("Cache Management"):
            with gr.Row():
                cache_status_btn = gr.Button("📊 Check Cache Status", variant="secondary")
                force_cache_update_btn = gr.Button("🔄 Force Cache Update", variant="primary")
            cache_info_display = gr.JSON(label="Cache Information")
            cache_status_message = gr.Markdown("")

        with gr.Tab("Data Management"):
            with gr.Row():
                data_status_btn = gr.Button("📊 Check Data Manager Status", variant="secondary")
                force_data_sync_btn = gr.Button("🔄 Force Data Sync", variant="primary")
            data_info_display = gr.JSON(label="Data Manager Information")
            data_status_message = gr.Markdown("")

        with gr.Tab("Memory Allocation"):
            with gr.Row():
                memory_status_btn = gr.Button("🧠 Check Memory Allocations", variant="secondary")
                clear_memory_btn = gr.Button("🗑️ Clear All Memory Allocations", variant="stop")
            memory_info_display = gr.JSON(label="Memory Allocation Information")
            memory_status_message = gr.Markdown("")

        # ---- Handlers -----------------------------------------------------
        # Handler names are kept as-is: Gradio derives default endpoint
        # names from the function name.
        def check_cache_status():
            """Report cache freshness, age and size."""
            try:
                status = get_cache_status()
                if 'error' in status:
                    return status, "❌ Cache not available"

                age_min = status.get('age_minutes', 0)
                spaces_count = status.get('spaces_count', 0)
                if status.get('is_fresh', False):
                    return status, f"✅ Cache is fresh ({age_min:.1f} min old, {spaces_count} spaces)"
                return status, f"⚠️ Cache is stale ({age_min:.1f} min old, {spaces_count} spaces)"
            except Exception as e:
                return {"error": str(e)}, f"❌ Error checking cache: {str(e)}"

        def force_update():
            """Force a cache refresh, then show the refreshed cache status."""
            try:
                result = force_cache_update()
                if 'error' in result:
                    return result, "❌ Failed to update cache"
                # Show the post-update state rather than the bare result.
                return get_cache_status(), "✅ Cache updated successfully"
            except Exception as e:
                return {"error": str(e)}, f"❌ Error updating cache: {str(e)}"

        def check_data_status():
            """Report whether the data manager is initialized and its sync age."""
            try:
                status = get_data_manager_status()
                if not status['initialized']:
                    return status, "❌ Data manager not initialized"

                last_sync = status['last_sync']
                if not last_sync:
                    return status, "⚠️ Data manager initialized but no sync recorded"

                sync_time = datetime.fromisoformat(last_sync)
                age_minutes = (datetime.now() - sync_time).total_seconds() / 60
                return status, f"✅ Data manager active (last sync: {age_minutes:.1f} min ago, {status['spaces_count']} spaces)"
            except Exception as e:
                return {"error": str(e)}, f"❌ Error checking data manager: {str(e)}"

        def force_sync():
            """Force a data sync, then show the refreshed data manager status."""
            try:
                result = force_data_sync()
                if 'error' in result:
                    return result, "❌ Failed to sync data"
                # Show the post-sync state rather than the bare result.
                return get_data_manager_status(), "✅ Data sync completed successfully"
            except Exception as e:
                return {"error": str(e)}, f"❌ Error syncing data: {str(e)}"

        def check_memory_status():
            """Report how many memory allocations are active/stored."""
            try:
                status = get_memory_allocation_status()
                active_count = status['active_count']
                total_stored = status['total_stored']
                if active_count > 0:
                    return status, f"🧠 {active_count} active allocations ({total_stored} total in memory)"
                return status, "✅ No active memory allocations"
            except Exception as e:
                return {"error": str(e)}, f"❌ Error checking memory allocations: {str(e)}"

        def clear_memory():
            """Clear all memory allocations, then show the refreshed summary."""
            try:
                result = clear_memory_allocations()
                if 'error' in result:
                    return result, "❌ Failed to clear memory allocations"
                # Show the post-clear state rather than the bare result.
                return get_memory_allocation_status(), f"✅ {result['status']}"
            except Exception as e:
                return {"error": str(e)}, f"❌ Error clearing memory: {str(e)}"

        # ---- Event wiring (cache tab, data tab, memory tab) -----------------
        cache_outputs = [cache_info_display, cache_status_message]
        data_outputs = [data_info_display, data_status_message]
        memory_outputs = [memory_info_display, memory_status_message]
        click_bindings = (
            (cache_status_btn, check_cache_status, cache_outputs),
            (force_cache_update_btn, force_update, cache_outputs),
            (data_status_btn, check_data_status, data_outputs),
            (force_data_sync_btn, force_sync, data_outputs),
            (memory_status_btn, check_memory_status, memory_outputs),
            (clear_memory_btn, clear_memory, memory_outputs),
        )
        for button, handler, targets in click_bindings:
            button.click(fn=handler, outputs=list(targets))

    return admin_interface
# Main Gradio interface (revised)
def create_main_interface():
    """Build the user-facing interface.

    The page consists of a title and an HTML content area; both are filled
    in by ``handle_user_access_cached`` when the page loads.

    Returns:
        The ``gr.Blocks`` instance for the main UI.
    """
    with gr.Blocks(title="CIV3283 Load Distributor") as interface:
        title_display = gr.Markdown("# 🔄 CIV3283 Learning Assistant Load Distributor", elem_id="title")
        content_display = gr.HTML("")

        # Load balancing runs once per page load and fills both components.
        load_targets = [title_display, content_display]
        interface.load(fn=handle_user_access_cached, outputs=load_targets)

    return interface
# Script entry point (revised main)
if __name__ == "__main__":
    import sys
    import traceback

    try:
        # One-time system initialization at startup.
        print("🚀 Initializing CIV3283 Load Distributor System...")
        init_system()
        print("✅ System initialization completed successfully")
        print("🧠 Using pure memory storage for allocations (no file I/O during user access)")

        # The admin panel is opt-in: either the --admin flag or ENABLE_ADMIN=true.
        enable_admin = "--admin" in sys.argv or os.environ.get("ENABLE_ADMIN", "false").lower() == "true"

        if enable_admin:
            print("🔧 Starting with admin interface enabled")
            # Build both interfaces and expose them as two tabs.
            main_interface = create_main_interface()
            admin_interface = create_admin_interface()
            combined_interface = gr.TabbedInterface(
                [main_interface, admin_interface],
                ["🎯 Load Distributor", "🔧 Admin Panel"],
                title="CIV3283 Load Distributor System"
            )
            combined_interface.launch()
        else:
            print("🎯 Starting main interface only")
            # Launch only the user-facing interface.
            main_interface = create_main_interface()
            main_interface.launch()
    except Exception as e:
        # Log the failure with its traceback, then re-raise so the process
        # exits with the original error.
        print(f"❌ System initialization failed: {e}")
        print(f"Traceback: {traceback.format_exc()}")
        raise