import json
import os
import re
from datetime import datetime

from discord.ext import commands

from configuration.config import CR_USER_ID, CR_ROLE_NAME

# Global state
# MAIN_SCHEDULE: {group: {lowercase_day: [entry_dict, ...]}} as loaded from the JSON file.
MAIN_SCHEDULE = {}
# Path of the file MAIN_SCHEDULE was last loaded from (None until a load succeeds).
MAIN_SCHEDULE_FILE = None
# TEMP_CHANGES: {week_key: {group: {day: {
#     'replacements': [((orig_time, orig_subject), new_entry), ...],
#     'cancellations': [(orig_time, orig_subject), ...]}}}}
TEMP_CHANGES = {}

# Directory one level above the one containing this file. It is used as an
# extra search root when resolving schedule file names (see
# load_main_schedule_from_file); it is NOT added to sys.path here.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))

# A complete time token, optionally with a glued AM/PM suffix ('9:00', '9:00PM').
_TIME_TOKEN_RE = re.compile(r'^\d{1,2}:\d{2}([AaPp][Mm])?$')
# A time token with no AM/PM suffix ('9:00'); may be followed by a separate AM/PM token.
_BARE_TIME_TOKEN_RE = re.compile(r'^\d{1,2}:\d{2}$')
_MERIDIEMS = frozenset(('AM', 'PM'))
_MODE_KEYWORDS = ('permanent', 'temporary')


def is_cr():
    """Return a command check that passes only for the class representative.

    The check passes when the invoking author's id equals ``CR_USER_ID`` or
    when the author holds a role named ``CR_ROLE_NAME``. Otherwise a
    permission-denied message is sent to the context and the check fails.
    """
    async def predicate(ctx):
        if CR_USER_ID and ctx.author.id == CR_USER_ID:
            return True
        # An author without a ``roles`` attribute (e.g. outside a guild) is
        # treated as having no roles instead of raising AttributeError.
        author_roles = getattr(ctx.author, 'roles', ())
        if any(role.name == CR_ROLE_NAME for role in author_roles):
            return True
        await ctx.send("❌ You don't have permission to use this command.")
        return False

    return commands.check(predicate)


def get_week_key(dt=None):
    """Return ``(iso_year, iso_week)`` for *dt*, defaulting to ``datetime.now()``."""
    moment = dt or datetime.now()
    # Tuple unpacking works whether isocalendar() returns a plain tuple or a
    # named tuple, so this does not depend on the Python minor version.
    iso_year, iso_week, _weekday = moment.isocalendar()
    return (iso_year, iso_week)


def load_main_schedule_from_file(path):
    """Load the main schedule JSON into ``MAIN_SCHEDULE`` and return it.

    *path* is tried, in order, as: the user-expanded absolute path, relative
    to ``PROJECT_ROOT``, relative to ``PROJECT_ROOT/assets`` (full path, then
    basename only), and relative to this file's directory. The first existing
    file wins and is remembered in ``MAIN_SCHEDULE_FILE``.

    Day keys are lower-cased; group keys and entries are stored as read.

    Raises:
        FileNotFoundError: if none of the candidate locations is a file.
    """
    global MAIN_SCHEDULE, MAIN_SCHEDULE_FILE

    expanded = os.path.expanduser(path)
    if not os.path.isabs(expanded):
        expanded = os.path.abspath(expanded)

    here = os.path.dirname(__file__)
    candidate_paths = [
        expanded,
        os.path.join(PROJECT_ROOT, path),
        os.path.join(PROJECT_ROOT, 'assets', path),
        os.path.join(PROJECT_ROOT, 'assets', os.path.basename(path)),
        os.path.abspath(os.path.join(here, path)),
    ]

    found = next((cp for cp in candidate_paths if os.path.isfile(cp)), None)
    if not found:
        raise FileNotFoundError(
            f"Schedule file not found: {path}. Tried: {', '.join(candidate_paths)}"
        )

    with open(found, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Normalize day keys to lowercase so lookups are case-insensitive on day.
    normalized = {
        group: {day.lower(): entries for day, entries in days.items()}
        for group, days in data.items()
    }

    MAIN_SCHEDULE = normalized
    MAIN_SCHEDULE_FILE = found
    return MAIN_SCHEDULE


def save_main_schedule_to_file(path=None):
    """Write ``MAIN_SCHEDULE`` as JSON to *path* (default: the loaded file).

    Raises:
        ValueError: if *path* is not given and no schedule file has been set.
    """
    global MAIN_SCHEDULE, MAIN_SCHEDULE_FILE
    if path is None:
        path = MAIN_SCHEDULE_FILE
    if not path:
        raise ValueError("No main schedule file set")
    # Days are written exactly as held in MAIN_SCHEDULE (lowercase keys).
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(MAIN_SCHEDULE, f, indent=2, ensure_ascii=False)


def _normalize_time(t):
    """Return *t* as a canonical time string, or ``''`` for a falsy value.

    Ensures exactly one space before a trailing AM/PM marker, collapses runs
    of whitespace, and upper-cases AM/PM (e.g. ``'9:00am'`` -> ``'9:00 AM'``).
    """
    if not t:
        return ''
    s = str(t).strip()
    # Ensure there's a space before AM/PM if present ('9:00AM' -> '9:00 AM').
    s = re.sub(r'\s*([AaPp][Mm])$', r' \1', s)
    # Normalize spacing and AM/PM case.
    s = re.sub(r'\s+', ' ', s).strip()
    s = re.sub(r'([AaPp][Mm])', lambda m: m.group(1).upper(), s)
    return s


def _normalize_subject(sub):
    """Return *sub* stripped and lower-cased, or ``''`` for a falsy value."""
    if not sub:
        return ''
    return str(sub).strip().lower()


def _entry_key(entry):
    """Return the normalized ``(time, subject)`` matching key for an entry dict."""
    return (
        _normalize_time(entry.get('time')),
        _normalize_subject(entry.get('subject')),
    )


def _day_bucket(week_key, group, day):
    """Return (creating if needed) the TEMP_CHANGES dict for week/group/day."""
    return (
        TEMP_CHANGES
        .setdefault(week_key, {})
        .setdefault(group, {})
        .setdefault(day, {})
    )


def apply_temp_replacement(week_key, group, day, orig_time, orig_subject, new_entry):
    """Record a temporary replacement of one class for the given week.

    The original is identified by its normalized ``(time, subject)``. A copy
    of *new_entry* is stored with its ``time`` normalized and its ``subject``
    stripped; the caller's dict is not modified.
    """
    orig_key = (_normalize_time(orig_time), _normalize_subject(orig_subject))

    stored = dict(new_entry)
    if 'time' in stored:
        stored['time'] = _normalize_time(stored['time'])
    # Coerce before stripping so a non-string subject does not raise; a None
    # subject is left untouched.
    if stored.get('subject') is not None:
        stored['subject'] = str(stored['subject']).strip()

    bucket = _day_bucket(week_key, group, day)
    bucket.setdefault('replacements', []).append((orig_key, stored))


def apply_temp_cancellation(week_key, group, day, orig_time, orig_subject):
    """Record a temporary cancellation of one class for the given week."""
    orig_key = (_normalize_time(orig_time), _normalize_subject(orig_subject))
    bucket = _day_bucket(week_key, group, day)
    bucket.setdefault('cancellations', []).append(orig_key)


def _display_entry(entry, time_value):
    """Build the output dict for a schedule entry with an explicit time value."""
    return {
        'time': time_value,
        'subject': entry.get('subject'),
        'room': entry.get('room', ''),
        'instructor': entry.get('instructor', ''),
        'note': entry.get('note', ''),
    }


def merge_schedule_for_week(group, day, week_key=None):
    """Return a list of schedule entries for the given group and day,
    applying temporary changes for the week_key if present.

    *week_key* defaults to the current ISO week. Entries from MAIN_SCHEDULE
    are dropped when cancelled, swapped when a replacement targets them (and
    dropped if that replacement is itself cancelled), and otherwise kept with
    a normalized time. Replacements whose original is not in the base
    schedule are appended as additions unless cancelled.
    """
    week_key = week_key or get_week_key()

    # Copy base entries so callers cannot mutate MAIN_SCHEDULE through the result.
    base = [dict(e) for e in MAIN_SCHEDULE.get(group, {}).get(day, [])]

    day_changes = TEMP_CHANGES.get(week_key, {}).get(group, {}).get(day, {})
    cancels = set(day_changes.get('cancellations', []))
    replacement_pairs = day_changes.get('replacements', [])
    # If several replacements target the same original, the last one wins.
    replacements = {k: v for (k, v) in replacement_pairs}

    merged = []
    handled_orig_keys = set()
    for entry in base:
        key = _entry_key(entry)

        # Original cancelled directly: drop it.
        if key in cancels:
            handled_orig_keys.add(key)
            continue

        if key not in replacements:
            # Keep original (normalize time for consistent display).
            merged.append(_display_entry(entry, _normalize_time(entry.get('time'))))
            continue

        handled_orig_keys.add(key)
        new_e = replacements[key]
        # Replacement explicitly cancelled: show neither original nor replacement.
        if _entry_key(new_e) in cancels:
            continue
        merged.append(_display_entry(new_e, new_e.get('time')))

    # Replacements that did not map to an existing original are standalone adds.
    for orig_k, new_e in replacement_pairs:
        if orig_k in handled_orig_keys:
            continue
        if _entry_key(new_e) in cancels:
            # This standalone replacement was later cancelled.
            continue
        merged.append(_display_entry(new_e, new_e.get('time')))

    return merged


def _row_get(row, key, default=None):
    """Read *key* from a row by attribute, ``.get`` or indexing, in that order.

    Returns *default* if the lookup raises. An attribute that exists but is
    ``None`` is returned as ``None``, not as *default*.
    """
    try:
        if hasattr(row, key):
            # Attribute-style rows (e.g. ORM objects).
            return getattr(row, key)
        if hasattr(row, 'get'):
            # Mapping-style rows.
            return row.get(key, default)
        return row[key]
    except Exception:
        # Deliberate best-effort: rows of unknown shape yield the default.
        return default


def _db_entry(entry, group, room):
    """Build the output dict for a temporary entry merged into DB rows."""
    return {
        'time': entry.get('time'),
        'subject': entry.get('subject'),
        'room': room,
        'group_name': group,
    }


def apply_temp_changes_to_db_rows(rows, week_key):
    """Given a list of sqlite Row-like dicts with keys day,time,subject,group_name,room,
    apply temporary changes from TEMP_CHANGES for week_key and return merged list.

    Each returned item is a dict with keys ``time``, ``subject``, ``room`` and
    ``group_name``. Returns ``[]`` when *rows* is empty. Replacement entries
    that have themselves been cancelled are omitted, matching
    ``merge_schedule_for_week``.
    """
    if not rows:
        return []

    # Group rows by (group, day) so changes can be applied per group/day.
    grouped = {}
    for r in rows:
        group = _row_get(r, 'group_name') or _row_get(r, 'group') or ''
        day = (_row_get(r, 'day') or '').lower()
        grouped.setdefault((group, day), []).append(r)

    wk = TEMP_CHANGES.get(week_key, {})
    result = []

    # Process each group/day present in the DB rows.
    for (group, day), rlist in grouped.items():
        day_changes = wk.get(group, {}).get(day, {})
        cancels = set(day_changes.get('cancellations', []))
        replacement_pairs = day_changes.get('replacements', [])
        replacements = {k: v for (k, v) in replacement_pairs}
        handled_orig_keys = set()

        for r in rlist:
            row_time = _row_get(r, 'time')
            subject = _row_get(r, 'subject')
            key = (_normalize_time(row_time), _normalize_subject(subject))

            if key in cancels:
                # Skip cancelled original.
                handled_orig_keys.add(key)
                continue

            if key not in replacements:
                # Keep original (normalize time for consistent display).
                result.append({
                    'time': _normalize_time(row_time),
                    'subject': subject,
                    'room': _row_get(r, 'room', ''),
                    'group_name': group,
                })
                continue

            handled_orig_keys.add(key)
            new_e = replacements[key]
            if _entry_key(new_e) in cancels:
                # Replacement was itself cancelled: show neither.
                continue
            # The replacement inherits the row's room unless it sets its own.
            room = new_e.get('room', _row_get(r, 'room', ''))
            result.append(_db_entry(new_e, group, room))

        # Replacements that did not map to an existing row are standalone adds.
        for orig_k, new_e in replacement_pairs:
            if orig_k in handled_orig_keys or _entry_key(new_e) in cancels:
                continue
            result.append(_db_entry(new_e, group, new_e.get('room', '')))

    # Also process TEMP_CHANGES for groups/days not present in DB rows (pure adds).
    for grp_name, days in wk.items():
        for dname, dchanges in days.items():
            if (grp_name, dname) in grouped:
                continue
            cancels = set(dchanges.get('cancellations', []))
            # No DB rows for this group/day; add all replacements that are
            # not cancelled.
            for _orig_k, new_e in dchanges.get('replacements', []):
                if _entry_key(new_e) in cancels:
                    continue
                result.append(_db_entry(new_e, grp_name, new_e.get('room', '')))

    return result


def _find_time_tokens(tokens, start=0):
    """Find a time token starting at or after start.

    Returns (time_string, start_index, end_index), or ``(None, -1, -1)`` when
    no time is found. Supports formats like '9:00', '9:00 AM', '09:00',
    '9:00PM' etc. A bare time followed by a separate AM/PM token is returned
    as one two-token match joined by a single space.
    """
    for i in range(start, len(tokens)):
        tok = tokens[i]
        # The two-token form must be tested first: a bare '9:00' also matches
        # the single-token pattern, which would otherwise leave the AM/PM
        # token behind to be mistaken for part of the subject.
        if (
            _BARE_TIME_TOKEN_RE.match(tok)
            and i + 1 < len(tokens)
            and tokens[i + 1].upper() in _MERIDIEMS
        ):
            return (f"{tok} {tokens[i + 1]}", i, i + 1)
        # Single token like '9:00' or '9:00AM'.
        if _TIME_TOKEN_RE.match(tok):
            return (tok, i, i)
    return (None, -1, -1)


def _parse_edit_cancel_args(args):
    """Parse the argument string of an edit or cancel command.

    Expected token layout::

        <group> <day> <time> <subject...> [<new_time> <new_subject...>] [permanent|temporary]

    Returns ``None`` when there are fewer than four tokens or no time token
    is found from the third token onward. Otherwise returns a dict with
    ``group``, ``day`` (lower-cased), ``orig_time``, ``orig_subject`` and
    ``permanent``; when a second time token is present (an edit) the dict
    also has ``new_time`` and ``new_subject``. The mode defaults to
    temporary when the last token is not a mode keyword.
    """
    tokens = args.split()
    if len(tokens) < 4:
        return None

    group = tokens[0]
    day = tokens[1].lower()

    # First time token marks the original class.
    t1, _t1s, t1e = _find_time_tokens(tokens, 2)
    if not t1:
        return None

    # Only treat the last token as the mode when it really is a mode keyword;
    # otherwise it belongs to the subject and must not be sliced off.
    last = tokens[-1].lower()
    has_mode = last in _MODE_KEYWORDS
    permanent = has_mode and last == 'permanent'
    subject_end = len(tokens) - 1 if has_mode else len(tokens)

    # A second time token after the first means this is an edit; a cancel has none.
    t2, t2s, t2e = _find_time_tokens(tokens, t1e + 1)

    if t2:
        # Edit: original subject sits between the two times, new subject
        # runs from after the second time up to the mode keyword (if any).
        return {
            'group': group,
            'day': day,
            'orig_time': t1,
            'orig_subject': ' '.join(tokens[t1e + 1:t2s]).strip(),
            'new_time': t2,
            'new_subject': ' '.join(tokens[t2e + 1:subject_end]).strip(),
            'permanent': permanent,
        }

    # Cancel: subject is everything after the time, minus the mode keyword.
    return {
        'group': group,
        'day': day,
        'orig_time': t1,
        'orig_subject': ' '.join(tokens[t1e + 1:subject_end]).strip(),
        'permanent': permanent,
    }