"""
Task 4: Volume - Generate volume comparison questions

This task joins multiple audio sources with different volume levels
and asks questions about the loudest or softest sound.
"""

import csv
import math
import random
import sys
from collections import Counter
from pathlib import Path
from typing import Dict, List, Tuple, Optional

sys.path.append(str(Path(__file__).parent.parent))

from utils import (
    AudioProcessor, ESC50Dataset, QuestionGenerator, LLMQuestionGenerator,
    setup_logger, set_random_seed, calculate_num_samples_for_task,
    generate_single_clip_duration, get_max_clip_num_to_be_joined,
    build_clip_sequence_with_silences, generate_sample_durations_for_task,
    get_lufs_loudness, normalize_to_lufs
)


class VolumeTaskGenerator:
    """Generator for volume comparison task dataset."""

    def __init__(self, config: Dict, logger):
        """
        Initialize volume task generator.

        Args:
            config: Configuration dictionary
            logger: Logger instance
        """
        self.config = config
        self.logger = logger
        self.task_config = config['tasks']['volume']

        self.dataset = ESC50Dataset(
            config['esc50']['metadata_path'],
            config['esc50']['audio_path'],
            config
        )
        self.audio_processor = AudioProcessor(
            crossfade_duration=config['audio']['crossfade_duration'],
            silence_duration=config['audio']['silence_duration'],
            with_silence=config['audio']['with_silence'],
            normalize=config['audio']['normalize'],
            normalize_target_dBFS=config['audio']['normalize_target_dBFS'],
            synthetic_silence_path=config['synthetic_silence']['path']
        )
        self.question_generator = QuestionGenerator(
            num_options=config['mcq']['num_options'],
            option_labels=config['mcq']['option_labels'],
            distractor_strategy=config['mcq']['distractor_strategy']
        )

        # Optional LLM-based question generation (template questions passed as fallback)
        self.llm_enabled = config.get('llm', {}).get('enabled', False)
        self.llm_generator = LLMQuestionGenerator(
            enabled=self.llm_enabled,
            template_questions=self.task_config
        )

        # Timing parameters for assembling the joined audio
        self.min_clip_duration = config['audio']['min_clip_duration']
        self.max_clip_duration = config['audio']['max_clip_duration']
        self.source_clip_duration = config['audio'].get('source_clip_duration', 5.0)
        self.min_silence_ms = config['audio'].get('min_silence_duration', 100)
        self.max_extra_silence_per_gap_ms = config['audio'].get('max_extra_silence_per_gap', 500)
        self.crossfade_ms = config['audio'].get('crossfade_duration', 0)
        self.task_duration_hours = self.task_config['task_duration_size']

        # Baseline normalization and clip-reuse settings
        self.normalize_to_baseline = self.task_config.get('normalize_to_baseline', True)
        self.baseline_dBFS = self.task_config.get('baseline_dBFS', -20.0)
        self.use_same_clip_different_volumes = self.task_config.get('use_same_clip_different_volumes', False)
        self.repetitions_per_source = self.task_config.get('repetitions_per_source', [2, 3, 4])
        if isinstance(self.repetitions_per_source, int):
            self.repetitions_per_source = [self.repetitions_per_source]

        # Minimum loudness gap between the answer clip and the runner-up
        self.multiplier_max_loudness = self.task_config.get('multiplier_max_loudness', 1.5)
        self.multiplier_min_loudness = self.task_config.get('multiplier_min_loudness', 0.5)
        self.reject_if_gap_not_met = self.task_config.get('reject_if_gap_not_met', True)

        # LUFS (perceived loudness) vs. plain dBFS normalization
        self.use_lufs = self.task_config.get('use_lufs', True)
        self.baseline_lufs = self.task_config.get('baseline_lufs', -23.0)

        self.output_base = Path(config['output']['base_path']) / 'volume'
        self.output_base.mkdir(parents=True, exist_ok=True)
        self.audio_output = self.output_base / 'audios'
        self.audio_output.mkdir(parents=True, exist_ok=True)

        self.clips_count_pool = []
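
    # Note on the "AudioSegment" annotations below: the dBFS / apply_gain /
    # export(format=...) API used here matches pydub.AudioSegment, so that is
    # presumably what AudioProcessor.load_audio returns (pydub is not imported
    # in this module, hence the string annotations).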

    def _normalize_to_baseline(self, audio: "AudioSegment") -> "AudioSegment":
        """
        Normalize audio to the baseline loudness level.

        Uses LUFS (perceived loudness) if use_lufs=True, otherwise dBFS.
        This ensures all clips start from the same perceived loudness before
        applying volume adjustments.

        Args:
            audio: Input audio segment

        Returns:
            Normalized audio segment
        """
        if not self.normalize_to_baseline:
            return audio

        if self.use_lufs:
            # Perceptual normalization to the target LUFS level
            normalized = normalize_to_lufs(audio, self.baseline_lufs)
            self.logger.debug(
                f"Normalized to baseline LUFS: {get_lufs_loudness(audio):.2f} -> {get_lufs_loudness(normalized):.2f} LUFS"
            )
            return normalized
        else:
            # Gain shift so the clip's average level matches the target dBFS
            change_in_dBFS = self.baseline_dBFS - audio.dBFS
            normalized = audio.apply_gain(change_in_dBFS)
            self.logger.debug(
                f"Normalized to baseline dBFS: {audio.dBFS:.2f} -> {normalized.dBFS:.2f} dBFS"
            )
            return normalized
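
    # Worked example for the dBFS branch above (values hypothetical): a clip
    # measuring -30.0 dBFS with baseline_dBFS = -20.0 gets apply_gain(+10.0),
    # so every clip sits at the same average level before the per-clip volume
    # offsets are applied.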

    def _get_amplitude_loudness(self, audio: "AudioSegment") -> float:
        """
        Get the loudness of an audio clip.

        Uses LUFS (perceived loudness) if use_lufs=True, otherwise dBFS.

        Args:
            audio: Input audio segment

        Returns:
            Loudness in LUFS or dBFS depending on configuration
        """
        if self.use_lufs:
            return get_lufs_loudness(audio)
        else:
            return audio.dBFS
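
    # Note: LUFS and dBFS are different scales, so the stored loudness readings
    # are only comparable within a run that keeps the same use_lufs setting.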

    def _verify_loudness_gap(
        self,
        volume_levels: List[float],
        question_type: str
    ) -> Tuple[bool, int, Dict]:
        """
        Verify that the loudness gap constraint is satisfied.

        For max_loudness: max_volume >= second_max × multiplier_max
        For min_loudness: min_volume <= second_min × multiplier_min

        Since we work with dB (logarithmic), the gap is a dB difference:
        - For max: max_dB - second_max_dB >= required_gap_dB
        - For min: second_min_dB - min_dB >= required_gap_dB

        The multiplier translates to dB: 1.5x linear ≈ 3.5 dB, 2x ≈ 6 dB.

        Args:
            volume_levels: List of volume adjustments in dB
            question_type: "max_loudness" or "min_loudness"

        Returns:
            Tuple of (gap_satisfied, answer_idx, metadata)
        """
        sorted_levels = sorted(volume_levels, reverse=True)

        if question_type == "max_loudness":
            max_level = sorted_levels[0]
            second_max = sorted_levels[1] if len(sorted_levels) > 1 else sorted_levels[0]

            # Amplitude ratio to dB: gap_dB = 20 * log10(multiplier)
            required_gap_dB = 20 * math.log10(self.multiplier_max_loudness)
            actual_gap_dB = max_level - second_max

            gap_satisfied = actual_gap_dB >= required_gap_dB
            answer_idx = volume_levels.index(max_level)

            metadata = {
                'max_level_dB': max_level,
                'second_max_dB': second_max,
                'required_gap_dB': required_gap_dB,
                'actual_gap_dB': actual_gap_dB,
                'multiplier': self.multiplier_max_loudness
            }

        else:
            min_level = sorted_levels[-1]
            second_min = sorted_levels[-2] if len(sorted_levels) > 1 else sorted_levels[-1]

            # multiplier_min < 1 gives a negative dB value, so take its magnitude
            required_gap_dB = abs(20 * math.log10(self.multiplier_min_loudness))
            actual_gap_dB = second_min - min_level

            gap_satisfied = actual_gap_dB >= required_gap_dB
            answer_idx = volume_levels.index(min_level)

            metadata = {
                'min_level_dB': min_level,
                'second_min_dB': second_min,
                'required_gap_dB': required_gap_dB,
                'actual_gap_dB': actual_gap_dB,
                'multiplier': self.multiplier_min_loudness
            }

        return gap_satisfied, answer_idx, metadata
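
    # Worked example (hypothetical values): with multiplier_max_loudness = 1.5,
    # required_gap_dB = 20*log10(1.5) ≈ 3.52 dB. For volume_levels = [6, -6, 18]
    # the max is 18 dB and the runner-up is 6 dB, so actual_gap_dB = 12 dB, the
    # constraint holds, and answer_idx = 2.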

    def generate_volume_levels(self, n_clips: int, question_type: Optional[str] = None) -> List[float]:
        """
        Generate volume levels dynamically based on multiplier constraints.

        The levels are generated to ensure a proper gap for the question type:
        - For max_loudness: the loudest clip is clearly distinguishable (gap = multiplier_max)
        - For min_loudness: the softest clip is clearly distinguishable (gap = multiplier_min)

        Args:
            n_clips: Number of clips
            question_type: "max_loudness" or "min_loudness" to ensure proper gap

        Returns:
            List of volume adjustments in dB (integers)
        """
        # Minimum spacing between any two levels, in dB
        min_diff = 12

        # Convert the configured amplitude multiplier into a dB gap
        if question_type == "max_loudness":
            required_gap = int(math.ceil(20 * math.log10(self.multiplier_max_loudness)))
        elif question_type == "min_loudness":
            required_gap = int(math.ceil(abs(20 * math.log10(self.multiplier_min_loudness))))
        else:
            required_gap = min_diff

        # Never allow a gap smaller than the global minimum spacing
        required_gap = max(required_gap, min_diff)

        if question_type == "max_loudness":
            # Anchor the loudest clip and stack the remaining levels below it
            max_level = 18

            other_levels = []
            current_level = max_level - required_gap
            for i in range(n_clips - 1):
                other_levels.append(current_level)
                current_level -= min_diff

            selected_levels = other_levels + [max_level]

        elif question_type == "min_loudness":
            # Anchor the softest clip and stack the remaining levels above it
            min_level = -24

            other_levels = []
            current_level = min_level + required_gap
            for i in range(n_clips - 1):
                other_levels.append(current_level)
                current_level += min_diff

            selected_levels = [min_level] + other_levels

        else:
            # No target type: spread levels evenly around 0 dB
            total_range = (n_clips - 1) * min_diff
            start_level = -total_range // 2
            selected_levels = [start_level + i * min_diff for i in range(n_clips)]

        # Randomize which position in the sequence carries which level
        random.shuffle(selected_levels)

        return selected_levels
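
    # Example (hypothetical run): generate_volume_levels(3, "max_loudness") with
    # the default 1.5x multiplier gives required_gap = max(ceil(3.52), 12) = 12
    # and produces the levels [6, -6, 18] before shuffling: the loudest sits
    # 12 dB above the runner-up, and each further level drops another 12 dB.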

    def generate_sample(self, sample_id: int, target_question_type: Optional[str] = None, target_duration_seconds: Optional[float] = None) -> Optional[Dict]:
        """
        Generate a single volume task sample.

        Pipeline:
        1. Pick dataset -> pick class -> pick audio clip
        2. Normalize all clips to the baseline loudness (LUFS or dBFS; critical for a controlled comparison)
        3. Apply a different volume adjustment to each clip
        4. Concatenate clips with silences

        Optionally: use the same clip at different volume levels if configured.

        Args:
            sample_id: Sample ID number
            target_question_type: Target question type for balanced distribution
            target_duration_seconds: Pre-generated target duration (from generate_sample_durations_for_task)

        Returns:
            Dictionary with sample metadata, or None if the sample is rejected
            because the loudness gap could not be satisfied
        """
        # Use the pre-generated duration when provided, otherwise draw one
        if target_duration_seconds is not None:
            clip_duration_seconds = target_duration_seconds
        else:
            clip_duration_seconds = generate_single_clip_duration(
                self.min_clip_duration,
                self.max_clip_duration
            )

        # How many source clips (plus minimum silences) fit in the target duration
        max_clips, remainder_seconds = get_max_clip_num_to_be_joined(
            clip_duration_seconds,
            self.source_clip_duration,
            self.min_silence_ms
        )

        max_clips_per_sample = self.task_config.get('max_clips_per_sample', 10)

        # Keep the clip count near the maximum that fits, bounded by the config
        # cap and by the number of distinct categories available
        min_clips_for_sample = max(2, max_clips - 3)
        max_clips_for_sample = min(max_clips, max_clips_per_sample, len(self.dataset.CATEGORIES))

        if max_clips_for_sample < 2:
            raise ValueError(
                f"Sample {sample_id}: Cannot generate volume task - need at least 2 clips. "
                f"max_clips={max_clips}, max_clips_per_sample={max_clips_per_sample}, "
                f"duration={clip_duration_seconds:.1f}s. Increase min_clip_duration."
            )

        if min_clips_for_sample > max_clips_for_sample:
            raise ValueError(
                f"Sample {sample_id}: Invalid clip range - min_clips ({min_clips_for_sample}) > max_clips ({max_clips_for_sample}). "
                f"max_clips={max_clips}, max_clips_per_sample={max_clips_per_sample}, duration={clip_duration_seconds:.1f}s"
            )

        n_clips = random.randint(min_clips_for_sample, max_clips_for_sample)
        n_clips = max(2, n_clips)

        if target_question_type is not None:
            question_type = target_question_type
        else:
            question_type = random.choice(self.task_config['question_types'])

        # Retry level generation until the loudness gap constraint is met
        max_attempts = 10
        gap_satisfied = False
        volume_levels = None
        gap_metadata = None

        for attempt in range(max_attempts):
            volume_levels = self.generate_volume_levels(n_clips, question_type)
            gap_satisfied, answer_idx, gap_metadata = self._verify_loudness_gap(
                volume_levels, question_type
            )

            if gap_satisfied:
                break

            self.logger.debug(
                f"Sample {sample_id} attempt {attempt+1}: gap not satisfied, "
                f"required={gap_metadata['required_gap_dB']:.1f}dB, "
                f"actual={gap_metadata['actual_gap_dB']:.1f}dB"
            )

        if not gap_satisfied and self.reject_if_gap_not_met:
            self.logger.warning(
                f"Sample {sample_id} rejected: loudness gap not satisfied after {max_attempts} attempts"
            )
            return None

        # Recompute the answer index from the final levels (authoritative even
        # when the gap check was skipped or not satisfied)
        if question_type == 'max_loudness':
            answer_idx = volume_levels.index(max(volume_levels))
        else:
            answer_idx = volume_levels.index(min(volume_levels))

        # Pick the answer category from the least-used ones to balance usage
        answer_category = self.dataset.get_least_used_categories(1)[0]

        if self.use_same_clip_different_volumes:
            # Same category (and later the same file) at every position
            selected_categories = [answer_category] * n_clips
            self.dataset.category_usage_counts[answer_category] += 1
            correct_category = answer_category
        else:
            # Distinct categories; fall back to sampling with replacement when
            # more clips are requested than distinct categories exist
            if n_clips <= len(self.dataset.CATEGORIES):
                other_categories = self.dataset.get_least_used_categories(
                    n_clips - 1,
                    exclude=[answer_category]
                )
            else:
                other_categories = self.dataset.get_least_used_categories(
                    min(n_clips - 1, len(self.dataset.CATEGORIES) - 1),
                    exclude=[answer_category]
                )

                while len(other_categories) < n_clips - 1:
                    other_categories.append(random.choice(self.dataset.CATEGORIES))

            # Place the answer category at the answer index
            selected_categories = []
            other_idx = 0
            for i in range(n_clips):
                if i == answer_idx:
                    selected_categories.append(answer_category)
                else:
                    selected_categories.append(other_categories[other_idx])
                    other_idx += 1

            self.dataset.category_usage_counts[answer_category] += 1

            # Sanity check: the answer position must hold the answer category
            if selected_categories[answer_idx] != answer_category:
                self.logger.error(f"Sample {sample_id}: Answer mismatch! Expected {answer_category} at index {answer_idx}, got {selected_categories[answer_idx]}")
                correct_category = selected_categories[answer_idx]
            else:
                correct_category = answer_category

        # Load, normalize, and volume-adjust each clip
        audio_segments = []
        filenames_list = []
        original_loudness = []
        final_loudness = []

        if self.use_same_clip_different_volumes:
            # One source file reused at every position with different gains
            filename, filepath = self.dataset.sample_file_from_category(answer_category)
            base_audio = self.audio_processor.load_audio(filepath)
            original_loudness_val = self._get_amplitude_loudness(base_audio)

            # Normalize once, then derive all volume variants from it
            base_audio_normalized = self._normalize_to_baseline(base_audio)

            for i in range(n_clips):
                audio_adjusted = self.audio_processor.adjust_volume(
                    base_audio_normalized,
                    volume_levels[i]
                )
                audio_segments.append(audio_adjusted)
                filenames_list.append(filename)
                original_loudness.append(original_loudness_val)
                final_loudness.append(self._get_amplitude_loudness(audio_adjusted))
        else:
            # One source file per selected category
            for i, category in enumerate(selected_categories):
                filename, filepath = self.dataset.sample_file_from_category(category)
                audio = self.audio_processor.load_audio(filepath)

                orig_loud = self._get_amplitude_loudness(audio)
                original_loudness.append(orig_loud)

                # Equalize loudness before applying the per-clip offset
                audio_normalized = self._normalize_to_baseline(audio)

                audio_adjusted = self.audio_processor.adjust_volume(
                    audio_normalized,
                    volume_levels[i]
                )

                audio_segments.append(audio_adjusted)
                filenames_list.append(filename)
                final_loudness.append(self._get_amplitude_loudness(audio_adjusted))

        # Assemble the final audio (clips separated by silences, fit to the
        # target duration) and export it as WAV
        output_audio_path = self.audio_output / f"{sample_id}.wav"
        final_audio = build_clip_sequence_with_silences(
            audio_segments,
            clip_duration_seconds,
            min_silence_ms=self.min_silence_ms,
            max_extra_silence_per_gap_ms=self.max_extra_silence_per_gap_ms,
            crossfade_ms=self.crossfade_ms
        )

        final_audio.export(str(output_audio_path), format="wav")

        # Build the MCQ from the per-question-type template
        mcq_question = self.task_config['mcq_questions'][question_type]
        mcq_data = self.question_generator.generate_category_mcq(
            mcq_question,
            correct_category,
            selected_categories,
            self.dataset.CATEGORIES
        )

        # Build the matching open-text question
        open_text_question = self.task_config['open_text_questions'][question_type]
        open_text_data = self.question_generator.generate_category_open_text(
            open_text_question,
            correct_category
        )

        # Map category -> volume offset; with repeated categories (same-clip
        # mode) later entries overwrite earlier ones
        category_volumes = {
            selected_categories[i]: volume_levels[i]
            for i in range(n_clips)
        }

        metadata = {
            'id': sample_id,
            'audio_path': str(output_audio_path.relative_to(self.output_base.parent)),
            'n_clips': n_clips,
            'question_type': question_type,
            'audio_sequence': selected_categories,
            'volume_levels_db': volume_levels,
            'category_volumes': category_volumes,
            'correct_answer_category': correct_category,
            'correct_volume_db': volume_levels[answer_idx],
            'source_files': filenames_list,
            'use_same_clip': self.use_same_clip_different_volumes,
            # NOTE: records the dBFS baseline even when LUFS normalization is active
            'baseline_dBFS': self.baseline_dBFS if self.normalize_to_baseline else None,
            'original_loudness_dBFS': original_loudness,
            'final_loudness_dBFS': final_loudness,
            'gap_satisfied': gap_satisfied,
            'gap_metadata': gap_metadata,
            'mcq_question': mcq_data['question'],
            'mcq_options': mcq_data['options'],
            'mcq_correct_answer': mcq_data['correct_answer'],
            'open_text_question': open_text_data['question'],
            'open_text_answer': open_text_data['correct_answer']
        }

        self.logger.info(
            f"Generated volume sample {sample_id}: {question_type}, {n_clips} clips, "
            f"volumes={volume_levels}, gap_satisfied={gap_satisfied}, "
            f"gap={gap_metadata['actual_gap_dB']:.1f}dB (required={gap_metadata['required_gap_dB']:.1f}dB)"
        )

        return metadata

    def generate_dataset(self) -> Tuple[Path, Path]:
        """
        Generate the complete volume task dataset.

        Uses generate_sample_durations_for_task() to pre-generate exact sample durations
        that sum to exactly the target task duration. This guarantees:
        - Exact coverage of the target duration
        - No estimation errors from average-based calculation

        Returns:
            Tuple of (mcq_csv_path, open_text_csv_path)
        """
        sample_durations = generate_sample_durations_for_task(
            self.task_duration_hours,
            self.min_clip_duration,
            self.max_clip_duration
        )
        num_samples = len(sample_durations)

        self.logger.info(f"Generating {num_samples} volume task samples (target: {self.task_duration_hours}h, exact fill)...")

        # Assign question types as evenly as possible, then shuffle the order
        question_types = self.task_config['question_types']
        balanced_question_types = []
        samples_per_type = num_samples // len(question_types)
        remainder = num_samples % len(question_types)

        for qtype in question_types:
            count = samples_per_type + (1 if remainder > 0 else 0)
            balanced_question_types.extend([qtype] * count)
            remainder = max(0, remainder - 1)

        random.shuffle(balanced_question_types)
        type_dist = Counter(balanced_question_types)
        self.logger.info(f"Balanced question type distribution: {dict(sorted(type_dist.items()))}")
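
        # Example (hypothetical): with num_samples = 7 and two question types,
        # samples_per_type = 3 and remainder = 1, so the first type gets 4
        # slots and the second gets 3 before the shuffle.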

        all_metadata = []

        for i, target_duration in enumerate(sample_durations):
            metadata = self.generate_sample(i, target_question_type=balanced_question_types[i], target_duration_seconds=target_duration)
            # generate_sample returns None for rejected samples; skip those so
            # the CSV writers below never see a missing row
            if metadata is not None:
                all_metadata.append(metadata)

        mcq_csv_path = self.output_base / 'volume_mcq.csv'
        self._save_mcq_csv(all_metadata, mcq_csv_path)

        open_text_csv_path = self.output_base / 'volume_open_text.csv'
        self._save_open_text_csv(all_metadata, open_text_csv_path)

        metadata_csv_path = self.output_base / 'volume_metadata.csv'
        self._save_metadata_csv(all_metadata, metadata_csv_path)

        self.logger.info("Volume task dataset generation complete!")
        self.logger.info(f" - MCQ CSV: {mcq_csv_path}")
        self.logger.info(f" - Open-text CSV: {open_text_csv_path}")
        self.logger.info(f" - Metadata CSV: {metadata_csv_path}")
        self.logger.info(f" - Audio files: {self.audio_output}")

        return mcq_csv_path, open_text_csv_path

    def _save_mcq_csv(self, metadata_list: List[Dict], output_path: Path):
        """Save MCQ format CSV."""
        with open(output_path, 'w', newline='') as f:
            writer = csv.writer(f)

            writer.writerow([
                'question', 'id', 'audio_path',
                'optionA', 'optionB', 'optionC', 'optionD',
                'correct', 'question_type', 'audio_sequence',
                'category_volumes'
            ])

            for meta in metadata_list:
                writer.writerow([
                    meta['mcq_question'],
                    meta['id'],
                    meta['audio_path'],
                    meta['mcq_options']['A'],
                    meta['mcq_options']['B'],
                    meta['mcq_options']['C'],
                    meta['mcq_options']['D'],
                    meta['mcq_correct_answer'],
                    meta['question_type'],
                    str(meta['audio_sequence']),
                    str(meta['category_volumes'])
                ])

    def _save_open_text_csv(self, metadata_list: List[Dict], output_path: Path):
        """Save open-text format CSV."""
        with open(output_path, 'w', newline='') as f:
            writer = csv.writer(f)

            writer.writerow([
                'question', 'id', 'audio_path', 'answer',
                'question_type', 'audio_sequence', 'category_volumes'
            ])

            for meta in metadata_list:
                writer.writerow([
                    meta['open_text_question'],
                    meta['id'],
                    meta['audio_path'],
                    meta['open_text_answer'],
                    meta['question_type'],
                    str(meta['audio_sequence']),
                    str(meta['category_volumes'])
                ])

    def _save_metadata_csv(self, metadata_list: List[Dict], output_path: Path):
        """Save detailed metadata CSV."""
        with open(output_path, 'w', newline='') as f:
            writer = csv.writer(f)

            writer.writerow([
                'id', 'audio_path', 'n_clips', 'question_type',
                'audio_sequence', 'volume_levels_db', 'correct_answer',
                'correct_volume_db', 'source_files'
            ])

            for meta in metadata_list:
                writer.writerow([
                    meta['id'],
                    meta['audio_path'],
                    meta['n_clips'],
                    meta['question_type'],
                    str(meta['audio_sequence']),
                    str(meta['volume_levels_db']),
                    meta['correct_answer_category'],
                    meta['correct_volume_db'],
                    str(meta['source_files'])
                ])


def main(config_path: Optional[str] = None):
    """Main entry point for volume task generation."""
    import yaml

    if config_path is None:
        config_path = Path(__file__).parent.parent / 'config.yaml'

    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)

    # Reproducibility: seed the RNGs used downstream
    set_random_seed(config['random_seed'])

    logger = setup_logger(
        'volume_task',
        log_file=str(Path(config['output']['base_path']) / config['logging']['log_file']),
        level=config['logging']['level'],
        console_output=config['logging']['console_output']
    )

    generator = VolumeTaskGenerator(config, logger)
    generator.generate_dataset()
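

# Example invocation (assuming this script lives one directory below the
# project root, with the utils package and config.yaml in the parent
# directory, as the sys.path setup above implies):
#   python volume_task.py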

if __name__ == '__main__':
    main()