|
|
""" |
|
|
Task 2: Duration - Generate duration comparison questions |
|
|
|
|
|
This task creates audio samples where sources have different effective durations |
|
|
and asks questions about which sound is heard for the longest or shortest time. |
|
|
|
|
|
Key features: |
|
|
- Uses amplitude-filtered (preprocessed) audio clips with known effective durations |
|
|
- First calculates max clips from total duration, then distributes slots |
|
|
- Strategically distributes repetitions to ensure clear longest/shortest answers |
|
|
- Consecutive ordering within sources, random order between sources |
|
|
- Gap multipliers ensure unambiguous answers (e.g., longest is 1.5x longer than next) |
|
|
- NO category preference - random selection to avoid bias |
|
|
""" |
|
|
|
|
|
import csv |
|
|
import random |
|
|
import math |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Tuple, Optional |
|
|
from collections import Counter |
|
|
|
|
|
import sys |
|
|
sys.path.append(str(Path(__file__).parent.parent)) |
|
|
|
|
|
from utils import ( |
|
|
AudioProcessor, PreprocessedESC50Dataset, QuestionGenerator, LLMQuestionGenerator, |
|
|
setup_logger, set_random_seed, calculate_num_samples_for_task, |
|
|
generate_single_clip_duration, get_max_clip_num_to_be_joined, |
|
|
build_duration_task_audio, distribute_remainder_as_silences, |
|
|
generate_sample_durations_for_task |
|
|
) |
|
|
|
|
|
|
|
|
class DurationTaskGenerator: |
|
|
"""Generator for duration comparison task dataset using preprocessed ESC-50.""" |
|
|
|
|
|
def __init__(self, config: Dict, logger):
    """
    Initialize duration task generator.

    Args:
        config: Configuration dictionary
        logger: Logger instance
    """
    self.config = config
    self.logger = logger
    # Sub-config specific to the duration task.
    self.task_config = config['tasks']['duration']

    # Preprocessed (amplitude-filtered) ESC-50 dataset with known
    # per-clip effective durations.
    self.dataset = PreprocessedESC50Dataset(
        metadata_path=config['esc50']['metadata_path'],
        audio_path=config['esc50']['audio_path'],
        preprocessed_path=self.task_config['preprocessed_data_path'],
        config=config
    )

    # Mean effective clip duration; used to estimate how many clips fit
    # into a target total duration (see _calculate_max_clips_and_sources).
    self.avg_effective_duration = self.dataset.effective_df['effective_duration_s'].mean()
    self.logger.info(f"Average effective duration: {self.avg_effective_duration:.2f}s")

    # Helper that loads/joins/normalizes audio segments.
    self.audio_processor = AudioProcessor(
        crossfade_duration=config['audio']['crossfade_duration'],
        silence_duration=config['audio']['silence_duration'],
        with_silence=config['audio']['with_silence'],
        normalize=config['audio']['normalize'],
        normalize_target_dBFS=config['audio']['normalize_target_dBFS'],
        synthetic_silence_path=config['synthetic_silence']['path']
    )

    # Builder for MCQ options and open-text questions.
    self.question_generator = QuestionGenerator(
        num_options=config['mcq']['num_options'],
        option_labels=config['mcq']['option_labels'],
        distractor_strategy=config['mcq']['distractor_strategy']
    )

    # Optional LLM-based question generation (off unless enabled in config).
    self.llm_enabled = config.get('llm', {}).get('enabled', False)
    self.llm_generator = LLMQuestionGenerator(
        enabled=self.llm_enabled,
        template_questions=self.task_config
    )

    # Timing parameters; *_ms names are milliseconds, others seconds.
    self.min_clip_duration = config['audio']['min_clip_duration']
    self.max_clip_duration = config['audio']['max_clip_duration']
    self.min_silence_ms = config['audio'].get('min_silence_duration', 100)
    self.max_extra_silence_per_gap_ms = config['audio'].get('max_extra_silence_per_gap', 500)
    self.crossfade_within_source_ms = config['audio'].get('crossfade_within_source', 50)
    self.task_duration_hours = self.task_config['task_duration_size']

    # Gap multipliers that make the longest/shortest answer unambiguous,
    # plus rejection / re-sampling policy when the gap is not met.
    self.multiplier_longest = self.task_config.get('multiplier_longest', 1.5)
    self.multiplier_shortest = self.task_config.get('multiplier_shortest', 0.75)
    self.reject_if_gap_not_met = self.task_config.get('reject_if_gap_not_met', True)
    self.sample_different_clips = self.task_config.get('sample_different_clips_same_class', True)

    # Minimum effective duration (seconds) a sampled clip must have.
    self.min_effective_duration_per_source = self.task_config.get('min_effective_duration_per_source', 1.0)

    # Output layout: <base>/duration/ with audios/ subfolder.
    self.output_base = Path(config['output']['base_path']) / 'duration'
    self.output_base.mkdir(parents=True, exist_ok=True)
    self.audio_output = self.output_base / 'audios'
    self.audio_output.mkdir(parents=True, exist_ok=True)

    # Generation statistics, reported by generate_dataset().
    self.rejection_count = 0
    self.success_count = 0
|
|
|
|
|
def _calculate_max_clips_and_sources(
    self,
    target_duration_s: float,
    question_type: str
) -> Tuple[int, int, float]:
    """
    Compute the clip budget and pick a source count that can satisfy the gap.

    Steps:
        1. Derive the valid range of source counts for this question type.
        2. Keep only configured ``num_unique_sources`` values in that range.
        3. Pick randomly among the survivors (keeps the dataset varied).

    For "longest" the target needs at least 2 clips while each background
    takes one, so ``max_sources = max_clips - 2 + 1``. For "shortest" the
    target takes one clip and each background needs at least 2, so
    ``max_sources = (max_clips - 1) // 2 + 1``. Both need >= 2 sources.

    Args:
        target_duration_s: Target total audio duration
        question_type: "longest" or "shortest"

    Returns:
        Tuple of (max_clips, n_sources, remainder_s)

    Raises:
        ValueError: If no configured source count fits the valid range,
            or the chosen count falls outside [2, number of categories].
    """
    # Estimate how many average-length clips (plus minimal silences) fit.
    clip_budget, remainder_s = get_max_clip_num_to_be_joined(
        target_duration_s,
        self.avg_effective_duration,
        self.min_silence_ms
    )
    max_clips = clip_budget if clip_budget > 2 else 2

    # Normalize config: an int N means "any count from 1 to N".
    allowed_counts = self.task_config.get('num_unique_sources', [2, 3, 4, 5])
    if isinstance(allowed_counts, int):
        allowed_counts = list(range(1, allowed_counts + 1))

    min_valid_sources = 2
    if question_type == "longest":
        # Target keeps >= 2 slots; every background occupies exactly one.
        max_valid_sources = max_clips - 2 + 1
    else:
        # Target keeps one slot; each background needs >= 2 of the rest.
        max_valid_sources = (max_clips - 1) // 2 + 1

    valid_config_sources = [
        n for n in allowed_counts
        if min_valid_sources <= n <= max_valid_sources
    ]

    if not valid_config_sources:
        raise ValueError(
            f"Duration task: No valid num_unique_sources for {question_type} question. "
            f"Config values: {allowed_counts}, Valid range: [{min_valid_sources}, {max_valid_sources}]. "
            f"max_clips={max_clips}, duration={target_duration_s:.1f}s. "
            f"Increase min_clip_duration or adjust num_unique_sources config."
        )

    # Random pick among valid values keeps source-count variety.
    n_sources = random.choice(valid_config_sources)

    # Sanity check against the dataset's category count.
    if not (2 <= n_sources <= len(self.dataset.CATEGORIES)):
        raise ValueError(
            f"Duration task: Invalid n_sources={n_sources}. "
            f"Must be in range [2, {len(self.dataset.CATEGORIES)}]"
        )

    self.logger.debug(
        f"Max clips: {max_clips}, Question: {question_type}, "
        f"Valid range: [{min_valid_sources}, {max_valid_sources}], "
        f"Valid config: {valid_config_sources}, Selected: {n_sources}"
    )

    return max_clips, n_sources, remainder_s
|
|
|
|
|
def _calculate_slot_distribution( |
|
|
self, |
|
|
max_clips: int, |
|
|
n_sources: int, |
|
|
effective_durations: Dict[str, float], |
|
|
target_category: str, |
|
|
question_type: str |
|
|
) -> Tuple[Dict[str, int], bool, Dict]: |
|
|
""" |
|
|
Calculate how many clips each source gets. |
|
|
|
|
|
For LONGEST: target gets (max_clips - n_backgrounds), backgrounds get 1 each |
|
|
For SHORTEST: target gets 1, backgrounds share (max_clips - 1) |
|
|
|
|
|
Args: |
|
|
max_clips: Maximum number of clips that fit |
|
|
n_sources: Number of unique sources |
|
|
effective_durations: Dict mapping category -> effective duration |
|
|
target_category: The category that should be longest/shortest |
|
|
question_type: "longest" or "shortest" |
|
|
|
|
|
Returns: |
|
|
Tuple of (slot_distribution, gap_satisfied, metadata) |
|
|
""" |
|
|
categories = list(effective_durations.keys()) |
|
|
background_categories = [c for c in categories if c != target_category] |
|
|
n_backgrounds = len(background_categories) |
|
|
|
|
|
if question_type == "longest": |
|
|
|
|
|
|
|
|
target_clips = max_clips - n_backgrounds |
|
|
target_clips = max(1, target_clips) |
|
|
|
|
|
slot_distribution = {target_category: target_clips} |
|
|
for cat in background_categories: |
|
|
slot_distribution[cat] = 1 |
|
|
|
|
|
|
|
|
target_duration = target_clips * effective_durations[target_category] |
|
|
background_durations = [effective_durations[c] for c in background_categories] |
|
|
max_background = max(background_durations) if background_durations else 0 |
|
|
required_target = max_background * self.multiplier_longest |
|
|
gap_satisfied = target_duration >= required_target |
|
|
|
|
|
metadata = { |
|
|
'target_clips': target_clips, |
|
|
'target_duration_s': target_duration, |
|
|
'max_background_s': max_background, |
|
|
'required_target_s': required_target, |
|
|
'multiplier': self.multiplier_longest |
|
|
} |
|
|
|
|
|
else: |
|
|
|
|
|
|
|
|
remaining_clips = max_clips - 1 |
|
|
clips_per_background = max(1, remaining_clips // n_backgrounds) |
|
|
extra_clips = remaining_clips % n_backgrounds |
|
|
|
|
|
slot_distribution = {target_category: 1} |
|
|
|
|
|
for i, cat in enumerate(background_categories): |
|
|
clips = clips_per_background + (1 if i < extra_clips else 0) |
|
|
slot_distribution[cat] = clips |
|
|
|
|
|
|
|
|
target_duration = effective_durations[target_category] |
|
|
background_durations = [ |
|
|
slot_distribution[c] * effective_durations[c] |
|
|
for c in background_categories |
|
|
] |
|
|
min_background = min(background_durations) if background_durations else float('inf') |
|
|
required_max_target = min_background * self.multiplier_shortest |
|
|
|
|
|
|
|
|
|
|
|
target_too_short = target_duration < self.min_effective_duration_per_source |
|
|
gap_satisfied = (target_duration <= required_max_target) and (not target_too_short) |
|
|
|
|
|
metadata = { |
|
|
'target_clips': 1, |
|
|
'target_duration_s': target_duration, |
|
|
'min_background_s': min_background, |
|
|
'required_max_target_s': required_max_target, |
|
|
'multiplier': self.multiplier_shortest, |
|
|
'target_too_short': target_too_short |
|
|
} |
|
|
|
|
|
return slot_distribution, gap_satisfied, metadata |
|
|
|
|
|
def _try_generate_sample( |
|
|
self, |
|
|
sample_id: int, |
|
|
question_type: str, |
|
|
max_retries: int = 5, |
|
|
target_duration_seconds: float = None |
|
|
) -> Optional[Dict]: |
|
|
""" |
|
|
Try to generate a valid duration sample with retries. |
|
|
|
|
|
Args: |
|
|
sample_id: Sample ID |
|
|
question_type: "longest" or "shortest" |
|
|
max_retries: Maximum retry attempts |
|
|
target_duration_seconds: Pre-generated target duration |
|
|
|
|
|
Returns: |
|
|
Metadata dict if successful, None if all retries failed |
|
|
""" |
|
|
for attempt in range(max_retries): |
|
|
try: |
|
|
result = self._generate_single_sample(sample_id, question_type, target_duration_seconds=target_duration_seconds) |
|
|
if result is not None: |
|
|
return result |
|
|
except Exception as e: |
|
|
self.logger.warning(f"Sample {sample_id} attempt {attempt+1} failed: {e}") |
|
|
|
|
|
return None |
|
|
|
|
|
def _generate_single_sample(
    self,
    sample_id: int,
    question_type: str,
    target_duration_seconds: float = None
) -> Optional[Dict]:
    """
    Generate a single duration task sample.

    Pipeline:
        1. Use pre-generated target duration (or generate if not provided)
        2. Calculate max_clips using get_max_clip_num_to_be_joined
        3. Based on max_clips and question_type, determine n_sources
        4. Select categories RANDOMLY (no bias toward short/long)
        5. Pick target category RANDOMLY from selected
        6. Get effective durations for all sources
        7. Calculate slot distribution based on max_clips
        8. Verify gap constraint (reject or re-sample on failure)
        9. Load audio clips, build and export final audio, assemble metadata

    Args:
        sample_id: Sample ID number
        question_type: "longest" or "shortest"
        target_duration_seconds: Pre-generated target duration
            (from generate_sample_durations_for_task)

    Returns:
        Dictionary with sample metadata, or None if failed
    """
    # 1. Target total duration of the output audio.
    if target_duration_seconds is not None:
        target_duration_s = target_duration_seconds
    else:
        target_duration_s = generate_single_clip_duration(
            self.min_clip_duration,
            self.max_clip_duration
        )

    # 2-3. Clip budget and number of unique sources.
    max_clips, n_sources, remainder_s = self._calculate_max_clips_and_sources(
        target_duration_s,
        question_type
    )

    # 4. Least-used categories keep usage balanced across the dataset.
    all_categories = self.dataset.get_least_used_categories(n_sources)

    # 5. Random target pick avoids bias toward short/long categories.
    target_category = random.choice(all_categories)
    self.dataset.category_usage_counts[target_category] += 1

    # 6. Sample one representative clip (with effective duration) per source.
    effective_durations = {}
    selected_files = {}

    for category in all_categories:
        filename, filepath, eff_dur = self.dataset.sample_file_from_category_with_duration(
            category,
            min_effective_duration=self.min_effective_duration_per_source
        )
        effective_durations[category] = eff_dur
        selected_files[category] = {
            'filename': filename,
            'filepath': filepath,
            'effective_duration_s': eff_dur
        }

    # 7. Decide clip repetitions per source.
    slot_distribution, gap_satisfied, calc_metadata = self._calculate_slot_distribution(
        max_clips=max_clips,
        n_sources=n_sources,
        effective_durations=effective_durations,
        target_category=target_category,
        question_type=question_type
    )

    # 8. If the gap failed, optionally retry with better-suited clips.
    if not gap_satisfied:
        if self.sample_different_clips:
            gap_satisfied = self._try_improve_gap_with_different_clips(
                question_type=question_type,
                target_category=target_category,
                all_categories=all_categories,
                max_clips=max_clips,
                n_sources=n_sources,
                effective_durations=effective_durations,
                selected_files=selected_files,
                slot_distribution=slot_distribution
            )

    if not gap_satisfied and self.reject_if_gap_not_met:
        self.rejection_count += 1
        self.logger.debug(
            f"Sample {sample_id} rejected: gap not satisfied "
            f"(type={question_type}, max_clips={max_clips}, sources={n_sources})"
        )
        return None

    # 9a. Load the audio clips for every source that got slots.
    source_audio_lists = {}
    files_used = {}

    for category in all_categories:
        reps = slot_distribution.get(category, 0)
        if reps == 0:
            continue

        # Prefer distinct clips of the same class when allowed; otherwise
        # repeat the originally sampled clip.
        if self.sample_different_clips and reps > 1:
            filenames, filepaths, total_dur = self.dataset.sample_files_from_category_to_reach_duration(
                category,
                reps * effective_durations[category],
                prefer_same_file=False
            )
        else:
            file_info = selected_files[category]
            filenames = [file_info['filename']] * reps
            filepaths = [file_info['filepath']] * reps

        audio_list = []
        for fp in filepaths[:reps]:
            audio = self.audio_processor.load_audio(fp)
            audio_list.append(audio)

        # Pad by cycling through the clips actually loaded when the dataset
        # returned fewer files than repetitions.
        # BUG FIX: the previous index `len(audio_list) % len(audio_list)`
        # is always 0, so padding repeated only the first clip instead of
        # cycling through all loaded clips.
        num_loaded = len(audio_list)  # empty list still raises ZeroDivisionError, matching prior behavior
        while len(audio_list) < reps:
            audio_list.append(audio_list[len(audio_list) % num_loaded])

        source_audio_lists[category] = audio_list[:reps]
        files_used[category] = filenames[:reps]

    # 9b. Join clips: consecutive within a source, randomized across sources.
    final_audio, category_sequence, build_metadata = build_duration_task_audio(
        source_audio_lists=source_audio_lists,
        slot_distribution=slot_distribution,
        effective_durations=effective_durations,
        target_total_duration_s=target_duration_s,
        min_silence_between_sources_ms=self.min_silence_ms,
        max_extra_silence_per_gap_ms=self.max_extra_silence_per_gap_ms,
        crossfade_within_source_ms=self.crossfade_within_source_ms
    )

    # 9c. Export the final waveform.
    output_audio_path = self.audio_output / f"{sample_id}.wav"
    final_audio.export(str(output_audio_path), format="wav")

    # 9d. Build MCQ and open-text questions.
    correct_category = target_category
    present_categories = all_categories

    mcq_question = self.task_config['mcq_questions'][question_type]
    mcq_data = self.question_generator.generate_category_mcq(
        mcq_question,
        correct_category,
        present_categories,
        self.dataset.CATEGORIES
    )

    open_text_question = self.task_config['open_text_questions'][question_type]
    open_text_data = self.question_generator.generate_category_open_text(
        open_text_question,
        correct_category
    )

    # Total effective duration heard per category (slots x per-clip duration).
    actual_effective_durations = {
        cat: slot_distribution[cat] * effective_durations[cat]
        for cat in all_categories
        if cat in slot_distribution
    }

    # 9e. Assemble sample metadata.
    metadata = {
        'id': sample_id,
        'audio_path': str(output_audio_path.relative_to(self.output_base.parent)),
        'question_type': question_type,
        'max_clips': max_clips,
        'n_unique_sources': n_sources,
        'target_category': target_category,
        'present_categories': present_categories,
        'source_order': build_metadata['source_order'],
        'slot_distribution': slot_distribution,
        'effective_durations_per_clip': effective_durations,
        'total_effective_durations': actual_effective_durations,
        'gap_satisfied': gap_satisfied,
        'multiplier_used': self.multiplier_longest if question_type == 'longest' else self.multiplier_shortest,
        'files_used': files_used,
        'target_duration_s': target_duration_s,
        'actual_duration_s': len(final_audio) / 1000.0,
        'timestamp_string': build_metadata.get('timestamp_string', ''),
        'source_timestamps': build_metadata.get('source_timestamps', []),
        'mcq_question': mcq_data['question'],
        'mcq_options': mcq_data['options'],
        'mcq_correct_answer': mcq_data['correct_answer'],
        'open_text_question': open_text_data['question'],
        'open_text_answer': open_text_data['correct_answer'],
        'calc_metadata': calc_metadata
    }

    self.success_count += 1
    self.logger.info(
        f"Generated duration sample {sample_id}: {question_type}, "
        f"max_clips={max_clips}, sources={n_sources}, target={target_category}, "
        f"slots={slot_distribution}, gap_satisfied={gap_satisfied}"
    )

    return metadata
|
|
|
|
|
def _try_improve_gap_with_different_clips( |
|
|
self, |
|
|
question_type: str, |
|
|
target_category: str, |
|
|
all_categories: List[str], |
|
|
max_clips: int, |
|
|
n_sources: int, |
|
|
effective_durations: Dict[str, float], |
|
|
selected_files: Dict[str, Dict], |
|
|
slot_distribution: Dict[str, int] |
|
|
) -> bool: |
|
|
""" |
|
|
Try to improve gap satisfaction by selecting different clips. |
|
|
|
|
|
For LONGEST: try clips with longer effective duration for target |
|
|
For SHORTEST: try clips with shorter effective duration for target |
|
|
|
|
|
Args: |
|
|
Various state from generate_sample |
|
|
|
|
|
Returns: |
|
|
True if gap is now satisfied |
|
|
""" |
|
|
files = self.dataset.get_files_by_category_with_durations(target_category) |
|
|
|
|
|
if question_type == "longest": |
|
|
|
|
|
files_sorted = sorted(files, key=lambda x: x['effective_duration_s'], reverse=True) |
|
|
else: |
|
|
|
|
|
files_sorted = sorted(files, key=lambda x: x['effective_duration_s']) |
|
|
|
|
|
if files_sorted: |
|
|
best = files_sorted[0] |
|
|
effective_durations[target_category] = best['effective_duration_s'] |
|
|
selected_files[target_category] = { |
|
|
'filename': best['filename'], |
|
|
'filepath': best['filepath'], |
|
|
'effective_duration_s': best['effective_duration_s'] |
|
|
} |
|
|
|
|
|
|
|
|
new_slots, gap_satisfied, _ = self._calculate_slot_distribution( |
|
|
max_clips=max_clips, |
|
|
n_sources=n_sources, |
|
|
effective_durations=effective_durations, |
|
|
target_category=target_category, |
|
|
question_type=question_type |
|
|
) |
|
|
|
|
|
if gap_satisfied: |
|
|
slot_distribution.clear() |
|
|
slot_distribution.update(new_slots) |
|
|
|
|
|
return gap_satisfied |
|
|
|
|
|
def generate_sample(self, sample_id: int, target_question_type: str = None, target_duration_seconds: float = None) -> Optional[Dict]: |
|
|
""" |
|
|
Generate a single duration task sample with retries. |
|
|
|
|
|
Args: |
|
|
sample_id: Sample ID number |
|
|
target_question_type: Target question type for balanced distribution |
|
|
target_duration_seconds: Pre-generated target duration (from generate_sample_durations_for_task) |
|
|
|
|
|
Returns: |
|
|
Dictionary with sample metadata, or None if failed |
|
|
""" |
|
|
question_type = target_question_type or random.choice( |
|
|
self.task_config['question_types'] |
|
|
) |
|
|
|
|
|
return self._try_generate_sample(sample_id, question_type, target_duration_seconds=target_duration_seconds) |
|
|
|
|
|
def generate_dataset(self) -> tuple:
    """
    Generate the complete duration task dataset.

    Uses generate_sample_durations_for_task() to pre-generate exact sample
    durations that sum to exactly the target task duration. This guarantees:
    - Exact coverage of target duration
    - No estimation errors from average-based calculation

    Returns:
        Tuple of (mcq_csv_path, open_text_csv_path)
    """
    # Pre-generate per-sample durations that fill the task budget exactly.
    sample_durations = generate_sample_durations_for_task(
        self.task_duration_hours,
        self.min_clip_duration,
        self.max_clip_duration
    )
    num_samples = len(sample_durations)

    self.logger.info(
        f"Generating {num_samples} duration task samples "
        f"(target: {self.task_duration_hours}h, exact fill)..."
    )

    # Build a balanced, shuffled question-type schedule; the first
    # `remainder` types absorb one extra sample each.
    question_types = self.task_config['question_types']
    balanced_types = []
    samples_per_type = num_samples // len(question_types)
    remainder = num_samples % len(question_types)

    for qtype in question_types:
        count = samples_per_type + (1 if remainder > 0 else 0)
        balanced_types.extend([qtype] * count)
        remainder = max(0, remainder - 1)

    random.shuffle(balanced_types)
    type_dist = Counter(balanced_types)
    self.logger.info(f"Balanced question type distribution: {dict(sorted(type_dist.items()))}")

    all_metadata = []
    sample_idx = 0   # advances only on success, indexes sample_durations
    type_idx = 0     # advances every attempt; capped to bound retries

    # Allow up to two passes over the schedule to absorb rejections.
    while len(all_metadata) < num_samples and type_idx < len(balanced_types) * 2:
        question_type = balanced_types[type_idx % len(balanced_types)]
        target_duration = sample_durations[sample_idx] if sample_idx < len(sample_durations) else None

        metadata = self.generate_sample(sample_idx, question_type, target_duration_seconds=target_duration)

        type_idx += 1

        if metadata is not None:
            all_metadata.append(metadata)
            sample_idx += 1

            # BUG FIX: progress was previously logged whenever the running
            # count happened to sit at a multiple of 50 — including on
            # failed attempts (and repeatedly at count 0). Log only when a
            # sample was actually added.
            if len(all_metadata) % 50 == 0:
                self.logger.info(
                    f"Progress: {len(all_metadata)}/{num_samples} samples, "
                    f"{self.rejection_count} rejections"
                )

    # BUG FIX: guard against ZeroDivisionError when nothing was generated
    # and nothing was rejected (e.g. empty schedule).
    total_outcomes = len(all_metadata) + self.rejection_count
    rejection_pct = (self.rejection_count / total_outcomes * 100) if total_outcomes else 0.0
    self.logger.info(
        f"Generation complete: {len(all_metadata)} samples, "
        f"{self.rejection_count} rejections "
        f"({rejection_pct:.1f}% rejection rate)"
    )

    # Persist the three output CSVs.
    mcq_csv_path = self.output_base / 'duration_mcq.csv'
    self._save_mcq_csv(all_metadata, mcq_csv_path)

    open_text_csv_path = self.output_base / 'duration_open_text.csv'
    self._save_open_text_csv(all_metadata, open_text_csv_path)

    metadata_csv_path = self.output_base / 'duration_metadata.csv'
    self._save_metadata_csv(all_metadata, metadata_csv_path)

    self.logger.info("Duration task dataset generation complete!")
    self.logger.info(f" - MCQ CSV: {mcq_csv_path}")
    self.logger.info(f" - Open-text CSV: {open_text_csv_path}")
    self.logger.info(f" - Metadata CSV: {metadata_csv_path}")
    self.logger.info(f" - Audio files: {self.audio_output}")

    return mcq_csv_path, open_text_csv_path
|
|
|
|
|
def _save_mcq_csv(self, metadata_list: List[Dict], output_path: Path): |
|
|
"""Save MCQ format CSV.""" |
|
|
with open(output_path, 'w', newline='') as f: |
|
|
writer = csv.writer(f) |
|
|
writer.writerow([ |
|
|
'question', 'id', 'audio_path', |
|
|
'optionA', 'optionB', 'optionC', 'optionD', |
|
|
'correct', 'question_type', 'max_clips', 'n_sources', |
|
|
'target_category', 'slot_distribution', 'effective_durations' |
|
|
]) |
|
|
|
|
|
for meta in metadata_list: |
|
|
writer.writerow([ |
|
|
meta['mcq_question'], |
|
|
meta['id'], |
|
|
meta['audio_path'], |
|
|
meta['mcq_options']['A'], |
|
|
meta['mcq_options']['B'], |
|
|
meta['mcq_options']['C'], |
|
|
meta['mcq_options']['D'], |
|
|
meta['mcq_correct_answer'], |
|
|
meta['question_type'], |
|
|
meta['max_clips'], |
|
|
meta['n_unique_sources'], |
|
|
meta['target_category'], |
|
|
str(meta['slot_distribution']), |
|
|
str(meta['total_effective_durations']) |
|
|
]) |
|
|
|
|
|
def _save_open_text_csv(self, metadata_list: List[Dict], output_path: Path): |
|
|
"""Save open-text format CSV.""" |
|
|
with open(output_path, 'w', newline='') as f: |
|
|
writer = csv.writer(f) |
|
|
writer.writerow([ |
|
|
'question', 'id', 'audio_path', 'answer', |
|
|
'question_type', 'max_clips', 'n_sources', |
|
|
'target_category', 'effective_durations' |
|
|
]) |
|
|
|
|
|
for meta in metadata_list: |
|
|
writer.writerow([ |
|
|
meta['open_text_question'], |
|
|
meta['id'], |
|
|
meta['audio_path'], |
|
|
meta['open_text_answer'], |
|
|
meta['question_type'], |
|
|
meta['max_clips'], |
|
|
meta['n_unique_sources'], |
|
|
meta['target_category'], |
|
|
str(meta['total_effective_durations']) |
|
|
]) |
|
|
|
|
|
def _save_metadata_csv(self, metadata_list: List[Dict], output_path: Path): |
|
|
"""Save detailed metadata CSV with effective durations and timestamps.""" |
|
|
with open(output_path, 'w', newline='') as f: |
|
|
writer = csv.writer(f) |
|
|
writer.writerow([ |
|
|
'id', 'audio_path', 'question_type', 'max_clips', 'n_sources', |
|
|
'target_category', 'present_categories', 'source_order', |
|
|
'slot_distribution', 'effective_durations_per_clip', |
|
|
'total_effective_durations', 'gap_satisfied', 'multiplier_used', |
|
|
'target_duration_s', 'actual_duration_s', 'clip_timestamps', 'files_used' |
|
|
]) |
|
|
|
|
|
for meta in metadata_list: |
|
|
writer.writerow([ |
|
|
meta['id'], |
|
|
meta['audio_path'], |
|
|
meta['question_type'], |
|
|
meta['max_clips'], |
|
|
meta['n_unique_sources'], |
|
|
meta['target_category'], |
|
|
str(meta['present_categories']), |
|
|
str(meta['source_order']), |
|
|
str(meta['slot_distribution']), |
|
|
str(meta['effective_durations_per_clip']), |
|
|
str(meta['total_effective_durations']), |
|
|
meta['gap_satisfied'], |
|
|
meta['multiplier_used'], |
|
|
round(meta['target_duration_s'], 2), |
|
|
round(meta['actual_duration_s'], 2), |
|
|
meta.get('timestamp_string', ''), |
|
|
str(meta['files_used']) |
|
|
]) |
|
|
|
|
|
|
|
|
def main(config_path: str = None):
    """Main entry point for duration task generation."""
    import yaml

    # Default to the repository-level config.yaml when no path is given.
    resolved_path = config_path
    if resolved_path is None:
        resolved_path = Path(__file__).parent.parent / 'config.yaml'

    with open(resolved_path, 'r') as cfg_file:
        config = yaml.safe_load(cfg_file)

    # Reproducibility: seed all RNGs from the config.
    set_random_seed(config['random_seed'])

    logger = setup_logger(
        'duration_task',
        log_file=str(Path(config['output']['base_path']) / config['logging']['log_file']),
        level=config['logging']['level'],
        console_output=config['logging']['console_output']
    )

    DurationTaskGenerator(config, logger).generate_dataset()
|
|
|
|
|
|
|
|
# Allow running this module directly as a script.
if __name__ == '__main__':
    main()
|
|
|