import streamlit as st import os import sys import tempfile import zipfile import json import random import math import csv from pathlib import Path from datetime import datetime import time sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) script_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'scripts') sys.path.insert(0, script_dir) try: from pipeline import ( generate_counterfactuals, generate_base_scene, save_scene, render_scene, create_patched_render_script, IMAGE_COUNTERFACTUALS, NEGATIVE_COUNTERFACTUALS ) try: import sys script_dir = os.path.dirname(os.path.abspath(__file__)) scripts_path = os.path.join(script_dir, 'scripts') if scripts_path not in sys.path: sys.path.insert(0, scripts_path) from generate_questions_mapping import ( load_scene, generate_question_for_scene as _generate_question_for_scene_file, answer_question_for_scene, generate_mapping_with_questions ) except ImportError: def load_scene(scene_file): with open(scene_file, 'r') as f: return json.load(f) def answer_question_for_scene(question, scene): objects = scene.get('objects', []) return len(objects) _generate_question_for_scene_file = None generate_mapping_with_questions = None PIPELINE_AVAILABLE = True except ImportError as e: print(f"Warning: Error importing pipeline functions: {e}") PIPELINE_AVAILABLE = False answer_question_for_scene = None load_scene = None _generate_question_for_scene_file = None st.set_page_config( page_title="Counterfactual Image Generator", page_icon="🎨", layout="wide", initial_sidebar_state="expanded" ) st.markdown(""" """, unsafe_allow_html=True) def create_zip_file(output_dir, zip_path): with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(output_dir): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, output_dir) zipf.write(file_path, arcname) def generate_fallback_scene(num_objects, scene_idx): script_dir = os.path.dirname(os.path.abspath(__file__)) props_path = os.path.join(script_dir, 'data', 'properties.json') try: with open(props_path, 'r') as f: properties = json.load(f) except: properties = { 'shapes': {'cube': 'SmoothCube_v2', 'sphere': 'Sphere', 'cylinder': 'SmoothCylinder'}, 'colors': {'gray': [87, 87, 87], 'red': [173, 35, 35], 'blue': [42, 75, 215], 'green': [29, 105, 20], 'brown': [129, 74, 25], 'purple': [129, 38, 192], 'cyan': [41, 208, 208], 'yellow': [255, 238, 51]}, 'materials': {'rubber': 'Rubber', 'metal': 'MyMetal'}, 'sizes': {'large': 0.7, 'small': 0.35} } shapes = list(properties['shapes'].keys()) colors = list(properties['colors'].keys()) materials = list(properties['materials'].keys()) sizes = list(properties['sizes'].keys()) scene_num = scene_idx + 1 scene = { 'split': 'fallback', 'image_index': scene_num, 'image_filename': f'scene_{scene_num:04d}_original.png', 'objects': [], 'directions': { 'behind': (0.0, -1.0, 0.0), 'front': (0.0, 1.0, 0.0), 'left': (-1.0, 0.0, 0.0), 'right': (1.0, 0.0, 0.0), 'above': (0.0, 0.0, 1.0), 'below': (0.0, 0.0, -1.0) } } positions = [] min_dist = 0.25 for i in range(num_objects): max_attempts = 100 placed = False for attempt in range(max_attempts): x = random.uniform(-3, 3) y = random.uniform(-3, 3) z = random.uniform(0.35, 0.7) collision = False size = random.choice(sizes) r = properties['sizes'][size] for (px, py, pz, pr) in positions: dist = math.sqrt((x - px)**2 + (y - py)**2) if dist < (r + pr + min_dist): collision = True break if not collision: positions.append((x, y, z, r)) placed = True break if not placed: x = random.uniform(-3, 3) y = random.uniform(-3, 3) z = random.uniform(0.35, 0.7) size = random.choice(sizes) r = properties['sizes'][size] positions.append((x, y, z, r)) shape = random.choice(shapes) color = random.choice(colors) material = random.choice(materials) obj = { 'shape': shape, 'size': size, 'material': material, '3d_coords': [x, y, z], 'rotation': random.uniform(0, 360), 'pixel_coords': [0, 0, 0], 'color': color } scene['objects'].append(obj) return scene def generate_question_for_scene_dict(scene): if _generate_question_for_scene_file is None: objects = scene.get('objects', []) if len(objects) == 0: return "How many objects are in the scene?", {} colors = list(set(obj.get('color') for obj in objects if obj.get('color'))) shapes = list(set(obj.get('shape') for obj in objects if obj.get('shape'))) if colors: return f"How many {random.choice(colors)} objects are there?", {'color': random.choice(colors)} else: return "How many objects are in the scene?", {} with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as tmp_file: json.dump(scene, tmp_file) tmp_path = tmp_file.name try: question, params = _generate_question_for_scene_file(tmp_path) return question, params finally: try: os.unlink(tmp_path) except: pass def generate_counterfactual_scenes(num_scenes, num_objects, min_objects, max_objects, num_counterfactuals, cf_types, same_cf_type, min_change_score, max_cf_attempts, min_noise_level, output_dir, blender_path=None, use_gpu=0, samples=512, width=320, height=240, skip_render=False, generate_questions=False, semantic_only=False, negative_only=False): if not PIPELINE_AVAILABLE: return { 'success': False, 'error': 'Pipeline functions not available. Please ensure pipeline.py is accessible.' } scenes_dir = os.path.join(output_dir, 'scenes') images_dir = os.path.join(output_dir, 'images') os.makedirs(scenes_dir, exist_ok=True) os.makedirs(images_dir, exist_ok=True) script_dir = os.path.dirname(os.path.abspath(__file__)) cwd = os.getcwd() import shutil import time temp_output_dir = os.path.join(cwd, 'temp_output') if os.path.exists(temp_output_dir): for attempt in range(3): try: shutil.rmtree(temp_output_dir) break except Exception as e: if attempt < 2: time.sleep(0.3) else: print(f"Warning: Could not remove temp_output after 3 attempts: {e}") render_patched_path = os.path.join(cwd, 'render_images_patched.py') if os.path.exists(render_patched_path): for attempt in range(3): try: time.sleep(0.2) if os.path.exists(render_patched_path): os.remove(render_patched_path) break except Exception as e: if attempt < 2: time.sleep(0.3) else: print(f"Warning: Could not remove render_images_patched.py after 3 attempts: {e}") blender_available = False if blender_path is None: try: from pipeline import find_blender blender_path = find_blender() except: blender_path = 'blender' if blender_path and blender_path != 'blender': blender_available = os.path.exists(blender_path) else: try: import subprocess test_path = blender_path if blender_path and blender_path != 'blender' else 'blender' env = os.environ.copy() result = subprocess.run([test_path, '--version'], capture_output=True, timeout=5, env=env) blender_available = (result.returncode == 0) except: blender_available = False successful_scenes = 0 successful_renders = 0 error_messages = [] try: for scene_idx in range(num_scenes): if num_objects is not None: scene_num_objects = num_objects else: scene_num_objects = random.randint(min_objects, max_objects) base_scene = None if blender_available: scene_error = None for retry in range(3): try: import io import contextlib output_buffer = io.StringIO() with contextlib.redirect_stdout(output_buffer), contextlib.redirect_stderr(output_buffer): base_scene = generate_base_scene( scene_num_objects, blender_path, scene_idx ) blender_output = output_buffer.getvalue() if blender_output and retry == 2: st.text(f"Blender output for scene {scene_idx + 1} (last 1000 chars):") st.code(blender_output[-1000:] if len(blender_output) > 1000 else blender_output) if base_scene and len(base_scene.get('objects', [])) > 0: break elif base_scene is None: if retry == 2: scene_error = f"generate_base_scene returned None - Blender may have failed (check output above)" error_messages.append(f"Scene {scene_idx + 1}: {scene_error}") elif len(base_scene.get('objects', [])) == 0: if retry == 2: scene_error = f"Scene has 0 objects - Blender may have hit max_retries (check output above)" error_messages.append(f"Scene {scene_idx + 1}: {scene_error}") except FileNotFoundError as e: scene_error = f"Blender not found: {e}" error_messages.append(f"Scene {scene_idx + 1}: {scene_error}") blender_available = False break except Exception as e: import traceback scene_error = f"Error generating base scene: {str(e)}" print(f"Error generating base scene (retry {retry + 1}/3): {e}") print(f" Traceback: {traceback.format_exc()}") if retry == 2: full_error = f"Scene {scene_idx + 1}: {scene_error} (Blender path: {blender_path})" error_messages.append(full_error) blender_available = False continue else: print(f"Scene {scene_idx + 1} (Blender not available)...") base_scene = generate_fallback_scene(scene_num_objects, scene_idx) if not base_scene or len(base_scene.get('objects', [])) == 0: error_detail = f"Scene {scene_idx + 1}: Failed to generate" if blender_available: error_detail += f" (Blender was available at {blender_path} but returned empty scene)" else: error_detail += " (Blender not available, fallback scene also failed)" print(f"Failed to generate scene {scene_idx + 1}") print(f" Blender available: {blender_available}") print(f" Blender path: {blender_path}") print(f" Base scene: {base_scene is not None}") if base_scene: print(f" Objects in scene: {len(base_scene.get('objects', []))}") error_messages.append(error_detail) continue successful_scenes += 1 counterfactuals = generate_counterfactuals( base_scene, num_counterfactuals=num_counterfactuals, cf_types=cf_types, same_cf_type=same_cf_type, min_change_score=min_change_score, max_cf_attempts=max_cf_attempts, min_noise_level='light', semantic_only=semantic_only, negative_only=negative_only ) scene_num = scene_idx + 1 scene_prefix = f"scene_{scene_num:04d}" base_scene['cf_metadata'] = { 'variant': 'original', 'is_counterfactual': False, 'cf_index': None, 'cf_category': 'original', 'cf_type': None, 'cf_description': None, 'source_scene': scene_prefix, } original_scene_path = os.path.join(scenes_dir, f"{scene_prefix}_original.json") save_scene(base_scene, original_scene_path) for idx, cf in enumerate(counterfactuals): cf_name = f"cf{idx+1}" cf_scene = cf['scene'] cf_scene['cf_metadata'] = { 'variant': cf_name, 'is_counterfactual': True, 'cf_index': idx + 1, 'cf_category': cf.get('cf_category', 'unknown'), 'cf_type': cf.get('type', None), 'cf_description': cf.get('description', None), 'change_score': cf.get('change_score', None), 'change_attempts': cf.get('change_attempts', None), 'source_scene': scene_prefix, } cf_scene_path = os.path.join(scenes_dir, f"{scene_prefix}_{cf_name}.json") save_scene(cf_scene, cf_scene_path) render_success = 0 total_to_render = len(counterfactuals) + 1 if not skip_render: if blender_path and blender_available: original_image_path = os.path.join(images_dir, f"{scene_prefix}_original.png") if render_scene( blender_path, original_scene_path, original_image_path, use_gpu=use_gpu, samples=samples, width=width, height=height ): render_success += 1 for idx, cf in enumerate(counterfactuals): cf_name = f"cf{idx+1}" cf_scene_path = os.path.join(scenes_dir, f"{scene_prefix}_{cf_name}.json") cf_image_path = os.path.join(images_dir, f"{scene_prefix}_{cf_name}.png") if render_scene( blender_path, cf_scene_path, cf_image_path, use_gpu=use_gpu, samples=samples, width=width, height=height ): render_success += 1 if render_success == total_to_render: successful_renders += 1 else: print("Blender not available - skipping image rendering. Scene JSON files will still be generated.") csv_filename = 'image_mapping_with_questions.csv' if generate_questions else 'image_mapping.csv' csv_path = os.path.join(output_dir, csv_filename) try: if generate_mapping_with_questions is not None: generate_mapping_with_questions( run_dir=output_dir, csv_filename=csv_filename, generate_questions=generate_questions, with_links=False, strict_question_validation=True ) csv_created = os.path.exists(csv_path) else: csv_created = False except Exception: import traceback traceback.print_exc() csv_created = False scene_files = list(Path(scenes_dir).glob("*.json")) if os.path.exists(scenes_dir) else [] image_files = list(Path(images_dir).glob("*.png")) if os.path.exists(images_dir) else [] statistics = { 'scenes_generated': successful_scenes, 'scenes_rendered': successful_renders, 'total_scene_files': len(scene_files), 'total_image_files': len(image_files), 'num_counterfactuals': num_counterfactuals, 'cf_types_used': cf_types if cf_types else 'default', 'csv_created': csv_created, 'csv_path': csv_path if csv_created else None } script_dir = os.path.dirname(os.path.abspath(__file__)) cwd = os.getcwd() import shutil import time temp_output_dir = os.path.join(cwd, 'temp_output') if os.path.exists(temp_output_dir): for attempt in range(3): try: shutil.rmtree(temp_output_dir) break except Exception as e: if attempt < 2: time.sleep(0.3) else: print(f"Warning: Could not remove temp_output after 3 attempts: {e}") render_patched_path = os.path.join(cwd, 'render_images_patched.py') if os.path.exists(render_patched_path): for attempt in range(3): try: time.sleep(0.2) if os.path.exists(render_patched_path): os.remove(render_patched_path) break except Exception as e: if attempt < 2: time.sleep(0.3) else: print(f"Warning: Could not remove render_images_patched.py after 3 attempts: {e}") if successful_scenes == 0 and error_messages: error_summary = "Scenes failed. Common reasons:\n" error_summary += "- Blender is not installed or not in PATH\n" error_summary += "- Blender executable not found\n" error_summary += f"\nFirst error: {error_messages[0] if error_messages else 'Unknown error'}" return { 'success': False, 'error': error_summary, 'num_scenes': successful_scenes, 'output_dir': output_dir, 'error_messages': error_messages } return { 'success': True, 'num_scenes': successful_scenes, 'output_dir': output_dir, 'statistics': statistics, 'error_messages': error_messages if error_messages else None } except Exception as e: import traceback error_msg = f"Error: {str(e)}\n{traceback.format_exc()}" print(error_msg) return { 'success': False, 'error': error_msg, 'num_scenes': successful_scenes, 'output_dir': output_dir, 'error_messages': error_messages if 'error_messages' in locals() else [] } def main(): st.markdown('

Counterfactual Image Generator

', unsafe_allow_html=True) if 'output_dir' not in st.session_state: st.session_state.output_dir = None if 'generation_complete' not in st.session_state: st.session_state.generation_complete = False with st.sidebar: st.header("Configuration") st.subheader("Scene Settings") num_scenes = st.number_input( "Number of Scenes", min_value=1, max_value=10000, value=5, help="Number of scene sets to generate" ) use_fixed_objects = st.checkbox("Use Fixed Number of Objects", value=True) if use_fixed_objects: num_objects = st.number_input( "Number of Objects per Scene", min_value=1, max_value=15, value=5, help="Fixed number of objects per scene" ) min_objects = None max_objects = None else: num_objects = None min_objects = st.number_input( "Min Objects per Scene", min_value=1, max_value=15, value=3, help="Minimum objects per scene" ) max_objects = st.number_input( "Max Objects per Scene", min_value=1, max_value=15, value=7, help="Maximum objects per scene" ) if min_objects > max_objects: st.error("Min objects must be <= Max objects") return st.subheader("Counterfactual Settings") num_counterfactuals = st.number_input( "Number of Counterfactuals", min_value=1, max_value=10, value=2, help="Number of counterfactual variants per scene" ) st.markdown("**Counterfactual Types**") st.caption("Leave all unchecked to use default behavior (1 Image CF + 1 Negative CF)") semantic_only = st.checkbox( "Semantic only", value=False, help="Generate only Semantic/Image counterfactuals (Change Color, Add Object, etc.); no Negative CFs" ) negative_only = st.checkbox( "Negative only", value=False, help="Generate only Negative counterfactuals (Change Lighting, Add Noise, Occlusion Change, etc.); no Semantic CFs" ) same_cf_type = st.checkbox( "Same CF type for all", value=False, help="Use the same counterfactual type for every variant (first selected type, or one random if none selected)" ) with st.expander("Image CFs (change answers)", expanded=True): use_change_color = st.checkbox("Change Color", value=False) use_change_shape = st.checkbox("Change Shape", value=False) use_change_size = st.checkbox("Change Size", value=False) use_change_material = st.checkbox("Change Material", value=False) use_change_position = st.checkbox("Change Position", value=False) use_add_object = st.checkbox("Add Object", value=False) use_remove_object = st.checkbox("Remove Object", value=False) use_replace_object = st.checkbox("Replace Object", value=False) use_swap_attribute = st.checkbox("Swap Attribute", value=False) use_relational_flip = st.checkbox("Relational Flip", value=False) with st.expander("Negative CFs (don't change answers)", expanded=False): use_change_background = st.checkbox("Change Background", value=False) use_change_lighting = st.checkbox("Change Lighting", value=False) use_add_noise = st.checkbox("Add Noise", value=False) use_occlusion_change = st.checkbox("Occlusion Change", value=False) use_apply_fisheye = st.checkbox("Apply Fisheye", value=False) use_apply_blur = st.checkbox("Apply Blur", value=False) use_apply_vignette = st.checkbox("Apply Vignette", value=False) use_apply_chromatic_aberration = st.checkbox("Apply Chromatic Aberration", value=False) with st.expander("Advanced Settings", expanded=False): min_change_score = st.slider( "Minimum Change Score", min_value=0.5, max_value=5.0, value=1.0, step=0.1, help="Minimum heuristic change score for counterfactuals" ) max_cf_attempts = st.number_input( "Max CF Attempts", min_value=1, max_value=50, value=10, help="Maximum retries per counterfactual" ) min_noise_level = st.selectbox( "Min Noise Level (for add_noise CF)", options=['light', 'medium', 'heavy'], index=0, help="Minimum noise level when using add_noise counterfactual" ) st.markdown("---") st.markdown("**Rendering Settings**") use_gpu = st.checkbox("Use GPU Rendering", value=False) use_gpu_int = 1 if use_gpu else 0 samples = st.number_input( "Render Samples", min_value=64, max_value=2048, value=512, step=64, help="Cycles sampling rate (higher = better quality, slower)" ) image_width = st.number_input( "Image Width", min_value=160, max_value=1920, value=320, step=80 ) image_height = st.number_input( "Image Height", min_value=120, max_value=1080, value=240, step=60 ) st.markdown("**CSV Options**") generate_questions = st.checkbox( "Generate Questions in CSV", value=False, help="Include question and answer columns in the CSV file" ) cf_types = [] if use_change_color: cf_types.append('change_color') if use_change_shape: cf_types.append('change_shape') if use_change_size: cf_types.append('change_size') if use_change_material: cf_types.append('change_material') if use_change_position: cf_types.append('change_position') if use_add_object: cf_types.append('add_object') if use_remove_object: cf_types.append('remove_object') if use_replace_object: cf_types.append('replace_object') if use_swap_attribute: cf_types.append('swap_attribute') if use_relational_flip: cf_types.append('relational_flip') if use_change_background: cf_types.append('change_background') if use_change_lighting: cf_types.append('change_lighting') if use_add_noise: cf_types.append('add_noise') if use_occlusion_change: cf_types.append('occlusion_change') if use_apply_fisheye: cf_types.append('apply_fisheye') if use_apply_blur: cf_types.append('apply_blur') if use_apply_vignette: cf_types.append('apply_vignette') if use_apply_chromatic_aberration: cf_types.append('apply_chromatic_aberration') if not cf_types: cf_types = None col1, col2 = st.columns([2, 1]) with col1: st.header("Generate Counterfactual Images") if st.button("Generate Counterfactual", use_container_width=True, key="generate_button"): st.session_state.generation_complete = False st.session_state.generating = True if num_scenes < 1: st.error("Please specify at least 1 scene to generate.") return if use_fixed_objects and num_objects < 1: st.error("Please specify at least 1 object per scene.") return if not use_fixed_objects and (min_objects < 1 or max_objects < 1 or min_objects > max_objects): st.error("Invalid min/max objects configuration.") return if os.path.exists('/tmp'): base_dir = '/tmp' else: base_dir = tempfile.gettempdir() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_dir = os.path.join(base_dir, f"counterfactual_output_{timestamp}") os.makedirs(output_dir, exist_ok=True) st.session_state.output_dir = output_dir import shutil import time script_dir = os.path.dirname(os.path.abspath(__file__)) cwd = os.getcwd() temp_output_dir = os.path.join(cwd, 'temp_output') if os.path.exists(temp_output_dir): for attempt in range(3): try: shutil.rmtree(temp_output_dir) break except Exception as e: if attempt < 2: time.sleep(0.3) else: print(f"Warning: Could not remove temp_output after 3 attempts: {e}") render_patched_path = os.path.join(cwd, 'render_images_patched.py') if os.path.exists(render_patched_path): for attempt in range(3): try: time.sleep(0.2) if os.path.exists(render_patched_path): os.remove(render_patched_path) break except Exception as e: if attempt < 2: time.sleep(0.3) else: print(f"Warning: Could not remove render_images_patched.py after 3 attempts: {e}") try: from pipeline import create_patched_render_script create_patched_render_script() except Exception as e: st.warning(f"Could not create patched render script: {e}") params = { 'num_scenes': num_scenes, 'num_objects': num_objects, 'num_counterfactuals': num_counterfactuals, 'cf_types': cf_types if cf_types else None, 'same_cf_type': same_cf_type, 'min_change_score': min_change_score, 'max_cf_attempts': max_cf_attempts, 'width': image_width, 'height': image_height, 'output_dir': output_dir } progress_bar = st.progress(0) status_text = st.empty() try: if not PIPELINE_AVAILABLE: st.error("Pipeline functions are not available. Please check your installation.") return status_text.text("Initializing generator...") progress_bar.progress(10) if use_fixed_objects: status_text.text(f"Generating {num_scenes} scenes with {num_objects} objects each...") else: status_text.text(f"Generating {num_scenes} scenes with {min_objects}-{max_objects} objects each...") progress_bar.progress(30) result = generate_counterfactual_scenes( num_scenes=num_scenes, num_objects=num_objects, min_objects=min_objects, max_objects=max_objects, num_counterfactuals=num_counterfactuals, cf_types=cf_types, same_cf_type=same_cf_type, min_change_score=min_change_score, max_cf_attempts=max_cf_attempts, min_noise_level=min_noise_level, output_dir=output_dir, use_gpu=use_gpu_int, samples=samples, width=image_width, height=image_height, skip_render=False, generate_questions=generate_questions, semantic_only=semantic_only, negative_only=negative_only ) progress_bar.progress(80) status_text.text("Preparing output...") if result and result.get('success', False): num_scenes_generated = result.get('num_scenes', 0) if num_scenes_generated == 0: st.warning("No scenes were created. Blender is required and is not available in this environment.") st.info("**To use this application:**\n" "1. Run it locally with Blender installed\n" "2. Use the command-line `pipeline.py` script\n" "3. Install Blender and ensure it's in your system PATH") st.session_state.generation_complete = False else: st.session_state.generation_complete = True progress_bar.progress(100) status_text.text("Done.") st.success(f"Successfully generated {num_scenes_generated} scene sets!") st.info(f"Output directory: {output_dir}") if 'statistics' in result and result['statistics'].get('csv_created'): csv_path = result['statistics'].get('csv_path') if csv_path: st.success(f"CSV file created: `{os.path.basename(csv_path)}`") if 'statistics' in result: stats = result['statistics'] st.json(stats) else: error_msg = result.get('error', 'Unknown error occurred') if result else 'Failed' st.error(f"Generation failed: {error_msg}") if 'blender' in error_msg.lower() or 'Blender' in error_msg or result.get('num_scenes', 0) == 0: st.warning("**Important:** This application requires Blender to generate scenes. Blender is not available on Hugging Face Spaces.") st.info("**To use this application:**\n" "1. Run it locally with Blender installed\n" "2. Use the command-line `pipeline.py` script\n" "3. Install Blender and ensure it's in your system PATH") st.session_state.generation_complete = False st.session_state.generating = False except Exception as e: st.error(f"Error during generation: {str(e)}") st.exception(e) st.session_state.generation_complete = False st.session_state.generating = False progress_bar.progress(0) status_text.text("Failed") with col2: st.header("Output") if st.session_state.generation_complete and st.session_state.output_dir: output_dir = st.session_state.output_dir if os.path.exists(output_dir): images_dir = os.path.join(output_dir, 'images') scenes_dir = os.path.join(output_dir, 'scenes') scene_files = list(Path(scenes_dir).glob("*.json")) if os.path.exists(scenes_dir) else [] image_files = list(Path(images_dir).glob("*.png")) if os.path.exists(images_dir) else [] csv_files = list(Path(output_dir).rglob("*.csv")) st.success("Complete!") st.metric("Scene Files", len(scene_files)) st.metric("CSV Files", len(csv_files)) st.metric("Image Files", len(image_files)) if image_files: st.markdown("---") st.subheader("Generated Images") def get_counterfactual_type_from_scene(scene_file): try: with open(scene_file, 'r') as f: scene_data = json.load(f) cf_metadata = scene_data.get('cf_metadata', {}) cf_type = cf_metadata.get('cf_type', '') if cf_type: return cf_type.replace('_', ' ').title() except Exception as e: pass return "Counterfactual" scene_sets = {} for img_file in image_files: filename = img_file.name if filename.startswith('scene_'): parts = filename.replace('.png', '').split('_') if len(parts) >= 3: scene_num = parts[1] scene_type = parts[2] if scene_num not in scene_sets: scene_sets[scene_num] = {} scene_sets[scene_num][scene_type] = { 'image_path': str(img_file), 'filename': filename } sorted_scenes = sorted(scene_sets.keys())[:3] for scene_idx, scene_num in enumerate(sorted_scenes): scene_data = scene_sets[scene_num] if 'original' not in scene_data: continue st.markdown(f"### Scene {scene_num}") cols = st.columns(3) with cols[0]: original = scene_data['original'] st.image(original['image_path'], use_container_width=True, caption="Original") cf_count = 0 for cf_key in ['cf1', 'cf2']: if cf_key in scene_data and cf_count < 2: cf_data = scene_data[cf_key] cf_scene_file = os.path.join(scenes_dir, cf_data['filename'].replace('.png', '.json')) cf_type = get_counterfactual_type_from_scene(cf_scene_file) if os.path.exists(cf_scene_file) else f"Counterfactual {cf_count + 1}" with cols[cf_count + 1]: st.image(cf_data['image_path'], use_container_width=True, caption=cf_type) cf_count += 1 if scene_idx < len(sorted_scenes) - 1: st.markdown("---") st.markdown("---") st.subheader("Download Output") zip_filename = f"counterfactual_output_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip" zip_path = os.path.join(tempfile.gettempdir(), zip_filename) try: create_zip_file(output_dir, zip_path) file_size = os.path.getsize(zip_path) / (1024 * 1024) with open(zip_path, 'rb') as f: st.download_button( label=f"Download as ZIP ({file_size:.2f} MB)", data=f.read(), file_name=zip_filename, mime="application/zip", use_container_width=True ) with st.expander("Output Structure"): st.text(f"Output directory: {output_dir}") if scene_files: st.text(f"\nScene files: {len(scene_files)}") st.text("Sample files:") for f in scene_files[:5]: st.text(f" - {f.name}") if csv_files: st.text(f"\nCSV files: {len(csv_files)}") for f in csv_files: st.text(f" - {f.name}") if image_files: st.text(f"\nImage files: {len(image_files)}") st.text("Sample files:") for f in image_files[:5]: st.text(f" - {f.name}") except Exception as e: st.error(f"Error creating zip file: {str(e)}") else: st.warning("Output directory not found.") else: st.info("Configure parameters and click 'Generate Counterfactual' to start.") st.markdown("---") st.markdown( "
" "Counterfactual Image Tool | Built with Streamlit" "
", unsafe_allow_html=True ) if __name__ == "__main__": main()