from __future__ import annotations
import time
from typing import Dict, List, Optional
import streamlit as st
from focus_resource_env import (
DEEP_WORK,
EMPTY,
OP_IDLE,
OP_MUTE_COMMS,
OP_RESCHEDULE_MEETING,
FocusResourceEnv,
Task,
)
# Block categories offered by the "Type" radio buttons in the calendar editors.
BLOCK_TYPES = ["Focus", "Meeting"]
# Choices shown in the per-task "Complexity" select box; the chosen value is
# passed to Task(hidden_complexity=...).
COMPLEXITY_OPTIONS = [1.0, 1.25, 1.5, 1.75]
# Names used for the first queue positions; later positions become "Task N".
DEFAULT_TASK_NAMES = ["Architecture", "Review", "Execution"]
def inject_styles() -> None:
    """Load ``styles.css``, substitute the theme tokens, and inject the result.

    The file is opened relative to the current working directory. Every
    ``__NAME__`` token listed in ``theme_tokens`` that occurs in the file text
    is replaced by its concrete CSS value before the stylesheet is emitted.
    """
    theme_tokens = {
        "__BG0__": "#050816",
        "__BG1__": "#0b1224",
        "__LINE__": "rgba(159, 184, 255, 0.16)",
        "__TEXT__": "#edf3ff",
        "__MUTED__": "#96a7cb",
        "__ACCENT__": "#7ce7ff",
        "__ACCENT2__": "#7cf0c5",
        "__BUTTON_TEXT__": "#06131c",
        "__METRIC_BG__": "linear-gradient(180deg, rgba(255,255,255,0.04), rgba(255,255,255,0.02))",
        "__SHADOW__": "0 14px 40px rgba(0, 0, 0, 0.22)",
        "__INPUT_BG__": "rgba(255,255,255,0.04)",
        "__HERO_BG__": "linear-gradient(135deg, rgba(16, 26, 52, 0.94), rgba(11, 20, 38, 0.94))",
    }
    with open("styles.css", encoding="utf-8") as css_file:
        css = css_file.read()
    for key, value in theme_tokens.items():
        css = css.replace(key, value)
    # The processed stylesheet must actually be emitted: previously an empty
    # string was rendered and ``css`` was discarded. Wrapping it in a <style>
    # element (with raw HTML enabled) lets the browser apply the rules.
    st.markdown(f"<style>{css}</style>", unsafe_allow_html=True)
def friendly_error(exc: Exception) -> str:
    """Translate a validation error into wording suitable for the flash banner.

    Args:
        exc: The exception whose text is inspected.

    Returns:
        A specific hint when the error text mentions the start/end hour
        ordering, otherwise a generic "settings do not fit" message.
    """
    hour_order_problem = "end_hour must be after start_hour" in str(exc)
    if not hour_order_problem:
        return "Those control panel settings do not fit together yet. Try adjusting the workday range and resetting the studio."
    return "End time must be later than start time."
def create_env(start_hour: str, end_hour: str, distraction_risk: float, seed: int) -> FocusResourceEnv:
    """Build a ``FocusResourceEnv`` from the control-panel settings.

    All four arguments are forwarded unchanged as keyword arguments; the
    environment is constructed but not reset here.
    """
    settings = {
        "start_hour": start_hour,
        "end_hour": end_hour,
        "distraction_risk": distraction_risk,
        "seed": seed,
    }
    return FocusResourceEnv(**settings)
def default_task_name(index: int) -> str:
    """Return the stock name for the queue entry at ``index``.

    Positions covered by ``DEFAULT_TASK_NAMES`` use that list; any later
    position is labelled ``"Task <index + 1>"``.
    """
    if index < len(DEFAULT_TASK_NAMES):
        return DEFAULT_TASK_NAMES[index]
    return f"Task {index + 1}"
def init_state() -> None:
    """Seed session state with a default for every UI key that is still unset.

    Existing values are left untouched, so this is safe to call on each run.
    """
    defaults = {
        "ui_error": "",
        "ui_error_until": 0.0,
        "start_hour": "09:00",
        "end_hour": "17:00",
        "distraction_risk": 0.15,
        "seed": 7,
        "selection_start": None,
        "selected_range": None,
        "selected_block_id": None,
        "move_block_id": None,
        "armed_task_index": None,
        "next_block_id": 1,
    }
    for key, value in defaults.items():
        st.session_state.setdefault(key, value)
def build_initial_blocks(env: FocusResourceEnv) -> List[dict]:
    """Turn the environment's meeting metadata into calendar block dicts.

    ``env.meeting_meta`` may hold several entries for one meeting; only the
    first entry per ``meeting_id`` (in sorted key order) produces a block.

    Returns:
        Meeting blocks sorted by start slot. ``end`` is inclusive.
    """
    # Keep the first metadata record seen for each meeting id.
    first_seen = {}
    for _, meta in sorted(env.meeting_meta.items()):
        first_seen.setdefault(meta["meeting_id"], meta)

    blocks: List[dict] = [
        {
            "id": f"meeting-{meeting_id}",
            "start": int(meta["start"]),
            "end": int(meta["start"] + meta["length"] - 1),
            "type": "Meeting",
            "label": "Meeting",
            "priority": int(meta["priority"]),
        }
        for meeting_id, meta in first_seen.items()
    ]
    blocks.sort(key=lambda entry: entry["start"])
    return blocks
def ensure_task_names(env: FocusResourceEnv) -> None:
    """Make ``task_names`` in session state line up with ``env.task_buffer``.

    Existing names are kept by position, missing positions get the default
    name, and surplus names are dropped.
    """
    existing = st.session_state.get("task_names", [])
    aligned = []
    for position in range(len(env.task_buffer)):
        if position < len(existing):
            aligned.append(existing[position])
        else:
            aligned.append(default_task_name(position))
    st.session_state.task_names = aligned
def sync_from_env(env: FocusResourceEnv) -> None:
    """Mirror the environment into session state.

    Stores the environment itself, a fresh observation, and the ``done`` flag
    (current slot at or past the timeline length). ``last_reward`` and
    ``last_info`` are only initialised if absent, then task names are
    re-aligned with the task buffer.
    """
    state = st.session_state
    state.env = env
    state.observation = env._observation()
    state.done = env.current_slot >= env.timeline_length
    for key, fallback in (("last_reward", 0.0), ("last_info", {})):
        state.setdefault(key, fallback)
    ensure_task_names(env)
def get_env() -> FocusResourceEnv | None:
    """Return the session's environment, creating it on first use.

    On first use the environment is built from the stored control-panel
    settings, reset, mirrored into session state, and its meetings become the
    initial calendar blocks.

    Returns:
        The environment, or ``None`` when construction raised ``ValueError``
        (a flash error is queued in that case).
    """
    if "env" in st.session_state:
        return st.session_state.env

    try:
        fresh_env = create_env(
            st.session_state.get("start_hour", "09:00"),
            st.session_state.get("end_hour", "17:00"),
            float(st.session_state.get("distraction_risk", 0.15)),
            int(st.session_state.get("seed", 7)),
        )
        fresh_env.reset()
        sync_from_env(fresh_env)
        st.session_state.blocks = build_initial_blocks(fresh_env)
    except ValueError as exc:
        set_ui_error(friendly_error(exc), seconds=6.0)
        return None
    return st.session_state.env
def set_ui_error(message: str, seconds: float = 5.0) -> None:
    """Queue a flash error that stays valid for ``seconds`` from now."""
    st.session_state["ui_error"] = message
    expires_at = time.time() + seconds
    st.session_state["ui_error_until"] = expires_at
def clear_ui_error() -> None:
    """Drop any queued flash error and reset its expiry timestamp."""
    for key, blank in (("ui_error", ""), ("ui_error_until", 0.0)):
        st.session_state[key] = blank
def render_flash_error() -> None:
    """Render the queued flash error, or clear it once it has expired.

    Reads ``ui_error`` and ``ui_error_until`` from session state. Nothing is
    rendered when there is no message or the expiry time has passed; an
    expired message is cleared as a side effect.
    """
    message = st.session_state.get("ui_error", "")
    until = float(st.session_state.get("ui_error_until", 0.0))
    if not message or time.time() >= until:
        if message:
            # A message is present but past its deadline: forget it.
            clear_ui_error()
        return
    # NOTE(review): remaining_ms is computed but not referenced by the markup
    # visible below — confirm whether the template is meant to use it.
    remaining_ms = max(0, int((until - time.time()) * 1000))
    # NOTE(review): raw HTML is enabled, yet the literal shows plain text only;
    # the surrounding markup may be missing — verify against the original file.
    st.markdown(
        f"""
Check the plan
{message}
""",
        unsafe_allow_html=True,
    )
def reset_env(start_hour: str, end_hour: str, distraction_risk: float, seed: int) -> None:
    """Rebuild the environment from new settings and restart the UI run.

    The environment is created and reset before any session state is changed,
    so an exception raised during construction leaves the session untouched.
    On success the settings are stored, scoring and selection state are
    cleared, the blocks are rebuilt from the new meetings, and the script
    reruns.
    """
    fresh_env = create_env(start_hour, end_hour, distraction_risk, seed)
    fresh_env.reset()

    state = st.session_state
    state.start_hour = start_hour
    state.end_hour = end_hour
    state.distraction_risk = float(distraction_risk)
    state.seed = int(seed)
    state.last_reward = 0.0
    state.last_info = {}
    clear_ui_error()
    for key in (
        "selection_start",
        "selected_range",
        "selected_block_id",
        "move_block_id",
        "armed_task_index",
    ):
        state[key] = None
    state.blocks = build_initial_blocks(fresh_env)
    sync_from_env(fresh_env)
    st.rerun()
def sync_tasks_from_widgets() -> None:
    """Copy the task-queue widget values back into the environment.

    For each task in ``env.task_buffer`` the name, slot count and complexity
    are read from the ``task_name_<i>``, ``task_slots_<i>`` and
    ``task_complexity_<i>`` session-state keys. The buffer is replaced with
    new ``Task`` objects and ``task_names`` is rewritten to match.

    When a row has no widget value yet (for example a task that was appended
    before its widgets were ever rendered), the name already stored in
    ``task_names`` is kept. Previously the fallback was always the default
    name, which overwrote the name typed into the "Add task" form on the very
    next run.
    """
    env = st.session_state.env
    stored_names = st.session_state.get("task_names", [])
    new_buffer: List[Task] = []
    new_names: List[str] = []
    for i, task in enumerate(env.task_buffer):
        # Prefer the name we already hold for this position over the stock one.
        fallback_name = stored_names[i] if i < len(stored_names) else default_task_name(i)
        # A blank widget value still collapses to the stock name, as before.
        name = str(st.session_state.get(f"task_name_{i}", fallback_name)).strip() or default_task_name(i)
        # Never let a task shrink below one slot.
        slots = max(1, int(st.session_state.get(f"task_slots_{i}", task.duration)))
        complexity = float(st.session_state.get(f"task_complexity_{i}", task.hidden_complexity))
        new_buffer.append(Task(duration=slots, hidden_complexity=complexity))
        new_names.append(name)
    env.task_buffer = new_buffer
    st.session_state.task_names = new_names
def block_at_slot(slot: int) -> Optional[dict]:
    """Return the first stored block whose inclusive range covers ``slot``.

    Returns ``None`` when no block covers the slot.
    """
    blocks = st.session_state.get("blocks", [])
    covering = (entry for entry in blocks if entry["start"] <= slot <= entry["end"])
    return next(covering, None)
def sort_blocks() -> None:
    """Re-store the session's blocks as a new list ordered by start slot."""
    ordered = list(st.session_state.get("blocks", []))
    ordered.sort(key=lambda entry: entry["start"])
    st.session_state.blocks = ordered
def remove_overlaps(start: int, end: int, ignore_id: Optional[str] = None) -> None:
    """Drop every stored block that intersects the inclusive range.

    Args:
        start: First slot of the range.
        end: Last slot of the range (inclusive).
        ignore_id: Id of a block to keep even if it intersects the range.
    """

    def survives(entry: dict) -> bool:
        # A block is kept when it lies fully outside the range, or when it is
        # the one block the caller asked to spare.
        disjoint = entry["end"] < start or entry["start"] > end
        return disjoint or entry["id"] == ignore_id

    st.session_state.blocks = [
        entry for entry in st.session_state.get("blocks", []) if survives(entry)
    ]
def apply_blocks_to_env() -> None:
    """Rebuild the environment's timeline from the session's blocks.

    The timeline is cleared to ``EMPTY`` and the meeting metadata is emptied.
    "Focus" blocks are then written as ``DEEP_WORK`` over their inclusive
    range; every other block is placed as a meeting with a newly issued
    meeting id. Finally the environment is mirrored back into session state.
    """
    env = st.session_state.env
    env.timeline[:] = EMPTY
    env.meeting_meta = {}
    for entry in st.session_state.get("blocks", []):
        first_slot = entry["start"]
        if entry["type"] != "Focus":
            length = entry["end"] - first_slot + 1
            priority = int(entry.get("priority", 5))
            env._place_meeting(first_slot, length, priority, env._next_meeting_id())
            continue
        env.timeline[first_slot : entry["end"] + 1] = DEEP_WORK
    sync_from_env(env)
def clear_selection() -> None:
    """Reset the range selection, the inspected block, and any pending move."""
    for key in ("selection_start", "selected_range", "selected_block_id", "move_block_id"):
        st.session_state[key] = None
def create_block(start: int, end: int, block_type: str, label: str, priority: int) -> None:
    """Add a user block over the inclusive range and restart the UI run.

    Any block intersecting the range is removed first. The new block gets the
    id ``user-<next_block_id>``; an empty label falls back to the block type.
    Afterwards the blocks are sorted, pushed into the environment, the
    selection is cleared, and the script reruns.
    """
    first_slot, last_slot = sorted((start, end))
    remove_overlaps(first_slot, last_slot)

    state = st.session_state
    new_block = {
        "id": f"user-{state.get('next_block_id', 1)}",
        "start": first_slot,
        "end": last_slot,
        "type": block_type,
        "label": label.strip() or block_type,
        "priority": int(priority),
    }
    state.blocks.append(new_block)
    state.next_block_id = int(state.get("next_block_id", 1)) + 1

    sort_blocks()
    apply_blocks_to_env()
    clear_selection()
    st.rerun()
def update_block(block_id: str, block_type: str, label: str, priority: int) -> None:
    """Overwrite type, label and priority of the block with ``block_id``.

    Only the first matching block is changed. The environment is rebuilt and
    the script reruns even when no block matched.
    """
    candidates = (entry for entry in st.session_state.get("blocks", []) if entry["id"] == block_id)
    target = next(candidates, None)
    if target is not None:
        target["type"] = block_type
        target["label"] = label.strip() or block_type
        target["priority"] = int(priority)
    apply_blocks_to_env()
    st.rerun()
def delete_block(block_id: str) -> None:
    """Remove every block with ``block_id``, rebuild the environment, and rerun."""
    remaining = []
    for entry in st.session_state.get("blocks", []):
        if entry["id"] != block_id:
            remaining.append(entry)
    st.session_state.blocks = remaining
    apply_blocks_to_env()
    clear_selection()
    st.rerun()
def move_block(block_id: str, new_start: int) -> None:
    """Relocate a block so it begins at ``new_start``, keeping its length.

    The target range is clamped to the timeline: the end is capped at the last
    slot and the start is then pulled back (not below 0). Other blocks
    intersecting the target range are removed. Does nothing when no block has
    ``block_id``; otherwise the moved block becomes the inspected block and
    the script reruns.
    """
    target = None
    for candidate in st.session_state.get("blocks", []):
        if candidate["id"] == block_id:
            target = candidate
            break
    if target is None:
        return

    span = target["end"] - target["start"]
    last_slot = st.session_state.env.timeline_length - 1
    new_end = min(last_slot, new_start + span)
    new_start = max(0, new_end - span)

    remove_overlaps(new_start, new_end, ignore_id=block_id)
    target["start"] = new_start
    target["end"] = new_end
    sort_blocks()
    apply_blocks_to_env()
    st.session_state.move_block_id = None
    st.session_state.selected_block_id = block_id
    st.rerun()
def arm_task(index: int) -> None:
    """Capture the current widget edits, then mark queue entry ``index`` as armed."""
    sync_tasks_from_widgets()
    st.session_state["armed_task_index"] = index
def place_armed_task(start: int, end: int) -> None:
    """Move the armed queue task onto the calendar as a Focus block.

    The task is removed from the queue and its name becomes the block label.
    The block spans at least ``round(duration * hidden_complexity)`` slots
    (minimum 1) from ``start``, capped at the end of the timeline.

    Args:
        start: One end of the selected range.
        end: The other end of the selected range.
    """
    env = st.session_state.env
    sync_tasks_from_widgets()
    task_index = st.session_state.get("armed_task_index")
    if task_index is None or not (0 <= task_index < len(env.task_buffer)):
        set_ui_error("Pick a task from the queue before placing it on the calendar.")
        # Flash errors are timed; rerun now so the message is drawn instead of
        # waiting for the next interaction, by which time it may have expired.
        st.rerun()

    queue_size = len(env.task_buffer)
    task = env.task_buffer.pop(task_index)
    task_name = st.session_state.task_names.pop(task_index)

    # Queue widgets are keyed by row index. After the pop the remaining tasks
    # shift up, so any widget value still stored under the old indices would
    # be copied onto the wrong task by the next sync. Drop those values.
    for row in range(queue_size):
        for prefix in ("task_name_", "task_slots_", "task_complexity_"):
            widget_key = f"{prefix}{row}"
            if widget_key in st.session_state:
                del st.session_state[widget_key]

    desired = max(1, int(round(task.duration * task.hidden_complexity)))
    start, end = sorted((start, end))
    # Extend the selection when it is shorter than the task needs.
    end = min(env.timeline_length - 1, max(end, start + desired - 1))
    st.session_state.armed_task_index = None
    create_block(start, end, "Focus", task_name, 5)
def run_simulation_step(operation: int) -> None:
    """Advance the environment by one step with ``operation`` and rerun.

    Widget edits and calendar blocks are pushed into the environment first.
    For ``OP_RESCHEDULE_MEETING`` the step targets the first stored Meeting
    block starting at or after the current slot; other operations target the
    current slot. The step result is stored in session state, and a
    successfully rescheduled meeting block is moved to its new start.

    Args:
        operation: One of the ``OP_*`` codes accepted by ``env.step``.
    """
    env = st.session_state.env
    sync_tasks_from_widgets()
    apply_blocks_to_env()
    current_slot = int(env.current_slot)
    target_slot = current_slot
    if operation == OP_RESCHEDULE_MEETING:
        future_meetings = [
            block
            for block in st.session_state.get("blocks", [])
            if block["type"] == "Meeting" and block["start"] >= current_slot
        ]
        if not future_meetings:
            set_ui_error("No future meeting is available to move right now.")
            # Flash errors are timed; rerun now so the message is drawn
            # instead of possibly expiring before the next interaction.
            st.rerun()
        target_slot = future_meetings[0]["start"]

    obs, reward, done, info = env.step((target_slot, operation))
    st.session_state.observation = obs
    st.session_state.last_reward = reward
    st.session_state.last_info = info
    st.session_state.done = done

    action_info = info.get("action_info", {})
    if operation == OP_RESCHEDULE_MEETING and action_info.get("status") == "meeting_rescheduled":
        from_slot = int(info["action_info"]["from_slot"])
        # Find the UI block that mirrors the meeting the environment moved.
        moved = next(
            (
                block
                for block in st.session_state.get("blocks", [])
                if block["type"] == "Meeting" and block["start"] == from_slot
            ),
            None,
        )
        if moved is not None:
            duration = moved["end"] - moved["start"]
            moved["start"] = int(info["action_info"]["to_slot"])
            moved["end"] = moved["start"] + duration
            sort_blocks()
    st.rerun()
def render_header(observation: Dict[str, object]) -> None:
    """Render the page banner and the four top-level metrics.

    Args:
        observation: Environment observation; the keys ``current_time``,
            ``current_slot`` and ``mute_comms`` are read here.
    """
    # NOTE(review): raw HTML is enabled, yet the literal shows plain text only;
    # the surrounding markup may be missing — verify against the original file.
    st.markdown(
        """
Cognitive Resource Simulator
engineer-manager
Design the day directly on the calendar, lock in meaningful work blocks, and pressure-test the plan against interruption cost with a cleaner single-source workflow.
One interface, one truth
Blocks keep their own type and label
The task queue never rewrites scheduled work
""",
        unsafe_allow_html=True,
    )
    a, b, c, d = st.columns(4)
    a.metric("Current time", observation["current_time"])
    b.metric("Current slot", str(observation["current_slot"]))
    # "Focus Fortress" is the UI name for the observation's mute_comms flag.
    c.metric("Focus Fortress", "Active" if observation["mute_comms"] else "Inactive")
    d.metric("Last reward", f"{st.session_state.get('last_reward', 0.0):.2f}")
def render_setup() -> None:
    """Render the control panel and handle the "Reset Studio" button.

    The panel collects start/end time, distraction risk and seed. Pressing
    the button rebuilds the environment from those values; a ``ValueError``
    from the rebuild is turned into a flash error.
    """
    st.markdown("### Control Panel")
    start_hour = st.text_input("Start", value=st.session_state.get("start_hour", "09:00"), key="setup_start")
    end_hour = st.text_input("End", value=st.session_state.get("end_hour", "17:00"), key="setup_end")
    risk = st.slider(
        "Distraction risk",
        min_value=0.0,
        max_value=1.0,
        value=float(st.session_state.get("distraction_risk", 0.15)),
        step=0.05,
        help="Higher values create noisier days and lower uninterrupted focus potential.",
        key="setup_risk",
    )
    seed = st.number_input(
        "Seed",
        min_value=0,
        max_value=100000,
        value=int(st.session_state.get("seed", 7)),
        step=1,
        help="? Scenario Consistency: Use a fixed Seed number to regenerate this exact daily challenge for comparison testing.",
        key="setup_seed",
    )
    st.caption("Professional scheduling note: the end time must be later than the start time for the studio to build a valid workday.")
    if st.button("Reset Studio", use_container_width=True):
        try:
            reset_env(start_hour, end_hour, float(risk), int(seed))
        except ValueError as exc:
            set_ui_error(friendly_error(exc), seconds=6.0)
            # The flash banner is drawn before this panel, so without a rerun
            # the timed message would not appear until the next interaction
            # and could expire unseen.
            st.rerun()
def render_calendar(env: FocusResourceEnv) -> None:
    """Render the slot-by-slot day plan and its two inspectors.

    Each timeline slot gets a row with its time label, the covering block (or
    "Open"), and action buttons: "Drop" while a move is pending, otherwise
    "Start"/"Finish" for range selection plus "Edit" on occupied slots. Below
    the grid, a "New block" form appears once a range is selected and an
    "Edit block" form appears while a block is being inspected.

    Args:
        env: The active environment; its current slot, timeline length and
            slot labels drive the grid.
    """
    st.markdown("### Day Plan")
    st.caption("Click one open slot to start a range. Click a second slot to finish it. Click an existing block to edit it or move it.")
    current_slot = int(env.current_slot)
    selection_start = st.session_state.get("selection_start")
    selected_range = st.session_state.get("selected_range")
    selected_block = next(
        (
            block
            for block in st.session_state.get("blocks", [])
            if block["id"] == st.session_state.get("selected_block_id")
        ),
        None,
    )
    move_block_id = st.session_state.get("move_block_id")

    # Slots to tag as SELECTED: the finished range if there is one, otherwise
    # just the slot where the selection began. The previous test compared each
    # slot against a range built from itself, which was always true and tagged
    # every row as soon as a start slot was chosen.
    if selected_range is not None:
        highlight = tuple(sorted(selected_range))
    elif selection_start is not None:
        highlight = (selection_start, selection_start)
    else:
        highlight = None

    for slot in range(env.timeline_length):
        block = block_at_slot(slot)
        time_col, body_col, action_col = st.columns([1.0, 4.25, 1.95])
        time_col.markdown(f"**{env._slot_label(slot)}**")
        if block is None:
            label = "Open"
        else:
            label = f"{block['type']} | {block['label']}"
        if slot == current_slot:
            body_col.markdown(f"`NOW` {label}")
        elif highlight is not None and highlight[0] <= slot <= highlight[1]:
            body_col.markdown(f"`SELECTED` {label}")
        else:
            body_col.markdown(label)

        if move_block_id:
            # A move is pending: every row only offers a drop target.
            if action_col.button("Drop", key=f"slot_drop_{slot}", use_container_width=True, type="secondary"):
                move_block(move_block_id, slot)
        elif block is not None:
            start_col, edit_col = action_col.columns([1, 1])
            start_text = "Finish" if selection_start is not None else "Start"
            if start_col.button(start_text, key=f"slot_start_{slot}", use_container_width=True, type="primary"):
                if selection_start is None:
                    st.session_state.selection_start = slot
                else:
                    st.session_state.selected_range = (selection_start, slot)
                st.rerun()
            if edit_col.button("Edit", key=f"slot_edit_{slot}", use_container_width=True, type="secondary"):
                st.session_state.selected_block_id = block["id"]
                st.session_state.selection_start = None
                st.session_state.selected_range = None
                st.rerun()
        else:
            button_text = "Start" if selection_start is None else "Finish"
            if action_col.button(button_text, key=f"slot_open_{slot}", use_container_width=True, type="primary"):
                if selection_start is None:
                    st.session_state.selection_start = slot
                else:
                    st.session_state.selected_range = (selection_start, slot)
                st.rerun()

    if selected_range is not None:
        start, end = sorted(selected_range)
        st.markdown("#### New block")
        block_type = st.radio("Type", options=BLOCK_TYPES, horizontal=True, key="new_block_type")
        armed_task_index = st.session_state.get("armed_task_index")
        # Pre-fill the label with the armed task's name for Focus blocks.
        if block_type == "Focus" and armed_task_index is not None and armed_task_index < len(st.session_state.get("task_names", [])):
            default_label = st.session_state["task_names"][armed_task_index]
        else:
            default_label = ""
        label = st.text_input("Label", value=default_label, key="new_block_label")
        priority = st.slider("Meeting priority", 1, 10, 5, key="new_block_priority")
        a, b, c = st.columns(3)
        if a.button("Create block", use_container_width=True):
            if block_type == "Focus" and armed_task_index is not None:
                place_armed_task(start, end)
            else:
                create_block(start, end, block_type, label, int(priority))
        if b.button("Clear to open", use_container_width=True):
            remove_overlaps(start, end)
            apply_blocks_to_env()
            clear_selection()
            st.rerun()
        if c.button("Cancel selection", use_container_width=True):
            clear_selection()
            st.rerun()

    if selected_block is not None:
        st.markdown("#### Edit block")
        block_type = st.radio(
            "Block type",
            options=BLOCK_TYPES,
            index=BLOCK_TYPES.index(selected_block["type"]),
            horizontal=True,
            key="edit_block_type",
        )
        label = st.text_input("Block label", value=selected_block["label"], key="edit_block_label")
        priority = st.slider("Meeting priority", 1, 10, int(selected_block.get("priority", 5)), key="edit_block_priority")
        a, b, c = st.columns(3)
        if a.button("Save block", use_container_width=True):
            update_block(selected_block["id"], block_type, label, int(priority))
        if b.button("Move block", use_container_width=True):
            st.session_state.move_block_id = selected_block["id"]
            st.rerun()
        if c.button("Delete block", use_container_width=True):
            delete_block(selected_block["id"])
        if st.button("Close inspector", use_container_width=True):
            clear_selection()
            st.rerun()
def render_task_queue(env: FocusResourceEnv) -> None:
    """Render the editable task queue and the "Add task" form.

    Each queued task gets a row with name, slot count and complexity widgets
    plus "Use" (arm the task for placement) and "X" (remove it) buttons.
    Widget edits are copied into ``env.task_buffer`` at the start of every
    render.

    Args:
        env: The active environment whose ``task_buffer`` is displayed.
    """
    st.markdown("### Task Queue")
    sync_tasks_from_widgets()
    if not env.task_buffer:
        st.info("The queue is empty. Add a task to keep planning.")
    row_count = len(env.task_buffer)
    for i, task in enumerate(env.task_buffer):
        cols = st.columns([2.0, 0.78, 0.92, 1.38, 0.52])
        cols[0].text_input("Task", value=st.session_state.get("task_names", [default_task_name(i)])[i], key=f"task_name_{i}", label_visibility="collapsed")
        cols[1].number_input("Slots", min_value=1, max_value=12, value=int(task.duration), key=f"task_slots_{i}", label_visibility="collapsed")
        current_complexity = float(task.hidden_complexity)
        # Fall back to the first option when the stored value is not offered.
        cols[2].selectbox("Complexity", COMPLEXITY_OPTIONS, index=COMPLEXITY_OPTIONS.index(current_complexity) if current_complexity in COMPLEXITY_OPTIONS else 0, key=f"task_complexity_{i}", label_visibility="collapsed")
        armed = st.session_state.get("armed_task_index") == i
        if cols[3].button("Selected" if armed else "Use", key=f"use_task_{i}", use_container_width=True):
            arm_task(i)
            st.rerun()
        if cols[4].button("X", key=f"cancel_task_{i}", use_container_width=True):
            sync_tasks_from_widgets()
            env.task_buffer.pop(i)
            st.session_state.task_names.pop(i)
            st.session_state.armed_task_index = None
            # Row widgets are keyed by index. After the pop the later tasks
            # shift up, so values still stored under the old indices would be
            # written onto the wrong tasks by the next sync (in effect
            # restoring the deleted row). Drop them so the rows are rebuilt
            # from the task buffer.
            for row in range(row_count):
                for prefix in ("task_name_", "task_slots_", "task_complexity_"):
                    widget_key = f"{prefix}{row}"
                    if widget_key in st.session_state:
                        del st.session_state[widget_key]
            st.rerun()

    st.markdown("#### Add task")
    a, b, c, d = st.columns([2.2, 0.8, 1.0, 0.9])
    task_name = a.text_input("Name", value="", placeholder="Execution block", key="add_task_name")
    slots = b.number_input("Slots", min_value=1, max_value=12, value=2, key="add_task_slots")
    complexity = c.selectbox("Complexity", COMPLEXITY_OPTIONS, index=1, key="add_task_complexity")
    if d.button("Add", use_container_width=True):
        sync_tasks_from_widgets()
        env.task_buffer.append(Task(duration=int(slots), hidden_complexity=float(complexity)))
        st.session_state.task_names.append(task_name.strip() or default_task_name(len(st.session_state.task_names)))
        st.rerun()
def render_status(observation: Dict[str, object]) -> None:
    """Render the live scoring panel.

    Scores come from ``last_info["score_breakdown"]`` in session state when
    present, falling back to the same-named keys in ``observation`` and then
    to zero.

    Args:
        observation: Environment observation; ``current_slot`` is required,
            ``recovery_state`` and the score keys are optional.
    """
    st.markdown("### Live Scoring")
    scores = st.session_state.get("last_info", {}).get("score_breakdown", {})
    current_block = block_at_slot(int(observation["current_slot"]))
    a, b = st.columns(2)
    a.metric("Flow efficiency", f"{scores.get('flow_score', observation.get('flow_score', 0.0)):.2f}")
    recovery_state = int(observation.get("recovery_state", 0))
    # NOTE(review): card_class is computed but not referenced by the markup
    # visible below — confirm whether the template is meant to use it.
    card_class = "status-card-warning" if recovery_state > 0 else "status-card-subtle"
    # Only a Focus block at the current slot counts as a flow block.
    current_flow_label = current_block["label"] if current_block and current_block["type"] == "Focus" else "Open"
    current_flow_note = "Recovery is active. Focus output is temporarily reduced." if recovery_state > 0 else "Flow is live and ready to compound."
    with b:
        # NOTE(review): raw HTML is enabled, yet the literal shows plain text
        # only; the surrounding markup may be missing — verify against the
        # original file.
        st.markdown(
            f"""
Current Flow Block
{current_flow_label}
{current_flow_note}
""",
            unsafe_allow_html=True,
        )
    c, d = st.columns(2)
    c.metric("Social Debt", f"{scores.get('social_debt', observation.get('social_debt', 0.0)):.2f}")
    d.metric("Calendar Churn", int(scores.get('calendar_churn', observation.get('calendar_churn', 0))))
def render_simulator(observation: Dict[str, object]) -> None:
    """Render the simulation controls and dispatch the chosen operation.

    Offers three buttons: toggle Focus Fortress (``OP_MUTE_COMMS``), step the
    simulator (``OP_IDLE``), and move the next meeting
    (``OP_RESCHEDULE_MEETING``). The caption and toggle label reflect the
    observation's ``mute_comms`` flag.
    """
    st.markdown("### Simulation Command Center")
    quiet_mode = bool(observation.get("mute_comms", False))
    if quiet_mode:
        caption_text = "?? Focus Fortress: Quiet Mode is active. Notifications are suppressed, boosting your continuous work potential."
        toggle_label = "Deactivate Focus Fortress"
    else:
        caption_text = "?? Focus Fortress is standing by. Activate it when you want maximum uninterrupted focus potential."
        toggle_label = "Activate Focus Fortress"
    st.caption(caption_text)

    toggle_col, step_col = st.columns(2)
    if toggle_col.button(toggle_label, use_container_width=True):
        run_simulation_step(OP_MUTE_COMMS)
    if step_col.button("Step Simulator", use_container_width=True):
        run_simulation_step(OP_IDLE)
    if st.button("Move next meeting", use_container_width=True):
        run_simulation_step(OP_RESCHEDULE_MEETING)
def main() -> None:
    """Build the page: configure it, prepare state, then render every panel.

    When the environment cannot be created, only the flash error is shown and
    the run stops. Otherwise the header is followed by three columns: control
    panel, calendar, and the task queue with scoring and simulator controls.
    """
    st.set_page_config(page_title="Focus Studio", layout="wide")
    init_state()
    inject_styles()

    env = get_env()
    if env is None:
        render_flash_error()
        st.stop()

    fallback_observation = env._observation()
    observation = st.session_state.get("observation", fallback_observation)
    render_flash_error()
    render_header(observation)

    setup_col, calendar_col, side_col = st.columns([0.88, 1.5, 1.28], gap="large")
    with setup_col:
        render_setup()
    with calendar_col:
        render_calendar(env)
    with side_col:
        render_task_queue(env)
        render_status(observation)
        render_simulator(observation)
# Build the UI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()