import json
import os
import random
import re
import shutil
import tempfile
import zipfile
from collections import Counter

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# #############################################################################
# CORE LOGIC FROM UTILITY SCRIPTS
# #############################################################################


# --- Shared private helpers ---
def _parse_annotation_line(line, log_capture):
    """Parse one ``<filename> <class_id>`` annotation line.

    Returns:
        A ``(filename, class_id)`` tuple, or ``None`` when the line does not
        have exactly two whitespace-separated fields or the class ID is not
        an integer (the latter is reported to ``log_capture``).
    """
    parts = line.strip().split()
    if len(parts) != 2:
        return None
    filename, class_id_str = parts
    try:
        return filename, int(class_id_str)
    except ValueError:
        print(f"Warning: Skipping line with non-integer class ID: {line.strip()!r}", file=log_capture)
        return None


def _rename_via_temp(old_path, temp_path, new_path):
    """Rename ``old_path`` to ``new_path`` by way of ``temp_path``.

    The intermediate hop lets a case-only rename succeed on case-insensitive
    filesystems. If the second hop fails, the first one is undone so the
    entry is not left stranded under its temporary name.

    Raises:
        OSError: If either rename fails.
    """
    os.rename(old_path, temp_path)
    try:
        os.rename(temp_path, new_path)
    except OSError:
        try:
            os.rename(temp_path, old_path)
        except OSError:
            # Rollback is best effort; the original failure is re-raised
            # below and reported by the caller either way.
            pass
        raise


# --- From organise_dataset.py ---
def util_get_class_name_from_filename(filename):
    """Return the class-name prefix of a filename such as ``cat_001.jpg``.

    The prefix is the shortest leading text followed by an underscore and at
    least three digits. Returns ``None`` when the filename has no such part.
    """
    match = re.match(r'(.+?)_\d{3,}', filename)
    return match.group(1) if match else None


def util_create_class_mapping(train_txt_path, log_capture):
    """Build a ``{class_id: class_name}`` mapping from an annotations file.

    Each usable line holds ``<filename> <class_id>``. The first filename seen
    for a class ID supplies the class name. Lines with a different field
    count are ignored; lines with a non-integer class ID are logged and
    skipped.

    Args:
        train_txt_path: Path to the annotations text file.
        log_capture: File-like object that receives log messages.

    Returns:
        The mapping, or ``None`` if the annotations file does not exist.
    """
    class_mapping = {}
    try:
        with open(train_txt_path, 'r') as f:
            for line in f:
                parsed = _parse_annotation_line(line, log_capture)
                if parsed is None:
                    continue
                filename, class_id = parsed
                if class_id in class_mapping:
                    continue
                class_name = util_get_class_name_from_filename(filename)
                if class_name:
                    class_mapping[class_id] = class_name
    except FileNotFoundError:
        print(f"Error: {train_txt_path} not found.", file=log_capture)
        return None
    return class_mapping


def util_process_dataset(annotations_path, source_dir, dest_dir, class_mapping, log_capture):
    """Move annotated files from ``source_dir`` into per-class folders.

    For every ``<filename> <class_id>`` line whose class ID is in
    ``class_mapping``, the class folder is created under ``dest_dir`` and the
    file is moved into it. Missing files and unmapped class IDs are logged.
    Lines with a non-integer class ID are logged and skipped.

    Args:
        annotations_path: Path to the annotations text file.
        source_dir: Directory that currently holds the files.
        dest_dir: Directory under which class folders are created.
        class_mapping: Mapping of integer class ID to class name.
        log_capture: File-like object that receives log messages.
    """
    if not os.path.exists(annotations_path):
        print(f"Warning: Annotations file not found: {annotations_path}", file=log_capture)
        return
    if not os.path.exists(source_dir):
        print(f"Warning: Source directory not found: {source_dir}", file=log_capture)
        return

    with open(annotations_path, 'r') as f:
        for line in f:
            parsed = _parse_annotation_line(line, log_capture)
            if parsed is None:
                continue
            filename, class_id = parsed
            if class_id not in class_mapping:
                print(f"Warning: Class ID {class_id} for file {filename} not in mapping.", file=log_capture)
                continue

            class_dir = os.path.join(dest_dir, class_mapping[class_id])
            os.makedirs(class_dir, exist_ok=True)
            source_path = os.path.join(source_dir, filename)
            dest_path = os.path.join(class_dir, filename)
            if os.path.exists(source_path):
                print(f"Moving {source_path} to {dest_path}", file=log_capture)
                shutil.move(source_path, dest_path)
            else:
                print(f"Warning: Source file not found: {source_path}", file=log_capture)


# --- From normalise_class_names.py ---
def util_normalise_class_names(target_dir, log_capture):
    """Rename every immediate subdirectory of ``target_dir`` to lowercase.

    A directory is skipped when it is already lowercase, when a different
    directory already owns the lowercase name, or when the temporary name
    used for the two-stage rename is taken. Rename failures are logged and
    do not stop the run.

    Args:
        target_dir: Directory whose subdirectories are normalised.
        log_capture: File-like object that receives log messages.
    """
    print(f"\nStarting normalisation in '{target_dir}'...", file=log_capture)
    try:
        # Snapshot the directory names first: the loop renames entries of target_dir.
        subdirectories = [d for d in os.listdir(target_dir) if os.path.isdir(os.path.join(target_dir, d))]
        if not subdirectories:
            print("No subdirectories found to normalise.", file=log_capture)
            return

        for old_name in subdirectories:
            new_name = old_name.lower()
            if old_name == new_name:
                print(f"Skipping '{old_name}' as it is already normalised.", file=log_capture)
                continue

            old_path = os.path.join(target_dir, old_name)
            new_path = os.path.join(target_dir, new_name)

            # On case-sensitive systems the lowercase name may belong to a different directory.
            if os.path.exists(new_path) and not os.path.samefile(old_path, new_path):
                print(f"Warning: Cannot rename '{old_name}' to '{new_name}' because a different directory with that name already exists. Skipping.", file=log_capture)
                continue

            temp_path = os.path.join(target_dir, old_name + "_temp_rename")
            if os.path.exists(temp_path):
                print(f"Warning: Temporary path '{temp_path}' already exists. Skipping rename for '{old_name}'.", file=log_capture)
                continue

            try:
                _rename_via_temp(old_path, temp_path, new_path)
                print(f"Renamed '{old_name}' to '{new_name}'.", file=log_capture)
            except OSError as e:
                print(f"Error renaming '{old_name}': {e}", file=log_capture)

        print("\nNormalisation complete.", file=log_capture)
    except Exception as e:
        print(f"An unexpected error occurred: {e}", file=log_capture)


# --- From normalise_image_names.py ---
def util_lowercase_filenames(target_dir, log_capture):
    """Rename every file under ``target_dir`` (recursively) to lowercase.

    A file is skipped when a different file already owns the lowercase name
    or when the temporary name used for the two-stage rename is taken.
    Rename failures are logged and do not stop the run. A summary line with
    the number of files seen and renamed is logged at the end.

    Args:
        target_dir: Root directory to walk.
        log_capture: File-like object that receives log messages.
    """
    file_count, renamed_count = 0, 0
    for dirpath, _, filenames in os.walk(target_dir):
        for old_name in filenames:
            file_count += 1
            new_name = old_name.lower()
            if old_name == new_name:
                continue

            old_path = os.path.join(dirpath, old_name)
            new_path = os.path.join(dirpath, new_name)
            if os.path.exists(new_path) and not os.path.samefile(old_path, new_path):
                print(f"Warning: Cannot rename '{old_path}' to '{new_path}'. Skipping.", file=log_capture)
                continue

            temp_path = os.path.join(dirpath, old_name + "_temp_rename")
            if os.path.exists(temp_path):
                print(f"Warning: Temp path '{temp_path}' exists. Skipping '{old_name}'.", file=log_capture)
                continue

            try:
                _rename_via_temp(old_path, temp_path, new_path)
                print(f"Renamed '{os.path.basename(old_path)}' to '{os.path.basename(new_path)}'", file=log_capture)
                renamed_count += 1
            except OSError as e:
                print(f"Error renaming '{old_path}': {e}", file=log_capture)
    print(f"\nProcessed {file_count} files, renamed {renamed_count}.", file=log_capture)


def util_standardise_filenames(target_dir, to_lowercase, log_capture):
    """Rename files in each directory to ``<dirname>_<NNNN><ext>``.

    Files are numbered from 1 in sorted-name order within their directory.
    Renaming happens in two passes (all files to temporary names, then to
    their final names) so that a final name never clobbers a file that has
    not been processed yet.

    Args:
        target_dir: Root directory to walk.
        to_lowercase: When true, lowercase the directory-name prefix and the
            file extension.
        log_capture: File-like object that receives log messages.
    """
    total_renamed = 0
    for dirpath, dirnames, filenames in os.walk(target_dir):
        if not filenames:
            continue

        # abspath drops a trailing separator, which would otherwise make the
        # directory name (and so the filename prefix) an empty string.
        class_name = os.path.basename(os.path.abspath(dirpath))
        if to_lowercase:
            class_name = class_name.lower()
        print(f"\nProcessing directory: {class_name}", file=log_capture)

        planned = []
        for i, old_name in enumerate(sorted(filenames)):
            _, extension = os.path.splitext(old_name)
            if to_lowercase:
                extension = extension.lower()
            planned.append((old_name, f"{class_name}_{i+1:04d}{extension}"))

        # Temporary names must not match any existing entry or any planned
        # final name; otherwise a rename would overwrite another file.
        taken = set(filenames) | set(dirnames) | {new_name for _, new_name in planned}
        rename_map = []
        for old_name, new_name in planned:
            temp_name = old_name + '.tmp'
            while temp_name in taken:
                temp_name += '_'
            taken.add(temp_name)
            rename_map.append({
                'old_path': os.path.join(dirpath, old_name),
                'temp_path': os.path.join(dirpath, temp_name),
                'new_path': os.path.join(dirpath, new_name),
            })

        for item in rename_map:
            os.rename(item['old_path'], item['temp_path'])
        for item in rename_map:
            os.rename(item['temp_path'], item['new_path'])
            print(f"Renamed '{os.path.basename(item['old_path'])}' to '{os.path.basename(item['new_path'])}'", file=log_capture)
            total_renamed += 1
    print(f"\nStandardised {total_renamed} files.", file=log_capture)


# --- From autotrain_dataset_splitter.py ---
def util_split_image_dataset(source_dir, output_dir, min_images_per_split, log_capture):
    """Split an image dataset into train/validation zips plus a JSON manifest.

    Every leaf directory under ``source_dir`` that contains files is treated
    as a class. A class needs at least ``2 * min_images_per_split`` images to
    be included; others are recorded as skipped in the manifest. Roughly 20%
    of a class goes to validation, unless that would leave either split
    below ``min_images_per_split``, in which case validation receives exactly
    ``min_images_per_split`` images. Images are shuffled with ``random``.

    Outputs are written inside ``output_dir`` (resolved against the current
    working directory) and are named after its final path component:
    ``<name>-manifest.json``, ``<name>-train.zip``, ``<name>-validation.zip``.

    Args:
        source_dir: Root directory of the dataset.
        output_dir: Output directory name or path.
        min_images_per_split: Minimum images each split must receive.
        log_capture: File-like object that receives log messages.
    """
    main_output_path = os.path.join(os.getcwd(), output_dir)
    os.makedirs(main_output_path, exist_ok=True)
    print(f"Created output directory: {main_output_path}", file=log_capture)

    # Use only the last path component for artifact names so that a nested
    # or absolute output_dir still places the files inside main_output_path.
    artifact_prefix = os.path.basename(os.path.normpath(output_dir)) if output_dir else output_dir

    try:
        with tempfile.TemporaryDirectory(prefix="autotrain_split_train_") as train_dir, \
                tempfile.TemporaryDirectory(prefix="autotrain_split_val_") as validation_dir:
            class_dirs = [r for r, d, f in os.walk(source_dir) if not d and f]
            image_extensions = {".jpg", ".jpeg", ".png", ".bmp", ".gif"}
            required_total = min_images_per_split * 2
            class_image_data = {
                cd: [f for f in os.listdir(cd) if os.path.splitext(f)[1].lower() in image_extensions]
                for cd in class_dirs
            }

            valid_class_count = sum(1 for images in class_image_data.values() if len(images) >= required_total)
            if valid_class_count < 2:
                print(f"Error: Dataset splitting requires at least 2 classes with >= {required_total} images each. Found {valid_class_count} valid classes.", file=log_capture)
                return

            manifest = {"included_classes": {}, "skipped_classes": {}}
            processed_class_names = set()
            for class_dir, images in class_image_data.items():
                # Leaf directories in different branches may share a name;
                # add a numeric suffix to keep class names unique.
                base_class_name = os.path.basename(class_dir)
                class_name, counter = base_class_name, 1
                while class_name in processed_class_names:
                    class_name = f"{base_class_name}_{counter}"
                    counter += 1
                processed_class_names.add(class_name)

                if len(images) < required_total:
                    manifest["skipped_classes"][class_name] = {"count": len(images), "reason": f"Not enough images ({len(images)}), required {required_total}."}
                    continue

                random.shuffle(images)
                num_val_ratio = round(len(images) * 0.2)
                num_train_ratio = len(images) - num_val_ratio
                if num_val_ratio >= min_images_per_split and num_train_ratio >= min_images_per_split:
                    num_validation = num_val_ratio
                else:
                    num_validation = min_images_per_split
                validation_images, train_images = images[:num_validation], images[num_validation:]
                manifest["included_classes"][class_name] = {"train": len(train_images), "validation": len(validation_images)}

                for split_dir, split_images in [(train_dir, train_images), (validation_dir, validation_images)]:
                    split_class_dir = os.path.join(split_dir, class_name)
                    os.makedirs(split_class_dir, exist_ok=True)
                    for image in split_images:
                        shutil.copy(os.path.join(class_dir, image), os.path.join(split_class_dir, image))

            manifest_path = os.path.join(main_output_path, f"{artifact_prefix}-manifest.json")
            with open(manifest_path, "w") as f:
                json.dump(manifest, f, indent=4)
            print(f"Manifest saved to: {manifest_path}", file=log_capture)

            train_zip_path = os.path.join(main_output_path, f"{artifact_prefix}-train")
            shutil.make_archive(train_zip_path, 'zip', train_dir)
            print(f"Training data zip created: {train_zip_path}.zip", file=log_capture)

            validation_zip_path = os.path.join(main_output_path, f"{artifact_prefix}-validation")
            shutil.make_archive(validation_zip_path, 'zip', validation_dir)
            print(f"Validation data zip created: {validation_zip_path}.zip", file=log_capture)

        print(f"\nDataset splitting and packaging complete. Files are in '{main_output_path}'.", file=log_capture)
    except Exception as e:
        print(f"An error occurred during dataset splitting: {e}", file=log_capture)


# --- From directory_manifest.py ---
def util_generate_manifest(directory, save_manifest, manifest_path, log_capture):
    """List the files under ``directory`` as ``- relative/path`` lines.

    Common tooling directories and a fixed set of extensions are ignored.
    Paths use ``/`` separators regardless of platform. The listing is always
    logged; it is also written to ``manifest_path`` when ``save_manifest`` is
    true, in which case a file with the manifest's own name is left out.

    Args:
        directory: Root directory to list.
        save_manifest: Whether to write the listing to ``manifest_path``.
        manifest_path: Destination of the manifest file.
        log_capture: File-like object that receives log messages.
    """
    ignored_dirs = {'.git', '__pycache__', '.vscode', '.idea', 'node_modules', 'venv', '.venv'}
    ignored_files = {os.path.basename(manifest_path)} if save_manifest else set()
    ignored_extensions = {'.pyc', '.zip', '.log', '.tmp', '.bak', '.swp'}
    manifest_content = []
    try:
        for root, dirs, files in os.walk(directory, topdown=True):
            # Assigning in place prunes ignored directories from the walk
            # and fixes the traversal order.
            dirs[:] = sorted(d for d in dirs if d not in ignored_dirs)
            files.sort()
            for filename in files:
                if filename in ignored_files or os.path.splitext(filename)[1].lower() in ignored_extensions:
                    continue
                relative_path = os.path.relpath(os.path.join(root, filename), directory).replace(os.sep, '/')
                manifest_content.append(f"- {relative_path}\n")

        manifest_string = "".join(manifest_content)
        print("--- Manifest Content ---", file=log_capture)
        print(manifest_string, file=log_capture)
        print("------------------------", file=log_capture)

        if save_manifest:
            with open(manifest_path, "w", encoding="utf-8") as f:
                f.write(manifest_string)
            print(f"Manifest file created at: {manifest_path}", file=log_capture)
        else:
            print("Manifest generated but not saved.", file=log_capture)
    except Exception as e:
        print(f"An error occurred: {e}", file=log_capture)


# --- From check_balance.py ---
def util_get_class_from_line(line: str):
    """Return the top-level folder named on a ``- class/file`` manifest line.

    Exactly one leading ``-`` bullet and the whitespace after it are removed,
    so a class name that itself begins with ``-`` is kept intact. Returns
    ``None`` when the line contains no ``/``.
    """
    if '/' not in line:
        return None
    entry = line.strip()
    if entry.startswith('-'):
        entry = entry[1:].lstrip(' ')
    return entry.split('/')[0]


def _parse_class_count_table(lines):
    """Read ``| class | count |`` rows into a Counter, skipping header rows."""
    class_counts = Counter()
    for raw_line in lines:
        row = raw_line.strip()
        if not row.startswith('|'):
            continue
        parts = [p.strip() for p in row.split('|')]
        if len(parts) < 4:
            continue
        class_name, count_str = parts[1], parts[2]
        if class_name.lower() not in ('class name', '---') and count_str.isdigit():
            class_counts[class_name] = int(count_str)
    return class_counts


def _format_balance_summary(class_counts):
    """Build the text report for a non-empty mapping of class name to count."""
    counts = list(class_counts.values())
    smallest, largest = min(counts), max(counts)
    # An empty class makes the ratio unbounded; report it as inf rather than
    # failing with ZeroDivisionError.
    imbalance_ratio = largest / smallest if smallest else float('inf')
    return (
        f"Dataset Balance Analysis\n"
        f"=========================\n"
        f"Total classes: {len(class_counts)}\n"
        f"Total images: {sum(counts)}\n"
        f"Images per class:\n"
        f" - Minimum: {smallest}\n"
        f" - Maximum: {largest}\n"
        f" - Average: {np.mean(counts):.2f}\n"
        f" - Std Dev: {np.std(counts):.2f}\n"
        f"Imbalance Ratio (Max/Min): {imbalance_ratio:.2f}:1"
    )


def util_analyse_balance(manifest_path):
    """Summarise class balance from a manifest file and plot it.

    Two manifest formats are accepted: the class-count table written by
    ``util_count_classes`` and the ``- class/file`` listing written by
    ``util_generate_manifest`` (one image counted per line).

    Args:
        manifest_path: Path to the manifest file (read as UTF-8).

    Returns:
        ``(summary, figure)``: a text report and a matplotlib bar chart, or
        ``("No classes found in the manifest file.", None)`` when nothing
        could be counted.

    Raises:
        FileNotFoundError: If ``manifest_path`` does not exist.
    """
    if not os.path.exists(manifest_path):
        raise FileNotFoundError(f"Error: Manifest file not found at '{manifest_path}'")
    with open(manifest_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    is_class_count_manifest = any('|---|' in line for line in lines[:5]) or (lines and lines[0].strip() == "# Class Count Manifest")
    if is_class_count_manifest:
        class_counts = _parse_class_count_table(lines)
    else:
        class_counts = Counter(c for line in lines if (c := util_get_class_from_line(line)))

    if not class_counts:
        return "No classes found in the manifest file.", None

    summary = _format_balance_summary(class_counts)

    sorted_classes = sorted(class_counts.keys())
    sorted_counts = [class_counts[c] for c in sorted_classes]
    fig, ax = plt.subplots(figsize=(20, 10))
    ax.bar(sorted_classes, sorted_counts)
    ax.set_xlabel('Class')
    ax.set_ylabel('Number of Images')
    ax.set_title('Image Distribution Across Classes')
    plt.xticks(rotation=90, fontsize='small')
    plt.tight_layout()
    return summary, fig


# --- From count_classes.py ---
def util_count_classes(target_dir, save_to_manifest, manifest_path, log_capture):
    """Count the files in each immediate subdirectory of ``target_dir``.

    Counts are logged in class-name order. When ``save_to_manifest`` is true
    they are also written to ``manifest_path`` as a Markdown table, encoded
    as UTF-8 so that ``util_analyse_balance`` can read it back.

    Args:
        target_dir: Directory whose subdirectories are the classes.
        save_to_manifest: Whether to write the Markdown manifest.
        manifest_path: Destination of the manifest file.
        log_capture: File-like object that receives log messages.
    """
    if not os.path.isdir(target_dir):
        print(f"Error: Directory not found at '{target_dir}'", file=log_capture)
        return
    try:
        class_dirs = [e for e in os.listdir(target_dir) if os.path.isdir(os.path.join(target_dir, e))]
        if not class_dirs:
            print(f"No class subdirectories found in '{target_dir}'.", file=log_capture)
            return

        class_counts = Counter({
            name: len([f for f in os.listdir(os.path.join(target_dir, name)) if os.path.isfile(os.path.join(target_dir, name, f))])
            for name in class_dirs
        })
        sorted_counts = sorted(class_counts.items())

        print(f"Found {len(class_dirs)} classes.", file=log_capture)
        print("-" * 20, file=log_capture)
        for class_name, count in sorted_counts:
            print(f"{class_name}: {count} items", file=log_capture)
        print("-" * 20, file=log_capture)

        if save_to_manifest:
            with open(manifest_path, 'w', encoding='utf-8') as f:
                f.write(f"# Class Count Manifest\n\n**Total classes:** {len(class_dirs)}\n\n| Class Name | Item Count |\n|---|---|\n")
                for class_name, count in sorted_counts:
                    f.write(f"| {class_name} | {count} |\n")
            print(f"Manifest saved to {manifest_path}", file=log_capture)
    except OSError as e:
        print(f"Error accessing directory '{target_dir}': {e}", file=log_capture)


# --- From plot_metrics.py ---
# One entry per figure: (result key, title, y-axis label, series).
# Each series is (frame name, column, legend label, extra plot keyword args).
_METRIC_PLOTS = (
    ('Loss', 'Training vs. Evaluation Loss', 'Loss', (
        ('train', 'loss', 'Training Loss', {'marker': 'o'}),
        ('eval', 'eval_loss', 'Evaluation Loss', {'marker': 'x'}),
    )),
    ('Accuracy', 'Evaluation Accuracy', 'Accuracy', (
        ('eval', 'eval_accuracy', 'Evaluation Accuracy', {'marker': 'o', 'color': 'g'}),
    )),
    ('Learning Rate', 'Learning Rate Schedule', 'Learning Rate', (
        ('train', 'learning_rate', 'Learning Rate', {'marker': 'o', 'color': 'r'}),
    )),
    ('Gradient Norm', 'Gradient Norm', 'Gradient Norm', (
        ('train', 'grad_norm', 'Grad Norm', {'marker': 'o', 'color': 'purple'}),
    )),
    ('F1 Scores', 'Evaluation F1 Scores', 'F1 Score', (
        ('eval', 'eval_f1_macro', 'F1 Macro', {'marker': 'o'}),
        ('eval', 'eval_f1_micro', 'F1 Micro', {'marker': 'x'}),
        ('eval', 'eval_f1_weighted', 'F1 Weighted', {'marker': 's'}),
    )),
    ('Precision', 'Evaluation Precision Scores', 'Precision', (
        ('eval', 'eval_precision_macro', 'Precision Macro', {'marker': 'o'}),
        ('eval', 'eval_precision_micro', 'Precision Micro', {'marker': 'x'}),
        ('eval', 'eval_precision_weighted', 'Precision Weighted', {'marker': 's'}),
    )),
    ('Recall', 'Evaluation Recall Scores', 'Recall', (
        ('eval', 'eval_recall_macro', 'Recall Macro', {'marker': 'o'}),
        ('eval', 'eval_recall_micro', 'Recall Micro', {'marker': 'x'}),
        ('eval', 'eval_recall_weighted', 'Recall Weighted', {'marker': 's'}),
    )),
    ('Epoch', 'Epoch Progression', 'Epoch', (
        ('epoch', 'epoch', 'Epoch', {'marker': '.'}),
    )),
    ('Eval Runtime', 'Evaluation Runtime', 'Runtime (s)', (
        ('eval', 'eval_runtime', 'Eval Runtime', {'marker': 'o'}),
    )),
    ('Eval Samples/sec', 'Evaluation Samples Per Second', 'Samples / Second', (
        ('eval', 'eval_samples_per_second', 'Eval Samples/sec', {'marker': 'o'}),
    )),
    ('Eval Steps/sec', 'Evaluation Steps Per Second', 'Steps / Second', (
        ('eval', 'eval_steps_per_second', 'Eval Steps/sec', {'marker': 'o'}),
    )),
)


def util_plot_training_metrics(json_path):
    """Plot training and evaluation metrics from a trainer-state JSON file.

    The file must contain a ``log_history`` list of records. Rows with a
    ``loss`` value are treated as training rows and rows with an
    ``eval_loss`` value as evaluation rows. A metric whose column is absent
    still gets a figure, but with nothing plotted on it.

    Args:
        json_path: Path to the JSON file (read as UTF-8).

    Returns:
        Dict mapping a figure name (``'Loss'``, ``'Accuracy'``, ...) to its
        matplotlib figure, in a fixed order.

    Raises:
        ValueError: If ``log_history`` is missing or empty, or if its records
            carry no ``step`` field.
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    df = pd.DataFrame(data.get('log_history', []))
    if df.empty:
        raise ValueError("No 'log_history' found.")
    if 'step' not in df:
        raise ValueError("No 'step' values found in 'log_history'.")

    # A log may hold only training rows or only evaluation rows; fall back
    # to an empty frame instead of indexing a column that does not exist.
    no_rows = df.iloc[0:0]
    frames = {
        'train': df[df['loss'].notna()].copy() if 'loss' in df else no_rows,
        'eval': df[df['eval_loss'].notna()].copy() if 'eval_loss' in df else no_rows,
        'epoch': (
            df[['step', 'epoch']].dropna().drop_duplicates('step').sort_values('step')
            if 'epoch' in df else no_rows
        ),
    }

    figures = {}
    for key, title, ylabel, series in _METRIC_PLOTS:
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.set_title(title)
        plotted = False
        for frame_name, column, label, style in series:
            frame = frames[frame_name]
            if column in frame:
                ax.plot(frame['step'], frame[column], label=label, **style)
                plotted = True
        ax.set_xlabel('Step')
        ax.set_ylabel(ylabel)
        if plotted:
            # legend() with no labelled artists only emits a warning.
            ax.legend()
        ax.grid(True)
        figures[key] = fig
    return figures