Spaces:
Running
Running
| import re | |
| from io import StringIO | |
| from math import floor | |
| import pandas as pd | |
| import streamlit as st | |
# Column holding the remaining laser hours; it is also the last display column.
REMAINING_COLUMN = '剩余激光时长(小时)'
# Per-hall metric columns, in the order they are displayed and exported.
DISPLAY_COLUMNS = ['测试时长(分)', '广告时长(分)', '影片播放时长(分)', REMAINING_COLUMN]

# Page setup: wide layout and the page title.
st.set_page_config(layout="wide")
st.title('影片放映时间表统计')
def build_table_styles():
    """Return the CSS rules applied to rendered tables.

    Column headers get a dark background with white, centred text; row
    headers and data cells are only centred.

    Returns:
        A list of ``{'selector': ..., 'props': [...]}`` dictionaries.
    """
    centered = ('text-align', 'center')
    column_header_props = [
        ('background-color', '#4a4a4a'),
        ('color', 'white'),
        centered,
    ]
    rules = [{'selector': 'th.col_heading', 'props': column_header_props}]
    for selector in ('th.row_heading', 'td'):
        rules.append({'selector': selector, 'props': [centered]})
    return rules
def clean_hall_name(name):
    """Shorten a hall label to ``'<number>号厅'``.

    Args:
        name: Raw hall label; may be any type.

    Returns:
        ``'<digits>号厅'`` built from the first ``<digits>号`` found in a
        string label. Non-string values and strings without that pattern
        are returned unchanged.
    """
    if not isinstance(name, str):
        return name
    found = re.search(r'(\d+)号', name)
    return f"{found.group(1)}号厅" if found else name
def get_hall_sort_key(name):
    """Build a sort key that orders halls by their first number.

    Args:
        name: Hall label (converted with ``str``).

    Returns:
        ``(0, number, label)`` when the label contains digits, otherwise
        ``(1, 0, label)`` so that unnumbered halls sort after numbered ones.
    """
    label = str(name)
    first_number = re.search(r'\d+', label)
    if first_number is None:
        return 1, 0, label
    return 0, int(first_number.group()), label
# Translation table for normalize_text: full-width parentheses (U+FF08/U+FF09)
# become ASCII ones; ASCII spaces and ideographic spaces (U+3000) are removed.
# Code points are written as escapes so they cannot be silently folded to
# their ASCII look-alikes.
_NORMALIZE_TEXT_TABLE = str.maketrans({
    '\uff08': '(',
    '\uff09': ')',
    ' ': None,
    '\u3000': None,
})


def normalize_text(value):
    """Return *value* as compact text suitable for header matching.

    Args:
        value: Any cell value. Falsy values (``None``, ``''``, ``0``) are
            treated as empty text.

    Returns:
        The stripped string with full-width parentheses converted to ASCII
        parentheses and all ASCII / ideographic spaces removed.
    """
    return str(value or '').strip().translate(_NORMALIZE_TEXT_TABLE)
def parse_number(value):
    """Convert *value* to a finite ``float``.

    Args:
        value: Any value; it is converted with ``str``, stripped, and has
            thousands separators (``,``) removed before parsing.

    Returns:
        The parsed float, or ``None`` when the text is empty, is not a
        number, or is a non-finite number (``nan`` / ``inf``).
    """
    # Imported locally so this block does not need a new module-level import.
    from math import isfinite

    text = str(value).strip().replace(',', '')
    if not text:
        return None
    try:
        number = float(text)
    except ValueError:
        return None
    # float() accepts 'nan' and 'inf', but those values make round() and
    # floor() raise later on, so they are reported as "not a number".
    return number if isfinite(number) else None


def format_number(value):
    """Format a cell value for display.

    Args:
        value: Cell value of any type.

    Returns:
        ``''`` for an empty string; the original value when it is not a
        finite number; an ``int`` when the number is within 1e-9 of a whole
        number; otherwise the number rounded to two decimals.
    """
    if value == '':
        return ''
    number = parse_number(value)
    if number is None:
        return value
    nearest_whole = round(number)
    if abs(number - nearest_whole) < 1e-9:
        return int(nearest_whole)
    return round(number, 2)
def floor_display_hours(value):
    """Round *value* down to whole hours.

    Args:
        value: Anything accepted by ``float``.

    Returns:
        The floored hour count as an ``int``.
    """
    hours = float(value)
    # The small epsilon keeps values a hair below a whole number
    # (e.g. 2.9999999999) from being floored one hour too low.
    return int(floor(hours + 1e-9))
def is_metric_header(value):
    """Tell whether *value* is one of the metric column headers.

    The value is normalised with ``normalize_text`` first, then checked for
    any of the four metric keywords (without their unit suffix).

    Returns:
        ``True`` if a keyword occurs in the normalised text, else ``False``.
    """
    cleaned = normalize_text(value)
    for keyword in ('测试时长', '广告时长', '影片播放时长', '剩余激光时长'):
        if keyword in cleaned:
            return True
    return False
def format_table_for_display(table):
    """Return a display copy of *table* with every cell formatted.

    The input is copied and cast to ``object`` dtype so that formatted
    cells (ints, floats or strings) can share a column; each column is
    then mapped through ``format_number``. The input table is not modified.
    """
    formatted = table.copy().astype(object)
    for column_key in formatted.columns:
        column_values = formatted[column_key]
        formatted[column_key] = column_values.map(format_number)
    return formatted
def render_table(table, style_df=None):
    """Render *table* in the Streamlit page with the shared table styling.

    Args:
        table: DataFrame to show; it is formatted for display first.
        style_df: Optional DataFrame of CSS strings with the same shape as
            *table*; when given it is applied cell-by-cell.
    """
    display_table = format_table_for_display(table)
    styled = display_table.style.set_table_styles(build_table_styles())
    if style_df is not None:
        # axis=None makes the callback return styles for the whole frame at once.
        styled = styled.apply(lambda _: style_df, axis=None)

    # 35 px per row plus three extra rows (presumably for the header area),
    # capped at 1200 px.
    row_height_px = 35
    height_px = min(1200, (len(table) + 3) * row_height_px)
    st.dataframe(styled, height=height_px, use_container_width=True)
def build_excel_copy_text(table):
    """Return *table* as tab-separated text for pasting into Excel.

    The table is formatted for display, its index is dropped, and neither
    the index nor the header row is written.

    Returns:
        The tab-separated body with trailing line terminators removed.
    """
    export_df = format_table_for_display(table).reset_index(drop=True)
    text = export_df.to_csv(sep='\t', index=False, header=False)
    # pandas ends rows with os.linesep by default, so on Windows the text
    # ends in '\r\n'; strip both characters, not just '\n'.
    return text.rstrip('\r\n')
def build_schedule_pivot_table(df, ad_duration, test_duration):
    """Summarise a screening schedule per day and hall.

    Args:
        df: Schedule with the columns ``影片``, ``影厅``, ``放映日期`` and
            ``片长``. It is copied, not modified.
        ad_duration: Advertisement minutes counted per screening.
        test_duration: Test minutes assigned to every day/hall pair.

    Returns:
        A DataFrame indexed by date (``MM月DD日``) whose columns are a
        ``(hall, metric)`` MultiIndex covering every entry of
        ``DISPLAY_COLUMNS`` for every hall, with halls ordered by
        ``get_hall_sort_key``. Metrics that were not computed (the remaining
        laser hours) are filled with ``''``. When the pivot is empty it is
        returned before the columns are reshaped.
    """
    schedule = df.copy()
    schedule['影片'] = schedule['影片'].astype(str)
    schedule['影厅'] = schedule['影厅'].apply(clean_hall_name)
    schedule['放映日期'] = pd.to_datetime(schedule['放映日期'])
    schedule['日期'] = schedule['放映日期'].dt.strftime('%m月%d日')
    # Rows without a hall or a running time cannot contribute to the totals.
    schedule = schedule.dropna(subset=['影厅', '片长'])

    per_hall_day = (
        schedule.groupby(['日期', '影厅'])
        .agg(影片数量=('影片', 'count'), 原始影片时长=('片长', 'sum'))
        .reset_index()
    )
    per_hall_day['测试时长(分)'] = test_duration
    per_hall_day['广告时长(分)'] = per_hall_day['影片数量'] * ad_duration
    per_hall_day['影片播放时长(分)'] = per_hall_day['原始影片时长']

    metric_columns = ['测试时长(分)', '广告时长(分)', '影片播放时长(分)']
    pivoted = per_hall_day.pivot_table(index='日期', columns='影厅', values=metric_columns)
    pivoted = pivoted.fillna(0).astype(int)
    if pivoted.empty:
        return pivoted

    # Put the hall on the outer column level, then lay out every hall with
    # the full, fixed set of display columns.
    by_hall = pivoted.swaplevel(0, 1, axis=1).sort_index(axis=1)
    hall_order = sorted(by_hall.columns.get_level_values(0).unique(), key=get_hall_sort_key)
    target_columns = pd.MultiIndex.from_product([hall_order, DISPLAY_COLUMNS], names=['影厅', None])
    return by_hall.reindex(columns=target_columns).fillna('')
def detect_date_column(raw_df):
    """Guess whether the first column of *raw_df* holds row labels (dates).

    Two heuristics are tried in order:

    1. One of the first three rows has ``日期`` in its first cell.
    2. Some row has a non-empty, non-numeric first cell followed by at
       least four non-empty cells that are all numeric.

    Returns:
        ``True`` if either heuristic matches, otherwise ``False``.
    """
    header_rows = min(3, len(raw_df))
    if any('日期' in normalize_text(raw_df.iat[row, 0]) for row in range(header_rows)):
        return True

    for row in range(len(raw_df)):
        leading = str(raw_df.iat[row, 0]).strip()
        if not leading or parse_number(leading) is not None:
            continue
        stripped_rest = (str(cell).strip() for cell in raw_df.iloc[row, 1:].tolist())
        trailing = [text for text in stripped_rest if text]
        if len(trailing) >= 4 and all(parse_number(text) is not None for text in trailing):
            return True
    return False
def find_first_data_row(raw_df, start_col):
    """Locate the first row that looks like numeric data.

    Args:
        raw_df: Table of raw cell values.
        start_col: Index of the first value column to inspect.

    Returns:
        The positional index of the first row that has at least four
        non-empty cells from *start_col* onwards, all of them numeric, or
        ``None`` when no such row exists.
    """
    for position in range(len(raw_df)):
        stripped_cells = (str(cell).strip() for cell in raw_df.iloc[position, start_col:].tolist())
        filled = [cell for cell in stripped_cells if cell]
        if len(filled) < 4:
            continue
        if all(parse_number(cell) is not None for cell in filled):
            return position
    return None
def extract_hall_names(raw_df, first_data_row, start_col, group_count) -> list[str]:
    """Read one hall name per four-column group from the header rows.

    For each group, the rows above *first_data_row* are scanned in the
    group's first column; the first non-empty cell that is not a metric
    header is taken as the hall name (cleaned with ``clean_hall_name``).

    Args:
        raw_df: Table of raw cell values.
        first_data_row: Index of the first data row; rows before it are
            treated as headers.
        start_col: Index of the first value column.
        group_count: Number of four-column hall groups.

    Returns:
        One name per group; groups without a usable header fall back to
        ``'<n>号厅'`` with ``n`` being the 1-based group number.
    """
    names: list[str] = []
    for group_number in range(1, group_count + 1):
        column = start_col + (group_number - 1) * 4
        for header_row in range(first_data_row):
            text = str(raw_df.iat[header_row, column]).strip()
            if text and not is_metric_header(text):
                resolved = clean_hall_name(text)
                break
        else:
            # No header row supplied a name for this group.
            resolved = ''
        names.append(resolved or f'{group_number}号厅')
    return names
def _make_unique_labels(labels):
    """Return *labels* with repeated entries disambiguated by a ``(n)`` suffix."""
    taken = set()
    unique_labels = []
    for label in labels:
        candidate = label
        occurrence = 1
        while candidate in taken:
            occurrence += 1
            candidate = f'{label}({occurrence})'
        taken.add(candidate)
        unique_labels.append(candidate)
    return unique_labels


def parse_pasted_laser_text(pasted_text):
    """Parse tab-separated text pasted from Excel into a numeric table.

    The text is read as tab-separated cells. Fully blank rows and columns
    are dropped, an optional leading label column is detected, and the
    remaining columns are interpreted as groups of four metrics per hall
    (``DISPLAY_COLUMNS``). Rows from the first data row onwards that are
    blank or contain non-numeric text are skipped.

    Args:
        pasted_text: Raw pasted text.

    Returns:
        A float DataFrame with ``(hall, metric)`` MultiIndex columns. The
        index holds the row labels and is named ``日期`` when a label column
        was detected, otherwise ``天次``. Row labels and hall names are made
        unique, because label-based lookups on the result need unique labels.

    Raises:
        ValueError: If no data is found, the value columns are not a
            multiple of four, or a data row has an empty cell. Errors raised
            by ``pandas.read_csv`` propagate unchanged.
    """
    raw_df = pd.read_csv(StringIO(pasted_text), sep='\t', header=None, dtype=str, keep_default_na=False)
    # Treat whitespace-only cells as missing so blank rows/columns can be dropped.
    blank_as_missing = raw_df.replace(r'^\s*$', pd.NA, regex=True)
    raw_df = blank_as_missing.dropna(axis=0, how='all').dropna(axis=1, how='all')
    raw_df = raw_df.fillna('').reset_index(drop=True)
    if raw_df.empty:
        raise ValueError('未识别到任何数据。')

    has_date_column = detect_date_column(raw_df)
    start_col = 1 if has_date_column else 0
    first_data_row = find_first_data_row(raw_df, start_col)
    if first_data_row is None:
        raise ValueError('未识别到可计算的数据行,请检查粘贴内容。')

    value_col_count = raw_df.shape[1] - start_col
    if value_col_count <= 0 or value_col_count % 4 != 0:
        raise ValueError(f'检测到 {value_col_count} 列有效数据,无法按每 4 列一个影厅进行解析。')
    group_count = value_col_count // 4
    hall_names: list[str] = extract_hall_names(raw_df, first_data_row, start_col, group_count)

    parsed_rows: list[list[float]] = []
    row_labels: list[str] = []
    for row_index in range(first_data_row, len(raw_df)):
        row_values = [str(value).strip() for value in raw_df.iloc[row_index, start_col:].tolist()]
        non_empty_values = [value for value in row_values if value]
        # Skip blank rows and rows with any non-numeric text (e.g. repeated headers).
        if not non_empty_values:
            continue
        if not all(parse_number(value) is not None for value in non_empty_values):
            continue

        numeric_row = []
        for col_index, raw_value in enumerate(row_values, start=1):
            number = parse_number(raw_value)
            if number is None:
                # Only reachable for an empty cell inside an otherwise numeric row.
                raise ValueError(f'第 {row_index + 1} 行第 {col_index + start_col} 列不是数字:{raw_value}')
            numeric_row.append(number)

        fallback_label = f'第{len(row_labels) + 1}天'
        label = str(raw_df.iat[row_index, 0]).strip() if has_date_column else fallback_label
        row_labels.append(label or fallback_label)
        parsed_rows.append(numeric_row)

    if not parsed_rows:
        raise ValueError('未识别到可计算的数据行,请检查粘贴内容。')

    # Repeated labels (e.g. weekday names) or repeated hall names would make
    # the index non-unique, which breaks scalar lookups such as
    # ``float(table.loc[row, (hall, metric)])``.
    row_labels = _make_unique_labels(row_labels)
    hall_names = _make_unique_labels(hall_names)

    columns = pd.MultiIndex.from_product([hall_names, DISPLAY_COLUMNS], names=['影厅', None])
    table = pd.DataFrame(parsed_rows, index=pd.Index(row_labels), columns=columns)
    table.index.name = '日期' if has_date_column else '天次'
    return table
def find_best_recharge_count(previous_remaining, usage_hours, actual_remaining, recharge_hours):
    """Find the recharge count that best explains the observed remaining hours.

    Args:
        previous_remaining: Remaining hours of the previous row.
        usage_hours: Hours used since the previous row.
        actual_remaining: Remaining hours observed in the current row.
        recharge_hours: Hours added by one recharge; ``<= 0`` disables
            recharge detection.

    Returns:
        ``(recharge_count, expected_display_hours)``. The expected value is
        the floored ``previous - usage + count * recharge_hours``. The count
        is chosen to minimise the absolute gap to *actual_remaining*; on a
        tie, zero recharges and then the smaller count win.
    """
    baseline = previous_remaining - usage_hours
    if recharge_hours <= 0:
        return 0, floor_display_hours(baseline)

    # Only counts within two of the rounded estimate are considered.
    estimate = max(0, int(round((actual_remaining - baseline) / recharge_hours)))
    options = [(0, floor_display_hours(baseline))]
    options.extend(
        (count, floor_display_hours(baseline + count * recharge_hours))
        for count in range(max(0, estimate - 2), estimate + 3)
    )
    # min() keeps the first of equally good options, so the zero-recharge
    # baseline and then lower counts take precedence.
    return min(options, key=lambda option: abs(actual_remaining - option[1]))
def build_laser_check_style(table, recharge_hours):
    """Flag rows whose remaining laser hours deviate from the expected value.

    For every hall, each row after the first is compared with the value
    expected from the previous row's remaining hours minus the current
    row's advertisement and film minutes (converted to hours), allowing for
    recharges via ``find_best_recharge_count``.

    Args:
        table: DataFrame with ``(hall, metric)`` MultiIndex columns.
        recharge_hours: Hours added by one recharge.

    Returns:
        ``(style_df, abnormal_count)``. ``style_df`` has the same shape as
        *table* and holds a CSS string in the remaining-hours cell of every
        row that differs from the expectation by more than one hour (red
        text when the actual value is lower, green when higher) and ``''``
        elsewhere. ``abnormal_count`` is the number of flagged cells.
    """
    highlight = pd.DataFrame('', index=table.index, columns=table.columns)
    flagged = 0

    # dict.fromkeys removes repeated hall labels while keeping column order.
    for hall in dict.fromkeys(table.columns.get_level_values(0)):
        remaining_key = (hall, REMAINING_COLUMN)
        last_remaining = None
        for day in table.index:
            current_remaining = float(table.loc[day, remaining_key])
            if last_remaining is not None:
                ad_minutes = float(table.loc[day, (hall, '广告时长(分)')])
                movie_minutes = float(table.loc[day, (hall, '影片播放时长(分)')])
                used_hours = (ad_minutes + movie_minutes) / 60
                _, expected = find_best_recharge_count(
                    last_remaining, used_hours, current_remaining, recharge_hours
                )
                deviation = current_remaining - expected
                if abs(deviation) > 1:
                    flagged += 1
                    text_color = '#d32f2f' if deviation < 0 else '#2e7d32'
                    highlight.loc[day, remaining_key] = (
                        f'background-color: #fff3b0; color: {text_color}; font-weight: 700'
                    )
            # The first row of each hall only seeds the comparison.
            last_remaining = current_remaining
    return highlight, flagged
def render_schedule_summary():
    """Render the upload section: schedule file in, per-hall statistics out.

    Shows the file uploader and the two duration inputs. Once a file is
    uploaded it is read with the header on the fourth row, echoed back, and
    summarised into the per-hall table and a tab-separated copy block.
    Any error while processing the file is shown as a Streamlit error.
    """
    st.subheader('上传文件生成统计')
    uploaded_file = st.file_uploader('上传“影片放映时间表.xlsx”文件', type=['xlsx'], key='laser_upload_file')

    ad_input_col, test_input_col = st.columns(2)
    with ad_input_col:
        ad_duration = st.number_input('输入每个广告的时长(分钟)', min_value=0, value=5, key='laser_ad_duration')
    with test_input_col:
        test_duration = st.number_input('输入测试时长(分)', min_value=0, value=5, key='laser_test_duration')

    if uploaded_file is None:
        return

    try:
        df = pd.read_excel(uploaded_file, header=3)
        st.subheader('上传的原始数据')
        st.dataframe(df, use_container_width=True)

        pivot_table = build_schedule_pivot_table(df, ad_duration, test_duration)
        if pivot_table.empty:
            st.warning('没有可用于生成统计信息的数据。')
        else:
            st.subheader('影厅播放统计')
            render_table(pivot_table)
            st.subheader('复制到 Excel')
            st.caption('以下内容从“测试时长(分)”第一个数据开始,到最后一个数据结束;不包含日期和表头。')
            st.code(build_excel_copy_text(pivot_table), language='text')
    except Exception as e:
        # UI boundary: report the failure to the user instead of crashing the page.
        st.error(f'处理文件时出错: {e}')
def render_laser_checker():
    """Render the paste-and-check section for remaining laser hours.

    Shows the recharge-hours input and a text area. Non-blank pasted text
    is parsed and checked; parse/check failures are shown as a Streamlit
    error. Otherwise a summary message and the highlighted table are
    rendered.
    """
    st.divider()
    st.subheader('粘贴文本校验剩余激光时长')
    st.caption('支持直接粘贴 Excel 文本。黄色底色表示差异绝对值超过 1 小时;红字表示实际值低于正常值,绿字表示实际值高于正常值。')
    recharge_hours = st.number_input('激光默认充值小时数', min_value=0, value=1000, step=100, key='laser_recharge_hours')
    pasted_text = st.text_area(
        '粘贴需要校验的文本',
        height=260,
        placeholder='日期\t1号厅\t\t\t\n\t测试时长(分)\t广告时长(分)\t影片播放时长(分)\t剩余激光时长(小时)\n3月1日\t5\t30\t750\t665\n3月2日\t5\t25\t625\t654',
        key='laser_pasted_text',
    )
    if not pasted_text.strip():
        return

    try:
        pasted_table = parse_pasted_laser_text(pasted_text)
        style_df, abnormal_count = build_laser_check_style(pasted_table, float(recharge_hours))
    except Exception as e:
        # UI boundary: show the parsing/checking problem to the user.
        st.error(f'校验失败:{e}')
        return

    hall_count = int(pasted_table.columns.get_level_values(0).nunique())
    if abnormal_count <= 0:
        st.success(f'共识别 {hall_count} 个影厅,所有检查点都在 1 小时以内。')
    else:
        st.warning(f'共识别 {hall_count} 个影厅,发现 {abnormal_count} 个异常检查点。')
    render_table(pasted_table, style_df=style_df)
# Page body: the upload/statistics section first, then the paste-and-check section.
for render_section in (render_schedule_summary, render_laser_checker):
    render_section()