# hengdian / pages /tms_content.py
# Ethscriptions's picture
# Upload 5 files
# caf3a60 verified
import streamlit as st
import pandas as pd
import requests
import time
import os
import re
import urllib3
from collections import defaultdict
from dotenv import load_dotenv
# --- Basic configuration ---
st.set_page_config(page_title="TMS 影片查询", page_icon="🎬", layout="wide")
# Suppress the HTTPS certificate warning (InsecureRequestWarning) emitted by urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Load environment variables (python-dotenv)
load_dotenv()
# --- 工具函数 ---
def get_circled_number(hall_name):
    """
    Convert the number embedded in a hall name to a circled numeral, e.g. 1 -> ①.

    Every digit character found in ``str(hall_name)`` is concatenated; after
    dropping leading zeros the result is looked up among the circled numerals
    ① (1) through ⑳ (20).

    Args:
        hall_name: Hall name or number; anything accepted by ``str()``.

    Returns:
        The circled numeral for values 1-20. Otherwise the extracted digit
        string unchanged, which is an empty string when the name contains
        no digits at all.
    """
    # U+2460..U+2473 are the contiguous code points for ① .. ⑳.
    mapping = {str(n): chr(0x2460 + n - 1) for n in range(1, 21)}
    # Collect the digit characters of the name.
    num_str = ''.join(filter(str.isdigit, str(hall_name)))
    # Zero-padded names ("01") must resolve to the same numeral as "1".
    return mapping.get(num_str.lstrip('0'), num_str)
def format_play_time(time_str):
    """
    Convert an "HH:MM" duration string to total minutes, e.g. "01:30" -> 90.

    Returns None when the input is empty or not a string, when fewer than two
    colon-separated fields are present, or when either of the first two
    fields is not an integer. Fields after the second one are ignored.
    """
    if not (time_str and isinstance(time_str, str)):
        return None
    try:
        # Unpacking raises ValueError when there is no second field.
        hours_text, minutes_text, *_ = time_str.split(':')
        return int(hours_text) * 60 + int(minutes_text)
    except ValueError:
        return None
def format_content_name_with_explanation(content_name):
    """
    Append bracketed annotations explaining each field of a content name.

    The stripped name is split on '_' into fields. The first field is split
    again on '-': its first piece is reported as the title/identifier and the
    remaining pieces are classified as content type, 2D/3D, technical
    parameter or version number. Every later field is classified as an
    audio/subtitle language pair, aspect ratio, duration, audio layout,
    resolution, packaging standard, date, package type, language marker or
    region/rating. Unrecognised tokens are echoed in plain brackets.

    Returns:
        '' when the input is empty or None, otherwise
        "<name> / <annotations separated by spaces>".
    """
    raw = str(content_name or '').strip()
    if not raw:
        return ''

    lang_map = {
        'CMN': '国语/普通话', 'YUE': '粤语', 'EN': '英语', 'JP': '日语/或简化命名中的加密标记',
        'KO': '韩语', 'FR': '法语', 'ES': '西班牙语', 'TH': '泰语', 'HI': '印地语', 'RU': '俄语',
        'PTH': '普通话', 'GDH': '广东话', 'YS': '原声', 'YZ': '译制', 'SCH': '四川话',
        'NAN': '闽南语', 'WU': '吴语/上海话', 'XX': '无字幕', 'QMS': '简中字幕',
        'QMT': '繁中字幕', 'CCAP': '听障字幕',
    }
    audio_map = {'20': '2.0', '51': '5.1', '71': '7.1', 'ATMOS': 'Dolby Atmos', 'DTSX': 'DTS:X'}
    type_map = {'FTR': '正片', 'TLR': '预告片', 'TSR': '先导预告'}
    pack_map = {'OV': '原始版本包', 'VF': '版本增量包'}

    def annotate_qualifier(token):
        """Describe one '-'-separated piece following the title in the first field."""
        upper = token.upper()
        if upper in type_map:
            return f"[内容类型:{type_map[upper]}({token})]"
        if upper in ('2D', '3D'):
            return f"[制式:{upper}]"
        if upper in ('4FL', '24FPS', '48FPS', '60FPS', '120FPS'):
            return f"[技术参数:{token}]"
        if re.fullmatch(r'\d+', upper):
            return f"[版本号:{token}]"
        return f"[{token}]"

    def annotate_field(token):
        """Describe one '_'-separated field; returns a list of one or two notes."""
        upper = token.upper()
        # "AUDIO-SUBTITLE" pair: both halves must be known language codes.
        audio_code, dash, subtitle_code = upper.partition('-')
        if dash and audio_code in lang_map and subtitle_code in lang_map:
            return [
                f"[音频:{lang_map[audio_code]}({audio_code})]",
                f"[字幕:{lang_map[subtitle_code]}({subtitle_code})]",
            ]
        if upper in ('F', 'S', 'C', 'F-178', 'C-19', '235', '185'):
            return [f"[画幅:{token}]"]
        if re.fullmatch(r'\d{2,3}M', upper):
            return [f"[时长:{token}]"]
        if upper in audio_map:
            return [f"[音效:{audio_map[upper]}({token})]"]
        if upper in ('2K', '4K'):
            return [f"[分辨率:{upper}]"]
        if upper in ('SMPTE', 'IOP'):
            return [f"[封装标准:{upper}]"]
        if re.fullmatch(r'\d{8}', upper):
            return [f"[打包日期:{token}]"]
        if re.fullmatch(r'\d{4}', upper):
            return [f"[月日批次:{token}]"]
        if upper in pack_map:
            return [f"[包类型:{pack_map[upper]}({upper})]"]
        if upper in lang_map:
            return [f"[语言/标记:{lang_map[upper]}({upper})]"]
        if upper.startswith('CN'):
            return [f"[地区/分级:{token}]"]
        return [f"[{token}]"]

    head, *fields = raw.split('_')
    title, *qualifiers = head.split('-')

    notes = [f"[片名/标识:{title}]"]
    notes.extend(annotate_qualifier(piece) for piece in qualifiers)
    for field in fields:
        notes.extend(annotate_field(field))

    return f"{raw} / {' '.join(notes)}"
def clean_movie_title(raw_title, canonical_names=None):
    """
    Normalise a movie title to a base name plus an optional format suffix.

    Args:
        raw_title: Title text. Non-string values are returned unchanged.
        canonical_names: Optional iterable of known base names. The longest
            one contained in ``raw_title`` becomes the base name. Entries
            that are not non-empty strings are ignored.

    Returns:
        The base name, followed by a format suffix (HDR LED, CINITY, Dolby,
        IMAX, China Giant Screen or 3D) when the raw title mentions that
        format and the base name does not already contain the suffix.
    """
    if not isinstance(raw_title, str):
        return raw_title

    base_name = None
    # 1. Try to match a canonical name, longest first so that the most
    #    specific name wins. Non-string entries (e.g. NaN) are skipped
    #    because len() / substring tests would raise on them.
    if canonical_names is not None:
        candidates = [name for name in canonical_names if isinstance(name, str) and name]
        candidates.sort(key=len, reverse=True)
        base_name = next((name for name in candidates if name in raw_title), None)

    # 2. Fallback: take the first whitespace-delimited word. Splitting on any
    #    whitespace (not just ' ') keeps leading blanks, tabs and full-width
    #    spaces from producing an empty or unsplit base name.
    if not base_name:
        words = raw_title.split(None, 1)
        base_name = words[0] if words else ''

    # 3. Pick the format suffix; the first matching rule wins.
    raw_upper = raw_title.upper()
    suffix = ""
    if "HDR LED" in raw_upper:
        suffix = "(HDR LED)"
    elif "CINITY" in raw_upper:
        suffix = "(CINITY)"
    elif "杜比" in raw_upper or "DOLBY" in raw_upper:
        suffix = "(杜比视界)"
    elif "IMAX" in raw_upper:
        suffix = "(数字IMAX3D)" if "3D" in raw_upper else "(数字IMAX)"
    elif "巨幕" in raw_upper:
        suffix = "(中国巨幕立体)" if "立体" in raw_upper else "(中国巨幕)"
    elif "3D" in raw_upper:
        suffix = "(数字3D)"

    # Append the suffix only when the base name does not already carry it.
    if suffix and suffix not in base_name:
        return f"{base_name}{suffix}"
    return base_name
# --- 核心功能模块 ---
@st.cache_data(show_spinner=False, ttl=600)
def fetch_and_process_server_movies(priority_movie_titles=None):
    """
    Fetch the DCP list from the TMS server and organise it by hall and by movie.

    Credentials and identifiers are read from the environment variables
    TMS_APP_SECRET, TMS_TICKET, TMS_THEATER_ID and TMS_X_SESSION_ID. A token is
    requested first, then the content list is paged through 20 entries at a
    time until an empty page arrives or the reported COUNT has been reached.

    Args:
        priority_movie_titles: Optional iterable of titles. Movies whose
            ASSERT_NAME contains any of them are listed first. Entries that
            are not non-empty strings are ignored.

    Returns:
        A tuple ``(by_hall, movies)``:
        - ``by_hall``: dict ordered by hall name, mapping each hall to a list
          of ``{'content_name', 'details'}`` entries, named movies first.
        - ``movies``: list of ``{'assert_name', 'content_name', 'halls',
          'play_time'}`` dicts for movies that have an ASSERT_NAME, priority
          matches first, each group sorted by name.
        ``({}, [])`` is returned when TMS_THEATER_ID is not numeric or the
        token request fails. A failure while paging is reported via
        ``st.error`` and the pages fetched so far are still processed.
    """
    # NOTE(review): the ({}, []) failure result is presumably cached by
    # st.cache_data for the 600 s TTL as well — confirm that is intended.

    # Only non-empty strings are usable for the substring match further down:
    # anything else (e.g. NaN from pandas) raises TypeError in `title in name`,
    # and '' would match every movie.
    if priority_movie_titles is None:
        priority_titles = []
    else:
        priority_titles = [t for t in priority_movie_titles if isinstance(t, str) and t]

    # Read the environment variables.
    app_secret = os.getenv("TMS_APP_SECRET")
    ticket = os.getenv("TMS_TICKET")
    theater_id_str = os.getenv("TMS_THEATER_ID")
    x_session_id = os.getenv("TMS_X_SESSION_ID")

    # Convert the theatre ID to an integer (an unset variable becomes 0).
    try:
        theater_id = int(theater_id_str) if theater_id_str else 0
    except ValueError:
        st.error("环境变量 TMS_THEATER_ID 格式错误,应为数字。")
        return {}, []

    # --- Step 1: obtain the auth token ---
    token_headers = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6',
        'Content-Type': 'application/json',
        'Cookie': f'JSESSIONID={x_session_id}',
        'DNT': '1',
        'Origin': 'https://tms.hengdianfilm.com',
        'Priority': 'u=0, i',
        'Referer': f'https://tms.hengdianfilm.com/hd/oalogin?ticket={ticket}',
        'Sec-CH-UA': '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
        'Sec-CH-UA-Mobile': '?0',
        'Sec-CH-UA-Platform': '"macOS"',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36',
        'X-Requested-With': 'XMLHttpRequest',
    }
    token_json_data = {'appId': 'hd', 'appSecret': app_secret, 'timeStamp': int(time.time() * 1000)}
    token_url = f'https://tms.hengdianfilm.com/cinema-api/admin/generateToken?token=hd&murl=?token=hd&murl=ticket={ticket}'
    try:
        # NOTE(review): this request verifies TLS certificates while the list
        # request below passes verify=False — confirm which one is intended.
        response = requests.post(token_url, headers=token_headers, json=token_json_data, timeout=10)
        response.raise_for_status()
        token_data = response.json()
        if token_data.get('error_code') != '0000':
            raise RuntimeError(f"获取Token失败: {token_data.get('error_desc')}")
        auth_token = token_data['param']
    except Exception as e:
        st.error(f"连接 TMS 认证服务失败: {e}")
        return {}, []

    # --- Step 2: page through the content list ---
    # Headers, query parameters and URL do not change between pages, so they
    # are built once outside the loop.
    list_headers = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6',
        'Content-Type': 'application/json; charset=UTF-8',
        'Cookie': f'JSESSIONID={x_session_id}',
        'DNT': '1',
        'Origin': 'https://tms.hengdianfilm.com',
        'Priority': 'u=1, i',
        'Referer': f'https://tms.hengdianfilm.com/hd/index?ContentMovie&THEATER_ID={theater_id}&SOURCE=SERVER&ASSERT_TYPE=2&PAGE_CAPACITY=20&PAGE_INDEX=1',
        'Sec-CH-UA': '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
        'Sec-CH-UA-Mobile': '?0',
        'Sec-CH-UA-Platform': '"macOS"',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'Token': auth_token,
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36',
        'X-Requested-With': 'XMLHttpRequest',
        'X-SESSIONID': x_session_id,
    }
    list_params = {'token': 'hd', 'murl': 'ContentMovie'}
    list_url = 'https://tms.hengdianfilm.com/cinema-api/cinema/server/dcp/list'

    all_movies, page_index = [], 1
    while True:
        list_json_data = {'THEATER_ID': theater_id, 'SOURCE': 'SERVER', 'ASSERT_TYPE': 2, 'PAGE_CAPACITY': 20,
                          'PAGE_INDEX': page_index}
        try:
            response = requests.post(list_url, params=list_params, headers=list_headers, json=list_json_data, verify=False, timeout=15)
            response.raise_for_status()
            movie_data = response.json()
            if movie_data.get("RSPCD") != "000000":
                raise RuntimeError(f"获取影片列表失败: {movie_data.get('RSPMSG')}")
            # `or` guards against keys that are present but null.
            body = movie_data.get("BODY") or {}
            movies_on_page = body.get("LIST") or []
            if not movies_on_page:
                break
            all_movies.extend(movies_on_page)
            if len(all_movies) >= (body.get("COUNT") or 0):
                break
            page_index += 1
            time.sleep(0.5)  # brief pause between page requests
        except Exception as e:
            st.error(f"获取影片列表页 {page_index} 失败: {e}")
            break

    # --- Step 3: index the movies by content name ---
    movie_details = {}
    for movie in all_movies:
        content_name = movie.get('CONTENT_NAME')
        if not content_name:
            continue
        # A null HALL_INFO or HALL_NAME would otherwise break iteration/sorting.
        hall_names = sorted(
            str(hall.get('HALL_NAME'))
            for hall in (movie.get('HALL_INFO') or [])
            if hall.get('HALL_NAME') is not None
        )
        movie_details[content_name] = {
            'assert_name': movie.get('ASSERT_NAME'),
            'halls': hall_names,
            'play_time': movie.get('PLAY_TIME'),
        }

    # --- Step 4: per-hall view, named movies first, then by name ---
    by_hall = defaultdict(list)
    for content_name, details in movie_details.items():
        for hall_name in details['halls']:
            by_hall[hall_name].append({'content_name': content_name, 'details': details})
    for entries in by_hall.values():
        entries.sort(
            key=lambda item: (not item['details']['assert_name'],
                              item['details']['assert_name'] or item['content_name']))

    # --- Step 5: per-movie view, priority titles first ---
    view2_list = [{'assert_name': d['assert_name'], 'content_name': c, 'halls': d['halls'], 'play_time': d['play_time']}
                  for c, d in movie_details.items() if d.get('assert_name')]
    priority_list, other_list_items = [], []
    for item in view2_list:
        is_priority = any(title in item['assert_name'] for title in priority_titles)
        (priority_list if is_priority else other_list_items).append(item)
    priority_list.sort(key=lambda x: x['assert_name'])
    other_list_items.sort(key=lambda x: x['assert_name'])
    final_sorted_list = priority_list + other_list_items

    return dict(sorted(by_hall.items())), final_sorted_list
# --- 主界面 ---
def main():
    """
    Render the TMS content query page.

    Shows a query button; when pressed, fetches the server content via
    ``fetch_and_process_server_movies`` and renders one table of movies with
    their halls, followed by one tab per hall listing that hall's content.
    Titles found in a schedule DataFrame stored in session state are passed
    along so matching movies are listed first.
    """
    st.title("🎬 TMS 服务器影片内容查询")
    st.info("查询 TMS 服务器上的 DCP 内容及分布情况。")

    # Prioritise movies from the schedule loaded on the home page, if any.
    priority_titles = _collect_priority_titles()

    if not st.button('点击查询 TMS 服务器', key="query_tms", type="primary", icon="🔍"):
        return

    with st.spinner("正在从 TMS 服务器获取数据中..."):
        try:
            halls_data, movie_list_sorted = fetch_and_process_server_movies(priority_titles)
            if not movie_list_sorted:
                st.warning("未获取到任何影片数据,请检查 TMS 连接配置。")
                return
            st.success("TMS 服务器数据获取成功!")

            # 1. Movies and the halls that hold them.
            st.markdown("### 🎥 按影片查看所在影厅")
            view2_data = [_movie_row(item['assert_name'], item['halls'], item['play_time'], item['content_name'])
                          for item in movie_list_sorted]
            st.dataframe(pd.DataFrame(view2_data), hide_index=True, use_container_width=True)
            st.divider()

            # 2. One tab per hall with that hall's content.
            st.markdown("### 🏢 按影厅查看影片内容")
            if not halls_data:
                st.info("暂无影厅数据。")
                return
            hall_tabs = st.tabs(list(halls_data))
            for tab, hall_items in zip(hall_tabs, halls_data.values()):
                with tab:
                    view1_data = [_movie_row(item['details']['assert_name'], item['details']['halls'],
                                             item['details']['play_time'], item['content_name'])
                                  for item in hall_items]
                    st.dataframe(pd.DataFrame(view1_data), hide_index=True, use_container_width=True)
        except Exception as e:
            st.error(f"查询 TMS 服务器时出错: {e}")


def _collect_priority_titles():
    """Return movie titles from the first non-empty schedule frame in session state.

    ``api_df`` is checked before ``file_df``. The pre-cleaned title column is
    preferred; otherwise raw titles are cleaned with ``clean_movie_title``.
    Values that are not non-empty strings (e.g. NaN) are dropped.
    """
    for key in ('api_df', 'file_df'):
        if key not in st.session_state:
            continue
        frame = st.session_state[key]
        # The entry may exist but be unset (None) or hold an empty frame.
        if frame is None or frame.empty:
            continue
        if '影片名称_清理后' in frame.columns:
            titles = frame['影片名称_清理后']
        elif '影片名称' in frame.columns:
            titles = frame['影片名称'].apply(clean_movie_title)
        else:
            return []
        return [t for t in titles.unique().tolist() if isinstance(t, str) and t]
    return []


def _movie_row(assert_name, halls, play_time, content_name):
    """Build one display row shared by the per-movie and per-hall tables."""
    return {
        '影片名称': assert_name,
        '所在影厅': " ".join(sorted(get_circled_number(h) for h in halls)),
        '时长(分钟)': format_play_time(play_time),
        '文件名': format_content_name_with_explanation(content_name),
    }
# Render the page when this file is executed as the entry script.
if __name__ == "__main__":
    main()