| import os |
| import shutil |
| import re |
| import json |
| import random |
| from collections import Counter |
| import numpy as np |
| import pandas as pd |
| import matplotlib.pyplot as plt |
| import tempfile |
| import zipfile |
|
|
|
|
| |
| |
| |
|
|
| |
def util_get_class_name_from_filename(filename):
    """Extract the class-name prefix from a filename of the form '<class>_NNN…'.

    Returns the shortest prefix that is followed by an underscore and at
    least three digits, or None when the filename has no such pattern.
    """
    matched = re.match(r'(.+?)_\d{3,}', filename)
    if matched is None:
        return None
    return matched.group(1)
|
|
def util_create_class_mapping(train_txt_path, log_capture):
    """Build a {class_id: class_name} mapping from an annotations file.

    Each line of the file is expected to be '<filename> <class_id>'. The
    class name is derived from the filename prefix via
    util_get_class_name_from_filename. Malformed lines (wrong field count or
    a non-integer id) are reported and skipped instead of aborting the run —
    the original raised an uncaught ValueError on a corrupt id.

    Returns the mapping dict, or None when train_txt_path does not exist.
    """
    class_mapping = {}
    try:
        with open(train_txt_path, 'r') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) != 2:
                    continue
                filename, class_id_str = parts
                try:
                    class_id = int(class_id_str)
                except ValueError:
                    # One corrupt annotation line should not kill the whole pass.
                    print(f"Warning: Skipping malformed line: {line.strip()}", file=log_capture)
                    continue
                if class_id not in class_mapping:
                    class_name = util_get_class_name_from_filename(filename)
                    if class_name:
                        class_mapping[class_id] = class_name
    except FileNotFoundError:
        print(f"Error: {train_txt_path} not found.", file=log_capture)
        return None
    return class_mapping
|
|
def util_process_dataset(annotations_path, source_dir, dest_dir, class_mapping, log_capture):
    """Move annotated images into per-class subdirectories of dest_dir.

    Reads '<filename> <class_id>' lines from annotations_path, resolves the
    class name through class_mapping, and moves each listed file from
    source_dir into dest_dir/<class_name>/. Missing files, unknown class
    ids, and malformed lines are reported to log_capture and skipped.
    """
    if not os.path.exists(annotations_path):
        print(f"Warning: Annotations file not found: {annotations_path}", file=log_capture)
        return
    if not os.path.exists(source_dir):
        print(f"Warning: Source directory not found: {source_dir}", file=log_capture)
        return
    with open(annotations_path, 'r') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) != 2:
                continue
            filename, class_id_str = parts
            try:
                class_id = int(class_id_str)
            except ValueError:
                # The original crashed with an uncaught ValueError here.
                print(f"Warning: Non-integer class ID '{class_id_str}' for file {filename}. Skipping.", file=log_capture)
                continue
            if class_id not in class_mapping:
                # Bug fix: the original logged the literal "(unknown)" even
                # though the offending filename is in scope on this line.
                print(f"Warning: Class ID {class_id} for file {filename} not in mapping.", file=log_capture)
                continue
            class_name = class_mapping[class_id]
            class_dir = os.path.join(dest_dir, class_name)
            os.makedirs(class_dir, exist_ok=True)
            source_path = os.path.join(source_dir, filename)
            dest_path = os.path.join(class_dir, filename)
            if os.path.exists(source_path):
                print(f"Moving {source_path} to {dest_path}", file=log_capture)
                shutil.move(source_path, dest_path)
            else:
                print(f"Warning: Source file not found: {source_path}", file=log_capture)
|
|
| |
def util_normalise_class_names(target_dir, log_capture):
    """Rename every immediate subdirectory of target_dir to its lowercase form.

    A two-step rename through a '<name>_temp_rename' staging path is used so
    case-only renames also succeed on case-insensitive filesystems. Targets
    already occupied by a different directory are skipped with a warning.
    All progress is reported to log_capture.
    """
    print(f"\nStarting normalisation in '{target_dir}'...", file=log_capture)
    try:
        entries = os.listdir(target_dir)
        class_dirs = [entry for entry in entries if os.path.isdir(os.path.join(target_dir, entry))]
        if not class_dirs:
            print("No subdirectories found to normalise.", file=log_capture)
            return
        for current in class_dirs:
            lowered = current.lower()
            if current == lowered:
                print(f"Skipping '{current}' as it is already normalised.", file=log_capture)
                continue
            src = os.path.join(target_dir, current)
            dst = os.path.join(target_dir, lowered)
            # A *different* directory already holding the target name wins.
            if os.path.exists(dst) and not os.path.samefile(src, dst):
                print(f"Warning: Cannot rename '{current}' to '{lowered}' because a different directory with that name already exists. Skipping.", file=log_capture)
                continue
            try:
                staging = os.path.join(target_dir, current + "_temp_rename")
                if os.path.exists(staging):
                    print(f"Warning: Temporary path '{staging}' already exists. Skipping rename for '{current}'.", file=log_capture)
                    continue
                os.rename(src, staging)
                os.rename(staging, dst)
                print(f"Renamed '{current}' to '{lowered}'.", file=log_capture)
            except OSError as err:
                print(f"Error renaming '{current}': {err}", file=log_capture)
        print("\nNormalisation complete.", file=log_capture)
    except Exception as err:
        print(f"An unexpected error occurred: {err}", file=log_capture)
|
|
| |
def util_lowercase_filenames(target_dir, log_capture):
    """Lowercase every filename under target_dir (recursively), in place.

    Renames go through a '<name>_temp_rename' staging path so case-only
    changes also work on case-insensitive filesystems. Collisions with
    existing different files are skipped with a warning; a summary count is
    written to log_capture.
    """
    seen = 0
    changed = 0
    for folder, _, names in os.walk(target_dir):
        for name in names:
            seen += 1
            lowered = name.lower()
            if name == lowered:
                continue
            src = os.path.join(folder, name)
            dst = os.path.join(folder, lowered)
            if os.path.exists(dst) and not os.path.samefile(src, dst):
                print(f"Warning: Cannot rename '{src}' to '{dst}'. Skipping.", file=log_capture)
                continue
            staging = os.path.join(folder, name + "_temp_rename")
            try:
                if os.path.exists(staging):
                    print(f"Warning: Temp path '{staging}' exists. Skipping '{name}'.", file=log_capture)
                    continue
                os.rename(src, staging)
                os.rename(staging, dst)
                print(f"Renamed '{os.path.basename(src)}' to '{os.path.basename(dst)}'", file=log_capture)
                changed += 1
            except OSError as err:
                print(f"Error renaming '{src}': {err}", file=log_capture)
    print(f"\nProcessed {seen} files, renamed {changed}.", file=log_capture)
|
|
def util_standardise_filenames(target_dir, to_lowercase, log_capture):
    """Rename files in each directory to sequential '<class>_<NNNN><ext>' names.

    The class prefix is the containing directory's basename (lowercased when
    to_lowercase is true, as is the extension); files are numbered in sorted
    filename order. A two-phase rename through '.tmp' names prevents a new
    name from clobbering a not-yet-moved original. Progress and a final
    count are written to log_capture.
    """
    renamed_total = 0
    for folder, _, names in os.walk(target_dir):
        if not names:
            continue
        prefix = os.path.basename(folder)
        if to_lowercase:
            prefix = prefix.lower()
        print(f"\nProcessing directory: {prefix}", file=log_capture)
        plan = []
        for index, name in enumerate(sorted(names), start=1):
            ext = os.path.splitext(name)[1]
            if to_lowercase:
                ext = ext.lower()
            plan.append({
                'old_path': os.path.join(folder, name),
                'new_path': os.path.join(folder, f"{prefix}_{index:04d}{ext}"),
            })
        # Phase 1: park every file under a temporary name.
        for entry in plan:
            entry['temp_path'] = entry['old_path'] + '.tmp'
            os.rename(entry['old_path'], entry['temp_path'])
        # Phase 2: move into the final sequential names.
        for entry in plan:
            os.rename(entry['temp_path'], entry['new_path'])
            print(f"Renamed '{os.path.basename(entry['old_path'])}' to '{os.path.basename(entry['new_path'])}'", file=log_capture)
            renamed_total += 1
    print(f"\nStandardised {renamed_total} files.", file=log_capture)
|
|
| |
def util_split_image_dataset(source_dir, output_dir, min_images_per_split, log_capture):
    """Split an image dataset into train/validation zip archives plus a manifest.

    Treats every leaf directory under source_dir (no subdirectories, at least
    one file) as a class, performs a per-class ~80/20 shuffled split (forcing
    at least min_images_per_split images into each side), and writes
    '<output_dir>-train.zip', '<output_dir>-validation.zip' and
    '<output_dir>-manifest.json' into './<output_dir>'. Classes with fewer
    than 2*min_images_per_split images are recorded as skipped in the
    manifest. All progress is reported to log_capture; any exception is
    caught and logged rather than propagated.

    NOTE(review): random.shuffle is unseeded, so splits differ between runs —
    confirm reproducibility is not required.
    """
    main_output_path = os.path.join(os.getcwd(), output_dir)
    os.makedirs(main_output_path, exist_ok=True)
    print(f"Created output directory: {main_output_path}", file=log_capture)

    try:
        # Stage both splits in throwaway directories; they only need to live
        # long enough to be zipped below.
        with tempfile.TemporaryDirectory(prefix="autotrain_split_train_") as train_dir, \
            tempfile.TemporaryDirectory(prefix="autotrain_split_val_") as validation_dir:

            # Leaf directories (no subdirs, non-empty) are the class folders.
            class_dirs = [r for r, d, f in os.walk(source_dir) if not d and f]
            image_extensions = {".jpg", ".jpeg", ".png", ".bmp", ".gif"}
            required_total = min_images_per_split * 2
            class_image_data = {cd: [f for f in os.listdir(cd) if os.path.splitext(f)[1].lower() in image_extensions] for cd in class_dirs}
            valid_class_count = sum(1 for images in class_image_data.values() if len(images) >= required_total)
            if valid_class_count < 2:
                print(f"Error: Dataset splitting requires at least 2 classes with >= {required_total} images each. Found {valid_class_count} valid classes.", file=log_capture)
                return

            manifest = {"included_classes": {}, "skipped_classes": {}}
            processed_class_names = set()
            for class_dir, images in class_image_data.items():
                # Disambiguate duplicate basenames (same class name at two
                # different paths) with a numeric suffix.
                base_class_name = os.path.basename(class_dir)
                class_name, counter = base_class_name, 1
                while class_name in processed_class_names:
                    class_name = f"{base_class_name}_{counter}"; counter += 1
                processed_class_names.add(class_name)
                if len(images) < required_total:
                    manifest["skipped_classes"][class_name] = {"count": len(images), "reason": f"Not enough images ({len(images)}), required {required_total}."}
                    continue
                random.shuffle(images)
                # 20% to validation, but never let either side drop below the
                # configured minimum.
                num_val_ratio = round(len(images) * 0.2)
                num_train_ratio = len(images) - num_val_ratio
                num_validation = num_val_ratio if num_val_ratio >= min_images_per_split and num_train_ratio >= min_images_per_split else min_images_per_split
                validation_images, train_images = images[:num_validation], images[num_validation:]
                manifest["included_classes"][class_name] = {"train": len(train_images), "validation": len(validation_images)}
                # Copy (not move) each split's images into its staging dir.
                for split_dir, split_images in [(train_dir, train_images), (validation_dir, validation_images)]:
                    split_class_dir = os.path.join(split_dir, class_name)
                    os.makedirs(split_class_dir, exist_ok=True)
                    for image in split_images:
                        shutil.copy(os.path.join(class_dir, image), os.path.join(split_class_dir, image))

            # Persist the split summary alongside the archives.
            manifest_filename = f"{output_dir}-manifest.json"
            manifest_path = os.path.join(main_output_path, manifest_filename)
            with open(manifest_path, "w") as f: json.dump(manifest, f, indent=4)
            print(f"Manifest saved to: {manifest_path}", file=log_capture)

            # make_archive appends '.zip' to the base name itself.
            train_zip_filename = f"{output_dir}-train"
            train_zip_path = os.path.join(main_output_path, train_zip_filename)
            shutil.make_archive(train_zip_path, 'zip', train_dir)
            print(f"Training data zip created: {train_zip_path}.zip", file=log_capture)

            validation_zip_filename = f"{output_dir}-validation"
            validation_zip_path = os.path.join(main_output_path, validation_zip_filename)
            shutil.make_archive(validation_zip_path, 'zip', validation_dir)
            print(f"Validation data zip created: {validation_zip_path}.zip", file=log_capture)

            print(f"\nDataset splitting and packaging complete. Files are in '{main_output_path}'.", file=log_capture)
    except Exception as e:
        print(f"An error occurred during dataset splitting: {e}", file=log_capture)
|
|
| |
def util_generate_manifest(directory, save_manifest, manifest_path, log_capture):
    """Print (and optionally save) a markdown-style listing of files under directory.

    Walks directory in sorted order, skipping common tool/VCS directories and
    transient file extensions, emitting one '- relative/path' line per file
    (always forward-slash separated). When save_manifest is true the listing
    is also written to manifest_path, and the manifest file itself is
    excluded from the listing.
    """
    skip_dirs = {'.git', '__pycache__', '.vscode', '.idea', 'node_modules', 'venv', '.venv'}
    skip_files = {os.path.basename(manifest_path)} if save_manifest else set()
    skip_exts = {'.pyc', '.zip', '.log', '.tmp', '.bak', '.swp'}
    entries = []
    try:
        for folder, subdirs, names in os.walk(directory, topdown=True):
            # Prune in place so os.walk never descends into ignored dirs.
            subdirs[:] = sorted(d for d in subdirs if d not in skip_dirs)
            for name in sorted(names):
                if name in skip_files:
                    continue
                if os.path.splitext(name)[1].lower() in skip_exts:
                    continue
                rel = os.path.relpath(os.path.join(folder, name), directory)
                entries.append(f"- {rel.replace(os.sep, '/')}\n")
        listing = "".join(entries)

        print("--- Manifest Content ---", file=log_capture)
        print(listing, file=log_capture)
        print("------------------------", file=log_capture)

        if save_manifest:
            with open(manifest_path, "w", encoding="utf-8") as f:
                f.write(listing)
            print(f"Manifest file created at: {manifest_path}", file=log_capture)
        else:
            print("Manifest generated but not saved.", file=log_capture)
    except Exception as err:
        print(f"An error occurred: {err}", file=log_capture)
|
|
| |
def util_get_class_from_line(line: str):
    """Return the top-level class directory from a manifest line like '- cls/img.jpg'.

    Returns None for lines containing no '/' (headers, blanks, separators).

    Bug fix: the original used lstrip('- '), which strips *every* leading
    '-' and ' ' character — a class name beginning with '-' was silently
    mangled. Only a single '- ' bullet prefix is removed now.
    """
    if '/' not in line:
        return None
    entry = line.strip()
    if entry.startswith('- '):
        entry = entry[2:]
    return entry.split('/')[0]
|
|
def util_analyse_balance(manifest_path):
    """Analyse class balance from a manifest file.

    Two formats are supported: the markdown '# Class Count Manifest' table
    (counts parsed from its '| name | count |' rows) and the plain
    '- class/file' listing (counted via util_get_class_from_line). Returns a
    (summary_text, figure) pair, where the figure is a bar chart of images
    per class — or (message, None) when no classes were found.

    Raises FileNotFoundError when manifest_path does not exist.
    """
    if not os.path.exists(manifest_path):
        raise FileNotFoundError(f"Error: Manifest file not found at '{manifest_path}'")
    with open(manifest_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    # Detect the markdown-table variant by its separator row or title line.
    table_format = any('|---|' in ln for ln in lines[:5])
    if not table_format and lines:
        table_format = lines[0].strip() == "# Class Count Manifest"
    class_counts = Counter()
    if table_format:
        for raw in lines:
            row = raw.strip()
            if not row.startswith('|'):
                continue
            cells = [cell.strip() for cell in row.split('|')]
            if len(cells) < 4:
                continue
            name, count_text = cells[1], cells[2]
            if name.lower() in ('class name', '---') or not count_text.isdigit():
                continue
            class_counts[name] = int(count_text)
    else:
        for raw in lines:
            name = util_get_class_from_line(raw)
            if name:
                class_counts[name] += 1
    if not class_counts:
        return "No classes found in the manifest file.", None
    counts = list(class_counts.values())
    lo, hi = min(counts), max(counts)
    summary = (
        f"Dataset Balance Analysis\n"
        f"=========================\n"
        f"Total classes: {len(class_counts)}\n"
        f"Total images: {sum(counts)}\n"
        f"Images per class:\n"
        f" - Minimum: {lo}\n"
        f" - Maximum: {hi}\n"
        f" - Average: {np.mean(counts):.2f}\n"
        f" - Std Dev: {np.std(counts):.2f}\n"
        f"Imbalance Ratio (Max/Min): {hi / lo:.2f}:1"
    )
    names_sorted = sorted(class_counts)
    fig, ax = plt.subplots(figsize=(20, 10))
    ax.bar(names_sorted, [class_counts[n] for n in names_sorted])
    ax.set_xlabel('Class')
    ax.set_ylabel('Number of Images')
    ax.set_title('Image Distribution Across Classes')
    plt.xticks(rotation=90, fontsize='small')
    plt.tight_layout()
    return summary, fig
|
|
| |
def util_count_classes(target_dir, save_to_manifest, manifest_path, log_capture):
    """Count files per immediate class subdirectory of target_dir.

    Prints one '<class>: <n> items' line per subdirectory to log_capture and,
    when save_to_manifest is true, writes the counts to manifest_path as a
    '# Class Count Manifest' markdown table.
    """
    if not os.path.isdir(target_dir):
        print(f"Error: Directory not found at '{target_dir}'", file=log_capture)
        return
    try:
        class_dirs = []
        for entry in os.listdir(target_dir):
            if os.path.isdir(os.path.join(target_dir, entry)):
                class_dirs.append(entry)
        if not class_dirs:
            print(f"No class subdirectories found in '{target_dir}'.", file=log_capture)
            return
        class_counts = Counter()
        for name in class_dirs:
            class_path = os.path.join(target_dir, name)
            class_counts[name] = sum(
                1 for item in os.listdir(class_path)
                if os.path.isfile(os.path.join(class_path, item))
            )
        sorted_counts = sorted(class_counts.items())
        print(f"Found {len(class_dirs)} classes.", file=log_capture)
        print("-" * 20, file=log_capture)
        for class_name, count in sorted_counts:
            print(f"{class_name}: {count} items", file=log_capture)
        print("-" * 20, file=log_capture)
        if save_to_manifest:
            with open(manifest_path, 'w') as f:
                f.write(f"# Class Count Manifest\n\n**Total classes:** {len(class_dirs)}\n\n| Class Name | Item Count |\n|---|---|\n")
                for class_name, count in sorted_counts:
                    f.write(f"| {class_name} | {count} |\n")
            print(f"Manifest saved to {manifest_path}", file=log_capture)
    except OSError as e:
        print(f"Error accessing directory '{target_dir}': {e}", file=log_capture)
|
|
| |
def util_plot_training_metrics(json_path):
    """Build matplotlib figures for the metrics in a trainer_state-style JSON.

    Reads 'log_history' from json_path, splits it into training records
    (rows with 'loss') and evaluation records (rows with 'eval_loss'), and
    returns a dict of named figures in a fixed order (Loss, Accuracy,
    Learning Rate, Gradient Norm, F1 Scores, Precision, Recall, Epoch,
    Eval Runtime, Eval Samples/sec, Eval Steps/sec). Figures whose source
    columns are absent are still created, just without data series.

    Bug fix: the original did df[df['loss'].notna()] unconditionally, which
    raised KeyError when a run had no training (or no eval) records, even
    though every later use was guarded with `if col in frame`. Missing
    columns now fall back to an empty frame.

    Raises ValueError when 'log_history' is missing or empty.
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    df = pd.DataFrame(data.get('log_history', []))
    if df.empty:
        raise ValueError("No 'log_history' found.")
    train_df = df[df['loss'].notna()].copy() if 'loss' in df.columns else df.head(0).copy()
    eval_df = df[df['eval_loss'].notna()].copy() if 'eval_loss' in df.columns else df.head(0).copy()
    figures = {}

    def _add_figure(key, title, ylabel, series):
        """Create one step-vs-metric figure; series items are (frame, column, label, marker, color)."""
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.set_title(title)
        for frame, column, label, marker, color in series:
            if column in frame:
                # color=None lets matplotlib pick from the default cycle,
                # matching a plot() call with no color argument.
                ax.plot(frame['step'], frame[column], label=label, marker=marker, color=color)
        ax.set_xlabel('Step')
        ax.set_ylabel(ylabel)
        ax.legend()
        ax.grid(True)
        figures[key] = fig

    _add_figure('Loss', 'Training vs. Evaluation Loss', 'Loss', [
        (train_df, 'loss', 'Training Loss', 'o', None),
        (eval_df, 'eval_loss', 'Evaluation Loss', 'x', None),
    ])
    _add_figure('Accuracy', 'Evaluation Accuracy', 'Accuracy', [
        (eval_df, 'eval_accuracy', 'Evaluation Accuracy', 'o', 'g'),
    ])
    _add_figure('Learning Rate', 'Learning Rate Schedule', 'Learning Rate', [
        (train_df, 'learning_rate', 'Learning Rate', 'o', 'r'),
    ])
    _add_figure('Gradient Norm', 'Gradient Norm', 'Gradient Norm', [
        (train_df, 'grad_norm', 'Grad Norm', 'o', 'purple'),
    ])
    _add_figure('F1 Scores', 'Evaluation F1 Scores', 'F1 Score', [
        (eval_df, 'eval_f1_macro', 'F1 Macro', 'o', None),
        (eval_df, 'eval_f1_micro', 'F1 Micro', 'x', None),
        (eval_df, 'eval_f1_weighted', 'F1 Weighted', 's', None),
    ])
    _add_figure('Precision', 'Evaluation Precision Scores', 'Precision', [
        (eval_df, 'eval_precision_macro', 'Precision Macro', 'o', None),
        (eval_df, 'eval_precision_micro', 'Precision Micro', 'x', None),
        (eval_df, 'eval_precision_weighted', 'Precision Weighted', 's', None),
    ])
    _add_figure('Recall', 'Evaluation Recall Scores', 'Recall', [
        (eval_df, 'eval_recall_macro', 'Recall Macro', 'o', None),
        (eval_df, 'eval_recall_micro', 'Recall Micro', 'x', None),
        (eval_df, 'eval_recall_weighted', 'Recall Weighted', 's', None),
    ])
    # Epoch progression plots against the full history (train and eval rows),
    # deduplicated per step, so it does not fit the _add_figure shape.
    fig_epoch, ax = plt.subplots(figsize=(10, 6))
    ax.set_title('Epoch Progression')
    if 'epoch' in df:
        epoch_df = df[['step', 'epoch']].dropna().drop_duplicates('step').sort_values('step')
        ax.plot(epoch_df['step'], epoch_df['epoch'], label='Epoch', marker='.')
    ax.set_xlabel('Step')
    ax.set_ylabel('Epoch')
    ax.legend()
    ax.grid(True)
    figures['Epoch'] = fig_epoch
    _add_figure('Eval Runtime', 'Evaluation Runtime', 'Runtime (s)', [
        (eval_df, 'eval_runtime', 'Eval Runtime', 'o', None),
    ])
    _add_figure('Eval Samples/sec', 'Evaluation Samples Per Second', 'Samples / Second', [
        (eval_df, 'eval_samples_per_second', 'Eval Samples/sec', 'o', None),
    ])
    _add_figure('Eval Steps/sec', 'Evaluation Steps Per Second', 'Steps / Second', [
        (eval_df, 'eval_steps_per_second', 'Eval Steps/sec', 'o', None),
    ])
    return figures
|
|