import os import argparse import csv import json import random import re from pathlib import Path def find_latest_run(base_output_dir): if not os.path.exists(base_output_dir): return None subdirs = [d for d in os.listdir(base_output_dir) if os.path.isdir(os.path.join(base_output_dir, d))] if not subdirs: return None timestamped = [d for d in subdirs if re.match(r'^\d{8}_\d{6}$', d)] if timestamped: latest = sorted(timestamped)[-1] return os.path.join(base_output_dir, latest) dirs_with_time = [(d, os.path.getmtime(os.path.join(base_output_dir, d))) for d in subdirs] latest = max(dirs_with_time, key=lambda x: x[1])[0] return os.path.join(base_output_dir, latest) def find_scene_file(scenes_dir, image_filename): base_name = os.path.splitext(image_filename)[0] scene_file = os.path.join(scenes_dir, base_name + '.json') if os.path.exists(scene_file): return scene_file return None def load_scene(scene_file): with open(scene_file, 'r') as f: return json.load(f) RELATION_KEYS_TO_PHRASES = {'left': 'left of', 'right': 'right of', 'front': 'in front of', 'behind': 'behind'} PHRASES_TO_RELATION_KEYS = {'left of': 'left', 'right of': 'right', 'in front of': 'front', 'behind': 'behind'} DEFAULT_RELATIONS = ['left of', 'right of', 'in front of', 'behind'] # Token sets for parsing questions _COLORS = ['red', 'blue', 'green', 'brown', 'purple', 'cyan', 'yellow', 'gray', 'grey'] _SHAPES = ['cube', 'sphere', 'cylinder', 'cubes', 'spheres', 'cylinders'] _MATERIALS = ['metal', 'rubber', 'metals', 'rubbers'] _SIZES = ['small', 'large'] def _find_objects_matching(objects, color=None, shape=None, material=None, size=None): """Return list of object indices that match all specified attributes (None means any).""" out = [] for i, obj in enumerate(objects): if color is not None and (obj.get('color') or '').lower() != color: continue if shape is not None: s = (obj.get('shape') or '').lower() if s != shape and s != shape.rstrip('s') and s + 's' != shape: continue if material is not None and (obj.get('material') or '').lower() != material: continue if size is not None and (obj.get('size') or '').lower() != size: continue out.append(i) return out def _first_value_in_question(question_lower, values, strip_s=True): for v in values: if v in question_lower: return v.rstrip('s') if strip_s and v.endswith('s') else v return None def _objects_in_relation_to_reference(scene, relation_phrase, ref_color=None, ref_shape=None, ref_material=None, ref_size=None): """Return set of object indices that stand in relation_phrase to the reference object (e.g. 'left of' the red cube).""" objects = scene.get('objects', []) rel_key = PHRASES_TO_RELATION_KEYS.get(relation_phrase) if not rel_key: return set() rels = scene.get('relationships') or {} rel_list = rels.get(rel_key) if not rel_list or len(rel_list) != len(objects): return set() ref_indices = _find_objects_matching( objects, color=ref_color, shape=ref_shape, material=ref_material, size=ref_size ) if not ref_indices: return set() ref_idx = ref_indices[0] return set(rel_list[ref_idx]) def get_scene_properties(scene): objects = scene.get('objects', []) if not objects: return { 'colors': ['red', 'blue', 'green'], 'shapes': ['cube', 'sphere', 'cylinder'], 'materials': ['metal', 'rubber'], 'sizes': ['small', 'large'], 'relations': DEFAULT_RELATIONS } colors = list(set(obj.get('color') for obj in objects if obj.get('color'))) shapes = list(set(obj.get('shape') for obj in objects if obj.get('shape'))) materials = list(set(obj.get('material') for obj in objects if obj.get('material'))) sizes = list(set(obj.get('size') for obj in objects if obj.get('size'))) relationships = scene.get('relationships') or {} relations = [RELATION_KEYS_TO_PHRASES[k] for k in relationships if k in RELATION_KEYS_TO_PHRASES] if not relations: relations = DEFAULT_RELATIONS all_colors = ['gray', 'red', 'blue', 'green', 'brown', 'purple', 'cyan', 'yellow'] all_shapes = ['cube', 'sphere', 'cylinder'] all_materials = ['metal', 'rubber'] all_sizes = ['small', 'large'] return { 'colors': colors if colors else all_colors, 'shapes': shapes if shapes else all_shapes, 'materials': materials if materials else all_materials, 'sizes': sizes if sizes else all_sizes, 'relations': relations, 'all_colors': all_colors, 'all_shapes': all_shapes, 'all_materials': all_materials, 'all_sizes': all_sizes } IMAGE_CF_TYPES = { 'change_color', 'change_shape', 'change_size', 'change_material', 'change_position', 'add_object', 'remove_object', 'replace_object', 'swap_attribute', 'relational_flip' } NEGATIVE_CF_TYPES = { 'change_background', 'change_lighting', 'add_noise', 'apply_fisheye', 'apply_blur', 'apply_vignette', 'apply_chromatic_aberration', 'occlusion_change' } MAX_CF_ANSWER_RETRIES = 150 def get_cf_type_from_scene(scene): meta = scene.get('cf_metadata') or {} if not meta.get('is_counterfactual'): return None return meta.get('cf_type') def get_cf_description_from_scene(scene): meta = scene.get('cf_metadata') or {} if not meta.get('is_counterfactual'): return None return meta.get('cf_description') def get_change_details(original_scene, cf_scene): orig_objs = original_scene.get('objects', []) cf_objs = cf_scene.get('objects', []) if len(orig_objs) != len(cf_objs): return {'attribute': 'count', 'orig_count': len(orig_objs), 'cf_count': len(cf_objs)} attrs = ['color', 'shape', 'material', 'size'] for i, (o, c) in enumerate(zip(orig_objs, cf_objs)): for attr in attrs: ov = (o.get(attr) or '').lower().strip() cv = (c.get(attr) or '').lower().strip() if ov != cv: return {'attribute': attr, 'orig_val': ov or 'unknown', 'cf_val': cv or 'unknown', 'object_index': i} return None CF_COUNT_QUESTION_TEMPLATES = [ "How many objects are in the scene?", "What is the total number of objects in the scene?", ] CF_COLOR_QUESTION_TEMPLATES = [ ("How many {val} objects are there?", 'color'), ("Are there any {val} objects?", 'color'), ("What is the total number of {val} objects?", 'color'), ] CF_SHAPE_QUESTION_TEMPLATES = [ ("How many {val} are there?", 'shape'), ("Are there any {val}?", 'shape'), ("What is the total number of {val}?", 'shape'), ] CF_MATERIAL_QUESTION_TEMPLATES = [ ("How many {val} objects are there?", 'material'), ("Are there any {val} objects?", 'material'), ("What is the total number of {val} objects?", 'material'), ] CF_SIZE_QUESTION_TEMPLATES = [ ("How many {val} objects are there?", 'size'), ("Are there any {val} objects?", 'size'), ("What is the total number of {val} objects?", 'size'), ] def _pluralize_shape(shape): if not shape: return shape s = shape.strip().lower() if s.endswith('s'): return s return s + 's' def _count_by_attribute(objects, attr): counts = {} for obj in objects: val = (obj.get(attr) or '').lower().strip() if val: counts[val] = counts.get(val, 0) + 1 return counts def _get_attributes_with_different_counts(original_scene, cf_scene): orig_objs = original_scene.get('objects', []) cf_objs = cf_scene.get('objects', []) differing = [] for attr in ['color', 'shape', 'material', 'size']: orig_counts = _count_by_attribute(orig_objs, attr) cf_counts = _count_by_attribute(cf_objs, attr) all_vals = set(orig_counts) | set(cf_counts) for val in all_vals: o = orig_counts.get(val, 0) c = cf_counts.get(val, 0) if o != c: differing.append((attr, val, o, c)) return differing def generate_question_for_counterfactual(cf_type, original_scene, cf_scene, retry_index=0, original_question=None, original_params=None): """ Generate a counterfactual question. If original_question and original_params are provided, uses strict targeting: (1) try original question, (2) try mutated questions that target the change, (3) return (None, None) to signal rejection/retry if no question yields an answer change. """ # --- Strict counterfactual targeting when original question is provided --- if original_question is not None and original_params is not None: a_orig = answer_question_for_scene(original_question, original_scene) a_cf = answer_question_for_scene(original_question, cf_scene) a_orig_n = normalize_answer(a_orig) a_cf_n = normalize_answer(a_cf) if a_orig_n != a_cf_n: return (original_question, original_params) for mut_q, mut_params in create_counterfactual_questions(original_question, original_params, original_scene): a_mut_cf = answer_question_for_scene(mut_q, cf_scene) if normalize_answer(a_mut_cf) != a_orig_n: return (mut_q, mut_params) return (None, None) random.seed(hash((str(cf_type), retry_index, str(id(original_scene)), str(id(cf_scene))))) change = get_change_details(original_scene, cf_scene) orig_objs = original_scene.get('objects', []) cf_objs = cf_scene.get('objects', []) props_orig = get_scene_properties(original_scene) props_cf = get_scene_properties(cf_scene) def _pick_spatial_question(props): """Strict spatial/relational templates only; never simple attribute count.""" relations = props.get('relations') or DEFAULT_RELATIONS colors = list(props.get('colors') or props.get('all_colors') or ['red', 'blue', 'green']) shapes = list(props.get('shapes') or props.get('all_shapes') or ['cube', 'sphere', 'cylinder']) materials = list(props.get('materials') or props.get('all_materials') or ['metal', 'rubber']) sizes = list(props.get('sizes') or props.get('all_sizes') or ['small', 'large']) templates = [ ("What color is the object {relation} the {color} {shape}?", { 'relation': random.choice(relations), 'color': random.choice(colors), 'shape': random.choice(shapes) }), ("What shape is the object {relation} the {material} object?", { 'relation': random.choice(relations), 'material': random.choice(materials) }), ("How many objects are {relation} the {color} {shape}?", { 'relation': random.choice(relations), 'color': random.choice(colors), 'shape': random.choice(shapes) }), ("How many {material} objects are {relation} the {shape}?", { 'material': random.choice(materials), 'relation': random.choice(relations), 'shape': random.choice(shapes) }), ("Is there a {color} object {relation} the {shape}?", { 'color': random.choice(colors), 'relation': random.choice(relations), 'shape': random.choice(shapes) }), ("What is the total number of {size} objects {relation} the {color} object?", { 'size': random.choice(sizes), 'relation': random.choice(relations), 'color': random.choice(colors) }), ("What is the total number of {material} objects {relation} the {color} {shape}?", { 'material': random.choice(materials), 'relation': random.choice(relations), 'color': random.choice(colors), 'shape': random.choice(shapes) }), ("Is there a {size} {material} object {relation} the {shape}?", { 'size': random.choice(sizes), 'material': random.choice(materials), 'relation': random.choice(relations), 'shape': random.choice(shapes) }), ] template, params = random.choice(templates) return template.format(**params), params def _pick_compositional_question(props): """Strict compositional (≥2 attributes) templates only; never single-attribute count.""" colors = list(props.get('colors') or props.get('all_colors') or ['red', 'blue', 'green']) shapes = list(props.get('shapes') or props.get('all_shapes') or ['cube', 'sphere', 'cylinder']) materials = list(props.get('materials') or props.get('all_materials') or ['metal', 'rubber']) sizes = list(props.get('sizes') or props.get('all_sizes') or ['small', 'large']) templates = [ ("How many {color} {shape}s are there?", { 'color': random.choice(colors), 'shape': random.choice(shapes) }), ("Are there any {color} {shape}s?", { 'color': random.choice(colors), 'shape': random.choice(shapes) }), ("Is there a {color} {shape}?", { 'color': random.choice(colors), 'shape': random.choice(shapes) }), ("Is there a {material} {shape}?", { 'material': random.choice(materials), 'shape': random.choice(shapes) }), ("How many {size} {color} objects are there?", { 'size': random.choice(sizes), 'color': random.choice(colors) }), ("What is the total number of {color} {material} objects?", { 'color': random.choice(colors), 'material': random.choice(materials) }), ("Are there any {material} {shape}s?", { 'material': random.choice(materials), 'shape': random.choice(shapes) }), ("How many {size} {shape}s are there?", { 'size': random.choice(sizes), 'shape': random.choice(shapes) }), ] template, params = random.choice(templates) return template.format(**params), params # --- change_position: STRICTLY spatial/relational only; never simple attribute count --- if cf_type == 'change_position': props = props_cf if (props_cf.get('relations') or props_cf.get('colors') or props_cf.get('shapes')) else props_orig question, params = _pick_spatial_question(props) return question, params # --- relational_flip: STRICTLY spatial/relational only --- if cf_type == 'relational_flip': props = props_cf if (props_cf.get('relations') or props_cf.get('colors') or props_cf.get('shapes')) else props_orig question, params = _pick_spatial_question(props) return question, params # --- swap_attribute: STRICTLY compositional (≥2 attributes) only; never single-attribute count --- if cf_type == 'swap_attribute': props = props_cf if (props_cf.get('colors') or props_cf.get('shapes')) else props_orig question, params = _pick_compositional_question(props) return question, params if cf_type and cf_type in IMAGE_CF_TYPES: differing = _get_attributes_with_different_counts(original_scene, cf_scene) if differing: idx = retry_index % len(differing) if differing else 0 attr, val, orig_count, cf_count = differing[idx] if attr == 'color': template, _ = random.choice(CF_COLOR_QUESTION_TEMPLATES) question = template.format(val=val) elif attr == 'shape': plural = _pluralize_shape(val) template, _ = random.choice(CF_SHAPE_QUESTION_TEMPLATES) question = template.format(val=plural) elif attr == 'material': template, _ = random.choice(CF_MATERIAL_QUESTION_TEMPLATES) question = template.format(val=val) elif attr == 'size': template, _ = random.choice(CF_SIZE_QUESTION_TEMPLATES) question = template.format(val=val) else: question = None if question: return question, {attr: val.rstrip('s') if attr == 'shape' else val} if cf_type and cf_type in NEGATIVE_CF_TYPES: templates = [ ("How many objects are in the scene?", {}), ("How many {color} objects are there?", {'color': random.choice(props_orig['colors'])} if props_orig['colors'] else None), ("Are there any {shape} objects?", {'shape': random.choice(props_orig['shapes'])} if props_orig['shapes'] else None), ("How many {material} objects are there?", {'material': random.choice(props_orig['materials'])} if props_orig['materials'] else None), ("What is the total number of {size} objects?", {'size': random.choice(props_orig['sizes'])} if props_orig['sizes'] else None), ] valid = [(t, p) for t, p in templates if p is not None or t.startswith("How many objects are in")] if not valid: valid = [("How many objects are in the scene?", {})] template, params = random.choice(valid) params = params or {} question = template.format(**params) if params else template return question, params if change and change.get('attribute') == 'count': orig_count = change.get('orig_count', len(orig_objs)) cf_count = change.get('cf_count', len(cf_objs)) templates_with_params = [] templates_with_params.append((random.choice(CF_COUNT_QUESTION_TEMPLATES), {})) if cf_count > orig_count: templates_with_params.append((f"Are there more than {orig_count} objects?", {})) templates_with_params.append((f"Are there at least {cf_count} objects?", {})) if cf_count < orig_count: templates_with_params.append((f"Are there fewer than {orig_count} objects?", {})) templates_with_params.append((f"Are there more than {cf_count} objects?", {})) template, params = random.choice(templates_with_params) return template, params if change and change.get('attribute') in ('color', 'shape', 'material', 'size'): attr = change['attribute'] cf_val = (change.get('cf_val') or '').strip().lower() if not cf_val: cf_val = 'unknown' params = {attr: cf_val} if attr == 'color': template, _ = random.choice(CF_COLOR_QUESTION_TEMPLATES) question = template.format(val=cf_val) elif attr == 'shape': template, _ = random.choice(CF_SHAPE_QUESTION_TEMPLATES) plural = _pluralize_shape(cf_val) question = template.format(val=plural) params['shape'] = cf_val.rstrip('s') elif attr == 'material': template, _ = random.choice(CF_MATERIAL_QUESTION_TEMPLATES) question = template.format(val=cf_val) elif attr == 'size': template, _ = random.choice(CF_SIZE_QUESTION_TEMPLATES) question = template.format(val=cf_val) else: question = "How many objects are in the scene?" params = {} return question, params if cf_type in ('add_object', 'remove_object'): templates = list(CF_COUNT_QUESTION_TEMPLATES) if len(orig_objs) != len(cf_objs): if len(cf_objs) > len(orig_objs): templates.extend([f"Are there more than {len(orig_objs)} objects?", f"Are there at least {len(cf_objs)} objects?"]) else: templates.extend([f"Are there fewer than {len(orig_objs)} objects?", f"Are there more than {len(cf_objs)} objects?"]) template = random.choice(templates) return template, {} if cf_type in ('change_color', 'change_shape', 'replace_object'): for attr, key in [('color', 'colors'), ('shape', 'shapes'), ('material', 'materials'), ('size', 'sizes')]: vals = list(props_cf.get(key) or props_orig.get(key) or []) if vals: val = random.choice(vals) if attr == 'shape': plural = _pluralize_shape(val) templates = CF_SHAPE_QUESTION_TEMPLATES template, _ = random.choice(templates) question = template.format(val=plural) elif attr == 'color': template, _ = random.choice(CF_COLOR_QUESTION_TEMPLATES) question = template.format(val=val) elif attr == 'material': template, _ = random.choice(CF_MATERIAL_QUESTION_TEMPLATES) question = template.format(val=val) else: template, _ = random.choice(CF_SIZE_QUESTION_TEMPLATES) question = template.format(val=val) return question, {attr: val.rstrip('s') if attr == 'shape' else val} if cf_type in ('change_size', 'change_material'): key = 'sizes' if cf_type == 'change_size' else 'materials' attr = key.rstrip('s') vals = list(props_cf.get(key) or props_orig.get(key) or []) if vals: val = random.choice(vals) if cf_type == 'change_size': template, _ = random.choice(CF_SIZE_QUESTION_TEMPLATES) else: template, _ = random.choice(CF_MATERIAL_QUESTION_TEMPLATES) question = template.format(val=val) return question, {attr: val} # Fallback: never use generic "How many objects?" for change_position, relational_flip, swap_attribute. if cf_type in ('change_position', 'relational_flip', 'swap_attribute'): props = props_cf if (props_cf.get('relations') or props_cf.get('colors')) else props_orig if cf_type == 'swap_attribute': question, params = _pick_compositional_question(props) else: question, params = _pick_spatial_question(props) return question, params question = random.choice(CF_COUNT_QUESTION_TEMPLATES) return question, {} def generate_question_for_scene(scene_file, retry_index=None): scene = load_scene(scene_file) objects = scene.get('objects', []) if len(objects) == 0: return "How many objects are in the scene?", {} props = get_scene_properties(scene) templates = [ ("How many objects are in the scene?", {}), ("How many {color} objects are there?", {'color': random.choice(props['colors'])}), ("Are there any {shape} objects?", {'shape': random.choice(props['shapes'])}), ("Are there any {shape}s present?", {'shape': random.choice(props['shapes'])}), ("Is there a {color} {shape}?", { 'color': random.choice(props['colors']), 'shape': random.choice(props['shapes']) }), ("How many {material} objects are there?", {'material': random.choice(props['materials'])}), ("What is the total number of {material} objects?", {'material': random.choice(props['materials'])}), ("What is the total number of metallic objects?", {}), ("What is the total number of {size} objects?", {'size': random.choice(props['sizes'])}), ("Is there a {material} {shape}?", { 'material': random.choice(props['materials']), 'shape': random.choice(props['shapes']) }), ("How many {size} {color} objects are there?", { 'size': random.choice(props['sizes']), 'color': random.choice(props['colors']) }), ("Are there any {color} {shape}s?", { 'color': random.choice(props['colors']), 'shape': random.choice(props['shapes']) }), ("What is the total number of {color} {material} objects?", { 'color': random.choice(props['colors']), 'material': random.choice(props['materials']) }), ("What color is the object {relation} the {color} {shape}?", { 'relation': random.choice(props['relations']), 'color': random.choice(props['colors']), 'shape': random.choice(props['shapes']) }), ("What shape is the object {relation} the {material} object?", { 'relation': random.choice(props['relations']), 'material': random.choice(props['materials']) }), ("What material is the {size} object {relation} the {shape}?", { 'size': random.choice(props['sizes']), 'relation': random.choice(props['relations']), 'shape': random.choice(props['shapes']) }), ("How many objects are {relation} the {color} {shape}?", { 'relation': random.choice(props['relations']), 'color': random.choice(props['colors']), 'shape': random.choice(props['shapes']) }), ("How many {material} objects are {relation} the {shape}?", { 'material': random.choice(props['materials']), 'relation': random.choice(props['relations']), 'shape': random.choice(props['shapes']) }), ("What is the total number of {size} objects {relation} the {color} object?", { 'size': random.choice(props['sizes']), 'relation': random.choice(props['relations']), 'color': random.choice(props['colors']) }), ("Is there a {color} object {relation} the {shape}?", { 'color': random.choice(props['colors']), 'relation': random.choice(props['relations']), 'shape': random.choice(props['shapes']) }), ("Are there any {material} {shape}s {relation} the {size} object?", { 'material': random.choice(props['materials']), 'shape': random.choice(props['shapes']), 'relation': random.choice(props['relations']), 'size': random.choice(props['sizes']) }), # --- Attribute Equivalence (Same/Different) --- ("Is the color of the {shape1} the same as the {shape2}?", { 'shape1': random.choice(props['shapes']), 'shape2': random.choice(props['shapes']) }), ("Is the material of the {color} object the same as the {size} object?", { 'color': random.choice(props['colors']), 'size': random.choice(props['sizes']) }), ("Do the {size} object and the {material} object have the same shape?", { 'size': random.choice(props['sizes']), 'material': random.choice(props['materials']) }), # --- Logical Disjunction (OR) --- ("How many objects are either {color} or {shape}?", { 'color': random.choice(props['colors']), 'shape': random.choice(props['shapes']) }), ("Are there any objects that are either {material} or {color}?", { 'material': random.choice(props['materials']), 'color': random.choice(props['colors']) }), ("What is the total number of objects that are either {size} or {shape}?", { 'size': random.choice(props['sizes']), 'shape': random.choice(props['shapes']) }), # --- Exact Numerical Comparison --- ("Is the number of {color} objects equal to the number of {shape}s?", { 'color': random.choice(props['colors']), 'shape': random.choice(props['shapes']) }), ("Are there exactly as many {material} objects as {size} objects?", { 'material': random.choice(props['materials']), 'size': random.choice(props['sizes']) }), ("Does the scene contain an equal number of {color1} objects and {color2} objects?", dict(zip( ['color1', 'color2'], random.sample(props['colors'], 2) if len(props['colors']) >= 2 else [props['colors'][0]] * 2 ))), # --- Complex Spatial & Attribute Composition --- ("What is the total number of {material} objects {relation} the {color} {shape}?", { 'material': random.choice(props['materials']), 'relation': random.choice(props['relations']), 'color': random.choice(props['colors']), 'shape': random.choice(props['shapes']) }), ("Is there a {size} {material} object {relation} the {shape}?", { 'size': random.choice(props['sizes']), 'material': random.choice(props['materials']), 'relation': random.choice(props['relations']), 'shape': random.choice(props['shapes']) }), ] # Add matte/shin only when the scene has a metal/rubber object; caller should accept only when CF is attribute-swap (not add/remove). matte_shiny_objects = [o for o in objects if (o.get('material') or '').lower() in ('metal', 'rubber') and o.get('color') and o.get('shape')] if matte_shiny_objects: obj = random.choice(matte_shiny_objects) templates.append(("Is the {color} {shape} matte or shiny?", {'color': obj.get('color'), 'shape': obj.get('shape')})) if retry_index is not None: random.seed(hash((scene_file, retry_index))) else: random.seed(hash(scene_file)) template, params = random.choice(templates) question = template.format(**params) if params else template return question, params def calculate_question_difficulty(question, params): num_params = len(params) if params else 0 question_lower = question.lower() if "matte or shiny" in question_lower or ("or" in question_lower and ("matte" in question_lower or "shiny" in question_lower)): return "hard" elif "metallic" in question_lower: return "medium" elif "total number" in question_lower and num_params >= 1: return "hard" if num_params >= 2 else "medium" elif num_params == 0: return "easy" elif num_params == 1: return "medium" else: return "hard" def _apply_param_replacements(question, params, cf_params): """Replace param values in question with cf_params, from last to first by position, to avoid double-replacing when the same value appears for different placeholders.""" if not params or not cf_params: return question # Order keys by first occurrence of their value in the question (so we replace in document order) positions = [] for k, v in params.items(): if k not in cf_params or cf_params[k] == v: continue pos = question.find(v) if pos >= 0: positions.append((pos, k, v, cf_params[k])) # Replace from end to start so indices stay valid positions.sort(key=lambda x: -x[0]) for pos, k, old_val, new_val in positions: question = question[:pos] + new_val + question[pos + len(old_val):] return question def create_counterfactual_questions(original_question, params, scene): props = get_scene_properties(scene) cf_questions = [] strategies = ['attribute_swap', 'question_type', 'scope_change', 'negation', 'comparative', 'multi_attribute', 'same_different', 'either_or', 'equal_comparison'] random.seed(hash(str(scene))) selected_strategies = random.sample(strategies, 2) for strategy in selected_strategies: cf_q = None cf_params = {} max_retries = 5 retry_count = 0 while retry_count < max_retries: cf_q = None cf_params = {} if strategy == 'attribute_swap' and params: cf_params = params.copy() param_to_change = random.choice(list(params.keys())) current = params.get(param_to_change) def pick_alternative(attr_key, all_vals_getter): alts = [v for v in all_vals_getter() if v != current] if alts: cf_params[param_to_change] = random.choice(alts) return True return False if param_to_change in ('color', 'color1', 'color2'): if not pick_alternative('color', lambda: props['all_colors']): strategy = 'negation' continue elif param_to_change in ('shape', 'shape1', 'shape2'): if not pick_alternative('shape', lambda: props['all_shapes']): strategy = 'negation' continue elif param_to_change == 'material': if not pick_alternative('material', lambda: props['all_materials']): strategy = 'negation' continue elif param_to_change == 'size': if not pick_alternative('size', lambda: props['all_sizes']): strategy = 'negation' continue elif param_to_change == 'relation': if not pick_alternative('relation', lambda: props['relations']): strategy = 'negation' continue else: retry_count += 1 continue cf_q = _apply_param_replacements(original_question, params, cf_params) elif strategy == 'question_type': cf_params = params.copy() if params else {} if "How many" in original_question and "objects are in the scene" in original_question: if props['colors']: color = random.choice(props['colors']) cf_q = f"How many {color} objects are there?" cf_params = {'color': color} elif props['shapes']: shape = random.choice(props['shapes']) cf_q = f"Are there any {shape}s?" cf_params = {'shape': shape} else: cf_q = "Are there more than 3 objects?" cf_params = {} elif "How many" in original_question: cf_q = original_question.replace("How many", "Are there any") cf_q = cf_q.replace(" are there?", "?") cf_q = cf_q.replace(" are in the scene?", " in the scene?") elif "Are there" in original_question or "Is there" in original_question: if "Are there any" in original_question: cf_q = original_question.replace("Are there any", "How many") if not cf_q.endswith(" are there?"): cf_q = cf_q.replace("?", " are there?") elif "Is there a" in original_question: cf_q = original_question.replace("Is there a", "How many") if not cf_q.endswith(" are there?"): cf_q = cf_q.replace("?", " are there?") else: if props['colors']: color = random.choice(props['colors']) cf_q = f"How many {color} objects are there?" cf_params = {'color': color} else: cf_q = "How many objects are in the scene?" cf_params = {} elif "What is" in original_question: cf_q = original_question.replace("What is the total number of", "How many") else: if props['colors']: color = random.choice(props['colors']) cf_q = f"How many {color} objects are there?" cf_params = {'color': color} else: cf_q = "Are there more than 3 objects?" cf_params = {} elif strategy == 'scope_change': if params and len(params) >= 2: cf_params = params.copy() key_to_remove = random.choice(list(params.keys())) del cf_params[key_to_remove] if len(cf_params) == 1: attr_val = list(cf_params.values())[0] cf_q = f"How many {attr_val} objects are there?" else: if props['colors']: color = random.choice(props['colors']) cf_q = f"How many {color} objects are there?" cf_params = {'color': color} else: cf_q = "Are there more than 3 objects?" cf_params = {} elif params and len(params) == 1: new_attr = random.choice(['material', 'size']) if new_attr not in params: new_val = random.choice(props[new_attr + 's']) existing_key = list(params.keys())[0] existing_val = list(params.values())[0] cf_params = params.copy() cf_params[new_attr] = new_val if new_attr == 'size': cf_q = f"How many {new_val} {existing_val} objects are there?" elif new_attr == 'material': if existing_key == 'size': cf_q = f"How many {existing_val} {new_val} objects are there?" else: cf_q = f"How many {existing_val} {new_val} objects are there?" else: strategy = 'negation' continue else: if props['colors']: color = random.choice(props['colors']) cf_params = {'color': color} cf_q = f"How many {color} objects are there?" elif props['shapes']: shape = random.choice(props['shapes']) cf_params = {'shape': shape} cf_q = f"Are there any {shape}s?" else: cf_q = "Are there more than 3 objects?" cf_params = {} elif strategy == 'negation': cf_params = params.copy() if params else {} if params: if 'color' in params: color = params['color'] cf_q = f"How many objects are NOT {color}?" elif 'shape' in params: shape = params['shape'] cf_q = f"How many objects are NOT {shape}s?" else: attr_val = list(params.values())[0] cf_q = f"How many objects are NOT {attr_val}?" else: cf_q = "Are there fewer than 5 objects?" cf_params = {} elif strategy == 'comparative': cf_params = params.copy() if params else {} if "How many" in original_question: number = random.choice([2, 3, 4, 5]) cf_q = original_question.replace("How many", f"Are there more than {number}") cf_q = cf_q.replace(" are there?", "?") cf_q = cf_q.replace(" are in the scene?", " in the scene?") elif params: if 'color' in params: color1 = params['color'] alternatives = [c for c in props['all_colors'] if c != color1] if alternatives: color2 = random.choice(alternatives) cf_params = {'color': color1, 'color2': color2} cf_q = f"Are there more {color1} objects than {color2} objects?" else: cf_q = f"How many objects are NOT {color1}?" cf_params = {'color': color1} elif 'shape' in params: shape1 = params['shape'] alternatives = [s for s in props['all_shapes'] if s != shape1] if alternatives: shape2 = random.choice(alternatives) cf_params = {'shape': shape1, 'shape2': shape2} cf_q = f"Are there more {shape1}s than {shape2}s?" else: cf_q = f"How many objects are NOT {shape1}s?" cf_params = {'shape': shape1} else: cf_q = "Are there more than 3 objects?" cf_params = {} else: cf_q = "Are there more than 3 objects?" cf_params = {} elif strategy == 'multi_attribute': if params and len(params) >= 2: cf_params = {} changed = False for key in params: if key == 'color': alternatives = [c for c in props['all_colors'] if c != params[key]] if alternatives: cf_params[key] = random.choice(alternatives) changed = True else: cf_params[key] = params[key] elif key == 'shape': alternatives = [s for s in props['all_shapes'] if s != params[key]] if alternatives: cf_params[key] = random.choice(alternatives) changed = True else: cf_params[key] = params[key] elif key == 'material': alternatives = [m for m in props['all_materials'] if m != params[key]] if alternatives: cf_params[key] = random.choice(alternatives) changed = True else: cf_params[key] = params[key] elif key == 'size': alternatives = [s for s in props['all_sizes'] if s != params[key]] if alternatives: cf_params[key] = random.choice(alternatives) changed = True else: cf_params[key] = params[key] if not changed: strategy = 'negation' continue attr_order = ['size', 'color', 'material', 'shape'] ordered_values = [] for attr in attr_order: if attr in cf_params: ordered_values.append(cf_params[attr]) cf_q = f"How many {' '.join(ordered_values)} objects are there?" else: color = random.choice(props['colors']) shape = random.choice(props['shapes']) cf_params = {'color': color, 'shape': shape} cf_q = f"Is there a {color} {shape}?" elif strategy == 'same_different': # Attribute equivalence: "same as" / "same shape/color/material" -> swap one compared attribute or "same" -> "different" q = original_question q_lower = q.lower() if "the same as" in q_lower or "same shape" in q_lower or "same color" in q_lower or "same material" in q_lower: if random.choice([True, False]) and params: # Swap one of the compared attributes (reuse attribute_swap logic for one key) swap_keys = [k for k in params if k in ('shape1', 'shape2', 'color', 'size', 'material', 'shape')] if swap_keys: key = random.choice(swap_keys) current = params.get(key) if key in ('shape1', 'shape2', 'shape'): alts = [s for s in props['all_shapes'] if s != current] val = random.choice(alts) if alts else current elif key in ('color', 'color1', 'color2'): alts = [c for c in props['all_colors'] if c != current] val = random.choice(alts) if alts else current elif key == 'material': alts = [m for m in props['all_materials'] if m != current] val = random.choice(alts) if alts else current elif key == 'size': alts = [s for s in props['all_sizes'] if s != current] val = random.choice(alts) if alts else current else: val = current if val != current: cf_params = params.copy() cf_params[key] = val cf_q = _apply_param_replacements(q, params, cf_params) else: cf_q = None else: cf_q = None else: # Replace "same as" with "different from" / "same" with "different" if "the same as" in q_lower: cf_q = q.replace("the same as", "different from").replace("The same as", "Different from") elif "same shape" in q_lower: cf_q = q.replace("same shape", "different shape").replace("same shape", "different shape") elif "same color" in q_lower: cf_q = q.replace("same color", "different color") elif "same material" in q_lower: cf_q = q.replace("same material", "different material") else: cf_q = q.replace("the same as", "different from") cf_params = params.copy() if params else {} else: cf_q = None elif strategy == 'either_or': # "either X or Y" -> swap X or Y, or "either X or Y" -> "both X and Y" q_lower = original_question.lower() if "either" in q_lower and " or " in q_lower and params: if random.choice([True, False]): # Swap one of the two attributes swap_keys = [k for k in params if k in ('color', 'shape', 'material', 'size')] if swap_keys: key = random.choice(swap_keys) current = params.get(key) if key == 'shape': alts = [s for s in props['all_shapes'] if s != current] val = random.choice(alts) if alts else current elif key == 'color': alts = [c for c in props['all_colors'] if c != current] val = random.choice(alts) if alts else current elif key == 'material': alts = [m for m in props['all_materials'] if m != current] val = random.choice(alts) if alts else current elif key == 'size': alts = [s for s in props['all_sizes'] if s != current] val = random.choice(alts) if alts else current else: val = current if val != current: cf_params = params.copy() cf_params[key] = val cf_q = _apply_param_replacements(original_question, params, cf_params) else: cf_q = None else: cf_q = None else: # "either ... or" -> "both ... and" cf_q = original_question.replace("either", "both").replace(" or ", " and ") cf_params = params.copy() if params else {} else: cf_q = None elif strategy == 'equal_comparison': # "equal to" / "exactly as many" -> swap one target or change to "greater than" / "fewer than" q = original_question q_lower = q.lower() if ("equal to" in q_lower or "exactly as many" in q_lower or "equal number" in q_lower) and params: if random.choice([True, False]): # Swap one of the compared properties (color, shape, material, size, color1, color2) swap_keys = [k for k in params if k in ('color', 'color1', 'color2', 'shape', 'material', 'size')] if swap_keys: key = random.choice(swap_keys) current = params.get(key) if key in ('color', 'color1', 'color2'): alts = [c for c in props['all_colors'] if c != current] val = random.choice(alts) if alts else current elif key == 'shape': alts = [s for s in props['all_shapes'] if s != current] val = random.choice(alts) if alts else current elif key == 'material': alts = [m for m in props['all_materials'] if m != current] val = random.choice(alts) if alts else current elif key == 'size': alts = [s for s in props['all_sizes'] if s != current] val = random.choice(alts) if alts else current else: val = current if val != current: cf_params = params.copy() cf_params[key] = val cf_q = _apply_param_replacements(q, params, cf_params) else: cf_q = None else: cf_q = None else: # "equal to" -> "greater than" or "fewer than"; "equal number" -> "greater/fewer number" # ("exactly as many" left to swap-only path to avoid ungrammatical "more X as Y") if "equal to" in q_lower: direction = random.choice(["greater than", "fewer than"]) cf_q = q.replace("equal to", direction).replace("Equal to", direction.capitalize()) cf_params = params.copy() if params else {} elif "equal number" in q_lower: cf_q = q.replace("equal number", random.choice(["greater number", "fewer number"])) cf_params = params.copy() if params else {} else: cf_q = None cf_params = {} else: cf_q = None if cf_q is None: cf_q = "How many objects are in the scene?" cf_params = {} if not cf_params: cf_params = {} if cf_q and cf_q.strip() != original_question.strip(): break retry_count += 1 if retry_count < max_retries: available_strategies = [s for s in strategies if s != strategy] if available_strategies: strategy = random.choice(available_strategies) else: strategy = 'negation' if cf_q is None or cf_q.strip() == original_question.strip(): if params: if 'color' in params: cf_q = f"How many objects are NOT {params['color']}?" elif 'shape' in params: cf_q = f"How many objects are NOT {params['shape']}s?" else: attr_val = list(params.values())[0] cf_q = f"How many objects are NOT {attr_val}?" cf_params = params.copy() else: if props['colors']: color = random.choice(props['colors']) cf_q = f"How many {color} objects are there?" cf_params = {'color': color} elif props['shapes']: shape = random.choice(props['shapes']) cf_q = f"Are there any {shape}s?" cf_params = {'shape': shape} else: cf_q = "Are there more than 3 objects?" cf_params = {} cf_questions.append((cf_q, cf_params)) return cf_questions def normalize_answer(a): if a is None: return "" return str(a).strip().lower() def answer_question_for_scene(question, scene): objects = scene.get('objects', []) question_lower = question.lower() def _str_answer(val): if val is None: return "unknown" return str(val).strip().lower() # --- Attribute Equivalence ("same as" / "same shape/color/material") --- if "the same as" in question_lower or "different from" in question_lower or "same shape" in question_lower or "same color" in question_lower or "same material" in question_lower: expect_same = "different from" not in question_lower and "different shape" not in question_lower and "different color" not in question_lower and "different material" not in question_lower shape1 = _first_value_in_question(question_lower, _SHAPES) shape2 = None for s in _SHAPES: if s in question_lower and s != shape1: shape2 = s.rstrip('s') break if shape2 is None and shape1: shape2 = shape1.rstrip('s') color1 = _first_value_in_question(question_lower, _COLORS) size1 = _first_value_in_question(question_lower, _SIZES, strip_s=False) material1 = _first_value_in_question(question_lower, _MATERIALS) if "have the same shape" in question_lower: cand_a = _find_objects_matching(objects, size=size1) if size1 else [] cand_b = _find_objects_matching(objects, material=material1) if material1 else [] if not cand_a or not cand_b: return _str_answer("no" if expect_same else "yes") v1 = (objects[cand_a[0]].get('shape') or '').lower() v2 = (objects[cand_b[0]].get('shape') or '').lower() same = (v1 == v2) return _str_answer("yes" if (same == expect_same) else "no") if "material of" in question_lower: attr = 'material' cand1 = _find_objects_matching(objects, color=color1) if color1 else [] cand2 = _find_objects_matching(objects, size=size1) if size1 else [] elif "color of" in question_lower or ("same as" in question_lower and shape1): attr = 'color' cand1 = _find_objects_matching(objects, shape=shape1.rstrip('s') if shape1 else None) if shape1 else [] cand2 = _find_objects_matching(objects, shape=shape2) if shape2 else [] else: attr = 'color' cand1 = _find_objects_matching(objects, shape=shape1.rstrip('s') if shape1 else None) if shape1 else _find_objects_matching(objects, color=color1) if color1 else [] cand2 = _find_objects_matching(objects, shape=shape2) if shape2 else _find_objects_matching(objects, size=size1) if size1 else [] if not cand1 or not cand2: return _str_answer("no" if expect_same else "yes") v1 = (objects[cand1[0]].get(attr) or '').lower() v2 = (objects[cand2[0]].get(attr) or '').lower() if not v1 or not v2: return _str_answer("unknown") same = (v1 == v2) return _str_answer("yes" if (same == expect_same) else "no") # --- Logical Disjunction ("either X or Y") --- if "either" in question_lower and " or " in question_lower: color_val = _first_value_in_question(question_lower, _COLORS) shape_val = _first_value_in_question(question_lower, _SHAPES) material_val = _first_value_in_question(question_lower, _MATERIALS) size_val = _first_value_in_question(question_lower, _SIZES, strip_s=False) count = 0 for obj in objects: c = (obj.get('color') or '').lower() s = (obj.get('shape') or '').lower() m = (obj.get('material') or '').lower() z = (obj.get('size') or '').lower() match = False if color_val and c == color_val.rstrip('s'): match = True if shape_val and (s == shape_val.rstrip('s') or s + 's' == shape_val): match = True if material_val and m == material_val.rstrip('s'): match = True if size_val and z == size_val: match = True if match: count += 1 if "are there any" in question_lower: return _str_answer("yes" if count > 0 else "no") return _str_answer(str(count)) # --- Exact Numerical Comparison ("equal to", "exactly as many", "equal number") --- if "equal to" in question_lower or "exactly as many" in question_lower or "equal number" in question_lower: def _count_for_value(attr_kind, val): if not val: return None if attr_kind == 'color': return sum(1 for o in objects if (o.get('color') or '').lower() == val.rstrip('s')) if attr_kind == 'shape': return sum(1 for o in objects if (o.get('shape') or '').lower() == val.rstrip('s')) if attr_kind == 'material': return sum(1 for o in objects if (o.get('material') or '').lower() == val.rstrip('s')) if attr_kind == 'size': return sum(1 for o in objects if (o.get('size') or '').lower() == val) return None def _parse_one_category(phrase): for c in _COLORS: if c in phrase: return ('color', c.rstrip('s')) for s in _SHAPES: if s in phrase: return ('shape', s.rstrip('s')) for m in _MATERIALS: if m in phrase: return ('material', m.rstrip('s')) for z in _SIZES: if z in phrase: return ('size', z) return (None, None) if "number of" in question_lower and "objects and" in question_lower: parts = question_lower.split("and") if len(parts) >= 2: kind1, v1 = _parse_one_category(parts[0]) kind2, v2 = _parse_one_category(parts[1]) if kind1 and kind2: n1 = _count_for_value(kind1, v1) n2 = _count_for_value(kind2, v2) if n1 is not None and n2 is not None: return _str_answer("yes" if n1 == n2 else "no") if "equal to" in question_lower: left, _, right = question_lower.partition("equal to") kind1, v1 = _parse_one_category(left) kind2, v2 = _parse_one_category(right) if kind1 and kind2: n1 = _count_for_value(kind1, v1) n2 = _count_for_value(kind2, v2) if n1 is not None and n2 is not None: return _str_answer("yes" if n1 == n2 else "no") if "exactly as many" in question_lower: parts = re.split(r'exactly as many\s+', question_lower) if len(parts) >= 2: rest = parts[1] parts2 = re.split(r'\s+as\s+', rest, maxsplit=1) if len(parts2) >= 2: kind1, v1 = _parse_one_category(parts2[0]) kind2, v2 = _parse_one_category(parts2[1]) if kind1 and kind2: n1 = _count_for_value(kind1, v1) n2 = _count_for_value(kind2, v2) if n1 is not None and n2 is not None: return _str_answer("yes" if n1 == n2 else "no") return _str_answer("unknown") # --- Complex Spatial (X objects {relation} the Y) --- for rel_phrase in PHRASES_TO_RELATION_KEYS: if rel_phrase in question_lower and ("objects " in question_lower or "object " in question_lower): before_rel, _, after_rel = question_lower.partition(rel_phrase) ref_color = _first_value_in_question(after_rel, _COLORS) ref_shape = _first_value_in_question(after_rel, _SHAPES) ref_material = _first_value_in_question(after_rel, _MATERIALS) ref_size = _first_value_in_question(after_rel, _SIZES, strip_s=False) in_rel = _objects_in_relation_to_reference(scene, rel_phrase, ref_color=ref_color, ref_shape=ref_shape, ref_material=ref_material, ref_size=ref_size) filter_color = _first_value_in_question(before_rel, _COLORS) filter_shape = _first_value_in_question(before_rel, _SHAPES) filter_material = _first_value_in_question(before_rel, _MATERIALS) filter_size = _first_value_in_question(before_rel, _SIZES, strip_s=False) filtered = 0 for idx in in_rel: if idx >= len(objects): continue o = objects[idx] if filter_color and (o.get('color') or '').lower() != filter_color.rstrip('s'): continue if filter_shape and (o.get('shape') or '').lower() != filter_shape.rstrip('s'): continue if filter_material and (o.get('material') or '').lower() != filter_material.rstrip('s'): continue if filter_size and (o.get('size') or '').lower() != filter_size: continue filtered += 1 if "how many" in question_lower or "total number" in question_lower: return _str_answer(str(filtered)) if "is there a" in question_lower or "are there any" in question_lower: return _str_answer("yes" if filtered > 0 else "no") break if "at least" in question_lower: match = re.search(r'at least (\d+)', question_lower) if match: threshold = int(match.group(1)) count = count_matching_objects(question_lower, objects) return "yes" if count >= threshold else "no" if "more than" in question_lower: match = re.search(r'more than (\d+)', question_lower) if match: threshold = int(match.group(1)) count = count_matching_objects(question_lower, objects) return "yes" if count > threshold else "no" if "fewer than" in question_lower: match = re.search(r'fewer than (\d+)', question_lower) if match: threshold = int(match.group(1)) count = count_matching_objects(question_lower, objects) return "yes" if count < threshold else "no" if " not " in question_lower: count = 0 colors = ['red', 'blue', 'green', 'brown', 'purple', 'cyan', 'yellow', 'gray', 'grey'] shapes = ['cube', 'sphere', 'cylinder', 'cubes', 'spheres', 'cylinders'] materials = ['metal', 'rubber', 'metals', 'rubbers'] sizes = ['small', 'large'] excluded_attr = None excluded_type = None for c in colors: if c in question_lower: excluded_attr = c.rstrip('s') excluded_type = 'color' break if excluded_attr is None: for s in shapes: if s in question_lower: excluded_attr = s.rstrip('s') excluded_type = 'shape' break if excluded_attr is None: for m in materials: if m in question_lower: excluded_attr = m.rstrip('s') excluded_type = 'material' break if excluded_attr is None: for s in sizes: if s in question_lower: excluded_attr = s excluded_type = 'size' break if excluded_attr and excluded_type: for obj in objects: obj_attr = obj.get(excluded_type, '').lower() if obj_attr != excluded_attr: count += 1 else: count = len(objects) return str(count) if " than " in question_lower and " more " in question_lower: parts = question_lower.split(" than ") if len(parts) == 2: first_part = parts[0] second_part = parts[1].replace('?', '').strip() count1 = count_matching_objects(first_part, objects) count2 = count_matching_objects(second_part, objects) return "yes" if count1 > count2 else "no" if question_lower.startswith("how many"): if "objects are in the scene" in question_lower or "total number of objects" in question_lower: return str(len(objects)) count = count_matching_objects(question_lower, objects) return str(count) elif question_lower.startswith("are there") or question_lower.startswith("is there"): count = count_matching_objects(question_lower, objects) return "yes" if count > 0 else "no" elif question_lower.startswith("what"): if "colors" in question_lower: colors = list(set(obj.get('color', '') for obj in objects if obj.get('color'))) return ", ".join(colors) if colors else "none" elif "shapes" in question_lower: shapes = list(set(obj.get('shape', '') for obj in objects if obj.get('shape'))) return ", ".join(shapes) if shapes else "none" elif "total number" in question_lower: count = count_matching_objects(question_lower, objects) return str(count) else: return str(len(objects)) if "matte or shiny" in question_lower or ("or" in question_lower and ("matte" in question_lower or "shiny" in question_lower)): colors = ['red', 'blue', 'green', 'brown', 'purple', 'cyan', 'yellow', 'gray', 'grey'] shapes = ['cube', 'sphere', 'cylinder', 'cubes', 'spheres', 'cylinders'] color_match = None shape_match = None for c in colors: if c in question_lower: color_match = c.rstrip('s') break for s in shapes: if s in question_lower: shape_match = s.rstrip('s') break for obj in objects: obj_color = (obj.get('color') or '').lower() obj_shape = (obj.get('shape') or '').lower() obj_material = (obj.get('material') or '').lower() matches = True if color_match and obj_color != color_match: matches = False if shape_match and obj_shape != shape_match: matches = False if matches: if obj_material == 'metal': return "shiny" if obj_material == 'rubber': return "matte" # Object not in this scene (e.g. CF removed it); answer without "none" or "not found" return _str_answer("unknown") return _str_answer("unknown") def count_matching_objects(question_lower, objects): count = 0 colors = ['red', 'blue', 'green', 'brown', 'purple', 'cyan', 'yellow', 'gray', 'grey'] shapes = ['cube', 'sphere', 'cylinder', 'cubes', 'spheres', 'cylinders'] materials = ['metal', 'rubber', 'metals', 'rubbers'] sizes = ['small', 'large'] color_match = None for c in colors: if c in question_lower: color_match = c.rstrip('s') break shape_match = None for s in shapes: if s in question_lower: shape_match = s.rstrip('s') break material_match = None if "metallic" in question_lower: material_match = "metal" else: for m in materials: if m in question_lower: material_match = m.rstrip('s') break size_match = None for s in sizes: if s in question_lower: size_match = s break for obj in objects: obj_color = obj.get('color', '').lower() obj_shape = obj.get('shape', '').lower() obj_material = obj.get('material', '').lower() obj_size = obj.get('size', '').lower() matches = True if color_match and obj_color != color_match: matches = False if shape_match and obj_shape != shape_match: matches = False if material_match and obj_material != material_match: matches = False if size_match and obj_size != size_match: matches = False if matches: count += 1 return count def classify_question_validity(question, base_scene_graph, counterfactual_scene_graph): answer_base = answer_question_for_scene(question, base_scene_graph) answer_cf = answer_question_for_scene(question, counterfactual_scene_graph) norm_base = normalize_answer(answer_base) norm_cf = normalize_answer(answer_cf) if norm_base != norm_cf: return 'Semantic-Valid' return 'Negative-Valid' def generate_mapping_with_questions(run_dir, csv_filename='image_mapping_with_questions.csv', generate_questions=False, with_links=False, base_url=None, strict_question_validation=True, single_cf_per_row=False): images_dir = os.path.join(run_dir, 'images') scenes_dir = os.path.join(run_dir, 'scenes') if not os.path.exists(images_dir): print(f"ERROR: Images directory not found: {images_dir}") return if not os.path.exists(scenes_dir): print(f"ERROR: Scenes directory not found: {scenes_dir}") return image_files = [f for f in os.listdir(images_dir) if f.endswith('.png')] scene_sets = {} for img_file in image_files: if img_file.startswith('scene_'): parts = img_file.replace('.png', '').split('_') if len(parts) >= 3: scene_num = parts[1] scene_type = parts[2] if scene_num not in scene_sets: scene_sets[scene_num] = {} scene_sets[scene_num][scene_type] = img_file rows = [] if with_links: header = ['scene_id', 'original_image_link', 'original_scene_link', 'counterfactual1_image_link', 'counterfactual1_scene_link', 'counterfactual2_image_link', 'counterfactual2_scene_link', 'counterfactual1_type', 'counterfactual2_type', 'counterfactual1_description', 'counterfactual2_description'] if generate_questions: header.extend([ 'original_question', 'counterfactual1_question', 'counterfactual2_question', 'original_question_difficulty', 'counterfactual1_question_difficulty', 'counterfactual2_question_difficulty', 'original_image_answer_to_original_question', 'original_image_answer_to_counterfactual1_question', 'original_image_answer_to_counterfactual2_question', 'counterfactual1_image_answer_to_original_question', 'counterfactual1_image_answer_to_counterfactual1_question', 'counterfactual1_image_answer_to_counterfactual2_question', 'counterfactual2_image_answer_to_original_question', 'counterfactual2_image_answer_to_counterfactual1_question', 'counterfactual2_image_answer_to_counterfactual2_question' ]) rows.append(header) elif generate_questions: rows.append([ 'original_image', 'counterfactual1_image', 'counterfactual2_image', 'counterfactual1_type', 'counterfactual2_type', 'counterfactual1_description', 'counterfactual2_description', 'original_question', 'counterfactual1_question', 'counterfactual2_question', 'original_question_difficulty', 'counterfactual1_question_difficulty', 'counterfactual2_question_difficulty', 'original_image_answer_to_original_question', 'original_image_answer_to_cf1_question', 'original_image_answer_to_cf2_question', 'cf1_image_answer_to_original_question', 'cf1_image_answer_to_cf1_question', 'cf1_image_answer_to_cf2_question', 'cf2_image_answer_to_original_question', 'cf2_image_answer_to_cf1_question', 'cf2_image_answer_to_cf2_question' ]) else: rows.append(['original_image', 'counterfactual1_image', 'counterfactual2_image', 'counterfactual1_type', 'counterfactual2_type', 'counterfactual1_description', 'counterfactual2_description']) if single_cf_per_row: if with_links: h = ['scene_id', 'original_image_link', 'original_scene_link', 'counterfactual_image_link', 'counterfactual_scene_link', 'counterfactual_type', 'counterfactual_description'] if generate_questions: h.extend(['original_question', 'counterfactual_question', 'original_question_difficulty', 'counterfactual_question_difficulty', 'original_image_answer_to_original_question', 'original_image_answer_to_cf_question', 'cf_image_answer_to_original_question', 'cf_image_answer_to_cf_question']) rows = [h] elif generate_questions: rows = [['original_image', 'counterfactual_image', 'counterfactual_type', 'counterfactual_description', 'original_question', 'counterfactual_question', 'original_question_difficulty', 'counterfactual_question_difficulty', 'original_image_answer_to_original_question', 'original_image_answer_to_cf_question', 'cf_image_answer_to_original_question', 'cf_image_answer_to_cf_question']] else: rows = [['original_image', 'counterfactual_image', 'counterfactual_type', 'counterfactual_description']] total_scenes = len(scene_sets) for idx, scene_num in enumerate(sorted(scene_sets.keys())): scene_data = scene_sets[scene_num] cf_keys = sorted([k for k in scene_data if k.startswith('cf') and len(k) > 2 and k[2:].isdigit()], key=lambda x: int(x[2:])) if single_cf_per_row: if 'original' not in scene_data or len(cf_keys) < 1: continue original_id = scene_data['original'] for cf_key in cf_keys: cf_id = scene_data[cf_key] original_scene_file = find_scene_file(scenes_dir, original_id) cf_scene_file = find_scene_file(scenes_dir, cf_id) if not original_scene_file or not cf_scene_file: continue try: original_scene = load_scene(original_scene_file) cf_scene = load_scene(cf_scene_file) cf_type = get_cf_type_from_scene(cf_scene) or '' cf_description = get_cf_description_from_scene(cf_scene) or '' except Exception: continue if generate_questions: appended = False for cf_retry in range(MAX_CF_ANSWER_RETRIES): try: original_question, params = generate_question_for_scene(original_scene_file, retry_index=cf_retry) original_ans_orig = answer_question_for_scene(original_question, original_scene) cf_question, cf_params = generate_question_for_counterfactual( cf_type, original_scene, cf_scene, retry_index=cf_retry, original_question=original_question, original_params=params ) if cf_question is None or cf_params is None: continue # Matte/shin can yield "unknown" when the object is removed; only allow for attribute-swap CFs. if "matte or shiny" in (original_question or "").lower() and cf_type in ("add_object", "remove_object"): continue original_ans_cf_q = answer_question_for_scene(cf_question, original_scene) cf_ans_orig_q = answer_question_for_scene(original_question, cf_scene) cf_ans_cf_q = answer_question_for_scene(cf_question, cf_scene) orig_diff = calculate_question_difficulty(original_question, params) cf_diff = calculate_question_difficulty(cf_question, cf_params) except Exception: continue # Answers must change between original and counterfactual images for both questions. if normalize_answer(original_ans_orig) == normalize_answer(cf_ans_orig_q): continue if strict_question_validation: validity = classify_question_validity(cf_question, original_scene, cf_scene) required = 'Semantic-Valid' if (cf_type and cf_type in IMAGE_CF_TYPES) else 'Negative-Valid' if validity != required: continue else: if normalize_answer(original_ans_cf_q) == normalize_answer(cf_ans_cf_q): continue if with_links: def _link(fn, ft='image'): return f"{base_url.rstrip('/')}/{ft}s/{fn}" if base_url else f"{ft}s/{fn}" rows.append([ scene_num, _link(original_id, 'image'), _link(original_id.replace('.png', '.json'), 'scene'), _link(cf_id, 'image'), _link(cf_id.replace('.png', '.json'), 'scene'), cf_type, cf_description, original_question, cf_question, orig_diff, cf_diff, original_ans_orig, original_ans_cf_q, cf_ans_orig_q, cf_ans_cf_q ]) else: rows.append([ original_id, cf_id, cf_type, cf_description, original_question, cf_question, orig_diff, cf_diff, original_ans_orig, original_ans_cf_q, cf_ans_orig_q, cf_ans_cf_q ]) appended = True break if not appended and generate_questions: pass # skip this (original, CF) pair after MAX_CF_ANSWER_RETRIES else: if with_links: def _link(fn, ft='image'): return f"{base_url.rstrip('/')}/{ft}s/{fn}" if base_url else f"{ft}s/{fn}" rows.append([ scene_num, _link(original_id, 'image'), _link(original_id.replace('.png', '.json'), 'scene'), _link(cf_id, 'image'), _link(cf_id.replace('.png', '.json'), 'scene'), cf_type, cf_description ]) else: rows.append([original_id, cf_id, cf_type, cf_description]) continue if 'original' not in scene_data or 'cf1' not in scene_data or 'cf2' not in scene_data: print(f"WARNING: Scene {scene_num} missing images") continue original_id = scene_data['original'] cf1_id = scene_data['cf1'] cf2_id = scene_data['cf2'] if generate_questions: original_scene_file = find_scene_file(scenes_dir, original_id) cf1_scene_file = find_scene_file(scenes_dir, cf1_id) cf2_scene_file = find_scene_file(scenes_dir, cf2_id) if not all([original_scene_file, cf1_scene_file, cf2_scene_file]): print(f"WARNING: Scene {scene_num} missing scene files") continue try: original_scene = load_scene(original_scene_file) cf1_scene = load_scene(cf1_scene_file) cf2_scene = load_scene(cf2_scene_file) except Exception as e: import traceback traceback.print_exc() continue try: original_question, params = generate_question_for_scene(original_scene_file) original_ans_orig_q = answer_question_for_scene(original_question, original_scene) cf1_type = get_cf_type_from_scene(cf1_scene) cf2_type = get_cf_type_from_scene(cf2_scene) cf1_description = get_cf_description_from_scene(cf1_scene) cf2_description = get_cf_description_from_scene(cf2_scene) except Exception as e: import traceback traceback.print_exc() continue cf1_question = cf2_question = None cf1_params = cf2_params = {} original_difficulty = cf1_difficulty = cf2_difficulty = None original_ans_cf1_q = original_ans_cf2_q = None cf1_ans_orig_q = cf1_ans_cf1_q = cf1_ans_cf2_q = None cf2_ans_orig_q = cf2_ans_cf1_q = cf2_ans_cf2_q = None orig_norm = normalize_answer(original_ans_orig_q) for cf_retry in range(MAX_CF_ANSWER_RETRIES): try: random.seed(hash((scene_num, idx, cf_retry))) cf_questions = create_counterfactual_questions(original_question, params, original_scene) if (not cf1_type or not cf2_type) else None if cf1_type: cf1_question, cf1_params = generate_question_for_counterfactual( cf1_type, original_scene, cf1_scene, retry_index=cf_retry, original_question=original_question, original_params=params ) if cf1_question is None or cf1_params is None: continue else: cf1_question, cf1_params = cf_questions[0] if cf_questions and len(cf_questions) > 0 else ("How many objects are in the scene?", {}) if cf2_type: cf2_question, cf2_params = generate_question_for_counterfactual( cf2_type, original_scene, cf2_scene, retry_index=cf_retry, original_question=original_question, original_params=params ) if cf2_question is None or cf2_params is None: continue else: cf2_question, cf2_params = cf_questions[1] if cf_questions and len(cf_questions) > 1 else (cf_questions[0] if cf_questions else ("How many objects are in the scene?", {})) # Matte/shin can yield "unknown" when the object is removed; only allow for attribute-swap CFs. if "matte or shiny" in (original_question or "").lower() and (cf1_type in ("add_object", "remove_object") or cf2_type in ("add_object", "remove_object")): continue except Exception as e: import traceback traceback.print_exc() continue try: original_difficulty = calculate_question_difficulty(original_question, params) cf1_difficulty = calculate_question_difficulty(cf1_question, cf1_params) cf2_difficulty = calculate_question_difficulty(cf2_question, cf2_params) except Exception as e: import traceback traceback.print_exc() continue try: original_ans_cf1_q = answer_question_for_scene(cf1_question, original_scene) original_ans_cf2_q = answer_question_for_scene(cf2_question, original_scene) cf1_ans_orig_q = answer_question_for_scene(original_question, cf1_scene) cf1_ans_cf1_q = answer_question_for_scene(cf1_question, cf1_scene) cf1_ans_cf2_q = answer_question_for_scene(cf2_question, cf1_scene) cf2_ans_orig_q = answer_question_for_scene(original_question, cf2_scene) cf2_ans_cf1_q = answer_question_for_scene(cf1_question, cf2_scene) cf2_ans_cf2_q = answer_question_for_scene(cf2_question, cf2_scene) except Exception as e: import traceback traceback.print_exc() continue # Original question answer must change between original and each counterfactual image. orig_n = normalize_answer(original_ans_orig_q) if orig_n == normalize_answer(cf1_ans_orig_q) or orig_n == normalize_answer(cf2_ans_orig_q): continue if strict_question_validation: cf1_validity = classify_question_validity(cf1_question, original_scene, cf1_scene) cf2_validity = classify_question_validity(cf2_question, original_scene, cf2_scene) cf1_required = 'Semantic-Valid' if (cf1_type and cf1_type in IMAGE_CF_TYPES) else 'Negative-Valid' cf2_required = 'Semantic-Valid' if (cf2_type and cf2_type in IMAGE_CF_TYPES) else 'Negative-Valid' cf1_ok = (cf1_required == cf1_validity) cf2_ok = (cf2_required == cf2_validity) if cf1_ok and cf2_ok: break else: # change_position, swap_attribute, relational_flip now use strict spatial/compositional routing and must pass Semantic-Valid. cf1_differs = (cf1_type not in IMAGE_CF_TYPES) or (normalize_answer(original_ans_cf1_q) != normalize_answer(cf1_ans_cf1_q)) cf2_differs = (cf2_type not in IMAGE_CF_TYPES) or (normalize_answer(original_ans_cf2_q) != normalize_answer(cf2_ans_cf2_q)) if cf1_differs or cf2_differs: break else: print(f"WARNING: Scene {scene_num}: could not find questions with different answers for both CFs after {MAX_CF_ANSWER_RETRIES} retries (scene included with best-effort questions)") try: if with_links: def make_link(filename, file_type='image'): if base_url: return f"{base_url.rstrip('/')}/{file_type}s/{filename}" else: return f"{file_type}s/{filename}" original_image_link = make_link(original_id, 'image') original_scene_link = make_link(original_id.replace('.png', '.json'), 'scene') cf1_image_link = make_link(cf1_id, 'image') cf1_scene_link = make_link(cf1_id.replace('.png', '.json'), 'scene') cf2_image_link = make_link(cf2_id, 'image') cf2_scene_link = make_link(cf2_id.replace('.png', '.json'), 'scene') rows.append([ scene_num, original_image_link, original_scene_link, cf1_image_link, cf1_scene_link, cf2_image_link, cf2_scene_link, cf1_type, cf2_type, cf1_description, cf2_description, original_question, cf1_question, cf2_question, original_difficulty, cf1_difficulty, cf2_difficulty, original_ans_orig_q, original_ans_cf1_q, original_ans_cf2_q, cf1_ans_orig_q, cf1_ans_cf1_q, cf1_ans_cf2_q, cf2_ans_orig_q, cf2_ans_cf1_q, cf2_ans_cf2_q ]) else: rows.append([ original_id, cf1_id, cf2_id, cf1_type, cf2_type, cf1_description, cf2_description, original_question, cf1_question, cf2_question, original_difficulty, cf1_difficulty, cf2_difficulty, original_ans_orig_q, original_ans_cf1_q, original_ans_cf2_q, cf1_ans_orig_q, cf1_ans_cf1_q, cf1_ans_cf2_q, cf2_ans_orig_q, cf2_ans_cf1_q, cf2_ans_cf2_q ]) except Exception as e: import traceback traceback.print_exc() continue else: cf1_type = cf2_type = cf1_description = cf2_description = '' cf1_scene_file = find_scene_file(scenes_dir, cf1_id) cf2_scene_file = find_scene_file(scenes_dir, cf2_id) if cf1_scene_file and cf2_scene_file: try: cf1_scene = load_scene(cf1_scene_file) cf2_scene = load_scene(cf2_scene_file) cf1_type = get_cf_type_from_scene(cf1_scene) or '' cf2_type = get_cf_type_from_scene(cf2_scene) or '' cf1_description = get_cf_description_from_scene(cf1_scene) or '' cf2_description = get_cf_description_from_scene(cf2_scene) or '' except Exception: pass if with_links: def make_link(filename, file_type='image'): if base_url: return f"{base_url.rstrip('/')}/{file_type}s/{filename}" else: return f"{file_type}s/{filename}" original_image_link = make_link(original_id, 'image') original_scene_link = make_link(original_id.replace('.png', '.json'), 'scene') cf1_image_link = make_link(cf1_id, 'image') cf1_scene_link = make_link(cf1_id.replace('.png', '.json'), 'scene') cf2_image_link = make_link(cf2_id, 'image') cf2_scene_link = make_link(cf2_id.replace('.png', '.json'), 'scene') rows.append([ scene_num, original_image_link, original_scene_link, cf1_image_link, cf1_scene_link, cf2_image_link, cf2_scene_link, cf1_type, cf2_type, cf1_description, cf2_description ]) else: rows.append([original_id, cf1_id, cf2_id, cf1_type, cf2_type, cf1_description, cf2_description]) csv_path = os.path.join(run_dir, csv_filename) try: with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f, quoting=csv.QUOTE_ALL) writer.writerows(rows) except Exception as e: import traceback traceback.print_exc() return print(f"\n[OK] Generated mapping CSV: {csv_path}") print(f" Total rows: {len(rows) - 1}") print("\nSample entry:") if len(rows) > 1: row = rows[1] if single_cf_per_row: if generate_questions and len(row) >= 12: print(f" Images: Original: {row[0]}, Counterfactual: {row[1]}") print(f" CF type / description: {row[2]}, {row[3]!r}") print(f" Questions: Original: {row[4]}, CF: {row[5]}") print(f" Answers: orig→orig_q: {row[8]}, orig→cf_q: {row[9]}, cf→orig_q: {row[10]}, cf→cf_q: {row[11]}") elif len(row) >= 4: print(f" Images: Original: {row[0]}, Counterfactual: {row[1]}") print(f" CF type / description: {row[2]}, {row[3]!r}") elif with_links: if generate_questions: print(f" Scene ID: {row[0]}") print(f" Links:") print(f" Original image: {row[1]}, scene: {row[2]}") print(f" CF1 image: {row[3]}, scene: {row[4]}") print(f" CF2 image: {row[5]}, scene: {row[6]}") print(f" CF type / description: CF1 type={row[7]}, CF2 type={row[8]}; CF1 desc={row[9]!r}, CF2 desc={row[10]!r}") print(f" Questions: Original: {row[11]}, CF1: {row[12]}, CF2: {row[13]}") else: print(f" Scene ID: {row[0]}") print(f" Links:") print(f" Original image: {row[1]}, scene: {row[2]}") print(f" CF1 image: {row[3]}, scene: {row[4]}") print(f" CF2 image: {row[5]}, scene: {row[6]}") print(f" CF type / description: CF1 type={row[7]}, CF2 type={row[8]}; CF1 desc={row[9]!r}, CF2 desc={row[10]!r}") elif generate_questions and len(row) > 14: print(f" Images: Original: {row[0]}, CF1: {row[1]}, CF2: {row[2]}") print(f" CF type / description: CF1 type={row[3]}, CF2 type={row[4]}; CF1 desc={row[5]!r}, CF2 desc={row[6]!r}") print(f" Questions: Original: {row[7]}, CF1: {row[8]}, CF2: {row[9]}") print(f" Answer Matrix (scene × question):") print(f" Original image -> Orig Q: {row[10]}, CF1 Q: {row[11]}, CF2 Q: {row[12]}") print(f" CF1 image -> Orig Q: {row[13]}, CF1 Q: {row[14]}, CF2 Q: {row[15]}") print(f" CF2 image -> Orig Q: {row[16]}, CF1 Q: {row[17]}, CF2 Q: {row[18]}") elif len(row) >= 7: print(f" Images: Original: {row[0]}, CF1: {row[1]}, CF2: {row[2]}") print(f" CF type / description: CF1 type={row[3]}, CF2 type={row[4]}; CF1 desc={row[5]!r}, CF2 desc={row[6]!r}") def main(): parser = argparse.ArgumentParser( description='Generate CSV with original and VARIED counterfactual questions applied to all scenes' ) parser.add_argument('--output_dir', default='output', help='Run directory or base output directory (default: output)') parser.add_argument('--auto_latest', action='store_true', help='Automatically find and use the latest run in output_dir') parser.add_argument('--csv_name', default='image_mapping_with_questions.csv', help='Output CSV filename') parser.add_argument('--generate_questions', action='store_true', help='Generate questions and answers for each scene set') parser.add_argument('--no_strict_validation', action='store_true', help='Disable strict question validation (Semantic-Valid / Negative-Valid classifier); use legacy accept logic') parser.add_argument('--single_cf_per_row', action='store_true', help='Emit one row per (original, single counterfactual) instead of one row per (original, cf1, cf2). CSV columns: original_image, counterfactual_image, counterfactual_type, counterfactual_description [, + Q&A if --generate_questions].') args = parser.parse_args() if args.auto_latest: run_dir = find_latest_run(args.output_dir) if run_dir is None: print(f"ERROR: Could not find any run directories in {args.output_dir}") return else: if os.path.exists(os.path.join(args.output_dir, 'images')) and \ os.path.exists(os.path.join(args.output_dir, 'scenes')): run_dir = args.output_dir else: run_dir = find_latest_run(args.output_dir) if run_dir is None: print(f"ERROR: {args.output_dir} does not contain images/scenes directories") print(f" and no run directories found in {args.output_dir}") return print(f"Auto-detected run directory: {run_dir}") generate_mapping_with_questions( run_dir, args.csv_name, args.generate_questions, strict_question_validation=not args.no_strict_validation, single_cf_per_row=getattr(args, 'single_cf_per_row', False) ) if __name__ == '__main__': main()