Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import ast | |
| import pandas as pd | |
| import gspread | |
| from google.oauth2.service_account import Credentials | |
| import glob | |
| import soundfile as sf | |
| from maad import sound | |
| from maad.util import power2dB | |
| from skimage import transform | |
| import logging | |
| import zipfile | |
| import tempfile | |
| from datetime import datetime | |
| import matplotlib.pyplot as plt | |
# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# The service-account info is read from the ``gcp_service_account``
# environment variable as a Python literal and later handed to
# ``Credentials.from_service_account_info``.
_raw_service_account = os.getenv("gcp_service_account")
if _raw_service_account is None:
    # Without this check ``ast.literal_eval(None)`` fails with an opaque
    # "malformed node or string" ValueError; keep the exception type but say
    # what is actually wrong.
    raise ValueError(
        "Environment variable 'gcp_service_account' is not set; it must "
        "contain the Google service-account info as a Python literal."
    )
gcp_service_account = ast.literal_eval(_raw_service_account)
def authorize_google_sheets():
    """Create a gspread client authorized with the service-account info.

    Returns:
        The client returned by ``gspread.authorize`` for credentials built
        from ``gcp_service_account`` with the Sheets and Drive scopes.
    """
    scopes = [
        "https://www.googleapis.com/auth/spreadsheets",
        "https://www.googleapis.com/auth/drive",
    ]
    credentials = Credentials.from_service_account_info(gcp_service_account, scopes=scopes)
    return gspread.authorize(credentials)
def get_google_sheet_data(rec_name):
    """Load one worksheet of the annotations spreadsheet into a DataFrame.

    Args:
        rec_name: Title of the worksheet to read from the
            "Annotações_Parque_das_Neblinas" spreadsheet.

    Returns:
        A ``pandas.DataFrame`` built from the worksheet's records.
    """
    workbook = authorize_google_sheets().open("Annotações_Parque_das_Neblinas")
    records = workbook.worksheet(rec_name).get_all_records()
    return pd.DataFrame(records)
def get_annotation_status():
    """Read the "status" worksheet of "XP_annotation_status" as a DataFrame.

    Returns:
        A ``pandas.DataFrame`` of the worksheet's records. The columns
        ``cluster_folder``, ``user``, ``status`` and ``timestamp`` are added,
        filled with empty strings, whenever the sheet does not provide them.
    """
    worksheet = authorize_google_sheets().open("XP_annotation_status").worksheet("status")
    status_df = pd.DataFrame(worksheet.get_all_records())

    # Ensure required columns are present
    for required_column in ('cluster_folder', 'user', 'status', 'timestamp'):
        if required_column not in status_df.columns:
            status_df[required_column] = ''
    return status_df
def update_annotation_status(cluster_folder, user, status):
    """Record who is annotating a cluster folder, and with which status.

    The first row of the "status" worksheet whose ``cluster_folder`` value
    matches is updated in place; when no row matches, a new row is appended.

    Args:
        cluster_folder: Identifier of the cluster folder. It is compared as
            text, so ``5`` and ``"5"`` refer to the same row.
        user: Value written to the user column.
        status: Value written to the status column.
    """
    client = authorize_google_sheets()
    sheet = client.open("XP_annotation_status").worksheet("status")

    # One timestamp per call, shared by the update and the append paths.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Read the rows from the worksheet that is already open instead of
    # authorizing and opening the spreadsheet a second time.
    records = sheet.get_all_records()

    # Compare as text: sheet values and caller values may differ in type
    # (number vs. string) while denoting the same folder.
    wanted = str(cluster_folder)
    match = next(
        (position for position, record in enumerate(records)
         if str(record.get('cluster_folder', '')) == wanted),
        None,
    )

    if match is None:
        sheet.append_row([cluster_folder, user, status, timestamp])
        return

    # +2: worksheet rows are 1-based and the first row holds the header.
    row_number = match + 2
    # NOTE(review): columns 2-4 are assumed to be user/status/timestamp, the
    # same order used by append_row above - confirm against the sheet layout.
    sheet.update_cell(row_number, 2, user)
    sheet.update_cell(row_number, 3, status)
    sheet.update_cell(row_number, 4, timestamp)
def load_audio_files(folder):
    """List the ``*.WAV`` files located directly inside *folder*.

    Args:
        folder: Directory to search.

    Returns:
        The matching paths, sorted alphabetically.
    """
    wav_pattern = os.path.join(folder, "*.WAV")
    audio_files = glob.glob(wav_pattern)
    logger.debug(f"Audio files loaded from {folder}: {audio_files}")
    # Sort audio files alphabetically
    audio_files.sort()
    return audio_files
| #@st.cache_data | |
def plot_spec(file_path, cmap: str):
    """Draw the dB spectrogram of an audio file and show it in the app.

    The signal is loaded with ``maad.sound.load``, converted to a spectrogram
    (``nperseg=1024``, ``noverlap=512``, frequencies from 0 to ``fs // 2``),
    expressed in dB with a 70 dB range and rescaled by a factor of 0.5 before
    plotting.

    Args:
        file_path: Path of the audio file to load.
        cmap: Name of the matplotlib colormap used for the image.
    """
    import io

    s, fs = sound.load(file_path)
    duration = len(s) / fs

    # Adjust figure size based on the duration of the audio file
    if duration < 1:
        fig_size = (2, 2)
    elif duration < 2:
        fig_size = (2.5, 2)
    elif duration < 3:
        fig_size = (4, 2.5)
    else:
        fig_size = (5, 3.5)

    Sxx, tn, fn, ext = sound.spectrogram(s, fs, nperseg=1024, noverlap=512, flims=(0, fs // 2))
    Sxx_db = power2dB(Sxx, db_range=70)
    Sxx_db = transform.rescale(Sxx_db, 0.5, anti_aliasing=True, channel_axis=None)

    fig, ax = plt.subplots(figsize=fig_size)
    try:
        img = ax.imshow(Sxx_db, aspect='auto', extent=ext, origin='lower',
                        interpolation='bilinear', cmap=cmap)
        fig.colorbar(img, ax=ax, format="%+2.0f dB")
        ax.set(title='', xlabel='Time [s]', ylabel='Frequency [Hz]')
        # Work on this figure explicitly rather than on pyplot's "current"
        # figure, which is process-wide state.
        fig.tight_layout()

        # Render to memory: a fixed file name in the working directory would
        # be shared (and overwritten) by every session of the app.
        png_buffer = io.BytesIO()
        fig.savefig(png_buffer, format='png')
    finally:
        # Always release the figure, even when rendering fails.
        plt.close(fig)

    png_buffer.seek(0)
    st.image(png_buffer)
def spacing():
    """Insert a blank vertical gap into the page using raw HTML."""
    gap_html = "<br></br>"
    st.markdown(gap_html, unsafe_allow_html=True)
def update_google_sheet(client, rec_name, annotations_df):
    """Overwrite a recorder worksheet with the content of *annotations_df*.

    Args:
        client: Authorized gspread client used to open the spreadsheet.
        rec_name: Title of the worksheet to overwrite.
        annotations_df: DataFrame whose column names become the header row
            and whose values become the data rows.
    """
    worksheet = client.open("Annotações_Parque_das_Neblinas").worksheet(rec_name)

    # Clear existing data
    worksheet.clear()

    header_row = annotations_df.columns.values.tolist()
    data_rows = annotations_df.values.tolist()
    worksheet.update([header_row] + data_rows)
def plot_pie_chart(annotations_df):
    """Show a pie chart of annotated versus unannotated clusters.

    A cluster counts as annotated when at least one of its rows has a
    ``validated_class`` different from ``0``.

    Args:
        annotations_df: DataFrame providing the ``cluster_number`` and
            ``validated_class`` columns.
    """
    total_clusters = len(annotations_df['cluster_number'].unique())
    annotated_rows = annotations_df[annotations_df['validated_class'] != 0]
    annotated_clusters = annotated_rows['cluster_number'].nunique()
    remaining_clusters = total_clusters - annotated_clusters

    labels = 'Annotated', 'Unannotated'
    sizes = [annotated_clusters, remaining_clusters]
    colors = ['#1fd655', '#ff9999']
    explode = (0.1, 0)  # explode the 1st slice

    # Set the font size before the chart is built so that this chart's labels
    # use it too; setting it after drawing only affects later figures.
    plt.rcParams['font.size'] = 9.0

    fig1, ax1 = plt.subplots(figsize=(1, 1))
    try:
        ax1.pie(sizes, explode=explode, labels=labels, colors=colors, autopct='%1.0f%%',
                shadow=True, startangle=90)
        ax1.axis('equal')  # Equal aspect ratio ensures that pie is drawn as a circle.
        st.pyplot(fig1)
    finally:
        # The page is re-rendered on every interaction; close the figure so
        # open figures do not pile up in the process.
        plt.close(fig1)
def iden():
    """Render the cluster annotation page.

    Flow:
      1. Let the user pick a recorder and load its worksheet from the
         "Annotações_Parque_das_Neblinas" spreadsheet.
      2. Show the annotation progress pie chart.
      3. Let the user upload ZIP archives of cluster folders, then pick a
         cluster folder, a subfolder and a colormap.
      4. For every ``*.WAV`` file of the subfolder, show its spectrogram, an
         audio player and text inputs pre-filled from the worksheet.
      5. On submit, write the inputs back to the DataFrame, a CSV file and
         the worksheet, drop the subfolder from the to-do list and rerun.
      6. Display the worksheet content as a table.

    Session state used: ``final_annotations``, ``folders`` (cluster folder ->
    list of subfolders still to annotate), ``folders_rec_name`` (recorder the
    ``folders`` mapping was built for) and ``uploaded_files``.
    """
    # Set up the credentials and client
    st.markdown('#####')
    st.header("Bamscape Clusters Annotator")
    client = authorize_google_sheets()

    # Select a recorder to analyze
    rec_name = st.selectbox('**:violet[Please, select a recorder to analyze]**',
                            options=['C1_G01', 'C1_G01_v2', 'C1_G02', 'C1_G03', 'C1_G04', 'C1_G05',
                                     'T24_G06', 'T24_G07', 'T24_G08', 'T24_G09', 'T24_G10',
                                     'T28_G11', 'T28_G12', 'T28_G13', 'T28_G14', 'T28_G15'])
    if not rec_name:
        return

    # Load the annotations of the selected recorder from Google Sheets
    sheet = client.open("Annotações_Parque_das_Neblinas").worksheet(f"{rec_name}")
    final_annotations = pd.DataFrame(sheet.get_all_records())
    st.session_state.final_annotations = final_annotations
    annotations_df = st.session_state.final_annotations
    csv_file = f'{rec_name}_all_CLUSTERS_COMBINED.csv'

    # Display the pie chart
    plot_pie_chart(annotations_df)

    # Keep the rows where at least one validation column is still 0
    unannotated_df = annotations_df[(annotations_df['validated_class'] == 0) |
                                    (annotations_df['validated_specie'] == 0) |
                                    (annotations_df['validator_name'] == 0)]

    # Build the to-do mapping once per recorder. It must be rebuilt when the
    # recorder changes, otherwise the folders of the previously selected
    # recorder would still be offered.
    folders_outdated = (
        'folders' not in st.session_state
        or 'folders_rec_name' not in st.session_state
        or st.session_state['folders_rec_name'] != rec_name
    )
    if folders_outdated:
        folders = unannotated_df['cluster_number'].astype(str).unique()
        st.session_state.folders = {
            folder: unannotated_df[unannotated_df['cluster_number'] == int(folder)]['period'].astype(
                str).unique().tolist() for folder in folders}
        st.session_state.folders_rec_name = rec_name

    if 'uploaded_files' not in st.session_state:
        st.session_state.uploaded_files = {}

    # Allow the user to upload ZIP files
    uploaded_files = st.file_uploader(
        f"**:violet[Upload a ZIP file containing Clusters folders of {rec_name}]**",
        type=["zip"], accept_multiple_files=True)

    if uploaded_files:
        # Extract into a temporary directory; everything that reads the
        # extracted files has to run inside this ``with`` block.
        with tempfile.TemporaryDirectory() as tmpdir:
            for uploaded_file in uploaded_files:
                with zipfile.ZipFile(uploaded_file, 'r') as zip_ref:
                    zip_ref.extractall(tmpdir)
            st.success("Clusters folders extracted successfully")

            # Log the extracted files and directories
            for root, dirs, files in os.walk(tmpdir):
                logger.debug(f"Extracted root: {root}")
                logger.debug(f"Extracted dirs: {dirs}")
                logger.debug(f"Extracted files: {files}")

            # Use the extracted directory as the base path
            base_path = tmpdir

            col1, col2, col3 = st.columns(3)
            selected_folder = None
            selected_subfolder = None

            with st.container():
                with col1:
                    if st.session_state.folders:
                        selected_folder = st.selectbox("**:violet[Select a cluster folder to analyze]**",
                                                       list(st.session_state.folders.keys()))
                        logger.debug(f"Selected folder: {selected_folder}")
                    else:
                        st.success("Congratulations, all the clusters have been annotated! Please select another recorder to annotate.")
                with col2:
                    if selected_folder:
                        subfolders = st.session_state.folders[selected_folder]
                        if subfolders:
                            selected_subfolder = st.selectbox("**:violet[Select a subfolder to analyze]**", subfolders)
                            logger.debug(f"Selected subfolder: {selected_subfolder}")
                with col3:
                    selected_cmap = st.selectbox("**:violet[Choose a colormap to display spectrograms]**",
                                                 options=['jet', 'Greys', 'plasma', 'viridis', 'inferno'])

            if selected_folder and selected_subfolder:
                subfolder_path = os.path.join(base_path, selected_folder, selected_subfolder)
                logger.debug(f"Subfolder path: {subfolder_path}")

                # Default to an empty listing: os.walk yields nothing when the
                # subfolder is missing from the uploaded archives.
                targetfolder = []
                # Bottom-up walk: the last iteration is the subfolder itself,
                # so targetfolder ends up holding its direct files.
                for _, _, files in os.walk(subfolder_path, topdown=False):
                    targetfolder = files
                    logger.debug(f"Files in subfolder: {files}")
                st.write(targetfolder)
                st.markdown("---")

                audio_files = load_audio_files(subfolder_path)
                logger.debug(f"Audio files found: {audio_files}")

                if audio_files:
                    form = st.form(key="user_form")
                    annotations = []  # One entry per audio file shown in the form
                    with form:
                        for audio_file in audio_files:
                            file_name = os.path.basename(audio_file)

                            # Look the file up once; skip files the worksheet
                            # does not know instead of failing on an empty
                            # selection.
                            file_rows = annotations_df.loc[annotations_df['filename_ts'] == file_name]
                            if file_rows.empty:
                                st.warning(f"No annotation row found for {file_name}; skipping this file.")
                                continue

                            cols = [1.70, 1, 1, 1, 1, 1]
                            col1, col2, col3, col4, col5, col6 = st.columns(cols)
                            with col1:
                                with st.spinner('Processing...'):
                                    st.markdown(
                                        f"<h6 style='text-align: center; color: green;'>ROI: {file_name} </h6>",
                                        unsafe_allow_html=True)
                                    plot_spec(audio_file, cmap=selected_cmap)
                            with col2:
                                st.markdown("<h2 style='text-align: center; color: black;'></h2>",
                                            unsafe_allow_html=True)
                                st.markdown('######')
                                audio_data, audio_sr = sf.read(audio_file)
                                st.audio(audio_data, format='audio/wav', sample_rate=audio_sr)
                            with col3:
                                st.markdown('#####')
                                st.markdown("<h4 style='text-align: center; color: blue;'>Group</h4>",
                                            unsafe_allow_html=True)
                                suggested_group = file_rows['suggested_class'].values[0]
                                group_input = st.text_input("*(modify the text if needed)*",
                                                            value=suggested_group,
                                                            key=f"group_{file_name}")
                            with col4:
                                st.markdown('#####')
                                st.markdown("<h4 style='text-align: center; color: blue;'>Species</h4>",
                                            unsafe_allow_html=True)
                                suggested_label = file_rows['suggested_label'].values[0]
                                scientific_name_input = st.text_input("*(modify the text if needed)*",
                                                                      value=suggested_label,
                                                                      key=f"scientific_name_{file_name}")
                            with col5:
                                st.markdown('#####')
                                st.markdown("<h4 style='text-align: center; color: blue;'>Validator</h4>",
                                            unsafe_allow_html=True)
                                validator_name = file_rows['validator_name'].values[0]
                                validator_name_input = st.text_input("*(please, enter your name)*",
                                                                     value=validator_name,
                                                                     key=f"validator_name_{file_name}")
                            with col6:
                                st.markdown('#####')
                                st.markdown("<h4 style='text-align: center; color: blue;'>Comment</h4>",
                                            unsafe_allow_html=True)
                                comment = file_rows['comment'].values[0]
                                comment_input = st.text_input("*(feel free to tell something)*",
                                                              value=comment,
                                                              key=f"validator_comment_{file_name}")

                            annotations.append({
                                'file_name': file_name,
                                'group_input': group_input,
                                'scientific_name_input': scientific_name_input,
                                'validator_name_input': validator_name_input,
                                'comment_input': comment_input
                            })

                    submitButton = form.form_submit_button(label="Submit annotations")

                    if submitButton:
                        with st.spinner('Saving annotations...'):
                            # The edited columns may hold only numbers (e.g.
                            # 0 for "not annotated"); make them object-typed
                            # so text can be stored without relying on
                            # implicit dtype upcasting.
                            for column in ('validated_class', 'validated_specie',
                                           'validator_name', 'comment'):
                                annotations_df[column] = annotations_df[column].astype(object)

                            # Update the annotations_df DataFrame with new annotations
                            for annotation in annotations:
                                row_mask = annotations_df['filename_ts'] == annotation['file_name']
                                annotations_df.loc[row_mask, 'validated_class'] = annotation['group_input']
                                annotations_df.loc[row_mask, 'validated_specie'] = annotation['scientific_name_input']
                                annotations_df.loc[row_mask, 'validator_name'] = annotation['validator_name_input']
                                annotations_df.loc[row_mask, 'comment'] = annotation['comment_input']
                            annotations_df['validated_class'] = annotations_df['validated_class'].astype(str)

                            # Save to CSV file
                            annotations_df.to_csv(csv_file, index=False)

                            # Update the Google Sheet
                            update_google_sheet(client, rec_name, annotations_df)

                        st.success("All annotations have been saved.")

                        # Remove the analyzed subfolder from the list
                        st.session_state.folders[selected_folder].remove(selected_subfolder)

                        # If no more subfolders in the main folder, remove the main folder as well
                        if not st.session_state.folders[selected_folder]:
                            del st.session_state.folders[selected_folder]

                        st.rerun()
                else:
                    st.error("No audio files found in the selected subfolder.")

    spacing()

    # Display the DataFrame
    st.header("Annotated DataFrame")
    st.write(
        ":orange[Feel free to also access the dataframe on google sheet [link](https://docs.google.com/spreadsheets/d/1_-Zeg3lqif3_a5QnM4LQApVC8kdcyOksqdg1lOxRXcc/edit?gid=458225708#gid=458225708)]")
    df = get_google_sheet_data(rec_name)
    df_display = df.astype(str)
    st.write(df_display)
    st.markdown('#####')
# Script entry point: render the annotation page when the file is run as the
# main module.
if __name__ == "__main__":
    iden()