Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import json | |
| import os | |
| import regex as re | |
| import numpy as np | |
| from functions_ui_version import (title_check, description_check, meta_check, | |
| credits_check, qualifiers_check, advisories_check, | |
| deeplink_check, availability_check, sport_check, language_alignment_check) #,check_images | |
def prep_csv(file_path, columns_to_json: list):
    """Load a catalogue CSV and normalise its JSON-encoded columns.

    Parameters
    ----------
    file_path : str or file-like
        Path (or buffer) passed straight to ``pd.read_csv``.
    columns_to_json : list
        Names of columns whose cells hold JSON-encoded lists as strings.

    Returns
    -------
    pandas.DataFrame
        The catalogue indexed by its 'ID' column; every listed column is a
        Python list (``[]`` for missing or unrecognised cell types).
    """
    df = pd.read_csv(file_path)

    def _to_list(x):
        # Decode only actual strings. The original conditional bound as
        # `json.loads(x) if pd.notna(x) else (...)`, which sent non-string
        # non-NaN values (tuples, arrays) into json.loads and crashed;
        # this ordering matches the corrected logic in prep_csv_st.
        if isinstance(x, str) and pd.notna(x):
            return list(json.loads(x))
        if isinstance(x, (tuple, np.ndarray)):
            return list(x)
        return []

    for col in columns_to_json:
        df[col] = df[col].apply(_to_list)

    df.set_index('ID', inplace=True)
    return df
| def prep_csv_st(file_input, columns_to_json: list): | |
| import io | |
| import json | |
| import numpy as np | |
| import pandas as pd | |
| # Handle both UploadedFile objects and paths | |
| if hasattr(file_input, "getvalue"): # Streamlit UploadedFile | |
| file_input.seek(0) # reset pointer | |
| df = pd.read_csv(io.BytesIO(file_input.getvalue())) | |
| else: | |
| df = pd.read_csv(file_input) | |
| # === Convert JSON-like columns === | |
| for col in columns_to_json: | |
| df[col] = df[col].apply( | |
| lambda x: list(json.loads(x)) if pd.notna(x) and isinstance(x, str) | |
| else (list(x) if isinstance(x, (tuple, np.ndarray)) else []) | |
| ) | |
| df.set_index('ID', inplace=True) | |
| return df | |
def main():
    """Interactively run catalogue QA checks and export an Excel report.

    Prompts for the CSV path, environment (production/staging) and language
    codes, runs every check from functions_ui_version, prints a per-section
    summary, and writes a two-sheet workbook (per-program overview + issue
    summary) under ``outputs/catalogue_<ID>/``.
    """
    # === Inputs ===
    csv_path = input("Enter path to catalogue CSV: ").strip()
    catalog_type = input("Is this catalog in production? y/n")
    catalog_language = input("Type the language code for this catalog: ").strip()
    if catalog_type.lower() == 'y':
        catalog_type = 'production'
    else:
        catalog_type = "staging"
    other_languages = input("""
    List all the language codes we are interested in other than English (en),
    separated by commas. Leave blank if none:
    """)
    other_languages = [lang.strip() for lang in other_languages.split(",") if lang.strip()]

    # Extract catalogue ID — it is required for the URLs and the output
    # folder, so abort early instead of crashing later with a NameError
    # (the original only printed a message and kept going).
    match = re.search(r"catalog_(\d+)_programs", csv_path)
    if not match:
        print("No catalogue ID found in the file name. Please ensure the file name contains 'catalog_<ID>_programs.csv'.")
        return
    catalog_id = int(match.group(1))

    # === Load Data ===
    columns_to_json = ["Titles", "Descriptions", "Credits", "IDs", "Extras",
                       "Images", "Advisories", "Deeplinks", "Availabilities"]
    df = prep_csv(csv_path, columns_to_json)

    # Staging catalogues may still contain soft-deleted programs; keep only
    # rows with an empty "Deleted At".
    if catalog_type == "staging":
        df = df[(df["Deleted At"].isna()) | (df["Deleted At"] == "")]

    # === Run Checks ===
    print("\n--- Running Checks ---")
    title_issues, title_warnings = title_check(df, catalog_language, other_languages)
    language_alignment_warnings = language_alignment_check(df)
    description_issues, description_warnings = description_check(df, catalog_language, other_languages)
    meta_issues, meta_warnings = meta_check(df)
    credits_issues, credits_warnings = credits_check(df)
    # TODO: image checks (check_images) are not wired up yet.
    qualifiers_warnings = qualifiers_check(df)
    advisories_issues, advisories_warning = advisories_check(df)
    deeplinks_issues, deeplinks_warning = deeplink_check(df)
    availabilites_issues, availabilites_warning = availability_check(df)
    sport_issues = sport_check(df)

    # === Collect Results ===
    # Every section uses the same {"issues": [...], "warnings": [...]} shape
    # so the report loop below can count both uniformly (the original mixed
    # in bare lists for qualifiers/sport, which were always reported as 0/0).
    results = {
        "titles": {"issues": title_issues, "warnings": title_warnings + language_alignment_warnings},
        "descriptions": {"issues": description_issues, "warnings": description_warnings},
        "meta": {"issues": meta_issues, "warnings": meta_warnings},
        "credits": {"issues": credits_issues, "warnings": credits_warnings},
        # "images": {"issues": images_issues, "warnings": []},  # pending image checks
        "qualifiers": {"issues": [], "warnings": qualifiers_warnings},
        "advisories": {"issues": advisories_issues, "warnings": advisories_warning},
        "deeplinks": {"issues": deeplinks_issues, "warnings": deeplinks_warning},
        "availabilites": {"issues": availabilites_issues, "warnings": availabilites_warning},
        "sport": {"issues": sport_issues, "warnings": []},
    }

    # === Print Report ===
    for section, section_results in results.items():
        print(f"\n{section.upper()} RESULTS:")
        num_issues = len(section_results["issues"])
        num_warnings = len(section_results["warnings"])
        if num_issues == 0 and num_warnings == 0:
            print("No issues and no warnings found!")
        else:
            print(f"Number of issues: {num_issues}")
            print(f"Number of warnings: {num_warnings}")

    # === Build per-program issues and warnings for Excel ===
    program_issues = {row: [] for row in df.index}
    program_warnings = {row: [] for row in df.index}

    def collect(check_results, target):
        # check_results looks like
        # [{"type": "Missing title list", "rows": [1, 5]}, ...];
        # append each entry's "type" to every affected program's list.
        for entry in check_results:
            for row_id in entry["rows"]:
                target[row_id].append(entry["type"])

    # (result list, destination) pairs — extend when new checks are added.
    all_checks = [
        (title_issues, program_issues),
        (title_warnings + language_alignment_warnings, program_warnings),
        (description_issues, program_issues),
        (description_warnings, program_warnings),
        (meta_issues, program_issues),
        (meta_warnings, program_warnings),
        (credits_issues, program_issues),
        (credits_warnings, program_warnings),
        (qualifiers_warnings, program_warnings),
        (advisories_issues, program_issues),
        (advisories_warning, program_warnings),
        (deeplinks_issues, program_issues),
        (deeplinks_warning, program_warnings),
        (availabilites_issues, program_issues),
        (availabilites_warning, program_warnings),
        (sport_issues, program_issues),
    ]

    issue_summary = []
    for results_group, target in all_checks:
        collect(results_group, target)
        # Identity check ('is'), not '==': the two dicts start out equal
        # (all-empty lists), so value equality could mislabel warnings
        # as issues on a clean catalogue.
        problem_type = "Issue" if target is program_issues else "Warning"
        for entry in results_group:  # second sheet: each finding + affected programs
            issue_summary.append({
                "Issue/Warning": problem_type,
                "type": entry["type"],
                "count": len(entry["rows"]),
                "programs": entry["rows"],
            })
    issue_summary_df = pd.DataFrame(issue_summary)

    # === Build final dataframe for Excel ===
    df_export = df[["Kind", "Title"]].copy()
    df_export.reset_index(inplace=True)  # brings ID back as a column
    # Per-program semicolon-joined findings ("".join of an empty list is "").
    df_export["Issues"] = df_export["ID"].map(lambda i: "; ".join(program_issues[i]))
    df_export["Warnings"] = df_export["ID"].map(lambda i: "; ".join(program_warnings[i]))
    # Production and staging programs live on different domains.
    domain = "simply.tv" if catalog_type == 'production' else "simply.wtf"
    df_export["URL"] = df_export["ID"].map(
        lambda program_id: f"https://streaming.{domain}/catalogs/{catalog_id}/programs/{program_id}"
    )

    # === Save to Excel ===
    date = pd.Timestamp.now().strftime("%Y%m%d_%H%M")  # timestamp for the file name
    output_folder = os.path.join("outputs", f"catalogue_{catalog_id}")
    os.makedirs(output_folder, exist_ok=True)
    output_excel = os.path.join(output_folder, f"catalogue_{catalog_id}_report_{date}.xlsx")
    with pd.ExcelWriter(output_excel, engine='openpyxl') as writer:
        df_export.to_excel(writer, index=False, sheet_name="Program Overview")
        issue_summary_df.to_excel(writer, index=False, sheet_name="Issues Summary")
    print(f"\nExcel report saved to {output_excel}")
| if __name__ == "__main__": | |
| main() | |