import pandas as pd import numpy as np import io import time #Configuração pd.set_option("styler.render.max_elements", 350000) dataset_file = './data/500+.csv' predictions_file = './data/predicao_proximo_ano.csv' #Inicialização def load_data(agregar_pinkfloyd): df_data = pd.read_csv(dataset_file) df_data['Id'] = range(1, len(df_data) + 1) df_data['Edicao'] = df_data.Ano.astype(str).str[-2:] + "-" + (df_data.Ano +1).astype(str).str[-2:] df_data['Data_Lancamento_Album'] = pd.to_datetime(df_data['Data_Lancamento_Album']) df_data['Decada_Lancamento_Album'] = df_data['Data_Lancamento_Album'].dt.year.apply(get_decada) df_data['Duracao'] = df_data.loc[:,'Duracao'].fillna(value=0) df_data['Duracao_Formatada'] = df_data.apply(lambda row: time.strftime("%M:%S", time.gmtime(row['Duracao'])), axis=1) if (agregar_pinkfloyd): df_data.loc[df_data['Musica'].str.contains('Another Brick', na=False), 'Musica'] = 'Another Brick in the Wall' df_data.loc[df_data['Musica'].str.contains('Another Brick', na=False), 'Duracao'] = 508 return df_data def load_predicoes(): df_data = pd.read_csv(predictions_file) return df_data #Funções def get_decada(ano): return 'Anos ' + str(ano)[2] + '0' def listar_edicoes(df_data): return np.array(np.unique(df_data.Edicao).tolist()) def listar_posicoes(df_data): return np.unique(df_data.Posicao).tolist() def listar_anos_lancamento(df_data): return np.unique(df_data.Data_Lancamento_Album.dropna().dt.year.apply(lambda x: f'{x:.0f}')).tolist() def listar_anos_edicoes(df_data): return np.array(np.unique(df_data.Ano).tolist()) def filtrar_edicao(df_data, edicao_inicial, edicao_final): edicoes = np.unique(df_data.Edicao).tolist() indice_inicial = edicoes.index(edicao_inicial) indice_final = edicoes.index(edicao_final)+1 edicoes_selecionadas = edicoes[indice_inicial:indice_final] return df_data[df_data['Edicao'].isin(edicoes_selecionadas)] def filtrar_posicoes(df_data, posicao_inicial, posicao_final): posicoes = list(range(posicao_inicial, posicao_final + 1)) return df_data[df_data['Posicao'].isin(posicoes)] def filtrar_anos(df_data, ano_inicial, ano_final): anos = list(range(int(ano_inicial), int(ano_final) + 1)) return df_data[df_data['Data_Lancamento_Album'].dt.year.isin(anos)] def filtrar_inconsistencias(df_data): return df_data.loc[(df_data['Artista'] != '???') & (df_data['Musica'].str.len() > 0) & (df_data['Observacao'] != 'repetida')] def get_primeiro_ano(df_data): return df_data.sort_values(by='Ano').head(1)['Ano'] def get_ultimo_ano(df_data): return df_data.sort_values(by='Ano').tail(1)['Ano'] def get_primeira_edicao(df_data): return df_data.sort_values(by='Ano').head(1)['Edicao'] def get_ultima_edicao(df_data): return df_data.sort_values(by='Ano').tail(1)['Edicao'] def get_primeiro_ano_lancamento(df_data): return df_data.dropna(subset=['Musica']).sort_values(by = 'Data_Lancamento_Album').head(1)['Data_Lancamento_Album'].dt.year def get_ultimo_ano_lancamento(df_data): return df_data.dropna(subset=['Musica']).sort_values(by = 'Data_Lancamento_Album').tail(1)['Data_Lancamento_Album'].dt.year def get_total_musicas_distintas(df_data): return len(get_musicas_distintas(df_data)) def get_total_artistas_distintos(df_data): return len(np.unique(df_data.Artista.dropna()).tolist()) def get_total_albuns_distintos(df_data): return len(np.unique(df_data.Album_Single.dropna().astype(str)).tolist()) def get_total_paises_distintos(df_data): return len(np.unique(df_data.Pais.dropna()).tolist()) def get_total_generos_distintos(df_data): return len(get_generos_distintos(df_data)) def get_musicas_distintas(df_data): return filtrar_inconsistencias(df_data).drop_duplicates(subset=['Artista', 'Musica', 'Observacao']) def get_generos_distintos(df_data): return filtrar_inconsistencias(df_data).drop_duplicates(subset='Genero') def get_total_horas(df_data): return np.sum(df_data.Duracao.dropna()) / 3600 def get_dicionario_musicas(df_data): df = (filtrar_inconsistencias(df_data) .drop_duplicates(subset={'Artista', 'Musica'}) .apply(lambda row: (row['Musica'] + ' (' + row['Artista'] + ')', row['Id']), axis=1) .sort_values() .tolist()) return dict((y, x) for x, y in df) def get_dicionario_artistas(df_data): df = (filtrar_inconsistencias(df_data) .drop_duplicates('Artista') .apply(lambda row: (row['Artista'], row['Artista']), axis=1) .sort_values() .tolist()) return dict((y, x) for x, y in df) def get_acumulado_musicas_distintas(df_data): edicoes = np.unique(df_data.Edicao).tolist() distinta_acumulado_periodo = [] for e in edicoes: distinta_acumulado_periodo.append(get_total_musicas_distintas(filtrar_edicao(df_data, edicoes[0], e))) return pd.DataFrame({'Anos': edicoes, 'Acumulado': distinta_acumulado_periodo}) def get_acumulado_generos_distintos(df_data): edicoes = np.unique(df_data.Edicao).tolist() distinto_acumulado_periodo = [] for e in edicoes: distinto_acumulado_periodo.append(get_total_generos_distintos(filtrar_edicao(df_data, edicoes[0], e))) return pd.DataFrame({'Anos': edicoes, 'Acumulado': distinto_acumulado_periodo}) def get_musicas_ano_lancamento(df_data): df_temp = get_musicas_distintas(df_data) return pd.DataFrame(df_temp.groupby(df_temp['Data_Lancamento_Album'].dt.year).size().reset_index().rename(columns={0: 'Total_Musicas'})) def get_musicas_decada_lancamento(df_data): df_temp = get_musicas_distintas(df_data) df_temp['Total_Musicas'] = df_temp.groupby('Decada_Lancamento_Album')['Decada_Lancamento_Album'].transform('count') return pd.DataFrame(df_temp.sort_values('Data_Lancamento_Album').groupby(['Decada_Lancamento_Album', 'Total_Musicas']).head(1))[['Decada_Lancamento_Album', 'Total_Musicas']] def get_musicas_todos_anos(df_data): df = filtrar_inconsistencias(df_data).copy() df['Count'] = df.groupby(['Artista', 'Musica', 'Observacao'], dropna=False)['Musica'].transform('count') df['Musica'] = df.apply(lambda row: row['Artista'] + ' - ' + row['Musica'], axis=1) df = df.loc[df['Count'] == df['Ano'].nunique()].sort_values(['Ano','Posicao']) return pd.pivot(data=df, index='Musica', columns='Edicao', values='Posicao') def get_musicas_por_pais(df_data, agrupar_edicoes=False): df = filtrar_inconsistencias(df_data) if (agrupar_edicoes): return df.groupby(['Country', 'Pais']).size().reset_index(name='Total_Musicas') else: return (df.groupby(['Edicao', 'Pais']) .size() .reset_index(name='Total_Musicas') .groupby(['Edicao', 'Pais']) .agg({'Total_Musicas': 'sum'}) .reset_index() .sort_values(by='Edicao') .sort_values(by='Total_Musicas', ascending=True)) def get_musicas_por_genero(df_data): df = filtrar_inconsistencias(df_data) return (df.groupby(['Edicao', 'Genero']) .size() .reset_index(name='Total_Musicas') .groupby(['Edicao', 'Genero']) .agg({'Total_Musicas': 'sum'}) .reset_index() .sort_values(by='Edicao') .sort_values(by='Total_Musicas', ascending=True)) def get_musicas_media_posicao(df_data): #Fórmula Si = wi * Ai + (1 - wi) * S, em que: #wi = mi/mi+m_avg, sendo mi número total de aparições da música e m_avg média de todas as aparições de músicas #Ai = média aritmética da posição da música #S = média aritmética da posição de todas as músicas #Si = média bayesiana da posição da música #https://arpitbhayani.me/blogs/bayesian-average/ df_distintas = filtrar_inconsistencias(df_data.copy()) #Workaround devido a problema de index com NaN no pivot_table. Necessário preencher o que está NaN com um valor dummy para poder fazer o grouping #https://github.com/pandas-dev/pandas/issues/3729 df_distintas['Observacao'] = df_distintas['Observacao'].fillna('dummy') df_totalizador = (df_distintas .groupby(['Artista', 'Musica', 'Observacao'], dropna=False) .size() .reset_index(name='Total_Aparicoes')) m_avg = df_totalizador['Total_Aparicoes'].mean() pivot_table = (pd.pivot_table(df_distintas, index=['Artista', 'Musica', 'Observacao'], columns='Ano', values='Posicao', margins=True, margins_name = 'Media_Posicao')) S = pivot_table.loc[('Media_Posicao', '', ''), 'Media_Posicao'] newdf = (df_distintas .groupby(['Artista', 'Musica', 'Observacao'], dropna=False) .size() .reset_index(name='Total_Aparicoes')) merged_df = pd.merge(df_totalizador, pivot_table, on = ['Artista', 'Musica', 'Observacao']) merged_df['Media_Bayesiana_Posicao'] = get_bayesian_average(merged_df['Total_Aparicoes'], m_avg, merged_df['Media_Posicao'], S) return merged_df.sort_values('Media_Bayesiana_Posicao') def get_bayesian_average(m, m_avg, A, S): w = m/(m+m_avg) return w * A + (1-w) * S def get_artistas_top_n(df_data, top_n): df = filtrar_posicoes(df_data, 1, top_n) df = (filtrar_inconsistencias(df) .groupby('Artista') .size() .sort_values(ascending=False) .reset_index(name='Total_Aparicoes')) return df def get_musicas_top_n(df_data, top_n): df = filtrar_posicoes(df_data, 1, top_n) df = (filtrar_inconsistencias(df) .groupby(['Artista', 'Musica']) .size() .sort_values(ascending=False) .reset_index(name='Total_Aparicoes')) return df def get_albuns_top_n(df_data, top_n): df = filtrar_posicoes(df_data, 1, top_n) df = (filtrar_inconsistencias(df) .groupby(['Artista', 'Album_Single']) .size() .sort_values(ascending=False) .reset_index(name='Total_Aparicoes')) return df def get_generos_top_n(df_data, top_n): df = filtrar_posicoes(df_data, 1, top_n) df = (filtrar_inconsistencias(df) .groupby('Genero') .size() .sort_values(ascending=False) .reset_index(name='Total_Aparicoes')) return df def get_artistas_posicoes_semelhantes_top_n(df_data, top_n): def analisar_retorno(grupo): grupo = grupo.sort_values('Ano') grupo['Posicao_Anterior'] = grupo['Posicao'].shift(1) return grupo df = df_data.groupby('Musica', group_keys=False)[['Ano', 'Posicao', 'Artista', 'Musica']].apply(analisar_retorno).dropna(subset=['Posicao_Anterior']) df['Posicao_Semelhante'] = np.abs(df['Posicao'] - df['Posicao_Anterior']) <= 5 df = (df.groupby('Artista')['Posicao_Semelhante'] .mean() .reset_index()) return df.sort_values(by=['Posicao_Semelhante', 'Artista'], ascending=[False, True]).head(top_n) def get_top_n_musicas_media_posicao(df_data, top_n): df = get_musicas_media_posicao(df_data).loc[:,['Artista', 'Musica']] df['Posicao'] = range(1, len(df) + 1) return df[['Posicao', 'Artista', 'Musica']].head(top_n).set_index('Posicao') def get_top_n_todas_edicoes(df_data, top_n): edicoes = np.unique(df_data.Edicao) edicao_inicial = edicoes[0] edicao_anterior = edicoes[len(edicoes) -2] df1 = get_top_n_musicas_media_posicao(df_data, top_n).reset_index() df2 = get_top_n_musicas_media_posicao(filtrar_edicao(df_data, edicao_inicial, edicao_anterior), 100).reset_index() merged_df = pd.merge(df1, df2, how='left', on = ['Artista', 'Musica'], suffixes=('_Atual', '_Anterior')) merged_df['Variacao'] = merged_df['Posicao_Anterior'] - merged_df['Posicao_Atual'] return merged_df def get_melhor_posicao_genero(df_data): df = df = df_data.sort_values('Ano') indexes = df.groupby(['Genero'])['Posicao'].idxmin() return df.loc[indexes, ['Genero', 'Posicao', 'Edicao']] def get_analise_edicao(df_data, medida, analise): agregadores = {"Musica_Artista":['Artista', 'Edicao'], "Album_Artista":['Album_Single', 'Edicao'], "Musica_Genero":['Genero', 'Edicao'], "Genero_Pais":['Pais','Edicao'], "Duracao":['Duracao','Edicao']} dimensoes = {"Musica_Artista":'Musica', "Album_Artista":'Musica', "Musica_Genero":'Musica', "Genero_Pais":'Genero', "Duracao":'Duracao'} agregador = agregadores[analise] dimensao = dimensoes[analise] index_name = 'Contagem' df = filtrar_inconsistencias(df_data) if (dimensao != 'Duracao'): df = df.groupby(agregador)[dimensao].count().reset_index(name=index_name) else: index_name = dimensao match medida: case 'Média': df = df.groupby('Edicao')[index_name].mean().reset_index(name=medida) case 'Mediana': df = df.groupby('Edicao')[index_name].median().reset_index(name=medida) case 'Máximo': df = df.groupby('Edicao')[index_name].max().reset_index(name=medida) case default: df = df if (dimensao == 'Duracao'): df[medida] = pd.to_datetime(df[medida], unit='s') return np.around(df,2) def get_idade_por_edicao(df_data): df = df_data.copy() df = filtrar_inconsistencias(df) df['Idade_Lancamento'] = df['Ano'] + 1 - df['Data_Lancamento_Album'].dt.year df = df.loc[:,['Edicao', 'Idade_Lancamento']] df['Media_Idade_Lancamento'] = df.groupby('Edicao')['Idade_Lancamento'].transform('mean').round(2) df['Mediana_Idade_Lancamento'] = df.groupby('Edicao')['Idade_Lancamento'].transform('median').round(0) return df.groupby(['Edicao', 'Media_Idade_Lancamento', 'Mediana_Idade_Lancamento']).size().reset_index() def get_onehit_por_edicao(df_data): df = df_data.copy() df = filtrar_inconsistencias(df) contagem = get_musicas_distintas(df).groupby('Artista').count().reset_index()[['Artista', 'Ano']] contagem.columns = ['Artista', 'Count'] one_hit_wonders = contagem[contagem['Count'] == 1].sort_values(by='Artista') one_hit_wonders = (pd.merge(df, one_hit_wonders[['Artista']], on='Artista', how='inner') .groupby('Edicao')['Artista'] .nunique() .reset_index(name='One_Hit_Wonders')) artistas_recorrentes = contagem[contagem['Count'] > 1].sort_values(by='Count', ascending=False) artistas_recorrentes = (pd.merge(df, artistas_recorrentes[['Artista']], on='Artista', how='inner') .groupby('Edicao')['Artista'] .nunique() .reset_index(name='Recorrentes')) return pd.merge(one_hit_wonders, artistas_recorrentes, on='Edicao', how='outer').fillna(0) def get_dados_cumulativos(df_data, atributo): df_data = filtrar_inconsistencias(df_data) df_data = (df_data.groupby(['Ano', atributo]) .size() .reset_index(name='Count') .groupby(['Ano', atributo])['Count'] .sum() .groupby(level=atributo) .cumsum() .reset_index()) df_data = df_data.sort_values(by='Count', ascending=False).groupby('Ano').head(len(df_data)) return df_data def get_variacao_entre_anos(df, ano_inicial, ano_final, quantidade_musicas, quedas): anos_para_comparar = [ano_inicial, ano_final] df_sorted = df.sort_values(by=['Musica', 'Artista', 'Ano']) df_sorted = df_sorted[df_sorted['Ano'].isin(anos_para_comparar)] pivot = df_sorted.pivot_table(index=['Musica', 'Artista'], columns='Ano', values='Posicao').reset_index() pivot.columns.name = None pivot = pivot.rename(columns={ anos_para_comparar[0]: ano_inicial, anos_para_comparar[1]: ano_final }) pivot['Variacao'] = pivot[ano_inicial] - pivot[ano_final] if (quedas): top_n = pivot.sort_values(by='Variacao').head(quantidade_musicas) top_n['Posicao_Anterior'] = top_n[ano_inicial] *1.5 top_n['Posicao_Atual'] = top_n[ano_final] else: top_n = pivot.sort_values(by='Variacao', ascending=False).head(quantidade_musicas) top_n['Posicao_Anterior'] = top_n[ano_inicial] top_n['Posicao_Atual'] = top_n[ano_final] *1.5 return top_n def get_predicoes(df): df = df[["posicao_ranking", "Artista", "Musica"]].head(500) return df def get_probabilidades(df): df = df.sort_values(by=['prob_aparecer', 'Artista', 'Musica'], ascending=[False, True, True]) df["prob_aparecer"] = df["prob_aparecer"] * 100 df = df[["Artista", "Musica", "prob_aparecer"]] return df