MakeFaces / app.py
hleserg's picture
first commit
02ecd57
Raw
History Blame Contribute Delete
25.8 kB
#!/usr/bin/env python3
"""
app.py — Gradio-морда MakeFaces: датасет лиц из нескольких видео под LoRA.
Идея: одно видео = одно освещение, и LoRA рискует впитать "этот свет" как часть
лица. Поэтому просим несколько видео из разных обстановок — получаем крест
"ракурс x освещение", куда более репрезентативный датасет.
Слоты (все опциональны):
4 train-слота: Помещение/День, Помещение/Ночь, Улица/День, Улица/Ночь
1 holdout-слот: контрольное видео (любое, отдельное) — НЕ идёт в обучение,
только для финальной проверки LoRA на невиданных кадрах.
Поток:
1. Грузишь видео в слоты -> "Собрать".
2. Каждое train-видео обрабатывается отдельно со своей квотой ракурсов
(target / число залитых) -> крест ракурс x свет.
3. Эталон идентичности — по всем train-видео сразу (устойчивый центр личности).
4. Если суммарно мало нормальных — тянем мелкие (FSRCNN), серую зону на ревизию.
5. holdout-видео гонится тем же пайплайном в папку holdout/, сверяется с эталоном.
6. Уровни результата:
< N (мало для обучения) -> ошибка "трудно с вами", датасет всё равно отдаём
недобор в конкретном слоте -> заметный текст "переснимите слот X"
перебор (> потолка) -> предупреждение о переобучении
7. Выдача: один zip с train/ и holdout/ внутри.
Большие картинки на диск; в gr.State — пути и числа.
Файлы рядом: build_face_dataset.py, telemetry.py
Запуск: python app.py -> http://127.0.0.1:7860
"""
import os
import math
import shutil
import tempfile
import zipfile
from pathlib import Path
import cv2
import gradio as gr
from build_face_dataset import (
process_one_video, build_identity_reference, compute_identity_bands,
rescue_smalls_zoned, pick_diverse, crop_align,
)
import numpy as np
import telemetry
import guard
IS_SPACE = bool(os.environ.get("SPACE_ID"))
# уровни размера датасета
N_MIN = 15 # ниже — не обучить
TARGET_DEFAULT = 40
SOFT_CAP = 80 # выше — предупреждение о переобучении
SLOTS = [
("indoor_day", "Помещение / День"),
("indoor_night", "Помещение / Ночь"),
("outdoor_day", "Улица / День"),
("outdoor_night", "Улица / Ночь"),
]
def _zip_traindir(train_dir, holdout_dir, work):
"""Собрать zip с папками train/ и holdout/ внутри."""
zp = work / "face_dataset.zip"
with zipfile.ZipFile(zp, "w") as z:
for p in sorted(train_dir.glob("*.png")):
z.write(p, f"train/{p.name}")
if holdout_dir.exists():
for p in sorted(holdout_dir.glob("*.png")):
z.write(p, f"holdout/{p.name}")
return str(zp)
MIN_VIDEO_SEC = 15 # короче — отказ (защита от огрызка), везде
LIMIT_VIDEO_SEC = 60 if IS_SPACE else 300 # длиннее — обрезаем первый отрезок, не отказ
RES_WARN = 360 # короткая сторона < этого -> предупреждение (лицо вероятно мелкое)
RES_FAIL = 144 # короткая сторона < этого -> отказ (даже апскейл не спасёт)
def _probe_video(path):
"""Читает метаданные видео одним проходом: (dur_sec, w, h, size_mb).
Возвращает None для невидимого/нечитаемого файла."""
cap = cv2.VideoCapture(str(path))
if not cap.isOpened():
cap.release()
return None
fc = cap.get(cv2.CAP_PROP_FRAME_COUNT)
vfps = cap.get(cv2.CAP_PROP_FPS) or 25
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
if w == 0 or h == 0: # не видео (картинка/мусор) — кадр не прочитался
return None
dur = fc / vfps if vfps else 0
size_mb = os.path.getsize(path) / (1024 ** 2)
return dur, w, h, size_mb
def inspect_upload(path):
"""Вердикт при загрузке: имя, длина, вес, разрешение + оценка. Только текст —
сам File-компонент НЕ трогаем (обратная запись создаёт бесконечный change-цикл)."""
if not path:
return ""
try:
name = Path(path).name
info = _probe_video(path)
if info is None:
return f"❌ {name}: это не видео. Убери файл крестиком и перетащи видео (mp4, mov, mkv…)."
dur, w, h, size_mb = info
short = min(w, h)
base = f"{name} · {dur:.0f}с · {w}×{h} · {size_mb:.0f} МБ"
if short < RES_FAIL:
return f"❌ {base}\nРазрешение {short}p — слишком мелко, апскейл не вытянет."
if dur and dur < MIN_VIDEO_SEC:
return f"❌ {base}\nКороче {MIN_VIDEO_SEC}с — мало материала."
notes = []
if dur > LIMIT_VIDEO_SEC:
notes.append(f"✂️ длиннее {LIMIT_VIDEO_SEC}с — обработаю первые {LIMIT_VIDEO_SEC}с")
if short < RES_WARN:
notes.append(f"⚠️ невысокое разрешение ({short}p) — лицо может выйти мелким")
if notes:
return f"✓ {base}\n" + "\n".join(notes)
return f"✓ {base} — годится"
except Exception as e:
return f"❌ Не смог прочитать файл ({type(e).__name__})."
def _vet_video(video_path, label):
"""Проверка перед обработкой. Возвращает limit_sec (0 = целиком, >0 = обрезать).
Бросает gr.Error на фатальном (короткое/мелкое/не видео)."""
info = _probe_video(video_path)
if info is None:
raise gr.Error(f"«{label}»: файл не читается как видео.")
dur, w, h, _ = info
short = min(w, h)
if short < RES_FAIL:
raise gr.Error(f"«{label}»: разрешение {short}p — слишком мелко, апскейл не спасёт.")
if dur and dur < MIN_VIDEO_SEC:
raise gr.Error(f"«{label}»: видео {dur:.0f}с — слишком короткое. "
f"Нужно хотя бы {MIN_VIDEO_SEC}с: покрути головой, построй рожи.")
if dur > LIMIT_VIDEO_SEC:
return LIMIT_VIDEO_SEC
return 0
def build(indoor_day, indoor_night, outdoor_day, outdoor_night, holdout_video,
fps, target, size, min_sharp, min_face, margin, do_crop,
progress=gr.Progress()):
"""Тонкая обёртка: ловит краши тяжёлой обработки, шлёт алерт в телегу,
отдаёт юзеру внятный отказ вместо немого падения. gr.Error (валидация —
короткое/отсутствующее видео) пробрасывается как есть."""
snap_holder = {}
try:
return _build_core(indoor_day, indoor_night, outdoor_day, outdoor_night,
holdout_video, fps, target, size, min_sharp, min_face,
margin, do_crop, progress, snap_holder)
except gr.Error:
raise # это сообщение юзеру по делу, не краш
except MemoryError as e:
guard.alert_crash(snap_holder.get("snap"), e,
{"слотов залито": snap_holder.get("n_vids", "?"),
"цель": int(target)})
raise gr.Error("Не потянул столько видео разом — кончилась память. "
"Залей меньше слотов (2-3 вместо 5) или покороче.")
except Exception as e:
guard.alert_crash(snap_holder.get("snap"), e,
{"слотов залито": snap_holder.get("n_vids", "?"),
"цель": int(target)})
raise gr.Error(f"Что-то пошло не так при обработке ({type(e).__name__}). "
"Попробуй меньше/покороче видео или другие файлы.")
def _build_core(indoor_day, indoor_night, outdoor_day, outdoor_night, holdout_video,
fps, target, size, min_sharp, min_face, margin, do_crop,
progress, snap_holder):
train_videos = [(SLOTS[i][1], v) for i, v in
enumerate([indoor_day, indoor_night, outdoor_day, outdoor_night]) if v]
if not train_videos:
raise gr.Error("Залей хотя бы одно видео в любой из четырёх слотов.")
# проверка длины: короткие отсекаем, длинные помечаем на обрезку
limits = {} # label -> limit_sec
trimmed = [] # какие слоты обрезаны (для сообщения)
for label, v in train_videos:
lim = _vet_video(v, label)
limits[label] = lim
if lim:
trimmed.append(label)
holdout_limit = 0
if holdout_video:
holdout_limit = _vet_video(holdout_video, "Контрольное")
if holdout_limit:
trimmed.append("Контрольное")
work = Path(tempfile.mkdtemp(prefix="faceds_"))
train_dir = work / "train"
train_dir.mkdir(parents=True, exist_ok=True)
max_side = 1280 if IS_SPACE else 0 # локально — полное качество, без даунскейла
# снимок памяти по этапам — пригодится, если упадём
snap = guard.MemorySnapshot()
snap_holder["snap"] = snap
snap_holder["n_vids"] = len(train_videos)
all_vid = [v for _, v in train_videos] + ([holdout_video] if holdout_video else [])
total_mb = sum(os.path.getsize(v) for v in all_vid if v and os.path.exists(v)) / (1024 ** 2)
snap.note_videos(len(all_vid), total_mb)
guard.memory_guard(snap, "загрузка моделей")
snap.freeze_static()
# квота ракурсов на каждое видео — поровну от цели
n_vids = len(train_videos)
quota = max(3, math.ceil(int(target) / n_vids))
# ── обрабатываем каждый train-слот отдельно
all_good_div, all_small = [], []
slot_report = [] # (slot_name, n_taken, quota, n_raw) — для текста юзеру
slot_rows = [] # детальная статистика для телеметрии
for i, (slot_name, v) in enumerate(train_videos):
progress((i + 0.5) / (n_vids + 1), desc=f"Обрабатываю: {slot_name}")
guard.memory_guard(snap, f"видео {i+1}/{n_vids}: {slot_name}")
r = process_one_video(v, quota, min_sharp, int(min_face), max_side,
int(size), margin, fps, limits[slot_name])
all_good_div.extend(r["good_div"])
all_small.extend(r["small"])
slot_report.append((slot_name, len(r["good_div"]), quota, r["n_good_raw"]))
rs = r.get("reasons", {})
slot_rows.append(dict(slot=slot_name, frames=r["frames"],
n_good=r["n_good_raw"], n_small=len(r["small"]), quota=quota,
drop_blur=rs.get("hard_blur", 0) + rs.get("soft_blur", 0),
drop_no_face=rs.get("no_face", 0),
drop_edge=rs.get("at_edge", 0),
multi_handled=rs.get("multi_face_handled", 0)))
# эталон — по всем train-видео сразу
ref = build_identity_reference(all_good_div, all_small)
# сохраняем нормальные в train/
train_paths = []
for j, r in enumerate(sorted(all_good_div, key=lambda m: -m["sharp"])):
img = r["img"] if not do_crop else crop_align(r["face"], r["img"], int(size), margin)
p = train_dir / f"{j:03d}.png"
cv2.imwrite(str(p), img)
train_paths.append(str(p))
base_run = dict(video=" + ".join(s for s, _ in train_videos),
frames_total=0, fps=fps, num=quota, target=int(target),
size=int(size), min_sharp=min_sharp, min_face=int(min_face),
n_good=len(all_good_div), n_small=len(all_small))
# ── добор мелкими, если суммарно не хватило
need = int(target) - len(train_paths)
pending = []
floor = ceil = None
dropped = {"blur": 0, "noface": 0, "identity": 0}
if need > 0 and all_small:
progress((n_vids + 0.5) / (n_vids + 1), desc="Дотягиваю мелкие кадры")
guard.memory_guard(snap, "добор мелких (FSRCNN)")
small_div = pick_diverse(all_small, min(need, len(all_small)))
small_crops = [crop_align(r["face"], r["img"], int(size), margin) for r in small_div]
floor, ceil = compute_identity_bands(all_good_div, ref)
res = rescue_smalls_zoned(small_crops, 2, min_sharp, ref, floor, ceil, work / "pending")
for j, im in enumerate(res["take"], start=len(train_paths)):
p = train_dir / f"{j:03d}.png"
cv2.imwrite(str(p), im)
train_paths.append(str(p))
pending = res["pending"]
dropped = res["dropped"]
# ── holdout-видео (если залито) — отдельным пайплайном в holdout/
holdout_dir = work / "holdout"
n_holdout = 0
if holdout_video:
progress(0.97, desc="Контрольное видео")
hr = process_one_video(holdout_video, max(5, int(target) // 3),
min_sharp, int(min_face), max_side, int(size), margin,
fps, holdout_limit)
holdout_dir.mkdir(parents=True, exist_ok=True)
# сверяем с train-эталоном: holdout не должен содержать чужого
for j, r in enumerate(sorted(hr["good_div"], key=lambda m: -m["sharp"])):
if ref is not None:
sim = float(np.dot(r["emb"], ref))
if sim < 0.3: # явно не тот человек — пропускаем
continue
img = r["img"] if not do_crop else crop_align(r["face"], r["img"], int(size), margin)
cv2.imwrite(str(holdout_dir / f"h_{j:03d}.png"), img)
n_holdout += 1
state = dict(work=str(work), train_dir=str(train_dir), holdout_dir=str(holdout_dir),
train_paths=train_paths, pending=pending, target=int(target),
n_holdout=n_holdout, had_holdout=bool(holdout_video),
slot_report=slot_report, trimmed=trimmed, slot_rows=slot_rows,
run_fields=dict(**base_run, floor=floor, ceil=ceil,
n_take=len(train_paths) - len(all_good_div),
n_pending=len(pending), drop_blur=dropped["blur"],
drop_noface=dropped["noface"], drop_identity=dropped["identity"]))
# ── есть серая зона -> ревизия (общий пул со всех train-видео)
if pending:
gallery = [(p["path"], f"похож на {int(p['sim']*100)}%") for p in pending]
status = (f"Собрал {len(train_paths)} кадров. Ещё {len(pending)} спорных — "
f"апскейл их вытянул, но не уверен в идентичности. Отметь, какие "
f"оставить, и жми «Подтвердить».")
return (None, None, status, gr.update(visible=True),
gr.update(value=gallery), state)
# серой зоны нет -> сразу финал
return _finalize_build(state, [], set(), progress)
def _slot_warnings(slot_report):
"""Заметный текст про слоты, где просадка (нормальных нашлось меньше квоты)."""
lines = []
for name, taken, quota, raw in slot_report:
if raw < quota:
lines.append(f" ⚠️ **{name}**: всего {raw} годных кадров (нужно ~{quota}). "
f"Переснимите это видео — лицо крупнее, меньше смаза, ярче свет.")
return lines
def _finalize_build(state, selected, kept_set, progress=None):
"""Общий финал: добор отмеченных из ревизии, проверка уровней, zip, лог, статус."""
work = Path(state["work"])
train_dir = Path(state["train_dir"])
holdout_dir = Path(state["holdout_dir"])
train_paths = list(state["train_paths"])
pending = state["pending"]
target = state["target"]
# добираем отмеченные в ревизии
kept = set(selected or [])
for p in pending:
if p["path"] in kept:
j = len(train_paths)
dst = train_dir / f"{j:03d}.png"
cv2.imwrite(str(dst), cv2.imread(p["path"]))
train_paths.append(str(dst))
total = len(train_paths)
nh = state["n_holdout"]
# лог
run_id = telemetry.log_run(**state["run_fields"], final_size=total,
hit_target=1 if total >= target else 0)
telemetry.log_grays(run_id, pending, kept)
telemetry.log_slots(run_id, state.get("slot_rows", []))
zp = _zip_traindir(train_dir, holdout_dir, work)
gallery_paths = sorted(str(p) for p in train_dir.glob("*.png"))
# ── уровни результата
parts = []
if total < N_MIN:
parts.append(f"⛔ **Собрано всего {total} кадров — для обучения LoRA маловато "
f"(нужно хотя бы {N_MIN}).** Датасет ниже отдаю, но с таким объёмом "
f"трудно получить хороший результат. Снимите больше видео или "
f"в лучших условиях.")
elif total > SOFT_CAP:
parts.append(f"✅ Собрано {total} кадров — даже с запасом. Учти: больше ~{SOFT_CAP} "
f"редко улучшает LoRA, а риск переобучения растёт. Можно смело "
f"оставить лучшие {SOFT_CAP}.")
else:
parts.append(f"✅ Готово: {total} кадров для обучения" +
(f" + {nh} контрольных" if nh else "") + ".")
warns = _slot_warnings(state["slot_report"])
if warns:
parts.append("\nПросадки по слотам (датасет рабочий, но переснимете — будет лучше):")
parts.extend(warns)
if not state.get("had_holdout"):
parts.append("\n💡 Совет: залейте отдельное контрольное видео (любое — кружок, "
"селфи 15 сек) — отложу его кадры для финальной проверки LoRA.")
trimmed = state.get("trimmed") or []
if trimmed:
lim = LIMIT_VIDEO_SEC
parts.append(f"\n✂️ Обрезано до {lim}с (дольше обработать не могу): "
f"{', '.join(trimmed)}. Взял первую минуту — этого обычно хватает."
if IS_SPACE else
f"\n✂️ Обрезано до {lim//60} мин: {', '.join(trimmed)}. "
f"Взял начало — остальное не обрабатывал.")
status = "\n".join(parts)
return (gallery_paths, zp, status, gr.update(visible=False),
gr.update(value=None), None)
def confirm_review(selected, state):
if not state:
raise gr.Error("Нет данных ревизии — запусти сбор заново")
return _finalize_build(state, selected, set(selected or []))
with gr.Blocks(title="MakeFaces") as demo:
gr.Markdown("## MakeFaces · Покривляйся в камеру\n"
"Несколько коротких видео в разных условиях → разнообразный датасет "
"лиц для LoRA. Строй рожи, верти головой, меняй обстановку.")
if IS_SPACE:
gr.Markdown(f"⚠️ Облачная версия: видео от {MIN_VIDEO_SEC}с; длиннее "
f"{LIMIT_VIDEO_SEC}с обрежу до первой минуты.")
st = gr.State()
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("**Видео для обучения** (залей сколько сможешь — чем разнообразнее "
"условия, тем лучше). Перетащи файл или нажми на зону:")
vids, verdicts = [], []
for key, label in SLOTS:
f = gr.File(label=label, height=80)
vinfo = gr.Textbox(label=None, interactive=False, lines=2,
show_label=False, container=False,
placeholder=f"{label}: файл не выбран")
f.change(inspect_upload, inputs=[f], outputs=[vinfo])
vids.append(f)
verdicts.append(vinfo)
gr.Markdown("**Контрольное видео** (необязательно): любое отдельное видео с "
"вашим лицом — кружок из соцсетей, селфи на 15 секунд, что угодно. "
"В обучение не пойдёт, отложу для проверки результата.")
holdout = gr.File(label="Контрольное (holdout)", height=80)
holdout_info = gr.Textbox(label=None, interactive=False, lines=2,
show_label=False, container=False,
placeholder="Контрольное: файл не выбран")
holdout.change(inspect_upload, inputs=[holdout], outputs=[holdout_info])
with gr.Accordion("Параметры", open=False):
fps = gr.Slider(0.5, 10, value=2, step=0.5, label="FPS нарезки")
target = gr.Slider(N_MIN, SOFT_CAP, value=TARGET_DEFAULT, step=1,
label="Цель по числу кадров")
size = gr.Slider(512, 1536, value=1024, step=128, label="Разрешение кропа")
min_sharp = gr.Slider(10, 300, value=80, step=10, label="Порог резкости")
min_face = gr.Slider(80, 600, value=200, step=20, label="Мин. размер лица (px)")
margin = gr.Slider(0, 1.5, value=0.6, step=0.1, label="Запас при кропе")
do_crop = gr.Checkbox(value=True, label="Кропать по лицу")
btn = gr.Button("Собрать датасет", variant="primary")
with gr.Column(scale=2):
status = gr.Markdown()
with gr.Group(visible=False) as review_block:
gr.Markdown("**Спорные кадры — кликни те, что оставить:**")
review_gallery = gr.Gallery(columns=5, height=300, object_fit="cover")
review_selected = gr.State([])
confirm_btn = gr.Button("Подтвердить выбор", variant="primary")
gallery = gr.Gallery(label="Датасет (train)", columns=5, height=480)
download = gr.File(label="Скачать zip (train/ + holdout/)")
def _toggle(selected, evt: gr.SelectData, state):
if not state:
return selected
path = state["pending"][evt.index]["path"]
selected = list(selected or [])
if path in selected:
selected.remove(path)
else:
selected.append(path)
return selected
review_gallery.select(_toggle, [review_selected, st], [review_selected])
btn.click(
build,
inputs=[*vids, holdout, fps, target, size, min_sharp, min_face, margin, do_crop],
outputs=[gallery, download, status, review_block, review_gallery, st],
)
confirm_btn.click(
confirm_review,
inputs=[review_selected, st],
outputs=[gallery, download, status, review_block, review_gallery, st],
)
if __name__ == "__main__":
demo.launch()