synthsenses-api / preprocessing /extract_frames.py
parina004
Initial HF deployment
e6efe7e
Raw
History Blame Contribute Delete
19.2 kB
## Step 1 — Frame Extraction for Model A (Synthetic Media Detection)
## HOW IT WORKS
## ─────────────
## Instead of feeding whole videos into our model, we break each video into
## individual frames and treat each frame as an image. This is standard practice
## because deep learning models like EfficientNet-B4 are trained on images, not
## video files.
## We sample at 1 frame per second (1 fps) and cap at 30 frames per video.
## This gives us up to 30 snapshot images from the first 30 seconds of each
## video — enough to capture the visual patterns we need without blowing up
## disk space. Each frame is resized to 224×224 pixels (the size this
## pipeline stores frames at; note that EfficientNet-B4's native input
## resolution is 380×380 — 224×224 is the EfficientNet-B0 default) and saved
## as a JPEG.
## WHY SPLIT AT VIDEO LEVEL
## ─────────────────────────
## We assign each WHOLE VIDEO to either train or test before extracting any
## frames. This is critical β€” if we split at frame level, frames from the same
## video would appear in both train and test, the model would just memorise the
## video and report inflated accuracy. Splitting at video level ensures the
## test set contains videos the model has genuinely never seen.
## OUTPUT STRUCTURE
## ─────────────────
## data/model_a_datasets/frames/
## train/
## real/
## celeb_real/ {video_id}/ frame_0001.jpg …
## youtube_real/ {video_id}/ …
## dfdc_part0/ {video_id}/ …
## dfdc_part1/ {video_id}/ …
## dfdc_part2/ {video_id}/ …
## faceforensics_real/{video_id}/ …
## pexels/ {video_id}/ …
## deepfake/
## celeb_synthesis/ {video_id}/ …
## dfdc_part0/ {video_id}/ …
## dfdc_part1/ {video_id}/ …
## dfdc_part2/ {video_id}/ …
## faceforensics_deepfake/ {video_id}/ …
## ai_generated/
## videocraft/ {video_id}/ …
## animatediff/ {video_id}/ …
## cogvideox/ {video_id}/ …
## runwayml/ {video_id}/ …
## stable_diffusion/ {video_id}/ …
## videopoet/ {video_id}/ …
## test/
## (same structure as train/)
## manifest.csv β€” one row per video, written as we go (crash-safe)
## If the script is interrupted, just run it again — it picks up from where
## it left off via a checkpoint file. The checkpoint is deleted when done.
## Run with: uv run preprocessing/extract_frames.py
import csv
import json
import logging
import random
import re
import time
from pathlib import Path
import cv2
## log INFO and above, each line prefixed with a timestamp
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
log = logging.getLogger(__name__)
## project paths — ROOT is the parent of the directory that contains this file
ROOT = Path(__file__).parent.parent
DATA = ROOT / "data" / "model_a_datasets"
DEEPFAKE = DATA / "deepfake_datasets"  ## celeb_df_v2, dfdc and faceforensics are read from here
AI_DATA = DATA / "ai_datasets"  ## GenVideo is read from here
OUT = DATA / "frames"  ## extracted frames, manifest and checkpoint are written here
MANIFEST = OUT / "manifest.csv"
CKPT = OUT / ".extract_checkpoint.json"  ## resume state; main() deletes it once every video is done
## settings
SEED = 42  ## seeds the shuffle in assign_splits so the split is reproducible
TEST_RATIO = 0.20 ## 20% of videos go to test, 80% to train
MAX_FRAMES = 30 ## max frames per video (1 fps, so = 30 seconds)
IMG_SIZE = 224 ## resize every frame to 224×224
JPEG_QUALITY = 95  ## passed to cv2.imwrite as IMWRITE_JPEG_QUALITY
## batch processing — process this many videos, then pause before continuing.
## keeps the laptop from being pinned at 100% CPU for hours on end.
BATCH_SIZE = 50 ## videos per batch
BATCH_PAUSE = 8 ## seconds to rest between batches
## columns written to manifest.csv — one row per video
MANIFEST_FIELDS = [
    "video_id",
    "dataset_source",
    "label",
    "split",
    "frame_count",
    "frame_dir", ## relative to project root
    "original_video_path", ## relative to project root
]
def _rel(path: Path) -> str:
    ## manifest.csv stores every path relative to the project root, so the
    ## file stays valid when the project is moved or cloned somewhere else.
    ## (Path.relative_to raises ValueError for a path outside ROOT.)
    relative = path.relative_to(ROOT)
    return str(relative)
def _sanitize(name: str) -> str:
## some videocraft filenames have spaces or special chars β€” clean them up
## so they're safe to use as folder names on any OS
return re.sub(r"[^a-zA-Z0-9_\-]+", "_", name).strip("_")
## each collect_* function scans one dataset folder and returns a list of
## dicts, one per video, with keys: video_path, label, dataset_source, video_id
def collect_celeb_df_v2() -> list[dict]:
    ## Celeb-DF v2 encodes the label in the folder name, so one table drives
    ## the whole scan: (sub-folder, label, dataset_source), visited in order.
    ## A missing sub-folder is logged and skipped; the others are still read.
    base = DEEPFAKE / "celeb_df_v2"
    layout = (
        ("Celeb-real", "real", "celeb_real"),
        ("YouTube-real", "real", "youtube_real"),
        ("Celeb-synthesis", "deepfake", "celeb_synthesis"),
    )

    entries: list[dict] = []
    for folder, label, source in layout:
        video_dir = base / folder
        if not video_dir.exists():
            log.warning(f" celeb_df_v2/{folder} not found β€” skipping")
            continue
        ## sorted() keeps discovery order stable from run to run
        entries.extend(
            {
                "video_path": video,
                "label": label,
                "dataset_source": source,
                "video_id": video.stem,
            }
            for video in sorted(video_dir.glob("*.mp4"))
        )

    log.info(f" celeb_df_v2 : {len(entries):>5} videos")
    return entries
def collect_dfdc() -> list[dict]:
    ## DFDC doesn't encode labels in filenames — they're in metadata.json.
    ## Each part has its own metadata.json with {filename: {label: FAKE/REAL}}.
    ##
    ## Returns one dict per video (video_path, label, dataset_source,
    ## video_id), in metadata order. Skipped along the way:
    ##   * parts whose folder or metadata.json is missing (warning logged)
    ##   * metadata rows whose video file is not on disk (silently)
    ##   * rows whose label is neither FAKE nor REAL (counted in a warning) —
    ##     these must not be filed under "real" by default, or a missing or
    ##     malformed label would quietly pollute the real class.
    part_sources = {
        "dfdc_train_part_0": "dfdc_part0",
        "dfdc_train_part_1": "dfdc_part1",
        "dfdc_train_part_2": "dfdc_part2",
    }
    label_names = {"FAKE": "deepfake", "REAL": "real"}

    entries = []
    for part_folder, source_name in part_sources.items():
        part_dir = DEEPFAKE / "dfdc" / part_folder
        meta_file = part_dir / "metadata.json"
        if not part_dir.exists():
            log.warning(f" dfdc/{part_folder} not found β€” skipping")
            continue
        if not meta_file.exists():
            log.warning(f" dfdc/{part_folder}/metadata.json missing β€” skipping")
            continue

        ## JSON is UTF-8; don't depend on the platform's default encoding
        meta = json.loads(meta_file.read_text(encoding="utf-8"))

        part_count = 0
        unlabeled = 0
        for filename, info in meta.items():
            mp4 = part_dir / filename
            if not mp4.exists():
                continue
            raw_label = info.get("label") if isinstance(info, dict) else None
            label = label_names.get(str(raw_label).upper())
            if label is None:
                unlabeled += 1
                continue
            entries.append({
                "video_path": mp4,
                "label": label,
                "dataset_source": source_name,
                "video_id": mp4.stem,
            })
            part_count += 1

        if unlabeled:
            log.warning(
                f" {source_name}: skipped {unlabeled} videos with a missing "
                f"or unrecognised label in metadata.json"
            )
        log.info(f" {source_name:<22} : {part_count:>5} videos")

    log.info(f" dfdc total : {len(entries):>5} videos")
    return entries
def collect_faceforensics() -> list[dict]:
    ## FaceForensics++ labels come from folder structure alone: the original
    ## YouTube sequences are real, manipulated/Deepfakes are fake. Both are
    ## read from their c23/videos sub-folder.
    base = DEEPFAKE / "faceforensics"
    ## ((family, subset), label, dataset_source), visited in order
    layout = (
        (("original_sequences", "youtube"), "real", "faceforensics_real"),
        (("manipulated_sequences", "Deepfakes"), "deepfake", "faceforensics_deepfake"),
    )

    entries: list[dict] = []
    for (family, subset), label, source in layout:
        video_dir = base / family / subset / "c23" / "videos"
        if not video_dir.exists():
            log.warning(f" faceforensics {family} not found β€” skipping")
            continue
        for video in sorted(video_dir.glob("*.mp4")):
            entries.append(
                {
                    "video_path": video,
                    "label": label,
                    "dataset_source": source,
                    "video_id": video.stem,
                }
            )

    log.info(f" faceforensics : {len(entries):>5} videos")
    return entries
## maps GenVideo generator folder names (as found under GenVideo/deepaction/)
## to the clean short names used as dataset_source — and therefore as output
## folder names and manifest values. collect_genvideo scans them in this order.
_GENERATOR_MAP = {
    "BDAnimateDiffLightning": "animatediff",
    "CogVideoX5B": "cogvideox",
    "RunwayML": "runwayml",
    "StableDiffusion": "stable_diffusion",
    "VideoPoet": "videopoet",
}
def collect_genvideo() -> list[dict]:
    ## Gathers three kinds of GenVideo content, in this order:
    ##   1. videocraft — AI-generated clips in one flat folder
    ##   2. Pexels     — real stock videos, one a.mp4 per action folder
    ##   3. every generator in _GENERATOR_MAP — AI-generated variant clips
    ## Any folder that is missing is logged and skipped.
    base = AI_DATA / "GenVideo"
    deepaction = base / "deepaction"
    entries: list[dict] = []

    ## 1. videocraft: filenames can contain spaces / special characters, so
    ## the stem is sanitised before it is used as video_id
    videocraft_dir = base / "AIGVDet" / "T2V" / "videocraft_mp4"
    if not videocraft_dir.exists():
        log.warning(" GenVideo/AIGVDet/T2V/videocraft_mp4 not found β€” skipping")
    else:
        videocraft = [
            {
                "video_path": clip,
                "label": "ai_generated",
                "dataset_source": "videocraft",
                "video_id": _sanitize(clip.stem),
            }
            for clip in sorted(videocraft_dir.glob("*.mp4"))
        ]
        entries.extend(videocraft)
        log.info(f" videocraft : {len(videocraft):>5} videos")

    ## 2. Pexels: video_id is the action folder name (e.g. 000, 001, ...)
    pexels_dir = deepaction / "Pexels"
    if not pexels_dir.exists():
        log.warning(" GenVideo/deepaction/Pexels not found β€” skipping")
    else:
        pexels: list[dict] = []
        for action_dir in sorted(pexels_dir.iterdir()):
            clip = action_dir / "a.mp4"
            if action_dir.is_dir() and clip.exists():
                pexels.append(
                    {
                        "video_path": clip,
                        "label": "real",
                        "dataset_source": "pexels",
                        "video_id": action_dir.name,
                    }
                )
        entries.extend(pexels)
        log.info(f" pexels : {len(pexels):>5} videos")

    ## 3. AI generators: every *.mp4 inside an action folder is taken;
    ## video_id = {action_id}_{variant}, e.g. 000_b, 001_c
    for gen_folder, source_name in _GENERATOR_MAP.items():
        gen_dir = deepaction / gen_folder
        if not gen_dir.exists():
            log.warning(f" deepaction/{gen_folder} not found β€” skipping")
            continue
        generated: list[dict] = []
        for action_dir in sorted(gen_dir.iterdir()):
            if not action_dir.is_dir():
                continue
            generated.extend(
                {
                    "video_path": clip,
                    "label": "ai_generated",
                    "dataset_source": source_name,
                    "video_id": f"{action_dir.name}_{clip.stem}",
                }
                for clip in sorted(action_dir.glob("*.mp4"))
            )
        entries.extend(generated)
        log.info(f" {source_name:<22} : {len(generated):>5} videos")

    return entries
def assign_splits(entries: list[dict]) -> list[dict]:
    ## Stratified video-level split: each label class is shuffled and cut on
    ## its own, so train and test keep the same real / deepfake /
    ## ai_generated proportions. Adds a "split" key to every entry in place
    ## and returns the same list.
    groups: dict[str, list] = {}
    for entry in entries:
        groups.setdefault(entry["label"], []).append(entry)

    ## one seeded generator, labels visited in sorted order -> the same
    ## input always produces the same split
    rng = random.Random(SEED)
    log.info("Split (80% train / 20% test, stratified per label):")
    for label in sorted(groups):
        members = groups[label]
        rng.shuffle(members)
        ## every label contributes at least one video to test
        test_count = max(1, int(len(members) * TEST_RATIO))
        for entry in members[:test_count]:
            entry["split"] = "test"
        for entry in members[test_count:]:
            entry["split"] = "train"
        train_count = len(members) - test_count
        log.info(f" {label:<14} {train_count:>5} train / {test_count:>4} test")
    return entries
def extract_frames(entry: dict) -> int:
    ## Open the video, work out its FPS, then seek to the frame at second 0,
    ## second 1, second 2 ... up to MAX_FRAMES samples. Each sampled frame is
    ## resized to IMG_SIZE x IMG_SIZE and saved as
    ##   OUT/{split}/{label}/{dataset_source}/{video_id}/frame_NNNN.jpg
    ##
    ## entry must carry: video_path, split, label, dataset_source, video_id.
    ## Returns the number of frames that were actually written to disk
    ## (0 if the video cannot be opened or reports no frames).
    out_dir = (
        OUT
        / entry["split"]
        / entry["label"]
        / entry["dataset_source"]
        / entry["video_id"]
    )
    cap = cv2.VideoCapture(str(entry["video_path"]))
    try:
        if not cap.isOpened():
            log.warning(f" cannot open: {entry['video_path']}")
            return 0

        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            ## covers 0.0, negative and NaN readings from broken headers
            fps = 25.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            log.warning(f" video too short or unreadable: {entry['video_path']}")
            return 0

        ## a clip shorter than one second still has a frame at second 0, so
        ## always sample at least once
        n_seconds = min(max(int(total_frames / fps), 1), MAX_FRAMES)
        sample_indices = [
            int(s * fps)
            for s in range(n_seconds)
            if int(s * fps) < total_frames
        ]

        ## only create the folder once we know there is something to extract,
        ## so unreadable videos don't leave empty directories behind
        out_dir.mkdir(parents=True, exist_ok=True)

        saved = 0
        ## n follows the sample position, so a frame that fails to decode
        ## leaves a gap in the numbering instead of shifting later frames
        for n, frame_idx in enumerate(sample_indices, start=1):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ok, frame = cap.read()
            if not ok:
                continue
            resized = cv2.resize(frame, (IMG_SIZE, IMG_SIZE), interpolation=cv2.INTER_AREA)
            out_path = out_dir / f"frame_{n:04d}.jpg"
            ## imwrite reports failure through its return value rather than
            ## by raising, so only count frames it confirms were written
            if cv2.imwrite(str(out_path), resized, [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]):
                saved += 1
            else:
                log.warning(f" could not write: {out_path}")
        return saved
    finally:
        cap.release()
def load_checkpoint() -> set[str]:
    ## read back the uids recorded by save_checkpoint; no checkpoint file
    ## (first run, or the previous run finished) means nothing is done yet
    if not CKPT.exists():
        return set()
    payload = json.loads(CKPT.read_text())
    return set(payload.get("done", []))
def save_checkpoint(done: set[str]) -> None:
    ## Persist the set of finished uids as {"done": [...]}.
    ##
    ## The JSON goes to a sibling temp file first and is then swapped into
    ## place with a rename. Writing CKPT directly truncates it before the new
    ## content lands, so a crash mid-write would leave invalid JSON and the
    ## next run would fail while loading it. The temp file sits in the same
    ## directory so the rename never crosses filesystems.
    payload = json.dumps({"done": sorted(done)}, indent=2)
    tmp_path = CKPT.with_name(CKPT.name + ".tmp")
    tmp_path.write_text(payload)
    tmp_path.replace(CKPT)
def _manifest_row(entry: dict, frame_count: int) -> dict:
    ## build the manifest.csv row for one successfully extracted video
    frame_dir = (
        OUT / entry["split"] / entry["label"]
        / entry["dataset_source"] / entry["video_id"]
    )
    return {
        "video_id": entry["video_id"],
        "dataset_source": entry["dataset_source"],
        "label": entry["label"],
        "split": entry["split"],
        "frame_count": frame_count,
        "frame_dir": _rel(frame_dir),
        "original_video_path": _rel(entry["video_path"]),
    }


def main():
    ## Pipeline: discover videos -> assign train/test splits -> extract
    ## frames video by video, appending one manifest row per success and
    ## updating the checkpoint after every video so an interrupted run can
    ## resume. The checkpoint is deleted once every video has been handled.
    OUT.mkdir(parents=True, exist_ok=True)
    log.info("=" * 70)
    log.info("Discovering videos across all datasets ...")
    entries = []
    entries += collect_celeb_df_v2()
    entries += collect_dfdc()
    entries += collect_faceforensics()
    entries += collect_genvideo()
    log.info(f"Total videos found : {len(entries):>5}")
    log.info("")
    entries = assign_splits(entries)
    log.info("")

    done = load_checkpoint()
    all_uids = {f"{e['dataset_source']}__{e['video_id']}" for e in entries}
    if all_uids and all_uids.issubset(done):
        log.info("All videos already processed β€” nothing to do. Exiting.")
        return
    if done:
        log.info(f"Resuming β€” {len(done)} videos already done, skipping them.")

    ## Only append to the manifest when genuinely resuming. With no
    ## checkpoint every video is extracted again, so appending to a manifest
    ## left by an earlier (finished) run would duplicate all of its rows.
    resuming = bool(done) and MANIFEST.exists()

    total = len(entries)
    ## count only checkpointed uids that are still in the current video
    ## list, so stale ids cannot push the counter past the total
    n_skipped_at_start = len(done & all_uids)
    n_done = n_skipped_at_start
    n_success = 0
    n_failed = 0
    t_start = time.time()
    log.info("=" * 70)
    log.info("Extracting frames ...")
    log.info("")

    with open(MANIFEST, "a" if resuming else "w", newline="", encoding="utf-8") as manifest_file:
        writer = csv.DictWriter(manifest_file, fieldnames=MANIFEST_FIELDS)
        if not resuming:
            writer.writeheader()

        for entry in entries:
            uid = f"{entry['dataset_source']}__{entry['video_id']}"
            if uid in done:
                continue
            n_done += 1
            t0 = time.time()
            ## one bad video must not abort the whole run: log it and treat
            ## it like a video that produced no frames
            try:
                frame_count = extract_frames(entry)
            except Exception as exc:
                log.error(f" [{n_done}/{total}] ERROR β€” {uid}: {exc}")
                frame_count = 0

            if frame_count > 0:
                writer.writerow(_manifest_row(entry, frame_count))
                ## flush per row so the manifest survives a crash
                manifest_file.flush()
                n_success += 1
                elapsed = time.time() - t0
                log.info(
                    f" [{n_done:>5}/{total}] "
                    f"{entry['split']:<5} "
                    f"{entry['label']:<14} "
                    f"{entry['dataset_source']:<24} "
                    f"{entry['video_id']:<35} "
                    f"{frame_count:>2} frames [{elapsed:.1f}s]"
                )
            else:
                n_failed += 1
                log.warning(
                    f" [{n_done:>5}/{total}] SKIP (0 frames) "
                    f"{entry['dataset_source']} / {entry['video_id']}"
                )

            ## failed videos are checkpointed too, so they are not retried
            done.add(uid)
            save_checkpoint(done)

            ## after every BATCH_SIZE videos, take a short rest so the laptop
            ## has a chance to cool down before the next batch begins
            ## automatically — the script continues on its own.
            processed_this_run = n_done - n_skipped_at_start
            if processed_this_run % BATCH_SIZE == 0:
                elapsed_total = time.time() - t_start
                rate = elapsed_total / max(processed_this_run, 1)
                remaining = total - n_done
                eta_s = remaining * rate
                log.info("")
                log.info(
                    f" Batch complete. {n_done}/{total} done | "
                    f"remaining {remaining} | "
                    f"elapsed {elapsed_total / 60:.1f} min | "
                    f"ETA ~{eta_s / 60:.0f} min"
                )
                if remaining > 0:
                    log.info(f" Pausing {BATCH_PAUSE}s before next batch ...")
                    time.sleep(BATCH_PAUSE)
                    log.info(" Resuming ...")
                    log.info("")

    ## same set-based test as the early exit above; comparing len(done) with
    ## the entry count goes wrong when two entries share a uid or when the
    ## checkpoint holds ids of videos that no longer exist
    if all_uids.issubset(done):
        CKPT.unlink(missing_ok=True)
        log.info("Checkpoint removed β€” all videos processed.")

    log.info("")
    log.info("=" * 70)
    log.info("Frame extraction complete.")
    log.info(f" total videos : {total}")
    log.info(f" frames saved : {n_success} videos with β‰₯ 1 frame")
    log.info(f" skipped (empty) : {n_failed}")
    log.info(f" manifest : {_rel(MANIFEST)}")
    log.info("=" * 70)


if __name__ == "__main__":
    main()