# admin.py # -*- coding: utf-8 -*- import os import io import re import json import sqlite3 import zipfile import traceback import time as _time from datetime import datetime, date, time, timedelta from typing import Optional, Dict, Any, List, Tuple import pandas as pd import streamlit as st from sqlalchemy import func # ========================== # Imports de modelos # ========================== from models import ( Course, Lesson, Material, Schedule, Enrollment, Grade, MakeupRequest, Certificate ) # ========================== # Imports com fallback (modular → raiz) # ========================== try: from core.db_ready import ensure_db_ready, commit_with_retry except Exception: from db_ready import ensure_db_ready, commit_with_retry try: from core.student_auth import set_student_password except Exception: from student_auth import set_student_password try: from core.video_access import ( get_video_visibility, set_video_visibility, get_video_acl_emails, set_video_acl_emails, is_video_allowed_for_student, _ensure_video_access_tables as ensure_video_access_tables, ) except Exception: from video_access import ( get_video_visibility, set_video_visibility, get_video_acl_emails, set_video_acl_emails, is_video_allowed_for_student, ) # Fallback simples se a função ensure não existir na raiz def ensure_video_access_tables(): pass try: from core.presence import ( get_online_users, PRESENCE_ONLINE_THRESHOLD_SEC, _ensure_presence_table as ensure_presence_table, ) except Exception: from presence import get_online_users, PRESENCE_ONLINE_THRESHOLD_SEC def ensure_presence_table(): pass try: from core.comments import ( get_comments, add_comment, delete_comment, _ensure_comments_table as ensure_comments_table, ) except Exception: from comments import get_comments, add_comment, delete_comment def ensure_comments_table(): pass try: from core.backup_hf import ( HF_HUB_AVAILABLE, sync_to_hf_dataset, auto_backup_tick as _auto_backup_tick, _sanitize_repo_id, ) except Exception: from backup_hf import ( HF_HUB_AVAILABLE, sync_to_hf_dataset, auto_backup_tick as _auto_backup_tick, _sanitize_repo_id, ) try: from ui.gallery import aba_galeria_relatorios except Exception: from gallery import aba_galeria_relatorios # Settings (suporta pasta 'Config' e 'config') try: from Config.settings import ADMIN_FIXED_PASS_HASH, AUTO_BKP_INTERVAL_SEC except Exception: try: from config.settings import ADMIN_FIXED_PASS_HASH, AUTO_BKP_INTERVAL_SEC except Exception: # Fallbacks import hashlib as _hashlib ADMIN_FIXED_PASS_HASH = _hashlib.sha256("21003887".encode()).hexdigest() AUTO_BKP_INTERVAL_SEC = 600 # Utilitários try: from utils import save_uploaded_file, human_dt, generate_certificate_pdf, ADMIN_USER except Exception: # Fallbacks simples se algo não existir ADMIN_USER = "admin" def save_uploaded_file(file, base_dir="data"): out_dir = os.path.join(base_dir) os.makedirs(out_dir, exist_ok=True) path = os.path.join(out_dir, file.name) with open(path, "wb") as f: f.write(file.getbuffer()) return path def human_dt(dt_obj): try: return pd.to_datetime(dt_obj).strftime("%d/%m/%Y %H:%M") except Exception: return str(dt_obj) def generate_certificate_pdf(course_title: str, student_name: str, output_dir: str) -> str: os.makedirs(output_dir, exist_ok=True) # Gera um PDF simples de placeholder path = os.path.join(output_dir, f"cert_{student_name.replace(' ', '_')}.pdf") with open(path, "wb") as f: f.write(b"%PDF-1.4\n% placeholder certificate\n") return path # Path do banco try: from db import DB_PATH except Exception: DB_PATH = os.path.join("data", "db.sqlite") # ========================== # Helpers locais (texto, YouTube, tópico da agenda) # ========================== def is_url(s: Optional[str]) -> bool: s = (s or "").strip().lower() return s.startswith("http://") or s.startswith("https://") def normalize_youtube_url(url: str) -> str: if not url: return url u = url.strip() m = re.search(r"(?:https?://)?(?:www\.)?youtu\.be/([A-Za-z0-9_-]{6,})", u) if m: vid = m.group(1) return f"https://www.youtube.com/watch?v={vid}" m = re.search(r"(?:https?://)?(?:www\.)?youtube\.com/watch\?v=([A-Za-z0-9_-]{6,})", u) if m: return f"https://www.youtube.com/watch?v={m.group(1)}" m = re.search(r"(?:https?://)?(?:www\.)?youtube\.com/embed/([A-Za-z0-9_-]{6,})", u) if m: return f"https://www.youtube.com/watch?v={m.group(1)}" m = re.search(r"(?:https?://)?(?:www\.)?youtube\.com/shorts/([A-Za-z0-9_-]{6,})", u) if m: return f"https://www.youtube.com/watch?v={m.group(1)}" return u def parse_schedule_topic(topic_raw: str) -> Dict[str, Any]: defaults = { "topic": None, "module_id": None, "assigned_emails": [], "occupancy": "", "module_index": None, "details": None, "descricao": None, "semana": None, "off_schedule": False, "aluno_nome": None, "matricula": None, } if not topic_raw: return defaults s = (topic_raw or "").strip() # 1) JSON try: obj = json.loads(s) if isinstance(obj, dict): out = defaults.copy() out["topic"] = obj.get("topic") or obj.get("titulo") or obj.get("assunto") or None out["module_id"] = obj.get("module_id") or obj.get("mod_id") or obj.get("lesson_id") or None emails = obj.get("assigned_emails") or obj.get("emails") or [] if isinstance(emails, str): emails = [e.strip().lower() for e in emails.split(",") if e.strip()] elif isinstance(emails, list): emails = [(e or "").strip().lower() for e in emails if (e or "").strip()] else: emails = [] out["assigned_emails"] = emails out["occupancy"] = (obj.get("occupancy") or obj.get("ocupacao") or "").strip() out["module_index"] = obj.get("module_index") or obj.get("indice") or None out["details"] = obj.get("details") out["descricao"] = obj.get("descricao") out["semana"] = obj.get("semana") out["off_schedule"] = bool(obj.get("off_schedule", False)) out["aluno_nome"] = obj.get("aluno_nome") out["matricula"] = obj.get("matricula") return out except Exception: pass # 2) "k: v; k: v" try: parts = [p.strip() for p in re.split(r"[;|]\s*", s) if p.strip()] kv = {} for p in parts: if ":" in p: k, v = p.split(":", 1) kv[k.strip().lower()] = v.strip() if kv: out = defaults.copy() out["topic"] = kv.get("topic") or kv.get("assunto") or None mid = kv.get("module_id") out["module_id"] = int(mid) if (mid and str(mid).isdigit()) else None emails = kv.get("assigned_emails") or kv.get("emails") or "" out["assigned_emails"] = [e.strip().lower() for e in emails.split(",") if e.strip()] out["occupancy"] = kv.get("occupancy") or kv.get("ocupacao") or "" out["module_index"] = kv.get("module_index") or kv.get("indice") or None out["details"] = kv.get("details") or kv.get("descricao") out["descricao"] = kv.get("descricao") out["semana"] = kv.get("semana") out["off_schedule"] = str(kv.get("off_schedule", "")).strip().lower() in ("1","true","sim","yes") out["aluno_nome"] = kv.get("aluno_nome") out["matricula"] = kv.get("matricula") return out except Exception: pass out = defaults.copy() out["topic"] = s return out def _fmt_duration(sec: int) -> str: if sec is None: return "—" sec = int(max(0, sec)) m, s = divmod(sec, 60) h, m = divmod(m, 60) d, h = divmod(h, 24) parts = [] if d: parts.append(f"{d}d") if h: parts.append(f"{h}h") if m: parts.append(f"{m}m") parts.append(f"{s}s") return " ".join(parts) # ========================================================== # Helpers de agenda (importação/limpeza) — mesmos do seu projeto # ========================================================== def _strip_accents_ag(s: str) -> str: import unicodedata if s is None: return s return ''.join(ch for ch in unicodedata.normalize('NFKD', str(s)) if not unicodedata.combining(ch)) def _norm_col_ag(col: str) -> str: col = _strip_accents_ag(col).lower() col = re.sub(r'\W+', '_', col) return col.strip('_') def _normalize_semana_label(s: str) -> str: if not s: return s s = s.strip() s = re.sub(r'^semama', 'Semana', s, flags=re.I) # corrige "Semama9" m = re.match(r'^(semana)\s*([0-9]+)$', s, flags=re.I) if m: s = f"Semana{m.group(2)}" return s def _split_horario_range(horario_str: str) -> Tuple[Optional[str], Optional[str]]: if not horario_str: return None, None s = str(horario_str).strip() s = re.sub(r'[–—−]+', '-', s) # normaliza qualquer traço para '-' parts = re.split(r'\s*-\s*', s) if len(parts) == 2: return parts[0], parts[1] return None, None def _parse_tsv_text(text: str) -> pd.DataFrame: buf = io.StringIO(text.strip()) df = pd.read_csv( buf, sep='\t', engine='python', quoting=0, # csv.QUOTE_MINIMAL quotechar='"', keep_default_na=False ) return df def _read_agenda_uploaded(file) -> Optional[pd.DataFrame]: name = (file.name or "").lower() try: if name.endswith(('.xlsx', '.xls')): return pd.read_excel(file, engine='openpyxl') elif name.endswith(('.csv', '.tsv', '.txt')): content = file.read().decode('utf-8-sig', errors='ignore') try: return _parse_tsv_text(content) except Exception: return pd.read_csv(io.StringIO(content), sep=',', engine='python', quoting=0, quotechar='"', keep_default_na=False) else: st.error("Formato não suportado. Use CSV/TSV/TXT/Excel.") return None except Exception as e: st.error(f"Falha ao ler arquivo: {e}") return None def parse_time_hhmm(s: str) -> Optional[time]: try: hh, mm = s.split(":") return time(int(hh), int(mm)) except Exception: return None def parse_time_range(rng: str) -> Tuple[Optional[time], Optional[time]]: if not rng: return None, None s = (rng or "").replace("–", "-").replace("—", "-").replace("−", "-") parts = [p.strip() for p in s.split("-") if p.strip()] if len(parts) != 2: return None, None return parse_time_hhmm(parts[0]), parse_time_hhmm(parts[1]) def _norm(s: str) -> str: s = (s or "").strip().lower() s = (s.replace("á","a").replace("à","a").replace("ã","a").replace("â","a") .replace("é","e").replace("ê","e") .replace("í","i") .replace("ó","o").replace("ô","o").replace("õ","o") .replace("ú","u").replace("ç","c")) return s def _clean_agenda_df(df_raw: pd.DataFrame) -> pd.DataFrame: if df_raw is None or df_raw.empty: return pd.DataFrame() df = df_raw.copy() df.columns = [_norm_col_ag(c) for c in df.columns] aliases = { 'semana': 'semana', 'data': 'data', 'dia': 'dia', 'horario': 'horario', 'ocupacao': 'ocupacao', 'aluno': 'aluno', 'mat': 'matricula', 'mat_': 'matricula', 'descricao': 'descricao', 'tema': 'tema' } rename = {} for c in df.columns: if c in aliases: rename[c] = aliases[c] elif c.startswith('mat'): rename[c] = 'matricula' else: rename[c] = c df = df.rename(columns=rename) required = ['semana','data','dia','horario','ocupacao','aluno','matricula','descricao','tema'] for col in required: if col not in df.columns: df[col] = "" for c in df.columns: if df[c].dtype == object: df[c] = df[c].astype(str).str.strip() df['semana'] = df['semana'].apply(_normalize_semana_label) df['data'] = pd.to_datetime(df['data'], dayfirst=True, errors='coerce') ini_fim = df['horario'].apply(_split_horario_range) df['inicio_hhmm'] = [t[0] if t else None for t in ini_fim] df['fim_hhmm'] = [t[1] if t else None for t in ini_fim] df['fora_do_cronograma'] = df['tema'].str.contains(r'\(fora do cronograma\)', case=False, na=False) df['tema'] = df['tema'].str.replace(r'\s*\(fora do cronograma\)\s*', '', regex=True).str.strip() df['vago'] = df['aluno'].str.strip().str.upper().eq('VAGO') try: df['matricula'] = pd.to_numeric(df['matricula'], errors='ignore') except Exception: pass df = df.drop_duplicates(subset=['semana','data','dia','horario','aluno','tema','matricula']).reset_index(drop=True) return df # ========================================================== # FullCalendar helpers # ========================================================== from streamlit_calendar import calendar as st_calendar from plotly import colors as pcolors def _to_json_safe(obj): if obj is None: return None if isinstance(obj, (str, int, float, bool)): return obj if isinstance(obj, datetime): return obj.replace(microsecond=0).isoformat() if isinstance(obj, date): return obj.isoformat() if isinstance(obj, time): return obj.strftime("%H:%M:%S") if isinstance(obj, dict): return {str(k): _to_json_safe(v) for k, v in obj.items()} if isinstance(obj, (list, tuple, set)): return [_to_json_safe(v) for v in obj] return str(obj) def json_sanitize_events(events: List[Dict[str, Any]]) -> List[Dict[str, Any]]: safe = [] for ev in events: ev_copy = _to_json_safe(json.loads(json.dumps(ev, ensure_ascii=False))) safe.append(ev_copy) return safe def _make_color_map(emails: List[str]) -> Dict[str, str]: palette = ( pcolors.qualitative.Plotly + pcolors.qualitative.D3 + pcolors.qualitative.Set3 + pcolors.qualitative.Pastel + pcolors.qualitative.Vivid ) cmap = {} idx = 0 for e in sorted(set(emails)): if e in ("Sem aluno",): cmap[e] = "#7f8c8d" else: cmap[e] = palette[idx % len(palette)] idx += 1 return cmap def build_fullcalendar_events(agenda_list: List[Schedule], mods_by_id: Dict[int, Lesson]): rows = [] who = set() for a in agenda_list: meta = parse_schedule_topic(a.topic) mod = mods_by_id.get(meta.get("module_id")) if meta.get("module_id") else None title = mod.title if mod else (meta.get("topic") or "—") desc = meta.get("details") or meta.get("descricao") or (meta.get("topic") or "") occupancy = meta.get("occupancy") or "" module_index = meta.get("module_index") or "" if not a.class_date: continue start_dt = datetime.combine(a.class_date, a.start_time or time(0, 0)) end_dt = datetime.combine(a.class_date, a.end_time or (a.start_time or time(0, 0))) if end_dt <= start_dt: end_dt = start_dt + timedelta(minutes=60) emails = meta.get("assigned_emails") or [] if emails: for em in emails: who.add(em) rows.append({ "title": title, "start": start_dt.isoformat(), "end": end_dt.isoformat(), "who": em, "extendedProps": { "aluno": em, "ocupacao": occupancy, "mod_index": module_index, "descricao": desc, "data": a.class_date.strftime("%d/%m/%Y"), "inicio": a.start_time.strftime("%H:%M") if a.start_time else "", "fim": a.end_time.strftime("%H:%M") if a.end_time else "", }, }) else: who.add("Sem aluno") rows.append({ "title": title, "start": start_dt.isoformat(), "end": end_dt.isoformat(), "who": "Sem aluno", "extendedProps": { "aluno": "Sem aluno", "ocupacao": occupancy, "mod_index": module_index, "descricao": desc, "data": a.class_date.strftime("%d/%m/%Y"), "inicio": a.start_time.strftime("%H:%M") if a.start_time else "", "fim": a.end_time.strftime("%H:%M") if a.end_time else "", }, }) color_map = _make_color_map(list(who)) for ev in rows: ev["color"] = color_map.get(ev.get("who"), "#7f8c8d") return rows, color_map def render_calendar(events, options=None, custom_css=None, key=None): try: import inspect sig = inspect.signature(st_calendar) except Exception: return st_calendar(events=events, options=options, key=key) kwargs = {"events": events} if "options" in sig.parameters: kwargs["options"] = options if "custom_css" in sig.parameters and custom_css: kwargs["custom_css"] = custom_css if "key" in sig.parameters and key: kwargs["key"] = key return st_calendar(**kwargs) # ========================================================== # Utilidades: reset e normalização # ========================================================== def reset_database(db) -> Tuple[bool, Optional[str]]: try: db.query(Certificate).delete(synchronize_session=False) db.query(Grade).delete(synchronize_session=False) db.query(MakeupRequest).delete(synchronize_session=False) db.query(Enrollment).delete(synchronize_session=False) db.query(Schedule).delete(synchronize_session=False) db.query(Material).delete(synchronize_session=False) db.query(Lesson).delete(synchronize_session=False) db.query(Course).delete(synchronize_session=False) commit_with_retry(db) # Auxiliares SQLite ensure_comments_table() ensure_presence_table() ensure_video_access_tables() with sqlite3.connect(DB_PATH) as conn: c = conn.cursor() c.execute("DELETE FROM student_auth") c.execute("DELETE FROM video_access_list") c.execute("DELETE FROM video_access") c.execute("DELETE FROM presence") c.execute("DELETE FROM comments") conn.commit() return True, None except Exception as e: db.rollback() return False, str(e) def normalize_and_dedup_enrollments(db) -> Tuple[int, int]: ens = db.query(Enrollment).order_by(Enrollment.course_id.asc(), Enrollment.student_email.asc(), Enrollment.created_at.asc()).all() normalized = 0 removed = 0 for e in ens: norm = (e.student_email or "").strip().lower() if e.student_email != norm: e.student_email = norm normalized += 1 commit_with_retry(db) seen = {} ens2 = db.query(Enrollment).order_by(Enrollment.course_id.asc(), Enrollment.student_email.asc(), Enrollment.created_at.asc()).all() for e in ens2: key = (e.course_id, e.student_email) if key in seen: db.delete(e) removed += 1 else: seen[key] = e.id commit_with_retry(db) return normalized, removed # ========================================================== # Admin login/logout + presença # ========================================================== def admin_login_ui() -> None: st.subheader("🔐 Login do Administrador") with st.form("admin_login", clear_on_submit=False): u = st.text_input("Usuário", value="", placeholder="admin") p = st.text_input("Senha", value="", type="password") ok = st.form_submit_button("Entrar", type="primary") if ok: import hashlib if u == ADMIN_USER and (ADMIN_FIXED_PASS_HASH == hashlib.sha256(p.encode()).hexdigest()): st.session_state.is_admin = True st.session_state.admin_user = u st.success("Login realizado.") st.rerun() else: st.error("Credenciais inválidas.") def admin_logout_button() -> None: col1, _ = st.columns([1, 5]) with col1: if st.button("Sair", type="secondary"): st.session_state.is_admin = False st.session_state.admin_user = None st.rerun() def _admin_presence_ping(page_label: str): role = "admin" if st.session_state.get("is_admin") else "visitante" email = None nome = st.session_state.get("admin_user") if st.session_state.get("is_admin") else None try: from core.presence import update_presence except Exception: from presence import update_presence update_presence(role=role, page=page_label, user_email=email, user_name=nome) # ========================================================== # View do Admin # ========================================================== def render() -> None: _admin_presence_ping(page_label="admin_home") if not st.session_state.is_admin: admin_login_ui() return st.header("🛠️ Painel do Administrador — Pega a Visão") admin_logout_button() db = ensure_db_ready() try: tab_curso, tab_agenda, tab_avaliacao, tab_repos, tab_alunos, tab_turma, tab_interacao, tab_galeria, tab_reset = st.tabs([ "Cursos & Módulos", "Agenda", "Avaliações & Certificados", "Reposições", "Cadastrar Alunos", "Turma em andamento", "👥 Interações", "📷 Galeria de Relatórios", "💾 Reset/Backup" ]) # ================== CURSOS & MÓDULOS ================== with tab_curso: st.subheader("Cadastrar curso") with st.form("curso_form", clear_on_submit=True): titulo = st.text_input("Título do curso") desc = st.text_area("Descrição") ok_curso = st.form_submit_button("Salvar curso", type="primary") if ok_curso: if not titulo: st.error("Informe um título.") else: c = Course(title=titulo.strip(), description=desc.strip() if desc else None) db.add(c) commit_with_retry(db) st.success("Curso criado.") st.divider() cursos = db.query(Course).order_by(Course.title.asc()).all() if not cursos: st.info("Nenhum curso.") else: idx = st.selectbox("Gerenciar curso", options=list(range(len(cursos))), format_func=lambda i: cursos[i].title) curso = cursos[idx] st.markdown(f"### Curso: **{curso.title}**") with st.expander("Editar curso (nome e descrição)", expanded=False): novo_nome = st.text_input("Nome do curso", value=curso.title, key=f"curso_nome_edit_{curso.id}") nova_desc = st.text_area("Descrição do curso", value=curso.description or "", key=f"curso_desc_edit_{curso.id}", height=120) if st.button("💾 Salvar alterações do curso", key=f"btn_save_curso_{curso.id}"): if not novo_nome.strip(): st.error("Informe um nome válido para o curso.") else: curso.title = novo_nome.strip() curso.description = (nova_desc or "").strip() commit_with_retry(db) st.success("Curso atualizado com sucesso.") st.rerun() up_tabs = st.tabs(["Módulos (vídeo)", "Apostilas (PDF)"]) # ---- Módulos (vídeo) with up_tabs[0]: st.caption("Cadastrar módulo do curso (vídeo opcional).") with st.form("add_module", clear_on_submit=True): lt = st.text_input("Título do módulo") ld = st.text_area("Descrição (opcional)") yt_url_raw = st.text_input("URL do vídeo (YouTube — opcional)", placeholder="https://youtu.be/VIDEO_ID ou https://www.youtube.com/shorts/VIDEO_ID") f = st.file_uploader("Arquivo de vídeo (opcional) — .mp4/.mkv/.mov/.webm", type=["mp4", "mkv", "mov", "webm"]) ok = st.form_submit_button("Adicionar módulo") if ok: if not lt: st.error("Informe um título para o módulo.") else: vpath = None url_norm = normalize_youtube_url(yt_url_raw.strip()) if yt_url_raw else None if f is not None: if url_norm: st.info("Arquivo local enviado. A URL do YouTube foi ignorada (prioridade para arquivo local).") vpath = save_uploaded_file(f, base_dir="data/videos") elif url_norm: vpath = url_norm les = Lesson(course_id=curso.id, title=lt.strip(), description=ld.strip() if ld else None, video_path=vpath) db.add(les) commit_with_retry(db) st.success("Módulo adicionado.") st.markdown("#### Módulos cadastrados") lessons = db.query(Lesson).filter(Lesson.course_id == curso.id).order_by(Lesson.created_at.asc()).all() if not lessons: st.caption("Sem módulos cadastrados ainda.") else: for les in lessons: with st.expander(f"🎬 {les.title}", expanded=False): new_title = st.text_input("Título do módulo", value=les.title, key=f"mod_title_{les.id}") new_desc = st.text_area("Descrição", value=les.description or "", key=f"mod_desc_{les.id}") if st.button("💾 Salvar alterações", key=f"mod_save_{les.id}"): les.title = (new_title or les.title).strip() les.description = (new_desc or "").strip() commit_with_retry(db) st.success("Módulo atualizado.") st.rerun() st.markdown("---") st.markdown("**Vídeo do módulo**") has_video = bool(les.video_path and (is_url(les.video_path) or os.path.exists(les.video_path))) if has_video: if is_url(les.video_path): st.video(les.video_path) st.caption(f"Fonte atual: YouTube ({les.video_path})") else: st.video(les.video_path) st.caption(f"Fonte atual: arquivo local `{os.path.basename(les.video_path)}`") else: st.info("Nenhum vídeo anexado ainda.") if not has_video: up_file = st.file_uploader( "Enviar vídeo agora (opcional)", type=["mp4", "mkv", "mov", "webm"], key=f"mod_upvid_attach_{les.id}" ) url_new_raw = st.text_input( "… ou informar URL do YouTube (opcional)", key=f"mod_url_attach_{les.id}", placeholder="https://youtu.be/VIDEO_ID ou https://www.youtube.com/shorts/VIDEO_ID" ) cols_attach = st.columns([1, 1]) if cols_attach[0].button("📤 Anexar arquivo de vídeo", key=f"mod_attach_file_{les.id}"): if not up_file: st.warning("Selecione um arquivo de vídeo.") else: try: new_path = save_uploaded_file(up_file, base_dir="data/videos") les.video_path = new_path commit_with_retry(db) st.success("Vídeo (arquivo) anexado ao módulo.") st.rerun() except Exception as e: st.error(f"Falha ao anexar vídeo: {e}") if cols_attach[1].button("🔗 Anexar URL do YouTube", key=f"mod_attach_url_{les.id}"): url_norm = normalize_youtube_url(url_new_raw or "") if not url_norm or not is_url(url_norm): st.warning("Informe uma URL válida do YouTube.") else: les.video_path = url_norm commit_with_retry(db) st.success("URL do YouTube anexada ao módulo.") st.rerun() else: up_file = st.file_uploader( "Substituir por novo arquivo", type=["mp4", "mkv", "mov", "webm"], key=f"mod_upvid_replace_{les.id}" ) url_new_raw = st.text_input( "… ou substituir por URL do YouTube", key=f"mod_url_replace_{les.id}", placeholder="https://youtu.be/VIDEO_ID ou https://www.youtube.com/shorts/VIDEO_ID" ) c1, c2, c3 = st.columns([1, 1, 1]) with c1: if up_file and st.button("🔁 Substituir por arquivo", key=f"mod_sendvid_{les.id}"): try: new_path = save_uploaded_file(up_file, base_dir="data/videos") try: if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path): os.remove(les.video_path) except Exception: pass les.video_path = new_path commit_with_retry(db) st.success("Vídeo substituído por arquivo local com sucesso.") st.rerun() except Exception as e: st.error(f"Falha ao substituir vídeo: {e}") with c2: if st.button("🔁 Substituir por URL do YouTube", key=f"mod_replace_url_{les.id}"): url_norm = normalize_youtube_url(url_new_raw or "") if not url_norm or not is_url(url_norm): st.warning("Informe uma URL válido do YouTube.") else: try: if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path): os.remove(les.video_path) except Exception: pass les.video_path = url_norm commit_with_retry(db) st.success("Vídeo substituído por URL do YouTube.") st.rerun() with c3: if st.button("🗑️ Remover vídeo (manter módulo)", key=f"mod_rmvid_{les.id}"): try: if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path): os.remove(les.video_path) except Exception: pass les.video_path = None commit_with_retry(db) st.success("Vídeo removido do módulo.") st.rerun() # Controle de acesso st.markdown("---") st.markdown("🔒 **Acesso ao vídeo** (somente afeta vídeos; PDFs não são impactados)") current_vis = get_video_visibility(les.id) vis_label_to_value = { "Público (qualquer aluno logado)": "public", "Restrito (somente alunos selecionados)": "restricted", "Privado (somente Admin)": "private", } vis_options = list(vis_label_to_value.keys()) try: default_idx = vis_options.index( [lbl for lbl, val in vis_label_to_value.items() if val == current_vis][0] ) except Exception: default_idx = 0 vis_sel_label = st.selectbox( "Visibilidade", options=vis_options, index=default_idx, key=f"vid_vis_{les.id}" ) vis_value = vis_label_to_value[vis_sel_label] ens_list = db.query(Enrollment).filter(Enrollment.course_id == curso.id).order_by(Enrollment.student_name.asc()).all() alunos_labels = [f"{e.student_name} <{e.student_email}>" for e in ens_list] alunos_map = {f"{e.student_name} <{e.student_email}>": e.student_email for e in ens_list} allowed_emails = set(get_video_acl_emails(les.id)) preselected_labels = [lbl for lbl in alunos_labels if alunos_map[lbl] in allowed_emails] if vis_value == "restricted": sel_labels = st.multiselect( "Selecione os alunos autorizados a assistir este vídeo", options=alunos_labels, default=preselected_labels, key=f"vid_acl_{les.id}" ) else: sel_labels = [] cva1, cva2 = st.columns([1, 1]) if cva1.button("💾 Salvar visibilidade", key=f"btn_save_vis_{les.id}"): try: set_video_visibility(les.id, vis_value) st.success("Visibilidade do vídeo atualizada.") except Exception as e: st.error(f"Falha ao salvar visibilidade: {e}") if vis_value == "restricted": if cva2.button("💾 Salvar lista de alunos autorizados", key=f"btn_save_acl_{les.id}"): try: emails_new = [alunos_map[lbl] for lbl in sel_labels] set_video_acl_emails(les.id, emails_new) st.success("Lista de alunos autorizada atualizada.") except Exception as e: st.error(f"Falha ao salvar lista de alunos: {e}") else: st.caption("Para definir alunos específicos, selecione a visibilidade **Restrito**.") st.markdown("---") if st.button("❌ Excluir módulo", key=f"del_mod_{les.id}"): try: if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path): os.remove(les.video_path) except Exception: pass db.delete(les) commit_with_retry(db) st.success("Módulo removido.") st.rerun() # ---- Apostilas (PDF) with up_tabs[1]: st.caption("Adicionar apostila (PDF) ao curso") with st.form("add_mat", clear_on_submit=True): mt = st.text_input("Título da apostila") pf = st.file_uploader("Arquivo PDF", type=["pdf"]) okm = st.form_submit_button("Adicionar") if okm: if not mt or not pf: st.error("Informe título e selecione um PDF.") else: ppath = save_uploaded_file(pf, base_dir="data/pdfs") m = Material(course_id=curso.id, title=mt.strip(), pdf_path=ppath) db.add(m) commit_with_retry(db) st.success("Apostila adicionada.") st.markdown("#### Apostilas cadastradas") mats = db.query(Material).filter(Material.course_id == curso.id).order_by(Material.created_at.asc()).all() for m in mats: cols = st.columns([4, 2, 1]) cols[0].write(f"📄 **{m.title}**") if m.pdf_path and os.path.exists(m.pdf_path): try: with open(m.pdf_path, "rb") as f: cols[1].download_button("Baixar", data=f.read(), file_name=os.path.basename(m.pdf_path), key=f"dl_mat_{m.id}") except Exception: cols[1].warning("Falha ao abrir PDF.") else: cols[1].warning("Arquivo não encontrado.") if cols[2].button("Excluir", key=f"del_mat_{m.id}"): try: if m.pdf_path and os.path.exists(m.pdf_path): os.remove(m.pdf_path) except Exception: pass db.delete(m) commit_with_retry(db) st.success("Apostila removida.") st.rerun() # ================== AGENDA (CRUD + Calendário) ================== with tab_agenda: st.subheader("Agenda por curso") cursos_all = db.query(Course).order_by(Course.title.asc()).all() if not cursos_all: st.info("Cadastre um curso primeiro.") else: cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="agenda_sel_curso") curso_ag = cursos_all[cidx] lessons = db.query(Lesson).filter(Lesson.course_id == curso_ag.id).order_by(Lesson.created_at.asc()).all() ens = db.query(Enrollment).filter(Enrollment.course_id == curso_ag.id).order_by(Enrollment.student_name.asc()).all() with st.expander("📥 Importar Agenda (TSV/CSV/TXT/Excel ou texto colado)", expanded=False): st.caption("Formato: **Semana, Data, Dia, Horário, Ocupação, Aluno, Mat., Descrição, Tema**.") col_up, col_paste = st.columns(2) uploaded = col_up.file_uploader("Enviar arquivo", type=["csv","tsv","txt","xlsx","xls"], key=f"imp_ag_up_{curso_ag.id}") pasted_text = col_paste.text_area("… ou cole aqui o conteúdo tabulado (TSV)", height=220, key=f"imp_ag_txt_{curso_ag.id}") try_link_module = st.checkbox("Vincular módulo automaticamente pelo TEMA (título exato)", value=True, key=f"imp_ag_trylink_{curso_ag.id}") btn_preview = st.button("🔍 Pré-visualizar", key=f"imp_ag_preview_{curso_ag.id}") df_raw, df_norm = None, None if btn_preview: try: if uploaded is not None: df_raw = _read_agenda_uploaded(uploaded) elif pasted_text.strip(): df_raw = _parse_tsv_text(pasted_text) else: st.warning("Envie um arquivo **ou** cole o texto tabulado.") if df_raw is not None and not df_raw.empty: st.success(f"Dados lidos. Linhas: {len(df_raw)}") with st.container(): st.markdown("**Prévia (dados brutos)**") st.dataframe(df_raw.head(15), use_container_width=True) df_norm = _clean_agenda_df(df_raw) if df_norm.empty: st.warning("Não foi possível normalizar. Verifique o cabeçalho e o conteúdo.") else: preview_cols = ['semana','data','dia','horario','inicio_hhmm','fim_hhmm','ocupacao','aluno','matricula','tema','fora_do_cronograma','vago'] for c in preview_cols: if c not in df_norm.columns: df_norm[c] = "" st.subheader("✅ Prévia normalizada") st.dataframe(df_norm[preview_cols], use_container_width=True, hide_index=True) st.session_state[f"imp_df_norm_{curso_ag.id}"] = df_norm.to_dict(orient='list') else: st.info("Sem dados para pré-visualizar.") except Exception as e: st.error(f"Falha ao pré-visualizar: {e}") btn_import = st.button("📦 Importar para este curso", key=f"imp_ag_do_{curso_ag.id}") if btn_import: payload = st.session_state.get(f"imp_df_norm_{curso_ag.id}") if not payload: st.warning("Faça a pré-visualização primeiro.") else: try: df_norm = pd.DataFrame(payload) lessons_by_title_norm = { _norm(l.title): l for l in lessons } enrolls_by_name_norm = { _norm(e.student_name): e for e in ens } agenda_exist = db.query(Schedule).filter(Schedule.course_id == curso_ag.id).all() def _key_ev(a: Schedule, meta: Dict[str,Any]) -> tuple: topic = meta.get("topic") or "" emails = meta.get("assigned_emails") or [] em = (emails[0] if emails else "").strip().lower() return (a.class_date, a.start_time, a.end_time, topic.strip().lower(), em) exist_keys = set() for a in agenda_exist: m = parse_schedule_topic(a.topic) exist_keys.add(_key_ev(a, m)) created, skipped = 0, 0 for _, r in df_norm.iterrows(): d = r.get('data') ini = r.get('inicio_hhmm') fim = r.get('fim_hhmm') tema = (r.get('tema') or "").strip() ocup = (r.get('ocupacao') or "").strip() aluno_nome = (r.get('aluno') or "").strip() vago = bool(r.get('vago')) semana = r.get('semana') or None details = (r.get('descricao') or "").strip() fora = bool(r.get('fora_do_cronograma')) if pd.isna(d) or not ini or not fim: skipped += 1 continue t1, t2 = parse_time_range(f"{ini}-{fim}") if not t1 or not t2: skipped += 1 continue assigned = [] if not vago and aluno_nome: e = enrolls_by_name_norm.get(_norm(aluno_nome)) if e: assigned = [e.student_email.strip().lower()] module_id = None if try_link_module and tema: l = lessons_by_title_norm.get(_norm(tema)) if l: module_id = l.id meta_obj = { "topic": tema or None, "module_id": module_id, "assigned_emails": assigned, "occupancy": ocup, "module_index": None, "details": details or None, "descricao": details or None, "semana": semana, "off_schedule": fora, "aluno_nome": (aluno_nome or None), "matricula": r.get('matricula'), } topic_json = json.dumps(meta_obj, ensure_ascii=False) key = (pd.to_datetime(d).date(), t1, t2, (tema or "").strip().lower(), (assigned[0] if assigned else "")) if key in exist_keys: skipped += 1 continue ag = Schedule( course_id=curso_ag.id, class_date=pd.to_datetime(d).date(), start_time=t1, end_time=t2, topic=topic_json ) db.add(ag) commit_with_retry(db) exist_keys.add(key) created += 1 st.session_state.cal_ver += 1 st.success(f"Importação concluída. Criados: {created} · Ignorados (duplicados ou inválidos): {skipped}") st.rerun() except Exception as e: db.rollback() st.error(f"Falha ao importar: {e}") with st.form("agenda_add", clear_on_submit=True): d = st.date_input("Data da aula") colh1, colh2 = st.columns(2) t1 = colh1.time_input("Início", value=time(19, 0)) t2 = colh2.time_input("Fim", value=time(20, 0)) mod_opt = st.selectbox("Vincular a um módulo (opcional)", options=["— sem vínculo —"] + [f"[{i+1}] {l.title}" for i, l in enumerate(lessons)]) alunos_opts = [f"{e.student_name} <{e.student_email}>" for e in ens] alunos_sel = st.multiselect("Atribuir alunos (opcional)", options=alunos_opts) ocup = st.text_input("Ocupação (ex.: AO VIVO, Gravado…) (opcional)", value="") mod_index = st.text_input("Índice do módulo (opcional)", value="") tema_manual = st.text_input("Tema/descrição (caso não vincule módulo)", value="") ok_add = st.form_submit_button("Adicionar à agenda", type="primary") if ok_add: try: module_id = None if mod_opt != "— sem vínculo —": idx_mod = int(mod_opt.split("]")[0].replace("[","")) module_id = lessons[idx_mod-1].id emails = [re.search(r"<(.+?)>", s).group(1).strip().lower() for s in alunos_sel] if alunos_sel else [] topic_obj = { "topic": (tema_manual.strip() if tema_manual.strip() else None), "module_id": module_id, "assigned_emails": emails, "occupancy": ocup.strip(), "module_index": (mod_index.strip() or None), } ag = Schedule( course_id=curso_ag.id, class_date=d, start_time=t1, end_time=t2, topic=json.dumps(topic_obj, ensure_ascii=False), ) db.add(ag) commit_with_retry(db) st.success("Aula adicionada à agenda.") st.rerun() except Exception as e: st.error(f"Falha ao adicionar à agenda: {e}") st.markdown("#### Itens da agenda") agenda = db.query(Schedule).filter(Schedule.course_id == curso_ag.id).order_by(Schedule.class_date.asc(), Schedule.start_time.asc()).all() if not agenda: st.info("Sem agenda cadastrada.") else: lessons_by_id = {l.id: l for l in lessons} rows = [] for a in agenda: meta = parse_schedule_topic(a.topic) mod = lessons_by_id.get(meta.get("module_id")) if meta.get("module_id") else None rows.append({ "ID": a.id, "Data": a.class_date.strftime("%d/%m/%Y") if a.class_date else "", "Início": a.start_time.strftime("%H:%M") if a.start_time else "", "Fim": a.end_time.strftime("%H:%M") if a.end_time else "", "Módulo/Assunto": (mod.title if mod else (meta.get("topic") or "—")), "Alunos": ", ".join(meta.get("assigned_emails") or []), "Ocupação": meta.get("occupancy") or "", }) st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True) col_del1, col_del2 = st.columns([1,3]) del_id = col_del1.number_input("ID para excluir", min_value=0, value=0, step=1, key="agenda_del_id") if col_del2.button("🗑️ Excluir item da agenda", disabled=(del_id<=0), key="agenda_btn_del"): item = db.query(Schedule).filter(Schedule.id == int(del_id)).first() if not item: st.warning("ID não encontrado.") else: db.delete(item) commit_with_retry(db) st.success("Item excluído.") st.rerun() st.markdown("#### Calendário") events, color_map = build_fullcalendar_events(agenda, lessons_by_id) events = json_sanitize_events(events) cal_options = { "initialView": "dayGridMonth", "locale": "pt-br", "height": 740, "headerToolbar": { "left": "prev,next today", "center": "title", "right": "dayGridMonth,timeGridWeek,listWeek" }, } render_calendar(events=events, options=cal_options, key=f"adm_cal_{curso_ag.id}_{st.session_state.cal_ver}") # ================== AVALIAÇÕES & CERTIFICADOS ================== with tab_avaliacao: st.subheader("Notas e Certificados") cursos_all = db.query(Course).order_by(Course.title.asc()).all() if not cursos_all: st.info("Cadastre um curso primeiro.") else: cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="av_sel_curso") curso_av = cursos_all[cidx] ens = db.query(Enrollment).filter(Enrollment.course_id == curso_av.id).order_by(Enrollment.student_name.asc()).all() if not ens: st.info("Cadastre alunos neste curso para lançar notas e emitir certificados.") else: nomes = [f"{e.student_name} <{e.student_email}>" for e in ens] # ---- Notas st.markdown("### 📝 Lançar nota") with st.form("grade_form", clear_on_submit=True): who = st.selectbox("Aluno", options=nomes) nota = st.number_input("Nota", min_value=0.0, max_value=10.0, step=0.1) obs = st.text_area("Observações (opcional)") okg = st.form_submit_button("Salvar nota", type="primary") if okg: email = re.search(r"<(.+?)>", who).group(1).strip().lower() nome = who.split(" <")[0].strip() g = Grade(course_id=curso_av.id, student_name=nome, student_email=email, grade=float(nota), note=(obs.strip() or None)) db.add(g) commit_with_retry(db) st.success("Nota lançada.") grades = db.query(Grade).filter(Grade.course_id == curso_av.id).order_by(Grade.created_at.desc()).all() if grades: df_g = pd.DataFrame([{"ID":g.id,"Aluno":g.student_name,"E-mail":g.student_email,"Nota":g.grade,"Obs":g.note,"Data":g.created_at} for g in grades]) st.dataframe(df_g, use_container_width=True, hide_index=True) st.markdown("---") # ---- Certificados st.markdown("### 🎓 Emitir certificado") with st.form("cert_form", clear_on_submit=True): who2 = st.selectbox("Aluno", options=nomes, key="cert_who") nome_c = st.text_input("Nome no certificado (como deve aparecer)", value=who2.split(" <")[0].strip()) okc = st.form_submit_button("Emitir certificado (PDF)", type="primary") if okc: try: email_c = re.search(r"<(.+?)>", who2).group(1).strip().lower() out_dir = os.path.join("data","certificates") os.makedirs(out_dir, exist_ok=True) pdf_path = generate_certificate_pdf(course_title=curso_av.title, student_name=nome_c, output_dir=out_dir) cert = Certificate(course_id=curso_av.id, student_name=nome_c, student_email=email_c, pdf_path=pdf_path, issued_at=datetime.utcnow()) db.add(cert) commit_with_retry(db) st.success("Certificado emitido.") with open(pdf_path,"rb") as f: st.download_button("⬇️ Baixar certificado agora", data=f.read(), file_name=os.path.basename(pdf_path), key=f"dl_newcert_{curso_av.id}_{email_c}") except Exception as e: st.error(f"Falha ao emitir certificado: {e}") certs = db.query(Certificate).filter(Certificate.course_id == curso_av.id).order_by(Certificate.issued_at.desc()).all() if certs: df_c = pd.DataFrame([{"ID":c.id,"Aluno":c.student_name,"E-mail":c.student_email,"Arquivo":os.path.basename(c.pdf_path) if c.pdf_path else "—","Emitido em":c.issued_at} for c in certs]) st.dataframe(df_c, use_container_width=True, hide_index=True) # ================== REPOSIÇÕES ================== with tab_repos: st.subheader("Solicitações de reposição") cursos_all = db.query(Course).order_by(Course.title.asc()).all() if not cursos_all: st.info("Cadastre um curso primeiro.") else: cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="rep_sel_curso") curso_rp = cursos_all[cidx] reqs = db.query(MakeupRequest).filter(MakeupRequest.course_id == curso_rp.id).order_by(MakeupRequest.created_at.desc()).all() if not reqs: st.info("Sem solicitações para este curso.") else: for r in reqs: with st.expander(f"{r.student_name} <{r.student_email}> — {r.requested_date} — status: {r.status}", expanded=False): st.write(r.note or "Sem observações.") c1, c2 = st.columns([1,3]) new_status = c1.selectbox("Novo status", options=["pending","approved","denied"], index=["pending","approved","denied"].index(r.status), key=f"rp_status_{r.id}") new_note = c2.text_input("Nota interna (opcional)", value=r.note or "", key=f"rp_note_{r.id}") if st.button("💾 Atualizar solicitação", key=f"rp_save_{r.id}"): r.status = new_status r.note = (new_note or "").strip() or None commit_with_retry(db) st.success("Solicitação atualizada.") st.rerun() # ================== CADASTRAR ALUNOS ================== with tab_alunos: st.subheader("Matrículas por curso") cursos_all = db.query(Course).order_by(Course.title.asc()).all() if not cursos_all: st.info("Cadastre um curso primeiro.") else: cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="aluno_sel_curso") curso_en = cursos_all[cidx] st.markdown("### ➕ Adicionar matrícula") with st.form("matricula_add", clear_on_submit=True): nome = st.text_input("Nome do aluno") email = st.text_input("E-mail do aluno").strip().lower() ok_en = st.form_submit_button("Matricular", type="primary") if ok_en: if not nome or not email: st.error("Preencha nome e e-mail.") else: e = Enrollment(course_id=curso_en.id, student_name=nome.strip(), student_email=email.strip().lower()) db.add(e) commit_with_retry(db) st.success("Aluno matriculado.") st.rerun() st.markdown("### Alunos matriculados") ens = db.query(Enrollment).filter(Enrollment.course_id == curso_en.id).order_by(Enrollment.created_at.desc()).all() if not ens: st.info("Nenhum aluno neste curso.") else: for e in ens: cols = st.columns([3,3,2,2,2]) cols[0].write(f"**{e.student_name}**") cols[1].write(e.student_email) with cols[2]: with st.expander("🔑 Definir/Resetar senha", expanded=False): pwd = st.text_input("Nova senha", type="password", key=f"setpwd_{e.id}") if st.button("Salvar senha", key=f"btn_setpwd_{e.id}"): try: set_student_password(e.student_email, pwd) st.success("Senha definida.") except Exception as ex: st.error(f"Erro ao salvar senha: {ex}") if cols[3].button("🗑️ Remover", key=f"del_en_{e.id}"): db.delete(e) commit_with_retry(db) st.success("Matrícula removida.") st.rerun() if cols[4].button("🧹 Normalizar & deduplicar", key=f"normdedup_{e.id}"): n, r = normalize_and_dedup_enrollments(db) st.success(f"Normalizados: {n}, apagados duplicados: {r}") st.rerun() # ================== TURMA EM ANDAMENTO (ADMIN) ================== with tab_turma: st.subheader("Turma em andamento (visão do Admin)") cursos_all = db.query(Course).order_by(Course.title.asc()).all() if not cursos_all: st.info("Cadastre um curso primeiro.") else: cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="turma_sel_curso") curso_tx = cursos_all[cidx] lessons = db.query(Lesson).filter(Lesson.course_id == curso_tx.id).all() agenda = db.query(Schedule).filter(Schedule.course_id == curso_tx.id).order_by(Schedule.class_date.asc(), Schedule.start_time.asc()).all() lessons_by_id = {l.id: l for l in lessons} rows = [] for a in agenda: meta = parse_schedule_topic(a.topic) mod = lessons_by_id.get(meta.get("module_id")) if meta.get("module_id") else None title = mod.title if mod else (meta.get("topic") or "—") rows.append({ "Data": a.class_date, "Início": a.start_time.strftime("%H:%M") if a.start_time else "", "Fim": a.end_time.strftime("%H:%M") if a.end_time else "", "Aula": title, "Alunos": ", ".join(meta.get("assigned_emails") or []), "Ocupação": meta.get("occupancy") or "", }) if rows: st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True) else: st.info("Sem agenda cadastrada.") st.markdown("#### Calendário") events, color_map = build_fullcalendar_events(agenda, lessons_by_id) events = json_sanitize_events(events) cal_options = { "initialView": "listWeek", "locale": "pt-br", "height": 640, "headerToolbar": { "left": "prev,next today", "center": "title", "right": "dayGridMonth,timeGridWeek,listWeek" }, } render_calendar(events=events, options=cal_options, key=f"adm_turma_cal_{curso_tx.id}_{st.session_state.cal_ver}") # ================== INTERAÇÕES (PRESENÇA + COMENTÁRIOS) ================== with tab_interacao: st.subheader("👥 Usuários online (últimos N segundos)") secs = int(st.number_input("Threshold (segundos)", min_value=30, max_value=3600, value=PRESENCE_ONLINE_THRESHOLD_SEC, step=30)) dfp = get_online_users(threshold_sec=secs) if dfp is None or dfp.empty: st.info("Ninguém online neste momento dentro do threshold.") else: dfv = dfp.copy() dfv["Tempo de sessão"] = dfv["session_sec"].apply(_fmt_duration) dfv["Inatividade"] = dfv["idle_sec"].apply(_fmt_duration) dfv.rename(columns={"user_email":"E-mail","user_name":"Nome","role":"Perfil","page":"Página","first_seen":"Primeiro acesso","last_seen":"Última ação"}, inplace=True) st.dataframe( dfv[["Nome","E-mail","Perfil","Página","Primeiro acesso","Última ação","Tempo de sessão","Inatividade"]], use_container_width=True, hide_index=True ) st.markdown("---") st.subheader("💬 Moderação de comentários") cursos = db.query(Course).order_by(Course.title.asc()).all() if not cursos: st.info("Cadastre um curso primeiro para ver comentários.") else: idxc = st.selectbox("Curso", options=list(range(len(cursos))), format_func=lambda i: cursos[i].title, key="adm_curso_coment") curso_sel = cursos[idxc] dft = get_comments(curso_sel.id) if dft is None or dft.empty: st.info("Sem comentários neste curso.") else: st.dataframe(dft, use_container_width=True) del_id = st.number_input("ID para excluir (apaga também respostas)", min_value=0, value=0, step=1) if st.button("🗑️ Excluir comentário selecionado", disabled=(del_id <= 0)): try: delete_comment(int(del_id)) st.success("Comentário excluído.") st.rerun() except Exception as e: st.error(f"Falha ao excluir: {e}") # ================== GALERIA DE RELATÓRIOS (ADMIN) ================== with tab_galeria: st.subheader("📷 Galeria de Relatórios (Admin)") aba_galeria_relatorios(context_key="admin_") # ================== RESET DA BASE + BACKUP/RESTORE ================== with tab_reset: st.subheader("💾 Backup & Restore do Banco e Mídias") # ---------- SYNC COM HUGGING FACE (DATASET) ---------- st.markdown("### ☁️ Backup para Hugging Face (Dataset)") st.caption("Envia snapshot do **db.sqlite** e um **ZIP** de **data/** para um repositório Dataset no Hugging Face. Requer **HF_TOKEN**.") default_repo = st.session_state.get("hf_dataset_repo") or "Roudrigus/minha-academia-db" repo_id_raw = st.text_input("Repo Dataset (ex.: Owner/Nome)", value=default_repo, key="hf_repo_id") repo_id = _sanitize_repo_id(repo_id_raw) if repo_id != repo_id_raw: st.info(f"Repo ajustado para **{repo_id}** (sanitizado).") colhfa, colhfb, colhfc = st.columns(3) with colhfa: st.markdown("**Biblioteca**") if HF_HUB_AVAILABLE: st.success("📦 huggingface_hub disponível") else: st.warning("📦 huggingface_hub NÃO instalado. Adicione no requirements.txt: `huggingface_hub>=0.21.4`") with colhfb: st.markdown("**Token**") has_token = bool((os.environ.get("HF_TOKEN") or "").strip()) if has_token: st.success("🔑 HF_TOKEN definido") else: st.warning("🔑 HF_TOKEN ausente (Settings → Variables)") with colhfc: st.markdown("**Repositório**") if repo_id and "/" in repo_id: st.success(f"🗃️ {repo_id}") else: st.warning("Informe como `Owner/Nome`") do_sync = st.button("☁️ Sincronizar no Hugging Face (Dataset)", key="btn_sync_now") if do_sync: if not HF_HUB_AVAILABLE: st.error("Instale huggingface_hub e reinicie o app.") elif not has_token: st.error("Defina o secret HF_TOKEN no ambiente para prosseguir.") elif not repo_id or "/" not in repo_id: st.error("Informe um repo válido no formato `Owner/Nome`.") else: try: st.session_state["hf_dataset_repo"] = repo_id.strip() stats = sync_to_hf_dataset(dataset_repo=repo_id.strip(), db_path=DB_PATH, data_dir="data") msgs = [] if stats.get("db_uploaded"): msgs.append("db.sqlite") if stats.get("data_uploaded"): msgs.append("data.zip") if msgs: st.success(f"Backup enviado: {', '.join(msgs)} → {repo_id}") else: st.info("Nada para enviar (db.sqlite não encontrado e pasta data/ inexistente).") except Exception as e: st.error(f"Falha ao sincronizar: {e}") st.caption("Stacktrace (debug):") st.code(traceback.format_exc()) st.markdown("---") # ---------- BACKUP AUTOMÁTICO ---------- st.markdown("### ⏱️ Backup automático (inteligente, a cada 10 min)") enable_auto = st.checkbox("Habilitar backup automático (10 min)", value=st.session_state.get("auto_bkp_enabled", False)) st.session_state["auto_bkp_enabled"] = enable_auto now_ts = _time.time() last_ts = st.session_state.get("auto_bkp_last_ts", 0.0) next_in = max(0, int(AUTO_BKP_INTERVAL_SEC - (now_ts - last_ts))) if enable_auto else None cstat1, cstat2, cstat3 = st.columns(3) with cstat1: st.metric("Intervalo", "10 min") with cstat2: st.metric("Próxima execução", f"{next_in}s" if enable_auto else "—") with cstat3: last_status = st.session_state.get("auto_bkp_last_status","—") st.metric("Último ciclo", last_status) if enable_auto and HF_HUB_AVAILABLE and has_token and repo_id and "/" in repo_id: if now_ts - last_ts >= AUTO_BKP_INTERVAL_SEC: try: status = _auto_backup_tick(repo_id) st.session_state["auto_bkp_last_status"] = status st.session_state["auto_bkp_last_ts"] = now_ts st.success(f"Auto-backup: {status}") st.rerun() except Exception as e: st.session_state["auto_bkp_last_status"] = f"erro: {e}" st.error(f"Auto-backup falhou: {e}") st.caption("Obs.: o auto-backup executa no render do painel Admin. Mantenha esta aba aberta.") st.markdown("### Backup do Banco (.sqlite)") if os.path.exists(DB_PATH): with open(DB_PATH, "rb") as f: st.download_button( "⬇️ Baixar banco (SQLite)", data=f.read(), file_name="db_backup.sqlite", mime="application/octet-stream", ) else: st.info("Banco ainda não existe no disco (será criado ao salvar dado).") st.markdown("### Backup em CSV de todas as tabelas") db2 = ensure_db_ready() try: cur_df = pd.DataFrame([{"id": c.id, "title": c.title, "description": c.description, "created_at": c.created_at} for c in db2.query(Course).all()]) en_df = pd.DataFrame([{"id": e.id, "course_id": e.course_id, "student_name": e.student_name, "student_email": e.student_email, "created_at": e.created_at} for e in db2.query(Enrollment).all()]) md_df = pd.DataFrame([{"id": m.id, "course_id": m.course_id, "title": m.title, "description": m.description, "video_path": m.video_path, "created_at": m.created_at} for m in db2.query(Lesson).all()]) ag_df = pd.DataFrame([{"id": a.id, "course_id": a.course_id, "class_date": a.class_date, "start_time": a.start_time, "end_time": a.end_time, "topic": a.topic, "created_at": a.created_at} for a in db2.query(Schedule).all()]) grades_df = pd.DataFrame([{ "id": g.id, "course_id": g.course_id, "student_name": g.student_name, "student_email": g.student_email, "grade": g.grade, "note": g.note, "created_at": g.created_at } for g in db2.query(Grade).all()]) certs_df = pd.DataFrame([{ "id": c.id, "course_id": c.course_id, "student_name": c.student_name, "student_email": c.student_email, "pdf_path": c.pdf_path, "issued_at": c.issued_at } for c in db2.query(Certificate).all()]) mk_df = pd.DataFrame([{ "id": m.id, "course_id": m.course_id, "student_name": m.student_name, "student_email": m.student_email, "requested_date": m.requested_date, "note": m.note, "status": m.status, "created_at": m.created_at } for m in db2.query(MakeupRequest).all()]) finally: db2.close() # Garantir auxiliares antes do snapshot CSV (para SELECT funcionar bem no SQLite) ensure_presence_table() ensure_video_access_tables() ensure_comments_table() def _safe_read_sql(query: str, conn: sqlite3.Connection) -> pd.DataFrame: try: return pd.read_sql_query(query, conn) except Exception: q = query.lower() if "from student_auth" in q: return pd.DataFrame(columns=["student_email", "password_hash", "created_at"]) if "from video_access_list" in q: return pd.DataFrame(columns=["lesson_id", "student_email", "created_at"]) if "from video_access" in q: return pd.DataFrame(columns=["lesson_id", "visibility", "updated_at"]) if "from presence" in q: return pd.DataFrame(columns=["session_id","user_email","user_name","role","page","first_seen","last_seen"]) if "from comments" in q: return pd.DataFrame(columns=["id","course_id","student_email","student_name","content","parent_id","created_at"]) return pd.DataFrame() with sqlite3.connect(DB_PATH) as _conn: try: student_auth_df = _safe_read_sql("SELECT student_email, password_hash, created_at FROM student_auth", _conn) except Exception: student_auth_df = pd.DataFrame() video_access_df = _safe_read_sql("SELECT lesson_id, visibility, updated_at FROM video_access", _conn) video_acl_df = _safe_read_sql("SELECT lesson_id, student_email, created_at FROM video_access_list", _conn) presence_df = _safe_read_sql("SELECT session_id, user_email, user_name, role, page, first_seen, last_seen FROM presence", _conn) comments_df = _safe_read_sql("SELECT id, course_id, student_email, student_name, content, parent_id, created_at FROM comments", _conn) colb1, colb2, colb3 = st.columns(3) colb1.download_button("⬇️ CSV — Cursos", data=cur_df.to_csv(index=False).encode("utf-8-sig"), file_name="courses.csv") colb1.download_button("⬇️ CSV — Alunos", data=en_df.to_csv(index=False).encode("utf-8-sig"), file_name="enrollments.csv") colb1.download_button("⬇️ CSV — Módulos", data=md_df.to_csv(index=False).encode("utf-8-sig"), file_name="lessons.csv") colb2.download_button("⬇️ CSV — Agenda", data=ag_df.to_csv(index=False).encode("utf-8-sig"), file_name="schedule.csv") colb2.download_button("⬇️ CSV — Notas", data=grades_df.to_csv(index=False).encode("utf-8-sig"), file_name="grades.csv") colb2.download_button("⬇️ CSV — Certificados", data=certs_df.to_csv(index=False).encode("utf-8-sig"), file_name="certificates.csv") colb3.download_button("⬇️ CSV — Reposições", data=mk_df.to_csv(index=False).encode("utf-8-sig"), file_name="makeup_requests.csv") colb3.download_button("⬇️ CSV — Vídeo: visibilidade", data=video_access_df.to_csv(index=False).encode("utf-8-sig"), file_name="video_access.csv") colb3.download_button("⬇️ CSV — Vídeo: ACL", data=video_acl_df.to_csv(index=False).encode("utf-8-sig"), file_name="video_access_list.csv") st.download_button("⬇️ CSV — Comentários", data=comments_df.to_csv(index=False).encode("utf-8-sig"), file_name="comments.csv") st.download_button("⬇️ CSV — Presença (snapshot)", data=presence_df.to_csv(index=False).encode("utf-8-sig"), file_name="presence.csv") st.markdown("---") st.subheader("🧨 Reset da Base (Atenção: ação irreversível)") st.warning("Isto apaga TODOS os dados do banco (cursos, módulos, apostilas, agenda, matrículas, notas, certificados, reposições e auxiliares). Não apaga arquivos em 'data/'.") confirm = st.text_input("Confirmação", placeholder="APAGAR TUDO") btn = st.button("Apagar tudo agora", type="primary", disabled=(confirm.strip().upper() != "APAGAR TUDO")) if btn: ok, err = reset_database(db) if ok: st.success("✅ Base resetada com sucesso (tabelas esvaziadas).") else: st.error(f"Falha ao resetar base: {err}") finally: db.close()