pegavisao / admin.py
Roudrigus's picture
Upload 17 files
3168916 verified
# admin.py
# -*- coding: utf-8 -*-
import os
import io
import re
import json
import sqlite3
import zipfile
import traceback
import time as _time
from datetime import datetime, date, time, timedelta
from typing import Optional, Dict, Any, List, Tuple
import pandas as pd
import streamlit as st
from sqlalchemy import func
# ==========================
# Imports de modelos
# ==========================
from models import (
Course, Lesson, Material, Schedule, Enrollment, Grade, MakeupRequest, Certificate
)
# ==========================
# Imports com fallback (modular → raiz)
# ==========================
try:
from core.db_ready import ensure_db_ready, commit_with_retry
except Exception:
from db_ready import ensure_db_ready, commit_with_retry
try:
from core.student_auth import set_student_password
except Exception:
from student_auth import set_student_password
try:
from core.video_access import (
get_video_visibility, set_video_visibility,
get_video_acl_emails, set_video_acl_emails,
is_video_allowed_for_student,
_ensure_video_access_tables as ensure_video_access_tables,
)
except Exception:
from video_access import (
get_video_visibility, set_video_visibility,
get_video_acl_emails, set_video_acl_emails,
is_video_allowed_for_student,
)
# Fallback simples se a função ensure não existir na raiz
def ensure_video_access_tables():
pass
try:
from core.presence import (
get_online_users, PRESENCE_ONLINE_THRESHOLD_SEC,
_ensure_presence_table as ensure_presence_table,
)
except Exception:
from presence import get_online_users, PRESENCE_ONLINE_THRESHOLD_SEC
def ensure_presence_table():
pass
try:
from core.comments import (
get_comments, add_comment, delete_comment,
_ensure_comments_table as ensure_comments_table,
)
except Exception:
from comments import get_comments, add_comment, delete_comment
def ensure_comments_table():
pass
try:
from core.backup_hf import (
HF_HUB_AVAILABLE, sync_to_hf_dataset,
auto_backup_tick as _auto_backup_tick,
_sanitize_repo_id,
)
except Exception:
from backup_hf import (
HF_HUB_AVAILABLE, sync_to_hf_dataset,
auto_backup_tick as _auto_backup_tick,
_sanitize_repo_id,
)
try:
from ui.gallery import aba_galeria_relatorios
except Exception:
from gallery import aba_galeria_relatorios
# Settings (suporta pasta 'Config' e 'config')
try:
from Config.settings import ADMIN_FIXED_PASS_HASH, AUTO_BKP_INTERVAL_SEC
except Exception:
try:
from config.settings import ADMIN_FIXED_PASS_HASH, AUTO_BKP_INTERVAL_SEC
except Exception:
# Fallbacks
import hashlib as _hashlib
ADMIN_FIXED_PASS_HASH = _hashlib.sha256("21003887".encode()).hexdigest()
AUTO_BKP_INTERVAL_SEC = 600
# Utilitários
try:
from utils import save_uploaded_file, human_dt, generate_certificate_pdf, ADMIN_USER
except Exception:
# Fallbacks simples se algo não existir
ADMIN_USER = "admin"
def save_uploaded_file(file, base_dir="data"):
out_dir = os.path.join(base_dir)
os.makedirs(out_dir, exist_ok=True)
path = os.path.join(out_dir, file.name)
with open(path, "wb") as f:
f.write(file.getbuffer())
return path
def human_dt(dt_obj):
try:
return pd.to_datetime(dt_obj).strftime("%d/%m/%Y %H:%M")
except Exception:
return str(dt_obj)
def generate_certificate_pdf(course_title: str, student_name: str, output_dir: str) -> str:
os.makedirs(output_dir, exist_ok=True)
# Gera um PDF simples de placeholder
path = os.path.join(output_dir, f"cert_{student_name.replace(' ', '_')}.pdf")
with open(path, "wb") as f:
f.write(b"%PDF-1.4\n% placeholder certificate\n")
return path
# Path do banco
try:
from db import DB_PATH
except Exception:
DB_PATH = os.path.join("data", "db.sqlite")
# ==========================
# Helpers locais (texto, YouTube, tópico da agenda)
# ==========================
def is_url(s: Optional[str]) -> bool:
s = (s or "").strip().lower()
return s.startswith("http://") or s.startswith("https://")
def normalize_youtube_url(url: str) -> str:
if not url:
return url
u = url.strip()
m = re.search(r"(?:https?://)?(?:www\.)?youtu\.be/([A-Za-z0-9_-]{6,})", u)
if m:
vid = m.group(1)
return f"https://www.youtube.com/watch?v={vid}"
m = re.search(r"(?:https?://)?(?:www\.)?youtube\.com/watch\?v=([A-Za-z0-9_-]{6,})", u)
if m:
return f"https://www.youtube.com/watch?v={m.group(1)}"
m = re.search(r"(?:https?://)?(?:www\.)?youtube\.com/embed/([A-Za-z0-9_-]{6,})", u)
if m:
return f"https://www.youtube.com/watch?v={m.group(1)}"
m = re.search(r"(?:https?://)?(?:www\.)?youtube\.com/shorts/([A-Za-z0-9_-]{6,})", u)
if m:
return f"https://www.youtube.com/watch?v={m.group(1)}"
return u
def parse_schedule_topic(topic_raw: str) -> Dict[str, Any]:
defaults = {
"topic": None,
"module_id": None,
"assigned_emails": [],
"occupancy": "",
"module_index": None,
"details": None,
"descricao": None,
"semana": None,
"off_schedule": False,
"aluno_nome": None,
"matricula": None,
}
if not topic_raw:
return defaults
s = (topic_raw or "").strip()
# 1) JSON
try:
obj = json.loads(s)
if isinstance(obj, dict):
out = defaults.copy()
out["topic"] = obj.get("topic") or obj.get("titulo") or obj.get("assunto") or None
out["module_id"] = obj.get("module_id") or obj.get("mod_id") or obj.get("lesson_id") or None
emails = obj.get("assigned_emails") or obj.get("emails") or []
if isinstance(emails, str):
emails = [e.strip().lower() for e in emails.split(",") if e.strip()]
elif isinstance(emails, list):
emails = [(e or "").strip().lower() for e in emails if (e or "").strip()]
else:
emails = []
out["assigned_emails"] = emails
out["occupancy"] = (obj.get("occupancy") or obj.get("ocupacao") or "").strip()
out["module_index"] = obj.get("module_index") or obj.get("indice") or None
out["details"] = obj.get("details")
out["descricao"] = obj.get("descricao")
out["semana"] = obj.get("semana")
out["off_schedule"] = bool(obj.get("off_schedule", False))
out["aluno_nome"] = obj.get("aluno_nome")
out["matricula"] = obj.get("matricula")
return out
except Exception:
pass
# 2) "k: v; k: v"
try:
parts = [p.strip() for p in re.split(r"[;|]\s*", s) if p.strip()]
kv = {}
for p in parts:
if ":" in p:
k, v = p.split(":", 1)
kv[k.strip().lower()] = v.strip()
if kv:
out = defaults.copy()
out["topic"] = kv.get("topic") or kv.get("assunto") or None
mid = kv.get("module_id")
out["module_id"] = int(mid) if (mid and str(mid).isdigit()) else None
emails = kv.get("assigned_emails") or kv.get("emails") or ""
out["assigned_emails"] = [e.strip().lower() for e in emails.split(",") if e.strip()]
out["occupancy"] = kv.get("occupancy") or kv.get("ocupacao") or ""
out["module_index"] = kv.get("module_index") or kv.get("indice") or None
out["details"] = kv.get("details") or kv.get("descricao")
out["descricao"] = kv.get("descricao")
out["semana"] = kv.get("semana")
out["off_schedule"] = str(kv.get("off_schedule", "")).strip().lower() in ("1","true","sim","yes")
out["aluno_nome"] = kv.get("aluno_nome")
out["matricula"] = kv.get("matricula")
return out
except Exception:
pass
out = defaults.copy()
out["topic"] = s
return out
def _fmt_duration(sec: int) -> str:
if sec is None:
return "—"
sec = int(max(0, sec))
m, s = divmod(sec, 60)
h, m = divmod(m, 60)
d, h = divmod(h, 24)
parts = []
if d: parts.append(f"{d}d")
if h: parts.append(f"{h}h")
if m: parts.append(f"{m}m")
parts.append(f"{s}s")
return " ".join(parts)
# ==========================================================
# Helpers de agenda (importação/limpeza) — mesmos do seu projeto
# ==========================================================
def _strip_accents_ag(s: str) -> str:
import unicodedata
if s is None:
return s
return ''.join(ch for ch in unicodedata.normalize('NFKD', str(s)) if not unicodedata.combining(ch))
def _norm_col_ag(col: str) -> str:
col = _strip_accents_ag(col).lower()
col = re.sub(r'\W+', '_', col)
return col.strip('_')
def _normalize_semana_label(s: str) -> str:
if not s:
return s
s = s.strip()
s = re.sub(r'^semama', 'Semana', s, flags=re.I) # corrige "Semama9"
m = re.match(r'^(semana)\s*([0-9]+)$', s, flags=re.I)
if m:
s = f"Semana{m.group(2)}"
return s
def _split_horario_range(horario_str: str) -> Tuple[Optional[str], Optional[str]]:
if not horario_str:
return None, None
s = str(horario_str).strip()
s = re.sub(r'[–—−]+', '-', s) # normaliza qualquer traço para '-'
parts = re.split(r'\s*-\s*', s)
if len(parts) == 2:
return parts[0], parts[1]
return None, None
def _parse_tsv_text(text: str) -> pd.DataFrame:
buf = io.StringIO(text.strip())
df = pd.read_csv(
buf,
sep='\t',
engine='python',
quoting=0, # csv.QUOTE_MINIMAL
quotechar='"',
keep_default_na=False
)
return df
def _read_agenda_uploaded(file) -> Optional[pd.DataFrame]:
name = (file.name or "").lower()
try:
if name.endswith(('.xlsx', '.xls')):
return pd.read_excel(file, engine='openpyxl')
elif name.endswith(('.csv', '.tsv', '.txt')):
content = file.read().decode('utf-8-sig', errors='ignore')
try:
return _parse_tsv_text(content)
except Exception:
return pd.read_csv(io.StringIO(content), sep=',', engine='python', quoting=0, quotechar='"', keep_default_na=False)
else:
st.error("Formato não suportado. Use CSV/TSV/TXT/Excel.")
return None
except Exception as e:
st.error(f"Falha ao ler arquivo: {e}")
return None
def parse_time_hhmm(s: str) -> Optional[time]:
try:
hh, mm = s.split(":")
return time(int(hh), int(mm))
except Exception:
return None
def parse_time_range(rng: str) -> Tuple[Optional[time], Optional[time]]:
if not rng:
return None, None
s = (rng or "").replace("–", "-").replace("—", "-").replace("−", "-")
parts = [p.strip() for p in s.split("-") if p.strip()]
if len(parts) != 2:
return None, None
return parse_time_hhmm(parts[0]), parse_time_hhmm(parts[1])
def _norm(s: str) -> str:
s = (s or "").strip().lower()
s = (s.replace("á","a").replace("à","a").replace("ã","a").replace("â","a")
.replace("é","e").replace("ê","e")
.replace("í","i")
.replace("ó","o").replace("ô","o").replace("õ","o")
.replace("ú","u").replace("ç","c"))
return s
def _clean_agenda_df(df_raw: pd.DataFrame) -> pd.DataFrame:
if df_raw is None or df_raw.empty:
return pd.DataFrame()
df = df_raw.copy()
df.columns = [_norm_col_ag(c) for c in df.columns]
aliases = {
'semana': 'semana',
'data': 'data',
'dia': 'dia',
'horario': 'horario',
'ocupacao': 'ocupacao',
'aluno': 'aluno',
'mat': 'matricula',
'mat_': 'matricula',
'descricao': 'descricao',
'tema': 'tema'
}
rename = {}
for c in df.columns:
if c in aliases:
rename[c] = aliases[c]
elif c.startswith('mat'):
rename[c] = 'matricula'
else:
rename[c] = c
df = df.rename(columns=rename)
required = ['semana','data','dia','horario','ocupacao','aluno','matricula','descricao','tema']
for col in required:
if col not in df.columns:
df[col] = ""
for c in df.columns:
if df[c].dtype == object:
df[c] = df[c].astype(str).str.strip()
df['semana'] = df['semana'].apply(_normalize_semana_label)
df['data'] = pd.to_datetime(df['data'], dayfirst=True, errors='coerce')
ini_fim = df['horario'].apply(_split_horario_range)
df['inicio_hhmm'] = [t[0] if t else None for t in ini_fim]
df['fim_hhmm'] = [t[1] if t else None for t in ini_fim]
df['fora_do_cronograma'] = df['tema'].str.contains(r'\(fora do cronograma\)', case=False, na=False)
df['tema'] = df['tema'].str.replace(r'\s*\(fora do cronograma\)\s*', '', regex=True).str.strip()
df['vago'] = df['aluno'].str.strip().str.upper().eq('VAGO')
try:
df['matricula'] = pd.to_numeric(df['matricula'], errors='ignore')
except Exception:
pass
df = df.drop_duplicates(subset=['semana','data','dia','horario','aluno','tema','matricula']).reset_index(drop=True)
return df
# ==========================================================
# FullCalendar helpers
# ==========================================================
from streamlit_calendar import calendar as st_calendar
from plotly import colors as pcolors
def _to_json_safe(obj):
if obj is None:
return None
if isinstance(obj, (str, int, float, bool)):
return obj
if isinstance(obj, datetime):
return obj.replace(microsecond=0).isoformat()
if isinstance(obj, date):
return obj.isoformat()
if isinstance(obj, time):
return obj.strftime("%H:%M:%S")
if isinstance(obj, dict):
return {str(k): _to_json_safe(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple, set)):
return [_to_json_safe(v) for v in obj]
return str(obj)
def json_sanitize_events(events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
safe = []
for ev in events:
ev_copy = _to_json_safe(json.loads(json.dumps(ev, ensure_ascii=False)))
safe.append(ev_copy)
return safe
def _make_color_map(emails: List[str]) -> Dict[str, str]:
palette = (
pcolors.qualitative.Plotly
+ pcolors.qualitative.D3
+ pcolors.qualitative.Set3
+ pcolors.qualitative.Pastel
+ pcolors.qualitative.Vivid
)
cmap = {}
idx = 0
for e in sorted(set(emails)):
if e in ("Sem aluno",):
cmap[e] = "#7f8c8d"
else:
cmap[e] = palette[idx % len(palette)]
idx += 1
return cmap
def build_fullcalendar_events(agenda_list: List[Schedule], mods_by_id: Dict[int, Lesson]):
rows = []
who = set()
for a in agenda_list:
meta = parse_schedule_topic(a.topic)
mod = mods_by_id.get(meta.get("module_id")) if meta.get("module_id") else None
title = mod.title if mod else (meta.get("topic") or "—")
desc = meta.get("details") or meta.get("descricao") or (meta.get("topic") or "")
occupancy = meta.get("occupancy") or ""
module_index = meta.get("module_index") or ""
if not a.class_date:
continue
start_dt = datetime.combine(a.class_date, a.start_time or time(0, 0))
end_dt = datetime.combine(a.class_date, a.end_time or (a.start_time or time(0, 0)))
if end_dt <= start_dt:
end_dt = start_dt + timedelta(minutes=60)
emails = meta.get("assigned_emails") or []
if emails:
for em in emails:
who.add(em)
rows.append({
"title": title,
"start": start_dt.isoformat(),
"end": end_dt.isoformat(),
"who": em,
"extendedProps": {
"aluno": em,
"ocupacao": occupancy,
"mod_index": module_index,
"descricao": desc,
"data": a.class_date.strftime("%d/%m/%Y"),
"inicio": a.start_time.strftime("%H:%M") if a.start_time else "",
"fim": a.end_time.strftime("%H:%M") if a.end_time else "",
},
})
else:
who.add("Sem aluno")
rows.append({
"title": title,
"start": start_dt.isoformat(),
"end": end_dt.isoformat(),
"who": "Sem aluno",
"extendedProps": {
"aluno": "Sem aluno",
"ocupacao": occupancy,
"mod_index": module_index,
"descricao": desc,
"data": a.class_date.strftime("%d/%m/%Y"),
"inicio": a.start_time.strftime("%H:%M") if a.start_time else "",
"fim": a.end_time.strftime("%H:%M") if a.end_time else "",
},
})
color_map = _make_color_map(list(who))
for ev in rows:
ev["color"] = color_map.get(ev.get("who"), "#7f8c8d")
return rows, color_map
def render_calendar(events, options=None, custom_css=None, key=None):
try:
import inspect
sig = inspect.signature(st_calendar)
except Exception:
return st_calendar(events=events, options=options, key=key)
kwargs = {"events": events}
if "options" in sig.parameters:
kwargs["options"] = options
if "custom_css" in sig.parameters and custom_css:
kwargs["custom_css"] = custom_css
if "key" in sig.parameters and key:
kwargs["key"] = key
return st_calendar(**kwargs)
# ==========================================================
# Utilidades: reset e normalização
# ==========================================================
def reset_database(db) -> Tuple[bool, Optional[str]]:
try:
db.query(Certificate).delete(synchronize_session=False)
db.query(Grade).delete(synchronize_session=False)
db.query(MakeupRequest).delete(synchronize_session=False)
db.query(Enrollment).delete(synchronize_session=False)
db.query(Schedule).delete(synchronize_session=False)
db.query(Material).delete(synchronize_session=False)
db.query(Lesson).delete(synchronize_session=False)
db.query(Course).delete(synchronize_session=False)
commit_with_retry(db)
# Auxiliares SQLite
ensure_comments_table()
ensure_presence_table()
ensure_video_access_tables()
with sqlite3.connect(DB_PATH) as conn:
c = conn.cursor()
c.execute("DELETE FROM student_auth")
c.execute("DELETE FROM video_access_list")
c.execute("DELETE FROM video_access")
c.execute("DELETE FROM presence")
c.execute("DELETE FROM comments")
conn.commit()
return True, None
except Exception as e:
db.rollback()
return False, str(e)
def normalize_and_dedup_enrollments(db) -> Tuple[int, int]:
ens = db.query(Enrollment).order_by(Enrollment.course_id.asc(), Enrollment.student_email.asc(), Enrollment.created_at.asc()).all()
normalized = 0
removed = 0
for e in ens:
norm = (e.student_email or "").strip().lower()
if e.student_email != norm:
e.student_email = norm
normalized += 1
commit_with_retry(db)
seen = {}
ens2 = db.query(Enrollment).order_by(Enrollment.course_id.asc(), Enrollment.student_email.asc(), Enrollment.created_at.asc()).all()
for e in ens2:
key = (e.course_id, e.student_email)
if key in seen:
db.delete(e)
removed += 1
else:
seen[key] = e.id
commit_with_retry(db)
return normalized, removed
# ==========================================================
# Admin login/logout + presença
# ==========================================================
def admin_login_ui() -> None:
st.subheader("🔐 Login do Administrador")
with st.form("admin_login", clear_on_submit=False):
u = st.text_input("Usuário", value="", placeholder="admin")
p = st.text_input("Senha", value="", type="password")
ok = st.form_submit_button("Entrar", type="primary")
if ok:
import hashlib
if u == ADMIN_USER and (ADMIN_FIXED_PASS_HASH == hashlib.sha256(p.encode()).hexdigest()):
st.session_state.is_admin = True
st.session_state.admin_user = u
st.success("Login realizado.")
st.rerun()
else:
st.error("Credenciais inválidas.")
def admin_logout_button() -> None:
col1, _ = st.columns([1, 5])
with col1:
if st.button("Sair", type="secondary"):
st.session_state.is_admin = False
st.session_state.admin_user = None
st.rerun()
def _admin_presence_ping(page_label: str):
role = "admin" if st.session_state.get("is_admin") else "visitante"
email = None
nome = st.session_state.get("admin_user") if st.session_state.get("is_admin") else None
try:
from core.presence import update_presence
except Exception:
from presence import update_presence
update_presence(role=role, page=page_label, user_email=email, user_name=nome)
# ==========================================================
# View do Admin
# ==========================================================
def render() -> None:
_admin_presence_ping(page_label="admin_home")
if not st.session_state.is_admin:
admin_login_ui()
return
st.header("🛠️ Painel do Administrador — Pega a Visão")
admin_logout_button()
db = ensure_db_ready()
try:
tab_curso, tab_agenda, tab_avaliacao, tab_repos, tab_alunos, tab_turma, tab_interacao, tab_galeria, tab_reset = st.tabs([
"Cursos & Módulos", "Agenda", "Avaliações & Certificados", "Reposições", "Cadastrar Alunos",
"Turma em andamento", "👥 Interações", "📷 Galeria de Relatórios", "💾 Reset/Backup"
])
# ================== CURSOS & MÓDULOS ==================
with tab_curso:
st.subheader("Cadastrar curso")
with st.form("curso_form", clear_on_submit=True):
titulo = st.text_input("Título do curso")
desc = st.text_area("Descrição")
ok_curso = st.form_submit_button("Salvar curso", type="primary")
if ok_curso:
if not titulo:
st.error("Informe um título.")
else:
c = Course(title=titulo.strip(), description=desc.strip() if desc else None)
db.add(c)
commit_with_retry(db)
st.success("Curso criado.")
st.divider()
cursos = db.query(Course).order_by(Course.title.asc()).all()
if not cursos:
st.info("Nenhum curso.")
else:
idx = st.selectbox("Gerenciar curso", options=list(range(len(cursos))), format_func=lambda i: cursos[i].title)
curso = cursos[idx]
st.markdown(f"### Curso: **{curso.title}**")
with st.expander("Editar curso (nome e descrição)", expanded=False):
novo_nome = st.text_input("Nome do curso", value=curso.title, key=f"curso_nome_edit_{curso.id}")
nova_desc = st.text_area("Descrição do curso", value=curso.description or "", key=f"curso_desc_edit_{curso.id}", height=120)
if st.button("💾 Salvar alterações do curso", key=f"btn_save_curso_{curso.id}"):
if not novo_nome.strip():
st.error("Informe um nome válido para o curso.")
else:
curso.title = novo_nome.strip()
curso.description = (nova_desc or "").strip()
commit_with_retry(db)
st.success("Curso atualizado com sucesso.")
st.rerun()
up_tabs = st.tabs(["Módulos (vídeo)", "Apostilas (PDF)"])
# ---- Módulos (vídeo)
with up_tabs[0]:
st.caption("Cadastrar módulo do curso (vídeo opcional).")
with st.form("add_module", clear_on_submit=True):
lt = st.text_input("Título do módulo")
ld = st.text_area("Descrição (opcional)")
yt_url_raw = st.text_input("URL do vídeo (YouTube — opcional)", placeholder="https://youtu.be/VIDEO_ID ou https://www.youtube.com/shorts/VIDEO_ID")
f = st.file_uploader("Arquivo de vídeo (opcional) — .mp4/.mkv/.mov/.webm", type=["mp4", "mkv", "mov", "webm"])
ok = st.form_submit_button("Adicionar módulo")
if ok:
if not lt:
st.error("Informe um título para o módulo.")
else:
vpath = None
url_norm = normalize_youtube_url(yt_url_raw.strip()) if yt_url_raw else None
if f is not None:
if url_norm:
st.info("Arquivo local enviado. A URL do YouTube foi ignorada (prioridade para arquivo local).")
vpath = save_uploaded_file(f, base_dir="data/videos")
elif url_norm:
vpath = url_norm
les = Lesson(course_id=curso.id, title=lt.strip(), description=ld.strip() if ld else None, video_path=vpath)
db.add(les)
commit_with_retry(db)
st.success("Módulo adicionado.")
st.markdown("#### Módulos cadastrados")
lessons = db.query(Lesson).filter(Lesson.course_id == curso.id).order_by(Lesson.created_at.asc()).all()
if not lessons:
st.caption("Sem módulos cadastrados ainda.")
else:
for les in lessons:
with st.expander(f"🎬 {les.title}", expanded=False):
new_title = st.text_input("Título do módulo", value=les.title, key=f"mod_title_{les.id}")
new_desc = st.text_area("Descrição", value=les.description or "", key=f"mod_desc_{les.id}")
if st.button("💾 Salvar alterações", key=f"mod_save_{les.id}"):
les.title = (new_title or les.title).strip()
les.description = (new_desc or "").strip()
commit_with_retry(db)
st.success("Módulo atualizado.")
st.rerun()
st.markdown("---")
st.markdown("**Vídeo do módulo**")
has_video = bool(les.video_path and (is_url(les.video_path) or os.path.exists(les.video_path)))
if has_video:
if is_url(les.video_path):
st.video(les.video_path)
st.caption(f"Fonte atual: YouTube ({les.video_path})")
else:
st.video(les.video_path)
st.caption(f"Fonte atual: arquivo local `{os.path.basename(les.video_path)}`")
else:
st.info("Nenhum vídeo anexado ainda.")
if not has_video:
up_file = st.file_uploader(
"Enviar vídeo agora (opcional)",
type=["mp4", "mkv", "mov", "webm"],
key=f"mod_upvid_attach_{les.id}"
)
url_new_raw = st.text_input(
"… ou informar URL do YouTube (opcional)",
key=f"mod_url_attach_{les.id}",
placeholder="https://youtu.be/VIDEO_ID ou https://www.youtube.com/shorts/VIDEO_ID"
)
cols_attach = st.columns([1, 1])
if cols_attach[0].button("📤 Anexar arquivo de vídeo", key=f"mod_attach_file_{les.id}"):
if not up_file:
st.warning("Selecione um arquivo de vídeo.")
else:
try:
new_path = save_uploaded_file(up_file, base_dir="data/videos")
les.video_path = new_path
commit_with_retry(db)
st.success("Vídeo (arquivo) anexado ao módulo.")
st.rerun()
except Exception as e:
st.error(f"Falha ao anexar vídeo: {e}")
if cols_attach[1].button("🔗 Anexar URL do YouTube", key=f"mod_attach_url_{les.id}"):
url_norm = normalize_youtube_url(url_new_raw or "")
if not url_norm or not is_url(url_norm):
st.warning("Informe uma URL válida do YouTube.")
else:
les.video_path = url_norm
commit_with_retry(db)
st.success("URL do YouTube anexada ao módulo.")
st.rerun()
else:
up_file = st.file_uploader(
"Substituir por novo arquivo",
type=["mp4", "mkv", "mov", "webm"],
key=f"mod_upvid_replace_{les.id}"
)
url_new_raw = st.text_input(
"… ou substituir por URL do YouTube",
key=f"mod_url_replace_{les.id}",
placeholder="https://youtu.be/VIDEO_ID ou https://www.youtube.com/shorts/VIDEO_ID"
)
c1, c2, c3 = st.columns([1, 1, 1])
with c1:
if up_file and st.button("🔁 Substituir por arquivo", key=f"mod_sendvid_{les.id}"):
try:
new_path = save_uploaded_file(up_file, base_dir="data/videos")
try:
if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path):
os.remove(les.video_path)
except Exception:
pass
les.video_path = new_path
commit_with_retry(db)
st.success("Vídeo substituído por arquivo local com sucesso.")
st.rerun()
except Exception as e:
st.error(f"Falha ao substituir vídeo: {e}")
with c2:
if st.button("🔁 Substituir por URL do YouTube", key=f"mod_replace_url_{les.id}"):
url_norm = normalize_youtube_url(url_new_raw or "")
if not url_norm or not is_url(url_norm):
st.warning("Informe uma URL válido do YouTube.")
else:
try:
if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path):
os.remove(les.video_path)
except Exception:
pass
les.video_path = url_norm
commit_with_retry(db)
st.success("Vídeo substituído por URL do YouTube.")
st.rerun()
with c3:
if st.button("🗑️ Remover vídeo (manter módulo)", key=f"mod_rmvid_{les.id}"):
try:
if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path):
os.remove(les.video_path)
except Exception:
pass
les.video_path = None
commit_with_retry(db)
st.success("Vídeo removido do módulo.")
st.rerun()
# Controle de acesso
st.markdown("---")
st.markdown("🔒 **Acesso ao vídeo** (somente afeta vídeos; PDFs não são impactados)")
current_vis = get_video_visibility(les.id)
vis_label_to_value = {
"Público (qualquer aluno logado)": "public",
"Restrito (somente alunos selecionados)": "restricted",
"Privado (somente Admin)": "private",
}
vis_options = list(vis_label_to_value.keys())
try:
default_idx = vis_options.index(
[lbl for lbl, val in vis_label_to_value.items() if val == current_vis][0]
)
except Exception:
default_idx = 0
vis_sel_label = st.selectbox(
"Visibilidade",
options=vis_options,
index=default_idx,
key=f"vid_vis_{les.id}"
)
vis_value = vis_label_to_value[vis_sel_label]
ens_list = db.query(Enrollment).filter(Enrollment.course_id == curso.id).order_by(Enrollment.student_name.asc()).all()
alunos_labels = [f"{e.student_name} <{e.student_email}>" for e in ens_list]
alunos_map = {f"{e.student_name} <{e.student_email}>": e.student_email for e in ens_list}
allowed_emails = set(get_video_acl_emails(les.id))
preselected_labels = [lbl for lbl in alunos_labels if alunos_map[lbl] in allowed_emails]
if vis_value == "restricted":
sel_labels = st.multiselect(
"Selecione os alunos autorizados a assistir este vídeo",
options=alunos_labels,
default=preselected_labels,
key=f"vid_acl_{les.id}"
)
else:
sel_labels = []
cva1, cva2 = st.columns([1, 1])
if cva1.button("💾 Salvar visibilidade", key=f"btn_save_vis_{les.id}"):
try:
set_video_visibility(les.id, vis_value)
st.success("Visibilidade do vídeo atualizada.")
except Exception as e:
st.error(f"Falha ao salvar visibilidade: {e}")
if vis_value == "restricted":
if cva2.button("💾 Salvar lista de alunos autorizados", key=f"btn_save_acl_{les.id}"):
try:
emails_new = [alunos_map[lbl] for lbl in sel_labels]
set_video_acl_emails(les.id, emails_new)
st.success("Lista de alunos autorizada atualizada.")
except Exception as e:
st.error(f"Falha ao salvar lista de alunos: {e}")
else:
st.caption("Para definir alunos específicos, selecione a visibilidade **Restrito**.")
st.markdown("---")
if st.button("❌ Excluir módulo", key=f"del_mod_{les.id}"):
try:
if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path):
os.remove(les.video_path)
except Exception:
pass
db.delete(les)
commit_with_retry(db)
st.success("Módulo removido.")
st.rerun()
# ---- Apostilas (PDF)
with up_tabs[1]:
st.caption("Adicionar apostila (PDF) ao curso")
with st.form("add_mat", clear_on_submit=True):
mt = st.text_input("Título da apostila")
pf = st.file_uploader("Arquivo PDF", type=["pdf"])
okm = st.form_submit_button("Adicionar")
if okm:
if not mt or not pf:
st.error("Informe título e selecione um PDF.")
else:
ppath = save_uploaded_file(pf, base_dir="data/pdfs")
m = Material(course_id=curso.id, title=mt.strip(), pdf_path=ppath)
db.add(m)
commit_with_retry(db)
st.success("Apostila adicionada.")
st.markdown("#### Apostilas cadastradas")
mats = db.query(Material).filter(Material.course_id == curso.id).order_by(Material.created_at.asc()).all()
for m in mats:
cols = st.columns([4, 2, 1])
cols[0].write(f"📄 **{m.title}**")
if m.pdf_path and os.path.exists(m.pdf_path):
try:
with open(m.pdf_path, "rb") as f:
cols[1].download_button("Baixar", data=f.read(), file_name=os.path.basename(m.pdf_path), key=f"dl_mat_{m.id}")
except Exception:
cols[1].warning("Falha ao abrir PDF.")
else:
cols[1].warning("Arquivo não encontrado.")
if cols[2].button("Excluir", key=f"del_mat_{m.id}"):
try:
if m.pdf_path and os.path.exists(m.pdf_path):
os.remove(m.pdf_path)
except Exception:
pass
db.delete(m)
commit_with_retry(db)
st.success("Apostila removida.")
st.rerun()
# ================== AGENDA (CRUD + Calendário) ==================
with tab_agenda:
st.subheader("Agenda por curso")
cursos_all = db.query(Course).order_by(Course.title.asc()).all()
if not cursos_all:
st.info("Cadastre um curso primeiro.")
else:
cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="agenda_sel_curso")
curso_ag = cursos_all[cidx]
lessons = db.query(Lesson).filter(Lesson.course_id == curso_ag.id).order_by(Lesson.created_at.asc()).all()
ens = db.query(Enrollment).filter(Enrollment.course_id == curso_ag.id).order_by(Enrollment.student_name.asc()).all()
with st.expander("📥 Importar Agenda (TSV/CSV/TXT/Excel ou texto colado)", expanded=False):
st.caption("Formato: **Semana, Data, Dia, Horário, Ocupação, Aluno, Mat., Descrição, Tema**.")
col_up, col_paste = st.columns(2)
uploaded = col_up.file_uploader("Enviar arquivo", type=["csv","tsv","txt","xlsx","xls"], key=f"imp_ag_up_{curso_ag.id}")
pasted_text = col_paste.text_area("… ou cole aqui o conteúdo tabulado (TSV)", height=220, key=f"imp_ag_txt_{curso_ag.id}")
try_link_module = st.checkbox("Vincular módulo automaticamente pelo TEMA (título exato)", value=True, key=f"imp_ag_trylink_{curso_ag.id}")
btn_preview = st.button("🔍 Pré-visualizar", key=f"imp_ag_preview_{curso_ag.id}")
df_raw, df_norm = None, None
if btn_preview:
try:
if uploaded is not None:
df_raw = _read_agenda_uploaded(uploaded)
elif pasted_text.strip():
df_raw = _parse_tsv_text(pasted_text)
else:
st.warning("Envie um arquivo **ou** cole o texto tabulado.")
if df_raw is not None and not df_raw.empty:
st.success(f"Dados lidos. Linhas: {len(df_raw)}")
with st.container():
st.markdown("**Prévia (dados brutos)**")
st.dataframe(df_raw.head(15), use_container_width=True)
df_norm = _clean_agenda_df(df_raw)
if df_norm.empty:
st.warning("Não foi possível normalizar. Verifique o cabeçalho e o conteúdo.")
else:
preview_cols = ['semana','data','dia','horario','inicio_hhmm','fim_hhmm','ocupacao','aluno','matricula','tema','fora_do_cronograma','vago']
for c in preview_cols:
if c not in df_norm.columns:
df_norm[c] = ""
st.subheader("✅ Prévia normalizada")
st.dataframe(df_norm[preview_cols], use_container_width=True, hide_index=True)
st.session_state[f"imp_df_norm_{curso_ag.id}"] = df_norm.to_dict(orient='list')
else:
st.info("Sem dados para pré-visualizar.")
except Exception as e:
st.error(f"Falha ao pré-visualizar: {e}")
btn_import = st.button("📦 Importar para este curso", key=f"imp_ag_do_{curso_ag.id}")
if btn_import:
payload = st.session_state.get(f"imp_df_norm_{curso_ag.id}")
if not payload:
st.warning("Faça a pré-visualização primeiro.")
else:
try:
df_norm = pd.DataFrame(payload)
lessons_by_title_norm = { _norm(l.title): l for l in lessons }
enrolls_by_name_norm = { _norm(e.student_name): e for e in ens }
agenda_exist = db.query(Schedule).filter(Schedule.course_id == curso_ag.id).all()
def _key_ev(a: Schedule, meta: Dict[str,Any]) -> tuple:
topic = meta.get("topic") or ""
emails = meta.get("assigned_emails") or []
em = (emails[0] if emails else "").strip().lower()
return (a.class_date, a.start_time, a.end_time, topic.strip().lower(), em)
exist_keys = set()
for a in agenda_exist:
m = parse_schedule_topic(a.topic)
exist_keys.add(_key_ev(a, m))
created, skipped = 0, 0
for _, r in df_norm.iterrows():
d = r.get('data')
ini = r.get('inicio_hhmm')
fim = r.get('fim_hhmm')
tema = (r.get('tema') or "").strip()
ocup = (r.get('ocupacao') or "").strip()
aluno_nome = (r.get('aluno') or "").strip()
vago = bool(r.get('vago'))
semana = r.get('semana') or None
details = (r.get('descricao') or "").strip()
fora = bool(r.get('fora_do_cronograma'))
if pd.isna(d) or not ini or not fim:
skipped += 1
continue
t1, t2 = parse_time_range(f"{ini}-{fim}")
if not t1 or not t2:
skipped += 1
continue
assigned = []
if not vago and aluno_nome:
e = enrolls_by_name_norm.get(_norm(aluno_nome))
if e:
assigned = [e.student_email.strip().lower()]
module_id = None
if try_link_module and tema:
l = lessons_by_title_norm.get(_norm(tema))
if l:
module_id = l.id
meta_obj = {
"topic": tema or None,
"module_id": module_id,
"assigned_emails": assigned,
"occupancy": ocup,
"module_index": None,
"details": details or None,
"descricao": details or None,
"semana": semana,
"off_schedule": fora,
"aluno_nome": (aluno_nome or None),
"matricula": r.get('matricula'),
}
topic_json = json.dumps(meta_obj, ensure_ascii=False)
key = (pd.to_datetime(d).date(), t1, t2, (tema or "").strip().lower(), (assigned[0] if assigned else ""))
if key in exist_keys:
skipped += 1
continue
ag = Schedule(
course_id=curso_ag.id,
class_date=pd.to_datetime(d).date(),
start_time=t1,
end_time=t2,
topic=topic_json
)
db.add(ag)
commit_with_retry(db)
exist_keys.add(key)
created += 1
st.session_state.cal_ver += 1
st.success(f"Importação concluída. Criados: {created} · Ignorados (duplicados ou inválidos): {skipped}")
st.rerun()
except Exception as e:
db.rollback()
st.error(f"Falha ao importar: {e}")
with st.form("agenda_add", clear_on_submit=True):
d = st.date_input("Data da aula")
colh1, colh2 = st.columns(2)
t1 = colh1.time_input("Início", value=time(19, 0))
t2 = colh2.time_input("Fim", value=time(20, 0))
mod_opt = st.selectbox("Vincular a um módulo (opcional)", options=["— sem vínculo —"] + [f"[{i+1}] {l.title}" for i, l in enumerate(lessons)])
alunos_opts = [f"{e.student_name} <{e.student_email}>" for e in ens]
alunos_sel = st.multiselect("Atribuir alunos (opcional)", options=alunos_opts)
ocup = st.text_input("Ocupação (ex.: AO VIVO, Gravado…) (opcional)", value="")
mod_index = st.text_input("Índice do módulo (opcional)", value="")
tema_manual = st.text_input("Tema/descrição (caso não vincule módulo)", value="")
ok_add = st.form_submit_button("Adicionar à agenda", type="primary")
if ok_add:
try:
module_id = None
if mod_opt != "— sem vínculo —":
idx_mod = int(mod_opt.split("]")[0].replace("[",""))
module_id = lessons[idx_mod-1].id
emails = [re.search(r"<(.+?)>", s).group(1).strip().lower() for s in alunos_sel] if alunos_sel else []
topic_obj = {
"topic": (tema_manual.strip() if tema_manual.strip() else None),
"module_id": module_id,
"assigned_emails": emails,
"occupancy": ocup.strip(),
"module_index": (mod_index.strip() or None),
}
ag = Schedule(
course_id=curso_ag.id,
class_date=d,
start_time=t1,
end_time=t2,
topic=json.dumps(topic_obj, ensure_ascii=False),
)
db.add(ag)
commit_with_retry(db)
st.success("Aula adicionada à agenda.")
st.rerun()
except Exception as e:
st.error(f"Falha ao adicionar à agenda: {e}")
st.markdown("#### Itens da agenda")
agenda = db.query(Schedule).filter(Schedule.course_id == curso_ag.id).order_by(Schedule.class_date.asc(), Schedule.start_time.asc()).all()
if not agenda:
st.info("Sem agenda cadastrada.")
else:
lessons_by_id = {l.id: l for l in lessons}
rows = []
for a in agenda:
meta = parse_schedule_topic(a.topic)
mod = lessons_by_id.get(meta.get("module_id")) if meta.get("module_id") else None
rows.append({
"ID": a.id,
"Data": a.class_date.strftime("%d/%m/%Y") if a.class_date else "",
"Início": a.start_time.strftime("%H:%M") if a.start_time else "",
"Fim": a.end_time.strftime("%H:%M") if a.end_time else "",
"Módulo/Assunto": (mod.title if mod else (meta.get("topic") or "—")),
"Alunos": ", ".join(meta.get("assigned_emails") or []),
"Ocupação": meta.get("occupancy") or "",
})
st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
col_del1, col_del2 = st.columns([1,3])
del_id = col_del1.number_input("ID para excluir", min_value=0, value=0, step=1, key="agenda_del_id")
if col_del2.button("🗑️ Excluir item da agenda", disabled=(del_id<=0), key="agenda_btn_del"):
item = db.query(Schedule).filter(Schedule.id == int(del_id)).first()
if not item:
st.warning("ID não encontrado.")
else:
db.delete(item)
commit_with_retry(db)
st.success("Item excluído.")
st.rerun()
st.markdown("#### Calendário")
events, color_map = build_fullcalendar_events(agenda, lessons_by_id)
events = json_sanitize_events(events)
cal_options = {
"initialView": "dayGridMonth",
"locale": "pt-br",
"height": 740,
"headerToolbar": {
"left": "prev,next today",
"center": "title",
"right": "dayGridMonth,timeGridWeek,listWeek"
},
}
render_calendar(events=events, options=cal_options, key=f"adm_cal_{curso_ag.id}_{st.session_state.cal_ver}")
# ================== AVALIAÇÕES & CERTIFICADOS ==================
with tab_avaliacao:
st.subheader("Notas e Certificados")
cursos_all = db.query(Course).order_by(Course.title.asc()).all()
if not cursos_all:
st.info("Cadastre um curso primeiro.")
else:
cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="av_sel_curso")
curso_av = cursos_all[cidx]
ens = db.query(Enrollment).filter(Enrollment.course_id == curso_av.id).order_by(Enrollment.student_name.asc()).all()
if not ens:
st.info("Cadastre alunos neste curso para lançar notas e emitir certificados.")
else:
nomes = [f"{e.student_name} <{e.student_email}>" for e in ens]
# ---- Notas
st.markdown("### 📝 Lançar nota")
with st.form("grade_form", clear_on_submit=True):
who = st.selectbox("Aluno", options=nomes)
nota = st.number_input("Nota", min_value=0.0, max_value=10.0, step=0.1)
obs = st.text_area("Observações (opcional)")
okg = st.form_submit_button("Salvar nota", type="primary")
if okg:
email = re.search(r"<(.+?)>", who).group(1).strip().lower()
nome = who.split(" <")[0].strip()
g = Grade(course_id=curso_av.id, student_name=nome, student_email=email, grade=float(nota), note=(obs.strip() or None))
db.add(g)
commit_with_retry(db)
st.success("Nota lançada.")
grades = db.query(Grade).filter(Grade.course_id == curso_av.id).order_by(Grade.created_at.desc()).all()
if grades:
df_g = pd.DataFrame([{"ID":g.id,"Aluno":g.student_name,"E-mail":g.student_email,"Nota":g.grade,"Obs":g.note,"Data":g.created_at} for g in grades])
st.dataframe(df_g, use_container_width=True, hide_index=True)
st.markdown("---")
# ---- Certificados
st.markdown("### 🎓 Emitir certificado")
with st.form("cert_form", clear_on_submit=True):
who2 = st.selectbox("Aluno", options=nomes, key="cert_who")
nome_c = st.text_input("Nome no certificado (como deve aparecer)", value=who2.split(" <")[0].strip())
okc = st.form_submit_button("Emitir certificado (PDF)", type="primary")
if okc:
try:
email_c = re.search(r"<(.+?)>", who2).group(1).strip().lower()
out_dir = os.path.join("data","certificates")
os.makedirs(out_dir, exist_ok=True)
pdf_path = generate_certificate_pdf(course_title=curso_av.title, student_name=nome_c, output_dir=out_dir)
cert = Certificate(course_id=curso_av.id, student_name=nome_c, student_email=email_c, pdf_path=pdf_path, issued_at=datetime.utcnow())
db.add(cert)
commit_with_retry(db)
st.success("Certificado emitido.")
with open(pdf_path,"rb") as f:
st.download_button("⬇️ Baixar certificado agora", data=f.read(), file_name=os.path.basename(pdf_path), key=f"dl_newcert_{curso_av.id}_{email_c}")
except Exception as e:
st.error(f"Falha ao emitir certificado: {e}")
certs = db.query(Certificate).filter(Certificate.course_id == curso_av.id).order_by(Certificate.issued_at.desc()).all()
if certs:
df_c = pd.DataFrame([{"ID":c.id,"Aluno":c.student_name,"E-mail":c.student_email,"Arquivo":os.path.basename(c.pdf_path) if c.pdf_path else "—","Emitido em":c.issued_at} for c in certs])
st.dataframe(df_c, use_container_width=True, hide_index=True)
# ================== REPOSIÇÕES ==================
with tab_repos:
st.subheader("Solicitações de reposição")
cursos_all = db.query(Course).order_by(Course.title.asc()).all()
if not cursos_all:
st.info("Cadastre um curso primeiro.")
else:
cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="rep_sel_curso")
curso_rp = cursos_all[cidx]
reqs = db.query(MakeupRequest).filter(MakeupRequest.course_id == curso_rp.id).order_by(MakeupRequest.created_at.desc()).all()
if not reqs:
st.info("Sem solicitações para este curso.")
else:
for r in reqs:
with st.expander(f"{r.student_name} <{r.student_email}> — {r.requested_date} — status: {r.status}", expanded=False):
st.write(r.note or "Sem observações.")
c1, c2 = st.columns([1,3])
new_status = c1.selectbox("Novo status", options=["pending","approved","denied"], index=["pending","approved","denied"].index(r.status), key=f"rp_status_{r.id}")
new_note = c2.text_input("Nota interna (opcional)", value=r.note or "", key=f"rp_note_{r.id}")
if st.button("💾 Atualizar solicitação", key=f"rp_save_{r.id}"):
r.status = new_status
r.note = (new_note or "").strip() or None
commit_with_retry(db)
st.success("Solicitação atualizada.")
st.rerun()
# ================== CADASTRAR ALUNOS ==================
with tab_alunos:
st.subheader("Matrículas por curso")
cursos_all = db.query(Course).order_by(Course.title.asc()).all()
if not cursos_all:
st.info("Cadastre um curso primeiro.")
else:
cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="aluno_sel_curso")
curso_en = cursos_all[cidx]
st.markdown("### ➕ Adicionar matrícula")
with st.form("matricula_add", clear_on_submit=True):
nome = st.text_input("Nome do aluno")
email = st.text_input("E-mail do aluno").strip().lower()
ok_en = st.form_submit_button("Matricular", type="primary")
if ok_en:
if not nome or not email:
st.error("Preencha nome e e-mail.")
else:
e = Enrollment(course_id=curso_en.id, student_name=nome.strip(), student_email=email.strip().lower())
db.add(e)
commit_with_retry(db)
st.success("Aluno matriculado.")
st.rerun()
st.markdown("### Alunos matriculados")
ens = db.query(Enrollment).filter(Enrollment.course_id == curso_en.id).order_by(Enrollment.created_at.desc()).all()
if not ens:
st.info("Nenhum aluno neste curso.")
else:
for e in ens:
cols = st.columns([3,3,2,2,2])
cols[0].write(f"**{e.student_name}**")
cols[1].write(e.student_email)
with cols[2]:
with st.expander("🔑 Definir/Resetar senha", expanded=False):
pwd = st.text_input("Nova senha", type="password", key=f"setpwd_{e.id}")
if st.button("Salvar senha", key=f"btn_setpwd_{e.id}"):
try:
set_student_password(e.student_email, pwd)
st.success("Senha definida.")
except Exception as ex:
st.error(f"Erro ao salvar senha: {ex}")
if cols[3].button("🗑️ Remover", key=f"del_en_{e.id}"):
db.delete(e)
commit_with_retry(db)
st.success("Matrícula removida.")
st.rerun()
if cols[4].button("🧹 Normalizar & deduplicar", key=f"normdedup_{e.id}"):
n, r = normalize_and_dedup_enrollments(db)
st.success(f"Normalizados: {n}, apagados duplicados: {r}")
st.rerun()
# ================== TURMA EM ANDAMENTO (ADMIN) ==================
with tab_turma:
st.subheader("Turma em andamento (visão do Admin)")
cursos_all = db.query(Course).order_by(Course.title.asc()).all()
if not cursos_all:
st.info("Cadastre um curso primeiro.")
else:
cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="turma_sel_curso")
curso_tx = cursos_all[cidx]
lessons = db.query(Lesson).filter(Lesson.course_id == curso_tx.id).all()
agenda = db.query(Schedule).filter(Schedule.course_id == curso_tx.id).order_by(Schedule.class_date.asc(), Schedule.start_time.asc()).all()
lessons_by_id = {l.id: l for l in lessons}
rows = []
for a in agenda:
meta = parse_schedule_topic(a.topic)
mod = lessons_by_id.get(meta.get("module_id")) if meta.get("module_id") else None
title = mod.title if mod else (meta.get("topic") or "—")
rows.append({
"Data": a.class_date,
"Início": a.start_time.strftime("%H:%M") if a.start_time else "",
"Fim": a.end_time.strftime("%H:%M") if a.end_time else "",
"Aula": title,
"Alunos": ", ".join(meta.get("assigned_emails") or []),
"Ocupação": meta.get("occupancy") or "",
})
if rows:
st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
else:
st.info("Sem agenda cadastrada.")
st.markdown("#### Calendário")
events, color_map = build_fullcalendar_events(agenda, lessons_by_id)
events = json_sanitize_events(events)
cal_options = {
"initialView": "listWeek",
"locale": "pt-br",
"height": 640,
"headerToolbar": {
"left": "prev,next today",
"center": "title",
"right": "dayGridMonth,timeGridWeek,listWeek"
},
}
render_calendar(events=events, options=cal_options, key=f"adm_turma_cal_{curso_tx.id}_{st.session_state.cal_ver}")
# ================== INTERAÇÕES (PRESENÇA + COMENTÁRIOS) ==================
with tab_interacao:
st.subheader("👥 Usuários online (últimos N segundos)")
secs = int(st.number_input("Threshold (segundos)", min_value=30, max_value=3600, value=PRESENCE_ONLINE_THRESHOLD_SEC, step=30))
dfp = get_online_users(threshold_sec=secs)
if dfp is None or dfp.empty:
st.info("Ninguém online neste momento dentro do threshold.")
else:
dfv = dfp.copy()
dfv["Tempo de sessão"] = dfv["session_sec"].apply(_fmt_duration)
dfv["Inatividade"] = dfv["idle_sec"].apply(_fmt_duration)
dfv.rename(columns={"user_email":"E-mail","user_name":"Nome","role":"Perfil","page":"Página","first_seen":"Primeiro acesso","last_seen":"Última ação"}, inplace=True)
st.dataframe(
dfv[["Nome","E-mail","Perfil","Página","Primeiro acesso","Última ação","Tempo de sessão","Inatividade"]],
use_container_width=True, hide_index=True
)
st.markdown("---")
st.subheader("💬 Moderação de comentários")
cursos = db.query(Course).order_by(Course.title.asc()).all()
if not cursos:
st.info("Cadastre um curso primeiro para ver comentários.")
else:
idxc = st.selectbox("Curso", options=list(range(len(cursos))), format_func=lambda i: cursos[i].title, key="adm_curso_coment")
curso_sel = cursos[idxc]
dft = get_comments(curso_sel.id)
if dft is None or dft.empty:
st.info("Sem comentários neste curso.")
else:
st.dataframe(dft, use_container_width=True)
del_id = st.number_input("ID para excluir (apaga também respostas)", min_value=0, value=0, step=1)
if st.button("🗑️ Excluir comentário selecionado", disabled=(del_id <= 0)):
try:
delete_comment(int(del_id))
st.success("Comentário excluído.")
st.rerun()
except Exception as e:
st.error(f"Falha ao excluir: {e}")
# ================== GALERIA DE RELATÓRIOS (ADMIN) ==================
with tab_galeria:
st.subheader("📷 Galeria de Relatórios (Admin)")
aba_galeria_relatorios(context_key="admin_")
# ================== RESET DA BASE + BACKUP/RESTORE ==================
with tab_reset:
st.subheader("💾 Backup & Restore do Banco e Mídias")
# ---------- SYNC COM HUGGING FACE (DATASET) ----------
st.markdown("### ☁️ Backup para Hugging Face (Dataset)")
st.caption("Envia snapshot do **db.sqlite** e um **ZIP** de **data/** para um repositório Dataset no Hugging Face. Requer **HF_TOKEN**.")
default_repo = st.session_state.get("hf_dataset_repo") or "Roudrigus/minha-academia-db"
repo_id_raw = st.text_input("Repo Dataset (ex.: Owner/Nome)", value=default_repo, key="hf_repo_id")
repo_id = _sanitize_repo_id(repo_id_raw)
if repo_id != repo_id_raw:
st.info(f"Repo ajustado para **{repo_id}** (sanitizado).")
colhfa, colhfb, colhfc = st.columns(3)
with colhfa:
st.markdown("**Biblioteca**")
if HF_HUB_AVAILABLE:
st.success("📦 huggingface_hub disponível")
else:
st.warning("📦 huggingface_hub NÃO instalado. Adicione no requirements.txt: `huggingface_hub>=0.21.4`")
with colhfb:
st.markdown("**Token**")
has_token = bool((os.environ.get("HF_TOKEN") or "").strip())
if has_token:
st.success("🔑 HF_TOKEN definido")
else:
st.warning("🔑 HF_TOKEN ausente (Settings → Variables)")
with colhfc:
st.markdown("**Repositório**")
if repo_id and "/" in repo_id:
st.success(f"🗃️ {repo_id}")
else:
st.warning("Informe como `Owner/Nome`")
do_sync = st.button("☁️ Sincronizar no Hugging Face (Dataset)", key="btn_sync_now")
if do_sync:
if not HF_HUB_AVAILABLE:
st.error("Instale huggingface_hub e reinicie o app.")
elif not has_token:
st.error("Defina o secret HF_TOKEN no ambiente para prosseguir.")
elif not repo_id or "/" not in repo_id:
st.error("Informe um repo válido no formato `Owner/Nome`.")
else:
try:
st.session_state["hf_dataset_repo"] = repo_id.strip()
stats = sync_to_hf_dataset(dataset_repo=repo_id.strip(), db_path=DB_PATH, data_dir="data")
msgs = []
if stats.get("db_uploaded"):
msgs.append("db.sqlite")
if stats.get("data_uploaded"):
msgs.append("data.zip")
if msgs:
st.success(f"Backup enviado: {', '.join(msgs)}{repo_id}")
else:
st.info("Nada para enviar (db.sqlite não encontrado e pasta data/ inexistente).")
except Exception as e:
st.error(f"Falha ao sincronizar: {e}")
st.caption("Stacktrace (debug):")
st.code(traceback.format_exc())
st.markdown("---")
# ---------- BACKUP AUTOMÁTICO ----------
st.markdown("### ⏱️ Backup automático (inteligente, a cada 10 min)")
enable_auto = st.checkbox("Habilitar backup automático (10 min)", value=st.session_state.get("auto_bkp_enabled", False))
st.session_state["auto_bkp_enabled"] = enable_auto
now_ts = _time.time()
last_ts = st.session_state.get("auto_bkp_last_ts", 0.0)
next_in = max(0, int(AUTO_BKP_INTERVAL_SEC - (now_ts - last_ts))) if enable_auto else None
cstat1, cstat2, cstat3 = st.columns(3)
with cstat1:
st.metric("Intervalo", "10 min")
with cstat2:
st.metric("Próxima execução", f"{next_in}s" if enable_auto else "—")
with cstat3:
last_status = st.session_state.get("auto_bkp_last_status","—")
st.metric("Último ciclo", last_status)
if enable_auto and HF_HUB_AVAILABLE and has_token and repo_id and "/" in repo_id:
if now_ts - last_ts >= AUTO_BKP_INTERVAL_SEC:
try:
status = _auto_backup_tick(repo_id)
st.session_state["auto_bkp_last_status"] = status
st.session_state["auto_bkp_last_ts"] = now_ts
st.success(f"Auto-backup: {status}")
st.rerun()
except Exception as e:
st.session_state["auto_bkp_last_status"] = f"erro: {e}"
st.error(f"Auto-backup falhou: {e}")
st.caption("Obs.: o auto-backup executa no render do painel Admin. Mantenha esta aba aberta.")
st.markdown("### Backup do Banco (.sqlite)")
if os.path.exists(DB_PATH):
with open(DB_PATH, "rb") as f:
st.download_button(
"⬇️ Baixar banco (SQLite)",
data=f.read(),
file_name="db_backup.sqlite",
mime="application/octet-stream",
)
else:
st.info("Banco ainda não existe no disco (será criado ao salvar dado).")
st.markdown("### Backup em CSV de todas as tabelas")
db2 = ensure_db_ready()
try:
cur_df = pd.DataFrame([{"id": c.id, "title": c.title, "description": c.description, "created_at": c.created_at} for c in db2.query(Course).all()])
en_df = pd.DataFrame([{"id": e.id, "course_id": e.course_id, "student_name": e.student_name, "student_email": e.student_email, "created_at": e.created_at} for e in db2.query(Enrollment).all()])
md_df = pd.DataFrame([{"id": m.id, "course_id": m.course_id, "title": m.title, "description": m.description, "video_path": m.video_path, "created_at": m.created_at} for m in db2.query(Lesson).all()])
ag_df = pd.DataFrame([{"id": a.id, "course_id": a.course_id, "class_date": a.class_date, "start_time": a.start_time, "end_time": a.end_time, "topic": a.topic, "created_at": a.created_at} for a in db2.query(Schedule).all()])
grades_df = pd.DataFrame([{
"id": g.id, "course_id": g.course_id, "student_name": g.student_name,
"student_email": g.student_email, "grade": g.grade, "note": g.note, "created_at": g.created_at
} for g in db2.query(Grade).all()])
certs_df = pd.DataFrame([{
"id": c.id, "course_id": c.course_id, "student_name": c.student_name,
"student_email": c.student_email, "pdf_path": c.pdf_path, "issued_at": c.issued_at
} for c in db2.query(Certificate).all()])
mk_df = pd.DataFrame([{
"id": m.id, "course_id": m.course_id, "student_name": m.student_name,
"student_email": m.student_email, "requested_date": m.requested_date,
"note": m.note, "status": m.status, "created_at": m.created_at
} for m in db2.query(MakeupRequest).all()])
finally:
db2.close()
# Garantir auxiliares antes do snapshot CSV (para SELECT funcionar bem no SQLite)
ensure_presence_table()
ensure_video_access_tables()
ensure_comments_table()
def _safe_read_sql(query: str, conn: sqlite3.Connection) -> pd.DataFrame:
try:
return pd.read_sql_query(query, conn)
except Exception:
q = query.lower()
if "from student_auth" in q:
return pd.DataFrame(columns=["student_email", "password_hash", "created_at"])
if "from video_access_list" in q:
return pd.DataFrame(columns=["lesson_id", "student_email", "created_at"])
if "from video_access" in q:
return pd.DataFrame(columns=["lesson_id", "visibility", "updated_at"])
if "from presence" in q:
return pd.DataFrame(columns=["session_id","user_email","user_name","role","page","first_seen","last_seen"])
if "from comments" in q:
return pd.DataFrame(columns=["id","course_id","student_email","student_name","content","parent_id","created_at"])
return pd.DataFrame()
with sqlite3.connect(DB_PATH) as _conn:
try:
student_auth_df = _safe_read_sql("SELECT student_email, password_hash, created_at FROM student_auth", _conn)
except Exception:
student_auth_df = pd.DataFrame()
video_access_df = _safe_read_sql("SELECT lesson_id, visibility, updated_at FROM video_access", _conn)
video_acl_df = _safe_read_sql("SELECT lesson_id, student_email, created_at FROM video_access_list", _conn)
presence_df = _safe_read_sql("SELECT session_id, user_email, user_name, role, page, first_seen, last_seen FROM presence", _conn)
comments_df = _safe_read_sql("SELECT id, course_id, student_email, student_name, content, parent_id, created_at FROM comments", _conn)
colb1, colb2, colb3 = st.columns(3)
colb1.download_button("⬇️ CSV — Cursos", data=cur_df.to_csv(index=False).encode("utf-8-sig"), file_name="courses.csv")
colb1.download_button("⬇️ CSV — Alunos", data=en_df.to_csv(index=False).encode("utf-8-sig"), file_name="enrollments.csv")
colb1.download_button("⬇️ CSV — Módulos", data=md_df.to_csv(index=False).encode("utf-8-sig"), file_name="lessons.csv")
colb2.download_button("⬇️ CSV — Agenda", data=ag_df.to_csv(index=False).encode("utf-8-sig"), file_name="schedule.csv")
colb2.download_button("⬇️ CSV — Notas", data=grades_df.to_csv(index=False).encode("utf-8-sig"), file_name="grades.csv")
colb2.download_button("⬇️ CSV — Certificados", data=certs_df.to_csv(index=False).encode("utf-8-sig"), file_name="certificates.csv")
colb3.download_button("⬇️ CSV — Reposições", data=mk_df.to_csv(index=False).encode("utf-8-sig"), file_name="makeup_requests.csv")
colb3.download_button("⬇️ CSV — Vídeo: visibilidade", data=video_access_df.to_csv(index=False).encode("utf-8-sig"), file_name="video_access.csv")
colb3.download_button("⬇️ CSV — Vídeo: ACL", data=video_acl_df.to_csv(index=False).encode("utf-8-sig"), file_name="video_access_list.csv")
st.download_button("⬇️ CSV — Comentários", data=comments_df.to_csv(index=False).encode("utf-8-sig"), file_name="comments.csv")
st.download_button("⬇️ CSV — Presença (snapshot)", data=presence_df.to_csv(index=False).encode("utf-8-sig"), file_name="presence.csv")
st.markdown("---")
st.subheader("🧨 Reset da Base (Atenção: ação irreversível)")
st.warning("Isto apaga TODOS os dados do banco (cursos, módulos, apostilas, agenda, matrículas, notas, certificados, reposições e auxiliares). Não apaga arquivos em 'data/'.")
confirm = st.text_input("Confirmação", placeholder="APAGAR TUDO")
btn = st.button("Apagar tudo agora", type="primary", disabled=(confirm.strip().upper() != "APAGAR TUDO"))
if btn:
ok, err = reset_database(db)
if ok:
st.success("✅ Base resetada com sucesso (tabelas esvaziadas).")
else:
st.error(f"Falha ao resetar base: {err}")
finally:
db.close()