import os import sys import subprocess import json import time import re import zipfile import datetime from concurrent.futures import ThreadPoolExecutor # 1. 自动安装依赖 def ensure_dependencies(): required_packages = ["gradio", "requests", "opencc-python-reimplemented"] try: import gradio import requests import opencc except ImportError: print("正在安装依赖,请稍等片刻...") subprocess.check_call([sys.executable, "-m", "pip", "install"] + required_packages) ensure_dependencies() import gradio as gr import requests import opencc # ================= 您的专属配置 ================= DEFAULT_API_KEY = "sk-DZ5g7Zu0lFDlR7mBkbNsZLFTt1KBqA8ocsAH1mcvsZDWtydx" MERCHANT_BASE_URL = "https://xingjiabiapi.com" # 模型配置 TEXT_MODEL = "gemini-3-pro-preview-thinking" MAX_WORKERS = 20 # 并发数 # =============================================== # 初始化繁体转换器 (s2twp: 简体到台湾正体,包含惯用词转换) cc = opencc.OpenCC('s2twp') # --- 角色1:主编 (脑暴) --- EDITOR_SYSTEM_PROMPT = """ 你是一位**“今日头条/百家号”的情感版块金牌主编**,深谙**“人性弱点”**与**“流量密码”**。 你的任务是根据关键词,炮制出**10个**让人点击欲爆棚的**情感故事标题**。 **选题核心策略 (必须包含强烈的伦理冲突或窥探欲)**: 1. **家庭伦理**:婆媳大战、姑嫂矛盾、凤凰男、扶弟魔、上门女婿。 2. **婚外情愫**:闺蜜背叛、同学会旧情、邻居的秘密、出差时的意外。 3. **私密空间**:试衣间、深夜办公室、合租房的隔音、私家车后的秘密。 4. **标题风格**:要像真实的倾诉,带有悬念和反转。 **严格输出格式**:只输出10行文字,每行一个标题,不要带序号,不要带引号。 """ # --- 角色2:作家 (写作) --- WRITER_SYSTEM_PROMPT = """ 你是一位**“百万阅读量”的情感专栏作家**,擅长用最真实的笔触,描写成年人世界里**“隐秘的渴望”**与**“失控的边缘”**。 你的文章风格要像**“深夜电台倾诉”**,让读者读完感到**脸红心跳、头皮发麻**,但又挑不出任何违规的毛病。 **写作铁律 (严格遵守)**: 1. **极致的拉扯感**:不要直接写发生了什么,要写**“将发未发”**时的紧张。 2. **合法的刺激**: * **严禁**描写性器官和具体的性行为过程。 * **必须**用隐晦的暗喻:例如“大脑一片空白”、“身体像过电一样”。 * 重点描写**心理的背德感**和**偷尝禁果的刺激感**。 3. **真实感沉浸**:用第一人称“我”来叙述。多用短句。 4. **格式清洗**:**全文除了逗号和句号,绝对不要出现任何其他符号(如引号、感叹号、括号等),也不要出现空格**。 任务:基于用户给定的“主题”,创作一篇让人欲罢不能、肾上腺素飙升的情感实录。 """ def clean_content_strict(text): """ 核心清洗逻辑:清洗符号 + 转换为台湾繁体 """ if not text: return "" # 1. 去除空格 text = text.replace(" ", "").replace("\t", "") # 2. 保留白名单字符 pattern = r'[^\u4e00-\u9fa5a-zA-Z0-9,,.。\n]' cleaned_text = re.sub(pattern, '', text) # 3. 处理连续空行 cleaned_text = re.sub(r'\n+', '\n', cleaned_text) # 4. 繁体转换 (关键步骤) cleaned_text = cc.convert(cleaned_text.strip()) return cleaned_text def format_to_srt(text): """ 将纯文本转换为 SRT 字幕格式 """ parts = re.split(r'([,,.。\n])', text) sentences = [] current_sentence = "" for part in parts: if re.match(r'[,,.。\n]', part): if current_sentence: sentences.append(current_sentence + part) current_sentence = "" else: current_sentence += part if current_sentence: sentences.append(current_sentence) sentences = [s.strip() for s in sentences if s.strip()] srt_content = "" start_time = datetime.timedelta(seconds=0) for index, sentence in enumerate(sentences): # 估算时长:假设每秒读 4 个字,最少 1.5 秒 duration_seconds = max(1.5, len(sentence) / 4.0) end_time = start_time + datetime.timedelta(seconds=duration_seconds) def format_timestamp(td): total_seconds = int(td.total_seconds()) hours = total_seconds // 3600 minutes = (total_seconds % 3600) // 60 seconds = total_seconds % 60 milliseconds = int(td.microseconds / 1000) return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}" srt_content += f"{index + 1}\n" srt_content += f"{format_timestamp(start_time)} --> {format_timestamp(end_time)}\n" srt_content += f"{sentence}\n\n" start_time = end_time return srt_content def stream_chat_request(api_key, url, headers, data): """流式请求""" try: data['stream'] = True response = requests.post(url, headers=headers, json=data, stream=True, timeout=120) if response.status_code != 200: return None, f"HTTP错误 {response.status_code}: {response.text[:100]}" full_content = "" for line in response.iter_lines(): if not line: continue decoded_line = line.decode('utf-8').strip() if decoded_line.startswith("data: "): decoded_line = decoded_line[6:] if decoded_line == "[DONE]": break try: chunk = json.loads(decoded_line) delta = chunk['choices'][0].get('delta', {}) content_piece = delta.get('content', '') if content_piece: full_content += content_piece except: continue return full_content, "success" except Exception as e: return None, f"流式请求异常: {str(e)}" def generate_themes_from_keywords(api_key, keywords): """Step 1: 脑暴选题""" if not keywords: return [gr.update()] * 10 if not api_key: return [gr.update(placeholder="请先输入API Key")] * 10 print(f"🧠 主编正在构思选题: {keywords}...") url = f"{MERCHANT_BASE_URL}/v1/chat/completions" headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key.strip()}"} data = { "model": TEXT_MODEL, "messages": [{"role": "system", "content": EDITOR_SYSTEM_PROMPT}, {"role": "user", "content": f"关键词:{keywords}\n请生成10个爆款标题:"}], "temperature": 0.95, "stream": False } try: resp = requests.post(url, headers=headers, json=data, timeout=120) if resp.status_code != 200: return [f"API Error: {resp.status_code}"] * 10 content = resp.json()['choices'][0]['message']['content'] themes = [line.strip() for line in content.split('\n') if line.strip()] themes = [re.sub(r'^\d+[\.,、]\s*', '', t) for t in themes] # 标题转繁体 themes = [cc.convert(t) for t in themes] if len(themes) < 10: themes += [""] * (10 - len(themes)) return themes[:10] except Exception as e: return [f"错误: {e}"] * 10 def generate_story_task(api_key, topic, index, sub_index, system_prompt, word_count): """Step 3: 单篇写作""" if not topic or not topic.strip(): return None log_prefix = f"[主题{index}-{sub_index}]" url = f"{MERCHANT_BASE_URL}/v1/chat/completions" headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key.strip()}"} variations = ["第一人称沉浸式", "侧重心理极度拉扯", "侧重伦理禁忌感", "侧重反转与悔恨", "侧重深夜私密氛围"] style_guide = variations[(sub_index - 1) % len(variations)] data = { "model": TEXT_MODEL, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"标题:{topic}\n要求字数:{word_count}字以上\n重点技法:{style_guide}\n请开始讲述这个故事,记住只能用逗号和句号:"} ], "temperature": 1.0, "max_tokens": 65536 } for attempt in range(3): print(f"{log_prefix} 第{attempt+1}次尝试撰写...") content, msg = stream_chat_request(api_key, url, headers, data) if msg == "success" and content: # 清洗 + 转繁体 cleaned_content = clean_content_strict(content) # 生成 SRT srt_content = format_to_srt(cleaned_content) return { "status": "success", "title": cc.convert(topic), "clean_content": cleaned_content, # TXT 内容 "srt_content": srt_content, # SRT 内容 "msg": f"{log_prefix} ✅ 完成 ({len(cleaned_content)}字)" } print(f"{log_prefix} ⚠️ 失败: {msg},休息3秒重试...") time.sleep(3) return {"status": "error", "msg": f"{log_prefix} ❌ 彻底失败"} def run_writer_factory(api_key, system_prompt, word_count, *args): """主流程:同时保存 SRT 和 TXT""" if not api_key: yield "❌ 请先输入 API Key", None, None; return topics = args[:10] counts = args[10:] tasks = [] for i, (t, c) in enumerate(zip(topics, counts)): if t.strip(): tasks.append({'topic': t, 'count': int(c), 'id': i+1}) total = sum([t['count'] for t in tasks]) if total == 0: yield "❌ 请先生成或输入主题", None, None; return logs = [f"🔥 繁体双格式工厂启动:共 {total} 篇 (SRT + TXT)"]; yield "\n".join(logs), None, None generated_files = [] preview_text = "" finished = 0 timestamp = int(time.time()) output_dir = f"stories_dual_{timestamp}" os.makedirs(output_dir, exist_ok=True) with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = [] for task in tasks: for j in range(task['count']): futures.append(executor.submit(generate_story_task, api_key, task['topic'], task['id'], j+1, system_prompt, word_count)) for future in futures: res = future.result() if res: logs.append(res['msg']) if res['status'] == 'success': finished += 1 # === 1. 文件名处理 (确保一致性) === safe_title = re.sub(r'[\\/*?:"<>|]', "", res['title']).strip() if len(safe_title) > 50: safe_title = safe_title[:50] if not safe_title: safe_title = f"Story_{finished}" # 检查重名,获取唯一的基础文件名 base_filename = safe_title counter = 1 # 只要存在同名的 .srt 或 .txt,就重命名 while os.path.exists(os.path.join(output_dir, f"{base_filename}.srt")) or \ os.path.exists(os.path.join(output_dir, f"{base_filename}.txt")): base_filename = f"{safe_title}_{counter}" counter += 1 # === 2. 保存 SRT 文件 === srt_path = os.path.join(output_dir, f"{base_filename}.srt") with open(srt_path, "w", encoding="utf-8") as f: f.write(res['srt_content']) generated_files.append(srt_path) # === 3. 保存 TXT 文件 === txt_path = os.path.join(output_dir, f"{base_filename}.txt") with open(txt_path, "w", encoding="utf-8") as f: f.write(res['clean_content']) generated_files.append(txt_path) preview_text += f"【{base_filename}】\n(已保存 .srt 和 .txt)\n{res['clean_content'][:80]}...\n\n" yield "\n".join(logs), preview_text, None if generated_files: zip_name = f"Stories_DualFormat_{len(generated_files)//2}sets_{timestamp}.zip" with zipfile.ZipFile(zip_name, 'w') as zf: for file_path in generated_files: zf.write(file_path, arcname=os.path.basename(file_path)) logs.append(f"\n🎉 全部完成!已打包 {len(generated_files)} 个文件 (SRT+TXT)。"); yield "\n".join(logs), preview_text, zip_name else: logs.append("\n❌ 失败: 未生成任何内容") yield "\n".join(logs), preview_text, None # === 界面布局 === with gr.Blocks(title="情感故事工厂 Pro Max (双格式版)") as app: gr.Markdown("# 🔥 情感故事工厂 Pro Max (繁体双格式版)") gr.Markdown("特性:**同时输出 SRT字幕 和 TXT纯文本** | **台湾繁体** | **合法刺激**") with gr.Row(variant="panel"): api_key_input = gr.Textbox( label="🔑 API Key (必填)", value=DEFAULT_API_KEY, type="password" ) # --- Step 1: 脑暴区 --- with gr.Row(): with gr.Column(scale=4): keywords_input = gr.Textbox( label="Step 1: 输入流量关键词", placeholder="例如:婆婆、同学会、前男友、出差、隔壁邻居、上错车", lines=2 ) with gr.Column(scale=1): brainstorm_btn = gr.Button("🧠 生成爆款标题 (繁体)", variant="secondary") gr.HTML("
") # --- Step 2: 10个独立槽位 --- topic_inputs = [] count_sliders = [] with gr.Row(): with gr.Column(): # 左5个 for i in range(1, 6): with gr.Row(): t = gr.Textbox(show_label=False, placeholder=f"标题 {i}", scale=3) c = gr.Slider(1, 5, 1, 1, label="篇数", scale=1) topic_inputs.append(t); count_sliders.append(c) with gr.Column(): # 右5个 for i in range(6, 11): with gr.Row(): t = gr.Textbox(show_label=False, placeholder=f"标题 {i}", scale=3) c = gr.Slider(1, 5, 1, 1, label="篇数", scale=1) topic_inputs.append(t); count_sliders.append(c) gr.HTML("
") # --- Step 3: 全局控制 --- with gr.Row(): with gr.Column(scale=1): word_slider = gr.Slider(500, 15000, 2500, 500, label="单篇字数") with gr.Accordion("🎭 作家设定 (已调教为头条情感大V)", open=True): system_prompt_input = gr.Textbox(label="System Prompt", value=WRITER_SYSTEM_PROMPT, lines=8) run_btn = gr.Button("🚀 Step 3: 启动流水线 (双格式打包)", variant="primary", size="lg") with gr.Column(scale=1): log_out = gr.Textbox(label="生产日志", lines=10) file_out = gr.File(label="下载结果 (.zip)") result_preview = gr.Textbox(label="内容预览 (繁体TXT)", lines=10) brainstorm_btn.click( generate_themes_from_keywords, inputs=[api_key_input, keywords_input], outputs=topic_inputs ) run_btn.click( run_writer_factory, inputs=[api_key_input, system_prompt_input, word_slider] + topic_inputs + count_sliders, outputs=[log_out, result_preview, file_out] ) if __name__ == "__main__": app.launch(share=True)