Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| utils.py — pure logic helpers with no Streamlit dependency. | |
| Covers: | |
| - Audio processing (processFile, clip extraction, randomization) | |
| - DataFrame builders (build_df2 … build_df5) | |
| - Plotly figure builders (one function per chart tab) | |
| - Multi-file summary DataFrame builders | |
| """ | |
| import io | |
| import random | |
| import datetime as dt | |
| import copy | |
| import numpy as np | |
| import pandas as pd | |
| import soundfile as sf | |
| import torch | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| import sonogram_utility as su | |
| # --------------------------------------------------------------------------- | |
| # Constants | |
| # --------------------------------------------------------------------------- | |
# Clip-length bounds, in seconds, referenced by the speaker clip helpers.
CLIP_MIN_S = 3.0
CLIP_MAX_S = 5.0
# Distance, in seconds, between candidate windows when scanning a segment.
CLIP_SCAN_STEP_S = 0.5

# Layout keyword arguments that give a Plotly figure a transparent background.
TRANSPARENT_BG = {
    "plot_bgcolor": "rgba(0,0,0,0)",
    "paper_bgcolor": "rgba(0,0,0,0)",
}
| # --------------------------------------------------------------------------- | |
| # Audio processing | |
| # --------------------------------------------------------------------------- | |
def processFile(filePath, pipeline, enableDenoise, earlyCleanup,
                gainWindow, minimumGain, maximumGain,
                dfModel=None, dfState=None, attenLimDB=3):
    """Load, optionally denoise, equalize, and diarize an audio file.

    Args:
        filePath: Path handed to ``su.splitIntoTimeSegments``.
        pipeline: Callable invoked with ``{"waveform": ..., "sample_rate": ...}``;
            its result must expose a ``speaker_diarization`` attribute.
        enableDenoise: When truthy, every loaded segment is passed through
            ``df.enhance`` before the segments are recombined.
        earlyCleanup: When truthy, intermediate buffers are deleted as soon as
            they are no longer needed.
        gainWindow, minimumGain, maximumGain: Forwarded to the volume equalizer.
        dfModel, dfState, attenLimDB: Forwarded to ``df.enhance`` (only used
            when ``enableDenoise`` is truthy).

    Returns:
        ``(annotations, totalTimeInSeconds, waveform_tensor, sample_rate)`` where
        ``totalTimeInSeconds`` is the equalized waveform length truncated to
        whole seconds.
    """
    print("Loading file")
    # 600 is the segment-length argument (presumably seconds — confirm in
    # sonogram_utility).
    waveformList, sampleRate = su.splitIntoTimeSegments(filePath, 600)
    print("File loaded")

    if enableDenoise:
        print("Denoising")
        # Imported lazily so the `df` package is only required when denoising
        # is actually requested; done once instead of once per segment.
        from df import enhance

        enhancedWaveformList = [
            enhance(dfModel, dfState, w, atten_lim_db=attenLimDB).detach().cpu()
            for w in waveformList
        ]
        print("Audio denoised")
    else:
        enhancedWaveformList = list(waveformList)

    waveformEnhanced = su.combineWaveforms(enhancedWaveformList)
    if earlyCleanup:
        del enhancedWaveformList

    print("Equalizing Audio")
    # equalizeVolume is instantiated first and the resulting object is called.
    waveform_gain_adjusted = su.equalizeVolume()(
        waveformEnhanced, sampleRate, gainWindow, minimumGain, maximumGain
    )
    if earlyCleanup:
        del waveformEnhanced
    print("Audio Equalized")

    print("Detecting speakers")
    diarization_output = pipeline(
        {"waveform": waveform_gain_adjusted, "sample_rate": sampleRate}
    )
    annotations = diarization_output.speaker_diarization
    print("Speakers Detected")

    totalTimeInSeconds = int(waveform_gain_adjusted.shape[-1] / sampleRate)
    return annotations, totalTimeInSeconds, waveform_gain_adjusted, sampleRate
| # --------------------------------------------------------------------------- | |
| # Speaker clip helpers | |
| # --------------------------------------------------------------------------- | |
def extract_clip_bytes(waveform, sample_rate, seg_start, seg_end):
    """Return WAV bytes for the loudest window inside ``[seg_start, seg_end]``.

    The window is as long as the segment, capped at ``CLIP_MAX_S`` seconds.
    Candidate windows are tried every ``CLIP_SCAN_STEP_S`` seconds and the one
    with the highest RMS wins (the earliest one on ties).

    Args:
        waveform: Tensor indexed as ``[channel, sample]``.
        sample_rate: Samples per second.
        seg_start: Segment start in seconds.
        seg_end: Segment end in seconds.

    Returns:
        The clip encoded as 16-bit PCM WAV ``bytes``.
    """
    total_samples = waveform.shape[-1]
    # Clamp both bounds into the waveform so a negative or past-the-end start
    # cannot wrap around through negative slice indices.
    seg_start_s = min(max(int(seg_start * sample_rate), 0), total_samples)
    seg_end_s = max(min(int(seg_end * sample_rate), total_samples), seg_start_s)
    seg_dur = (seg_end_s - seg_start_s) / sample_rate

    # A segment shorter than CLIP_MIN_S is used whole, so only the upper bound
    # ever changes the window length.
    clip_dur = min(seg_dur, CLIP_MAX_S)
    clip_samples = int(clip_dur * sample_rate)

    # Never let the scan step collapse to zero, which would loop forever.
    step_samples = max(int(CLIP_SCAN_STEP_S * sample_rate), 1)

    best_start = seg_start_s
    best_rms = -1.0
    pos = seg_start_s
    while clip_samples > 0 and pos + clip_samples <= seg_end_s:
        window = waveform[:, pos: pos + clip_samples].float()
        rms = float(window.pow(2).mean().sqrt())
        if rms > best_rms:
            best_rms = rms
            best_start = pos
        pos += step_samples

    # detach()/cpu() make the conversion safe for tensors that track gradients
    # or live on an accelerator; soundfile expects (frames, channels).
    clip = waveform[:, best_start: best_start + clip_samples]
    clip_np = clip.detach().cpu().numpy().T

    buf = io.BytesIO()
    sf.write(buf, clip_np, sample_rate, format="WAV", subtype="PCM_16")
    return buf.getvalue()
def build_speaker_clips(annotations, waveform, sample_rate):
    """Return ``(clips_dict, segments_dict)`` for all speakers in annotations.

    clips_dict    : {speaker: wav_bytes}, cut from the speaker's longest segment
    segments_dict : {speaker: [(start, end), ...]}

    Speakers are visited in the order given by ``annotations.labels()``; a
    label without any segment is left out of both dictionaries.
    """
    # Group segments by label in one pass instead of rescanning every track
    # once per speaker.
    by_speaker = {}
    for seg, _, label in annotations.itertracks(yield_label=True):
        by_speaker.setdefault(label, []).append(seg)

    clips = {}
    segments = {}
    for speaker in annotations.labels():
        speaker_segments = by_speaker.get(speaker)
        if not speaker_segments:
            continue
        segments[speaker] = [(s.start, s.end) for s in speaker_segments]
        longest = max(speaker_segments, key=lambda s: s.duration)
        clips[speaker] = extract_clip_bytes(
            waveform, sample_rate, longest.start, longest.end
        )
    return clips, segments
def get_randomized_clip(waveform, sample_rate, segments):
    """Return WAV bytes for a random window drawn from a random segment.

    segments : [(start, end), ...]  (all segments for one speaker)

    A segment is picked with probability proportional to its duration (each
    duration is floored at 0.01 s), then a window is placed at a uniformly
    random offset inside it and handed to ``extract_clip_bytes``.
    """
    weights = [max(end - start, 0.01) for start, end in segments]
    threshold = random.random() * sum(weights)

    # Walk the running total of the weights; the first segment is the fallback
    # when no running total reaches the threshold.
    picked = segments[0]
    running = 0.0
    for index, weight in enumerate(weights):
        running += weight
        if threshold <= running:
            picked = segments[index]
            break
    chosen_start, chosen_end = picked

    span = chosen_end - chosen_start
    clip_dur = min(max(min(span, CLIP_MAX_S), CLIP_MIN_S), span)
    slack = max(span - clip_dur, 0.0)
    clip_start = chosen_start + random.uniform(0.0, slack)
    return extract_clip_bytes(waveform, sample_rate, clip_start, clip_start + clip_dur)
| # --------------------------------------------------------------------------- | |
| # DataFrame builders (called from analyze() in state.py) | |
| # --------------------------------------------------------------------------- | |
def build_df3(noVoice, oneVoice, multiVoice):
    """Build the voice-category totals DataFrame (columns ``values``, ``names``)."""
    labelled = (
        ("No Voice", noVoice),
        ("One Voice", oneVoice),
        ("Multi Voice", multiVoice),
    )
    totals = [su.sumTimes(track) for _, track in labelled]
    names = [label for label, _ in labelled]
    return pd.DataFrame({"values": totals, "names": names})
def build_df4(speakerNames, categorySelections, categoryNames, currAnnotation):
    """Aggregate speaking time per custom category and per uncategorized speaker.

    A speaker's time is credited to the first category whose selection contains
    that speaker; speakers found in no category are reported individually,
    sorted by name.

    Returns (df4, nameList, valueList, extraNames, extraValues), where df4 lists
    the categories followed by the uncategorized speakers.
    """
    nameList = list(categoryNames)
    valueList = [0.0] * len(nameList)
    uncategorized = []

    for speaker in speakerNames:
        # Index of the first category that claims this speaker, if any.
        slot = next(
            (idx for idx in range(len(nameList)) if speaker in categorySelections[idx]),
            None,
        )
        spoken = su.sumTimes(currAnnotation.subset([speaker]))
        if slot is None:
            uncategorized.append((speaker, spoken))
        else:
            valueList[slot] += spoken

    uncategorized.sort(key=lambda pair: pair[0])
    extraNames = [name for name, _ in uncategorized]
    extraValues = [value for _, value in uncategorized]

    df4 = pd.DataFrame({
        "values": valueList + extraValues,
        "names": nameList + extraNames,
    })
    return df4, nameList, valueList, extraNames, extraValues
def build_df5(oneVoice, multiVoice, sumNoVoice, sumOneVoice, sumMultiVoice, currTotalTime):
    """Hierarchical voice-category DataFrame for sunburst / treemap.

    The first three rows are the top-level categories (No / One / Multi Voice).
    They are followed by one row per speaker under "One Voice" and one row per
    entry returned by ``su.sumMultiTimesPerSpeaker`` under "Multi Voice".

    Columns:
        ids / labels / parents / parentNames: hierarchy description.
        values / valueStrings: durations and their ``su.timeToString`` form.
        percentiles: share of ``currTotalTime`` (0-100).
        parentPercentiles: share of the row's parent (0-100); for the three
            top-level rows this equals ``percentiles``.
    """
    speakerList, timeList = su.sumTimesPerSpeaker(oneVoice)
    multiSpeakerList, multiTimeList = su.sumMultiTimesPerSpeaker(multiVoice)

    # Normalize falsy results (None / empty) to plain lists so they concatenate.
    speakerList = list(speakerList) if speakerList else []
    timeList = list(timeList) if timeList else []
    multiSpeakerList = list(multiSpeakerList) if multiSpeakerList else []
    multiTimeList = list(multiTimeList) if multiTimeList else []

    # Both denominators fall back to 1 so an empty or all-zero category yields
    # 0 % rows instead of a division by zero.
    multiTotal = sum(multiTimeList)
    summativeMulti = multiTotal if multiTotal > 0 else 1
    safeOneVoice = sumOneVoice if sumOneVoice > 0 else 1

    # Fraction of the whole recording taken by each top-level category.
    base = [
        sumNoVoice / currTotalTime,
        sumOneVoice / currTotalTime,
        sumMultiVoice / currTotalTime,
    ]

    timeStrings = su.timeToString(timeList) if timeList else []
    multiTimeStrings = su.timeToString(multiTimeList) if multiTimeList else []
    # timeToString may hand back a bare string; wrap it so it concatenates.
    if isinstance(timeStrings, str):
        timeStrings = [timeStrings]
    if isinstance(multiTimeStrings, str):
        multiTimeStrings = [multiTimeStrings]

    n_ov = len(speakerList)
    n_mv = len(multiSpeakerList)
    topLevelPercent = [b * 100 for b in base]

    return pd.DataFrame({
        "ids": ["NV", "OV", "MV"]
        + [f"OV_{i}" for i in range(n_ov)]
        + [f"MV_{i}" for i in range(n_mv)],
        "labels": ["No Voice", "One Voice", "Multi Voice"] + speakerList + multiSpeakerList,
        "parents": ["", "", ""] + ["OV"] * n_ov + ["MV"] * n_mv,
        "parentNames": ["Total", "Total", "Total"]
        + ["One Voice"] * n_ov
        + ["Multi Voice"] * n_mv,
        "values": [sumNoVoice, sumOneVoice, sumMultiVoice] + timeList + multiTimeList,
        "valueStrings": [
            su.timeToString(sumNoVoice),
            su.timeToString(sumOneVoice),
            su.timeToString(sumMultiVoice),
        ] + timeStrings + multiTimeStrings,
        # Child share of the total = share of its parent * parent's share of total.
        "percentiles": topLevelPercent
        + [(t * 100) / safeOneVoice * base[1] for t in timeList]
        + [(t * 100) / summativeMulti * base[2] for t in multiTimeList],
        "parentPercentiles": topLevelPercent
        + [(t * 100) / safeOneVoice for t in timeList]
        + [(t * 100) / summativeMulti for t in multiTimeList],
    })
def build_df2(df4_names, df4_values, currTotalTime):
    """Express each df4 value as a percentage of ``currTotalTime``.

    Returns a DataFrame with columns ``values`` (percentages) and ``names``;
    it feeds the bar chart tab.
    """
    percentages = []
    for amount in df4_values:
        percentages.append(100 * amount / currTotalTime)
    return pd.DataFrame({"values": percentages, "names": df4_names})
| # --------------------------------------------------------------------------- | |
| # Plotly figure builders | |
| # --------------------------------------------------------------------------- | |
def _save_fig(fig, *paths):
    """Try to write ``fig`` to each path; log and skip any path that fails.

    Export is best-effort: a failure for one path never raises and never
    prevents the remaining paths from being attempted.
    """
    import logging  # local import keeps this block self-contained

    logger = logging.getLogger(__name__)
    for path in paths:
        try:
            fig.write_image(path)
        except Exception as exc:  # deliberately broad: export must not break callers
            logger.warning("Could not write figure to %s: %s", path, exc)
def build_fig_pie1(df3, catTypeColors):
    """Build the pie chart of voice categories from df3's ``values``/``names``."""
    pie = go.Pie(values=df3["values"], labels=df3["names"], sort=False)

    fig = go.Figure()
    fig.add_trace(pie)
    fig.update_layout(
        title_text="Percentage of each Voice Category",
        colorway=catTypeColors,
        **TRANSPARENT_BG,
    )
    return fig
def build_fig_pie2(df4, speakerNames, speakerColors, catColors, get_display_name_fn, currFile):
    """Build the pie chart of custom categories and uncategorized speakers.

    The caller's df4 is not modified; names are translated through
    ``get_display_name_fn(name, currFile)`` on a copy.
    """
    frame = df4.copy()

    # Resolve speaker colours from the raw names, before they are replaced by
    # their display names below.
    ordered_speakers = list(speakerNames)
    figColors = []
    for raw_name in frame["names"]:
        if raw_name in speakerNames:
            figColors.append(speakerColors[ordered_speakers.index(raw_name)])

    frame["names"] = frame["names"].apply(lambda raw: get_display_name_fn(raw, currFile))

    pie = go.Pie(values=frame["values"], labels=frame["names"], sort=False)
    fig = go.Figure()
    fig.update_layout(
        title_text="Percentage of Speakers and Custom Categories",
        colorway=catColors + figColors,
        **TRANSPARENT_BG,
    )
    fig.add_trace(pie)
    return fig
def build_fig_sunburst(df5, catTypeColors, speakerColors, get_display_name_fn, currFile):
    """Build the sunburst chart of voice categories and their speakers.

    Works on a copy of df5 whose ``labels`` and ``parentNames`` columns are
    translated through ``get_display_name_fn(name, currFile)``.
    """
    frame = df5.copy()

    def _display(raw):
        return get_display_name_fn(raw, currFile)

    for column in ("labels", "parentNames"):
        frame[column] = frame[column].apply(_display)

    # One hover line per entry; indices refer to custom_data below.
    hover_lines = [
        "<b>%{customdata[0]}</b>",
        "Duration: %{customdata[1]}s",
        "Percentage of Total: %{customdata[2]:.2f}%",
        "Parent: %{customdata[3]}",
        "Percentage of Parent: %{customdata[4]:.2f}%",
    ]

    fig = px.sunburst(
        frame,
        ids="ids",
        names="labels",
        parents="parents",
        values="percentiles",
        branchvalues="total",
        color="labels",
        color_discrete_sequence=catTypeColors + speakerColors,
        custom_data=["labels", "valueStrings", "percentiles", "parentNames", "parentPercentiles"],
        title="Percentage of each Voice Category with Speakers",
    )
    fig.update_traces(hovertemplate="<br>".join(hover_lines))
    fig.update_layout(**TRANSPARENT_BG)
    return fig
def build_fig_treemap(df5, catTypeColors, speakerColors, get_display_name_fn, currFile):
    """Build the treemap of speakers within each voice category.

    Works on a copy of df5 whose ``labels`` and ``parentNames`` columns are
    translated through ``get_display_name_fn(name, currFile)``.
    """
    frame = df5.copy()

    def _display(raw):
        return get_display_name_fn(raw, currFile)

    for column in ("labels", "parentNames"):
        frame[column] = frame[column].apply(_display)

    # One hover line per entry; indices refer to custom_data below.
    hover_lines = [
        "<b>%{customdata[0]}</b>",
        "Duration: %{customdata[1]}s",
        "Percentage of Total: %{customdata[2]:.2f}%",
        "Parent: %{customdata[3]}",
        "Percentage of Parent: %{customdata[4]:.2f}%",
    ]

    fig = px.treemap(
        frame,
        ids="ids",
        names="labels",
        parents="parents",
        values="percentiles",
        branchvalues="total",
        color="labels",
        color_discrete_sequence=catTypeColors + speakerColors,
        custom_data=["labels", "valueStrings", "percentiles", "parentNames", "parentPercentiles"],
        title="Division of Speakers in each Voice Category",
    )
    fig.update_traces(hovertemplate="<br>".join(hover_lines))
    fig.update_layout(**TRANSPARENT_BG)
    return fig
def build_fig_timeline(speakers_dataFrame, currTotalTime, speakerColors, get_display_name_fn, currFile):
    """Gantt-style speaker timeline.

    Args:
        speakers_dataFrame: DataFrame with ``Resource``, ``Start`` and
            ``Finish`` columns. Start/Finish may be numbers (seconds into the
            audio) or datetimes whose time-of-day encodes the offset.
        currTotalTime: Audio length in seconds; sets the right edge of the
            x axis.
        speakerColors: Colour sequence for the bars.
        get_display_name_fn: Called as ``get_display_name_fn(name, currFile)``
            to translate each ``Resource`` value.
        currFile: Forwarded to ``get_display_name_fn``.

    Returns:
        The Plotly timeline figure.
    """
    df = speakers_dataFrame.copy()
    df["Resource"] = df["Resource"].apply(lambda s: get_display_name_fn(s, currFile))

    # Single anchor (today at midnight) shared by the bars and the axis range,
    # so they cannot disagree if the date changes while the figure is built.
    base = dt.datetime.combine(dt.date.today(), dt.time.min)

    def to_audio_dt(s):
        """Map a Start/Finish value onto the anchor day."""
        if isinstance(s, (dt.datetime, pd.Timestamp)):
            midnight = s.replace(hour=0, minute=0, second=0, microsecond=0)
            seconds = (s - midnight).total_seconds()
        else:
            seconds = float(s)
        return base + dt.timedelta(seconds=seconds)

    df["Start"] = df["Start"].apply(to_audio_dt)
    df["Finish"] = df["Finish"].apply(to_audio_dt)

    fig = px.timeline(
        df, x_start="Start", x_end="Finish", y="Resource", color="Resource",
        title="Timeline of Audio with Speakers",
        color_discrete_sequence=speakerColors,
    )
    fig.update_yaxes(autorange="reversed")

    # A timedelta offset, unlike dt.time(h, m, s, us), stays valid for audio of
    # 24 hours or longer (dt.time rejects hour >= 24 with ValueError).
    axis_end = base + dt.timedelta(seconds=float(currTotalTime))

    fig.update_layout(
        xaxis_tickformatstops=[
            dict(dtickrange=[None, 1000], value="%H:%M:%S.%L"),
            dict(dtickrange=[1000, None], value="%H:%M:%S"),
        ],
        xaxis=dict(range=[base, axis_end]),
        xaxis_title="Time",
        yaxis_title="Speaker",
        legend_title=None,
        legend={"traceorder": "reversed"},
        yaxis={"showticklabels": False},
        **TRANSPARENT_BG,
    )
    return fig
def build_fig_bar(df2, catColors, speakerColors, get_display_name_fn, currFile):
    """Build the horizontal bar chart of percentage time spoken per name.

    Works on a copy of df2 whose ``names`` column is translated through
    ``get_display_name_fn(name, currFile)``.
    """
    frame = df2.copy()
    frame["names"] = frame["names"].apply(lambda raw: get_display_name_fn(raw, currFile))

    # Indices refer to custom_data below.
    hover_lines = [
        "<b>%{customdata[0]}</b>",
        "Percentage of Time: %{customdata[1]:.2f}%",
    ]

    fig = px.bar(
        frame,
        x="values",
        y="names",
        color="names",
        orientation="h",
        color_discrete_sequence=catColors + speakerColors,
        custom_data=["names", "values"],
        title="Time Spoken by each Speaker",
    )
    fig.update_xaxes(ticksuffix="%")
    fig.update_yaxes(autorange="reversed")
    fig.update_layout(
        xaxis_title="Percentage Time Spoken",
        yaxis_title=None,
        showlegend=False,
        yaxis={"showticklabels": True},
        **TRANSPARENT_BG,
    )
    fig.update_traces(hovertemplate="<br>".join(hover_lines))
    return fig
| # --------------------------------------------------------------------------- | |
| # Multi-file summary DataFrames | |
| # --------------------------------------------------------------------------- | |
def build_multifile_category_df(validNames, results, summaries, categories, categorySelect):
    """Build df6 (category breakdown per file) for the multi-file expander.

    Each cell is the category's summed duration divided by ``results[fn][1]``;
    a category that a file does not have gets 0. As a side effect,
    ``summaries[fn]["categories"]`` is set to ``(catSummary, extraCats)`` for
    every file.

    Returns ``(df6, allCategories)`` where ``allCategories`` is a copy of
    ``categories`` extended with every extra category encountered.
    """
    df6_dict = {"files": validNames}
    allCategories = copy.deepcopy(categories)

    # Pass 1: compute each file's summary and register every column up front.
    for fileName in validNames:
        annotation, _ = results[fileName]
        catSummary, extraCats = su.calcCategories(annotation, categorySelect[fileName])
        summaries[fileName]["categories"] = (catSummary, extraCats)
        for extra in extraCats:
            if extra not in df6_dict:
                df6_dict[extra] = []
            if extra not in allCategories:
                allCategories.append(extra)
    for category in categories:
        if category not in df6_dict:
            df6_dict[category] = []

    # Pass 2: fill one value per file into every column.
    for fileName in validNames:
        summary, extras = summaries[fileName]["categories"]
        present = categories + extras
        for position, timeSlots in enumerate(summary):
            spoken = sum(track.duration for _, track in timeSlots)
            df6_dict[present[position]].append(spoken / results[fileName][1])
        absent = [category for category in allCategories if category not in present]
        for category in absent:
            df6_dict[category].append(0)

    return pd.DataFrame(df6_dict), allCategories
def build_multifile_voice_df(validNames, summaries):
    """Build df7 (no/one/multi voice percentages per file) for the multi-file expander.

    For every file, entries 0-2 of ``summaries[fn]["df5"]["percentiles"]`` are
    read as the No / One / Multi Voice values.

    Returns ``(df7, voiceNames)``.
    """
    voiceNames = ["No Voice", "One Voice", "Multi Voice"]
    columns = {
        name: [summaries[fn]["df5"]["percentiles"][row] for fn in validNames]
        for row, name in enumerate(voiceNames)
    }
    df7 = pd.DataFrame({"files": validNames, **columns})
    return df7, voiceNames