| |
| """ |
| app.py — Gradio-морда MakeFaces: датасет лиц из нескольких видео под LoRA. |
| |
| Идея: одно видео = одно освещение, и LoRA рискует впитать "этот свет" как часть |
| лица. Поэтому просим несколько видео из разных обстановок — получаем крест |
| "ракурс x освещение", куда более репрезентативный датасет. |
| |
| Слоты (все опциональны): |
| 4 train-слота: Помещение/День, Помещение/Ночь, Улица/День, Улица/Ночь |
| 1 holdout-слот: контрольное видео (любое, отдельное) — НЕ идёт в обучение, |
| только для финальной проверки LoRA на невиданных кадрах. |
| |
| Поток: |
| 1. Грузишь видео в слоты -> "Собрать". |
| 2. Каждое train-видео обрабатывается отдельно со своей квотой ракурсов |
| (target / число залитых) -> крест ракурс x свет. |
| 3. Эталон идентичности — по всем train-видео сразу (устойчивый центр личности). |
| 4. Если суммарно мало нормальных — тянем мелкие (FSRCNN), серую зону на ревизию. |
| 5. holdout-видео гонится тем же пайплайном в папку holdout/, сверяется с эталоном. |
| 6. Уровни результата: |
| < N (мало для обучения) -> ошибка "трудно с вами", датасет всё равно отдаём |
| недобор в конкретном слоте -> заметный текст "переснимите слот X" |
| перебор (> потолка) -> предупреждение о переобучении |
| 7. Выдача: один zip с train/ и holdout/ внутри. |
| |
| Большие картинки на диск; в gr.State — пути и числа. |
| Файлы рядом: build_face_dataset.py, telemetry.py |
| Запуск: python app.py -> http://127.0.0.1:7860 |
| """ |
|
|
| import os |
| import math |
| import shutil |
| import tempfile |
| import zipfile |
| from pathlib import Path |
|
|
| import cv2 |
| import gradio as gr |
|
|
| from build_face_dataset import ( |
| process_one_video, build_identity_reference, compute_identity_bands, |
| rescue_smalls_zoned, pick_diverse, crop_align, |
| ) |
| import numpy as np |
| import telemetry |
| import guard |
|
|
| IS_SPACE = bool(os.environ.get("SPACE_ID")) |
|
|
| |
| N_MIN = 15 |
| TARGET_DEFAULT = 40 |
| SOFT_CAP = 80 |
|
|
| SLOTS = [ |
| ("indoor_day", "Помещение / День"), |
| ("indoor_night", "Помещение / Ночь"), |
| ("outdoor_day", "Улица / День"), |
| ("outdoor_night", "Улица / Ночь"), |
| ] |
|
|
|
|
| def _zip_traindir(train_dir, holdout_dir, work): |
| """Собрать zip с папками train/ и holdout/ внутри.""" |
| zp = work / "face_dataset.zip" |
| with zipfile.ZipFile(zp, "w") as z: |
| for p in sorted(train_dir.glob("*.png")): |
| z.write(p, f"train/{p.name}") |
| if holdout_dir.exists(): |
| for p in sorted(holdout_dir.glob("*.png")): |
| z.write(p, f"holdout/{p.name}") |
| return str(zp) |
|
|
|
|
| MIN_VIDEO_SEC = 15 |
| LIMIT_VIDEO_SEC = 60 if IS_SPACE else 300 |
| RES_WARN = 360 |
| RES_FAIL = 144 |
|
|
|
|
| def _probe_video(path): |
| """Читает метаданные видео одним проходом: (dur_sec, w, h, size_mb). |
| Возвращает None для невидимого/нечитаемого файла.""" |
| cap = cv2.VideoCapture(str(path)) |
| if not cap.isOpened(): |
| cap.release() |
| return None |
| fc = cap.get(cv2.CAP_PROP_FRAME_COUNT) |
| vfps = cap.get(cv2.CAP_PROP_FPS) or 25 |
| w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| cap.release() |
| if w == 0 or h == 0: |
| return None |
| dur = fc / vfps if vfps else 0 |
| size_mb = os.path.getsize(path) / (1024 ** 2) |
| return dur, w, h, size_mb |
|
|
|
|
| def inspect_upload(path): |
| """Вердикт при загрузке: имя, длина, вес, разрешение + оценка. Только текст — |
| сам File-компонент НЕ трогаем (обратная запись создаёт бесконечный change-цикл).""" |
| if not path: |
| return "" |
| try: |
| name = Path(path).name |
| info = _probe_video(path) |
| if info is None: |
| return f"❌ {name}: это не видео. Убери файл крестиком и перетащи видео (mp4, mov, mkv…)." |
| dur, w, h, size_mb = info |
| short = min(w, h) |
| base = f"{name} · {dur:.0f}с · {w}×{h} · {size_mb:.0f} МБ" |
|
|
| if short < RES_FAIL: |
| return f"❌ {base}\nРазрешение {short}p — слишком мелко, апскейл не вытянет." |
| if dur and dur < MIN_VIDEO_SEC: |
| return f"❌ {base}\nКороче {MIN_VIDEO_SEC}с — мало материала." |
|
|
| notes = [] |
| if dur > LIMIT_VIDEO_SEC: |
| notes.append(f"✂️ длиннее {LIMIT_VIDEO_SEC}с — обработаю первые {LIMIT_VIDEO_SEC}с") |
| if short < RES_WARN: |
| notes.append(f"⚠️ невысокое разрешение ({short}p) — лицо может выйти мелким") |
| if notes: |
| return f"✓ {base}\n" + "\n".join(notes) |
| return f"✓ {base} — годится" |
| except Exception as e: |
| return f"❌ Не смог прочитать файл ({type(e).__name__})." |
|
|
|
|
| def _vet_video(video_path, label): |
| """Проверка перед обработкой. Возвращает limit_sec (0 = целиком, >0 = обрезать). |
| Бросает gr.Error на фатальном (короткое/мелкое/не видео).""" |
| info = _probe_video(video_path) |
| if info is None: |
| raise gr.Error(f"«{label}»: файл не читается как видео.") |
| dur, w, h, _ = info |
| short = min(w, h) |
| if short < RES_FAIL: |
| raise gr.Error(f"«{label}»: разрешение {short}p — слишком мелко, апскейл не спасёт.") |
| if dur and dur < MIN_VIDEO_SEC: |
| raise gr.Error(f"«{label}»: видео {dur:.0f}с — слишком короткое. " |
| f"Нужно хотя бы {MIN_VIDEO_SEC}с: покрути головой, построй рожи.") |
| if dur > LIMIT_VIDEO_SEC: |
| return LIMIT_VIDEO_SEC |
| return 0 |
|
|
|
|
| def build(indoor_day, indoor_night, outdoor_day, outdoor_night, holdout_video, |
| fps, target, size, min_sharp, min_face, margin, do_crop, |
| progress=gr.Progress()): |
| """Тонкая обёртка: ловит краши тяжёлой обработки, шлёт алерт в телегу, |
| отдаёт юзеру внятный отказ вместо немого падения. gr.Error (валидация — |
| короткое/отсутствующее видео) пробрасывается как есть.""" |
| snap_holder = {} |
| try: |
| return _build_core(indoor_day, indoor_night, outdoor_day, outdoor_night, |
| holdout_video, fps, target, size, min_sharp, min_face, |
| margin, do_crop, progress, snap_holder) |
| except gr.Error: |
| raise |
| except MemoryError as e: |
| guard.alert_crash(snap_holder.get("snap"), e, |
| {"слотов залито": snap_holder.get("n_vids", "?"), |
| "цель": int(target)}) |
| raise gr.Error("Не потянул столько видео разом — кончилась память. " |
| "Залей меньше слотов (2-3 вместо 5) или покороче.") |
| except Exception as e: |
| guard.alert_crash(snap_holder.get("snap"), e, |
| {"слотов залито": snap_holder.get("n_vids", "?"), |
| "цель": int(target)}) |
| raise gr.Error(f"Что-то пошло не так при обработке ({type(e).__name__}). " |
| "Попробуй меньше/покороче видео или другие файлы.") |
|
|
|
|
| def _build_core(indoor_day, indoor_night, outdoor_day, outdoor_night, holdout_video, |
| fps, target, size, min_sharp, min_face, margin, do_crop, |
| progress, snap_holder): |
| train_videos = [(SLOTS[i][1], v) for i, v in |
| enumerate([indoor_day, indoor_night, outdoor_day, outdoor_night]) if v] |
|
|
| if not train_videos: |
| raise gr.Error("Залей хотя бы одно видео в любой из четырёх слотов.") |
|
|
| |
| limits = {} |
| trimmed = [] |
| for label, v in train_videos: |
| lim = _vet_video(v, label) |
| limits[label] = lim |
| if lim: |
| trimmed.append(label) |
| holdout_limit = 0 |
| if holdout_video: |
| holdout_limit = _vet_video(holdout_video, "Контрольное") |
| if holdout_limit: |
| trimmed.append("Контрольное") |
|
|
| work = Path(tempfile.mkdtemp(prefix="faceds_")) |
| train_dir = work / "train" |
| train_dir.mkdir(parents=True, exist_ok=True) |
| max_side = 1280 if IS_SPACE else 0 |
|
|
| |
| snap = guard.MemorySnapshot() |
| snap_holder["snap"] = snap |
| snap_holder["n_vids"] = len(train_videos) |
| all_vid = [v for _, v in train_videos] + ([holdout_video] if holdout_video else []) |
| total_mb = sum(os.path.getsize(v) for v in all_vid if v and os.path.exists(v)) / (1024 ** 2) |
| snap.note_videos(len(all_vid), total_mb) |
| guard.memory_guard(snap, "загрузка моделей") |
| snap.freeze_static() |
|
|
| |
| n_vids = len(train_videos) |
| quota = max(3, math.ceil(int(target) / n_vids)) |
|
|
| |
| all_good_div, all_small = [], [] |
| slot_report = [] |
| slot_rows = [] |
| for i, (slot_name, v) in enumerate(train_videos): |
| progress((i + 0.5) / (n_vids + 1), desc=f"Обрабатываю: {slot_name}") |
| guard.memory_guard(snap, f"видео {i+1}/{n_vids}: {slot_name}") |
| r = process_one_video(v, quota, min_sharp, int(min_face), max_side, |
| int(size), margin, fps, limits[slot_name]) |
| all_good_div.extend(r["good_div"]) |
| all_small.extend(r["small"]) |
| slot_report.append((slot_name, len(r["good_div"]), quota, r["n_good_raw"])) |
| rs = r.get("reasons", {}) |
| slot_rows.append(dict(slot=slot_name, frames=r["frames"], |
| n_good=r["n_good_raw"], n_small=len(r["small"]), quota=quota, |
| drop_blur=rs.get("hard_blur", 0) + rs.get("soft_blur", 0), |
| drop_no_face=rs.get("no_face", 0), |
| drop_edge=rs.get("at_edge", 0), |
| multi_handled=rs.get("multi_face_handled", 0))) |
|
|
| |
| ref = build_identity_reference(all_good_div, all_small) |
|
|
| |
| train_paths = [] |
| for j, r in enumerate(sorted(all_good_div, key=lambda m: -m["sharp"])): |
| img = r["img"] if not do_crop else crop_align(r["face"], r["img"], int(size), margin) |
| p = train_dir / f"{j:03d}.png" |
| cv2.imwrite(str(p), img) |
| train_paths.append(str(p)) |
|
|
| base_run = dict(video=" + ".join(s for s, _ in train_videos), |
| frames_total=0, fps=fps, num=quota, target=int(target), |
| size=int(size), min_sharp=min_sharp, min_face=int(min_face), |
| n_good=len(all_good_div), n_small=len(all_small)) |
|
|
| |
| need = int(target) - len(train_paths) |
| pending = [] |
| floor = ceil = None |
| dropped = {"blur": 0, "noface": 0, "identity": 0} |
| if need > 0 and all_small: |
| progress((n_vids + 0.5) / (n_vids + 1), desc="Дотягиваю мелкие кадры") |
| guard.memory_guard(snap, "добор мелких (FSRCNN)") |
| small_div = pick_diverse(all_small, min(need, len(all_small))) |
| small_crops = [crop_align(r["face"], r["img"], int(size), margin) for r in small_div] |
| floor, ceil = compute_identity_bands(all_good_div, ref) |
| res = rescue_smalls_zoned(small_crops, 2, min_sharp, ref, floor, ceil, work / "pending") |
| for j, im in enumerate(res["take"], start=len(train_paths)): |
| p = train_dir / f"{j:03d}.png" |
| cv2.imwrite(str(p), im) |
| train_paths.append(str(p)) |
| pending = res["pending"] |
| dropped = res["dropped"] |
|
|
| |
| holdout_dir = work / "holdout" |
| n_holdout = 0 |
| if holdout_video: |
| progress(0.97, desc="Контрольное видео") |
| hr = process_one_video(holdout_video, max(5, int(target) // 3), |
| min_sharp, int(min_face), max_side, int(size), margin, |
| fps, holdout_limit) |
| holdout_dir.mkdir(parents=True, exist_ok=True) |
| |
| for j, r in enumerate(sorted(hr["good_div"], key=lambda m: -m["sharp"])): |
| if ref is not None: |
| sim = float(np.dot(r["emb"], ref)) |
| if sim < 0.3: |
| continue |
| img = r["img"] if not do_crop else crop_align(r["face"], r["img"], int(size), margin) |
| cv2.imwrite(str(holdout_dir / f"h_{j:03d}.png"), img) |
| n_holdout += 1 |
|
|
| state = dict(work=str(work), train_dir=str(train_dir), holdout_dir=str(holdout_dir), |
| train_paths=train_paths, pending=pending, target=int(target), |
| n_holdout=n_holdout, had_holdout=bool(holdout_video), |
| slot_report=slot_report, trimmed=trimmed, slot_rows=slot_rows, |
| run_fields=dict(**base_run, floor=floor, ceil=ceil, |
| n_take=len(train_paths) - len(all_good_div), |
| n_pending=len(pending), drop_blur=dropped["blur"], |
| drop_noface=dropped["noface"], drop_identity=dropped["identity"])) |
|
|
| |
| if pending: |
| gallery = [(p["path"], f"похож на {int(p['sim']*100)}%") for p in pending] |
| status = (f"Собрал {len(train_paths)} кадров. Ещё {len(pending)} спорных — " |
| f"апскейл их вытянул, но не уверен в идентичности. Отметь, какие " |
| f"оставить, и жми «Подтвердить».") |
| return (None, None, status, gr.update(visible=True), |
| gr.update(value=gallery), state) |
|
|
| |
| return _finalize_build(state, [], set(), progress) |
|
|
|
|
| def _slot_warnings(slot_report): |
| """Заметный текст про слоты, где просадка (нормальных нашлось меньше квоты).""" |
| lines = [] |
| for name, taken, quota, raw in slot_report: |
| if raw < quota: |
| lines.append(f" ⚠️ **{name}**: всего {raw} годных кадров (нужно ~{quota}). " |
| f"Переснимите это видео — лицо крупнее, меньше смаза, ярче свет.") |
| return lines |
|
|
|
|
| def _finalize_build(state, selected, kept_set, progress=None): |
| """Общий финал: добор отмеченных из ревизии, проверка уровней, zip, лог, статус.""" |
| work = Path(state["work"]) |
| train_dir = Path(state["train_dir"]) |
| holdout_dir = Path(state["holdout_dir"]) |
| train_paths = list(state["train_paths"]) |
| pending = state["pending"] |
| target = state["target"] |
|
|
| |
| kept = set(selected or []) |
| for p in pending: |
| if p["path"] in kept: |
| j = len(train_paths) |
| dst = train_dir / f"{j:03d}.png" |
| cv2.imwrite(str(dst), cv2.imread(p["path"])) |
| train_paths.append(str(dst)) |
|
|
| total = len(train_paths) |
| nh = state["n_holdout"] |
|
|
| |
| run_id = telemetry.log_run(**state["run_fields"], final_size=total, |
| hit_target=1 if total >= target else 0) |
| telemetry.log_grays(run_id, pending, kept) |
| telemetry.log_slots(run_id, state.get("slot_rows", [])) |
|
|
| zp = _zip_traindir(train_dir, holdout_dir, work) |
| gallery_paths = sorted(str(p) for p in train_dir.glob("*.png")) |
|
|
| |
| parts = [] |
| if total < N_MIN: |
| parts.append(f"⛔ **Собрано всего {total} кадров — для обучения LoRA маловато " |
| f"(нужно хотя бы {N_MIN}).** Датасет ниже отдаю, но с таким объёмом " |
| f"трудно получить хороший результат. Снимите больше видео или " |
| f"в лучших условиях.") |
| elif total > SOFT_CAP: |
| parts.append(f"✅ Собрано {total} кадров — даже с запасом. Учти: больше ~{SOFT_CAP} " |
| f"редко улучшает LoRA, а риск переобучения растёт. Можно смело " |
| f"оставить лучшие {SOFT_CAP}.") |
| else: |
| parts.append(f"✅ Готово: {total} кадров для обучения" + |
| (f" + {nh} контрольных" if nh else "") + ".") |
|
|
| warns = _slot_warnings(state["slot_report"]) |
| if warns: |
| parts.append("\nПросадки по слотам (датасет рабочий, но переснимете — будет лучше):") |
| parts.extend(warns) |
|
|
| if not state.get("had_holdout"): |
| parts.append("\n💡 Совет: залейте отдельное контрольное видео (любое — кружок, " |
| "селфи 15 сек) — отложу его кадры для финальной проверки LoRA.") |
|
|
| trimmed = state.get("trimmed") or [] |
| if trimmed: |
| lim = LIMIT_VIDEO_SEC |
| parts.append(f"\n✂️ Обрезано до {lim}с (дольше обработать не могу): " |
| f"{', '.join(trimmed)}. Взял первую минуту — этого обычно хватает." |
| if IS_SPACE else |
| f"\n✂️ Обрезано до {lim//60} мин: {', '.join(trimmed)}. " |
| f"Взял начало — остальное не обрабатывал.") |
|
|
| status = "\n".join(parts) |
| return (gallery_paths, zp, status, gr.update(visible=False), |
| gr.update(value=None), None) |
|
|
|
|
| def confirm_review(selected, state): |
| if not state: |
| raise gr.Error("Нет данных ревизии — запусти сбор заново") |
| return _finalize_build(state, selected, set(selected or [])) |
|
|
|
|
| with gr.Blocks(title="MakeFaces") as demo: |
| gr.Markdown("## MakeFaces · Покривляйся в камеру\n" |
| "Несколько коротких видео в разных условиях → разнообразный датасет " |
| "лиц для LoRA. Строй рожи, верти головой, меняй обстановку.") |
| if IS_SPACE: |
| gr.Markdown(f"⚠️ Облачная версия: видео от {MIN_VIDEO_SEC}с; длиннее " |
| f"{LIMIT_VIDEO_SEC}с обрежу до первой минуты.") |
|
|
| st = gr.State() |
|
|
| with gr.Row(): |
| with gr.Column(scale=1): |
| gr.Markdown("**Видео для обучения** (залей сколько сможешь — чем разнообразнее " |
| "условия, тем лучше). Перетащи файл или нажми на зону:") |
| vids, verdicts = [], [] |
| for key, label in SLOTS: |
| f = gr.File(label=label, height=80) |
| vinfo = gr.Textbox(label=None, interactive=False, lines=2, |
| show_label=False, container=False, |
| placeholder=f"{label}: файл не выбран") |
| f.change(inspect_upload, inputs=[f], outputs=[vinfo]) |
| vids.append(f) |
| verdicts.append(vinfo) |
| gr.Markdown("**Контрольное видео** (необязательно): любое отдельное видео с " |
| "вашим лицом — кружок из соцсетей, селфи на 15 секунд, что угодно. " |
| "В обучение не пойдёт, отложу для проверки результата.") |
| holdout = gr.File(label="Контрольное (holdout)", height=80) |
| holdout_info = gr.Textbox(label=None, interactive=False, lines=2, |
| show_label=False, container=False, |
| placeholder="Контрольное: файл не выбран") |
| holdout.change(inspect_upload, inputs=[holdout], outputs=[holdout_info]) |
| with gr.Accordion("Параметры", open=False): |
| fps = gr.Slider(0.5, 10, value=2, step=0.5, label="FPS нарезки") |
| target = gr.Slider(N_MIN, SOFT_CAP, value=TARGET_DEFAULT, step=1, |
| label="Цель по числу кадров") |
| size = gr.Slider(512, 1536, value=1024, step=128, label="Разрешение кропа") |
| min_sharp = gr.Slider(10, 300, value=80, step=10, label="Порог резкости") |
| min_face = gr.Slider(80, 600, value=200, step=20, label="Мин. размер лица (px)") |
| margin = gr.Slider(0, 1.5, value=0.6, step=0.1, label="Запас при кропе") |
| do_crop = gr.Checkbox(value=True, label="Кропать по лицу") |
| btn = gr.Button("Собрать датасет", variant="primary") |
|
|
| with gr.Column(scale=2): |
| status = gr.Markdown() |
| with gr.Group(visible=False) as review_block: |
| gr.Markdown("**Спорные кадры — кликни те, что оставить:**") |
| review_gallery = gr.Gallery(columns=5, height=300, object_fit="cover") |
| review_selected = gr.State([]) |
| confirm_btn = gr.Button("Подтвердить выбор", variant="primary") |
| gallery = gr.Gallery(label="Датасет (train)", columns=5, height=480) |
| download = gr.File(label="Скачать zip (train/ + holdout/)") |
|
|
| def _toggle(selected, evt: gr.SelectData, state): |
| if not state: |
| return selected |
| path = state["pending"][evt.index]["path"] |
| selected = list(selected or []) |
| if path in selected: |
| selected.remove(path) |
| else: |
| selected.append(path) |
| return selected |
|
|
| review_gallery.select(_toggle, [review_selected, st], [review_selected]) |
|
|
| btn.click( |
| build, |
| inputs=[*vids, holdout, fps, target, size, min_sharp, min_face, margin, do_crop], |
| outputs=[gallery, download, status, review_block, review_gallery, st], |
| ) |
| confirm_btn.click( |
| confirm_review, |
| inputs=[review_selected, st], |
| outputs=[gallery, download, status, review_block, review_gallery, st], |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| demo.launch() |
|
|