Spaces:
Running
Running
File size: 5,151 Bytes
a8aea21 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
import os
import shutil
import logging
from pathlib import Path
from collections import defaultdict
import glob
# Logging setup: INFO-level messages prefixed with a HH:MM:SS timestamp
# and the level name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)
# Optional dependencies: each flag records whether the import succeeded so
# the rest of the script can fall back gracefully when a package is missing.
try:
    from PIL import Image
except ImportError:
    PIL_AVAILABLE = False
    logger.warning("β οΈ PIL (Pillow) not found. Image validation will be skipped (only file extension check).")
else:
    PIL_AVAILABLE = True

try:
    from tqdm import tqdm
except ImportError:
    # No progress bar; callers iterate the plain sequence instead.
    TQDM_AVAILABLE = False
else:
    TQDM_AVAILABLE = True
# Configuration
TARGET_COUNT = 1300  # Safety margin above 1000
# Category folders (relative to the raw/processed roots) to top up.
TARGET_CATEGORIES = ["workshops/coding", "workshops/design"]
# Directory layout: data/raw holds the source images, data/processed the
# curated copies.
DATA_ROOT = Path("data")
RAW_ROOT = DATA_ROOT.joinpath("raw")
PROCESSED_ROOT = DATA_ROOT.joinpath("processed")
def get_image_files(directory):
    """Recursively collect the image files below ``directory``.

    A file counts as an image when its suffix, compared case-insensitively,
    is one of ``.jpg``, ``.jpeg``, ``.png``, ``.webp`` or ``.bmp``.

    Args:
        directory: Folder to search (a ``Path`` or a path string).

    Returns:
        A sorted list of ``Path`` objects, one per matching regular file.
        The list is empty when ``directory`` does not exist or is not a
        directory.
    """
    suffixes = {".jpg", ".jpeg", ".png", ".webp", ".bmp"}
    root = Path(directory)
    if not root.is_dir():
        return []
    # Comparing the lowercased suffix (instead of globbing one pattern per
    # case variant) also finds mixed-case names such as "photo.Jpg" on
    # case-sensitive filesystems. is_file() skips directories whose name
    # happens to end in an image suffix.
    return sorted(
        path
        for path in root.rglob("*")
        if path.suffix.lower() in suffixes and path.is_file()
    )
def check_image_quality(file_path):
    """Run a basic quality check on a single image file.

    With Pillow available, the image is opened and rejected when either
    side is shorter than 256 pixels or when the width/height ratio lies
    outside the range 0.4 to 2.5. Without Pillow, only the size on disk is
    checked and files smaller than 5120 bytes are rejected.

    Args:
        file_path: ``Path`` of the file to inspect.

    Returns:
        A ``(passed, message)`` tuple. ``passed`` is ``True`` when the file
        is acceptable; ``message`` is ``"OK"``, ``"No PIL check"`` or a
        short description of why the file was rejected.
    """
    if not PIL_AVAILABLE:
        # Without Pillow the size on disk is the only available signal.
        try:
            size_bytes = file_path.stat().st_size
        except OSError as e:
            # A missing or unreadable file is reported as a failed check,
            # the same way the Pillow branch reports files it cannot open.
            return False, f"Unreadable file: {e}"
        if size_bytes < 5120:  # < 5KB is suspect
            return False, "File too small"
        return True, "No PIL check"

    # Only opening the file and reading its dimensions can fail; any such
    # failure means the file is rejected rather than the run aborted.
    try:
        with Image.open(file_path) as img:
            width, height = img.size
    except Exception as e:
        return False, f"Corrupt image: {str(e)}"

    if width < 256 or height < 256:
        return False, f"Low resolution: {width}x{height}"

    # Aspect ratio check (height is at least 256 here, so no zero division).
    aspect = width / height
    if aspect < 0.4 or aspect > 2.5:
        return False, f"Extreme aspect ratio: {aspect:.2f}"
    return True, "OK"
def process_category(relative_path):
    """Top up one category's processed folder from its raw folder.

    Copies images that pass ``check_image_quality`` from
    ``RAW_ROOT / relative_path`` into ``PROCESSED_ROOT / relative_path``
    until the processed folder holds ``TARGET_COUNT`` images or the raw
    candidates run out. Files are copied flat (by filename) into the
    processed folder, and a raw file is skipped when a file of the same
    name is already there.

    Args:
        relative_path: Category path relative to the raw/processed roots.

    Returns:
        None. Progress and results are reported through the module logger.
    """
    category_name = str(relative_path).replace("\\", "/")
    logger.info(f"π Checking category: {category_name}")
    raw_path = RAW_ROOT / relative_path
    processed_path = PROCESSED_ROOT / relative_path
    # Ensure processed directory exists
    processed_path.mkdir(parents=True, exist_ok=True)

    # 1. Count current Processed
    processed_files = get_image_files(processed_path)
    current_count = len(processed_files)
    # Filenames already occupied in the destination; grows as files are copied.
    taken_names = {f.name for f in processed_files}
    logger.info(f" Existing processed images: {current_count}")
    if current_count >= TARGET_COUNT:
        logger.info(f" ✅ Already met target of {TARGET_COUNT}. Skipping.")
        return
    needed = TARGET_COUNT - current_count
    logger.info(f" β οΈ Need {needed} more images.")

    # 2. Get Raw Candidates
    raw_files = get_image_files(raw_path)
    logger.info(f" Found {len(raw_files)} raw images available.")
    # Filter out files that are already in processed (by filename)
    candidates = [f for f in raw_files if f.name not in taken_names]
    logger.info(f" {len(candidates)} new unique candidates available to process.")
    if not candidates:
        logger.warning(" β No new candidates found in raw folder!")
        return

    # 3. Copy Candidates
    added_count = 0
    failed_check = 0
    failed_copy = 0
    # Progress bar setup
    iterator = tqdm(candidates, unit="img") if TQDM_AVAILABLE else candidates
    for src_file in iterator:
        if added_count >= needed:
            break
        # Raw images are collected recursively, so two subfolders can hold
        # the same filename. Copying both would overwrite the first copy
        # while still counting it as a new image, so later duplicates are
        # skipped once a file of that name has been copied.
        if src_file.name in taken_names:
            continue

        # Quality Check
        is_ok, msg = check_image_quality(src_file)
        if not is_ok:
            failed_check += 1
            logger.debug(f"Rejected {src_file.name}: {msg}")
            continue

        # Copy
        dst_file = processed_path / src_file.name
        try:
            shutil.copy2(src_file, dst_file)
        except OSError as e:
            # Best effort: one failed copy must not stop the whole category.
            failed_copy += 1
            logger.error(f"Failed to copy {src_file.name}: {e}")
        else:
            added_count += 1
            taken_names.add(src_file.name)

    logger.info(f" π Added {added_count} images.")
    if failed_check or failed_copy:
        logger.info(f" Skipped {failed_check} failing the quality check, {failed_copy} failing to copy.")
    logger.info(f" Final Count: {current_count + added_count}")
    if added_count < needed:
        logger.warning(f" Still {needed - added_count} short of the target of {TARGET_COUNT}.")
    logger.info("-" * 40)
def main():
    """Run the augmentation pass over every entry in ``TARGET_CATEGORIES``.

    Logs the resolved data root and the per-category target, then calls
    ``process_category`` once per configured category.
    """
    logger.info("π Starting targeted dataset augmentation...")
    logger.info(f"π Data Root: {DATA_ROOT.absolute()}")
    logger.info(f"π― Target: {TARGET_COUNT} images per category")

    # Categories are configured as strings; process_category is given Paths.
    for category_path in map(Path, TARGET_CATEGORIES):
        process_category(category_path)

    logger.info("β¨ Done.")


# Script entry point: only run when executed directly, not on import.
if __name__ == "__main__":
    main()
|