Spaces:
Sleeping
Sleeping
| """ | |
| ============================================================================= | |
| Image Rescaling -- Testing & Reporting Pipeline | |
| ------------------------------------------------ | |
| A modular pipeline that: | |
| 1. Loads a dataset of images (local folder or HuggingFace) | |
| 2. Runs a LIVE Matplotlib animation for the first image | |
| 3. Batch-processes the remaining images (headless, tqdm progress) | |
| 4. Generates a formatted statistical report via pandas | |
| Architecture | |
| ~~~~~~~~~~~~ | |
| load_dataset(source) -> list of (name, BGR ndarray) | |
| scale_image(image, factor, m) -> (result ndarray, elapsed_sec) | |
| live_demo_upscale(image, f) -> animated side-by-side dashboard | |
| generate_report(results) -> pandas table + summary stats | |
| The core interpolation algorithms are imported from | |
| src/bilinear_interpolation.py to avoid code duplication. | |
| Stack: Python 3, NumPy, OpenCV, Matplotlib, pandas, tqdm | |
| ============================================================================= | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import glob | |
| import numpy as np | |
| import cv2 | |
| import pandas as pd | |
| import matplotlib | |
| import matplotlib.pyplot as plt | |
| from tqdm import tqdm | |
| # ========================================================================= | |
| # TkAgg backend for interactive animation on Windows | |
| # ========================================================================= | |
| matplotlib.use("TkAgg") | |
| # ========================================================================= | |
| # Import core algorithms from the src package | |
| # ========================================================================= | |
| # We add the project root to sys.path so that `src.*` resolves correctly | |
| # regardless of the working directory used to invoke the script. | |
| PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) | |
| if PROJECT_ROOT not in sys.path: | |
| sys.path.insert(0, PROJECT_ROOT) | |
| from src.bilinear_interpolation import bilinear_interpolation, nearest_neighbor | |
| # ######################################################################### | |
| # 1. DATA INGESTION MODULE | |
| # ######################################################################### | |
| def load_dataset(source="data/Set14/Set14"): | |
| """ | |
| Load images from *source* and return a list of (name, BGR ndarray). | |
| Supported sources | |
| ----------------- | |
| * **Local directory path** (default) -- scans for .png/.jpg/.bmp files. | |
| Examples: | |
| load_dataset("data/Set14/Set14") | |
| load_dataset("data/Set5") | |
| * **HuggingFace dataset string** (e.g. "eugenesiow/Set14") -- | |
| requires the `datasets` and `Pillow` packages. Falls back to | |
| local folder if the import fails. | |
| Returns | |
| ------- | |
| list[tuple[str, np.ndarray]] | |
| Each element is (image_name, BGR_image_array). | |
| """ | |
| # ---- Resolve relative paths against the project root ---- | |
| abs_source = os.path.join(PROJECT_ROOT, source) if not os.path.isabs(source) else source | |
| # ------------------------------------------------------------------ | |
| # Path A: Local directory (fast, no network) | |
| # ------------------------------------------------------------------ | |
| if os.path.isdir(abs_source): | |
| print(f"[LOAD] Scanning local folder: {abs_source}") | |
| exts = ("*.png", "*.jpg", "*.jpeg", "*.bmp", "*.tif", "*.tiff") | |
| paths = [] | |
| for ext in exts: | |
| paths.extend(glob.glob(os.path.join(abs_source, ext))) | |
| paths.sort() | |
| if not paths: | |
| raise FileNotFoundError( | |
| f"No image files found in '{abs_source}'. " | |
| "Check the path or add images." | |
| ) | |
| images = [] | |
| for p in paths: | |
| img = cv2.imread(p) | |
| if img is not None: | |
| images.append((os.path.basename(p), img)) | |
| print(f"[LOAD] Loaded {len(images)} images from disk.\n") | |
| return images | |
| # ------------------------------------------------------------------ | |
| # Path B: HuggingFace `datasets` library (needs network + pip pkg) | |
| # ------------------------------------------------------------------ | |
| try: | |
| from datasets import load_dataset as hf_load | |
| print(f"[LOAD] Fetching HuggingFace dataset: {source} ...") | |
| ds = hf_load(source, split="validation") | |
| images = [] | |
| for i, sample in enumerate(ds): | |
| # HuggingFace image datasets usually expose a PIL Image | |
| # under the key "hr" (high-res) or "image". | |
| pil_img = sample.get("hr") or sample.get("image") | |
| if pil_img is None: | |
| continue | |
| arr = np.array(pil_img) | |
| # PIL gives RGB; OpenCV needs BGR | |
| if arr.ndim == 3 and arr.shape[2] == 3: | |
| arr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR) | |
| name = sample.get("filename", f"image_{i:03d}.png") | |
| images.append((name, arr)) | |
| print(f"[LOAD] Loaded {len(images)} images from HuggingFace.\n") | |
| return images | |
| except ImportError: | |
| raise ImportError( | |
| f"'{source}' is not a local directory and the `datasets` " | |
| "package is not installed.\n" | |
| " pip install datasets OR provide a local folder path." | |
| ) | |
| # ######################################################################### | |
| # 2. CORE SCALING WRAPPER | |
| # ######################################################################### | |
| def scale_image(image, factor, method="bilinear"): | |
| """ | |
| Upscale *image* by *factor* using the specified *method*. | |
| Parameters | |
| ---------- | |
| image : np.ndarray (BGR, uint8) | |
| factor : float (e.g. 4.0 for 4x upscale) | |
| method : str "bilinear" | "nearest" | |
| Returns | |
| ------- | |
| (result, elapsed) | |
| result : np.ndarray (BGR, uint8) | |
| elapsed : float (seconds) | |
| The wrapper delegates to the manual, loop-based implementations | |
| imported from src/bilinear_interpolation.py -- NOT cv2.resize. | |
| """ | |
| h, w = image.shape[:2] | |
| new_h, new_w = int(h * factor), int(w * factor) | |
| t0 = time.perf_counter() | |
| if method == "nearest": | |
| result = nearest_neighbor(image, new_h, new_w) | |
| elif method == "bilinear": | |
| result = bilinear_interpolation(image, new_h, new_w) | |
| else: | |
| raise ValueError(f"Unknown method '{method}'. Use 'bilinear' or 'nearest'.") | |
| elapsed = time.perf_counter() - t0 | |
| return result, elapsed | |
| # ######################################################################### | |
| # 3. LIVE DEMO MODULE (animated for image #1) | |
| # ######################################################################### | |
| def live_demo_upscale(image, factor, name="demo", pause_sec=0.05): | |
| """ | |
| Run a live, row-by-row animated Matplotlib dashboard for a single | |
| image upscale, showing three side-by-side panels: | |
| [Original] | [Nearest-Neighbour] | [Bilinear Interpolation] | |
| Animation mechanics | |
| ------------------- | |
| * plt.ion() enables non-blocking interactive mode. | |
| * Destination arrays start as blank (black) grids. | |
| * After computing every pixel in row *u* for BOTH algorithms, | |
| we push the updated arrays to the canvas via im.set_data() | |
| and call plt.pause() to let the human eye track progression. | |
| * For large images we only refresh every N rows to keep it | |
| watchable (~60 visual updates max). | |
| Parameters | |
| ---------- | |
| image : np.ndarray (BGR uint8) -- source image. | |
| factor : float -- upscale factor (e.g. 4.0). | |
| name : str -- display name for the title. | |
| pause_sec : float -- delay between row refreshes. | |
| """ | |
| src = image | |
| src_h, src_w = src.shape[:2] | |
| dst_h, dst_w = int(src_h * factor), int(src_w * factor) | |
| S_x = dst_h / src_h | |
| S_y = dst_w / src_w | |
| print("=" * 65) | |
| print(f" LIVE DEMO: {name}") | |
| print(f" {src_w}x{src_h} -> {dst_w}x{dst_h} ({factor}x upscale)") | |
| print("=" * 65) | |
| # ---- Blank destination canvases ---- | |
| nn_dst = np.zeros((dst_h, dst_w, 3), dtype=np.uint8) | |
| bil_dst = np.zeros((dst_h, dst_w, 3), dtype=np.float64) | |
| def to_rgb(img): | |
| return cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_BGR2RGB) | |
| # ================================================================ | |
| # Set up the live figure | |
| # ================================================================ | |
| plt.ion() | |
| fig, (ax_src, ax_nn, ax_bil) = plt.subplots(1, 3, figsize=(16, 5)) | |
| fig.suptitle(f"Live Demo: {name} ({factor}x upscale)", | |
| fontsize=14, fontweight="bold") | |
| fig.patch.set_facecolor("#1a1a2e") | |
| # -- Left: original (static) -- | |
| ax_src.imshow(to_rgb(src), interpolation="none") | |
| ax_src.set_title(f"Original\n({src_w}x{src_h})", fontsize=11, color="white") | |
| ax_src.axis("off") | |
| # -- Middle: nearest-neighbour (live) -- | |
| im_nn = ax_nn.imshow(to_rgb(nn_dst), interpolation="none", | |
| vmin=0, vmax=255) | |
| ax_nn.set_title(f"Nearest-Neighbour\n(row 0/{dst_h})", | |
| fontsize=11, color="white") | |
| ax_nn.axis("off") | |
| # -- Right: bilinear (live) -- | |
| im_bil = ax_bil.imshow(to_rgb(bil_dst), interpolation="none", | |
| vmin=0, vmax=255) | |
| ax_bil.set_title(f"Bilinear Interpolation\n(row 0/{dst_h})", | |
| fontsize=11, color="white") | |
| ax_bil.axis("off") | |
| for ax in (ax_src, ax_nn, ax_bil): | |
| ax.set_facecolor("#0f0f23") | |
| plt.tight_layout() | |
| fig.canvas.draw() | |
| fig.canvas.flush_events() | |
| plt.pause(0.3) | |
| # ================================================================ | |
| # Adaptive refresh rate | |
| # ================================================================ | |
| # For large images, refreshing every single row is too slow. | |
| # We cap at ~60 visual refreshes so the animation stays watchable. | |
| MAX_REFRESHES = 60 | |
| refresh_every = max(1, dst_h // MAX_REFRESHES) | |
| # ================================================================ | |
| # Row-by-row computation + animation loop | |
| # ================================================================ | |
| print(f"\n[ANIM] Painting {dst_h} rows live (refresh every {refresh_every})...") | |
| for u in tqdm(range(dst_h), desc="LiveDemo", unit="row", | |
| bar_format="{l_bar}{bar:40}{r_bar}"): | |
| # ---- Inverse mapping: row direction ---- | |
| x_i = u / S_x | |
| x1 = int(np.floor(x_i)) | |
| x2 = min(x1 + 1, src_h - 1) | |
| x1 = min(x1, src_h - 1) | |
| dx = x_i - int(np.floor(x_i)) | |
| for v in range(dst_w): | |
| # ---- Inverse mapping: col direction ---- | |
| y_i = v / S_y | |
| y1 = int(np.floor(y_i)) | |
| y2 = min(y1 + 1, src_w - 1) | |
| y1 = min(y1, src_w - 1) | |
| dy = y_i - int(np.floor(y_i)) | |
| # -- Nearest-neighbour -- | |
| x_near = min(int(round(x_i)), src_h - 1) | |
| y_near = min(int(round(y_i)), src_w - 1) | |
| nn_dst[u, v] = src[x_near, y_near] | |
| # -- Bilinear (area-weighted Lagrange) -- | |
| f00 = src[x1, y1].astype(np.float64) | |
| f10 = src[x2, y1].astype(np.float64) | |
| f01 = src[x1, y2].astype(np.float64) | |
| f11 = src[x2, y2].astype(np.float64) | |
| bil_dst[u, v] = ( | |
| (1 - dx) * (1 - dy) * f00 + | |
| dx * (1 - dy) * f10 + | |
| (1 - dx) * dy * f01 + | |
| dx * dy * f11 | |
| ) | |
| # ---- Refresh the canvas every N rows ---- | |
| if (u + 1) % refresh_every == 0 or u == dst_h - 1: | |
| im_nn.set_data(to_rgb(nn_dst)) | |
| im_bil.set_data(to_rgb(np.clip(bil_dst, 0, 255).astype(np.uint8))) | |
| ax_nn.set_title( | |
| f"Nearest-Neighbour\n(row {u+1}/{dst_h})", | |
| fontsize=11, color="white") | |
| ax_bil.set_title( | |
| f"Bilinear Interpolation\n(row {u+1}/{dst_h})", | |
| fontsize=11, color="white") | |
| fig.canvas.draw_idle() | |
| fig.canvas.flush_events() | |
| plt.pause(pause_sec) | |
| # ---- Mark completion ---- | |
| fig.suptitle(f"Demo Complete: {name} ({factor}x)", | |
| fontsize=14, fontweight="bold", color="lime") | |
| fig.canvas.draw_idle() | |
| fig.canvas.flush_events() | |
| save_path = os.path.join(PROJECT_ROOT, "output", "live_demo.png") | |
| fig.savefig(save_path, dpi=150, bbox_inches="tight", | |
| facecolor=fig.get_facecolor()) | |
| print(f"[PLOT] Live demo saved -> {save_path}") | |
| # Brief pause then close -- pipeline continues headless | |
| plt.pause(1.5) | |
| plt.close(fig) | |
| plt.ioff() | |
| return nn_dst, np.clip(bil_dst, 0, 255).astype(np.uint8) | |
| # ######################################################################### | |
| # 4. BATCH PROCESSING MODULE (images 2..N) | |
| # ######################################################################### | |
| def batch_process(images, factor=4.0): | |
| """ | |
| Upscale every image in *images* with both methods, recording metrics. | |
| Parameters | |
| ---------- | |
| images : list[tuple[str, np.ndarray]] | |
| Each element is (name, BGR array). | |
| factor : float | |
| Upscale factor (default 4x). | |
| Returns | |
| ------- | |
| list[dict] | |
| One dict per image with keys: | |
| image_name, orig_h, orig_w, dst_h, dst_w, | |
| time_nn, time_bilinear | |
| """ | |
| results = [] | |
| print("\n" + "=" * 65) | |
| print(f" BATCH PROCESSING: {len(images)} images @ {factor}x upscale") | |
| print("=" * 65 + "\n") | |
| for name, img in tqdm(images, desc="Batch", unit="img", | |
| bar_format="{l_bar}{bar:40}{r_bar}"): | |
| h, w = img.shape[:2] | |
| dst_h, dst_w = int(h * factor), int(w * factor) | |
| # ---- Nearest-neighbour (timed) ---- | |
| _, t_nn = scale_image(img, factor, method="nearest") | |
| # ---- Bilinear (timed) ---- | |
| _, t_bil = scale_image(img, factor, method="bilinear") | |
| results.append({ | |
| "image_name": name, | |
| "orig_h": h, | |
| "orig_w": w, | |
| "dst_h": dst_h, | |
| "dst_w": dst_w, | |
| "time_nn": round(t_nn, 4), | |
| "time_bilinear": round(t_bil, 4), | |
| }) | |
| return results | |
| # ######################################################################### | |
| # 5. REPORTING MODULE | |
| # ######################################################################### | |
| def generate_report(results, save_csv=True): | |
| """ | |
| Compile batch results into a pandas DataFrame, print a formatted | |
| ASCII/Markdown table, and display summary statistics. | |
| Parameters | |
| ---------- | |
| results : list[dict] -- output of batch_process(). | |
| save_csv : bool -- if True, save to output/report.csv. | |
| """ | |
| df = pd.DataFrame(results) | |
| # ---- Derived columns ---- | |
| df["orig_resolution"] = df["orig_w"].astype(str) + "x" + df["orig_h"].astype(str) | |
| df["dst_resolution"] = df["dst_w"].astype(str) + "x" + df["dst_h"].astype(str) | |
| df["speedup"] = (df["time_bilinear"] / df["time_nn"]).round(2) | |
| # ---- Select display columns ---- | |
| display = df[[ | |
| "image_name", "orig_resolution", "dst_resolution", | |
| "time_nn", "time_bilinear", "speedup" | |
| ]].copy() | |
| display.columns = [ | |
| "Image", "Original", "Target", | |
| "NN Time (s)", "Bilinear Time (s)", "Speedup (Bil/NN)" | |
| ] | |
| # ================================================================== | |
| # Print the report | |
| # ================================================================== | |
| print("\n") | |
| print("=" * 80) | |
| print(" RESCALING BENCHMARK REPORT") | |
| print("=" * 80) | |
| # Markdown-style table | |
| print(display.to_markdown(index=False, floatfmt=".4f")) | |
| # ---- Summary statistics ---- | |
| print("\n" + "-" * 80) | |
| print(" SUMMARY STATISTICS") | |
| print("-" * 80) | |
| print(f" Total images processed : {len(df)}") | |
| print(f" Upscale factor : {df['dst_h'].iloc[0] / df['orig_h'].iloc[0]:.0f}x") | |
| print(f"") | |
| print(f" Nearest-Neighbour (avg) : {df['time_nn'].mean():.4f} s") | |
| print(f" Nearest-Neighbour (total): {df['time_nn'].sum():.4f} s") | |
| print(f"") | |
| print(f" Bilinear (avg) : {df['time_bilinear'].mean():.4f} s") | |
| print(f" Bilinear (total) : {df['time_bilinear'].sum():.4f} s") | |
| print(f"") | |
| print(f" Avg speedup ratio : {df['speedup'].mean():.2f}x (Bilinear is slower)") | |
| print(f" Max single-image bilinear: {df['time_bilinear'].max():.4f} s " | |
| f"({df.loc[df['time_bilinear'].idxmax(), 'image_name']})") | |
| print("=" * 80) | |
| # ---- Save CSV ---- | |
| if save_csv: | |
| csv_path = os.path.join(PROJECT_ROOT, "output", "report.csv") | |
| os.makedirs(os.path.dirname(csv_path), exist_ok=True) | |
| df.to_csv(csv_path, index=False) | |
| print(f"\n[CSV] Report saved -> {csv_path}") | |
| return df | |
| # ######################################################################### | |
| # 6. MAIN ENTRY POINT | |
| # ######################################################################### | |
| def main(): | |
| """ | |
| Pipeline execution flow | |
| ~~~~~~~~~~~~~~~~~~~~~~~ | |
| 1. Load dataset from local folder (default: data/Set14/Set14). | |
| Override via CLI: python pipeline.py data/Set5 | |
| 2. Run the live animated demo on image #1. | |
| 3. Batch-process images #2..N with timing. | |
| 4. Generate & print the statistical report. | |
| """ | |
| # ---- Determine dataset source ---- | |
| default_source = "data/Set5" | |
| source = sys.argv[1] if len(sys.argv) > 1 else default_source | |
| UPSCALE_FACTOR = 4.0 | |
| # ================================================================== | |
| # Step 1 -- Load dataset | |
| # ================================================================== | |
| images = load_dataset(source) | |
| if not images: | |
| print("[ERROR] No images loaded. Exiting.") | |
| return | |
| # ================================================================== | |
| # Step 2 -- Live demo on image #1 | |
| # ================================================================== | |
| demo_name, demo_img = images[0] | |
| print(f"\n[PIPELINE] Live demo image: {demo_name}") | |
| live_demo_upscale(demo_img, factor=UPSCALE_FACTOR, | |
| name=demo_name, pause_sec=0.05) | |
| # ================================================================== | |
| # Step 3 -- Batch process remaining images | |
| # ================================================================== | |
| remaining = images[1:] | |
| if remaining: | |
| results = batch_process(remaining, factor=UPSCALE_FACTOR) | |
| else: | |
| print("[INFO] Only one image in dataset -- skipping batch.") | |
| results = [] | |
| # Include image #1 timing at the front of the report | |
| # (run it once more headless for fair timing) | |
| print(f"\n[PIPELINE] Timing image #1 ({demo_name}) for report...") | |
| _, t_nn = scale_image(demo_img, UPSCALE_FACTOR, method="nearest") | |
| _, t_bil = scale_image(demo_img, UPSCALE_FACTOR, method="bilinear") | |
| h, w = demo_img.shape[:2] | |
| results.insert(0, { | |
| "image_name": demo_name, | |
| "orig_h": h, | |
| "orig_w": w, | |
| "dst_h": int(h * UPSCALE_FACTOR), | |
| "dst_w": int(w * UPSCALE_FACTOR), | |
| "time_nn": round(t_nn, 4), | |
| "time_bilinear": round(t_bil, 4), | |
| }) | |
| # ================================================================== | |
| # Step 4 -- Generate report | |
| # ================================================================== | |
| generate_report(results) | |
| print("\n[DONE] Pipeline finished successfully.") | |
| # ========================================================================= | |
| if __name__ == "__main__": | |
| main() | |