hengdian / pages /laser-count.py
Ethscriptions's picture
Upload laser-count.py
350f82d verified
import re
from io import StringIO
from math import floor
import pandas as pd
import streamlit as st
DISPLAY_COLUMNS = ['测试时长(分)', '广告时长(分)', '影片播放时长(分)', '剩余激光时长(小时)']
REMAINING_COLUMN = '剩余激光时长(小时)'
st.set_page_config(layout="wide")
st.title('影片放映时间表统计')
def build_table_styles():
return [
{
'selector': 'th.col_heading',
'props': [
('background-color', '#4a4a4a'),
('color', 'white'),
('text-align', 'center'),
],
},
{
'selector': 'th.row_heading',
'props': [('text-align', 'center')],
},
{
'selector': 'td',
'props': [('text-align', 'center')],
},
]
def clean_hall_name(name):
if isinstance(name, str):
match = re.search(r'(\d+)号', name)
if match:
return f"{match.group(1)}号厅"
return name
def get_hall_sort_key(name):
numbers = re.findall(r'\d+', str(name))
if numbers:
return 0, int(numbers[0]), str(name)
return 1, 0, str(name)
def normalize_text(value):
return str(value or '').strip().replace('(', '(').replace(')', ')').replace(' ', '')
def parse_number(value):
text = str(value).strip().replace(',', '')
if not text:
return None
try:
return float(text)
except ValueError:
return None
def format_number(value):
if value == '':
return ''
number = parse_number(value)
if number is None:
return value
if abs(number - round(number)) < 1e-9:
return int(round(number))
return round(number, 2)
def floor_display_hours(value):
return int(floor(float(value) + 1e-9))
def is_metric_header(value):
normalized = normalize_text(value)
return any(keyword in normalized for keyword in ['测试时长', '广告时长', '影片播放时长', '剩余激光时长'])
def format_table_for_display(table):
display_table = table.copy().astype(object)
for column in display_table.columns:
display_table[column] = display_table[column].map(format_number)
return display_table
def render_table(table, style_df=None):
styler = format_table_for_display(table).style.set_table_styles(build_table_styles())
if style_df is not None:
styler = styler.apply(lambda _: style_df, axis=None)
table_height = min(1200, (len(table) + 3) * 35)
st.dataframe(styler, height=table_height, use_container_width=True)
def build_excel_copy_text(table):
export_df = format_table_for_display(table).reset_index(drop=True)
return export_df.to_csv(sep='\t', index=False, header=False).rstrip('\n')
def build_schedule_pivot_table(df, ad_duration, test_duration):
df = df.copy()
df['影片'] = df['影片'].astype(str)
df['影厅'] = df['影厅'].apply(clean_hall_name)
df['放映日期'] = pd.to_datetime(df['放映日期'])
df['日期'] = df['放映日期'].dt.strftime('%m月%d日')
df.dropna(subset=['影厅', '片长'], inplace=True)
summary = df.groupby(['日期', '影厅']).agg(
影片数量=('影片', 'count'),
原始影片时长=('片长', 'sum'),
).reset_index()
summary['测试时长(分)'] = test_duration
summary['广告时长(分)'] = summary['影片数量'] * ad_duration
summary['影片播放时长(分)'] = summary['原始影片时长']
pivot_table = summary.pivot_table(
index='日期',
columns='影厅',
values=['测试时长(分)', '广告时长(分)', '影片播放时长(分)'],
).fillna(0).astype(int)
if pivot_table.empty:
return pivot_table
pivot_table = pivot_table.swaplevel(0, 1, axis=1).sort_index(axis=1)
halls = sorted(pivot_table.columns.get_level_values(0).unique(), key=get_hall_sort_key)
new_columns = pd.MultiIndex.from_product([halls, DISPLAY_COLUMNS], names=['影厅', None])
pivot_table = pivot_table.reindex(columns=new_columns).fillna('')
return pivot_table
def detect_date_column(raw_df):
for row_index in range(min(3, len(raw_df))):
if '日期' in normalize_text(raw_df.iat[row_index, 0]):
return True
for row_index in range(len(raw_df)):
first_value = str(raw_df.iat[row_index, 0]).strip()
rest_values = [str(value).strip() for value in raw_df.iloc[row_index, 1:].tolist()]
non_empty_rest = [value for value in rest_values if value]
if first_value and parse_number(first_value) is None and len(non_empty_rest) >= 4:
if all(parse_number(value) is not None for value in non_empty_rest):
return True
return False
def find_first_data_row(raw_df, start_col):
for row_index in range(len(raw_df)):
row_values = [str(value).strip() for value in raw_df.iloc[row_index, start_col:].tolist()]
non_empty_values = [value for value in row_values if value]
if len(non_empty_values) >= 4 and all(parse_number(value) is not None for value in non_empty_values):
return row_index
return None
def extract_hall_names(raw_df, first_data_row, start_col, group_count) -> list[str]:
hall_names: list[str] = []
for group_index in range(group_count):
hall_name = ''
group_start_col = start_col + group_index * 4
for row_index in range(first_data_row):
candidate = str(raw_df.iat[row_index, group_start_col]).strip()
if candidate and not is_metric_header(candidate):
hall_name = clean_hall_name(candidate)
break
if not hall_name:
hall_name = f'{group_index + 1}号厅'
hall_names.append(hall_name)
return hall_names
def parse_pasted_laser_text(pasted_text):
raw_df = pd.read_csv(StringIO(pasted_text), sep='\t', header=None, dtype=str, keep_default_na=False)
raw_df = raw_df.replace(r'^\s*$', pd.NA, regex=True).dropna(axis=0, how='all').dropna(axis=1, how='all')
raw_df = raw_df.fillna('').reset_index(drop=True)
if raw_df.empty:
raise ValueError('未识别到任何数据。')
has_date_column = detect_date_column(raw_df)
start_col = 1 if has_date_column else 0
first_data_row = find_first_data_row(raw_df, start_col)
if first_data_row is None:
raise ValueError('未识别到可计算的数据行,请检查粘贴内容。')
value_col_count = raw_df.shape[1] - start_col
if value_col_count <= 0 or value_col_count % 4 != 0:
raise ValueError(f'检测到 {value_col_count} 列有效数据,无法按每 4 列一个影厅进行解析。')
group_count = value_col_count // 4
hall_names: list[str] = extract_hall_names(raw_df, first_data_row, start_col, group_count)
parsed_rows: list[list[float]] = []
row_labels: list[str] = []
for row_index in range(first_data_row, len(raw_df)):
label = str(raw_df.iat[row_index, 0]).strip() if has_date_column else f'第{len(row_labels) + 1}天'
if not label:
label = f'第{len(row_labels) + 1}天'
numeric_row = []
row_values = [str(value).strip() for value in raw_df.iloc[row_index, start_col:].tolist()]
non_empty_values = [value for value in row_values if value]
if not non_empty_values:
continue
if not all(parse_number(value) is not None for value in non_empty_values):
continue
if len(row_values) != value_col_count:
raise ValueError(f'第 {row_index + 1} 行列数不完整。')
for col_index, raw_value in enumerate(row_values, start=1):
number = parse_number(raw_value)
if number is None:
raise ValueError(f'第 {row_index + 1} 行第 {col_index + start_col} 列不是数字:{raw_value}')
numeric_row.append(number)
row_labels.append(label)
parsed_rows.append(numeric_row)
if not parsed_rows:
raise ValueError('未识别到可计算的数据行,请检查粘贴内容。')
columns = pd.MultiIndex.from_product([hall_names, DISPLAY_COLUMNS], names=['影厅', None])
table = pd.DataFrame(parsed_rows, index=pd.Index(row_labels), columns=columns)
table.index.name = '日期' if has_date_column else '天次'
return table
def find_best_recharge_count(previous_remaining, usage_hours, actual_remaining, recharge_hours):
theoretical_without_recharge = previous_remaining - usage_hours
if recharge_hours <= 0:
theoretical_display = floor_display_hours(theoretical_without_recharge)
return 0, theoretical_display
approx_count = max(0, int(round((actual_remaining - theoretical_without_recharge) / recharge_hours)))
candidate_counts = range(max(0, approx_count - 2), approx_count + 3)
best_count = 0
best_display = floor_display_hours(theoretical_without_recharge)
best_gap = abs(actual_remaining - best_display)
for recharge_count in candidate_counts:
theoretical_display = floor_display_hours(theoretical_without_recharge + recharge_count * recharge_hours)
gap = abs(actual_remaining - theoretical_display)
if gap < best_gap or (gap == best_gap and recharge_count < best_count):
best_count = recharge_count
best_display = theoretical_display
best_gap = gap
return best_count, best_display
def build_laser_check_style(table, recharge_hours):
style_df = pd.DataFrame('', index=table.index, columns=table.columns)
abnormal_count = 0
halls = list(dict.fromkeys(table.columns.get_level_values(0)))
for hall_name in halls:
previous_remaining = None
for row_label in table.index:
actual_remaining = float(table.loc[row_label, (hall_name, REMAINING_COLUMN)])
if previous_remaining is None:
previous_remaining = actual_remaining
continue
ad_minutes = float(table.loc[row_label, (hall_name, '广告时长(分)')])
movie_minutes = float(table.loc[row_label, (hall_name, '影片播放时长(分)')])
usage_hours = (ad_minutes + movie_minutes) / 60
_, theoretical_display = find_best_recharge_count(previous_remaining, usage_hours, actual_remaining, recharge_hours)
diff_hours = actual_remaining - theoretical_display
if abs(diff_hours) > 1:
abnormal_count += 1
text_color = '#d32f2f' if diff_hours < 0 else '#2e7d32'
style_df.loc[row_label, (hall_name, REMAINING_COLUMN)] = (
f'background-color: #fff3b0; color: {text_color}; font-weight: 700'
)
previous_remaining = actual_remaining
return style_df, abnormal_count
def render_schedule_summary():
st.subheader('上传文件生成统计')
uploaded_file = st.file_uploader('上传“影片放映时间表.xlsx”文件', type=['xlsx'], key='laser_upload_file')
col1, col2 = st.columns(2)
with col1:
ad_duration = st.number_input('输入每个广告的时长(分钟)', min_value=0, value=5, key='laser_ad_duration')
with col2:
test_duration = st.number_input('输入测试时长(分)', min_value=0, value=5, key='laser_test_duration')
if uploaded_file is None:
return
try:
df = pd.read_excel(uploaded_file, header=3)
st.subheader('上传的原始数据')
st.dataframe(df, use_container_width=True)
pivot_table = build_schedule_pivot_table(df, ad_duration, test_duration)
if pivot_table.empty:
st.warning('没有可用于生成统计信息的数据。')
return
st.subheader('影厅播放统计')
render_table(pivot_table)
st.subheader('复制到 Excel')
st.caption('以下内容从“测试时长(分)”第一个数据开始,到最后一个数据结束;不包含日期和表头。')
st.code(build_excel_copy_text(pivot_table), language='text')
except Exception as e:
st.error(f'处理文件时出错: {e}')
def render_laser_checker():
st.divider()
st.subheader('粘贴文本校验剩余激光时长')
st.caption('支持直接粘贴 Excel 文本。黄色底色表示差异绝对值超过 1 小时;红字表示实际值低于正常值,绿字表示实际值高于正常值。')
recharge_hours = st.number_input('激光默认充值小时数', min_value=0, value=1000, step=100, key='laser_recharge_hours')
pasted_text = st.text_area(
'粘贴需要校验的文本',
height=260,
placeholder='日期\t1号厅\t\t\t\n\t测试时长(分)\t广告时长(分)\t影片播放时长(分)\t剩余激光时长(小时)\n3月1日\t5\t30\t750\t665\n3月2日\t5\t25\t625\t654',
key='laser_pasted_text',
)
if not pasted_text.strip():
return
try:
pasted_table = parse_pasted_laser_text(pasted_text)
style_df, abnormal_count = build_laser_check_style(pasted_table, float(recharge_hours))
except Exception as e:
st.error(f'校验失败:{e}')
return
hall_count = int(pasted_table.columns.get_level_values(0).nunique())
if abnormal_count > 0:
st.warning(f'共识别 {hall_count} 个影厅,发现 {abnormal_count} 个异常检查点。')
else:
st.success(f'共识别 {hall_count} 个影厅,所有检查点都在 1 小时以内。')
render_table(pasted_table, style_df=style_df)
render_schedule_summary()
render_laser_checker()