Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import io | |
| import time | |
| #Configuração | |
| pd.set_option("styler.render.max_elements", 350000) | |
| dataset_file = './data/500+.csv' | |
| predictions_file = './data/predicao_proximo_ano.csv' | |
| #Inicialização | |
| def load_data(agregar_pinkfloyd): | |
| df_data = pd.read_csv(dataset_file) | |
| df_data['Id'] = range(1, len(df_data) + 1) | |
| df_data['Edicao'] = df_data.Ano.astype(str).str[-2:] + "-" + (df_data.Ano +1).astype(str).str[-2:] | |
| df_data['Data_Lancamento_Album'] = pd.to_datetime(df_data['Data_Lancamento_Album']) | |
| df_data['Decada_Lancamento_Album'] = df_data['Data_Lancamento_Album'].dt.year.apply(get_decada) | |
| df_data['Duracao'] = df_data.loc[:,'Duracao'].fillna(value=0) | |
| df_data['Duracao_Formatada'] = df_data.apply(lambda row: time.strftime("%M:%S", time.gmtime(row['Duracao'])), axis=1) | |
| if (agregar_pinkfloyd): | |
| df_data.loc[df_data['Musica'].str.contains('Another Brick', na=False), 'Musica'] = 'Another Brick in the Wall' | |
| df_data.loc[df_data['Musica'].str.contains('Another Brick', na=False), 'Duracao'] = 508 | |
| return df_data | |
| def load_predicoes(): | |
| df_data = pd.read_csv(predictions_file) | |
| return df_data | |
| #Funções | |
| def get_decada(ano): | |
| return 'Anos ' + str(ano)[2] + '0' | |
| def listar_edicoes(df_data): | |
| return np.array(np.unique(df_data.Edicao).tolist()) | |
| def listar_posicoes(df_data): | |
| return np.unique(df_data.Posicao).tolist() | |
| def listar_anos_lancamento(df_data): | |
| return np.unique(df_data.Data_Lancamento_Album.dropna().dt.year.apply(lambda x: f'{x:.0f}')).tolist() | |
| def listar_anos_edicoes(df_data): | |
| return np.array(np.unique(df_data.Ano).tolist()) | |
| def filtrar_edicao(df_data, edicao_inicial, edicao_final): | |
| edicoes = np.unique(df_data.Edicao).tolist() | |
| indice_inicial = edicoes.index(edicao_inicial) | |
| indice_final = edicoes.index(edicao_final)+1 | |
| edicoes_selecionadas = edicoes[indice_inicial:indice_final] | |
| return df_data[df_data['Edicao'].isin(edicoes_selecionadas)] | |
| def filtrar_posicoes(df_data, posicao_inicial, posicao_final): | |
| posicoes = list(range(posicao_inicial, posicao_final + 1)) | |
| return df_data[df_data['Posicao'].isin(posicoes)] | |
| def filtrar_anos(df_data, ano_inicial, ano_final): | |
| anos = list(range(int(ano_inicial), int(ano_final) + 1)) | |
| return df_data[df_data['Data_Lancamento_Album'].dt.year.isin(anos)] | |
| def filtrar_inconsistencias(df_data): | |
| return df_data.loc[(df_data['Artista'] != '???') & (df_data['Musica'].str.len() > 0) & (df_data['Observacao'] != 'repetida')] | |
| def get_primeiro_ano(df_data): | |
| return df_data.sort_values(by='Ano').head(1)['Ano'] | |
| def get_ultimo_ano(df_data): | |
| return df_data.sort_values(by='Ano').tail(1)['Ano'] | |
| def get_primeira_edicao(df_data): | |
| return df_data.sort_values(by='Ano').head(1)['Edicao'] | |
| def get_ultima_edicao(df_data): | |
| return df_data.sort_values(by='Ano').tail(1)['Edicao'] | |
| def get_primeiro_ano_lancamento(df_data): | |
| return df_data.dropna(subset=['Musica']).sort_values(by = 'Data_Lancamento_Album').head(1)['Data_Lancamento_Album'].dt.year | |
| def get_ultimo_ano_lancamento(df_data): | |
| return df_data.dropna(subset=['Musica']).sort_values(by = 'Data_Lancamento_Album').tail(1)['Data_Lancamento_Album'].dt.year | |
| def get_total_musicas_distintas(df_data): | |
| return len(get_musicas_distintas(df_data)) | |
| def get_total_artistas_distintos(df_data): | |
| return len(np.unique(df_data.Artista.dropna()).tolist()) | |
| def get_total_albuns_distintos(df_data): | |
| return len(np.unique(df_data.Album_Single.dropna().astype(str)).tolist()) | |
| def get_total_paises_distintos(df_data): | |
| return len(np.unique(df_data.Pais.dropna()).tolist()) | |
| def get_total_generos_distintos(df_data): | |
| return len(get_generos_distintos(df_data)) | |
| def get_musicas_distintas(df_data): | |
| return filtrar_inconsistencias(df_data).drop_duplicates(subset=['Artista', 'Musica', 'Observacao']) | |
| def get_generos_distintos(df_data): | |
| return filtrar_inconsistencias(df_data).drop_duplicates(subset='Genero') | |
| def get_total_horas(df_data): | |
| return np.sum(df_data.Duracao.dropna()) / 3600 | |
| def get_dicionario_musicas(df_data): | |
| df = (filtrar_inconsistencias(df_data) | |
| .drop_duplicates(subset={'Artista', 'Musica'}) | |
| .apply(lambda row: (row['Musica'] + ' (' + row['Artista'] + ')', row['Id']), axis=1) | |
| .sort_values() | |
| .tolist()) | |
| return dict((y, x) for x, y in df) | |
| def get_dicionario_artistas(df_data): | |
| df = (filtrar_inconsistencias(df_data) | |
| .drop_duplicates('Artista') | |
| .apply(lambda row: (row['Artista'], row['Artista']), axis=1) | |
| .sort_values() | |
| .tolist()) | |
| return dict((y, x) for x, y in df) | |
| def get_acumulado_musicas_distintas(df_data): | |
| edicoes = np.unique(df_data.Edicao).tolist() | |
| distinta_acumulado_periodo = [] | |
| for e in edicoes: | |
| distinta_acumulado_periodo.append(get_total_musicas_distintas(filtrar_edicao(df_data, edicoes[0], e))) | |
| return pd.DataFrame({'Anos': edicoes, 'Acumulado': distinta_acumulado_periodo}) | |
| def get_acumulado_generos_distintos(df_data): | |
| edicoes = np.unique(df_data.Edicao).tolist() | |
| distinto_acumulado_periodo = [] | |
| for e in edicoes: | |
| distinto_acumulado_periodo.append(get_total_generos_distintos(filtrar_edicao(df_data, edicoes[0], e))) | |
| return pd.DataFrame({'Anos': edicoes, 'Acumulado': distinto_acumulado_periodo}) | |
| def get_musicas_ano_lancamento(df_data): | |
| df_temp = get_musicas_distintas(df_data) | |
| return pd.DataFrame(df_temp.groupby(df_temp['Data_Lancamento_Album'].dt.year).size().reset_index().rename(columns={0: 'Total_Musicas'})) | |
| def get_musicas_decada_lancamento(df_data): | |
| df_temp = get_musicas_distintas(df_data) | |
| df_temp['Total_Musicas'] = df_temp.groupby('Decada_Lancamento_Album')['Decada_Lancamento_Album'].transform('count') | |
| return pd.DataFrame(df_temp.sort_values('Data_Lancamento_Album').groupby(['Decada_Lancamento_Album', 'Total_Musicas']).head(1))[['Decada_Lancamento_Album', 'Total_Musicas']] | |
| def get_musicas_todos_anos(df_data): | |
| df = filtrar_inconsistencias(df_data).copy() | |
| df['Count'] = df.groupby(['Artista', 'Musica', 'Observacao'], dropna=False)['Musica'].transform('count') | |
| df['Musica'] = df.apply(lambda row: row['Artista'] + ' - ' + row['Musica'], axis=1) | |
| df = df.loc[df['Count'] == df['Ano'].nunique()].sort_values(['Ano','Posicao']) | |
| return pd.pivot(data=df, index='Musica', columns='Edicao', values='Posicao') | |
| def get_musicas_por_pais(df_data, agrupar_edicoes=False): | |
| df = filtrar_inconsistencias(df_data) | |
| if (agrupar_edicoes): | |
| return df.groupby(['Country', 'Pais']).size().reset_index(name='Total_Musicas') | |
| else: | |
| return (df.groupby(['Edicao', 'Pais']) | |
| .size() | |
| .reset_index(name='Total_Musicas') | |
| .groupby(['Edicao', 'Pais']) | |
| .agg({'Total_Musicas': 'sum'}) | |
| .reset_index() | |
| .sort_values(by='Edicao') | |
| .sort_values(by='Total_Musicas', ascending=True)) | |
| def get_musicas_por_genero(df_data): | |
| df = filtrar_inconsistencias(df_data) | |
| return (df.groupby(['Edicao', 'Genero']) | |
| .size() | |
| .reset_index(name='Total_Musicas') | |
| .groupby(['Edicao', 'Genero']) | |
| .agg({'Total_Musicas': 'sum'}) | |
| .reset_index() | |
| .sort_values(by='Edicao') | |
| .sort_values(by='Total_Musicas', ascending=True)) | |
| def get_musicas_media_posicao(df_data): | |
| #Fórmula Si = wi * Ai + (1 - wi) * S, em que: | |
| #wi = mi/mi+m_avg, sendo mi número total de aparições da música e m_avg média de todas as aparições de músicas | |
| #Ai = média aritmética da posição da música | |
| #S = média aritmética da posição de todas as músicas | |
| #Si = média bayesiana da posição da música | |
| #https://arpitbhayani.me/blogs/bayesian-average/ | |
| df_distintas = filtrar_inconsistencias(df_data.copy()) | |
| #Workaround devido a problema de index com NaN no pivot_table. Necessário preencher o que está NaN com um valor dummy para poder fazer o grouping | |
| #https://github.com/pandas-dev/pandas/issues/3729 | |
| df_distintas['Observacao'] = df_distintas['Observacao'].fillna('dummy') | |
| df_totalizador = (df_distintas | |
| .groupby(['Artista', 'Musica', 'Observacao'], dropna=False) | |
| .size() | |
| .reset_index(name='Total_Aparicoes')) | |
| m_avg = df_totalizador['Total_Aparicoes'].mean() | |
| pivot_table = (pd.pivot_table(df_distintas, | |
| index=['Artista', 'Musica', 'Observacao'], | |
| columns='Ano', | |
| values='Posicao', | |
| margins=True, | |
| margins_name = 'Media_Posicao')) | |
| S = pivot_table.loc[('Media_Posicao', '', ''), 'Media_Posicao'] | |
| newdf = (df_distintas | |
| .groupby(['Artista', 'Musica', 'Observacao'], dropna=False) | |
| .size() | |
| .reset_index(name='Total_Aparicoes')) | |
| merged_df = pd.merge(df_totalizador, pivot_table, on = ['Artista', 'Musica', 'Observacao']) | |
| merged_df['Media_Bayesiana_Posicao'] = get_bayesian_average(merged_df['Total_Aparicoes'], m_avg, merged_df['Media_Posicao'], S) | |
| return merged_df.sort_values('Media_Bayesiana_Posicao') | |
| def get_bayesian_average(m, m_avg, A, S): | |
| w = m/(m+m_avg) | |
| return w * A + (1-w) * S | |
| def get_artistas_top_n(df_data, top_n): | |
| df = filtrar_posicoes(df_data, 1, top_n) | |
| df = (filtrar_inconsistencias(df) | |
| .groupby('Artista') | |
| .size() | |
| .sort_values(ascending=False) | |
| .reset_index(name='Total_Aparicoes')) | |
| return df | |
| def get_musicas_top_n(df_data, top_n): | |
| df = filtrar_posicoes(df_data, 1, top_n) | |
| df = (filtrar_inconsistencias(df) | |
| .groupby(['Artista', 'Musica']) | |
| .size() | |
| .sort_values(ascending=False) | |
| .reset_index(name='Total_Aparicoes')) | |
| return df | |
| def get_albuns_top_n(df_data, top_n): | |
| df = filtrar_posicoes(df_data, 1, top_n) | |
| df = (filtrar_inconsistencias(df) | |
| .groupby(['Artista', 'Album_Single']) | |
| .size() | |
| .sort_values(ascending=False) | |
| .reset_index(name='Total_Aparicoes')) | |
| return df | |
| def get_generos_top_n(df_data, top_n): | |
| df = filtrar_posicoes(df_data, 1, top_n) | |
| df = (filtrar_inconsistencias(df) | |
| .groupby('Genero') | |
| .size() | |
| .sort_values(ascending=False) | |
| .reset_index(name='Total_Aparicoes')) | |
| return df | |
| def get_artistas_posicoes_semelhantes_top_n(df_data, top_n): | |
| def analisar_retorno(grupo): | |
| grupo = grupo.sort_values('Ano') | |
| grupo['Posicao_Anterior'] = grupo['Posicao'].shift(1) | |
| return grupo | |
| df = df_data.groupby('Musica', group_keys=False)[['Ano', 'Posicao', 'Artista', 'Musica']].apply(analisar_retorno).dropna(subset=['Posicao_Anterior']) | |
| df['Posicao_Semelhante'] = np.abs(df['Posicao'] - df['Posicao_Anterior']) <= 5 | |
| df = (df.groupby('Artista')['Posicao_Semelhante'] | |
| .mean() | |
| .reset_index()) | |
| return df.sort_values(by=['Posicao_Semelhante', 'Artista'], ascending=[False, True]).head(top_n) | |
| def get_top_n_musicas_media_posicao(df_data, top_n): | |
| df = get_musicas_media_posicao(df_data).loc[:,['Artista', 'Musica']] | |
| df['Posicao'] = range(1, len(df) + 1) | |
| return df[['Posicao', 'Artista', 'Musica']].head(top_n).set_index('Posicao') | |
| def get_top_n_todas_edicoes(df_data, top_n): | |
| edicoes = np.unique(df_data.Edicao) | |
| edicao_inicial = edicoes[0] | |
| edicao_anterior = edicoes[len(edicoes) -2] | |
| df1 = get_top_n_musicas_media_posicao(df_data, top_n).reset_index() | |
| df2 = get_top_n_musicas_media_posicao(filtrar_edicao(df_data, edicao_inicial, edicao_anterior), 100).reset_index() | |
| merged_df = pd.merge(df1, df2, how='left', on = ['Artista', 'Musica'], suffixes=('_Atual', '_Anterior')) | |
| merged_df['Variacao'] = merged_df['Posicao_Anterior'] - merged_df['Posicao_Atual'] | |
| return merged_df | |
| def get_melhor_posicao_genero(df_data): | |
| df = df = df_data.sort_values('Ano') | |
| indexes = df.groupby(['Genero'])['Posicao'].idxmin() | |
| return df.loc[indexes, ['Genero', 'Posicao', 'Edicao']] | |
| def get_analise_edicao(df_data, medida, analise): | |
| agregadores = {"Musica_Artista":['Artista', 'Edicao'], | |
| "Album_Artista":['Album_Single', 'Edicao'], | |
| "Musica_Genero":['Genero', 'Edicao'], | |
| "Genero_Pais":['Pais','Edicao'], | |
| "Duracao":['Duracao','Edicao']} | |
| dimensoes = {"Musica_Artista":'Musica', | |
| "Album_Artista":'Musica', | |
| "Musica_Genero":'Musica', | |
| "Genero_Pais":'Genero', | |
| "Duracao":'Duracao'} | |
| agregador = agregadores[analise] | |
| dimensao = dimensoes[analise] | |
| index_name = 'Contagem' | |
| df = filtrar_inconsistencias(df_data) | |
| if (dimensao != 'Duracao'): | |
| df = df.groupby(agregador)[dimensao].count().reset_index(name=index_name) | |
| else: | |
| index_name = dimensao | |
| match medida: | |
| case 'Média': | |
| df = df.groupby('Edicao')[index_name].mean().reset_index(name=medida) | |
| case 'Mediana': | |
| df = df.groupby('Edicao')[index_name].median().reset_index(name=medida) | |
| case 'Máximo': | |
| df = df.groupby('Edicao')[index_name].max().reset_index(name=medida) | |
| case default: | |
| df = df | |
| if (dimensao == 'Duracao'): | |
| df[medida] = pd.to_datetime(df[medida], unit='s') | |
| return np.around(df,2) | |
| def get_idade_por_edicao(df_data): | |
| df = df_data.copy() | |
| df = filtrar_inconsistencias(df) | |
| df['Idade_Lancamento'] = df['Ano'] + 1 - df['Data_Lancamento_Album'].dt.year | |
| df = df.loc[:,['Edicao', 'Idade_Lancamento']] | |
| df['Media_Idade_Lancamento'] = df.groupby('Edicao')['Idade_Lancamento'].transform('mean').round(2) | |
| df['Mediana_Idade_Lancamento'] = df.groupby('Edicao')['Idade_Lancamento'].transform('median').round(0) | |
| return df.groupby(['Edicao', 'Media_Idade_Lancamento', 'Mediana_Idade_Lancamento']).size().reset_index() | |
| def get_onehit_por_edicao(df_data): | |
| df = df_data.copy() | |
| df = filtrar_inconsistencias(df) | |
| contagem = get_musicas_distintas(df).groupby('Artista').count().reset_index()[['Artista', 'Ano']] | |
| contagem.columns = ['Artista', 'Count'] | |
| one_hit_wonders = contagem[contagem['Count'] == 1].sort_values(by='Artista') | |
| one_hit_wonders = (pd.merge(df, one_hit_wonders[['Artista']], on='Artista', how='inner') | |
| .groupby('Edicao')['Artista'] | |
| .nunique() | |
| .reset_index(name='One_Hit_Wonders')) | |
| artistas_recorrentes = contagem[contagem['Count'] > 1].sort_values(by='Count', ascending=False) | |
| artistas_recorrentes = (pd.merge(df, artistas_recorrentes[['Artista']], on='Artista', how='inner') | |
| .groupby('Edicao')['Artista'] | |
| .nunique() | |
| .reset_index(name='Recorrentes')) | |
| return pd.merge(one_hit_wonders, artistas_recorrentes, on='Edicao', how='outer').fillna(0) | |
| def get_dados_cumulativos(df_data, atributo): | |
| df_data = filtrar_inconsistencias(df_data) | |
| df_data = (df_data.groupby(['Ano', atributo]) | |
| .size() | |
| .reset_index(name='Count') | |
| .groupby(['Ano', atributo])['Count'] | |
| .sum() | |
| .groupby(level=atributo) | |
| .cumsum() | |
| .reset_index()) | |
| df_data = df_data.sort_values(by='Count', ascending=False).groupby('Ano').head(len(df_data)) | |
| return df_data | |
| def get_variacao_entre_anos(df, ano_inicial, ano_final, quantidade_musicas, quedas): | |
| anos_para_comparar = [ano_inicial, ano_final] | |
| df_sorted = df.sort_values(by=['Musica', 'Artista', 'Ano']) | |
| df_sorted = df_sorted[df_sorted['Ano'].isin(anos_para_comparar)] | |
| pivot = df_sorted.pivot_table(index=['Musica', 'Artista'], columns='Ano', values='Posicao').reset_index() | |
| pivot.columns.name = None | |
| pivot = pivot.rename(columns={ | |
| anos_para_comparar[0]: ano_inicial, | |
| anos_para_comparar[1]: ano_final | |
| }) | |
| pivot['Variacao'] = pivot[ano_inicial] - pivot[ano_final] | |
| if (quedas): | |
| top_n = pivot.sort_values(by='Variacao').head(quantidade_musicas) | |
| top_n['Posicao_Anterior'] = top_n[ano_inicial] *1.5 | |
| top_n['Posicao_Atual'] = top_n[ano_final] | |
| else: | |
| top_n = pivot.sort_values(by='Variacao', ascending=False).head(quantidade_musicas) | |
| top_n['Posicao_Anterior'] = top_n[ano_inicial] | |
| top_n['Posicao_Atual'] = top_n[ano_final] *1.5 | |
| return top_n | |
| def get_predicoes(df): | |
| df = df[["posicao_ranking", "Artista", "Musica"]].head(500) | |
| return df | |
| def get_probabilidades(df): | |
| df = df.sort_values(by=['prob_aparecer', 'Artista', 'Musica'], ascending=[False, True, True]) | |
| df["prob_aparecer"] = df["prob_aparecer"] * 100 | |
| df = df[["Artista", "Musica", "prob_aparecer"]] | |
| return df |