|
|
|
|
|
|
|
|
import os
|
|
|
import io
|
|
|
import re
|
|
|
import json
|
|
|
import sqlite3
|
|
|
import zipfile
|
|
|
import traceback
|
|
|
import time as _time
|
|
|
from datetime import datetime, date, time, timedelta
|
|
|
from typing import Optional, Dict, Any, List, Tuple
|
|
|
|
|
|
import pandas as pd
|
|
|
import streamlit as st
|
|
|
from sqlalchemy import func
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from models import (
|
|
|
Course, Lesson, Material, Schedule, Enrollment, Grade, MakeupRequest, Certificate
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
from core.db_ready import ensure_db_ready, commit_with_retry
|
|
|
except Exception:
|
|
|
from db_ready import ensure_db_ready, commit_with_retry
|
|
|
|
|
|
try:
|
|
|
from core.student_auth import set_student_password
|
|
|
except Exception:
|
|
|
from student_auth import set_student_password
|
|
|
|
|
|
try:
|
|
|
from core.video_access import (
|
|
|
get_video_visibility, set_video_visibility,
|
|
|
get_video_acl_emails, set_video_acl_emails,
|
|
|
is_video_allowed_for_student,
|
|
|
_ensure_video_access_tables as ensure_video_access_tables,
|
|
|
)
|
|
|
except Exception:
|
|
|
from video_access import (
|
|
|
get_video_visibility, set_video_visibility,
|
|
|
get_video_acl_emails, set_video_acl_emails,
|
|
|
is_video_allowed_for_student,
|
|
|
)
|
|
|
|
|
|
def ensure_video_access_tables():
|
|
|
pass
|
|
|
|
|
|
try:
|
|
|
from core.presence import (
|
|
|
get_online_users, PRESENCE_ONLINE_THRESHOLD_SEC,
|
|
|
_ensure_presence_table as ensure_presence_table,
|
|
|
)
|
|
|
except Exception:
|
|
|
from presence import get_online_users, PRESENCE_ONLINE_THRESHOLD_SEC
|
|
|
def ensure_presence_table():
|
|
|
pass
|
|
|
|
|
|
try:
|
|
|
from core.comments import (
|
|
|
get_comments, add_comment, delete_comment,
|
|
|
_ensure_comments_table as ensure_comments_table,
|
|
|
)
|
|
|
except Exception:
|
|
|
from comments import get_comments, add_comment, delete_comment
|
|
|
def ensure_comments_table():
|
|
|
pass
|
|
|
|
|
|
try:
|
|
|
from core.backup_hf import (
|
|
|
HF_HUB_AVAILABLE, sync_to_hf_dataset,
|
|
|
auto_backup_tick as _auto_backup_tick,
|
|
|
_sanitize_repo_id,
|
|
|
)
|
|
|
except Exception:
|
|
|
from backup_hf import (
|
|
|
HF_HUB_AVAILABLE, sync_to_hf_dataset,
|
|
|
auto_backup_tick as _auto_backup_tick,
|
|
|
_sanitize_repo_id,
|
|
|
)
|
|
|
|
|
|
try:
|
|
|
from ui.gallery import aba_galeria_relatorios
|
|
|
except Exception:
|
|
|
from gallery import aba_galeria_relatorios
|
|
|
|
|
|
|
|
|
try:
|
|
|
from Config.settings import ADMIN_FIXED_PASS_HASH, AUTO_BKP_INTERVAL_SEC
|
|
|
except Exception:
|
|
|
try:
|
|
|
from config.settings import ADMIN_FIXED_PASS_HASH, AUTO_BKP_INTERVAL_SEC
|
|
|
except Exception:
|
|
|
|
|
|
import hashlib as _hashlib
|
|
|
ADMIN_FIXED_PASS_HASH = _hashlib.sha256("21003887".encode()).hexdigest()
|
|
|
AUTO_BKP_INTERVAL_SEC = 600
|
|
|
|
|
|
|
|
|
try:
|
|
|
from utils import save_uploaded_file, human_dt, generate_certificate_pdf, ADMIN_USER
|
|
|
except Exception:
|
|
|
|
|
|
ADMIN_USER = "admin"
|
|
|
def save_uploaded_file(file, base_dir="data"):
|
|
|
out_dir = os.path.join(base_dir)
|
|
|
os.makedirs(out_dir, exist_ok=True)
|
|
|
path = os.path.join(out_dir, file.name)
|
|
|
with open(path, "wb") as f:
|
|
|
f.write(file.getbuffer())
|
|
|
return path
|
|
|
def human_dt(dt_obj):
|
|
|
try:
|
|
|
return pd.to_datetime(dt_obj).strftime("%d/%m/%Y %H:%M")
|
|
|
except Exception:
|
|
|
return str(dt_obj)
|
|
|
def generate_certificate_pdf(course_title: str, student_name: str, output_dir: str) -> str:
|
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
|
|
path = os.path.join(output_dir, f"cert_{student_name.replace(' ', '_')}.pdf")
|
|
|
with open(path, "wb") as f:
|
|
|
f.write(b"%PDF-1.4\n% placeholder certificate\n")
|
|
|
return path
|
|
|
|
|
|
|
|
|
try:
|
|
|
from db import DB_PATH
|
|
|
except Exception:
|
|
|
DB_PATH = os.path.join("data", "db.sqlite")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_url(s: Optional[str]) -> bool:
    """Return True when *s* looks like an http(s) URL (case-insensitive, whitespace-tolerant)."""
    normalized = (s or "").strip().lower()
    return normalized.startswith(("http://", "https://"))
|
|
|
|
|
|
|
|
|
def normalize_youtube_url(url: str) -> str:
    """Canonicalize common YouTube URL shapes to ``https://www.youtube.com/watch?v=ID``.

    Recognizes youtu.be short links and youtube.com watch/embed/shorts URLs
    (scheme and ``www.`` optional). Anything unrecognized — including empty
    input — is returned unchanged (only stripped of surrounding whitespace).
    """
    if not url:
        return url
    candidate = url.strip()
    # Same four forms the original matched, checked in the same order.
    id_patterns = (
        r"(?:https?://)?(?:www\.)?youtu\.be/([A-Za-z0-9_-]{6,})",
        r"(?:https?://)?(?:www\.)?youtube\.com/watch\?v=([A-Za-z0-9_-]{6,})",
        r"(?:https?://)?(?:www\.)?youtube\.com/embed/([A-Za-z0-9_-]{6,})",
        r"(?:https?://)?(?:www\.)?youtube\.com/shorts/([A-Za-z0-9_-]{6,})",
    )
    for pattern in id_patterns:
        match = re.search(pattern, candidate)
        if match:
            return f"https://www.youtube.com/watch?v={match.group(1)}"
    return candidate
|
|
|
|
|
|
|
|
|
def parse_schedule_topic(topic_raw: str) -> Dict[str, Any]:
    """Decode a ``Schedule.topic`` payload into a metadata dict.

    Three formats are attempted, in order:
      1. a JSON object (the canonical format written by the agenda importer),
      2. legacy ``key: value`` pairs separated by ``;`` or ``|``,
      3. plain text, stored as the bare topic title.

    The returned dict always has the full key set of ``defaults`` below;
    fields absent from the payload keep their default values.
    """
    # Baseline result; returned as-is for empty input.
    defaults = {
        "topic": None,
        "module_id": None,
        "assigned_emails": [],
        "occupancy": "",
        "module_index": None,
        "details": None,
        "descricao": None,
        "semana": None,
        "off_schedule": False,
        "aluno_nome": None,
        "matricula": None,
    }
    if not topic_raw:
        return defaults
    s = (topic_raw or "").strip()

    # Format 1: JSON object. Several Portuguese/English key aliases are
    # accepted for topic/module fields.
    try:
        obj = json.loads(s)
        if isinstance(obj, dict):
            out = defaults.copy()
            out["topic"] = obj.get("topic") or obj.get("titulo") or obj.get("assunto") or None
            out["module_id"] = obj.get("module_id") or obj.get("mod_id") or obj.get("lesson_id") or None
            emails = obj.get("assigned_emails") or obj.get("emails") or []
            # Emails may be a comma-separated string or a list; normalize
            # both to a stripped, lowercased list.
            if isinstance(emails, str):
                emails = [e.strip().lower() for e in emails.split(",") if e.strip()]
            elif isinstance(emails, list):
                emails = [(e or "").strip().lower() for e in emails if (e or "").strip()]
            else:
                emails = []
            out["assigned_emails"] = emails
            out["occupancy"] = (obj.get("occupancy") or obj.get("ocupacao") or "").strip()
            out["module_index"] = obj.get("module_index") or obj.get("indice") or None
            out["details"] = obj.get("details")
            out["descricao"] = obj.get("descricao")
            out["semana"] = obj.get("semana")
            out["off_schedule"] = bool(obj.get("off_schedule", False))
            out["aluno_nome"] = obj.get("aluno_nome")
            out["matricula"] = obj.get("matricula")
            return out
    except Exception:
        # Not JSON — fall through to the legacy key:value format.
        pass

    # Format 2: legacy "key: value" pairs separated by ';' or '|'.
    try:
        parts = [p.strip() for p in re.split(r"[;|]\s*", s) if p.strip()]
        kv = {}
        for p in parts:
            if ":" in p:
                k, v = p.split(":", 1)
                kv[k.strip().lower()] = v.strip()
        if kv:
            out = defaults.copy()
            out["topic"] = kv.get("topic") or kv.get("assunto") or None
            mid = kv.get("module_id")
            # Only accept a purely numeric module id here.
            out["module_id"] = int(mid) if (mid and str(mid).isdigit()) else None
            emails = kv.get("assigned_emails") or kv.get("emails") or ""
            out["assigned_emails"] = [e.strip().lower() for e in emails.split(",") if e.strip()]
            out["occupancy"] = kv.get("occupancy") or kv.get("ocupacao") or ""
            out["module_index"] = kv.get("module_index") or kv.get("indice") or None
            out["details"] = kv.get("details") or kv.get("descricao")
            out["descricao"] = kv.get("descricao")
            out["semana"] = kv.get("semana")
            # Accept common truthy spellings (pt/en) for the off-schedule flag.
            out["off_schedule"] = str(kv.get("off_schedule", "")).strip().lower() in ("1","true","sim","yes")
            out["aluno_nome"] = kv.get("aluno_nome")
            out["matricula"] = kv.get("matricula")
            return out
    except Exception:
        pass

    # Format 3: plain text — the whole string becomes the topic title.
    out = defaults.copy()
    out["topic"] = s
    return out
|
|
|
|
|
|
|
|
|
def _fmt_duration(sec: int) -> str:
|
|
|
if sec is None:
|
|
|
return "—"
|
|
|
sec = int(max(0, sec))
|
|
|
m, s = divmod(sec, 60)
|
|
|
h, m = divmod(m, 60)
|
|
|
d, h = divmod(h, 24)
|
|
|
parts = []
|
|
|
if d: parts.append(f"{d}d")
|
|
|
if h: parts.append(f"{h}h")
|
|
|
if m: parts.append(f"{m}m")
|
|
|
parts.append(f"{s}s")
|
|
|
return " ".join(parts)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _strip_accents_ag(s: str) -> str:
|
|
|
import unicodedata
|
|
|
if s is None:
|
|
|
return s
|
|
|
return ''.join(ch for ch in unicodedata.normalize('NFKD', str(s)) if not unicodedata.combining(ch))
|
|
|
|
|
|
def _norm_col_ag(col: str) -> str:
    """Normalize a column header: drop accents, lowercase, collapse non-word runs to ``_``."""
    lowered = _strip_accents_ag(col).lower()
    return re.sub(r'\W+', '_', lowered).strip('_')
|
|
|
|
|
|
def _normalize_semana_label(s: str) -> str:
|
|
|
if not s:
|
|
|
return s
|
|
|
s = s.strip()
|
|
|
s = re.sub(r'^semama', 'Semana', s, flags=re.I)
|
|
|
m = re.match(r'^(semana)\s*([0-9]+)$', s, flags=re.I)
|
|
|
if m:
|
|
|
s = f"Semana{m.group(2)}"
|
|
|
return s
|
|
|
|
|
|
def _split_horario_range(horario_str: str) -> Tuple[Optional[str], Optional[str]]:
|
|
|
if not horario_str:
|
|
|
return None, None
|
|
|
s = str(horario_str).strip()
|
|
|
s = re.sub(r'[–—−]+', '-', s)
|
|
|
parts = re.split(r'\s*-\s*', s)
|
|
|
if len(parts) == 2:
|
|
|
return parts[0], parts[1]
|
|
|
return None, None
|
|
|
|
|
|
def _parse_tsv_text(text: str) -> pd.DataFrame:
|
|
|
buf = io.StringIO(text.strip())
|
|
|
df = pd.read_csv(
|
|
|
buf,
|
|
|
sep='\t',
|
|
|
engine='python',
|
|
|
quoting=0,
|
|
|
quotechar='"',
|
|
|
keep_default_na=False
|
|
|
)
|
|
|
return df
|
|
|
|
|
|
def _read_agenda_uploaded(file) -> Optional[pd.DataFrame]:
    """Load an uploaded agenda file into a DataFrame.

    Excel files go through openpyxl; text files are tried as TSV first and
    fall back to comma-separated parsing. Unsupported extensions and read
    failures surface a Streamlit error and return ``None``.
    """
    filename = (file.name or "").lower()
    try:
        if filename.endswith(('.xlsx', '.xls')):
            return pd.read_excel(file, engine='openpyxl')
        if filename.endswith(('.csv', '.tsv', '.txt')):
            content = file.read().decode('utf-8-sig', errors='ignore')
            try:
                return _parse_tsv_text(content)
            except Exception:
                # Not tab-separated — retry as plain CSV.
                return pd.read_csv(io.StringIO(content), sep=',', engine='python', quoting=0, quotechar='"', keep_default_na=False)
        st.error("Formato não suportado. Use CSV/TSV/TXT/Excel.")
        return None
    except Exception as e:
        st.error(f"Falha ao ler arquivo: {e}")
        return None
|
|
|
|
|
|
def parse_time_hhmm(s: str) -> Optional[time]:
    """Parse ``'HH:MM'`` into a ``datetime.time``; ``None`` when malformed or out of range."""
    try:
        hh_part, _, mm_part = s.partition(":")
        return time(int(hh_part), int(mm_part))
    except Exception:
        return None
|
|
|
|
|
|
def parse_time_range(rng: str) -> Tuple[Optional[time], Optional[time]]:
    """Parse ``'HH:MM - HH:MM'`` (any dash variant) into a (start, end) pair of times.

    Each half that fails to parse comes back as ``None``; a non-range input
    yields ``(None, None)``.
    """
    if not rng:
        return None, None
    unified = (rng or "").replace("–", "-").replace("—", "-").replace("−", "-")
    halves = [piece.strip() for piece in unified.split("-") if piece.strip()]
    if len(halves) != 2:
        return None, None
    return parse_time_hhmm(halves[0]), parse_time_hhmm(halves[1])
|
|
|
|
|
|
def _norm(s: str) -> str:
|
|
|
s = (s or "").strip().lower()
|
|
|
s = (s.replace("á","a").replace("à","a").replace("ã","a").replace("â","a")
|
|
|
.replace("é","e").replace("ê","e")
|
|
|
.replace("í","i")
|
|
|
.replace("ó","o").replace("ô","o").replace("õ","o")
|
|
|
.replace("ú","u").replace("ç","c"))
|
|
|
return s
|
|
|
|
|
|
def _clean_agenda_df(df_raw: pd.DataFrame) -> pd.DataFrame:
|
|
|
if df_raw is None or df_raw.empty:
|
|
|
return pd.DataFrame()
|
|
|
|
|
|
df = df_raw.copy()
|
|
|
df.columns = [_norm_col_ag(c) for c in df.columns]
|
|
|
|
|
|
aliases = {
|
|
|
'semana': 'semana',
|
|
|
'data': 'data',
|
|
|
'dia': 'dia',
|
|
|
'horario': 'horario',
|
|
|
'ocupacao': 'ocupacao',
|
|
|
'aluno': 'aluno',
|
|
|
'mat': 'matricula',
|
|
|
'mat_': 'matricula',
|
|
|
'descricao': 'descricao',
|
|
|
'tema': 'tema'
|
|
|
}
|
|
|
rename = {}
|
|
|
for c in df.columns:
|
|
|
if c in aliases:
|
|
|
rename[c] = aliases[c]
|
|
|
elif c.startswith('mat'):
|
|
|
rename[c] = 'matricula'
|
|
|
else:
|
|
|
rename[c] = c
|
|
|
df = df.rename(columns=rename)
|
|
|
|
|
|
required = ['semana','data','dia','horario','ocupacao','aluno','matricula','descricao','tema']
|
|
|
for col in required:
|
|
|
if col not in df.columns:
|
|
|
df[col] = ""
|
|
|
|
|
|
for c in df.columns:
|
|
|
if df[c].dtype == object:
|
|
|
df[c] = df[c].astype(str).str.strip()
|
|
|
|
|
|
df['semana'] = df['semana'].apply(_normalize_semana_label)
|
|
|
df['data'] = pd.to_datetime(df['data'], dayfirst=True, errors='coerce')
|
|
|
|
|
|
ini_fim = df['horario'].apply(_split_horario_range)
|
|
|
df['inicio_hhmm'] = [t[0] if t else None for t in ini_fim]
|
|
|
df['fim_hhmm'] = [t[1] if t else None for t in ini_fim]
|
|
|
|
|
|
df['fora_do_cronograma'] = df['tema'].str.contains(r'\(fora do cronograma\)', case=False, na=False)
|
|
|
df['tema'] = df['tema'].str.replace(r'\s*\(fora do cronograma\)\s*', '', regex=True).str.strip()
|
|
|
df['vago'] = df['aluno'].str.strip().str.upper().eq('VAGO')
|
|
|
|
|
|
try:
|
|
|
df['matricula'] = pd.to_numeric(df['matricula'], errors='ignore')
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
df = df.drop_duplicates(subset=['semana','data','dia','horario','aluno','tema','matricula']).reset_index(drop=True)
|
|
|
return df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from streamlit_calendar import calendar as st_calendar
|
|
|
from plotly import colors as pcolors
|
|
|
|
|
|
def _to_json_safe(obj):
|
|
|
if obj is None:
|
|
|
return None
|
|
|
if isinstance(obj, (str, int, float, bool)):
|
|
|
return obj
|
|
|
if isinstance(obj, datetime):
|
|
|
return obj.replace(microsecond=0).isoformat()
|
|
|
if isinstance(obj, date):
|
|
|
return obj.isoformat()
|
|
|
if isinstance(obj, time):
|
|
|
return obj.strftime("%H:%M:%S")
|
|
|
if isinstance(obj, dict):
|
|
|
return {str(k): _to_json_safe(v) for k, v in obj.items()}
|
|
|
if isinstance(obj, (list, tuple, set)):
|
|
|
return [_to_json_safe(v) for v in obj]
|
|
|
return str(obj)
|
|
|
|
|
|
def json_sanitize_events(events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return deep, JSON-serializable copies of calendar event dicts.

    Sanitizes each event with ``_to_json_safe`` directly. The previous
    implementation round-tripped through ``json.dumps``/``json.loads`` first,
    which raises TypeError on any non-JSON-native value (datetime, set, ...)
    before the sanitizer ever ran — defeating its purpose.
    """
    return [_to_json_safe(ev) for ev in events]
|
|
|
|
|
|
def _make_color_map(emails: List[str]) -> Dict[str, str]:
    """Assign a stable, distinct color to each email label.

    Labels are processed in sorted order so the mapping is deterministic;
    the special "Sem aluno" bucket always gets a fixed neutral gray and
    does not consume a palette slot.
    """
    palette = (
        pcolors.qualitative.Plotly
        + pcolors.qualitative.D3
        + pcolors.qualitative.Set3
        + pcolors.qualitative.Pastel
        + pcolors.qualitative.Vivid
    )
    color_map: Dict[str, str] = {}
    next_slot = 0
    for label in sorted(set(emails)):
        if label == "Sem aluno":
            color_map[label] = "#7f8c8d"
        else:
            color_map[label] = palette[next_slot % len(palette)]
            next_slot += 1
    return color_map
|
|
|
|
|
|
def build_fullcalendar_events(agenda_list: List[Schedule], mods_by_id: Dict[int, Lesson]):
    """Build FullCalendar event dicts from Schedule rows.

    One event is emitted per assigned student email; rows with no assigned
    emails get a single "Sem aluno" event. Rows without a class_date are
    skipped, and zero/negative-length slots are stretched to 60 minutes.

    Returns (events, color_map) where each event carries a per-student
    "color" and color_map maps student label -> hex color.
    """
    rows: List[Dict[str, Any]] = []
    who = set()

    def _event(who_label: str, title: str, start_dt: datetime, end_dt: datetime,
               occupancy, module_index, desc, a: Schedule) -> Dict[str, Any]:
        # One FullCalendar event payload; extendedProps feeds the detail UI.
        return {
            "title": title,
            "start": start_dt.isoformat(),
            "end": end_dt.isoformat(),
            "who": who_label,
            "extendedProps": {
                "aluno": who_label,
                "ocupacao": occupancy,
                "mod_index": module_index,
                "descricao": desc,
                "data": a.class_date.strftime("%d/%m/%Y"),
                "inicio": a.start_time.strftime("%H:%M") if a.start_time else "",
                "fim": a.end_time.strftime("%H:%M") if a.end_time else "",
            },
        }

    for a in agenda_list:
        meta = parse_schedule_topic(a.topic)
        mod = mods_by_id.get(meta.get("module_id")) if meta.get("module_id") else None
        title = mod.title if mod else (meta.get("topic") or "—")
        desc = meta.get("details") or meta.get("descricao") or (meta.get("topic") or "")
        occupancy = meta.get("occupancy") or ""
        module_index = meta.get("module_index") or ""

        if not a.class_date:
            continue
        start_dt = datetime.combine(a.class_date, a.start_time or time(0, 0))
        end_dt = datetime.combine(a.class_date, a.end_time or (a.start_time or time(0, 0)))
        if end_dt <= start_dt:
            # Degenerate or inverted slot: render it as a one-hour block.
            end_dt = start_dt + timedelta(minutes=60)

        emails = meta.get("assigned_emails") or []
        labels = emails if emails else ["Sem aluno"]
        for label in labels:
            who.add(label)
            rows.append(_event(label, title, start_dt, end_dt, occupancy, module_index, desc, a))

    color_map = _make_color_map(list(who))
    for ev in rows:
        ev["color"] = color_map.get(ev.get("who"), "#7f8c8d")
    return rows, color_map
|
|
|
|
|
|
def render_calendar(events, options=None, custom_css=None, key=None):
    """Invoke streamlit_calendar's ``calendar()``, forwarding only the kwargs
    the installed version's signature actually accepts (the component's API
    has varied across releases)."""
    try:
        import inspect
        signature = inspect.signature(st_calendar)
    except Exception:
        # Introspection failed: fall back to the historical call shape.
        return st_calendar(events=events, options=options, key=key)

    accepted = signature.parameters
    call_kwargs = {"events": events}
    if "options" in accepted:
        call_kwargs["options"] = options
    if "custom_css" in accepted and custom_css:
        call_kwargs["custom_css"] = custom_css
    if "key" in accepted and key:
        call_kwargs["key"] = key
    return st_calendar(**call_kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def reset_database(db) -> Tuple[bool, Optional[str]]:
    """Delete ALL application data: ORM tables plus the raw sqlite side tables.

    ORM rows are removed children-first so foreign-key relationships hold,
    then committed. The ensure_* helpers are invoked before the raw DELETEs
    so the side tables exist where those helpers create them (some fallback
    configurations make them no-ops — NOTE(review): in that case a missing
    table would raise and be reported as a failure below).

    Returns (True, None) on success; on any error the ORM session is rolled
    back and (False, error_text) is returned. The ORM commit may already
    have taken effect when a later raw-SQL step fails.
    """
    try:
        # Children before parents: certificates/grades/makeups/enrollments
        # reference schedules/lessons/courses.
        db.query(Certificate).delete(synchronize_session=False)
        db.query(Grade).delete(synchronize_session=False)
        db.query(MakeupRequest).delete(synchronize_session=False)
        db.query(Enrollment).delete(synchronize_session=False)
        db.query(Schedule).delete(synchronize_session=False)
        db.query(Material).delete(synchronize_session=False)
        db.query(Lesson).delete(synchronize_session=False)
        db.query(Course).delete(synchronize_session=False)
        commit_with_retry(db)

        # Side tables live outside the ORM; clear them with raw SQL.
        ensure_comments_table()
        ensure_presence_table()
        ensure_video_access_tables()
        with sqlite3.connect(DB_PATH) as conn:
            c = conn.cursor()
            c.execute("DELETE FROM student_auth")
            c.execute("DELETE FROM video_access_list")
            c.execute("DELETE FROM video_access")
            c.execute("DELETE FROM presence")
            c.execute("DELETE FROM comments")
            conn.commit()
        return True, None
    except Exception as e:
        db.rollback()
        return False, str(e)
|
|
|
|
|
|
def normalize_and_dedup_enrollments(db) -> Tuple[int, int]:
    """Canonicalize enrollment emails, then remove duplicate enrollments.

    Pass 1 lowercases/trims every student_email and commits. Pass 2 walks
    enrollments ordered by (course, email, created_at) and deletes every
    row after the first for each (course_id, student_email) pair, so the
    earliest enrollment wins. Returns (normalized_count, removed_count).
    """
    def _ordered_enrollments():
        return db.query(Enrollment).order_by(
            Enrollment.course_id.asc(),
            Enrollment.student_email.asc(),
            Enrollment.created_at.asc(),
        ).all()

    normalized = 0
    for enrollment in _ordered_enrollments():
        canonical = (enrollment.student_email or "").strip().lower()
        if enrollment.student_email != canonical:
            enrollment.student_email = canonical
            normalized += 1
    commit_with_retry(db)

    removed = 0
    first_seen = {}
    for enrollment in _ordered_enrollments():
        pair = (enrollment.course_id, enrollment.student_email)
        if pair in first_seen:
            db.delete(enrollment)
            removed += 1
        else:
            first_seen[pair] = enrollment.id
    commit_with_retry(db)
    return normalized, removed
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def admin_login_ui() -> None:
    """Render the admin login form and set session flags on success.

    The typed password is SHA-256 hashed and compared against
    ADMIN_FIXED_PASS_HASH with ``hmac.compare_digest`` (constant-time, so
    the comparison does not leak match length via timing).
    """
    st.subheader("🔐 Login do Administrador")
    with st.form("admin_login", clear_on_submit=False):
        u = st.text_input("Usuário", value="", placeholder="admin")
        p = st.text_input("Senha", value="", type="password")
        ok = st.form_submit_button("Entrar", type="primary")
        if ok:
            import hashlib
            import hmac
            typed_hash = hashlib.sha256(p.encode()).hexdigest()
            if u == ADMIN_USER and hmac.compare_digest(ADMIN_FIXED_PASS_HASH, typed_hash):
                st.session_state.is_admin = True
                st.session_state.admin_user = u
                st.success("Login realizado.")
                st.rerun()
            else:
                st.error("Credenciais inválidas.")
|
|
|
|
|
|
def admin_logout_button() -> None:
    """Render a 'Sair' button that clears the admin session flags and reruns the app."""
    button_col, _ = st.columns([1, 5])
    with button_col:
        if st.button("Sair", type="secondary"):
            st.session_state.is_admin = False
            st.session_state.admin_user = None
            st.rerun()
|
|
|
|
|
|
def _admin_presence_ping(page_label: str):
    """Record a presence heartbeat for the current session.

    Logged-in admins report role 'admin' with their username; everyone else
    reports as an anonymous 'visitante'. No email is ever sent.
    """
    logged_in = bool(st.session_state.get("is_admin"))
    role = "admin" if logged_in else "visitante"
    display_name = st.session_state.get("admin_user") if logged_in else None
    try:
        from core.presence import update_presence
    except Exception:
        from presence import update_presence
    update_presence(role=role, page=page_label, user_email=None, user_name=display_name)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def render() -> None:
|
|
|
_admin_presence_ping(page_label="admin_home")
|
|
|
|
|
|
if not st.session_state.is_admin:
|
|
|
admin_login_ui()
|
|
|
return
|
|
|
|
|
|
st.header("🛠️ Painel do Administrador — Pega a Visão")
|
|
|
admin_logout_button()
|
|
|
|
|
|
db = ensure_db_ready()
|
|
|
try:
|
|
|
tab_curso, tab_agenda, tab_avaliacao, tab_repos, tab_alunos, tab_turma, tab_interacao, tab_galeria, tab_reset = st.tabs([
|
|
|
"Cursos & Módulos", "Agenda", "Avaliações & Certificados", "Reposições", "Cadastrar Alunos",
|
|
|
"Turma em andamento", "👥 Interações", "📷 Galeria de Relatórios", "💾 Reset/Backup"
|
|
|
])
|
|
|
|
|
|
|
|
|
with tab_curso:
|
|
|
st.subheader("Cadastrar curso")
|
|
|
with st.form("curso_form", clear_on_submit=True):
|
|
|
titulo = st.text_input("Título do curso")
|
|
|
desc = st.text_area("Descrição")
|
|
|
ok_curso = st.form_submit_button("Salvar curso", type="primary")
|
|
|
if ok_curso:
|
|
|
if not titulo:
|
|
|
st.error("Informe um título.")
|
|
|
else:
|
|
|
c = Course(title=titulo.strip(), description=desc.strip() if desc else None)
|
|
|
db.add(c)
|
|
|
commit_with_retry(db)
|
|
|
st.success("Curso criado.")
|
|
|
|
|
|
st.divider()
|
|
|
cursos = db.query(Course).order_by(Course.title.asc()).all()
|
|
|
if not cursos:
|
|
|
st.info("Nenhum curso.")
|
|
|
else:
|
|
|
idx = st.selectbox("Gerenciar curso", options=list(range(len(cursos))), format_func=lambda i: cursos[i].title)
|
|
|
curso = cursos[idx]
|
|
|
st.markdown(f"### Curso: **{curso.title}**")
|
|
|
|
|
|
with st.expander("Editar curso (nome e descrição)", expanded=False):
|
|
|
novo_nome = st.text_input("Nome do curso", value=curso.title, key=f"curso_nome_edit_{curso.id}")
|
|
|
nova_desc = st.text_area("Descrição do curso", value=curso.description or "", key=f"curso_desc_edit_{curso.id}", height=120)
|
|
|
if st.button("💾 Salvar alterações do curso", key=f"btn_save_curso_{curso.id}"):
|
|
|
if not novo_nome.strip():
|
|
|
st.error("Informe um nome válido para o curso.")
|
|
|
else:
|
|
|
curso.title = novo_nome.strip()
|
|
|
curso.description = (nova_desc or "").strip()
|
|
|
commit_with_retry(db)
|
|
|
st.success("Curso atualizado com sucesso.")
|
|
|
st.rerun()
|
|
|
|
|
|
up_tabs = st.tabs(["Módulos (vídeo)", "Apostilas (PDF)"])
|
|
|
|
|
|
|
|
|
with up_tabs[0]:
|
|
|
st.caption("Cadastrar módulo do curso (vídeo opcional).")
|
|
|
with st.form("add_module", clear_on_submit=True):
|
|
|
lt = st.text_input("Título do módulo")
|
|
|
ld = st.text_area("Descrição (opcional)")
|
|
|
yt_url_raw = st.text_input("URL do vídeo (YouTube — opcional)", placeholder="https://youtu.be/VIDEO_ID ou https://www.youtube.com/shorts/VIDEO_ID")
|
|
|
f = st.file_uploader("Arquivo de vídeo (opcional) — .mp4/.mkv/.mov/.webm", type=["mp4", "mkv", "mov", "webm"])
|
|
|
ok = st.form_submit_button("Adicionar módulo")
|
|
|
if ok:
|
|
|
if not lt:
|
|
|
st.error("Informe um título para o módulo.")
|
|
|
else:
|
|
|
vpath = None
|
|
|
url_norm = normalize_youtube_url(yt_url_raw.strip()) if yt_url_raw else None
|
|
|
if f is not None:
|
|
|
if url_norm:
|
|
|
st.info("Arquivo local enviado. A URL do YouTube foi ignorada (prioridade para arquivo local).")
|
|
|
vpath = save_uploaded_file(f, base_dir="data/videos")
|
|
|
elif url_norm:
|
|
|
vpath = url_norm
|
|
|
|
|
|
les = Lesson(course_id=curso.id, title=lt.strip(), description=ld.strip() if ld else None, video_path=vpath)
|
|
|
db.add(les)
|
|
|
commit_with_retry(db)
|
|
|
st.success("Módulo adicionado.")
|
|
|
|
|
|
st.markdown("#### Módulos cadastrados")
|
|
|
lessons = db.query(Lesson).filter(Lesson.course_id == curso.id).order_by(Lesson.created_at.asc()).all()
|
|
|
if not lessons:
|
|
|
st.caption("Sem módulos cadastrados ainda.")
|
|
|
else:
|
|
|
for les in lessons:
|
|
|
with st.expander(f"🎬 {les.title}", expanded=False):
|
|
|
new_title = st.text_input("Título do módulo", value=les.title, key=f"mod_title_{les.id}")
|
|
|
new_desc = st.text_area("Descrição", value=les.description or "", key=f"mod_desc_{les.id}")
|
|
|
if st.button("💾 Salvar alterações", key=f"mod_save_{les.id}"):
|
|
|
les.title = (new_title or les.title).strip()
|
|
|
les.description = (new_desc or "").strip()
|
|
|
commit_with_retry(db)
|
|
|
st.success("Módulo atualizado.")
|
|
|
st.rerun()
|
|
|
|
|
|
st.markdown("---")
|
|
|
st.markdown("**Vídeo do módulo**")
|
|
|
has_video = bool(les.video_path and (is_url(les.video_path) or os.path.exists(les.video_path)))
|
|
|
if has_video:
|
|
|
if is_url(les.video_path):
|
|
|
st.video(les.video_path)
|
|
|
st.caption(f"Fonte atual: YouTube ({les.video_path})")
|
|
|
else:
|
|
|
st.video(les.video_path)
|
|
|
st.caption(f"Fonte atual: arquivo local `{os.path.basename(les.video_path)}`")
|
|
|
else:
|
|
|
st.info("Nenhum vídeo anexado ainda.")
|
|
|
|
|
|
if not has_video:
|
|
|
up_file = st.file_uploader(
|
|
|
"Enviar vídeo agora (opcional)",
|
|
|
type=["mp4", "mkv", "mov", "webm"],
|
|
|
key=f"mod_upvid_attach_{les.id}"
|
|
|
)
|
|
|
url_new_raw = st.text_input(
|
|
|
"… ou informar URL do YouTube (opcional)",
|
|
|
key=f"mod_url_attach_{les.id}",
|
|
|
placeholder="https://youtu.be/VIDEO_ID ou https://www.youtube.com/shorts/VIDEO_ID"
|
|
|
)
|
|
|
cols_attach = st.columns([1, 1])
|
|
|
if cols_attach[0].button("📤 Anexar arquivo de vídeo", key=f"mod_attach_file_{les.id}"):
|
|
|
if not up_file:
|
|
|
st.warning("Selecione um arquivo de vídeo.")
|
|
|
else:
|
|
|
try:
|
|
|
new_path = save_uploaded_file(up_file, base_dir="data/videos")
|
|
|
les.video_path = new_path
|
|
|
commit_with_retry(db)
|
|
|
st.success("Vídeo (arquivo) anexado ao módulo.")
|
|
|
st.rerun()
|
|
|
except Exception as e:
|
|
|
st.error(f"Falha ao anexar vídeo: {e}")
|
|
|
if cols_attach[1].button("🔗 Anexar URL do YouTube", key=f"mod_attach_url_{les.id}"):
|
|
|
url_norm = normalize_youtube_url(url_new_raw or "")
|
|
|
if not url_norm or not is_url(url_norm):
|
|
|
st.warning("Informe uma URL válida do YouTube.")
|
|
|
else:
|
|
|
les.video_path = url_norm
|
|
|
commit_with_retry(db)
|
|
|
st.success("URL do YouTube anexada ao módulo.")
|
|
|
st.rerun()
|
|
|
else:
|
|
|
up_file = st.file_uploader(
|
|
|
"Substituir por novo arquivo",
|
|
|
type=["mp4", "mkv", "mov", "webm"],
|
|
|
key=f"mod_upvid_replace_{les.id}"
|
|
|
)
|
|
|
url_new_raw = st.text_input(
|
|
|
"… ou substituir por URL do YouTube",
|
|
|
key=f"mod_url_replace_{les.id}",
|
|
|
placeholder="https://youtu.be/VIDEO_ID ou https://www.youtube.com/shorts/VIDEO_ID"
|
|
|
)
|
|
|
c1, c2, c3 = st.columns([1, 1, 1])
|
|
|
with c1:
|
|
|
if up_file and st.button("🔁 Substituir por arquivo", key=f"mod_sendvid_{les.id}"):
|
|
|
try:
|
|
|
new_path = save_uploaded_file(up_file, base_dir="data/videos")
|
|
|
try:
|
|
|
if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path):
|
|
|
os.remove(les.video_path)
|
|
|
except Exception:
|
|
|
pass
|
|
|
les.video_path = new_path
|
|
|
commit_with_retry(db)
|
|
|
st.success("Vídeo substituído por arquivo local com sucesso.")
|
|
|
st.rerun()
|
|
|
except Exception as e:
|
|
|
st.error(f"Falha ao substituir vídeo: {e}")
|
|
|
with c2:
|
|
|
if st.button("🔁 Substituir por URL do YouTube", key=f"mod_replace_url_{les.id}"):
|
|
|
url_norm = normalize_youtube_url(url_new_raw or "")
|
|
|
if not url_norm or not is_url(url_norm):
|
|
|
st.warning("Informe uma URL válido do YouTube.")
|
|
|
else:
|
|
|
try:
|
|
|
if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path):
|
|
|
os.remove(les.video_path)
|
|
|
except Exception:
|
|
|
pass
|
|
|
les.video_path = url_norm
|
|
|
commit_with_retry(db)
|
|
|
st.success("Vídeo substituído por URL do YouTube.")
|
|
|
st.rerun()
|
|
|
with c3:
|
|
|
if st.button("🗑️ Remover vídeo (manter módulo)", key=f"mod_rmvid_{les.id}"):
|
|
|
try:
|
|
|
if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path):
|
|
|
os.remove(les.video_path)
|
|
|
except Exception:
|
|
|
pass
|
|
|
les.video_path = None
|
|
|
commit_with_retry(db)
|
|
|
st.success("Vídeo removido do módulo.")
|
|
|
st.rerun()
|
|
|
|
|
|
|
|
|
st.markdown("---")
|
|
|
st.markdown("🔒 **Acesso ao vídeo** (somente afeta vídeos; PDFs não são impactados)")
|
|
|
current_vis = get_video_visibility(les.id)
|
|
|
vis_label_to_value = {
|
|
|
"Público (qualquer aluno logado)": "public",
|
|
|
"Restrito (somente alunos selecionados)": "restricted",
|
|
|
"Privado (somente Admin)": "private",
|
|
|
}
|
|
|
vis_options = list(vis_label_to_value.keys())
|
|
|
try:
|
|
|
default_idx = vis_options.index(
|
|
|
[lbl for lbl, val in vis_label_to_value.items() if val == current_vis][0]
|
|
|
)
|
|
|
except Exception:
|
|
|
default_idx = 0
|
|
|
vis_sel_label = st.selectbox(
|
|
|
"Visibilidade",
|
|
|
options=vis_options,
|
|
|
index=default_idx,
|
|
|
key=f"vid_vis_{les.id}"
|
|
|
)
|
|
|
vis_value = vis_label_to_value[vis_sel_label]
|
|
|
|
|
|
ens_list = db.query(Enrollment).filter(Enrollment.course_id == curso.id).order_by(Enrollment.student_name.asc()).all()
|
|
|
alunos_labels = [f"{e.student_name} <{e.student_email}>" for e in ens_list]
|
|
|
alunos_map = {f"{e.student_name} <{e.student_email}>": e.student_email for e in ens_list}
|
|
|
|
|
|
allowed_emails = set(get_video_acl_emails(les.id))
|
|
|
preselected_labels = [lbl for lbl in alunos_labels if alunos_map[lbl] in allowed_emails]
|
|
|
|
|
|
if vis_value == "restricted":
|
|
|
sel_labels = st.multiselect(
|
|
|
"Selecione os alunos autorizados a assistir este vídeo",
|
|
|
options=alunos_labels,
|
|
|
default=preselected_labels,
|
|
|
key=f"vid_acl_{les.id}"
|
|
|
)
|
|
|
else:
|
|
|
sel_labels = []
|
|
|
|
|
|
cva1, cva2 = st.columns([1, 1])
|
|
|
if cva1.button("💾 Salvar visibilidade", key=f"btn_save_vis_{les.id}"):
|
|
|
try:
|
|
|
set_video_visibility(les.id, vis_value)
|
|
|
st.success("Visibilidade do vídeo atualizada.")
|
|
|
except Exception as e:
|
|
|
st.error(f"Falha ao salvar visibilidade: {e}")
|
|
|
|
|
|
if vis_value == "restricted":
|
|
|
if cva2.button("💾 Salvar lista de alunos autorizados", key=f"btn_save_acl_{les.id}"):
|
|
|
try:
|
|
|
emails_new = [alunos_map[lbl] for lbl in sel_labels]
|
|
|
set_video_acl_emails(les.id, emails_new)
|
|
|
st.success("Lista de alunos autorizada atualizada.")
|
|
|
except Exception as e:
|
|
|
st.error(f"Falha ao salvar lista de alunos: {e}")
|
|
|
else:
|
|
|
st.caption("Para definir alunos específicos, selecione a visibilidade **Restrito**.")
|
|
|
|
|
|
st.markdown("---")
|
|
|
if st.button("❌ Excluir módulo", key=f"del_mod_{les.id}"):
|
|
|
try:
|
|
|
if les.video_path and not is_url(les.video_path) and os.path.exists(les.video_path):
|
|
|
os.remove(les.video_path)
|
|
|
except Exception:
|
|
|
pass
|
|
|
db.delete(les)
|
|
|
commit_with_retry(db)
|
|
|
st.success("Módulo removido.")
|
|
|
st.rerun()
|
|
|
|
|
|
|
|
|
with up_tabs[1]:
|
|
|
st.caption("Adicionar apostila (PDF) ao curso")
|
|
|
with st.form("add_mat", clear_on_submit=True):
|
|
|
mt = st.text_input("Título da apostila")
|
|
|
pf = st.file_uploader("Arquivo PDF", type=["pdf"])
|
|
|
okm = st.form_submit_button("Adicionar")
|
|
|
if okm:
|
|
|
if not mt or not pf:
|
|
|
st.error("Informe título e selecione um PDF.")
|
|
|
else:
|
|
|
ppath = save_uploaded_file(pf, base_dir="data/pdfs")
|
|
|
m = Material(course_id=curso.id, title=mt.strip(), pdf_path=ppath)
|
|
|
db.add(m)
|
|
|
commit_with_retry(db)
|
|
|
st.success("Apostila adicionada.")
|
|
|
|
|
|
st.markdown("#### Apostilas cadastradas")
|
|
|
mats = db.query(Material).filter(Material.course_id == curso.id).order_by(Material.created_at.asc()).all()
|
|
|
for m in mats:
|
|
|
cols = st.columns([4, 2, 1])
|
|
|
cols[0].write(f"📄 **{m.title}**")
|
|
|
if m.pdf_path and os.path.exists(m.pdf_path):
|
|
|
try:
|
|
|
with open(m.pdf_path, "rb") as f:
|
|
|
cols[1].download_button("Baixar", data=f.read(), file_name=os.path.basename(m.pdf_path), key=f"dl_mat_{m.id}")
|
|
|
except Exception:
|
|
|
cols[1].warning("Falha ao abrir PDF.")
|
|
|
else:
|
|
|
cols[1].warning("Arquivo não encontrado.")
|
|
|
if cols[2].button("Excluir", key=f"del_mat_{m.id}"):
|
|
|
try:
|
|
|
if m.pdf_path and os.path.exists(m.pdf_path):
|
|
|
os.remove(m.pdf_path)
|
|
|
except Exception:
|
|
|
pass
|
|
|
db.delete(m)
|
|
|
commit_with_retry(db)
|
|
|
st.success("Apostila removida.")
|
|
|
st.rerun()
|
|
|
|
|
|
|
|
|
with tab_agenda:
|
|
|
st.subheader("Agenda por curso")
|
|
|
cursos_all = db.query(Course).order_by(Course.title.asc()).all()
|
|
|
if not cursos_all:
|
|
|
st.info("Cadastre um curso primeiro.")
|
|
|
else:
|
|
|
cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="agenda_sel_curso")
|
|
|
curso_ag = cursos_all[cidx]
|
|
|
|
|
|
lessons = db.query(Lesson).filter(Lesson.course_id == curso_ag.id).order_by(Lesson.created_at.asc()).all()
|
|
|
ens = db.query(Enrollment).filter(Enrollment.course_id == curso_ag.id).order_by(Enrollment.student_name.asc()).all()
|
|
|
|
|
|
with st.expander("📥 Importar Agenda (TSV/CSV/TXT/Excel ou texto colado)", expanded=False):
|
|
|
st.caption("Formato: **Semana, Data, Dia, Horário, Ocupação, Aluno, Mat., Descrição, Tema**.")
|
|
|
|
|
|
col_up, col_paste = st.columns(2)
|
|
|
uploaded = col_up.file_uploader("Enviar arquivo", type=["csv","tsv","txt","xlsx","xls"], key=f"imp_ag_up_{curso_ag.id}")
|
|
|
pasted_text = col_paste.text_area("… ou cole aqui o conteúdo tabulado (TSV)", height=220, key=f"imp_ag_txt_{curso_ag.id}")
|
|
|
|
|
|
# --- Agenda import (uploaded file or pasted TSV) for the selected course ---
# When enabled, each imported row is auto-linked to a course module (Lesson)
# whose normalized title exactly matches the row's TEMA.
try_link_module = st.checkbox("Vincular módulo automaticamente pelo TEMA (título exato)", value=True, key=f"imp_ag_trylink_{curso_ag.id}")

btn_preview = st.button("🔍 Pré-visualizar", key=f"imp_ag_preview_{curso_ag.id}")

# Raw (as read) and normalized frames used by the preview step below.
df_raw, df_norm = None, None

if btn_preview:
    try:
        # Source priority: uploaded file first, then pasted tab-separated text.
        if uploaded is not None:
            df_raw = _read_agenda_uploaded(uploaded)
        elif pasted_text.strip():
            df_raw = _parse_tsv_text(pasted_text)
        else:
            st.warning("Envie um arquivo **ou** cole o texto tabulado.")
        if df_raw is not None and not df_raw.empty:
            st.success(f"Dados lidos. Linhas: {len(df_raw)}")
            with st.container():
                st.markdown("**Prévia (dados brutos)**")
                st.dataframe(df_raw.head(15), use_container_width=True)

            # Normalize headers/values into the canonical agenda schema.
            df_norm = _clean_agenda_df(df_raw)
            if df_norm.empty:
                st.warning("Não foi possível normalizar. Verifique o cabeçalho e o conteúdo.")
            else:
                preview_cols = ['semana','data','dia','horario','inicio_hhmm','fim_hhmm','ocupacao','aluno','matricula','tema','fora_do_cronograma','vago']
                # Guarantee every preview column exists so the column
                # selection below never raises a KeyError.
                for c in preview_cols:
                    if c not in df_norm.columns:
                        df_norm[c] = ""
                st.subheader("✅ Prévia normalizada")
                st.dataframe(df_norm[preview_cols], use_container_width=True, hide_index=True)

                # Persist the normalized frame across reruns so the import
                # button (which triggers a separate rerun) can pick it up.
                st.session_state[f"imp_df_norm_{curso_ag.id}"] = df_norm.to_dict(orient='list')
        else:
            st.info("Sem dados para pré-visualizar.")
    except Exception as e:
        st.error(f"Falha ao pré-visualizar: {e}")

btn_import = st.button("📦 Importar para este curso", key=f"imp_ag_do_{curso_ag.id}")
if btn_import:
    # Import only operates on a payload produced by a prior preview.
    payload = st.session_state.get(f"imp_df_norm_{curso_ag.id}")
    if not payload:
        st.warning("Faça a pré-visualização primeiro.")
    else:
        try:
            df_norm = pd.DataFrame(payload)

            # Lookup maps keyed by normalized text, used to resolve a row's
            # TEMA to a Lesson and a student name to an Enrollment.
            lessons_by_title_norm = { _norm(l.title): l for l in lessons }
            enrolls_by_name_norm = { _norm(e.student_name): e for e in ens }

            # Existing schedule rows for this course, used for deduplication.
            agenda_exist = db.query(Schedule).filter(Schedule.course_id == curso_ag.id).all()

            def _key_ev(a: Schedule, meta: Dict[str,Any]) -> tuple:
                """Dedup key for an agenda row: (date, start, end, topic, first assigned e-mail)."""
                topic = meta.get("topic") or ""
                emails = meta.get("assigned_emails") or []
                em = (emails[0] if emails else "").strip().lower()
                return (a.class_date, a.start_time, a.end_time, topic.strip().lower(), em)

            # Pre-compute keys of rows already in the DB.
            exist_keys = set()
            for a in agenda_exist:
                m = parse_schedule_topic(a.topic)
                exist_keys.add(_key_ev(a, m))

            created, skipped = 0, 0
            for _, r in df_norm.iterrows():
                d = r.get('data')
                ini = r.get('inicio_hhmm')
                fim = r.get('fim_hhmm')
                tema = (r.get('tema') or "").strip()
                ocup = (r.get('ocupacao') or "").strip()
                aluno_nome = (r.get('aluno') or "").strip()
                vago = bool(r.get('vago'))
                semana = r.get('semana') or None
                details = (r.get('descricao') or "").strip()
                fora = bool(r.get('fora_do_cronograma'))

                # A usable row needs at least a date and both times.
                if pd.isna(d) or not ini or not fim:
                    skipped += 1
                    continue

                t1, t2 = parse_time_range(f"{ini}-{fim}")
                if not t1 or not t2:
                    skipped += 1
                    continue

                # Resolve the assigned student e-mail by normalized name,
                # unless the slot is marked as vacant.
                assigned = []
                if not vago and aluno_nome:
                    e = enrolls_by_name_norm.get(_norm(aluno_nome))
                    if e:
                        assigned = [e.student_email.strip().lower()]

                # Optional module linking by exact (normalized) title match.
                module_id = None
                if try_link_module and tema:
                    l = lessons_by_title_norm.get(_norm(tema))
                    if l:
                        module_id = l.id

                # All row metadata is serialized as JSON into Schedule.topic.
                meta_obj = {
                    "topic": tema or None,
                    "module_id": module_id,
                    "assigned_emails": assigned,
                    "occupancy": ocup,
                    "module_index": None,
                    "details": details or None,
                    # Stored under both keys — presumably for backward
                    # compatibility with older readers; confirm before removing.
                    "descricao": details or None,
                    "semana": semana,
                    "off_schedule": fora,
                    "aluno_nome": (aluno_nome or None),
                    "matricula": r.get('matricula'),
                }
                topic_json = json.dumps(meta_obj, ensure_ascii=False)

                # This key must mirror _key_ev() so imported rows compare
                # correctly against existing DB rows.
                key = (pd.to_datetime(d).date(), t1, t2, (tema or "").strip().lower(), (assigned[0] if assigned else ""))
                if key in exist_keys:
                    skipped += 1
                    continue

                ag = Schedule(
                    course_id=curso_ag.id,
                    class_date=pd.to_datetime(d).date(),
                    start_time=t1,
                    end_time=t2,
                    topic=topic_json
                )
                db.add(ag)
                # Committed per row; a mid-import failure keeps earlier rows.
                commit_with_retry(db)
                # Track the new key so duplicates within the same import
                # payload are also skipped.
                exist_keys.add(key)
                created += 1

            # Bump the calendar version so the FullCalendar widget is
            # re-keyed and re-rendered with the fresh agenda.
            st.session_state.cal_ver += 1
            st.success(f"Importação concluída. Criados: {created} · Ignorados (duplicados ou inválidos): {skipped}")
            # NOTE(review): st.rerun() raises an exception that the broad
            # `except` below may catch — confirm the rerun actually happens.
            st.rerun()

        except Exception as e:
            # Roll back the failed (partial) transaction so the session
            # remains usable.
            db.rollback()
            st.error(f"Falha ao importar: {e}")
|
|
|
|
|
|
# --- Manual agenda entry form ---
# Lets the admin add a single Schedule row, optionally linked to a module
# and/or assigned to students. Metadata is serialized as JSON into
# Schedule.topic, same format as the bulk importer above.
with st.form("agenda_add", clear_on_submit=True):
    d = st.date_input("Data da aula")
    colh1, colh2 = st.columns(2)
    t1 = colh1.time_input("Início", value=time(19, 0))
    t2 = colh2.time_input("Fim", value=time(20, 0))
    # Option labels are "[<1-based index>] <title>" so the lesson index can
    # be parsed back out of the selected label on submit.
    mod_opt = st.selectbox("Vincular a um módulo (opcional)", options=["— sem vínculo —"] + [f"[{i+1}] {l.title}" for i, l in enumerate(lessons)])
    alunos_opts = [f"{e.student_name} <{e.student_email}>" for e in ens]
    alunos_sel = st.multiselect("Atribuir alunos (opcional)", options=alunos_opts)
    ocup = st.text_input("Ocupação (ex.: AO VIVO, Gravado…) (opcional)", value="")
    mod_index = st.text_input("Índice do módulo (opcional)", value="")
    tema_manual = st.text_input("Tema/descrição (caso não vincule módulo)", value="")
    ok_add = st.form_submit_button("Adicionar à agenda", type="primary")
    if ok_add:
        try:
            # Recover the Lesson id from the "[n] title" option label.
            module_id = None
            if mod_opt != "— sem vínculo —":
                idx_mod = int(mod_opt.split("]")[0].replace("[",""))
                module_id = lessons[idx_mod-1].id
            # Extract the e-mail between <> from each "Name <email>" label.
            emails = [re.search(r"<(.+?)>", s).group(1).strip().lower() for s in alunos_sel] if alunos_sel else []
            topic_obj = {
                "topic": (tema_manual.strip() if tema_manual.strip() else None),
                "module_id": module_id,
                "assigned_emails": emails,
                "occupancy": ocup.strip(),
                "module_index": (mod_index.strip() or None),
            }
            ag = Schedule(
                course_id=curso_ag.id,
                class_date=d,
                start_time=t1,
                end_time=t2,
                topic=json.dumps(topic_obj, ensure_ascii=False),
            )
            db.add(ag)
            commit_with_retry(db)
            st.success("Aula adicionada à agenda.")
            # NOTE(review): st.rerun() raises an exception that the broad
            # `except` below may catch — confirm the rerun actually happens.
            st.rerun()
        except Exception as e:
            # FIX: roll back the failed add so the SQLAlchemy session is not
            # left in a failed-transaction state (mirrors the bulk-import
            # handler's error handling above).
            db.rollback()
            st.error(f"Falha ao adicionar à agenda: {e}")
|
|
|
|
|
|
# --- Current agenda: table view, delete-by-ID, and calendar widget ---
st.markdown("#### Itens da agenda")
agenda = db.query(Schedule).filter(Schedule.course_id == curso_ag.id).order_by(Schedule.class_date.asc(), Schedule.start_time.asc()).all()
if not agenda:
    st.info("Sem agenda cadastrada.")
else:
    lessons_by_id = {l.id: l for l in lessons}
    rows = []
    for a in agenda:
        # Schedule.topic stores JSON metadata; parse it to display the
        # linked module, assigned students and occupancy.
        meta = parse_schedule_topic(a.topic)
        mod = lessons_by_id.get(meta.get("module_id")) if meta.get("module_id") else None
        rows.append({
            "ID": a.id,
            "Data": a.class_date.strftime("%d/%m/%Y") if a.class_date else "",
            "Início": a.start_time.strftime("%H:%M") if a.start_time else "",
            "Fim": a.end_time.strftime("%H:%M") if a.end_time else "",
            # Prefer the linked module's title; fall back to the free-text topic.
            "Módulo/Assunto": (mod.title if mod else (meta.get("topic") or "—")),
            "Alunos": ", ".join(meta.get("assigned_emails") or []),
            "Ocupação": meta.get("occupancy") or "",
        })
    st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)

    # Delete-by-ID controls (button disabled until a positive ID is typed).
    col_del1, col_del2 = st.columns([1,3])
    del_id = col_del1.number_input("ID para excluir", min_value=0, value=0, step=1, key="agenda_del_id")
    if col_del2.button("🗑️ Excluir item da agenda", disabled=(del_id<=0), key="agenda_btn_del"):
        item = db.query(Schedule).filter(Schedule.id == int(del_id)).first()
        if not item:
            st.warning("ID não encontrado.")
        else:
            db.delete(item)
            commit_with_retry(db)
            st.success("Item excluído.")
            st.rerun()

    st.markdown("#### Calendário")
    events, color_map = build_fullcalendar_events(agenda, lessons_by_id)
    events = json_sanitize_events(events)
    cal_options = {
        "initialView": "dayGridMonth",
        "locale": "pt-br",
        "height": 740,
        "headerToolbar": {
            "left": "prev,next today",
            "center": "title",
            "right": "dayGridMonth,timeGridWeek,listWeek"
        },
    }
    # Widget key includes cal_ver so agenda changes force a fresh render.
    render_calendar(events=events, options=cal_options, key=f"adm_cal_{curso_ag.id}_{st.session_state.cal_ver}")
|
|
|
|
|
|
|
|
|
with tab_avaliacao:
    # Grades and certificates for a selected course.
    st.subheader("Notas e Certificados")
    cursos_all = db.query(Course).order_by(Course.title.asc()).all()
    if not cursos_all:
        st.info("Cadastre um curso primeiro.")
    else:
        # Select by index so format_func can render the title while the
        # selectbox value stays a stable integer.
        cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="av_sel_curso")
        curso_av = cursos_all[cidx]
        ens = db.query(Enrollment).filter(Enrollment.course_id == curso_av.id).order_by(Enrollment.student_name.asc()).all()
        if not ens:
            st.info("Cadastre alunos neste curso para lançar notas e emitir certificados.")
        else:
            # Option labels "Name <email>"; the e-mail is parsed back out on submit.
            nomes = [f"{e.student_name} <{e.student_email}>" for e in ens]

            st.markdown("### 📝 Lançar nota")
            with st.form("grade_form", clear_on_submit=True):
                who = st.selectbox("Aluno", options=nomes)
                nota = st.number_input("Nota", min_value=0.0, max_value=10.0, step=0.1)
                obs = st.text_area("Observações (opcional)")
                okg = st.form_submit_button("Salvar nota", type="primary")
                if okg:
                    # Recover e-mail and name from the "Name <email>" label.
                    email = re.search(r"<(.+?)>", who).group(1).strip().lower()
                    nome = who.split(" <")[0].strip()
                    g = Grade(course_id=curso_av.id, student_name=nome, student_email=email, grade=float(nota), note=(obs.strip() or None))
                    db.add(g)
                    commit_with_retry(db)
                    st.success("Nota lançada.")

            grades = db.query(Grade).filter(Grade.course_id == curso_av.id).order_by(Grade.created_at.desc()).all()
            if grades:
                df_g = pd.DataFrame([{"ID":g.id,"Aluno":g.student_name,"E-mail":g.student_email,"Nota":g.grade,"Obs":g.note,"Data":g.created_at} for g in grades])
                st.dataframe(df_g, use_container_width=True, hide_index=True)

            st.markdown("---")

            st.markdown("### 🎓 Emitir certificado")
            with st.form("cert_form", clear_on_submit=True):
                who2 = st.selectbox("Aluno", options=nomes, key="cert_who")
                # Default the printable name to the student's enrolled name.
                nome_c = st.text_input("Nome no certificado (como deve aparecer)", value=who2.split(" <")[0].strip())
                okc = st.form_submit_button("Emitir certificado (PDF)", type="primary")
                if okc:
                    try:
                        email_c = re.search(r"<(.+?)>", who2).group(1).strip().lower()
                        out_dir = os.path.join("data","certificates")
                        os.makedirs(out_dir, exist_ok=True)
                        pdf_path = generate_certificate_pdf(course_title=curso_av.title, student_name=nome_c, output_dir=out_dir)
                        # NOTE(review): datetime.utcnow() is naive and deprecated
                        # in Python 3.12 — consider datetime.now(timezone.utc).
                        cert = Certificate(course_id=curso_av.id, student_name=nome_c, student_email=email_c, pdf_path=pdf_path, issued_at=datetime.utcnow())
                        db.add(cert)
                        commit_with_retry(db)
                        st.success("Certificado emitido.")
                        # Offer the freshly generated PDF for immediate download.
                        with open(pdf_path,"rb") as f:
                            st.download_button("⬇️ Baixar certificado agora", data=f.read(), file_name=os.path.basename(pdf_path), key=f"dl_newcert_{curso_av.id}_{email_c}")
                    except Exception as e:
                        st.error(f"Falha ao emitir certificado: {e}")

            certs = db.query(Certificate).filter(Certificate.course_id == curso_av.id).order_by(Certificate.issued_at.desc()).all()
            if certs:
                df_c = pd.DataFrame([{"ID":c.id,"Aluno":c.student_name,"E-mail":c.student_email,"Arquivo":os.path.basename(c.pdf_path) if c.pdf_path else "—","Emitido em":c.issued_at} for c in certs])
                st.dataframe(df_c, use_container_width=True, hide_index=True)
|
|
|
|
|
|
|
|
|
with tab_repos:
    # Review / approve / deny students' make-up class requests per course.
    st.subheader("Solicitações de reposição")
    cursos_all = db.query(Course).order_by(Course.title.asc()).all()
    if not cursos_all:
        st.info("Cadastre um curso primeiro.")
    else:
        cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="rep_sel_curso")
        curso_rp = cursos_all[cidx]
        reqs = db.query(MakeupRequest).filter(MakeupRequest.course_id == curso_rp.id).order_by(MakeupRequest.created_at.desc()).all()
        if not reqs:
            st.info("Sem solicitações para este curso.")
        else:
            for r in reqs:
                with st.expander(f"{r.student_name} <{r.student_email}> — {r.requested_date} — status: {r.status}", expanded=False):
                    st.write(r.note or "Sem observações.")
                    c1, c2 = st.columns([1,3])
                    # Pre-select the request's current status in the dropdown.
                    new_status = c1.selectbox("Novo status", options=["pending","approved","denied"], index=["pending","approved","denied"].index(r.status), key=f"rp_status_{r.id}")
                    new_note = c2.text_input("Nota interna (opcional)", value=r.note or "", key=f"rp_note_{r.id}")
                    if st.button("💾 Atualizar solicitação", key=f"rp_save_{r.id}"):
                        r.status = new_status
                        # Empty note is stored as NULL rather than "".
                        r.note = (new_note or "").strip() or None
                        commit_with_retry(db)
                        st.success("Solicitação atualizada.")
                        st.rerun()
|
|
|
|
|
|
|
|
|
with tab_alunos:
    # Enrollment management: add/remove students, set passwords, dedup tool.
    st.subheader("Matrículas por curso")
    cursos_all = db.query(Course).order_by(Course.title.asc()).all()
    if not cursos_all:
        st.info("Cadastre um curso primeiro.")
    else:
        cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="aluno_sel_curso")
        curso_en = cursos_all[cidx]

        st.markdown("### ➕ Adicionar matrícula")
        with st.form("matricula_add", clear_on_submit=True):
            nome = st.text_input("Nome do aluno")
            # E-mail is normalized (trim + lowercase) here and again on save.
            email = st.text_input("E-mail do aluno").strip().lower()
            ok_en = st.form_submit_button("Matricular", type="primary")
            if ok_en:
                if not nome or not email:
                    st.error("Preencha nome e e-mail.")
                else:
                    e = Enrollment(course_id=curso_en.id, student_name=nome.strip(), student_email=email.strip().lower())
                    db.add(e)
                    commit_with_retry(db)
                    st.success("Aluno matriculado.")
                    st.rerun()

        st.markdown("### Alunos matriculados")
        ens = db.query(Enrollment).filter(Enrollment.course_id == curso_en.id).order_by(Enrollment.created_at.desc()).all()
        if not ens:
            st.info("Nenhum aluno neste curso.")
        else:
            # One row of controls per enrolled student.
            for e in ens:
                cols = st.columns([3,3,2,2,2])
                cols[0].write(f"**{e.student_name}**")
                cols[1].write(e.student_email)
                with cols[2]:
                    with st.expander("🔑 Definir/Resetar senha", expanded=False):
                        pwd = st.text_input("Nova senha", type="password", key=f"setpwd_{e.id}")
                        if st.button("Salvar senha", key=f"btn_setpwd_{e.id}"):
                            try:
                                set_student_password(e.student_email, pwd)
                                st.success("Senha definida.")
                            except Exception as ex:
                                st.error(f"Erro ao salvar senha: {ex}")
                if cols[3].button("🗑️ Remover", key=f"del_en_{e.id}"):
                    db.delete(e)
                    commit_with_retry(db)
                    st.success("Matrícula removida.")
                    st.rerun()
                # NOTE(review): this maintenance action appears to operate on
                # the whole enrollments table, yet a button is rendered on
                # every student row — confirm whether that is intentional.
                if cols[4].button("🧹 Normalizar & deduplicar", key=f"normdedup_{e.id}"):
                    n, r = normalize_and_dedup_enrollments(db)
                    st.success(f"Normalizados: {n}, apagados duplicados: {r}")
                    st.rerun()
|
|
|
|
|
|
|
|
|
with tab_turma:
    # Read-only admin view of the ongoing class: agenda table + calendar.
    st.subheader("Turma em andamento (visão do Admin)")
    cursos_all = db.query(Course).order_by(Course.title.asc()).all()
    if not cursos_all:
        st.info("Cadastre um curso primeiro.")
    else:
        cidx = st.selectbox("Curso", options=list(range(len(cursos_all))), format_func=lambda i: cursos_all[i].title, key="turma_sel_curso")
        curso_tx = cursos_all[cidx]
        lessons = db.query(Lesson).filter(Lesson.course_id == curso_tx.id).all()
        agenda = db.query(Schedule).filter(Schedule.course_id == curso_tx.id).order_by(Schedule.class_date.asc(), Schedule.start_time.asc()).all()

        lessons_by_id = {l.id: l for l in lessons}
        rows = []
        for a in agenda:
            # Schedule.topic holds JSON metadata (module link, students, occupancy).
            meta = parse_schedule_topic(a.topic)
            mod = lessons_by_id.get(meta.get("module_id")) if meta.get("module_id") else None
            title = mod.title if mod else (meta.get("topic") or "—")
            rows.append({
                "Data": a.class_date,
                "Início": a.start_time.strftime("%H:%M") if a.start_time else "",
                "Fim": a.end_time.strftime("%H:%M") if a.end_time else "",
                "Aula": title,
                "Alunos": ", ".join(meta.get("assigned_emails") or []),
                "Ocupação": meta.get("occupancy") or "",
            })
        if rows:
            st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
        else:
            st.info("Sem agenda cadastrada.")

        st.markdown("#### Calendário")
        events, color_map = build_fullcalendar_events(agenda, lessons_by_id)
        events = json_sanitize_events(events)
        cal_options = {
            "initialView": "listWeek",
            "locale": "pt-br",
            "height": 640,
            "headerToolbar": {
                "left": "prev,next today",
                "center": "title",
                "right": "dayGridMonth,timeGridWeek,listWeek"
            },
        }
        # Key includes cal_ver so agenda edits force the widget to re-render.
        render_calendar(events=events, options=cal_options, key=f"adm_turma_cal_{curso_tx.id}_{st.session_state.cal_ver}")
|
|
|
|
|
|
|
|
|
with tab_interacao:
    # Presence dashboard + comment moderation.
    st.subheader("👥 Usuários online (últimos N segundos)")
    secs = int(st.number_input("Threshold (segundos)", min_value=30, max_value=3600, value=PRESENCE_ONLINE_THRESHOLD_SEC, step=30))
    dfp = get_online_users(threshold_sec=secs)
    if dfp is None or dfp.empty:
        st.info("Ninguém online neste momento dentro do threshold.")
    else:
        dfv = dfp.copy()
        # Humanize the raw second counters before display.
        dfv["Tempo de sessão"] = dfv["session_sec"].apply(_fmt_duration)
        dfv["Inatividade"] = dfv["idle_sec"].apply(_fmt_duration)
        dfv.rename(columns={"user_email":"E-mail","user_name":"Nome","role":"Perfil","page":"Página","first_seen":"Primeiro acesso","last_seen":"Última ação"}, inplace=True)
        st.dataframe(
            dfv[["Nome","E-mail","Perfil","Página","Primeiro acesso","Última ação","Tempo de sessão","Inatividade"]],
            use_container_width=True, hide_index=True
        )

    st.markdown("---")
    st.subheader("💬 Moderação de comentários")
    cursos = db.query(Course).order_by(Course.title.asc()).all()
    if not cursos:
        st.info("Cadastre um curso primeiro para ver comentários.")
    else:
        idxc = st.selectbox("Curso", options=list(range(len(cursos))), format_func=lambda i: cursos[i].title, key="adm_curso_coment")
        curso_sel = cursos[idxc]
        dft = get_comments(curso_sel.id)
        if dft is None or dft.empty:
            st.info("Sem comentários neste curso.")
        else:
            st.dataframe(dft, use_container_width=True)
            del_id = st.number_input("ID para excluir (apaga também respostas)", min_value=0, value=0, step=1)
            if st.button("🗑️ Excluir comentário selecionado", disabled=(del_id <= 0)):
                try:
                    delete_comment(int(del_id))
                    st.success("Comentário excluído.")
                    st.rerun()
                except Exception as e:
                    st.error(f"Falha ao excluir: {e}")
|
|
|
|
|
|
|
|
|
with tab_galeria:
    # Reports gallery (shared component); the "admin_" prefix namespaces its
    # widget keys so they don't collide with other usages of the component.
    st.subheader("📷 Galeria de Relatórios (Admin)")
    aba_galeria_relatorios(context_key="admin_")
|
|
|
|
|
|
|
|
|
with tab_reset:
    # Backup/restore tools: manual + automatic Hugging Face sync, raw SQLite
    # download, per-table CSV export and a destructive DB reset.
    st.subheader("💾 Backup & Restore do Banco e Mídias")

    # --- Manual backup to a Hugging Face "Dataset" repository ---
    st.markdown("### ☁️ Backup para Hugging Face (Dataset)")
    st.caption("Envia snapshot do **db.sqlite** e um **ZIP** de **data/** para um repositório Dataset no Hugging Face. Requer **HF_TOKEN**.")

    # Remember the last repo used in this session; fall back to a default.
    default_repo = st.session_state.get("hf_dataset_repo") or "Roudrigus/minha-academia-db"
    repo_id_raw = st.text_input("Repo Dataset (ex.: Owner/Nome)", value=default_repo, key="hf_repo_id")
    repo_id = _sanitize_repo_id(repo_id_raw)
    if repo_id != repo_id_raw:
        st.info(f"Repo ajustado para **{repo_id}** (sanitizado).")

    # Status panel: library availability, token presence, repo-id format.
    colhfa, colhfb, colhfc = st.columns(3)
    with colhfa:
        st.markdown("**Biblioteca**")
        if HF_HUB_AVAILABLE:
            st.success("📦 huggingface_hub disponível")
        else:
            st.warning("📦 huggingface_hub NÃO instalado. Adicione no requirements.txt: `huggingface_hub>=0.21.4`")
    with colhfb:
        st.markdown("**Token**")
        # has_token is reused further down by the auto-backup gate.
        has_token = bool((os.environ.get("HF_TOKEN") or "").strip())
        if has_token:
            st.success("🔑 HF_TOKEN definido")
        else:
            st.warning("🔑 HF_TOKEN ausente (Settings → Variables)")
    with colhfc:
        st.markdown("**Repositório**")
        if repo_id and "/" in repo_id:
            st.success(f"🗃️ {repo_id}")
        else:
            st.warning("Informe como `Owner/Nome`")

    do_sync = st.button("☁️ Sincronizar no Hugging Face (Dataset)", key="btn_sync_now")
    if do_sync:
        # Validate prerequisites before attempting the upload.
        if not HF_HUB_AVAILABLE:
            st.error("Instale huggingface_hub e reinicie o app.")
        elif not has_token:
            st.error("Defina o secret HF_TOKEN no ambiente para prosseguir.")
        elif not repo_id or "/" not in repo_id:
            st.error("Informe um repo válido no formato `Owner/Nome`.")
        else:
            try:
                st.session_state["hf_dataset_repo"] = repo_id.strip()
                stats = sync_to_hf_dataset(dataset_repo=repo_id.strip(), db_path=DB_PATH, data_dir="data")
                # Summarize which artifacts were actually uploaded.
                msgs = []
                if stats.get("db_uploaded"):
                    msgs.append("db.sqlite")
                if stats.get("data_uploaded"):
                    msgs.append("data.zip")
                if msgs:
                    st.success(f"Backup enviado: {', '.join(msgs)} → {repo_id}")
                else:
                    st.info("Nada para enviar (db.sqlite não encontrado e pasta data/ inexistente).")
            except Exception as e:
                st.error(f"Falha ao sincronizar: {e}")
                st.caption("Stacktrace (debug):")
                st.code(traceback.format_exc())
|
|
|
|
|
|
st.markdown("---")

# --- Automatic backup: at most one tick per AUTO_BKP_INTERVAL_SEC, executed
# during admin-panel renders (no background thread — see caption below). ---
st.markdown("### ⏱️ Backup automático (inteligente, a cada 10 min)")
enable_auto = st.checkbox("Habilitar backup automático (10 min)", value=st.session_state.get("auto_bkp_enabled", False))
st.session_state["auto_bkp_enabled"] = enable_auto

now_ts = _time.time()
last_ts = st.session_state.get("auto_bkp_last_ts", 0.0)
# Seconds remaining until the next tick (None when disabled).
next_in = max(0, int(AUTO_BKP_INTERVAL_SEC - (now_ts - last_ts))) if enable_auto else None

cstat1, cstat2, cstat3 = st.columns(3)
with cstat1:
    st.metric("Intervalo", "10 min")
with cstat2:
    st.metric("Próxima execução", f"{next_in}s" if enable_auto else "—")
with cstat3:
    last_status = st.session_state.get("auto_bkp_last_status","—")
    st.metric("Último ciclo", last_status)

# Only tick when enabled, prerequisites hold (library/token/repo — defined in
# the HF section of this tab) and the interval has elapsed.
if enable_auto and HF_HUB_AVAILABLE and has_token and repo_id and "/" in repo_id:
    if now_ts - last_ts >= AUTO_BKP_INTERVAL_SEC:
        try:
            status = _auto_backup_tick(repo_id)
            st.session_state["auto_bkp_last_status"] = status
            st.session_state["auto_bkp_last_ts"] = now_ts
            st.success(f"Auto-backup: {status}")
            st.rerun()
        except Exception as e:
            st.session_state["auto_bkp_last_status"] = f"erro: {e}"
            st.error(f"Auto-backup falhou: {e}")

st.caption("Obs.: o auto-backup executa no render do painel Admin. Mantenha esta aba aberta.")

# --- Raw SQLite file download ---
st.markdown("### Backup do Banco (.sqlite)")
if os.path.exists(DB_PATH):
    with open(DB_PATH, "rb") as f:
        st.download_button(
            "⬇️ Baixar banco (SQLite)",
            data=f.read(),
            file_name="db_backup.sqlite",
            mime="application/octet-stream",
        )
else:
    st.info("Banco ainda não existe no disco (será criado ao salvar dado).")

# --- Per-table CSV export: materialize every ORM table into a DataFrame ---
st.markdown("### Backup em CSV de todas as tabelas")
# Dedicated session so the export cannot interfere with the page's main one;
# always closed in the finally below.
db2 = ensure_db_ready()
try:
    cur_df = pd.DataFrame([{"id": c.id, "title": c.title, "description": c.description, "created_at": c.created_at} for c in db2.query(Course).all()])
    en_df = pd.DataFrame([{"id": e.id, "course_id": e.course_id, "student_name": e.student_name, "student_email": e.student_email, "created_at": e.created_at} for e in db2.query(Enrollment).all()])
    md_df = pd.DataFrame([{"id": m.id, "course_id": m.course_id, "title": m.title, "description": m.description, "video_path": m.video_path, "created_at": m.created_at} for m in db2.query(Lesson).all()])
    ag_df = pd.DataFrame([{"id": a.id, "course_id": a.course_id, "class_date": a.class_date, "start_time": a.start_time, "end_time": a.end_time, "topic": a.topic, "created_at": a.created_at} for a in db2.query(Schedule).all()])
    grades_df = pd.DataFrame([{
        "id": g.id, "course_id": g.course_id, "student_name": g.student_name,
        "student_email": g.student_email, "grade": g.grade, "note": g.note, "created_at": g.created_at
    } for g in db2.query(Grade).all()])
    certs_df = pd.DataFrame([{
        "id": c.id, "course_id": c.course_id, "student_name": c.student_name,
        "student_email": c.student_email, "pdf_path": c.pdf_path, "issued_at": c.issued_at
    } for c in db2.query(Certificate).all()])
    mk_df = pd.DataFrame([{
        "id": m.id, "course_id": m.course_id, "student_name": m.student_name,
        "student_email": m.student_email, "requested_date": m.requested_date,
        "note": m.note, "status": m.status, "created_at": m.created_at
    } for m in db2.query(MakeupRequest).all()])
finally:
    db2.close()
|
|
|
|
|
|
|
|
|
# Make sure the auxiliary (non-ORM) SQLite tables exist before they are
# queried directly below for the CSV export.
ensure_presence_table()
ensure_video_access_tables()
ensure_comments_table()
|
|
|
|
|
|
def _safe_read_sql(query: str, conn: sqlite3.Connection) -> pd.DataFrame:
|
|
|
try:
|
|
|
return pd.read_sql_query(query, conn)
|
|
|
except Exception:
|
|
|
q = query.lower()
|
|
|
if "from student_auth" in q:
|
|
|
return pd.DataFrame(columns=["student_email", "password_hash", "created_at"])
|
|
|
if "from video_access_list" in q:
|
|
|
return pd.DataFrame(columns=["lesson_id", "student_email", "created_at"])
|
|
|
if "from video_access" in q:
|
|
|
return pd.DataFrame(columns=["lesson_id", "visibility", "updated_at"])
|
|
|
if "from presence" in q:
|
|
|
return pd.DataFrame(columns=["session_id","user_email","user_name","role","page","first_seen","last_seen"])
|
|
|
if "from comments" in q:
|
|
|
return pd.DataFrame(columns=["id","course_id","student_email","student_name","content","parent_id","created_at"])
|
|
|
return pd.DataFrame()
|
|
|
|
|
|
# Snapshot the auxiliary (non-ORM) tables straight from SQLite for CSV export.
# FIX: sqlite3's connection context manager only manages transactions — it
# does NOT close the connection — so the previous `with sqlite3.connect(...)`
# leaked a file handle on every Streamlit rerun. Close explicitly instead.
_conn = sqlite3.connect(DB_PATH)
try:
    try:
        student_auth_df = _safe_read_sql("SELECT student_email, password_hash, created_at FROM student_auth", _conn)
    except Exception:
        # Belt-and-braces: _safe_read_sql already degrades gracefully, but
        # keep the original fallback for truly unexpected failures.
        student_auth_df = pd.DataFrame()
    video_access_df = _safe_read_sql("SELECT lesson_id, visibility, updated_at FROM video_access", _conn)
    video_acl_df = _safe_read_sql("SELECT lesson_id, student_email, created_at FROM video_access_list", _conn)
    presence_df = _safe_read_sql("SELECT session_id, user_email, user_name, role, page, first_seen, last_seen FROM presence", _conn)
    comments_df = _safe_read_sql("SELECT id, course_id, student_email, student_name, content, parent_id, created_at FROM comments", _conn)
finally:
    _conn.close()
|
|
|
|
|
|
# Download buttons laid out in three columns. "utf-8-sig" adds a BOM so the
# CSVs open with correct accents in Excel.
colb1, colb2, colb3 = st.columns(3)
colb1.download_button("⬇️ CSV — Cursos", data=cur_df.to_csv(index=False).encode("utf-8-sig"), file_name="courses.csv")
colb1.download_button("⬇️ CSV — Alunos", data=en_df.to_csv(index=False).encode("utf-8-sig"), file_name="enrollments.csv")
colb1.download_button("⬇️ CSV — Módulos", data=md_df.to_csv(index=False).encode("utf-8-sig"), file_name="lessons.csv")

colb2.download_button("⬇️ CSV — Agenda", data=ag_df.to_csv(index=False).encode("utf-8-sig"), file_name="schedule.csv")
colb2.download_button("⬇️ CSV — Notas", data=grades_df.to_csv(index=False).encode("utf-8-sig"), file_name="grades.csv")
colb2.download_button("⬇️ CSV — Certificados", data=certs_df.to_csv(index=False).encode("utf-8-sig"), file_name="certificates.csv")

colb3.download_button("⬇️ CSV — Reposições", data=mk_df.to_csv(index=False).encode("utf-8-sig"), file_name="makeup_requests.csv")
colb3.download_button("⬇️ CSV — Vídeo: visibilidade", data=video_access_df.to_csv(index=False).encode("utf-8-sig"), file_name="video_access.csv")
colb3.download_button("⬇️ CSV — Vídeo: ACL", data=video_acl_df.to_csv(index=False).encode("utf-8-sig"), file_name="video_access_list.csv")
# NOTE(review): student_auth_df is built above but never offered for
# download — confirm whether exporting password hashes was deliberately omitted.
st.download_button("⬇️ CSV — Comentários", data=comments_df.to_csv(index=False).encode("utf-8-sig"), file_name="comments.csv")
st.download_button("⬇️ CSV — Presença (snapshot)", data=presence_df.to_csv(index=False).encode("utf-8-sig"), file_name="presence.csv")

st.markdown("---")
# --- Destructive reset: truncates all DB tables; files under data/ are kept ---
st.subheader("🧨 Reset da Base (Atenção: ação irreversível)")
st.warning("Isto apaga TODOS os dados do banco (cursos, módulos, apostilas, agenda, matrículas, notas, certificados, reposições e auxiliares). Não apaga arquivos em 'data/'.")
confirm = st.text_input("Confirmação", placeholder="APAGAR TUDO")
# Button stays disabled until the exact confirmation phrase is typed.
btn = st.button("Apagar tudo agora", type="primary", disabled=(confirm.strip().upper() != "APAGAR TUDO"))
if btn:
    ok, err = reset_database(db)
    if ok:
        st.success("✅ Base resetada com sucesso (tabelas esvaziadas).")
    else:
        st.error(f"Falha ao resetar base: {err}")
|
|
|
|
|
|
finally:
|
|
|
db.close() |