Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| import streamlit as st | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import torchaudio | |
| import sonogram_utility as su | |
| import time | |
| import ParquetScheduler as ps | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Union | |
| import copy | |
| import datetime | |
| import tempfile | |
| import os | |
| import shutil | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| import torch | |
| #import torch_xla.core.xla_model as xm | |
| from pyannote.audio import Pipeline | |
| from pyannote.core import Annotation, Segment, Timeline | |
| import datetime as dt | |
# Feature switches read by processFile().
enableDenoise = False   # when True, each audio chunk is passed through enhance() first
earlyCleanup = True     # when True, intermediate waveforms are deleted as soon as possible

# Console logging threshold consulted by printV():
#   0 = None, 1 = Low, 2 = Medium, 3 = High, 4 = Debug
verbosity = 4

# Chart options; the keys look like Plotly mode-bar settings
# (presumably passed as `config=` when charts are rendered -- confirm at the call sites).
config = dict(
    displayModeBar=True,
    modeBarButtonsToRemove=[],
)
def printV(message, verbosityLevel):
    """Print `message` only if the module-level `verbosity` is at least `verbosityLevel`."""
    if not (verbosity >= verbosityLevel):
        return
    print(message)
def get_display_name(speaker, fileName):
    """Look up the display name stored for `speaker` in `fileName`.

    Falls back to the raw `speaker` label when the file has no rename table or
    the table has no entry for this speaker.
    """
    per_file_names = st.session_state.speakerRenames.get(fileName, {})
    return per_file_names.get(speaker, speaker)
def apply_speaker_renames_to_df(df, fileName, column="task"):
    """Return `df` with the labels in `column` mapped through get_display_name().

    When `column` is missing the input frame is returned unchanged (same
    object); otherwise a copy is modified and returned, leaving `df` untouched.
    """
    if column in df.columns:
        renamed = df.copy()
        renamed[column] = renamed[column].apply(
            lambda label: get_display_name(label, fileName)
        )
        return renamed
    return df
def convert_df(df):
    """Serialize `df` to CSV (without the index column) and return it as UTF-8 bytes."""
    csv_text = df.to_csv(index=False)
    return csv_text.encode('utf-8')
def save_data(
    config_dict: Dict[str, str], audio_paths: List[str], userid: str,
) -> None:
    """Copy the audio files into the user's dataset folder and queue one record.

    A deep copy of `config_dict` is extended with a UTC ISO timestamp and one
    ``audio_NNN`` entry per copied file (the destination path), then appended
    to the module-level `scheduler`. The caller's dict is not modified.
    """
    user_dir = PARQUET_DATASET_DIR / f"{userid}"
    user_dir.mkdir(parents=True, exist_ok=True)

    record = copy.deepcopy(config_dict)
    record["timestamp"] = datetime.datetime.utcnow().isoformat()

    # Files are stored as 000.<ext>, 001.<ext>, ... in the order given.
    for position, source in enumerate(audio_paths):
        slot = f"{position:03d}"
        target = user_dir / f"{slot}{Path(source).suffix}"
        shutil.copyfile(source, target)
        record[f"audio_{slot}"] = target

    scheduler.append(record)
def processFile(filePath):
    """Load an audio file, optionally denoise it, equalize its volume and diarize it.

    Args:
        filePath: path of the audio file handed to su.splitIntoTimeSegments().

    Returns:
        (annotations, totalTimeInSeconds, waveform_gain_adjusted, sampleRate)
        where `annotations` is the pipeline's `speaker_diarization` output and
        `totalTimeInSeconds` is the equalized waveform length truncated to an int.

    Reads the module-level settings attenLimDB, gainWindow, minimumGain and
    maximumGain. They are only read, so no `global` declaration is required
    (the previous declaration misspelled attenLimDB as attenLimDb).
    """
    print("Loading file")
    # The audio is loaded as a list of chunks (second argument: 600).
    waveformList, sampleRate = su.splitIntoTimeSegments(filePath, 600)
    print("File loaded")

    if enableDenoise:
        print("Denoising")
        # NOTE(review): enhance/dfModel/dfState are not defined in the visible
        # part of this file; they must exist before enableDenoise is switched on.
        enhancedWaveformList = [
            enhance(dfModel, dfState, w, atten_lim_db=attenLimDB).detach().cpu()
            for w in waveformList
        ]
        print("Audio denoised")
    else:
        enhancedWaveformList = list(waveformList)

    waveformEnhanced = su.combineWaveforms(enhancedWaveformList)
    if earlyCleanup:
        # Drop both lists: waveformList would otherwise keep the chunks alive.
        del enhancedWaveformList
        del waveformList

    print("Equalizing Audio")
    waveform_gain_adjusted = su.equalizeVolume()(
        waveformEnhanced, sampleRate, gainWindow, minimumGain, maximumGain
    )
    if earlyCleanup:
        del waveformEnhanced
    print("Audio Equalized")

    print("Detecting speakers")
    diarization_output = pipeline({"waveform": waveform_gain_adjusted, "sample_rate": sampleRate})
    annotations = diarization_output.speaker_diarization
    print("Speakers Detected")

    totalTimeInSeconds = int(waveform_gain_adjusted.shape[-1] / sampleRate)
    print("Time in seconds calculated")
    return annotations, totalTimeInSeconds, waveform_gain_adjusted, sampleRate
def _loudest_window(waveform, sample_rate, seg_start, seg_end, max_clip_s=5.0, step_s=0.5):
    """Locate the loudest (highest RMS) window inside [seg_start, seg_end] seconds.

    Returns ``(start_sample, length_in_samples)``. The window is at most
    `max_clip_s` seconds long and never longer than the segment itself, so a
    short segment yields a short window. Bounds are clamped to the waveform so
    that a negative or past-the-end start cannot wrap the slice, and the scan
    step is at least one sample so the search always terminates.
    """
    total_samples = waveform.shape[-1]
    first = min(max(int(seg_start * sample_rate), 0), total_samples)
    last = min(max(int(seg_end * sample_rate), first), total_samples)
    length = min(last - first, int(max_clip_s * sample_rate))
    if length <= 0:
        return first, 0

    step = max(int(step_s * sample_rate), 1)
    best_start, best_rms = first, -1.0
    for pos in range(first, last - length + 1, step):
        window = waveform[:, pos:pos + length].float()
        rms = float(window.pow(2).mean().sqrt())
        if rms > best_rms:
            best_start, best_rms = pos, rms
    return best_start, length


def _extract_clip_bytes(waveform, sample_rate, seg_start, seg_end):
    """
    Extract a clip of up to 5 s from [seg_start, seg_end] (seconds) by finding
    the loudest RMS window within that range. Segments shorter than 5 s are
    returned whole. Returns raw 16-bit PCM WAV bytes.

    `waveform` is indexed as waveform[:, samples] (assumes a 2-D
    channels-by-samples tensor -- confirm against su.splitIntoTimeSegments).
    """
    import io
    import soundfile as sf

    best_start, clip_samples = _loudest_window(waveform, sample_rate, seg_start, seg_end)
    clip_waveform = waveform[:, best_start:best_start + clip_samples]
    # detach()/cpu() make the conversion safe for tensors on another device or with grad.
    clip_np = clip_waveform.detach().cpu().numpy().T  # (samples, channels)

    buf = io.BytesIO()
    sf.write(buf, clip_np, sample_rate, format="WAV", subtype="PCM_16")
    return buf.getvalue()
def generate_speaker_clips(annotations, waveform, sample_rate, file_index):
    """
    For each unique speaker in `annotations`:
    - Store all their segments as (start, end) pairs in
      st.session_state.speakerSegments[file_index][speaker].
    - Extract a default clip from their longest segment via _extract_clip_bytes().
    Saves the clips as WAV bytes in st.session_state.speakerClips[file_index].
    """
    from collections import defaultdict

    # Initialise both stores if needed.
    if 'speakerSegments' not in st.session_state:
        st.session_state.speakerSegments = {}
    if 'speakerClips' not in st.session_state:
        st.session_state.speakerClips = {}

    # Group the tracks by label in a single pass instead of rescanning per speaker.
    segments_by_speaker = defaultdict(list)
    for segment, _, label in annotations.itertracks(yield_label=True):
        segments_by_speaker[label].append(segment)

    clips = {}
    segments = {}
    for speaker in annotations.labels():
        speaker_segments = segments_by_speaker.get(speaker)
        if not speaker_segments:
            continue
        # Persist all segments so the randomize button can draw from them later.
        segments[speaker] = [(s.start, s.end) for s in speaker_segments]
        longest = max(speaker_segments, key=lambda s: s.duration)
        clips[speaker] = _extract_clip_bytes(
            waveform, sample_rate, longest.start, longest.end
        )

    st.session_state.speakerClips[file_index] = clips
    st.session_state.speakerSegments[file_index] = segments
    print(f"Generated {len(clips)} speaker clips for {file_index}")
def randomize_speaker_clip(file_index, speaker):
    """
    Replace the stored clip of `speaker` with one taken from a randomly chosen
    segment (longer segments are proportionally more likely) at a random
    offset inside that segment.

    Does nothing unless both st.session_state.speakerSegments and
    st.session_state.speakerWaveforms hold data for `file_index`.
    """
    import random

    speaker_ranges = st.session_state.speakerSegments.get(file_index, {}).get(speaker)
    stored = st.session_state.speakerWaveforms.get(file_index)
    if stored is None or not speaker_ranges:
        return
    waveform, sample_rate = stored

    CLIP_MIN, CLIP_MAX = 3.0, 5.0

    # Roulette-wheel selection; each segment weighs at least 0.01 s.
    weights = [max(end - start, 0.01) for start, end in speaker_ranges]
    threshold = random.random() * sum(weights)
    chosen_start, chosen_end = speaker_ranges[0]
    running = 0.0
    for (range_start, range_end), weight in zip(speaker_ranges, weights):
        running += weight
        if threshold <= running:
            chosen_start, chosen_end = range_start, range_end
            break

    seg_dur = chosen_end - chosen_start
    capped = min(seg_dur, CLIP_MAX)
    clip_dur = min(max(capped, CLIP_MIN), seg_dur)

    # Random offset within the chosen segment.
    max_offset = max(seg_dur - clip_dur, 0.0)
    clip_start = chosen_start + random.uniform(0.0, max_offset)
    clip_end = clip_start + clip_dur

    st.session_state.speakerClips[file_index][speaker] = _extract_clip_bytes(
        waveform, sample_rate, clip_start, clip_end
    )
    print(f"Randomized clip for {speaker} in {file_index}: {clip_start:.2f}–{clip_end:.2f}s")
def addCategory():
    """Callback: create a new category from the `categoryInput` text field.

    Empty names are ignored and duplicate names are rejected, because each
    category owns the widget key ``multiselect_<name>`` and two categories
    with the same name would collide on it. On success an empty selection is
    added for the category in every tracked file.
    """
    newCategory = (st.session_state.categoryInput or '').strip()
    if not newCategory:
        return
    if newCategory in st.session_state.categories:
        st.toast(f"Category '{newCategory}' already exists")
        st.session_state.categoryInput = ''
        return

    st.toast(f"Adding {newCategory}")
    st.session_state[f'multiselect_{newCategory}'] = []
    st.session_state.categories.append(newCategory)
    st.session_state.categoryInput = ''
    # Keep every file's selection list the same length as `categories`.
    for fname in st.session_state.categorySelect:
        st.session_state.categorySelect[fname].append([])
def removeCategory(index):
    """Callback: delete the category at position `index`.

    Removes the category's widget keys, its entry in `categories`, and the
    matching selection slot of every tracked file. Keys or slots that are
    already gone are skipped instead of raising.
    """
    categoryName = st.session_state.categories[index]
    st.toast(f"Removing {categoryName}")

    for widget_key in (f'multiselect_{categoryName}', f'remove_{categoryName}'):
        if widget_key in st.session_state:
            del st.session_state[widget_key]

    del st.session_state.categories[index]
    for fname in st.session_state.categorySelect:
        selections = st.session_state.categorySelect[fname]
        if index < len(selections):
            del selections[index]
def _global_rename_key(index):
    """Build the session-state key that holds the speaker list of global rename `index`."""
    return "grename_speakers_{}".format(index)
def applyGlobalRenames():
    """Rebuild speakerRenames from globalRenames and sync the rename widgets.

    Every per-file rename table is emptied first so that removed entries
    disappear, then each "filename: SPEAKER_##" token of every global rename
    is written back. Finally the ``rename_<file>_<speaker>`` keys of the
    currently selected file are refreshed.
    """
    state = st.session_state
    renames = state.speakerRenames

    for tracked_file in renames:
        renames[tracked_file] = {}

    for rename in state.globalRenames:
        display_name = rename["name"]
        for token in rename["speakers"]:
            # token format: "filename: SPEAKER_##"; tokens without ": " are ignored
            file_part, separator, raw_speaker = token.partition(": ")
            if not separator:
                continue
            if file_part in renames:
                renames[file_part][raw_speaker] = display_name

    # Refresh rename widget keys for the currently viewed file.
    current = state.get("select_currFile")
    if not current or current not in renames:
        return
    saved = renames[current]
    result = state.results.get(current)
    if not result:
        return
    for raw_speaker in result[0].labels():
        state[f"rename_{current}_{raw_speaker}"] = saved.get(raw_speaker, "")
def addGlobalRename():
    """Callback: append a global rename named after the `globalRenameInput` field.

    Blank input is ignored. The new entry starts with no speakers, its widget
    key is initialised to an empty list and the input field is cleared.
    """
    requested = st.session_state.globalRenameInput.strip()
    if requested:
        st.toast(f"Adding rename '{requested}'")
        st.session_state.globalRenames.append({"name": requested, "speakers": []})
        newest = len(st.session_state.globalRenames) - 1
        st.session_state[_global_rename_key(newest)] = []
        st.session_state.globalRenameInput = ""
def removeGlobalRename(index):
    """Callback: delete global rename number `index` and re-apply the remaining ones."""
    removed = st.session_state.globalRenames[index]
    st.toast(f"Removing rename '{removed['name']}'")
    del st.session_state.globalRenames[index]

    # Entries after the removed one moved down by one position, so their
    # widget keys are rewritten from the entries now sitting at each index.
    remaining = len(st.session_state.globalRenames)
    for position in range(index, remaining):
        shifted = st.session_state.globalRenames[position]
        st.session_state[_global_rename_key(position)] = list(shifted["speakers"])

    applyGlobalRenames()
def updateCategoryOptions(fileName):
    """Copy the category multiselect widgets into categorySelect for `fileName`.

    Widget values are display names; they are translated back to raw speaker
    labels before being stored. Speakers not assigned to any category end up
    in st.session_state.unusedSpeakers[fileName]. Skipped entirely while
    `resetResult` is set.
    """
    if st.session_state.resetResult:
        return
    currAnnotation, _ = st.session_state.results[fileName]
    speakerNames = list(currAnnotation.labels())

    # Build reverse map from speakerRenames (source of truth): display name -> SPEAKER_##
    # NOTE(review): if two speakers share a display name, only the last one is
    # reachable through this map.
    saved_renames = st.session_state.speakerRenames.get(fileName, {})
    display_to_raw = {saved_renames.get(sp, sp): sp for sp in speakerNames}

    unusedSpeakers = list(speakerNames)
    for i, category in enumerate(st.session_state['categories']):
        display_choices = list(st.session_state[f'multiselect_{category}'])
        raw_choices = [display_to_raw.get(d, d) for d in display_choices]
        st.session_state["categorySelect"][fileName][i] = raw_choices
        for sp in raw_choices:
            # A speaker may already be claimed by an earlier category (or be
            # unknown); only that case is skipped, nothing else is swallowed.
            if sp in unusedSpeakers:
                unusedSpeakers.remove(sp)
    st.session_state.unusedSpeakers[fileName] = unusedSpeakers
def updateMultiSelect():
    """Callback for the file selector: restore widget state for the chosen file.

    Sets `resetResult`, then (if the file has results) rewrites every
    ``rename_<file>_<speaker>`` key from speakerRenames and every
    ``multiselect_<category>`` key from categorySelect, translated to display
    names.
    """
    fileName = st.session_state["select_currFile"]
    st.session_state.resetResult = True
    result = st.session_state.results.get(fileName)
    if not result:
        return

    currAnnotation, _ = result
    speakerNames = list(currAnnotation.labels())

    # Always restore rename widgets from the persistent speakerRenames dict
    # so that coming back to a file after visiting another shows saved names.
    saved_renames = st.session_state.speakerRenames.get(fileName, {})
    raw_to_display = {}
    for sp in speakerNames:
        saved = saved_renames.get(sp, "")
        st.session_state[f"rename_{fileName}_{sp}"] = saved  # unconditionally restore
        raw_to_display[sp] = saved if saved else sp

    for i, category in enumerate(st.session_state['categories']):
        raw_choices = st.session_state['categorySelect'][fileName][i]
        display_choices = [raw_to_display.get(sp, sp) for sp in raw_choices]
        # Two speakers can share a display name; drop repeats (order kept) so
        # the multiselect value does not contain duplicate entries.
        st.session_state[f'multiselect_{category}'] = list(dict.fromkeys(display_choices))
def analyze(inFileName):
    """Build the summary tables for one file and cache them in session state.

    Reads ``st.session_state.results[inFileName]`` (annotation, total seconds)
    and ``categorySelect[inFileName]``; writes df2, df3, df4, df5,
    speakers_dataFrame and speakers_times into
    ``st.session_state.summaries[inFileName]``. Does nothing further when the
    file has no stored result or no summary slot.

    Any exception is printed with its traceback and reported through
    st.error() instead of propagating.
    """
    try:
        print(f"Start analyzing {inFileName}")
        st.session_state.resetResult = False
        hasResult = (
            inFileName in st.session_state.results
            and inFileName in st.session_state.summaries
            and len(st.session_state.results[inFileName]) > 0
        )
        if not hasResult:
            return
        printV(f'In if',4)
        currAnnotation, currTotalTime = st.session_state.results[inFileName]
        speakerNames = currAnnotation.labels()
        printV(f'Loaded results',4)
        categorySelections = st.session_state["categorySelect"][inFileName]
        printV(f'Loaded speaker selections',4)
        summary = st.session_state.summaries[inFileName]

        # Same guard as safeOneVoice below: an empty annotation has zero total
        # time and must not abort the whole analysis with a division by zero.
        safeTotalTime = currTotalTime if currTotalTime > 0 else 1

        noVoice, oneVoice, multiVoice = su.calcSpeakingTypes(currAnnotation,currTotalTime)
        sumNoVoice = su.sumTimes(noVoice)
        sumOneVoice = su.sumTimes(oneVoice)
        sumMultiVoice = su.sumTimes(multiVoice)
        printV(f'Calculated speaking types',4)

        # --- Build df3: time per voice type ---
        df3 = pd.DataFrame(
            {
                "values": [sumNoVoice, sumOneVoice, sumMultiVoice],
                "names": ["No Voice","One Voice","Multi Voice"],
            }
        )
        df3.name = "df3"
        summary["df3"] = df3
        printV(f'Set df3',4)

        # --- Build df4: time per category, then per uncategorized speaker ---
        nameList = st.session_state.categories
        valueList = [0] * len(nameList)
        extras = []
        for sp in speakerNames:
            spTime = su.sumTimes(currAnnotation.subset([sp]))
            for i in range(len(nameList)):
                if sp in categorySelections[i]:
                    # A speaker counts towards the first category that lists it.
                    valueList[i] += spTime
                    break
            else:
                extras.append((sp, spTime))
        extras.sort(key=lambda pair: pair[0])
        extraNames = [name for name, _ in extras]
        extraValues = [value for _, value in extras]
        df4_dict = {
            "values": valueList + extraValues,
            "names": nameList + extraNames,
        }
        df4 = pd.DataFrame(data=df4_dict)
        df4.name = "df4"
        summary["df4"] = df4
        printV(f'Set df4', 4)

        # --- Build df5: hierarchical rows (voice type -> speaker) ---
        speakerList, timeList = su.sumTimesPerSpeaker(oneVoice)
        multiSpeakerList, multiTimeList = su.sumMultiTimesPerSpeaker(multiVoice)
        speakerList = list(speakerList) if speakerList else []
        timeList = list(timeList) if timeList else []
        multiSpeakerList = list(multiSpeakerList) if multiSpeakerList else []
        multiTimeList = list(multiTimeList) if multiTimeList else []
        summativeMultiSpeaker = sum(multiTimeList) if multiTimeList else 1
        safeOneVoice = sumOneVoice if sumOneVoice > 0 else 1
        basePercentiles = [
            sumNoVoice / safeTotalTime,
            sumOneVoice / safeTotalTime,
            sumMultiVoice / safeTotalTime,
        ]
        timeStrings = su.timeToString(timeList) if timeList else []
        multiTimeStrings = su.timeToString(multiTimeList) if multiTimeList else []
        # timeToString may hand back a single string; normalise to lists.
        if isinstance(timeStrings, str):
            timeStrings = [timeStrings]
        if isinstance(multiTimeStrings, str):
            multiTimeStrings = [multiTimeStrings]
        n_ov = len(speakerList)
        n_mv = len(multiSpeakerList)
        topPercentiles = [p * 100 for p in basePercentiles]
        df5 = pd.DataFrame({
            "ids": ["NV", "OV", "MV"] + [f"OV_{i}" for i in range(n_ov)] + [f"MV_{i}" for i in range(n_mv)],
            "labels": ["No Voice", "One Voice", "Multi Voice"] + speakerList + multiSpeakerList,
            "parents": ["", "", ""] + ["OV"] * n_ov + ["MV"] * n_mv,
            "parentNames": ["Total", "Total", "Total"] + ["One Voice"] * n_ov + ["Multi Voice"] * n_mv,
            "values": [sumNoVoice, sumOneVoice, sumMultiVoice] + timeList + multiTimeList,
            "valueStrings": [
                su.timeToString(sumNoVoice),
                su.timeToString(sumOneVoice),
                su.timeToString(sumMultiVoice),
            ] + timeStrings + multiTimeStrings,
            "percentiles": list(topPercentiles)
            + [(t * 100) / safeOneVoice * basePercentiles[1] for t in timeList]
            + [(t * 100) / summativeMultiSpeaker * basePercentiles[2] for t in multiTimeList],
            "parentPercentiles": list(topPercentiles)
            + [(t * 100) / safeOneVoice for t in timeList]
            + [(t * 100) / summativeMultiSpeaker for t in multiTimeList],
        })
        df5.name = "df5"
        summary["df5"] = df5
        printV(f'Set df5', 4)

        # --- Build speakers_dataFrame, df2 (df4 as percentages of total time) ---
        speakers_dataFrame, speakers_times = su.annotationToDataFrame(currAnnotation)
        summary["speakers_dataFrame"] = speakers_dataFrame
        summary["speakers_times"] = speakers_times
        df2_dict = {
            "values": [100 * t / safeTotalTime for t in df4_dict["values"]],
            "names": df4_dict["names"],
        }
        df2 = pd.DataFrame(df2_dict)
        summary["df2"] = df2
        printV(f'Set df2', 4)
    except Exception as e:
        import traceback
        print(f"Error in analyze: {e}")
        traceback.print_exc()
        st.error(f"Debug - analyze() failed: {e}")
#----------------------------------------------------------------------------------------------------------------------
# Point torch.classes.__path__ at a concrete location.
# NOTE(review): presumably the usual workaround for Streamlit's module watcher
# failing while inspecting torch.classes -- confirm before removing.
torch.classes.__path__ = [os.path.join(torch.__path__[0], torch.classes.__file__)]

# Local folder used by save_data() for donated audio.
PARQUET_DATASET_DIR = Path("parquet_dataset")
PARQUET_DATASET_DIR.mkdir(parents=True,exist_ok=True)
sample_data = [f"CHEM1402_gt/24F_CHEM1402_Night_Class_Week_{i}_gt.rttm" for i in range(1,11)]
scheduler = ps.ParquetScheduler(repo_id="Sonogram/SampleDataset")

# Audio processing settings read by processFile().
secondDifference = 5
gainWindow = 4
minimumGain = -45
maximumGain = -5
attenLimDB = 3

# Device selection. The TPU/XLA path (torch_xla) is disabled, so choose
# between CUDA and CPU directly instead of raising a placeholder exception.
#device = xm.xla_device()
isGPU = torch.cuda.is_available()
device = torch.device("cuda" if isGPU else "cpu")
print(f"Using {device}.")

if (enableDenoise):
    # Instantiate the denoising model and move it to the selected device.
    # NOTE(review): init_df (and enhance, used in processFile) are not imported
    # in the visible part of this file; enabling denoising needs that import.
    dfModel, dfState, _ = init_df(model_base_dir="DeepFilterNet3")
    dfModel.to(device)#torch.device("cuda"))

# Speaker diarization pipeline used by processFile().
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1")
pipeline.to(device)#torch.device("cuda"))
# Store results for viewing and further processing.
# All per-file state is keyed by filename (str) so it survives upload order changes.
# Each entry maps a session-state key to a factory producing its initial value.
_session_defaults = {
    'results': dict,                    # {filename: (annotations, totalSeconds)}
    'speakerRenames': dict,             # {filename: {speaker: name}}
    'summaries': dict,                  # {filename: {df2, df3, ...}}
    'categories': list,                 # also resets categorySelect, see below
    'removeCategory': lambda: None,
    'resetResult': lambda: False,
    'unusedSpeakers': dict,             # {filename: [speaker, ...]}
    'file_names': list,
    'valid_files': list,
    'file_paths': dict,                 # {filename: path}
    'showSummary': lambda: 'No',
    'speakerClips': dict,               # {filename: {speaker: wav_bytes}}
    'speakerSegments': dict,            # {filename: {speaker: [(start,end), ...]}}
    'speakerWaveforms': dict,           # {filename: (waveform_tensor, sample_rate)}
    'globalRenames': list,              # [{"name": str, "speakers": ["file:SPEAKER_##", ...]}]
    'analyzeAllToggle': lambda: False,
}
for _state_key, _make_default in _session_defaults.items():
    if _state_key in st.session_state:
        continue
    st.session_state[_state_key] = _make_default()
    if _state_key == 'categories':
        # categorySelect is created together with categories: {filename: [[], [], ...]}
        st.session_state.categorySelect = {}
#st.set_page_config(layout="wide")
# Page header: title, CPU warning and two introductory paragraphs.
st.title("Instructor Support Tool")
if not isGPU:
    st.warning("TOOL CURRENTLY USING CPU, ANALYSIS EXTREMELY SLOW")
for _intro_paragraph in (
    'If you would like to see a sample result generated from real classroom audio, use the sidebar on the left and press "Load Demo Example"',
    'Keep in mind that this is a very early draft of the tool. Please be patient with any bugs/errors, and email Connor Young at czyoung@ualr.edu if you need help using the tool!',
):
    st.write(_intro_paragraph)
st.divider()

# Collapsible usage instructions, written one paragraph at a time.
_instruction_paragraphs = (
    "Thank you for viewing our experimental app! The overall presentations and features are expected to be improved over time, you can think of this as our first rough draft!",
    "To use this app:\n1. Upload an audio file for live analysis. Alternatively, you can upload an already generated [rttm file](https://stackoverflow.com/questions/30975084/rttm-file-format)",
    "2. Press Analyze All. Note that no data is saved on our side, so we will not have access to your recordings. Future versions of this app will support donating audio to us for aid in our research.",
    "3. Use the side bar on the left to select your file (may have to be expanded by clicking the > ). Our app supports uploading multiple files for more comprehensive analysis.",
    "4. Use the tabs provided to view different visualizations of your audio. Each example can be downloaded for personal use.",
    "4a. The graphs are built using [plotly](https://plotly.com/). This allows for a high degree of interaction. Feel free to experiment with the graphs, as you can always return to the original view by double-clicking on the graph. For more examples of easily supported visualizations, see [here](https://plotly.com/python/basic-charts/)",
    "Would you like additional data, charts, or features? We would love to hear more from you [about our project!](https://forms.gle/A32CdfGYSZoMPyyX9)",
    "If you would like to learn more or work with us, please contact Dr. Mark Baillie at mtbaillie@ualr.edu",
)
with st.expander("Instructions and additional details"):
    for _instruction in _instruction_paragraphs:
        st.write(_instruction)
uploaded_file_paths = st.file_uploader("Upload an audio of classroom activity to analyze", accept_multiple_files=True)
supported_file_types = ('.wav','.mp3','.mp4','.txt','.rttm','.csv')
viewChoices = ["Voice Categories","Custom Categories","Detailed Voice Categories","Voice Category Treemap","Speaker Timeline","Time per Speaker"]
valid_files = st.session_state.valid_files
file_paths = st.session_state.file_paths
currDF = None

# Reuse one temporary directory per session. Creating a fresh one on every
# script rerun left an abandoned directory behind for each widget interaction.
temp_dir = st.session_state.get("uploadTempDir")
if not temp_dir or not os.path.isdir(temp_dir):
    temp_dir = tempfile.mkdtemp()
    st.session_state.uploadTempDir = temp_dir

if uploaded_file_paths:
    print("Found file paths")
    for uploaded_file in uploaded_file_paths:
        if not uploaded_file.name.lower().endswith(supported_file_types):
            st.error('File must be of type: {}'.format(supported_file_types))
            continue
        fname = uploaded_file.name
        print(f"Valid file: {fname}")
        # Write to disk (always refresh so file bytes are current).
        # Only the base name is used on disk so a client-supplied name cannot
        # point outside the temporary directory.
        path = os.path.join(temp_dir, os.path.basename(fname))
        with open(path, "wb") as f:
            f.write(uploaded_file.getvalue())
        # Add to master lists only if not already tracked
        if fname not in st.session_state.file_names:
            st.session_state.file_names.append(fname)
        st.session_state.results.setdefault(fname, [])
        # Summaries are dicts everywhere else in this file ({df2, df3, ...}).
        st.session_state.summaries.setdefault(fname, {})
        st.session_state.unusedSpeakers.setdefault(fname, [])
        st.session_state.categorySelect.setdefault(fname, [[] for _ in st.session_state.categories])
        st.session_state.speakerRenames.setdefault(fname, {})
        st.session_state.speakerClips.setdefault(fname, {})
        st.session_state.file_paths[fname] = path
    # Rebuild valid_files / file_paths lists from tracked state
    valid_files = list(st.session_state.file_names)
    file_paths = [st.session_state.file_paths[f] for f in valid_files]
    file_names = valid_files
    st.session_state.valid_files = valid_files
    st.session_state.file_paths = {f: st.session_state.file_paths[f] for f in valid_files}

file_names = st.session_state.file_names
file_paths_dict = st.session_state.file_paths  # dict {fname: path}
class FakeUpload:
    """Small record holding a file path and the name taken from its last '/'-separated part."""

    def __init__(self, filepath):
        self.path = filepath
        # Text after the final '/', or the whole string when there is none.
        self.name = filepath.rpartition('/')[2]
# Sidebar button that loads one bundled RTTM file and analyzes it immediately.
demoPath = "sample.rttm"
isDemo = False
if st.sidebar.button("Single File Demo"):
    demoName = demoPath.rpartition('/')[2]
    start_time = time.time()

    # Register the demo file and make sure every per-file store has a slot.
    if demoName not in st.session_state.file_names:
        st.session_state.file_names.append(demoName)
    st.session_state.file_paths[demoName] = demoPath
    for _store, _empty in (
        (st.session_state.results, []),
        (st.session_state.summaries, {}),
        (st.session_state.unusedSpeakers, []),
        (st.session_state.categorySelect, [[] for _ in st.session_state.categories]),
        (st.session_state.speakerRenames, {}),
        (st.session_state.speakerClips, {}),
    ):
        _store.setdefault(demoName, _empty)
    file_names = st.session_state.file_names

    with st.spinner(text=f'Loading Demo Sample'):
        speakerList, annotations = su.loadAudioRTTM(demoPath)
        # Total length = latest segment end (0 when there are no segments).
        totalSeconds = max([0, *(segment.end for segment in annotations.itersegments())])
        st.session_state.results[demoName] = (annotations, totalSeconds)
        st.session_state.summaries[demoName] = {}
        st.session_state.unusedSpeakers[demoName] = list(annotations.labels())
    with st.spinner(text=f'Analyzing Demo Data'):
        analyze(demoName)
    st.success(f"Took {time.time() - start_time} seconds to analyze the demo file!")
    st.session_state.select_currFile = demoName
    isDemo = True
# Bundled RTTM files for the multi-file demo.
multiFileDemoPaths = ["audioSamples/media-afc-cal-afc1986022_sr01a05.rttm","audioSamples/media-afc-cal-afc1986022_sr34a01.rttm","audioSamples/media-afc-cal-afc1986022_sr14b02.rttm",
                      "audioSamples/media-afc-cal-afc1986022_sr52a02.rttm","audioSamples/media-afc-cal-afc1986022_sr14b01.rttm"]
# TODO: prepare audio for playback of audio
multiFileAudioPaths = ["audioSamples/media-afc-cal-afc1986022_sr01a05.mp3","audioSamples/media-afc-cal-afc1986022_sr34a01.mp3","audioSamples/media-afc-cal-afc1986022_sr14b02.mp3",
                       "audioSamples/media-afc-cal-afc1986022_sr52a02.mp3","audioSamples/media-afc-cal-afc1986022_sr14b01.mp3"]

# Sidebar button that registers and loads every demo file, then requests an
# "analyze all" pass through analyzeAllToggle.
if st.sidebar.button("Multiple Files Demo"):
    for demoPath in multiFileDemoPaths:
        demoName = demoPath.rpartition('/')[2]
        start_time = time.time()

        if demoName not in st.session_state.file_names:
            st.session_state.file_names.append(demoName)
        st.session_state.file_paths[demoName] = demoPath
        for _store, _empty in (
            (st.session_state.results, []),
            (st.session_state.summaries, {}),
            (st.session_state.unusedSpeakers, []),
            (st.session_state.categorySelect, [[] for _ in st.session_state.categories]),
            (st.session_state.speakerRenames, {}),
            (st.session_state.speakerClips, {}),
        ):
            _store.setdefault(demoName, _empty)
        file_names = st.session_state.file_names

        with st.spinner(text=f'Loading Demo Sample'):
            speakerList, annotations = su.loadAudioRTTM(demoPath)
            # Total length = latest segment end (0 when there are no segments).
            totalSeconds = max([0, *(segment.end for segment in annotations.itersegments())])
            st.session_state.results[demoName] = (annotations, totalSeconds)
            st.session_state.summaries[demoName] = {}
            st.session_state.unusedSpeakers[demoName] = list(annotations.labels())
    # TODO: Remove if not necessary
    #st.session_state.select_currFile = demoName
    isDemo = True
    st.session_state.analyzeAllToggle = True
if len(file_names) == 0:
    st.text("Upload file(s) to enable analysis")
else:
    col_analyze, col_spacer, col_reset = st.columns([3, 5, 2])
    with col_analyze:
        if st.button("Analyze All New Audio", key="button_all"):
            if len(file_names) == 0:
                st.error('Upload file(s) first!')
            else:
                st.session_state.analyzeAllToggle = True
    with col_reset:
        if st.button("🗑️ Reset App", key="button_reset", type="secondary", use_container_width=True):
            # Wipe every session key, then restart the script from the top.
            for key in list(st.session_state.keys()):
                del st.session_state[key]
            st.rerun()

    if st.session_state.analyzeAllToggle:
        print("Start analyzing")
        start_time = time.time()
        totalFiles = len(file_names)
        # Pre-computed annotation files: (suffix, su loader name, spinner label).
        # Anything else is treated as audio and goes through processFile().
        annotationLoaders = (
            ('.txt', 'loadAudioTXT', 'Loading Demo File'),
            ('.rttm', 'loadAudioRTTM', 'Loading File'),
            ('.csv', 'loadAudioCSV', 'Loading File'),
        )
        try:
            for i, fname in enumerate(file_names):
                printV(f'On {i} : {fname}',4)
                fpath = file_paths_dict.get(fname, "")
                printV(f'Path : {fpath}',4)
                # TODO: Fix shortcut for already analyzed files here
                #if fname in st.session_state.results and fname in st.session_state.summaries and len(st.session_state.results[fname]) > 0:
                    #continue
                loweredPath = fpath.lower()
                matchedLoader = next(
                    (entry for entry in annotationLoaders if loweredPath.endswith(entry[0])),
                    None,
                )
                if matchedLoader is not None:
                    _, loaderName, spinnerLabel = matchedLoader
                    with st.spinner(text=f'{spinnerLabel} {i+1} of {totalFiles}'):
                        speakerList, annotations = getattr(su, loaderName)(fpath)
                        printV(annotations,4)
                        # Total length = latest segment end (0 when there are no segments).
                        totalSeconds = max([0, *(segment.end for segment in annotations.itersegments())])
                        st.session_state.results[fname] = (annotations, totalSeconds)
                        st.session_state.summaries[fname] = {}
                        st.session_state.unusedSpeakers[fname] = list(annotations.labels())
                else:
                    with st.spinner(text=f'Processing File {i+1} of {totalFiles}'):
                        annotations, totalSeconds, waveform, sample_rate = processFile(fpath)
                        print(f"Finished processing {fpath}")
                        st.session_state.results[fname] = (annotations, totalSeconds)
                        st.session_state.summaries[fname] = {}
                        st.session_state.unusedSpeakers[fname] = list(annotations.labels())
                    with st.spinner(text=f'Generating speaker clips for File {i+1} of {totalFiles}'):
                        generate_speaker_clips(annotations, waveform, sample_rate, fname)
                        # Keep a reference so the "Try Another Clip" button can re-sample later
                        st.session_state.speakerWaveforms[fname] = (waveform, sample_rate)
                        del waveform
                        print(f"Speaker clips generated for {fpath}")
                with st.spinner(text=f'Analyzing File {i+1} of {totalFiles}'):
                    analyze(fname)
                    print(f"Finished analyzing {fpath}")
        except Exception:
            # A failing file must not leave the toggle set, otherwise every
            # later rerun would start the same failing analysis again.
            st.session_state.analyzeAllToggle = False
            raise
        elapsedSeconds = time.time() - start_time
        print(f"Took {elapsedSeconds} seconds to analyze {totalFiles} files!")
        st.success(f"Took {elapsedSeconds} seconds to analyze {totalFiles} files!")
        st.session_state.analyzeAllToggle = False
# ---------------------------------------------------------------------------
# Single-file view: pick the current file in the sidebar, render the category
# and rename controls, then draw one tab per visualisation.
# ---------------------------------------------------------------------------

def _offer_figure_downloads(fig, download_stem, key_suffix):
    """Show side-by-side "Save As PDF" / "Save As SVG" buttons for ``fig``.

    The figure is exported to bytes in memory, so nothing is written to the
    working directory and a failed export can never fall back to a file left
    behind by an earlier run.

    Args:
        fig: Plotly figure to export.
        download_stem: Download file name without its extension.
        key_suffix: Text appended to ``download-pdf`` / ``download-svg`` to
            form the widget keys.
    """
    pdfColumn, svgColumn = st.columns(2)
    exports = (
        (pdfColumn, "pdf", "Save As PDF", "application/pdf"),
        (svgColumn, "svg", "Save As SVG", "image/svg+xml"),
    )
    for column, fmt, label, mime in exports:
        try:
            payload = fig.to_image(format=fmt)
        except Exception as err:
            # Export stays best-effort: the chart is already on screen, so a
            # failed export only drops the button, but the reason is logged.
            printV(f"Skipping {fmt} export of {download_stem}: {err}", 1)
            continue
        with column:
            st.download_button(
                label,
                payload,
                f"{download_stem}.{fmt}",
                mime,
                key=f"download-{fmt}{key_suffix}",
                on_click="ignore",
            )


# Hover text shared by the sunburst and treemap charts.
_HIERARCHY_HOVER = "<br>".join([
    '<b>%{customdata[0]}</b>',
    'Duration: %{customdata[1]}s',
    'Percentage of Total: %{customdata[2]:.2f}%',
    'Parent: %{customdata[3]}',
    'Percentage of Parent: %{customdata[4]:.2f}%'
])

currFile = st.sidebar.selectbox('Current File', file_names, on_change=updateMultiSelect, key="select_currFile")
if isDemo:
    # Demo mode forces the first file and then clears the flag.
    currFile = file_names[0]
    isDemo = False

hasResults = False
if currFile is None:
    st.write("Select a file to view from the sidebar")
else:
    st.session_state.resetResult = False
    # Path.stem keeps dotted names intact ("a.b.wav" -> "a.b").
    currPlainName = Path(currFile).stem
    hasResults = (
        currFile in st.session_state.results
        and currFile in st.session_state.summaries
        and len(st.session_state.results[currFile]) > 0
    )

try:
    if hasResults:
        st.header(f"Analysis of file {currFile}")
        graphNames = ["Data","Voice Categories","Speaker Percentage","Speakers with Categories","Treemap","Timeline","Time Spoken"]
        dataTab, pie1, pie2, sunburst1, treemap1, timeline, bar1 = st.tabs(graphNames)
        currAnnotation, currTotalTime = st.session_state.results[currFile]
        speakerNames = currAnnotation.labels()
        speakers_dataFrame = st.session_state.summaries[currFile]["speakers_dataFrame"]
        currDF, _ = su.annotationToSimpleDataFrame(currAnnotation)
        speakers_times = st.session_state.summaries[currFile]["speakers_times"]
        unusedSpeakers = st.session_state.unusedSpeakers[currFile]
        categorySelections = st.session_state["categorySelect"][currFile]

        # Build raw->display map from speakerRenames (source of truth, written by applyGlobalRenames)
        _saved_renames = st.session_state.speakerRenames.get(currFile, {})
        raw_to_display = {sp: (_saved_renames.get(sp, sp)) for sp in speakerNames}
        all_speakers_display = [raw_to_display[sp] for sp in speakerNames]

        # --- Category controls (one multiselect + remove button per category) ---
        for i, category in enumerate(st.session_state.categories):
            ms_key = f"multiselect_{category}"
            speakerSet = categorySelections[i]  # SPEAKER_## internally
            default_display = [raw_to_display.get(sp, sp) for sp in speakerSet]
            # Seed widget state once with display names; omit default= to let Streamlit own state
            if ms_key not in st.session_state:
                st.session_state[ms_key] = default_display
            st.sidebar.multiselect(category,
                                   all_speakers_display,
                                   key=ms_key,
                                   on_change=updateCategoryOptions,
                                   args=(currFile,))
            st.sidebar.button(f"Remove {category}",key=f"remove_{category}",on_click=removeCategory,args=(i,))
        newCategory = st.sidebar.text_input('Add category', key='categoryInput',on_change=addCategory)

        st.sidebar.divider()
        st.sidebar.subheader("Rename Speakers")
        st.sidebar.caption(
            "Assign a name and select which speaker labels (across all files) it applies to. "
            "Changes apply to all matched speakers instantly."
        )

        # --- Speaker clip preview (identification aid) ---
        file_clips = st.session_state.speakerClips.get(currFile, {})
        if file_clips:
            st.sidebar.caption("🎧 Listen to clips to help identify speakers:")
            current_renames = st.session_state.speakerRenames[currFile]
            for sp in speakerNames:
                widget_key = f"rename_{currFile}_{sp}"
                if widget_key not in st.session_state:
                    st.session_state[widget_key] = current_renames.get(sp, "")
                live_name = st.session_state[widget_key].strip()
                display_label = live_name if live_name else sp
                st.sidebar.markdown(f"**{display_label}**")
                if sp in file_clips:
                    st.sidebar.audio(file_clips[sp], format="audio/wav")
                    sp_segs = st.session_state.speakerSegments.get(currFile, {}).get(sp, [])
                    has_waveform = currFile in st.session_state.speakerWaveforms
                    if has_waveform and len(sp_segs) >= 1:
                        if st.sidebar.button(
                            "🔀 Try Another Clip",
                            key=f"randomize_{currFile}_{sp}",
                            help="Pick a random clip from a different part of this speaker's audio",
                        ):
                            randomize_speaker_clip(currFile, sp)
                            st.rerun()

        # Build the full list of "filename: SPEAKER_##" tokens across all analyzed files
        all_speaker_tokens = [
            f"{fn}: {sp}"
            for fn in st.session_state.file_names
            if fn in st.session_state.results and len(st.session_state.results[fn]) == 2
            for sp in st.session_state.results[fn][0].labels()
        ]
        st.sidebar.divider()

        # --- Render existing global rename entries ---
        def _on_grename_change(idx):
            """Copy the multiselect value for entry ``idx`` into globalRenames and re-apply renames."""
            key = _global_rename_key(idx)
            st.session_state.globalRenames[idx]["speakers"] = list(st.session_state[key])
            applyGlobalRenames()

        for idx, entry in enumerate(st.session_state.globalRenames):
            grkey = _global_rename_key(idx)
            if grkey not in st.session_state:
                st.session_state[grkey] = list(entry["speakers"])
            st.sidebar.markdown(f"**{entry['name']}**")
            st.sidebar.multiselect(
                f"Speakers for {entry['name']}",
                options=all_speaker_tokens,
                key=grkey,
                on_change=_on_grename_change,
                args=(idx,),
                label_visibility="collapsed",
            )
            st.sidebar.button(
                f"Remove '{entry['name']}'",
                key=f"remove_grename_{idx}",
                on_click=removeGlobalRename,
                args=(idx,),
            )

        # --- Add new global rename ---
        st.sidebar.text_input(
            "Add rename",
            placeholder="e.g. John",
            key="globalRenameInput",
            on_change=addGlobalRename,
        )

        # --- Colours: speakers take the first slots, custom categories the rest ---
        catTypeColors = su.colorsCSS(3)
        allColors = su.colorsCSS(len(speakerNames)+len(st.session_state.categories))
        speakerColors = allColors[:len(speakerNames)]
        catColors = allColors[len(speakerNames):]

        # --- df4: time per custom category, followed by each uncategorised speaker ---
        nameList = st.session_state.categories
        valueList = [0 for _ in range(len(nameList))]
        for i, speakerSet in enumerate(categorySelections):
            valueList[i] += su.sumTimes(currAnnotation.subset(speakerSet))
        extraNames = list(unusedSpeakers)
        extraValues = [su.sumTimes(currAnnotation.subset([sp])) for sp in unusedSpeakers]
        df4_dict = {
            "names": nameList+extraNames,
            "values": valueList+extraValues,
        }
        df4 = pd.DataFrame(data=df4_dict)
        df4.name = "df4"
        st.session_state.summaries[currFile]["df4"] = df4

        with dataTab:
            displayDF = apply_speaker_renames_to_df(currDF, currFile, column="Resource")
            csv = convert_df(displayDF)
            st.download_button(
                "Press to Download analysis data",
                csv,
                'sonogram-analysis-'+currPlainName+'.csv',
                "text/csv",
                key='download-csv',
                on_click="ignore",
            )
            st.dataframe(displayDF)

        with pie1:
            printV("In Pie1",4)
            df3 = st.session_state.summaries[currFile]["df3"]
            fig1 = go.Figure()
            fig1.update_layout(
                title_text="Percentage of each Voice Category",
                colorway=catTypeColors,
                plot_bgcolor='rgba(0, 0, 0, 0)',
                paper_bgcolor='rgba(0, 0, 0, 0)',
            )
            printV("Pie1 Pretrace",4)
            fig1.add_trace(go.Pie(values=df3["values"],labels=df3["names"],sort=False))
            printV("Pie1 Posttrace",4)
            st.plotly_chart(fig1, use_container_width=True, config=config)
            _offer_figure_downloads(fig1, 'sonogram-voice-category-'+currPlainName, '1')
            printV("Pie1 post plotly",4)

        with pie2:
            printV("In Pie2",4)
            df4 = st.session_state.summaries[currFile]["df4"].copy()
            # Some speakers may be missing, so fix colors
            figColors = [
                speakerColors[speakerNames.index(n)]
                for n in df4["names"]
                if n in speakerNames
            ]
            df4["names"] = df4["names"].apply(lambda s: get_display_name(s, currFile))
            fig2 = go.Figure()
            fig2.update_layout(
                title_text="Percentage of Speakers and Custom Categories",
                colorway=catColors+figColors,
                plot_bgcolor='rgba(0, 0, 0, 0)',
                paper_bgcolor='rgba(0, 0, 0, 0)',
            )
            printV("Pie2 Pretrace",4)
            fig2.add_trace(go.Pie(values=df4["values"],labels=df4["names"],sort=False))
            printV("Pie2 Posttrace",4)
            st.plotly_chart(fig2, use_container_width=True, config=config)
            _offer_figure_downloads(fig2, 'sonogram-speaker-percent-'+currPlainName, '2')

        # The sunburst and treemap share one renamed copy of the hierarchy table.
        df5 = st.session_state.summaries[currFile]["df5"].copy()
        df5["labels"] = df5["labels"].apply(lambda s: get_display_name(s, currFile))
        df5["parentNames"] = df5["parentNames"].apply(lambda s: get_display_name(s, currFile))
        hierarchyColumns = ['labels','valueStrings','percentiles','parentNames','parentPercentiles']

        with sunburst1:
            fig3_1 = px.sunburst(df5,
                                 branchvalues = 'total',
                                 names = "labels",
                                 ids = "ids",
                                 parents = "parents",
                                 values = "percentiles",
                                 custom_data=hierarchyColumns,
                                 color = 'labels',
                                 title="Percentage of each Voice Category with Speakers",
                                 color_discrete_sequence=catTypeColors+speakerColors,
                                 )
            fig3_1.update_traces(hovertemplate=_HIERARCHY_HOVER)
            fig3_1.update_layout(
                plot_bgcolor='rgba(0, 0, 0, 0)',
                paper_bgcolor='rgba(0, 0, 0, 0)',
            )
            st.plotly_chart(fig3_1, use_container_width=True, config=config)
            _offer_figure_downloads(fig3_1, 'sonogram-speaker-categories-'+currPlainName, '3')

        with treemap1:
            fig3 = px.treemap(df5,
                              branchvalues = "total",
                              names = "labels",
                              parents = "parents",
                              ids="ids",
                              values = "percentiles",
                              custom_data=hierarchyColumns,
                              color='labels',
                              title="Division of Speakers in each Voice Category",
                              color_discrete_sequence=catTypeColors+speakerColors,
                              )
            fig3.update_traces(hovertemplate=_HIERARCHY_HOVER)
            fig3.update_layout(
                plot_bgcolor='rgba(0, 0, 0, 0)',
                paper_bgcolor='rgba(0, 0, 0, 0)',
            )
            st.plotly_chart(fig3, use_container_width=True, config=config)
            _offer_figure_downloads(fig3, 'sonogram-treemap-'+currPlainName, '4')

        # generate plotting window
        with timeline:
            timeline_df = speakers_dataFrame.copy()
            timeline_df["Resource"] = timeline_df["Resource"].apply(lambda s: get_display_name(s, currFile))
            # Audio time is plotted as an offset from today's midnight.
            base = dt.datetime.combine(dt.date.today(), dt.time.min)

            def to_audio_datetime(s):
                """Return ``base`` plus the time offset carried by ``s``."""
                # If already a datetime/Timestamp, extract seconds since midnight of that date
                if isinstance(s, (dt.datetime, pd.Timestamp)):
                    midnight = s.replace(hour=0, minute=0, second=0, microsecond=0)
                    seconds = (s - midnight).total_seconds()
                else:
                    seconds = float(s)
                return base + dt.timedelta(seconds=seconds)

            timeline_df["Start"] = timeline_df["Start"].apply(to_audio_datetime)
            timeline_df["Finish"] = timeline_df["Finish"].apply(to_audio_datetime)
            fig_la = px.timeline(timeline_df, x_start="Start", x_end="Finish", y="Resource", color="Resource",title="Timeline of Audio with Speakers",
                                 color_discrete_sequence=speakerColors)
            fig_la.update_yaxes(autorange="reversed")
            # A timedelta (unlike dt.time) also works for audio of 24 hours or longer.
            axisEnd = base + dt.timedelta(seconds=float(currTotalTime))
            fig_la.update_layout(
                xaxis_tickformatstops = [
                    dict(dtickrange=[None, 1000], value="%H:%M:%S.%L"),
                    dict(dtickrange=[1000, None], value="%H:%M:%S")
                ],
                xaxis=dict(
                    range=[base, axisEnd]
                ),
                xaxis_title="Time",
                yaxis_title="Speaker",
                legend_title=None,
                plot_bgcolor='rgba(0, 0, 0, 0)',
                paper_bgcolor='rgba(0, 0, 0, 0)',
                legend={'traceorder':'reversed'},
                yaxis= {'showticklabels': False},
            )
            st.plotly_chart(fig_la, use_container_width=True, config=config)
            _offer_figure_downloads(fig_la, 'sonogram-timeline-'+currPlainName, '5')

        with bar1:
            df2 = st.session_state.summaries[currFile]["df2"].copy()
            df2["names"] = df2["names"].apply(lambda s: get_display_name(s, currFile))
            fig2_la = px.bar(df2, x="values", y="names", color="names", orientation='h',
                             custom_data=["names","values"],title="Time Spoken by each Speaker",
                             color_discrete_sequence=catColors+speakerColors)
            fig2_la.update_xaxes(ticksuffix="%")
            fig2_la.update_yaxes(autorange="reversed")
            fig2_la.update_layout(
                xaxis_title="Percentage Time Spoken",
                yaxis_title=None,
                plot_bgcolor='rgba(0, 0, 0, 0)',
                paper_bgcolor='rgba(0, 0, 0, 0)',
                showlegend=False,
                yaxis={'showticklabels': True},
            )
            fig2_la.update_traces(
                hovertemplate="<br>".join([
                    '<b>%{customdata[0]}</b>',
                    'Percentage of Time: %{customdata[1]:.2f}%'
                ])
            )
            st.plotly_chart(fig2_la, use_container_width=True, config=config)
            _offer_figure_downloads(fig2_la, 'sonogram-speaker-time-'+currPlainName, '6')
except ValueError as err:
    # Keep the rest of the page (summary, FAQ) rendering, but report the
    # problem instead of silently dropping the remaining tabs.
    printV(f"Failed to render analysis of {currFile}: {err}", 1)
    st.error(f"Could not display the analysis of {currFile}: {err}")
# Cross-file comparison charts, shown once at least two files have a complete
# (annotation, total time) result.
if len(st.session_state.results) > 0:
    with st.expander("Multi-file Summary Data"):
        st.header("Multi-file Summary Data")
        with st.spinner(text='Processing summary results...'):
            fileNames = st.session_state.file_names
            validNames = [fn for fn in fileNames if fn in st.session_state.results and len(st.session_state.results[fn]) == 2]
            if len(validNames) > 1:
                # --- Share of each file's total time per category -------------
                allCategories = list(st.session_state.categories)
                for fn in validNames:
                    currAnnotation, currTotalTime = st.session_state.results[fn]
                    categorySelections = st.session_state["categorySelect"][fn]
                    catSummary, extraCats = su.calcCategories(currAnnotation, categorySelections)
                    st.session_state.summaries[fn]["categories"] = (catSummary, extraCats)
                    for extra in extraCats:
                        if extra not in allCategories:
                            allCategories.append(extra)

                df6_dict = {"files": validNames}
                for category in allCategories:
                    df6_dict[category] = []
                for fn in validNames:
                    summary, extras = st.session_state.summaries[fn]["categories"]
                    theseCategories = st.session_state.categories + extras
                    totalTime = st.session_state.results[fn][1]
                    fractions = {}
                    for category, timeSlots in zip(theseCategories, summary):
                        spoken = sum(t.duration for _, t in timeSlots)
                        # A file with zero total time contributes 0 instead of
                        # raising ZeroDivisionError.
                        fractions[category] = spoken / totalTime if totalTime else 0
                    # Exactly one value per column per file keeps the columns the
                    # same length; categories absent from this file count as 0.
                    for category in allCategories:
                        df6_dict[category].append(fractions.get(category, 0))
                df6 = pd.DataFrame(df6_dict)
                summFig = px.bar(df6, x="files", y=allCategories,title="Time Spoken by Each Speaker in Each File")
                st.plotly_chart(summFig, use_container_width=True,config=config)

                # --- Voice-category percentages per file ----------------------
                voiceNames = ["No Voice","One Voice","Multi Voice"]
                df7_dict = {
                    "files": validNames,
                }
                for category in voiceNames:
                    df7_dict[category] = []
                for fn in validNames:
                    partialDf = st.session_state.summaries[fn]["df5"]
                    # Reads rows 0..2 of "percentiles" in the voiceNames order.
                    # NOTE(review): relies on df5 storing them in that order - confirm.
                    for i, voiceName in enumerate(voiceNames):
                        df7_dict[voiceName].append(partialDf["percentiles"][i])
                df7 = pd.DataFrame(df7_dict)

                # Same stacked chart three times, each with a different file order.
                stackOrder = ["One Voice","Multi Voice","No Voice",]
                sortedCharts = (
                    (['One Voice', 'Multi Voice'], True, "Cross-file Voice Categories sorted for One Voice"),
                    (['Multi Voice','One Voice'], True, "Cross-file Voice Categories sorted for Multi Voice"),
                    (['No Voice', 'Multi Voice'], False, "Cross-file Voice Categories sorted for Any Voice"),
                )
                for sortColumns, ascending, chartTitle in sortedCharts:
                    sortedDf = df7.sort_values(by=sortColumns, ascending=ascending)
                    voiceFig = px.bar(sortedDf, x="files", y=stackOrder, title=chartTitle)
                    st.plotly_chart(voiceFig, use_container_width=True,config=config)
# Disabled dataset-upload UI, kept as an inert string literal instead of live
# code. Nothing in the visible part of the file reads `old`.
# NOTE(review): the text calls save_data(); confirm it is still wanted before
# reviving it, otherwise consider deleting this assignment.
old = '''userid = st.text_input("user id:", "Guest")
colorPref = st.text_input("Favorite color?", "None")
radio = st.radio('Pick one:', ['Left','Right'])
selection = st.selectbox('Select', [1,2,3])
if st.button("Upload Files to Dataset"):
save_data({"color":colorPref,"direction":radio,"number":selection},
file_paths,
userid)
st.success('I think it worked!')
'''
def convert_df(df):
    """Serialize ``df`` to CSV text without the index and return it as UTF-8 bytes."""
    csv_text = df.to_csv(index=False)
    return csv_text.encode('utf-8')
# Static FAQ: each entry is written as a bold question followed by its answer.
faqEntries = (
    (
        "**1. I tried analyzing a file, but the page refreshed and nothing happened! Why?**\n\t",
        "You may need to select a file using the side bar on the left. This app supports multiple files, so we require that you select which file to view after analysis.",
    ),
    (
        "**2. I don't see a sidebar! Where is it?**\n\t",
        "The side bar may start by being minimized. Press the '>' in the upper left to expand the side bar.",
    ),
    (
        "**3. I still don't have a file to select in the dropdown! Why?**\n\t",
        "If you are sure that you have run Analyze All and after refresh no files may be selected, then your file is likely too large. We currently have a limitation of approximately 1.5 hours of audio. This is a known issue that requires additional time **or** money to solve, and is expected to be fixed by the next update of this app. Please be patient!",
    ),
    (
        "**4. I want to be able to view my previously analyzed data! How can I do this?**\n\t",
        "You can download a CSV copy of the data using the first tab. From there, you can reupload the CSV copy at a later date to view the data visualizations without having to use your original audio file. Future versions of this app will support creating optional logins for long term storage and analysis.",
    ),
    (
        "**5. The app says 'TOOL CURRENTLY USING CPU, ANALYSIS EXTREMELY SLOW' and takes forever to analyze audio! What is wrong?**\n\t",
        "We are currently in the process of securing funding to allow permanent public access to this tool. Until then, we can provide an interface to view already analyzed data without cost to you or us. While this mode will technically still work, it may take over a day to analyze your audio. Feel free to reach out to us to discuss temporary solutions to this until the app's funding is secured!",
    ),
)
with st.expander("(Potentially) FAQ"):
    for question, answer in faqEntries:
        st.write(question)
        st.write(answer)