| | import os |
| | import argparse |
| | import csv |
| | import json |
| | import random |
| | import re |
| | from pathlib import Path |
| |
|
| | def find_latest_run(base_output_dir): |
| | if not os.path.exists(base_output_dir): |
| | return None |
| | |
| | subdirs = [d for d in os.listdir(base_output_dir) |
| | if os.path.isdir(os.path.join(base_output_dir, d))] |
| | |
| | if not subdirs: |
| | return None |
| | |
| | timestamped = [d for d in subdirs if re.match(r'^\d{8}_\d{6}$', d)] |
| | if timestamped: |
| | latest = sorted(timestamped)[-1] |
| | return os.path.join(base_output_dir, latest) |
| | |
| | dirs_with_time = [(d, os.path.getmtime(os.path.join(base_output_dir, d))) |
| | for d in subdirs] |
| | latest = max(dirs_with_time, key=lambda x: x[1])[0] |
| | return os.path.join(base_output_dir, latest) |
| |
|
| | def find_scene_file(scenes_dir, image_filename): |
| | base_name = os.path.splitext(image_filename)[0] |
| | scene_file = os.path.join(scenes_dir, base_name + '.json') |
| | |
| | if os.path.exists(scene_file): |
| | return scene_file |
| | return None |
| |
|
| | def load_scene(scene_file): |
| | with open(scene_file, 'r') as f: |
| | return json.load(f) |
| |
|
| | RELATION_KEYS_TO_PHRASES = {'left': 'left of', 'right': 'right of', 'front': 'in front of', 'behind': 'behind'} |
| | PHRASES_TO_RELATION_KEYS = {'left of': 'left', 'right of': 'right', 'in front of': 'front', 'behind': 'behind'} |
| | DEFAULT_RELATIONS = ['left of', 'right of', 'in front of', 'behind'] |
| |
|
| | |
| | _COLORS = ['red', 'blue', 'green', 'brown', 'purple', 'cyan', 'yellow', 'gray', 'grey'] |
| | _SHAPES = ['cube', 'sphere', 'cylinder', 'cubes', 'spheres', 'cylinders'] |
| | _MATERIALS = ['metal', 'rubber', 'metals', 'rubbers'] |
| | _SIZES = ['small', 'large'] |
| |
|
| |
|
| | def _find_objects_matching(objects, color=None, shape=None, material=None, size=None): |
| | """Return list of object indices that match all specified attributes (None means any).""" |
| | out = [] |
| | for i, obj in enumerate(objects): |
| | if color is not None and (obj.get('color') or '').lower() != color: |
| | continue |
| | if shape is not None: |
| | s = (obj.get('shape') or '').lower() |
| | if s != shape and s != shape.rstrip('s') and s + 's' != shape: |
| | continue |
| | if material is not None and (obj.get('material') or '').lower() != material: |
| | continue |
| | if size is not None and (obj.get('size') or '').lower() != size: |
| | continue |
| | out.append(i) |
| | return out |
| |
|
| |
|
| | def _first_value_in_question(question_lower, values, strip_s=True): |
| | for v in values: |
| | if v in question_lower: |
| | return v.rstrip('s') if strip_s and v.endswith('s') else v |
| | return None |
| |
|
| |
|
| | def _objects_in_relation_to_reference(scene, relation_phrase, ref_color=None, ref_shape=None, ref_material=None, ref_size=None): |
| | """Return set of object indices that stand in relation_phrase to the reference object (e.g. 'left of' the red cube).""" |
| | objects = scene.get('objects', []) |
| | rel_key = PHRASES_TO_RELATION_KEYS.get(relation_phrase) |
| | if not rel_key: |
| | return set() |
| | rels = scene.get('relationships') or {} |
| | rel_list = rels.get(rel_key) |
| | if not rel_list or len(rel_list) != len(objects): |
| | return set() |
| | ref_indices = _find_objects_matching( |
| | objects, color=ref_color, shape=ref_shape, material=ref_material, size=ref_size |
| | ) |
| | if not ref_indices: |
| | return set() |
| | ref_idx = ref_indices[0] |
| | return set(rel_list[ref_idx]) |
| |
|
| |
|
| | def get_scene_properties(scene): |
| | objects = scene.get('objects', []) |
| | if not objects: |
| | return { |
| | 'colors': ['red', 'blue', 'green'], |
| | 'shapes': ['cube', 'sphere', 'cylinder'], |
| | 'materials': ['metal', 'rubber'], |
| | 'sizes': ['small', 'large'], |
| | 'relations': DEFAULT_RELATIONS |
| | } |
| | |
| | colors = list(set(obj.get('color') for obj in objects if obj.get('color'))) |
| | shapes = list(set(obj.get('shape') for obj in objects if obj.get('shape'))) |
| | materials = list(set(obj.get('material') for obj in objects if obj.get('material'))) |
| | sizes = list(set(obj.get('size') for obj in objects if obj.get('size'))) |
| | relationships = scene.get('relationships') or {} |
| | relations = [RELATION_KEYS_TO_PHRASES[k] for k in relationships if k in RELATION_KEYS_TO_PHRASES] |
| | if not relations: |
| | relations = DEFAULT_RELATIONS |
| | |
| | all_colors = ['gray', 'red', 'blue', 'green', 'brown', 'purple', 'cyan', 'yellow'] |
| | all_shapes = ['cube', 'sphere', 'cylinder'] |
| | all_materials = ['metal', 'rubber'] |
| | all_sizes = ['small', 'large'] |
| | |
| | return { |
| | 'colors': colors if colors else all_colors, |
| | 'shapes': shapes if shapes else all_shapes, |
| | 'materials': materials if materials else all_materials, |
| | 'sizes': sizes if sizes else all_sizes, |
| | 'relations': relations, |
| | 'all_colors': all_colors, |
| | 'all_shapes': all_shapes, |
| | 'all_materials': all_materials, |
| | 'all_sizes': all_sizes |
| | } |
| |
|
| | IMAGE_CF_TYPES = { |
| | 'change_color', 'change_shape', 'change_size', 'change_material', |
| | 'change_position', 'add_object', 'remove_object', 'replace_object', |
| | 'swap_attribute', 'relational_flip' |
| | } |
| | NEGATIVE_CF_TYPES = { |
| | 'change_background', 'change_lighting', 'add_noise', |
| | 'apply_fisheye', 'apply_blur', 'apply_vignette', 'apply_chromatic_aberration', |
| | 'occlusion_change' |
| | } |
| |
|
| | MAX_CF_ANSWER_RETRIES = 150 |
| |
|
| | def get_cf_type_from_scene(scene): |
| | meta = scene.get('cf_metadata') or {} |
| | if not meta.get('is_counterfactual'): |
| | return None |
| | return meta.get('cf_type') |
| |
|
| | def get_cf_description_from_scene(scene): |
| | meta = scene.get('cf_metadata') or {} |
| | if not meta.get('is_counterfactual'): |
| | return None |
| | return meta.get('cf_description') |
| |
|
| | def get_change_details(original_scene, cf_scene): |
| | orig_objs = original_scene.get('objects', []) |
| | cf_objs = cf_scene.get('objects', []) |
| | if len(orig_objs) != len(cf_objs): |
| | return {'attribute': 'count', 'orig_count': len(orig_objs), 'cf_count': len(cf_objs)} |
| | attrs = ['color', 'shape', 'material', 'size'] |
| | for i, (o, c) in enumerate(zip(orig_objs, cf_objs)): |
| | for attr in attrs: |
| | ov = (o.get(attr) or '').lower().strip() |
| | cv = (c.get(attr) or '').lower().strip() |
| | if ov != cv: |
| | return {'attribute': attr, 'orig_val': ov or 'unknown', 'cf_val': cv or 'unknown', 'object_index': i} |
| | return None |
| |
|
| | CF_COUNT_QUESTION_TEMPLATES = [ |
| | "How many objects are in the scene?", |
| | "What is the total number of objects in the scene?", |
| | ] |
| | CF_COLOR_QUESTION_TEMPLATES = [ |
| | ("How many {val} objects are there?", 'color'), |
| | ("Are there any {val} objects?", 'color'), |
| | ("What is the total number of {val} objects?", 'color'), |
| | ] |
| | CF_SHAPE_QUESTION_TEMPLATES = [ |
| | ("How many {val} are there?", 'shape'), |
| | ("Are there any {val}?", 'shape'), |
| | ("What is the total number of {val}?", 'shape'), |
| | ] |
| | CF_MATERIAL_QUESTION_TEMPLATES = [ |
| | ("How many {val} objects are there?", 'material'), |
| | ("Are there any {val} objects?", 'material'), |
| | ("What is the total number of {val} objects?", 'material'), |
| | ] |
| | CF_SIZE_QUESTION_TEMPLATES = [ |
| | ("How many {val} objects are there?", 'size'), |
| | ("Are there any {val} objects?", 'size'), |
| | ("What is the total number of {val} objects?", 'size'), |
| | ] |
| |
|
| |
|
| | def _pluralize_shape(shape): |
| | if not shape: |
| | return shape |
| | s = shape.strip().lower() |
| | if s.endswith('s'): |
| | return s |
| | return s + 's' |
| |
|
| |
|
| | def _count_by_attribute(objects, attr): |
| | counts = {} |
| | for obj in objects: |
| | val = (obj.get(attr) or '').lower().strip() |
| | if val: |
| | counts[val] = counts.get(val, 0) + 1 |
| | return counts |
| |
|
| |
|
| | def _get_attributes_with_different_counts(original_scene, cf_scene): |
| | orig_objs = original_scene.get('objects', []) |
| | cf_objs = cf_scene.get('objects', []) |
| | differing = [] |
| | for attr in ['color', 'shape', 'material', 'size']: |
| | orig_counts = _count_by_attribute(orig_objs, attr) |
| | cf_counts = _count_by_attribute(cf_objs, attr) |
| | all_vals = set(orig_counts) | set(cf_counts) |
| | for val in all_vals: |
| | o = orig_counts.get(val, 0) |
| | c = cf_counts.get(val, 0) |
| | if o != c: |
| | differing.append((attr, val, o, c)) |
| | return differing |
| |
|
| |
|
| | def generate_question_for_counterfactual(cf_type, original_scene, cf_scene, retry_index=0, original_question=None, original_params=None): |
| | """ |
| | Generate a counterfactual question. If original_question and original_params are provided, |
| | uses strict targeting: (1) try original question, (2) try mutated questions that target |
| | the change, (3) return (None, None) to signal rejection/retry if no question yields an answer change. |
| | """ |
| | |
| | if original_question is not None and original_params is not None: |
| | a_orig = answer_question_for_scene(original_question, original_scene) |
| | a_cf = answer_question_for_scene(original_question, cf_scene) |
| | a_orig_n = normalize_answer(a_orig) |
| | a_cf_n = normalize_answer(a_cf) |
| | if a_orig_n != a_cf_n: |
| | return (original_question, original_params) |
| | for mut_q, mut_params in create_counterfactual_questions(original_question, original_params, original_scene): |
| | a_mut_cf = answer_question_for_scene(mut_q, cf_scene) |
| | if normalize_answer(a_mut_cf) != a_orig_n: |
| | return (mut_q, mut_params) |
| | return (None, None) |
| |
|
| | random.seed(hash((str(cf_type), retry_index, str(id(original_scene)), str(id(cf_scene))))) |
| | change = get_change_details(original_scene, cf_scene) |
| | orig_objs = original_scene.get('objects', []) |
| | cf_objs = cf_scene.get('objects', []) |
| | props_orig = get_scene_properties(original_scene) |
| | props_cf = get_scene_properties(cf_scene) |
| |
|
| | def _pick_spatial_question(props): |
| | """Strict spatial/relational templates only; never simple attribute count.""" |
| | relations = props.get('relations') or DEFAULT_RELATIONS |
| | colors = list(props.get('colors') or props.get('all_colors') or ['red', 'blue', 'green']) |
| | shapes = list(props.get('shapes') or props.get('all_shapes') or ['cube', 'sphere', 'cylinder']) |
| | materials = list(props.get('materials') or props.get('all_materials') or ['metal', 'rubber']) |
| | sizes = list(props.get('sizes') or props.get('all_sizes') or ['small', 'large']) |
| | templates = [ |
| | ("What color is the object {relation} the {color} {shape}?", { |
| | 'relation': random.choice(relations), 'color': random.choice(colors), 'shape': random.choice(shapes) |
| | }), |
| | ("What shape is the object {relation} the {material} object?", { |
| | 'relation': random.choice(relations), 'material': random.choice(materials) |
| | }), |
| | ("How many objects are {relation} the {color} {shape}?", { |
| | 'relation': random.choice(relations), 'color': random.choice(colors), 'shape': random.choice(shapes) |
| | }), |
| | ("How many {material} objects are {relation} the {shape}?", { |
| | 'material': random.choice(materials), 'relation': random.choice(relations), 'shape': random.choice(shapes) |
| | }), |
| | ("Is there a {color} object {relation} the {shape}?", { |
| | 'color': random.choice(colors), 'relation': random.choice(relations), 'shape': random.choice(shapes) |
| | }), |
| | ("What is the total number of {size} objects {relation} the {color} object?", { |
| | 'size': random.choice(sizes), 'relation': random.choice(relations), 'color': random.choice(colors) |
| | }), |
| | ("What is the total number of {material} objects {relation} the {color} {shape}?", { |
| | 'material': random.choice(materials), 'relation': random.choice(relations), |
| | 'color': random.choice(colors), 'shape': random.choice(shapes) |
| | }), |
| | ("Is there a {size} {material} object {relation} the {shape}?", { |
| | 'size': random.choice(sizes), 'material': random.choice(materials), |
| | 'relation': random.choice(relations), 'shape': random.choice(shapes) |
| | }), |
| | ] |
| | template, params = random.choice(templates) |
| | return template.format(**params), params |
| |
|
| | def _pick_compositional_question(props): |
| | """Strict compositional (≥2 attributes) templates only; never single-attribute count.""" |
| | colors = list(props.get('colors') or props.get('all_colors') or ['red', 'blue', 'green']) |
| | shapes = list(props.get('shapes') or props.get('all_shapes') or ['cube', 'sphere', 'cylinder']) |
| | materials = list(props.get('materials') or props.get('all_materials') or ['metal', 'rubber']) |
| | sizes = list(props.get('sizes') or props.get('all_sizes') or ['small', 'large']) |
| | templates = [ |
| | ("How many {color} {shape}s are there?", { |
| | 'color': random.choice(colors), 'shape': random.choice(shapes) |
| | }), |
| | ("Are there any {color} {shape}s?", { |
| | 'color': random.choice(colors), 'shape': random.choice(shapes) |
| | }), |
| | ("Is there a {color} {shape}?", { |
| | 'color': random.choice(colors), 'shape': random.choice(shapes) |
| | }), |
| | ("Is there a {material} {shape}?", { |
| | 'material': random.choice(materials), 'shape': random.choice(shapes) |
| | }), |
| | ("How many {size} {color} objects are there?", { |
| | 'size': random.choice(sizes), 'color': random.choice(colors) |
| | }), |
| | ("What is the total number of {color} {material} objects?", { |
| | 'color': random.choice(colors), 'material': random.choice(materials) |
| | }), |
| | ("Are there any {material} {shape}s?", { |
| | 'material': random.choice(materials), 'shape': random.choice(shapes) |
| | }), |
| | ("How many {size} {shape}s are there?", { |
| | 'size': random.choice(sizes), 'shape': random.choice(shapes) |
| | }), |
| | ] |
| | template, params = random.choice(templates) |
| | return template.format(**params), params |
| |
|
| | |
| | if cf_type == 'change_position': |
| | props = props_cf if (props_cf.get('relations') or props_cf.get('colors') or props_cf.get('shapes')) else props_orig |
| | question, params = _pick_spatial_question(props) |
| | return question, params |
| |
|
| | |
| | if cf_type == 'relational_flip': |
| | props = props_cf if (props_cf.get('relations') or props_cf.get('colors') or props_cf.get('shapes')) else props_orig |
| | question, params = _pick_spatial_question(props) |
| | return question, params |
| |
|
| | |
| | if cf_type == 'swap_attribute': |
| | props = props_cf if (props_cf.get('colors') or props_cf.get('shapes')) else props_orig |
| | question, params = _pick_compositional_question(props) |
| | return question, params |
| |
|
| | if cf_type and cf_type in IMAGE_CF_TYPES: |
| | differing = _get_attributes_with_different_counts(original_scene, cf_scene) |
| | if differing: |
| | idx = retry_index % len(differing) if differing else 0 |
| | attr, val, orig_count, cf_count = differing[idx] |
| | if attr == 'color': |
| | template, _ = random.choice(CF_COLOR_QUESTION_TEMPLATES) |
| | question = template.format(val=val) |
| | elif attr == 'shape': |
| | plural = _pluralize_shape(val) |
| | template, _ = random.choice(CF_SHAPE_QUESTION_TEMPLATES) |
| | question = template.format(val=plural) |
| | elif attr == 'material': |
| | template, _ = random.choice(CF_MATERIAL_QUESTION_TEMPLATES) |
| | question = template.format(val=val) |
| | elif attr == 'size': |
| | template, _ = random.choice(CF_SIZE_QUESTION_TEMPLATES) |
| | question = template.format(val=val) |
| | else: |
| | question = None |
| | if question: |
| | return question, {attr: val.rstrip('s') if attr == 'shape' else val} |
| |
|
| | if cf_type and cf_type in NEGATIVE_CF_TYPES: |
| | templates = [ |
| | ("How many objects are in the scene?", {}), |
| | ("How many {color} objects are there?", {'color': random.choice(props_orig['colors'])} if props_orig['colors'] else None), |
| | ("Are there any {shape} objects?", {'shape': random.choice(props_orig['shapes'])} if props_orig['shapes'] else None), |
| | ("How many {material} objects are there?", {'material': random.choice(props_orig['materials'])} if props_orig['materials'] else None), |
| | ("What is the total number of {size} objects?", {'size': random.choice(props_orig['sizes'])} if props_orig['sizes'] else None), |
| | ] |
| | valid = [(t, p) for t, p in templates if p is not None or t.startswith("How many objects are in")] |
| | if not valid: |
| | valid = [("How many objects are in the scene?", {})] |
| | template, params = random.choice(valid) |
| | params = params or {} |
| | question = template.format(**params) if params else template |
| | return question, params |
| |
|
| | if change and change.get('attribute') == 'count': |
| | orig_count = change.get('orig_count', len(orig_objs)) |
| | cf_count = change.get('cf_count', len(cf_objs)) |
| | templates_with_params = [] |
| | templates_with_params.append((random.choice(CF_COUNT_QUESTION_TEMPLATES), {})) |
| | if cf_count > orig_count: |
| | templates_with_params.append((f"Are there more than {orig_count} objects?", {})) |
| | templates_with_params.append((f"Are there at least {cf_count} objects?", {})) |
| | if cf_count < orig_count: |
| | templates_with_params.append((f"Are there fewer than {orig_count} objects?", {})) |
| | templates_with_params.append((f"Are there more than {cf_count} objects?", {})) |
| | template, params = random.choice(templates_with_params) |
| | return template, params |
| |
|
| | if change and change.get('attribute') in ('color', 'shape', 'material', 'size'): |
| | attr = change['attribute'] |
| | cf_val = (change.get('cf_val') or '').strip().lower() |
| | if not cf_val: |
| | cf_val = 'unknown' |
| | params = {attr: cf_val} |
| | if attr == 'color': |
| | template, _ = random.choice(CF_COLOR_QUESTION_TEMPLATES) |
| | question = template.format(val=cf_val) |
| | elif attr == 'shape': |
| | template, _ = random.choice(CF_SHAPE_QUESTION_TEMPLATES) |
| | plural = _pluralize_shape(cf_val) |
| | question = template.format(val=plural) |
| | params['shape'] = cf_val.rstrip('s') |
| | elif attr == 'material': |
| | template, _ = random.choice(CF_MATERIAL_QUESTION_TEMPLATES) |
| | question = template.format(val=cf_val) |
| | elif attr == 'size': |
| | template, _ = random.choice(CF_SIZE_QUESTION_TEMPLATES) |
| | question = template.format(val=cf_val) |
| | else: |
| | question = "How many objects are in the scene?" |
| | params = {} |
| | return question, params |
| |
|
| | if cf_type in ('add_object', 'remove_object'): |
| | templates = list(CF_COUNT_QUESTION_TEMPLATES) |
| | if len(orig_objs) != len(cf_objs): |
| | if len(cf_objs) > len(orig_objs): |
| | templates.extend([f"Are there more than {len(orig_objs)} objects?", f"Are there at least {len(cf_objs)} objects?"]) |
| | else: |
| | templates.extend([f"Are there fewer than {len(orig_objs)} objects?", f"Are there more than {len(cf_objs)} objects?"]) |
| | template = random.choice(templates) |
| | return template, {} |
| |
|
| | if cf_type in ('change_color', 'change_shape', 'replace_object'): |
| | for attr, key in [('color', 'colors'), ('shape', 'shapes'), ('material', 'materials'), ('size', 'sizes')]: |
| | vals = list(props_cf.get(key) or props_orig.get(key) or []) |
| | if vals: |
| | val = random.choice(vals) |
| | if attr == 'shape': |
| | plural = _pluralize_shape(val) |
| | templates = CF_SHAPE_QUESTION_TEMPLATES |
| | template, _ = random.choice(templates) |
| | question = template.format(val=plural) |
| | elif attr == 'color': |
| | template, _ = random.choice(CF_COLOR_QUESTION_TEMPLATES) |
| | question = template.format(val=val) |
| | elif attr == 'material': |
| | template, _ = random.choice(CF_MATERIAL_QUESTION_TEMPLATES) |
| | question = template.format(val=val) |
| | else: |
| | template, _ = random.choice(CF_SIZE_QUESTION_TEMPLATES) |
| | question = template.format(val=val) |
| | return question, {attr: val.rstrip('s') if attr == 'shape' else val} |
| |
|
| | if cf_type in ('change_size', 'change_material'): |
| | key = 'sizes' if cf_type == 'change_size' else 'materials' |
| | attr = key.rstrip('s') |
| | vals = list(props_cf.get(key) or props_orig.get(key) or []) |
| | if vals: |
| | val = random.choice(vals) |
| | if cf_type == 'change_size': |
| | template, _ = random.choice(CF_SIZE_QUESTION_TEMPLATES) |
| | else: |
| | template, _ = random.choice(CF_MATERIAL_QUESTION_TEMPLATES) |
| | question = template.format(val=val) |
| | return question, {attr: val} |
| |
|
| | |
| | if cf_type in ('change_position', 'relational_flip', 'swap_attribute'): |
| | props = props_cf if (props_cf.get('relations') or props_cf.get('colors')) else props_orig |
| | if cf_type == 'swap_attribute': |
| | question, params = _pick_compositional_question(props) |
| | else: |
| | question, params = _pick_spatial_question(props) |
| | return question, params |
| | question = random.choice(CF_COUNT_QUESTION_TEMPLATES) |
| | return question, {} |
| |
|
| | def generate_question_for_scene(scene_file, retry_index=None): |
| | scene = load_scene(scene_file) |
| | objects = scene.get('objects', []) |
| | |
| | if len(objects) == 0: |
| | return "How many objects are in the scene?", {} |
| | |
| | props = get_scene_properties(scene) |
| | |
| | templates = [ |
| | ("How many objects are in the scene?", {}), |
| | ("How many {color} objects are there?", {'color': random.choice(props['colors'])}), |
| | ("Are there any {shape} objects?", {'shape': random.choice(props['shapes'])}), |
| | ("Are there any {shape}s present?", {'shape': random.choice(props['shapes'])}), |
| | ("Is there a {color} {shape}?", { |
| | 'color': random.choice(props['colors']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ("How many {material} objects are there?", {'material': random.choice(props['materials'])}), |
| | ("What is the total number of {material} objects?", {'material': random.choice(props['materials'])}), |
| | ("What is the total number of metallic objects?", {}), |
| | ("What is the total number of {size} objects?", {'size': random.choice(props['sizes'])}), |
| | ("Is there a {material} {shape}?", { |
| | 'material': random.choice(props['materials']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ("How many {size} {color} objects are there?", { |
| | 'size': random.choice(props['sizes']), |
| | 'color': random.choice(props['colors']) |
| | }), |
| | ("Are there any {color} {shape}s?", { |
| | 'color': random.choice(props['colors']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ("What is the total number of {color} {material} objects?", { |
| | 'color': random.choice(props['colors']), |
| | 'material': random.choice(props['materials']) |
| | }), |
| | ("What color is the object {relation} the {color} {shape}?", { |
| | 'relation': random.choice(props['relations']), |
| | 'color': random.choice(props['colors']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ("What shape is the object {relation} the {material} object?", { |
| | 'relation': random.choice(props['relations']), |
| | 'material': random.choice(props['materials']) |
| | }), |
| | ("What material is the {size} object {relation} the {shape}?", { |
| | 'size': random.choice(props['sizes']), |
| | 'relation': random.choice(props['relations']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ("How many objects are {relation} the {color} {shape}?", { |
| | 'relation': random.choice(props['relations']), |
| | 'color': random.choice(props['colors']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ("How many {material} objects are {relation} the {shape}?", { |
| | 'material': random.choice(props['materials']), |
| | 'relation': random.choice(props['relations']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ("What is the total number of {size} objects {relation} the {color} object?", { |
| | 'size': random.choice(props['sizes']), |
| | 'relation': random.choice(props['relations']), |
| | 'color': random.choice(props['colors']) |
| | }), |
| | ("Is there a {color} object {relation} the {shape}?", { |
| | 'color': random.choice(props['colors']), |
| | 'relation': random.choice(props['relations']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ("Are there any {material} {shape}s {relation} the {size} object?", { |
| | 'material': random.choice(props['materials']), |
| | 'shape': random.choice(props['shapes']), |
| | 'relation': random.choice(props['relations']), |
| | 'size': random.choice(props['sizes']) |
| | }), |
| | |
| | ("Is the color of the {shape1} the same as the {shape2}?", { |
| | 'shape1': random.choice(props['shapes']), |
| | 'shape2': random.choice(props['shapes']) |
| | }), |
| | ("Is the material of the {color} object the same as the {size} object?", { |
| | 'color': random.choice(props['colors']), |
| | 'size': random.choice(props['sizes']) |
| | }), |
| | ("Do the {size} object and the {material} object have the same shape?", { |
| | 'size': random.choice(props['sizes']), |
| | 'material': random.choice(props['materials']) |
| | }), |
| | |
| | ("How many objects are either {color} or {shape}?", { |
| | 'color': random.choice(props['colors']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ("Are there any objects that are either {material} or {color}?", { |
| | 'material': random.choice(props['materials']), |
| | 'color': random.choice(props['colors']) |
| | }), |
| | ("What is the total number of objects that are either {size} or {shape}?", { |
| | 'size': random.choice(props['sizes']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | |
| | ("Is the number of {color} objects equal to the number of {shape}s?", { |
| | 'color': random.choice(props['colors']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ("Are there exactly as many {material} objects as {size} objects?", { |
| | 'material': random.choice(props['materials']), |
| | 'size': random.choice(props['sizes']) |
| | }), |
| | ("Does the scene contain an equal number of {color1} objects and {color2} objects?", dict(zip( |
| | ['color1', 'color2'], |
| | random.sample(props['colors'], 2) if len(props['colors']) >= 2 else [props['colors'][0]] * 2 |
| | ))), |
| | |
| | ("What is the total number of {material} objects {relation} the {color} {shape}?", { |
| | 'material': random.choice(props['materials']), |
| | 'relation': random.choice(props['relations']), |
| | 'color': random.choice(props['colors']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ("Is there a {size} {material} object {relation} the {shape}?", { |
| | 'size': random.choice(props['sizes']), |
| | 'material': random.choice(props['materials']), |
| | 'relation': random.choice(props['relations']), |
| | 'shape': random.choice(props['shapes']) |
| | }), |
| | ] |
| | |
| | matte_shiny_objects = [o for o in objects if (o.get('material') or '').lower() in ('metal', 'rubber') and o.get('color') and o.get('shape')] |
| | if matte_shiny_objects: |
| | obj = random.choice(matte_shiny_objects) |
| | templates.append(("Is the {color} {shape} matte or shiny?", {'color': obj.get('color'), 'shape': obj.get('shape')})) |
| | if retry_index is not None: |
| | random.seed(hash((scene_file, retry_index))) |
| | else: |
| | random.seed(hash(scene_file)) |
| | template, params = random.choice(templates) |
| | |
| | question = template.format(**params) if params else template |
| | |
| | return question, params |
| |
|
| | def calculate_question_difficulty(question, params): |
| | num_params = len(params) if params else 0 |
| | |
| | question_lower = question.lower() |
| | |
| | if "matte or shiny" in question_lower or ("or" in question_lower and ("matte" in question_lower or "shiny" in question_lower)): |
| | return "hard" |
| | elif "metallic" in question_lower: |
| | return "medium" |
| | elif "total number" in question_lower and num_params >= 1: |
| | return "hard" if num_params >= 2 else "medium" |
| | elif num_params == 0: |
| | return "easy" |
| | elif num_params == 1: |
| | return "medium" |
| | else: |
| | return "hard" |
| |
|
| | def _apply_param_replacements(question, params, cf_params): |
| | """Replace param values in question with cf_params, from last to first by position, to avoid double-replacing when the same value appears for different placeholders.""" |
| | if not params or not cf_params: |
| | return question |
| | |
| | positions = [] |
| | for k, v in params.items(): |
| | if k not in cf_params or cf_params[k] == v: |
| | continue |
| | pos = question.find(v) |
| | if pos >= 0: |
| | positions.append((pos, k, v, cf_params[k])) |
| | |
| | positions.sort(key=lambda x: -x[0]) |
| | for pos, k, old_val, new_val in positions: |
| | question = question[:pos] + new_val + question[pos + len(old_val):] |
| | return question |
| |
|
| |
|
| | def create_counterfactual_questions(original_question, params, scene): |
| | props = get_scene_properties(scene) |
| | cf_questions = [] |
| | |
| | strategies = ['attribute_swap', 'question_type', 'scope_change', |
| | 'negation', 'comparative', 'multi_attribute', |
| | 'same_different', 'either_or', 'equal_comparison'] |
| | |
| | random.seed(hash(str(scene))) |
| | selected_strategies = random.sample(strategies, 2) |
| | |
| | for strategy in selected_strategies: |
| | cf_q = None |
| | cf_params = {} |
| | max_retries = 5 |
| | retry_count = 0 |
| | |
| | while retry_count < max_retries: |
| | cf_q = None |
| | cf_params = {} |
| | |
| | if strategy == 'attribute_swap' and params: |
| | cf_params = params.copy() |
| | param_to_change = random.choice(list(params.keys())) |
| | current = params.get(param_to_change) |
| |
|
| | def pick_alternative(attr_key, all_vals_getter): |
| | alts = [v for v in all_vals_getter() if v != current] |
| | if alts: |
| | cf_params[param_to_change] = random.choice(alts) |
| | return True |
| | return False |
| |
|
| | if param_to_change in ('color', 'color1', 'color2'): |
| | if not pick_alternative('color', lambda: props['all_colors']): |
| | strategy = 'negation' |
| | continue |
| | elif param_to_change in ('shape', 'shape1', 'shape2'): |
| | if not pick_alternative('shape', lambda: props['all_shapes']): |
| | strategy = 'negation' |
| | continue |
| | elif param_to_change == 'material': |
| | if not pick_alternative('material', lambda: props['all_materials']): |
| | strategy = 'negation' |
| | continue |
| | elif param_to_change == 'size': |
| | if not pick_alternative('size', lambda: props['all_sizes']): |
| | strategy = 'negation' |
| | continue |
| | elif param_to_change == 'relation': |
| | if not pick_alternative('relation', lambda: props['relations']): |
| | strategy = 'negation' |
| | continue |
| | else: |
| | retry_count += 1 |
| | continue |
| |
|
| | cf_q = _apply_param_replacements(original_question, params, cf_params) |
| | |
| | elif strategy == 'question_type': |
| | cf_params = params.copy() if params else {} |
| | if "How many" in original_question and "objects are in the scene" in original_question: |
| | if props['colors']: |
| | color = random.choice(props['colors']) |
| | cf_q = f"How many {color} objects are there?" |
| | cf_params = {'color': color} |
| | elif props['shapes']: |
| | shape = random.choice(props['shapes']) |
| | cf_q = f"Are there any {shape}s?" |
| | cf_params = {'shape': shape} |
| | else: |
| | cf_q = "Are there more than 3 objects?" |
| | cf_params = {} |
| | elif "How many" in original_question: |
| | cf_q = original_question.replace("How many", "Are there any") |
| | cf_q = cf_q.replace(" are there?", "?") |
| | cf_q = cf_q.replace(" are in the scene?", " in the scene?") |
| | elif "Are there" in original_question or "Is there" in original_question: |
| | if "Are there any" in original_question: |
| | cf_q = original_question.replace("Are there any", "How many") |
| | if not cf_q.endswith(" are there?"): |
| | cf_q = cf_q.replace("?", " are there?") |
| | elif "Is there a" in original_question: |
| | cf_q = original_question.replace("Is there a", "How many") |
| | if not cf_q.endswith(" are there?"): |
| | cf_q = cf_q.replace("?", " are there?") |
| | else: |
| | if props['colors']: |
| | color = random.choice(props['colors']) |
| | cf_q = f"How many {color} objects are there?" |
| | cf_params = {'color': color} |
| | else: |
| | cf_q = "How many objects are in the scene?" |
| | cf_params = {} |
| | elif "What is" in original_question: |
| | cf_q = original_question.replace("What is the total number of", "How many") |
| | else: |
| | if props['colors']: |
| | color = random.choice(props['colors']) |
| | cf_q = f"How many {color} objects are there?" |
| | cf_params = {'color': color} |
| | else: |
| | cf_q = "Are there more than 3 objects?" |
| | cf_params = {} |
| | |
| | elif strategy == 'scope_change': |
| | if params and len(params) >= 2: |
| | cf_params = params.copy() |
| | key_to_remove = random.choice(list(params.keys())) |
| | del cf_params[key_to_remove] |
| | |
| | if len(cf_params) == 1: |
| | attr_val = list(cf_params.values())[0] |
| | cf_q = f"How many {attr_val} objects are there?" |
| | else: |
| | if props['colors']: |
| | color = random.choice(props['colors']) |
| | cf_q = f"How many {color} objects are there?" |
| | cf_params = {'color': color} |
| | else: |
| | cf_q = "Are there more than 3 objects?" |
| | cf_params = {} |
| | elif params and len(params) == 1: |
| | new_attr = random.choice(['material', 'size']) |
| | if new_attr not in params: |
| | new_val = random.choice(props[new_attr + 's']) |
| | existing_key = list(params.keys())[0] |
| | existing_val = list(params.values())[0] |
| | cf_params = params.copy() |
| | cf_params[new_attr] = new_val |
| | if new_attr == 'size': |
| | cf_q = f"How many {new_val} {existing_val} objects are there?" |
| | elif new_attr == 'material': |
| | if existing_key == 'size': |
| | cf_q = f"How many {existing_val} {new_val} objects are there?" |
| | else: |
| | cf_q = f"How many {existing_val} {new_val} objects are there?" |
| | else: |
| | strategy = 'negation' |
| | continue |
| | else: |
| | if props['colors']: |
| | color = random.choice(props['colors']) |
| | cf_params = {'color': color} |
| | cf_q = f"How many {color} objects are there?" |
| | elif props['shapes']: |
| | shape = random.choice(props['shapes']) |
| | cf_params = {'shape': shape} |
| | cf_q = f"Are there any {shape}s?" |
| | else: |
| | cf_q = "Are there more than 3 objects?" |
| | cf_params = {} |
| | |
| | elif strategy == 'negation': |
| | cf_params = params.copy() if params else {} |
| | if params: |
| | if 'color' in params: |
| | color = params['color'] |
| | cf_q = f"How many objects are NOT {color}?" |
| | elif 'shape' in params: |
| | shape = params['shape'] |
| | cf_q = f"How many objects are NOT {shape}s?" |
| | else: |
| | attr_val = list(params.values())[0] |
| | cf_q = f"How many objects are NOT {attr_val}?" |
| | else: |
| | cf_q = "Are there fewer than 5 objects?" |
| | cf_params = {} |
| | |
| | elif strategy == 'comparative': |
| | cf_params = params.copy() if params else {} |
| | if "How many" in original_question: |
| | number = random.choice([2, 3, 4, 5]) |
| | cf_q = original_question.replace("How many", f"Are there more than {number}") |
| | cf_q = cf_q.replace(" are there?", "?") |
| | cf_q = cf_q.replace(" are in the scene?", " in the scene?") |
| | elif params: |
| | if 'color' in params: |
| | color1 = params['color'] |
| | alternatives = [c for c in props['all_colors'] if c != color1] |
| | if alternatives: |
| | color2 = random.choice(alternatives) |
| | cf_params = {'color': color1, 'color2': color2} |
| | cf_q = f"Are there more {color1} objects than {color2} objects?" |
| | else: |
| | cf_q = f"How many objects are NOT {color1}?" |
| | cf_params = {'color': color1} |
| | elif 'shape' in params: |
| | shape1 = params['shape'] |
| | alternatives = [s for s in props['all_shapes'] if s != shape1] |
| | if alternatives: |
| | shape2 = random.choice(alternatives) |
| | cf_params = {'shape': shape1, 'shape2': shape2} |
| | cf_q = f"Are there more {shape1}s than {shape2}s?" |
| | else: |
| | cf_q = f"How many objects are NOT {shape1}s?" |
| | cf_params = {'shape': shape1} |
| | else: |
| | cf_q = "Are there more than 3 objects?" |
| | cf_params = {} |
| | else: |
| | cf_q = "Are there more than 3 objects?" |
| | cf_params = {} |
| | |
| | elif strategy == 'multi_attribute': |
| | if params and len(params) >= 2: |
| | cf_params = {} |
| | changed = False |
| | for key in params: |
| | if key == 'color': |
| | alternatives = [c for c in props['all_colors'] if c != params[key]] |
| | if alternatives: |
| | cf_params[key] = random.choice(alternatives) |
| | changed = True |
| | else: |
| | cf_params[key] = params[key] |
| | elif key == 'shape': |
| | alternatives = [s for s in props['all_shapes'] if s != params[key]] |
| | if alternatives: |
| | cf_params[key] = random.choice(alternatives) |
| | changed = True |
| | else: |
| | cf_params[key] = params[key] |
| | elif key == 'material': |
| | alternatives = [m for m in props['all_materials'] if m != params[key]] |
| | if alternatives: |
| | cf_params[key] = random.choice(alternatives) |
| | changed = True |
| | else: |
| | cf_params[key] = params[key] |
| | elif key == 'size': |
| | alternatives = [s for s in props['all_sizes'] if s != params[key]] |
| | if alternatives: |
| | cf_params[key] = random.choice(alternatives) |
| | changed = True |
| | else: |
| | cf_params[key] = params[key] |
| | |
| | if not changed: |
| | strategy = 'negation' |
| | continue |
| | |
| | attr_order = ['size', 'color', 'material', 'shape'] |
| | ordered_values = [] |
| | for attr in attr_order: |
| | if attr in cf_params: |
| | ordered_values.append(cf_params[attr]) |
| | cf_q = f"How many {' '.join(ordered_values)} objects are there?" |
| | else: |
| | color = random.choice(props['colors']) |
| | shape = random.choice(props['shapes']) |
| | cf_params = {'color': color, 'shape': shape} |
| | cf_q = f"Is there a {color} {shape}?" |
| |
|
| | elif strategy == 'same_different': |
| | |
| | q = original_question |
| | q_lower = q.lower() |
| | if "the same as" in q_lower or "same shape" in q_lower or "same color" in q_lower or "same material" in q_lower: |
| | if random.choice([True, False]) and params: |
| | |
| | swap_keys = [k for k in params if k in ('shape1', 'shape2', 'color', 'size', 'material', 'shape')] |
| | if swap_keys: |
| | key = random.choice(swap_keys) |
| | current = params.get(key) |
| | if key in ('shape1', 'shape2', 'shape'): |
| | alts = [s for s in props['all_shapes'] if s != current] |
| | val = random.choice(alts) if alts else current |
| | elif key in ('color', 'color1', 'color2'): |
| | alts = [c for c in props['all_colors'] if c != current] |
| | val = random.choice(alts) if alts else current |
| | elif key == 'material': |
| | alts = [m for m in props['all_materials'] if m != current] |
| | val = random.choice(alts) if alts else current |
| | elif key == 'size': |
| | alts = [s for s in props['all_sizes'] if s != current] |
| | val = random.choice(alts) if alts else current |
| | else: |
| | val = current |
| | if val != current: |
| | cf_params = params.copy() |
| | cf_params[key] = val |
| | cf_q = _apply_param_replacements(q, params, cf_params) |
| | else: |
| | cf_q = None |
| | else: |
| | cf_q = None |
| | else: |
| | |
| | if "the same as" in q_lower: |
| | cf_q = q.replace("the same as", "different from").replace("The same as", "Different from") |
| | elif "same shape" in q_lower: |
| | cf_q = q.replace("same shape", "different shape").replace("same shape", "different shape") |
| | elif "same color" in q_lower: |
| | cf_q = q.replace("same color", "different color") |
| | elif "same material" in q_lower: |
| | cf_q = q.replace("same material", "different material") |
| | else: |
| | cf_q = q.replace("the same as", "different from") |
| | cf_params = params.copy() if params else {} |
| | else: |
| | cf_q = None |
| |
|
| | elif strategy == 'either_or': |
| | |
| | q_lower = original_question.lower() |
| | if "either" in q_lower and " or " in q_lower and params: |
| | if random.choice([True, False]): |
| | |
| | swap_keys = [k for k in params if k in ('color', 'shape', 'material', 'size')] |
| | if swap_keys: |
| | key = random.choice(swap_keys) |
| | current = params.get(key) |
| | if key == 'shape': |
| | alts = [s for s in props['all_shapes'] if s != current] |
| | val = random.choice(alts) if alts else current |
| | elif key == 'color': |
| | alts = [c for c in props['all_colors'] if c != current] |
| | val = random.choice(alts) if alts else current |
| | elif key == 'material': |
| | alts = [m for m in props['all_materials'] if m != current] |
| | val = random.choice(alts) if alts else current |
| | elif key == 'size': |
| | alts = [s for s in props['all_sizes'] if s != current] |
| | val = random.choice(alts) if alts else current |
| | else: |
| | val = current |
| | if val != current: |
| | cf_params = params.copy() |
| | cf_params[key] = val |
| | cf_q = _apply_param_replacements(original_question, params, cf_params) |
| | else: |
| | cf_q = None |
| | else: |
| | cf_q = None |
| | else: |
| | |
| | cf_q = original_question.replace("either", "both").replace(" or ", " and ") |
| | cf_params = params.copy() if params else {} |
| | else: |
| | cf_q = None |
| |
|
| | elif strategy == 'equal_comparison': |
| | |
| | q = original_question |
| | q_lower = q.lower() |
| | if ("equal to" in q_lower or "exactly as many" in q_lower or "equal number" in q_lower) and params: |
| | if random.choice([True, False]): |
| | |
| | swap_keys = [k for k in params if k in ('color', 'color1', 'color2', 'shape', 'material', 'size')] |
| | if swap_keys: |
| | key = random.choice(swap_keys) |
| | current = params.get(key) |
| | if key in ('color', 'color1', 'color2'): |
| | alts = [c for c in props['all_colors'] if c != current] |
| | val = random.choice(alts) if alts else current |
| | elif key == 'shape': |
| | alts = [s for s in props['all_shapes'] if s != current] |
| | val = random.choice(alts) if alts else current |
| | elif key == 'material': |
| | alts = [m for m in props['all_materials'] if m != current] |
| | val = random.choice(alts) if alts else current |
| | elif key == 'size': |
| | alts = [s for s in props['all_sizes'] if s != current] |
| | val = random.choice(alts) if alts else current |
| | else: |
| | val = current |
| | if val != current: |
| | cf_params = params.copy() |
| | cf_params[key] = val |
| | cf_q = _apply_param_replacements(q, params, cf_params) |
| | else: |
| | cf_q = None |
| | else: |
| | cf_q = None |
| | else: |
| | |
| | |
| | if "equal to" in q_lower: |
| | direction = random.choice(["greater than", "fewer than"]) |
| | cf_q = q.replace("equal to", direction).replace("Equal to", direction.capitalize()) |
| | cf_params = params.copy() if params else {} |
| | elif "equal number" in q_lower: |
| | cf_q = q.replace("equal number", random.choice(["greater number", "fewer number"])) |
| | cf_params = params.copy() if params else {} |
| | else: |
| | cf_q = None |
| | cf_params = {} |
| | else: |
| | cf_q = None |
| |
|
| | if cf_q is None: |
| | cf_q = "How many objects are in the scene?" |
| | cf_params = {} |
| | if not cf_params: |
| | cf_params = {} |
| | |
| | if cf_q and cf_q.strip() != original_question.strip(): |
| | break |
| | |
| | retry_count += 1 |
| | if retry_count < max_retries: |
| | available_strategies = [s for s in strategies if s != strategy] |
| | if available_strategies: |
| | strategy = random.choice(available_strategies) |
| | else: |
| | strategy = 'negation' |
| | |
| | if cf_q is None or cf_q.strip() == original_question.strip(): |
| | if params: |
| | if 'color' in params: |
| | cf_q = f"How many objects are NOT {params['color']}?" |
| | elif 'shape' in params: |
| | cf_q = f"How many objects are NOT {params['shape']}s?" |
| | else: |
| | attr_val = list(params.values())[0] |
| | cf_q = f"How many objects are NOT {attr_val}?" |
| | cf_params = params.copy() |
| | else: |
| | if props['colors']: |
| | color = random.choice(props['colors']) |
| | cf_q = f"How many {color} objects are there?" |
| | cf_params = {'color': color} |
| | elif props['shapes']: |
| | shape = random.choice(props['shapes']) |
| | cf_q = f"Are there any {shape}s?" |
| | cf_params = {'shape': shape} |
| | else: |
| | cf_q = "Are there more than 3 objects?" |
| | cf_params = {} |
| | |
| | cf_questions.append((cf_q, cf_params)) |
| | |
| | return cf_questions |
| |
|
| | def normalize_answer(a): |
| | if a is None: |
| | return "" |
| | return str(a).strip().lower() |
| |
|
| |
|
| | def answer_question_for_scene(question, scene): |
| | objects = scene.get('objects', []) |
| | question_lower = question.lower() |
| |
|
| | def _str_answer(val): |
| | if val is None: |
| | return "unknown" |
| | return str(val).strip().lower() |
| |
|
| | |
| | if "the same as" in question_lower or "different from" in question_lower or "same shape" in question_lower or "same color" in question_lower or "same material" in question_lower: |
| | expect_same = "different from" not in question_lower and "different shape" not in question_lower and "different color" not in question_lower and "different material" not in question_lower |
| | shape1 = _first_value_in_question(question_lower, _SHAPES) |
| | shape2 = None |
| | for s in _SHAPES: |
| | if s in question_lower and s != shape1: |
| | shape2 = s.rstrip('s') |
| | break |
| | if shape2 is None and shape1: |
| | shape2 = shape1.rstrip('s') |
| | color1 = _first_value_in_question(question_lower, _COLORS) |
| | size1 = _first_value_in_question(question_lower, _SIZES, strip_s=False) |
| | material1 = _first_value_in_question(question_lower, _MATERIALS) |
| |
|
| | if "have the same shape" in question_lower: |
| | cand_a = _find_objects_matching(objects, size=size1) if size1 else [] |
| | cand_b = _find_objects_matching(objects, material=material1) if material1 else [] |
| | if not cand_a or not cand_b: |
| | return _str_answer("no" if expect_same else "yes") |
| | v1 = (objects[cand_a[0]].get('shape') or '').lower() |
| | v2 = (objects[cand_b[0]].get('shape') or '').lower() |
| | same = (v1 == v2) |
| | return _str_answer("yes" if (same == expect_same) else "no") |
| |
|
| | if "material of" in question_lower: |
| | attr = 'material' |
| | cand1 = _find_objects_matching(objects, color=color1) if color1 else [] |
| | cand2 = _find_objects_matching(objects, size=size1) if size1 else [] |
| | elif "color of" in question_lower or ("same as" in question_lower and shape1): |
| | attr = 'color' |
| | cand1 = _find_objects_matching(objects, shape=shape1.rstrip('s') if shape1 else None) if shape1 else [] |
| | cand2 = _find_objects_matching(objects, shape=shape2) if shape2 else [] |
| | else: |
| | attr = 'color' |
| | cand1 = _find_objects_matching(objects, shape=shape1.rstrip('s') if shape1 else None) if shape1 else _find_objects_matching(objects, color=color1) if color1 else [] |
| | cand2 = _find_objects_matching(objects, shape=shape2) if shape2 else _find_objects_matching(objects, size=size1) if size1 else [] |
| | if not cand1 or not cand2: |
| | return _str_answer("no" if expect_same else "yes") |
| | v1 = (objects[cand1[0]].get(attr) or '').lower() |
| | v2 = (objects[cand2[0]].get(attr) or '').lower() |
| | if not v1 or not v2: |
| | return _str_answer("unknown") |
| | same = (v1 == v2) |
| | return _str_answer("yes" if (same == expect_same) else "no") |
| |
|
| | |
| | if "either" in question_lower and " or " in question_lower: |
| | color_val = _first_value_in_question(question_lower, _COLORS) |
| | shape_val = _first_value_in_question(question_lower, _SHAPES) |
| | material_val = _first_value_in_question(question_lower, _MATERIALS) |
| | size_val = _first_value_in_question(question_lower, _SIZES, strip_s=False) |
| | count = 0 |
| | for obj in objects: |
| | c = (obj.get('color') or '').lower() |
| | s = (obj.get('shape') or '').lower() |
| | m = (obj.get('material') or '').lower() |
| | z = (obj.get('size') or '').lower() |
| | match = False |
| | if color_val and c == color_val.rstrip('s'): |
| | match = True |
| | if shape_val and (s == shape_val.rstrip('s') or s + 's' == shape_val): |
| | match = True |
| | if material_val and m == material_val.rstrip('s'): |
| | match = True |
| | if size_val and z == size_val: |
| | match = True |
| | if match: |
| | count += 1 |
| | if "are there any" in question_lower: |
| | return _str_answer("yes" if count > 0 else "no") |
| | return _str_answer(str(count)) |
| |
|
| | |
| | if "equal to" in question_lower or "exactly as many" in question_lower or "equal number" in question_lower: |
| | def _count_for_value(attr_kind, val): |
| | if not val: |
| | return None |
| | if attr_kind == 'color': |
| | return sum(1 for o in objects if (o.get('color') or '').lower() == val.rstrip('s')) |
| | if attr_kind == 'shape': |
| | return sum(1 for o in objects if (o.get('shape') or '').lower() == val.rstrip('s')) |
| | if attr_kind == 'material': |
| | return sum(1 for o in objects if (o.get('material') or '').lower() == val.rstrip('s')) |
| | if attr_kind == 'size': |
| | return sum(1 for o in objects if (o.get('size') or '').lower() == val) |
| | return None |
| |
|
| | def _parse_one_category(phrase): |
| | for c in _COLORS: |
| | if c in phrase: |
| | return ('color', c.rstrip('s')) |
| | for s in _SHAPES: |
| | if s in phrase: |
| | return ('shape', s.rstrip('s')) |
| | for m in _MATERIALS: |
| | if m in phrase: |
| | return ('material', m.rstrip('s')) |
| | for z in _SIZES: |
| | if z in phrase: |
| | return ('size', z) |
| | return (None, None) |
| |
|
| | if "number of" in question_lower and "objects and" in question_lower: |
| | parts = question_lower.split("and") |
| | if len(parts) >= 2: |
| | kind1, v1 = _parse_one_category(parts[0]) |
| | kind2, v2 = _parse_one_category(parts[1]) |
| | if kind1 and kind2: |
| | n1 = _count_for_value(kind1, v1) |
| | n2 = _count_for_value(kind2, v2) |
| | if n1 is not None and n2 is not None: |
| | return _str_answer("yes" if n1 == n2 else "no") |
| | if "equal to" in question_lower: |
| | left, _, right = question_lower.partition("equal to") |
| | kind1, v1 = _parse_one_category(left) |
| | kind2, v2 = _parse_one_category(right) |
| | if kind1 and kind2: |
| | n1 = _count_for_value(kind1, v1) |
| | n2 = _count_for_value(kind2, v2) |
| | if n1 is not None and n2 is not None: |
| | return _str_answer("yes" if n1 == n2 else "no") |
| | if "exactly as many" in question_lower: |
| | parts = re.split(r'exactly as many\s+', question_lower) |
| | if len(parts) >= 2: |
| | rest = parts[1] |
| | parts2 = re.split(r'\s+as\s+', rest, maxsplit=1) |
| | if len(parts2) >= 2: |
| | kind1, v1 = _parse_one_category(parts2[0]) |
| | kind2, v2 = _parse_one_category(parts2[1]) |
| | if kind1 and kind2: |
| | n1 = _count_for_value(kind1, v1) |
| | n2 = _count_for_value(kind2, v2) |
| | if n1 is not None and n2 is not None: |
| | return _str_answer("yes" if n1 == n2 else "no") |
| | return _str_answer("unknown") |
| |
|
| | |
| | for rel_phrase in PHRASES_TO_RELATION_KEYS: |
| | if rel_phrase in question_lower and ("objects " in question_lower or "object " in question_lower): |
| | before_rel, _, after_rel = question_lower.partition(rel_phrase) |
| | ref_color = _first_value_in_question(after_rel, _COLORS) |
| | ref_shape = _first_value_in_question(after_rel, _SHAPES) |
| | ref_material = _first_value_in_question(after_rel, _MATERIALS) |
| | ref_size = _first_value_in_question(after_rel, _SIZES, strip_s=False) |
| | in_rel = _objects_in_relation_to_reference(scene, rel_phrase, ref_color=ref_color, ref_shape=ref_shape, ref_material=ref_material, ref_size=ref_size) |
| | filter_color = _first_value_in_question(before_rel, _COLORS) |
| | filter_shape = _first_value_in_question(before_rel, _SHAPES) |
| | filter_material = _first_value_in_question(before_rel, _MATERIALS) |
| | filter_size = _first_value_in_question(before_rel, _SIZES, strip_s=False) |
| | filtered = 0 |
| | for idx in in_rel: |
| | if idx >= len(objects): |
| | continue |
| | o = objects[idx] |
| | if filter_color and (o.get('color') or '').lower() != filter_color.rstrip('s'): |
| | continue |
| | if filter_shape and (o.get('shape') or '').lower() != filter_shape.rstrip('s'): |
| | continue |
| | if filter_material and (o.get('material') or '').lower() != filter_material.rstrip('s'): |
| | continue |
| | if filter_size and (o.get('size') or '').lower() != filter_size: |
| | continue |
| | filtered += 1 |
| | if "how many" in question_lower or "total number" in question_lower: |
| | return _str_answer(str(filtered)) |
| | if "is there a" in question_lower or "are there any" in question_lower: |
| | return _str_answer("yes" if filtered > 0 else "no") |
| | break |
| |
|
| | if "at least" in question_lower: |
| | match = re.search(r'at least (\d+)', question_lower) |
| | if match: |
| | threshold = int(match.group(1)) |
| | count = count_matching_objects(question_lower, objects) |
| | return "yes" if count >= threshold else "no" |
| | |
| | if "more than" in question_lower: |
| | match = re.search(r'more than (\d+)', question_lower) |
| | if match: |
| | threshold = int(match.group(1)) |
| | count = count_matching_objects(question_lower, objects) |
| | |
| | return "yes" if count > threshold else "no" |
| | |
| | if "fewer than" in question_lower: |
| | match = re.search(r'fewer than (\d+)', question_lower) |
| | if match: |
| | threshold = int(match.group(1)) |
| | count = count_matching_objects(question_lower, objects) |
| | return "yes" if count < threshold else "no" |
| | |
| | if " not " in question_lower: |
| | count = 0 |
| | colors = ['red', 'blue', 'green', 'brown', 'purple', 'cyan', 'yellow', 'gray', 'grey'] |
| | shapes = ['cube', 'sphere', 'cylinder', 'cubes', 'spheres', 'cylinders'] |
| | materials = ['metal', 'rubber', 'metals', 'rubbers'] |
| | sizes = ['small', 'large'] |
| | |
| | excluded_attr = None |
| | excluded_type = None |
| | |
| | for c in colors: |
| | if c in question_lower: |
| | excluded_attr = c.rstrip('s') |
| | excluded_type = 'color' |
| | break |
| | |
| | if excluded_attr is None: |
| | for s in shapes: |
| | if s in question_lower: |
| | excluded_attr = s.rstrip('s') |
| | excluded_type = 'shape' |
| | break |
| | |
| | if excluded_attr is None: |
| | for m in materials: |
| | if m in question_lower: |
| | excluded_attr = m.rstrip('s') |
| | excluded_type = 'material' |
| | break |
| | |
| | if excluded_attr is None: |
| | for s in sizes: |
| | if s in question_lower: |
| | excluded_attr = s |
| | excluded_type = 'size' |
| | break |
| | |
| | if excluded_attr and excluded_type: |
| | for obj in objects: |
| | obj_attr = obj.get(excluded_type, '').lower() |
| | if obj_attr != excluded_attr: |
| | count += 1 |
| | else: |
| | count = len(objects) |
| | |
| | return str(count) |
| | |
| | if " than " in question_lower and " more " in question_lower: |
| | parts = question_lower.split(" than ") |
| | if len(parts) == 2: |
| | first_part = parts[0] |
| | second_part = parts[1].replace('?', '').strip() |
| | |
| | count1 = count_matching_objects(first_part, objects) |
| | count2 = count_matching_objects(second_part, objects) |
| | |
| | return "yes" if count1 > count2 else "no" |
| | |
| | if question_lower.startswith("how many"): |
| | if "objects are in the scene" in question_lower or "total number of objects" in question_lower: |
| | return str(len(objects)) |
| | |
| | count = count_matching_objects(question_lower, objects) |
| | return str(count) |
| | |
| | elif question_lower.startswith("are there") or question_lower.startswith("is there"): |
| | count = count_matching_objects(question_lower, objects) |
| | return "yes" if count > 0 else "no" |
| | |
| | elif question_lower.startswith("what"): |
| | if "colors" in question_lower: |
| | colors = list(set(obj.get('color', '') for obj in objects if obj.get('color'))) |
| | return ", ".join(colors) if colors else "none" |
| | elif "shapes" in question_lower: |
| | shapes = list(set(obj.get('shape', '') for obj in objects if obj.get('shape'))) |
| | return ", ".join(shapes) if shapes else "none" |
| | elif "total number" in question_lower: |
| | count = count_matching_objects(question_lower, objects) |
| | return str(count) |
| | else: |
| | return str(len(objects)) |
| | |
| | if "matte or shiny" in question_lower or ("or" in question_lower and ("matte" in question_lower or "shiny" in question_lower)): |
| | colors = ['red', 'blue', 'green', 'brown', 'purple', 'cyan', 'yellow', 'gray', 'grey'] |
| | shapes = ['cube', 'sphere', 'cylinder', 'cubes', 'spheres', 'cylinders'] |
| | color_match = None |
| | shape_match = None |
| | for c in colors: |
| | if c in question_lower: |
| | color_match = c.rstrip('s') |
| | break |
| | for s in shapes: |
| | if s in question_lower: |
| | shape_match = s.rstrip('s') |
| | break |
| | for obj in objects: |
| | obj_color = (obj.get('color') or '').lower() |
| | obj_shape = (obj.get('shape') or '').lower() |
| | obj_material = (obj.get('material') or '').lower() |
| | matches = True |
| | if color_match and obj_color != color_match: |
| | matches = False |
| | if shape_match and obj_shape != shape_match: |
| | matches = False |
| | if matches: |
| | if obj_material == 'metal': |
| | return "shiny" |
| | if obj_material == 'rubber': |
| | return "matte" |
| | |
| | return _str_answer("unknown") |
| | |
| | return _str_answer("unknown") |
| |
|
| | def count_matching_objects(question_lower, objects): |
| | count = 0 |
| | |
| | colors = ['red', 'blue', 'green', 'brown', 'purple', 'cyan', 'yellow', 'gray', 'grey'] |
| | shapes = ['cube', 'sphere', 'cylinder', 'cubes', 'spheres', 'cylinders'] |
| | materials = ['metal', 'rubber', 'metals', 'rubbers'] |
| | sizes = ['small', 'large'] |
| | |
| | color_match = None |
| | for c in colors: |
| | if c in question_lower: |
| | color_match = c.rstrip('s') |
| | break |
| | |
| | shape_match = None |
| | for s in shapes: |
| | if s in question_lower: |
| | shape_match = s.rstrip('s') |
| | break |
| | |
| | material_match = None |
| | if "metallic" in question_lower: |
| | material_match = "metal" |
| | else: |
| | for m in materials: |
| | if m in question_lower: |
| | material_match = m.rstrip('s') |
| | break |
| | |
| | size_match = None |
| | for s in sizes: |
| | if s in question_lower: |
| | size_match = s |
| | break |
| | |
| | for obj in objects: |
| | obj_color = obj.get('color', '').lower() |
| | obj_shape = obj.get('shape', '').lower() |
| | obj_material = obj.get('material', '').lower() |
| | obj_size = obj.get('size', '').lower() |
| | |
| | matches = True |
| | |
| | if color_match and obj_color != color_match: |
| | matches = False |
| | if shape_match and obj_shape != shape_match: |
| | matches = False |
| | if material_match and obj_material != material_match: |
| | matches = False |
| | if size_match and obj_size != size_match: |
| | matches = False |
| | |
| | if matches: |
| | count += 1 |
| | |
| | return count |
| |
|
| |
|
| | def classify_question_validity(question, base_scene_graph, counterfactual_scene_graph): |
| | answer_base = answer_question_for_scene(question, base_scene_graph) |
| | answer_cf = answer_question_for_scene(question, counterfactual_scene_graph) |
| | norm_base = normalize_answer(answer_base) |
| | norm_cf = normalize_answer(answer_cf) |
| | if norm_base != norm_cf: |
| | return 'Semantic-Valid' |
| | return 'Negative-Valid' |
| |
|
| |
|
| | def generate_mapping_with_questions(run_dir, csv_filename='image_mapping_with_questions.csv', |
| | generate_questions=False, with_links=False, base_url=None, |
| | strict_question_validation=True, single_cf_per_row=False): |
| | images_dir = os.path.join(run_dir, 'images') |
| | scenes_dir = os.path.join(run_dir, 'scenes') |
| | |
| | if not os.path.exists(images_dir): |
| | print(f"ERROR: Images directory not found: {images_dir}") |
| | return |
| | |
| | if not os.path.exists(scenes_dir): |
| | print(f"ERROR: Scenes directory not found: {scenes_dir}") |
| | return |
| | |
| | image_files = [f for f in os.listdir(images_dir) if f.endswith('.png')] |
| | |
| | scene_sets = {} |
| | for img_file in image_files: |
| | if img_file.startswith('scene_'): |
| | parts = img_file.replace('.png', '').split('_') |
| | if len(parts) >= 3: |
| | scene_num = parts[1] |
| | scene_type = parts[2] |
| | |
| | if scene_num not in scene_sets: |
| | scene_sets[scene_num] = {} |
| | scene_sets[scene_num][scene_type] = img_file |
| | |
| | rows = [] |
| | if with_links: |
| | header = ['scene_id', 'original_image_link', 'original_scene_link', |
| | 'counterfactual1_image_link', 'counterfactual1_scene_link', |
| | 'counterfactual2_image_link', 'counterfactual2_scene_link', |
| | 'counterfactual1_type', 'counterfactual2_type', |
| | 'counterfactual1_description', 'counterfactual2_description'] |
| | if generate_questions: |
| | header.extend([ |
| | 'original_question', 'counterfactual1_question', 'counterfactual2_question', |
| | 'original_question_difficulty', 'counterfactual1_question_difficulty', 'counterfactual2_question_difficulty', |
| | 'original_image_answer_to_original_question', |
| | 'original_image_answer_to_counterfactual1_question', |
| | 'original_image_answer_to_counterfactual2_question', |
| | 'counterfactual1_image_answer_to_original_question', |
| | 'counterfactual1_image_answer_to_counterfactual1_question', |
| | 'counterfactual1_image_answer_to_counterfactual2_question', |
| | 'counterfactual2_image_answer_to_original_question', |
| | 'counterfactual2_image_answer_to_counterfactual1_question', |
| | 'counterfactual2_image_answer_to_counterfactual2_question' |
| | ]) |
| | rows.append(header) |
| | elif generate_questions: |
| | rows.append([ |
| | 'original_image', 'counterfactual1_image', 'counterfactual2_image', |
| | 'counterfactual1_type', 'counterfactual2_type', |
| | 'counterfactual1_description', 'counterfactual2_description', |
| | 'original_question', 'counterfactual1_question', 'counterfactual2_question', |
| | 'original_question_difficulty', 'counterfactual1_question_difficulty', 'counterfactual2_question_difficulty', |
| | 'original_image_answer_to_original_question', |
| | 'original_image_answer_to_cf1_question', |
| | 'original_image_answer_to_cf2_question', |
| | 'cf1_image_answer_to_original_question', |
| | 'cf1_image_answer_to_cf1_question', |
| | 'cf1_image_answer_to_cf2_question', |
| | 'cf2_image_answer_to_original_question', |
| | 'cf2_image_answer_to_cf1_question', |
| | 'cf2_image_answer_to_cf2_question' |
| | ]) |
| | else: |
| | rows.append(['original_image', 'counterfactual1_image', 'counterfactual2_image', |
| | 'counterfactual1_type', 'counterfactual2_type', |
| | 'counterfactual1_description', 'counterfactual2_description']) |
| | |
| | if single_cf_per_row: |
| | if with_links: |
| | h = ['scene_id', 'original_image_link', 'original_scene_link', 'counterfactual_image_link', 'counterfactual_scene_link', 'counterfactual_type', 'counterfactual_description'] |
| | if generate_questions: |
| | h.extend(['original_question', 'counterfactual_question', 'original_question_difficulty', 'counterfactual_question_difficulty', |
| | 'original_image_answer_to_original_question', 'original_image_answer_to_cf_question', |
| | 'cf_image_answer_to_original_question', 'cf_image_answer_to_cf_question']) |
| | rows = [h] |
| | elif generate_questions: |
| | rows = [['original_image', 'counterfactual_image', 'counterfactual_type', 'counterfactual_description', |
| | 'original_question', 'counterfactual_question', 'original_question_difficulty', 'counterfactual_question_difficulty', |
| | 'original_image_answer_to_original_question', 'original_image_answer_to_cf_question', |
| | 'cf_image_answer_to_original_question', 'cf_image_answer_to_cf_question']] |
| | else: |
| | rows = [['original_image', 'counterfactual_image', 'counterfactual_type', 'counterfactual_description']] |
| | |
| | total_scenes = len(scene_sets) |
| | |
| | for idx, scene_num in enumerate(sorted(scene_sets.keys())): |
| | scene_data = scene_sets[scene_num] |
| | cf_keys = sorted([k for k in scene_data if k.startswith('cf') and len(k) > 2 and k[2:].isdigit()], key=lambda x: int(x[2:])) |
| | |
| | if single_cf_per_row: |
| | if 'original' not in scene_data or len(cf_keys) < 1: |
| | continue |
| | original_id = scene_data['original'] |
| | for cf_key in cf_keys: |
| | cf_id = scene_data[cf_key] |
| | original_scene_file = find_scene_file(scenes_dir, original_id) |
| | cf_scene_file = find_scene_file(scenes_dir, cf_id) |
| | if not original_scene_file or not cf_scene_file: |
| | continue |
| | try: |
| | original_scene = load_scene(original_scene_file) |
| | cf_scene = load_scene(cf_scene_file) |
| | cf_type = get_cf_type_from_scene(cf_scene) or '' |
| | cf_description = get_cf_description_from_scene(cf_scene) or '' |
| | except Exception: |
| | continue |
| | if generate_questions: |
| | appended = False |
| | for cf_retry in range(MAX_CF_ANSWER_RETRIES): |
| | try: |
| | original_question, params = generate_question_for_scene(original_scene_file, retry_index=cf_retry) |
| | original_ans_orig = answer_question_for_scene(original_question, original_scene) |
| | cf_question, cf_params = generate_question_for_counterfactual( |
| | cf_type, original_scene, cf_scene, retry_index=cf_retry, |
| | original_question=original_question, original_params=params |
| | ) |
| | if cf_question is None or cf_params is None: |
| | continue |
| | |
| | if "matte or shiny" in (original_question or "").lower() and cf_type in ("add_object", "remove_object"): |
| | continue |
| | original_ans_cf_q = answer_question_for_scene(cf_question, original_scene) |
| | cf_ans_orig_q = answer_question_for_scene(original_question, cf_scene) |
| | cf_ans_cf_q = answer_question_for_scene(cf_question, cf_scene) |
| | orig_diff = calculate_question_difficulty(original_question, params) |
| | cf_diff = calculate_question_difficulty(cf_question, cf_params) |
| | except Exception: |
| | continue |
| | |
| | if normalize_answer(original_ans_orig) == normalize_answer(cf_ans_orig_q): |
| | continue |
| | if strict_question_validation: |
| | validity = classify_question_validity(cf_question, original_scene, cf_scene) |
| | required = 'Semantic-Valid' if (cf_type and cf_type in IMAGE_CF_TYPES) else 'Negative-Valid' |
| | if validity != required: |
| | continue |
| | else: |
| | if normalize_answer(original_ans_cf_q) == normalize_answer(cf_ans_cf_q): |
| | continue |
| | if with_links: |
| | def _link(fn, ft='image'): |
| | return f"{base_url.rstrip('/')}/{ft}s/{fn}" if base_url else f"{ft}s/{fn}" |
| | rows.append([ |
| | scene_num, |
| | _link(original_id, 'image'), _link(original_id.replace('.png', '.json'), 'scene'), |
| | _link(cf_id, 'image'), _link(cf_id.replace('.png', '.json'), 'scene'), |
| | cf_type, cf_description, |
| | original_question, cf_question, orig_diff, cf_diff, |
| | original_ans_orig, original_ans_cf_q, cf_ans_orig_q, cf_ans_cf_q |
| | ]) |
| | else: |
| | rows.append([ |
| | original_id, cf_id, cf_type, cf_description, |
| | original_question, cf_question, orig_diff, cf_diff, |
| | original_ans_orig, original_ans_cf_q, cf_ans_orig_q, cf_ans_cf_q |
| | ]) |
| | appended = True |
| | break |
| | if not appended and generate_questions: |
| | pass |
| | else: |
| | if with_links: |
| | def _link(fn, ft='image'): |
| | return f"{base_url.rstrip('/')}/{ft}s/{fn}" if base_url else f"{ft}s/{fn}" |
| | rows.append([ |
| | scene_num, |
| | _link(original_id, 'image'), _link(original_id.replace('.png', '.json'), 'scene'), |
| | _link(cf_id, 'image'), _link(cf_id.replace('.png', '.json'), 'scene'), |
| | cf_type, cf_description |
| | ]) |
| | else: |
| | rows.append([original_id, cf_id, cf_type, cf_description]) |
| | continue |
| | |
| | if 'original' not in scene_data or 'cf1' not in scene_data or 'cf2' not in scene_data: |
| | print(f"WARNING: Scene {scene_num} missing images") |
| | continue |
| | |
| | original_id = scene_data['original'] |
| | cf1_id = scene_data['cf1'] |
| | cf2_id = scene_data['cf2'] |
| | |
| | if generate_questions: |
| | original_scene_file = find_scene_file(scenes_dir, original_id) |
| | cf1_scene_file = find_scene_file(scenes_dir, cf1_id) |
| | cf2_scene_file = find_scene_file(scenes_dir, cf2_id) |
| | |
| | if not all([original_scene_file, cf1_scene_file, cf2_scene_file]): |
| | print(f"WARNING: Scene {scene_num} missing scene files") |
| | continue |
| | |
| | try: |
| | original_scene = load_scene(original_scene_file) |
| | cf1_scene = load_scene(cf1_scene_file) |
| | cf2_scene = load_scene(cf2_scene_file) |
| | except Exception as e: |
| | import traceback |
| | traceback.print_exc() |
| | continue |
| | |
| | try: |
| | original_question, params = generate_question_for_scene(original_scene_file) |
| | original_ans_orig_q = answer_question_for_scene(original_question, original_scene) |
| | cf1_type = get_cf_type_from_scene(cf1_scene) |
| | cf2_type = get_cf_type_from_scene(cf2_scene) |
| | cf1_description = get_cf_description_from_scene(cf1_scene) |
| | cf2_description = get_cf_description_from_scene(cf2_scene) |
| | except Exception as e: |
| | import traceback |
| | traceback.print_exc() |
| | continue |
| | |
| | cf1_question = cf2_question = None |
| | cf1_params = cf2_params = {} |
| | original_difficulty = cf1_difficulty = cf2_difficulty = None |
| | original_ans_cf1_q = original_ans_cf2_q = None |
| | cf1_ans_orig_q = cf1_ans_cf1_q = cf1_ans_cf2_q = None |
| | cf2_ans_orig_q = cf2_ans_cf1_q = cf2_ans_cf2_q = None |
| | orig_norm = normalize_answer(original_ans_orig_q) |
| | |
| | for cf_retry in range(MAX_CF_ANSWER_RETRIES): |
| | try: |
| | random.seed(hash((scene_num, idx, cf_retry))) |
| | cf_questions = create_counterfactual_questions(original_question, params, original_scene) if (not cf1_type or not cf2_type) else None |
| | if cf1_type: |
| | cf1_question, cf1_params = generate_question_for_counterfactual( |
| | cf1_type, original_scene, cf1_scene, retry_index=cf_retry, |
| | original_question=original_question, original_params=params |
| | ) |
| | if cf1_question is None or cf1_params is None: |
| | continue |
| | else: |
| | cf1_question, cf1_params = cf_questions[0] if cf_questions and len(cf_questions) > 0 else ("How many objects are in the scene?", {}) |
| | if cf2_type: |
| | cf2_question, cf2_params = generate_question_for_counterfactual( |
| | cf2_type, original_scene, cf2_scene, retry_index=cf_retry, |
| | original_question=original_question, original_params=params |
| | ) |
| | if cf2_question is None or cf2_params is None: |
| | continue |
| | else: |
| | cf2_question, cf2_params = cf_questions[1] if cf_questions and len(cf_questions) > 1 else (cf_questions[0] if cf_questions else ("How many objects are in the scene?", {})) |
| | |
| | if "matte or shiny" in (original_question or "").lower() and (cf1_type in ("add_object", "remove_object") or cf2_type in ("add_object", "remove_object")): |
| | continue |
| | except Exception as e: |
| | import traceback |
| | traceback.print_exc() |
| | continue |
| | |
| | try: |
| | original_difficulty = calculate_question_difficulty(original_question, params) |
| | cf1_difficulty = calculate_question_difficulty(cf1_question, cf1_params) |
| | cf2_difficulty = calculate_question_difficulty(cf2_question, cf2_params) |
| | except Exception as e: |
| | import traceback |
| | traceback.print_exc() |
| | continue |
| | |
| | try: |
| | original_ans_cf1_q = answer_question_for_scene(cf1_question, original_scene) |
| | original_ans_cf2_q = answer_question_for_scene(cf2_question, original_scene) |
| | cf1_ans_orig_q = answer_question_for_scene(original_question, cf1_scene) |
| | cf1_ans_cf1_q = answer_question_for_scene(cf1_question, cf1_scene) |
| | cf1_ans_cf2_q = answer_question_for_scene(cf2_question, cf1_scene) |
| | cf2_ans_orig_q = answer_question_for_scene(original_question, cf2_scene) |
| | cf2_ans_cf1_q = answer_question_for_scene(cf1_question, cf2_scene) |
| | cf2_ans_cf2_q = answer_question_for_scene(cf2_question, cf2_scene) |
| | except Exception as e: |
| | import traceback |
| | traceback.print_exc() |
| | continue |
| | |
| | orig_n = normalize_answer(original_ans_orig_q) |
| | if orig_n == normalize_answer(cf1_ans_orig_q) or orig_n == normalize_answer(cf2_ans_orig_q): |
| | continue |
| | if strict_question_validation: |
| | cf1_validity = classify_question_validity(cf1_question, original_scene, cf1_scene) |
| | cf2_validity = classify_question_validity(cf2_question, original_scene, cf2_scene) |
| | cf1_required = 'Semantic-Valid' if (cf1_type and cf1_type in IMAGE_CF_TYPES) else 'Negative-Valid' |
| | cf2_required = 'Semantic-Valid' if (cf2_type and cf2_type in IMAGE_CF_TYPES) else 'Negative-Valid' |
| | cf1_ok = (cf1_required == cf1_validity) |
| | cf2_ok = (cf2_required == cf2_validity) |
| | if cf1_ok and cf2_ok: |
| | break |
| | else: |
| | |
| | cf1_differs = (cf1_type not in IMAGE_CF_TYPES) or (normalize_answer(original_ans_cf1_q) != normalize_answer(cf1_ans_cf1_q)) |
| | cf2_differs = (cf2_type not in IMAGE_CF_TYPES) or (normalize_answer(original_ans_cf2_q) != normalize_answer(cf2_ans_cf2_q)) |
| | if cf1_differs or cf2_differs: |
| | break |
| | else: |
| | print(f"WARNING: Scene {scene_num}: could not find questions with different answers for both CFs after {MAX_CF_ANSWER_RETRIES} retries (scene included with best-effort questions)") |
| | |
| | try: |
| | if with_links: |
| | def make_link(filename, file_type='image'): |
| | if base_url: |
| | return f"{base_url.rstrip('/')}/{file_type}s/{filename}" |
| | else: |
| | return f"{file_type}s/{filename}" |
| | |
| | original_image_link = make_link(original_id, 'image') |
| | original_scene_link = make_link(original_id.replace('.png', '.json'), 'scene') |
| | cf1_image_link = make_link(cf1_id, 'image') |
| | cf1_scene_link = make_link(cf1_id.replace('.png', '.json'), 'scene') |
| | cf2_image_link = make_link(cf2_id, 'image') |
| | cf2_scene_link = make_link(cf2_id.replace('.png', '.json'), 'scene') |
| | |
| | rows.append([ |
| | scene_num, |
| | original_image_link, original_scene_link, |
| | cf1_image_link, cf1_scene_link, |
| | cf2_image_link, cf2_scene_link, |
| | cf1_type, cf2_type, cf1_description, cf2_description, |
| | original_question, cf1_question, cf2_question, |
| | original_difficulty, cf1_difficulty, cf2_difficulty, |
| | original_ans_orig_q, original_ans_cf1_q, original_ans_cf2_q, |
| | cf1_ans_orig_q, cf1_ans_cf1_q, cf1_ans_cf2_q, |
| | cf2_ans_orig_q, cf2_ans_cf1_q, cf2_ans_cf2_q |
| | ]) |
| | else: |
| | rows.append([ |
| | original_id, cf1_id, cf2_id, |
| | cf1_type, cf2_type, cf1_description, cf2_description, |
| | original_question, cf1_question, cf2_question, |
| | original_difficulty, cf1_difficulty, cf2_difficulty, |
| | original_ans_orig_q, original_ans_cf1_q, original_ans_cf2_q, |
| | cf1_ans_orig_q, cf1_ans_cf1_q, cf1_ans_cf2_q, |
| | cf2_ans_orig_q, cf2_ans_cf1_q, cf2_ans_cf2_q |
| | ]) |
| | except Exception as e: |
| | import traceback |
| | traceback.print_exc() |
| | continue |
| | else: |
| | cf1_type = cf2_type = cf1_description = cf2_description = '' |
| | cf1_scene_file = find_scene_file(scenes_dir, cf1_id) |
| | cf2_scene_file = find_scene_file(scenes_dir, cf2_id) |
| | if cf1_scene_file and cf2_scene_file: |
| | try: |
| | cf1_scene = load_scene(cf1_scene_file) |
| | cf2_scene = load_scene(cf2_scene_file) |
| | cf1_type = get_cf_type_from_scene(cf1_scene) or '' |
| | cf2_type = get_cf_type_from_scene(cf2_scene) or '' |
| | cf1_description = get_cf_description_from_scene(cf1_scene) or '' |
| | cf2_description = get_cf_description_from_scene(cf2_scene) or '' |
| | except Exception: |
| | pass |
| | if with_links: |
| | def make_link(filename, file_type='image'): |
| | if base_url: |
| | return f"{base_url.rstrip('/')}/{file_type}s/{filename}" |
| | else: |
| | return f"{file_type}s/{filename}" |
| | |
| | original_image_link = make_link(original_id, 'image') |
| | original_scene_link = make_link(original_id.replace('.png', '.json'), 'scene') |
| | cf1_image_link = make_link(cf1_id, 'image') |
| | cf1_scene_link = make_link(cf1_id.replace('.png', '.json'), 'scene') |
| | cf2_image_link = make_link(cf2_id, 'image') |
| | cf2_scene_link = make_link(cf2_id.replace('.png', '.json'), 'scene') |
| | |
| | rows.append([ |
| | scene_num, |
| | original_image_link, original_scene_link, |
| | cf1_image_link, cf1_scene_link, |
| | cf2_image_link, cf2_scene_link, |
| | cf1_type, cf2_type, cf1_description, cf2_description |
| | ]) |
| | else: |
| | rows.append([original_id, cf1_id, cf2_id, cf1_type, cf2_type, cf1_description, cf2_description]) |
| | |
| | csv_path = os.path.join(run_dir, csv_filename) |
| | try: |
| | with open(csv_path, 'w', newline='', encoding='utf-8') as f: |
| | writer = csv.writer(f, quoting=csv.QUOTE_ALL) |
| | writer.writerows(rows) |
| | except Exception as e: |
| | import traceback |
| | traceback.print_exc() |
| | return |
| | |
| | print(f"\n[OK] Generated mapping CSV: {csv_path}") |
| | print(f" Total rows: {len(rows) - 1}") |
| | |
| | print("\nSample entry:") |
| | if len(rows) > 1: |
| | row = rows[1] |
| | if single_cf_per_row: |
| | if generate_questions and len(row) >= 12: |
| | print(f" Images: Original: {row[0]}, Counterfactual: {row[1]}") |
| | print(f" CF type / description: {row[2]}, {row[3]!r}") |
| | print(f" Questions: Original: {row[4]}, CF: {row[5]}") |
| | print(f" Answers: orig→orig_q: {row[8]}, orig→cf_q: {row[9]}, cf→orig_q: {row[10]}, cf→cf_q: {row[11]}") |
| | elif len(row) >= 4: |
| | print(f" Images: Original: {row[0]}, Counterfactual: {row[1]}") |
| | print(f" CF type / description: {row[2]}, {row[3]!r}") |
| | elif with_links: |
| | if generate_questions: |
| | print(f" Scene ID: {row[0]}") |
| | print(f" Links:") |
| | print(f" Original image: {row[1]}, scene: {row[2]}") |
| | print(f" CF1 image: {row[3]}, scene: {row[4]}") |
| | print(f" CF2 image: {row[5]}, scene: {row[6]}") |
| | print(f" CF type / description: CF1 type={row[7]}, CF2 type={row[8]}; CF1 desc={row[9]!r}, CF2 desc={row[10]!r}") |
| | print(f" Questions: Original: {row[11]}, CF1: {row[12]}, CF2: {row[13]}") |
| | else: |
| | print(f" Scene ID: {row[0]}") |
| | print(f" Links:") |
| | print(f" Original image: {row[1]}, scene: {row[2]}") |
| | print(f" CF1 image: {row[3]}, scene: {row[4]}") |
| | print(f" CF2 image: {row[5]}, scene: {row[6]}") |
| | print(f" CF type / description: CF1 type={row[7]}, CF2 type={row[8]}; CF1 desc={row[9]!r}, CF2 desc={row[10]!r}") |
| | elif generate_questions and len(row) > 14: |
| | print(f" Images: Original: {row[0]}, CF1: {row[1]}, CF2: {row[2]}") |
| | print(f" CF type / description: CF1 type={row[3]}, CF2 type={row[4]}; CF1 desc={row[5]!r}, CF2 desc={row[6]!r}") |
| | print(f" Questions: Original: {row[7]}, CF1: {row[8]}, CF2: {row[9]}") |
| | print(f" Answer Matrix (scene × question):") |
| | print(f" Original image -> Orig Q: {row[10]}, CF1 Q: {row[11]}, CF2 Q: {row[12]}") |
| | print(f" CF1 image -> Orig Q: {row[13]}, CF1 Q: {row[14]}, CF2 Q: {row[15]}") |
| | print(f" CF2 image -> Orig Q: {row[16]}, CF1 Q: {row[17]}, CF2 Q: {row[18]}") |
| | elif len(row) >= 7: |
| | print(f" Images: Original: {row[0]}, CF1: {row[1]}, CF2: {row[2]}") |
| | print(f" CF type / description: CF1 type={row[3]}, CF2 type={row[4]}; CF1 desc={row[5]!r}, CF2 desc={row[6]!r}") |
| |
|
| | def main(): |
| | parser = argparse.ArgumentParser( |
| | description='Generate CSV with original and VARIED counterfactual questions applied to all scenes' |
| | ) |
| | parser.add_argument('--output_dir', default='output', |
| | help='Run directory or base output directory (default: output)') |
| | parser.add_argument('--auto_latest', action='store_true', |
| | help='Automatically find and use the latest run in output_dir') |
| | parser.add_argument('--csv_name', default='image_mapping_with_questions.csv', |
| | help='Output CSV filename') |
| | parser.add_argument('--generate_questions', action='store_true', |
| | help='Generate questions and answers for each scene set') |
| | parser.add_argument('--no_strict_validation', action='store_true', |
| | help='Disable strict question validation (Semantic-Valid / Negative-Valid classifier); use legacy accept logic') |
| | parser.add_argument('--single_cf_per_row', action='store_true', |
| | help='Emit one row per (original, single counterfactual) instead of one row per (original, cf1, cf2). CSV columns: original_image, counterfactual_image, counterfactual_type, counterfactual_description [, + Q&A if --generate_questions].') |
| | |
| | args = parser.parse_args() |
| | |
| | if args.auto_latest: |
| | run_dir = find_latest_run(args.output_dir) |
| | if run_dir is None: |
| | print(f"ERROR: Could not find any run directories in {args.output_dir}") |
| | return |
| | else: |
| | if os.path.exists(os.path.join(args.output_dir, 'images')) and \ |
| | os.path.exists(os.path.join(args.output_dir, 'scenes')): |
| | run_dir = args.output_dir |
| | else: |
| | run_dir = find_latest_run(args.output_dir) |
| | if run_dir is None: |
| | print(f"ERROR: {args.output_dir} does not contain images/scenes directories") |
| | print(f" and no run directories found in {args.output_dir}") |
| | return |
| | print(f"Auto-detected run directory: {run_dir}") |
| | |
| | generate_mapping_with_questions( |
| | run_dir, |
| | args.csv_name, |
| | args.generate_questions, |
| | strict_question_validation=not args.no_strict_validation, |
| | single_cf_per_row=getattr(args, 'single_cf_per_row', False) |
| | ) |
| |
|
| | if __name__ == '__main__': |
| | main() |