# AutoCenzurer / helpers / build_span.py
# ACZ-1 - Created gradio app for HF Space (author: HunterNope, commit 5f2a5b3)
import pandas as pd
from static.config import MAX_WORDS, OVERLAP_WORDS, PAUSE_THRESHOLD, LABEL_ORDER
def build_spans(words_df: pd.DataFrame,
                max_words: int = MAX_WORDS,
                overlap_words: int = OVERLAP_WORDS,
                pause_threshold: float = PAUSE_THRESHOLD) -> pd.DataFrame:
    """
    Chunk a word-level transcript into overlapping text spans.

    Walks the words in order, cutting a span after at most ``max_words`` core
    words or at the first inter-word silence of at least ``pause_threshold``
    seconds. Each span's text is padded with up to ``overlap_words`` words of
    context on either side (skipped at the transcript edges), and its
    start/end times cover that padded range. Classification columns are
    initialised empty, to be filled by later pipeline stages.

    Assumes words are grouped by audio_id beforehand; invoke iteratively per
    group if needed.

    Args:
        words_df: Word-level DataFrame with 'word', 'start_time' and
            'end_time' columns, ordered by time.
        max_words: Maximum number of core (non-overlap) words per span.
        overlap_words: Number of context words to prepend/append.
        pause_threshold: Silence (seconds) between consecutive words that
            forces a span break.

    Returns:
        DataFrame with one row per span: span_id, span_text, start_time,
        end_time, the has_* flags (nullable boolean, all NA) and *_hits
        columns (object, all None).
    """
    span_columns = [
        'span_id', 'span_text', 'start_time', 'end_time',
        'has_excessive_profanity', 'has_slur', 'has_targeted_insult',
        'has_threat_or_violence', 'profanity_hits', 'slur_hits',
        'insult_hits', 'threat_or_violence_hits'
    ]
    len_df = len(words_df)
    if len_df == 0:
        return pd.DataFrame(columns=span_columns)
    # Hoist columns into plain lists once: per-row .iloc access inside the
    # loop is far slower than O(1) list indexing.
    words = words_df['word'].tolist()
    starts = words_df['start_time'].tolist()
    ends = words_df['end_time'].tolist()
    spans = []
    cursor = 0
    span_counter = 0
    while cursor < len_df:
        span_start_idx = cursor
        is_beginning = (cursor == 0)
        # Core chunk: up to max_words words, cut early at the first
        # inter-word pause of at least pause_threshold seconds.
        limit = min(span_start_idx + max_words, len_df)
        effective_len = limit - span_start_idx
        for i in range(span_start_idx, limit - 1):
            if starts[i + 1] - ends[i] >= pause_threshold:
                effective_len = i - span_start_idx + 1
                break
        span_end_idx = span_start_idx + effective_len - 1
        is_end = (span_end_idx >= len_df - 1)
        # Context window, clamped to the transcript bounds.
        overlap_before_start = max(0, span_start_idx - overlap_words)
        overlap_after_end = min(len_df - 1, span_end_idx + overlap_words)
        # Span text = [context before] + core words + [context after].
        span_text = []
        if not is_beginning:
            span_text.extend(words[overlap_before_start:span_start_idx])
        span_text.extend(words[span_start_idx:span_end_idx + 1])
        if not is_end:
            span_text.extend(words[span_end_idx + 1:overlap_after_end + 1])
        # Time boundaries include the overlap context.
        actual_start_idx = span_start_idx if is_beginning else overlap_before_start
        actual_end_idx = span_end_idx if is_end else overlap_after_end
        spans.append({
            "span_id": span_counter,
            "span_text": ' '.join(span_text),
            "start_time": starts[actual_start_idx],
            "end_time": ends[actual_end_idx],
            "has_excessive_profanity": None,
            "has_slur": None,
            "has_targeted_insult": None,
            "has_threat_or_violence": None,
            "profanity_hits": None,
            "slur_hits": None,
            "insult_hits": None,
            "threat_or_violence_hits": None
        })
        # Advance past the core words only; overlaps are context, not stride.
        # max(..., 1) guards against a zero-length chunk causing an infinite loop.
        cursor += max(effective_len, 1)
        span_counter += 1
    span_df = pd.DataFrame(spans)
    return span_df.astype({
        'span_id': 'int64',
        'span_text': 'object',
        'start_time': 'float64',
        'end_time': 'float64',
        'has_excessive_profanity': 'boolean',
        'has_slur': 'boolean',
        'has_targeted_insult': 'boolean',
        'has_threat_or_violence': 'boolean',
        'profanity_hits': 'object',
        'slur_hits': 'object',
        'insult_hits': 'object',
        'threat_or_violence_hits': 'object',
    })
def deduplicate_harmful_spans(spans_df: pd.DataFrame, time_tolerance: float = 0.5) -> pd.DataFrame:
    """
    Remove duplicate harmful spans that overlap significantly.

    When multiple spans share a similar end_time (within tolerance), keep only
    the one with the highest severity label. This prevents muting the same
    region multiple times.

    Args:
        spans_df: DataFrame with classified spans (must have
            'final_enforced_label', 'start_time', 'end_time')
        time_tolerance: Maximum end_time difference (seconds) to consider
            spans as overlapping

    Returns:
        Deduplicated DataFrame with only the highest-severity span per
        overlapping group, re-ordered by span_id
    """
    if spans_df.empty:
        return spans_df
    if 'final_enforced_label' not in spans_df.columns:
        return spans_df
    # Only harmful spans participate in deduplication.
    harmful = spans_df[spans_df['final_enforced_label'] != 'NONE'].copy()
    if harmful.empty:
        return spans_df
    # Numeric severity for comparisons; unknown labels rank lowest.
    harmful['_severity'] = harmful['final_enforced_label'].map(LABEL_ORDER).fillna(0)
    # Walk spans in end_time order, grouping those whose end_time lies within
    # time_tolerance of the group's anchor (the group's first span).
    harmful = harmful.sort_values(['end_time', '_severity'], ascending=[True, False])
    # Within each group keep the single highest-severity span.  NOTE: the
    # previous implementation kept the group's *first* span, which only
    # coincided with the highest severity when end_times were exactly equal
    # (the severity sort key breaks ties, it does not rank across a group).
    keep_indices = []
    best_idx = None
    best_severity = None
    anchor_end = None
    for idx, row in harmful.iterrows():
        current_end = row['end_time']
        severity = row['_severity']
        if anchor_end is None or abs(current_end - anchor_end) > time_tolerance:
            # New time group: flush the previous group's winner.
            if best_idx is not None:
                keep_indices.append(best_idx)
            best_idx, best_severity = idx, severity
            anchor_end = current_end
        elif severity > best_severity:
            best_idx, best_severity = idx, severity
    if best_idx is not None:
        keep_indices.append(best_idx)
    deduplicated_harmful = harmful.loc[keep_indices].drop(columns=['_severity'])
    # Re-attach the untouched non-harmful spans and restore span order.
    non_harmful = spans_df[spans_df['final_enforced_label'] == 'NONE']
    result = pd.concat([non_harmful, deduplicated_harmful], ignore_index=False)
    result = result.sort_values('span_id').reset_index(drop=True)
    return result
def deduplicate_by_overlap(spans_df: pd.DataFrame, overlap_threshold: float = 0.8) -> pd.DataFrame:
    """
    Drop harmful spans that mostly overlap a higher-severity harmful span.

    Harmful spans are visited from highest to lowest severity; a span is
    discarded when at least ``overlap_threshold`` of its duration is covered
    by any span already retained.  Non-harmful ('NONE') spans pass through
    untouched.

    Args:
        spans_df: DataFrame with classified spans
        overlap_threshold: Minimum overlap ratio (0-1) to treat a span as a
            duplicate

    Returns:
        Deduplicated DataFrame, ordered by span_id
    """
    if spans_df.empty or 'final_enforced_label' not in spans_df.columns:
        return spans_df
    harmful = spans_df[spans_df['final_enforced_label'] != 'NONE'].copy()
    if harmful.empty:
        return spans_df
    # Rank by severity so the most severe span of any overlapping cluster wins.
    harmful['_severity'] = harmful['final_enforced_label'].map(LABEL_ORDER).fillna(0)
    harmful = harmful.sort_values('_severity', ascending=False)
    retained = []
    windows = []  # (start, end) of every span kept so far
    for row_idx, span in harmful.iterrows():
        begin, finish = span['start_time'], span['end_time']
        duration = finish - begin
        if duration <= 0:
            # Degenerate interval: nothing to mute, skip it entirely.
            continue
        covered = any(
            max(0, min(finish, kept_finish) - max(begin, kept_begin)) / duration >= overlap_threshold
            for kept_begin, kept_finish in windows
        )
        if not covered:
            retained.append(row_idx)
            windows.append((begin, finish))
    survivors = harmful.loc[retained].drop(columns=['_severity'])
    untouched = spans_df[spans_df['final_enforced_label'] == 'NONE']
    combined = pd.concat([untouched, survivors], ignore_index=False)
    return combined.sort_values('span_id').reset_index(drop=True)