# Source: app.py by musawar32ali -- commit f253d99 ("Update app.py", verified).
# app.py
"""
Improved Automatic Time Table Generation Agent (Genetic Algorithm)
- Gradio interface (offline)
- Adaptive mutation rate, better crossover, visualizations, exports
"""
import io
import random
import math
from typing import List, Dict, Tuple, Optional
import tempfile
import datetime
import numpy as np
import pandas as pd
import gradio as gr
import plotly.graph_objects as go
import plotly.express as px
# -------------------------
# Parsing helpers
# -------------------------
def parse_lines(text: str) -> List[str]:
    """Split *text* into its non-blank lines, each stripped of surrounding whitespace.

    ``None`` or an empty string yields an empty list.
    """
    cleaned: List[str] = []
    for raw in (text or "").splitlines():
        entry = raw.strip()
        if entry:
            cleaned.append(entry)
    return cleaned
def parse_teacher_unavailability(text: str) -> Dict[str, List[Tuple[str, str]]]:
    """Parse ``Teacher,Day,Slot`` lines into ``{teacher: [(day, slot), ...]}``.

    Blank lines and lines with fewer than three comma-separated fields are
    ignored; fields beyond the third are discarded.
    """
    blocked: Dict[str, List[Tuple[str, str]]] = {}
    for raw in (text or "").splitlines():
        stripped = raw.strip()
        if not stripped:
            continue
        fields = [field.strip() for field in stripped.split(",")]
        if len(fields) < 3:
            continue
        teacher, day, slot = fields[:3]
        if teacher not in blocked:
            blocked[teacher] = []
        blocked[teacher].append((day, slot))
    return blocked
def parse_course_teacher_pref(text: str) -> Dict[str, List[str]]:
    """Parse ``Course: T1, T2`` lines into ``{course: [teacher, ...]}``.

    Only the first colon on a line separates the course from the teacher list.
    Lines without a colon, or with no non-empty teacher names, are skipped.
    A course that appears more than once keeps its last listing.
    """
    allowed: Dict[str, List[str]] = {}
    for raw in (text or "").splitlines():
        entry = raw.strip()
        if not entry:
            continue
        course, colon, remainder = entry.partition(":")
        if not colon:
            continue
        names = []
        for piece in remainder.split(","):
            name = piece.strip()
            if name:
                names.append(name)
        if names:
            allowed[course.strip()] = names
    return allowed
def parse_room_constraints(text: str) -> Dict[str, List[str]]:
    """Parse ``Course: R1, R2`` lines into ``{course: [room, ...]}``.

    The text before the first colon is the course; the rest is a
    comma-separated room list. Lines lacking a colon or naming no rooms are
    ignored, and a repeated course keeps its last listing.
    """
    by_course: Dict[str, List[str]] = {}
    for raw in (text or "").splitlines():
        entry = raw.strip()
        if not entry:
            continue
        cut = entry.find(":")
        if cut < 0:
            continue
        room_names = [
            token.strip() for token in entry[cut + 1:].split(",") if token.strip()
        ]
        if not room_names:
            continue
        by_course[entry[:cut].strip()] = room_names
    return by_course
# -------------------------
# Genetic Algorithm core (improved)
# -------------------------
class TimetableGA:
    """Genetic-algorithm search that places every course in one (day, slot) period.

    An *individual* is a tuple ``(periods, rooms, teachers)`` of three integer
    NumPy arrays, each ``len(courses)`` long. Entry ``i`` of each array is the
    index into ``self.times`` / ``self.rooms`` / ``self.teachers`` assigned to
    ``courses[i]``.

    Fitness is ``BASE_SCORE`` minus weighted penalties, so higher is better and
    ``BASE_SCORE`` is the score of a timetable with no penalties at all.
    """

    # Fitness weights (penalty per violation) and thresholds.
    BASE_SCORE = 20000.0
    TEACHER_CLASH_PENALTY = 250.0
    ROOM_CLASH_PENALTY = 180.0
    UNAVAILABLE_PENALTY = 300.0
    TEACHER_PREF_PENALTY = 8.0
    ROOM_PREF_PENALTY = 12.0
    WORKLOAD_VARIANCE_PENALTY = 5.0
    EARLY_STOP_SCORE = 19990.0

    def __init__(
        self,
        courses: List[str],
        teachers: List[str],
        rooms: List[str],
        days: List[str],
        slots: List[str],
        teacher_unavailable: Dict[str, List[Tuple[str, str]]],
        course_teacher_pref: Dict[str, List[str]],
        room_constraints: Dict[str, List[str]],
        population_size: int = 80,
        generations: int = 350,
        mutation_rate: float = 0.06,
        elitism: int = 2,
        seed: Optional[int] = None,
    ):
        """Store the problem definition and GA parameters.

        Args:
            courses: Course names; one session is scheduled per course.
            teachers: Teacher names.
            rooms: Room names.
            days: Day labels, in display order.
            slots: Slot labels, in display order.
            teacher_unavailable: ``{teacher: [(day, slot), ...]}`` periods a
                teacher must not be scheduled in.
            course_teacher_pref: ``{course: [teacher, ...]}`` allowed teachers.
            room_constraints: ``{course: [room, ...]}`` allowed rooms.
            population_size: Individuals per generation (clamped to >= 10).
            generations: Maximum generations to run (clamped to >= 1).
            mutation_rate: Base per-gene mutation probability.
            elitism: Number of top individuals copied unchanged each generation.
            seed: If given, seeds both ``random`` and ``numpy.random``
                (process-global state).
        """
        self.courses = courses
        self.teachers = teachers
        self.rooms = rooms
        self.days = days
        self.slots = slots
        # Period index k corresponds to times[k]; periods are day-major.
        self.times = [(d, s) for d in days for s in slots]
        self.num_periods = len(self.times)
        self.num_courses = len(courses)
        self.teacher_unavailable = teacher_unavailable
        self.course_teacher_pref = course_teacher_pref
        self.room_constraints = room_constraints
        self.population_size = max(10, int(population_size))
        self.generations = max(1, int(generations))
        self.base_mutation_rate = float(mutation_rate)
        self.mutation_rate = float(mutation_rate)
        self.elitism = max(0, int(elitism))

        # Resolve preferred teacher names to indices once. Names that are not
        # in `teachers` are dropped here so that sampling a preferred teacher
        # can never raise; a course left with no resolvable preference falls
        # back to "any teacher". First occurrence wins, like list.index().
        teacher_pos: Dict[str, int] = {}
        for idx, name in enumerate(teachers):
            teacher_pos.setdefault(name, idx)
        self._pref_teacher_idx: Dict[str, List[int]] = {
            course: [teacher_pos[name] for name in names if name in teacher_pos]
            for course, names in course_teacher_pref.items()
        }

        if seed is not None:
            random.seed(int(seed))
            np.random.seed(int(seed))

    def _random_individual(self):
        """Return a random individual as ``(periods, rooms, teachers)`` int arrays."""
        # Sample periods without replacement when every course can get its
        # own period, which spreads the initial population more evenly.
        if self.num_courses <= self.num_periods:
            period_indices = np.random.choice(
                self.num_periods, size=self.num_courses, replace=False
            )
        else:
            period_indices = np.random.randint(
                0, self.num_periods, size=self.num_courses
            )
        room_indices = np.random.randint(0, len(self.rooms), size=self.num_courses)
        teacher_indices = np.zeros(self.num_courses, dtype=int)
        for i, course in enumerate(self.courses):
            allowed = self._pref_teacher_idx.get(course)
            if allowed:
                teacher_indices[i] = random.choice(allowed)
            else:
                teacher_indices[i] = np.random.randint(0, len(self.teachers))
        return (
            period_indices.astype(int),
            room_indices.astype(int),
            teacher_indices.astype(int),
        )

    def _fitness(self, individual) -> float:
        """Score *individual*; higher is better, ``BASE_SCORE`` means no penalties.

        Penalised, in decreasing weight: a teacher scheduled in a period listed
        as unavailable, a teacher with two courses in one period, a room with
        two courses in one period, a room outside the course's allowed rooms,
        a teacher outside the course's allowed teachers, and the variance of
        per-teacher course counts (computed only over teachers that have at
        least one course, and only when more than one teacher is used).
        """
        p, r, t = individual
        teacher_slot: Dict[Tuple[int, int], int] = {}
        room_slot: Dict[Tuple[int, int], int] = {}
        teacher_workload: Dict[int, int] = {}
        unavail = 0
        pref_violations = 0
        room_viol = 0

        for i, course in enumerate(self.courses):
            period_idx, room_idx, teacher_idx = int(p[i]), int(r[i]), int(t[i])
            teacher_name = self.teachers[teacher_idx]

            t_key = (teacher_idx, period_idx)
            teacher_slot[t_key] = teacher_slot.get(t_key, 0) + 1
            r_key = (room_idx, period_idx)
            room_slot[r_key] = room_slot.get(r_key, 0) + 1
            teacher_workload[teacher_idx] = teacher_workload.get(teacher_idx, 0) + 1

            blocked = self.teacher_unavailable.get(teacher_name)
            if blocked is not None and self.times[period_idx] in blocked:
                unavail += 1

            prefs = self.course_teacher_pref.get(course)
            if prefs and teacher_name not in prefs:
                pref_violations += 1

            allowed_rooms = self.room_constraints.get(course)
            if allowed_rooms and self.rooms[room_idx] not in allowed_rooms:
                room_viol += 1

        # Each booking beyond the first in the same (entity, period) is a clash.
        teacher_conflicts = sum(max(0, c - 1) for c in teacher_slot.values())
        room_conflicts = sum(max(0, c - 1) for c in room_slot.values())

        penalties = 0.0
        penalties += teacher_conflicts * self.TEACHER_CLASH_PENALTY
        penalties += room_conflicts * self.ROOM_CLASH_PENALTY
        penalties += unavail * self.UNAVAILABLE_PENALTY
        penalties += pref_violations * self.TEACHER_PREF_PENALTY
        penalties += room_viol * self.ROOM_PREF_PENALTY

        if len(teacher_workload) > 1:
            workloads = np.array(list(teacher_workload.values()), dtype=float)
            penalties += float(np.var(workloads)) * self.WORKLOAD_VARIANCE_PENALTY

        return float(self.BASE_SCORE - penalties)

    def _crossover(self, a, b):
        """Two-point crossover; always returns two children with fresh arrays.

        The children never share arrays with the parents, so the in-place
        `_mutate` cannot modify members of the current population (including
        elites and the best-so-far individual).
        """
        if self.num_courses <= 2:
            # Too short for a two-point cut: the children are plain copies.
            # Returning the parents themselves would let `_mutate` corrupt them.
            return tuple(g.copy() for g in a), tuple(g.copy() for g in b)

        i1 = np.random.randint(1, self.num_courses - 1)
        # i2 may equal i1, in which case the swapped segment is empty.
        i2 = np.random.randint(i1, self.num_courses)

        def mix(x, y):
            child = x.copy()
            child[i1:i2] = y[i1:i2]
            return child

        c1 = tuple(mix(x, y) for x, y in zip(a, b))
        c2 = tuple(mix(y, x) for x, y in zip(a, b))
        return c1, c2

    def _mutate(self, ind, mutate_rate):
        """Mutate *ind* IN PLACE and return it.

        Each course's period, room and teacher gene is independently resampled
        with probability *mutate_rate*. Teacher genes are resampled from the
        course's allowed teachers when it has any.
        """
        p, r, t = ind
        for i in range(self.num_courses):
            if random.random() < mutate_rate:
                p[i] = random.randint(0, self.num_periods - 1)
            if random.random() < mutate_rate:
                r[i] = random.randint(0, len(self.rooms) - 1)
            if random.random() < mutate_rate:
                allowed = self._pref_teacher_idx.get(self.courses[i])
                if allowed:
                    t[i] = random.choice(allowed)
                else:
                    t[i] = random.randint(0, len(self.teachers) - 1)
        return (p, r, t)

    def run(self, verbose=False, progress_callback=None):
        """Evolve the population and return the best timetable found.

        Args:
            verbose: Print progress roughly ten times over the run.
            progress_callback: Optional ``f(generation, total, best_score)``
                called after every generation. Exceptions it raises are
                ignored so a faulty UI hook cannot abort the search.

        Returns:
            dict with keys ``"best"`` (the best individual), ``"score"`` (its
            fitness), ``"times"`` (the ``(day, slot)`` list indexed by period)
            and ``"generations"`` (number of generations actually run).
        """
        population = [self._random_individual() for _ in range(self.population_size)]
        fitnesses = [self._fitness(ind) for ind in population]
        best_idx = int(np.argmax(fitnesses))
        best = population[best_idx]
        best_score = fitnesses[best_idx]
        last_improve_gen = 0
        plateau_window = max(10, self.generations // 40)
        report_every = max(1, self.generations // 10)

        for gen in range(self.generations):
            # Adaptive mutation: decay geometrically, but boost the rate when
            # no improvement has been seen for a while to escape the plateau.
            self.mutation_rate = self.base_mutation_rate * (0.98 ** gen)
            if gen - last_improve_gen > plateau_window:
                self.mutation_rate = min(0.5, self.mutation_rate * 1.6)

            ranked = sorted(
                zip(fitnesses, population), key=lambda pair: pair[0], reverse=True
            )
            new_pop = [ind for _, ind in ranked[:self.elitism]]

            while len(new_pop) < self.population_size:
                # Binary tournament for each parent.
                i1 = random.randrange(self.population_size)
                i2 = random.randrange(self.population_size)
                parent1 = population[i1] if fitnesses[i1] > fitnesses[i2] else population[i2]
                i3 = random.randrange(self.population_size)
                i4 = random.randrange(self.population_size)
                parent2 = population[i3] if fitnesses[i3] > fitnesses[i4] else population[i4]

                c1, c2 = self._crossover(parent1, parent2)
                c1 = self._mutate(c1, self.mutation_rate)
                c2 = self._mutate(c2, self.mutation_rate)
                new_pop.extend([c1, c2])

            # Children are added in pairs, so trim a possible extra one.
            population = new_pop[:self.population_size]
            fitnesses = [self._fitness(ind) for ind in population]

            gen_best = max(fitnesses)
            if gen_best > best_score:
                best_score = gen_best
                best = population[int(np.argmax(fitnesses))]
                last_improve_gen = gen

            if progress_callback is not None:
                try:
                    progress_callback(gen + 1, self.generations, best_score)
                except Exception:
                    # Deliberate best-effort: progress reporting must never
                    # interrupt the optimisation itself.
                    pass

            if verbose and gen % report_every == 0:
                print(f"Gen {gen} best {best_score:.2f} mut_rate {self.mutation_rate:.4f}")

            if best_score >= self.EARLY_STOP_SCORE:
                break

        return {
            "best": best,
            "score": best_score,
            "times": self.times,
            "generations": gen + 1,
        }
# -------------------------
# Convert to DataFrame & export utilities
# -------------------------
def individual_to_dataframe(individual, courses, teachers, rooms, times):
    """Convert a GA individual into a timetable DataFrame.

    Args:
        individual: ``(periods, rooms, teachers)`` index arrays, one entry per
            course.
        courses: Course names, aligned with the arrays in *individual*.
        teachers: Teacher names indexed by the teacher array.
        rooms: Room names indexed by the room array.
        times: ``(day, slot)`` tuples indexed by the period array.

    Returns:
        DataFrame with columns Course, Teacher, Room, Day, Slot and a fresh
        0..n-1 index. Rows are ordered by period index, i.e. by the position
        of each (day, slot) in *times*; courses sharing a period keep their
        order in *courses*. An empty course list yields an empty frame that
        still has all five columns.
    """
    columns = ["Course", "Teacher", "Room", "Day", "Slot"]
    p, r, t = individual
    keyed_rows = []
    for i, course in enumerate(courses):
        period_idx = int(p[i])
        day, slot = times[period_idx]
        keyed_rows.append((
            period_idx,
            {
                "Course": course,
                "Teacher": teachers[int(t[i])],
                "Room": rooms[int(r[i])],
                "Day": day,
                "Slot": slot,
            },
        ))
    # Sort on the period index rather than on the Slot label: labels such as
    # "10:00" or "Slot10" would otherwise sort before "9:00" / "Slot2".
    # list.sort is stable, so ties keep course order.
    keyed_rows.sort(key=lambda pair: pair[0])
    return pd.DataFrame([row for _, row in keyed_rows], columns=columns)
def dataframe_to_csv_bytes(df: pd.DataFrame) -> bytes:
    """Serialise *df* as CSV (no index column) and return it UTF-8 encoded."""
    csv_text = df.to_csv(index=False)
    return csv_text.encode("utf-8")
def dataframe_to_xlsx_bytes(df: pd.DataFrame) -> bytes:
    """Write *df* to an in-memory XLSX workbook and return the file's bytes.

    The data goes to a single sheet named "Timetable" without the index
    column, using the openpyxl engine.
    """
    stream = io.BytesIO()
    writer = pd.ExcelWriter(stream, engine="openpyxl")
    with writer:
        df.to_excel(writer, index=False, sheet_name="Timetable")
    # getvalue() returns the whole buffer regardless of the stream position.
    return stream.getvalue()
# -------------------------
# Analysis & visualizations
# -------------------------
def compute_conflicts(df: pd.DataFrame):
    """Find double bookings in a timetable.

    Returns:
        ``(teacher_conflicts, room_conflicts)``: two DataFrames listing every
        (Teacher, Day, Slot) and (Room, Day, Slot) combination that occurs more
        than once in *df*, with the number of occurrences in a ``count`` column.
    """
    def _overbooked(entity_column: str) -> pd.DataFrame:
        usage = df.groupby([entity_column, "Day", "Slot"]).size()
        usage = usage.reset_index(name="count")
        return usage.loc[usage["count"] > 1].copy()

    return _overbooked("Teacher"), _overbooked("Room")
def make_week_grid_plot(df: pd.DataFrame, days: List[str], slots: List[str]):
    """Render the timetable as a Plotly table with one row per day and one column per slot.

    Args:
        df: Timetable with Course, Teacher, Day and Slot columns.
        days: Day labels, in row order.
        slots: Slot labels, in column order.

    Returns:
        A ``plotly.graph_objects.Figure`` holding a single ``Table`` trace.
        Each cell lists "Course (Teacher)" for every session in that day/slot;
        rows whose Day or Slot is not in *days* / *slots* are skipped.
    """
    # entries[d][s] collects every session in that cell so that clashing
    # sessions are all shown instead of the last one overwriting the rest.
    entries = [[[] for _ in slots] for _ in days]
    for _, row in df.iterrows():
        try:
            d_idx = days.index(row["Day"])
            s_idx = slots.index(row["Slot"])
        except ValueError:
            continue
        # Plotly renders "<br>" as a line break in table cells.
        entries[d_idx][s_idx].append(f"{row['Course']}<br>({row['Teacher']})")

    # go.Table takes cell values column by column: the first column holds the
    # day labels, followed by one column per slot with one entry per day.
    slot_columns = [
        ["<br>".join(entries[d_idx][s_idx]) for d_idx in range(len(days))]
        for s_idx in range(len(slots))
    ]

    fig = go.Figure()
    fig.add_trace(go.Table(
        header=dict(values=["Day/Slot"] + list(slots), align="center"),
        cells=dict(values=[list(days)] + slot_columns, align="left", height=40),
    ))
    fig.update_layout(margin=dict(l=5, r=5, t=20, b=5), height=400 + 30 * len(days))
    return fig
def make_conflict_heatmap(df: pd.DataFrame, days: List[str], slots: List[str], teachers: List[str], rooms: List[str]):
    """Build teacher and room occupancy heatmaps for a timetable.

    Each heatmap has one row per teacher (or room) and one column per
    (day, slot) period in day-major order; a cell holds the number of sessions
    booked for that entity in that period, so values above 1 are conflicts.

    Rows of *df* whose Teacher/Room, Day or Slot is not present in the given
    label lists are skipped rather than raising, matching how
    `make_week_grid_plot` treats unknown labels.

    Returns:
        ``(teacher_fig, room_fig)`` Plotly figures.
    """
    time_labels = [f"{d}\n{s}" for d in days for s in slots]

    def _occupancy(labels: List[str], column: str) -> np.ndarray:
        # Count sessions per (entity, period).
        counts = np.zeros((len(labels), len(time_labels)), dtype=int)
        for _, row in df.iterrows():
            try:
                entity_idx = labels.index(row[column])
                time_idx = days.index(row["Day"]) * len(slots) + slots.index(row["Slot"])
            except ValueError:
                continue
            counts[entity_idx, time_idx] += 1
        return counts

    teacher_fig = px.imshow(
        _occupancy(teachers, "Teacher"),
        labels=dict(x="Time", y="Teacher", color="Count"),
        x=time_labels, y=teachers, aspect="auto",
    )
    teacher_fig.update_layout(title="Teacher assignment heatmap", height=350)

    room_fig = px.imshow(
        _occupancy(rooms, "Room"),
        labels=dict(x="Time", y="Room", color="Count"),
        x=time_labels, y=rooms, aspect="auto",
    )
    room_fig.update_layout(title="Room booking heatmap", height=350)
    return teacher_fig, room_fig
# -------------------------
# Assistant (local, rule-based NLP)
# -------------------------
def assistant_reply(df: Optional[pd.DataFrame], query: str) -> str:
    """Answer a free-text question about the timetable using keyword rules.

    Recognised intents, checked in this order:
      1. conflicts  - query contains "conflict", "clash" or "problem";
      2. teacher schedule - "schedule for <teacher>", "show schedule <teacher>"
         or "show for <teacher>" (case-insensitive partial name match, spaces
         may stand in for underscores);
      3. course time - query contains "when" together with "when is",
         "course", "c1" or "c2", and mentions a course name;
      4. summary - query contains "summary", "overview" or "stats".

    Args:
        df: Timetable with Course, Teacher, Room, Day and Slot columns, or None.
        query: The user's question.

    Returns:
        A plain-text answer, or a hint when the query is not understood.
    """
    if df is None or df.empty:
        return "No timetable available. Generate a timetable first."
    q = (query or "").strip().lower()
    if not q:
        return "Try: 'show conflicts', 'schedule for T1_Ali', 'when is C2_Physics', or 'summary'."

    # conflicts
    if "conflict" in q or "clash" in q or "problem" in q:
        tconf, rconf = compute_conflicts(df)
        lines = []
        if not tconf.empty:
            lines.append("Teacher conflicts:")
            for _, r in tconf.iterrows():
                lines.append(f"- {r['Teacher']} has {int(r['count'])} classes at {r['Day']} {r['Slot']}")
        else:
            lines.append("No teacher conflicts detected.")
        if not rconf.empty:
            lines.append("Room conflicts:")
            for _, r in rconf.iterrows():
                lines.append(f"- {r['Room']} has {int(r['count'])} bookings at {r['Day']} {r['Slot']}")
        else:
            lines.append("No room conflicts detected.")
        lines.append("Fix ideas: 1) reassign one of the conflicting classes to a different slot/room; 2) allow alternate teacher; 3) relax room constraints.")
        return "\n".join(lines)

    # schedule for teacher
    if "schedule for" in q or q.startswith("show schedule") or q.startswith("show for"):
        # Take whatever follows the trigger phrase as the teacher token, so
        # that "show schedule for X" does not leave a stray "show" behind.
        if "schedule for" in q:
            words = q.partition("schedule for")[2]
        elif q.startswith("show schedule"):
            words = q[len("show schedule"):]
        else:
            words = q[len("show for"):]
        words = words.strip().rstrip("?.!").strip()
        if not words:
            return "Specify teacher, e.g., 'Schedule for T2_Sara'"
        # Partial match, longest teacher name first.
        underscored = words.replace(" ", "_")
        cand = None
        for t in sorted(df["Teacher"].unique(), key=len, reverse=True):
            name = t.lower()
            if words in name or underscored in name:
                cand = t
                break
        if cand is None:
            return "Couldn't find that teacher. Try exact teacher name like 'T1_Ali' or 'T2_Sara'."
        # NOTE: this orders days and slots alphabetically by label, not in
        # timetable order; the day/slot order is not available here.
        sub = df[df["Teacher"] == cand].sort_values(["Day", "Slot"])
        return f"Schedule for {cand}:\n" + sub.to_string(index=False)

    # when is course scheduled
    if "when is" in q or ("when" in q and any(k in q for k in ["course", "c1", "c2"])):
        # Try longer course names first so that e.g. "C10" is not shadowed by
        # "C1" matching as a substring of the query.
        for c in sorted(df["Course"].unique(), key=len, reverse=True):
            if c.lower() not in q:
                continue
            sub = df[df["Course"] == c]
            if sub.empty:
                continue
            return "\n".join(
                f"- {r['Course']}: {r['Day']} {r['Slot']} with {r['Teacher']} in {r['Room']}"
                for _, r in sub.iterrows()
            )
        return "Mention the exact course name, e.g., 'When is C2_Physics scheduled?'"

    # summary
    if "summary" in q or "overview" in q or "stats" in q:
        tconf, rconf = compute_conflicts(df)
        lines = [
            f"Rows: {len(df)}",
            f"Teachers used: {df['Teacher'].nunique()}",
            f"Rooms used: {df['Room'].nunique()}",
            f"Teacher conflict count: {len(tconf)}",
            f"Room conflict count: {len(rconf)}",
        ]
        return "\n".join(lines)

    return "I didn't understand. Try: 'show conflicts', 'schedule for T1_Ali', 'when is C2_Physics', or 'summary'."
# -------------------------
# Gradio UI
# -------------------------
title = "Automatic Time Table Generation Agent (Improved)"
desc = "Improved GA + visualizer + assistant. Generate, inspect conflicts, visualize and export."


def _write_download_file(data: bytes, filename: str) -> str:
    """Write *data* to ``<fresh temp dir>/<filename>`` and return the path.

    Gradio ``File`` outputs are given filesystem paths; a new directory per
    file keeps the download name stable ("timetable.csv") without collisions
    between runs. The directories are not cleaned up by this function.
    """
    import os  # local import: `os` is not imported at module level

    directory = tempfile.mkdtemp(prefix="timetable_")
    path = os.path.join(directory, filename)
    with open(path, "wb") as handle:
        handle.write(data)
    return path


# NOTE(review): the nesting of the layout below is reconstructed from the
# order of the components; confirm against the intended layout.
with gr.Blocks(title=title, css="""
.gradio-container { max-width: 1200px; margin: auto; }
""") as demo:
    gr.Markdown(f"# {title}")
    gr.Markdown(desc)
    with gr.Row():
        # Left column: problem definition and GA parameters.
        with gr.Column(scale=1, min_width=380):
            gr.Markdown("## Inputs")
            courses_in = gr.Textbox(label="Courses (one per line)", value="C1_Math\nC2_Physics\nC3_Chemistry\nC4_English", lines=6)
            teachers_in = gr.Textbox(label="Teachers (one per line)", value="T1_Ali\nT2_Sara\nT3_Omar", lines=4)
            rooms_in = gr.Textbox(label="Rooms (one per line)", value="R1\nR2\nR3", lines=4)
            days_in = gr.Textbox(label="Days (one per line)", value="Monday\nTuesday\nWednesday\nThursday\nFriday", lines=5)
            slots_in = gr.Textbox(label="Slots (one per line)", value="Slot1\nSlot2\nSlot3\nSlot4\nSlot5\nSlot6", lines=6)
            with gr.Accordion("Optional constraints (click to expand)", open=False):
                teacher_unavail_in = gr.Textbox(label="Teacher unavailability (Teacher,Day,Slot per line)", value="", lines=4)
                course_teacher_pref_in = gr.Textbox(label="Course -> allowed teachers (Course: T1,T2)", value="", lines=4)
                room_constraints_in = gr.Textbox(label="Course -> allowed rooms (Course: R1,R2)", value="", lines=4)
            with gr.Accordion("GA parameters (advanced)", open=False):
                pop_in = gr.Slider(label="Population size", minimum=10, maximum=1000, value=120, step=10)
                gen_in = gr.Slider(label="Generations", minimum=10, maximum=3000, value=600, step=10)
                mut_in = gr.Slider(label="Base mutation rate", minimum=0.0, maximum=0.5, value=0.06, step=0.01)
                elitism_in = gr.Slider(label="Elitism (keep top N)", minimum=0, maximum=20, value=3, step=1)
                seed_in = gr.Number(label="Random seed (optional)", value=42)
            run_btn = gr.Button("Run Generator", variant="primary")
        # Right column: results, visualisations and the assistant.
        with gr.Column(scale=1, min_width=420):
            gr.Markdown("## Results & Tools")
            summary_out = gr.Textbox(label="Summary", lines=3)
            table_out = gr.Dataframe(headers=["Course", "Teacher", "Room", "Day", "Slot"], interactive=False)
            with gr.Row():
                csv_btn = gr.File(label="Download CSV (generated)")
                xlsx_btn = gr.File(label="Download XLSX (generated)")
            with gr.Tabs():
                with gr.TabItem("Timetable Grid"):
                    grid_plot = gr.Plot(label="Weekly timetable grid")
                    download_grid_png = gr.Button("Download timetable PNG")
                    # Dedicated slot for the PNG so that requesting it does
                    # not replace the CSV download.
                    png_out = gr.File(label="Timetable PNG (generated)")
                with gr.TabItem("Conflicts / Heatmaps"):
                    teacher_heat = gr.Plot(label="Teacher heatmap")
                    room_heat = gr.Plot(label="Room heatmap")
                    conflict_table = gr.Dataframe(headers=["Type", "Entity", "Day", "Slot", "Count"], interactive=False)
            gen_progress = gr.Number(label="Generations run", value=0)
            best_score_box = gr.Number(label="Best fitness score", value=0)
            gr.Markdown("## Assistant (Ask about the timetable)")
            assistant_input = gr.Textbox(label="Ask a question", placeholder="e.g., 'Show conflicts' or 'Schedule for T2_Sara'")
            assistant_output = gr.Textbox(label="Assistant response", lines=8)

    # internal state holders
    state_best = gr.State()
    state_df = gr.State()
    state_csv = gr.State()
    state_xlsx = gr.State()
    state_grid_png = gr.State()  # path of the most recent PNG export, or None

    # progress callback handler for GA
    def _progress_cb(gen, total, best_score):
        # The UI is updated once after the run; kept as a no-op hook.
        return

    def run_ga_and_prepare_download(
        courses_text, teachers_text, rooms_text, days_text, slots_text,
        teacher_unavail_text, course_teacher_pref_text, room_constraints_text,
        pop_size, gens, mut_rate, elitism, seed
    ):
        """Run the GA on the form inputs and build the table and download files.

        Returns an 8-tuple matching the outputs wired to ``run_btn``:
        summary text, timetable DataFrame, CSV path, XLSX path, generations
        run, best score, raw CSV bytes and raw XLSX bytes. When a required
        input list is empty, only the summary (an error message) is set.
        """
        courses = parse_lines(courses_text)
        teachers = parse_lines(teachers_text)
        rooms = parse_lines(rooms_text)
        days = parse_lines(days_text)
        slots = parse_lines(slots_text)
        if not (courses and teachers and rooms and days and slots):
            return "Please provide courses, teachers, rooms, days and slots.", None, None, None, None, None, None, None

        ga = TimetableGA(
            courses=courses, teachers=teachers, rooms=rooms, days=days, slots=slots,
            teacher_unavailable=parse_teacher_unavailability(teacher_unavail_text),
            course_teacher_pref=parse_course_teacher_pref(course_teacher_pref_text),
            room_constraints=parse_room_constraints(room_constraints_text),
            population_size=pop_size, generations=gens, mutation_rate=mut_rate,
            elitism=elitism, seed=seed,
        )
        # The call is synchronous, so only the final result is reported.
        res = ga.run(verbose=False, progress_callback=None)
        score = res["score"]
        generations_ran = res.get("generations", gens)
        df = individual_to_dataframe(res["best"], courses, teachers, rooms, res["times"])
        csv_bytes = dataframe_to_csv_bytes(df)
        xlsx_bytes = dataframe_to_xlsx_bytes(df)
        summary = f"Generator finished. Best fitness score: {score:.2f}. Rows: {len(df)}. Generations run: {generations_ran}"
        # gr.File outputs take file paths, not in-memory buffers.
        csv_path = _write_download_file(csv_bytes, "timetable.csv")
        xlsx_path = _write_download_file(xlsx_bytes, "timetable.xlsx")
        return summary, df, csv_path, xlsx_path, generations_ran, score, csv_bytes, xlsx_bytes

    def make_visuals(df, days_text, slots_text, teachers_text, rooms_text):
        """Build the grid figure, both heatmaps, the conflict table and PNG bytes.

        Returns five ``None`` values when there is no timetable. The PNG bytes
        are ``None`` when static image export fails, so the interactive plots
        are still shown.
        """
        days = parse_lines(days_text)
        slots = parse_lines(slots_text)
        teachers = parse_lines(teachers_text)
        rooms = parse_lines(rooms_text)
        if df is None or df.empty:
            return None, None, None, None, None
        grid_fig = make_week_grid_plot(df, days, slots)
        teacher_fig, room_fig = make_conflict_heatmap(df, days, slots, teachers, rooms)
        tconf, rconf = compute_conflicts(df)
        # prepare conflict table rows
        rows = [
            ["Teacher", r["Teacher"], r["Day"], r["Slot"], int(r["count"])]
            for _, r in tconf.iterrows()
        ]
        rows.extend(
            ["Room", r["Room"], r["Day"], r["Slot"], int(r["count"])]
            for _, r in rconf.iterrows()
        )
        conflict_df = pd.DataFrame(rows, columns=["Type", "Entity", "Day", "Slot", "Count"])
        try:
            png_bytes = grid_fig.to_image(format="png", width=1000, height=600)
        except Exception:
            # Static export needs an optional image-export backend; treat its
            # absence or failure as "no PNG" instead of failing every visual.
            png_bytes = None
        return grid_fig, teacher_fig, room_fig, conflict_df, png_bytes

    run_btn.click(
        run_ga_and_prepare_download,
        inputs=[courses_in, teachers_in, rooms_in, days_in, slots_in,
                teacher_unavail_in, course_teacher_pref_in, room_constraints_in,
                pop_in, gen_in, mut_in, elitism_in, seed_in],
        outputs=[summary_out, table_out, csv_btn, xlsx_btn, gen_progress, best_score_box, state_csv, state_xlsx],
        show_progress=True
    )

    # Build visuals when table_out changes
    def on_table_change(df, days_text, slots_text, teachers_text, rooms_text):
        """Refresh plots and the conflict table; remember the PNG export path."""
        grid_fig, teacher_fig, room_fig, conflict_df, png_bytes = make_visuals(df, days_text, slots_text, teachers_text, rooms_text)
        png_path = None
        if png_bytes is not None:
            png_name = f"timetable_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
            png_path = _write_download_file(png_bytes, png_name)
        return grid_fig, teacher_fig, room_fig, conflict_df, png_path

    table_out.change(
        on_table_change,
        inputs=[table_out, days_in, slots_in, teachers_in, rooms_in],
        outputs=[grid_plot, teacher_heat, room_heat, conflict_table, state_grid_png]
    )

    # Download PNG button
    def download_png(png_state):
        """Return the stored PNG path (or None when no export is available)."""
        return png_state

    download_grid_png.click(download_png, inputs=[state_grid_png], outputs=[png_out])

    # Assistant handlers: answer on Enter and also live while typing.
    assistant_input.submit(lambda q, df: assistant_reply(df, q), inputs=[assistant_input, table_out], outputs=[assistant_output])
    assistant_input.change(lambda q, df: assistant_reply(df, q), inputs=[assistant_input, table_out], outputs=[assistant_output])

    # Provide nice footer
    gr.Markdown("**Exports:** CSV and XLSX. **Visuals:** Table grid & heatmaps. Assistant is local and rule-based.")
# Script entry point: start the Gradio server only when this file is executed
# directly. server_name="0.0.0.0" listens on all network interfaces;
# share=False does not request a public share link.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", share=False)