# Extraction residue from a web file viewer: "Spaces: Runtime error"; file size 12,541 bytes; revision e3804d7.
import os
import json
import re
from datetime import datetime
from discord.ext import commands
from configuration.config import CR_USER_ID, CR_ROLE_NAME
# Global state shared by the schedule helpers in this module.
# Main schedule as stored by load_main_schedule_from_file():
# group -> lowercase day name -> list of entries.
MAIN_SCHEDULE = {}
# Path of the file MAIN_SCHEDULE was loaded from; None until a load succeeds.
# save_main_schedule_to_file() falls back to this path when none is given.
MAIN_SCHEDULE_FILE = None
# Temporary per-week overrides, filled by apply_temp_replacement() and
# apply_temp_cancellation():
# week_key -> group -> day -> {'replacements': [((time, subject), new_entry), ...],
#                              'cancellations': [(time, subject), ...]}
TEMP_CHANGES = {}
# Absolute path of the directory one level above this file's directory.
# load_main_schedule_from_file() uses it as a base when resolving schedule paths.
# NOTE(review): an earlier comment here described putting the project root on
# sys.path; no sys.path change is visible in this section - confirm it happens elsewhere.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
def is_cr():
    """Build a command check that only lets the class representative (CR) through.

    The check passes when ``CR_USER_ID`` is set and equals the invoking
    author's id, or when the author holds a role named ``CR_ROLE_NAME``.
    Otherwise a denial message is sent to the context and the check fails.

    Returns:
        The decorator produced by ``commands.check`` for the predicate.
    """
    async def predicate(ctx):
        author = ctx.author
        if CR_USER_ID and author.id == CR_USER_ID:
            return True
        # Outside a guild (direct messages) the author is a plain user object
        # with no ``roles`` attribute; treat that as "holds no roles" instead
        # of letting an AttributeError escape from the check.
        roles = getattr(author, 'roles', None) or ()
        if any(role.name == CR_ROLE_NAME for role in roles):
            return True
        await ctx.send("❌ You don't have permission to use this command.")
        return False

    return commands.check(predicate)
def get_week_key(dt=None):
    """Return the ISO ``(year, week)`` pair identifying the week of *dt*.

    When *dt* is omitted (or falsy) the current local time is used.
    """
    moment = dt if dt else datetime.now()
    iso_year, iso_week, _iso_weekday = moment.isocalendar()
    return (iso_year, iso_week)
def load_main_schedule_from_file(path):
    """Load the main schedule JSON and cache it in the module-level state.

    *path* is looked up in several places, in this order: the path itself
    (user-expanded and made absolute), relative to the project root, under
    the project root's ``assets`` directory (full path, then basename only),
    and finally relative to this file's directory. The first existing file wins.

    Day names are lowercased while loading. On success ``MAIN_SCHEDULE`` and
    ``MAIN_SCHEDULE_FILE`` are updated and the normalized schedule is returned.

    Raises:
        FileNotFoundError: if none of the candidate locations is a file.
    """
    global MAIN_SCHEDULE, MAIN_SCHEDULE_FILE

    expanded = os.path.expanduser(path)
    primary = expanded if os.path.isabs(expanded) else os.path.abspath(expanded)

    # Fall back to computing the root when the module constant is unavailable.
    try:
        root = PROJECT_ROOT
    except NameError:
        root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))

    candidate_paths = [
        primary,
        os.path.join(root, path),
        os.path.join(root, 'assets', path),
        os.path.join(root, 'assets', os.path.basename(path)),
        os.path.abspath(os.path.join(os.path.dirname(__file__), path)),
    ]

    found = next((option for option in candidate_paths if os.path.isfile(option)), None)
    if not found:
        raise FileNotFoundError(f"Schedule file not found: {path}. Tried: {', '.join(candidate_paths)}")

    with open(found, 'r', encoding='utf-8') as handle:
        data = json.load(handle)

    # Day keys are stored lowercase so lookups do not depend on the file's casing.
    MAIN_SCHEDULE = {
        group: {day.lower(): entries for day, entries in days.items()}
        for group, days in data.items()
    }
    MAIN_SCHEDULE_FILE = found
    return MAIN_SCHEDULE
def save_main_schedule_to_file(path=None):
    """Write ``MAIN_SCHEDULE`` as indented UTF-8 JSON.

    When *path* is ``None`` the file the schedule was loaded from
    (``MAIN_SCHEDULE_FILE``) is used.

    Raises:
        ValueError: if no target path is available.
    """
    target = MAIN_SCHEDULE_FILE if path is None else path
    if not target:
        raise ValueError("No main schedule file set")
    # Days are written exactly as held in MAIN_SCHEDULE (lowercase keys).
    with open(target, 'w', encoding='utf-8') as handle:
        json.dump(MAIN_SCHEDULE, handle, indent=2, ensure_ascii=False)
def _normalize_time(t):
    """Return *t* as a canonical time string, or ``''`` for a falsy value.

    A trailing AM/PM marker is separated from the digits by exactly one
    space, whitespace runs are collapsed, and AM/PM is uppercased
    (``'9:00am'`` becomes ``'9:00 AM'``).
    """
    if not t:
        return ''

    def _shout(match):
        return match.group(1).upper()

    text = str(t).strip()
    # Put a single space in front of a trailing am/pm marker.
    text = re.sub(r'\s*([AaPp][Mm])$', r' \1', text)
    # Collapse any remaining whitespace runs.
    text = re.sub(r'\s+', ' ', text).strip()
    return re.sub(r'([AaPp][Mm])', _shout, text)
def _normalize_subject(sub):
    """Return *sub* stripped and lowercased for matching; ``''`` for a falsy value."""
    return str(sub).strip().lower() if sub else ''
def apply_temp_replacement(week_key, group, day, orig_time, orig_subject, new_entry):
    """Record a temporary replacement of one class for the given week.

    The original class is identified by its normalized ``(time, subject)``
    pair. *new_entry* is copied; its ``'time'`` is normalized and its
    ``'subject'`` stripped when those keys are present. The pair
    ``(original_key, replacement)`` is appended to
    ``TEMP_CHANGES[week_key][group][day]['replacements']``.
    """
    # Normalized key so later matching does not depend on spacing or case.
    original_key = (_normalize_time(orig_time), _normalize_subject(orig_subject))

    replacement = dict(new_entry)
    if 'time' in replacement:
        replacement['time'] = _normalize_time(replacement['time'])
    if 'subject' in replacement:
        replacement['subject'] = replacement['subject'].strip()

    week_bucket = TEMP_CHANGES.setdefault(week_key, {})
    group_bucket = week_bucket.setdefault(group, {})
    day_bucket = group_bucket.setdefault(day, {})
    day_bucket.setdefault('replacements', []).append((original_key, replacement))
def apply_temp_cancellation(week_key, group, day, orig_time, orig_subject):
    """Record a temporary cancellation of one class for the given week.

    The normalized ``(time, subject)`` pair is appended to
    ``TEMP_CHANGES[week_key][group][day]['cancellations']``.
    """
    cancelled_key = (_normalize_time(orig_time), _normalize_subject(orig_subject))
    week_bucket = TEMP_CHANGES.setdefault(week_key, {})
    group_bucket = week_bucket.setdefault(group, {})
    day_bucket = group_bucket.setdefault(day, {})
    day_bucket.setdefault('cancellations', []).append(cancelled_key)
def merge_schedule_for_week(group, day, week_key=None):
    """Return the entries for *group* on *day* with the week's temporary changes applied.

    *week_key* defaults to the current week. Base entries come from
    ``MAIN_SCHEDULE``; cancellations and replacements come from
    ``TEMP_CHANGES``. Cancelled originals are dropped, replaced originals are
    swapped for their replacement (unless the replacement itself was
    cancelled), and replacements that match no base entry are appended at the end.
    """
    def _match_key(entry):
        # Normalized (time, subject) pair used for all matching.
        return (_normalize_time(entry.get('time')), _normalize_subject(entry.get('subject')))

    def _output_entry(source, time_value):
        # Fixed output shape shared by original and replacement entries.
        return {
            'time': time_value,
            'subject': source.get('subject'),
            'room': source.get('room', ''),
            'instructor': source.get('instructor', ''),
            'note': source.get('note', ''),
        }

    week_key = week_key or get_week_key()

    # Work on copies so the stored schedule is never touched.
    if group in MAIN_SCHEDULE and day in MAIN_SCHEDULE[group]:
        base_entries = [dict(item) for item in MAIN_SCHEDULE[group][day]]
    else:
        base_entries = []

    day_changes = TEMP_CHANGES.get(week_key, {}).get(group, {}).get(day, {})
    cancelled = set(day_changes.get('cancellations', []))
    replacement_pairs = day_changes.get('replacements', [])
    replacement_for = dict(replacement_pairs)

    merged = []
    consumed_keys = set()
    for entry in base_entries:
        key = _match_key(entry)
        if key in cancelled:
            # Original cancelled outright.
            consumed_keys.add(key)
            continue
        if key not in replacement_for:
            # Untouched original; only its time is normalized for display.
            merged.append(_output_entry(entry, _normalize_time(entry.get('time'))))
            continue
        consumed_keys.add(key)
        substitute = replacement_for[key]
        if _match_key(substitute) in cancelled:
            # The replacement was cancelled too: show neither version.
            continue
        merged.append(_output_entry(substitute, substitute.get('time')))

    # Replacements whose original never appeared in the base act as additions.
    for original_key, substitute in replacement_pairs:
        if original_key in consumed_keys:
            continue
        if _match_key(substitute) in cancelled:
            continue
        merged.append(_output_entry(substitute, substitute.get('time')))
    return merged
def apply_temp_changes_to_db_rows(rows, week_key):
    """Overlay the temporary changes stored for *week_key* on database rows.

    Each row is read by attribute, by ``.get`` or by item access and is
    expected to offer ``day``, ``time``, ``subject``, ``room`` and
    ``group_name`` (``group`` is accepted as a fallback).

    Cancelled originals are dropped and replaced originals are swapped for
    their replacement. A replacement that was itself cancelled is skipped on
    every path, matching ``merge_schedule_for_week``. Replacements that match
    no row are appended, including those for group/day pairs that have no
    rows at all.

    Returns:
        A list of dicts with keys ``time``, ``subject``, ``room`` and
        ``group_name``; an empty list when *rows* is empty or ``None``.
    """
    if not rows:
        # NOTE(review): with no rows, week-only additions are not returned
        # either; kept as-is so existing callers see the same result.
        return []

    def _rget(r, key, default=None):
        try:
            # Attribute-style access (ORM-like objects).
            if hasattr(r, key):
                return getattr(r, key)
            # Mapping-style access.
            if hasattr(r, 'get'):
                return r.get(key, default)
            # Plain item access.
            return r[key]
        except Exception:
            # Deliberately best-effort: an unreadable field yields the default.
            return default

    def _entry_key(entry):
        # Normalized (time, subject) pair, the form stored in the cancellation list.
        return (_normalize_time(entry.get('time')), _normalize_subject(entry.get('subject')))

    # Bucket rows by (group, lowercase day) so changes apply per group/day.
    grouped = {}
    for r in rows:
        group = _rget(r, 'group_name') or _rget(r, 'group') or ''
        day = (_rget(r, 'day') or '').lower()
        grouped.setdefault((group, day), []).append(r)

    wk = TEMP_CHANGES.get(week_key, {})
    result = []

    for (group, day), rlist in grouped.items():
        day_changes = wk.get(group, {}).get(day, {})
        cancels = set(day_changes.get('cancellations', []))
        replacement_pairs = day_changes.get('replacements', [])
        replacements = dict(replacement_pairs)
        handled_orig_keys = set()

        for r in rlist:
            row_time = _rget(r, 'time')
            subject = _rget(r, 'subject')
            key = (_normalize_time(row_time), _normalize_subject(subject))
            if key in cancels:
                # Original cancelled outright.
                handled_orig_keys.add(key)
                continue
            if key in replacements:
                handled_orig_keys.add(key)
                new_e = replacements[key]
                if _entry_key(new_e) in cancels:
                    # The replacement was cancelled too: show neither version.
                    continue
                result.append({
                    'time': new_e.get('time'),
                    'subject': new_e.get('subject'),
                    # Keep the row's room when the replacement names none.
                    'room': new_e.get('room', _rget(r, 'room', '')),
                    'group_name': group,
                })
            else:
                # Untouched original; only its time is normalized for display.
                result.append({
                    'time': _normalize_time(row_time),
                    'subject': subject,
                    'room': _rget(r, 'room', ''),
                    'group_name': group,
                })

        # Replacements whose original matched no row act as additions.
        for orig_k, new_e in replacement_pairs:
            if orig_k in handled_orig_keys:
                continue
            if _entry_key(new_e) in cancels:
                continue
            result.append({
                'time': new_e.get('time'),
                'subject': new_e.get('subject'),
                'room': new_e.get('room', ''),
                'group_name': group,
            })

    # Changes for group/day pairs with no rows at all: add every replacement
    # that has not been cancelled.
    for grp_name, groups in wk.items():
        for dname, dchanges in groups.items():
            if (grp_name, dname) in grouped:
                continue
            cancels = set(dchanges.get('cancellations', []))
            for _orig_k, new_e in dchanges.get('replacements', []):
                if _entry_key(new_e) in cancels:
                    continue
                result.append({
                    'time': new_e.get('time'),
                    'subject': new_e.get('subject'),
                    'room': new_e.get('room', ''),
                    'group_name': grp_name,
                })
    return result
def _find_time_tokens(tokens, start=0):
    """Find the first time expression in *tokens* at or after index *start*.

    Recognized forms are ``'9:00'``, ``'09:00'``, ``'9:00PM'`` (marker fused
    to the digits) and ``'9:00'`` followed by a separate ``AM``/``PM`` token
    in any letter case.

    Returns:
        ``(time_string, start_index, end_index)`` with inclusive indices, or
        ``(None, -1, -1)`` when no time is found. A two-token time is joined
        with a single space and spans both tokens.
    """
    time_re = re.compile(r'^\d{1,2}:\d{2}([AaPp][Mm])?$')
    for i in range(start, len(tokens)):
        tok = tokens[i]
        match = time_re.match(tok)
        if not match:
            continue
        # A bare HH:MM must look ahead for a separate AM/PM token before being
        # accepted on its own; otherwise the marker leaks into the subject.
        if match.group(1) is None and i + 1 < len(tokens) and tokens[i + 1].upper() in ('AM', 'PM'):
            return (f"{tok} {tokens[i + 1]}", i, i + 1)
        return (tok, i, i)
    return (None, -1, -1)
def _parse_edit_cancel_args(args):
    """Parse the argument string of an edit or cancel command.

    Expected layouts (whitespace separated)::

        <group> <day> <time> <subject...> [permanent|temporary]
        <group> <day> <time> <subject...> <new time> <new subject...> [permanent|temporary]

    The trailing mode keyword is optional and defaults to temporary. The last
    token is treated as the mode only when it really is one of the two
    keywords, so a subject's final word is kept when the keyword is omitted.

    Returns:
        ``None`` when there are fewer than four tokens or no time is found.
        Otherwise a dict with ``group``, ``day`` (lowercased), ``orig_time``,
        ``orig_subject`` and ``permanent``; edits additionally carry
        ``new_time`` and ``new_subject``.
    """
    tokens = (args or '').split()
    if len(tokens) < 4:
        return None
    group = tokens[0]
    day = tokens[1].lower()

    t1, _t1s, t1e = _find_time_tokens(tokens, 2)
    if not t1:
        return None

    last = tokens[-1].lower()
    has_mode = last in ('permanent', 'temporary')
    permanent = last == 'permanent'
    # Subject text stops before the mode keyword only when one is present.
    text_end = len(tokens) - 1 if has_mode else len(tokens)

    # A second time marks an edit; without it the command is a cancel.
    t2, t2s, t2e = _find_time_tokens(tokens, t1e + 1)
    if t2:
        return {
            'group': group, 'day': day, 'orig_time': t1,
            'orig_subject': ' '.join(tokens[t1e + 1:t2s]).strip(),
            'new_time': t2,
            'new_subject': ' '.join(tokens[t2e + 1:text_end]).strip(),
            'permanent': permanent,
        }
    return {
        'group': group, 'day': day, 'orig_time': t1,
        'orig_subject': ' '.join(tokens[t1e + 1:text_end]).strip(),
        'permanent': permanent,
    }
|