Spaces:
Running
Running
| import pandas as pd | |
| import warnings | |
| import re | |
| warnings.filterwarnings("ignore") | |
| from fuzzywuzzy import process | |
def standardize_author_names(df, threshold=90):
    """Collapse near-duplicate spellings in ``df['Author']`` onto one name.

    Unique author names are visited in order of first appearance. Each name
    is fuzzy-matched (``fuzzywuzzy.process.extractOne``) against the names
    seen so far; if the best score is strictly greater than ``threshold``
    the name adopts the *canonical* spelling of the matched name, otherwise
    it becomes a new canonical name itself.

    Args:
        df: DataFrame with an ``'Author'`` column. The column is rewritten
            in place.
        threshold: Minimum score (exclusive) for two names to be treated as
            the same author. Defaults to 90.

    Returns:
        The same ``df`` object, with ``'Author'`` standardized.
    """
    # Maps every name seen so far to its canonical spelling.
    author_map = {}

    for author in df['Author'].unique():
        # Missing values (NaN/None) and other non-strings cannot be
        # fuzzy-matched; leave them out of the map so they pass through.
        if not isinstance(author, str):
            continue

        # Nothing to compare against for the first name.
        match = process.extractOne(author, author_map.keys()) if author_map else None

        if match and match[1] > threshold:
            # match[0] may itself be a variant; follow it to its canonical
            # name so every variant of one author ends up with ONE spelling.
            author_map[author] = author_map[match[0]]
        else:
            author_map[author] = author

    # Values that are not in the map (non-strings) are kept unchanged.
    df['Author'] = df['Author'].map(lambda name: author_map.get(name, name))
    return df
def is_contained(subtitle, fulltitle):
    """Return True when ``subtitle`` occurs inside ``fulltitle``.

    This is a plain ``in`` membership test, so for strings it is a
    case-sensitive substring check and equal strings count as contained.
    """
    found = subtitle in fulltitle
    return found
def consolidate_titles_based_on_containment(df):
    """Merge titles that are substrings of a longer title in ``df['Title']``.

    Each unique title is mapped to the first other title (in order of first
    appearance) that contains it. Titles containing ``"All Stars Bonus"``
    are never used as a merge target. Chains are followed to the end, so if
    A is contained in B and B in C, both A and B become C.

    Non-string titles (e.g. NaN) and empty/whitespace-only titles are left
    unchanged: the former cannot be substring-tested and the latter would
    be "contained" in every other title.

    Args:
        df: DataFrame with a ``'Title'`` column. The column is rewritten in
            place.

    Returns:
        The same ``df`` object, with ``'Title'`` consolidated.
    """
    titles = [t for t in df['Title'].unique() if isinstance(t, str)]
    # Only non-bonus titles may absorb other titles.
    targets = [t for t in titles if "All Stars Bonus" not in t]

    # First pass: direct mapping title -> first containing title (or itself).
    title_map = {}
    for title in titles:
        if not title.strip():
            title_map[title] = title
            continue
        title_map[title] = next(
            (candidate for candidate in targets
             if candidate != title and title in candidate),
            title,
        )

    def resolve(title):
        # Each hop goes to a strictly longer string, so this terminates.
        while title_map[title] != title:
            title = title_map[title]
        return title

    # Values that were never mapped (non-strings) are kept unchanged.
    df['Title'] = df['Title'].map(
        lambda t: resolve(t) if t in title_map else t
    )
    return df
def generate_report(royalties_path, payouts_path):
    """Build per-row royalties with earnings rescaled to the actual payouts.

    Reads the "Total Earnings" sheet (column names taken from the second
    row) of ``royalties_path`` and the "Payments" sheet of ``payouts_path``,
    normalizes author names and titles, then derives one adjusted FX rate
    per currency and applies it to every royalty row.

    Args:
        royalties_path: Path to the royalties Excel workbook.
        payouts_path: Path to the payouts Excel workbook.

    Returns:
        DataFrame with the columns ``Title``, ``Author``, ``Payout Plan``,
        ``Currency``, ``Earnings``, ``Adjusted_FX_rate`` and
        ``Adjusted_Earnings``. Currencies that have no row in the payouts
        data get NaN in the two adjusted columns (left merge).
    """
    sales_df = pd.read_excel(royalties_path, sheet_name="Total Earnings", header=1)
    payments_df = pd.read_excel(payouts_path, sheet_name="Payments")

    # Work on an explicit copy: the column assignment and the in-place
    # normalization helpers below must not operate on a slice of sales_df.
    sales_df_trim = sales_df[["Title", "Author", "Payout Plan", "Currency", "Earnings"]].copy()

    # Tag "All Stars Bonus" rows in the title so they are reported as their
    # own line instead of being folded into the regular title.
    is_bonus = sales_df_trim["Payout Plan"] == "All Stars Bonus"
    bonus_titles = sales_df_trim["Title"].astype(str) + " (All Stars Bonus)"
    sales_df_trim["Title"] = sales_df_trim["Title"].mask(is_bonus, bonus_titles)

    royalties_norm_text = consolidate_titles_based_on_containment(
        standardize_author_names(sales_df_trim)
    )

    # Total reported earnings per currency.
    royalties_currencies = royalties_norm_text.groupby(
        'Currency', as_index=False
    ).agg({'Earnings': 'sum'})

    # Payout totals per currency; rows without a payout amount are ignored.
    payouts_df = payments_df[
        ["Currency", "Net Earnings", "FX Rate", "Payout Amount"]
    ].dropna(subset=['Payout Amount'])
    payouts_df = payouts_df.groupby('Currency', as_index=False).agg(
        {'Net Earnings': 'sum', 'FX Rate': 'mean', 'Payout Amount': 'sum'}
    )
    # Currencies with no FX rate recorded are treated as rate 1.
    payouts_df['FX Rate'] = payouts_df['FX Rate'].fillna(1)

    merged_df = pd.merge(royalties_currencies, payouts_df, on='Currency', how='left')
    # Scale the FX rate so that converted earnings add up to the amount
    # actually paid out. For a nonzero FX rate this reduces algebraically to
    # Payout Amount / Earnings per currency.
    merged_df['Earnings_conversion'] = merged_df['Earnings'] * merged_df['FX Rate']
    merged_df['Division_Quotient'] = merged_df['Earnings_conversion'] / merged_df['Payout Amount']
    merged_df['Adjusted_FX_rate'] = merged_df['FX Rate'] / merged_df['Division_Quotient']

    final_royalties = pd.merge(
        royalties_norm_text,
        merged_df[['Currency', 'Adjusted_FX_rate']],
        on='Currency',
        how='left',
    )
    final_royalties['Adjusted_Earnings'] = (
        final_royalties['Earnings'] * final_royalties['Adjusted_FX_rate']
    )
    return final_royalties
def print_report(royalties_path, payments_path):
    """Print the adjusted royalties report to stdout.

    Builds the report via ``generate_report`` and prints the grand total of
    ``Adjusted_Earnings``, followed by one section per author showing the
    author's total and each of their titles with its summed earnings.

    Args:
        royalties_path: Path to the royalties Excel workbook.
        payments_path: Path to the payouts Excel workbook.
    """
    royalties = generate_report(royalties_path, payments_path)

    # One row per (author, title) with the summed adjusted earnings.
    per_title = (
        royalties
        .groupby(['Author', 'Title'], as_index=False)['Adjusted_Earnings']
        .sum()
    )

    print("Total: ", royalties["Adjusted_Earnings"].sum())
    print("-----------------------\n")

    for author, books in per_title.groupby('Author'):
        total_earnings = books['Adjusted_Earnings'].sum()
        print(f"Author: {author}")
        print(f"Total Earnings: ${total_earnings:.2f}")
        # One line per title belonging to this author.
        for title, earnings in zip(books['Title'], books['Adjusted_Earnings']):
            print(f" {title}: ${earnings:.2f}")
        print("\n")
def generate_report_for_insertion(royalties_path, payouts_path):
    """Generate a DataFrame of individual royalty entries for database insertion.

    The reporting period is parsed from a ``YYYY-MM`` pattern in
    ``royalties_path``. The file name is searched first so that a dated
    directory does not shadow the file's own date; the full path is used as
    a fallback. The royalty rows themselves come from ``generate_report``.

    Args:
        royalties_path: Path to the royalties Excel workbook; must contain
            a ``YYYY-MM`` pattern.
        payouts_path: Path to the payouts Excel workbook.

    Returns:
        The DataFrame produced by ``generate_report`` with two extra
        integer columns, ``month`` and ``year``.

    Raises:
        ValueError: If no ``YYYY-MM`` pattern is found in ``royalties_path``.
    """
    import os

    # Validate the path before doing any file I/O.
    path_text = os.fspath(royalties_path)
    period_pattern = r"(\d{4})-(\d{2})"
    match = (
        re.search(period_pattern, os.path.basename(path_text))
        or re.search(period_pattern, path_text)
    )
    if match is None:
        raise ValueError("Could not extract year and month from royalties_path filename.")
    year = int(match.group(1))
    month = int(match.group(2))

    # Reuse the shared pipeline instead of duplicating it here.
    final_royalties = generate_report(royalties_path, payouts_path)

    final_royalties['month'] = month
    final_royalties['year'] = year
    return final_royalties