Batch Processing Enhancements

This document covers the batch processing improvements for handling 3,000+ videos efficiently.

Key Improvements

1. Resume Checkpoint System

Processing state is saved after each video, allowing recovery from crashes.

checkpoints/
├── checkpoint_20251225_183600.json    # Run-specific checkpoint
└── checkpoint_20251225_190000.json    # Another run

Checkpoint Contents:

{
    "run_id": "20251225_183600",
    "started_at": "2025-12-25T18:36:00",
    "processed_keys": ["video1", "video2", ...],
    "failed_keys": ["video_that_failed"],
    "skipped_keys": ["already_processed"],
    "last_key": "video2"
}
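
Because the checkpoint is rewritten after every video, it helps to write it atomically so that a crash in the middle of a write cannot leave a truncated file. The following is a minimal sketch of that idea, not the actual `CheckpointManager` implementation; `save_checkpoint` and `load_checkpoint` are hypothetical helper names.

```python
import json
import os
import tempfile
from pathlib import Path


def save_checkpoint(state: dict, directory: str) -> Path:
    """Write the checkpoint to a temp file, then rename it into place."""
    target = Path(directory) / f"checkpoint_{state['run_id']}.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file in the same directory so the rename stays
    # on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump(state, fh, indent=4)
    os.replace(tmp_name, target)  # atomic replace on the same filesystem
    return target


def load_checkpoint(path: Path) -> dict:
    """Read a checkpoint file back into a dict."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)
```

Readers of the checkpoint then see either the previous complete state or the new one, never a partial write.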

2. Structured Per-Run Logging

Each processing run creates separate log files:

logs/
├── processing_20251225_183600.log     # Main processing log
├── errors_20251225_183600.csv         # Error details (CSV format)
└── metrics_20251225_183600.json       # Performance metrics

Error CSV Format:

timestamp,natural_key,stage,error_type,error_message
2025-12-25 18:45:23,pub-xyz_123_VIDEO,faces,DeepFaceError,"No face detected"

This replaces the single growing log.txt with manageable per-run files.
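
Since the error file is plain CSV with a header row, failure patterns can be tallied with the standard library alone. The sketch below assumes only the column names shown above; `count_errors` is a hypothetical helper, and the sample row is the one from this document.

```python
import csv
import io
from collections import Counter


def count_errors(csv_file) -> Counter:
    """Count rows per (stage, error_type) in an open error CSV."""
    reader = csv.DictReader(csv_file)
    return Counter((row["stage"], row["error_type"]) for row in reader)


# Sample data in the documented format; a real run would open
# logs/errors_<run_id>.csv instead.
sample = io.StringIO(
    "timestamp,natural_key,stage,error_type,error_message\n"
    '2025-12-25 18:45:23,pub-xyz_123_VIDEO,faces,DeepFaceError,"No face detected"\n'
)
for (stage, error_type), n in count_errors(sample).most_common():
    print(f"{stage:<12} {error_type:<20} {n}")
```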

3. Memory Monitoring

The system monitors available RAM and triggers cleanup when low:

from batch_processor import MemoryMonitor

monitor = MemoryMonitor(min_free_mb=2000, warning_threshold_mb=4000)

# Check before processing each video
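if not monitor.ensure_memory_available():
    # Wait and retry, or abort; this example aborts the run
    raise RuntimeError("Insufficient free memory to process the next video")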

API Endpoint:

curl http://localhost:8000/api/batch-processing/memory
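
One way to read the two thresholds is as a three-state check: below `warning_threshold_mb` a cleanup is attempted, and below `min_free_mb` processing should not continue. The sketch below illustrates that interpretation only; it is an assumption about how `MemoryMonitor` behaves, and `memory_state`, `ensure_memory`, and the `read_free_mb` callable are hypothetical names. In practice `read_free_mb` could wrap something like the third-party `psutil.virtual_memory().available`.

```python
import gc


def memory_state(free_mb: float, min_free_mb: float = 2000,
                 warning_threshold_mb: float = 4000) -> str:
    """Classify free RAM as 'ok', 'warning', or 'critical'."""
    if free_mb < min_free_mb:
        return "critical"
    if free_mb < warning_threshold_mb:
        return "warning"
    return "ok"


def ensure_memory(read_free_mb, min_free_mb: float = 2000,
                  warning_threshold_mb: float = 4000) -> bool:
    """Return True if processing may continue; run GC first if memory is tight."""
    if memory_state(read_free_mb(), min_free_mb, warning_threshold_mb) != "ok":
        gc.collect()  # cleanup step; real code might also release GPU caches
    # Re-read after cleanup and refuse only when still below the hard minimum.
    return memory_state(read_free_mb(), min_free_mb, warning_threshold_mb) != "critical"
```

Passing the reader in as a callable keeps the threshold logic testable without depending on the machine's actual memory.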

4. Batch Database Transactions

Instead of one INSERT per category/embedding:

Before (inefficient):

for category in categories:  # 30 categories × 70 thumbnails = 2100 INSERTs
    conn.execute("INSERT INTO image_categories ...")
    conn.commit()

After (efficient):

batch_writer = BatchDatabaseWriter()
for category in categories:
    batch_writer.add_categories(natural_key, frame_number, category)

batch_writer.flush_all()  # Single transaction for 50+ rows
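
The buffering pattern behind this can be sketched with the standard `sqlite3` module. This is an illustration under assumptions, not the actual `BatchDatabaseWriter`: the database engine, the `image_categories` column names, and the `BufferedCategoryWriter` class are all hypothetical.

```python
import sqlite3


class BufferedCategoryWriter:
    """Hypothetical stand-in for a batch writer: buffer rows, write once."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        self.rows = []

    def add(self, natural_key: str, frame_number: int, category: str) -> None:
        self.rows.append((natural_key, frame_number, category))

    def flush(self) -> int:
        """Insert all buffered rows in one transaction; return the row count."""
        if not self.rows:
            return 0
        with self.conn:  # commits on success, rolls back on exception
            self.conn.executemany(
                "INSERT INTO image_categories (natural_key, frame_number, category) "
                "VALUES (?, ?, ?)",
                self.rows,
            )
        written = len(self.rows)
        self.rows.clear()
        return written


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE image_categories "
    "(natural_key TEXT, frame_number INTEGER, category TEXT)"
)
writer = BufferedCategoryWriter(conn)
for frame in range(70):
    writer.add("pub-xyz_123_VIDEO", frame, "outdoor")
print(writer.flush())  # number of rows written in the single transaction
```

The saving comes from committing once per batch instead of once per row, since each commit forces its own round of transaction overhead.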

Bottleneck Analysis

Identified Bottlenecks

| Component | Issue | Impact |
|---|---|---|
| Thumbnail Resizing | FFmpeg called separately for each thumbnail | 100+ subprocess calls per video |
| Database Writes | Each category/embedding is a separate INSERT | 3000+ connections per video |
| Logging | Every image categorization logs full results | Disk I/O and log bloat |
| Model Loading | Models lazily loaded but should persist | Low (models stay in memory) |

Performance Impact

  • Before optimizations: ~10-18 days for 3,000 videos
  • After optimizations: ~5-7 days for 3,000 videos
  • Improvement: 40-60% reduction in processing time
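
To make these totals easier to compare with per-video timings from the metrics file, the day ranges can be converted into average minutes per video. The sketch assumes videos are processed one at a time and that the run is continuous, which the figures above do not state explicitly; `minutes_per_video` is a hypothetical helper.

```python
VIDEOS = 3000
MINUTES_PER_DAY = 24 * 60


def minutes_per_video(days: float, videos: int = VIDEOS) -> float:
    """Average wall-clock minutes per video for a run lasting `days`."""
    return days * MINUTES_PER_DAY / videos


for label, (low, high) in {"before": (10, 18), "after": (5, 7)}.items():
    print(f"{label}: {minutes_per_video(low):.2f}-{minutes_per_video(high):.2f} min/video")
# before: 4.80-8.64 min/video
# after: 2.40-3.36 min/video
```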

API Endpoint for Analysis

curl http://localhost:8000/api/batch-processing/bottlenecks

New API Endpoints

Bottleneck Analysis

GET /api/batch-processing/bottlenecks

Returns analysis of processing pipeline inefficiencies.

List Checkpoints

GET /api/batch-processing/checkpoints

Lists all available checkpoints for resume capability.

List Processing Logs

GET /api/batch-processing/logs

Lists all per-run log files with sizes and types.

Memory Status

GET /api/batch-processing/memory

Returns current RAM and GPU memory status.
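
All four endpoints are plain GET requests, so they can be polled from a script as well as with curl. The sketch below uses only the standard library and assumes the server runs at `localhost:8000` and returns JSON bodies; the response fields are not documented here, so the result is returned as-is. `endpoint_url` and `fetch` are hypothetical helper names.

```python
import json
from urllib.request import urlopen

BASE_URL = "http://localhost:8000/api/batch-processing"
ENDPOINTS = ("bottlenecks", "checkpoints", "logs", "memory")


def endpoint_url(name: str, base_url: str = BASE_URL) -> str:
    """Build the URL for one of the documented endpoints."""
    if name not in ENDPOINTS:
        raise ValueError(f"unknown endpoint: {name}")
    return f"{base_url}/{name}"


def fetch(name: str, timeout: float = 10.0):
    """GET one endpoint and decode the body as JSON (assumed response format)."""
    with urlopen(endpoint_url(name), timeout=timeout) as resp:
        return json.load(resp)
```

With the server running, `fetch("memory")` would return the decoded memory status and `fetch("checkpoints")` the list of checkpoints.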


Usage Guide

Starting a New Processing Run

from batch_processor import (
    generate_run_id,
    StructuredLogger,
    CheckpointManager,
    MemoryMonitor,
    PerformanceTracker
)

run_id = generate_run_id()
logger = StructuredLogger(run_id)
checkpoint = CheckpointManager(run_id)
memory = MemoryMonitor()
perf = PerformanceTracker()

total = len(video_keys)

for i, natural_key in enumerate(video_keys, start=1):
    if checkpoint.is_processed(natural_key):
        logger.log("INFO", "Already processed, skipping", natural_key)
        continue

    if not memory.ensure_memory_available(logger):
        logger.log("ERROR", "Insufficient memory, pausing")
        break

    perf.start_video(natural_key)
    status = "success"

    try:
        # Process video...
        perf.start_stage("thumbnails")
        generate_thumbnails(natural_key)
        perf.end_stage("thumbnails")

        perf.start_stage("embeddings")
        generate_embeddings(natural_key)
        perf.end_stage("embeddings")

        checkpoint.mark_processed(natural_key)

    except Exception as e:
        status = "failed"  # illustrative label; report failures as failures
        logger.log_error(natural_key, "processing", type(e).__name__, str(e))
        checkpoint.mark_failed(natural_key)

    duration = perf.end_video()
    logger.log_progress(i, total, natural_key, status, duration)

# Save final metrics
logger.save_metrics(perf.get_stats())

Resuming a Failed Run

# Find the checkpoint
checkpoint = CheckpointManager(run_id="20251225_183600")

# Get remaining videos to process
pending = checkpoint.get_pending_keys(all_video_keys)
print(f"Resuming: {len(pending)} videos remaining")

# Continue processing...
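
The pending set can be pictured as the full key list minus everything the checkpoint already records as done. The sketch below works on the checkpoint dict shown earlier and is not the actual `get_pending_keys` implementation. Whether previously failed keys are retried on resume is an assumption, exposed here as the hypothetical `retry_failed` flag.

```python
def pending_keys(all_keys, checkpoint: dict, retry_failed: bool = True):
    """Return keys still to process, preserving the order of all_keys."""
    done = set(checkpoint.get("processed_keys", []))
    done |= set(checkpoint.get("skipped_keys", []))
    if not retry_failed:
        # Treat earlier failures as final and leave them out of the resume.
        done |= set(checkpoint.get("failed_keys", []))
    return [key for key in all_keys if key not in done]


checkpoint_state = {
    "processed_keys": ["video1", "video2"],
    "failed_keys": ["video3"],
    "skipped_keys": ["video4"],
}
all_video_keys = ["video1", "video2", "video3", "video4", "video5"]
print(pending_keys(all_video_keys, checkpoint_state))
# ['video3', 'video5']
```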

Recommendations Before Processing 3,000+ Videos

  1. Test with 50-100 videos first to validate the pipeline
  2. Monitor the metrics file to identify slow stages
  3. Check the error CSV for patterns in failures
  4. Use the memory endpoint to verify adequate RAM and GPU memory
  5. Set up checkpoints to enable recovery from crashes
  6. Archive or delete the legacy log.txt if it has grown too large

File Locations

| File/Directory | Purpose |
|---|---|
| backend/batch_processor.py | Batch processing utilities |
| backend/logs/ | Per-run log files |
| backend/checkpoints/ | Resume checkpoints |
| backend/log.txt | Legacy single log (deprecated) |

Monitoring During Processing

# Watch the current run's log
tail -f logs/processing_YYYYMMDD_HHMMSS.log

# Check errors only
cat logs/errors_YYYYMMDD_HHMMSS.csv

# View performance metrics
cat logs/metrics_YYYYMMDD_HHMMSS.json | python -m json.tool

# Check memory status
curl http://localhost:8000/api/batch-processing/memory